dataset_pseudonymizer.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import logging
  2. from typing import Tuple, Union
  3. import microdata_validator
  4. from job_executor.exception import BuilderStepError
  5. from job_executor.adapter import pseudonym_service
  6. from job_executor.model import Metadata
# Logger shared by every function in this module. getLogger() is called
# without a name, so this is the root logger.
logger = logging.getLogger()
  8. def _get_unit_types(
  9. metadata: Metadata
  10. ) -> Tuple[Union[str, None], Union[str, None]]:
  11. return (
  12. metadata.get_identifier_key_type_name(),
  13. metadata.get_measure_key_type_name()
  14. )
  15. def _pseudonymize_identifier_only(
  16. input_csv_path: str,
  17. unit_id_type: str,
  18. job_id: str
  19. ) -> str:
  20. unique_identifiers = set()
  21. with open(input_csv_path, newline='', encoding='utf8') as csv_file:
  22. for line in csv_file:
  23. unit_id = line.strip().split(';')[1]
  24. unique_identifiers.add(unit_id)
  25. identifier_to_pseudonym = pseudonym_service.pseudonymize(
  26. list(unique_identifiers), unit_id_type, job_id
  27. )
  28. output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
  29. target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
  30. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  31. for line in csv_file:
  32. row = line.strip().split(';')
  33. line_number: int = row[0]
  34. unit_id: str = row[1]
  35. value: str = row[2]
  36. start_date: str = row[3]
  37. stop_date: str = row[4]
  38. target_file.write(
  39. ';'.join([
  40. str(line_number),
  41. str(identifier_to_pseudonym[unit_id]),
  42. value,
  43. start_date, stop_date
  44. ]) + '\n'
  45. )
  46. target_file.close()
  47. return output_csv_path
  48. def _pseudonymize_measure_only(
  49. input_csv_path: str,
  50. unit_id_type: str,
  51. job_id: str
  52. ) -> str:
  53. unique_measure_values = set()
  54. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  55. for line in csv_file:
  56. value = line.strip().split(';')[2]
  57. unique_measure_values.add(value)
  58. value_to_pseudonym = pseudonym_service.pseudonymize(
  59. list(unique_measure_values), unit_id_type, job_id
  60. )
  61. output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
  62. target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
  63. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  64. for line in csv_file:
  65. row = line.strip().split(';')
  66. line_number: int = row[0]
  67. unit_id: str = row[1]
  68. value: str = row[2]
  69. start_date: str = row[3]
  70. stop_date: str = row[4]
  71. target_file.write(
  72. ';'.join([
  73. str(line_number),
  74. unit_id,
  75. str(value_to_pseudonym[value]),
  76. start_date, stop_date
  77. ]) + '\n'
  78. )
  79. target_file.close()
  80. return output_csv_path
  81. def _pseudonymize_identifier_and_measure(
  82. input_csv_path: str,
  83. identifier_unit_id_type: str,
  84. measure_unit_id_type: str,
  85. job_id: str
  86. ) -> str:
  87. unique_idents = set()
  88. unique_measure_values = set()
  89. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  90. for line in csv_file:
  91. row = line.strip().split(';')
  92. unit_id = row[1]
  93. value = row[2]
  94. unique_idents.add(unit_id)
  95. unique_measure_values.add(value)
  96. identifier_to_pseudonym = pseudonym_service.pseudonymize(
  97. list(unique_idents), identifier_unit_id_type, job_id
  98. )
  99. value_to_pseudonym = pseudonym_service.pseudonymize(
  100. list(unique_measure_values), measure_unit_id_type, job_id
  101. )
  102. output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
  103. target_file = open(output_csv_path, 'w', newline='', encoding='utf-8')
  104. with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
  105. for line in csv_file:
  106. row = line.strip().split(';')
  107. line_number: int = row[0]
  108. unit_id: str = row[1]
  109. value: str = row[2]
  110. start_date: str = row[3]
  111. stop_date: str = row[4]
  112. target_file.write(
  113. ';'.join([
  114. str(line_number),
  115. str(identifier_to_pseudonym[unit_id]),
  116. str(value_to_pseudonym[value]),
  117. start_date, stop_date
  118. ]) + '\n'
  119. )
  120. target_file.close()
  121. return output_csv_path
  122. def _pseudonymize_csv(
  123. input_csv_path: str,
  124. identifier_unit_id_type: Union[str, None],
  125. measure_unit_id_type: Union[str, None],
  126. job_id: str
  127. ) -> str:
  128. if identifier_unit_id_type and not measure_unit_id_type:
  129. logger.info('Pseudonymizing identifier')
  130. return _pseudonymize_identifier_only(
  131. input_csv_path, identifier_unit_id_type, job_id
  132. )
  133. elif measure_unit_id_type and not identifier_unit_id_type:
  134. logger.info('Pseudonymizing measure')
  135. return _pseudonymize_measure_only(
  136. input_csv_path, measure_unit_id_type, job_id
  137. )
  138. elif identifier_unit_id_type and measure_unit_id_type:
  139. logger.info('Pseudonymizing identifier and measure')
  140. return _pseudonymize_identifier_and_measure(
  141. input_csv_path,
  142. identifier_unit_id_type,
  143. measure_unit_id_type,
  144. job_id
  145. )
  146. else:
  147. logger.info('No pseudonymization')
  148. return input_csv_path
  149. def run(input_csv_path: str, metadata: Metadata, job_id: str) -> str:
  150. """
  151. Pseudonymizes the identifier column of the dataset. Requests pseudonyms
  152. from an external service and replaces all values in the identifier column.
  153. """
  154. try:
  155. logger.info(f'Pseudonymizing data {input_csv_path}')
  156. identifier_unit_type, measure_unit_type = (
  157. _get_unit_types(metadata)
  158. )
  159. identifier_unit_id_type = (
  160. None if identifier_unit_type is None
  161. else microdata_validator.get_unit_id_type_for_unit_type(
  162. identifier_unit_type
  163. )
  164. )
  165. measure_unit_id_type = (
  166. None if measure_unit_type is None
  167. else microdata_validator.get_unit_id_type_for_unit_type(
  168. measure_unit_type
  169. )
  170. )
  171. output_file = _pseudonymize_csv(
  172. input_csv_path,
  173. identifier_unit_id_type,
  174. measure_unit_id_type,
  175. job_id
  176. )
  177. logger.info(f'Pseudonymization step done {output_file}')
  178. return output_file
  179. except Exception as e:
  180. logger.error(f'Error during pseudonymization: {str(e)}')
  181. raise BuilderStepError('Failed to pseudonymize dataset') from e