ベクトルデータベース入門 — IVF・HNSW・PQを理解する

RAGやセマンティック検索を支える技術として、ベクトルデータベースが注目を集めています。高次元の埋め込みベクトルを効率的に保存・検索するために、さまざまなアルゴリズムが開発されています。

本記事では、ベクトルデータベースの核心技術である近似最近傍探索(ANN: Approximate Nearest Neighbor)アルゴリズムについて、数学的な原理とPython実装を交えて解説します。

本記事の内容

  • ベクトル検索の基本と計算量の課題
  • 近似最近傍探索アルゴリズム(IVF、HNSW、PQ)
  • Pythonでのベクトル検索の実装

前提知識

この記事を読む前に、以下の記事を読んでおくと理解が深まります。

ベクトル検索の課題

最近傍探索問題

$n$ 個の $d$ 次元ベクトル $\mathcal{D} = \{\bm{x}_1, \bm{x}_2, \ldots, \bm{x}_n\}$ からなるデータベースがあるとき、クエリベクトル $\bm{q}$ に最も近いベクトルを見つける問題を最近傍探索(Nearest Neighbor Search)と呼びます。

$$ \bm{x}^* = \underset{\bm{x} \in \mathcal{D}}{\arg\min} \, d(\bm{q}, \bm{x}) $$

ここで $d(\cdot, \cdot)$ は距離関数です。一般的にはユークリッド距離が使われます。

$$ d(\bm{q}, \bm{x}) = \|\bm{q} – \bm{x}\|_2 = \sqrt{\sum_{i=1}^{d} (q_i – x_i)^2} $$

計算量の問題

全探索(Brute Force)では、クエリごとに $O(nd)$ の計算が必要です。

パラメータ 典型的な値
文書数 $n$ 100万〜10億
次元数 $d$ 384〜4096

例えば100万文書、768次元のベクトルに対して全探索を行うと、1クエリあたり約7.7億回の浮動小数点演算が必要になります。これをリアルタイムで処理するのは困難です。

近似最近傍探索(ANN)

厳密な最近傍ではなく、高い確率で近い点を高速に見つける近似最近傍探索(ANN: Approximate Nearest Neighbor)が実用的な解決策です。

精度と速度のトレードオフ

ANNは以下の指標でトレードオフを評価します。

Recall@k: 真の最近傍が上位 $k$ 件に含まれる割合

$$ \text{Recall@}k = \frac{|\text{TrueTopK} \cap \text{ANNTopK}|}{k} $$

QPS(Queries Per Second): 1秒あたりの処理クエリ数

IVF(Inverted File Index)

アルゴリズムの概要

IVFは、ベクトル空間をクラスタに分割し、クエリに近いクラスタのみを探索することで高速化します。

インデックス構築

  1. k-meansでベクトル空間を $C$ 個のクラスタに分割
  2. 各クラスタの重心(セントロイド)$\{\bm{c}_1, \ldots, \bm{c}_C\}$ を計算
  3. 各ベクトルを最も近いクラスタに割り当て

検索

  1. クエリ $\bm{q}$ に最も近い $n_{\text{probe}}$ 個のクラスタを特定
  2. 選択したクラスタ内のベクトルのみを探索

数学的定式化

クラスタ割り当て関数:

$$ \text{assign}(\bm{x}) = \underset{j \in \{1, \ldots, C\}}{\arg\min} \|\bm{x} – \bm{c}_j\|_2 $$

検索時に調べるクラスタ:

$$ \mathcal{P}(\bm{q}) = \underset{S \subseteq \{1, \ldots, C\}, |S| = n_{\text{probe}}}{\arg\min} \sum_{j \in S} \|\bm{q} – \bm{c}_j\|_2 $$

計算量は $O(C + n \cdot n_{\text{probe}} / C \cdot d)$ に削減されます。

Pythonでの実装

import numpy as np
from sklearn.cluster import KMeans

