トークナイゼーション(BPE/WordPiece/SentencePiece)を解説

自然言語処理(NLP)において、テキストをモデルに入力するための最初のステップがトークナイゼーション(tokenization)です。人間が読む文章をどのような単位に分割するかは、モデルの性能・語彙サイズ・未知語への対応力に直結する極めて重要な問題です。

近年の大規模言語モデル(BERT、GPT など)では、単語レベルでも文字レベルでもないサブワード分割が標準的に用いられています。本記事では、サブワードトークナイゼーションの代表的手法である BPE(Byte Pair Encoding)、WordPiece、Unigram Language Model、そして SentencePiece について、アルゴリズムの理論を数式で丁寧に導出し、Python でスクラッチ実装します。

本記事の内容

  • トークナイゼーションの基本と語彙サイズ・OOV 問題のトレードオフ
  • 文字レベル vs 単語レベルの比較
  • BPE(Byte Pair Encoding)のアルゴリズムと具体的なマージ手順
  • WordPiece の尤度ベースマージ基準の導出
  • Unigram Language Model の周辺尤度最大化と EM アルゴリズム
  • SentencePiece の特徴(言語非依存・生テキスト入力)
  • 各手法の比較表
  • Python でのスクラッチ実装(BPE・WordPiece・Unigram)

前提知識

この記事を読む前に、以下の記事を読んでおくと理解が深まります。

トークナイゼーションとは

トークナイゼーションとは、テキスト(文字列)をトークンと呼ばれる離散的な単位に分割する処理のことです。NLP モデルは入力としてトークンの列を受け取り、各トークンを語彙(vocabulary)のインデックスに対応付けて処理します。

語彙サイズと OOV 問題

語彙 $\mathcal{V}$ のサイズ $|\mathcal{V}|$ をどう設定するかは重要な設計判断です。

単語レベルトークナイゼーションでは、テキストを空白や句読点で区切り、各単語を1つのトークンとします。英語の場合、一般的なテキストに対応するには $|\mathcal{V}| \geq 100{,}000$ 程度が必要になりますが、それでも訓練時に出現しなかった単語は未知語(OOV: Out-Of-Vocabulary)として [UNK] トークンに置換されてしまいます。

語彙サイズを大きくすれば OOV は減りますが、以下の問題が生じます。

  • 埋め込み行列 $\bm{E} \in \mathbb{R}^{|\mathcal{V}| \times d}$ のパラメータ数が膨大になる
  • 低頻度語の埋め込みが十分に学習されない
  • 形態的に類似する語(play, playing, played)が独立したトークンとなり、形態情報が共有されない

文字レベルトークナイゼーションでは、各文字(または各バイト)を1つのトークンとします。語彙サイズは非常に小さく(英語で数十、Unicode 全体でも数千程度)、OOV は原理的に発生しません。しかし、1つの単語が多数のトークンに分割されるため系列長が爆発し、長距離依存の学習が困難になります。

サブワードトークナイゼーション

サブワードトークナイゼーションは、単語レベルと文字レベルの中間を取るアプローチです。頻出する単語はそのまま1トークンとし、稀な単語はより小さなサブワード単位に分割します。

例えば “tokenization” という単語に対して:

方式 分割結果 トークン数
単語レベル tokenization 1
サブワード token + ization 2
文字レベル t + o + k + e + n + i + z + a + t + i + o + n 12

サブワード方式では、語彙サイズを $|\mathcal{V}| \approx 30{,}000 \sim 50{,}000$ 程度に抑えつつ、OOV を実質的にゼロにできます。さらに、”tokenize” と “tokenization” が共通のサブワード “token” を共有するため、形態的な情報も自然に活用されます。

BPE(Byte Pair Encoding)

BPE は元々データ圧縮アルゴリズムとして提案された手法を、Sennrich et al.(2016)がサブワードトークナイゼーションに応用したものです。

アルゴリズム

BPE のアルゴリズムは以下の通りです。

