- """
- Created on Thu May 21 19:19:01 2020
- 读取数据并对数据做预处理
- 统计出训练数据中出现频次最多的5k个单词,用这出现最多的5k个单词创建词表(词向量)
- 对于测试数据,直接用训练数据构建的词表
- @author:
- """
import os
import copy
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data
import sklearn
from sklearn import model_selection
import numpy as np
import pymysql
import classifyer
from nlpcda import Simbert
import logging
import character_processor
from bert4keras.backend import keras, K
from bert4keras.models import build_transformer_model
from bert4keras.tokenizers import Tokenizer
from bert4keras.snippets import sequence_padding, AutoRegressiveDecoder

fileName = './model_train.log'
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(module)s: %(message)s',
                              datefmt='%m/%d/%Y %H:%M:%S')
handler = logging.FileHandler(filename=fileName, encoding="utf-8")
handler.setFormatter(formatter)
logging.basicConfig(level=logging.DEBUG, handlers=[handler])

simbert_config = {
    'model_path': './chinese_roformer-sim-char_L-12_H-768_A-12',
    'CUDA_VISIBLE_DEVICES': '0,1',
    'max_len': 64,
    'seed': 1
}

# Bug categories, kept in Chinese because they must match the labels in the data:
# abnormal exit, incomplete functionality, user experience, page layout defect,
# performance, security.
bug_type = ["不正常退出", "功能不完整", "用户体验", "页面布局缺陷", "性能", "安全"]
num_classes = len(bug_type)
word2index_path_base = "../word2index/"
torch.manual_seed(123)

# Module-level accumulators filled by process_file() / read_text_from_file_system().
datas = []
labels = []

# LTP segmentation/POS models plus the HIT-IRLab synonym dictionary.
processor = character_processor.DataProcessor("./ltp_data_v3.4.0/cws.model", "./ltp_data_v3.4.0/pos.model")
synonym_dict = processor.synonym_word_dict("./ltp_data_v3.4.0/HIT-IRLab-同义词词林.txt")


def read_file(file_path):
    # Read a whole text file; a context manager ensures the handle is closed.
    with open(file_path, "r", encoding="utf-8") as f:
        msg = f.read()
    return msg


def process_file(root_path):
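    """Recursively walk root_path and collect labelled bug descriptions.

    Every file whose path contains "after" is read into the module-level `datas`
    list, and a one-hot label is derived from the directory name found at a fixed
    depth of the path (see the comment on the split("/") index below).
    """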
    dir_or_files = os.listdir(root_path)
    for dir_file in dir_or_files:
        dir_file_path = os.path.join(root_path, dir_file)
        if os.path.isdir(dir_file_path):
            # Recurse into sub-directories.
            process_file(dir_file_path)
        else:
            if "after" in dir_file_path:
                description = read_file(dir_file_path)
                datas.append(description)
                # Index 8 assumes the absolute path used in read_text_from_file_system();
                # the bug-type directory name is expected at that depth.
                label_china = dir_file_path.split("/")[8]
                label = []
                for i in bug_type:
                    if i == label_china:
                        label.append(1)
                    else:
                        label.append(0)
                labels.append(label)


class DataProcessor(object):
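    """Loads, splits, augments and vectorises the bug-report datasets.

    Note that this class is distinct from character_processor.DataProcessor,
    which is only used here for word segmentation and synonym lookup.
    """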
    def __init__(self, dataset_name=None, host=None, user=None, password=None):
        self.dataset_name = dataset_name
        self.datas_path = "./word_list_data/" + str(self.dataset_name) + ".npy"
        self.labels_path = "./word_list_data/" + str(self.dataset_name) + "_label.npy"
        self.datas_increase_path = "./word_list_data/" + str(self.dataset_name) + "_increase.npy"
        self.labels_increase_path = "./word_list_data/" + str(self.dataset_name) + "_label_increase.npy"
        self.host = host
        self.user = user
        self.password = password
        self.directory = "./splited_data/"
        # Fall back to the local development database when no credentials are given.
        if user is None or password is None:
            self.host = "127.0.0.1"
            self.user = "root"
            self.password = "123456"

    def read_text_from_db(self):
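        """Read the labelled test cases for this dataset from MySQL.

        The texts are preprocessed with classifyer.text_after_ltp(), the labels are
        one-hot encoded from tccategory_id, and both lists are cached as .npy files
        before being returned.
        """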
        datas = []
        labels = []
        conn = pymysql.connect(host=self.host, user=self.user, password=self.password, database="mt_clerk_test",
                               charset="utf8")
        cursor = conn.cursor()
        try:
            sql = "select id from dataset where name = %s"
            cursor.execute(sql, (str(self.dataset_name),))
            dataset_id = cursor.fetchall()[0][0]
            sql = "select test_process,test_requirement,product_version_module,tccategory_id,name from test_case where dataset_id = %s and tccategory_id is not null"
            cursor.execute(sql, (str(dataset_id),))
            results = cursor.fetchall()
            for row in results:
                test_process = row[0]
                test_requirement = row[1]
                product_version_module = row[2]
                tccategory_id = int(row[3])
                name = row[4]
                text = classifyer.text_after_ltp(test_process, test_requirement, product_version_module, name)
                datas.append(text)
                label = []
                for i in range(num_classes):
                    if i == tccategory_id - 1:
                        label.append(1)
                    else:
                        label.append(0)
                labels.append(label)
        except Exception as e:
            raise e
        finally:
            cursor.close()
            conn.close()
        np.save(self.datas_path, datas)
        np.save(self.labels_path, labels)
        return datas, labels

    def read_text_from_file_system(self):
        # Reads the manually labelled data from a hard-coded local directory.
        global datas, labels
        process_file("/Users/tanghaojie/Desktop/final/手动标记后的数据/决赛自主可控众测web自主可控运维管理系统")
        return datas, labels

    def increase_data(self):
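        """Augment the training split of this dataset and cache the result.

        Loads <name>_train.npy / <name>_label_train.npy, counts the examples per
        class, expands each example with synonym replacement so that every class
        ends up with roughly k = 500 examples, and saves the augmented data and
        labels to the *_increase.npy paths.
        """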
        logging.info("Starting data augmentation")
        datas_pre = np.load(self.directory + self.dataset_name + "_train.npy", allow_pickle=True).tolist()
        labels_pre = np.load(self.directory + self.dataset_name + "_label_train.npy", allow_pickle=True).tolist()
        datas = []
        labels = []
        # Count how many training examples each of the six classes has.
        type_num_test = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
        for label in labels_pre:
            type_num_test[label.index(1) + 1] += 1
        k = 500
        self.increase_by_synonms_list(datas, labels, datas_pre, labels_pre, type_num_test, k)
        np.save(self.datas_increase_path, datas)
        np.save(self.labels_increase_path, labels)
        return datas, labels

    def increase_by_synonms_list(self, datas, labels, datas_pre, labels_pre, type_num_test, k):
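        """Synonym-based augmentation; extends `datas` and `labels` in place.

        type_num_test maps class id -> example count and is overwritten with the
        number of variants to generate per example (k // count), so every class
        ends up with roughly k augmented examples.
        """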
        if len(datas_pre) == len(labels_pre):
            for type in type_num_test:
                # Note: this divides by the class count, so a class with zero
                # training examples would raise ZeroDivisionError here.
                type_num_test[type] = int(k / type_num_test[type])
            for i in range(len(datas_pre)):
                temp_type = labels_pre[i].index(1) + 1
                datas_append = self.get_case_increased(datas_pre[i], type_num_test[temp_type])
                for p in range(len(datas_append)):
                    labels.append(labels_pre[i])
                datas.extend(datas_append)

    def get_case_increased(self, origin, k):
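        """Generate up to k variants of one tokenised example.

        Each word is replaced by every entry of its synonym list (as returned by
        processor.synonym_word_list), taking the Cartesian product over the words
        until at least k candidates exist; the remaining words are kept unchanged.
        For example, two words with synonym lists [a, a'] and [b, b'] yield
        [a, b], [a, b'], [a', b], [a', b'].
        """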
        res = [[]]
        for word in origin:
            if len(res) < k:
                synonym_word_list = processor.synonym_word_list(word, synonym_dict)
            else:
                synonym_word_list = [word]
            temp_res = []
            for l in res:
                for w in synonym_word_list:
                    t = l.copy()
                    t.append(w)
                    temp_res.append(t)
            res = temp_res
        return res[:k]

    def increase_by_simbert(self, datas, labels, datas_pre, labels_pre, type_num_test, k):
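        """Alternative augmentation using SimBERT paraphrase generation (nlpcda).

        For each example every word is paraphrased with Simbert.replace(), and the
        per-position candidates are recombined into new examples. The number of
        paraphrases per class is fixed by `nums`; classes with 0 are only copied.
        Not called by increase_data() in the current pipeline.
        """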
        if len(datas_pre) == len(labels_pre):
            simbert = Simbert(config=simbert_config)
            # Number of paraphrases to generate per example, indexed by class.
            nums = [3, 0, 0, 3, 5, 5]
            for i in range(len(datas_pre)):
                datas.append(datas_pre[i])
                labels.append(labels_pre[i])
                synonym_list = []
                temp_type = labels_pre[i].index(1)
                if nums[temp_type] > 0:
                    for word in datas_pre[i]:
                        synonyms = sorted(simbert.replace(sent=word, create_num=nums[temp_type]),
                                          key=lambda item: item[1],
                                          reverse=True)
                        synonym_list.append(synonyms)
                    # Recombine the n-th candidate of every position into one new example.
                    min_len = min(len(candidates) for candidates in synonym_list)
                    data_increased = [[synonym_list[j][n][0] for j in range(len(synonym_list))]
                                      for n in range(min_len)]
                    for j in range(len(data_increased)):
                        datas.append(data_increased[j])
                        labels.append(labels_pre[i])

    def word_count(self, datas):
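        """Count lower-cased word frequencies over all examples.

        Returns a list of (word, count) pairs sorted by descending frequency.
        """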
        dic = {}
        for data in datas:
            for word in data:
                word = word.lower()
                if word in dic:
                    dic[word] += 1
                else:
                    dic[word] = 1
        word_count_sorted = sorted(dic.items(), key=lambda item: item[1], reverse=True)
        return word_count_sorted

    def word_index(self, datas, vocab_size):
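        """Build the word-to-index vocabulary from the most frequent words.

        Index 0 is reserved for <unk> and index 1 for <pad>; the vocab_size most
        frequent words follow from index 2. The mapping is saved to
        ../word2index/<dataset_name>.npy and returned together with the
        (possibly reduced) vocabulary size.
        """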
        word_count_sorted = self.word_count(datas)
        word2index = {}
        # Index 0: unknown words.
        word2index["<unk>"] = 0
        # Index 1: padding.
        word2index["<pad>"] = 1
        vocab_size = min(len(word_count_sorted), vocab_size)
        for i in range(vocab_size):
            word = word_count_sorted[i][0]
            word2index[word] = i + 2
        word2index_path = word2index_path_base + self.dataset_name + ".npy"
        np.save(word2index_path, word2index)
        return word2index, vocab_size

    def get_datasets_origin(self, vocab_size, max_len):
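        """Load the split (and augmented) data and convert words to index features.

        Returns fixed-length index sequences (padded or truncated to max_len) for
        the train, develop and test splits, the corresponding label lists, and the
        word2index vocabulary built over all three splits.
        """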
        logging.info('Loading the original data')
        directory = "./splited_data/"
        train_datas = np.load(self.datas_increase_path, allow_pickle=True).tolist()
        train_labels = np.load(self.labels_increase_path, allow_pickle=True).tolist()
        test_datas = np.load(directory + self.dataset_name + "_test.npy", allow_pickle=True).tolist()
        test_labels = np.load(directory + self.dataset_name + "_label_test.npy", allow_pickle=True).tolist()
        develop_datas = np.load(directory + self.dataset_name + "_develop.npy", allow_pickle=True).tolist()
        develop_labels = np.load(directory + self.dataset_name + "_label_develop.npy", allow_pickle=True).tolist()

        logging.info('Building the vocabulary')
        word_datas = copy.deepcopy(train_datas)
        word_datas.extend(develop_datas)
        word_datas.extend(test_datas)
        word2index, vocab_size = self.word_index(word_datas, vocab_size)

        logging.info('Converting words to index features')
        train_features = []
        for data in train_datas:
            feature = []
            for word in data:
                word = word.lower()
                if word in word2index:
                    feature.append(word2index[word])
                else:
                    feature.append(word2index["<unk>"])
                if len(feature) == max_len:
                    # Truncate to max_len.
                    break
            # Pad short sequences up to max_len.
            feature = feature + [word2index["<pad>"]] * (max_len - len(feature))
            train_features.append(feature)
        develop_features = []
        for data in develop_datas:
            feature = []
            for word in data:
                word = word.lower()
                if word in word2index:
                    feature.append(word2index[word])
                else:
                    feature.append(word2index["<unk>"])
                if len(feature) == max_len:
                    break
            feature = feature + [word2index["<pad>"]] * (max_len - len(feature))
            develop_features.append(feature)
        test_features = []
        for data in test_datas:
            feature = []
            for word in data:
                word = word.lower()
                if word in word2index:
                    feature.append(word2index[word])
                else:
                    feature.append(word2index["<unk>"])
                if len(feature) == max_len:
                    break
            feature = feature + [word2index["<pad>"]] * (max_len - len(feature))
            test_features.append(feature)
        return train_features, develop_features, test_features, train_labels, develop_labels, test_labels, word2index

    def get_datasets(self, train_features, develop_features, test_features, train_labels, develop_labels, test_labels,
                     vocab_size, embedding_size):
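        """Embed the index features and wrap every split in a TensorDataset.

        A randomly initialised nn.Embedding (vocab_size + 2 rows, to cover the
        reserved <unk> and <pad> indices) maps indices to embedding_size-dimensional
        vectors; the embedding itself is not trained here.
        """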
        train_features = torch.LongTensor(train_features)
        train_labels = torch.FloatTensor(train_labels)
        develop_features = torch.LongTensor(develop_features)
        develop_labels = torch.FloatTensor(develop_labels)
        test_features = torch.LongTensor(test_features)
        test_labels = torch.FloatTensor(test_labels)

        # +2 rows for the reserved <unk> and <pad> indices.
        embed = nn.Embedding(vocab_size + 2, embedding_size)
        train_features = embed(train_features)
        develop_features = embed(develop_features)
        test_features = embed(test_features)

        # Variable is deprecated since PyTorch 0.4; it is kept here only to mark
        # the embedded features as not requiring gradients.
        train_features = Variable(train_features, requires_grad=False)
        train_datasets = torch.utils.data.TensorDataset(train_features, train_labels)
        develop_features = Variable(develop_features, requires_grad=False)
        develop_datasets = torch.utils.data.TensorDataset(develop_features, develop_labels)
        test_features = Variable(test_features, requires_grad=False)
        test_datasets = torch.utils.data.TensorDataset(test_features, test_labels)
        return train_datasets, develop_datasets, test_datasets


def new_split(dataset_name):
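    """Split one cached dataset into train/develop/test sets (60/20/20).

    Reads <name>.npy / <name>_label.npy from ./word_list_data/ and writes the six
    split files into ./splited_data/.
    """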
    names = [dataset_name]
    directory = "./splited_data/"
    for name in names:
        dp = DataProcessor(dataset_name=name)
        txt_origin = np.load(dp.datas_path, allow_pickle=True).tolist()
        label_origin = np.load(dp.labels_path, allow_pickle=True).tolist()
        train_datas, test_datas, train_labels, test_labels = sklearn.model_selection.train_test_split(
            txt_origin, label_origin, random_state=13, train_size=0.6, test_size=0.4)
        test_datas, develop_datas, test_labels, develop_labels = sklearn.model_selection.train_test_split(
            test_datas, test_labels, random_state=13, train_size=0.5, test_size=0.5)
        np.save(directory + name + "_train.npy", train_datas)
        np.save(directory + name + "_label_train.npy", train_labels)
        np.save(directory + name + "_test.npy", test_datas)
        np.save(directory + name + "_label_test.npy", test_labels)
        np.save(directory + name + "_develop.npy", develop_datas)
        np.save(directory + name + "_label_develop.npy", develop_labels)


def increase_data(names):
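    """Report per-class example counts before and after augmentation.

    Despite its name this module-level helper does not augment anything; it only
    compares the original training labels with the cached augmented labels.
    """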
    for name in names:
        dp = DataProcessor(dataset_name=name)
        train_labels = np.load("./splited_data/" + name + "_label_train.npy", allow_pickle=True).tolist()
        increased_labels = np.load(dp.labels_increase_path, allow_pickle=True).tolist()
        type_num_train = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
        type_num_increased = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0}
        for label in increased_labels:
            type_num_increased[label.index(1) + 1] += 1
        for label in train_labels:
            type_num_train[label.index(1) + 1] += 1
        print("Before augmentation:", type_num_train)
        print("After augmentation:", type_num_increased)


def increase_test(names):
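    """Run DataProcessor.increase_data() for every dataset name."""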
    for name in names:
        dp = DataProcessor(dataset_name=name)
        dp.increase_data()


def read_data():
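    """Read and cache the raw data for the two database-backed datasets."""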
    names = ["航天中认自主可控众包测试练习赛", "决赛自主可控众测web自主可控运维管理系统"]
    for name in names:
        dp = DataProcessor(dataset_name=name)
        dp.read_text_from_db()


if __name__ == '__main__':
    direc = "./splited_data/"
    names = ['趣享GIF众包测试201908试题']
    # Augment the training split, then report the class distribution before/after.
    increase_test(names)
    increase_data(names)