cnn.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. import numpy as np
  2. from model import element_wise_op
  3. def padding(input_array, zp):
  4. """为数组增加Zero padding,自动适配输入为2D和3D的情况
  5. :param input_array: 输入数组
  6. :param zp:
  7. :return: 填充几圈
  8. """
  9. if zp == 0:
  10. return input_array
  11. else:
  12. if input_array.ndim == 3:
  13. input_width = input_array.shape[2]
  14. input_height = input_array.shape[1]
  15. input_depth = input_array.shape[0]
  16. padded_array = np.zeros((input_depth, input_height + 2 * zp, input_width + 2 * zp))
  17. padded_array[:, zp: zp + input_height, zp: zp + input_width] = input_array
  18. return padded_array
  19. elif input_array.ndim == 2:
  20. input_width = input_array.shape[1]
  21. input_height = input_array.shape[0]
  22. padded_array = np.zeros((input_height + 2 * zp, input_width + 2 * zp))
  23. padded_array[zp: zp + input_height, zp: zp + input_width] = input_array
  24. return padded_array
  25. def conv(input_array, kernel_array, output_array, stride, bias):
  26. """计算卷积,自动适配输入为2D和3D的情况
  27. :param input_array: 输入数组
  28. :param kernel_array: 权重数组
  29. :param output_array: 输出数组
  30. :param stride: 步长
  31. :param bias: 偏置值
  32. :return:
  33. """
  34. output_width = output_array.shape[1]
  35. output_height = output_array.shape[0]
  36. kernel_width = kernel_array.shape[-1]
  37. kernel_height = kernel_array.shape[-2]
  38. for i in range(output_height):
  39. for j in range(output_width):
  40. # 获取卷积的区域
  41. conv_value = get_patch(input_array, i, j, kernel_width, kernel_height, stride)
  42. output_array[i][j] = (conv_value * kernel_array).sum() + bias
  43. def get_patch(input_array, i, j, filter_width, filter_height, stride):
  44. """ 从输入数组中获取本次卷积的区域,自动适配输入为2D和3D的情况
  45. :param input_array: 输入数组
  46. :param i: 输出高度
  47. :param j: 输出宽度
  48. :param filter_width: filter宽度
  49. :param filter_height: filter高度
  50. :param stride: 步长
  51. :return:
  52. """
  53. start_i = i * stride
  54. start_j = j * stride
  55. if input_array.ndim == 2:
  56. return input_array[start_i: start_i + filter_height, start_j: start_j + filter_width]
  57. elif input_array.ndim == 3:
  58. return input_array[:, start_i: start_i + filter_height, start_j: start_j + filter_width]
  59. def get_max_index(array):
  60. """获取一个2D区域的最大值所在的索引
  61. :param array: 区域数组
  62. :return:
  63. """
  64. max_i = 0
  65. max_j = 0
  66. max_value = array[0, 0]
  67. for i in range(array.shape[0]):
  68. for j in range(array.shape[1]):
  69. if array[i, j] > max_value:
  70. max_value = array[i, j]
  71. max_i, max_j = i, j
  72. return max_i, max_j
  73. class ConvLayer(object):
  74. """ 卷积层的实现 """
  75. def __init__(self, input_width, input_height, channel_number, filter_width, filter_height, filter_number,
  76. zero_padding, stride, activator, learning_rate):
  77. """设置超参数,初始化卷积层
  78. :param input_width: 输入层宽度
  79. :param input_height: 输入层高度
  80. :param channel_number: 通道个数
  81. :param filter_width: filter宽度
  82. :param filter_height: filter高度
  83. :param filter_number: filter个数
  84. :param zero_padding: 在原图像外围填充多少圈的0
  85. :param stride: 步长
  86. :param activator: 激活函数
  87. :param learning_rate: 学习速率
  88. """
  89. self.input_width = input_width
  90. self.input_height = input_height
  91. self.channel_number = channel_number
  92. self.filter_width = filter_width
  93. self.filter_height = filter_height
  94. self.filter_number = filter_number
  95. self.zero_padding = zero_padding
  96. self.stride = stride
  97. self.activator = activator
  98. self.learning_rate = learning_rate
  99. # 输出宽度
  100. self.output_width = ConvLayer.calculate_output_size(self.input_width, filter_width, zero_padding, stride)
  101. # 输出高度
  102. self.output_height = ConvLayer.calculate_output_size(self.input_height, filter_height, zero_padding, stride)
  103. # 卷积层的结果输出
  104. self.output_array = np.zeros((self.filter_number, self.output_height, self.output_width))
  105. # 输出filter
  106. self.filters = [Filter(filter_width, filter_height, self.channel_number) for _ in range(filter_number)]
  107. def forward(self, input_array):
  108. """计算根据输入来计算卷积层的输出,输出结果保存在self.output_array
  109. :param input_array: 输入
  110. :return:
  111. """
  112. self.input_array = input_array
  113. # 为数组外围填充0
  114. self.padded_input_array = padding(input_array, self.zero_padding)
  115. for i in range(self.filter_number):
  116. filter_ = self.filters[i]
  117. conv(self.padded_input_array, filter_.get_weights(), self.output_array[i], self.stride, filter_.get_bias())
  118. # output_array每个元素激活函数
  119. element_wise_op(self.output_array, self.activator.forward)
  120. def backward(self, input_array, sensitivity_array, activator):
  121. """计算传递给前一层的误差项,以及计算每个权重的梯度
  122. 前一层的误差项保存在self.delta_array
  123. 梯度保存在Filter对象的weights_grad
  124. :param input_array:
  125. :param sensitivity_array:
  126. :param activator:
  127. :return:
  128. """
  129. self.forward(input_array)
  130. self.bp_sensitivity_map(sensitivity_array, activator)
  131. self.bp_gradient(sensitivity_array)
  132. def bp_sensitivity_map(self, sensitivity_array, activator):
  133. """计算传递到上一层的sensitivity map
  134. :param sensitivity_array: 本层的sensitivity map
  135. :param activator: 上一层的激活函数
  136. :return:
  137. """
  138. # 处理卷积步长,对原始sensitivity map进行扩展
  139. expanded_array = self.expand_sensitivity_map(sensitivity_array)
  140. # full卷积,对误差矩阵进行zero padding
  141. # 虽然原始输入的zero padding单元也会获得残差
  142. # 但这个残差不需要继续向上传递,因此就不计算了
  143. expanded_width = expanded_array.shape[2]
  144. zp = (self.input_width + self.filter_width - 1 - expanded_width) // 2
  145. padded_array = padding(expanded_array, zp)
  146. # 初始化delta_array,用于保存传递到上一层的sensitivity map
  147. self.delta_array = self.create_delta_array()
  148. # 对于具有多个filter的卷积层来说,最终传递到上一层的sensitivity map相当于所有的filter的sensitivity map之和
  149. for f in range(self.filter_number):
  150. filter_ = self.filters[f]
  151. # 将filter权重翻转180度
  152. flipped_weights = np.array(list(map(lambda i: np.rot90(i, 2), filter_.get_weights())))
  153. # 计算与一个filter对应的delta_array
  154. delta_array = self.create_delta_array()
  155. for d in range(delta_array.shape[0]):
  156. conv(padded_array[f], flipped_weights[d], delta_array[d], 1, 0)
  157. self.delta_array += delta_array
  158. # 将计算结果与激活函数的偏导数做element-wise乘法操作
  159. derivative_array = np.array(self.input_array)
  160. element_wise_op(derivative_array, activator.backward)
  161. self.delta_array *= derivative_array
  162. def create_delta_array(self):
  163. """初始化误差矩阵"""
  164. return np.zeros((self.channel_number, self.input_height, self.input_width))
  165. def expand_sensitivity_map(self, sensitivity_array):
  166. """将步长为S的sensitivity map "还原"为步长为1的sensitivity map
  167. :param sensitivity_array: sensitivity map
  168. :return:
  169. """
  170. depth = sensitivity_array.shape[0]
  171. # 确定扩展后sensitivity map的大小,计算stride为1时sensitivity map的大小
  172. expanded_width = (self.input_width - self.filter_width + 2 * self.zero_padding + 1)
  173. expanded_height = (self.input_height - self.filter_height + 2 * self.zero_padding + 1)
  174. # 构建新的sensitivity_map
  175. expand_array = np.zeros((depth, expanded_height, expanded_width))
  176. # 从原始sensitivity map拷贝误差值
  177. for i in range(self.output_height):
  178. for j in range(self.output_width):
  179. i_pos = i * self.stride
  180. j_pos = j * self.stride
  181. expand_array[:, i_pos, j_pos] = sensitivity_array[:, i, j]
  182. return expand_array
  183. def bp_gradient(self, sensitivity_array):
  184. """计算梯度
  185. :param sensitivity_array: sensitivity map
  186. :return:
  187. """
  188. # 处理卷积步长,对原始sensitivity map进行扩展
  189. expanded_array = self.expand_sensitivity_map(sensitivity_array)
  190. for f in range(self.filter_number):
  191. # 计算每个权重的梯度
  192. filter_ = self.filters[f]
  193. for d in range(filter_.weights.shape[0]):
  194. conv(self.padded_input_array[d], expanded_array[f], filter_.weights_grad[d], 1, 0)
  195. # 计算偏置项的梯度
  196. filter_.bias_grad = expanded_array[f].sum()
  197. def update(self):
  198. """按照梯度下降,更新权重
  199. :return:
  200. """
  201. for filter_ in self.filters:
  202. filter_.update(self.learning_rate)
  203. @staticmethod
  204. def calculate_output_size(input_size, filter_size, zero_padding, stride):
  205. """ 计算输出数组大小
  206. :param input_size: 对应长或宽输入大小
  207. :param filter_size: 对应filter长或宽输入大小
  208. :param zero_padding: zero_padding 圈数
  209. :param stride: 步长
  210. :return:
  211. """
  212. return (input_size - filter_size + 2 * zero_padding) // stride + 1
  213. class Filter(object):
  214. """ Filter类保存了卷积层的参数以及梯度,并且实现了用梯度下降算法来更新参数。 """
  215. def __init__(self, width, height, depth):
  216. """
  217. :param width: 宽度
  218. :param height: 长度
  219. :param depth: 通道数
  220. """
  221. # 权重
  222. self.weights = np.random.uniform(-1e-4, 1e-4, (depth, height, width))
  223. # 偏置值
  224. self.bias = 0
  225. # 权重梯度
  226. self.weights_grad = np.zeros(self.weights.shape)
  227. # 偏置值梯度
  228. self.bias_grad = 0
  229. def __repr__(self):
  230. return 'filter weights:\n%s\nbias:\n%s' % (repr(self.weights), repr(self.bias))
  231. def get_weights(self):
  232. return self.weights
  233. def get_bias(self):
  234. return self.bias
  235. def update(self, learning_rate):
  236. """ 梯度下降更新权重和偏置值 """
  237. self.weights -= learning_rate * self.weights_grad
  238. self.bias -= learning_rate * self.bias_grad
  239. class MaxPoolingLayer(object):
  240. """ Pool层实现 """
  241. def __init__(self, input_width, input_height, channel_number, filter_width, filter_height, stride):
  242. """初始化Pool层
  243. :param input_width: 输入宽度
  244. :param input_height: 输入高度
  245. :param channel_number: 通道个数
  246. :param filter_width: filter宽度
  247. :param filter_height: filter高度
  248. :param stride: 步长
  249. """
  250. self.input_width = input_width
  251. self.input_height = input_height
  252. self.channel_number = channel_number
  253. self.filter_width = filter_width
  254. self.filter_height = filter_height
  255. self.stride = stride
  256. # 输出宽
  257. self.output_width = (input_width - filter_width) // self.stride + 1
  258. # 输出高
  259. self.output_height = (input_height - filter_height) // self.stride + 1
  260. # 输出数组
  261. self.output_array = np.zeros((self.channel_number, self.output_height, self.output_width))
  262. def forward(self, input_array):
  263. """根据输入来输出池化(pool)层的结果
  264. :param input_array: 输入
  265. :return:
  266. """
  267. for d in range(self.channel_number):
  268. for i in range(self.output_height):
  269. for j in range(self.output_width):
  270. self.output_array[d, i, j] = (
  271. get_patch(input_array[d], i, j, self.filter_width, self.filter_height, self.stride).max())
  272. def backward(self, input_array, sensitivity_array):
  273. """计算传递给前一层的误差项,以及计算每个权重的梯度
  274. 前一层的误差项保存在self.delta_array
  275. :param input_array: 输入
  276. :param sensitivity_array: sensitivity map
  277. :return:
  278. """
  279. self.delta_array = np.zeros(input_array.shape)
  280. for d in range(self.channel_number):
  281. for i in range(self.output_height):
  282. for j in range(self.output_width):
  283. patch_array = get_patch(input_array[d], i, j, self.filter_width, self.filter_height, self.stride)
  284. k, l = get_max_index(patch_array)
  285. self.delta_array[d, i * self.stride + k, j * self.stride + l] = sensitivity_array[d, i, j]