# dataset_pseudonymizer_4.py (1.5 KB)

  1. def _pseudonymize_identifier_and_measure(
  2. input_csv_path: str,
  3. identifier_unit_id_type: str,
  4. measure_unit_id_type: str,
  5. job_id: str
  6. ) -> str:
  7. unique_idents = set()
  8. unique_measure_values = set()
  9. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  10. for line in csv_file:
  11. row = line.strip().split(';')
  12. unit_id = row[1]
  13. value = row[2]
  14. unique_idents.add(unit_id)
  15. unique_measure_values.add(value)
  16. identifier_to_pseudonym = pseudonym_service.pseudonymize(
  17. list(unique_idents), identifier_unit_id_type, job_id
  18. )
  19. value_to_pseudonym = pseudonym_service.pseudonymize(
  20. list(unique_measure_values), measure_unit_id_type, job_id
  21. )
  22. output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
  23. target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
  24. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  25. for line in csv_file:
  26. row = line.strip().split(';')
  27. line_number: int = row[0]
  28. unit_id: str = row[1]
  29. value: str = row[2]
  30. start_date: str = row[3]
  31. stop_date: str = row[4]
  32. target_file.write(
  33. ';'.join([
  34. str(line_number),
  35. str(identifier_to_pseudonym[unit_id]),
  36. str(value_to_pseudonym[value]),
  37. start_date, stop_date
  38. ]) + '\n'
  39. )
  40. target_file.close()
  41. return output_csv_path