# -*-coding:UTF-8-*- import csv from itertools import * import keras import json import networkx as nx import sys # sys.path.append("../") import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from scripts.logger.lemon_logger import Logger from scripts.tools.mutator_selection_logic import Roulette, MCMC from scripts.mutation.model_mutation_generators import * import argparse import ast import numpy as np from scripts.mutation.mutation_utils import * import pickle from scripts.tools import utils from scripts.tools.utils import ModelUtils import shutil import re import datetime import configparser import warnings import math lines = 0 # np.random.seed(20200501) warnings.filterwarnings("ignore") os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" os.environ["CUDA_VISIBLE_DEVICES"] = "" import psutil def partially_nan_or_inf(predictions, bk_num): # 检查是否无穷大或空 """ Check if there is NAN in the result """ def get_nan_num(nds): _nan_num = 0 for nd in nds: if np.isnan(nd).any() or np.isinf(nd).any(): _nan_num += 1 return _nan_num if len(predictions) == bk_num: for input_predict in zip(*predictions): nan_num = get_nan_num(input_predict) if 0 < nan_num < bk_num: return True else: continue return False else: raise Exception("wrong backend amounts") def get_selector_by_startegy_name(mutator_s, mutant_s): mutant_strategy_dict = {"ROULETTE": Roulette} mutator_strategy_dict = {"MCMC": MCMC} return mutator_strategy_dict[mutator_s], mutant_strategy_dict[mutant_s] def save_mutate_history(selector, invalid_history: dict, mutant_history: list): mutator_history_path = os.path.join(experiment_dir, "mutator_history.csv") mutant_history_path = os.path.join(experiment_dir, "mutant_history.txt") with open(mutator_history_path, "w+") as fw: fw.write("Name,Success,Invalid,Total\n") for op in invalid_history.keys(): mtrs = selector.mutators[op] invalid_cnt = invalid_history[op] fw.write( "{},{},{},{}\n".format( op, mtrs.delta_bigger_than_zero, invalid_cnt, mtrs.total ) ) with open(mutant_history_path, "w+") as fw: for mutant in mutant_history: fw.write("{}\n".format(mutant)) def is_nan_or_inf(t): if math.isnan(t) or math.isinf(t): return True else: return False def continue_checker(**run_stat): # 判断算法是否满足退出条件 start_time = run_stat["start_time"] time_limitation = run_stat["time_limit"] cur_counters = run_stat["cur_counters"] counters_limit = run_stat["counters_limit"] s_mode = run_stat["stop_mode"] # if timing # 时间限制 if s_mode == "TIMING": hours, minutes, seconds = utils.ToolUtils.get_HH_mm_ss( datetime.datetime.now() - start_time ) total_minutes = hours * 60 + minutes mutate_logger.info( f"INFO: Mutation progress: {total_minutes}/{time_limitation} Minutes!" ) if total_minutes < time_limitation: return True else: return False # if counters # 次数限制,size(models) 0 else mutator.delta_bigger_than_zero ) if mutant_strategy == "ROULETTE" and delta > 0: # when size >= capacity: # random_mutant & Roulette would drop one and add new one if mutant_selector.is_full(): mutant_selector.pop_one_mutant() mutant_selector.add_mutant( new_seed_name ) # 如果放大了不一致程度,即ACC(m)>=ACC(s),就加入到种子模型集合里 last_inconsistency = accumulative_inconsistency # 29行 mutate_logger.info( "SUCCESS:{} pass testing!".format(new_seed_name) ) mutant_counter += 1 else: mutate_op_invalid_history[selected_op] += 1 mutate_logger.error("Invalid model Found!") else: mutate_logger.error("Crashed or NaN model Found!") else: mutate_logger.error( "Exception raised when mutate {} with {}".format( picked_seed, selected_op ) ) mutate_logger.info("Mutated op used history:") mutate_logger.info(mutate_op_history) mutate_logger.info("Invalid mutant generated history:") mutate_logger.info(mutate_op_invalid_history) run_stat["cur_counters"] = mutant_counter save_mutate_history(mutator_selector, mutate_op_invalid_history, mutant_history) # calc_cov = CoverageCalculatornew(all_json_path, api_config_pool_path) # lines = 0 # for file in os.listdir(folder_path): # if file == 'total.json': continue # file_path = os.path.join(folder_path, file) # calc_cov.load_json(file_path) # with open(file_path, 'r') as sub_json: # sub_info = json.load(sub_json) # outer_div = len(tar_set - set(sub_info['layer_type'])) # input_cov, config_cov, api_cov, op_type_cov, op_num_cov, edge_cov = calc_cov.cal_coverage() # with open(output_path, 'a+', newline='') as fi: # writer = csv.writer(fi) # head = ['Layer Input Coverage', 'Layer Parameter Diversity', 'Layer Sequence Diversity', # 'Operator Type Coverage', 'Operator Num Coverage', 'Edge Coverage', 'Accumulative inconsistency'] # if not lines: # writer.writerow(head) # lines += 1 # printlist = [input_cov, config_cov, api_cov, op_type_cov, op_num_cov, edge_cov, # acc[lines]] # writer.writerow(printlist) return res_dict def generate_metrics_result(res_dict, predict_output, model_idntfr): # 计算ACC mutate_logger.info("Generating Metrics Result") accumulative_incons = 0 backends_pairs_num = 0 # Compare results pair by pair for pair in combinations(predict_output.items(), 2): # 每一对库 backends_pairs_num += 1 backend1, backend2 = pair bk_name1, prediction1 = backend1 bk_name2, prediction2 = backend2 bk_pair = "{}_{}".format(bk_name1, bk_name2) for metrics_name, metrics_result_dict in res_dict.items(): metrics_func = utils.MetricsUtils.get_metrics_by_name(metrics_name) # 计算 # metrics_results in list type metrics_results = metrics_func( prediction1, prediction2, y_test[: flags.test_size] ) # 一共test_size个数据集,所以metrics_result是长度为test_size的预测结果列表 # ACC -> float: The sum of all inputs under all backends accumulative_incons += sum(metrics_results) # ACC=∑ for input_idx, delta in enumerate(metrics_results): delta_key = "{}_{}_{}_input{}".format( model_idntfr, bk_name1, bk_name2, input_idx ) metrics_result_dict[delta_key] = delta mutate_logger.info(f"Accumulative Inconsistency: {accumulative_incons}") return res_dict, accumulative_incons def generate_gini_result(predict_output, backends): gini_res = {bk: 0 for bk in backends} for pair in predict_output.items(): bk_name, prediction = pair gini_res[bk_name] = utils.MetricsUtils.get_gini_mean(prediction) return gini_res def generate_theta(predict_output, backends): theta_res = {bk: 0 for bk in backends} for pair in predict_output.items(): bk_name, prediction = pair theta_res[bk_name] = utils.MetricsUtils.get_theta_mean( prediction, y_test[: flags.test_size] ) return theta_res SHAPE_SPACE = 5 model_num = 0 def get_model_prediction(res_dict, model_path, model_name, exp, test_size, backends): # 计算ACC """ Get model prediction on different backends and calculate distance by metrics """ root_dir = model_path.split("origin_model")[0] npy_path = ( root_dir + "res.npy" ) # 保存模型预测结果的路径,patch_prediction_extractor.py中的44行改成一样的路径 predict_output = {b: [] for b in backends} model_idntfr = model_name[:-3] all_backends_predict_status = True for bk in backends: python_bin = f"{python_prefix}/{bk}/bin/python" predict_st = datetime.datetime.now() # 使用不同的库进行预测 pre_status_bk = os.system( f"{python_bin} -u -m patch_prediction_extractor --backend {bk} " f"--exp {exp} --test_size {test_size} --model {model_path} " f"--config_name {flags.config_name}" ) predict_et = datetime.datetime.now() predict_td = predict_et - predict_st h, m, s = utils.ToolUtils.get_HH_mm_ss(predict_td) mutate_logger.info( "Prediction Time Used on {} : {}h, {}m, {}s".format(bk, h, m, s) ) # If no exception is thrown,save prediction result if pre_status_bk == 0: # 预测执行成功,保存结果 # data = pickle.loads(redis_conn.hget("prediction_{}".format(model_name), bk)) data = np.load(npy_path) predict_output[bk] = data # print(data) # record the crashed backend else: all_backends_predict_status = False mutate_logger.error( "{} crash on backend {} when predicting ".format(model_name, bk) ) status = False accumulative_incons = None # run ok on all platforms if ( all_backends_predict_status ): # 所有的库都执行成功且保存了结果,判断结果中是否有错误 predictions = list(predict_output.values()) res_dict, accumulative_incons = generate_metrics_result( res_dict=res_dict, predict_output=predict_output, model_idntfr=model_idntfr ) # 计算ACC(用于衡量预测结果的不一致程度) # gini_res = generate_gini_result(predict_output=predict_output, backends=backends) # theta = generate_theta(predict_output=predict_output, backends=backends) # import csv # csvfile = open(r"D:\lemon_outputs\result\mobilenet.1.00.224-imagenet\tensorflow\5.csv", 'a+',newline='') # write=csv.writer(csvfile) # write.writerow([accumulative_incons, gini_res['tensorflow'], theta['tensorflow']]) # csvfile.close() # # csvfile = open(r"D:\lemon_outputs\result\mobilenet.1.00.224-imagenet\mxnet\5.csv", 'a+',newline='') # write=csv.writer(csvfile) # write.writerow([accumulative_incons, gini_res['mxnet'], theta['mxnet']]) # csvfile.close() # 计算gini # If all backends are working fine, check if there exists NAN or INF in the result # `accumulative_incons` is nan or inf --> NaN or INF in results if is_nan_or_inf(accumulative_incons): # has NaN on partial backends if partially_nan_or_inf(predictions, len(backends)): nan_model_path = os.path.join(nan_dir, f"{model_idntfr}_NaN_bug.h5") mutate_logger.error("Error: Found one NaN bug. move NAN model") # has NaN on all backends --> not a NaN bug else: nan_model_path = os.path.join( nan_dir, f"{model_idntfr}_NaN_on_all_backends.h5" ) mutate_logger.error( "Error: Found one NaN Model on all libraries. move NAN model" ) shutil.move(model_path, nan_model_path) else: # No NaN or INF on any backend print(model_path) for bk in backends: python_bin = f"{python_prefix}/{bk}/bin/python" os.system( f"{python_bin} -u -m model_to_txt --backend {bk} --model_path {model_path} --root_dir {root_dir}" ) # if 'svhn' in model_name or 'fashion2' in model_name: # file_path = os.path.join(folder_path, model_path.split("\\")[-1][:-5] + '.json') # else: # file_path = os.path.join(folder_path, model_path.split("\\")[-1][:-3] + '.json') # union_json(file_path, all_json_path) # model_now = keras.models.load_model(model_path, custom_objects=custom_objects()) # inner_div[model_num] = calc_inner_div(model_now) # with open(file_path, 'r') as sub_json: # sub_info = json.load(sub_json) # if len(set(sub_info['layer_type'])) > len(tar_set): # tar_set = set(sub_info['layer_type']) mutate_logger.info("Saving prediction") with open( "{}/prediction_{}.pkl".format(inner_output_dir, model_idntfr), "wb+" ) as f: pickle.dump(predict_output, file=f) status = True # save crashed model else: mutate_logger.error("Error: move crash model") crash_model_path = os.path.join(crash_dir, model_name) shutil.move(model_path, crash_model_path) return status, res_dict, accumulative_incons, predict_output if __name__ == "__main__": starttime = datetime.datetime.now() """ Parser of command args. It could make mutate_lemon.py run independently without relying on mutation_executor.py """ parse = argparse.ArgumentParser() parse.add_argument( "--is_mutate", type=ast.literal_eval, default=False, help="parameter to determine mutation option", ) parse.add_argument( "--mutate_op", type=str, nargs="+", choices=[ "WS", "GF", "NEB", "NAI", "NS", "ARem", "ARep", "LA", "LC", "LR", "LS", "MLA", ], help="parameter to determine mutation option", ) parse.add_argument( "--model", type=str, help="relative path of model file(from root dir)" ) parse.add_argument( "--output_dir", type=str, help="relative path of output dir(from root dir)" ) parse.add_argument("--backends", type=str, nargs="+", help="list of backends") parse.add_argument( "--mutate_num", type=int, help="number of variant models generated by each mutation operator", ) parse.add_argument("--mutate_ratio", type=float, help="ratio of mutation") parse.add_argument("--exp", type=str, help="experiments identifiers") parse.add_argument("--test_size", type=int, help="amount of testing image") parse.add_argument("--config_name", type=str, help="config name") flags, unparsed = parse.parse_known_args(sys.argv[1:]) warnings.filterwarnings("ignore") lemon_cfg = configparser.ConfigParser() # lemon_cfg.read(f".\config\{flags.config_name}") cfg_path = os.path.join(os.path.dirname(os.getcwd()), "config", flags.config_name) lemon_cfg.read(cfg_path) # lemon_cfg.read(f"config/demo.conf") time_limit = lemon_cfg["parameters"].getint("time_limit") mutator_strategy = lemon_cfg["parameters"].get("mutator_strategy").upper() mutant_strategy = lemon_cfg["parameters"].get("mutant_strategy").upper() stop_mode = lemon_cfg["parameters"].get("stop_mode").upper() alpha = lemon_cfg["parameters"].getfloat("alpha") mutate_logger = Logger() # pool = redis.ConnectionPool(host=lemon_cfg['redis']['host'], port=lemon_cfg['redis']['port'], # db=lemon_cfg['redis'].getint('redis_db')) # redis_conn = redis.Redis(connection_pool=pool) # for k in redis_conn.keys(): # if flags.exp in k.decode("utf-8"): # redis_conn.delete(k) # exp : like lenet5-mnist experiment_dir = os.path.join(flags.output_dir, flags.exp) mut_dir = os.path.join(experiment_dir, "mut_model") crash_dir = os.path.join(experiment_dir, "crash") nan_dir = os.path.join(experiment_dir, "nan") inner_output_dir = os.path.join(experiment_dir, "inner_output") metrics_result_dir = os.path.join(experiment_dir, "metrics_result") x, y = utils.DataUtils.get_data_by_exp(flags.exp) # 从conf文件中读取数据并转换形式 x_test, y_test = x[: flags.test_size], y[: flags.test_size] pool_size = lemon_cfg["parameters"].getint("pool_size") python_prefix = lemon_cfg["parameters"]["python_prefix"].rstrip("\\") try: # 执行算法 metrics_list = lemon_cfg["parameters"]["metrics"].split(" ") # D_MAD lemon_results = {k: dict() for k in metrics_list} lemon_results = _generate_and_predict( lemon_results, flags.model, flags.mutate_num, flags.mutate_op, flags.test_size, flags.exp, flags.backends, ) with open( "{}/{}_lemon_results.pkl".format(experiment_dir, flags.exp), "wb+" ) as f: pickle.dump(lemon_results, file=f) utils.MetricsUtils.generate_result_by_metrics( metrics_list, lemon_results, metrics_result_dir, flags.exp ) except Exception as e: mutate_logger.exception(sys.exc_info()) from keras import backend as K K.clear_session() endtime = datetime.datetime.now() time_delta = endtime - starttime h, m, s = utils.ToolUtils.get_HH_mm_ss(time_delta) mutate_logger.info( "Mutation process is done: Time used: {} hour,{} min,{} sec".format(h, m, s) )