CLPNM-AD — プロトタイプベースの対照学習で衛星テレメトリの異常を検知する【Sensors 2023】

地球を周回する人工衛星は、温度、電圧、電流、姿勢角など数十〜数百のテレメトリチャネルを地上局に送り続けています。地上の運用チームはこのテレメトリデータから衛星の健全性を判断しますが、膨大なデータの中から異常の兆候を人手で見つけるのは極めて困難です。しかも、衛星の異常は稀にしか発生しないため、ラベル付きの異常データはほとんど存在しません。

閾値ベースの従来手法は、単一チャネルの逸脱は検出できますが、複数チャネル間の相関が崩れる「相関異常」は見逃してしまいます。例えば、太陽電池の電圧と温度は通常強い相関を持ちますが、この相関パターンが崩れた場合、個々のチャネルは正常範囲内にあっても、衛星全体としては異常な状態かもしれません。

CLPNM-AD(Guo et al., Sensors 2023)は、対照学習を衛星テレメトリの異常検知に直接適用した先駆的な研究です。正常データの多次元的な相関構造を対照学習で学習し、プロトタイプベースのハードネガティブ合成で表現の質を高め、マハラノビス距離で異常度を判定します。特に、中国の量子科学実験衛星「墨子(Micius)」の実テレメトリデータで有効性が実証されている点が重要です。

CLPNM-ADの理解は、以下の応用領域で直接的に役立ちます。

  • 衛星運用: テレメトリの自動監視と異常の早期検知。予防保全によるミッション寿命の延伸
  • 産業IoT: 工場設備のセンサデータから設備異常を検知する予知保全。テレメトリと同じ多変量表形式データ
  • 医療監視: バイタルサイン(心拍・血圧・体温・SpO2)の多変量相関異常の検知
  • サイバーセキュリティ: ネットワークトラフィックの多次元的なパターン異常の検出

本記事の内容

  • CLPNM-ADが解決する課題 — テレメトリの相関異常検知
  • Random Feature Corruption によるデータ拡張
  • TabEncoder — 表形式データ用の1D-CNNエンコーダ
  • Sinkhorn-Knoppアルゴリズムによるプロトタイプ割り当て
  • Prototype-Based Negative Mixing — ハードネガティブ合成の核心
  • マハラノビス距離による異常スコア
  • PyTorchによる簡易版CLPNM-ADの実装と異常検知デモ
  • 実験結果の解説

前提知識

この記事を読む前に、以下の記事を読んでおくと理解が深まります。

画像なし
対照学習(Contrastive Learning)の理論
InfoNCE損失の数学的導出と、正例・負例の設計方法を解説しています。
画像なし
対照学習による事前学習
SimCLR・MoCo・CLIPなど、対照学習フレームワークの設計思想を比較解説しています。
正規分布とは
正規分布の定義やグラフの見方を解説しています。マハラノビス距離の基礎となる概念です。
画像なし
CLIPの対照学習を数式から理解してPythonで実装する
画像とテキストの対照学習の原型であるCLIPの理論と実装を解説しています。

CLPNM-ADの全体像 — 4段階のパイプライン

CLPNM-ADは、ラベルなしの正常データから異常検知モデルを構築する自己教師あり(self-supervised)フレームワークです。学習時に異常データを一切使わず、正常データの内部構造だけを学習し、正常から逸脱したサンプルを異常と判定します。

パイプラインを料理の比喩で説明しましょう。

  1. 食材の準備(Random Feature Corruption): 元の食材(正常サンプル)の一部をランダムに別の食材に入れ替えて、「微妙に違うバリエーション」を作ります。これが対照学習の正例ペアになります
  2. 食材の分析(TabEncoder): 各バリエーションを1D-CNNで埋め込み空間に変換します
  3. レシピの分類(プロトタイプ割り当て): 似た食材をグループ(プロトタイプ)に分類します。Sinkhorn-Knoppアルゴリズムがこの「均等な分類」を担います
  4. 味の区別(Prototype-Based Negative Mixing): 同じレシピの食材同士は近くに、異なるレシピの食材は遠くに配置します。偽の負例(同じレシピなのに異なるとされるペア)を防ぐのがプロトタイプの役割です
  5. 品質検査(マハラノビス距離): 新しい食材がどのレシピのグループからも遠ければ「異常」と判定します

この全体像を頭に入れた上で、各コンポーネントの詳細を見ていきましょう。

既存手法の課題 — なぜ単純な対照学習ではダメなのか

深層学習ベースの異常検知の限界

従来の深層学習ベースの異常検知手法には、以下の問題があります。

DAGMMは再構成誤差とガウス混合モデルを組み合わせますが、多次元テレメトリの複雑な相関構造を十分に捉えきれません。

Deep SVDDはデータを超球面に写像しますが、全データを一つの超球面に押し込むため、正常データの中にある意味的なサブグループ(例: 日照時の運用モードと日陰時の運用モード)を区別できません。

対照学習の「偽負例」問題

対照学習を異常検知に適用する際の最大の問題は偽負例(false negatives)です。バッチ内でランダムに選ばれた負例ペアが、実際には同じ意味カテゴリに属するサンプルであるケースがあります。

対照学習のInfoNCE損失にはhardness-aware property(難易度認識特性)があり、アンカーに近い負例ほど大きな勾配を受けます。偽負例は(同カテゴリなので)アンカーに近いため、特に大きな勾配で押し離されてしまいます。これは局所的な意味構造を破壊する深刻な問題です。

