23-Third_step.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
'''
Third step: create the anonymized CSV file.
'''
  4. from pyspark.sql import SparkSession
  5. from faker import Factory
  6. from pyspark.sql.functions import udf
  7. from pyspark.sql.types import *
  8. import ConfigParser
  9. import logging
  10. logging.basicConfig(filename='today.log',level=logging.INFO)
  11. config = ConfigParser.ConfigParser()
  12. config.read('config.ini')
  13. #create faker factory
  14. fake = Factory.create('en_AU')
  15. #user define function for the first name faker
  16. def fake_first_name():
  17. return fake.first_name()
  18. fake_first_name_udf = udf(fake_first_name, StringType())
  19. #user define function for the last name faker
  20. def fake_last_name():
  21. return fake.last_name()
  22. fake_last_name_udf = udf(fake_last_name, StringType())
  23. #user define function for the address faker
  24. def fake_address():
  25. return fake.address().replace('\n','')
  26. fake_address_udf = udf(fake_address, StringType())
  27. spark = SparkSession.builder.appName('Third_step').getOrCreate()
  28. df = spark.read.option('header',True).option('delimiter','|').option("inferSchema",True).csv("data/"+config.get('DEFAULT','csv_file.name'))
  29. #logging.info(df.printSchema())
  30. df.show(truncate=False)
  31. anonymize_df = df.withColumn(config.get('FIXED_WIDTH_FILE','first_name.name'), fake_first_name_udf()) \
  32. .withColumn(config.get('FIXED_WIDTH_FILE','last_name.name'), fake_last_name_udf()) \
  33. .withColumn('address', fake_address_udf())
  34. # integrity check
  35. if int(anonymize_df.count()) != int(config.get('DEFAULT','number_of_records')):
  36. logging.error("Integrity check failed because data frame record count is %d and expected count is %d",anonymize_df.count(), int(config.get('DEFAULT','number_of_records') ))
  37. else:
  38. logging.info('Integrity check passed.')
  39. # anonymize_df.show(truncate=False)
  40. anonymize_df.write.option('delimiter','|').option('header',True).format('csv').save('data/'+config.get('DEFAULT','anonymize_file.name'))