状態空間モデル(State Space Model)は、時系列データを「直接観測できない内部状態」と「観測された出力」の関係として定式化する、非常に柔軟で強力なフレームワークです。トレンド、季節性、回帰成分などを統一的に扱うことができ、カルマンフィルタを用いた効率的な推定が可能です。
ARIMA モデルが「差分を取って定常にする」というアプローチを取るのに対し、状態空間モデルは「背後にある構造(トレンド・季節性など)を陽にモデル化する」というアプローチを取ります。実際、ARIMA モデルは状態空間モデルの特殊ケースとして表現できることが知られています。
本記事では、状態空間モデルの一般形から、カルマンフィルタ・カルマンスムーザの導出、各種時系列モデルの状態空間表現、EM アルゴリズムによるパラメータ推定まで、数式を省略せずに解説し、Python で実装します。
本記事の内容
- 状態空間モデルの一般形(状態方程式・観測方程式)
- カルマンフィルタの予測ステップ・更新ステップの完全な導出
- カルマンスムーザの導出
- ローカルレベルモデル(ランダムウォーク+ノイズ)
- ローカルトレンドモデル
- 季節性成分のモデル化
- ARIMA との関係(ARIMA は状態空間モデルの特殊ケース)
- EM アルゴリズムによるパラメータ推定
- Python でのカルマンフィルタの実装とトレンド+季節性の分解
前提知識
この記事を読む前に、以下の概念を理解しておくと理解が深まります。
- 多変量正規分布の基本的性質
- 条件付き分布
- 行列演算(逆行列・行列の積)
状態空間モデルの一般形
定式化
線形ガウス状態空間モデルは、以下の2つの方程式で定義されます。
状態方程式(state equation / transition equation):
$$ \bm{x}_t = \bm{F}_t \bm{x}_{t-1} + \bm{B}_t \bm{u}_t + \bm{w}_t, \quad \bm{w}_t \sim \mathcal{N}(\bm{0}, \bm{Q}_t) $$
観測方程式(observation equation / measurement equation):
$$ \bm{y}_t = \bm{H}_t \bm{x}_t + \bm{v}_t, \quad \bm{v}_t \sim \mathcal{N}(\bm{0}, \bm{R}_t) $$
各変数の意味は以下の通りです。
| 記号 | 次元 | 意味 |
|---|---|---|
| $\bm{x}_t$ | $n \times 1$ | 時刻 $t$ の状態ベクトル(直接観測できない) |
| $\bm{y}_t$ | $m \times 1$ | 時刻 $t$ の観測ベクトル(観測データ) |
| $\bm{F}_t$ | $n \times n$ | 状態遷移行列 |
| $\bm{B}_t$ | $n \times r$ | 制御入力行列 |
| $\bm{u}_t$ | $r \times 1$ | 制御入力(既知) |
| $\bm{H}_t$ | $m \times n$ | 観測行列 |
| $\bm{w}_t$ | $n \times 1$ | 過程ノイズ(状態の不確実性) |
| $\bm{v}_t$ | $m \times 1$ | 観測ノイズ |
| $\bm{Q}_t$ | $n \times n$ | 過程ノイズの共分散行列 |
| $\bm{R}_t$ | $m \times m$ | 観測ノイズの共分散行列 |
さらに、$\bm{w}_t$ と $\bm{v}_s$ は全ての $t$, $s$ に対して互いに独立と仮定します。初期状態は $\bm{x}_0 \sim \mathcal{N}(\bm{\mu}_0, \bm{P}_0)$ とします。
直感的な理解
状態空間モデルの基本的なアイデアは以下の通りです。
- 私たちが直接観測できるのは $\bm{y}_t$ のみ
- 背後には隠れた「真の状態」$\bm{x}_t$ が存在する
- 状態は時間とともに $\bm{F}_t$ に従って遷移し、過程ノイズ $\bm{w}_t$ で揺らぐ
- 観測は状態の(ノイズを含む)不完全な射影 $\bm{H}_t \bm{x}_t + \bm{v}_t$
カルマンフィルタ
目的
カルマンフィルタの目的は、時刻 $1, 2, \dots, t$ までの観測 $\bm{y}_{1:t} = \{\bm{y}_1, \bm{y}_2, \dots, \bm{y}_t\}$ が与えられたもとで、状態 $\bm{x}_t$ の事後分布を逐次的に求めることです。
線形ガウスモデルでは、全ての条件付き分布がガウス分布となるため、平均と共分散のみで完全に特徴づけられます。
$$ \bm{x}_t | \bm{y}_{1:t} \sim \mathcal{N}(\hat{\bm{x}}_{t|t}, \bm{P}_{t|t}) $$
表記法
- $\hat{\bm{x}}_{t|s} = \mathbb{E}[\bm{x}_t | \bm{y}_{1:s}]$: 時刻 $s$ までの観測に基づく時刻 $t$ の状態の推定値
- $\bm{P}_{t|s} = \text{Cov}[\bm{x}_t | \bm{y}_{1:s}]$: その推定誤差の共分散
予測ステップ(Predict)
時刻 $t-1$ までの観測に基づくフィルタ分布 $\bm{x}_{t-1} | \bm{y}_{1:t-1} \sim \mathcal{N}(\hat{\bm{x}}_{t-1|t-1}, \bm{P}_{t-1|t-1})$ が既知であるとします。
状態方程式 $\bm{x}_t = \bm{F}_t \bm{x}_{t-1} + \bm{B}_t \bm{u}_t + \bm{w}_t$ を用いて、時刻 $t$ の事前分布を計算します。
事前状態推定値:
$$ \begin{align} \hat{\bm{x}}_{t|t-1} &= \mathbb{E}[\bm{x}_t | \bm{y}_{1:t-1}] \\ &= \mathbb{E}[\bm{F}_t \bm{x}_{t-1} + \bm{B}_t \bm{u}_t + \bm{w}_t | \bm{y}_{1:t-1}] \\ &= \bm{F}_t \mathbb{E}[\bm{x}_{t-1} | \bm{y}_{1:t-1}] + \bm{B}_t \bm{u}_t + \mathbb{E}[\bm{w}_t] \\ &= \bm{F}_t \hat{\bm{x}}_{t-1|t-1} + \bm{B}_t \bm{u}_t \end{align} $$
最後の等号で $\mathbb{E}[\bm{w}_t] = \bm{0}$ を用いました。
事前誤差共分散:
推定誤差 $\tilde{\bm{x}}_{t|t-1} = \bm{x}_t – \hat{\bm{x}}_{t|t-1}$ を計算します。
$$ \begin{align} \tilde{\bm{x}}_{t|t-1} &= (\bm{F}_t \bm{x}_{t-1} + \bm{B}_t \bm{u}_t + \bm{w}_t) – (\bm{F}_t \hat{\bm{x}}_{t-1|t-1} + \bm{B}_t \bm{u}_t) \\ &= \bm{F}_t (\bm{x}_{t-1} – \hat{\bm{x}}_{t-1|t-1}) + \bm{w}_t \\ &= \bm{F}_t \tilde{\bm{x}}_{t-1|t-1} + \bm{w}_t \end{align} $$
したがって
$$ \begin{align} \bm{P}_{t|t-1} &= \mathbb{E}[\tilde{\bm{x}}_{t|t-1}\tilde{\bm{x}}_{t|t-1}^T] \\ &= \mathbb{E}[(\bm{F}_t \tilde{\bm{x}}_{t-1|t-1} + \bm{w}_t)(\bm{F}_t \tilde{\bm{x}}_{t-1|t-1} + \bm{w}_t)^T] \\ &= \bm{F}_t \mathbb{E}[\tilde{\bm{x}}_{t-1|t-1}\tilde{\bm{x}}_{t-1|t-1}^T]\bm{F}_t^T + \mathbb{E}[\bm{w}_t \bm{w}_t^T] \\ &= \bm{F}_t \bm{P}_{t-1|t-1} \bm{F}_t^T + \bm{Q}_t \end{align} $$
3行目で、$\tilde{\bm{x}}_{t-1|t-1}$(過去の情報に基づく推定誤差)と $\bm{w}_t$(時刻 $t$ の新しいノイズ)が独立であることから交差項が消えることを用いました。
更新ステップ(Update)
時刻 $t$ の観測 $\bm{y}_t$ を取り込んで、事前分布を事後分布に更新します。
イノベーション(観測残差):
$$ \bm{e}_t = \bm{y}_t – \hat{\bm{y}}_{t|t-1} = \bm{y}_t – \bm{H}_t \hat{\bm{x}}_{t|t-1} $$
これは実際の観測と予測された観測の差であり、「新しい情報」を表します。
イノベーションの共分散:
$$ \begin{align} \bm{S}_t &= \text{Cov}[\bm{e}_t] = \text{Cov}[\bm{H}_t \tilde{\bm{x}}_{t|t-1} + \bm{v}_t] \\ &= \bm{H}_t \bm{P}_{t|t-1} \bm{H}_t^T + \bm{R}_t \end{align} $$
カルマンゲイン:
$$ \bm{K}_t = \bm{P}_{t|t-1} \bm{H}_t^T \bm{S}_t^{-1} $$
カルマンゲインの導出を示します。$\bm{x}_t | \bm{y}_{1:t-1}$ と $\bm{y}_t | \bm{y}_{1:t-1}$ の同時分布は
$$ \begin{pmatrix} \bm{x}_t \\ \bm{y}_t \end{pmatrix} \bigg| \bm{y}_{1:t-1} \sim \mathcal{N}\left(\begin{pmatrix} \hat{\bm{x}}_{t|t-1} \\ \bm{H}_t \hat{\bm{x}}_{t|t-1} \end{pmatrix}, \begin{pmatrix} \bm{P}_{t|t-1} & \bm{P}_{t|t-1}\bm{H}_t^T \\ \bm{H}_t\bm{P}_{t|t-1} & \bm{S}_t \end{pmatrix}\right) $$
多変量正規分布の条件付き分布の公式を適用します。$\begin{pmatrix}\bm{a} \\ \bm{b}\end{pmatrix} \sim \mathcal{N}\left(\begin{pmatrix}\bm{\mu}_a \\ \bm{\mu}_b\end{pmatrix}, \begin{pmatrix}\bm{\Sigma}_{aa} & \bm{\Sigma}_{ab} \\ \bm{\Sigma}_{ba} & \bm{\Sigma}_{bb}\end{pmatrix}\right)$ のとき
$$ \bm{a} | \bm{b} \sim \mathcal{N}(\bm{\mu}_a + \bm{\Sigma}_{ab}\bm{\Sigma}_{bb}^{-1}(\bm{b} – \bm{\mu}_b), \; \bm{\Sigma}_{aa} – \bm{\Sigma}_{ab}\bm{\Sigma}_{bb}^{-1}\bm{\Sigma}_{ba}) $$
これを適用すると
事後状態推定値:
$$ \begin{align} \hat{\bm{x}}_{t|t} &= \hat{\bm{x}}_{t|t-1} + \bm{P}_{t|t-1}\bm{H}_t^T \bm{S}_t^{-1}(\bm{y}_t – \bm{H}_t\hat{\bm{x}}_{t|t-1}) \\ &= \hat{\bm{x}}_{t|t-1} + \bm{K}_t \bm{e}_t \end{align} $$
事後誤差共分散:
$$ \begin{align} \bm{P}_{t|t} &= \bm{P}_{t|t-1} – \bm{P}_{t|t-1}\bm{H}_t^T\bm{S}_t^{-1}\bm{H}_t\bm{P}_{t|t-1} \\ &= (\bm{I} – \bm{K}_t\bm{H}_t)\bm{P}_{t|t-1} \end{align} $$
カルマンフィルタのアルゴリズムまとめ
初期化: $\hat{\bm{x}}_{0|0} = \bm{\mu}_0$, $\bm{P}_{0|0} = \bm{P}_0$
$t = 1, 2, \dots, T$ について以下を繰り返す:
-
予測: – $\hat{\bm{x}}_{t|t-1} = \bm{F}_t \hat{\bm{x}}_{t-1|t-1} + \bm{B}_t \bm{u}_t$ – $\bm{P}_{t|t-1} = \bm{F}_t \bm{P}_{t-1|t-1} \bm{F}_t^T + \bm{Q}_t$
-
更新: – $\bm{e}_t = \bm{y}_t – \bm{H}_t \hat{\bm{x}}_{t|t-1}$ – $\bm{S}_t = \bm{H}_t \bm{P}_{t|t-1} \bm{H}_t^T + \bm{R}_t$ – $\bm{K}_t = \bm{P}_{t|t-1} \bm{H}_t^T \bm{S}_t^{-1}$ – $\hat{\bm{x}}_{t|t} = \hat{\bm{x}}_{t|t-1} + \bm{K}_t \bm{e}_t$ – $\bm{P}_{t|t} = (\bm{I} – \bm{K}_t \bm{H}_t) \bm{P}_{t|t-1}$
カルマンスムーザ
カルマンフィルタは「時刻 $t$ までの情報を用いた推定」(フィルタリング)ですが、全時点の観測 $\bm{y}_{1:T}$ を用いて各時点の状態を推定する方が精度が高くなります。これをスムージングと呼びます。
$$ \bm{x}_t | \bm{y}_{1:T} \sim \mathcal{N}(\hat{\bm{x}}_{t|T}, \bm{P}_{t|T}) $$
Rauch-Tung-Striebel(RTS)スムーザの導出
RTS スムーザは、カルマンフィルタの結果を用いて $t = T, T-1, \dots, 1$ の逆順で計算します。
スムーザゲイン:
$$ \bm{L}_t = \bm{P}_{t|t} \bm{F}_{t+1}^T \bm{P}_{t+1|t}^{-1} $$
この導出を示します。$\bm{x}_t | \bm{y}_{1:T}$ を求めるために、$\bm{x}_t | \bm{y}_{1:t}$ と $\bm{x}_{t+1} | \bm{y}_{1:T}$ の関係を利用します。
マルコフ性から $\bm{x}_t | \bm{x}_{t+1}, \bm{y}_{1:t}$ の分布は $\bm{x}_t | \bm{x}_{t+1}, \bm{y}_{1:t}$ で与えられます。$(\bm{x}_t, \bm{x}_{t+1}) | \bm{y}_{1:t}$ の同時分布を考えると
$$ \begin{pmatrix} \bm{x}_t \\ \bm{x}_{t+1} \end{pmatrix} \bigg| \bm{y}_{1:t} \sim \mathcal{N}\left(\begin{pmatrix} \hat{\bm{x}}_{t|t} \\ \bm{F}_{t+1}\hat{\bm{x}}_{t|t} \end{pmatrix}, \begin{pmatrix} \bm{P}_{t|t} & \bm{P}_{t|t}\bm{F}_{t+1}^T \\ \bm{F}_{t+1}\bm{P}_{t|t} & \bm{P}_{t+1|t} \end{pmatrix}\right) $$
条件付き分布の公式を適用して $\bm{x}_t | \bm{x}_{t+1}, \bm{y}_{1:t}$ を求めると
$$ \mathbb{E}[\bm{x}_t | \bm{x}_{t+1}, \bm{y}_{1:t}] = \hat{\bm{x}}_{t|t} + \bm{L}_t(\bm{x}_{t+1} – \hat{\bm{x}}_{t+1|t}) $$
$\bm{x}_{t+1}$ の期待値を $\bm{y}_{1:T}$ のもとで取ると
スムーズド状態推定値:
$$ \hat{\bm{x}}_{t|T} = \hat{\bm{x}}_{t|t} + \bm{L}_t(\hat{\bm{x}}_{t+1|T} – \hat{\bm{x}}_{t+1|t}) $$
スムーズド誤差共分散:
$$ \bm{P}_{t|T} = \bm{P}_{t|t} + \bm{L}_t(\bm{P}_{t+1|T} – \bm{P}_{t+1|t})\bm{L}_t^T $$
RTS スムーザのアルゴリズム
初期化: $\hat{\bm{x}}_{T|T}$, $\bm{P}_{T|T}$(カルマンフィルタの最終ステップの結果)
$t = T-1, T-2, \dots, 1$ について以下を繰り返す:
- $\bm{L}_t = \bm{P}_{t|t} \bm{F}_{t+1}^T \bm{P}_{t+1|t}^{-1}$
- $\hat{\bm{x}}_{t|T} = \hat{\bm{x}}_{t|t} + \bm{L}_t(\hat{\bm{x}}_{t+1|T} – \hat{\bm{x}}_{t+1|t})$
- $\bm{P}_{t|T} = \bm{P}_{t|t} + \bm{L}_t(\bm{P}_{t+1|T} – \bm{P}_{t+1|t})\bm{L}_t^T$
時系列モデルの状態空間表現
ローカルレベルモデル(ランダムウォーク+ノイズ)
最も単純な構造的時系列モデルは、ローカルレベルモデルです。
$$ \begin{align} \mu_t &= \mu_{t-1} + w_t, \quad w_t \sim \mathcal{N}(0, \sigma_w^2) \\ y_t &= \mu_t + v_t, \quad v_t \sim \mathcal{N}(0, \sigma_v^2) \end{align} $$
状態 $\mu_t$ はランダムウォークに従う「レベル(水準)」で、観測 $y_t$ はレベルにノイズが加わったものです。
状態空間形式では $x_t = \mu_t$, $F = 1$, $H = 1$, $Q = \sigma_w^2$, $R = \sigma_v^2$ です。
信号対雑音比 $q = \sigma_w^2 / \sigma_v^2$ がモデルの振る舞いを決定します。$q$ が大きいとレベルの変動が大きく観測値に追従し、$q$ が小さいとレベルの変動が小さく滑らかな推定となります。
ローカルトレンドモデル
ローカルトレンドモデルは、レベルに加えて「傾き(slope)」も時変にするモデルです。
$$ \begin{align} \mu_t &= \mu_{t-1} + \nu_{t-1} + w_t^{(\mu)}, \quad w_t^{(\mu)} \sim \mathcal{N}(0, \sigma_\mu^2) \\ \nu_t &= \nu_{t-1} + w_t^{(\nu)}, \quad w_t^{(\nu)} \sim \mathcal{N}(0, \sigma_\nu^2) \\ y_t &= \mu_t + v_t, \quad v_t \sim \mathcal{N}(0, \sigma_v^2) \end{align} $$
状態ベクトルは $\bm{x}_t = (\mu_t, \nu_t)^T$ で、状態空間形式は
$$ \bm{F} = \begin{pmatrix} 1 & 1 \\ 0 & 1 \end{pmatrix}, \quad \bm{H} = \begin{pmatrix} 1 & 0 \end{pmatrix}, \quad \bm{Q} = \begin{pmatrix} \sigma_\mu^2 & 0 \\ 0 & \sigma_\nu^2 \end{pmatrix}, \quad R = \sigma_v^2 $$
季節性成分のモデル化
周期 $s$ の季節性成分 $\gamma_t$ は、過去 $s$ 個の季節成分の和が0になるという制約で定義されます。
$$ \gamma_t + \gamma_{t-1} + \cdots + \gamma_{t-s+1} = w_t^{(\gamma)}, \quad w_t^{(\gamma)} \sim \mathcal{N}(0, \sigma_\gamma^2) $$
これを状態空間形式で書くと、状態ベクトルは $\bm{s}_t = (\gamma_t, \gamma_{t-1}, \dots, \gamma_{t-s+2})^T \in \mathbb{R}^{s-1}$ で
$$ \bm{F}_\gamma = \begin{pmatrix} -1 & -1 & -1 & \cdots & -1 \\ 1 & 0 & 0 & \cdots & 0 \\ 0 & 1 & 0 & \cdots & 0 \\ \vdots & & \ddots & & \vdots \\ 0 & 0 & \cdots & 1 & 0 \end{pmatrix} \in \mathbb{R}^{(s-1) \times (s-1)} $$
$$ \bm{H}_\gamma = \begin{pmatrix} 1 & 0 & \cdots & 0 \end{pmatrix} \in \mathbb{R}^{1 \times (s-1)} $$
トレンド+季節性の統合モデル
ローカルトレンドと季節性を組み合わせた構造的時系列モデルは
$$ y_t = \mu_t + \gamma_t + v_t $$
状態ベクトルを $\bm{x}_t = (\mu_t, \nu_t, \gamma_t, \gamma_{t-1}, \dots, \gamma_{t-s+2})^T$ と定義し、各成分に対応するブロック構造の状態空間モデルを構築します。
$$ \bm{F} = \begin{pmatrix} \bm{F}_{\text{trend}} & \bm{0} \\ \bm{0} & \bm{F}_\gamma \end{pmatrix}, \quad \bm{H} = \begin{pmatrix} 1 & 0 & 1 & 0 & \cdots & 0 \end{pmatrix} $$
ARIMA との関係
ARIMA モデルは状態空間モデルの特殊ケースとして表現できます。
AR(1) の状態空間表現
$x_t = \phi x_{t-1} + \varepsilon_t$, $y_t = x_t$ の場合
$$ F = \phi, \quad H = 1, \quad Q = \sigma_\varepsilon^2, \quad R = 0 $$
観測ノイズ $R = 0$ の場合、カルマンフィルタのゲインは $K_t = 1$ となり、フィルタ推定値が観測値と一致します。
ARIMA(1,1,1) の状態空間表現
差分系列 $\nabla y_t = y_t – y_{t-1}$ が ARMA(1,1) に従う場合、状態ベクトルを $\bm{x}_t = (\nabla y_t, \varepsilon_t)^T$ として
$$ \bm{F} = \begin{pmatrix} \phi & \theta \\ 0 & 0 \end{pmatrix}, \quad \bm{H} = \begin{pmatrix} 1 & 1 \end{pmatrix}, \quad \bm{Q} = \begin{pmatrix} 0 & 0 \\ 0 & \sigma^2 \end{pmatrix}, \quad R = 0 $$
一般に、任意の ARIMA(p,d,q) モデルは適切な状態空間表現に変換できます。この対応関係により、状態空間モデルのフレームワーク(カルマンフィルタ等)を ARIMA モデルの推定にも利用できます。
EM アルゴリズムによるパラメータ推定
状態空間モデルのパラメータ $\bm{\Psi} = \{\bm{F}, \bm{H}, \bm{Q}, \bm{R}\}$ が未知の場合、EM アルゴリズムで推定できます。
対数尤度
観測データ $\bm{y}_{1:T}$ の対数尤度は、イノベーション分解を用いて
$$ \log p(\bm{y}_{1:T} | \bm{\Psi}) = \sum_{t=1}^{T} \log p(\bm{y}_t | \bm{y}_{1:t-1}, \bm{\Psi}) $$
各項は
$$ \log p(\bm{y}_t | \bm{y}_{1:t-1}) = -\frac{m}{2}\log(2\pi) – \frac{1}{2}\log|\bm{S}_t| – \frac{1}{2}\bm{e}_t^T\bm{S}_t^{-1}\bm{e}_t $$
ここで $\bm{e}_t$ はイノベーション、$\bm{S}_t$ はイノベーション共分散です。
E ステップ
現在のパラメータ $\bm{\Psi}^{(k)}$ のもとで、カルマンフィルタとカルマンスムーザを実行し、以下の十分統計量を計算します。
$$ \begin{align} \hat{\bm{x}}_{t|T} &\quad \text{(スムーズド状態推定値)} \\ \bm{P}_{t|T} &\quad \text{(スムーズド誤差共分散)} \\ \bm{P}_{t, t-1|T} &= \bm{L}_{t-1}\bm{P}_{t|T} \quad \text{(隣接時点間のクロス共分散)} \end{align} $$
以下の期待値を計算します。
$$ \begin{align} \bm{A} &= \sum_{t=1}^{T} (\bm{P}_{t|T} + \hat{\bm{x}}_{t|T}\hat{\bm{x}}_{t|T}^T) \\ \bm{B} &= \sum_{t=1}^{T} (\bm{P}_{t,t-1|T} + \hat{\bm{x}}_{t|T}\hat{\bm{x}}_{t-1|T}^T) \\ \bm{C} &= \sum_{t=1}^{T} (\bm{P}_{t-1|T} + \hat{\bm{x}}_{t-1|T}\hat{\bm{x}}_{t-1|T}^T) \end{align} $$
M ステップ
十分統計量を用いてパラメータを更新します。
$$ \begin{align} \bm{F}^{(k+1)} &= \bm{B}\bm{C}^{-1} \\ \bm{Q}^{(k+1)} &= \frac{1}{T}(\bm{A} – \bm{B}\bm{C}^{-1}\bm{B}^T) \\ \bm{R}^{(k+1)} &= \frac{1}{T}\sum_{t=1}^{T}\left[(\bm{y}_t – \bm{H}\hat{\bm{x}}_{t|T})(\bm{y}_t – \bm{H}\hat{\bm{x}}_{t|T})^T + \bm{H}\bm{P}_{t|T}\bm{H}^T\right] \end{align} $$
E ステップと M ステップを交互に繰り返すことで、対数尤度は単調に増加し、局所最適解に収束します。
Python でのカルマンフィルタの実装
カルマンフィルタとスムーザのスクラッチ実装
import numpy as np
import matplotlib.pyplot as plt
class KalmanFilter:
"""線形ガウス状態空間モデルのカルマンフィルタ・スムーザ"""
def __init__(self, F, H, Q, R, x0, P0):
"""
F: 状態遷移行列 (n x n)
H: 観測行列 (m x n)
Q: 過程ノイズ共分散 (n x n)
R: 観測ノイズ共分散 (m x m)
x0: 初期状態 (n,)
P0: 初期誤差共分散 (n x n)
"""
self.F = np.atleast_2d(F).astype(float)
self.H = np.atleast_2d(H).astype(float)
self.Q = np.atleast_2d(Q).astype(float)
self.R = np.atleast_2d(R).astype(float)
self.x0 = np.atleast_1d(x0).astype(float)
self.P0 = np.atleast_2d(P0).astype(float)
self.n = self.F.shape[0] # 状態の次元
self.m = self.H.shape[0] # 観測の次元
def filter(self, y):
"""カルマンフィルタを実行する"""
T = len(y)
# 格納用配列
x_pred = np.zeros((T, self.n)) # 事前推定値
P_pred = np.zeros((T, self.n, self.n)) # 事前共分散
x_filt = np.zeros((T, self.n)) # 事後推定値
P_filt = np.zeros((T, self.n, self.n)) # 事後共分散
K_all = np.zeros((T, self.n, self.m)) # カルマンゲイン
innovations = np.zeros((T, self.m)) # イノベーション
S_all = np.zeros((T, self.m, self.m)) # イノベーション共分散
log_likelihood = 0.0
x_prev = self.x0
P_prev = self.P0
for t in range(T):
# --- 予測ステップ ---
x_pred[t] = self.F @ x_prev
P_pred[t] = self.F @ P_prev @ self.F.T + self.Q
# --- 更新ステップ ---
# イノベーション
y_t = np.atleast_1d(y[t])
e_t = y_t - self.H @ x_pred[t]
innovations[t] = e_t
# イノベーション共分散
S_t = self.H @ P_pred[t] @ self.H.T + self.R
S_all[t] = S_t
# カルマンゲイン
K_t = P_pred[t] @ self.H.T @ np.linalg.inv(S_t)
K_all[t] = K_t
# 事後推定
x_filt[t] = x_pred[t] + K_t @ e_t
P_filt[t] = (np.eye(self.n) - K_t @ self.H) @ P_pred[t]
# 対数尤度の加算
log_likelihood += -0.5 * (
self.m * np.log(2 * np.pi) +
np.log(np.linalg.det(S_t)) +
e_t @ np.linalg.inv(S_t) @ e_t
)
x_prev = x_filt[t]
P_prev = P_filt[t]
self.x_pred = x_pred
self.P_pred = P_pred
self.x_filt = x_filt
self.P_filt = P_filt
self.K_all = K_all
self.innovations = innovations
self.S_all = S_all
self.log_likelihood = log_likelihood
return x_filt, P_filt
def smooth(self):
"""RTS スムーザを実行する"""
T = len(self.x_filt)
x_smooth = np.zeros((T, self.n))
P_smooth = np.zeros((T, self.n, self.n))
# 最終時点はフィルタ結果と同じ
x_smooth[-1] = self.x_filt[-1]
P_smooth[-1] = self.P_filt[-1]
for t in range(T - 2, -1, -1):
# スムーザゲイン
L_t = self.P_filt[t] @ self.F.T @ np.linalg.inv(self.P_pred[t + 1])
# スムーズド推定
x_smooth[t] = (self.x_filt[t] +
L_t @ (x_smooth[t + 1] - self.x_pred[t + 1]))
P_smooth[t] = (self.P_filt[t] +
L_t @ (P_smooth[t + 1] - self.P_pred[t + 1]) @ L_t.T)
self.x_smooth = x_smooth
self.P_smooth = P_smooth
return x_smooth, P_smooth
# ローカルレベルモデルのデモ
np.random.seed(42)
T = 200
# 真のパラメータ
sigma_w = 0.5 # 過程ノイズ(レベルの変動)
sigma_v = 2.0 # 観測ノイズ
# データ生成
true_level = np.zeros(T)
y = np.zeros(T)
true_level[0] = 0.0
y[0] = true_level[0] + np.random.randn() * sigma_v
for t in range(1, T):
true_level[t] = true_level[t-1] + np.random.randn() * sigma_w
y[t] = true_level[t] + np.random.randn() * sigma_v
# カルマンフィルタの適用
kf = KalmanFilter(
F=np.array([[1.0]]),
H=np.array([[1.0]]),
Q=np.array([[sigma_w**2]]),
R=np.array([[sigma_v**2]]),
x0=np.array([0.0]),
P0=np.array([[1.0]])
)
x_filt, P_filt = kf.filter(y)
x_smooth, P_smooth = kf.smooth()
print(f"対数尤度: {kf.log_likelihood:.2f}")
フィルタリングとスムージングの結果を可視化
import numpy as np
import matplotlib.pyplot as plt
# (上記のコードで kf, y, true_level, x_filt, P_filt, x_smooth, P_smooth が定義済みとする)
# ここではデモ用にデータを再生成
np.random.seed(42)
T = 200
sigma_w = 0.5
sigma_v = 2.0
true_level = np.zeros(T)
y_data = np.zeros(T)
true_level[0] = 0.0
y_data[0] = true_level[0] + np.random.randn() * sigma_v
for t in range(1, T):
true_level[t] = true_level[t-1] + np.random.randn() * sigma_w
y_data[t] = true_level[t] + np.random.randn() * sigma_v
# カルマンフィルタ(簡易実装)
n = 1
F = np.array([[1.0]])
H = np.array([[1.0]])
Q = np.array([[sigma_w**2]])
R = np.array([[sigma_v**2]])
x_filt = np.zeros(T)
P_filt = np.zeros(T)
x_pred = np.zeros(T)
P_pred = np.zeros(T)
x_prev, P_prev = 0.0, 1.0
for t in range(T):
x_pred[t] = x_prev
P_pred[t] = P_prev + sigma_w**2
S = P_pred[t] + sigma_v**2
K = P_pred[t] / S
x_filt[t] = x_pred[t] + K * (y_data[t] - x_pred[t])
P_filt[t] = (1 - K) * P_pred[t]
x_prev = x_filt[t]
P_prev = P_filt[t]
# RTS スムーザ
x_smooth = np.zeros(T)
P_smooth = np.zeros(T)
x_smooth[-1] = x_filt[-1]
P_smooth[-1] = P_filt[-1]
for t in range(T-2, -1, -1):
L = P_filt[t] / P_pred[t+1]
x_smooth[t] = x_filt[t] + L * (x_smooth[t+1] - x_pred[t+1])
P_smooth[t] = P_filt[t] + L**2 * (P_smooth[t+1] - P_pred[t+1])
# 可視化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (1) フィルタリング結果
filt_std = np.sqrt(P_filt)
axes[0, 0].scatter(range(T), y_data, s=5, c='gray', alpha=0.5, label='Observations')
axes[0, 0].plot(true_level, 'g-', linewidth=1.5, label='True Level')
axes[0, 0].plot(x_filt, 'r-', linewidth=1.5, label='Filtered')
axes[0, 0].fill_between(range(T),
x_filt - 2*filt_std, x_filt + 2*filt_std,
color='red', alpha=0.1, label='95% CI')
axes[0, 0].set_title('Kalman Filter (Filtering)', fontsize=13)
axes[0, 0].legend(fontsize=9)
axes[0, 0].grid(True, alpha=0.3)
# (2) スムージング結果
smooth_std = np.sqrt(P_smooth)
axes[0, 1].scatter(range(T), y_data, s=5, c='gray', alpha=0.5, label='Observations')
axes[0, 1].plot(true_level, 'g-', linewidth=1.5, label='True Level')
axes[0, 1].plot(x_smooth, 'b-', linewidth=1.5, label='Smoothed')
axes[0, 1].fill_between(range(T),
x_smooth - 2*smooth_std, x_smooth + 2*smooth_std,
color='blue', alpha=0.1, label='95% CI')
axes[0, 1].set_title('Kalman Smoother (Smoothing)', fontsize=13)
axes[0, 1].legend(fontsize=9)
axes[0, 1].grid(True, alpha=0.3)
# (3) カルマンゲインの推移
K_values = P_pred / (P_pred + sigma_v**2)
axes[1, 0].plot(K_values, 'purple', linewidth=1.5)
axes[1, 0].set_title('Kalman Gain Over Time', fontsize=13)
axes[1, 0].set_xlabel('Time')
axes[1, 0].set_ylabel('Kalman Gain')
axes[1, 0].grid(True, alpha=0.3)
# (4) フィルタ vs スムーザの不確実性の比較
axes[1, 1].plot(filt_std, 'r-', linewidth=1.5, label='Filter std')
axes[1, 1].plot(smooth_std, 'b-', linewidth=1.5, label='Smoother std')
axes[1, 1].set_title('Estimation Uncertainty: Filter vs Smoother', fontsize=13)
axes[1, 1].set_xlabel('Time')
axes[1, 1].set_ylabel('Standard Deviation')
axes[1, 1].legend(fontsize=11)
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
トレンド+季節性の分解
import numpy as np
import matplotlib.pyplot as plt
class StructuralTimeSeriesModel:
"""トレンド+季節性の構造的時系列モデル"""
def __init__(self, seasonal_period=12,
sigma_level=0.5, sigma_slope=0.01,
sigma_seasonal=0.1, sigma_obs=1.0):
self.s = seasonal_period
self.sigma_level = sigma_level
self.sigma_slope = sigma_slope
self.sigma_seasonal = sigma_seasonal
self.sigma_obs = sigma_obs
# 状態次元: レベル(1) + 傾き(1) + 季節(s-1)
self.n = 2 + (seasonal_period - 1)
# 状態遷移行列の構築
self.F = np.zeros((self.n, self.n))
# トレンド部分
self.F[0, 0] = 1.0 # μ_t = μ_{t-1} + ν_{t-1} + w
self.F[0, 1] = 1.0
self.F[1, 1] = 1.0 # ν_t = ν_{t-1} + w
# 季節部分
idx = 2
self.F[idx, idx:idx+self.s-1] = -1.0 # -γ_{t-1} - ... - γ_{t-s+1}
for i in range(1, self.s - 1):
self.F[idx + i, idx + i - 1] = 1.0 # シフト
# 観測行列
self.H = np.zeros((1, self.n))
self.H[0, 0] = 1.0 # レベル
self.H[0, 2] = 1.0 # 季節
# 過程ノイズ共分散
self.Q = np.zeros((self.n, self.n))
self.Q[0, 0] = sigma_level**2
self.Q[1, 1] = sigma_slope**2
self.Q[2, 2] = sigma_seasonal**2
# 観測ノイズ共分散
self.R = np.array([[sigma_obs**2]])
def fit(self, y):
"""カルマンフィルタ + スムーザで時系列を分解"""
T = len(y)
# 初期化
x = np.zeros(self.n)
P = np.eye(self.n) * 10.0 # 不確実性が大きい初期状態
# 格納用
x_filt = np.zeros((T, self.n))
P_filt = np.zeros((T, self.n, self.n))
x_pred = np.zeros((T, self.n))
P_pred = np.zeros((T, self.n, self.n))
# フィルタリング
for t in range(T):
# 予測
x_p = self.F @ x
P_p = self.F @ P @ self.F.T + self.Q
x_pred[t] = x_p
P_pred[t] = P_p
# 更新
e = y[t] - self.H @ x_p
S = self.H @ P_p @ self.H.T + self.R
K = P_p @ self.H.T @ np.linalg.inv(S)
x = x_p + (K @ e).flatten()
P = (np.eye(self.n) - K @ self.H) @ P_p
x_filt[t] = x
P_filt[t] = P
# スムージング(RTS)
x_smooth = np.zeros((T, self.n))
P_smooth = np.zeros((T, self.n, self.n))
x_smooth[-1] = x_filt[-1]
P_smooth[-1] = P_filt[-1]
for t in range(T-2, -1, -1):
L = P_filt[t] @ self.F.T @ np.linalg.inv(P_pred[t+1])
x_smooth[t] = x_filt[t] + L @ (x_smooth[t+1] - x_pred[t+1])
P_smooth[t] = P_filt[t] + L @ (P_smooth[t+1] - P_pred[t+1]) @ L.T
self.x_smooth = x_smooth
self.trend = x_smooth[:, 0] # レベル成分
self.slope = x_smooth[:, 1] # 傾き成分
self.seasonal = x_smooth[:, 2] # 季節成分
return self.trend, self.slope, self.seasonal
# データ生成(トレンド+季節性+ノイズ)
np.random.seed(42)
T = 144 # 12年分の月次データ
t = np.arange(T)
# 真のトレンド(緩やかに上昇)
true_trend = 100 + 0.5 * t + 0.005 * t**2
# 真の季節性(12ヶ月周期)
true_seasonal = 20 * np.sin(2 * np.pi * t / 12) + 10 * np.cos(4 * np.pi * t / 12)
# 観測ノイズ
noise = np.random.randn(T) * 5
# 観測データ
y = true_trend + true_seasonal + noise
# モデルの適用
model = StructuralTimeSeriesModel(
seasonal_period=12,
sigma_level=1.0,
sigma_slope=0.05,
sigma_seasonal=0.5,
sigma_obs=5.0
)
est_trend, est_slope, est_seasonal = model.fit(y)
# 可視化
fig, axes = plt.subplots(4, 1, figsize=(14, 14))
# (1) 観測データとトレンド
axes[0].plot(y, 'gray', alpha=0.6, linewidth=1, label='Observations')
axes[0].plot(true_trend, 'g--', linewidth=2, label='True Trend')
axes[0].plot(est_trend, 'r-', linewidth=2, label='Estimated Trend')
axes[0].set_title('Observed Data and Trend', fontsize=13)
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)
# (2) 季節成分
axes[1].plot(true_seasonal, 'g--', linewidth=2, label='True Seasonal')
axes[1].plot(est_seasonal, 'b-', linewidth=2, label='Estimated Seasonal')
axes[1].set_title('Seasonal Component', fontsize=13)
axes[1].legend(fontsize=10)
axes[1].grid(True, alpha=0.3)
# (3) 傾き成分
true_slope_approx = np.gradient(true_trend)
axes[2].plot(true_slope_approx, 'g--', linewidth=2, label='True Slope (approx)')
axes[2].plot(est_slope, 'purple', linewidth=2, label='Estimated Slope')
axes[2].set_title('Slope Component', fontsize=13)
axes[2].legend(fontsize=10)
axes[2].grid(True, alpha=0.3)
# (4) 残差
residuals = y - est_trend - est_seasonal
axes[3].plot(residuals, 'k-', linewidth=0.8)
axes[3].axhline(y=0, color='r', linestyle='--')
axes[3].set_title('Residuals', fontsize=13)
axes[3].set_xlabel('Time', fontsize=12)
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"残差の標準偏差: {np.std(residuals):.2f}")
print(f"残差の平均: {np.mean(residuals):.2f}")
信号対雑音比の影響の可視化
import numpy as np
import matplotlib.pyplot as plt
def local_level_filter(y, sigma_w, sigma_v):
"""ローカルレベルモデルのカルマンフィルタ"""
T = len(y)
x_filt = np.zeros(T)
x_prev = 0.0
P_prev = 1.0
for t in range(T):
# 予測
x_pred = x_prev
P_pred = P_prev + sigma_w**2
# 更新
S = P_pred + sigma_v**2
K = P_pred / S
x_filt[t] = x_pred + K * (y[t] - x_pred)
P_prev = (1 - K) * P_pred
x_prev = x_filt[t]
return x_filt
# テストデータ
np.random.seed(42)
T = 200
true_level = np.cumsum(np.random.randn(T) * 0.5)
y = true_level + np.random.randn(T) * 2.0
# 異なる信号対雑音比での比較
snr_configs = [
(0.1, 2.0, 'Low SNR (q=0.0025)'),
(0.5, 2.0, 'Medium SNR (q=0.0625)'),
(2.0, 2.0, 'High SNR (q=1.0)'),
(5.0, 2.0, 'Very High SNR (q=6.25)'),
]
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()
for idx, (sw, sv, title) in enumerate(snr_configs):
x_est = local_level_filter(y, sw, sv)
q = sw**2 / sv**2
axes[idx].scatter(range(T), y, s=3, c='gray', alpha=0.4, label='Observations')
axes[idx].plot(true_level, 'g-', linewidth=1.5, label='True Level')
axes[idx].plot(x_est, 'r-', linewidth=1.5, label='Estimated')
axes[idx].set_title(f'{title}', fontsize=12)
axes[idx].legend(fontsize=9)
axes[idx].grid(True, alpha=0.3)
plt.suptitle('Effect of Signal-to-Noise Ratio on Kalman Filter', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
この図は、信号対雑音比 $q = \sigma_w^2 / \sigma_v^2$ がカルマンフィルタの推定結果に与える影響を示しています。$q$ が小さいとフィルタは滑らかな推定を行い(観測ノイズを多く除去)、$q$ が大きいと観測値に強く追従します(レベルの変動を重視)。
まとめ
本記事では、状態空間モデルの理論と時系列分析への応用について解説しました。
- 状態空間モデル は状態方程式と観測方程式の2つで時系列を記述する柔軟なフレームワークである
- カルマンフィルタ は予測→更新の再帰的な計算により、多変量正規分布の条件付き分布の公式から厳密に導出される
- カルマンスムーザ(RTS) は全観測データを用いることでフィルタよりも精度の高い状態推定が可能
- ローカルレベルモデル、ローカルトレンドモデル、季節性モデル など、様々な時系列構造を統一的に扱える
- ARIMA は状態空間モデルの特殊ケース であり、状態空間フレームワークの方がより一般的
- EM アルゴリズム によりモデルパラメータの自動推定が可能
次のステップとして、以下の記事も参考にしてください。