class SimpleIVF:
    def __init__(self, n_clusters=100):
        """IVFインデックスの初期化"""
        self.n_clusters = n_clusters
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        self.inverted_lists = {}  # クラスタID -> ベクトルリスト
        self.vectors = None

    def build(self, vectors):
        """インデックスを構築"""
        self.vectors = vectors
        n = len(vectors)

        # k-meansでクラスタリング
        labels = self.kmeans.fit_predict(vectors)

        # 転置リストを構築
        for i in range(self.n_clusters):
            self.inverted_lists[i] = []

        for idx, label in enumerate(labels):
            self.inverted_lists[label].append(idx)

        print(f"Built IVF index: {n} vectors, {self.n_clusters} clusters")

    def search(self, query, k=5, n_probe=10):
        """最近傍を検索"""
        # クエリに近いクラスタを特定
        centroids = self.kmeans.cluster_centers_
        distances_to_centroids = np.linalg.norm(centroids - query, axis=1)
        probe_clusters = np.argsort(distances_to_centroids)[:n_probe]

        # 選択したクラスタ内のベクトルを探索
        candidates = []
        for cluster_id in probe_clusters:
            candidates.extend(self.inverted_lists[cluster_id])

        # 候補内で全探索
        candidate_vectors = self.vectors[candidates]
        distances = np.linalg.norm(candidate_vectors - query, axis=1)

        # Top-kを返す
        top_k_local = np.argsort(distances)[:k]
        top_k_global = [candidates[i] for i in top_k_local]
        top_k_distances = distances[top_k_local]

        return top_k_global, top_k_distances

# 使用例
np.random.seed(42)
n_vectors = 10000
dim = 128
vectors = np.random.randn(n_vectors, dim).astype(np.float32)

# インデックス構築
ivf = SimpleIVF(n_clusters=100)
ivf.build(vectors)

# 検索
query = np.random.randn(dim).astype(np.float32)
indices, distances = ivf.search(query, k=5, n_probe=10)

print(f"\nQuery shape: {query.shape}")
print(f"Top-5 nearest neighbors: {indices}")
print(f"Distances: {distances}")

HNSW(Hierarchical Navigable Small World)

アルゴリズムの概要

HNSWは、スモールワールドグラフを階層的に構築することで、対数時間での検索を実現します。

グラフ構造

  • 各ベクトルをノードとするグラフを構築
  • 各ノードは近傍ノードへのエッジを持つ
  • 複数の階層を持ち、上位層はスパース、下位層はデンス

検索アルゴリズム(Greedy Search)

  1. 最上位層のエントリポイントから開始
  2. 現在のノードの近傍の中で、クエリに最も近いノードへ移動
  3. 改善がなくなったら下の層へ移動
  4. 最下層で最終的な最近傍を返す

グラフ構築の数学

ノード $\bm{x}$ の層レベルは以下の確率分布に従います。

$$ l = \lfloor -\ln(\text{uniform}(0, 1)) \cdot m_L \rfloor $$

ここで $m_L$ は層の分布を制御するパラメータです。

各層でのエッジ数(近傍数):

$$ M = \begin{cases} M_{\max} & \text{(layer 0)} \\ M & \text{(layer > 0)} \end{cases} $$

計算量

  • 検索: $O(\log n)$
  • インデックス構築: $O(n \log n)$

Pythonでの簡易実装

import numpy as np
import heapq
from collections import defaultdict