CLPNM-ADは、この問題をプロトタイプベースの負例合成で解決しています。プロトタイプの割り当てにより各サンプルの意味カテゴリを推定し、同じプロトタイプに属するサンプルは負例から除外異なるプロトタイプのサンプルのみを負例として使用します。

さらに、単なる除外ではなく、プロトタイプ間の距離に基づいてハードネガティブを合成することで、より効果的な学習を実現しています。

では、パイプラインの最初のステップであるデータ拡張から見ていきましょう。

Random Feature Corruption — 表形式データのための対照学習拡張

画像拡張との違い

画像の対照学習(SimCLR等)では、回転、クロップ、色調変換などの空間的な変換が正例ペアの生成に使われます。しかし、テレメトリデータは表形式(tabular)であり、各列が物理量(温度、電圧、電流など)を表します。画像的な変換は意味をなしません。

CLPNM-ADは、SCARF(Self-supervised Contrastive Learning using Random Feature Corruption)に触発されたランダム特徴破壊を拡張戦略として採用しています。

アルゴリズム

入力サンプル $\bm{x} \in \mathbb{R}^D$($D$ 次元の特徴ベクトル)に対して、以下の手順で拡張サンプルを生成します。

  1. データセットからランダムに別のサンプル $\bm{x}_{\text{rand}}$ を選ぶ
  2. $D$ 個の特徴次元から、$k$ 個をランダムに選択($k$ は破壊数)
  3. 選ばれた次元を $\bm{x}_{\text{rand}}$ の対応する値で置換する

$$ \bm{x}_{\text{aug}}[d] = \begin{cases} \bm{x}_{\text{rand}}[d] & \text{if } d \in \text{corruption\_idx} \\ \bm{x}[d] & \text{otherwise} \end{cases} $$

破壊数 $k$ はハイパーパラメータで、データセットの次元数に応じて設定されます。

データセット 入力次元 $D$ 破壊数 $k$ 破壊率
Thyroid 6 4 67%
Micius(墨子) 19 16 84%
Satellite 36 34 94%
Satimage 36 32 89%

破壊率が非常に高い(67%〜94%)点は注目に値します。これは、テレメトリデータでは残された少数の特徴からでも全体のパターンを復元できるほど、チャネル間の相関が強いことを意味しています。逆に、相関が崩れたサンプル(異常)は、この対照学習の枠組みで「正常」から大きく外れた表現を持つことになります。

データ拡張の仕組みが理解できたところで、次は拡張されたサンプルを埋め込み空間に変換するエンコーダの構造を見ていきましょう。

TabEncoder — 表形式データを1D-CNNで処理する

設計思想

テレメトリデータは時系列のスナップショットとして表形式で表されます(各行が1時点、各列が1チャネル)。この表形式データをニューラルネットワークで効率的に処理するために、CLPNM-ADは1D-CNNベースのTabEncoderを採用しています。

TabEncoderのアイデアは、$D$ 次元の入力をまず2次元の疑似画像に変換(リシェイプ)し、1D-CNNで特徴を抽出するものです。具体的には、入力 $\bm{x} \in \mathbb{R}^D$ を線形層で $\text{channel\_input} \times \text{sign\_size}$ の2次元表現に変換し、channel_input を CNN のチャネル方向として扱います。

$$ \bm{h} = \text{Linear}(\bm{x}) \in \mathbb{R}^{C_{\text{in}} \times S} $$

ここで $C_{\text{in}} = 16$(チャネル入力数)、$S = 8$(シグネチャサイズ)がデフォルト値です。この2次元表現に対して、3層のカーネルサイズ1の1D-CNNを適用します。

$$ \bm{e} = \text{CNN}(\bm{h}) \in \mathbb{R}^{d_{\text{hidden}}} $$

CNNの各層には BatchNorm と LeakyReLU(傾き0.2)が挿入されています。最終的にフラットにして線形層で $d_{\text{hidden}}$ 次元の埋め込みに射影します。

Projection Head

対照学習の実践では、エンコーダの出力をそのまま対照損失に使うより、追加の射影ヘッドを通した表現を使う方が性能が良いことがSimCLRで示されています。CLPNM-ADも同様に、LeakyReLU + 線形層のprojection headを使用します。

$$ \bm{z} = \text{ProjHead}(\bm{e}) = \text{Linear}(\text{LeakyReLU}(\bm{e})) \in \mathbb{R}^{d_{\text{hidden}}} $$

異常検知の推論時には、projection headの出力 $\bm{z}$ ではなく、エンコーダの埋め込み $\bm{e}$ を特徴量として使用します。これはSimCLRの知見と一致しており、projection headは対照学習の最適化を安定させるための「犠牲層」として機能します。

エンコーダの出力が得られたところで、次はこの表現をプロトタイプに割り当てるSinkhorn-Knoppアルゴリズムを見ていきましょう。

Sinkhorn-Knoppによるプロトタイプ割り当て

プロトタイプの役割

プロトタイプとは、データの意味的なクラスタの「代表点」です。正常テレメトリデータにも、運用モード(日照/日陰、軌道上昇/下降など)に応じた複数のサブグループが存在します。プロトタイプはこれらのサブグループを捉え、以下の2つの役割を果たします。

  1. 偽負例の防止: 同じプロトタイプに属するサンプルは同じ意味カテゴリとみなし、負例から除外する
  2. ハードネガティブ合成のガイド: プロトタイプ間の距離に基づいて、適切な難易度の合成負例を生成する

プロトタイプ線形層

