s3_migration_cluster_jobsender.py 5.7 KB

# PROJECT LONGBOW - JOBSENDER FOR COMPARE AMAZON S3 AND CREATE DELTA JOB LIST TO SQS

import json
import os
import sys
import time
from configparser import ConfigParser
from s3_migration_lib import set_env, set_log, get_s3_file_list, job_upload_sqs_ddb, delta_job_list, check_sqs_empty
from operator import itemgetter

# Read config.ini
cfg = ConfigParser()
try:
    file_path = os.path.split(os.path.abspath(__file__))[0]
    cfg.read(f'{file_path}/s3_migration_cluster_config.ini', encoding='utf-8-sig')
    table_queue_name = cfg.get('Basic', 'table_queue_name')
    sqs_queue_name = cfg.get('Basic', 'sqs_queue_name')
    ssm_parameter_bucket = cfg.get('Basic', 'ssm_parameter_bucket')
    ssm_parameter_credentials = cfg.get('Basic', 'ssm_parameter_credentials')
    LocalProfileMode = cfg.getboolean('Debug', 'LocalProfileMode')
    JobType = cfg.get('Basic', 'JobType')
    MaxThread = cfg.getint('Mode', 'MaxThread')
    MaxParallelFile = cfg.getint('Mode', 'MaxParallelFile')
    LoggingLevel = cfg.get('Debug', 'LoggingLevel')
except Exception as e:
    print("s3_migration_cluster_config.ini ERR: ", str(e))
    sys.exit(0)
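
# For reference, a minimal sketch of the s3_migration_cluster_config.ini layout implied by
# the cfg.get()/getint()/getboolean() calls above. Section and key names are taken from this
# file; the values shown are placeholders only, not project defaults:
#
#   [Basic]
#   JobType = PUT
#   table_queue_name = s3_migrate_file_list
#   sqs_queue_name = s3_migrate_file_list
#   ssm_parameter_bucket = s3_migrate_bucket_para
#   ssm_parameter_credentials = s3_migrate_credentials
#   [Mode]
#   MaxThread = 5
#   MaxParallelFile = 5
#   [Debug]
#   LoggingLevel = INFO
#   LocalProfileMode = False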

# If deployed with CDK, read the parameters from environment variables instead
try:
    table_queue_name = os.environ['table_queue_name']
    sqs_queue_name = os.environ['sqs_queue_name']
    ssm_parameter_bucket = os.environ['ssm_parameter_bucket']
except Exception:
    print("No Environment Variable from CDK, using the parameters from config.ini")

# Main
if __name__ == '__main__':
    # Set Logging
    logger, log_file_name = set_log(LoggingLevel, 'jobsender')

    # Get Environment
    sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id, ssm = \
        set_env(JobType, LocalProfileMode, table_queue_name, sqs_queue_name, ssm_parameter_credentials)

    #######
    # Program start processing here
    #######

    # Get ignore file list
    ignore_list_path = os.path.split(os.path.abspath(__file__))[0] + '/s3_migration_ignore_list.txt'
    ignore_list = []
    try:
        with open(ignore_list_path, 'r') as f:
            ignore_list = f.read().splitlines()
        logger.info(f'Found ignore files list Length: {len(ignore_list)}, in {ignore_list_path}')
    except FileNotFoundError:
        logger.info(f'No ignore files list in {ignore_list_path}')
        print(f'No ignore files list in {ignore_list_path}')
    except Exception as e:
        logger.info(str(e))
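
    # Assumption: each line of s3_migration_ignore_list.txt is an object key or key prefix
    # to exclude; the list is handed to delta_job_list() below, which does the filtering.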

    # Check SQS is empty or not
    if check_sqs_empty(sqs, sqs_queue):
        logger.info('Job sqs queue is empty, now process comparing s3 bucket...')

        # Load Bucket para from ssm parameter store
        logger.info(f'Get ssm_parameter_bucket: {ssm_parameter_bucket}')
        try:
            load_bucket_para = json.loads(ssm.get_parameter(Name=ssm_parameter_bucket)['Parameter']['Value'])
            logger.info(f'Received ssm {json.dumps(load_bucket_para)}')
        except Exception as e:
            logger.error(f'Failed to get buckets info from ssm_parameter_bucket, fix and restart Jobsender. {str(e)}')
            sys.exit(0)
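
        # The SSM parameter is expected to hold a JSON list of bucket mappings; the four
        # keys below are exactly the ones read in the loop that follows. Example with
        # placeholder bucket names (not values from this project):
        # [
        #     {"src_bucket": "my-src-bucket", "src_prefix": "data/",
        #      "des_bucket": "my-des-bucket", "des_prefix": "data/"}
        # ]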

        for bucket_para in load_bucket_para:
            src_bucket = bucket_para['src_bucket']
            src_prefix = bucket_para['src_prefix']
            des_bucket = bucket_para['des_bucket']
            des_prefix = bucket_para['des_prefix']

            # Get List on S3
            src_file_list = get_s3_file_list(s3_src_client, src_bucket, src_prefix)
            des_file_list = get_s3_file_list(s3_des_client, des_bucket, des_prefix, True)

            # Generate job list
            job_list, ignore_records = delta_job_list(src_file_list, des_file_list,
                                                      src_bucket, src_prefix, des_bucket, des_prefix, ignore_list)

            # Just backup for debug
            logger.info('Writing job and ignore list to local file backup...')
            t = time.localtime()
            start_time = f'{t.tm_year}-{t.tm_mon}-{t.tm_mday}-{t.tm_hour}-{t.tm_min}-{t.tm_sec}'
            log_path = os.path.split(os.path.abspath(__file__))[0] + '/s3_migration_log'
            if job_list:
                local_backup_list = f'{log_path}/job-list-{src_bucket}-{start_time}.json'
                with open(local_backup_list, 'w') as f:
                    json.dump(job_list, f)
                logger.info(f'Write Job List: {os.path.abspath(local_backup_list)}')
            if ignore_records:
                local_ignore_records = f'{log_path}/ignore-records-{src_bucket}-{start_time}.json'
                with open(local_ignore_records, 'w') as f:
                    json.dump(ignore_records, f)
                logger.info(f'Write Ignore List: {os.path.abspath(local_ignore_records)}')

            # Upload jobs to sqs
            if len(job_list) != 0:
                job_upload_sqs_ddb(sqs, sqs_queue, table, job_list)
                max_object = max(job_list, key=itemgetter('Size'))
                MaxChunkSize = int(max_object['Size'] / 10000) + 1024
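                # Amazon S3 multipart uploads allow at most 10,000 parts per object, so the
                # chunk size is scaled to the largest object (Size / 10000) plus a small margin;
                # the messages below use it to estimate the instance's peak memory requirement.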
                if max_object['Size'] > 50 * 1024 * 1024 * 1024:
                    logger.warning(f'Max object in job_list is {str(max_object)}. Remember to check instance memory >= '
                                   f'MaxChunkSize x MaxThread x MaxParallelFile, i.e. '
                                   f'{MaxChunkSize} x {MaxThread} x {MaxParallelFile} = '
                                   f'{MaxChunkSize * MaxThread * MaxParallelFile}')
                else:
                    logger.info(f'Max object in job_list is {str(max_object)}')
            else:
                logger.info('Source objects are all in Destination, no job to send.')
    else:
        logger.error('Job sqs queue is not empty or failed to get_queue_attributes. Stop process.')

    print('Completed and logged to file:', os.path.abspath(log_file_name))