初期化: テキストコーパス中の各単語を文字列に分解し、末尾に特殊記号 </w>(語末マーカー)を付加します。初期語彙 $\mathcal{V}_0$ は全ての個別文字と </w> の集合です。

反復マージ: 以下を $K$ 回繰り返します($K$ はハイパーパラメータ)。

  1. コーパス中の全てのトークン列において、隣接する2つのトークンのペア $(a, b)$ を数え上げ、最も頻出するペア $(a^*, b^*)$ を見つける

$$ (a^*, b^*) = \arg\max_{(a,b)} \text{count}(a, b) $$

  1. コーパス中の全ての $(a^*, b^*)$ の出現を、新しいトークン $ab$ ($a$ と $b$ を連結したもの)に置換する
  2. $ab$ を語彙 $\mathcal{V}$ に追加する

最終的な語彙サイズは $|\mathcal{V}_0| + K$ となります。

具体例

コーパスが以下の単語頻度で構成されているとします。

単語 頻度
low 5
lower 2
newest 6
widest 3

ステップ 0(初期化): 各単語を文字に分解します。

l o w </w>       : 5
l o w e r </w>   : 2
n e w e s t </w> : 6
w i d e s t </w> : 3

初期語彙: {l, o, w, e, r, n, s, t, i, d, </w>}

ステップ 1: 隣接ペアの頻度を数えます。

ペア 頻度
(e, s) 6 + 3 = 9
(s, t) 6 + 3 = 9
(l, o) 5 + 2 = 7
(o, w) 5 + 2 = 7
(t, ) 6 + 3 = 9
(n, e) 6
(w, e) 6

最頻ペアが複数ある場合は任意に1つ選びます。ここでは (e, s) を選びマージします。

l o w </w>        : 5
l o w e r </w>    : 2
n e w es t </w>   : 6
w i d es t </w>   : 3

語彙に es を追加。

ステップ 2: 次の最頻ペアは (es, t) で頻度 9 です。マージします。

l o w </w>        : 5
l o w e r </w>    : 2
n e w est </w>    : 6
w i d est </w>    : 3

語彙に est を追加。

ステップ 3: 次の最頻ペアは (est, </w>) で頻度 9 です。マージします。

l o w </w>        : 5
l o w e r </w>    : 2
n e w est</w>     : 6
w i d est</w>     : 3

語彙に est</w> を追加。

このようにマージを繰り返すことで、頻出パターンが徐々に長いトークンとして語彙に登録されていきます。

エンコード時の手順

学習済みの BPE を用いて新しいテキストをエンコードする際は、学習時に記録したマージルール(ペアの優先順位リスト)を、優先度の高い順に適用します。つまり、最初に学習されたマージから順番に、テキスト中で該当するペアを結合していきます。

WordPiece

WordPiece は Google が開発したサブワードトークナイゼーション手法で、BERT で採用されています。BPE と構造は似ていますが、マージの選択基準が異なります。

マージ基準の導出

BPE では単純な頻度に基づいてペアを選択しますが、WordPiece では言語モデルの尤度の増加量を基準とします。

テキストコーパス $\mathcal{D}$ の unigram 対数尤度を

$$ \mathcal{L} = \sum_{w \in \mathcal{D}} \log P(w) $$

と定義します。ここで $P(w)$ は単語(トークン) $w$ の unigram 確率で、$P(w) = \frac{\text{count}(w)}{\sum_{w’}\text{count}(w’)}$ です。

ペア $(a, b)$ をマージして新トークン $ab$ を作った場合、尤度の変化量 $\Delta\mathcal{L}$ を計算しましょう。マージ前の $a$ と $b$ による尤度への寄与は

$$ \mathcal{L}_{\text{before}} = \text{count}(ab) \cdot \log P(a) + \text{count}(ab) \cdot \log P(b) + (\text{他の文脈での } a, b \text{ の寄与}) $$

ここで $\text{count}(ab)$ は $a$ と $b$ が隣接して出現する回数です。マージ後は隣接出現が $ab$ に置換されるため

