12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740 |
- import sys
- sys.path.append("../")
- from scripts.tools import utils
- import math
- from typing import *
- from scripts.mutation.mutation_utils import *
- from scripts.mutation.layer_matching import LayerMatching
- import random
- import os
- import warnings
- from scripts.logger.lemon_logger import Logger
- import datetime
- warnings.filterwarnings("ignore")  # install an "ignore" filter for all Python warnings
- os.environ["TF_CPP_MIN_LOG_LEVEL"] = '2' # only show warnings and errors
- os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
- os.environ["CUDA_VISIBLE_DEVICES"] = ""  # no device ids listed; presumably forces CPU-only execution (confirm)
- mylogger = Logger()  # module-level logger shared by the mutation operators in this file
def _assert_indices(mutated_layer_indices: List[int], depth_layer: int):
    """Check that every requested layer index addresses an existing layer.

    Args:
        mutated_layer_indices: Collection of layer positions to validate. It
            must not be empty, because ``max``/``min`` are applied to it.
        depth_layer: Number of layers of the model the indices refer to.

    Raises:
        AssertionError: If the largest index is not below ``depth_layer`` or
            the smallest index is negative.
    """
    assert depth_layer > max(mutated_layer_indices), "Max index should be less than layer depth"
    assert 0 <= min(mutated_layer_indices), "Min index should be greater than or equal to zero"
def _shuffle_conv2d(weights, mutate_ratio):
    """Shuffle the kernel entries of randomly selected Conv2D output channels.

    Args:
        weights: Weight arrays of one layer. Arrays with more than one
            dimension are unpacked as (filter_width, filter_height,
            num_of_input_channels, num_of_output_channels); one-dimensional
            arrays (biases) are passed through untouched.
        mutate_ratio: Forwarded to ``utils.ModelUtils.generate_permutation``
            to choose which output channels get shuffled.

    Returns:
        A new list holding one array per input array, in the same order. The
        input arrays themselves are not modified.
    """
    new_weights = []
    for val in weights:
        # val is bias if len(val.shape) == 1
        if len(val.shape) > 1:
            val_shape = val.shape
            filter_width, filter_height, num_of_input_channels, num_of_output_channels = val_shape
            mutate_output_channels = utils.ModelUtils.generate_permutation(num_of_output_channels, mutate_ratio)
            # Copy and flatten the kernel once (one column per output channel)
            # instead of re-copying and re-reshaping it for every channel.
            flat_kernel = np.reshape(
                val.copy(),
                (filter_width * filter_height * num_of_input_channels, num_of_output_channels))
            for output_channel in mutate_output_channels:
                selected_list = flat_kernel[:, output_channel]
                flat_kernel[:, output_channel] = utils.ModelUtils.shuffle(selected_list)
            val = np.reshape(flat_kernel, val_shape)
        new_weights.append(val)
    return new_weights
def _shuffle_dense(weights, mutate_ratio):
    """Shuffle the entries of randomly selected columns of a Dense kernel.

    Args:
        weights: Weight arrays of one layer. Arrays with more than one
            dimension are unpacked as (input_dim, output_dim); one-dimensional
            arrays (biases) are passed through untouched.
        mutate_ratio: Forwarded to ``utils.ModelUtils.generate_permutation``
            to choose which output columns get shuffled.

    Returns:
        A new list holding one array per input array, in the same order.
    """
    shuffled_weights = []
    for array in weights:
        # a one-dimensional array is a bias and is kept as it is
        if len(array.shape) <= 1:
            shuffled_weights.append(array)
            continue
        _, column_count = array.shape
        chosen_columns = utils.ModelUtils.generate_permutation(column_count, mutate_ratio)
        mutated = array.copy()
        for column in chosen_columns:
            mutated[:, column] = utils.ModelUtils.shuffle(mutated[:, column])
        shuffled_weights.append(mutated)
    return shuffled_weights
def _LA_model_scan(model, new_layers, mutated_layer_indices=None):
    """Collect, per layer index, the new layers that may be inserted after it.

    Args:
        model: Model whose ``layers`` are scanned.
        new_layers: Names of candidate layers, or ``None`` to use every key of
            ``LayerUtils().available_model_level_layers``.
        mutated_layer_indices: Layer positions to consider, or ``None`` for
            every position except the last one.

    Returns:
        Dict mapping a layer index to the list of candidate names whose
        ``is_input_legal`` check accepts that layer's output shape. Indices
        without any legal candidate are absent.
    """
    layer_utils = LayerUtils()
    layers = model.layers
    # new layers can never be added after the last layer
    if mutated_layer_indices is None:
        positions_to_add = np.arange(len(layers) - 1)
    else:
        positions_to_add = mutated_layer_indices
    _assert_indices(positions_to_add, len(layers))

    insertion_points = {}
    if new_layers is None:
        candidates = list(layer_utils.available_model_level_layers.keys())
    else:
        candidates = new_layers

    for position, layer in enumerate(layers):
        # the scan ends at the first layer with a softmax activation
        if hasattr(layer, 'activation') and 'softmax' in layer.activation.__name__.lower():
            break
        if position not in positions_to_add:
            continue
        for candidate in candidates:
            if layer_utils.is_input_legal[candidate](layer.output.shape):
                insertion_points.setdefault(position, []).append(candidate)
    return insertion_points