class SimpleHNSW:
    def __init__(self, dim, M=16, ef_construction=200):
        """HNSWインデックスの初期化"""
        self.dim = dim
        self.M = M  # 各ノードの最大エッジ数
        self.ef_construction = ef_construction
        self.vectors = []
        self.graphs = defaultdict(lambda: defaultdict(list))  # layer -> node -> neighbors
        self.entry_point = None
        self.max_layer = 0
        self.node_layers = {}  # node -> max_layer

    def _distance(self, a, b):
        """ユークリッド距離を計算"""
        return np.linalg.norm(np.array(a) - np.array(b))

    def _get_layer(self):
        """ランダムに層を決定"""
        ml = 1.0 / np.log(self.M)
        return int(-np.log(np.random.uniform()) * ml)

    def _search_layer(self, query, entry_points, ef, layer):
        """単一層での貪欲探索"""
        visited = set(entry_points)
        candidates = []  # (distance, node) - min heap
        results = []  # (-distance, node) - max heap

        for ep in entry_points:
            dist = self._distance(query, self.vectors[ep])
            heapq.heappush(candidates, (dist, ep))
            heapq.heappush(results, (-dist, ep))

        while candidates:
            dist_c, c = heapq.heappop(candidates)
            dist_f = -results[0][0]

            if dist_c > dist_f:
                break

            for neighbor in self.graphs[layer][c]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    dist_n = self._distance(query, self.vectors[neighbor])
                    dist_f = -results[0][0]

                    if dist_n < dist_f or len(results) < ef:
                        heapq.heappush(candidates, (dist_n, neighbor))
                        heapq.heappush(results, (-dist_n, neighbor))
                        if len(results) > ef:
                            heapq.heappop(results)

        return [node for _, node in sorted(results, key=lambda x: -x[0])]

    def add(self, vector):
        """ベクトルを追加"""
        idx = len(self.vectors)
        self.vectors.append(vector)

        if self.entry_point is None:
            self.entry_point = idx
            self.node_layers[idx] = 0
            return idx

        # ランダムに層を決定
        l = min(self._get_layer(), self.max_layer + 1)
        self.node_layers[idx] = l

        # 上位層から探索して最近傍を見つける
        ep = [self.entry_point]
        for layer in range(self.max_layer, l, -1):
            ep = self._search_layer(vector, ep, 1, layer)[:1]

        # 下位層でエッジを追加
        for layer in range(min(l, self.max_layer), -1, -1):
            neighbors = self._search_layer(vector, ep, self.ef_construction, layer)
            neighbors = neighbors[:self.M]

            # 双方向エッジを追加
            for neighbor in neighbors:
                self.graphs[layer][idx].append(neighbor)
                self.graphs[layer][neighbor].append(idx)

            ep = neighbors

        # エントリポイントを更新
        if l > self.max_layer:
            self.max_layer = l
            self.entry_point = idx

        return idx

    def search(self, query, k=5, ef=50):
        """k最近傍を検索"""
        if self.entry_point is None:
            return [], []

        ep = [self.entry_point]

        # 上位層から探索
        for layer in range(self.max_layer, 0, -1):
            ep = self._search_layer(query, ep, 1, layer)[:1]

        # 最下層で詳細探索
        candidates = self._search_layer(query, ep, ef, 0)[:k]

        indices = candidates
        distances = [self._distance(query, self.vectors[i]) for i in indices]

        return indices, distances

# 使用例
np.random.seed(42)
n_vectors = 1000
dim = 32

hnsw = SimpleHNSW(dim=dim, M=16)

# ベクトルを追加
vectors = np.random.randn(n_vectors, dim).astype(np.float32)
for vec in vectors:
    hnsw.add(vec)

print(f"Built HNSW index: {len(hnsw.vectors)} vectors")

# 検索
query = np.random.randn(dim).astype(np.float32)
indices, distances = hnsw.search(query, k=5)

print(f"\nTop-5 nearest neighbors: {indices}")
print(f"Distances: {distances}")

PQ(Product Quantization)

アルゴリズムの概要

Product Quantization(PQ)は、高次元ベクトルを圧縮して保存し、近似距離計算を高速化する手法です。

圧縮の原理

$d$ 次元ベクトルを $m$ 個のサブベクトル($d/m$ 次元)に分割し、各サブベクトルを $k$ 個の代表ベクトル(セントロイド)のいずれかに量子化します。

$$ \bm{x} = [\bm{x}^1, \bm{x}^2, \ldots, \bm{x}^m] $$

各サブベクトル $\bm{x}^j$ を、対応するコードブック $\mathcal{C}^j = \{\bm{c}_1^j, \ldots, \bm{c}_k^j\}$ 内の最も近いセントロイドのインデックスで表現します。

$$ q^j(\bm{x}^j) = \underset{i \in \{1, \ldots, k\}}{\arg\min} \|\bm{x}^j – \bm{c}_i^j\|_2 $$

圧縮率

  • 元のベクトル: $d \times 32$ ビット(float32)
  • 圧縮後: $m \times \lceil\log_2 k\rceil$ ビット

例: $d=768$, $m=96$, $k=256$ の場合 – 元: $768 \times 32 = 24,576$ ビット – 圧縮後: $96 \times 8 = 768$ ビット – 圧縮率: 32倍

近似距離計算

クエリ $\bm{q}$ とデータベースベクトル $\bm{x}$ の近似距離:

