# -*- coding: utf-8 -*-
"""
Created on Thu May 21 19:19:01 2020

Read the raw data and preprocess it.
Count the 5k most frequent words in the training data and use them to build
the vocabulary (word vectors). The test data reuses the vocabulary built from
the training data.

@author:
"""
import os
import copy
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data  # newly added code
import sklearn
from sklearn import model_selection
import numpy as np
import pymysql
import classifyer
from nlpcda import Simbert
import logging
import character_processor
from bert4keras.backend import keras, K
from bert4keras.models import build_transformer_model
from bert4keras.tokenizers import Tokenizer
from bert4keras.snippets import sequence_padding, AutoRegressiveDecoder

fileName = './model_train.log'
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(module)s: %(message)s',
                              datefmt='%m/%d/%Y %H:%M:%S')
handler = logging.FileHandler(filename=fileName, encoding="utf-8")
handler.setFormatter(formatter)
logging.basicConfig(level=logging.DEBUG, handlers=[handler])

simbert_config = {
    'model_path': './chinese_roformer-sim-char_L-12_H-768_A-12',
    'CUDA_VISIBLE_DEVICES': '0,1',
    'max_len': 64,
    'seed': 1
}

bug_type = ["不正常退出", "功能不完整", "用户体验", "页面布局缺陷", "性能", "安全"]
num_classes = len(bug_type)
word2index_path_base = "../word2index/"

torch.manual_seed(123)

datas = []
labels = []

processor = character_processor.DataProcessor("./ltp_data_v3.4.0/cws.model",
                                              "./ltp_data_v3.4.0/pos.model")
synonym_dict = processor.synonym_word_dict("./ltp_data_v3.4.0/HIT-IRLab-同义词词林.txt")


def read_file(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        msg = f.read()
    return msg


def process_file(root_path):
    # list every file and sub-directory under root_path
    dir_or_files = os.listdir(root_path)
    for dir_file in dir_or_files:
        # full path of the file or directory
        dir_file_path = os.path.join(root_path, dir_file)
        # decide whether the path is a directory or a file
        if os.path.isdir(dir_file_path):
            # recurse into sub-directories
            process_file(dir_file_path)
        else:
            if "after" in dir_file_path:
                description = read_file(dir_file_path)
                datas.append(description)
                # the bug category is encoded in the directory name
                label_china = dir_file_path.split("/")[8]
                label = []
                for i in bug_type:
                    if i == label_china:
                        label.append(1)
                    else:
                        label.append(0)
                labels.append(label)
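

# Illustrative sketch only: the loop above builds a one-hot vector over
# `bug_type` by hand. The helper below shows the same mapping in one line.
# It is not referenced anywhere else in this module and its name is hypothetical.
def one_hot_label(label_china):
    """Return a one-hot list over `bug_type` for a Chinese category name."""
    # e.g. one_hot_label("性能") -> [0, 0, 0, 0, 1, 0]
    return [1 if t == label_china else 0 for t in bug_type]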


class DataProcessor(object):
    def __init__(self, dataset_name=None, host=None, user=None, password=None):
        self.dataset_name = dataset_name
        self.datas_path = "./word_list_data/" + str(self.dataset_name) + ".npy"
        self.labels_path = "./word_list_data/" + str(self.dataset_name) + "_label.npy"
        self.datas_increase_path = "./word_list_data/" + str(self.dataset_name) + "_increase.npy"
        self.labels_increase_path = "./word_list_data/" + str(self.dataset_name) + "_label_increase.npy"
        self.host = host
        self.user = user
        self.password = password
        self.directory = "./splited_data/"
        if user is None or password is None:
            self.host = "127.0.0.1"
            self.user = "root"
            self.password = "123456"

    def read_text_from_db(self):
        datas = []
        labels = []
        conn = pymysql.connect(host=self.host, user=self.user, password=self.password,
                               database="mt_clerk_test", charset="utf8")
        cursor = conn.cursor()
        try:
            sql = "select id from dataset where name = %s"
            cursor.execute(sql, (str(self.dataset_name),))
            dataset_id = cursor.fetchall()[0][0]
            sql = "select test_process,test_requirement,product_version_module,tccategory_id,name from test_case where dataset_id = %s and tccategory_id is not null"
            cursor.execute(sql, (str(dataset_id),))
            results = cursor.fetchall()
            for row in results:
                test_process = row[0]
                test_requirement = row[1]
                product_version_module = row[2]
                tccategory_id = int(row[3])
                name = row[4]
                text = classifyer.text_after_ltp(test_process, test_requirement,
                                                 product_version_module, name)
                datas.append(text)
                label = []
                for i in range(num_classes):
                    if i == tccategory_id - 1:
                        label.append(1)
                    else:
                        label.append(0)
                labels.append(label)
        except Exception as e:
            raise e
        finally:
            cursor.close()
            conn.close()
        np.save(self.datas_path, datas)
        np.save(self.labels_path, labels)
        return datas, labels

    def read_text_from_file_system(self):
        global datas, labels
        process_file("/Users/tanghaojie/Desktop/final/手动标记后的数据/决赛自主可控众测web自主可控运维管理系统")
        return datas, labels

    def increase_data(self):
        logging.info("开始数据扩增")
        datas_pre = np.load(self.directory + self.dataset_name + "_train.npy", allow_pickle=True).tolist()
        labels_pre = np.load(self.directory + self.dataset_name + "_label_train.npy", allow_pickle=True).tolist()
        datas = []
        labels = []
        type_num_test = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
        for label in labels_pre:
            type_num_test[label.index(1) + 1] += 1
        k = 500  # target number of samples per bug type
        self.increase_by_synonms_list(datas, labels, datas_pre, labels_pre, type_num_test, k)
        np.save(self.datas_increase_path, datas)
        np.save(self.labels_increase_path, labels)
        return datas, labels

    def increase_by_synonms_list(self, datas, labels, datas_pre, labels_pre, type_num_test, k):
        if len(datas_pre) == len(labels_pre):
            # turn the per-type counts into per-sample expansion factors
            for type in type_num_test:
                type_num_test[type] = int(k / type_num_test[type])
            for i in range(len(datas_pre)):
                temp_type = labels_pre[i].index(1) + 1
                # print(temp_type)
                datas_append = self.get_case_increased(datas_pre[i], type_num_test[temp_type])
                for p in range(len(datas_append)):
                    labels.append(labels_pre[i])
                datas.extend(datas_append)

    def get_case_increased(self, origin, k):
        # expand one word sequence into at most k variants by substituting synonyms
        res = [[]]
        for word in origin:
            if len(res) < k:
                synonym_word_list = processor.synonym_word_list(word, synonym_dict)
            else:
                synonym_word_list = [word]
            # res = [l.append(w) for w in synonym_word_list for l in res]
            temp_res = []
            for l in res:
                for w in synonym_word_list:
                    t = l.copy()
                    t.append(w)
                    temp_res.append(t)
            res = temp_res
        return res[:k]
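
    # Worked example (hypothetical words, assuming synonym_word_list returns the
    # word itself plus its synonyms): for origin = ["打开", "页面"], with synonyms
    # ["打开", "开启"] and ["页面", "界面"], the cross product above yields
    # [["打开", "页面"], ["打开", "界面"], ["开启", "页面"], ["开启", "界面"]],
    # which is then truncated to the first k variants.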

    def increase_by_simbert(self, datas, labels, datas_pre, labels_pre, type_num_test, k):
        # TODO: scale by the final per-type ratio
        if len(datas_pre) == len(labels_pre):
            simbert = Simbert(config=simbert_config)
            nums = [3, 0, 0, 3, 5, 5]
            for i in range(len(datas_pre)):
                datas.append(datas_pre[i])
                labels.append(labels_pre[i])
                synonym_list = []
                temp_type = labels_pre[i].index(1)
                if nums[temp_type] > 0:
                    for word in datas_pre[i]:
                        # synonyms = gen_synonyms(word,nums[temp_type]*5,nums[temp_type])
                        synonyms = sorted(simbert.replace(sent=word, create_num=nums[temp_type]),
                                          key=lambda item: item[1], reverse=True)
                        synonym_list.append(synonyms)
                    # SimBERT may generate fewer than nums[temp_type] candidates per word
                    min_len = min([len(s) for s in synonym_list])
                    data_increased = [[synonym_list[j][m][0] for j in range(len(synonym_list))]
                                      for m in range(min_len)]
                    for j in range(len(data_increased)):
                        datas.append(data_increased[j])
                        labels.append(labels_pre[i])

    def word_count(self, datas):
        # count word frequencies and sort them in descending order
        dic = {}
        for data in datas:
            for word in data:
                word = word.lower()  # lowercase every word (Chinese has no case, TODO)
                if word in dic:
                    dic[word] += 1
                else:
                    dic[word] = 1
        word_count_sorted = sorted(dic.items(), key=lambda item: item[1], reverse=True)
        return word_count_sorted  # list of (word, count) pairs

    def word_index(self, datas, vocab_size):
        # build the vocabulary
        word_count_sorted = self.word_count(datas)
        word2index = {}
        # out-of-vocabulary token: the vocabulary is limited, so some words will not be in it
        word2index["<unk>"] = 0
        # padding token appended to short sentences
        word2index["<pad>"] = 1
        # the actual vocabulary size is bounded by both the word count and the requested size
        vocab_size = min(len(word_count_sorted), vocab_size)
        for i in range(vocab_size):
            word = word_count_sorted[i][0]
            word2index[word] = i + 2  # key: word, value: index (offset by the two special tokens)
        word2index_path = word2index_path_base + self.dataset_name + ".npy"
        np.save(word2index_path, word2index)
        return word2index, vocab_size

    def get_datasets_origin(self, vocab_size, max_len):
        # Note: nn.Embedding produces different embeddings on every run, so the
        # train/develop/test embeddings are generated together here.
        # The test data also uses the vocabulary built from the training data.
        logging.info('正在从数据库读取原始数据')
        # txt_origin, label_origin = self.read_text_from_db()
        # txt_origin = np.load(self.datas_path, allow_pickle=True).tolist()
        # label_origin = np.load(self.labels_path, allow_pickle=True).tolist()
        # logging.info('正在对原始数据进行数据扩增')
        # txt_origin, label_origin = self.increase_data()
        # txt_origin = np.load(self.datas_increase_path, allow_pickle=True).tolist()
        # label_origin = np.load(self.labels_increase_path, allow_pickle=True).tolist()
        #
        # train_datas, test_datas, train_labels, test_labels = sklearn.model_selection.train_test_split(
        #     txt_origin, label_origin, random_state=2, train_size=0.6, test_size=0.4)
        # test_datas, develop_datas, test_labels, develop_labels = sklearn.model_selection.train_test_split(
        #     test_datas, test_labels, random_state=2, train_size=0.5, test_size=0.5)
        directory = "./splited_data/"
        # train_datas = np.load(directory + self.dataset_name + "_train.npy", allow_pickle=True).tolist()
        # train_labels = np.load(directory + self.dataset_name + "_label_train.npy", allow_pickle=True).tolist()
        # train_datas = np.load(self.datas_increase_path, allow_pickle=True).tolist()
        # train_labels = np.load(self.labels_increase_path, allow_pickle=True).tolist()
        train_datas = np.load(self.datas_increase_path, allow_pickle=True).tolist()
        train_labels = np.load(self.labels_increase_path, allow_pickle=True).tolist()
        test_datas = np.load(directory + self.dataset_name + "_test.npy", allow_pickle=True).tolist()
        test_labels = np.load(directory + self.dataset_name + "_label_test.npy", allow_pickle=True).tolist()
        develop_datas = np.load(directory + self.dataset_name + "_develop.npy", allow_pickle=True).tolist()
        develop_labels = np.load(directory + self.dataset_name + "_label_develop.npy", allow_pickle=True).tolist()
        # txt_origin, label_origin = self.increase_data()

        logging.info('正在制作词表')
        word_datas = copy.deepcopy(train_datas)
        word_datas.extend(develop_datas)
        word_datas.extend(test_datas)
        # obtain the word2index vocabulary and its actual size
        word2index, vocab_size = self.word_index(word_datas, vocab_size)

        logging.info('正在获取词向量')
        train_features = []
        for data in train_datas:
            feature = []
            for word in data:
                word = word.lower()  # vocabulary words are lowercase
                if word in word2index:
                    feature.append(word2index[word])
                else:
                    feature.append(word2index["<unk>"])  # out-of-vocabulary words map to <unk>
                if len(feature) == max_len:  # truncate sentences longer than max_len
                    break
            # pad sentences shorter than max_len
            feature = feature + [word2index["<pad>"]] * (max_len - len(feature))
            train_features.append(feature)

        develop_features = []
        for data in develop_datas:
            feature = []
            for word in data:
                word = word.lower()  # vocabulary words are lowercase
                if word in word2index:
                    feature.append(word2index[word])
                else:
                    feature.append(word2index["<unk>"])  # out-of-vocabulary words map to <unk>
                if len(feature) == max_len:  # truncate sentences longer than max_len
                    break
            # pad sentences shorter than max_len
            feature = feature + [word2index["<pad>"]] * (max_len - len(feature))
            develop_features.append(feature)

        test_features = []
        for data in test_datas:
            feature = []
            for word in data:
                word = word.lower()  # vocabulary words are lowercase
                if word in word2index:
                    feature.append(word2index[word])
                else:
                    feature.append(word2index["<unk>"])  # out-of-vocabulary words map to <unk>
                if len(feature) == max_len:  # truncate sentences longer than max_len
                    break
            # pad sentences shorter than max_len
            feature = feature + [word2index["<pad>"]] * (max_len - len(feature))
            test_features.append(feature)

        return train_features, develop_features, test_features, train_labels, develop_labels, test_labels, word2index
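
    # Worked example (hypothetical vocabulary, max_len = 4): with
    # word2index = {"<unk>": 0, "<pad>": 1, "页面": 2, "打开": 3}, the sentence
    # ["打开", "页面", "报错"] becomes [3, 2, 0, 1]: "报错" is out of vocabulary
    # and maps to 0, and one <pad> (index 1) is appended to reach max_len.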

    def get_datasets(self, train_features, develop_features, test_features, train_labels, develop_labels,
                     test_labels, vocab_size, embedding_size):
        # convert the word indices to tensors; every row of train_features must have
        # the same length, otherwise this raises an error
        train_features = torch.LongTensor(train_features)
        train_labels = torch.FloatTensor(train_labels)
        develop_features = torch.LongTensor(develop_features)
        develop_labels = torch.FloatTensor(develop_labels)
        test_features = torch.LongTensor(test_features)
        test_labels = torch.FloatTensor(test_labels)

        # map word indices to embeddings
        # the vocabulary holds two special tokens, so its real size is vocab_size + 2
        embed = nn.Embedding(vocab_size + 2, embedding_size)  # https://www.jianshu.com/p/63e7acc5e890
        train_features = embed(train_features)
        develop_features = embed(develop_features)
        test_features = embed(test_features)

        # the input features do not require gradients
        train_features = Variable(train_features, requires_grad=False)  # https://www.cnblogs.com/henuliulei/p/11363121.html
        train_datasets = torch.utils.data.TensorDataset(train_features, train_labels)
        develop_features = Variable(develop_features, requires_grad=False)  # https://www.cnblogs.com/henuliulei/p/11363121.html
        develop_datasets = torch.utils.data.TensorDataset(develop_features, develop_labels)
        test_features = Variable(test_features, requires_grad=False)
        test_datasets = torch.utils.data.TensorDataset(test_features, test_labels)  # https://www.cnblogs.com/hahaah/p/14914603.html
        return train_datasets, develop_datasets, test_datasets


def new_split(dataset_name):
    names = [dataset_name]
    directory = "./splited_data/"
    for name in names:
        dp = DataProcessor(dataset_name=name)
        txt_origin = np.load(dp.datas_path, allow_pickle=True).tolist()
        label_origin = np.load(dp.labels_path, allow_pickle=True).tolist()
        train_datas, test_datas, train_labels, test_labels = sklearn.model_selection.train_test_split(
            txt_origin, label_origin, random_state=13, train_size=0.6, test_size=0.4)
        test_datas, develop_datas, test_labels, develop_labels = sklearn.model_selection.train_test_split(
            test_datas, test_labels, random_state=13, train_size=0.5, test_size=0.5)
        np.save(directory + name + "_train.npy", train_datas)
        np.save(directory + name + "_label_train.npy", train_labels)
        np.save(directory + name + "_test.npy", test_datas)
        np.save(directory + name + "_label_test.npy", test_labels)
        np.save(directory + name + "_develop.npy", develop_datas)
        np.save(directory + name + "_label_develop.npy", develop_labels)


def increase_data(names):
    for name in names:
        dp = DataProcessor(dataset_name=name)
        # train_labels = np.load(dp.directory + dp.dataset_name + "_label_train.npy", allow_pickle=True).tolist()
        # test_labels = np.load(dp.directory + dp.dataset_name + "_label_test.npy", allow_pickle=True).tolist()
        train_labels = np.load("./splited_data/" + name + "_label_train.npy", allow_pickle=True).tolist()
        # note: despite the variable name, these are the augmented training labels
        test_labels = np.load(dp.labels_increase_path, allow_pickle=True).tolist()
        type_num_train = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
        type_num_test = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
        for label in test_labels:
            type_num_test[label.index(1) + 1] += 1
        for label in train_labels:
            type_num_train[label.index(1) + 1] += 1
        print("扩增前:", type_num_train)
        print("扩增后:", type_num_test)


def increase_test(names):
    for name in names:
        dp = DataProcessor(dataset_name=name)
        dp.increase_data()


def read_data():
    names = ["航天中认自主可控众包测试练习赛", "决赛自主可控众测web自主可控运维管理系统"]
    for name in names:
        dp = DataProcessor(dataset_name=name)
        dp.read_text_from_db()
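

# Illustrative sketch only (not called from __main__): how DataProcessor might be
# driven end to end to obtain PyTorch datasets for training. The function name
# and the parameter values below are assumptions, not part of the original pipeline.
def build_datasets_example(dataset_name, vocab_size=5000, max_len=64,
                           embedding_size=128, batch_size=32):
    dp = DataProcessor(dataset_name=dataset_name)
    (train_features, develop_features, test_features,
     train_labels, develop_labels, test_labels, _word2index) = dp.get_datasets_origin(vocab_size, max_len)
    train_ds, develop_ds, test_ds = dp.get_datasets(train_features, develop_features, test_features,
                                                    train_labels, develop_labels, test_labels,
                                                    vocab_size, embedding_size)
    # wrap the TensorDatasets in DataLoaders for batched training
    train_loader = torch.utils.data.DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    develop_loader = torch.utils.data.DataLoader(develop_ds, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_ds, batch_size=batch_size)
    return train_loader, develop_loader, test_loader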


if __name__ == '__main__':
    direc = "./splited_data/"
    names = ['趣享GIF众包测试201908试题']
    # new_split(names[0])
    # read_data()
    increase_test(names)
    increase_data(names)
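    # Typical full run (an assumption, based on the commented-out calls above):
    # read_data() pulls the raw cases from MySQL, new_split(names[0]) creates the
    # train/develop/test split, increase_test(names) augments the training split
    # with synonyms, and increase_data(names) prints the per-type counts before
    # and after augmentation.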