def _MLA_model_scan(model, new_layers, mutated_layer_indices=None):
    """Collect, per layer index, the layer groups that may be inserted after it.

    Args:
        model: Model whose ``layers`` are scanned.
        new_layers: Names of candidate layer groups, or ``None`` to use every
            key of ``LayerMatching().layer_concats``.
        mutated_layer_indices: Layer positions to consider, or ``None`` for
            every position except the last one.

    Returns:
        Dict mapping a layer index to the list of candidate names whose
        ``input_legal`` check accepts that layer's output shape. Indices
        without any legal candidate are absent.
    """
    layer_matching = LayerMatching()
    layers = model.layers
    # new layers can never be added after the last layer
    if mutated_layer_indices is None:
        positions_to_add = np.arange(len(layers) - 1)
    else:
        positions_to_add = mutated_layer_indices
    _assert_indices(positions_to_add, len(layers))

    insertion_points = {}
    if new_layers is None:
        candidates = list(layer_matching.layer_concats.keys())
    else:
        candidates = new_layers

    for position, layer in enumerate(layers):
        # the scan ends at the first layer with a softmax activation
        if hasattr(layer, 'activation') and 'softmax' in layer.activation.__name__.lower():
            break
        if position not in positions_to_add:
            continue
        for candidate in candidates:
            if layer_matching.input_legal[candidate](layer.output.shape):
                insertion_points.setdefault(position, []).append(candidate)
    return insertion_points
def _LC_and_LR_scan(model, mutated_layer_indices):
    """List the layers that can be copied or removed without changing shapes.

    A layer qualifies when it is among the requested indices, is not an
    ``InputLayer``, does not take a list of several input tensors, and has
    identical input and output shapes.

    Args:
        model: Model whose ``layers`` are scanned.
        mutated_layer_indices: Layer positions to consider, or ``None`` for
            every position except the last one.

    Returns:
        List of qualifying layer indices in random order.
    """
    layers = model.layers
    # the last layer should not be copied or removed
    if mutated_layer_indices is None:
        mutated_layer_indices = np.arange(len(layers) - 1)
    _assert_indices(mutated_layer_indices, len(layers))

    candidates = []
    for position, layer in enumerate(layers):
        # the scan ends at the first layer with a softmax activation
        if hasattr(layer, 'activation') and 'softmax' in layer.activation.__name__.lower():
            break
        if position not in mutated_layer_indices:
            continue
        # InputLayer should not be copied or removed
        from keras.engine.input_layer import InputLayer
        if isinstance(layer, InputLayer):
            continue
        # layers with multiple input tensors can't be copied or removed
        if isinstance(layer.input, list) and len(layer.input) > 1:
            continue
        if layer.input.shape.as_list() == layer.output.shape.as_list():
            candidates.append(position)
    np.random.shuffle(candidates)
    return candidates
def _LS_scan(model):
    """Group layer indices by their input/output shape signature.

    The signature is built from the input and output shapes without their
    first dimension: dimensions are joined with "-", and the two halves are
    joined with "+". Layers whose ``input`` is a list of several tensors are
    skipped, and the scan ends at the first layer with a softmax activation.

    Args:
        model: Model whose ``layers`` are scanned.

    Returns:
        Dict mapping each signature string to the list of layer indices that
        share it, in scan order.
    """
    shape_dict = {}
    for index, layer in enumerate(model.layers):
        if hasattr(layer, 'activation') and 'softmax' in layer.activation.__name__.lower():
            break
        if isinstance(layer.input, list) and len(layer.input) > 1:
            continue
        input_dims = layer.input.shape.as_list()[1:]
        output_dims = layer.output.shape.as_list()[1:]
        signature = "{}+{}".format("-".join(map(str, input_dims)),
                                   "-".join(map(str, output_dims)))
        shape_dict.setdefault(signature, []).append(index)
    return shape_dict
