123456789101112131415161718192021222324252627282930313233343536373839404142 |
def pseudonymize_7(graph: ProvDocument) -> ProvDocument:
    """Build a new document from ``graph`` with all agents pseudonymized.

    Activity and entity records are carried over unchanged. Every agent is
    given a new identifier derived from the SHA-256 hex digests of its user
    name and e-mail address, and every relation is re-created with
    attribute values that referenced an old agent identifier swapped for
    the matching pseudonym.

    Args:
        graph: The document whose agents should be pseudonymized.

    Returns:
        Whatever ``graph_factory`` builds from the collected records.

    Raises:
        ValueError: If an agent has no user name attribute.
    """
    log.info(f"pseudonymize agents in {graph=}")

    def sha256_hex(text):
        # Hex digest of the UTF-8 encoded text.
        return hashlib.sha256(bytes(text, "utf-8")).hexdigest()

    # Start from everything that is neither an agent nor a relation.
    collected = list(graph.get_records((ProvActivity, ProvEntity)))

    # Original agent identifier -> pseudonymous identifier.
    id_map = {}

    def remap(attributes):
        # Swap attribute values that are known agent ids; keep all others.
        return [(key, id_map.get(value, value)) for key, value in attributes]

    for agent in graph.get_records(ProvAgent):
        user_name = get_attribute(agent, USERNAME)
        user_mail = get_attribute(agent, USEREMAIL)
        if user_name is None:
            raise ValueError("ProvAgent representing a user has to have a name!")

        name_digest = sha256_hex(user_name)
        # A missing/empty mail address is not hashed; the identifier below
        # then contains the literal text "None" for the email part.
        if user_mail:
            mail_digest = sha256_hex(user_mail)
        else:
            mail_digest = None

        new_id = qualified_name(f"User?name={name_digest}&email={mail_digest}")
        id_map[agent.identifier] = new_id

        # Ask for only prov role & prov type to be kept, and for name & mail
        # to be replaced by their digests.
        collected.append(
            pseudonymize_agent(
                agent,
                identifier=new_id,
                keep=[PROV_ROLE, PROV_TYPE],
                replace={USERNAME: name_digest, USEREMAIL: mail_digest},
            )
        )

    # Re-create each relation so that it points at the pseudonymous ids.
    for relation in graph.get_records(ProvRelation):
        attributes = remap(relation.formal_attributes) + remap(relation.extra_attributes)
        record_cls = PROV_REC_CLS.get(relation.get_type())
        collected.append(record_cls(relation.bundle, relation.identifier, attributes))

    return graph_factory(collected)
|