$$ d(\bm{q}, \bm{x})^2 \approx \sum_{j=1}^{m} \|\bm{q}^j – \bm{c}_{q^j(\bm{x}^j)}^j\|_2^2 $$

距離テーブルを事前計算することで、検索時間を大幅に短縮できます。

Pythonでの実装

import numpy as np
from sklearn.cluster import KMeans

class SimpleProductQuantization:
    def __init__(self, dim, m=8, k=256):
        """PQの初期化"""
        self.dim = dim
        self.m = m  # サブベクトル数
        self.k = k  # コードブックサイズ
        self.sub_dim = dim // m
        self.codebooks = []  # m個のコードブック
        self.codes = None  # 圧縮されたデータベース

    def train(self, vectors):
        """コードブックを学習"""
        n = len(vectors)
        vectors = vectors.reshape(n, self.m, self.sub_dim)

        self.codebooks = []
        for j in range(self.m):
            # 各サブベクトル空間でk-means
            kmeans = KMeans(n_clusters=self.k, random_state=42, n_init=1)
            kmeans.fit(vectors[:, j, :])
            self.codebooks.append(kmeans.cluster_centers_)

        print(f"Trained PQ: {self.m} subvectors, {self.k} centroids each")

    def encode(self, vectors):
        """ベクトルを圧縮"""
        n = len(vectors)
        vectors = vectors.reshape(n, self.m, self.sub_dim)

        codes = np.zeros((n, self.m), dtype=np.uint8)
        for j in range(self.m):
            # 最も近いセントロイドのインデックスを割り当て
            distances = np.linalg.norm(
                vectors[:, j, :][:, np.newaxis, :] - self.codebooks[j],
                axis=2
            )
            codes[:, j] = np.argmin(distances, axis=1)

        return codes

    def build(self, vectors):
        """インデックスを構築"""
        self.train(vectors)
        self.codes = self.encode(vectors)
        print(f"Built PQ index: {len(vectors)} vectors")

    def search(self, query, k=5):
        """最近傍を検索"""
        # 距離テーブルを事前計算
        query = query.reshape(self.m, self.sub_dim)
        distance_table = np.zeros((self.m, self.k))

        for j in range(self.m):
            distance_table[j] = np.linalg.norm(
                self.codebooks[j] - query[j], axis=1
            ) ** 2

        # 近似距離を計算
        n = len(self.codes)
        approx_distances = np.zeros(n)

        for i in range(n):
            for j in range(self.m):
                approx_distances[i] += distance_table[j, self.codes[i, j]]

        approx_distances = np.sqrt(approx_distances)

        # Top-kを返す
        top_k_idx = np.argsort(approx_distances)[:k]
        return top_k_idx, approx_distances[top_k_idx]

# 使用例
np.random.seed(42)
n_vectors = 10000
dim = 128  # mで割り切れる値

vectors = np.random.randn(n_vectors, dim).astype(np.float32)

# インデックス構築
pq = SimpleProductQuantization(dim=dim, m=8, k=256)
pq.build(vectors)

# メモリ使用量の比較
original_size = vectors.nbytes
compressed_size = pq.codes.nbytes
print(f"\nOriginal size: {original_size / 1024:.1f} KB")
print(f"Compressed size: {compressed_size / 1024:.1f} KB")
print(f"Compression ratio: {original_size / compressed_size:.1f}x")

# 検索
query = np.random.randn(dim).astype(np.float32)
indices, distances = pq.search(query, k=5)

print(f"\nTop-5 nearest neighbors: {indices}")
print(f"Approximate distances: {distances}")

主要なベクトルデータベース

実用的なベクトルデータベースの例を紹介します。

名称 特徴 主なアルゴリズム
Faiss Meta AI製。高速で柔軟 IVF, HNSW, PQ
Pinecone フルマネージドSaaS 独自実装
Milvus オープンソース IVF, HNSW
Chroma 軽量でシンプル HNSW
Weaviate GraphQL対応 HNSW

まとめ

本記事では、ベクトルデータベースの核心技術である近似最近傍探索アルゴリズムを解説しました。

  • IVF: クラスタリングによる空間分割で探索範囲を削減
  • HNSW: 階層的グラフ構造で対数時間検索を実現
  • PQ: ベクトル圧縮でメモリ使用量を削減

次のステップとして、以下の記事も参考にしてください。