def GF_mut(model, mutation_ratio, distribution='normal', STD=0.1, lower_bound=None, upper_bound=None):
    """Apply the GF operator: add random noise to part of one layer's weights.

    A copy of ``model`` is made, one of its layers is picked uniformly at
    random, and in every weight array of that layer a ``mutation_ratio``
    fraction of the entries receives additive noise. A layer without weights
    leaves the copy unchanged.

    Args:
        model: Model to mutate; it is copied, not modified.
        mutation_ratio: Fraction of entries of each weight array to perturb.
        distribution: ``'normal'`` or ``'uniform'``.
        STD: For ``'normal'``, the noise standard deviation expressed as a
            multiple of the standard deviation of the array being perturbed.
        lower_bound: Lower bound of the noise for ``'uniform'``.
        upper_bound: Upper bound of the noise for ``'uniform'``.

    Returns:
        The mutated copy of ``model``.

    Raises:
        AssertionError: If ``distribution`` is not a supported name.
        ValueError: If ``distribution`` is ``'uniform'`` and a bound is missing.
    """
    valid_distributions = ['normal', 'uniform']
    assert distribution in valid_distributions, 'Distribution %s is not support.' % distribution
    if distribution == 'uniform' and (lower_bound is None or upper_bound is None):
        mylogger.error('Lower bound and Upper bound is required for uniform distribution.')
        raise ValueError('Lower bound and Upper bound is required for uniform distribution.')

    mylogger.info('copying model...')
    GF_model = utils.ModelUtils.model_copy(model, 'GF')
    mylogger.info('model copied')

    chosed_index = np.random.randint(0, len(GF_model.layers))
    layer = GF_model.layers[chosed_index]
    mylogger.info('executing mutation of {}'.format(layer.name))

    new_weights = []
    for weight in layer.get_weights():
        weight_flat = weight.flatten()
        permu_num = math.floor(len(weight_flat) * mutation_ratio)
        permutation = np.random.permutation(len(weight_flat))[:permu_num]
        if distribution == 'uniform':
            # The bounds were validated above but never used before; honour
            # the requested distribution instead of silently using 'normal'.
            noise = np.random.uniform(low=lower_bound, high=upper_bound, size=len(permutation))
        else:
            # Scale relative to this array only. STD is deliberately not
            # reassigned: doing so made the scale of later arrays (e.g. the
            # bias) depend on the spread of earlier ones (e.g. the kernel).
            scale = math.sqrt(weight_flat.var()) * STD
            noise = np.random.normal(scale=scale, size=len(permutation))
        weight_flat[permutation] += noise
        new_weights.append(weight_flat.reshape(weight.shape))
    layer.set_weights(new_weights)
    return GF_model
def WS_mut(model, mutation_ratio, mutated_layer_indices=None):
    """Apply the WS operator: shuffle weights inside one randomly chosen layer.

    A copy of ``model`` is made and one layer is drawn at random from
    ``mutated_layer_indices``. Only layers whose class is named ``Conv2D`` or
    ``Dense`` and that have weights are changed; any other pick returns the
    copy unmodified.

    Args:
        model: Model to mutate; it is copied, not modified.
        mutation_ratio: Value in (0, 1] forwarded to the shuffle helpers.
        mutated_layer_indices: Candidate layer positions, or ``None`` for all
            layers. The passed sequence is left in its original order.

    Returns:
        The (possibly mutated) copy of ``model``.

    Raises:
        Exception: If ``mutation_ratio`` is outside (0, 1].
        AssertionError: If an index is out of range for the model.
    """
    WS_model = utils.ModelUtils.model_copy(model, 'WS')
    layers = WS_model.layers
    depth_layer = len(layers)
    mutated_layer_indices = np.arange(depth_layer) if mutated_layer_indices is None else mutated_layer_indices
    if not 0 < mutation_ratio <= 1.0:
        mylogger.error("mutation_ratio or index are wrong")
        raise Exception("mutation_ratio or index are wrong")

    _assert_indices(mutated_layer_indices, depth_layer)
    # np.random.shuffle works in place; shuffle a private copy so the
    # caller's index sequence is not reordered as a side effect.
    candidate_indices = list(mutated_layer_indices)
    np.random.shuffle(candidate_indices)
    layer = layers[candidate_indices[0]]
    weights = layer.get_weights()
    layer_name = type(layer).__name__
    if len(weights) != 0:
        if layer_name == "Conv2D":
            layer.set_weights(_shuffle_conv2d(weights, mutation_ratio))
        elif layer_name == "Dense":
            layer.set_weights(_shuffle_dense(weights, mutation_ratio))
    return WS_model
def NEB_mut(model, mutation_ratio, mutated_layer_indices=None):
    """Apply the NEB operator: zero the weights of selected neurons in one layer.

    A copy of ``model`` is made and its candidate layers are visited in random
    order. The first white-listed layer that has weights is mutated: along the
    last axis of the kernel, the slices selected by
    ``utils.ModelUtils.generate_permutation`` are set to zero, together with
    the matching bias entries. Layers whose weight list does not hold exactly
    two arrays are skipped.

    Args:
        model: Model to mutate; it is copied, not modified.
        mutation_ratio: Value in (0, 1] controlling how many neurons are zeroed.
        mutated_layer_indices: Candidate layer positions, or ``None`` for all
            layers but the last. The passed sequence is left in its original
            order.

    Returns:
        The (possibly mutated) copy of ``model``.

    Raises:
        Exception: If ``mutation_ratio`` is outside (0, 1].
        AssertionError: If an index is out of range for the model.
    """
    NEB_model = utils.ModelUtils.model_copy(model, 'NEB')
    layers = NEB_model.layers
    mutated_layer_indices = np.arange(len(layers) - 1) if mutated_layer_indices is None else mutated_layer_indices
    if not 0 < mutation_ratio <= 1.0:
        mylogger.error("mutation_ratio or index are wrong")
        raise Exception("mutation_ratio or index are wrong")

    _assert_indices(mutated_layer_indices, len(layers))
    layer_utils = LayerUtils()
    # np.random.shuffle works in place; shuffle a private copy so the
    # caller's index sequence is not reordered as a side effect.
    candidate_indices = list(mutated_layer_indices)
    np.random.shuffle(candidate_indices)
    for i in candidate_indices:
        layer = layers[i]
        # skip if layer is not in white list
        if not layer_utils.is_layer_in_weight_change_white_list(layer):
            continue
        weights = layer.get_weights()
        if len(weights) == 0:
            continue
        if isinstance(weights, list):
            if len(weights) != 2:
                continue
            weights_w, weights_b = weights
            # transpose so that the first axis enumerates the neurons
            weights_w = weights_w.transpose()
            permutation = utils.ModelUtils.generate_permutation(weights_w.shape[0], mutation_ratio)
            weights_w[permutation] = np.zeros(weights_w[0].shape)
            weights_w = weights_w.transpose()
            weights_b[permutation] = 0
            weights = weights_w, weights_b
            layer.set_weights(weights)
        else:
            assert isinstance(weights, np.ndarray)
            weights_w = weights.transpose()
            permutation = utils.ModelUtils.generate_permutation(weights_w.shape[0], mutation_ratio)
            weights_w[permutation] = np.zeros(weights_w[0].shape)
            weights_w = weights_w.transpose()
            layer.set_weights([weights_w])
        # only one layer is mutated per call
        break
    return NEB_model
