人工衛星のテレメトリデータで異常を検出したいとき、最大の障壁はラベル付きデータの不足です。衛星の異常事象は極めてまれであり、1つのミッションで数年間にわずか数件しか発生しないこともあります。さらに、異常のパターンは衛星ごと、サブシステムごとに異なるため、ある衛星で学習したモデルが別の衛星にそのまま使えるとは限りません。
従来の異常検知手法は、正常データで自己符号化器や予測モデルを学習し、再構成誤差や予測誤差が閾値を超えた点を異常とします。しかし、この「正常の定義」がドメインごとに異なるため、新しいデータセットに適用するたびに学習し直す必要がありました。
TimeRCD(NeurIPS 2025)は、この問題に根本的に異なるアプローチで取り組みます。「正常とは何か」を学習する代わりに、隣接する時間窓の間の相対的な不整合(Relative Contextual Discrepancy)を検出するという発想です。正常データでは隣接窓は滑らかに遷移しますが、異常が発生すると不連続な変化が生じます。この「相対的な乖離」はドメインに依存しない普遍的な異常の特徴であり、合成データで事前学習するだけでゼロショットの異常検知が可能になります。
TimeRCDを理解することは、以下のような場面で直接役立ちます。
- 衛星テレメトリの異常検知: ラベルなしでゼロショットに異常を検出できるため、新しい衛星ミッションに即座に適用可能です
- 産業設備のモニタリング: 工場の振動・温度・圧力データに対して、事前学習済みモデルでドメイン固有の学習なしに異常検知が可能です
- テレメトリ検索との統合: 異常パターンの検出と類似異常の検索を組み合わせることで、過去の障害事例の知識を活用できます
本記事の内容
- 既存異常検知手法の限界(ドメイン依存性)
- Relative Contextual Discrepancy(RCD)の概念と数式
- 合成異常データの生成戦略
- トークンレベルの異常スコアリング
- ゼロショット異常検知のパイプライン
- Pythonでの実装とシミュレーション
前提知識
この記事を読む前に、以下の記事を読んでおくと理解が深まります。
既存手法の限界
ドメイン依存の正常モデル
従来の時系列異常検知は、大きく以下の3つのアプローチに分類されます。
再構成ベース: 正常データでAutoEncoderやVAEを学習し、再構成誤差が高い点を異常とする。代表例はOmniAnomaly、USAD。
$$ \text{anomaly\_score}(x_t) = \|x_t – \hat{x}_t\|^2, \quad \hat{x}_t = \text{Decoder}(\text{Encoder}(x_t)) $$
予測ベース: 正常データで時系列予測モデルを学習し、予測誤差が高い点を異常とする。
$$ \text{anomaly\_score}(x_t) = \|x_t – f_\theta(x_{t-w:t-1})\|^2 $$
対照学習ベース: 正常パターンの表現を学習し、表現空間で外れた点を異常とする。前回解説したCLPNM-ADもこのカテゴリです。
これらの手法に共通する問題は、正常データでの学習が必須であり、新しいドメインに適用するたびに学習し直す必要があることです。
ゼロショット異常検知の困難さ
時系列基盤モデル(Chronos、TimesFM、MOMENT等)の登場により、ゼロショットの時系列予測は可能になりました。しかし、予測ベースの異常検知にこれらを使うと、以下の問題が生じます。
- 閾値の設定: 予測誤差がどの程度なら異常とするかの閾値は、ドメインごとに異なる
- 誤差の分布: 正常時の予測誤差の分布が未知なため、統計的な検定が困難
- 異常の多様性: スパイク、レベルシフト、トレンド変化など、異常の種類は多様で、予測誤差だけでは区別が困難
TimeRCDはこれらの問題を、相対的な乖離という枠組みで解決します。
Relative Contextual Discrepancy(RCD)
核心的アイデア
TimeRCDの核心は、異常を「絶対的な逸脱」ではなく「隣接窓間の相対的な乖離」として捉えることです。
時系列 $\bm{x} = (x_1, x_2, \ldots, x_T)$ に対して、隣接する2つの窓 $\bm{x}_{t-w:t}$(左窓)と $\bm{x}_{t:t+w}$(右窓)を考えます。正常な時系列では、左窓と右窓の間に滑らかな遷移が見られます。異常が時刻 $t$ 付近で発生すると、左窓と右窓の間に不連続な変化(パターンの断絶)が生じます。
この直感を数式化します。エンコーダ $f_\theta$ で各窓を埋め込みベクトルに変換し、その差分で異常スコアを定義します:
$$ \text{RCD}(t) = d\left(f_\theta(\bm{x}_{t-w:t}), \, f_\theta(\bm{x}_{t:t+w})\right) $$
ここで $d(\cdot, \cdot)$ は距離関数(コサイン距離やユークリッド距離)です。
なぜ「相対的」が重要か
「相対的」であることの重要性を具体例で示しましょう。
例1: 衛星の日照・日陰サイクル 衛星が日陰に入ると太陽電池の出力が急激に低下し、温度も大きく変化します。これは「絶対的な値の変化」としては大きいですが、毎軌道で発生する正常な変動です。左窓と右窓のパターンは「日照→日陰遷移の定常パターン」として一貫しており、RCDは低い値を示します。
例2: 姿勢制御の異常 姿勢角速度が突然スパイクし、その後正常に戻った場合。スパイクの絶対値は日照/日陰遷移より小さいかもしれませんが、左窓の定常パターンと右窓のスパイク含有パターンは明確に異なり、RCDは高い値を示します。
つまり、RCDはパターンの変化に反応し、値の大きさには鈍感です。これにより、ドメインごとのスケールや正常範囲を事前に知る必要がなくなります。
トークンレベルの異常スコア
TimeRCDはRCDをさらに精緻化し、トークンレベル(パッチレベル)の異常スコアを計算します。
時系列をパッチに分割し、各パッチを $\bm{p}_i$ とします。エンコーダはパッチ列を入力として各パッチの埋め込み $\bm{z}_i$ を出力します:
$$ [\bm{z}_1, \bm{z}_2, \ldots, \bm{z}_n] = f_\theta([\bm{p}_1, \bm{p}_2, \ldots, \bm{p}_n]) $$
左窓と右窓の境界付近のパッチ埋め込みを比較し、各パッチの異常スコアを算出します:
$$ s_i = \|\bm{z}_i^{\text{left}} – \bm{z}_i^{\text{right}}\|^2 $$
ここで $\bm{z}_i^{\text{left}}$ は左窓コンテキストでの埋め込み、$\bm{z}_i^{\text{right}}$ は右窓コンテキストでの埋め込みです。同じパッチを異なるコンテキストで評価したとき、異常パッチでは埋め込みが大きく変化します。
合成データによる事前学習
合成異常の生成
TimeRCDの最もユニークな特徴は、実データの異常ラベルを一切使わず、合成データだけで事前学習することです。合成異常は以下の4タイプを含みます。
Type 1: スパイク(Point Anomaly) $$ x’_t = x_t + A \cdot \delta(t – t_0), \quad A \sim \mathcal{N}(0, \sigma_{\text{spike}}^2) $$
Type 2: レベルシフト(Contextual Anomaly) $$ x’_t = x_t + c, \quad t \in [t_0, t_0 + \Delta], \quad c \sim \mathcal{U}[-C, C] $$
Type 3: トレンド変化(Trend Anomaly) $$ x’_t = x_t + \alpha(t – t_0), \quad t > t_0, \quad \alpha \sim \mathcal{N}(0, \sigma_{\text{trend}}^2) $$
Type 4: パターン変化(Shapelet Anomaly) $$ x’_t = \text{random\_shapelet}(t), \quad t \in [t_0, t_0 + \Delta] $$
各合成異常にはトークンレベルの正確な異常ラベルが自動付与されます。これにより、大規模な教師あり学習が可能になります。
事前学習の目標
事前学習の損失関数は、RCDスコアと合成異常ラベルの交差エントロピーです:
$$ \mathcal{L} = -\frac{1}{n}\sum_{i=1}^{n}\left[y_i \log(s_i) + (1-y_i)\log(1-s_i)\right] $$
ここで $y_i \in \{0, 1\}$ は合成異常ラベル、$s_i = \sigma(\text{RCD}_i)$ はシグモイドで正規化された異常スコアです。
合成データの多様性が鍵です。スパイク、レベルシフト、トレンド変化、パターン変化を組み合わせた大量の合成異常を生成することで、モデルは「ドメインに依存しない異常の普遍的なパターン」を学習します。
Pythonによる実装
RCDベースの異常検知シミュレーション
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
def generate_normal_series(length=512):
"""正常な時系列を生成する(周期+トレンド+ノイズ)。"""
t = np.arange(length)
series = (np.sin(2 * np.pi * t / 50) +
0.5 * np.sin(2 * np.pi * t / 120) +
0.01 * t / length +
np.random.randn(length) * 0.1)
return series
def inject_anomalies(series, anomaly_type='spike', start=None, duration=10):
"""合成異常を注入する。"""
s = series.copy()
length = len(s)
if start is None:
start = length // 2
labels = np.zeros(length)
if anomaly_type == 'spike':
spike_pos = start
s[spike_pos] += 5 * np.std(series)
labels[spike_pos] = 1
elif anomaly_type == 'level_shift':
s[start:start+duration] += 3 * np.std(series)
labels[start:start+duration] = 1
elif anomaly_type == 'trend_change':
slope = 0.05
for i in range(start, min(start+duration, length)):
s[i] += slope * (i - start)
labels[i] = 1
elif anomaly_type == 'pattern_change':
# 正弦波パターンを突然変更
t_anom = np.arange(duration)
s[start:start+duration] = 3 * np.sin(2 * np.pi * t_anom / 5)
labels[start:start+duration] = 1
return s, labels
def compute_rcd_scores(series, window_size=32, stride=1):
"""RCD(Relative Contextual Discrepancy)スコアを計算する。"""
length = len(series)
scores = np.zeros(length)
for t in range(window_size, length - window_size, stride):
left_window = series[t-window_size:t]
right_window = series[t:t+window_size]
# 各窓の統計的特徴を抽出(簡略化版)
left_features = np.array([
np.mean(left_window),
np.std(left_window),
np.mean(np.diff(left_window)), # 平均変化率
np.max(np.abs(np.diff(left_window))), # 最大変化率
np.polyfit(np.arange(window_size), left_window, 1)[0], # トレンド
])
right_features = np.array([
np.mean(right_window),
np.std(right_window),
np.mean(np.diff(right_window)),
np.max(np.abs(np.diff(right_window))),
np.polyfit(np.arange(window_size), right_window, 1)[0],
])
# ユークリッド距離
scores[t] = np.sqrt(np.sum((left_features - right_features)**2))
return scores
# 4種類の異常に対するRCDスコアを計算
anomaly_types = ['spike', 'level_shift', 'trend_change', 'pattern_change']
fig, axes = plt.subplots(4, 2, figsize=(16, 16))
for idx, atype in enumerate(anomaly_types):
normal = generate_normal_series(512)
anomalous, labels = inject_anomalies(normal, anomaly_type=atype,
start=256, duration=30)
rcd_scores = compute_rcd_scores(anomalous, window_size=32)
# 左: 時系列
axes[idx, 0].plot(normal, alpha=0.3, label='Normal', color='gray')
axes[idx, 0].plot(anomalous, label='With anomaly', color='#00d4ff')
# 異常区間をハイライト
anomaly_mask = labels > 0
if np.any(anomaly_mask):
anom_start = np.where(anomaly_mask)[0][0]
anom_end = np.where(anomaly_mask)[0][-1]
axes[idx, 0].axvspan(anom_start, anom_end, alpha=0.2, color='red')
axes[idx, 0].set_title(f'Time Series ({atype})')
axes[idx, 0].set_ylabel('Value')
axes[idx, 0].legend(fontsize=8)
axes[idx, 0].grid(True, alpha=0.3)
# 右: RCDスコア
axes[idx, 1].plot(rcd_scores, color='#ffa726')
if np.any(anomaly_mask):
axes[idx, 1].axvspan(anom_start, anom_end, alpha=0.2, color='red')
axes[idx, 1].set_title(f'RCD Score ({atype})')
axes[idx, 1].set_ylabel('Score')
axes[idx, 1].grid(True, alpha=0.3)
axes[-1, 0].set_xlabel('Time Step')
axes[-1, 1].set_xlabel('Time Step')
plt.tight_layout()
plt.savefig('timercd_anomaly_detection.png', dpi=150, bbox_inches='tight')
plt.show()
各行が異常タイプに対応しています。左列が時系列(赤帯が異常区間)、右列がRCDスコアです。
スパイク(1行目): 点異常の前後でRCDスコアが急上昇しています。左窓と右窓でスパイクの有無が変わるため、統計的特徴に大きな差が生じます。
レベルシフト(2行目): シフトの開始と終了でRCDスコアがピークを示しています。シフト中は左窓も右窓もシフト後のレベルを含むため、スコアは安定しています。これはRCDが「変化点」に反応する特性を示しています。
トレンド変化(3行目): トレンドの変化点付近でRCDスコアが上昇します。左窓は元のトレンド、右窓は新しいトレンドを含むため、傾きの差がスコアに反映されます。
パターン変化(4行目): パターンが突然変わる区間の境界でRCDスコアが高くなります。周波数や振幅が大きく異なるパターンが隣接するため、統計的特徴の差が最大化されます。
重要なのは、これら4種類の異常全てに対して、同じRCD計算で検出できていることです。これがドメインに依存しない検出の本質です。
合成データ生成と学習シミュレーション
def generate_synthetic_training_data(n_samples=500, length=256):
"""合成異常付き学習データを生成する。"""
X = []
Y = []
for _ in range(n_samples):
series = generate_normal_series(length)
if np.random.random() < 0.5:
# 異常を注入
atype = np.random.choice(['spike', 'level_shift',
'trend_change', 'pattern_change'])
start = np.random.randint(length // 4, 3 * length // 4)
duration = np.random.randint(5, 40)
series, labels = inject_anomalies(series, atype, start, duration)
else:
labels = np.zeros(length)
X.append(series)
Y.append(labels)
return np.array(X), np.array(Y)
# 合成データ生成
X_train, Y_train = generate_synthetic_training_data(500, 256)
# RCDスコアと真のラベルの相関を評価
from sklearn.metrics import roc_auc_score
all_scores = []
all_labels = []
for i in range(len(X_train)):
scores = compute_rcd_scores(X_train[i], window_size=24)
all_scores.extend(scores[24:-24]) # 境界を除外
all_labels.extend(Y_train[i][24:-24])
all_scores = np.array(all_scores)
all_labels = np.array(all_labels)
# 非ゼロスコアのみ
mask = all_scores > 0
if np.sum(all_labels[mask]) > 0:
auc = roc_auc_score(all_labels[mask], all_scores[mask])
print(f"RCD-based AUC-ROC: {auc:.4f}")
# 閾値による検出性能
thresholds = np.percentile(all_scores[mask], np.arange(80, 100, 1))
precisions = []
recalls = []
for thresh in thresholds:
pred = (all_scores > thresh).astype(int)
tp = np.sum((pred == 1) & (all_labels == 1))
fp = np.sum((pred == 1) & (all_labels == 0))
fn = np.sum((pred == 0) & (all_labels == 1))
precision = tp / (tp + fp + 1e-10)
recall = tp / (tp + fn + 1e-10)
precisions.append(precision)
recalls.append(recall)
plt.figure(figsize=(8, 5))
plt.plot(recalls, precisions, 'o-', color='#00d4ff', markersize=4)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title(f'RCD Anomaly Detection: Precision-Recall (AUC-ROC={auc:.3f})')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('timercd_pr_curve.png', dpi=150, bbox_inches='tight')
plt.show()
この精度-再現率曲線は、合成異常に対するRCDベースの検出性能を示しています。学習なし(ゼロショット)の統計的特徴量だけでもAUC-ROCが高い水準にあることが確認できます。TimeRCDの実際の実装ではTransformerベースのエンコーダを使用するため、より複雑なパターンの異常も高精度に検出できます。
テレメトリ異常検索への応用
TimeRCDのRCDスコアは、異常検知だけでなく異常パターンの検索にも活用できます。
- 異常区間の埋め込み: 高RCDスコア区間のエンコーダ出力を埋め込みベクトルとして保存
- 類似異常の検索: 新しい異常が検出されたとき、その埋め込みベクトルで過去の類似異常を検索
- 根本原因の推定: 類似した過去の異常とその原因(既知であれば)を参照して、新しい異常の原因を推定
まとめ
本記事では、NeurIPS 2025のTimeRCDについて解説しました。
- 従来の異常検知は正常データでの学習が必須であり、新ドメインへの適用にコストがかかる
- TimeRCDは隣接窓間の相対的な乖離(RCD)を異常指標とすることで、正常の定義を不要にした
- 合成異常データ(スパイク、レベルシフト、トレンド変化、パターン変化)で事前学習し、ゼロショットで多様な異常を検出
- トークンレベルの異常スコアにより、異常の位置を高精度に特定
- RCDは「パターンの変化」に反応し「値の大きさ」に鈍感であるため、ドメインをまたいだ汎化が可能
次のステップとして、以下の記事も参考にしてください。