A3C(Asynchronous Advantage Actor-Critic)の理論

A3C(Asynchronous Advantage Actor-Critic)はDeepMindが2016年に発表した深層強化学習アルゴリズムです。複数のエージェントが非同期に並列で学習することで、Experience Replayなしでもデータの相関を除去し、効率的かつ安定した学習を実現します。Actor-Criticアーキテクチャを基盤とし、Advantage関数でベースラインを自然に組み込んでいます。

本記事の内容

  • Actor-Criticアーキテクチャの基本
  • Advantage関数の意味と推定方法
  • A3Cの非同期学習の仕組みと利点
  • A2C(同期版)との比較
  • Pythonでの実装

前提知識

この記事を読む前に、以下の記事を読んでおくと理解が深まります。

Actor-Criticアーキテクチャ

概要

Actor-Criticは、方策ベース(Actor)と価値ベース(Critic)のアプローチを組み合わせた手法です。

  • Actor(行動者): 方策 $\pi_{\bm{\theta}}(a|s)$ を出力し、行動を決定する
  • Critic(批評家): 状態価値関数 $V_{\bm{w}}(s)$ を推定し、Actorの行動を評価する

REINFORCEからの改良

REINFORCEアルゴリズムの勾配は

$$ \nabla_{\bm{\theta}} J(\bm{\theta}) = E\left[\sum_{t} \nabla_{\bm{\theta}} \log \pi_{\bm{\theta}}(a_t|s_t) \cdot G_t\right] $$

ここで $G_t = \sum_{k=t}^{T-1} \gamma^{k-t} r_{k+1}$ はモンテカルロ収益であり、エピソード終了まで待つ必要があり、分散が大きいという問題がありました。

Actor-Criticでは、$G_t$ の代わりにCriticが推定したアドバンテージを使います。

$$ \nabla_{\bm{\theta}} J(\bm{\theta}) = E\left[\sum_{t} \nabla_{\bm{\theta}} \log \pi_{\bm{\theta}}(a_t|s_t) \cdot \hat{A}_t\right] $$

エピソード途中でも更新でき(ブートストラップ)、分散が低減します。

Actor-Criticの更新則

Actor の更新(方策パラメータ $\bm{\theta}$):

$$ \bm{\theta} \leftarrow \bm{\theta} + \alpha_{\theta} \nabla_{\bm{\theta}} \log \pi_{\bm{\theta}}(a_t|s_t) \cdot \hat{A}_t $$

Critic の更新(価値関数パラメータ $\bm{w}$):

$$ \bm{w} \leftarrow \bm{w} – \alpha_w \nabla_{\bm{w}} (V_{\bm{w}}(s_t) – V_t^{\text{target}})^2 $$

Advantage関数

定義

Advantage関数は、ある状態で特定の行動を取ることが、平均的な行動と比べてどれだけ良いかを表します。

$$ \boxed{A^{\pi}(s, a) = Q^{\pi}(s, a) – V^{\pi}(s)} $$

$A > 0$ なら平均より良い行動、$A < 0$ なら平均より悪い行動です。

Advantageを使う理由

方策勾配において $G_t$ や $Q(s_t, a_t)$ をそのまま使うと、すべての報酬が正の場合、すべての行動の確率が増加する方向に更新されてしまいます。Advantage を使えば、良い行動の確率は上がり、悪い行動の確率は下がります。

性質

アドバンテージの期待値はゼロです。

$$ E_{a \sim \pi}[A^{\pi}(s, a)] = E_{a \sim \pi}[Q^{\pi}(s, a) – V^{\pi}(s)] = V^{\pi}(s) – V^{\pi}(s) = 0 $$

これにより、勾配推定の分散が低減されます。

Advantageの推定方法

$Q$ を直接推定する代わりに、TD残差を使って推定します。

1ステップの推定:

$$ \hat{A}_t = r_{t+1} + \gamma V_{\bm{w}}(s_{t+1}) – V_{\bm{w}}(s_t) = \delta_t $$

$n$ ステップの推定:

$$ \hat{A}_t^{(n)} = \sum_{k=0}^{n-1} \gamma^k r_{t+k+1} + \gamma^n V_{\bm{w}}(s_{t+n}) – V_{\bm{w}}(s_t) $$

A3Cでは典型的に $n = 5 \sim 20$ ステップの推定が使われます。

A3Cアルゴリズム

非同期学習の構成

A3Cの核心は非同期(Asynchronous)な並列学習です。

  • グローバルネットワーク: 共有パラメータ $\bm{\theta}$, $\bm{w}$ を保持
  • ワーカー $1, 2, \ldots, N$: それぞれが環境のコピーを持ち、独立にインタラクション