def NAI_mut(model, mutation_ratio, mutated_layer_indices=None):
    """Apply the NAI operator: negate the weights of selected neurons in one layer.

    A copy of ``model`` is made and its candidate layers are visited in random
    order. The first white-listed layer that has weights is mutated: along the
    last axis of the kernel, the slices selected by
    ``utils.ModelUtils.generate_permutation`` are multiplied by -1, together
    with the matching bias entries. Layers whose weight list does not hold
    exactly two arrays are skipped.

    Args:
        model: Model to mutate; it is copied, not modified.
        mutation_ratio: Value in (0, 1] controlling how many neurons are negated.
        mutated_layer_indices: Candidate layer positions, or ``None`` for all
            layers but the last. The passed sequence is left in its original
            order.

    Returns:
        The (possibly mutated) copy of ``model``.

    Raises:
        Exception: If ``mutation_ratio`` is outside (0, 1].
        AssertionError: If an index is out of range for the model.
    """
    NAI_model = utils.ModelUtils.model_copy(model, 'NAI')
    layers = NAI_model.layers
    mutated_layer_indices = np.arange(len(layers) - 1) if mutated_layer_indices is None else mutated_layer_indices
    if not 0 < mutation_ratio <= 1.0:
        mylogger.error("mutation_ratio or index are wrong")
        raise Exception("mutation_ratio or index are wrong")

    _assert_indices(mutated_layer_indices, len(layers))
    # np.random.shuffle works in place; shuffle a private copy so the
    # caller's index sequence is not reordered as a side effect.
    candidate_indices = list(mutated_layer_indices)
    np.random.shuffle(candidate_indices)
    layer_utils = LayerUtils()
    for i in candidate_indices:
        layer = layers[i]
        if not layer_utils.is_layer_in_weight_change_white_list(layer):
            continue
        weights = layer.get_weights()
        if len(weights) == 0:
            continue
        if isinstance(weights, list):
            if len(weights) != 2:
                continue
            weights_w, weights_b = weights
            # transpose so that the first axis enumerates the neurons
            weights_w = weights_w.transpose()
            permutation = utils.ModelUtils.generate_permutation(weights_w.shape[0], mutation_ratio)
            weights_w[permutation] *= -1
            weights_w = weights_w.transpose()
            weights_b[permutation] *= -1
            weights = weights_w, weights_b
            layer.set_weights(weights)
        else:
            weights_w = weights[0]
            weights_w = weights_w.transpose()
            permutation = utils.ModelUtils.generate_permutation(weights_w.shape[0], mutation_ratio)
            weights_w[permutation] *= -1
            weights_w = weights_w.transpose()
            layer.set_weights([weights_w])
        # only one layer is mutated per call
        break
    return NAI_model
def NS_mut(model, mutated_layer_indices=None):
    """Apply the NS operator: swap the weights of two neurons in one layer.

    A copy of ``model`` is made and its candidate layers are visited in the
    given order. For the first white-listed layer that has weights, two
    distinct positions along the last kernel axis are drawn at random and
    their kernel slices and bias entries are exchanged. If that axis has
    fewer than two entries a warning is logged and nothing is changed.

    Args:
        model: Model to mutate; it is copied, not modified.
        mutated_layer_indices: Candidate layer positions, or ``None`` for all
            layers but the last.

    Returns:
        The (possibly mutated) copy of ``model``.
    """
    NS_model = utils.ModelUtils.model_copy(model, 'NS')
    layers = NS_model.layers
    if mutated_layer_indices is None:
        mutated_layer_indices = np.arange(len(layers) - 1)
    _assert_indices(mutated_layer_indices, len(layers))
    layer_utils = LayerUtils()
    for position in mutated_layer_indices:
        layer = layers[position]
        if not layer_utils.is_layer_in_weight_change_white_list(layer):
            continue
        weights = layer.get_weights()
        if len(weights) == 0:
            continue
        if isinstance(weights, list):
            if len(weights) != 2:
                continue
            kernel, bias = weights
            # first axis of the transposed kernel enumerates the neurons
            kernel_t = kernel.transpose()
            if kernel_t.shape[0] >= 2:
                first, second = np.random.permutation(kernel_t.shape[0])[:2]
                # fancy indexing copies the right-hand side before assigning
                kernel_t[[first, second]] = kernel_t[[second, first]]
                bias[[first, second]] = bias[[second, first]]
                layer.set_weights((kernel_t.transpose(), bias))
            else:
                mylogger.warning("NS not used! One neuron can't be shuffle!")
        else:
            assert isinstance(weights, np.ndarray)
            kernel_t = weights.transpose()
            if kernel_t.shape[0] >= 2:
                first, second = np.random.permutation(kernel_t.shape[0])[:2]
                kernel_t[[first, second]] = kernel_t[[second, first]]
                layer.set_weights([kernel_t.transpose()])
            else:
                mylogger.warning("NS not used! One neuron can't be shuffle!")
        # only one layer is handled per call
        break
    return NS_model
