RNN(Recurrent Neural Network)は1990年代の以外と古くから存在するニューラルネットワークの手法で、時系列予測や自然言語処理の多くのニューラルネットワークの手法の源流となっている手法です。
現在はGRUやLSTMが広く利用されていますが、RNNについて理解することは、これらの比較的モダンな手法を理解する重要な手がかりになるでしょう。
今回はRNNのアーキテクチャについて学習し、Pythonでスクラッチで実装をしていきます。
RNNの全体像
まず最初に、RNNの全体像はこのようになっています。ここで、$x_n$が入力の時系列で、$h_n$が$n$番目の層から出力される隠れ変数$h_0$(hidden variableの頭文字)を示しています。
元々のRNNの特徴として、入力次元の数を可変にすることができることが大きな特徴でありメリットですが、この層の数を入力された次元の数だけ繰り返し実行するだけで、どのようなサイズの入力でもモデルとして対応することができます。これが従来のニューラルネットワークと比較したRNNの大きなメリットになります。
RNNの更新式
まず最初に、RNNのセルの中身の概要と更新式を掲載します。この時点で何のことやら?という感じだとは思います。
\begin{equation} \bm{h}_t = tanh(\bm{h}_{t-1} \bm{W_h} + \bm{x}_t \bm{W_t} + \bm{b}) \end{equation}
ここで、RNNに登場する重み$\bm{W_h}$や$\bm{W_t}$は各レイヤーで共通の重みです。
つまり、RNNは、入力次元の数だけ層がありますが、その層の数だけ重みがあるのではなく、同じ重みが利用されているということを意識する必要性があります。
また、活性化関数には、$tanh$(ハイパボリックタンジェント)が利用されます。
また、RNNのように再起的に繰り返される要素のことをセル(cell)と呼ぶ。このセルの概念は、LSTMなどにも登場します。
それぞれの層で重みがあるわけではないことは意識しておけるとよいと思います。
RNNの勾配消失・勾配爆発
RNNは学習時に勾配消失や勾配爆発という、学習の精度が極端に落ちてしまう現象が発生することが知られています。これはどのような問題何でしょうか。
これはRNNの層と誤差逆伝播が大きく関係してきます
繰り返しになりますが、RNNのアーキテクチャはこのようになっています。上図では入力が4層の場合だけを対象としていますが、実際に動画フレームを学習する場合や音声、文章などを学習する際には、入力の長さが1000くらい大きなものになる場合が考えられます。
この場合、RNNの層が1000個重なるわけですが、これが誤差逆伝搬の際に問題を引き起こします。
今回は勾配消失の事象などは直接実装に関係ないので、そんなものか、という感じで覚えておけば良いと思います。
RNNを実装する
それでは、RNNを実装していきましょう。今回は、pytorchやtensorflowなどのディープラーニングライブラリを用いずに、スクラッチで実装していきます。
まず最初に、活性化関数であるシグモイド関数や学習に用いる用いる確率的勾配効果法(SVG)、最適化手法であるAdamを実装していきます。この辺りは、本記事の主題とは少しずれてしまうので、実装だけを置くことにし、解説は別の記事に委ねることにします。
まずシグモイド関数の実装です。
def sigmoid(x):
return 1 / (1 + np.exp(-x))
続いて、SVGとAdamを実装していきます。この辺りは、RNNモデルを組み上げて実際にデータで学習する際に用いるパーツになります。
class SGD:
def __init__(self, lr=0.01):
self.lr = lr
def update(self, params, grads):
for i in range(len(params)):
params[i] -= self.lr * grads[i]
class Adam:
def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
self.lr = lr
self.beta1 = beta1
self.beta2 = beta2
self.iter = 0
self.m = None
self.v = None
def update(self, params, grads):
if self.m is None:
self.m, self.v = [], []
for param in params:
self.m.append(np.zeros_like(param))
self.v.append(np.zeros_like(param))
self.iter += 1
lr_t = self.lr * np.sqrt(1.0 - self.beta2**self.iter) / (1.0 - self.beta1**self.iter)
for i in range(len(params)):
self.m[i][:] += (1 - self.beta1) * (grads[i] - self.m[i])
self.v[i][:] += (1 - self.beta2) * (grads[i]**2 - self.v[i])
params[i][:] -= lr_t * self.m[i] / (np.sqrt(self.v[i]) + 1e-7)
RNNセルの実装
続いて、RNNの実装に入っていきいます。
RNNの繰り返し構造を、ここではセルと表現するようにします。RNNのセルは次のような計算グラフで表現できます。視覚は、入力されるベクトルの形式を模擬的に表現しています。
ここでは、RNNに入力されるデータの特徴量の次元数が1の場合で図示しています。
入力データの特徴量の次元数が多次元(図では3次元の場合)は次のような入力になります。
class RNNCell:
def __init__(self, Wx, Wh, b):
"""
Wx: (IxH)
Wh: (HxH)
b: (1xH)
"""
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cache = None
# 順伝搬の計算
def forward(self, x, h_prev):
Wx, Wh, b = self.params
a = np.dot(h_prev, Wh) + np.dot(x, Wx) + b
h = np.tanh(a)
self.cache = (x, h_prev, h)
return h
# 逆伝搬の計算
def backward(self, dh):
Wx, Wh, b = self.params
x, h_prev, h = self.cache
da = dh * (1 - h ** 2)
dWx = np.dot(x.T, da)
dWh = np.dot(h_prev.T, da)
db = np.sum(da, axis=0)
self.grads = [dWx, dWh, db]
dh_prev = np.dot(da, Wh.T)
dx = np.dot(da, Wx.T)
return dx, dh_prev
計算グラフの図と照らし合わせて考えると、順伝搬の計算はわかりやすいかもしれません。逆伝搬の計算の勾配の計算も、計算グラフの図に添えてある行列の形式を見るとわかりやすいかもしれません。
続いて、入力時系列を与えた際に、入力時系列の入力サイズの数だけRNNCellを通すようにするTimeRNNモデルを実装していきます。
class TimeRNN:
def __init__(self, Wx, Wh, b):
self.params = [Wx, Wh, b] # (I,H), (H,H), (H,)
self.grads = None
self.layers = None
def forward(self, x, h=None):
Wx, Wh, b = self.params
# N: batch num
# T: window size
# I: Input size(feature num)
# H: Hidden size
N, T, I = x.shape
H = Wh.shape[0]
if h is None:
h = np.zeros((N, H))
self.h = h
self.layers = []
h_time = np.zeros((N, T, H))
# Windowの数だけRNNレイヤーを追加
for t in range(T):
layer = RNNCell(*self.params)
self.layers.append(layer)
self.h = layer.forward(x[:, t, :], self.h)
h_time[:, t, :] = self.h
return h_time, self.h
def backward(self, dh_time, dh=0): #dh,は中間層の逆伝播
Wx, Wh, b = self.params
N, T, H = dh_time.shape
I = Wx.shape[0]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)] # 後で足し算をするので作っておく
# 前の層に渡すための出力
dx_time = np.zeros((N, T, I))
# 時間について逆順に計算
for t in reversed(range(T)):
layer = self.layers[t] # LSTMレイヤーの呼び出し
# 出力層への勾配dh_timeと次の時刻の中間層への勾配dhを足したものを逆伝播
dx, dh = layer.backward(dh_time[:, t, :] + dh)
dx_time[:, t, :] = dx
self.grads[0] += layer.grads[0]
self.grads[1] += layer.grads[1]
self.grads[2] += layer.grads[2]
return dx_time, dh
続いて、RNNの潜在変数の値から出力値を取り出すAffineレイヤーを実装していきます。
class Affine:
def __init__(self, W, b):
self.params = [W, b]
self.grads = None
self.x = None
def forward(self, x):
W, b = self.params
out = np.dot(x, W) + b
self.x = x
return out
def backward(self, dout):
W, b = self.params
dx = np.dot(dout, W.T)
dW = np.dot(self.x.T, dout)
db = np.sum(dout, axis=0)
self.grads = [dW, db]
return dx
class TimeAffine:
def __init__(self, W, b):
self.params = [W, b]
self.grads = None
self.layer = None
def forward(self, x):
N, T, H = x.shape
# アファインレイヤを生成
self.layer = Affine(*self.params) # * はカッコを外す *[W, b] = W, b
x = x.reshape(N*T, -1) #(N*T, H)
out = self.layer.forward(x) #(N*T, O)
return out.reshape(N, T, -1) #(N, T, O)
def backward(self, dout):
N, T, O = dout.shape
dout = dout.reshape(N*T, -1) #(N*T, O)
dx = self.layer.backward(dout) #(N*T, H)
dx = dx.reshape(N,T,-1) #(N, T, H)
# 勾配を格納
self.grads = self.layer.grads
return dx
最後に全体をまとめるモデルを実装していきます。
class RNN_model:
def __init__(self, input_size, hidden_size, output_size):
I, H, O = input_size, hidden_size, output_size
self.I, self.H, self.O = I, H, O
# RNNのパラメータ
rnn_Wx = np.random.randn(I, H) / np.sqrt(I) # He の初期値の真似
rnn_Wh = np.random.randn(H, H) / np.sqrt(H) # He の初期値の真似
rnn_b = np.zeros(H)
# アファイン層のパラメータ
affine_W = np.random.randn(H, O) / np.sqrt(H)
affine_b = np.zeros(O)
self.params = [rnn_Wx, rnn_Wh, rnn_b, affine_W, affine_b]
# レイヤの生成
self.trnn = TimeRNN(rnn_Wx, rnn_Wh, rnn_b) # RNNレイヤ
self.taffine = TimeAffine(affine_W, affine_b) # Affineレイヤ
def predict(self, x):
# 損失レイヤ以外の全てのレイヤについて計算する
x, _ = self.trnn.forward(x)
x = self.taffine.forward(x)
return x
def forward(self, x, t):
# 損失レイヤ以外の全てのレイヤについて計算する
self.pred = self.predict(x)
self.t = t
# 損失レイヤを計算する
N = x.shape[0]
loss = ((self.pred - t)**2).sum()/(2*N)
return loss
def backward(self):
# 損失レイヤの逆伝播計算
N = self.pred.shape[0]
dout = (self.pred - self.t)/N
# 損失レイヤ以外の逆伝播計算
dout = self.taffine.backward(dout)
dout, _ = self.trnn.backward(dout)
self.grads = self.trnn.grads + self.taffine.grads
return dout
ここまででモデルの実装を完了することができました。
実際に動かして確認してみましょう。
スクラッチ実装のRNNで時系列予測を行う
ひとまず動かしてみましょう。まずは1次元で実験してみます。
データセットを作成します
ar = np.arange(0,50,0.1)
series = np.sin(ar) * (1+ ar**0.5 )
import matplotlib.pyplot as plt
plt.plot(ar, series)
段々と値が大きくなるようなサインカーブをデータセットとして利用します。
データの加工や学習パラメータの設定などを行います。
x_train = series[:400].reshape(-1,1)
x_full = series.reshape(-1,1)
# ハイパーパラメータの設定
batch_size = 30 # バッチサイズ
hidden_size = 100 # RNNレイヤのノード数
input_size = 1
output_size = 1
lr = 0.001 # 学習率
epochs = 500 # エポックス数
np.random.seed(0)
xs = x_train[:-1] # 入力. 1時刻先を予測するので最後のデータは使わない
ts = x_train[1:] # 出力(教師ラベル). 1時刻先を予測するので最初のデータは使わない
# モデルの生成
model = RNN_model(1, hidden_size, 1)
# 最適化手法の定義
optimizer = Adam(lr=lr)
# 学習時に使用する変数
data_size = len(xs)
max_iters = max(data_size // (batch_size * time_size),1) # 1エポックで計算する回数
total_loss = 0
loss_count = 0
train_loss_list = []
test_loss_list = []
# 学習
for epoch in range(epochs):
for m in range(max_iters):
# ミニバッチの各サンプルの読み込み開始位置を計算
offsets = [np.random.choice(data_size-time_size) for i in range(batch_size)]
# ミニバッチを生成
batch_x = np.zeros((batch_size, time_size,input_size))
batch_t = np.zeros((batch_size, time_size,input_size))
for i, offset in enumerate(offsets):
batch_x[i] = xs[offset: offset+ time_size] #time_size分の長さの部分列
batch_t[i] = ts[offset: offset+ time_size] #time_size分の長さの部分列
# 勾配を求め、パラメータを更新
loss = model.forward(batch_x, batch_t) # 順伝播計算
model.backward() # 逆伝播計算
optimizer.update(model.params, model.grads) # パラメータの更新
total_loss += loss # ロスの追加
loss_count += 1
if epoch % 10 == 0:
# 訓練期間の平均損失の算出
ave_loss = total_loss / loss_count
train_loss_list.append(ave_loss)
total_loss, loss_count = 0, 0
# テスト期間の平均損失の計算
xs_test = x_full[100-time_size:-1]
ts_test = x_full[1+100-time_size:]
data_test_size = len(xs_test)
batch_test_size = data_test_size - time_size
# ミニバッチを生成
batch_test_x = np.zeros((batch_test_size, time_size, input_size))
batch_test_t = np.zeros((batch_test_size, time_size, input_size))
for i in range(batch_test_size):
batch_test_x[i] = xs_test[i:i+time_size]
batch_test_t[i] = ts_test[i:i+time_size]
# 平均lossを追加
test_loss = model.forward(batch_test_x, batch_test_t)
test_loss_list.append(test_loss)
print('| epoch %d | train loss %.5f | test loss %.5f'% (epoch, ave_loss, test_loss))
学習の収束したら、結果を確認してみましょう。
xs = x_full[:-1]#.reshape(-1,1) # 入力. 1時刻先を予測するので、最後のデータは使わない
predicts = np.zeros(len(x_full))
predicts[:time_size] = x_full[:time_size, 0]
for t in range(time_size, len(x_full)):
x_window = xs[t-time_size : t].reshape(1,time_size,1)
# 一つ先の時間を予測する
x_window_next = model.predict(x_window)
# 最後の期の予測値を記録
predicts[t] = x_window_next[0,-1,0]
print(ar.shape, predicts.shape)
plt.plot(ar,predicts, label = 'predicts')
plt.plot(ar,series, label = 'original_data', alpha = 0.7)
plt.legend()
plt.show()
きっちりと予測できていることがわかりました。