各ワーカーは以下を繰り返します。

  1. グローバルパラメータを自身のローカルネットワークにコピー
  2. $t_{\max}$ ステップ分のデータを収集
  3. アドバンテージと勾配を計算
  4. グローバルパラメータに勾配を非同期に適用

非同期学習の利点

  1. データの相関除去: 各ワーカーが独立した環境で異なる状態を探索するため、データ間の相関が自然に解消される。Experience Replayが不要
  2. 探索の多様性: 異なるワーカーが異なる方策パラメータで行動するため、状態空間の異なる領域を並列に探索
  3. 計算効率: GPUではなくCPUの複数コアで効率的に学習。メモリ使用量も少ない
  4. on-policyの維持: 常に最新に近い方策でデータを収集

アルゴリズム(ワーカーごとの処理)

各ワーカーは以下を繰り返します。

入力: グローバルパラメータ $\bm{\theta}, \bm{w}$、更新間隔 $t_{\max}$

  1. グローバルパラメータを同期: $\bm{\theta}’ \leftarrow \bm{\theta}$, $\bm{w}’ \leftarrow \bm{w}$
  2. $t_{\text{start}} = t$ として、$t_{\max}$ ステップ分のデータを収集: – 方策 $\pi_{\bm{\theta}’}(a_t|s_t)$ に従い行動 $a_t$ を選択 – 報酬 $r_{t+1}$ と次の状態 $s_{t+1}$ を観測
  3. $n$ ステップリターンを計算:

$$ R = \begin{cases} 0 & \text{(エピソード終了)} \\ V_{\bm{w}’}(s_t) & \text{(それ以外)} \end{cases} $$

$$ \text{for } i = t-1, \ldots, t_{\text{start}}: \quad R \leftarrow r_{i+1} + \gamma R $$

  1. 勾配を蓄積:

$$ d\bm{\theta} \leftarrow d\bm{\theta} + \nabla_{\bm{\theta}’} \log \pi_{\bm{\theta}’}(a_i|s_i)(R – V_{\bm{w}’}(s_i)) $$

$$ d\bm{w} \leftarrow d\bm{w} + \nabla_{\bm{w}’} (R – V_{\bm{w}’}(s_i))^2 $$

  1. グローバルパラメータを非同期更新:

$$ \bm{\theta} \leftarrow \bm{\theta} + \alpha_{\theta} \, d\bm{\theta} $$

$$ \bm{w} \leftarrow \bm{w} – \alpha_w \, d\bm{w} $$

エントロピー正則化

A3Cでは方策のエントロピーを損失に加えて探索を促進します。

$$ H(\pi_{\bm{\theta}}(\cdot|s)) = -\sum_a \pi_{\bm{\theta}}(a|s) \log \pi_{\bm{\theta}}(a|s) $$

Actorの損失:

$$ L_{\text{actor}} = -\log \pi_{\bm{\theta}}(a_t|s_t) \hat{A}_t – \beta H(\pi_{\bm{\theta}}(\cdot|s_t)) $$

$\beta$ はエントロピー係数(通常 0.01)です。エントロピーが大きいほど方策が一様に近く、早期の方策の収束(局所最適への落ち込み)を防ぎます。

A2C(Advantage Actor-Critic)

A3Cからの変更点

A2CはA3Cの同期版です。非同期更新の代わりに、すべてのワーカーが同じポイントで同期して更新します。

  1. すべてのワーカーが $t_{\max}$ ステップ分のデータを収集するまで待つ
  2. 全ワーカーの勾配を平均する
  3. グローバルパラメータを一括更新

A3CとA2Cの比較

A3C A2C
更新方式 非同期(各ワーカーが独立に更新) 同期(全ワーカーの結果を集約)
勾配の整合性 古いパラメータでの勾配が混在 常に最新パラメータでの勾配
実装 ロック機構、スレッド管理が複雑 バッチ処理でシンプル
GPU活用 困難(非同期のため) 容易(バッチ処理)
性能 A2Cとほぼ同等(場合による) A3Cと同等以上の場合が多い

A2Cはバッチ処理によりGPUの並列計算を活用でき、実装もシンプルなため、実用上はA2Cが好まれることが多いです。

Pythonでの実装

CartPole環境でのA2C

A2C(同期版)を実装します。

import numpy as np
import matplotlib.pyplot as plt