def ARem_mut(model, mutated_layer_indices=None):
    """Apply the ARem operator: remove the activation of one random layer.

    A copy of ``model`` is made and its candidate layers are visited in random
    order. The first layer that has an ``activation`` attribute whose name does
    not contain "softmax" gets ``ActivationUtils.no_activation`` instead.

    Args:
        model: Model to mutate; it is copied, not modified.
        mutated_layer_indices: Candidate layer positions, or ``None`` for all
            layers but the last. The passed sequence is left in its original
            order.

    Returns:
        The (possibly mutated) copy of ``model``.

    Raises:
        AssertionError: If an index is out of range for the model.
    """
    ARem_model = utils.ModelUtils.model_copy(model, 'ARem')
    layers = ARem_model.layers
    # the activation of last layer should not be removed
    mutated_layer_indices = np.arange(len(layers) - 1) if mutated_layer_indices is None else mutated_layer_indices
    # np.random.shuffle works in place; shuffle a private copy so the
    # caller's index sequence is not reordered as a side effect.
    candidate_indices = list(mutated_layer_indices)
    np.random.shuffle(candidate_indices)
    _assert_indices(candidate_indices, len(layers))
    for i in candidate_indices:
        layer = layers[i]
        if hasattr(layer, 'activation') and 'softmax' not in layer.activation.__name__.lower():
            layer.activation = ActivationUtils.no_activation
            break
    return ARem_model
def ARep_mut(model, new_activations=None, mutated_layer_indices=None):
    """Apply the ARep operator: replace the activation of one random layer.

    A copy of ``model`` is made and its candidate layers are visited in random
    order. The first layer that has an ``activation`` attribute whose name does
    not contain "softmax" gets an activation returned by
    ``ActivationUtils.pick_activation_randomly(new_activations)``.

    Args:
        model: Model to mutate; it is copied, not modified.
        new_activations: Forwarded unchanged to ``pick_activation_randomly``.
        mutated_layer_indices: Candidate layer positions, or ``None`` for all
            layers but the last. The passed sequence is left in its original
            order.

    Returns:
        The (possibly mutated) copy of ``model``.

    Raises:
        AssertionError: If an index is out of range for the model.
    """
    activation_utils = ActivationUtils()
    ARep_model = utils.ModelUtils.model_copy(model, 'ARep')
    layers = ARep_model.layers
    # the activation of last layer should not be replaced
    mutated_layer_indices = np.arange(len(layers) - 1) if mutated_layer_indices is None else mutated_layer_indices
    # np.random.shuffle works in place; shuffle a private copy so the
    # caller's index sequence is not reordered as a side effect.
    candidate_indices = list(mutated_layer_indices)
    np.random.shuffle(candidate_indices)
    _assert_indices(candidate_indices, len(layers))
    for i in candidate_indices:
        layer = layers[i]
        if hasattr(layer, 'activation') and 'softmax' not in layer.activation.__name__.lower():
            layer.activation = activation_utils.pick_activation_randomly(new_activations)
            break
    return ARep_model
