'''
Third step: create the anonymized CSV.
'''
from pyspark.sql import SparkSession
from faker import Faker
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import configparser
import logging

logging.basicConfig(filename='today.log', level=logging.INFO)

config = configparser.ConfigParser()
config.read('config.ini')
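# The script expects a config.ini next to it. A minimal sketch of the
# assumed layout; the file names and record count below are illustrative
# placeholders, not values from the original project:
#
#   [DEFAULT]
#   csv_file.name = customers.csv
#   anonymize_file.name = customers_anonymized
#   number_of_records = 50
#
#   [FIXED_WIDTH_FILE]
#   first_name.name = first_name
#   last_name.name = last_name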
# Create a Faker instance using the Australian locale
fake = Faker('en_AU')

# User-defined function returning a fake first name. The UDFs below are
# marked non-deterministic so Spark does not assume repeated calls return
# the same value.
def fake_first_name():
    return fake.first_name()

fake_first_name_udf = udf(fake_first_name, StringType()).asNondeterministic()

# User-defined function returning a fake last name
def fake_last_name():
    return fake.last_name()

fake_last_name_udf = udf(fake_last_name, StringType()).asNondeterministic()

# User-defined function returning a fake address, flattened to a single line
def fake_address():
    return fake.address().replace('\n', ' ')

fake_address_udf = udf(fake_address, StringType()).asNondeterministic()
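# Quick driver-side sanity check of the fakers (plain Python calls, no
# Spark involved); the sample outputs are illustrative only:
#
#   >>> fake_first_name()   # e.g. 'Charlotte'
#   >>> fake_address()      # e.g. '13 Smith Street Sydney NSW 2000'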
spark = SparkSession.builder.appName('Third_step').getOrCreate()

# Read the pipe-delimited CSV produced by the previous step
df = spark.read.option('header', True) \
    .option('delimiter', '|') \
    .option('inferSchema', True) \
    .csv('data/' + config.get('DEFAULT', 'csv_file.name'))
# df.printSchema()  # prints to stdout and returns None, so don't pass it to logging
df.show(truncate=False)

# Swap the real name and address columns for faked values
anonymize_df = df.withColumn(config.get('FIXED_WIDTH_FILE', 'first_name.name'), fake_first_name_udf()) \
    .withColumn(config.get('FIXED_WIDTH_FILE', 'last_name.name'), fake_last_name_udf()) \
    .withColumn('address', fake_address_udf())
# Integrity check: the anonymized frame must contain the expected number of records
record_count = anonymize_df.count()
expected_count = int(config.get('DEFAULT', 'number_of_records'))
if record_count != expected_count:
    logging.error('Integrity check failed: data frame record count is %d but expected count is %d',
                  record_count, expected_count)
else:
    logging.info('Integrity check passed.')
# anonymize_df.show(truncate=False)
anonymize_df.write.option('delimiter', '|') \
    .option('header', True) \
    .format('csv') \
    .save('data/' + config.get('DEFAULT', 'anonymize_file.name'))
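# Example invocation, assuming the script is saved as third_step.py and the
# faker package is installed on the driver and executors:
#
#   spark-submit third_step.py
#
# Note that DataFrameWriter.save() produces a directory of part files rather
# than a single CSV; call coalesce(1) before write if one file is required.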