# --- CartPole環境 ---
class CartPoleEnv:
    """簡易CartPole環境"""

    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masscart + self.masspole
        self.length = 0.5
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02
        self.theta_threshold = 12 * np.pi / 180
        self.x_threshold = 2.4
        self.max_steps = 500

    def reset(self):
        self.state = np.random.uniform(-0.05, 0.05, size=4)
        self.steps = 0
        return self.state.copy()

    def step(self, action):
        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = np.cos(theta)
        sintheta = np.sin(theta)
        temp = (force + self.polemass_length * theta_dot**2 * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0/3.0 - self.masspole * costheta**2 / self.total_mass))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        x += self.tau * x_dot
        x_dot += self.tau * xacc
        theta += self.tau * theta_dot
        theta_dot += self.tau * thetaacc

        self.state = np.array([x, x_dot, theta, theta_dot])
        self.steps += 1
        done = (abs(x) > self.x_threshold or
                abs(theta) > self.theta_threshold or
                self.steps >= self.max_steps)
        reward = 1.0 if not done or self.steps >= self.max_steps else 0.0
        return self.state.copy(), reward, done


# --- Actor-Criticネットワーク ---
class ActorCriticNet:
    """Actor-Criticネットワーク(NumPyスクラッチ)"""

    def __init__(self, state_size, action_size, hidden_size=128, lr_actor=1e-3, lr_critic=5e-3):
        self.action_size = action_size
        self.lr_actor = lr_actor
        self.lr_critic = lr_critic

        # 共有層
        self.W1 = np.random.randn(state_size, hidden_size) * np.sqrt(2.0 / state_size)
        self.b1 = np.zeros(hidden_size)

        # Actor ヘッド
        self.W_a = np.random.randn(hidden_size, action_size) * 0.01
        self.b_a = np.zeros(action_size)

        # Critic ヘッド
        self.W_v = np.random.randn(hidden_size, 1) * 0.01
        self.b_v = np.zeros(1)

    def forward(self, states):
        """順伝播"""
        self.states = states
        self.h = np.maximum(0, states @ self.W1 + self.b1)

        logits = self.h @ self.W_a + self.b_a
        exp_l = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        self.probs = exp_l / np.sum(exp_l, axis=-1, keepdims=True)

        self.values = (self.h @ self.W_v + self.b_v).flatten()
        return self.probs, self.values

    def get_action(self, state):
        """単一状態から行動をサンプリング"""
        probs, value = self.forward(state.reshape(1, -1))
        p = probs[0]
        action = np.random.choice(self.action_size, p=p)
        return action, np.log(p[action] + 1e-8), value[0]

    def update(self, states, actions, advantages, returns, entropy_coeff=0.01):
        """A2C更新"""
        probs, values = self.forward(states)
        batch_size = len(states)

        # エントロピー
        entropy = -np.sum(probs * np.log(probs + 1e-8), axis=1)

        # Actor更新
        for i in range(batch_size):
            adv = advantages[i]
            a = actions[i]
            p = probs[i]
            h = self.h[i:i+1]  # (1, hidden)

            # 方策勾配 + エントロピー正則化
            grad_logit = np.zeros(self.action_size)
            for j in range(self.action_size):
                if j == a:
                    grad_logit[j] = (1 - p[j]) * adv + entropy_coeff * (np.log(p[j] + 1e-8) + 1) * (-1 + p[j])
                else:
                    grad_logit[j] = -p[j] * adv + entropy_coeff * (np.log(p[j] + 1e-8) + 1) * p[j]

            self.W_a += self.lr_actor * h.T @ grad_logit.reshape(1, -1) / batch_size
            self.b_a += self.lr_actor * grad_logit / batch_size

        # Critic更新
        value_error = (values - returns)
        dW_v = self.h.T @ value_error.reshape(-1, 1) / batch_size
        db_v = np.mean(value_error)
        self.W_v -= self.lr_critic * dW_v
        self.b_v -= self.lr_critic * db_v

        return np.mean(advantages ** 2)


# --- A2C学習(複数ワーカーのシミュレーション) ---
np.random.seed(42)

n_workers = 4
n_steps = 20        # 各ワーカーのステップ数
n_updates = 600
gamma = 0.99
entropy_coeff = 0.01

# ネットワークとワーカー環境
net = ActorCriticNet(state_size=4, action_size=2, hidden_size=128,
                     lr_actor=5e-4, lr_critic=1e-3)
envs = [CartPoleEnv() for _ in range(n_workers)]
states = [env.reset() for env in envs]

all_rewards = []
worker_episode_rewards = [0.0] * n_workers

