import os
import random

import numpy as np

def load_from_file(file_path, is_binary=0):
    """
    Parse one whitespace-separated graph file; graphs are separated by blank lines.

    :param file_path: path to the graph file
    :param is_binary: 0: keep the original multi-class label,
                      1: force label 1 (binary positive),
                      2: force label 2 (binary "other")
    :return: (node_id_data_list, node_type_data_list), one [edges, labels] pair per graph
    """
    node_id_data_list = []
    node_type_data_list = []
    node_id = list()
    node_type = list()
    graph_type = list()
    with open(file_path) as file:
        for line in file:
            if len(line.strip()) == 0:
                # A blank line closes the current graph.
                node_id_data_list.append([node_id, graph_type])
                node_type_data_list.append([node_type, graph_type])
                node_id = list()
                node_type = list()
                graph_type = list()
            elif len(line.split(' ')) == 3:
                # Three-token lines carry the graph label in the middle token.
                if is_binary == 0:
                    graph_type.append([int(line.split(' ')[1])])
                elif is_binary == 1:
                    graph_type.append([1])
                else:
                    graph_type.append([2])
            else:
                # Five-token edge lines: src_id src_type edge_type tgt_id tgt_type.
                data = line.split(' ')
                node_id.append([int(data[0]), int(data[2]), int(data[3])])
                node_type.append([int(data[1]), int(data[2]), int(data[4])])
    return node_id_data_list, node_type_data_list
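
# Input format as inferred from the parser above (the field names are
# descriptive labels of mine, not from any official spec):
#
#   1 4 1 2 5    <- edge line: src_id src_type edge_type tgt_id tgt_type
#   2 5 1 3 6
#   ? 3 0        <- label line (3 tokens): the middle token is the class id
#                   (a blank line then terminates the graph)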

def load_from_directory(path):
    node_id_data_list = []
    node_type_data_list = []
    for file_name in os.listdir(path):
        node_id, node_type = load_from_file(os.path.join(path, file_name))
        node_id_data_list.extend(node_id)
        node_type_data_list.extend(node_type)
    return node_id_data_list, node_type_data_list

def find_max_edge_id(data_list):
    # Edge triples are [src_id, edge_type, tgt_id]; scan the middle field.
    max_edge_id = 0
    for data in data_list:
        edges = data[0]
        for item in edges:
            if item[1] > max_edge_id:
                max_edge_id = item[1]
    return max_edge_id


def find_max_node_id(data_list):
    # Scan both endpoints of every edge triple.
    max_node_id = 0
    for data in data_list:
        edges = data[0]
        for item in edges:
            if item[0] > max_node_id:
                max_node_id = item[0]
            if item[2] > max_node_id:
                max_node_id = item[2]
    return max_node_id

def convert_program_data(data_list, n_annotation_dim, n_nodes):
    class_data_list = []
    for item in data_list:
        edge_list = item[0]
        target_list = item[1]
        for target in target_list:
            task_output = target[-1]  # the graph's class label
            # Mark every node that appears as an edge source.
            annotation = np.zeros([n_nodes, n_annotation_dim])
            for edge in edge_list:
                src_idx = edge[0]
                # Node ids are 1-based, so every id up to n_nodes is in range.
                if src_idx <= n_nodes:
                    annotation[src_idx - 1][0] = 1
            class_data_list.append([edge_list, annotation, task_output])
    return class_data_list
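
# Each converted sample is [edge_list, annotation, target]: the raw edge
# triples, an [n_nodes, n_annotation_dim] marker matrix, and the integer label.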

def create_adjacency_matrix(edges, n_nodes, n_edge_types):
    # One row per node; per edge type, a forward block and a reverse block.
    a = np.zeros([n_nodes, n_nodes * n_edge_types * 2])
    for edge in edges:
        src_idx = edge[0]
        e_type = edge[1]
        tgt_idx = edge[2]
        # Node ids are 1-based, so <= n_nodes is the in-range check.
        if tgt_idx <= n_nodes:
            a[tgt_idx - 1][(e_type - 1) * n_nodes + src_idx - 1] = 1
        if src_idx <= n_nodes:
            a[src_idx - 1][(e_type - 1 + n_edge_types) * n_nodes + tgt_idx - 1] = 1
    return a
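
# Worked layout check (hypothetical numbers): with n_nodes = 3 and
# n_edge_types = 2, an edge (src=1, type=2, tgt=3) sets
#   a[2][(2 - 1) * 3 + 0]     = a[2][3]    # forward block, row of the target
#   a[0][(2 - 1 + 2) * 3 + 2] = a[0][11]   # reverse block, row of the source
# i.e. the usual GGNN scheme of concatenating per-type in- and out-adjacency.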

def create_embedding_matrix(node_id_edges, node_type_edges, n_nodes, n_types):
    # One-hot node-type annotation: row (src_id - 1) marks the source's type.
    anno = np.zeros([n_nodes, n_types])
    for id_edge, type_edge in zip(node_id_edges, node_type_edges):
        src_idx = id_edge[0]
        node_type = type_edge[0]
        anno[src_idx - 1][node_type - 1] = 1.0
    return anno
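
# Hypothetical example: with n_nodes = 3 and n_types = 2, an id edge [1, 1, 2]
# paired with a type edge [2, 1, 1] says node 1 has type 2, so anno[0][1] = 1.0;
# rows of nodes that never appear as an edge source stay all-zero.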

class Dataset:
    """
    Load bAbI-format graph tasks for a GGNN.
    """

    def __init__(self, path, is_train):
        data_id = list()
        data_type = list()
        train_data_id, train_data_type = load_from_directory(path + "/train")
        test_data_id, test_data_type = load_from_directory(path + "/test")
        data_id.extend(train_data_id)
        data_id.extend(test_data_id)
        data_type.extend(train_data_type)
        data_type.extend(test_data_type)
        # Size the matrices by the maxima over train *and* test so both splits
        # share one adjacency/annotation shape.
        self.n_edge_types = find_max_edge_id(data_id)
        self.n_node_by_id = find_max_node_id(data_id)
        self.n_node_by_type = find_max_node_id(data_type)
        if is_train:
            self.node_by_id = convert_program_data(train_data_id, 1, self.n_node_by_id)
            self.node_by_type = convert_program_data(train_data_type, 1, self.n_node_by_type)
        else:
            self.node_by_id = convert_program_data(test_data_id, 1, self.n_node_by_id)
            self.node_by_type = convert_program_data(test_data_type, 1, self.n_node_by_type)

    def __getitem__(self, index):
        am = create_adjacency_matrix(self.node_by_id[index][0], self.n_node_by_id, self.n_edge_types)
        annotation = create_embedding_matrix(self.node_by_id[index][0], self.node_by_type[index][0],
                                             self.n_node_by_id, self.n_node_by_type)
        target = self.node_by_id[index][2] - 1  # shift 1-based labels to 0-based
        return am, annotation, target

    def __len__(self):
        return len(self.node_by_id)
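
# Minimal usage sketch (hypothetical paths; `path` is assumed to contain
# "train/" and "test/" subdirectories of graph files):
#
#   dataset = Dataset("data/traindata", is_train=True)
#   am, annotation, target = dataset[0]
#   # am:         (n_node_by_id, n_node_by_id * n_edge_types * 2)
#   # annotation: (n_node_by_id, n_node_by_type)
#   # target:     zero-based class index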

def load_from_directory_binary(path, class_type):
    node_id_data_list = []
    node_type_data_list = []
    # Positive samples: every graph of the requested class, labelled 1.
    node_id_binary_true, node_type_binary_true = load_from_file(
        os.path.join(path, class_type + ".txt"), 1)
    node_id_data_list.extend(node_id_binary_true)
    node_type_data_list.extend(node_type_binary_true)
    id_len = len(node_id_data_list)

    # Negative samples: graphs of every other class, labelled 2 ("binary other").
    node_id_data_list_false = []
    node_type_data_list_false = []
    for file_name in os.listdir(path):
        if file_name != class_type + ".txt":
            node_id_binary_false, node_type_binary_false = load_from_file(
                os.path.join(path, file_name), 2)
            node_id_data_list_false.extend(node_id_binary_false)
            node_type_data_list_false.extend(node_type_binary_false)
    # Shuffle ids and types together so each graph's ids stay aligned with
    # its types, then keep as many negatives as positives for class balance.
    paired = list(zip(node_id_data_list_false, node_type_data_list_false))
    random.shuffle(paired)
    for node_id_false, node_type_false in paired[:id_len]:
        node_id_data_list.append(node_id_false)
        node_type_data_list.append(node_type_false)
    return node_id_data_list, node_type_data_list

class BinaryDataset:
    """
    One-vs-rest variant of Dataset: positives are `class_type`, negatives are
    a balanced sample of the remaining classes.
    """

    def __init__(self, path, class_type, is_train):
        data_id = list()
        data_type = list()
        train_data_id, train_data_type = load_from_directory_binary(path + "/train", class_type)
        test_data_id, test_data_type = load_from_directory_binary(path + "/test", class_type)
        data_id.extend(train_data_id)
        data_id.extend(test_data_id)
        data_type.extend(train_data_type)
        data_type.extend(test_data_type)
        self.n_edge_types = find_max_edge_id(data_id)
        self.n_node_by_id = find_max_node_id(data_id)
        self.n_node_by_type = find_max_node_id(data_type)
        if is_train:
            self.node_by_id = convert_program_data(train_data_id, 1, self.n_node_by_id)
            self.node_by_type = convert_program_data(train_data_type, 1, self.n_node_by_type)
        else:
            self.node_by_id = convert_program_data(test_data_id, 1, self.n_node_by_id)
            self.node_by_type = convert_program_data(test_data_type, 1, self.n_node_by_type)

    def __getitem__(self, index):
        am = create_adjacency_matrix(self.node_by_id[index][0], self.n_node_by_id, self.n_edge_types)
        annotation = create_embedding_matrix(self.node_by_id[index][0], self.node_by_type[index][0],
                                             self.n_node_by_id, self.n_node_by_type)
        target = self.node_by_id[index][2] - 1  # 1 -> 0 (positive), 2 -> 1 (negative)
        return am, annotation, target

    def __len__(self):
        return len(self.node_by_id)

if __name__ == '__main__':
    # Smoke test against a local copy of the data (machine-specific path).
    binary_dataset = BinaryDataset(r"I:\Program\Python\sap\GnnForPrivacyScan\data\traindatabinary",
                                   "Archive", True)
    for am, annotation, target in binary_dataset:
        print(am.shape, annotation.shape, target)