$$ \mathcal{L}_{\text{after}} = \text{count}(ab) \cdot \log P(ab) + (\text{他の文脈での } a, b \text{ の寄与}) $$

他の文脈での寄与は変わらないとすると(近似)、尤度の変化量は

$$ \Delta\mathcal{L} \approx \text{count}(ab) \cdot \left[\log P(ab) – \log P(a) – \log P(b)\right] $$

$$ = \text{count}(ab) \cdot \log \frac{P(ab)}{P(a) \cdot P(b)} $$

この $\frac{P(ab)}{P(a) \cdot P(b)}$ は自己相互情報量(Pointwise Mutual Information, PMI)と同じ形をしています。マージ候補の選択基準は

$$ (a^*, b^*) = \arg\max_{(a,b)} \log \frac{P(ab)}{P(a) \cdot P(b)} $$

となります。$\text{count}(ab)$ を含める場合と含めない場合がありますが、WordPiece の典型的な実装では PMI スコア自体を使います。

直感的な解釈

BPE が「最も頻繁に隣接するペア」を選ぶのに対し、WordPiece は「共起が独立な場合に比べて最も有意に高いペア」を選びます。つまり、$a$ と $b$ が個別に頻出していても、それが偶然の共起である場合は PMI スコアは低くなります。逆に、$a$ と $b$ の個別頻度は低くても、隣接出現が偶然では説明できないほど多い場合に高いスコアを得ます。

エンコード時の手順

WordPiece のエンコードでは、未知の単語を最長一致法(greedy longest-match-first)で分割します。語彙中の最長のサブワードから順にマッチングを試み、先頭から貪欲に切り出していきます。単語の先頭以外のサブワードには ## プレフィックスが付きます。

例: “unaffable” → un, ##aff, ##able

Unigram Language Model

Kudo(2018)が提案した Unigram LM は、BPE や WordPiece とは逆のアプローチを取ります。大きな初期語彙から始めて、不要なトークンを削減していく方式です。

定式化

テキスト $X = (x_1, x_2, \dots, x_N)$ に対して、サブワード分割 $\bm{s} = (s_1, s_2, \dots, s_M)$ を考えます。ここで $s_1 s_2 \cdots s_M = X$(連結すると元のテキストになる)です。

各サブワード $s_i$ の確率を $p(s_i)$ とする unigram モデルでは、分割の確率は

$$ P(\bm{s}) = \prod_{i=1}^{M} p(s_i) $$

テキスト $X$ に対する最適な分割は

$$ \bm{s}^* = \arg\max_{\bm{s} \in \mathcal{S}(X)} P(\bm{s}) = \arg\max_{\bm{s} \in \mathcal{S}(X)} \sum_{i=1}^{M} \log p(s_i) $$

ここで $\mathcal{S}(X)$ は $X$ の全ての有効な分割の集合です。

周辺尤度の最大化

コーパス全体 $\mathcal{D} = \{X_1, X_2, \dots, X_D\}$ に対する周辺尤度は、各テキストの全分割の確率の和です。

$$ \mathcal{L} = \sum_{d=1}^{D} \log P(X_d) = \sum_{d=1}^{D} \log \left( \sum_{\bm{s} \in \mathcal{S}(X_d)} P(\bm{s}) \right) $$

$$ = \sum_{d=1}^{D} \log \left( \sum_{\bm{s} \in \mathcal{S}(X_d)} \prod_{i=1}^{|\bm{s}|} p(s_i) \right) $$

この周辺尤度を最大化するパラメータ $\{p(s)\}_{s \in \mathcal{V}}$ を求めるために、EM アルゴリズムを使います。

EM アルゴリズム

E ステップ: 現在のパラメータ $\{p^{(t)}(s)\}$ のもとで、各分割の事後確率を計算します。テキスト $X_d$ に対する分割 $\bm{s}$ の事後確率は