def LA_mut(model, new_layers=None, mutated_layer_indices=None):
    """Apply the LA operator: insert one new layer after a random existing layer.

    The insertion point and the layer to insert are drawn at random from the
    result of ``_LA_model_scan``. The model is rebuilt (layer by layer for a
    ``Sequential`` model, through ``functional_model_operation`` otherwise)
    and the weights of the original layers are copied into the rebuilt model.

    Args:
        model: Model to mutate; it is not modified.
        new_layers: Names of layers that may be inserted, or ``None`` for all
            keys of ``LayerUtils().available_model_level_layers``.
        mutated_layer_indices: Candidate insertion positions, or ``None``.

    Returns:
        The new model with one extra layer, or ``None`` when no layer can be
        inserted anywhere.

    Raises:
        Exception: If ``new_layers`` names an unsupported layer.
        AssertionError: If the rebuilt model does not have exactly one more
            layer, or a weight shape differs from the original.
    """
    layer_utils = LayerUtils()
    if new_layers is not None:
        for layer in new_layers:
            if layer not in layer_utils.available_model_level_layers:
                mylogger.error('Layer {} is not supported.'.format(layer))
                raise Exception('Layer {} is not supported.'.format(layer))
    LA_model = utils.ModelUtils.model_copy(model, 'LA')

    insertion_points = _LA_model_scan(LA_model, new_layers, mutated_layer_indices)
    if not insertion_points:
        mylogger.warning('no appropriate layer to insert')
        return None
    for key, insertable in insertion_points.items():
        mylogger.info('{} can be added after layer {} ({})'
                      .format(insertable, key, type(model.layers[key])))

    layers_index_avaliable = list(insertion_points)
    layer_index_to_insert = layers_index_avaliable[np.random.randint(0, len(layers_index_avaliable))]
    available_new_layers = insertion_points[layer_index_to_insert]
    layer_name_to_insert = available_new_layers[np.random.randint(0, len(available_new_layers))]
    mylogger.info('insert {} after {}'.format(layer_name_to_insert, LA_model.layers[layer_index_to_insert].name))

    # insert new layer
    if model.__class__.__name__ == 'Sequential':
        import keras
        new_model = keras.models.Sequential()
        for i, layer in enumerate(LA_model.layers):
            new_model.add(LayerUtils.clone(layer))
            if i == layer_index_to_insert:
                output_shape = layer.output_shape
                new_model.add(layer_utils.available_model_level_layers[layer_name_to_insert](output_shape))
    else:
        def layer_addition(x, layer):
            # apply the original layer, then the newly created one
            x = layer(x)
            output_shape = layer.output_shape
            new_layer = layer_utils.available_model_level_layers[layer_name_to_insert](output_shape)
            return new_layer(x)
        new_model = utils.ModelUtils.functional_model_operation(
            LA_model, operation={LA_model.layers[layer_index_to_insert].name: layer_addition})

    assert len(new_model.layers) == len(model.layers) + 1

    # Copy weights from the original model, matching layers by name; the
    # '_copy_LA' suffix is stripped from the new layer names for the lookup.
    old_model_layers = {layer.name: layer for layer in model.layers}
    new_model_layers = {}
    for layer in new_model.layers:
        layer_name = layer.name
        key = layer_name[:-8] if layer_name.endswith('_copy_LA') else layer_name
        new_model_layers[key] = layer

    tuples = []
    for layer_name, old_layer in old_model_layers.items():
        layer_weights = old_layer.get_weights()
        for sw, w in zip(new_model_layers[layer_name].weights, layer_weights):
            shape_sw = np.shape(sw)
            shape_w = np.shape(w)
            assert len(shape_sw) == len(shape_w)
            for i in range(len(shape_sw)):
                assert shape_sw[i] == shape_w[i], '{}'.format(layer_name)
            tuples.append((sw, w))

    import keras.backend as K
    K.batch_set_value(tuples)
    return new_model
def MLA_mut(model, new_layers = None, mutated_layer_indices=None):
    """Apply the MLA operator: insert a group of layers after a random layer.

    The insertion point and the group to insert are drawn at random from the
    result of ``_MLA_model_scan``. The group is produced by
    ``LayerMatching().layer_concats[name](output_shape)`` and each inserted
    layer gets the name suffix "_insert". The model is rebuilt and the weights
    of the original layers are copied into it.

    Args:
        model: Model to mutate; it is not modified.
        new_layers: Names of layer groups that may be inserted, or ``None``
            for all keys of ``LayerMatching().layer_concats``.
        mutated_layer_indices: Candidate insertion positions, or ``None``.

    Returns:
        The new model, or ``None`` when nothing can be inserted anywhere.

    Raises:
        Exception: If ``new_layers`` names an unsupported layer group.
        AssertionError: If a copied weight does not match the original's
            rank or first dimension.
    """
    # mutiple layers addition
    layer_matching = LayerMatching()
    if new_layers is not None:
        for layer in new_layers:
            if layer not in layer_matching.layer_concats:
                # log before raising, as LA_mut does for the same condition
                mylogger.error('Layer {} is not supported.'.format(layer))
                raise Exception('Layer {} is not supported.'.format(layer))
    MLA_model = utils.ModelUtils.model_copy(model, 'MLA')
    insertion_points = _MLA_model_scan(model, new_layers, mutated_layer_indices)
    mylogger.info(insertion_points)
    if not insertion_points:
        mylogger.warning('no appropriate layer to insert')
        return None
    for key, insertable in insertion_points.items():
        mylogger.info('{} can be added after layer {} ({})'
                      .format(insertable, key, type(model.layers[key])))

    # use logic: randomly select a new layer available to insert into the layer which can be inserted
    layers_index_avaliable = list(insertion_points)
    layer_index_to_insert = layers_index_avaliable[np.random.randint(0, len(layers_index_avaliable))]
    available_new_layers = insertion_points[layer_index_to_insert]
    layer_name_to_insert = available_new_layers[np.random.randint(0, len(available_new_layers))]
    mylogger.info('choose to insert {} after {}'.format(layer_name_to_insert, MLA_model.layers[layer_index_to_insert].name))

    # insert new layers
    if model.__class__.__name__ == 'Sequential':
        import keras
        new_model = keras.models.Sequential()
        for i, layer in enumerate(MLA_model.layers):
            new_model.add(LayerUtils.clone(layer))
            if i == layer_index_to_insert:
                output_shape = layer.output.shape.as_list()
                layers_to_insert = layer_matching.layer_concats[layer_name_to_insert](output_shape)
                for layer_to_insert in layers_to_insert:
                    layer_to_insert.name += "_insert"
                    mylogger.info(layer_to_insert)
                    new_model.add(layer_to_insert)
        new_model.build(MLA_model.input_shape)
    else:
        def layer_addition(x, layer):
            # apply the original layer, then every layer of the inserted group
            x = layer(x)
            output_shape = layer.output.shape.as_list()
            inserted_layers = layer_matching.layer_concats[layer_name_to_insert](output_shape)
            for l in inserted_layers:
                l.name += "_insert"
                mylogger.info('insert layer {}'.format(str(l)))
                x = l(x)
            return x
        new_model = utils.ModelUtils.functional_model_operation(
            MLA_model, operation={MLA_model.layers[layer_index_to_insert].name: layer_addition})

    import time
    start_time = time.time()

    # Copy weights from the original model, matching layers by name; the
    # '_copy_MLA' suffix is stripped from the new layer names for the lookup.
    old_model_layers = {layer.name: layer for layer in model.layers}
    new_model_layers = {}
    for layer in new_model.layers:
        layer_name = layer.name
        key = layer_name[:-9] if layer_name.endswith('_copy_MLA') else layer_name
        new_model_layers[key] = layer

    tuples = []
    for layer_name, old_layer in old_model_layers.items():
        layer_weights = old_layer.get_weights()
        for sw, w in zip(new_model_layers[layer_name].weights, layer_weights):
            shape_sw = np.shape(sw)
            shape_w = np.shape(w)
            assert len(shape_sw) == len(shape_w)
            assert shape_sw[0] == shape_w[0]
            tuples.append((sw, w))

    import keras.backend as K
    K.batch_set_value(tuples)
    end_time = time.time()
    # report through the module logger like every other message in this file
    mylogger.info('set weight cost {}'.format(end_time - start_time))
    return new_model
