部分空間法は、データを特徴空間に射影し、その射影の性質を利用して分類や異常検知を行う手法です。主成分分析(PCA)で得られる主部分空間とその補空間(残差空間)を利用し、正常データの構造から逸脱したデータを異常として検出します。
シンプルでありながら理論的に明快で、製造業やシステム監視などの実応用でも広く利用されています。
本記事の内容
- 部分空間法の基本概念
- 主部分空間と残差空間による異常検知
- 異常度の数学的定義
- Pythonでのスクラッチ実装と可視化
前提知識
この記事を読む前に、以下の記事を読んでおくと理解が深まります。
部分空間法の基本概念
$d$ 次元の正常データ $\bm{x}_1, \dots, \bm{x}_n$ に対してPCAを行い、上位 $k$ 個の主成分で張られる $k$ 次元の部分空間(主部分空間)を構成します。
データの共分散行列を固有値分解すると、
$$ \bm{\Sigma} = \sum_{j=1}^{d} \lambda_j \bm{v}_j \bm{v}_j^T $$
ここで $\lambda_1 \geq \lambda_2 \geq \dots \geq \lambda_d \geq 0$ は固有値、$\bm{v}_j$ は対応する固有ベクトルです。
上位 $k$ 個の固有ベクトルで主部分空間を構成します。
$$ \bm{V}_k = [\bm{v}_1, \bm{v}_2, \dots, \bm{v}_k] $$
主部分空間と残差空間
$d$ 次元空間は、主部分空間とその直交補空間(残差空間)に分解されます。
射影行列(主部分空間への射影):
$$ \bm{P}_k = \bm{V}_k \bm{V}_k^T $$
残差射影行列:
$$ \bm{P}_{\perp} = \bm{I} – \bm{V}_k \bm{V}_k^T $$
データ点 $\bm{x}$ に対して、
$$ \bm{x} = \bm{P}_k \bm{x} + \bm{P}_{\perp} \bm{x} $$
$\bm{P}_k \bm{x}$ は主部分空間上の成分、$\bm{P}_{\perp} \bm{x}$ は残差空間上の成分です。
異常度の定義
部分空間法では、残差空間への射影の大きさを異常度として利用します。
SPE (Squared Prediction Error)
$$ \text{SPE}(\bm{x}) = \|\bm{P}_{\perp}(\bm{x} – \bar{\bm{x}})\|^2 = (\bm{x} – \bar{\bm{x}})^T (\bm{I} – \bm{V}_k \bm{V}_k^T)(\bm{x} – \bar{\bm{x}}) $$
SPEは「主部分空間で説明できないデータの成分」の大きさです。正常データは主部分空間にほぼ収まるため、SPEは小さくなります。異常データは主部分空間からの逸脱が大きいため、SPEが大きくなります。
T二乗統計量(主部分空間内の異常度)
主部分空間内での異常度も併用する場合は、$T^2$ 統計量を用います。
$$ T^2(\bm{x}) = (\bm{x} – \bar{\bm{x}})^T \bm{V}_k \bm{\Lambda}_k^{-1} \bm{V}_k^T (\bm{x} – \bar{\bm{x}}) $$
ここで $\bm{\Lambda}_k = \text{diag}(\lambda_1, \dots, \lambda_k)$ です。
SPEとT二乗統計量を組み合わせることで、「正常な範囲を超えた変動」と「主部分空間内での異常な変動」の両方を検出できます。
主成分数 $k$ の選び方
主成分数 $k$ は累積寄与率で決定するのが一般的です。
$$ \text{累積寄与率} = \frac{\sum_{j=1}^{k} \lambda_j}{\sum_{j=1}^{d} \lambda_j} $$
例えば累積寄与率が90%や95%を超える最小の $k$ を選びます。
Pythonでの実装
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
np.random.seed(42)
class SubspaceAnomalyDetector:
"""部分空間法による異常検知"""
def __init__(self, n_components=None, cumulative_ratio=0.95):
self.n_components = n_components
self.cumulative_ratio = cumulative_ratio
def fit(self, X):
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
self.mean = X_scaled.mean(axis=0)
cov = np.cov(X_scaled.T)
eigenvalues, eigenvectors = np.linalg.eigh(cov)
# 降順にソート
idx = np.argsort(eigenvalues)[::-1]
self.eigenvalues = eigenvalues[idx]
self.eigenvectors = eigenvectors[:, idx]
# 主成分数の決定
if self.n_components is None:
cumsum = np.cumsum(self.eigenvalues) / self.eigenvalues.sum()
self.n_components = np.argmax(cumsum >= self.cumulative_ratio) + 1
self.V_k = self.eigenvectors[:, :self.n_components]
self.Lambda_k = np.diag(self.eigenvalues[:self.n_components])
# 射影行列
self.P_k = self.V_k @ self.V_k.T
self.P_perp = np.eye(X_scaled.shape[1]) - self.P_k
# 正常データでの閾値推定
spe_train = self._compute_spe(X_scaled)
self.spe_threshold = np.percentile(spe_train, 99)
return self
def _compute_spe(self, X_scaled):
diff = X_scaled - self.mean
residual = diff @ self.P_perp
return np.sum(residual ** 2, axis=1)
def _compute_t2(self, X_scaled):
diff = X_scaled - self.mean
projected = diff @ self.V_k
Lambda_inv = np.diag(1.0 / self.eigenvalues[:self.n_components])
return np.sum(projected @ Lambda_inv * projected, axis=1)
def score(self, X):
X_scaled = self.scaler.transform(X)
spe = self._compute_spe(X_scaled)
t2 = self._compute_t2(X_scaled)
return spe, t2
def predict(self, X):
spe, _ = self.score(X)
return (spe > self.spe_threshold).astype(int)
# --- データ生成 ---
n_normal = 300
n_anomaly = 20
d = 5
# 正常データ: 低ランク構造 + ノイズ
true_basis = np.random.randn(d, 2)
coeffs = np.random.randn(n_normal, 2)
X_normal = coeffs @ true_basis.T + np.random.randn(n_normal, d) * 0.3
# 異常データ: ランダムな方向にずれたデータ
X_anomaly = np.random.randn(n_anomaly, d) * 3
# --- 学習と評価 ---
detector = SubspaceAnomalyDetector(cumulative_ratio=0.90)
detector.fit(X_normal)
print(f"選択された主成分数: {detector.n_components}")
print(f"固有値: {detector.eigenvalues.round(3)}")
print(f"累積寄与率: {(np.cumsum(detector.eigenvalues) / detector.eigenvalues.sum()).round(3)}")
X_all = np.vstack([X_normal, X_anomaly])
labels = np.array([0] * n_normal + [1] * n_anomaly)
spe, t2 = detector.score(X_all)
predictions = detector.predict(X_all)
print(f"\n正常データのSPE: 平均={spe[:n_normal].mean():.3f}, 最大={spe[:n_normal].max():.3f}")
print(f"異常データのSPE: 平均={spe[n_normal:].mean():.3f}, 最大={spe[n_normal:].max():.3f}")
print(f"SPE閾値: {detector.spe_threshold:.3f}")
print(f"異常検出数: {predictions[n_normal:].sum()} / {n_anomaly}")
# --- 可視化 ---
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# SPEの分布
ax1 = axes[0]
ax1.hist(spe[:n_normal], bins=30, alpha=0.6, color='blue', label='Normal', density=True)
ax1.hist(spe[n_normal:], bins=15, alpha=0.6, color='red', label='Anomaly', density=True)
ax1.axvline(x=detector.spe_threshold, color='green', linestyle='--',
linewidth=2, label=f'Threshold')
ax1.set_xlabel('SPE')
ax1.set_ylabel('Density')
ax1.set_title('SPE Distribution')
ax1.legend()
ax1.grid(True, alpha=0.3)
# SPE vs T^2 プロット
ax2 = axes[1]
ax2.scatter(t2[:n_normal], spe[:n_normal], c='blue', alpha=0.5, s=15, label='Normal')
ax2.scatter(t2[n_normal:], spe[n_normal:], c='red', alpha=0.8, s=40,
marker='x', label='Anomaly')
ax2.axhline(y=detector.spe_threshold, color='green', linestyle='--', alpha=0.7)
ax2.set_xlabel('$T^2$ Statistic')
ax2.set_ylabel('SPE')
ax2.set_title('SPE vs $T^2$')
ax2.legend()
ax2.grid(True, alpha=0.3)
# 累積寄与率
ax3 = axes[2]
cumsum = np.cumsum(detector.eigenvalues) / detector.eigenvalues.sum()
ax3.bar(range(1, len(detector.eigenvalues) + 1),
detector.eigenvalues / detector.eigenvalues.sum(),
alpha=0.6, color='steelblue', label='Individual')
ax3.plot(range(1, len(detector.eigenvalues) + 1), cumsum, 'ro-', label='Cumulative')
ax3.axhline(y=0.90, color='green', linestyle='--', alpha=0.7, label='90%')
ax3.set_xlabel('Principal Component')
ax3.set_ylabel('Ratio')
ax3.set_title('Explained Variance Ratio')
ax3.legend()
ax3.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
まとめ
本記事では、部分空間法による異常検知について解説しました。
- 部分空間法はPCAで得られる主部分空間と残差空間を利用して異常検知を行う
- SPE(残差空間への射影の大きさ)が主な異常度指標で、主部分空間で説明できない成分を測定する
- T二乗統計量を組み合わせることで、主部分空間内外の両方の異常を検出可能
- 主成分数 $k$ は累積寄与率に基づいて決定するのが一般的
次のステップとして、以下の記事も参考にしてください。