$$ q(\bm{s} | X_d) = \frac{P(\bm{s})}{\sum_{\bm{s}’ \in \mathcal{S}(X_d)} P(\bm{s}’)} = \frac{\prod_{i} p^{(t)}(s_i)}{\sum_{\bm{s}’} \prod_{j} p^{(t)}(s’_j)} $$

各サブワード $s$ の期待出現回数を計算します。

$$ c(s) = \sum_{d=1}^{D} \sum_{\bm{s} \in \mathcal{S}(X_d)} q(\bm{s} | X_d) \cdot \text{count}(s \text{ in } \bm{s}) $$

M ステップ: 期待出現回数を用いてパラメータを更新します。

$$ p^{(t+1)}(s) = \frac{c(s)}{\sum_{s’ \in \mathcal{V}} c(s’)} $$

実際には、全ての分割を列挙することは計算量的に困難なため、Viterbi 近似(最も確率の高い分割のみを用いる)や前向きアルゴリズムによる効率的な計算が使われます。

語彙の削減

EM アルゴリズムで確率を推定した後、各トークン $s$ を語彙から除いた場合の尤度の減少量

$$ \text{loss}(s) = \mathcal{L} – \mathcal{L}_{-s} $$

を計算し、$\text{loss}(s)$ の小さいトークン(除いても尤度がほとんど下がらないトークン)を削除します。これを目標語彙サイズに達するまで繰り返します。ただし、個別の文字は必ず保持し、OOV を防ぎます。

Viterbi 分割

学習済みの確率 $\{p(s)\}$ を用いてテキストを分割する際は、Viterbi アルゴリズム(動的計画法)を使います。テキスト $X = x_1 x_2 \cdots x_n$ に対して、位置 $j$ までの最適分割の対数確率を $\text{dp}[j]$ とすると

$$ \text{dp}[j] = \max_{i < j,\, x_{i+1}\cdots x_j \in \mathcal{V}} \left( \text{dp}[i] + \log p(x_{i+1}\cdots x_j) \right) $$

初期条件 $\text{dp}[0] = 0$ で、$\text{dp}[n]$ が最適分割の対数確率を与えます。バックトレースにより最適な分割を復元できます。

SentencePiece

SentencePiece(Kudo & Richardson, 2018)は、BPE や Unigram LM を言語非依存に実装するためのフレームワークです。

主な特徴

  1. 生テキスト入力: 事前のトークナイゼーション(空白分割等)を必要としません。テキストを Unicode 文字列としてそのまま受け取り、空白も通常の文字として扱います。これにより、日本語・中国語など空白で単語が区切られない言語にも対応できます。

  2. 空白の記号化: 空白を特殊記号 (U+2581)に置換してから処理します。これにより、デコード時にトークン列から元のテキストを完全に復元できます(可逆変換)。

  3. サブワードアルゴリズムの選択: BPE または Unigram LM のいずれかをバックエンドのアルゴリズムとして選択できます。

  4. バイトフォールバック: 語彙に含まれない文字に対して、UTF-8 のバイト列にフォールバックすることで OOV を完全に排除できます。

SentencePiece における処理の流れ

  1. 入力テキストの正規化(NFKC 正規化等)
  2. 空白を に置換
  3. 選択されたアルゴリズム(BPE or Unigram LM)でサブワード分割を学習・適用
  4. サブワード列を出力

各手法の比較

特性 BPE WordPiece Unigram LM
マージ方向 ボトムアップ(マージ) ボトムアップ(マージ) トップダウン(削減)
マージ基準 頻度最大 PMI 最大 尤度減少最小
確率モデル なし(決定的) あり(暗黙的) あり(明示的)
エンコード マージルール適用 最長一致法 Viterbi 分割
分割の一意性 一意 一意 確率的(複数候補)
使用モデル GPT 系 BERT T5, mBART
SentencePiece 対応 はい いいえ はい

Python でのスクラッチ実装

BPE の実装

import re
from collections import Counter, defaultdict