プロトタイプは、projection headの出力に適用される線形層 $\bm{W}_{\text{proto}} \in \mathbb{R}^{d_{\text{hidden}} \times K}$($K$ はプロトタイプ数)として実装されます。重みは各ステップで $L_2$ 正規化されます。

$$ \bm{p} = \bm{z}^T \bm{W}_{\text{proto}} \in \mathbb{R}^K $$

$\bm{p}$ の各成分は、サンプルの各プロトタイプへの「近さ」(非正規化)を表します。

Sinkhorn-Knoppアルゴリズム

$\bm{p}$ をソフト割り当て行列 $\bm{Q}$ に変換するために、Sinkhorn-Knoppアルゴリズム(最適輸送の反復正規化解法)が使われます。

まず、温度パラメータ $\epsilon$ でスケーリングした指数関数を適用します。

$$ \bm{Q} = \exp(\bm{p} / \epsilon)^T \in \mathbb{R}^{K \times B} $$

ここで $\epsilon = 0.05$ です。$\epsilon$ が小さいほど割り当てが「硬く」(one-hotに近く)なり、大きいほど「柔らかく」(一様に近く)なります。

次に、$\bm{Q}$ を合計1に正規化した後、行と列を交互に正規化する反復を5回行います。

$$ \bm{Q} \leftarrow \frac{\bm{Q}}{\sum \bm{Q}} $$

各反復で:

$$ \bm{Q} \leftarrow \frac{\bm{Q}}{\sum_{\text{rows}} \bm{Q}} \cdot \frac{1}{K}, \quad \bm{Q} \leftarrow \frac{\bm{Q}}{\sum_{\text{cols}} \bm{Q}} \cdot \frac{1}{B} $$

最後に $\bm{Q} \leftarrow \bm{Q} \cdot B$ として、列の合計が1になるように調整します。

この反復正規化の意味は、各プロトタイプに(ほぼ)均等な数のサンプルが割り当てられるように強制することです。一つのプロトタイプに全サンプルが集中する「崩壊」を防ぎ、意味的に多様なクラスタ構造が保持されます。

ハードラベルの取得

ソフト割り当て $\bm{Q}$ から、各サンプルのプロトタイプラベルを取得します。

$$ \text{hard\_label}_i = \arg\max_k Q_{ki} $$

このラベルが、次のPrototype-Based Negative Mixingで使われます。

SwAV整合損失

プロトタイプ割り当ての一貫性を強化するために、SwAV(Swapping Assignments between Views)スタイルの損失が追加されます。同じサンプルの2つの拡張ビューが、同じプロトタイプに割り当てられるべきだという制約です。

$$ L_{\text{swav}} = -\frac{1}{B} \sum_{i=1}^{B} \sum_{k=1}^{K} Q_{ki} \log \frac{\exp(p_k^{(v)} / \tau)}{\sum_{k’} \exp(p_{k’}^{(v)} / \tau)} $$

ここで $(v)$ は別のビュー、$\tau$ は温度パラメータです。一方のビューの割り当て $\bm{Q}$ を、他方のビューのプロトタイプ出力 $\bm{p}^{(v)}$ で予測する交差損失です。

プロトタイプ割り当ての仕組みが理解できたところで、CLPNM-ADの最も重要な貢献であるPrototype-Based Negative Mixingに進みましょう。

Prototype-Based Negative Mixing — 核心的貢献

2つのフェーズ

CLPNM-ADの対照学習は、ウォームアップフェーズ本学習フェーズの2段階で構成されています。

ウォームアップフェーズ(epoch < warm_up)

プロトタイプがまだ信頼できない初期段階では、標準的なSimCLRスタイルのInfoNCE損失を使用します。

$$ L_{\text{warmup}} = -\log \frac{\exp(\text{sim}(\bm{z}_i, \bm{z}_i^+) / \tau)}{\sum_{j \neq i} \exp(\text{sim}(\bm{z}_i, \bm{z}_j) / \tau)} $$

温度 $\tau = 0.07$ で、バッチ内の他サンプルが負例です。この段階でエンコーダとプロトタイプの表現がある程度安定し、プロトタイプ割り当てが信頼できるようになります。

本学習フェーズ(epoch >= warm_up)

プロトタイプが安定した後に、Prototype-Based Negative Mixingが有効化されます。

プロトタイプ間距離の計算

まず、$K$ 個のプロトタイプ間のコサイン距離を計算します。

$$ D_{\text{proto}}(i, j) = 1 – \frac{\bm{w}_i \cdot \bm{w}_j}{\|\bm{w}_i\| \|\bm{w}_j\|} $$

ここで $\bm{w}_i$ は正規化されたプロトタイプ $i$ の重みベクトルです。$D_{\text{proto}}(i, j)$ が大きいほど、プロトタイプ $i$ と $j$ は意味的に遠いことを意味します。

混合重み $\alpha$ の計算

各サンプルに対して、プロトタイプ間距離に基づく混合重みを計算します。サンプル $n$ がプロトタイプ $c$ に属するとき、別のサンプル $m$(プロトタイプ $c’$ に属する)との混合重み $\alpha$ は以下のように決まります。

$$ \alpha_{\text{raw}}(n, m) = D_{\text{proto}}(c, c’) $$

同じプロトタイプに属するサンプル($c = c’$)は最小距離に設定し、min-max正規化で $[0, 1]$ にスケーリングします。

$$ \alpha(n, m) = \frac{\alpha_{\text{raw}}(n, m) – \alpha_{\min}}{\alpha_{\max} – \alpha_{\min} + 10^{-12}} $$

ハードネガティブの合成

