時系列異常検知(TSAD: Time Series Anomaly Detection)は、センサーデータやシステムログ、金融データなど、時系列データの中から正常な挙動から逸脱した異常を検出する技術です。
多変量時系列(MTS: Multivariate Time Series)の異常検知は近年特に活発に研究されており、ICML, ICLR, KDD等のトップカンファレンスでも多数の手法が提案されています。
本記事では、時系列異常検知の代表的な手法を体系的にまとめます。
本記事の内容
- 時系列データの異常の分類
- 統計的手法による異常検知
- 機械学習ベースの手法
- 深層学習ベースの手法
時系列データの異常の分類
時系列データにおける異常は、大きく以下の3種類に分類されます。
| 異常の種類 | 説明 | 例 |
|---|---|---|
| Point Anomaly | 個々のデータ点が異常 | 急激なスパイク |
| Contextual Anomaly | 文脈上異常(値自体は正常範囲内) | 夏に暖房の稼働 |
| Collective Anomaly | データ点の集合が異常 | トレンドの急変 |
統計的手法
移動平均と標準偏差による検知
最もシンプルな手法は、移動平均からの乖離を閾値で判定する方法です。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# 正常なセンサーデータの生成
n = 500
t = np.arange(n)
normal_data = 10 + 2 * np.sin(2 * np.pi * t / 100) + np.random.normal(0, 0.5, n)
# 異常の注入
anomaly_data = normal_data.copy()
anomaly_indices = [100, 200, 350, 420]
anomaly_data[100] += 8
anomaly_data[200] -= 7
anomaly_data[350] += 10
anomaly_data[420] -= 9
# 移動平均と移動標準偏差の計算
window = 20
rolling_mean = np.convolve(anomaly_data, np.ones(window)/window, mode='same')
rolling_std = np.array([anomaly_data[max(0,i-window):i+1].std() for i in range(n)])
# 異常検知: 移動平均から3シグマ以上乖離
threshold = 3
upper_bound = rolling_mean + threshold * rolling_std
lower_bound = rolling_mean - threshold * rolling_std
is_anomaly = (anomaly_data > upper_bound) | (anomaly_data < lower_bound)
# 可視化
fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(t, anomaly_data, 'b-', linewidth=0.8, label='Data')
ax.plot(t, rolling_mean, 'g-', linewidth=1.5, label='Rolling Mean')
ax.fill_between(t, lower_bound, upper_bound, alpha=0.2, color='green', label='3-sigma band')
ax.scatter(t[is_anomaly], anomaly_data[is_anomaly], color='red', s=80, zorder=5, label='Detected Anomaly')
ax.set_title("Anomaly Detection: Moving Average + 3-Sigma")
ax.set_xlabel("Time")
ax.set_ylabel("Value")
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"検出された異常点: {t[is_anomaly].tolist()}")
指数加重移動平均(EWMA)
指数加重移動平均は、直近のデータに大きな重みを付ける手法で、変化に素早く反応できます。
$$ \hat{y}_t = \alpha y_t + (1 – \alpha) \hat{y}_{t-1} $$
ここで $\alpha \in (0, 1)$ は平滑化パラメータです。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
n = 500
t = np.arange(n)
normal_data = 10 + 2 * np.sin(2 * np.pi * t / 100) + np.random.normal(0, 0.5, n)
anomaly_data = normal_data.copy()
anomaly_data[100] += 8
anomaly_data[200] -= 7
anomaly_data[350] += 10
# EWMAの計算
alpha = 0.1
ewma = np.zeros(n)
ewma[0] = anomaly_data[0]
for i in range(1, n):
ewma[i] = alpha * anomaly_data[i] + (1 - alpha) * ewma[i-1]
# 残差の標準偏差で閾値を設定
residuals = anomaly_data - ewma
residual_std = residuals.std()
is_anomaly = np.abs(residuals) > 3 * residual_std
plt.figure(figsize=(14, 5))
plt.plot(t, anomaly_data, 'b-', linewidth=0.8, label='Data')
plt.plot(t, ewma, 'orange', linewidth=2, label=f'EWMA (alpha={alpha})')
plt.scatter(t[is_anomaly], anomaly_data[is_anomaly], color='red', s=80, zorder=5, label='Anomaly')
plt.title("Anomaly Detection: EWMA")
plt.xlabel("Time")
plt.ylabel("Value")
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
機械学習ベースの手法
Isolation Forest
Isolation Forestは、データ点を孤立させるのに必要な分割回数に基づいて異常を検出します。異常なデータ点は少ない分割で孤立するため、低いスコアが割り当てられます。
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
np.random.seed(42)
# 多変量時系列データの生成
n = 500
t = np.arange(n)
x1 = np.sin(2 * np.pi * t / 50) + np.random.normal(0, 0.2, n)
x2 = np.cos(2 * np.pi * t / 50) + np.random.normal(0, 0.2, n)
# 異常の注入
x1[100] += 5
x2[100] += 5
x1[300] -= 4
x2[300] += 4
# 特徴量行列
X = np.column_stack([x1, x2])
# Isolation Forest
clf = IsolationForest(contamination=0.02, random_state=42)
predictions = clf.fit_predict(X)
scores = clf.decision_function(X)
is_anomaly = predictions == -1
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 時系列でのスコア表示
axes[0].plot(t, scores, 'b-', linewidth=0.8)
axes[0].scatter(t[is_anomaly], scores[is_anomaly], color='red', s=60, zorder=5, label='Anomaly')
axes[0].axhline(y=0, color='gray', linestyle='--', alpha=0.5)
axes[0].set_title("Isolation Forest: Anomaly Score")
axes[0].set_xlabel("Time")
axes[0].set_ylabel("Score")
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2次元空間での表示
axes[1].scatter(x1[~is_anomaly], x2[~is_anomaly], c='steelblue', s=10, alpha=0.5, label='Normal')
axes[1].scatter(x1[is_anomaly], x2[is_anomaly], c='red', s=80, marker='x', label='Anomaly')
axes[1].set_title("Isolation Forest: 2D View")
axes[1].set_xlabel("x1")
axes[1].set_ylabel("x2")
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
深層学習ベースの手法の概要
近年のトップカンファレンスで提案されている深層学習ベースの手法を紹介します。
| 手法 | 年 | カテゴリ | 概要 |
|---|---|---|---|
| LSTM-NDT | 2018 | 予測ベース | LSTMで予測し、残差で異常検知 |
| OmniAnomaly | 2019 | 再構成ベース | VAE + GRU で時系列を再構成 |
| USAD | 2020 | 再構成ベース | 敵対的学習を用いたAutoencoder |
| GDN | 2021 | グラフベース | グラフニューラルネットワークでセンサー間の関係を学習 |
| Anomaly Transformer | 2022 | Attention | Association Discrepancy で異常検知 |
Autoencoderによる再構成ベースの異常検知(概念実装)
再構成ベースの手法は、正常データでAutoencoderを学習し、再構成誤差が大きいデータ点を異常と判定します。
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
# 正常データで簡易的な再構成誤差の概念を示す
n = 500
t = np.arange(n)
data = np.sin(2 * np.pi * t / 50) + np.random.normal(0, 0.3, n)
# 異常の注入
data_with_anomaly = data.copy()
anomaly_idx = [100, 250, 400]
data_with_anomaly[100] += 5
data_with_anomaly[250] -= 6
data_with_anomaly[400] += 7
# 簡易的な再構成(移動平均をモデルの出力と見立てる)
window = 10
reconstructed = np.convolve(data_with_anomaly, np.ones(window)/window, mode='same')
# 再構成誤差
recon_error = (data_with_anomaly - reconstructed) ** 2
# 閾値: 95パーセンタイル
threshold = np.percentile(recon_error, 95)
is_anomaly = recon_error > threshold
fig, axes = plt.subplots(2, 1, figsize=(14, 8))
axes[0].plot(t, data_with_anomaly, 'b-', linewidth=0.8, label='Input')
axes[0].plot(t, reconstructed, 'orange', linewidth=1.5, label='Reconstructed')
axes[0].scatter(t[is_anomaly], data_with_anomaly[is_anomaly], color='red', s=40, zorder=5, label='Anomaly')
axes[0].set_title("Reconstruction-based Anomaly Detection")
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[1].plot(t, recon_error, 'b-', linewidth=0.8)
axes[1].axhline(y=threshold, color='red', linestyle='--', label=f'Threshold (95th percentile)')
axes[1].scatter(t[is_anomaly], recon_error[is_anomaly], color='red', s=40, zorder=5)
axes[1].set_title("Reconstruction Error")
axes[1].set_xlabel("Time")
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
まとめ
本記事では、時系列異常検知(TSAD)の代表的な手法をまとめました。
- 時系列の異常はPoint / Contextual / Collectiveの3種類に分類される
- 統計的手法(移動平均 + シグマ、EWMA)はシンプルで解釈性が高い
- 機械学習ベース(Isolation Forest等)は多変量データに対応しやすい
- 深層学習ベース(Autoencoder、Transformer)は高次元で複雑なパターンの検出に強い
- 手法の選択はデータの特性(次元数、異常の種類、リアルタイム性の要求)に応じて行う