class BPETokenizer:
    """Byte Pair Encoding トークナイザのミニマル実装"""

    def __init__(self, num_merges=100):
        self.num_merges = num_merges
        self.merges = []  # マージルールの順序リスト
        self.vocab = set()

    def _get_word_freqs(self, corpus):
        """コーパスから単語頻度を取得し、文字に分解する"""
        word_freqs = Counter()
        for sentence in corpus:
            words = sentence.strip().split()
            for word in words:
                word_freqs[word] += 1
        # 各単語を文字のタプルに変換(末尾に </w> を追加)
        split_words = {}
        for word, freq in word_freqs.items():
            chars = tuple(list(word) + ["</w>"])
            split_words[chars] = freq
        return split_words

    def _get_pair_freqs(self, split_words):
        """隣接ペアの頻度を計算する"""
        pair_freqs = Counter()
        for chars, freq in split_words.items():
            for i in range(len(chars) - 1):
                pair_freqs[(chars[i], chars[i + 1])] += freq
        return pair_freqs

    def _merge_pair(self, pair, split_words):
        """指定されたペアをマージする"""
        new_split_words = {}
        merged = pair[0] + pair[1]
        for chars, freq in split_words.items():
            new_chars = []
            i = 0
            while i < len(chars):
                if i < len(chars) - 1 and chars[i] == pair[0] and chars[i + 1] == pair[1]:
                    new_chars.append(merged)
                    i += 2
                else:
                    new_chars.append(chars[i])
                    i += 1
            new_split_words[tuple(new_chars)] = freq
        return new_split_words

    def train(self, corpus):
        """BPE を学習する"""
        split_words = self._get_word_freqs(corpus)

        # 初期語彙を構築
        self.vocab = set()
        for chars in split_words.keys():
            for c in chars:
                self.vocab.add(c)

        print(f"初期語彙サイズ: {len(self.vocab)}")

        for step in range(self.num_merges):
            pair_freqs = self._get_pair_freqs(split_words)
            if not pair_freqs:
                break

            # 最頻ペアを選択
            best_pair = max(pair_freqs, key=pair_freqs.get)
            best_freq = pair_freqs[best_pair]

            # マージを実行
            split_words = self._merge_pair(best_pair, split_words)
            merged_token = best_pair[0] + best_pair[1]
            self.merges.append(best_pair)
            self.vocab.add(merged_token)

            if step < 10 or step % 20 == 0:
                print(f"ステップ {step + 1}: ({best_pair[0]}, {best_pair[1]}) -> "
                      f"{merged_token} (頻度: {best_freq})")

        print(f"最終語彙サイズ: {len(self.vocab)}")

    def encode(self, word):
        """単語をBPEトークン列に変換する"""
        tokens = list(word) + ["</w>"]

        for pair in self.merges:
            i = 0
            while i < len(tokens) - 1:
                if tokens[i] == pair[0] and tokens[i + 1] == pair[1]:
                    tokens = tokens[:i] + [pair[0] + pair[1]] + tokens[i + 2:]
                else:
                    i += 1
        return tokens


# BPE のデモ
corpus = [
    "low low low low low",
    "lower lower",
    "newest newest newest newest newest newest",
    "widest widest widest",
]

bpe = BPETokenizer(num_merges=15)
bpe.train(corpus)

print("\n--- エンコード例 ---")
for word in ["newest", "lowest", "newer"]:
    tokens = bpe.encode(word)
    print(f"{word} -> {tokens}")

上記のコードでは、コーパスから単語頻度を取得し、隣接ペアの頻度を計算して、最頻ペアを逐次マージしていきます。エンコード時にはマージルールを学習順に適用することで、同じ分割結果を再現します。

WordPiece の実装

import math
from collections import Counter

