import numpy as np

from model import IdentityActivator, SigmoidActivator, TanhActivator


class LstmLayer(object):
    """ LSTM layer implementation """

    def __init__(self, input_width, state_width, learning_rate):
        """Set the hyperparameters and initialize the LSTM layer.

        :param input_width: dimension of the input vector
        :param state_width: dimension of the state vector
        :param learning_rate: learning rate
        """
        self.input_width = input_width
        self.state_width = state_width
        self.learning_rate = learning_rate
        # Activation function for the gates
        self.gate_activator = SigmoidActivator()
        # Activation function for the output
        self.output_activator = TanhActivator()
        # Current time step, initialized to t0
        self.times = 0
        # Cell state vectors c for every time step
        self.c_list = self.init_state_vec()
        # Output vectors h for every time step
        self.h_list = self.init_state_vec()
        # Forget gate vectors f for every time step
        self.f_list = self.init_state_vec()
        # Input gate vectors i for every time step
        self.i_list = self.init_state_vec()
        # Output gate vectors o for every time step
        self.o_list = self.init_state_vec()
        # Candidate state vectors c~ for every time step
        self.ct_list = self.init_state_vec()
        # Forget gate weight matrices Wfh, Wfx and bias bf
        self.Wfh, self.Wfx, self.bf = self.init_weight_mat()
        # Input gate weight matrices Wih, Wix and bias bi
        self.Wih, self.Wix, self.bi = self.init_weight_mat()
        # Output gate weight matrices Woh, Wox and bias bo
        self.Woh, self.Wox, self.bo = self.init_weight_mat()
        # Cell state weight matrices Wch, Wcx and bias bc
        self.Wch, self.Wcx, self.bc = self.init_weight_mat()

    def init_state_vec(self):
        """Initialize a state-vector list; index 0 holds the t0 state."""
        state_vec_list = [np.zeros((self.state_width, 1))]
        return state_vec_list

    def init_weight_mat(self):
        """Initialize one gate's weight matrices and bias."""
        Wh = np.random.uniform(-1e-4, 1e-4, (self.state_width, self.state_width))
        Wx = np.random.uniform(-1e-4, 1e-4, (self.state_width, self.input_width))
        b = np.zeros((self.state_width, 1))
        return Wh, Wx, b

    def forward(self, x):
        """Forward pass for one time step.

        :param x: input vector at the current time step
        """
        self.times += 1
        # Forget gate
        fg = self.calc_gate(x, self.Wfx, self.Wfh, self.bf, self.gate_activator)
        self.f_list.append(fg)
        # Input gate
        ig = self.calc_gate(x, self.Wix, self.Wih, self.bi, self.gate_activator)
        self.i_list.append(ig)
        # Output gate
        og = self.calc_gate(x, self.Wox, self.Woh, self.bo, self.gate_activator)
        self.o_list.append(og)
        # Candidate state
        ct = self.calc_gate(x, self.Wcx, self.Wch, self.bc, self.output_activator)
        self.ct_list.append(ct)
        # Cell state
        c = fg * self.c_list[self.times - 1] + ig * ct
        self.c_list.append(c)
        # Output
        h = og * self.output_activator.forward(c)
        self.h_list.append(h)

    def calc_gate(self, x, Wx, Wh, b, activator):
        """Compute one gate's activation.

        :param x: input vector
        :param Wx: weight matrix Wx
        :param Wh: weight matrix Wh
        :param b: bias vector
        :param activator: activation function
        """
        h = self.h_list[self.times - 1]  # LSTM output of the previous time step
        net = np.dot(Wh, h) + np.dot(Wx, x) + b
        gate = activator.forward(net)
        return gate

    def backward(self, x, delta_h, activator):
        """Run one LSTM training step: backpropagate errors, then compute gradients.

        :param x: input vector
        :param delta_h: error term passed down from the layer above
        :param activator: activation function (kept for interface compatibility; unused)
        """
        self.calc_delta(delta_h, activator)
        self.calc_gradient(x)

    def calc_delta(self, delta_h, activator):
        """Backpropagate the error terms through time.

        :param delta_h: error term from the layer above at the current time step
        :param activator: activation function (unused)
        """
        # Initialize the error terms for every time step
        self.delta_h_list = self.init_delta()   # error terms for the output h
        self.delta_o_list = self.init_delta()   # error terms for the output gate
        self.delta_i_list = self.init_delta()   # error terms for the input gate
        self.delta_f_list = self.init_delta()   # error terms for the forget gate
        self.delta_ct_list = self.init_delta()  # error terms for the candidate state
        # The error term from the layer above seeds the current time step
        self.delta_h_list[-1] = delta_h
        # Iterate backwards through time to compute the remaining error terms
        for k in range(self.times, 0, -1):
            self.calc_delta_k(k)

    def init_delta(self):
        """ Initialize one list of error terms, one entry per time step """
        delta_list = []
        for i in range(self.times + 1):
            delta_list.append(np.zeros((self.state_width, 1)))
        return delta_list

    def calc_delta_k(self, k):
        """From delta_h at time k, compute delta_f, delta_i, delta_o and
        delta_ct at time k, as well as delta_h at time k-1.

        :param k: time step
        """
        # Fetch the values computed by the forward pass at time k
        ig = self.i_list[k]
        og = self.o_list[k]
        fg = self.f_list[k]
        ct = self.ct_list[k]
        c = self.c_list[k]
        c_prev = self.c_list[k - 1]
        tanh_c = self.output_activator.forward(c)
        delta_k = self.delta_h_list[k]
        # Compute delta_o (Eq. 9 in the text)
        delta_o = delta_k * tanh_c * self.gate_activator.backward(og)
        delta_f = delta_k * og * (1 - tanh_c * tanh_c) * c_prev * self.gate_activator.backward(fg)
        delta_i = delta_k * og * (1 - tanh_c * tanh_c) * ct * self.gate_activator.backward(ig)
        delta_ct = delta_k * og * (1 - tanh_c * tanh_c) * ig * self.output_activator.backward(ct)
        delta_h_prev = (np.dot(delta_o.transpose(), self.Woh)
                        + np.dot(delta_i.transpose(), self.Wih)
                        + np.dot(delta_f.transpose(), self.Wfh)
                        + np.dot(delta_ct.transpose(), self.Wch)).transpose()
        # Save all delta values
        self.delta_h_list[k - 1] = delta_h_prev
        self.delta_f_list[k] = delta_f
        self.delta_i_list[k] = delta_i
        self.delta_o_list[k] = delta_o
        self.delta_ct_list[k] = delta_ct
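
    # The chain rule behind calc_delta_k: with h_t = o_t * tanh(c_t) and
    # c_t = f_t * c_{t-1} + i_t * ct_t, the factor (1 - tanh_c * tanh_c) is
    # the derivative of tanh(c_t), while gate_activator.backward(g) is
    # expected to return the sigmoid derivative g * (1 - g) expressed in
    # terms of the gate's forward output (an assumption about model.py).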

    def calc_gradient(self, x):
        """Compute the weight gradients.

        :param x: input vector
        """
        # Initialize the forget gate weight and bias gradients
        self.Wfh_grad, self.Wfx_grad, self.bf_grad = self.init_weight_gradient_mat()
        # Initialize the input gate weight and bias gradients
        self.Wih_grad, self.Wix_grad, self.bi_grad = self.init_weight_gradient_mat()
        # Initialize the output gate weight and bias gradients
        self.Woh_grad, self.Wox_grad, self.bo_grad = self.init_weight_gradient_mat()
        # Initialize the cell state weight and bias gradients
        self.Wch_grad, self.Wcx_grad, self.bc_grad = self.init_weight_gradient_mat()
        # Gradients of the weights applied to the previous output h
        for t in range(self.times, 0, -1):
            # Gradients at each time step
            (Wfh_grad, bf_grad,
             Wih_grad, bi_grad,
             Woh_grad, bo_grad,
             Wch_grad, bc_grad) = self.calc_gradient_t(t)
            # The actual gradient is the sum over all time steps
            self.Wfh_grad += Wfh_grad
            self.bf_grad += bf_grad
            self.Wih_grad += Wih_grad
            self.bi_grad += bi_grad
            self.Woh_grad += Woh_grad
            self.bo_grad += bo_grad
            self.Wch_grad += Wch_grad
            self.bc_grad += bc_grad
        # Gradients of the weights applied to the current input x
        xt = x.transpose()
        self.Wfx_grad = np.dot(self.delta_f_list[-1], xt)
        self.Wix_grad = np.dot(self.delta_i_list[-1], xt)
        self.Wox_grad = np.dot(self.delta_o_list[-1], xt)
        self.Wcx_grad = np.dot(self.delta_ct_list[-1], xt)
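
    # Note: the Wh and bias gradients are accumulated over all time steps,
    # while the Wx gradients above are taken from the final time step only,
    # pairing each gate's latest delta with the single input x passed to
    # backward(); a full BPTT over the inputs would also need the earlier x_t.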

    def init_weight_gradient_mat(self):
        """ Initialize one gate's weight gradient matrices and bias gradient """
        Wh_grad = np.zeros((self.state_width, self.state_width))
        Wx_grad = np.zeros((self.state_width, self.input_width))
        b_grad = np.zeros((self.state_width, 1))
        return Wh_grad, Wx_grad, b_grad

    def calc_gradient_t(self, t):
        """Compute the weight gradients at time step t.

        :param t: time step
        """
        h_prev = self.h_list[t - 1].transpose()
        Wfh_grad = np.dot(self.delta_f_list[t], h_prev)
        bf_grad = self.delta_f_list[t]
        Wih_grad = np.dot(self.delta_i_list[t], h_prev)
        bi_grad = self.delta_i_list[t]
        Woh_grad = np.dot(self.delta_o_list[t], h_prev)
        bo_grad = self.delta_o_list[t]
        Wch_grad = np.dot(self.delta_ct_list[t], h_prev)
        bc_grad = self.delta_ct_list[t]
        return Wfh_grad, bf_grad, Wih_grad, bi_grad, Woh_grad, bo_grad, Wch_grad, bc_grad

    def update(self):
        """ Update the weights by gradient descent """
        self.Wfh -= self.learning_rate * self.Wfh_grad
        self.Wfx -= self.learning_rate * self.Wfx_grad
        self.bf -= self.learning_rate * self.bf_grad
        self.Wih -= self.learning_rate * self.Wih_grad
        self.Wix -= self.learning_rate * self.Wix_grad
        self.bi -= self.learning_rate * self.bi_grad
        self.Woh -= self.learning_rate * self.Woh_grad
        self.Wox -= self.learning_rate * self.Wox_grad
        self.bo -= self.learning_rate * self.bo_grad
        self.Wch -= self.learning_rate * self.Wch_grad
        self.Wcx -= self.learning_rate * self.Wcx_grad
        self.bc -= self.learning_rate * self.bc_grad

    def reset_state(self):
        """ Reset the internal state """
        # Current time step, initialized to t0
        self.times = 0
        # Cell state vectors c for every time step
        self.c_list = self.init_state_vec()
        # Output vectors h for every time step
        self.h_list = self.init_state_vec()
        # Forget gate vectors f for every time step
        self.f_list = self.init_state_vec()
        # Input gate vectors i for every time step
        self.i_list = self.init_state_vec()
        # Output gate vectors o for every time step
        self.o_list = self.init_state_vec()
        # Candidate state vectors c~ for every time step
        self.ct_list = self.init_state_vec()
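

# --- Minimal usage sketch (not from the original listing) ---
# A smoke test, assuming model.py provides SigmoidActivator, TanhActivator
# and IdentityActivator with the forward()/backward() methods used above.
# Widths, learning rate and inputs are illustrative values only; backward()
# ignores its activator argument, so IdentityActivator() is a placeholder.
if __name__ == '__main__':
    np.random.seed(42)
    layer = LstmLayer(input_width=3, state_width=2, learning_rate=1e-3)
    x = np.random.uniform(-1, 1, (3, 1))
    # Two forward steps on the same input
    layer.forward(x)
    layer.forward(x)
    print('h at t=2:', layer.h_list[-1].ravel())
    # Backpropagate an all-ones error term for the latest h, then update
    delta_h = np.ones((layer.state_width, 1))
    layer.backward(x, delta_h, IdentityActivator())
    layer.update()
    # Clear the stored history before feeding the next sequence
    layer.reset_state()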