rnn.py

import numpy as np
from functools import reduce

from model import element_wise_op, ReluActivator, IdentityActivator


class RecurrentLayer(object):
    """Implements a recurrent (RNN) layer."""

    def __init__(self, input_width, state_width, activator, learning_rate):
        """Set the recurrent layer's hyperparameters.

        :param input_width: dimension of the input vector
        :param state_width: dimension of the state vector
        :param activator: activation function
        :param learning_rate: learning rate
        """
        self.input_width = input_width
        self.state_width = state_width
        self.activator = activator
        self.learning_rate = learning_rate
        self.times = 0  # current time step, initialized to t0
        self.state_list = []  # stores the state of each time step
        self.state_list.append(np.zeros((state_width, 1)))  # initialize s0
        self.U = np.random.uniform(-1e-4, 1e-4,
                                   (state_width, input_width))  # initialize U
        self.W = np.random.uniform(-1e-4, 1e-4,
                                   (state_width, state_width))  # initialize W

    def forward(self, input_array):
        """Forward pass according to s_t = f(U·x_t + W·s_{t-1}).

        :param input_array: input vector for the current time step
        """
        self.times += 1  # advance the clock by one time step
        state = (np.dot(self.U, input_array) +
                 np.dot(self.W, self.state_list[-1]))
        element_wise_op(state, self.activator.forward)  # apply f in place
        self.state_list.append(state)
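
    # Shape check with illustrative sizes (not from the original file):
    # if input_width=3 and state_width=2, then U is 2x3, W is 2x2, each
    # input_array is 3x1 and each state is 2x1, so both dot products above
    # produce 2x1 vectors that are summed before the activation is applied.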

    def backward(self, sensitivity_array, activator):
        """Backward pass: the BPTT (backpropagation through time) algorithm.

        :param sensitivity_array: error term (delta) of the last time step
        :param activator: activation function
        """
        self.calc_delta(sensitivity_array, activator)
        self.calc_gradient()
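
    # Typical call sequence (illustrative): one forward() per element of
    # the input sequence, then a single backward() with the error term of
    # the last time step, then update(); the gradient_check sketch at the
    # end of this file runs exactly this cycle.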

    def calc_delta(self, sensitivity_array, activator):
        """Compute the error term for every time step.

        :param sensitivity_array: error term passed down from the layer above
        :param activator: activation function
        """
        self.delta_list = []  # stores the error term of each time step
        for i in range(self.times):
            self.delta_list.append(np.zeros((self.state_width, 1)))
        # delta_list[t] holds the delta at time t; the last slot is the
        # error term handed to us for the final time step.
        self.delta_list.append(sensitivity_array)
        # Iterate backwards, computing the error term of each time step
        for k in range(self.times - 1, 0, -1):
            self.calc_delta_k(k, activator)

    def calc_delta_k(self, k, activator):
        """Compute the delta at time k from the delta at time k+1,
        following delta_k^T = delta_{k+1}^T · W · diag[f'(net_k)].

        :param k: time step
        :param activator: activation function
        """
        # f'(net_k) is evaluated from the stored output of step k; work on
        # a copy so the saved states remain intact for calc_gradient().
        state = self.state_list[k].copy()
        element_wise_op(state, activator.backward)
        self.delta_list[k] = np.dot(
            np.dot(self.delta_list[k + 1].T, self.W),
            np.diag(state[:, 0])).T
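
    # Illustrative special case: if the imported IdentityActivator's
    # backward() returns 1 (the derivative of f(x) = x), then
    # np.diag(state[:, 0]) is the identity matrix and the recursion above
    # reduces to delta_k = W^T · delta_{k+1}, i.e. the error term is carried
    # back through the recurrent weights unchanged by f'.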

    def calc_gradient(self):
        """Compute the gradient of the weight matrix W."""
        self.gradient_list = []  # stores the weight gradient of each time step
        for t in range(self.times + 1):
            self.gradient_list.append(
                np.zeros((self.state_width, self.state_width)))
        for t in range(self.times, 0, -1):
            self.calc_gradient_t(t)
        # The actual gradient is the sum of the per-time-step gradients;
        # gradient_list[0] is initialized to zeros and never modified, so it
        # is a safe initial value for the reduction.
        self.gradient = reduce(
            lambda a, b: a + b, self.gradient_list, self.gradient_list[0])
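
    # Equivalently, dE/dW = sum over t of delta_t · s_{t-1}^T: each term is
    # the outer product of a (state_width, 1) error vector with the previous
    # (state_width, 1) state vector, giving a matrix the same shape as W.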

    def calc_gradient_t(self, t):
        """Compute the gradient of W contributed by time step t.

        :param t: time step
        """
        gradient = np.dot(self.delta_list[t], self.state_list[t - 1].T)
        self.gradient_list[t] = gradient

    def reset_state(self):
        """Reset the layer's internal state.

        The recurrent layer is stateful: every call to forward() changes its
        internal state, which gets in the way of gradient checking. This
        method restores the layer to its initial state.
        """
        self.times = 0  # reset the current time step to t0
        self.state_list = []  # stores the state of each time step
        self.state_list.append(np.zeros((self.state_width, 1)))  # reinit s0

    def update(self):
        """Update the weights by gradient descent. Only W is updated here;
        the gradient of U would be accumulated analogously but is not
        computed in this file."""
        self.W -= self.learning_rate * self.gradient
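

# A minimal usage and gradient-check sketch (not part of the original file).
# It assumes the imported IdentityActivator implements f(x) = x with
# backward(output) = 1; the error function, sequence values, and layer
# sizes below are made up for illustration.
def gradient_check():
    # Take E = sum of all elements of the final state, so dE/ds_final is a
    # vector of ones.
    error_function = lambda o: o.sum()
    rl = RecurrentLayer(3, 2, IdentityActivator(), 1e-3)
    # A two-step input sequence of 3x1 column vectors.
    x = [np.array([[1.0], [2.0], [3.0]]),
         np.array([[2.0], [3.0], [4.0]])]
    rl.forward(x[0])
    rl.forward(x[1])
    # Analytic gradient of W via BPTT.
    sensitivity_array = np.ones(rl.state_list[-1].shape, dtype=np.float64)
    rl.backward(sensitivity_array, IdentityActivator())
    # Numerical gradient via central differences, one weight at a time.
    epsilon = 1e-4
    for i in range(rl.W.shape[0]):
        for j in range(rl.W.shape[1]):
            rl.W[i, j] += epsilon
            rl.reset_state()
            rl.forward(x[0])
            rl.forward(x[1])
            err1 = error_function(rl.state_list[-1])
            rl.W[i, j] -= 2 * epsilon
            rl.reset_state()
            rl.forward(x[0])
            rl.forward(x[1])
            err2 = error_function(rl.state_list[-1])
            expect_grad = (err1 - err2) / (2 * epsilon)
            rl.W[i, j] += epsilon  # restore the original weight
            print('weights(%d,%d): expected %.4e, actual %.4e' % (
                i, j, expect_grad, rl.gradient[i, j]))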