class WordPieceTokenizer:
    """WordPiece トークナイザのミニマル実装"""

    def __init__(self, num_merges=100):
        self.num_merges = num_merges
        self.vocab = set()
        self.merges = []

    def _get_word_freqs(self, corpus):
        """コーパスから単語頻度を取得"""
        word_freqs = Counter()
        for sentence in corpus:
            for word in sentence.strip().split():
                word_freqs[word] += 1
        split_words = {}
        for word, freq in word_freqs.items():
            chars = tuple(list(word) + ["</w>"])
            split_words[chars] = freq
        return split_words

    def _get_token_freqs(self, split_words):
        """各トークンの出現頻度を計算"""
        token_freqs = Counter()
        for chars, freq in split_words.items():
            for c in chars:
                token_freqs[c] += freq
        return token_freqs

    def _get_pair_scores(self, split_words):
        """PMI ベースのペアスコアを計算する"""
        pair_freqs = Counter()
        token_freqs = Counter()
        total = 0

        for chars, freq in split_words.items():
            for i in range(len(chars)):
                token_freqs[chars[i]] += freq
                total += freq
                if i < len(chars) - 1:
                    pair_freqs[(chars[i], chars[i + 1])] += freq

        # PMI スコア: log(P(ab) / (P(a) * P(b)))
        # P(ab) ∝ count(ab), P(a) ∝ count(a), P(b) ∝ count(b)
        # score = count(ab) / (count(a) * count(b)) に比例
        pair_scores = {}
        for pair, freq in pair_freqs.items():
            a, b = pair
            score = freq / (token_freqs[a] * token_freqs[b])
            pair_scores[pair] = score

        return pair_scores

    def _merge_pair(self, pair, split_words):
        """ペアをマージする"""
        new_split_words = {}
        merged = pair[0] + pair[1]
        for chars, freq in split_words.items():
            new_chars = []
            i = 0
            while i < len(chars):
                if i < len(chars) - 1 and chars[i] == pair[0] and chars[i + 1] == pair[1]:
                    new_chars.append(merged)
                    i += 2
                else:
                    new_chars.append(chars[i])
                    i += 1
            new_split_words[tuple(new_chars)] = freq
        return new_split_words

    def train(self, corpus):
        """WordPiece を学習する"""
        split_words = self._get_word_freqs(corpus)

        self.vocab = set()
        for chars in split_words.keys():
            for c in chars:
                self.vocab.add(c)

        print(f"初期語彙サイズ: {len(self.vocab)}")

        for step in range(self.num_merges):
            pair_scores = self._get_pair_scores(split_words)
            if not pair_scores:
                break

            best_pair = max(pair_scores, key=pair_scores.get)
            best_score = pair_scores[best_pair]

            split_words = self._merge_pair(best_pair, split_words)
            merged_token = best_pair[0] + best_pair[1]
            self.merges.append(best_pair)
            self.vocab.add(merged_token)

            if step < 10 or step % 20 == 0:
                print(f"ステップ {step + 1}: ({best_pair[0]}, {best_pair[1]}) -> "
                      f"{merged_token} (PMIスコア: {best_score:.6f})")

        print(f"最終語彙サイズ: {len(self.vocab)}")

    def encode(self, word):
        """最長一致法でエンコードする"""
        tokens = []
        chars = word + "</w>"
        start = 0
        while start < len(chars):
            end = len(chars)
            found = False
            while start < end:
                substr = chars[start:end]
                if start > 0:
                    substr = "##" + substr
                if substr.replace("##", "") in self.vocab or substr in self.vocab:
                    tokens.append(substr)
                    found = True
                    break
                end -= 1
            if not found:
                tokens.append("[UNK]")
                start += 1
            else:
                if start > 0:
                    start = end
                else:
                    start = end
        return tokens


# WordPiece のデモ
corpus = [
    "low low low low low",
    "lower lower",
    "newest newest newest newest newest newest",
    "widest widest widest",
]

wp = WordPieceTokenizer(num_merges=15)
wp.train(corpus)

print("\n--- WordPiece エンコード例 ---")
for word in ["newest", "lowest", "wider"]:
    tokens = wp.encode(word)
    print(f"{word} -> {tokens}")

WordPiece の実装のポイントは _get_pair_scores メソッドです。BPE の頻度最大とは異なり、$\text{count}(ab) / (\text{count}(a) \times \text{count}(b))$ という PMI に基づくスコアで選択を行います。

