import logging
from typing import Tuple, Union

import microdata_validator

from job_executor.exception import BuilderStepError
from job_executor.adapter import pseudonym_service
from job_executor.model import Metadata

logger = logging.getLogger()

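# The loops below assume semicolon-separated CSV files without a header,
# where each row is: line_number;unit_id;value;start_date;stop_date.
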
def _get_unit_types(
    metadata: Metadata
) -> Tuple[Union[str, None], Union[str, None]]:
    return (
        metadata.get_identifier_key_type_name(),
        metadata.get_measure_key_type_name()
    )


def _pseudonymize_identifier_only(
    input_csv_path: str,
    unit_id_type: str,
    job_id: str
) -> str:
    # First pass: collect the unique identifiers so they can be
    # pseudonymized in a single request to the pseudonym service.
    unique_identifiers = set()
    with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
        for line in csv_file:
            unit_id = line.strip().split(';')[1]
            unique_identifiers.add(unit_id)
    identifier_to_pseudonym = pseudonym_service.pseudonymize(
        list(unique_identifiers), unit_id_type, job_id
    )
    # Second pass: rewrite every row, replacing the identifier column
    # with its pseudonym.
    output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
    with open(
        output_csv_path, 'w', newline='', encoding='utf-8'
    ) as target_file, open(
        input_csv_path, newline='', encoding='utf-8'
    ) as csv_file:
        for line in csv_file:
            row = line.strip().split(';')
            line_number: str = row[0]
            unit_id: str = row[1]
            value: str = row[2]
            start_date: str = row[3]
            stop_date: str = row[4]
            target_file.write(
                ';'.join([
                    line_number,
                    str(identifier_to_pseudonym[unit_id]),
                    value,
                    start_date, stop_date
                ]) + '\n'
            )
    return output_csv_path


def _pseudonymize_measure_only(
    input_csv_path: str,
    unit_id_type: str,
    job_id: str
) -> str:
    # First pass: collect the unique measure values for pseudonymization.
    unique_measure_values = set()
    with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
        for line in csv_file:
            value = line.strip().split(';')[2]
            unique_measure_values.add(value)
    value_to_pseudonym = pseudonym_service.pseudonymize(
        list(unique_measure_values), unit_id_type, job_id
    )
    # Second pass: rewrite every row, replacing the measure column
    # with its pseudonym.
    output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
    with open(
        output_csv_path, 'w', newline='', encoding='utf-8'
    ) as target_file, open(
        input_csv_path, newline='', encoding='utf-8'
    ) as csv_file:
        for line in csv_file:
            row = line.strip().split(';')
            line_number: str = row[0]
            unit_id: str = row[1]
            value: str = row[2]
            start_date: str = row[3]
            stop_date: str = row[4]
            target_file.write(
                ';'.join([
                    line_number,
                    unit_id,
                    str(value_to_pseudonym[value]),
                    start_date, stop_date
                ]) + '\n'
            )
    return output_csv_path


def _pseudonymize_identifier_and_measure(
    input_csv_path: str,
    identifier_unit_id_type: str,
    measure_unit_id_type: str,
    job_id: str
) -> str:
    # First pass: collect unique identifiers and measure values so each
    # set can be pseudonymized in one request.
    unique_idents = set()
    unique_measure_values = set()
    with open(input_csv_path, newline='', encoding='utf-8') as csv_file:
        for line in csv_file:
            row = line.strip().split(';')
            unit_id = row[1]
            value = row[2]
            unique_idents.add(unit_id)
            unique_measure_values.add(value)
    identifier_to_pseudonym = pseudonym_service.pseudonymize(
        list(unique_idents), identifier_unit_id_type, job_id
    )
    value_to_pseudonym = pseudonym_service.pseudonymize(
        list(unique_measure_values), measure_unit_id_type, job_id
    )
    # Second pass: rewrite every row, replacing both the identifier and
    # the measure column with their pseudonyms.
    output_csv_path = input_csv_path.replace('.csv', '_pseudonymized.csv')
    with open(
        output_csv_path, 'w', newline='', encoding='utf-8'
    ) as target_file, open(
        input_csv_path, newline='', encoding='utf-8'
    ) as csv_file:
        for line in csv_file:
            row = line.strip().split(';')
            line_number: str = row[0]
            unit_id: str = row[1]
            value: str = row[2]
            start_date: str = row[3]
            stop_date: str = row[4]
            target_file.write(
                ';'.join([
                    line_number,
                    str(identifier_to_pseudonym[unit_id]),
                    str(value_to_pseudonym[value]),
                    start_date, stop_date
                ]) + '\n'
            )
    return output_csv_path


def _pseudonymize_csv(
    input_csv_path: str,
    identifier_unit_id_type: Union[str, None],
    measure_unit_id_type: Union[str, None],
    job_id: str
) -> str:
    if identifier_unit_id_type and not measure_unit_id_type:
        logger.info('Pseudonymizing identifier')
        return _pseudonymize_identifier_only(
            input_csv_path, identifier_unit_id_type, job_id
        )
    elif measure_unit_id_type and not identifier_unit_id_type:
        logger.info('Pseudonymizing measure')
        return _pseudonymize_measure_only(
            input_csv_path, measure_unit_id_type, job_id
        )
    elif identifier_unit_id_type and measure_unit_id_type:
        logger.info('Pseudonymizing identifier and measure')
        return _pseudonymize_identifier_and_measure(
            input_csv_path,
            identifier_unit_id_type,
            measure_unit_id_type,
            job_id
        )
    else:
        logger.info('No pseudonymization')
        return input_csv_path


def run(input_csv_path: str, metadata: Metadata, job_id: str) -> str:
    """
    Pseudonymizes the identifier and/or measure columns of the dataset,
    depending on the unit types described in the metadata. Requests
    pseudonyms from an external service and replaces the original values.
    """
    try:
        logger.info(f'Pseudonymizing data {input_csv_path}')
        identifier_unit_type, measure_unit_type = (
            _get_unit_types(metadata)
        )
        identifier_unit_id_type = (
            None if identifier_unit_type is None
            else microdata_validator.get_unit_id_type_for_unit_type(
                identifier_unit_type
            )
        )
        measure_unit_id_type = (
            None if measure_unit_type is None
            else microdata_validator.get_unit_id_type_for_unit_type(
                measure_unit_type
            )
        )
        output_file = _pseudonymize_csv(
            input_csv_path,
            identifier_unit_id_type,
            measure_unit_id_type,
            job_id
        )
        logger.info(f'Pseudonymization step done {output_file}')
        return output_file
    except Exception as e:
        logger.error(f'Error during pseudonymization: {str(e)}')
        raise BuilderStepError('Failed to pseudonymize dataset') from e
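

# Illustrative usage sketch (not part of the original module): the dataset
# path, job id, and metadata construction below are hypothetical; in the
# job executor this step is invoked as part of the dataset build pipeline.
#
#     from job_executor.model import Metadata
#
#     metadata = Metadata(...)  # parsed from the dataset's metadata document
#     output_csv = run('/input_data/MY_DATASET.csv', metadata, 'job-123')
#     # If either column was pseudonymized, output_csv points to
#     # '/input_data/MY_DATASET_pseudonymized.csv'; otherwise the
#     # original input path is returned unchanged.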