for update in range(n_updates):
    # 各ワーカーでデータ収集
    all_states = [[] for _ in range(n_workers)]
    all_actions = [[] for _ in range(n_workers)]
    all_rewards_step = [[] for _ in range(n_workers)]
    all_values = [[] for _ in range(n_workers)]
    all_dones = [[] for _ in range(n_workers)]

    for step in range(n_steps):
        for w in range(n_workers):
            action, _, value = net.get_action(states[w])
            next_state, reward, done = envs[w].step(action)
            worker_episode_rewards[w] += reward

            all_states[w].append(states[w])
            all_actions[w].append(action)
            all_rewards_step[w].append(reward)
            all_values[w].append(value)
            all_dones[w].append(float(done))

            if done:
                all_rewards.append(worker_episode_rewards[w])
                worker_episode_rewards[w] = 0.0
                states[w] = envs[w].reset()
            else:
                states[w] = next_state

    # 各ワーカーのアドバンテージと収益を計算
    batch_states = []
    batch_actions = []
    batch_advantages = []
    batch_returns = []

    for w in range(n_workers):
        rewards_w = np.array(all_rewards_step[w])
        values_w = np.array(all_values[w])
        dones_w = np.array(all_dones[w])

        # ブートストラップ
        _, last_value = net.forward(states[w].reshape(1, -1))
        last_val = last_value[0]

        # nステップリターンの計算
        returns_w = np.zeros(n_steps)
        R = last_val
        for t in reversed(range(n_steps)):
            R = rewards_w[t] + gamma * R * (1 - dones_w[t])
            returns_w[t] = R

        advantages_w = returns_w - values_w

        batch_states.extend(all_states[w])
        batch_actions.extend(all_actions[w])
        batch_advantages.extend(advantages_w)
        batch_returns.extend(returns_w)

    # numpy配列に変換
    batch_states = np.array(batch_states)
    batch_actions = np.array(batch_actions)
    batch_advantages = np.array(batch_advantages)
    batch_returns = np.array(batch_returns)

    # アドバンテージの正規化
    batch_advantages = (batch_advantages - np.mean(batch_advantages)) / \
                       (np.std(batch_advantages) + 1e-8)

    # 更新
    net.update(batch_states, batch_actions, batch_advantages,
               batch_returns, entropy_coeff)

# --- 可視化 ---
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 学習曲線
window = 20
if len(all_rewards) > window:
    smooth = np.convolve(all_rewards, np.ones(window)/window, mode='valid')
    axes[0].plot(all_rewards, alpha=0.3, color='blue')
    axes[0].plot(np.arange(window-1, len(all_rewards)), smooth,
                 'b-', linewidth=2, label=f'Moving avg (window={window})')
    axes[0].legend(fontsize=10)

axes[0].set_xlabel('Episode', fontsize=11)
axes[0].set_ylabel('Total Reward', fontsize=11)
axes[0].set_title(f'A2C Learning Curve ({n_workers} workers)', fontsize=13)
axes[0].grid(True, alpha=0.3)

# 報酬分布の推移
if len(all_rewards) > 40:
    n_segments = 4
    segment_size = len(all_rewards) // n_segments
    colors = ['red', 'orange', 'green', 'blue']
    for i in range(n_segments):
        start = i * segment_size
        end = (i + 1) * segment_size if i < n_segments - 1 else len(all_rewards)
        axes[1].hist(all_rewards[start:end], bins=15, alpha=0.4,
                     color=colors[i], label=f'Segment {i+1}')
    axes[1].legend(fontsize=9)

axes[1].set_xlabel('Total Reward', fontsize=11)
axes[1].set_ylabel('Count', fontsize=11)
axes[1].set_title('Reward Distribution Over Time', fontsize=13)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

if len(all_rewards) >= 50:
    print(f"最後の50エピソードの平均報酬: {np.mean(all_rewards[-50:]):.1f}")

まとめ

本記事では、A3C(Asynchronous Advantage Actor-Critic)とA2Cの理論と実装について解説しました。

  • Actor-Critic: 方策(Actor)と価値関数(Critic)を同時に学習
  • Advantage関数: $A(s,a) = Q(s,a) – V(s)$ で行動の相対的な良さを評価、分散を低減
  • A3Cの非同期学習: 複数ワーカーが独立に環境とインタラクションし、非同期でグローバルパラメータを更新
  • Experience Replay不要: 非同期の並列学習がデータの相関を自然に除去
  • A2C: 同期版。バッチ処理でGPUを活用しやすく、実装がシンプル

次のステップとして、以下の記事も参考にしてください。