def pseudonymize(colname):
    # Memoized salted SHA3-512; `key` (dict) and `salt` (str) are assumed
    # module-level state in this snippet.
    if colname not in key:
sha3 = hashlib.sha3_512()
data = salt + colname
sha3.update(data.encode('utf-8'))
hexdigest = sha3.hexdigest()
key[colname] = hexdigest
return hexdigest
else:
return key[colname]
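# Hedged usage sketch for the memoized pseudonymizer above. The module later
# rebinds the name `pseudonymize`, so snapshot this variant first; `key` and
# `salt` here are stand-ins for whatever state the original module defined.
_pseudonymize_colname = pseudonymize

def _example_pseudonymize_colname():
    global key, salt
    key, salt = {}, 'example-salt'
    first = _pseudonymize_colname('customer_id')
    # Memoization makes the mapping stable across calls.
    assert _pseudonymize_colname('customer_id') == first
    return first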
import hashlib
import re
import logging
from typing import Dict, Tuple, Union, List
import microdata_validator
from job_executor.exception import BuilderStepError
from job_executor.adapter import pseudonym_service
from job_executor.model import Metadata
logger = logging.getLogger()
def _get_unit_types(
metadata: Metadata
) -> Tuple[Union[str, None], Union[str, None]]:
return (
metadata.get_identifier_key_type_name(),
metadata.get_measure_key_type_name()
)
def _pseudonymize_identifier_only(
input_csv_path: str,
unit_id_type: str,
job_id: str
) -> str:
unique_identifiers = set()
with open(input_csv_path, newline='', encoding='utf8') as csv_file:
for line in csv_file:
unit_id = line.strip().split(';')[1]
unique_identifiers.add(unit_id)
identifier_to_pseudonym = pseudonym_service.pseudonymize(
list(unique_identifiers), unit_id_type, job_id
)
output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
for line in csv_file:
row = line.strip().split(';')
            line_number: str = row[0]
            unit_id: str = row[1]
            value: str = row[2]
            start_date: str = row[3]
            stop_date: str = row[4]
target_file.write(
';'.join([
str(line_number),
str(identifier_to_pseudonym[unit_id]),
value,
start_date, stop_date
]) + '\n'
)
target_file.close()
return output_csv_path
def _pseudonymize_measure_only(
input_csv_path: str,
unit_id_type: str,
job_id: str
) -> str:
unique_measure_values = set()
with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
for line in csv_file:
value = line.strip().split(';')[2]
unique_measure_values.add(value)
value_to_pseudonym = pseudonym_service.pseudonymize(
list(unique_measure_values), unit_id_type, job_id
)
output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
for line in csv_file:
row = line.strip().split(';')
            line_number: str = row[0]
            unit_id: str = row[1]
            value: str = row[2]
            start_date: str = row[3]
            stop_date: str = row[4]
target_file.write(
';'.join([
str(line_number),
unit_id,
str(value_to_pseudonym[value]),
start_date, stop_date
]) + '\n'
)
target_file.close()
return output_csv_path
def _pseudonymize_identifier_and_measure(
input_csv_path: str,
identifier_unit_id_type: str,
measure_unit_id_type: str,
job_id: str
) -> str:
unique_idents = set()
unique_measure_values = set()
with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
for line in csv_file:
row = line.strip().split(';')
unit_id = row[1]
value = row[2]
unique_idents.add(unit_id)
unique_measure_values.add(value)
identifier_to_pseudonym = pseudonym_service.pseudonymize(
list(unique_idents), identifier_unit_id_type, job_id
)
value_to_pseudonym = pseudonym_service.pseudonymize(
list(unique_measure_values), measure_unit_id_type, job_id
)
output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
for line in csv_file:
row = line.strip().split(';')
            line_number: str = row[0]
            unit_id: str = row[1]
            value: str = row[2]
            start_date: str = row[3]
            stop_date: str = row[4]
target_file.write(
';'.join([
str(line_number),
str(identifier_to_pseudonym[unit_id]),
str(value_to_pseudonym[value]),
start_date, stop_date
]) + '\n'
)
target_file.close()
return output_csv_path
def _pseudonymize_csv(
input_csv_path: str,
identifier_unit_id_type: Union[str, None],
measure_unit_id_type: Union[str, None],
job_id: str
) -> str:
if identifier_unit_id_type and not measure_unit_id_type:
logger.info('Pseudonymizing identifier')
return _pseudonymize_identifier_only(
input_csv_path, identifier_unit_id_type, job_id
)
elif measure_unit_id_type and not identifier_unit_id_type:
logger.info('Pseudonymizing measure')
return _pseudonymize_measure_only(
input_csv_path, measure_unit_id_type, job_id
)
elif identifier_unit_id_type and measure_unit_id_type:
logger.info('Pseudonymizing identifier and measure')
return _pseudonymize_identifier_and_measure(
input_csv_path,
identifier_unit_id_type,
measure_unit_id_type,
job_id
)
else:
logger.info('No pseudonymization')
return input_csv_path
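# Hedged demo of the dispatch above, using a stub in place of the real
# pseudonym_service adapter. The input layout is line;unit_id;value;start;stop,
# and 'FNR' is a hypothetical unit id type used only for illustration.
def _example_pseudonymize_csv(tmp_path='example_input.csv'):
    class _StubService:
        @staticmethod
        def pseudonymize(values, unit_id_type, job_id):
            return {v: f'pseudo-{i}' for i, v in enumerate(sorted(values))}

    globals()['pseudonym_service'] = _StubService()
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.write('1;person-1;42;2020-01-01;2020-12-31\n')
    return _pseudonymize_csv(tmp_path, 'FNR', None, 'job-1')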
def run(input_csv_path: str, metadata: Metadata, job_id: str) -> str:
"""
Pseudonymizes the identifier column of the dataset. Requests pseudonyms
from an external service and replaces all values in the identifier column.
"""
try:
logger.info(f'Pseudonymizing data {input_csv_path}')
identifier_unit_type, measure_unit_type = (
_get_unit_types(metadata)
)
identifier_unit_id_type = (
None if identifier_unit_type is None
else microdata_validator.get_unit_id_type_for_unit_type(
identifier_unit_type
)
)
measure_unit_id_type = (
None if measure_unit_type is None
else microdata_validator.get_unit_id_type_for_unit_type(
measure_unit_type
)
)
output_file = _pseudonymize_csv(
input_csv_path,
identifier_unit_id_type,
measure_unit_id_type,
job_id
)
logger.info(f'Pseudonymization step done {output_file}')
return output_file
except Exception as e:
logger.error(f'Error during pseudonymization: {str(e)}')
raise BuilderStepError('Failed to pseudonymize dataset') from e
def pseudonymize_1(self, df, schema):  # schema: list of (col_name, dtype, op) triples
    """ Performs pseudonymization of the given dataframe based on the provided schema.
    For example, if the given df is for an entity called person,
    2 dataframes will be returned, one called person that has hashed ids and masked fields,
    and one called person_lookup that contains the original person_id, person_id_pseudo,
    and the non-masked values for columns marked to be masked."""
    # assumes: from pyspark.sql import functions as F
    df_pseudo = df_lookup = df
    for col_name, dtype, op in schema:
        if op == "hash-no-lookup" or op == "hnl":
            # The lookup can be performed against a different table, so no lookup is needed here.
            df_pseudo = df_pseudo.withColumn(
                col_name, F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)
            ).withColumnRenamed(col_name, col_name + "_pseudonym")
            df_lookup = df_lookup.drop(col_name)
        elif op == "hash" or op == 'h':
            df_pseudo = df_pseudo.withColumn(
                col_name, F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256)
            ).withColumnRenamed(col_name, col_name + "_pseudonym")
            df_lookup = df_lookup.withColumn(
                col_name + "_pseudonym",
                F.sha2(F.concat(F.col(col_name), F.lit(self.salt)), 256),
            )
        elif op == "mask" or op == 'm':
            df_pseudo = df_pseudo.withColumn(col_name, F.lit('*'))
        elif op == "partition-by":
            # Make no changes for this column so it stays in both dataframes
            # and can be used for partitioning.
            pass
        elif op == "no-op" or op == 'x':
            df_lookup = df_lookup.drop(col_name)
    df_pseudo = self.fix_column_names(df_pseudo)
    df_lookup = self.fix_column_names(df_lookup)
    return (df_pseudo, df_lookup)
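# A minimal standalone sketch of the hash-with-salt pattern that
# pseudonymize_1 applies per column, assuming a local SparkSession;
# the "person" dataframe and its columns are illustrative only.
def _example_spark_hash_pseudonym():
    from pyspark.sql import SparkSession, functions as F
    spark = SparkSession.builder.master('local[1]').getOrCreate()
    df = spark.createDataFrame([('p1', 'Alice')], ['person_id', 'name'])
    salt = 'example-salt'  # stand-in for self.salt
    return (
        df.withColumn('person_id',
                      F.sha2(F.concat(F.col('person_id'), F.lit(salt)), 256))
          .withColumnRenamed('person_id', 'person_id_pseudonym')
    )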
def pseudonymize_2(value, salt=SALT_KEY):
    """Pseudonymize value with salt, using HMAC-SHA256 encoding

    Parameters
    ----------
    value: value to be pseudonymized
    salt: secret salt for additional protection

    Returns
    -------
    pseudonymized value using HMAC-SHA256
    """
    # NOTE: bypass empty or None values here, as hashing them
    # would introduce one specific, recognizable hash value
    if value is None or value is np.nan or value == '':
        return None
    return hmac.new(
        key=salt.encode('utf-8'),        # the key
        msg=str(value).encode('utf-8'),  # the data to pseudonymize
        digestmod=hashlib.sha256         # the hash function
    ).hexdigest()                        # hex-encoded digest
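# Hedged check of pseudonymize_2; `hmac`, `np`, and SALT_KEY are assumed
# module-level names in the original snippet, so the first two are wired up
# here and the salt is passed explicitly.
def _example_pseudonymize_2():
    import hmac as _hmac
    import numpy as _np
    globals().setdefault('hmac', _hmac)
    globals().setdefault('np', _np)
    digest = pseudonymize_2('alice@example.com', salt='example-salt')
    # Same input and salt always yield the same pseudonym.
    assert digest == pseudonymize_2('alice@example.com', salt='example-salt')
    assert pseudonymize_2('', salt='example-salt') is None  # empties bypassed
    return digest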
def pseudonymize_row(row):
"""
Replace some identifying information with others:
- Fake name
- Birthdate is replaced with the age
"""
    anonymized_row = row.copy()
    # Using Faker (https://faker.readthedocs.io/en/master/) to generate fake
    # names; `faker`, `today` and `datetime` are assumed module-level names.
if anonymized_row['Gender'] == 'Female':
anonymized_row['Fullname'] = faker.name_female()
else:
anonymized_row['Fullname'] = faker.name_male()
del anonymized_row['Birthdate']
birthdate = datetime.strptime(row['Birthdate'], '%Y-%m-%d')
age = today.year - birthdate.year - ((today.month, today.day) < (birthdate.month, birthdate.day))
anonymized_row['Age'] = age
return anonymized_row
def anonymize_one(self, column, delete: bool, pattern: AnonymizationPattern = None):
if column is None:
return Logger.log_none_type_error('column')
Logger.log_info_table_manipulation_started(self.filename, f'Anonymize One ({column})')
# delete column in every dataset if found
error_count = 0
if delete:
for ds in self.datasets:
out = ds.delete_column(column)
if out < 1:
error_count += 1
self.remove_columnnames([column])
else:
# if column is not deleted: generate a value for column, random or by pattern
if pattern is None:
for ds in self.datasets:
out = ds.set_columnvalue_random(column)
if out < 1:
error_count += 1
else:
for ds in self.datasets:
out = ds.set_columnvalue_by_pattern(column, pattern)
if out < 1:
error_count += 1
Logger.log_info_table_manipulation_finished(error_count)
return error_count
def pseudonymize_3(field):
return sha256(field.encode() + get_seed(seed).encode()).hexdigest()[:20]
def pseudonymize_columns(dataframe, cols,
ps_key='test',
api_key=SHARED_KEY):
actions = [
{"name": "pseudonymize-{}".format(c),
"transform-value": {
"key": c,
"pseudonymize": {
"method": "merengue",
"key": ps_key,
}
}
} for c in cols]
items = dataframe.fillna('').T.to_dict()
item_list = list(items.values())
data = requests.post(
'https://api.kiprotect.com/v1/transform',
data=json.dumps(
{"actions": actions, "items": item_list},
allow_nan=False),
headers={
'Authorization': 'Bearer {}'.format(api_key)})
return pd.DataFrame(data.json()['items'])
def _parse_url_parts(self, tld_extractor: TLDExtract, url_str: str) -> dict:
url = tld_extractor(url_str)
parts = {}
parts["scheme"] = self._find_first(r"^([a-z0-9]+)\:\/\/", url_str)
parts["auth"] = self._find_first(r"(?:.*\/\/|^)(.*:.*)@.*", url_str)
parts["domain"] = url.domain
parts["subdomain"] = url.subdomain
parts["suffix"] = url.suffix
url_list = ".".join(list(url))
parts["path"] = self._find_first(
rf"(?:^[a-z0-9]+\:\/\/)?{url_list}(?:\:\d+)?([^#^\?]*).*", url_str
)
parts["query"] = self._find_first(r".*(\?\w+=[a-zA-Z0-9](?:&\w+=[a-zA-Z0-9]+)*).*", url_str)
parts["fragment"] = self._find_first(r".*#(.*)", url_str)
return parts
def _pseudonymize_value(self, value: str, pseudonyms: list[dict]) -> str:
hash_string = self._hasher.hash_str(value, salt=self._config.hash_salt)
if self._cache.requires_storing(hash_string):
encrypted_origin = self._encrypter.encrypt(value)
pseudonyms.append({"pseudonym": hash_string, "origin": encrypted_origin})
return self._wrap_hash(hash_string)
def base64_method(data_path, columns):
    # Base64 is reversible encoding, not keyed hashing: treat this as light
    # obfuscation rather than secure pseudonymization.
    import base64
    import pandas as pd
    data = pd.read_csv(data_path)
    data = data.dropna()
    data.reset_index(drop=True, inplace=True)
    existing_columns = list(data)
    for column in columns:
        if column in existing_columns:
            data[column] = data[column].apply(str)
            data[column] = data[column].apply(
                lambda x: base64.b64encode(bytes(x, 'utf-8')))
    return data
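# Round-trip sketch for base64_method using a throwaway CSV file; the
# column names are illustrative.
def _example_base64_method():
    import os
    import tempfile
    with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as f:
        f.write("name,age\nAlice,34\n")
        path = f.name
    try:
        return base64_method(path, ['name'])
    finally:
        os.unlink(path)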
def pseudonymize_4(self, s):
    # integer division so struct.unpack receives an int count (assumes: import struct)
    sl = len(s) // self.__byte
    return struct.unpack('<%dh' % sl, s)
def _replace_name(item, value, field, dicom):
sex = dicom.get("PatientSex")
sex = {"F": "Female", "M": "Male", "O": "Other", "": "Unk"}[sex]
age = Deider._round_to_nearest(parse_AS_as_int(dicom.get("PatientAge")), 5)
return f"{sex} {age:03d}Y {dicom.get('Modality')}"
def apply(config, val):
""" Pseudonymize using format preserving encryption.
Example config:
{
'func': 'fpe',
'key': 'some-secret-key',
'alphabet': string.ascii_letters
}
"""
validate_func_params(config, MANDATORY_CONFIG_PARAMS)
try:
alphabet = config.get('alphabet', string.printable)
e = pyffx.String(config['key'].encode("utf-8"), alphabet, length=len(val))
return e.encrypt(val)
except ValueError:
raise PseudoFuncError("Could not pseudonymize '{0}'. Check alphabet compatibility ({1})".format(val, alphabet))
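# Standalone sketch of the format-preserving encryption that apply() wraps:
# pyffx.String is the real pyffx API; the key and alphabet are examples.
def _example_fpe():
    import string
    import pyffx
    e = pyffx.String(b'some-secret-key', string.ascii_letters, length=5)
    token = e.encrypt('alice')
    # FPE keeps length and alphabet, and is reversible with the key.
    assert e.decrypt(token) == 'alice'
    return token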
def pseudonymize_6(text: str, tagger: SequenceTagger) -> Tuple[str, str]:
"""
Perform the pseudonymization action and return both the tagged version (see function "tag_entities") and the pseudonymized version
Args:
text (str): the input text to pseudonymize
tagger (SequenceTagger): the flair model for NER
Returns:
Tuple[str, str]: the original text with tags, and the pseudonymized text
"""
with sw.timer("root"):
text_sentences = [Sentence(t.strip()) for t in text.split("\n") if t.strip()]
with sw.timer("model_annotation"):
# inplace function
tagger.predict(
sentences=text_sentences,
mini_batch_size=32,
embedding_storage_mode="none",
verbose=True,
)
return tag_entities(sentences=text_sentences)
def get_replacement_stock() -> List[str]:
"""
A list of faked names to replace the information you want to hide
"""
stock = [f"{letter}..." for letter in ascii_uppercase] + [
f"{a}{b}..." for a, b in list(itertools.combinations(ascii_uppercase, 2))
]
random.shuffle(stock)
return stock
def apply_tagging_sentence(
starts: List[int],
ends: List[int],
tags: List[str],
entities: List[str],
plain_text: str,
replacement_dict: Dict[str, str],
) -> Tuple[str, str]:
"""
Args:
starts, ends, tags, entity texts of the entities found in the sentence + the text of the sentence + the prepared replacement dictionary for pseudo
Returns:
str, str: a text where the entities have a XML tag, and a text where entities have been pseudonymized
"""
assert (
len(starts) == len(ends) == len(tags) == len(entities)
), "Input lists mast be of the same length"
    shift_tags_start, shift_tags_end = 0, 0  # shift due to the added tags
shift_pseudo_start, shift_pseudo_end = 0, 0
tagged_sentence, pseudo_sentence = plain_text, plain_text
n_entities = len(starts)
for i in range(n_entities):
start, end, entity, tag = starts[i], ends[i], entities[i], tags[i]
replacement = replacement_dict[entity]
pseudo_sentence = (
pseudo_sentence[: start + shift_pseudo_start]
+ replacement
+ pseudo_sentence[end + shift_pseudo_end:]
)
shift_pseudo_start += len(replacement) - (end - start)
shift_pseudo_end += len(replacement) - (end - start)
        tagged_sentence = (
            tagged_sentence[: start + shift_tags_start]
            + "<a>"
            + f"<{tag}>"
            + plain_text[start:end]
            + f"</{tag}>"
            + "</a>"
            + tagged_sentence[end + shift_tags_end:]
        )
        shift_tags_start += (
            5 + 6 + 3 + 4
        )  # 5 characters for <PER> tag (or LOC or ORG) + 6 for </PER> + 3 for <a> and 4 for </a>
        shift_tags_end += (
            5 + 6 + 3 + 4
        )  # 5 characters for <PER> tag (or LOC or ORG) + 6 for </PER> + 3 for <a> and 4 for </a>
tagged_sentence = "" + tagged_sentence + ""
tagged_sentence = tagged_sentence.replace("", "")
return (
f"{tagged_sentence}",
pseudo_sentence,
)
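# Worked example for apply_tagging_sentence on a hypothetical sentence; the
# replacement_dict mimics the stock produced by get_replacement_stock().
def _example_apply_tagging_sentence():
    plain_text = "Jean Dupont lives in Paris"
    tagged, pseudo = apply_tagging_sentence(
        starts=[0, 21],
        ends=[11, 26],
        tags=["PER", "LOC"],
        entities=["Jean Dupont", "Paris"],
        plain_text=plain_text,
        replacement_dict={"Jean Dupont": "A...", "Paris": "B..."},
    )
    return tagged, pseudo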
def english_pseudo(text):
anon = AnonymizerChain(Anonymization('en_US'))
anon.add_anonymizers(EmailAnonymizer, NamedEntitiesAnonymizer('en_core_web_lg'))
    clean_text, patch = anon.pseudonymize(text)
    return clean_text, patch
def pseudonymize_user_name(self, user_name: UserName) -> PseudoUserName:
hasher = hashlib.sha256()
hasher.update(user_name.encode('utf-8'))
# salt
hasher.update(b'\0')
hasher.update(self.salt)
pseudonymized = base64.b64encode(hasher.digest()).decode('utf-8')
return PseudoUserName(pseudonymized)
def parse_lines(text):
lines = []
for m in LINE_RE.finditer(text):
ln = {"TIMESTAMP": parse_date(m.group(1).strip("\n").strip()),
"SPEAKER": m.group(2).strip(),
"MESSAGE": m.group(3).strip()}
lines.append(ln)
    return lines
def pseudonymize_7(graph: ProvDocument) -> ProvDocument:
log.info(f"pseudonymize agents in {graph=}")
# get all records except for agents and relations
records = list(graph.get_records((ProvActivity, ProvEntity)))
pseudonyms = dict()
for agent in graph.get_records(ProvAgent):
name = get_attribute(agent, USERNAME)
mail = get_attribute(agent, USEREMAIL)
if name is None:
raise ValueError("ProvAgent representing a user has to have a name!")
# hash name & mail if present
namehash = hashlib.sha256(bytes(name, "utf-8")).hexdigest()
mailhash = hashlib.sha256(bytes(mail, "utf-8")).hexdigest() if mail else None
# create a new id as a pseudonym using the hashes
pseudonym = qualified_name(f"User?name={namehash}&email={mailhash}")
# map the old id to the pseudonym
pseudonyms[agent.identifier] = pseudonym
# keep only prov role & prov type
# replace name & mail with hashes
pseudonymized = pseudonymize_agent(
agent,
identifier=pseudonym,
keep=[PROV_ROLE, PROV_TYPE],
replace={USERNAME: namehash, USEREMAIL: mailhash},
)
# add pseudonymized agent to the list of records
records.append(pseudonymized)
# replace old id occurences with the pseudonymized id
for relation in graph.get_records(ProvRelation):
formal = [(key, pseudonyms.get(val, val)) for key, val in relation.formal_attributes]
extra = [(key, pseudonyms.get(val, val)) for key, val in relation.extra_attributes]
r_type = PROV_REC_CLS.get(relation.get_type())
records.append(r_type(relation.bundle, relation.identifier, formal + extra))
return graph_factory(records)
def _make_sentence(self, tokens_left, tokens_right, seq_length=128):
len_left = len(tokens_left)
len_right = len(tokens_right)
cut_len = len_left + len_right - (seq_length - 1)
if cut_len > 0:
cut_left = len_left - seq_length // 2
cut_right = len_right - (seq_length - 1) // 2
if cut_left < 0:
cut_left, cut_right = 0, cut_left + cut_right
elif cut_right < 0:
cut_left, cut_right = cut_left + cut_right, 0
else:
cut_left, cut_right = 0, 0
tokens_left = tokens_left[cut_left:]
# tokens_right = tokens_right[:-cut_right]
tokens_right = tokens_right[:len(tokens_right) - cut_right]
tokens = tokens_left + [self.bert_tokenizer.mask_token] + tokens_right
attention_mask = [1] * len(tokens_left) + [1] + [1] * len(tokens_right)
if len(tokens) < seq_length:
num_padding = seq_length - len(tokens)
tokens += [self.bert_tokenizer.pad_token] * num_padding
        attention_mask += [0] * num_padding
def _random_word_context(self, text, max_trial=10):
    puncs = list("[]!\"#$%&'()*+,./:;<=>?@\\^_`{|}~-")
words = text.split()
trial = 0
done = False
while trial < max_trial and not done:
trial += 1
w_idx = random.randint(0, len(words) - 1)
word, left_res, right_res = words[w_idx], [], []
# If the word is already in vocab, it's good to go.
if len(word) >= self.min_word_len and \
(word.lower() in self.dictionary) and \
len(word) < DEFAULT_MAX_CHARACTER_POSITIONS - 4:
done = True
else:
# Otherwise, detach puncs at the first and the last char, and check again
if word[0] in puncs:
word, left_res = word[1:], [word[0]]
else:
word, left_res = word, []
if not word: continue # The word was just a punc
if word[-1] in puncs:
word, right_res = word[:-1], [word[-1]]
else:
word, right_res = word, []
if len(word) < self.min_word_len or \
(not word.lower() in self.dictionary) or \
len(word) >= DEFAULT_MAX_CHARACTER_POSITIONS - 4:
continue
# Check whether it's anonymized field
right_snip = ' '.join(words[w_idx + 1:w_idx + 5])
if '**]' in right_snip and '[**' not in right_snip:
continue
left_snip = ' '.join(words[w_idx - 4:w_idx])
if '[**' in left_snip and '**]' not in left_snip:
continue
# Pass!
done = True
if done:
return word, ' '.join(words[:w_idx] + left_res), ' '.join(right_res + words[w_idx + 1:])
else:
raise ValueError('failed to choose word')
def __next__(self):
# Select next note (length >= 2000)
while True:
try:
_, row = next(self.note_iterrows)
except StopIteration:
self._load_random_csv()
_, row = next(self.note_iterrows)
note_id = int(row.ROW_ID)
note = row.TEXT.strip()
# if len(note) >= 2000:
# break
if len(note) < 2000:
continue
try:
correct, left, right = self._random_word_context(note)
except:
# import traceback; traceback.print_exc();
continue
break
# Corrupt and pseudonymize
correct = correct.lower()
if random.uniform(0, 1) >= self.no_corruption_prob:
typo = self.word_corrupter.corrupt_word(correct)
else:
typo = correct
left = self.mimic_pseudo.pseudonymize(left)
left = self._process_note(left)
left = ' '.join(left.split(' ')[-128:])
right = self.mimic_pseudo.pseudonymize(right)
right = self._process_note(right)
right = ' '.join(right.split(' ')[:128])
# Parse
temp_csv_row = [-1, note_id, typo, left, right, correct]
# print(f'{self.csv_fname}({note_id}, {_}/{len(self.df_note)}): {correct} -> {typo}')
example = self._parse_row(temp_csv_row)
return example
def pseudonymize_8(self, s):
    # integer division so struct.unpack receives an int count
    return struct.unpack(">" + ("I" * (len(s) // self.__stride)), s)
def pseudonymize(field):
    # assumes: from hashlib import sha256, plus a module-level `salt` string
    return sha256(field.encode() + salt.encode()).hexdigest()[:16]
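# Usage sketch for the truncated-digest variant above. The module rebinds the
# name `pseudonymize` below, so snapshot this one; the salt and the assumed
# sha256 import are wired up explicitly for the demo.
_pseudonymize_field = pseudonymize

def _example_pseudonymize_field():
    import hashlib as _hashlib
    globals().setdefault('sha256', _hashlib.sha256)
    globals().setdefault('salt', 'example-salt')
    token = _pseudonymize_field('patient-42')
    # 16 hex digits are compact but raise collision risk versus a full digest.
    assert len(token) == 16
    return token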
def pseudonymize(
self,
original_text: str,
presidio_response: List[RecognizerResult],
count: int,
):
"""
:param original_text: str containing the original text
:param presidio_response: list of results from Presidio, to be used to know where entities are
:param count: number of perturbations to return
:return: List[str] with fake perturbations of original text
"""
presidio_response = sorted(presidio_response, key=lambda resp: resp.start)
anonymizer_engine = AnonymizerEngine()
anonymized_result = anonymizer_engine.anonymize(
text=original_text, analyzer_results=presidio_response
)
templated_text = anonymized_result.text
templated_text = templated_text.replace(">", "}}").replace("<", "{{")
fake_texts = [self.parse(templated_text, add_spans=False) for _ in range(count)]
return fake_texts
def pseudonymize(
self, key_file: KeyFile, identifiers: List["Identifier"]
) -> List["Key"]:
"""Get a pseudonym for each identifier. If identifier is known in PIMS,
return this. Otherwise, have PIMS generate a new pseudonym and return that.
Parameters
----------
identifiers: List[Identifier]
The identifiers to get pseudonyms for
key_file: KeyFile
The key_file to use
Notes
-----
    Each call to this function calls the PIMS API twice for each unique
    source in identifiers. This is a result of the way the API can be called.
Returns
-------
List[Key]
The PIMS pseudonym for each identifier
"""
keys = []
# Each call to process a list of identifiers only allows a single source.
# Split identifiers by source
per_source = defaultdict(list)
for x in identifiers:
per_source[x.source].append(x)
for source, items in per_source.items():
keys = keys + self.deidentify(key_file, [x.value for x in items], source)
return keys
def pseudonymize(self, s):
    sl = len(s) // 2
return struct.unpack('<%dh' % sl, s)
def regex_anonymizer(self, text: str, regex: Pattern, provider: str) -> str:
    '''
    Anonymize all substrings matching a specific regex using a Faker provider
    '''
    matches = re.findall(regex, text)
    return self.replace_all(text, matches, provider)
def psdnmyz_2():
# load TWO csv to be sent to be pseudonymz
# metrics_df=pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_METRICS_Skel_header.csv')
seg_df = pd.read_csv('/home/arasan/testrep/psmd/jureca/psmd_seg_vols.csv')
    # add random id column to both df
# below line is a disaster
# metrics_df['RNDNAME'] = metrics_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
# seg_df['RNDNAME'] = seg_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
# a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
# metrics_df['RNDNAME']=a
# print 'after rqndom id has been added'
# flagg=True
# while(flagg):
# try:
# print pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1)
# except ValueError:
# print 'NO DUPLICAtes'
# metrics_df.to_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
# flagg=False
# else:
# print 'DUPES'
# metrics_df=metrics_df.drop('RNDNAME', axis=1)
# a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
# metrics_df['RNDNAME']=a
    # load double-checked randomized df 1) above try/catch 2) using np.unique
metrnd = pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
seg_df['SNO'] = seg_df.index + 1
metrnd['SNO'] = seg_df.index + 1
    # add RNDNAME column to seg_df
seg_df['RNDNAME'] = metrnd.RNDNAME.values
    # rename columns NAME to ID and RNDNAME to NAME
seg_df = seg_df.rename(index=str, columns={"NAME": "ID"})
seg_df = seg_df.rename(index=str, columns={"RNDNAME": "NAME"})
metrnd = metrnd.rename(index=str, columns={"NAME": "ID"})
metrnd = metrnd.rename(index=str, columns={"RNDNAME": "NAME"})
# dump map out with 3 columns ID,NAME,SNO
mapdf = metrnd[['ID', 'NAME', 'SNO']]
mapdf.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/psdnmyz_map.csv', index=False)
# drop ID and SNO
seg_df = seg_df.drop(['ID', 'SNO'], axis=1)
metrnd = metrnd.drop(['ID', 'SNO'], axis=1)
# move NAME column to first position
metrnd = metrnd[['NAME', 'mean_skel_MD_LH_RH', 'sd_skel_MD_LH_RH', 'Pw90S_skel_MD_LH_RH', 'mean_skel_FA_LH_RH',
'sd_skel_FA_LH_RH', 'mean_skel_AD_LH_RH', 'sd_skel_AD_LH_RH', 'mean_skel_RD_LH_RH',
'sd_skel_RD_LH_RH']]
seg_df = seg_df[['NAME', 'AGE', 'SEX', 'GMV', 'WMV', 'CSFV', 'ICV']]
# if pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1).RNDNAME.values.size:
# print 'NOT OK'
# else:
# print 'OK'
metrnd.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/TOTAL_METRICS_Skel_header.csv', index=False)
seg_df.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/psmd_seg_vols.csv', index=False)
def psdnmyz_3():
# load TWO csv to be sent to be pseudonymz
# metrics_df=pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_METRICS_Skel_header.csv')
seg_df = pd.read_csv('/home/arasan/testrep/psmd/jureca/psmd_seg2_vols.csv')
    # add random id column to both df
# below line is a disaster
# metrics_df['RNDNAME'] = metrics_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
# seg_df['RNDNAME'] = seg_df['NAME'].apply(lambda x: gocept.pseudonymize.integer(x, 'secret'))
# a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
# metrics_df['RNDNAME']=a
# print 'after rqndom id has been added'
# flagg=True
# while(flagg):
# try:
# print pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1)
# except ValueError:
# print 'NO DUPLICAtes'
# metrics_df.to_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
# flagg=False
# else:
# print 'DUPES'
# metrics_df=metrics_df.drop('RNDNAME', axis=1)
# a=np.random.randint(100000,999999,metrics_df.NAME.values.size)
# metrics_df['RNDNAME']=a
    # load double-checked randomized df 1) above try/catch 2) using np.unique
metrnd = pd.read_csv('/home/arasan/testrep/psmd/jureca/TOTAL_rnd_temp.csv')
seg_df['SNO'] = seg_df.index + 1
# metrnd['SNO']=seg_df.index+1
    # add RNDNAME column to seg_df
seg_df['RNDNAME'] = metrnd.RNDNAME.values
    # rename columns NAME to ID and RNDNAME to NAME
# seg_df=seg_df.rename(index=str, columns={"NAME": "ID"})
seg_df = seg_df.rename(index=str, columns={"RNDNAME": "NAME"})
# metrnd=metrnd.rename(index=str, columns={"NAME": "ID"})
# metrnd=metrnd.rename(index=str, columns={"RNDNAME": "NAME"})
# dump map out with 3 columns ID,NAME,SNO
# mapdf=metrnd[['ID','NAME','SNO']]
# mapdf.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/psdnmyz_map.csv',index=False)
# drop ID and SNO
seg_df = seg_df.drop(['ID', 'SNO'], axis=1)
# metrnd=metrnd.drop(['ID','SNO'],axis=1)
# move NAME column to first position
# metrnd=metrnd[['NAME','mean_skel_MD_LH_RH','sd_skel_MD_LH_RH','Pw90S_skel_MD_LH_RH','mean_skel_FA_LH_RH','sd_skel_FA_LH_RH','mean_skel_AD_LH_RH','sd_skel_AD_LH_RH','mean_skel_RD_LH_RH','sd_skel_RD_LH_RH']]
seg_df = seg_df[['NAME', 'AGE', 'SEX', 'ICV']]
# if pd.concat(g for _, g in metrics_df.groupby("RNDNAME") if len(g) > 1).RNDNAME.values.size:
# print 'NOT OK'
# else:
# print 'OK'
# metrnd.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet2/TOTAL_METRICS_Skel_header.csv',index=False)
seg_df.to_csv('/home/arasan/testrep/psmd/jureca/bordeaux_packet3/psmd_seg2_vols.csv', index=False)
def hashPseudonym(self, i, key, tile):
    # assumes: from cryptography.hazmat.primitives import hashes
    # and: from cryptography.hazmat.backends import default_backend
    digest = hashes.Hash(hashes.SHA256(), default_backend())
    # for i in range (0,len(plainTail)): # {
    _digest = digest.copy()
    # key = secrets.token_bytes(32)
    # NOTE: bytes(i) yields i zero bytes when i is an int;
    # use bytes([i]) if the intent is to hash the index value itself.
    _digest.update(bytes(i))
    _digest.update(key)
    _digest.update(bytes(tile))
    p = _digest.finalize()  # }
    # digest.finalize()
    return p
def test_localization_of_pseudonym(self):
name = b" a 16 byte name "
target = b"PEP3 storage_facility"
pp = pep3_pb2.Pseudonymizable(data=name,
state=pep3_pb2.Pseudonymizable.UNENCRYPTED_NAME)
self.collector.pseudonymize([pp])
self.collector.relocalize([pp],
self.config.collector.warrants.to_sf)
sfp = elgamal.Triple.unpack(pp.data) \
.decrypt(self.sf.private_keys['pseudonym'])
pseudonym_secrets = {}
for peer_secrets in self.secrets.peers.values():
for shard, shard_secrets in peer_secrets.by_shard.items():
pseudonym_secrets[shard] \
= shard_secrets.pseudonym_component_secret
s = 1
e = ed25519.scalar_unpack(common.sha256(target))
for secret in pseudonym_secrets.values():
s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
s %= ed25519.l
self.assertEqual(
sfp * ed25519.scalar_inv(s),
ed25519.Point.lizard(name))
def test_store_and_retrieve(self):
# first store a record with random source and target ip addresses,
# and see if we can recover it.
col_request = pep3_pb2.StoreRequest()
col_request.id = os.urandom(16)
flowrecord = col_request.records.add()
flowrecord.source_ip.data = os.urandom(16)
flowrecord.source_ip.state = pep3_pb2.Pseudonymizable.UNENCRYPTED_NAME
flowrecord.destination_ip.data = os.urandom(16)
flowrecord.destination_ip.state = \
pep3_pb2.Pseudonymizable.UNENCRYPTED_NAME
flowrecord.anonymous_part.number_of_bytes = 123
flowrecord.anonymous_part.number_of_packets = 456
updates = list(self.collector.connect_to('collector').Store(
iter([col_request])))
self.assertEqual(len(updates), 1)
self.assertEqual(updates[0].stored_id, col_request.id)
# store the same flowrecord twice, to see if that causes troubles
col_request.id = os.urandom(16)
updates = list(self.collector.connect_to('collector').Store(
iter([col_request])))
self.assertEqual(len(updates), 1)
self.assertEqual(updates[0].stored_id, col_request.id)
query = pep3_pb2.SqlQuery()
# manually compute storage_facility-local pseudonyms for query
sf_name = b"PEP3 storage_facility"
pseudonym_secrets = {}
for peer_secrets in self.secrets.peers.values():
for shard, shard_secrets in peer_secrets.by_shard.items():
pseudonym_secrets[shard] \
= shard_secrets.pseudonym_component_secret
s = 1
e = ed25519.scalar_unpack(common.sha256(sf_name))
for secret in pseudonym_secrets.values():
s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
s %= ed25519.l
# see if the record was stored correctly by querying the
# database directly.
query.query = """SELECT peped_flows.p_dst_ip FROM peped_flows
WHERE peped_flows.p_src_ip=:ip"""
ip = query.parameters['ip'].pseudonymizable_value
ip.data = (ed25519.Point.lizard(
flowrecord.source_ip.data) * s).pack()
ip.state = pep3_pb2.Pseudonymizable.UNENCRYPTED_PSEUDONYM
row = self.sf.connect_to('database') \
.Query(query).next().rows[0]
self.assertEqual(row.cells[0].pseudonymizable_value.data,
(ed25519.Point.lizard(flowrecord.destination_ip.data) * s
).pack())
# manually compute researcher-local pseudonyms for query
researcher_name = b"PEP3 researcher"
pseudonym_secrets = {}
for peer_secrets in self.secrets.peers.values():
for shard, shard_secrets in peer_secrets.by_shard.items():
pseudonym_secrets[shard] \
= shard_secrets.pseudonym_component_secret
s = 1
e = ed25519.scalar_unpack(common.sha256(researcher_name))
for secret in pseudonym_secrets.values():
s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
s %= ed25519.l
# now query via the researcher
query.parameters['ip'].pseudonymizable_value.data \
= (ed25519.Point.lizard(flowrecord.source_ip.data) * s).pack()
row = self.researcher.connect_to('researcher') \
.Query(query).next().rows[0]
self.assertEqual(row.cells[0].pseudonymizable_value.data,
(ed25519.Point.lizard(flowrecord.destination_ip.data) * s
).pack())
def test_depseudonymize(self):
ip = os.urandom(16)
# manually compute investigator-local pseudonym
pseudonym_secrets = {}
for peer_secrets in self.secrets.peers.values():
for shard, shard_secrets in peer_secrets.by_shard.items():
pseudonym_secrets[shard] \
= shard_secrets.pseudonym_component_secret
s = 1
e = ed25519.scalar_unpack(common.sha256(b"PEP3 investigator"))
for secret in pseudonym_secrets.values():
s *= pow(ed25519.scalar_unpack(secret), e, ed25519.l)
s %= ed25519.l
investigator_local_ip = (ed25519.Point.lizard(ip) * s).pack()
# manually create warrant
warrant = pep3_pb2.DepseudonymizationRequest.Warrant()
warrant.act.actor = b"PEP3 investigator"
warrant.act.name.state = pep3_pb2.Pseudonymizable.UNENCRYPTED_PSEUDONYM
warrant.act.name.data = investigator_local_ip
self.investigator.encrypt([warrant.act.name],
self.investigator.public_keys['pseudonym'])
warrant.signature = crypto.sign(
crypto.load_privatekey(crypto.FILETYPE_PEM,
self.secrets.root_certificate_keys.warrants),
warrant.act.SerializeToString(), 'sha256')
result = self.investigator.connect_to("investigator") \
.Depseudonymize(warrant)
self.assertEqual(result.data, ip)
def anonymize(cls, user, ldap_attrs, **kwargs):
# type: (User, Dict[AnyStr, Any], **Any) -> Dict[AnyStr, AnyStr]
"""
Change values of function arguments to anonymize/pseudonymize user if
UCRV asm/attributes//anonymize is true. Will return
unchanged function arguments otherwise.
:param User user: user object
:param dict ldap_attrs: dictionary with the users LDAP attributes
:return: dictionary with [modified] function arguments
:rtype: dict
:raises NotImplementedError: if cls.ucr_anonymize_key_base is unset
"""
ucr = get_ucr()
if ucr.is_true(cls.ucr_anonymize_key_base):
for k, v in cls.anonymize_mapping().items():
if v and v.startswith('%'):
attr = v[1:].strip()
try:
v = ldap_attrs[attr][0]
except KeyError:
raise ValueError('Attribute {!r} not found in LDAP object of {}.'.format(attr, user))
except IndexError:
raise ValueError('Attribute {!r} empty in LDAP object of {}.'.format(attr, user))
kwargs[k] = v
return kwargs
def _modify_dataset(
self,
anonymizer: Anonymizer,
pseudonym: str,
ds: Dataset,
) -> None:
"""Optionally pseudonymize an incoming dataset with the given pseudonym
and add the trial ID and name to the DICOM header if specified."""
if pseudonym:
# All dates get pseudonymized, but we want to retain the study date.
study_date = ds.StudyDate
anonymizer.anonymize(ds)
ds.StudyDate = study_date
ds.PatientID = pseudonym
ds.PatientName = pseudonym
    trial_protocol_id = self.transfer_task.job.trial_protocol_id
trial_protocol_name = self.transfer_task.job.trial_protocol_name
if trial_protocol_id:
ds.ClinicalTrialProtocolID = trial_protocol_id
if trial_protocol_name:
ds.ClinicalTrialProtocolName = trial_protocol_name
if pseudonym and trial_protocol_id:
session_id = f"{ds.StudyDate}-{ds.StudyTime}"
ds.PatientComments = f"Project:{trial_protocol_id} Subject:{pseudonym} Session:{pseudonym}_{session_id}"
def _psc1(psc1, psc2_from_psc1):
if 'TEST' in psc1.upper():
# skip test subjects
logging.debug('skipping test subject "%s"', psc1)
else:
# find and skip subjects with invalid identifier
if psc1[-3:] in {'FU2', 'FU3'}:
psc1 = psc1[:-3]
elif psc1[-2:] == 'SB':
psc1 = psc1[:-2]
if psc1 in psc2_from_psc1:
return psc1
elif psc1 in {'0x0000xxxxxx'}:
logging.info('skipping known invalid subject identifier "%s"',
psc1)
else:
logging.error('invalid subject identifier "%s"', psc1)
return None
def pseudonymize_node_name(name):
"""Replace Node.Name (detector ID) by a hash with secret key"""
h = hashlib.md5((app.secret_key + name).encode('utf-8'))
return 'node.' + h.hexdigest()[:6]
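# Usage sketch for pseudonymize_node_name; `app` is assumed to be a Flask app
# in the original, so a tiny stand-in namespace provides secret_key here.
def _example_pseudonymize_node_name():
    import types
    globals().setdefault('app', types.SimpleNamespace(secret_key='example-secret'))
    return pseudonymize_node_name('detector-007')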
def pseudonymize(self, size=None):
"""
Return pseudonymized values for this attribute, which is used to
substitute identifiable data with a reversible, consistent value.
"""
size = size or self.size
if size != self.size:
attr = Series(np.random.choice(self.bins, size=size, p=self.prs))
else:
attr = self
if self.categorical:
mapping = {b: utils.pseudonymise_string(b) for b in self.bins}
return attr.map(lambda x: mapping[x])
if self.type == 'string':
return attr.map(utils.pseudonymise_string)
elif self.is_numerical or self.type == 'datetime':
return attr.map(str).map(utils.pseudonymise_string)
def pseudonymize(self, content):
if not content: return content
content_modified = ''
start = 0
    for mo in re.finditer(r"\[\*\*[^\[]*\*\*\]", content):
replacement = self.mapper.get_mapping(mo.group(0))
content_modified += content[start: mo.start()]
content_modified += replacement
start = mo.end()
if start < len(content):
content_modified += content[start: len(content)]
return content_modified
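# Minimal harness for the [** ... **] placeholder rewriter above: a stub
# mapper with a get_mapping() method stands in for the original self.mapper
# dependency, and the sample text is illustrative.
def _example_mask_mapping():
    import types

    class _StubMapper:
        def __init__(self):
            self._seen = {}

        def get_mapping(self, token):
            return self._seen.setdefault(token, '[**PSEUDO-%d**]' % len(self._seen))

    holder = types.SimpleNamespace(mapper=_StubMapper())
    text = 'Seen by [**First Name**] on [**2120-02-29**].'
    return pseudonymize(holder, text)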