def LC_mut(model, mutated_layer_indices=None):
    """Apply the LC operator: duplicate one layer directly after itself.

    The layer is the last entry of the (shuffled) list returned by
    ``_LC_and_LR_scan``. Its clone gets the name suffix "_repeat". The model
    is rebuilt and the original weights are copied into it; the duplicate
    receives the weights of the layer it was cloned from.

    Args:
        model: Model to mutate; it is not modified.
        mutated_layer_indices: Candidate layer positions, or ``None``.

    Returns:
        The new model with one extra layer, or ``None`` when no layer
        qualifies for copying.
    """
    LC_model = utils.ModelUtils.model_copy(model, 'LC')
    available_layer_indices = _LC_and_LR_scan(LC_model, mutated_layer_indices)
    if not available_layer_indices:
        mylogger.warning('no appropriate layer to copy (input and output shape should be same)')
        return None

    # use logic: copy the last available layer
    copy_layer_index = available_layer_indices[-1]
    chosen_layer = LC_model.layers[copy_layer_index]
    copy_layer_name = chosen_layer.name + '_repeat'
    mylogger.info('choose to copy layer {}'.format(LC_model.layers[copy_layer_index].name))

    if model.__class__.__name__ == 'Sequential':
        import keras
        new_model = keras.models.Sequential()
        for position, layer in enumerate(LC_model.layers):
            new_model.add(LayerUtils.clone(layer))
            if position != copy_layer_index:
                continue
            duplicate = LayerUtils.clone(layer)
            duplicate.name += '_repeat'
            new_model.add(duplicate)
    else:
        def layer_repeat(x, layer):
            # apply the layer, then a renamed clone of it
            duplicate = LayerUtils.clone(layer)
            x = layer(x)
            duplicate.name += '_repeat'
            return duplicate(x)
        new_model = utils.ModelUtils.functional_model_operation(
            LC_model, operation={LC_model.layers[copy_layer_index].name: layer_repeat})

    # update weights
    assert len(new_model.layers) == len(model.layers) + 1
    old_model_layers = {layer.name: layer for layer in model.layers}
    new_model_layers = {}
    for layer in new_model.layers:
        name = layer.name
        # strip the '_copy_LC' suffix so names line up with the original model
        new_model_layers[name[:-8] if name.endswith('_copy_LC') else name] = layer

    tuples = []
    for layer_name, old_layer in old_model_layers.items():
        layer_weights = old_layer.get_weights()
        destinations = []
        if layer_name + '_copy_LC_repeat' == copy_layer_name:
            # the duplicate gets the same weights as its source layer
            destinations.append(new_model_layers[copy_layer_name])
        destinations.append(new_model_layers[layer_name])
        for destination in destinations:
            for sw, w in zip(destination.weights, layer_weights):
                shape_sw = np.shape(sw)
                shape_w = np.shape(w)
                assert len(shape_sw) == len(shape_w)
                assert shape_sw[0] == shape_w[0]
                tuples.append((sw, w))

    import keras.backend as K
    K.batch_set_value(tuples)
    return new_model