混合重み $\alpha$ を使って、アンカーと他のサンプルを線形補間し、ハードネガティブを合成します。

$$ \bm{z}_{\text{neg}}(n, m) = (1 – \alpha(n, m)) \cdot \bm{z}_m + \alpha(n, m) \cdot \bm{z}_n $$

この式の直感を説明しましょう。

  • プロトタイプ間距離が大きい($\alpha$ が大きい)場合: 合成負例はアンカー $\bm{z}_n$ に近づく。意味的に遠いサンプル同士なので、それほど「ハード」にしなくても十分に区別できるため、アンカー寄りの表現でも学習に有効です
  • プロトタイプ間距離が小さい($\alpha$ が小さい)場合: 合成負例は元のサンプル $\bm{z}_m$ に近い。意味的に近いが異なるプロトタイプのサンプルなので、「ほぼ本物の負例」として厳しい識別を強制します
  • 同じプロトタイプ($\alpha = \alpha_{\min}$)の場合: 偽負例の可能性が高いため、マスクで除外されます

マスク機構

Prototype-Based Negative Mixingでは、2種類のマスクが使われます。

$$ \text{mask\_pos}_{nm} = \begin{cases} 1 & \text{if } c_n = c_m \text{(同じプロトタイプ)} \\ 0 & \text{otherwise} \end{cases} $$

$$ \text{mask\_neg}_{nm} = \begin{cases} 1 & \text{if } c_n \neq c_m \text{(異なるプロトタイプ)} \\ 0 & \text{otherwise} \end{cases} $$

最終的な類似度行列は、正例マスクが適用される部分には通常の類似度を、負例マスクが適用される部分には合成ネガティブの類似度を使用します。

$$ \bm{S}_{\text{final}} = \bm{S}_{\text{original}} \odot \text{mask\_pos} + \bm{S}_{\text{neg}} \odot \text{mask\_neg} $$

そして対照損失を計算します。

$$ L_{\text{CL}} = -\frac{1}{2B} \sum_{i=1}^{2B} \log \frac{\exp(\text{sim}(\bm{z}_i, \bm{z}_i^+) / \tau)}{(\bm{S}_{\text{final}} \odot \text{mask}_{\text{self}})_i} $$

ここで $\text{mask}_{\text{self}}$ は自分自身を除外する対角マスクです。

なぜこの設計が効果的なのか

この設計のエレガントさは、プロトタイプという構造を利用して、対照学習の本質的な弱点(偽負例問題)を直接解決している点にあります。従来の対照学習では「バッチ内の全てのペアを負例とみなす」ため、同じ意味カテゴリのサンプルが押し離されてしまいました。CLPNM-ADは:

  1. プロトタイプで意味カテゴリを推定し、偽負例を除外する
  2. プロトタイプ間距離で「どの程度ハードな負例にするか」を制御する
  3. 合成負例により、データ量に依存しない豊富な負例を生成する

この3段階の工夫により、正常データの意味構造を保持しながら、異常検知に必要な高品質な表現を獲得できるのです。

対照学習による表現が得られたところで、次はこの表現を使って異常スコアを計算する方法を見ていきましょう。

マハラノビス距離による異常スコア

なぜマハラノビス距離なのか

学習済みエンコーダで特徴量を抽出した後、各サンプルが「正常からどれだけ離れているか」を定量化する必要があります。ユークリッド距離は各次元を等しく扱いますが、特徴量の各次元は異なるスケールや相関を持っています。マハラノビス距離は共分散構造を考慮した距離尺度であり、特徴量間の相関を正しく反映した異常度を測定できます。

プロトタイプベースのマハラノビス距離

CLPNM-ADでは、学習済みの特徴量とプロトタイプラベルを使って、以下の手順で異常スコアを計算します。

  1. 学習データ全体をエンコーダに通し、エンコーダの埋め込み $\bm{e}$ とプロトタイプラベル $c$ を取得する
  2. Linear Discriminant Analysis(LDA)をプロトタイプラベルでフィットし、各プロトタイプの平均 $\bm{\mu}_c$ と共有共分散行列 $\bm{\Sigma}$ を得る
  3. テストサンプルの特徴量 $\bm{e}_{\text{test}}$ に対して、各プロトタイプとのマハラノビス距離を計算し、最小値を異常スコアとする

$$ d_{\text{Mahal}}(\bm{e}, \bm{\mu}_c) = \sqrt{(\bm{e} – \bm{\mu}_c)^T \bm{\Sigma}^{-1} (\bm{e} – \bm{\mu}_c)} $$

$$ \text{score}(\bm{e}) = \min_c \; d_{\text{Mahal}}(\bm{e}, \bm{\mu}_c) $$

正常サンプルはいずれかのプロトタイプの近くに位置するため、最小マハラノビス距離は小さくなります。異常サンプルはどのプロトタイプからも遠い位置に写像されるため、スコアが大きくなります。

閾値の決定

異常/正常の二値判定には、テストデータの異常率に基づくパーセンタイル閾値を使用します。

$$ \theta = \text{percentile}(\text{scores}, 100 – r \times 100) $$

ここで $r$ はテストデータにおける異常サンプルの割合です。スコアが $\theta$ を超えるサンプルを異常と判定します。

統合損失関数

CLPNM-ADの最終的な損失関数は、対照損失とSwAV整合損失の加重和です。

$$ L = L_{\text{CL}} + \alpha_{\text{swav}} \cdot L_{\text{swav}} $$

$\alpha_{\text{swav}} = 0.4$ で固定されています。