Unigram LM の実装

import numpy as np
from collections import Counter

class UnigramTokenizer:
    """Unigram Language Model トークナイザのミニマル実装"""

    def __init__(self, target_vocab_size=50, max_piece_length=5):
        self.target_vocab_size = target_vocab_size
        self.max_piece_length = max_piece_length
        self.vocab = {}  # token -> log_prob

    def _initialize_vocab(self, corpus):
        """初期語彙を構築する(全サブストリングの頻度)"""
        substring_freqs = Counter()
        char_set = set()

        for sentence in corpus:
            for word in sentence.strip().split():
                word = word + "</w>"
                for i in range(len(word)):
                    for j in range(i + 1, min(i + self.max_piece_length + 1, len(word) + 1)):
                        substring_freqs[word[i:j]] += 1
                for c in word:
                    char_set.add(c)

        # 頻度の高いサブストリングを初期語彙とする
        total = sum(substring_freqs.values())
        self.vocab = {}
        for substr, freq in substring_freqs.most_common(self.target_vocab_size * 5):
            self.vocab[substr] = np.log(freq / total)

        # 個別の文字は必ず含める
        for c in char_set:
            if c not in self.vocab:
                self.vocab[c] = np.log(1e-10)

        self.char_set = char_set
        print(f"初期語彙サイズ: {len(self.vocab)}")

    def _viterbi_decode(self, text):
        """Viterbi アルゴリズムで最適分割を求める"""
        n = len(text)
        # dp[j] = 位置 j までの最適対数確率
        dp = [-float('inf')] * (n + 1)
        dp[0] = 0.0
        backpointer = [0] * (n + 1)

        for j in range(1, n + 1):
            for i in range(max(0, j - self.max_piece_length), j):
                piece = text[i:j]
                if piece in self.vocab:
                    score = dp[i] + self.vocab[piece]
                    if score > dp[j]:
                        dp[j] = score
                        backpointer[j] = i

        # バックトレース
        tokens = []
        j = n
        while j > 0:
            i = backpointer[j]
            tokens.append(text[i:j])
            j = i
        tokens.reverse()
        return tokens, dp[n]

    def _compute_loss(self, corpus_words):
        """各トークンを除いた場合の尤度減少量を計算する"""
        token_counts = Counter()
        total_log_prob = 0.0

        for word, freq in corpus_words.items():
            tokens, log_prob = self._viterbi_decode(word)
            total_log_prob += log_prob * freq
            for token in tokens:
                token_counts[token] += freq

        return token_counts, total_log_prob

    def train(self, corpus, n_iterations=5):
        """Unigram LM を学習する"""
        self._initialize_vocab(corpus)

        # コーパスの単語頻度
        word_freqs = Counter()
        for sentence in corpus:
            for word in sentence.strip().split():
                word_freqs[word + "</w>"] += 1

        for iteration in range(n_iterations):
            # E ステップ(Viterbi 近似): 最適分割で各トークンの頻度を計算
            token_counts = Counter()
            total_log_prob = 0.0

            for word, freq in word_freqs.items():
                tokens, log_prob = self._viterbi_decode(word)
                total_log_prob += log_prob * freq
                for token in tokens:
                    token_counts[token] += freq

            # M ステップ: 確率を更新
            total_count = sum(token_counts.values())
            for token in self.vocab:
                if token in token_counts:
                    self.vocab[token] = np.log(token_counts[token] / total_count)
                else:
                    self.vocab[token] = np.log(1e-10)

            print(f"反復 {iteration + 1}: 対数尤度 = {total_log_prob:.2f}, "
                  f"語彙サイズ = {len(self.vocab)}")

            # 語彙の削減
            if len(self.vocab) > self.target_vocab_size:
                # 各トークンの loss を計算(簡易版: 頻度の低いトークンを削除)
                scored = []
                for token, log_prob in self.vocab.items():
                    # 単一文字は削除しない
                    if len(token) == 1 or token in self.char_set:
                        continue
                    count = token_counts.get(token, 0)
                    scored.append((token, count))

                # 頻度の低い順にソートして削除
                scored.sort(key=lambda x: x[1])
                n_remove = max(1, int(len(self.vocab) * 0.1))
                n_remove = min(n_remove, len(self.vocab) - self.target_vocab_size)

                for token, _ in scored[:n_remove]:
                    if len(self.vocab) <= self.target_vocab_size:
                        break
                    del self.vocab[token]

        print(f"最終語彙サイズ: {len(self.vocab)}")

    def encode(self, word):
        """Viterbi 分割でエンコードする"""
        word = word + "</w>"
        tokens, log_prob = self._viterbi_decode(word)
        return tokens


