import numpy as np

from model import IdentityActivator


class TreeNode(object):
    """Tree node; stores the whole tree built by the recursive neural network."""

    def __init__(self, data, children=None, children_data=None):
        """Initialize a tree node.
        :param data: this node's data vector
        :param children: child nodes
        :param children_data: the children's vectors concatenated into one column vector
        """
        if children_data is None:
            children_data = []
        if children is None:
            children = []
        self.parent = None
        self.children = children
        self.children_data = children_data
        self.data = data
        for child in children:
            child.parent = self


class RecursiveLayer(object):
    """A recursive neural network layer."""

    def __init__(self, node_width, child_count, activator, learning_rate):
        """Recursive neural network constructor.
        :param node_width: dimension of the vector stored at each node
        :param child_count: number of children per parent node
        :param activator: activation function
        :param learning_rate: learning rate
        """
        self.node_width = node_width
        self.child_count = child_count
        self.activator = activator
        self.learning_rate = learning_rate
        # weight matrix W
        self.W = np.random.uniform(-1e-4, 1e-4, (node_width, node_width * child_count))
        # bias term b
        self.b = np.zeros((node_width, 1))
        # root node of the tree built by the network
        self.root = None

    def forward(self, *children):
        """Forward pass.
        Takes the given tree nodes as children, computes their parent node,
        and stores it in self.root.
        :param children: a sequence of TreeNode objects
        """
        children_data = self.concatenate(children)
        parent_data = self.activator.forward(np.dot(self.W, children_data) + self.b)
        self.root = TreeNode(parent_data, children, children_data)
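
    # Written out, the forward rule computed above is
    #     p = f(W . c + b)
    # where c = [c1; c2; ...; cn] stacks the children's vectors into one
    # column vector and f is the activation function.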

    def concatenate(self, tree_nodes):
        """Concatenate the data of the given tree nodes into one long column vector.
        :param tree_nodes: the tree nodes
        :return: the concatenated vector
        """
        concat = np.zeros((0, 1))
        for node in tree_nodes:
            concat = np.concatenate((concat, node.data))
        return concat

    def backward(self, parent_delta):
        """BPTS (Back Propagation Through Structure).
        :param parent_delta: error term of the root node
        """
        # error term of every node
        self.calc_delta(parent_delta, self.root)
        # gradients
        self.W_grad, self.b_grad = self.calc_gradient(self.root)

    def calc_delta(self, parent_delta, parent):
        """Compute the delta of every node in the subtree rooted at parent.
        :param parent_delta: error term of the parent node
        :param parent: the parent node
        """
        parent.delta = parent_delta
        if parent.children:
            # compute each child's delta according to Equation 2
            children_delta = np.dot(self.W.T, parent_delta) * (
                self.activator.backward(parent.children_data))
            # slices = [(child index, start of its delta, end of its delta)]
            slices = [(i, i * self.node_width, (i + 1) * self.node_width)
                      for i in range(self.child_count)]
            # recurse into each child
            for s in slices:
                self.calc_delta(children_delta[s[1]:s[2]], parent.children[s[0]])
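
    # Written out, Equation 2 computed above is
    #     delta_c = f'(net_c) ∘ (W^T . delta_p)
    # where ∘ is element-wise multiplication and activator.backward maps the
    # children's outputs to f'(net_c). delta_c is then cut into child_count
    # slices of node_width rows, one per child.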

    def calc_gradient(self, parent):
        """Compute the weight gradient at every node and sum them up
        to obtain the final gradient.
        :param parent: the parent node
        :return: (W_grad, b_grad) accumulated over the subtree
        """
        # initialization
        W_grad = np.zeros((self.node_width, self.node_width * self.child_count))
        b_grad = np.zeros((self.node_width, 1))
        if not parent.children:
            return W_grad, b_grad
        parent.W_grad = np.dot(parent.delta, parent.children_data.T)
        parent.b_grad = parent.delta
        W_grad += parent.W_grad
        b_grad += parent.b_grad
        # sum the gradients over all nodes
        for child in parent.children:
            W, b = self.calc_gradient(child)
            W_grad += W
            b_grad += b
        return W_grad, b_grad
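
    # Written out: because W and b are shared across the whole tree, every
    # internal node contributes
    #     W_grad += delta . children_data^T
    #     b_grad += delta
    # and leaves contribute nothing (they have no incoming weights).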

    def update(self):
        """Update the weights with one step of SGD."""
        self.W -= self.learning_rate * self.W_grad
        self.b -= self.learning_rate * self.b_grad

    def reset_state(self):
        """Reset the root node."""
        self.root = None

    def dump(self, dump_grad=False):
        """Print the state of the recursive network."""
        print('root.data: %s' % self.root.data)
        print('root.children_data: %s' % self.root.children_data)
        if dump_grad:
            print('W_grad: %s' % self.W_grad)
            print('b_grad: %s' % self.b_grad)
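

# Usage sketch (an addition, not part of the original listing): builds the
# two-level tree root = f(W . [p1; p2] + b), with p1 and p2 computed from
# four leaf vectors, then runs one backward/update step. It assumes
# IdentityActivator from `model` behaves as the code above requires:
# forward(x) returns x and backward(output) returns f'(net), i.e. 1 for
# the identity function.
if __name__ == '__main__':
    rnn = RecursiveLayer(node_width=2, child_count=2,
                         activator=IdentityActivator(), learning_rate=0.05)
    leaves = [TreeNode(np.random.uniform(-1, 1, (2, 1))) for _ in range(4)]
    rnn.forward(leaves[0], leaves[1])
    p1 = rnn.root
    rnn.forward(leaves[2], leaves[3])
    p2 = rnn.root
    rnn.forward(p1, p2)              # root of the whole tree
    rnn.backward(np.ones((2, 1)))    # pretend dE/d(root) is all ones
    rnn.update()
    rnn.dump(dump_grad=True)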