$L_{\text{CL}}$ は学習段階に応じて以下のように切り替わります。

  • epoch < warm_up: 標準的なSimCLRスタイルの対照損失(全バッチ内サンプルが負例)
  • epoch >= warm_up: Prototype-Based Negative Mixing 付きの対照損失(プロトタイプベースのマスクと合成ネガティブ)

理論の全体像が把握できたので、次はPyTorchで核心部分を実装して理解を深めましょう。

Python実装: 簡易版CLPNM-AD

TabEncoder の実装

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis


class TabEncoder(nn.Module):
    """表形式データ用の1D-CNNエンコーダ"""

    def __init__(self, input_dim, hidden_dim, channel_input=16, sign_size=8):
        super().__init__()
        self.channel_input = channel_input
        self.sign_size = sign_size

        # 入力を2次元疑似画像に変換
        self.trans = nn.Linear(input_dim, channel_input * sign_size)

        # 1D-CNNエンコーダ
        self.encoder = nn.Sequential(
            nn.LeakyReLU(0.2),
            nn.Conv1d(channel_input, 32, kernel_size=1),
            nn.BatchNorm1d(32),
            nn.LeakyReLU(0.2),
            nn.Conv1d(32, hidden_dim, kernel_size=1),
            nn.BatchNorm1d(hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Conv1d(hidden_dim, hidden_dim, kernel_size=1),
            nn.BatchNorm1d(hidden_dim),
            nn.Flatten(),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim * sign_size, hidden_dim),
        )

        # Projection Head
        self.proj_head = nn.Sequential(
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, hidden_dim),
        )

    def forward(self, x):
        """
        x: (B, input_dim)
        returns: embedding (B, hidden_dim), feature (B, hidden_dim)
        """
        x = self.trans(x)
        x = x.reshape(x.shape[0], self.channel_input, self.sign_size)
        embedding = self.encoder(x)
        feature = self.proj_head(embedding)
        return embedding, feature

TabEncoderは、$D$ 次元の入力を $16 \times 8$ の2次元表現に線形変換し、1D-CNNで処理するアーキテクチャです。カーネルサイズ1の畳み込みはチャネル方向のみに作用し、sign_size方向の情報はそのまま保持されます。最終的にflattenして線形層で hidden_dim 次元に圧縮します。

Random Feature Corruption の実装

class FeatureCorruption:
    """ランダム特徴破壊によるデータ拡張"""

    def __init__(self, input_dim, corruption_k):
        self.input_dim = input_dim
        self.corruption_k = corruption_k

    def __call__(self, x, dataset):
        """
        x: (B, D) - 元のサンプル
        dataset: (N, D) - データセット全体(ランダムサンプル元)
        returns: (B, D) - 拡張サンプル
        """
        B = x.shape[0]
        # ランダムにサンプルを選択
        rand_idx = torch.randint(0, len(dataset), (B,))
        rand_samples = dataset[rand_idx]

        # 破壊マスクの生成
        corruption_mask = torch.zeros(B, self.input_dim, dtype=torch.bool)
        for i in range(B):
            idx = torch.randperm(self.input_dim)[:self.corruption_k]
            corruption_mask[i, idx] = True

        # 選ばれた次元をランダムサンプルで置換
        x_aug = torch.where(corruption_mask, rand_samples, x)
        return x_aug

この実装では、各サンプルに対して corruption_k 個の次元をランダムに選び、別のランダムサンプルの値で置換しています。これにより、チャネル間の相関構造が部分的に保持されつつ、多様な正例ペアが生成されます。

Sinkhorn-Knopp と Prototype-Based Negative Mixing