def LR_mut(model, mutated_layer_indices=None):
    """Apply the LR operator: remove one layer from the model.

    The layer is the last entry of the (shuffled) list returned by
    ``_LC_and_LR_scan``. The model is rebuilt without it and the weights of
    the remaining layers are copied from the original model.

    Args:
        model: Model to mutate; it is not modified.
        mutated_layer_indices: Candidate layer positions, or ``None``.

    Returns:
        The new model with one layer fewer, or ``None`` when no layer
        qualifies for removal.
    """
    LR_model = utils.ModelUtils.model_copy(model, 'LR')
    available_layer_indices = _LC_and_LR_scan(LR_model, mutated_layer_indices)
    if not available_layer_indices:
        mylogger.warning('no appropriate layer to remove (input and output shape should be same)')
        return None

    # use logic: remove the last available layer
    remove_layer_index = available_layer_indices[-1]
    mylogger.info('choose to remove layer {}'.format(LR_model.layers[remove_layer_index].name))

    if model.__class__.__name__ == 'Sequential':
        import keras
        new_model = keras.models.Sequential()
        for position, layer in enumerate(LR_model.layers):
            if position == remove_layer_index:
                continue
            new_model.add(LayerUtils.clone(layer))
    else:
        # the removed layer is replaced by an identity operation
        removed_name = LR_model.layers[remove_layer_index].name
        new_model = utils.ModelUtils.functional_model_operation(
            LR_model, operation={removed_name: lambda x, layer: x})

    # update weights
    assert len(new_model.layers) == len(model.layers) - 1
    old_model_layers = {layer.name: layer for layer in model.layers}
    new_model_layers = {}
    for layer in new_model.layers:
        name = layer.name
        # strip the '_copy_LR' suffix so names line up with the original model
        new_model_layers[name[:-8] if name.endswith('_copy_LR') else name] = layer

    tuples = []
    for layer_name, new_layer in new_model_layers.items():
        layer_weights = old_model_layers[layer_name].get_weights()
        for sw, w in zip(new_layer.weights, layer_weights):
            shape_sw = np.shape(sw)
            shape_w = np.shape(w)
            assert len(shape_sw) == len(shape_w)
            assert shape_sw[0] == shape_w[0]
            tuples.append((sw, w))

    import keras.backend as K
    K.batch_set_value(tuples)
    return new_model
def LS_mut(model):
    """Apply the LS operator: swap two layers with the same shape signature.

    Layers are grouped by ``_LS_scan``; one group with at least two members
    is drawn at random and two of its layers exchange positions. The model is
    rebuilt and the original weights are copied into it by layer name.

    Args:
        model: Model to mutate; it is not modified.

    Returns:
        The new model, or ``None`` when no two layers share a signature.
    """
    LS_model = utils.ModelUtils.model_copy(model,"LS")
    shape_dict = _LS_scan(LS_model)
    layers = LS_model.layers

    swap_list = [indices for indices in shape_dict.values() if len(indices) > 1]
    if len(swap_list) == 0:
        mylogger.warning("No layers to swap!")
        return None
    swap_list = swap_list[random.randint(0, len(swap_list)-1)]
    choose_index = random.sample(swap_list, 2)
    first_index, second_index = choose_index[0], choose_index[1]
    mylogger.info('choose to swap {} ({} - {}) and {} ({} - {})'.format(layers[first_index].name,
                                                                      layers[first_index].input.shape,
                                                                      layers[first_index].output.shape,
                                                                      layers[second_index].name,
                                                                      layers[second_index].input.shape,
                                                                      layers[second_index].output.shape))

    if model.__class__.__name__ == 'Sequential':
        import keras
        new_model = keras.Sequential()
        # each of the two chosen positions takes the layer of the other one
        exchange = {first_index: second_index, second_index: first_index}
        for position in range(len(layers)):
            new_model.add(LayerUtils.clone(layers[exchange.get(position, position)]))
    else:
        layer_1 = layers[first_index]
        layer_2 = layers[second_index]
        new_model = utils.ModelUtils.functional_model_operation(
            LS_model,
            {layer_1.name: lambda x, layer: LayerUtils.clone(layer_2)(x),
             layer_2.name: lambda x, layer: LayerUtils.clone(layer_1)(x)})

    # update weights
    assert len(new_model.layers) == len(model.layers)
    old_model_layers = {layer.name: layer for layer in model.layers}
    new_model_layers = {}
    for layer in new_model.layers:
        name = layer.name
        # strip the '_copy_LS' suffix so names line up with the original model
        new_model_layers[name[:-8] if name.endswith('_copy_LS') else name] = layer

    tuples = []
    for layer_name, old_layer in old_model_layers.items():
        layer_weights = old_layer.get_weights()
        for sw, w in zip(new_model_layers[layer_name].weights, layer_weights):
            shape_sw = np.shape(sw)
            shape_w = np.shape(w)
            assert len(shape_sw) == len(shape_w)
            assert shape_sw[0] == shape_w[0]
            tuples.append((sw, w))

    import keras.backend as K
    K.batch_set_value(tuples)
    return new_model
- if __name__ == '__main__':  # running the file as a script does nothing beyond the top-level setup
- pass
|