main_26.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. def pseudonymize_7(graph: ProvDocument) -> ProvDocument:
  2. log.info(f"pseudonymize agents in {graph=}")
  3. # get all records except for agents and relations
  4. records = list(graph.get_records((ProvActivity, ProvEntity)))
  5. pseudonyms = dict()
  6. for agent in graph.get_records(ProvAgent):
  7. name = get_attribute(agent, USERNAME)
  8. mail = get_attribute(agent, USEREMAIL)
  9. if name is None:
  10. raise ValueError("ProvAgent representing a user has to have a name!")
  11. # hash name & mail if present
  12. namehash = hashlib.sha256(bytes(name, "utf-8")).hexdigest()
  13. mailhash = hashlib.sha256(bytes(mail, "utf-8")).hexdigest() if mail else None
  14. # create a new id as a pseudonym using the hashes
  15. pseudonym = qualified_name(f"User?name={namehash}&email={mailhash}")
  16. # map the old id to the pseudonym
  17. pseudonyms[agent.identifier] = pseudonym
  18. # keep only prov role & prov type
  19. # replace name & mail with hashes
  20. pseudonymized = pseudonymize_agent(
  21. agent,
  22. identifier=pseudonym,
  23. keep=[PROV_ROLE, PROV_TYPE],
  24. replace={USERNAME: namehash, USEREMAIL: mailhash},
  25. )
  26. # add pseudonymized agent to the list of records
  27. records.append(pseudonymized)
  28. # replace old id occurences with the pseudonymized id
  29. for relation in graph.get_records(ProvRelation):
  30. formal = [(key, pseudonyms.get(val, val)) for key, val in relation.formal_attributes]
  31. extra = [(key, pseudonyms.get(val, val)) for key, val in relation.extra_attributes]
  32. r_type = PROV_REC_CLS.get(relation.get_type())
  33. records.append(r_type(relation.bundle, relation.identifier, formal + extra))
  34. return graph_factory(records)