class CLPNM(nn.Module):
    """CLPNM-ADモデル"""

    def __init__(self, input_dim, hidden_dim, num_prototypes,
                 corruption_k, epsilon=0.05, temperature=0.07):
        super().__init__()
        self.encoder = TabEncoder(input_dim, hidden_dim)
        self.prototypes = nn.Linear(hidden_dim, num_prototypes, bias=False)
        self.num_prototypes = num_prototypes
        self.epsilon = epsilon
        self.temperature = temperature
        self.corruption = FeatureCorruption(input_dim, corruption_k)

    @torch.no_grad()
    def sinkhorn_knopp(self, out, sinkhorn_iters=5):
        """Sinkhorn-Knoppアルゴリズムによるソフト割り当て"""
        Q = torch.exp(out / self.epsilon).t()
        B = Q.shape[1]
        K = Q.shape[0]

        Q /= torch.sum(Q)

        for _ in range(sinkhorn_iters):
            Q /= torch.sum(Q, dim=1, keepdim=True)
            Q /= K
            Q /= torch.sum(Q, dim=0, keepdim=True)
            Q /= B

        Q *= B
        return Q.t()

    def compute_proto_distance(self):
        """プロトタイプ間のコサイン距離"""
        w = self.prototypes.weight.data.clone()
        w = F.normalize(w, dim=1, p=2)
        cos_sim = F.cosine_similarity(w.unsqueeze(1), w.unsqueeze(0), dim=2)
        return 1.0 - cos_sim

    def contrastive_loss_warmup(self, z1, z2):
        """ウォームアップ用SimCLRスタイル損失"""
        B = z1.shape[0]
        z1 = F.normalize(z1, dim=-1)
        z2 = F.normalize(z2, dim=-1)
        z = torch.cat([z1, z2], dim=0)

        sim = torch.mm(z, z.t()) / self.temperature
        mask = torch.eye(2 * B, device=z.device).bool()
        sim.masked_fill_(mask, -1e9)

        # 正例: z1[i]とz2[i]のペア
        pos_mask = torch.zeros(2 * B, 2 * B, device=z.device)
        for i in range(B):
            pos_mask[i, B + i] = 1
            pos_mask[B + i, i] = 1

        exp_sim = torch.exp(sim)
        pos_sum = (exp_sim * pos_mask).sum(dim=1)
        all_sum = exp_sim.sum(dim=1)
        loss = -torch.log(pos_sum / all_sum + 1e-8).mean()
        return loss

    def contrastive_loss_pnm(self, z1, z2, labels):
        """Prototype-Based Negative Mixing損失"""
        B = z1.shape[0]
        z1 = F.normalize(z1, dim=-1)
        z2 = F.normalize(z2, dim=-1)
        z = torch.cat([z1, z2], dim=0)
        all_labels = torch.cat([labels, labels], dim=0)

        # プロトタイプ間距離
        proto_dist = self.compute_proto_distance()

        # 混合重みの計算
        N = 2 * B
        alpha = torch.zeros(N, N, device=z.device)
        for i in range(N):
            ci = all_labels[i]
            for j in range(N):
                cj = all_labels[j]
                alpha[i, j] = proto_dist[ci, cj]

        # 同プロトタイプは最小値に設定
        same_proto = (all_labels.unsqueeze(0) == all_labels.unsqueeze(1))
        a_min = alpha[~same_proto].min() if (~same_proto).any() else 0
        alpha[same_proto] = a_min

        # Min-max正規化
        a_min_val = alpha.min()
        a_max_val = alpha.max()
        alpha = (alpha - a_min_val) / (a_max_val - a_min_val + 1e-12)

        # ハードネガティブ合成
        z_i = z.unsqueeze(1).expand(-1, N, -1)  # (N, N, D)
        z_j = z.unsqueeze(0).expand(N, -1, -1)
        alpha_3d = alpha.unsqueeze(-1)
        z_neg = z_j * (1 - alpha_3d) + z_i * alpha_3d
        z_neg = F.normalize(z_neg, dim=-1)

        # 合成ネガティブとの類似度
        sim_neg = torch.einsum('nd,nmd->nm', z, z_neg) / self.temperature

        # 通常の類似度
        sim_orig = torch.mm(z, z.t()) / self.temperature

        # マスク構築
        mask_pos = same_proto.float()
        mask_neg = (~same_proto).float()
        mask_self = (1 - torch.eye(N, device=z.device))

        # 類似度行列の統合
        sim_final = torch.exp(sim_orig) * mask_pos + torch.exp(sim_neg) * mask_neg
        sim_final = sim_final * mask_self

        # 正例の類似度
        pos = torch.exp(torch.sum(z1 * z2, dim=-1) / self.temperature)
        pos = torch.cat([pos, pos], dim=0)

        loss = (-torch.log(pos / (sim_final.sum(dim=-1) + 1e-8))).mean()
        return loss

    def get_features_and_labels(self, data):
        """特徴量とプロトタイプラベルの取得"""
        self.eval()
        with torch.no_grad():
            embeddings, features = self.encoder(data)
            proto_out = self.prototypes(features)
            q = self.sinkhorn_knopp(proto_out)
            proto_labels = torch.argmax(q, dim=1)
        return embeddings, proto_labels

この実装の核心は contrastive_loss_pnm メソッドです。プロトタイプ間距離に基づく混合重み $\alpha$ を計算し、z_neg = z_j * (1 - alpha) + z_i * alpha で合成ネガティブを生成しています。同じプロトタイプに属するサンプルは mask_pos で通常の類似度を使い、異なるプロトタイプのサンプルは mask_neg で合成ネガティブの類似度を使う、という二重マスク構造になっています。

学習ループと異常検知デモ

# 合成データの生成(正常 + 異常)
np.random.seed(42)
torch.manual_seed(42)

input_dim = 19  # Micius衛星のチャネル数に合わせる
hidden_dim = 32
num_prototypes = 4
corruption_k = 12
n_normal = 500
n_anomaly = 50

# 正常データ: 相関のある多変量データ
mean_normal = np.zeros(input_dim)
cov_normal = np.eye(input_dim) * 0.5
for i in range(input_dim):
    for j in range(input_dim):
        if i != j:
            cov_normal[i, j] = 0.3 * np.exp(-abs(i - j) / 3)

normal_data = np.random.multivariate_normal(mean_normal, cov_normal, n_normal)

# 異常データ: 相関構造が崩れたデータ
anomaly_data = np.random.multivariate_normal(
    mean_normal + 0.5, np.eye(input_dim) * 2.0, n_anomaly
)

# テンソルに変換
train_data = torch.FloatTensor(normal_data)
test_normal = torch.FloatTensor(normal_data[:100])
test_anomaly = torch.FloatTensor(anomaly_data)

# モデルと学習
model = CLPNM(input_dim, hidden_dim, num_prototypes, corruption_k)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001,
                            momentum=0.9, weight_decay=1e-4)

losses = []
warm_up = 10
num_epochs = 60
batch_size = 64