# Unigram LM のデモ
corpus = [
    "low low low low low",
    "lower lower",
    "newest newest newest newest newest newest",
    "widest widest widest",
    "new new new new",
    "wide wide wide",
]

unigram = UnigramTokenizer(target_vocab_size=30, max_piece_length=6)
unigram.train(corpus, n_iterations=10)

print("\n--- Unigram LM エンコード例 ---")
for word in ["newest", "lowest", "wider"]:
    tokens = unigram.encode(word)
    print(f"{word} -> {tokens}")

各トークナイザの結果を比較する可視化

import numpy as np
import matplotlib.pyplot as plt

# 語彙サイズと OOV 率の関係をシミュレーション
np.random.seed(42)

# Zipf の法則に従う単語頻度を生成
n_words = 10000
ranks = np.arange(1, n_words + 1)
freqs = 1.0 / ranks
freqs /= freqs.sum()

# 累積カバレッジ
cumulative_coverage = np.cumsum(freqs)

# 異なる語彙サイズでのカバレッジ
vocab_sizes = [100, 500, 1000, 2000, 5000, 10000]

plt.figure(figsize=(10, 6))
plt.plot(ranks, cumulative_coverage * 100, 'b-', linewidth=2)
for vs in vocab_sizes:
    coverage = cumulative_coverage[vs - 1] * 100
    plt.axvline(x=vs, color='gray', linestyle='--', alpha=0.5)
    plt.annotate(f'|V|={vs}\n{coverage:.1f}%',
                xy=(vs, coverage), fontsize=8,
                xytext=(vs + 200, coverage - 5),
                arrowprops=dict(arrowstyle='->', color='red'))

plt.xlabel('Vocabulary Size', fontsize=12)
plt.ylabel('Corpus Coverage (%)', fontsize=12)
plt.title('Vocabulary Size vs Corpus Coverage (Zipf Distribution)', fontsize=14)
plt.xscale('log')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

このグラフは Zipf の法則に従う単語頻度分布において、語彙サイズに対するコーパスカバレッジの関係を示しています。語彙サイズを増やしてもカバレッジの改善は逓減し、少数の高頻度語で大部分のテキストをカバーできることが分かります。サブワード方式はこの性質を活用し、高頻度語はそのまま保持しつつ、低頻度語をサブワードに分解することで効率的な語彙を構築します。

まとめ

本記事では、自然言語処理の基盤であるトークナイゼーションについて解説しました。

  • 語彙サイズと OOV 問題はトレードオフの関係にあり、サブワード分割がその解決策となる
  • BPE は最頻ペアの逐次マージという単純なアルゴリズムで、GPT 系モデルで広く使われている
  • WordPiece は PMI($P(ab) / P(a)P(b)$)に基づくマージ基準を持ち、BERT で採用されている
  • Unigram LM は周辺尤度最大化と EM アルゴリズムに基づき、トップダウンで語彙を削減する
  • SentencePiece は言語非依存のフレームワークとして BPE/Unigram LM を統合的に利用できる
  • 各手法とも Python でスクラッチ実装が可能であり、アルゴリズムの本質を理解する助けとなる

次のステップとして、以下の記事も参考にしてください。