- def pseudonymize(colname):
- if colname not in key:
- sha3 = hashlib.sha3_512()
- data = salt + colname
- sha3.update(data.encode('utf-8'))
- hexdigest = sha3.hexdigest()
- key[colname] = hexdigest
- return hexdigest
- else:
- return key[colname]
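- # Hedged usage sketch for the helper above: it assumes module-level `salt`
- # (a secret string) and `key` (a dict used as a cache); the names and values
- # below are illustrative, not part of the original snippet.
- import hashlib
- salt = 'local-secret'
- key = {}
- first = pseudonymize('customer_id')
- second = pseudonymize('customer_id')
- assert first == second  # repeated calls return the cached SHA3-512 digest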
- import logging
- from typing import Tuple, Union, List
- import microdata_validator
- from job_executor.exception import BuilderStepError
- from job_executor.adapter import pseudonym_service
- from job_executor.model import Metadata
- logger = logging.getLogger()
- def _get_unit_types(
- metadata: Metadata
- ) -> Tuple[Union[str, None], Union[str, None]]:
- return (
- metadata.get_identifier_key_type_name(),
- metadata.get_measure_key_type_name()
- )
- def _pseudonymize_identifier_only(
- input_csv_path: str,
- unit_id_type: str,
- job_id: str
- ) -> str:
- unique_identifiers = set()
- with open(input_csv_path, newline='', encoding='utf8') as csv_file:
- for line in csv_file:
- unit_id = line.strip().split(';')[1]
- unique_identifiers.add(unit_id)
- identifier_to_pseudonym = pseudonym_service.pseudonymize(
- list(unique_identifiers), unit_id_type, job_id
- )
- output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
- target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
- with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
- for line in csv_file:
- row = line.strip().split(';')
- line_number: str = row[0]
- unit_id: str = row[1]
- value: str = row[2]
- start_date: str = row[3]
- stop_date: str = row[4]
- target_file.write(
- ';'.join([
- str(line_number),
- str(identifier_to_pseudonym[unit_id]),
- value,
- start_date, stop_date
- ]) + '\n'
- )
- target_file.close()
- return output_csv_path
- def _pseudonymize_measure_only(
- input_csv_path: str,
- unit_id_type: str,
- job_id: str
- ) -> str:
- unique_measure_values = set()
- with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
- for line in csv_file:
- value = line.strip().split(';')[2]
- unique_measure_values.add(value)
- value_to_pseudonym = pseudonym_service.pseudonymize(
- list(unique_measure_values), unit_id_type, job_id
- )
- output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
- target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
- with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
- for line in csv_file:
- row = line.strip().split(';')
- line_number: str = row[0]
- unit_id: str = row[1]
- value: str = row[2]
- start_date: str = row[3]
- stop_date: str = row[4]
- target_file.write(
- ';'.join([
- str(line_number),
- unit_id,
- str(value_to_pseudonym[value]),
- start_date, stop_date
- ]) + '\n'
- )
- target_file.close()
- return output_csv_path
- def _pseudonymize_identifier_and_measure(
- input_csv_path: str,
- identifier_unit_id_type: str,
- measure_unit_id_type: str,
- job_id: str
- ) -> str:
- unique_idents = set()
- unique_measure_values = set()
- with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
- for line in csv_file:
- row = line.strip().split(';')
- unit_id = row[1]
- value = row[2]
- unique_idents.add(unit_id)
- unique_measure_values.add(value)
- identifier_to_pseudonym = pseudonym_service.pseudonymize(
- list(unique_idents), identifier_unit_id_type, job_id
- )
- value_to_pseudonym = pseudonym_service.pseudonymize(
- list(unique_measure_values), measure_unit_id_type, job_id
- )
- output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
- target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
- with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
- for line in csv_file:
- row = line.strip().split(';')
- line_number: str = row[0]
- unit_id: str = row[1]
- value: str = row[2]
- start_date: str = row[3]
- stop_date: str = row[4]
- target_file.write(
- ';'.join([
- str(line_number),
- str(identifier_to_pseudonym[unit_id]),
- str(value_to_pseudonym[value]),
- start_date, stop_date
- ]) + '\n'
- )
- target_file.close()
- return output_csv_path
- def _pseudonymize_csv(
- input_csv_path: str,
- identifier_unit_id_type: Union[str, None],
- measure_unit_id_type: Union[str, None],
- job_id: str
- ) -> str:
- if identifier_unit_id_type and not measure_unit_id_type:
- logger.info('Pseudonymizing identifier')
- return _pseudonymize_identifier_only(
- input_csv_path, identifier_unit_id_type, job_id
- )
- elif measure_unit_id_type and not identifier_unit_id_type:
- logger.info('Pseudonymizing measure')
- return _pseudonymize_measure_only(
- input_csv_path, measure_unit_id_type, job_id
- )
- elif identifier_unit_id_type and measure_unit_id_type:
- logger.info('Pseudonymizing identifier and measure')
- return _pseudonymize_identifier_and_measure(
- input_csv_path,
- identifier_unit_id_type,
- measure_unit_id_type,
- job_id
- )
- else:
- logger.info('No pseudonymization')
- return input_csv_path
- def run(input_csv_path: str, metadata: Metadata, job_id: str) -> str:
- """
- Pseudonymizes the identifier column of the dataset. Requests pseudonyms
- from an external service and replaces all values in the identifier column.
- """
- try:
- logger.info(f'Pseudonymizing data {input_csv_path}')
- identifier_unit_type, measure_unit_type = (
- _get_unit_types(metadata)
- )
- identifier_unit_id_type = (
- None if identifier_unit_type is None
- else microdata_validator.get_unit_id_type_for_unit_type(
- identifier_unit_type
- )
- )
- measure_unit_id_type = (
- None if measure_unit_type is None
- else microdata_validator.get_unit_id_type_for_unit_type(
- measure_unit_type
- )
- )
- output_file = _pseudonymize_csv(
- input_csv_path,
- identifier_unit_id_type,
- measure_unit_id_type,
- job_id
- )
- logger.info(f'Pseudonymization step done {output_file}')
- return output_file
- except Exception as e:
- logger.error(f'Error during pseudonymization: {str(e)}')
- raise BuilderStepError('Failed to pseudonymize dataset') from e
- def pseudonymize_1(self, df, schema):  # schema: list[list[str]]
- """ Performs pseudonymization of the given dataframe based on the provided schema.
- For example, if the given df is for an entity called person,
- 2 dataframes will be returned, one called person that has hashed ids and masked fields,
- and one called person_lookup that contains the original person_id, person_id_pseudo,
- and the non-masked values for columns marked to be masked."""
- df_pseudo = df_lookup = df
- for col_name, dtype, op in schema:
- if op == "hash-no-lookup" or op == "hnl":
- # This means that the lookup can be performed against a different table so no lookup is needed.
- df_pseudo = df_pseudo.withColumn(
- col_name, F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)
- ).withColumnRenamed(col_name, col_name + "_pseudonym")
- df_lookup = df_lookup.drop(col_name)
- elif op == "hash" or op == 'h':
- df_pseudo = df_pseudo.withColumn(
- col_name, F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)
- ).withColumnRenamed(col_name, col_name + "_pseudonym")
- df_lookup = df_lookup.withColumn(
- col_name + "_pseudonym", F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)
- )
- elif op == "mask" or op == 'm':
- df_pseudo = df_pseudo.withColumn(col_name, F.lit('*'))
- elif op == "partition-by":
- pass # make no changes for this column so that it will be in both dataframes and can be used for partitioning
- elif op == "no-op" or op == 'x':
- df_lookup = df_lookup.drop(col_name)
- df_pseudo = self.fix_column_names(df_pseudo)
- df_lookup = self.fix_column_names(df_lookup)
- return (df_pseudo, df_lookup)
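- # Hedged usage sketch for pseudonymize_1 above: assumes a running PySpark session,
- # an object `pseudonymizer` exposing `salt` and `fix_column_names`, and a DataFrame
- # `person_df`; the schema triples (name, dtype, op) below are illustrative only.
- schema = [
- ["person_id", "string", "hash"],       # hashed in pseudo frame; lookup keeps original + pseudonym
- ["ssn", "string", "hash-no-lookup"],   # hashed in pseudo frame; dropped from lookup frame
- ["full_name", "string", "mask"],       # replaced by '*' in pseudo frame; original stays in lookup
- ["load_date", "date", "partition-by"], # left unchanged in both frames
- ]
- person_pseudo, person_lookup = pseudonymizer.pseudonymize_1(person_df, schema)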
- def pseudonymize_2(value, salt=SALT_KEY):
- """Pseudonymize value with salt, using HMAC-SHA256 encoding
- Parameters
- ----------
- value: value to be pseudonymized
- salt: hazard salt for additional protection
- Returns
- -------
- pseudonymized value using HMAC-SHA256
- """
- # NOTE: bypass empty or None values here, since hashing them
- # would produce a specific, linkable hash value
- if value is None or value == '' or (isinstance(value, float) and np.isnan(value)):
- return None
- return hmac.new(
- key=salt.encode('utf-8'), # the key
- msg=str(value).encode('utf-8'), # the data to pseudonymize
- digestmod=hashlib.sha256 # the hash function
- ).hexdigest() # hex-encoded output
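- # Hedged usage sketch for pseudonymize_2 above: assumes pandas/numpy are imported,
- # SALT_KEY is configured elsewhere, and `df` is an existing DataFrame (illustrative name).
- df['customer_id'] = df['customer_id'].apply(pseudonymize_2)
- # Equal inputs map to equal HMAC digests, so the column can still be joined on.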
- def pseudonymize_row(row):
- """
- Replace some identifying information with others:
- - Fake name
- - Birthdate is replaced with the age
- """
- anonymized_row = row.copy()
- # using Faker (https://faker.readthedocs.io/en/master/), we generate fake names
- if anonymized_row['Gender'] == 'Female':
- anonymized_row['Fullname'] = faker.name_female()
- else:
- anonymized_row['Fullname'] = faker.name_male()
- del anonymized_row['Birthdate']
- birthdate = datetime.strptime(row['Birthdate'], '%Y-%m-%d')
- age = today.year - birthdate.year - ((today.month, today.day) < (birthdate.month, birthdate.day))
- anonymized_row['Age'] = age
- return anonymized_row
- def anonymize_one(self, column, delete: bool, pattern: AnonymizationPattern = None):
- if column is None:
- return Logger.log_none_type_error('column')
- Logger.log_info_table_manipulation_started(self.filename, f'Anonymize One ({column})')
- # delete column in every dataset if found
- error_count = 0
- if delete:
- for ds in self.datasets:
- out = ds.delete_column(column)
- if out < 1:
- error_count += 1
- self.remove_columnnames([column])
- else:
- # if column is not deleted: generate a value for column, random or by pattern
- if pattern is None:
- for ds in self.datasets:
- out = ds.set_columnvalue_random(column)
- if out < 1:
- error_count += 1
- else:
- for ds in self.datasets:
- out = ds.set_columnvalue_by_pattern(column, pattern)
- if out < 1:
- error_count += 1
- Logger.log_info_table_manipulation_finished(error_count)
- return error_count
- def pseudonymize_3(field):
- return sha256(field.encode() + get_seed(seed).encode()).hexdigest()[:20]
- def pseudonymize_columns(dataframe, cols,
- ps_key='test',
- api_key=SHARED_KEY):
- actions = [
- {"name": "pseudonymize-{}".format(c),
- "transform-value": {
- "key": c,
- "pseudonymize": {
- "method": "merengue",
- "key": ps_key,
- }
- }
- } for c in cols]
- items = dataframe.fillna('').T.to_dict()
- item_list = list(items.values())
- data = requests.post(
- 'https://api.kiprotect.com/v1/transform',
- data=json.dumps(
- {"actions": actions, "items": item_list},
- allow_nan=False),
- headers={
- 'Authorization': 'Bearer {}'.format(api_key)})
- return pd.DataFrame(data.json()['items'])
- def _parse_url_parts(self, tld_extractor: TLDExtract, url_str: str) -> dict:
- url = tld_extractor(url_str)
- parts = {}
- parts["scheme"] = self._find_first(r"^([a-z0-9]+)\:\/\/", url_str)
- parts["auth"] = self._find_first(r"(?:.*\/\/|^)(.*:.*)@.*", url_str)
- parts["domain"] = url.domain
- parts["subdomain"] = url.subdomain
- parts["suffix"] = url.suffix
- url_list = ".".join(list(url))
- parts["path"] = self._find_first(
- rf"(?:^[a-z0-9]+\:\/\/)?{url_list}(?:\:\d+)?([^#^\?]*).*", url_str
- )
- parts["query"] = self._find_first(r".*(\?\w+=[a-zA-Z0-9](?:&\w+=[a-zA-Z0-9]+)*).*", url_str)
- parts["fragment"] = self._find_first(r".*#(.*)", url_str)
- return parts
- def _pseudonymize_value(self, value: str, pseudonyms: list[dict]) -> str:
- hash_string = self._hasher.hash_str(value, salt=self._config.hash_salt)
- if self._cache.requires_storing(hash_string):
- encrypted_origin = self._encrypter.encrypt(value)
- pseudonyms.append({"pseudonym": hash_string, "origin": encrypted_origin})
- return self._wrap_hash(hash_string)
- def base64_method(data_path, columns):
- data = pd.read_csv(data_path)
- data.dropna()
- data.reset_index(drop=True, inplace=True)
- existing_columns = list(data)
- for column in columns:
- if column in existing_columns:
- data[column] = data[column].apply(str)
- data[column] = data[column].apply(lambda x: base64.b64encode(bytes(x, 'utf-8')))
- return data
- def pseudonymize_4(self, s):
- sl = len(s) // self.__byte
- return struct.unpack('<%dh' % sl, s)
- def _replace_name(item, value, field, dicom):
- sex = dicom.get("PatientSex")
- sex = {"F": "Female", "M": "Male", "O": "Other", "": "Unk"}[sex]
- age = Deider._round_to_nearest(parse_AS_as_int(dicom.get("PatientAge")), 5)
- return f"{sex} {age:03d}Y {dicom.get('Modality')}"
- def apply(config, val):
- """ Pseudonymize using format preserving encryption.
- Example config:
- {
- 'func': 'fpe',
- 'key': 'some-secret-key',
- 'alphabet': string.ascii_letters
- }
- """
- validate_func_params(config, MANDATORY_CONFIG_PARAMS)
- try:
- alphabet = config.get('alphabet', string.printable)
- e = pyffx.String(config['key'].encode("utf-8"), alphabet, length=len(val))
- return e.encrypt(val)
- except ValueError:
- raise PseudoFuncError("Could not pseudonymize '{0}'. Check alphabet compatibility ({1})".format(val, alphabet))
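- # Hedged usage sketch for the fpe helper above: assumes `import string`, that pyffx
- # is installed, and that validate_func_params/PseudoFuncError come from the surrounding
- # module; the key below is a placeholder.
- config = {
- 'func': 'fpe',
- 'key': 'some-secret-key',
- 'alphabet': string.ascii_letters,
- }
- token = apply(config, 'Alice')  # equal value and key always encrypt to the same token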
- def pseudonymize_6(text: str, tagger: SequenceTagger) -> Tuple[str, str]:
- """
- Perform the pseudonymization action and return both the tagged version (see function "tag_entities") and the pseudonymized version
- Args:
- text (str): the input text to pseudonymize
- tagger (SequenceTagger): the flair model for NER
- Returns:
- Tuple[str, str]: the original text with tags, and the pseudonymized text
- """
- with sw.timer("root"):
- text_sentences = [Sentence(t.strip()) for t in text.split("\n") if t.strip()]
- with sw.timer("model_annotation"):
- # inplace function
- tagger.predict(
- sentences=text_sentences,
- mini_batch_size=32,
- embedding_storage_mode="none",
- verbose=True,
- )
- return tag_entities(sentences=text_sentences)
- def get_replacement_stock() -> List[str]:
- """
- A list of faked names to replace the information you want to hide
- """
- stock = [f"{letter}..." for letter in ascii_uppercase] + [
- f"{a}{b}..." for a, b in list(itertools.combinations(ascii_uppercase, 2))
- ]
- random.shuffle(stock)
- return stock
- def apply_tagging_sentence(
- starts: List[int],
- ends: List[int],
- tags: List[str],
- entities: List[str],
- plain_text: str,
- replacement_dict: Dict[str, str],
- ) -> Tuple[str, str]:
- """
- Args:
- starts, ends, tags, entities: spans, tags and texts of the entities found in the sentence; plain_text: the sentence text; replacement_dict: the prepared replacement dictionary for pseudonymization
- Returns:
- str, str: a text where the entities have a XML tag, and a text where entities have been pseudonymized
- """
- assert (
- len(starts) == len(ends) == len(tags) == len(entities)
- ), "Input lists mast be of the same length"
- shift_tags_start, shift_tags_end = 0, 0 # shift due to the add of tags
- shift_pseudo_start, shift_pseudo_end = 0, 0
- tagged_sentence, pseudo_sentence = plain_text, plain_text
- n_entities = len(starts)
- for i in range(n_entities):
- start, end, entity, tag = starts[i], ends[i], entities[i], tags[i]
- replacement = replacement_dict[entity]
- pseudo_sentence = (
- pseudo_sentence[: start + shift_pseudo_start]
- + replacement
- + pseudo_sentence[end + shift_pseudo_end:]
- )
- shift_pseudo_start += len(replacement) - (end - start)
- shift_pseudo_end += len(replacement) - (end - start)
- tagged_sentence = (
- tagged_sentence[: start + shift_tags_start]
- + "</a>"
- + f"<{tag}>"
- + plain_text[start:end]
- + f"</{tag}>"
- + "<a>"
- + tagged_sentence[end + shift_tags_end:]
- )
- shift_tags_start += (
- 5 + 6 + 3 + 4
- ) # 5 characters for tag <PER> (or LOC or ORG) + 6 for </PER> + 3 for <a> and 4 for </a>
- shift_tags_end += (
- 5 + 6 + 3 + 4
- ) # 5 characters for tag <PER> (or LOC or ORG) + 6 for </PER> + 3 for <a> and 4 for </a>
- tagged_sentence = "<a>" + tagged_sentence + "</a>"
- tagged_sentence = tagged_sentence.replace("<a></a>", "")
- return (
- f"<sentence>{tagged_sentence}</sentence>",
- pseudo_sentence,
- )
- def english_pseudo(text):
- anon = AnonymizerChain(Anonymization('en_US'))
- anon.add_anonymizers(EmailAnonymizer, NamedEntitiesAnonymizer('en_core_web_lg'))
- clean_text, patch = anon.pseudonymize(text)
- return clean_text, patch
- def pseudonymize_user_name(self, user_name: UserName) -> PseudoUserName:
- hasher = hashlib.sha256()
- hasher.update(user_name.encode('utf-8'))
- # salt
- hasher.update(b'\0')
- hasher.update(self.salt)
- pseudonymized = base64.b64encode(hasher.digest()).decode('utf-8')
- return PseudoUserName(pseudonymized)
- def parse_lines(text):
- lines = []
- for m in LINE_RE.finditer(text):
- ln = {"TIMESTAMP": parse_date(m.group(1).strip("\n").strip()),
- "SPEAKER": m.group(2).strip(),
- "MESSAGE": m.group(3).strip()}
- lines.append(ln)
- return lines
- def pseudonymize_7(graph: ProvDocument) -> ProvDocument:
- log.info(f"pseudonymize agents in {graph=}")
- # get all records except for agents and relations
- records = list(graph.get_records((ProvActivity, ProvEntity)))
- pseudonyms = dict()
- for agent in graph.get_records(ProvAgent):
- name = get_attribute(agent, USERNAME)
- mail = get_attribute(agent, USEREMAIL)
- if name is None:
- raise ValueError("ProvAgent representing a user has to have a name!")
- # hash name & mail if present
- namehash = hashlib.sha256(bytes(name, "utf-8")).hexdigest()
- mailhash = hashlib.sha256(bytes(mail, "utf-8")).hexdigest() if mail else None
- # create a new id as a pseudonym using the hashes
- pseudonym = qualified_name(f"User?name={namehash}&email={mailhash}")
- # map the old id to the pseudonym
- pseudonyms[agent.identifier] = pseudonym
- # keep only prov role & prov type
- # replace name & mail with hashes
- pseudonymized = pseudonymize_agent(
- agent,
- identifier=pseudonym,
- keep=[PROV_ROLE, PROV_TYPE],
- replace={USERNAME: namehash, USEREMAIL: mailhash},
- )
- # add pseudonymized agent to the list of records
- records.append(pseudonymized)
- # replace old id occurences with the pseudonymized id
- for relation in graph.get_records(ProvRelation):
- formal = [(key, pseudonyms.get(val, val)) for key, val in relation.formal_attributes]
- extra = [(key, pseudonyms.get(val, val)) for key, val in relation.extra_attributes]
- r_type = PROV_REC_CLS.get(relation.get_type())
- records.append(r_type(relation.bundle, relation.identifier, formal + extra))
- return graph_factory(records)
- def _make_sentence(self, tokens_left, tokens_right, seq_length=128):
- len_left = len(tokens_left)
- len_right = len(tokens_right)
- cut_len = len_left + len_right - (seq_length - 1)
- if cut_len > 0:
- cut_left = len_left - seq_length // 2
- cut_right = len_right - (seq_length - 1) // 2
- if cut_left < 0:
- cut_left, cut_right = 0, cut_left + cut_right
- elif cut_right < 0:
- cut_left, cut_right = cut_left + cut_right, 0
- else:
- cut_left, cut_right = 0, 0
- tokens_left = tokens_left[cut_left:]
- # tokens_right = tokens_right[:-cut_right]
- tokens_right = tokens_right[:len(tokens_right) - cut_right]
- tokens = tokens_left + [self.bert_tokenizer.mask_token] + tokens_right
- attention_mask = [1] * len(tokens_left) + [1] + [1] * len(tokens_right)
- if len(tokens) < seq_length:
- num_padding = seq_length - len(tokens)
- tokens += [self.bert_tokenizer.pad_token] * num_padding
- attention_mask += [0] * num_padding
- def _random_word_context(self, text, max_trial=10):
- puncs = list("[]!\"#$%&'()*+,./:;<=>?@\^_`{|}~-")
- words = text.split()
- trial = 0
- done = False
- while trial < max_trial and not done:
- trial += 1
- w_idx = random.randint(0, len(words) - 1)
- word, left_res, right_res = words[w_idx], [], []
- # If the word is already in vocab, it's good to go.
- if len(word) >= self.min_word_len and \
- (word.lower() in self.dictionary) and \
- len(word) < DEFAULT_MAX_CHARACTER_POSITIONS - 4:
- done = True
- else:
- # Otherwise, detach puncs at the first and the last char, and check again
- if word[0] in puncs:
- word, left_res = word[1:], [word[0]]
- else:
- word, left_res = word, []
- if not word: continue # The word was just a punc
- if word[-1] in puncs:
- word, right_res = word[:-1], [word[-1]]
- else:
- word, right_res = word, []
- if len(word) < self.min_word_len or \
- (not word.lower() in self.dictionary) or \
- len(word) >= DEFAULT_MAX_CHARACTER_POSITIONS - 4:
- continue
- # Check whether it's anonymized field
- right_snip = ' '.join(words[w_idx + 1:w_idx + 5])
- if '**]' in right_snip and '[**' not in right_snip:
- continue
- left_snip = ' '.join(words[w_idx - 4:w_idx])
- if '[**' in left_snip and '**]' not in left_snip:
- continue
- # Pass!
- done = True
- if done:
- return word, ' '.join(words[:w_idx] + left_res), ' '.join(right_res + words[w_idx + 1:])
- else:
- raise ValueError('failed to choose word')
- def __next__(self):
- # Select next note (length >= 2000)
- while True:
- try:
- _, row = next(self.note_iterrows)
- except StopIteration:
- self._load_random_csv()
- _, row = next(self.note_iterrows)
- note_id = int(row.ROW_ID)
- note = row.TEXT.strip()
- # if len(note) >= 2000:
- # break
- if len(note) < 2000:
- continue
- try:
- correct, left, right = self._random_word_context(note)
- except:
- # import traceback; traceback.print_exc();
- continue
- break
- # Corrupt and pseudonymize
- correct = correct.lower()
- if random.uniform(0, 1) >= self.no_corruption_prob:
- typo = self.word_corrupter.corrupt_word(correct)
- else:
- typo = correct
- left = self.mimic_pseudo.pseudonymize(left)
- left = self._process_note(left)
- left = ' '.join(left.split(' ')[-128:])
- right = self.mimic_pseudo.pseudonymize(right)
- right = self._process_note(right)
- right = ' '.join(right.split(' ')[:128])
- # Parse
- temp_csv_row = [-1, note_id, typo, left, right, correct]
- # print(f'{self.csv_fname}({note_id}, {_}/{len(self.df_note)}): {correct} -> {typo}')
- example = self._parse_row(temp_csv_row)
- return example
- def pseudonymize_8(self, s):
- return struct.unpack(">" + ("I" * (len(s) / self.__stride)), s)
- def pseudonymize(field):
- return sha256(field.encode() + salt.encode()).hexdigest()[:16]
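- # Hedged usage sketch for the one-liner above: it relies on `from hashlib import sha256`
- # and a module-level `salt` string; both are assumptions, and the values are placeholders.
- from hashlib import sha256
- salt = 'local-secret'
- token = pseudonymize('patient-42')  # 16 hex chars, stable for a given (field, salt)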
- def pseudonymize(
- self,
- original_text: str,
- presidio_response: List[RecognizerResult],
- count: int,
- ):
- """
- :param original_text: str containing the original text
- :param presidio_response: list of results from Presidio, to be used to know where entities are
- :param count: number of perturbations to return
- :return: List[str] with fake perturbations of original text
- """
- presidio_response = sorted(presidio_response, key=lambda resp: resp.start)
- anonymizer_engine = AnonymizerEngine()
- anonymized_result = anonymizer_engine.anonymize(
- text=original_text, analyzer_results=presidio_response
- )
- templated_text = anonymized_result.text
- templated_text = templated_text.replace(">", "}}").replace("<", "{{")
- fake_texts = [self.parse(templated_text, add_spans=False) for _ in range(count)]
- return fake_texts
- def pseudonymize(
- self, key_file: KeyFile, identifiers: List["Identifier"]
- ) -> List["Key"]:
- """Get a pseudonym for each identifier. If identifier is known in PIMS,
- return this. Otherwise, have PIMS generate a new pseudonym and return that.
- Parameters
- ----------
- identifiers: List[Identifier]
- The identifiers to get pseudonyms for
- key_file: KeyFile
- The key_file to use
- Notes
- -----
- Each call to this function calls the PIMS API twice for each unique source in
- identifiers. This is a result of the way the API can be called
- Returns
- -------
- List[Key]
- The PIMS pseudonym for each identifier
- """
- keys = []
- # Each call to process a list of identifiers only allows a single source.
- # Split identifiers by source
- per_source = defaultdict(list)
- for x in identifiers:
- per_source[x.source].append(x)
- for source, items in per_source.items():
- keys = keys + self.deidentify(key_file, [x.value for x in items], source)
- return keys
- def pseudonymize(self, s):
- sl = len(s) // 2
- return struct.unpack('<%dh' % sl, s)
- def regex_anonymizer(self, text: str, regex: Pattern, provider: str) -> str:
- '''
- Anonymize all substrings matching a specific regex using a Faker provider
- '''
- matchs = re.findall(regex, text)
- return self.replace_all(text, matchs, provider)
- def psdnmyz_2():
- # load two CSVs to be sent for pseudonymization
- # metrics_df=pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_METRICS_Skel_header.csv')
- seg_df = pd.read_csv('/home/arasan/testrep/psmd/jureca/psmd_seg_vols.csv')
- # add random id column to both df
- # below line is a disaster
- # metrics_df['RNDNAME'] = metrics_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
- # seg_df['RNDNAME'] = seg_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
- # a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
- # metrics_df['RNDNAME']=a
- # print 'after rqndom id has been added'
- # flagg=True
- # while(flagg):
- # try:
- # print pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1)
- # except ValueError:
- # print 'NO DUPLICAtes'
- # metrics_df.to_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
- # flagg=False
- # else:
- # print 'DUPES'
- # metrics_df=metrics_df.drop('RNDNAME', axis=1)
- # a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
- # metrics_df['RNDNAME']=a
- # load double-checked randomized df: 1) above try/except 2) using np.unique
- metrnd = pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
- seg_df['SNO'] = seg_df.index + 1
- metrnd['SNO'] = seg_df.index + 1
- # add RNDNAME column to seg_df
- seg_df['RNDNAME'] = metrnd.RNDNAME.values
- # rename columns NAME to ID and RNDNAME to NAME
- seg_df = seg_df.rename(index=str, columns={"NAME": "ID"})
- seg_df = seg_df.rename(index=str, columns={"RNDNAME": "NAME"})
- metrnd = metrnd.rename(index=str, columns={"NAME": "ID"})
- metrnd = metrnd.rename(index=str, columns={"RNDNAME": "NAME"})
- # dump map out with 3 columns ID,NAME,SNO
- mapdf = metrnd[['ID', 'NAME', 'SNO']]
- mapdf.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/psdnmyz_map.csv', index=False)
- # drop ID and SNO
- seg_df = seg_df.drop(['ID', 'SNO'], axis=1)
- metrnd = metrnd.drop(['ID', 'SNO'], axis=1)
- # move NAME column to first position
- metrnd = metrnd[['NAME', 'mean_skel_MD_LH_RH', 'sd_skel_MD_LH_RH', 'Pw90S_skel_MD_LH_RH', 'mean_skel_FA_LH_RH',
- 'sd_skel_FA_LH_RH', 'mean_skel_AD_LH_RH', 'sd_skel_AD_LH_RH', 'mean_skel_RD_LH_RH',
- 'sd_skel_RD_LH_RH']]
- seg_df = seg_df[['NAME', 'AGE', 'SEX', 'GMV', 'WMV', 'CSFV', 'ICV']]
- # if pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1).RNDNAME.values.size:
- # print 'NOT OK'
- # else:
- # print 'OK'
- metrnd.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/TOTAL_METRICS_Skel_header.csv', index=False)
- seg_df.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/psmd_seg_vols.csv', index=False)
- def psdnmyz_3():
- # load two CSVs to be sent for pseudonymization
- # metrics_df=pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_METRICS_Skel_header.csv')
- seg_df = pd.read_csv('/home/arasan/testrep/psmd/jureca/psmd_seg2_vols.csv')
- # add random id column to both df
- # below line is a disaster
- # metrics_df['RNDNAME'] = metrics_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
- # seg_df['RNDNAME'] = seg_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
- # a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
- # metrics_df['RNDNAME']=a
- # print 'after rqndom id has been added'
- # flagg=True
- # while(flagg):
- # try:
- # print pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1)
- # except ValueError:
- # print 'NO DUPLICAtes'
- # metrics_df.to_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
- # flagg=False
- # else:
- # print 'DUPES'
- # metrics_df=metrics_df.drop('RNDNAME', axis=1)
- # a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
- # metrics_df['RNDNAME']=a
- # load double-checked randomized df: 1) above try/except 2) using np.unique
- metrnd = pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
- seg_df['SNO'] = seg_df.index + 1
- # metrnd['SNO']=seg_df.index+1
- # add RNDNAME column to seg_df
- seg_df['RNDNAME'] = metrnd.RNDNAME.values
- # rename columns NAME to ID and RNDNAME to NAME
- # seg_df=seg_df.rename(index=str, columns={"NAME": "ID"})
- seg_df = seg_df.rename(index=str, columns={"RNDNAME": "NAME"})
- # metrnd=metrnd.rename(index=str, columns={"NAME": "ID"})
- # metrnd=metrnd.rename(index=str, columns={"RNDNAME": "NAME"})
- # dump map out with 3 columns ID,NAME,SNO
- # mapdf=metrnd[['ID','NAME','SNO']]
- # mapdf.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/psdnmyz_map.csv',index=False)
- # drop ID and SNO
- seg_df = seg_df.drop(['ID', 'SNO'], axis=1)
- # metrnd=metrnd.drop(['ID','SNO'],axis=1)
- # move NAME column to first position
- # metrnd=metrnd[['NAME','mean_skel_MD_LH_RH','sd_skel_MD_LH_RH','Pw90S_skel_MD_LH_RH','mean_skel_FA_LH_RH','sd_skel_FA_LH_RH','mean_skel_AD_LH_RH','sd_skel_AD_LH_RH','mean_skel_RD_LH_RH','sd_skel_RD_LH_RH']]
- seg_df = seg_df[['NAME', 'AGE', 'SEX', 'ICV']]
- # if pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1).RNDNAME.values.size:
- # print 'NOT OK'
- # else:
- # print 'OK'
- # metrnd.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/TOTAL_METRICS_Skel_header.csv',index=False)
- seg_df.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet3/psmd_seg2_vols.csv', index=False)
- def hashPseudonym(self, i, key, tile):
- digest = hashes.Hash(hashes.SHA256(), default_backend())
- # for i in range (0,len(plainTail)): # {
- _digest = digest.copy()
- # key = secrets.token_bytes(32)
- _digest.update(bytes(i))
- _digest.update(key)
- _digest.update(bytes(tile))
- p = _digest.finalize() # }
- # digest.finalize()
- return p
- def test_localization_of_pseudonym(self):
- name = b" a 16 byte name "
- target = b"PEP3 storage_facility"
- pp = pep3_pb2.Pseudonymizable(data=name,
- state=pep3_pb2.Pseudonymizable.UNENCRYPTED_NAME)
- self.collector.pseudonymize([pp])
- self.collector.relocalize([pp],
- self.config.collector.warrants.to_sf)
- sfp = elgamal.Triple.unpack(pp.data) \
- .decrypt(self.sf.private_keys['pseudonym'])
- pseudonym_secrets = {}
- for peer_secrets in self.secrets.peers.values():
- for shard, shard_secrets in peer_secrets.by_shard.items():
- pseudonym_secrets[shard] \
- = shard_secrets.pseudonym_component_secret
- s = 1
- e = ed25519.scalar_unpack(common.sha256(target))
- for secret in pseudonym_secrets.values():
- s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
- s %= ed25519.l
- self.assertEqual(
- sfp * ed25519.scalar_inv(s),
- ed25519.Point.lizard(name))
- def test_store_and_retrieve(self):
- # first store a record with random source and target ip addresses,
- # and see if we can recover it.
- col_request = pep3_pb2.StoreRequest()
- col_request.id = os.urandom(16)
- flowrecord = col_request.records.add()
- flowrecord.source_ip.data = os.urandom(16)
- flowrecord.source_ip.state = pep3_pb2.Pseudonymizable.UNENCRYPTED_NAME
- flowrecord.destination_ip.data = os.urandom(16)
- flowrecord.destination_ip.state = \
- pep3_pb2.Pseudonymizable.UNENCRYPTED_NAME
- flowrecord.anonymous_part.number_of_bytes = 123
- flowrecord.anonymous_part.number_of_packets = 456
- updates = list(self.collector.connect_to('collector').Store(
- iter([col_request])))
- self.assertEqual(len(updates), 1)
- self.assertEqual(updates[0].stored_id, col_request.id)
- # store the same flowrecord twice, to see if that causes troubles
- col_request.id = os.urandom(16)
- updates = list(self.collector.connect_to('collector').Store(
- iter([col_request])))
- self.assertEqual(len(updates), 1)
- self.assertEqual(updates[0].stored_id, col_request.id)
- query = pep3_pb2.SqlQuery()
- # manually compute storage_facility-local pseudonyms for query
- sf_name = b"PEP3 storage_facility"
- pseudonym_secrets = {}
- for peer_secrets in self.secrets.peers.values():
- for shard, shard_secrets in peer_secrets.by_shard.items():
- pseudonym_secrets[shard] \
- = shard_secrets.pseudonym_component_secret
- s = 1
- e = ed25519.scalar_unpack(common.sha256(sf_name))
- for secret in pseudonym_secrets.values():
- s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
- s %= ed25519.l
- # see if the record was stored correctly by querying the
- # database directly.
- query.query = """SELECT peped_flows.p_dst_ip FROM peped_flows
- WHERE peped_flows.p_src_ip=:ip"""
- ip = query.parameters['ip'].pseudonymizable_value
- ip.data = (ed25519.Point.lizard(
- flowrecord.source_ip.data) * s).pack()
- ip.state = pep3_pb2.Pseudonymizable.UNENCRYPTED_PSEUDONYM
- row = self.sf.connect_to('database') \
- .Query(query).next().rows[0]
- self.assertEqual(row.cells[0].pseudonymizable_value.data,
- (ed25519.Point.lizard(flowrecord.destination_ip.data) * s
- ).pack())
- # manually compute researcher-local pseudonyms for query
- researcher_name = b"PEP3 researcher"
- pseudonym_secrets = {}
- for peer_secrets in self.secrets.peers.values():
- for shard, shard_secrets in peer_secrets.by_shard.items():
- pseudonym_secrets[shard] \
- = shard_secrets.pseudonym_component_secret
- s = 1
- e = ed25519.scalar_unpack(common.sha256(researcher_name))
- for secret in pseudonym_secrets.values():
- s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
- s %= ed25519.l
- # now query via the researcher
- query.parameters['ip'].pseudonymizable_value.data \
- = (ed25519.Point.lizard(flowrecord.source_ip.data) * s).pack()
- row = self.researcher.connect_to('researcher') \
- .Query(query).next().rows[0]
- self.assertEqual(row.cells[0].pseudonymizable_value.data,
- (ed25519.Point.lizard(flowrecord.destination_ip.data) * s
- ).pack())
- def test_depseudonymize(self):
- ip = os.urandom(16)
- # manually compute investigator-local pseudonym
- pseudonym_secrets = {}
- for peer_secrets in self.secrets.peers.values():
- for shard, shard_secrets in peer_secrets.by_shard.items():
- pseudonym_secrets[shard] \
- = shard_secrets.pseudonym_component_secret
- s = 1
- e = ed25519.scalar_unpack(common.sha256(b"PEP3 investigator"))
- for secret in pseudonym_secrets.values():
- s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
- s %= ed25519.l
- investigator_local_ip = (ed25519.Point.lizard(ip) * s).pack()
- # manually create warrant
- warrant = pep3_pb2.DepseudonymizationRequest.Warrant()
- warrant.act.actor = b"PEP3 investigator"
- warrant.act.name.state = pep3_pb2.Pseudonymizable.UNENCRYPTED_PSEUDONYM
- warrant.act.name.data = investigator_local_ip
- self.investigator.encrypt([warrant.act.name],
- self.investigator.public_keys['pseudonym'])
- warrant.signature = crypto.sign(
- crypto.load_privatekey(crypto.FILETYPE_PEM,
- self.secrets.root_certificate_keys.warrants),
- warrant.act.SerializeToString(), 'sha256')
- result = self.investigator.connect_to("investigator") \
- .Depseudonymize(warrant)
- self.assertEqual(result.data, ip)
- def anonymize(cls, user, ldap_attrs, **kwargs):
- # type: (User, Dict[AnyStr, Any], **Any) -> Dict[AnyStr, AnyStr]
- """
- Change values of function arguments to anonymize/pseudonymize user if
- UCRV asm/attributes/<staff/student>/anonymize is true. Will return
- unchanged function arguments otherwise.
- :param User user: user object
- :param dict ldap_attrs: dictionary with the users LDAP attributes
- :return: dictionary with [modified] function arguments
- :rtype: dict
- :raises NotImplementedError: if cls.ucr_anonymize_key_base is unset
- """
- ucr = get_ucr()
- if ucr.is_true(cls.ucr_anonymize_key_base):
- for k, v in cls.anonymize_mapping().items():
- if v and v.startswith('%'):
- attr = v[1:].strip()
- try:
- v = ldap_attrs[attr][0]
- except KeyError:
- raise ValueError('Attribute {!r} not found in LDAP object of {}.'.format(attr, user))
- except IndexError:
- raise ValueError('Attribute {!r} empty in LDAP object of {}.'.format(attr, user))
- kwargs[k] = v
- return kwargs
- def _modify_dataset(
- self,
- anonymizer: Anonymizer,
- pseudonym: str,
- ds: Dataset,
- ) -> None:
- """Optionally pseudonymize an incoming dataset with the given pseudonym
- and add the trial ID and name to the DICOM header if specified."""
- if pseudonym:
- # All dates get pseudonymized, but we want to retain the study date.
- study_date = ds.StudyDate
- anonymizer.anonymize(ds)
- ds.StudyDate = study_date
- ds.PatientID = pseudonym
- ds.PatientName = pseudonym
- trial_protocol_id = self.transfer_task.job.trial_protocol_id
- trial_protocol_name = self.transfer_task.job.trial_protocol_name
- if trial_protocol_id:
- ds.ClinicalTrialProtocolID = trial_protocol_id
- if trial_protocol_name:
- ds.ClinicalTrialProtocolName = trial_protocol_name
- if pseudonym and trial_protocol_id:
- session_id = f"{ds.StudyDate}-{ds.StudyTime}"
- ds.PatientComments = f"Project:{trial_protocol_id} Subject:{pseudonym} Session:{pseudonym}_{session_id}"
- def _psc1(psc1, psc2_from_psc1):
- if 'TEST' in psc1.upper():
- # skip test subjects
- logging.debug('skipping test subject "%s"', psc1)
- else:
- # find and skip subjects with invalid identifier
- if psc1[-3:] in {'FU2', 'FU3'}:
- psc1 = psc1[:-3]
- elif psc1[-2:] == 'SB':
- psc1 = psc1[:-2]
- if psc1 in psc2_from_psc1:
- return psc1
- elif psc1 in {'0x0000xxxxxx'}:
- logging.info('skipping known invalid subject identifier "%s"',
- psc1)
- else:
- logging.error('invalid subject identifier "%s"', psc1)
- return None
- def pseudonymize_node_name(name):
- """Replace Node.Name (detector ID) by a hash with secret key"""
- h = hashlib.md5((app.secret_key + name).encode('utf-8'))
- return 'node.' + h.hexdigest()[:6]
- def pseudonymize(self, size=None):
- """
- Return pseudonymized values for this attribute, which is used to
- substitute identifiable data with a reversible, consistent value.
- """
- size = size or self.size
- if size != self.size:
- attr = Series(np.random.choice(self.bins, size=size, p=self.prs))
- else:
- attr = self
- if self.categorical:
- mapping = {b: utils.pseudonymise_string(b) for b in self.bins}
- return attr.map(lambda x: mapping[x])
- if self.type == 'string':
- return attr.map(utils.pseudonymise_string)
- elif self.is_numerical or self.type == 'datetime':
- return attr.map(str).map(utils.pseudonymise_string)
- def pseudonymize(self, content):
- if not content: return content
- content_modified = ''
- start = 0
- for mo in re.finditer("\[\*\*[^\[]*\*\*\]", content):
- replacement = self.mapper.get_mapping(mo.group(0))
- content_modified += content[start: mo.start()]
- content_modified += replacement
- start = mo.end()
- if start < len(content):
- content_modified += content[start: len(content)]
- return content_modified
|