for epoch in range(num_epochs):
    model.train()
    # プロトタイプの重み正規化
    with torch.no_grad():
        w = model.prototypes.weight.data.clone()
        w = F.normalize(w, dim=1, p=2)
        model.prototypes.weight.copy_(w)

    # ミニバッチ
    perm = torch.randperm(len(train_data))[:batch_size]
    x = train_data[perm]

    # データ拡張
    x_aug = model.corruption(x, train_data)

    # エンコード
    emb1, feat1 = model.encoder(x)
    emb2, feat2 = model.encoder(x_aug)

    # プロトタイプ割り当て
    proto_out1 = model.prototypes(feat1)
    q1 = model.sinkhorn_knopp(proto_out1.detach())
    labels1 = torch.argmax(q1, dim=1)

    # SwAV損失
    proto_out2 = model.prototypes(feat2)
    swav_loss = -torch.mean(
        torch.sum(q1 * F.log_softmax(proto_out2 / 0.1, dim=1), dim=1)
    )

    # 対照損失
    if epoch < warm_up:
        cl_loss = model.contrastive_loss_warmup(feat1, feat2)
    else:
        cl_loss = model.contrastive_loss_pnm(feat1, feat2, labels1)

    loss = cl_loss + 0.4 * swav_loss

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    losses.append(loss.item())
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1:3d} | Loss: {loss.item():.4f} | "
              f"CL: {cl_loss.item():.4f} | SwAV: {swav_loss.item():.4f}")
# 損失推移の可視化
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(losses, linewidth=2)
ax.axvline(x=warm_up, color='r', linestyle='--', label=f'Warm-up end (epoch {warm_up})')
ax.set_xlabel('Epoch')
ax.set_ylabel('Total Loss')
ax.set_title('CLPNM-AD Training Loss')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

上のグラフから、以下の特徴が読み取れます。

  1. ウォームアップ期間(赤い破線の左側)では損失が急速に低下する — 標準的なSimCLRスタイルの学習が進み、基本的な表現が獲得されています
  2. ウォームアップ終了後に損失の挙動が変化する — Prototype-Based Negative Mixing に切り替わり、より洗練された学習が行われています。合成ネガティブを使うため、損失の変動パターンが変わることがあります
  3. 全体として損失は安定して低い値に収束する — プロトタイプベースのマスキングにより偽負例が排除され、学習が安定しています

異常検知のデモ

# 異常検知: マハラノビス距離による異常スコア計算
model.eval()

# 学習データの特徴量とプロトタイプラベルを取得
train_emb, train_proto_labels = model.get_features_and_labels(train_data)

# LDAフィット(マハラノビス距離の計算用)
gda = LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto',
                                  store_covariance=True)
gda.fit(train_emb.numpy(), train_proto_labels.numpy())

# テストデータの特徴量
with torch.no_grad():
    test_n_emb, _ = model.encoder(test_normal)
    test_a_emb, _ = model.encoder(test_anomaly)

# マハラノビス距離の計算
def mahalanobis_score(features, means, cov):
    features = features.numpy()
    N, D = features.shape
    K = means.shape[0]
    cov_inv = np.linalg.inv(cov + 1e-6 * np.eye(cov.shape[0]))
    features_exp = features.reshape(N, 1, D).repeat(K, axis=1)
    means_exp = means.reshape(1, K, D).repeat(N, axis=0)
    diff = features_exp - means_exp
    dist = np.sqrt(np.einsum('nkd,de,nke->nk', diff, cov_inv, diff))
    dist[np.isnan(dist)] = 1e12
    return dist.min(axis=1) if len(dist.shape) > 1 else dist

scores_normal = mahalanobis_score(test_n_emb, gda.means_, gda.covariance_)
scores_anomaly = mahalanobis_score(test_a_emb, gda.means_, gda.covariance_)

# 結果の可視化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# ヒストグラム
axes[0].hist(scores_normal, bins=30, alpha=0.7, label='Normal', color='steelblue')
axes[0].hist(scores_anomaly, bins=30, alpha=0.7, label='Anomaly', color='coral')
axes[0].set_xlabel('Mahalanobis Distance Score')
axes[0].set_ylabel('Count')
axes[0].set_title('Anomaly Score Distribution')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# ROC-like: スコアの分離
all_scores = np.concatenate([scores_normal, scores_anomaly])
all_labels = np.concatenate([np.zeros(len(scores_normal)),
                             np.ones(len(scores_anomaly))])
sorted_idx = np.argsort(all_scores)

axes[1].scatter(range(len(scores_normal)), np.sort(scores_normal),
                s=10, alpha=0.5, label='Normal', color='steelblue')
axes[1].scatter(range(len(scores_normal),
                      len(scores_normal) + len(scores_anomaly)),
                np.sort(scores_anomaly),
                s=10, alpha=0.5, label='Anomaly', color='coral')
threshold = np.percentile(all_scores, 100 - len(scores_anomaly) /
                          len(all_scores) * 100)
axes[1].axhline(y=threshold, color='r', linestyle='--',
                label=f'Threshold: {threshold:.2f}')
axes[1].set_xlabel('Sample Index (sorted)')
axes[1].set_ylabel('Anomaly Score')
axes[1].set_title('Anomaly Score Separation')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# F1スコアの計算
y_pred = (all_scores > threshold).astype(int)
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score
precision, recall, f1, _ = precision_recall_fscore_support(
    all_labels.astype(int), y_pred, average='binary')
auc = roc_auc_score(all_labels, all_scores)
print(f"Precision: {precision:.4f} | Recall: {recall:.4f} | "
      f"F1: {f1:.4f} | AUC: {auc:.4f}")

この異常検知デモから、以下のことが確認できます。

  1. 正常サンプルと異常サンプルのスコア分布が明確に分離されている — 正常データは低いマハラノビス距離に集中し、異常データは高い値に分布しています。これは対照学習が正常データの構造を効果的に学習できていることを示します
  2. 閾値(赤い破線)によって正常と異常が適切に分類される — パーセンタイルベースの閾値設定が実用的であることが確認できます
  3. AUCスコアが高い値を示す — 合成データという理想的な条件下ではありますが、CLPNM-ADのパイプライン全体(対照学習 → プロトタイプ → マハラノビス距離)が正しく機能していることが検証できます

