異常検知(Anomaly Detection)は、正常なパターンから逸脱したデータを検出するタスクです。従来の統計的手法やIsolation Forestなどの古典的な機械学習手法に加え、近年は深層学習を活用した異常検知手法が大きな成果を上げています。
深層学習による異常検知は、画像の微細な欠陥検出、複雑な時系列データからの異常パターン抽出、高次元データにおける非線形な正常パターンの学習など、従来手法では困難だった問題に対して優れた性能を発揮します。
本記事では、深層学習による異常検知手法を体系的に分類し、各手法の理論的な基盤と特性を解説します。
本記事の内容
- 異常検知の問題設定の分類(教師あり・半教師あり・教師なし)
- 再構成ベース手法の統一的理解(AE・VAE・DAGMM)
- GANベース手法(AnoGAN・f-AnoGAN)の理論
- 自己教師ありベース手法(幾何変換・対照学習)
- Deep SVDDの定式化と理論
- 知識蒸留ベース手法の概要
- 各手法の比較と選択指針
- 代表手法のPythonミニ実装と比較
前提知識
この記事を読む前に、以下の記事を読んでおくと理解が深まります。
異常検知の問題設定の分類
教師あり異常検知
正常・異常の両方のラベル付きデータが利用可能な場合です。
$$ \mathcal{D} = \{(\bm{x}_i, y_i)\}_{i=1}^{N}, \quad y_i \in \{0, 1\} $$
ここで $y_i = 0$ は正常、$y_i = 1$ は異常を表します。通常の二値分類として定式化できますが、異常データが極端に少ないクラス不均衡問題が大きな課題です。
半教師あり異常検知
正常データのみのラベルが利用可能な場合です。これが最も一般的な設定です。
$$ \mathcal{D}_{\text{train}} = \{\bm{x}_i\}_{i=1}^{N}, \quad \text{全て正常データ} $$
学習時は正常データの特徴を学び、テスト時に正常パターンから逸脱するデータを異常と判定します。AEやVAEによる異常検知はこの設定に該当します。
教師なし異常検知
ラベルが全くなく、データの中に異常が混在している可能性がある場合です。
$$ \mathcal{D} = \{\bm{x}_i\}_{i=1}^{N}, \quad \text{正常・異常が混在} $$
異常は少数であるという仮定のもと、多数派から逸脱するデータを異常と見なします。Isolation Forestやクラスタリングベースの手法がこの設定で用いられます。
再構成ベース手法の統一的理解
再構成ベースの手法は、データの正常パターンを「再構成する能力」を学習し、再構成の失敗度合いを異常スコアとして使います。
統一的な枠組み
再構成ベースの異常スコアは、一般に次の形で書けます。
$$ s(\bm{x}) = d(\bm{x}, g_\theta(\bm{x})) $$
ここで $g_\theta$ は何らかの再構成関数、$d$ は距離関数です。手法によって $g_\theta$ と $d$ の定義が異なります。
| 手法 | 再構成関数 $g_\theta$ | 距離関数 $d$ |
|---|---|---|
| AE | $D_\theta(E_\phi(\bm{x}))$ | $\|\bm{x} – g_\theta(\bm{x})\|^2$ |
| VAE | $\mathbb{E}[D_\theta(\bm{z})]$, $\bm{z} \sim q_\phi(\bm{z}|\bm{x})$ | $-\text{ELBO}(\bm{x})$ |
| DAGMM | $D_\theta(E_\phi(\bm{x}))$ + GMM | エネルギースコア |
AE(オートエンコーダ)
最も基本的な再構成ベース手法です。
$$ s_{\text{AE}}(\bm{x}) = \|\bm{x} – D_\theta(E_\phi(\bm{x}))\|^2 $$
正常データで学習したAEは、正常パターンの再構成は得意ですが、異常パターンの再構成は苦手です。
VAE(変分オートエンコーダ)
VAEは確率的な生成モデルとして、再構成確率やELBOを異常スコアに使います。
$$ s_{\text{VAE}}(\bm{x}) = -\text{ELBO}(\bm{x}) = -\mathbb{E}_{q_\phi(\bm{z}|\bm{x})}\left[\log p_\theta(\bm{x}|\bm{z})\right] + D_{\text{KL}}(q_\phi(\bm{z}|\bm{x}) \| p(\bm{z})) $$
AEとの違いは、潜在空間の正則化により不確実性の定量化が可能な点です。
DAGMM(Deep Autoencoding Gaussian Mixture Model)
DAGMMは、AEの再構成とGMM(Gaussian Mixture Model)を組み合わせた手法です。
アーキテクチャ
- 圧縮ネットワーク: AEで入力を潜在表現 $\bm{z}_c$ に変換し、再構成誤差の特徴量 $\bm{z}_r$ を計算する
- 推定ネットワーク: 結合特徴量 $\bm{z} = [\bm{z}_c; \bm{z}_r]$ からGMMの混合係数 $\hat{\gamma}$ を推定する
数学的定式化
再構成誤差の特徴量は、相対ユークリッド距離とコサイン類似度で構成されます。
$$ \bm{z}_r = \left[\frac{\|\bm{x} – \hat{\bm{x}}\|_2}{\|\bm{x}\|_2}, \quad 1 – \frac{\bm{x} \cdot \hat{\bm{x}}}{\|\bm{x}\|_2 \|\hat{\bm{x}}\|_2}\right] $$
結合特徴量 $\bm{z} = [\bm{z}_c; \bm{z}_r]$ に対して、推定ネットワークが混合係数を出力します。
$$ \hat{\bm{\gamma}} = \text{softmax}(f_{\text{est}}(\bm{z})) \in \mathbb{R}^K $$
ここで $K$ はGMMの混合数です。
GMMのパラメータ(混合比率 $\hat{\phi}_k$、平均 $\hat{\bm{\mu}}_k$、共分散 $\hat{\bm{\Sigma}}_k$)は、ミニバッチの推定結果から計算します。
$$ \begin{align} \hat{\phi}_k &= \frac{1}{N}\sum_{i=1}^{N}\hat{\gamma}_{ik} \\ \hat{\bm{\mu}}_k &= \frac{\sum_{i=1}^{N}\hat{\gamma}_{ik}\bm{z}_i}{\sum_{i=1}^{N}\hat{\gamma}_{ik}} \\ \hat{\bm{\Sigma}}_k &= \frac{\sum_{i=1}^{N}\hat{\gamma}_{ik}(\bm{z}_i – \hat{\bm{\mu}}_k)(\bm{z}_i – \hat{\bm{\mu}}_k)^T}{\sum_{i=1}^{N}\hat{\gamma}_{ik}} \end{align} $$
エネルギースコア
異常スコアはGMMのエネルギー(負の対数尤度)として定義されます。
$$ E(\bm{z}) = -\log\left(\sum_{k=1}^{K}\hat{\phi}_k \frac{1}{(2\pi)^{d/2}|\hat{\bm{\Sigma}}_k|^{1/2}} \exp\left(-\frac{1}{2}(\bm{z} – \hat{\bm{\mu}}_k)^T\hat{\bm{\Sigma}}_k^{-1}(\bm{z} – \hat{\bm{\mu}}_k)\right)\right) $$
エネルギーが高い(尤度が低い)データほど異常と判定されます。
DAGMMの損失関数
DAGMMの損失関数は3つの項から構成されます。
$$ \mathcal{L} = \mathcal{L}_{\text{recon}} + \lambda_1 \mathcal{L}_{\text{energy}} + \lambda_2 \mathcal{L}_{\text{diag}} $$
- $\mathcal{L}_{\text{recon}} = \frac{1}{N}\sum_{i=1}^{N}\|\bm{x}_i – \hat{\bm{x}}_i\|^2$: 再構成損失
- $\mathcal{L}_{\text{energy}} = \frac{1}{N}\sum_{i=1}^{N}E(\bm{z}_i)$: エネルギー損失
- $\mathcal{L}_{\text{diag}} = \sum_{k=1}^{K}\frac{1}{\text{diag}(\hat{\bm{\Sigma}}_k)}$: 特異性防止の正則化
GANベース手法
AnoGAN
AnoGAN(Anomaly GAN, Schlegl et al., 2017)は、GAN(Generative Adversarial Network)を異常検知に応用した先駆的な手法です。
基本的なアイデア
- 学習フェーズ: 正常データのみでGANを学習する。生成器 $G$ は正常データの分布を学習する
- 推論フェーズ: 異常検知対象のデータ $\bm{x}$ に対して、$G(\bm{z}) \approx \bm{x}$ となる潜在変数 $\bm{z}^*$ を最適化で求める
- 異常判定: $G(\bm{z}^*)$ と $\bm{x}$ の差異を異常スコアとする
正常データは生成器の出力空間に含まれるため、良い $\bm{z}^*$ が見つかり再構成が良好になります。異常データは生成器の出力空間に含まれないため、最良の $\bm{z}^*$ でも再構成精度が低くなります。
潜在空間への逆写像の最適化
AnoGANの異常スコアは、以下の最適化問題の目的関数値で定義されます。
$$ \bm{z}^* = \arg\min_{\bm{z}} \left[\|\bm{x} – G(\bm{z})\|_1 + \lambda \|f(\bm{x}) – f(G(\bm{z}))\|_1\right] $$
ここで、
- 第1項 $\|\bm{x} – G(\bm{z})\|_1$: 残差損失(Residual Loss) — 入力と生成画像のピクセルレベルの差
- 第2項 $\|f(\bm{x}) – f(G(\bm{z}))\|_1$: 識別損失(Discrimination Loss) — 識別器 $D$ の中間層の特徴量 $f$ での差
$\lambda$ は2つの損失のバランスを制御するハイパーパラメータです。
残差損失の意味
残差損失は、生成画像がピクセルレベルで入力にどれだけ近いかを測ります。
$$ \mathcal{L}_R(\bm{z}) = \|\bm{x} – G(\bm{z})\|_1 = \sum_{j=1}^{d} |x_j – G(\bm{z})_j| $$
L1ノルムを使うことで、L2ノルムよりもスパースな差(局所的な異常)に対して感度が高くなります。
識別損失の意味
識別損失は、識別器が学習した特徴空間での差を測ります。識別器の中間層 $f$ は、データの高次の特徴(テクスチャ、構造など)を捉えています。
$$ \mathcal{L}_D(\bm{z}) = \|f(\bm{x}) – f(G(\bm{z}))\|_1 $$
識別損失により、ピクセルレベルでは似ていても構造的に異なるデータを検出できます。
最適化手順
$\bm{z}^*$ はランダム初期化から勾配降下法で求めます。
$$ \bm{z}^{(t+1)} = \bm{z}^{(t)} – \eta \nabla_{\bm{z}}\left[\|\bm{x} – G(\bm{z}^{(t)})\|_1 + \lambda\|f(\bm{x}) – f(G(\bm{z}^{(t)}))\|_1\right] $$
この最適化を $T$ ステップ実行し、最終的な異常スコアは
$$ s(\bm{x}) = (1 – \lambda)\|\bm{x} – G(\bm{z}^*)\|_1 + \lambda\|f(\bm{x}) – f(G(\bm{z}^*))\|_1 $$
f-AnoGAN
f-AnoGAN(Schlegl et al., 2019)はAnoGANの計算効率を大幅に改善した手法です。
AnoGANの問題点
AnoGANでは、各テストデータに対して $\bm{z}^*$ を求める最適化(通常数百ステップ)が必要です。これは推論時に非常に時間がかかります。
f-AnoGANの解決策
f-AnoGANは、入力 $\bm{x}$ から直接潜在変数 $\bm{z}$ を推定するエンコーダネットワーク $E$ を追加で学習します。
$$ \bm{z} = E(\bm{x}) $$
エンコーダの学習は、GANの学習後に以下の損失で行います。
$$ \mathcal{L}_E = \frac{1}{N}\sum_{i=1}^{N}\left[\|\bm{x}_i – G(E(\bm{x}_i))\|_2^2 + \kappa\|f(\bm{x}_i) – f(G(E(\bm{x}_i)))\|_2^2\right] $$
推論時は $E$ を一度適用するだけで済むため、AnoGANの数百倍高速になります。
異常スコア
f-AnoGANの異常スコアは以下の通りです。
$$ s(\bm{x}) = \|\bm{x} – G(E(\bm{x}))\|_2^2 + \kappa\|f(\bm{x}) – f(G(E(\bm{x})))\|_2^2 $$
Deep SVDD
Support Vector Data Description (SVDD) の復習
SVDD(Tax & Duin, 2004)は、データを包含する最小のハイパースフィア(超球)を見つけることで異常検知を行う手法です。
中心 $\bm{c}$ と半径 $R$ のハイパースフィアを見つける問題は、
$$ \min_{R, \bm{c}} R^2 + \frac{1}{\nu N}\sum_{i=1}^{N}\max(0, \|\bm{x}_i – \bm{c}\|^2 – R^2) $$
ここで $\nu \in (0, 1]$ は異常データの割合の上限を制御するパラメータです。
Deep SVDDの定式化
Deep SVDD(Ruff et al., 2018)は、SVDDをニューラルネットワークの特徴空間に拡張した手法です。
ニューラルネットワーク $\phi(\cdot; \bm{W})$ でデータを特徴空間に写像し、その空間でハイパースフィアの最小化を行います。
ソフトバウンダリ Deep SVDD
$$ \min_{\bm{W}, R} R^2 + \frac{1}{\nu N}\sum_{i=1}^{N}\max\left(0, \|\phi(\bm{x}_i; \bm{W}) – \bm{c}\|^2 – R^2\right) + \frac{\lambda}{2}\sum_{\ell}\|\bm{W}^{(\ell)}\|_F^2 $$
第3項はネットワークの重み正則化です。
ワンクラス Deep SVDD
より単純な定式化として、半径 $R$ を最適化変数から除いた「ワンクラス」版があります。
$$ \min_{\bm{W}} \frac{1}{N}\sum_{i=1}^{N}\|\phi(\bm{x}_i; \bm{W}) – \bm{c}\|^2 + \frac{\lambda}{2}\sum_{\ell}\|\bm{W}^{(\ell)}\|_F^2 $$
この目的関数は、全ての学習データの特徴表現を中心 $\bm{c}$ に近づけることを意味します。正常データは $\bm{c}$ の近くに写像されるよう学習されるため、$\bm{c}$ から遠い特徴表現を持つデータは異常と判定されます。
異常スコア
$$ s(\bm{x}) = \|\phi(\bm{x}; \bm{W}) – \bm{c}\|^2 $$
中心 $\bm{c}$ の設定
中心 $\bm{c}$ は事前に固定します。初期化済みのネットワークで全学習データを順伝播させ、その特徴表現の平均を $\bm{c}$ とします。
$$ \bm{c} = \frac{1}{N}\sum_{i=1}^{N}\phi(\bm{x}_i; \bm{W}_{\text{init}}) $$
学習中は $\bm{c}$ を更新しません。これにより、ネットワークが全てを1点に写像する「崩壊解」を避けられます。
バイアス項の除去
Deep SVDDでは、ネットワークのバイアス項を全て0に固定します。バイアス項があると、全ての入力を定数に写像する自明な解が存在してしまうためです。
自己教師ありベース手法
幾何変換ベース(GEOM)
Golan & El-Yaniv(2018)が提案した手法です。
基本的なアイデア
- データに複数の幾何変換(回転、反転、移動など)を適用する
- 「どの変換が適用されたか」を識別する分類器を学習する
- 正常データでは変換の識別が容易だが、異常データでは困難になることを利用する
数学的定式化
$T$ 個の幾何変換 $\{g_1, g_2, \dots, g_T\}$ を定義します。分類器 $f_\theta$ は、変換後のデータ $g_t(\bm{x})$ からどの変換 $t$ が適用されたかを識別します。
$$ \mathcal{L}_{\text{GEOM}} = -\frac{1}{NT}\sum_{i=1}^{N}\sum_{t=1}^{T}\log p_\theta(t | g_t(\bm{x}_i)) $$
異常スコア
テスト時の異常スコアは、全変換に対する分類確率の平均的な低さで定義されます。
$$ s(\bm{x}) = -\frac{1}{T}\sum_{t=1}^{T}\log p_\theta(t | g_t(\bm{x})) $$
正常データでは各変換を正しく識別できるため $p_\theta(t|g_t(\bm{x}))$ が高くなり、スコアは低くなります。異常データでは識別が困難になり、スコアが高くなります。
対照学習ベース
SimCLRなどの対照学習フレームワークを異常検知に適用する手法です。
CSI(Contrasting Shifted Instances, Tack et al., 2020)
- 正常データに対するデータ拡張で正例ペアを生成する
- 対照学習で特徴抽出器を学習する
- 学習した特徴空間での距離を異常スコアとして使う
対照学習の損失関数(NT-Xent loss)は以下の通りです。
$$ \mathcal{L}_{\text{con}} = -\frac{1}{2N}\sum_{i=1}^{N}\left[\log\frac{\exp(\text{sim}(\bm{z}_i, \bm{z}_i’)/\tau)}{\sum_{j=1}^{2N}\mathbb{1}_{[j \neq i]}\exp(\text{sim}(\bm{z}_i, \bm{z}_j)/\tau)}\right] $$
ここで $\text{sim}(\bm{u}, \bm{v}) = \bm{u}^T\bm{v}/(\|\bm{u}\|\|\bm{v}\|)$ はコサイン類似度、$\tau$ は温度パラメータ、$\bm{z}_i, \bm{z}_i’$ はデータ $\bm{x}_i$ の2つの異なるデータ拡張から得られた特徴表現です。
知識蒸留ベース手法
基本原理
知識蒸留(Knowledge Distillation)ベースの異常検知は、教師ネットワーク(Teacher)と生徒ネットワーク(Student)の出力の不一致を異常スコアとして利用します。
- 教師ネットワーク: 事前学習済みのネットワーク(例: ImageNetで学習したResNet)
- 生徒ネットワーク: 正常データのみで教師の出力を模倣するよう学習する
$$ \mathcal{L}_{\text{KD}} = \frac{1}{N}\sum_{i=1}^{N}\|f_T(\bm{x}_i) – f_S(\bm{x}_i)\|^2 $$
ここで $f_T, f_S$ はそれぞれ教師・生徒の特徴抽出器です。
異常スコア
$$ s(\bm{x}) = \|f_T(\bm{x}) – f_S(\bm{x})\|^2 $$
正常データでは教師と生徒の出力が一致するため、スコアが低くなります。異常データでは生徒が未学習のため出力が乖離し、スコアが高くなります。
代表的な手法: STPM
STPM(Student-Teacher Feature Pyramid Matching, Wang et al., 2021)は、特徴ピラミッドの複数スケールでの一致度を異常スコアとします。
$$ s(\bm{x}) = \sum_{\ell \in \mathcal{L}} \|f_T^{(\ell)}(\bm{x}) – f_S^{(\ell)}(\bm{x})\|^2 $$
ここで $\ell$ は特徴ピラミッドのレベルです。マルチスケールの特徴を使うことで、大きな構造的異常から微細な異常まで検出可能になります。
各手法の比較
性能・計算量の比較
| 手法 | 原理 | 推論速度 | 訓練コスト | 画像 | 表形式 | 時系列 |
|---|---|---|---|---|---|---|
| AE | 再構成 | 高速 | 低 | 可 | 可 | 可 |
| VAE | 再構成+確率 | やや高速 | 中 | 可 | 可 | 可 |
| DAGMM | 再構成+密度 | 高速 | 中 | 可 | 可 | 可 |
| AnoGAN | GAN逆写像 | 非常に遅い | 高 | 可 | 困難 | 困難 |
| f-AnoGAN | GAN+エンコーダ | 高速 | 高 | 可 | 困難 | 困難 |
| Deep SVDD | ハイパースフィア | 高速 | 低 | 可 | 可 | 可 |
| 幾何変換 | 自己教師あり | 中程度 | 中 | 可 | 困難 | 困難 |
| 対照学習 | 自己教師あり | 高速 | 高 | 可 | 困難 | 困難 |
| 知識蒸留 | 教師-生徒 | 高速 | 中 | 可 | 困難 | 困難 |
選択指針
表形式データ: AE/VAE/DAGMM、Deep SVDDが適している。特にDAGMMは密度推定を組み合わせるため、複雑な正常データ分布を捉えやすい
画像データ: 知識蒸留ベース(STPM等)が画像異常検知のベンチマーク(MVTecAD)で高い性能を示している。計算資源が限られる場合はCAE(畳み込みAE)が実用的
時系列データ: LSTMオートエンコーダやTransformerベースのAEが適している
不確実性の定量化が必要: VAEが最適。確率的な異常スコアを提供する
解釈性が重要: AEの特徴量別再構成誤差やDeep SVDDの特徴空間可視化が有用
Pythonでのミニ実装と比較
3つの代表手法の実装(AE・Deep SVDD・AnoGAN的手法)
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, roc_curve
# ===== 1. MVTecAD風の合成データ =====
np.random.seed(42)
def generate_mvtec_like_data(n_normal=800, n_anomaly=80, n_features=20):
"""
製造検査データを模した合成データ
正常データ: 低次元多様体上のデータ(実効的な自由度が低い)
異常データ: 多様体から逸脱したデータ
"""
# 正常データ: 5次元の潜在因子から20次元に写像
latent_dim = 5
W = np.random.randn(latent_dim, n_features) * 0.5
z_normal = np.random.randn(n_normal, latent_dim)
X_normal = z_normal @ W + np.random.randn(n_normal, n_features) * 0.1
# 非線形性を追加
X_normal += 0.3 * np.sin(X_normal)
# 異常データの生成(3種類)
n_each = n_anomaly // 3
# タイプ1: 特徴量のシフト
X_anom1 = np.random.randn(n_each, latent_dim) @ W + np.random.randn(n_each, n_features) * 0.1
X_anom1 += 0.3 * np.sin(X_anom1)
for i in range(n_each):
shift_idx = np.random.choice(n_features, 3, replace=False)
X_anom1[i, shift_idx] += np.random.choice([-1, 1], 3) * np.random.uniform(2, 4, 3)
# タイプ2: 分布外のデータ
X_anom2 = np.random.randn(n_each, n_features) * 1.5
# タイプ3: 相関構造の破壊
n_rest = n_anomaly - 2 * n_each
X_anom3 = np.random.randn(n_rest, latent_dim) @ W
X_anom3 += np.random.randn(n_rest, n_features) * 0.8 # ノイズを増大
X_anomaly = np.vstack([X_anom1, X_anom2, X_anom3])
X = np.vstack([X_normal, X_anomaly])
y = np.array([0] * n_normal + [1] * n_anomaly)
return X, y
X, y_true = generate_mvtec_like_data()
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 学習/テストの分割
n_train_normal = 600
X_train = X_scaled[y_true == 0][:n_train_normal]
X_test = X_scaled
y_test = y_true
input_dim = X_scaled.shape[1]
print(f"データ形状: {X.shape}, 正常: {np.sum(y_true==0)}, 異常: {np.sum(y_true==1)}")
# ===== 2. 手法1: オートエンコーダ =====
class AnomalyAE:
"""異常検知用オートエンコーダ"""
def __init__(self, input_dim, hidden_dim=64, latent_dim=8, lr=0.005):
self.lr = lr
dims = [input_dim, hidden_dim, hidden_dim // 2, latent_dim,
hidden_dim // 2, hidden_dim, input_dim]
self.weights = []
self.biases = []
for i in range(len(dims) - 1):
scale = np.sqrt(2.0 / (dims[i] + dims[i+1]))
self.weights.append(np.random.randn(dims[i], dims[i+1]) * scale)
self.biases.append(np.zeros(dims[i+1]))
self.n_layers = len(self.weights)
def forward(self, X):
self.acts = [X]
self.pres = []
h = X
for i in range(self.n_layers):
z = h @ self.weights[i] + self.biases[i]
self.pres.append(z)
if i < self.n_layers - 1:
h = np.maximum(0, z)
else:
h = z
self.acts.append(h)
return h
def backward_and_update(self, X, X_hat, batch_size):
delta = 2 * (X_hat - X) / batch_size
for i in range(self.n_layers - 1, -1, -1):
gW = self.acts[i].T @ delta
gb = np.sum(delta, axis=0)
self.weights[i] -= self.lr * gW
self.biases[i] -= self.lr * gb
if i > 0:
delta = (delta @ self.weights[i].T) * (self.pres[i-1] > 0).astype(float)
def fit(self, X_train, epochs=300, batch_size=64):
n = X_train.shape[0]
losses = []
for epoch in range(epochs):
idx = np.random.permutation(n)
eloss = 0
for s in range(0, n, batch_size):
e = min(s + batch_size, n)
Xb = X_train[idx[s:e]]
Xh = self.forward(Xb)
loss = np.mean((Xb - Xh) ** 2)
eloss += loss * (e - s)
self.backward_and_update(Xb, Xh, e - s)
losses.append(eloss / n)
return losses
def anomaly_score(self, X):
X_hat = self.forward(X)
return np.mean((X - X_hat) ** 2, axis=1)
# AEの学習
print("AEの学習...")
ae = AnomalyAE(input_dim, hidden_dim=64, latent_dim=8, lr=0.005)
ae_losses = ae.fit(X_train, epochs=300)
ae_scores = ae.anomaly_score(X_test)
print(f"AE AUC: {roc_auc_score(y_test, ae_scores):.4f}")
# ===== 3. 手法2: Deep SVDD =====
class DeepSVDD:
"""Deep Support Vector Data Description"""
def __init__(self, input_dim, hidden_dim=64, feature_dim=16, lr=0.005, weight_decay=1e-4):
self.lr = lr
self.weight_decay = weight_decay
self.feature_dim = feature_dim
# ネットワーク(バイアスなし! — Deep SVDDの重要な制約)
dims = [input_dim, hidden_dim, hidden_dim // 2, feature_dim]
self.weights = []
for i in range(len(dims) - 1):
scale = np.sqrt(2.0 / (dims[i] + dims[i+1]))
self.weights.append(np.random.randn(dims[i], dims[i+1]) * scale)
self.n_layers = len(self.weights)
self.center = None
def forward(self, X):
"""順伝播(バイアスなし)"""
self.acts = [X]
self.pres = []
h = X
for i in range(self.n_layers):
z = h @ self.weights[i] # バイアスなし
self.pres.append(z)
if i < self.n_layers - 1:
h = np.maximum(0, z) # ReLU
else:
h = z # 出力層は線形
self.acts.append(h)
return h
def init_center(self, X_train, batch_size=256):
"""中心 c の初期化: 全データの特徴表現の平均"""
features = []
for s in range(0, len(X_train), batch_size):
e = min(s + batch_size, len(X_train))
feat = self.forward(X_train[s:e])
features.append(feat)
features = np.vstack(features)
self.center = np.mean(features, axis=0)
# 中心が原点に近すぎる場合のガード
self.center[(np.abs(self.center) < 0.01)] = 0.01
print(f"中心ベクトルのノルム: {np.linalg.norm(self.center):.4f}")
def fit(self, X_train, epochs=300, batch_size=64):
"""ワンクラスDeep SVDDの学習"""
self.init_center(X_train)
n = X_train.shape[0]
losses = []
for epoch in range(epochs):
idx = np.random.permutation(n)
eloss = 0
for s in range(0, n, batch_size):
e = min(s + batch_size, n)
Xb = X_train[idx[s:e]]
m = e - s
# 順伝播
features = self.forward(Xb)
# 損失: ||φ(x) - c||² + λ||W||²
diff = features - self.center
dist_sq = np.sum(diff ** 2, axis=1)
loss = np.mean(dist_sq)
eloss += loss * m
# 逆伝播
delta = 2 * diff / m # d(||φ-c||²)/dφ
for i in range(self.n_layers - 1, -1, -1):
gW = self.acts[i].T @ delta
# 重み減衰の追加
gW += self.weight_decay * self.weights[i]
self.weights[i] -= self.lr * gW
if i > 0:
delta = (delta @ self.weights[i].T) * (self.pres[i-1] > 0).astype(float)
losses.append(eloss / n)
return losses
def anomaly_score(self, X):
"""異常スコア: ||φ(x) - c||²"""
features = self.forward(X)
return np.sum((features - self.center) ** 2, axis=1)
# Deep SVDDの学習
print("\nDeep SVDDの学習...")
svdd = DeepSVDD(input_dim, hidden_dim=64, feature_dim=16, lr=0.005, weight_decay=1e-5)
svdd_losses = svdd.fit(X_train, epochs=300)
svdd_scores = svdd.anomaly_score(X_test)
print(f"Deep SVDD AUC: {roc_auc_score(y_test, svdd_scores):.4f}")
# ===== 4. 手法3: AnoGAN的手法(簡易版) =====
class SimpleGenerator:
"""簡易生成器 G: z → x"""
def __init__(self, latent_dim, hidden_dim, output_dim, lr=0.001):
self.lr = lr
dims = [latent_dim, hidden_dim, hidden_dim, output_dim]
self.weights = []
self.biases = []
for i in range(len(dims) - 1):
scale = np.sqrt(2.0 / (dims[i] + dims[i+1]))
self.weights.append(np.random.randn(dims[i], dims[i+1]) * scale)
self.biases.append(np.zeros(dims[i+1]))
self.n_layers = len(self.weights)
def forward(self, z):
self.acts = [z]
self.pres = []
h = z
for i in range(self.n_layers):
pre = h @ self.weights[i] + self.biases[i]
self.pres.append(pre)
if i < self.n_layers - 1:
h = np.maximum(0.01 * pre, pre) # LeakyReLU
else:
h = pre # 線形出力
self.acts.append(h)
return h
def backward(self, grad_output):
delta = grad_output
for i in range(self.n_layers - 1, -1, -1):
gW = self.acts[i].T @ delta / delta.shape[0]
gb = np.mean(delta, axis=0)
self.weights[i] -= self.lr * gW
self.biases[i] -= self.lr * gb
if i > 0:
leaky_mask = np.where(self.pres[i-1] > 0, 1.0, 0.01)
delta = (delta @ self.weights[i].T) * leaky_mask
class SimpleDiscriminator:
"""簡易識別器 D: x → [0, 1]"""
def __init__(self, input_dim, hidden_dim, lr=0.001):
self.lr = lr
dims = [input_dim, hidden_dim, hidden_dim // 2, 1]
self.weights = []
self.biases = []
for i in range(len(dims) - 1):
scale = np.sqrt(2.0 / (dims[i] + dims[i+1]))
self.weights.append(np.random.randn(dims[i], dims[i+1]) * scale)
self.biases.append(np.zeros(dims[i+1]))
self.n_layers = len(self.weights)
def forward(self, x):
self.acts = [x]
self.pres = []
h = x
for i in range(self.n_layers):
pre = h @ self.weights[i] + self.biases[i]
self.pres.append(pre)
if i < self.n_layers - 1:
h = np.maximum(0.01 * pre, pre) # LeakyReLU
self.feature = h # 中間特徴量を保存(識別損失用)
else:
h = 1.0 / (1.0 + np.exp(-np.clip(pre, -500, 500))) # Sigmoid
self.acts.append(h)
return h
def get_features(self, x):
"""中間層の特徴量を返す"""
h = x
for i in range(self.n_layers - 1):
h = h @ self.weights[i] + self.biases[i]
h = np.maximum(0.01 * h, h)
return h
class AnoGANLike:
"""AnoGAN的な異常検知(f-AnoGAN風のエンコーダ付き)"""
def __init__(self, input_dim, latent_dim=8, hidden_dim=64, lr_g=0.0005,
lr_d=0.0005):
self.latent_dim = latent_dim
self.G = SimpleGenerator(latent_dim, hidden_dim, input_dim, lr=lr_g)
self.D = SimpleDiscriminator(input_dim, hidden_dim, lr=lr_d)
# f-AnoGAN風のエンコーダ
self.enc_weights = []
self.enc_biases = []
enc_dims = [input_dim, hidden_dim, hidden_dim // 2, latent_dim]
for i in range(len(enc_dims) - 1):
scale = np.sqrt(2.0 / (enc_dims[i] + enc_dims[i+1]))
self.enc_weights.append(np.random.randn(enc_dims[i], enc_dims[i+1]) * scale)
self.enc_biases.append(np.zeros(enc_dims[i+1]))
self.lr_enc = 0.001
def encode(self, x):
"""エンコーダ: x → z"""
h = x
for i in range(len(self.enc_weights) - 1):
h = h @ self.enc_weights[i] + self.enc_biases[i]
h = np.maximum(0, h)
h = h @ self.enc_weights[-1] + self.enc_biases[-1]
return h
def fit_gan(self, X_train, epochs=200, batch_size=64):
"""GANの学習"""
n = X_train.shape[0]
for epoch in range(epochs):
idx = np.random.permutation(n)
for s in range(0, n, batch_size):
e = min(s + batch_size, n)
X_real = X_train[idx[s:e]]
m = e - s
# ---- 識別器の更新 ----
z = np.random.randn(m, self.latent_dim)
X_fake = self.G.forward(z)
D_real = self.D.forward(X_real)
D_fake = self.D.forward(X_fake)
# 識別器の損失の勾配(Binary Cross Entropy)
grad_D_real = -(1.0 / (D_real + 1e-8)) / m
grad_D_fake = (1.0 / (1.0 - D_fake + 1e-8)) / m
# 簡略化: 識別器の重みを直接更新
for i in range(self.D.n_layers - 1, -1, -1):
if i == self.D.n_layers - 1:
# Sigmoid出力層の勾配
sig_real = D_real
delta_real = grad_D_real * sig_real * (1 - sig_real)
sig_fake = D_fake
delta_fake = grad_D_fake * sig_fake * (1 - sig_fake)
gW = (self.D.acts[i].T @ delta_real + self.D.acts[i].T @ delta_fake) / 2
gb = np.mean(delta_real + delta_fake, axis=0) / 2
self.D.weights[i] -= self.D.lr * np.clip(gW, -1, 1)
self.D.biases[i] -= self.D.lr * np.clip(gb, -1, 1)
if i > 0:
leaky = np.where(self.D.pres[i-1] > 0, 1.0, 0.01)
delta_real = (delta_real @ self.D.weights[i].T) * leaky
delta_fake = (delta_fake @ self.D.weights[i].T) * leaky
# ---- 生成器の更新 ----
z = np.random.randn(m, self.latent_dim)
X_fake = self.G.forward(z)
D_fake = self.D.forward(X_fake)
# 生成器の勾配: -log(D(G(z)))
grad_G = -(1.0 / (D_fake + 1e-8)) / m
sig = D_fake
delta = grad_G * sig * (1 - sig)
# D を通じた勾配
for i in range(self.D.n_layers - 1, -1, -1):
if i > 0:
leaky = np.where(self.D.pres[i-1] > 0, 1.0, 0.01)
delta = (delta @ self.D.weights[i].T) * leaky
else:
delta = delta @ self.D.weights[i].T
# Gの逆伝播
self.G.backward(delta)
def fit_encoder(self, X_train, epochs=200, batch_size=64):
"""f-AnoGAN風のエンコーダの学習"""
n = X_train.shape[0]
for epoch in range(epochs):
idx = np.random.permutation(n)
for s in range(0, n, batch_size):
e = min(s + batch_size, n)
X_batch = X_train[idx[s:e]]
m = e - s
# エンコーダ順伝播
z_enc = self.encode(X_batch)
x_recon = self.G.forward(z_enc)
# 損失: ||x - G(E(x))||²
recon_loss_grad = 2 * (x_recon - X_batch) / m
# G を通じた z への勾配(Gの重みは固定)
delta = recon_loss_grad
for i in range(self.G.n_layers - 1, -1, -1):
if i > 0:
leaky = np.where(self.G.pres[i-1] > 0, 1.0, 0.01)
delta = (delta @ self.G.weights[i].T) * leaky
else:
delta = delta @ self.G.weights[i].T
# エンコーダの逆伝播
grad_z = delta
h_list = [X_batch]
h = X_batch
for i in range(len(self.enc_weights) - 1):
h = h @ self.enc_weights[i] + self.enc_biases[i]
h = np.maximum(0, h)
h_list.append(h)
# 最終層
grad_enc = grad_z
for i in range(len(self.enc_weights) - 1, -1, -1):
gW = h_list[i].T @ grad_enc / m
gb = np.mean(grad_enc, axis=0)
self.enc_weights[i] -= self.lr_enc * np.clip(gW, -1, 1)
self.enc_biases[i] -= self.lr_enc * np.clip(gb, -1, 1)
if i > 0:
grad_enc = (grad_enc @ self.enc_weights[i].T) * (h_list[i] > 0).astype(float)
def fit(self, X_train, epochs_gan=200, epochs_enc=200, batch_size=64):
"""GANとエンコーダを順に学習"""
print(" GAN学習中...")
self.fit_gan(X_train, epochs=epochs_gan, batch_size=batch_size)
print(" エンコーダ学習中...")
self.fit_encoder(X_train, epochs=epochs_enc, batch_size=batch_size)
def anomaly_score(self, X):
"""異常スコア: ||x - G(E(x))||²"""
z = self.encode(X)
x_recon = self.G.forward(z)
return np.mean((X - x_recon) ** 2, axis=1)
# AnoGAN的手法の学習
print("\nAnoGAN的手法の学習...")
anogan = AnoGANLike(input_dim, latent_dim=8, hidden_dim=64)
anogan.fit(X_train, epochs_gan=200, epochs_enc=200)
anogan_scores = anogan.anomaly_score(X_test)
print(f"AnoGAN-like AUC: {roc_auc_score(y_test, anogan_scores):.4f}")
# ===== 5. 結果の比較と可視化 =====
# AUCスコアの計算
auc_ae = roc_auc_score(y_test, ae_scores)
auc_svdd = roc_auc_score(y_test, svdd_scores)
auc_anogan = roc_auc_score(y_test, anogan_scores)
print("\n" + "=" * 50)
print("AUC-ROC スコアの比較")
print("=" * 50)
print(f" AE: {auc_ae:.4f}")
print(f" Deep SVDD: {auc_svdd:.4f}")
print(f" AnoGAN-like: {auc_anogan:.4f}")
# ROC曲線の比較
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# (a) ROC曲線
methods = {
f'AE (AUC={auc_ae:.3f})': ae_scores,
f'Deep SVDD (AUC={auc_svdd:.3f})': svdd_scores,
f'AnoGAN-like (AUC={auc_anogan:.3f})': anogan_scores,
}
colors = ['blue', 'red', 'green']
for (name, scores), color in zip(methods.items(), colors):
fpr, tpr, _ = roc_curve(y_test, scores)
axes[0].plot(fpr, tpr, color=color, lw=2, label=name)
axes[0].plot([0, 1], [0, 1], 'k--', lw=1, alpha=0.5)
axes[0].set_xlabel('False Positive Rate')
axes[0].set_ylabel('True Positive Rate')
axes[0].set_title('ROC Curve Comparison')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# (b) スコア分布の比較(箱ひげ図)
all_scores_list = [ae_scores, svdd_scores, anogan_scores]
method_names = ['AE', 'Deep SVDD', 'AnoGAN-like']
normal_scores_list = [s[y_test == 0] for s in all_scores_list]
anomaly_scores_list = [s[y_test == 1] for s in all_scores_list]
positions_normal = [1, 3, 5]
positions_anomaly = [1.6, 3.6, 5.6]
bp1 = axes[1].boxplot([ns / np.percentile(ns, 95) for ns in normal_scores_list],
positions=positions_normal, widths=0.4,
patch_artist=True, boxprops=dict(facecolor='lightblue'))
bp2 = axes[1].boxplot([ans / np.percentile(ans, 95) for ans in anomaly_scores_list],
positions=positions_anomaly, widths=0.4,
patch_artist=True, boxprops=dict(facecolor='lightsalmon'))
axes[1].set_xticks([1.3, 3.3, 5.3])
axes[1].set_xticklabels(method_names)
axes[1].set_ylabel('Normalized Anomaly Score')
axes[1].set_title('Score Distribution (Normal vs Anomaly)')
axes[1].legend([bp1['boxes'][0], bp2['boxes'][0]], ['Normal', 'Anomaly'])
axes[1].grid(True, alpha=0.3)
# (c) 学習曲線の比較
axes[2].plot(ae_losses, label='AE', color='blue', alpha=0.7)
axes[2].plot(svdd_losses, label='Deep SVDD', color='red', alpha=0.7)
axes[2].set_xlabel('Epoch')
axes[2].set_ylabel('Training Loss')
axes[2].set_title('Training Loss Curves')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.suptitle('Deep Anomaly Detection: Method Comparison', fontsize=14)
plt.tight_layout()
plt.show()
# ===== 6. Deep SVDDの特徴空間の可視化 =====
from sklearn.decomposition import PCA
# Deep SVDDの特徴表現を取得
features_test = svdd.forward(X_test)
# PCAで2次元に射影
pca = PCA(n_components=2)
features_2d = pca.fit_transform(features_test)
center_2d = pca.transform(svdd.center.reshape(1, -1))[0]
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# (a) 特徴空間の可視化
scatter_normal = axes[0].scatter(features_2d[y_test == 0, 0], features_2d[y_test == 0, 1],
c='blue', s=10, alpha=0.5, label='Normal')
scatter_anomaly = axes[0].scatter(features_2d[y_test == 1, 0], features_2d[y_test == 1, 1],
c='red', s=30, marker='x', label='Anomaly')
axes[0].scatter(center_2d[0], center_2d[1], c='black', s=200, marker='*',
label='Center c', zorder=5)
# ハイパースフィアの等高線(2D射影)
normal_dists = np.sum((features_2d[y_test == 0] - center_2d) ** 2, axis=1)
radius = np.percentile(normal_dists, 95)
theta = np.linspace(0, 2 * np.pi, 100)
axes[0].plot(center_2d[0] + np.sqrt(radius) * np.cos(theta),
center_2d[1] + np.sqrt(radius) * np.sin(theta),
'k--', lw=2, alpha=0.5, label='95% boundary')
axes[0].set_xlabel('PC1')
axes[0].set_ylabel('PC2')
axes[0].set_title('Deep SVDD Feature Space (PCA projection)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# (b) 中心からの距離の分布
dists_normal = np.sqrt(svdd_scores[y_test == 0])
dists_anomaly = np.sqrt(svdd_scores[y_test == 1])
axes[1].hist(dists_normal, bins=40, alpha=0.7, label='Normal', density=True, color='blue')
axes[1].hist(dists_anomaly, bins=20, alpha=0.7, label='Anomaly', density=True, color='red')
axes[1].set_xlabel('Distance from center ||φ(x) - c||')
axes[1].set_ylabel('Density')
axes[1].set_title('Distribution of Distance from Center')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# ===== 7. 異常タイプ別の検出性能分析 =====
n_normal = np.sum(y_true == 0)
n_anom_each = (np.sum(y_true == 1)) // 3
# 異常タイプのラベル
anomaly_type = np.zeros(len(y_true), dtype=int)
anomaly_type[n_normal:n_normal + n_anom_each] = 1 # タイプ1: 特徴量シフト
anomaly_type[n_normal + n_anom_each:n_normal + 2*n_anom_each] = 2 # タイプ2: 分布外
anomaly_type[n_normal + 2*n_anom_each:] = 3 # タイプ3: 相関破壊
print("異常タイプ別の平均スコア:")
print(f"{'タイプ':<20} {'AE':>10} {'Deep SVDD':>12} {'AnoGAN-like':>14}")
print("-" * 58)
type_names = ['正常', '特徴量シフト', '分布外', '相関破壊']
for t in range(4):
mask = anomaly_type == t
print(f"{type_names[t]:<20} {np.mean(ae_scores[mask]):>10.4f} "
f"{np.mean(svdd_scores[mask]):>12.4f} {np.mean(anogan_scores[mask]):>14.4f}")
# 各手法の得意・不得意の可視化
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
method_scores = {'AE': ae_scores, 'Deep SVDD': svdd_scores, 'AnoGAN-like': anogan_scores}
for idx, (name, scores) in enumerate(method_scores.items()):
for t in range(4):
mask = anomaly_type == t
axes[idx].hist(scores[mask], bins=30, alpha=0.6,
label=type_names[t], density=True)
axes[idx].set_xlabel('Anomaly Score')
axes[idx].set_ylabel('Density')
axes[idx].set_title(f'{name}: Score by Anomaly Type')
axes[idx].legend(fontsize=8)
axes[idx].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# ===== 8. 最終比較テーブル =====
print("\n" + "=" * 70)
print("深層異常検知手法の比較サマリ")
print("=" * 70)
print(f"{'手法':<16} {'AUC':>8} {'原理':<16} {'推論速度':<10} {'学習容易性':<10}")
print("-" * 70)
results = [
('AE', auc_ae, '再構成', '高速', '容易'),
('Deep SVDD', auc_svdd, 'ハイパースフィア', '高速', '容易'),
('AnoGAN-like', auc_anogan, 'GAN逆写像', '高速(f-)', 'やや困難'),
]
for name, auc, principle, speed, ease in results:
print(f"{name:<16} {auc:>8.4f} {principle:<16} {speed:<10} {ease:<10}")
print("\n注意: 上記のAUCは合成データに対する結果であり、実データでの性能を")
print("保証するものではありません。データの特性に応じた手法選択が重要です。")
まとめ
本記事では、深層学習による異常検知手法を体系的に解説しました。
- 再構成ベース(AE/VAE/DAGMM): 正常データの再構成能力を学習し、再構成の失敗度合いを異常スコアとする。最もシンプルで広く使える手法群である
- GANベース(AnoGAN/f-AnoGAN): GANの生成空間への逆写像を利用する。f-AnoGANではエンコーダの追加により推論が高速化される
- Deep SVDD: 特徴空間でのハイパースフィア最小化により、正常データを中心の近くに、異常データを遠くに写像する
- 自己教師ありベース: 幾何変換の認識や対照学習を利用して、正常データの内部構造を捉える
- 知識蒸留ベース: 事前学習済み教師ネットワークと生徒ネットワークの出力差を異常スコアとする
- 手法の選択: データの種類(画像/表形式/時系列)、計算リソース、解釈性の要件に応じて適切な手法を選ぶことが重要
- Python実装: AE、Deep SVDD、AnoGAN的手法の3つをスクラッチ実装し、合成データで比較した
次のステップとして、以下の記事も参考にしてください。