前回の記事ではオートエンコーダ(AE)による異常検知を解説しました。AEは再構成誤差を異常スコアとして用いるシンプルな手法ですが、確率的な解釈が欠けているという課題があります。「この再構成誤差は本当に異常なのか?」「どの程度確信を持って異常と言えるのか?」という問いに答えるのが困難です。
VAE(Variational Autoencoder)による異常検知は、この課題を解決します。VAEは生成モデルとしてデータの確率分布を明示的にモデリングするため、「データの尤度(もっともらしさ)」を直接評価できます。さらに、潜在空間が正規分布に正則化されているため、潜在表現のレベルでも異常を定量的に検出できます。
本記事の内容
- VAEベース異常検知の原理
- 3つの異常スコア(再構成確率・ELBO・マハラノビス距離)
- 再構成確率のモンテカルロ推定の導出
- 潜在空間における異常検知の仕組み
- 条件付きVAE(CVAE)による条件付き異常検知の概要
- AEとVAEの理論的比較
- Pythonでの実装と性能比較
前提知識
この記事を読む前に、以下の記事を読んでおくと理解が深まります。
VAEの復習
VAEの生成モデルとしての定式化
VAEは、潜在変数 $\bm{z}$ からデータ $\bm{x}$ を生成する確率モデルです。
$$ p_\theta(\bm{x}) = \int p_\theta(\bm{x}|\bm{z}) p(\bm{z})\ d\bm{z} $$
ここで $p(\bm{z}) = \mathcal{N}(\bm{0}, \bm{I})$ は潜在変数の事前分布、$p_\theta(\bm{x}|\bm{z})$ はデコーダが定義する条件付き分布です。
ELBOの導出
データの対数尤度 $\log p_\theta(\bm{x})$ の下界であるELBO(Evidence Lower BOund)は、変分推論から導かれます。
任意の分布 $q_\phi(\bm{z}|\bm{x})$ に対して、
$$ \begin{align} \log p_\theta(\bm{x}) &= \log \int p_\theta(\bm{x}|\bm{z})p(\bm{z})\ d\bm{z} \\ &= \log \int \frac{p_\theta(\bm{x}|\bm{z})p(\bm{z})}{q_\phi(\bm{z}|\bm{x})} q_\phi(\bm{z}|\bm{x})\ d\bm{z} \\ &\geq \int q_\phi(\bm{z}|\bm{x}) \log \frac{p_\theta(\bm{x}|\bm{z})p(\bm{z})}{q_\phi(\bm{z}|\bm{x})}\ d\bm{z} \quad (\because \text{Jensenの不等式}) \end{align} $$
この下界を整理すると、
$$ \begin{align} \text{ELBO}(\bm{x}) &= \mathbb{E}_{q_\phi(\bm{z}|\bm{x})}\left[\log p_\theta(\bm{x}|\bm{z})\right] – D_{\text{KL}}\left(q_\phi(\bm{z}|\bm{x}) \| p(\bm{z})\right) \end{align} $$
第1項は再構成項(再構成の対数尤度の期待値)、第2項は正則化項(事後分布と事前分布のKLダイバージェンス)です。
VAEの学習
VAEは、エンコーダ $q_\phi(\bm{z}|\bm{x}) = \mathcal{N}(\bm{\mu}_\phi(\bm{x}), \text{diag}(\bm{\sigma}_\phi^2(\bm{x})))$ とデコーダ $p_\theta(\bm{x}|\bm{z})$ を同時に学習し、ELBOを最大化します。
$$ \max_{\phi, \theta} \sum_{i=1}^{N} \text{ELBO}(\bm{x}_i) $$
VAEベース異常検知の原理
基本的なアイデア
VAEベースの異常検知の考え方は以下の通りです。
- 学習フェーズ: 正常データのみでVAEを学習する。VAEは正常データの生成分布 $p_\theta(\bm{x})$ を学習する
- 推論フェーズ: 新しいデータ $\bm{x}$ に対して、学習した分布の下での「もっともらしさ」を評価する
- 異常判定: もっともらしくないデータ(確率が低いデータ)を異常と判定する
AEとの本質的な違いは、VAEは確率分布を明示的にモデリングしているため、単なる再構成誤差だけでなく、確率的な指標を複数使えることです。
3つの異常スコア
VAEでは主に3つの異常スコアが用いられます。それぞれの数学的な定式化と特性を解説します。
異常スコア1: 再構成確率
再構成確率(Reconstruction Probability)は、VAEが生成するデータの対数尤度の期待値です。
$$ \text{Score}_{\text{recon}}(\bm{x}) = -\mathbb{E}_{q_\phi(\bm{z}|\bm{x})}\left[\log p_\theta(\bm{x}|\bm{z})\right] $$
符号を反転しているため、スコアが大きいほど異常です。
デコーダがガウス分布の場合
$p_\theta(\bm{x}|\bm{z}) = \mathcal{N}(\bm{x}; \bm{\mu}_\theta(\bm{z}), \sigma^2 \bm{I})$ の場合、
$$ \log p_\theta(\bm{x}|\bm{z}) = -\frac{d}{2}\log(2\pi\sigma^2) – \frac{1}{2\sigma^2}\|\bm{x} – \bm{\mu}_\theta(\bm{z})\|^2 $$
定数項を無視すると、再構成確率は再構成誤差に比例します。
$$ -\log p_\theta(\bm{x}|\bm{z}) \propto \|\bm{x} – \bm{\mu}_\theta(\bm{z})\|^2 $$
ベルヌーイ分布の場合
二値データで $p_\theta(\bm{x}|\bm{z}) = \prod_{j=1}^{d} \hat{x}_j^{x_j}(1-\hat{x}_j)^{1-x_j}$ の場合、
$$ \log p_\theta(\bm{x}|\bm{z}) = \sum_{j=1}^{d}\left[x_j \log \hat{x}_j + (1-x_j)\log(1-\hat{x}_j)\right] $$
これはバイナリ交差エントロピーの負値に等しくなります。
モンテカルロ推定
再構成確率の期待値は一般に解析的に計算できないため、モンテカルロ推定を用います。エンコーダから $L$ 個のサンプルを生成して、
$$ \mathbb{E}_{q_\phi(\bm{z}|\bm{x})}\left[\log p_\theta(\bm{x}|\bm{z})\right] \approx \frac{1}{L}\sum_{l=1}^{L} \log p_\theta(\bm{x}|\bm{z}^{(l)}) $$
ここで $\bm{z}^{(l)} = \bm{\mu}_\phi(\bm{x}) + \bm{\sigma}_\phi(\bm{x}) \odot \bm{\epsilon}^{(l)}, \quad \bm{\epsilon}^{(l)} \sim \mathcal{N}(\bm{0}, \bm{I})$ はreparameterization trickによるサンプルです。
モンテカルロ推定の分散は $L$ に反比例して減少します。
$$ \text{Var}\left[\frac{1}{L}\sum_{l=1}^{L}\log p_\theta(\bm{x}|\bm{z}^{(l)})\right] = \frac{1}{L}\text{Var}\left[\log p_\theta(\bm{x}|\bm{z})\right] $$
実用的には $L = 50 \sim 100$ で十分な精度が得られることが多いです。
異常スコア2: ELBO
ELBOはデータの対数尤度の下界であるため、ELBOが低いデータは対数尤度も低い、すなわち異常である可能性が高いと考えられます。
$$ \text{Score}_{\text{ELBO}}(\bm{x}) = -\text{ELBO}(\bm{x}) = -\mathbb{E}_{q_\phi(\bm{z}|\bm{x})}\left[\log p_\theta(\bm{x}|\bm{z})\right] + D_{\text{KL}}\left(q_\phi(\bm{z}|\bm{x}) \| p(\bm{z})\right) $$
ELBOスコアは再構成確率とKLダイバージェンスの両方を含んでいるため、以下の2つの異常パターンの両方を捉えることができます。
- 再構成が困難な異常: 再構成確率が低い → 第1項が大きい
- 潜在空間で異常な位置にエンコードされる異常: 事前分布から乖離 → 第2項が大きい
KLダイバージェンスの解析的な計算
ガウス分布の場合、KLダイバージェンスは解析的に計算できます。
$$ \begin{align} D_{\text{KL}}(q_\phi(\bm{z}|\bm{x}) \| p(\bm{z})) &= D_{\text{KL}}(\mathcal{N}(\bm{\mu}, \text{diag}(\bm{\sigma}^2)) \| \mathcal{N}(\bm{0}, \bm{I})) \\ &= \frac{1}{2}\sum_{j=1}^{k}\left(\mu_j^2 + \sigma_j^2 – 1 – \log \sigma_j^2\right) \end{align} $$
この導出を示します。一般にガウス分布間のKLダイバージェンスは、
$$ D_{\text{KL}}(\mathcal{N}(\bm{\mu}_1, \bm{\Sigma}_1) \| \mathcal{N}(\bm{\mu}_2, \bm{\Sigma}_2)) = \frac{1}{2}\left[\text{tr}(\bm{\Sigma}_2^{-1}\bm{\Sigma}_1) + (\bm{\mu}_2 – \bm{\mu}_1)^T\bm{\Sigma}_2^{-1}(\bm{\mu}_2 – \bm{\mu}_1) – k + \ln\frac{|\bm{\Sigma}_2|}{|\bm{\Sigma}_1|}\right] $$
$\bm{\mu}_2 = \bm{0}, \bm{\Sigma}_2 = \bm{I}, \bm{\Sigma}_1 = \text{diag}(\bm{\sigma}^2)$ を代入すると、
$$ \begin{align} D_{\text{KL}} &= \frac{1}{2}\left[\text{tr}(\text{diag}(\bm{\sigma}^2)) + \bm{\mu}^T\bm{\mu} – k + \ln\frac{1}{\prod_j \sigma_j^2}\right] \\ &= \frac{1}{2}\left[\sum_{j=1}^{k}\sigma_j^2 + \sum_{j=1}^{k}\mu_j^2 – k – \sum_{j=1}^{k}\ln\sigma_j^2\right] \\ &= \frac{1}{2}\sum_{j=1}^{k}\left(\mu_j^2 + \sigma_j^2 – 1 – \ln\sigma_j^2\right) \end{align} $$
異常スコア3: 潜在空間でのマハラノビス距離
VAEは正常データの潜在表現を標準正規分布 $\mathcal{N}(\bm{0}, \bm{I})$ に近づくよう正則化しています。したがって、異常データの潜在表現は標準正規分布から逸脱する傾向があります。
潜在空間でのマハラノビス距離を異常スコアとして使います。
$$ \text{Score}_{\text{Maha}}(\bm{x}) = \bm{\mu}_\phi(\bm{x})^T \bm{\Sigma}_{\text{train}}^{-1} \bm{\mu}_\phi(\bm{x}) $$
ここで $\bm{\Sigma}_{\text{train}}$ は、学習データの潜在表現の共分散行列です。
もし学習が理想的であれば $\bm{\Sigma}_{\text{train}} \approx \bm{I}$ なので、
$$ \text{Score}_{\text{Maha}}(\bm{x}) \approx \|\bm{\mu}_\phi(\bm{x})\|^2 $$
すなわち、潜在表現が原点から遠いデータほど異常である可能性が高くなります。
マハラノビス距離の統計的解釈
$k$ 次元の多変量正規分布 $\mathcal{N}(\bm{0}, \bm{I})$ に従う $\bm{z}$ のマハラノビス距離の二乗 $\|\bm{z}\|^2$ は自由度 $k$ のカイ二乗分布に従います。
$$ \|\bm{z}\|^2 \sim \chi^2_k $$
したがって、カイ二乗分布の上側確率を用いてp値ベースの閾値設定が可能です。有意水準 $\alpha$ の閾値は、
$$ \tau = \chi^2_{k, 1-\alpha} $$
ここで $\chi^2_{k, 1-\alpha}$ はカイ二乗分布の $1-\alpha$ パーセンタイルです。
潜在空間での異常検知
潜在空間の構造と異常
VAEの大きな利点は、潜在空間が構造化されていることです。KLダイバージェンスの正則化により、正常データの潜在表現は標準正規分布の周辺にまとまります。
異常データは以下の2つのパターンで潜在空間に現れます。
- 外れ値パターン: 正常データの潜在表現の分布から大きく外れた位置にエンコードされる
- エンコーダの不確実性パターン: エンコーダの出力する分散 $\bm{\sigma}_\phi^2(\bm{x})$ が異常に大きくなる
エンコーダの不確実性を活用する
VAEのエンコーダは平均 $\bm{\mu}_\phi(\bm{x})$ だけでなく分散 $\bm{\sigma}_\phi^2(\bm{x})$ も出力します。この分散は「データ $\bm{x}$ をどの程度確信を持ってエンコードできるか」を表しています。
エンコーダの不確実性を異常スコアに組み込むことができます。
$$ \text{Score}_{\text{unc}}(\bm{x}) = \sum_{j=1}^{k} \sigma_{\phi,j}^2(\bm{x}) $$
正常データでは学習済みのパターンをエンコーダが「よく知っている」ため分散が小さくなり、異常データでは「見たことがない」ため分散が大きくなる傾向があります。
条件付きVAE(CVAE)による条件付き異常検知
CVAEの定式化
条件付きVAE(Conditional VAE, CVAE)は、ラベルや条件情報 $\bm{c}$ を追加で入力とするVAEです。
$$ \begin{align} q_\phi(\bm{z}|\bm{x}, \bm{c}) &= \mathcal{N}(\bm{\mu}_\phi(\bm{x}, \bm{c}), \text{diag}(\bm{\sigma}_\phi^2(\bm{x}, \bm{c}))) \\ p_\theta(\bm{x}|\bm{z}, \bm{c}) &= \mathcal{N}(\bm{\mu}_\theta(\bm{z}, \bm{c}), \sigma^2\bm{I}) \end{align} $$
CVAEのELBOは、
$$ \text{ELBO}(\bm{x}, \bm{c}) = \mathbb{E}_{q_\phi(\bm{z}|\bm{x},\bm{c})}\left[\log p_\theta(\bm{x}|\bm{z},\bm{c})\right] – D_{\text{KL}}\left(q_\phi(\bm{z}|\bm{x},\bm{c}) \| p(\bm{z})\right) $$
条件付き異常検知への応用
CVAEを用いると、「条件 $\bm{c}$ のもとで $\bm{x}$ は正常か?」という条件付き異常検知が可能になります。
例えば、製造ラインのデータで $\bm{c}$ が製品の種類やロット番号である場合、「この種類の製品としては異常か?」という判断ができます。条件を明示的にモデルに入れることで、条件に起因する正常な変動と、条件によらない異常な変動を区別できます。
AEとVAEの比較
理論的な違い
| 特性 | AE | VAE |
|---|---|---|
| モデル | 決定的関数 | 確率的生成モデル |
| 潜在空間 | 構造なし | 標準正規分布に正則化 |
| 異常スコア | 再構成誤差のみ | 再構成確率・ELBO・マハラノビス距離 |
| 不確実性 | 定量化不可 | エンコーダの分散で定量化可能 |
| 補間 | 意味のある補間が困難 | 潜在空間での滑らかな補間が可能 |
| 学習の安定性 | 安定 | KL消失(posterior collapse)のリスク |
| 計算コスト | 低い | やや高い(サンプリングが必要) |
どちらを選ぶべきか
- AEが適する場合: 計算リソースが限られる、単純な再構成誤差で十分な検出性能が得られる、潜在空間の解釈性が不要
- VAEが適する場合: 異常スコアの確率的解釈が必要、不確実性の定量化が求められる、潜在空間での分析が有用、条件付き異常検知を行いたい
Pythonでの実装
AEとVAEの異常検知性能比較
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, roc_curve
# ===== 1. 合成データの生成 =====
np.random.seed(42)
def generate_data(n_normal=1000, n_anomaly=50, n_features=8):
"""正常データと異常データの生成"""
# 正常データ: 相関のある多変量正規分布
A = np.random.randn(n_features, n_features) * 0.3
cov = A @ A.T + np.eye(n_features) * 0.5
X_normal = np.random.multivariate_normal(np.zeros(n_features), cov, n_normal)
# 異常データ: 正常分布から離れたデータ
X_anomaly = np.random.multivariate_normal(np.zeros(n_features), cov, n_anomaly)
for i in range(n_anomaly):
n_corrupt = np.random.randint(2, 4)
corrupt_idx = np.random.choice(n_features, n_corrupt, replace=False)
X_anomaly[i, corrupt_idx] += np.random.choice([-1, 1], n_corrupt) * np.random.uniform(3, 6, n_corrupt)
X = np.vstack([X_normal, X_anomaly])
y = np.array([0] * n_normal + [1] * n_anomaly)
return X, y
X, y_true = generate_data()
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 学習データ(正常のみ)とテストデータの分割
X_train = X_scaled[y_true == 0][:800]
X_test = X_scaled
y_test = y_true
input_dim = X_scaled.shape[1]
print(f"入力次元: {input_dim}")
print(f"学習データ: {X_train.shape[0]}点(正常のみ)")
print(f"テストデータ: {X_test.shape[0]}点(正常{np.sum(y_test==0)} + 異常{np.sum(y_test==1)})")
# ===== 2. ニューラルネットワークのユーティリティ =====
def relu(x):
return np.maximum(0, x)
def relu_deriv(x):
return (x > 0).astype(float)
def sigmoid(x):
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
class DenseLayer:
"""全結合層"""
def __init__(self, in_dim, out_dim, activation='relu'):
scale = np.sqrt(2.0 / (in_dim + out_dim))
self.W = np.random.randn(in_dim, out_dim) * scale
self.b = np.zeros(out_dim)
self.activation = activation
# Adam用のモーメント
self.mW = np.zeros_like(self.W)
self.vW = np.zeros_like(self.W)
self.mb = np.zeros_like(self.b)
self.vb = np.zeros_like(self.b)
def forward(self, x):
self.x = x
self.z = x @ self.W + self.b
if self.activation == 'relu':
self.a = relu(self.z)
elif self.activation == 'linear':
self.a = self.z
elif self.activation == 'sigmoid':
self.a = sigmoid(self.z)
return self.a
def backward(self, grad_output):
if self.activation == 'relu':
grad_z = grad_output * relu_deriv(self.z)
elif self.activation == 'linear':
grad_z = grad_output
elif self.activation == 'sigmoid':
s = sigmoid(self.z)
grad_z = grad_output * s * (1 - s)
self.grad_W = self.x.T @ grad_z / self.x.shape[0]
self.grad_b = np.mean(grad_z, axis=0)
grad_input = grad_z @ self.W.T
return grad_input
def update(self, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8, t=1):
"""Adam更新"""
self.mW = beta1 * self.mW + (1 - beta1) * self.grad_W
self.vW = beta2 * self.vW + (1 - beta2) * self.grad_W ** 2
mW_hat = self.mW / (1 - beta1 ** t)
vW_hat = self.vW / (1 - beta2 ** t)
self.W -= lr * mW_hat / (np.sqrt(vW_hat) + eps)
self.mb = beta1 * self.mb + (1 - beta1) * self.grad_b
self.vb = beta2 * self.vb + (1 - beta2) * self.grad_b ** 2
mb_hat = self.mb / (1 - beta1 ** t)
vb_hat = self.vb / (1 - beta2 ** t)
self.b -= lr * mb_hat / (np.sqrt(vb_hat) + eps)
print("DenseLayer クラスの定義完了")
# ===== 3. VAEの実装 =====
class VAE:
"""NumPyベースのVAE実装"""
def __init__(self, input_dim, hidden_dim=64, latent_dim=4, lr=0.001):
self.input_dim = input_dim
self.latent_dim = latent_dim
self.lr = lr
# エンコーダ
self.enc_h1 = DenseLayer(input_dim, hidden_dim, 'relu')
self.enc_h2 = DenseLayer(hidden_dim, hidden_dim // 2, 'relu')
self.enc_mu = DenseLayer(hidden_dim // 2, latent_dim, 'linear')
self.enc_logvar = DenseLayer(hidden_dim // 2, latent_dim, 'linear')
# デコーダ
self.dec_h1 = DenseLayer(latent_dim, hidden_dim // 2, 'relu')
self.dec_h2 = DenseLayer(hidden_dim // 2, hidden_dim, 'relu')
self.dec_out = DenseLayer(hidden_dim, input_dim, 'linear')
self.all_layers = [
self.enc_h1, self.enc_h2, self.enc_mu, self.enc_logvar,
self.dec_h1, self.dec_h2, self.dec_out
]
self.t = 0 # Adam のタイムステップ
def encode(self, x):
"""エンコーダ: x → (μ, log σ²)"""
h = self.enc_h1.forward(x)
h = self.enc_h2.forward(h)
mu = self.enc_mu.forward(h)
logvar = self.enc_logvar.forward(h)
return mu, logvar
def reparameterize(self, mu, logvar):
"""Reparameterization trick: z = μ + σ ⊙ ε"""
std = np.exp(0.5 * logvar)
self.eps = np.random.randn(*mu.shape)
z = mu + std * self.eps
return z
def decode(self, z):
"""デコーダ: z → x_hat"""
h = self.dec_h1.forward(z)
h = self.dec_h2.forward(h)
x_hat = self.dec_out.forward(h)
return x_hat
def forward(self, x):
"""順伝播"""
self.mu, self.logvar = self.encode(x)
self.z = self.reparameterize(self.mu, self.logvar)
self.x_hat = self.decode(self.z)
return self.x_hat, self.mu, self.logvar
def loss(self, x, x_hat, mu, logvar):
"""VAE損失 = 再構成誤差 + KLダイバージェンス"""
# 再構成誤差(MSE)
recon_loss = np.mean(np.sum((x - x_hat) ** 2, axis=1))
# KLダイバージェンス: (1/2) Σ(μ² + σ² - 1 - log σ²)
kl_loss = -0.5 * np.mean(np.sum(1 + logvar - mu ** 2 - np.exp(logvar), axis=1))
return recon_loss, kl_loss, recon_loss + kl_loss
def backward(self, x):
"""逆伝播"""
m = x.shape[0]
# 出力層の勾配: d(MSE)/d(x_hat) = 2(x_hat - x) / m
grad_xhat = 2 * (self.x_hat - x) / m
# デコーダの逆伝播
grad_dec = self.dec_out.backward(grad_xhat)
grad_dec = self.dec_h2.backward(grad_dec)
grad_z = self.dec_h1.backward(grad_dec)
# Reparameterization trickの逆伝播
std = np.exp(0.5 * self.logvar)
grad_mu_from_z = grad_z
grad_logvar_from_z = grad_z * self.eps * 0.5 * std
# KL損失の勾配
grad_mu_kl = self.mu / m
grad_logvar_kl = 0.5 * (np.exp(self.logvar) - 1) / m
# 合計
grad_mu = grad_mu_from_z + grad_mu_kl
grad_logvar = grad_logvar_from_z + grad_logvar_kl
# エンコーダの逆伝播
grad_enc_mu = self.enc_mu.backward(grad_mu)
grad_enc_logvar = self.enc_logvar.backward(grad_logvar)
grad_h2 = grad_enc_mu + grad_enc_logvar
grad_h2 = self.enc_h2.backward(grad_h2)
self.enc_h1.backward(grad_h2)
def update(self):
"""全層のパラメータ更新"""
self.t += 1
for layer in self.all_layers:
layer.update(lr=self.lr, t=self.t)
def fit(self, X_train, epochs=300, batch_size=64, verbose=True):
"""学習"""
n = X_train.shape[0]
history = {'recon': [], 'kl': [], 'total': []}
for epoch in range(epochs):
indices = np.random.permutation(n)
epoch_recon = 0
epoch_kl = 0
for start in range(0, n, batch_size):
end = min(start + batch_size, n)
X_batch = X_train[indices[start:end]]
# 順伝播
x_hat, mu, logvar = self.forward(X_batch)
# 損失計算
recon, kl, total = self.loss(X_batch, x_hat, mu, logvar)
epoch_recon += recon * (end - start)
epoch_kl += kl * (end - start)
# 逆伝播と更新
self.backward(X_batch)
self.update()
epoch_recon /= n
epoch_kl /= n
history['recon'].append(epoch_recon)
history['kl'].append(epoch_kl)
history['total'].append(epoch_recon + epoch_kl)
if verbose and (epoch + 1) % 50 == 0:
print(f"Epoch {epoch+1}/{epochs}: "
f"Recon={epoch_recon:.4f}, KL={epoch_kl:.4f}, "
f"Total={epoch_recon + epoch_kl:.4f}")
return history
def anomaly_score_reconstruction(self, X, n_samples=50):
"""異常スコア1: 再構成確率(モンテカルロ推定)"""
mu, logvar = self.encode(X)
scores = np.zeros(X.shape[0])
for _ in range(n_samples):
z = self.reparameterize(mu, logvar)
x_hat = self.decode(z)
scores += np.sum((X - x_hat) ** 2, axis=1)
return scores / n_samples
def anomaly_score_elbo(self, X, n_samples=50):
"""異常スコア2: -ELBO"""
mu, logvar = self.encode(X)
# 再構成確率のモンテカルロ推定
recon_scores = np.zeros(X.shape[0])
for _ in range(n_samples):
z = self.reparameterize(mu, logvar)
x_hat = self.decode(z)
recon_scores += np.sum((X - x_hat) ** 2, axis=1)
recon_scores /= n_samples
# KLダイバージェンス(解析的)
kl_scores = -0.5 * np.sum(1 + logvar - mu ** 2 - np.exp(logvar), axis=1)
return recon_scores + kl_scores
def anomaly_score_mahalanobis(self, X):
"""異常スコア3: 潜在空間でのマハラノビス距離"""
mu, logvar = self.encode(X)
return np.sum(mu ** 2, axis=1)
print("VAE クラスの定義完了")
# ===== 4. 通常AEの実装 =====
class SimpleAE:
"""NumPyベースの通常オートエンコーダ"""
def __init__(self, input_dim, hidden_dim=64, latent_dim=4, lr=0.001):
self.lr = lr
# エンコーダ
self.enc_h1 = DenseLayer(input_dim, hidden_dim, 'relu')
self.enc_h2 = DenseLayer(hidden_dim, hidden_dim // 2, 'relu')
self.enc_out = DenseLayer(hidden_dim // 2, latent_dim, 'linear')
# デコーダ
self.dec_h1 = DenseLayer(latent_dim, hidden_dim // 2, 'relu')
self.dec_h2 = DenseLayer(hidden_dim // 2, hidden_dim, 'relu')
self.dec_out = DenseLayer(hidden_dim, input_dim, 'linear')
self.all_layers = [
self.enc_h1, self.enc_h2, self.enc_out,
self.dec_h1, self.dec_h2, self.dec_out
]
self.t = 0
def forward(self, x):
h = self.enc_h1.forward(x)
h = self.enc_h2.forward(h)
self.z = self.enc_out.forward(h)
h = self.dec_h1.forward(self.z)
h = self.dec_h2.forward(h)
self.x_hat = self.dec_out.forward(h)
return self.x_hat
def backward(self, x):
m = x.shape[0]
grad = 2 * (self.x_hat - x) / m
grad = self.dec_out.backward(grad)
grad = self.dec_h2.backward(grad)
grad = self.dec_h1.backward(grad)
grad = self.enc_out.backward(grad)
grad = self.enc_h2.backward(grad)
self.enc_h1.backward(grad)
def update(self):
self.t += 1
for layer in self.all_layers:
layer.update(lr=self.lr, t=self.t)
def fit(self, X_train, epochs=300, batch_size=64, verbose=True):
n = X_train.shape[0]
losses = []
for epoch in range(epochs):
indices = np.random.permutation(n)
epoch_loss = 0
for start in range(0, n, batch_size):
end = min(start + batch_size, n)
X_batch = X_train[indices[start:end]]
x_hat = self.forward(X_batch)
loss = np.mean(np.sum((X_batch - x_hat) ** 2, axis=1))
epoch_loss += loss * (end - start)
self.backward(X_batch)
self.update()
epoch_loss /= n
losses.append(epoch_loss)
if verbose and (epoch + 1) % 50 == 0:
print(f"Epoch {epoch+1}/{epochs}: Loss={epoch_loss:.4f}")
return losses
def anomaly_score(self, X):
x_hat = self.forward(X)
return np.sum((X - x_hat) ** 2, axis=1)
print("SimpleAE クラスの定義完了")
# ===== 5. モデルの学習 =====
latent_dim = 4
hidden_dim = 64
# VAEの学習
print("=" * 50)
print("VAEの学習")
print("=" * 50)
vae = VAE(input_dim, hidden_dim=hidden_dim, latent_dim=latent_dim, lr=0.001)
vae_history = vae.fit(X_train, epochs=300, batch_size=64)
# AEの学習
print("\n" + "=" * 50)
print("AEの学習")
print("=" * 50)
ae = SimpleAE(input_dim, hidden_dim=hidden_dim, latent_dim=latent_dim, lr=0.001)
ae_losses = ae.fit(X_train, epochs=300, batch_size=64)
# 学習曲線の比較
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].plot(ae_losses, label='AE Loss', color='blue', linewidth=2)
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('MSE Loss')
axes[0].set_title('AE Training Loss')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[1].plot(vae_history['recon'], label='Recon Loss', color='blue', linewidth=2)
axes[1].plot(vae_history['kl'], label='KL Loss', color='red', linewidth=2)
axes[1].plot(vae_history['total'], label='Total Loss', color='green', linewidth=2)
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Loss')
axes[1].set_title('VAE Training Loss')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# ===== 6. 異常スコアの計算と比較 =====
# AEの異常スコア
ae_scores = ae.anomaly_score(X_test)
# VAEの3つの異常スコア
vae_recon_scores = vae.anomaly_score_reconstruction(X_test, n_samples=50)
vae_elbo_scores = vae.anomaly_score_elbo(X_test, n_samples=50)
vae_maha_scores = vae.anomaly_score_mahalanobis(X_test)
# AUC-ROCの計算
auc_ae = roc_auc_score(y_test, ae_scores)
auc_vae_recon = roc_auc_score(y_test, vae_recon_scores)
auc_vae_elbo = roc_auc_score(y_test, vae_elbo_scores)
auc_vae_maha = roc_auc_score(y_test, vae_maha_scores)
print("AUC-ROC スコア:")
print(f" AE (再構成誤差): {auc_ae:.4f}")
print(f" VAE (再構成確率): {auc_vae_recon:.4f}")
print(f" VAE (ELBO): {auc_vae_elbo:.4f}")
print(f" VAE (マハラノビス距離): {auc_vae_maha:.4f}")
# ===== 7. ROC曲線と結果の可視化 =====
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# (a) ROC曲線の比較
scores_dict = {
f'AE Recon (AUC={auc_ae:.3f})': ae_scores,
f'VAE Recon (AUC={auc_vae_recon:.3f})': vae_recon_scores,
f'VAE ELBO (AUC={auc_vae_elbo:.3f})': vae_elbo_scores,
f'VAE Maha (AUC={auc_vae_maha:.3f})': vae_maha_scores,
}
colors = ['blue', 'red', 'green', 'orange']
for (name, scores), color in zip(scores_dict.items(), colors):
fpr, tpr, _ = roc_curve(y_test, scores)
axes[0, 0].plot(fpr, tpr, color=color, lw=2, label=name)
axes[0, 0].plot([0, 1], [0, 1], 'k--', lw=1, alpha=0.5)
axes[0, 0].set_xlabel('False Positive Rate')
axes[0, 0].set_ylabel('True Positive Rate')
axes[0, 0].set_title('ROC Curve Comparison')
axes[0, 0].legend(fontsize=9)
axes[0, 0].grid(True, alpha=0.3)
# (b) スコア分布の比較(VAE ELBOスコア)
axes[0, 1].hist(vae_elbo_scores[y_test == 0], bins=40, alpha=0.7,
label='Normal', density=True, color='blue')
axes[0, 1].hist(vae_elbo_scores[y_test == 1], bins=20, alpha=0.7,
label='Anomaly', density=True, color='red')
axes[0, 1].set_xlabel('ELBO Anomaly Score')
axes[0, 1].set_ylabel('Density')
axes[0, 1].set_title('VAE ELBO Score Distribution')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# (c) 潜在空間の可視化
mu_test, _ = vae.encode(X_test)
if latent_dim >= 2:
axes[1, 0].scatter(mu_test[y_test == 0, 0], mu_test[y_test == 0, 1],
c='blue', s=10, alpha=0.5, label='Normal')
axes[1, 0].scatter(mu_test[y_test == 1, 0], mu_test[y_test == 1, 1],
c='red', s=30, marker='x', label='Anomaly')
# 標準正規分布の等高線
theta = np.linspace(0, 2 * np.pi, 100)
for r in [1, 2, 3]:
axes[1, 0].plot(r * np.cos(theta), r * np.sin(theta),
'k--', alpha=0.3, linewidth=0.5)
axes[1, 0].set_xlabel('z_1')
axes[1, 0].set_ylabel('z_2')
axes[1, 0].set_title('Latent Space (first 2 dims)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# (d) 異常スコアの散布図(再構成確率 vs KLダイバージェンス)
mu_all, logvar_all = vae.encode(X_test)
kl_per_sample = -0.5 * np.sum(1 + logvar_all - mu_all**2 - np.exp(logvar_all), axis=1)
axes[1, 1].scatter(vae_recon_scores[y_test == 0], kl_per_sample[y_test == 0],
c='blue', s=10, alpha=0.5, label='Normal')
axes[1, 1].scatter(vae_recon_scores[y_test == 1], kl_per_sample[y_test == 1],
c='red', s=30, marker='x', label='Anomaly')
axes[1, 1].set_xlabel('Reconstruction Score')
axes[1, 1].set_ylabel('KL Divergence')
axes[1, 1].set_title('Recon Score vs KL Divergence')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.suptitle('VAE vs AE Anomaly Detection', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
# ===== 8. 異常スコアの分布分析 =====
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
score_names = ['AE Recon Error', 'VAE Recon Prob', 'VAE ELBO', 'VAE Mahalanobis']
all_scores = [ae_scores, vae_recon_scores, vae_elbo_scores, vae_maha_scores]
for idx, (name, scores) in enumerate(zip(score_names, all_scores)):
ax = axes[idx // 2, idx % 2]
# 正常と異常のスコア統計
normal_scores = scores[y_test == 0]
anomaly_scores = scores[y_test == 1]
# ヒストグラム
bins = np.linspace(scores.min(), np.percentile(scores, 99), 50)
ax.hist(normal_scores, bins=bins, alpha=0.7, label='Normal', density=True, color='blue')
ax.hist(anomaly_scores, bins=bins, alpha=0.7, label='Anomaly', density=True, color='red')
# 統計量の表示
textstr = (f'Normal: μ={np.mean(normal_scores):.2f}, σ={np.std(normal_scores):.2f}\n'
f'Anomaly: μ={np.mean(anomaly_scores):.2f}, σ={np.std(anomaly_scores):.2f}')
ax.text(0.95, 0.95, textstr, transform=ax.transAxes, fontsize=8,
verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
ax.set_xlabel('Anomaly Score')
ax.set_ylabel('Density')
ax.set_title(name)
ax.legend()
ax.grid(True, alpha=0.3)
plt.suptitle('Anomaly Score Distributions', fontsize=14)
plt.tight_layout()
plt.show()
# 最終的なAUC比較の表示
print("\n" + "=" * 60)
print("最終結果: AUC-ROC スコア比較")
print("=" * 60)
print(f" AE (再構成誤差): {auc_ae:.4f}")
print(f" VAE (再構成確率): {auc_vae_recon:.4f}")
print(f" VAE (ELBO): {auc_vae_elbo:.4f}")
print(f" VAE (マハラノビス距離): {auc_vae_maha:.4f}")
print(f" VAE 最良スコア: {max(auc_vae_recon, auc_vae_elbo, auc_vae_maha):.4f}")
まとめ
本記事では、VAEによる異常検知の理論と実装について解説しました。
- VAEの利点: 確率的生成モデルとして、再構成誤差だけでなく複数の確率的異常スコアを利用できる
- 3つの異常スコア: 再構成確率 $-\log p_\theta(\bm{x}|\bm{z})$、ELBO、潜在空間でのマハラノビス距離のそれぞれが異なる種類の異常を捉える
- モンテカルロ推定: 再構成確率はreparameterization trickによるサンプリングで推定する
- 潜在空間の活用: KLダイバージェンスの正則化により、潜在空間が構造化され、異常データの位置を可視化・定量化できる
- AEとの比較: VAEは不確実性の定量化が可能であり、確率的な解釈が求められる場面で有利である
- Python実装: AEとVAEの異常検知性能を合成データで比較し、各スコアの特性を確認した
次のステップとして、以下の記事も参考にしてください。