実験結果の解説

Guo et al. (2023) は、4つのデータセットでCLPNM-ADの有効性を検証しています。

データセット

データセット タイプ 次元数 説明
Thyroid UCI公開 6 甲状腺機能データ
Micius(墨子) 実衛星テレメトリ 19 量子科学実験衛星のテレメトリ
Satellite UCI/StatLog 36 衛星画像の土壌分類
Satimage 公開 36 衛星画像の異常検知

特にMiciusデータセットは、中国が2016年に打ち上げた世界初の量子通信衛星のテレメトリデータであり、実運用データでの検証という点で学術的・実用的に高い価値があります。

性能比較

全4データセットでCLPNM-ADがF1スコアで最高性能を達成しています。

  • Thyroid: 次善手法(DAGMM)に対してF1で+11.5%の改善
  • Micius: 次善手法(LOF)に対してF1で+11.0%の改善
  • Satellite: 次善手法(DAGMM)に対してF1で+3.5%の改善
  • Satimage: 次善手法(GOAD)に対してF1で+0.8%の改善

特にMiciusデータセットでの大幅な改善は注目に値します。DAGMMはこのデータセットで低い性能を示しましたが、これは正常と異常の境界が曖昧なテレメトリデータでは、再構成誤差ベースの手法が限界を持つためです。一方、CLPNM-ADはプロトタイプベースのセマンティックな分離により、微妙な相関異常も捉えることができています。

ベースライン手法の限界

  • DAGMM: 再構成誤差 + ガウス混合モデル。多次元の相関構造を十分に捉えきれない
  • Deep SVDD: 超球面への写像。意味的なサブグループの情報を失う
  • GOAD: アフィン変換ベースの拡張。テレメトリデータの相関構造を考慮していない
  • NeuTraL AD: ニューラルネットワークによる拡張。プロトタイプの概念がなく偽負例問題を解消できない

CLPNM-ADはプロトタイプによる意味構造の保持とハードネガティブ合成の組み合わせにより、これらの限界を克服しています。

アブレーション実験

論文のアブレーション実験では、以下が確認されています。

ハードネガティブ合成戦略の効果: 合成なし(ランダム負例のみ)と比較して、プロトタイプベースの合成により全データセットで性能が向上。偽負例の排除だけでなく、合成による多様な負例の効果が確認されています。

プロトタイプ数の影響: Micius、Satellite、Satimageではプロトタイプ数に対して頑健ですが、Thyroidでは4が最適。Thyroidはクラスタ構造が比較的明確で、4つのプロトタイプがちょうどデータの意味構造に対応しています。

まとめ

本記事では、Sensors 2023で発表された CLPNM-AD(Guo et al., 2023)について解説しました。CLPNM-ADは、衛星テレメトリに対照学習を直接適用した先駆的な研究であり、以下の技術的貢献を持ちます。

  • Random Feature Corruption: SCARFに触発された表形式データ向けの拡張戦略。ランダムに特徴の一部を別サンプルの値で置換し、チャネル間相関を保持した正例ペアを生成します

  • Prototype-Based Negative Mixing: 対照学習の偽負例問題を解決する核心的貢献。Sinkhorn-Knoppアルゴリズムでプロトタイプ割り当てを行い、プロトタイプ間距離に基づいてハードネガティブを合成します。同プロトタイプ内のサンプルは負例から除外され、意味構造が保持されます

  • マハラノビス距離による異常スコア: 学習済み特徴量とプロトタイプラベルからLDAをフィットし、共分散構造を考慮した異常スコアを計算します。ユークリッド距離では見逃す相関異常を正確に検出できます

  • 実衛星テレメトリでの検証: 量子科学実験衛星「墨子」のテレメトリデータを含む4データセットで全てF1 SOTAを達成。特にMiciusで+11.0%の大幅改善

今後の展望

CLPNM-ADは、対照学習を衛星テレメトリに適用する方向性を切り開きました。今後は以下のような発展が期待されます。

時系列構造の明示的な活用: 現在のCLPNM-ADは各時刻のスナップショットを独立に処理していますが、時間方向の依存性(例えば「過去30分のトレンド」)を組み込むことで、より早期の異常検知が可能になるでしょう。

マルチモーダルアライメントとの統合: TRACEのようなテキストと時系列のマルチモーダル学習と組み合わせることで、「太陽電池の劣化パターン」のようなテキストクエリでテレメトリ異常を検索する応用が考えられます。

次のステップとして、以下の記事も参考にしてください。

画像なし
対照学習(Contrastive Learning)の理論
InfoNCE損失の数学的導出と、正例・負例の設計方法を解説しています。CLPNM-ADの対照損失の基礎理論です。
画像なし
対照学習による事前学習
SimCLR・MoCo・BYOL・CLIPなど、対照学習フレームワークの設計思想を比較しています。CLPNM-ADのウォームアップ段階はSimCLRに基づいています。
画像なし
TRACE — 時系列データのマルチモーダル埋め込みと検索
時系列とテキストの対照学習を扱うTRACEは、CLPNM-ADの発展方向の一つとして参考になります。
画像なし
ConFGD — 周波数グラフ発見とLLM言語誘導で時系列ドメイン適応を革新する
対照学習を時系列のドメイン適応に応用したConFGDの理論を解説しています。