時系列データにおける異常検知は、製造業の故障予知、金融の不正取引検出、サーバー監視、医療データの異常発見など、実社会の多くの場面で重要な役割を果たしています。しかし、時系列データは時間的な相関を持つため、静的なデータに対する異常検知手法をそのまま適用することはできません。
本記事では、時系列異常の分類から始まり、統計的手法、機械学習手法、ベイズ変化点検出まで主要な手法を体系的に解説します。各手法の数学的な定式化を丁寧に導出し、Pythonで合成データに対して実装・性能比較を行います。
本記事の内容
- 時系列異常の分類(点異常・文脈異常・集合異常)
- 統計的手法(移動平均+σルール、CUSUM)
- STL分解ベースの異常検知
- Isolation Forestの時系列適用
- LSTM-AEによる異常検知
- ベイズオンライン変化点検出(BOCPD)
- Python実装と各手法のROC-AUC比較
前提知識
この記事を読む前に、以下の記事を読んでおくと理解が深まります。
時系列異常の分類
時系列データにおける異常は、大きく3つのタイプに分類されます。
点異常(Point Anomaly)
個々の観測値が、他の観測値と比較して著しく逸脱しているケースです。例えば、通常は $[20, 30]$ の範囲で推移するセンサー値が、ある時刻に突然 $100$ を記録した場合などが該当します。
数学的には、時刻 $t$ の観測値 $x_t$ が以下を満たすとき点異常と判定できます。
$$ |x_t – \mu_t| > k \sigma_t $$
ここで $\mu_t$ と $\sigma_t$ は局所的な平均と標準偏差、$k$ は閾値パラメータです。
文脈異常(Contextual Anomaly)
値そのものは正常範囲内であっても、その文脈(前後の時間的コンテキスト)において異常であるケースです。例えば、夏の気温が $30^\circ\mathrm{C}$ であれば正常ですが、冬に $30^\circ\mathrm{C}$ であれば異常です。
文脈異常の検出には、条件付き確率を考える必要があります。
$$ p(x_t | x_{t-w}, \dots, x_{t-1}) < \epsilon $$
過去のウィンドウを条件として、現在の値の条件付き確率が閾値 $\epsilon$ を下回る場合に異常と判定します。
集合異常(Collective Anomaly)
個々の観測値は正常であっても、特定の期間にわたる観測値のパターン全体が異常であるケースです。例えば、心電図データにおいて個々の値は正常範囲内でも、波形パターンが通常と異なる場合などが該当します。
集合異常は部分系列 $\bm{x}_{t:t+w} = (x_t, x_{t+1}, \dots, x_{t+w-1})$ の分布が正常パターンから逸脱しているかどうかで判定します。
統計的手法
移動平均 + σルール
最もシンプルな異常検知手法です。移動平均と移動標準偏差を計算し、そこから外れた点を異常とします。
ウィンドウ幅 $w$ の移動平均と移動標準偏差は以下で計算されます。
$$ \bar{x}_t = \frac{1}{w}\sum_{i=0}^{w-1} x_{t-i} $$
$$ s_t = \sqrt{\frac{1}{w-1}\sum_{i=0}^{w-1}(x_{t-i} – \bar{x}_t)^2} $$
異常判定は以下のルールで行います。
$$ \text{anomaly}_t = \begin{cases} 1 & \text{if } |x_t – \bar{x}_{t-1}| > k \cdot s_{t-1} \\ 0 & \text{otherwise} \end{cases} $$
ここで $k$ は通常 2〜3 に設定します($k=3$ で正規分布の約99.7%をカバー)。$\bar{x}_{t-1}$ と $s_{t-1}$ を使うのは、現在の値 $x_t$ が影響しないようにするためです。
CUSUM(累積和制御図)
CUSUM(Cumulative Sum)は、プロセスの平均が目標値からシフトした場合を効率的に検出する手法です。ウォルド(Wald)の逐次確率比検定に基づいています。
上方CUSUMと下方CUSUMの統計量をそれぞれ以下のように定義します。
$$ C_t^+ = \max(0, C_{t-1}^+ + x_t – \mu_0 – k) $$
$$ C_t^- = \max(0, C_{t-1}^- – x_t + \mu_0 – k) $$
ここで、
- $\mu_0$: プロセスの正常時の平均値(目標値)
- $k$: 参照値(reference value)。検出したいシフト量 $\delta$ に対して通常 $k = \delta / 2$ と設定
- $C_0^+ = C_0^- = 0$: 初期値
異常判定は以下で行います。
$$ \text{anomaly}_t = \begin{cases} 1 & \text{if } C_t^+ > h \text{ or } C_t^- > h \\ 0 & \text{otherwise} \end{cases} $$
閾値 $h$ はARL(Average Run Length、誤検知までの平均期間)を所望の値に設定することで決定します。
CUSUMの統計的背景
CUSUMの動作原理を理解するために、尤度比の観点から見てみましょう。観測値 $x_t$ が正規分布 $\mathcal{N}(\mu, \sigma^2)$ に従うとき、平均が $\mu_0$(正常)から $\mu_1 = \mu_0 + \delta$(異常)にシフトしたかどうかの対数尤度比は以下になります。
$$ \begin{align} \log \frac{f_1(x_t)}{f_0(x_t)} &= \log \frac{\frac{1}{\sqrt{2\pi}\sigma}\exp\left(-\frac{(x_t – \mu_1)^2}{2\sigma^2}\right)}{\frac{1}{\sqrt{2\pi}\sigma}\exp\left(-\frac{(x_t – \mu_0)^2}{2\sigma^2}\right)} \\ &= \frac{(x_t – \mu_0)^2 – (x_t – \mu_1)^2}{2\sigma^2} \\ &= \frac{2x_t(\mu_1 – \mu_0) – (\mu_1^2 – \mu_0^2)}{2\sigma^2} \\ &= \frac{\delta}{\sigma^2}\left(x_t – \mu_0 – \frac{\delta}{2}\right) \end{align} $$
$\sigma = 1$ と正規化すると、これは $\delta(x_t – \mu_0 – \delta/2)$ に比例します。$k = \delta/2$ と置けば、CUSUM統計量の更新式の括弧内 $(x_t – \mu_0 – k)$ が対数尤度比に対応することがわかります。
つまり、CUSUMは対数尤度比の累積和を追跡しており、正常時には0付近に留まり、シフトが発生すると累積的に増加して閾値 $h$ を超えるという仕組みです。
STL分解ベースの異常検知
STL(Seasonal and Trend decomposition using Loess)は、時系列をトレンド、季節性、残差の3成分に分解する手法です。
$$ x_t = T_t + S_t + R_t $$
ここで $T_t$ はトレンド成分、$S_t$ は季節成分、$R_t$ は残差成分です。
異常検知には残差成分 $R_t$ を利用します。正常データでは残差はほぼ正規分布に従うため、以下の判定基準を使います。
$$ \text{anomaly}_t = \begin{cases} 1 & \text{if } |R_t| > k \cdot \text{IQR}(R) \\ 0 & \text{otherwise} \end{cases} $$
ここで $\text{IQR}(R) = Q_3(R) – Q_1(R)$ は残差の四分位範囲です。$k = 1.5$ は一般的な外れ値基準、$k = 3.0$ は極端な外れ値基準に対応します。
STL分解の利点は、トレンドや季節変動を適切に除去した上で異常を検出できる点です。季節性のある時系列(気温、電力消費など)に特に有効です。
Isolation Forestの時系列適用
Isolation Forestは、データ点を孤立させるために必要な分割回数に基づいて異常を検出する手法です。異常点は正常点よりも少ない分割で孤立できるという直感に基づいています。
基本原理
$n$ 個のデータ点に対して、ランダムに特徴量と分割値を選んで二分木を構築します。データ点 $\bm{x}$ の異常スコアは、木の根からその点に到達するまでのパス長 $h(\bm{x})$ で定義されます。
異常スコアは以下で正規化されます。
$$ s(\bm{x}, n) = 2^{-\frac{\mathbb{E}[h(\bm{x})]}{c(n)}} $$
ここで $c(n)$ は $n$ 個のデータに対する二分探索木の平均パス長です。
$$ c(n) = 2 H(n-1) – \frac{2(n-1)}{n} $$
$H(k) = \ln k + \gamma$($\gamma$ はオイラー定数 $\approx 0.5772$)は調和級数です。
$s \to 1$ は異常(パス長が短い)、$s \to 0.5$ は正常(平均的なパス長)を示します。
時系列への適用: 特徴量エンジニアリング
Isolation Forestは本来、静的なデータに対する手法なので、時系列データに適用するには特徴量エンジニアリングが必要です。各時刻 $t$ に対して、ウィンドウベースの特徴量を構成します。
$$ \bm{f}_t = \left(\bar{x}_t, s_t, x_t – \bar{x}_t, \Delta x_t, |\Delta x_t|, \max_{w} x_t, \min_{w} x_t \right) $$
ここで、
- $\bar{x}_t$: ウィンドウ内の移動平均
- $s_t$: ウィンドウ内の移動標準偏差
- $x_t – \bar{x}_t$: 移動平均からの偏差
- $\Delta x_t = x_t – x_{t-1}$: 差分(変化量)
- $|\Delta x_t|$: 変化量の絶対値
- $\max_{w} x_t, \min_{w} x_t$: ウィンドウ内の最大値・最小値
LSTM-AEによる異常検知
LSTM-AE(LSTM Autoencoder)は、正常データで学習したオートエンコーダの再構成誤差を異常スコアとする手法です。
アーキテクチャ
エンコーダはLSTMで入力系列を固定長の潜在ベクトルに圧縮し、デコーダはLSTMで潜在ベクトルから入力系列を再構成します。
エンコーダ:
$$ \bm{h}_t^{\text{enc}}, \bm{c}_t^{\text{enc}} = \text{LSTM}_{\text{enc}}(\bm{x}_t, \bm{h}_{t-1}^{\text{enc}}, \bm{c}_{t-1}^{\text{enc}}) $$
最後の隠れ状態 $\bm{h}_T^{\text{enc}}$ が潜在表現となります。
デコーダ:
$$ \bm{h}_t^{\text{dec}}, \bm{c}_t^{\text{dec}} = \text{LSTM}_{\text{dec}}(\hat{\bm{x}}_{t-1}, \bm{h}_{t-1}^{\text{dec}}, \bm{c}_{t-1}^{\text{dec}}) $$
$$ \hat{\bm{x}}_t = \bm{W}_{\text{out}} \bm{h}_t^{\text{dec}} + \bm{b}_{\text{out}} $$
デコーダの初期隠れ状態は $\bm{h}_0^{\text{dec}} = \bm{h}_T^{\text{enc}}$ とします。
再構成誤差による異常スコア
学習は正常データのみで行い、入力と再構成の二乗誤差を最小化します。
$$ \mathcal{L} = \frac{1}{T}\sum_{t=1}^{T} \|\bm{x}_t – \hat{\bm{x}}_t\|^2 $$
推論時の異常スコアは、ウィンドウ $\bm{x}_{t:t+w}$ の再構成誤差として定義します。
$$ \text{score}_t = \frac{1}{w}\sum_{i=0}^{w-1} (x_{t+i} – \hat{x}_{t+i})^2 $$
正常データでは再構成誤差が小さく、異常データでは学習したパターンから外れるため再構成誤差が大きくなります。閾値は正常データの再構成誤差の分布から設定します。
$$ \text{threshold} = \mu_{\text{train}} + k \cdot \sigma_{\text{train}} $$
ベイズオンライン変化点検出(BOCPD)
BOCPD(Bayesian Online Changepoint Detection)は、時系列データの生成過程が変化する「変化点」をオンラインで検出するベイズ的手法です。Adams & MacKay(2007)によって提案されました。
ラン長の定義
ラン長(run length)$r_t$ を、最後の変化点からの経過時間として定義します。$r_t = 0$ は時刻 $t$ で変化点が発生したことを意味し、$r_t = r_{t-1} + 1$ は変化点が発生していないことを意味します。
ラン長事後分布の逐次更新
観測データ $x_{1:t} = (x_1, x_2, \dots, x_t)$ が与えられたときのラン長の事後分布 $p(r_t | x_{1:t})$ を逐次的に更新します。
まず、同時分布 $p(r_t, x_{1:t})$ を考えます。ベイズの定理より、
$$ p(r_t | x_{1:t}) = \frac{p(r_t, x_{1:t})}{p(x_{1:t})} $$
$p(x_{1:t}) = \sum_{r_t} p(r_t, x_{1:t})$ は正規化定数です。
同時分布の更新則を導出します。$r_t$ の取りうる値は $0$ または $r_{t-1} + 1$ の2つです。$r_{t-1}$ から $r_t$ への遷移を周辺化します。
$$ p(r_t, x_{1:t}) = \sum_{r_{t-1}} p(r_t, r_{t-1}, x_{1:t}) $$
同時分布を条件付き確率で分解します。
$$ \begin{align} p(r_t, r_{t-1}, x_{1:t}) &= p(x_t | r_t, r_{t-1}, x_{1:t-1}) \cdot p(r_t | r_{t-1}, x_{1:t-1}) \cdot p(r_{t-1}, x_{1:t-1}) \end{align} $$
ここで以下の2つの仮定を置きます。
仮定1: 新しい観測 $x_t$ はラン長 $r_t$ と、現在のランに含まれる観測値のみに依存する。
$$ p(x_t | r_t, r_{t-1}, x_{1:t-1}) = p(x_t | r_t, x_{t-r_t:t-1}) \equiv \pi_t^{(r_t)} $$
これを予測確率(predictive probability)と呼びます。
仮定2: 変化点の発生確率は過去のデータに依存しない(ハザード関数)。
$$ p(r_t | r_{t-1}, x_{1:t-1}) = p(r_t | r_{t-1}) = \begin{cases} H(r_{t-1} + 1) & \text{if } r_t = 0 \\ 1 – H(r_{t-1} + 1) & \text{if } r_t = r_{t-1} + 1 \end{cases} $$
ここで $H(\tau)$ はハザード関数で、ランが長さ $\tau$ まで続いたときに変化点が発生する確率です。定数ハザード $H(\tau) = 1/\lambda$ とすると、ラン長の事前分布が幾何分布(平均 $\lambda$)になります。
これらの仮定のもとで、更新則は以下の2つのステップに分かれます。
成長確率(Growth Probability): 変化点が発生しない場合、$r_t = r_{t-1} + 1$ です。
$$ p(r_t = r_{t-1} + 1, x_{1:t}) = p(r_{t-1}, x_{1:t-1}) \cdot \pi_t^{(r_{t-1}+1)} \cdot (1 – H(r_{t-1} + 1)) $$
変化点確率(Changepoint Probability): 変化点が発生する場合、$r_t = 0$ です。
$$ p(r_t = 0, x_{1:t}) = \sum_{r_{t-1}} p(r_{t-1}, x_{1:t-1}) \cdot \pi_t^{(0)} \cdot H(r_{t-1} + 1) $$
$\pi_t^{(0)}$ は新しいランの最初の観測に対する予測確率で、事前分布のみに基づきます。
最後に正規化します。
$$ p(r_t | x_{1:t}) = \frac{p(r_t, x_{1:t})}{\sum_{r_t} p(r_t, x_{1:t})} $$
予測確率の計算
データが正規分布に従うと仮定し、正規-逆ガンマ事前分布を使うと、予測確率は解析的にStudent-t分布になります。ラン長 $r$ の間の十分統計量(データ数 $n$、合計 $S$、二乗合計 $Q$)から、
$$ \bar{x}_r = \frac{S}{n}, \quad s_r^2 = \frac{Q – S^2/n}{n – 1} $$
予測確率は自由度 $n – 1$ のStudent-t分布で与えられます。
$$ \pi_t^{(r)} = t_{n-1}\left(x_t \bigg| \bar{x}_r, s_r^2\left(1 + \frac{1}{n}\right)\right) $$
Pythonでの実装
ここからは、各手法をPythonで実装し、合成データ上で性能を比較します。
合成データの生成
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, roc_curve
from scipy import stats
np.random.seed(42)
# 合成データの生成: 正常データ + 点異常 + 文脈異常
T = 1000
t = np.arange(T)
# 正常データ: 正弦波 + ノイズ
normal_data = np.sin(2 * np.pi * t / 50) + 0.3 * np.random.randn(T)
# 異常ラベル(0: 正常, 1: 異常)
labels = np.zeros(T, dtype=int)
# 点異常を挿入(ランダムな位置にスパイクを追加)
anomaly_data = normal_data.copy()
point_anomaly_idx = [150, 300, 450, 600, 750, 850]
for idx in point_anomaly_idx:
anomaly_data[idx] += np.random.choice([-1, 1]) * (4 + np.random.rand())
labels[idx] = 1
# 文脈異常を挿入(急激なレベルシフト)
anomaly_data[500:520] += 2.5
labels[500:520] = 1
# データの可視化
fig, axes = plt.subplots(2, 1, figsize=(14, 6))
axes[0].plot(t, anomaly_data, linewidth=0.7)
axes[0].scatter(t[labels == 1], anomaly_data[labels == 1],
c='red', s=20, zorder=5, label='Anomaly')
axes[0].set_title('Synthetic Time Series with Anomalies', fontsize=14)
axes[0].set_xlabel('Time', fontsize=12)
axes[0].set_ylabel('Value', fontsize=12)
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)
axes[1].plot(t, labels, 'r-', linewidth=1.5)
axes[1].set_title('Ground Truth Anomaly Labels', fontsize=14)
axes[1].set_xlabel('Time', fontsize=12)
axes[1].set_ylabel('Label', fontsize=12)
axes[1].set_ylim(-0.1, 1.1)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
手法1: 移動平均 + σルール
def moving_average_detector(data, window=50, k=3.0):
"""移動平均 + σルールによる異常検知"""
n = len(data)
scores = np.zeros(n)
for i in range(window, n):
window_data = data[i - window:i]
mu = np.mean(window_data)
sigma = np.std(window_data, ddof=1)
if sigma > 0:
scores[i] = np.abs(data[i] - mu) / sigma
else:
scores[i] = 0.0
return scores
# 移動平均法の実行
ma_scores = moving_average_detector(anomaly_data, window=50, k=3.0)
手法2: CUSUM
def cusum_detector(data, mu_0=None, k=0.5, h=4.0):
"""CUSUM(累積和制御図)による異常検知"""
n = len(data)
if mu_0 is None:
mu_0 = np.mean(data[:100]) # 最初の100点から目標値を推定
C_plus = np.zeros(n)
C_minus = np.zeros(n)
scores = np.zeros(n)
for i in range(1, n):
# 上方CUSUM: 平均の上昇を検出
C_plus[i] = max(0, C_plus[i-1] + data[i] - mu_0 - k)
# 下方CUSUM: 平均の低下を検出
C_minus[i] = max(0, C_minus[i-1] - data[i] + mu_0 - k)
# 異常スコア: 上方と下方の最大値
scores[i] = max(C_plus[i], C_minus[i])
return scores
# CUSUM法の実行
cusum_scores = cusum_detector(anomaly_data, k=0.5, h=4.0)
手法3: STL分解ベース
from statsmodels.tsa.seasonal import STL
def stl_detector(data, period=50):
"""STL分解ベースの異常検知"""
stl = STL(data, period=period, robust=True)
result = stl.fit()
residuals = result.resid
# 残差のIQRに基づくスコア
q1, q3 = np.percentile(residuals, [25, 75])
iqr = q3 - q1
if iqr > 0:
scores = np.abs(residuals) / iqr
else:
scores = np.abs(residuals)
return scores
# STL分解法の実行
stl_scores = stl_detector(anomaly_data, period=50)
手法4: Isolation Forest
from sklearn.ensemble import IsolationForest
def isolation_forest_detector(data, window=20, contamination=0.05):
"""Isolation Forestの時系列適用(特徴量エンジニアリング)"""
n = len(data)
features = []
valid_idx = []
for i in range(window, n):
w = data[i - window:i]
feat = [
np.mean(w), # 移動平均
np.std(w, ddof=1), # 移動標準偏差
data[i] - np.mean(w), # 偏差
data[i] - data[i-1], # 差分
np.abs(data[i] - data[i-1]), # 差分の絶対値
np.max(w), # ウィンドウ最大値
np.min(w), # ウィンドウ最小値
]
features.append(feat)
valid_idx.append(i)
features = np.array(features)
# Isolation Forestの学習と予測
clf = IsolationForest(contamination=contamination, random_state=42, n_estimators=200)
clf.fit(features)
# スコアの計算(-1 * decision_function で異常ほど大きい値)
raw_scores = -clf.decision_function(features)
# 全時刻のスコアに変換
scores = np.zeros(n)
for idx, s in zip(valid_idx, raw_scores):
scores[idx] = s
return scores
# Isolation Forestの実行
if_scores = isolation_forest_detector(anomaly_data, window=20)
手法5: LSTM-AE(簡易版: NumPy実装)
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class LSTMAutoencoder(nn.Module):
"""LSTM Autoencoder"""
def __init__(self, input_dim=1, hidden_dim=32, latent_dim=16):
super().__init__()
# エンコーダ
self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
self.enc_to_latent = nn.Linear(hidden_dim, latent_dim)
# デコーダ
self.latent_to_dec = nn.Linear(latent_dim, hidden_dim)
self.decoder = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
self.output_layer = nn.Linear(hidden_dim, input_dim)
def forward(self, x):
# x: (batch, seq_len, input_dim)
batch_size, seq_len, _ = x.shape
# エンコード
_, (h_enc, _) = self.encoder(x)
latent = self.enc_to_latent(h_enc.squeeze(0)) # (batch, latent_dim)
# デコード
dec_input = self.latent_to_dec(latent).unsqueeze(1).repeat(1, seq_len, 1)
dec_output, _ = self.decoder(dec_input)
recon = self.output_layer(dec_output) # (batch, seq_len, input_dim)
return recon
def lstm_ae_detector(data, window=30, epochs=50, lr=1e-3):
"""LSTM-AEによる異常検知"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# ウィンドウで分割
n = len(data)
windows = []
for i in range(n - window + 1):
windows.append(data[i:i + window])
windows = np.array(windows)
# 正規化
mean_val = np.mean(data[:int(n * 0.7)])
std_val = np.std(data[:int(n * 0.7)])
windows_norm = (windows - mean_val) / (std_val + 1e-8)
# テンソル変換
X = torch.tensor(windows_norm, dtype=torch.float32).unsqueeze(-1)
# 訓練データ(最初の70%を正常とみなす)
train_size = int(len(X) * 0.7)
X_train = X[:train_size]
train_loader = DataLoader(TensorDataset(X_train), batch_size=32, shuffle=True)
# モデルの学習
model = LSTMAutoencoder(input_dim=1, hidden_dim=32, latent_dim=16).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()
model.train()
for epoch in range(epochs):
for (batch,) in train_loader:
batch = batch.to(device)
optimizer.zero_grad()
recon = model(batch)
loss = criterion(recon, batch)
loss.backward()
optimizer.step()
# 全データの再構成誤差を計算
model.eval()
with torch.no_grad():
X_all = X.to(device)
recon_all = model(X_all)
errors = torch.mean((X_all - recon_all) ** 2, dim=(1, 2)).cpu().numpy()
# 各時刻のスコアに変換(ウィンドウの最後の時刻に割り当て)
scores = np.zeros(n)
for i in range(len(errors)):
t_idx = i + window - 1
scores[t_idx] = max(scores[t_idx], errors[i])
return scores
# LSTM-AEの実行
lstm_scores = lstm_ae_detector(anomaly_data, window=30, epochs=50)
手法6: BOCPD(簡易版)
def bocpd_detector(data, hazard_lambda=200, mu_prior=0.0, sigma_prior=1.0):
"""ベイズオンライン変化点検出(BOCPD)"""
n = len(data)
# ラン長確率行列 (最大ラン長 x 時刻)
max_run = n
R = np.zeros((max_run + 1, n + 1))
R[0, 0] = 1.0 # 初期条件: r_0 = 0
# 十分統計量
mu_params = np.zeros(max_run + 1) # 各ラン長ごとの平均
var_params = np.ones(max_run + 1) # 各ラン長ごとの分散
count = np.zeros(max_run + 1) # データ数
sum_x = np.zeros(max_run + 1) # 合計
sum_x2 = np.zeros(max_run + 1) # 二乗合計
# ハザード関数(定数: 1/lambda)
H = 1.0 / hazard_lambda
changepoint_prob = np.zeros(n)
for t in range(n):
x = data[t]
# 予測確率の計算
predprobs = np.zeros(t + 1)
for r in range(t + 1):
if count[r] < 2:
# データが少ない場合は事前分布から予測
predprobs[r] = stats.norm.pdf(x, mu_prior, sigma_prior)
else:
mu_r = sum_x[r] / count[r]
var_r = (sum_x2[r] - sum_x[r]**2 / count[r]) / (count[r] - 1)
var_r = max(var_r, 1e-6)
# Student-t近似(正規近似で代替)
pred_var = var_r * (1 + 1.0 / count[r])
predprobs[r] = stats.norm.pdf(x, mu_r, np.sqrt(pred_var))
# 成長確率
growth = R[:t+1, t] * predprobs * (1 - H)
# 変化点確率
cp = np.sum(R[:t+1, t] * predprobs * H)
# 更新
R[1:t+2, t+1] = growth
R[0, t+1] = cp
# 正規化
evidence = np.sum(R[:t+2, t+1])
if evidence > 0:
R[:t+2, t+1] /= evidence
# 変化点確率(r=0の確率)
changepoint_prob[t] = R[0, t+1]
# 十分統計量の更新
new_count = count[:t+1] + 1
new_sum = sum_x[:t+1] + x
new_sum2 = sum_x2[:t+1] + x**2
count[1:t+2] = new_count
sum_x[1:t+2] = new_sum
sum_x2[1:t+2] = new_sum2
# r=0(新しいラン)をリセット
count[0] = 0
sum_x[0] = 0
sum_x2[0] = 0
return changepoint_prob
# BOCPDの実行
bocpd_scores = bocpd_detector(anomaly_data, hazard_lambda=200)
ROC-AUC比較
# 有効範囲(全手法が値を持つ範囲)
valid_start = 50 # 移動平均のウィンドウ幅の影響
# 各手法のスコアを有効範囲で切り出し
scores_dict = {
'Moving Avg + σ': ma_scores[valid_start:],
'CUSUM': cusum_scores[valid_start:],
'STL Decomposition': stl_scores[valid_start:],
'Isolation Forest': if_scores[valid_start:],
'LSTM-AE': lstm_scores[valid_start:],
'BOCPD': bocpd_scores[valid_start:],
}
labels_valid = labels[valid_start:]
# ROC-AUCの計算と表示
print("=" * 50)
print(f"{'Method':<25} {'ROC-AUC':>10}")
print("=" * 50)
auc_results = {}
for name, scores in scores_dict.items():
try:
auc = roc_auc_score(labels_valid, scores)
auc_results[name] = auc
print(f"{name:<25} {auc:>10.4f}")
except ValueError:
print(f"{name:<25} {'N/A':>10}")
print("=" * 50)
# ROC曲線の可視化
plt.figure(figsize=(10, 8))
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b']
for (name, scores), color in zip(scores_dict.items(), colors):
try:
fpr, tpr, _ = roc_curve(labels_valid, scores)
auc = auc_results.get(name, 0)
plt.plot(fpr, tpr, color=color, linewidth=2,
label=f'{name} (AUC={auc:.3f})')
except ValueError:
pass
plt.plot([0, 1], [0, 1], 'k--', linewidth=1, alpha=0.5)
plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate', fontsize=12)
plt.title('ROC Curves: Time Series Anomaly Detection Methods', fontsize=14)
plt.legend(fontsize=10, loc='lower right')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
異常スコアの可視化
fig, axes = plt.subplots(7, 1, figsize=(14, 20))
# 元データ
axes[0].plot(t, anomaly_data, linewidth=0.7)
axes[0].scatter(t[labels == 1], anomaly_data[labels == 1],
c='red', s=15, zorder=5)
axes[0].set_title('Original Data with Anomalies', fontsize=12)
axes[0].set_ylabel('Value', fontsize=10)
axes[0].grid(True, alpha=0.3)
# 各手法のスコア
method_scores = [
('Moving Avg + σ Rule', ma_scores),
('CUSUM', cusum_scores),
('STL Decomposition', stl_scores),
('Isolation Forest', if_scores),
('LSTM-AE', lstm_scores),
('BOCPD', bocpd_scores),
]
for i, (name, scores) in enumerate(method_scores):
ax = axes[i + 1]
ax.plot(t, scores, linewidth=0.7, color=colors[i])
# 異常位置をハイライト
for idx in range(T):
if labels[idx] == 1:
ax.axvline(x=idx, color='red', alpha=0.1, linewidth=1)
ax.set_title(f'{name} Anomaly Score', fontsize=12)
ax.set_ylabel('Score', fontsize=10)
ax.grid(True, alpha=0.3)
axes[-1].set_xlabel('Time', fontsize=12)
plt.tight_layout()
plt.show()
まとめ
本記事では、時系列データにおける異常検知手法を体系的に解説し、Python実装によるROC-AUC比較を行いました。
- 時系列異常は点異常、文脈異常、集合異常の3タイプに分類される
- 移動平均+σルールはシンプルだが、定常的な時系列には有効。パラメータ $k$ と $w$ の調整が重要
- CUSUMは累積和に基づき、平均のシフトを高感度に検出する。対数尤度比の累積和と解釈できる
- STL分解はトレンド・季節性を除去した残差で異常を判定し、周期的な時系列に強い
- Isolation Forestは特徴量エンジニアリングにより時系列に適用でき、非線形パターンの異常に対応する
- LSTM-AEは正常パターンの再構成誤差で異常を検出し、複雑なパターンの異常に有効
- BOCPDはラン長の事後分布を逐次更新し、変化点をベイズ的に検出する
次のステップとして、以下の記事も参考にしてください。