123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- # ga.py
- import numpy as np
- import random
- import fit_funcs
- from sampling import *
- class Individual:
- def __init__(self, dna, pop_seed_name):
- self.dna = dna
- self.dna_len = len(dna)
- self.pop_seed_name = pop_seed_name
- self.f = []
- self.choose_time = 0
- self.error_number = None
- self.list_er = []
- def __lt__(self, other):
- if self.f[0] > other.f[0]:
- return True
- return False
- def get_fitness(self, F, domain_flags, data_collection_para):
- # len_F = len(F)
- # fitness = np.zeros(shape=len_F)
- # for i in range(len_F):
- fitness, error_number, list_er = F(self.dna, self.pop_seed_name, self.error_number, self.choose_time,
- self.list_er, domain_flags, data_collection_para)
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", type(fitness))
- return fitness, error_number, list_er
- # def cross_newObj(self, p1, p2,iterate_time):
- # # 不能修改之前的,需要生成一个新的对象
- # p1_dna = p1.dna.copy()
- # p2_dna = p2.dna.copy()
- # cut_i = np.random.randint(1, self.dna_len - 1)
- # temp = p1_dna[cut_i:].copy()
- # p1_dna[cut_i:] = p2_dna[cut_i:]
- # p2_dna[cut_i:] = temp
- # cross_name=p1.pop_seed_name.split("_")
- # p1_name="seed_"+iterate_time+"_1_"+cross_name[3]
- # cross_name=p2.pop_seed_name.split("_")
- # p2_name="seed_"+iterate_time+"_2_"+cross_name[3]
- # q1 = individual(p1_dna,p1_name)
- # q2 = individual(p2_dna,p2_name)
- # return q1, q2
- def cross(self, p1, p2):
- cut_i = np.random.randint(1, self.dna_len - 1)
- temp = p1[cut_i:].copy()
- p1[cut_i:] = p2[cut_i:]
- p2[cut_i:] = temp
- return p1, p2
- # def mutation(self, domain):
- # point = np.random.randint(0, self.dna_len)
- # low = domain[point][0]
- # high = domain[point][1]
- # self.dna[point] = random.randint(low,high)
- # return self.dna
- # 实数编码,SBX交叉
- def cross_newObj(self, domain, p1, p2, iterate_time, m, n):
- eta_c = 10
- pcross_real = 1.0 # ori : 0.9
- # 如果随机概率大于交叉概率则不进行交叉操作
- p1_dna = p1.dna.copy()
- p2_dna = p2.dna.copy()
- if random.random() <= pcross_real:
- # 对两个个体执行SBX交叉操作
- for j in range(self.dna_len):
- # 判断是否对某自变量交叉
- if random.random() > 0.5:
- continue
- # 如果两个体某自变量相等则不操作
- if p1_dna[j] == p2_dna[j]:
- continue
- # 对某自变量交叉
- y1 = min(p1_dna[j], p2_dna[j])
- y2 = max(p1_dna[j], p2_dna[j])
- ylow = domain[j][0]
- yup = domain[j][1]
- r = random.random()
- beta = 1.0 + (2.0 * (y1 - ylow) / (y2 - y1))
- alpha = 2.0 - beta ** (-(eta_c + 1.0))
- if r <= (1.0 / alpha):
- betaq = (r * alpha) ** (1.0 / (eta_c + 1.0))
- else:
- betaq = (1.0 / (2.0 - r * alpha)) ** (1.0 / (eta_c + 1.0))
- child1 = 0.5 * ((y1 + y2) - betaq * (y2 - y1))
- beta = 1.0 + (2.0 * (yup - y2) / (y2 - y1))
- alpha = 2.0 - beta ** (-(eta_c + 1.0))
- if r <= (1.0 / alpha):
- betaq = (r * alpha) ** (1.0 / (eta_c + 1.0))
- else:
- betaq = (1.0 / (2.0 - r * alpha)) ** (1.0 / (eta_c + 1.0))
- child2 = 0.5 * ((y1 + y2) - betaq * (y2 - y1))
- if str(child1) == 'nan':
- child1 = 0
- if str(child2) == 'nan':
- child2 = 0
- child1 = min(max(child1, ylow), yup)
- child2 = min(max(child2, ylow), yup)
- p1_dna[j] = child1
- p2_dna[j] = child2
- cross_name = p1.pop_seed_name.split("_")
- p1_name = "seed_" + iterate_time + "_" + str(m) + "_" + cross_name[3]
- cross_name = p2.pop_seed_name.split("_")
- p2_name = "seed_" + iterate_time + "_" + str(n) + "_" + cross_name[3]
- q1 = Individual(p1_dna, p1_name)
- q2 = Individual(p2_dna, p2_name)
- return q1, q2
- def mutation(self, domain, pm):
- eta_m = 5
- for j in range(self.dna_len):
- r = random.random()
- # 对个体某变量进行变异
- if r <= pm:
- y = self.dna[j]
- ylow = domain[j][0]
- yup = domain[j][1]
- delta1 = 1.0 * (y - ylow) / (yup - ylow)
- delta2 = 1.0 * (yup - y) / (yup - ylow)
- # delta=min(delta1, delta2)
- r = random.random()
- mut_pow = 1.0 / (eta_m + 1.0)
- if r <= 0.5:
- xy = 1.0 - delta1
- val = 2.0 * r + (1.0 - 2.0 * r) * (xy ** (eta_m + 1.0))
- deltaq = val ** mut_pow - 1.0
- else:
- xy = 1.0 - delta2
- val = 2.0 * (1.0 - r) + 2.0 * (r - 0.5) * (xy ** (eta_m + 1.0))
- deltaq = 1.0 - val ** mut_pow
- y = y + deltaq * (yup - ylow)
- y = min(yup, max(y, ylow))
- self.dna[j] = y
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", self.dna)
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "77777777777777777777777777777")
- class ga:
- def __init__(self, pri_queue_size, pop_size, dna_size, pc, pm, f_funcs, domain, domain_flags):
- self.pri_queue_size = pri_queue_size
- self.pop_size = pop_size
- self.dna_size = dna_size
- # self.f_size = len(f_funcs)
- self.pc = pc
- self.pm = pm
- self.f_funcs = f_funcs
- self.domain = domain
- self.domain_flags = domain_flags
- def ini_pop(self, if_sampling):
- rseed = 8.35
- random.seed(rseed)
- if not if_sampling:
- # 24x8 随机参数矩阵 初始种群24x可变参数8
- P_dna = np.random.random_integers(0, 0, size=(self.pop_size, self.dna_size))
- for i in range(self.pop_size):
- for j in range(len(self.domain)):
- item = self.domain[j]
- P_dna[i, j] = random.randint(item[0], item[1])
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", P_dna)
- P = [] # 初始种群P\
- Seed_ini = []
- for j in range(self.pop_size):
- seed = "seed_0_" + str(j + 1) + "_" + str(int(j / 4) + 1) + ".xosc"
- Seed_ini.append(seed)
- for m in range(self.pop_size):
- P.append(Individual(P_dna[m], Seed_ini[m]))
- else:
- # 72x8
- P_dna = np.random.random_integers(0, 0, size=(3 * self.pop_size, self.dna_size))
- # print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", P_dna)
- for i in range(3 * self.pop_size):
- for j in range(len(self.domain)):
- item = self.domain[j]
- P_dna[i, j] = random.randint(item[0], item[1])
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", P_dna)
- P = [] # 初始种群P\
- Seed_ini = []
- Sample_predict = []
- choose = [0, 0, 0, 0, 0, 0]
- choose_result = []
- for j in range(3 * self.pop_size):
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "Sampling # ", j)
- # 前25个 前12:0_j+1_4 后12:0_j+1_5 0_25_6
- if int(j / 12) + 4 <= 6:
- num = int(j / 12) + 4
- seed = "seed_0_" + str(j + 1) + "_" + str(num) + ".xosc"
- # 26-72个 0_26_1 0_27_1 ... 0_49_2 0_50_2... 0_61_
- else:
- num = int((j-36)/ 12 + 1)
- seed = "seed_0_" + str(j + 1) + "_" + str(num) + ".xosc"
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "Generate seed_name: ", seed)
- #seed = "seed_0_" + str(j + 1) + "_" + str(int(j / 12) + 1) + ".xosc"
- Seed_ini.append(seed)
- cal_number = sample_simulate(P_dna[j, 0], P_dna[j, 1], P_dna[j, 2], P_dna[j, 3], P_dna[j, 4],
- P_dna[j, 5], P_dna[j, 6], P_dna[j, 7], seed, self.domain_flags)
- Sample_predict.append(cal_number)
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "*******************")
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "VGG error num:"+str(cal_number))
- with open('r_list.csv', 'a+', encoding='utf-8') as l: # save entropy
- cw = csv.writer(l)
- cw.writerow([seed, P_dna[j], cal_number])
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", Sample_predict)
- Sample_predict = np.array(Sample_predict)
- # 加负号后按从大到小排序
- result = np.argsort(-Sample_predict)
- # result为错误数排序后的下标索引数组
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "Sample_predict result:", result)
- for m in range(3 * self.pop_size): # select top 4 seed of every scene( 4 * 6 ) from 72 seeds
- # 12应该为pop_size * 3 / seed_num
- loc = int(result[m] / 12)
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "loc:", loc)
- # 4应该为pop_size / seed_num
- if (choose[loc] < 4):
- choose[loc] = choose[loc] + 1
- choose_result.append(result[m])
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "Sample choose_result:", choose_result)
- for m in range(self.pop_size):
- # print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", P_dna[m])
- # print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", Seed_ini[m])
- P.append(Individual(P_dna[choose_result[m]], Seed_ini[choose_result[m]]))
- return P
- # def calculate_pop_fitness(self, pop, total):
- # # 计算Pop适应度,存储到individual中
- # normalization = []
- # #for p in pop:
- # # if p.f == None:
- # # p.f, p.error_number, p.list_er = p.get_fitness(self.f_funcs, total)
- # for p in pop:
- # p.f, p.error_number, p.list_er = p.get_fitness(self.f_funcs, total)
- # print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", p.f)
- # return pop
- def run_sim(self, R,
- data_collection_para): # data_collection_para : [is_new_seed(for entropy), is_err_collection(collect err or not), err_type(collect normal(1)/sampling(2)/random data(3))]
- for p in R:
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", "Run sim index: # ", R.index(p))
- p.f, p.error_number, p.list_er = p.get_fitness(self.f_funcs, self.domain_flags, data_collection_para)
- print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", p.f, '\n')
- def select(self, R):
- R.sort()
- choose_number = [0, 0]
- # print("[" + os.path.basename(__file__) + ", Line " + str(sys._getframe().f_lineno) + ", " + sys._getframe().f_code.co_name + "] ", len(R))
- pop = []
- for i in range(2):
- randomnumber1 = random.randint(0, self.pop_size - 1)
- randomnumber2 = random.randint(0, self.pop_size - 1)
- randomnumber3 = random.randint(0, self.pop_size - 1)
- randomnumber4 = random.randint(0, self.pop_size - 1)
- if R[randomnumber1].f > R[randomnumber2].f:
- pop.append(R[randomnumber1])
- choose_number[0] = randomnumber1
- else:
- pop.append(R[randomnumber2])
- choose_number[0] = randomnumber2
- if R[randomnumber3].f > R[randomnumber4].f:
- pop.append(R[randomnumber3])
- choose_number[1] = randomnumber3
- else:
- pop.append(R[randomnumber4])
- choose_number[1] = randomnumber4
- R[choose_number[0]].choose_time = R[choose_number[0]].choose_time + 1
- R[choose_number[1]].choose_time = R[choose_number[1]].choose_time + 1
- return pop, R
- def pop_cross(self, P, iterate_time):
- new_Q = []
- i = 0
- P_len = len(P)
- while i < P_len:
- # q1, q2 = P[i].cross_newObj(P[i], P[i + 1],str(iterate_time))
- q1, q2 = P[i].cross_newObj(self.domain, P[i], P[i + 1], str(iterate_time), (i + 1), (i + 2))
- new_Q.append(q1)
- new_Q.append(q2)
- i += 2
- return new_Q
- def pop_mutation(self, Q):
- for q in Q:
- q.mutation(self.domain, self.pm)
- return Q
- def calulate_pop(self, R, pri_queue_size):
- R.sort()
- return R[:pri_queue_size]
- def calculate_pop_fitness(self, R):
- fit_funcs.cal_fitness(R)
- normalization = []
- for r in R:
- normalization.append(r.f[2])
- max_val = max(normalization)
- min_val = min(normalization)
- minus = max_val - min_val if max_val - min_val != 0 else 1
- for r in R:
- r.f[0] += r.f[1] + (r.f[2] - min_val) / minus
|