s3_migration_cluster_jobsender.py

# PROJECT LONGBOW - JOBSENDER FOR COMPARE AMAZON S3 AND CREATE DELTA JOB LIST TO SQS

import json
import os
import sys
import time
from configparser import ConfigParser
from s3_migration_lib import set_env, set_log, job_upload_sqs_ddb, delta_job_list, check_sqs_empty, \
    get_des_file_list, get_src_file_list
from operator import itemgetter
from pathlib import Path

# Read config.ini
cfg = ConfigParser()
try:
    file_path = os.path.split(os.path.abspath(__file__))[0]
    cfg.read(f'{file_path}/s3_migration_cluster_config.ini', encoding='utf-8-sig')
    table_queue_name = cfg.get('Basic', 'table_queue_name')
    sqs_queue_name = cfg.get('Basic', 'sqs_queue_name')
    ssm_parameter_bucket = cfg.get('Basic', 'ssm_parameter_bucket')
    ssm_parameter_credentials = cfg.get('Basic', 'ssm_parameter_credentials')
    LocalProfileMode = cfg.getboolean('Debug', 'LocalProfileMode')
    JobType = cfg.get('Basic', 'JobType')
    MaxRetry = cfg.getint('Mode', 'MaxRetry')
    MaxThread = cfg.getint('Mode', 'MaxThread')
    MaxParallelFile = cfg.getint('Mode', 'MaxParallelFile')
    LoggingLevel = cfg.get('Debug', 'LoggingLevel')
    JobsenderCompareVersionId = cfg.getboolean('Mode', 'JobsenderCompareVersionId')
except Exception as e:
    print("s3_migration_cluster_config.ini ERR: ", str(e))
    sys.exit(0)
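
# For reference, a minimal s3_migration_cluster_config.ini could look like the
# sketch below. Section and key names match the cfg.get() calls above; the
# values shown are illustrative assumptions only, not project defaults:
#
#   [Basic]
#   JobType = PUT
#   table_queue_name = s3_migrate_file_list
#   sqs_queue_name = s3_migrate_file_list
#   ssm_parameter_bucket = s3_migrate_bucket_para
#   ssm_parameter_credentials = s3_migrate_credentials
#
#   [Mode]
#   MaxRetry = 20
#   MaxThread = 50
#   MaxParallelFile = 1
#   JobsenderCompareVersionId = False
#
#   [Debug]
#   LoggingLevel = INFO
#   LocalProfileMode = False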

# If deployed with CDK, read the parameters from environment variables instead
try:
    table_queue_name = os.environ['table_queue_name']
    sqs_queue_name = os.environ['sqs_queue_name']
    ssm_parameter_bucket = os.environ['ssm_parameter_bucket']
except Exception as e:
    print("No environment variables from CDK, using the parameters from config.ini", str(e))

# Main
if __name__ == '__main__':
    # Set Logging
    logger, log_file_name = set_log(LoggingLevel, 'jobsender')

    # Get Environment
    sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id, ssm = \
        set_env(JobType=JobType,
                LocalProfileMode=LocalProfileMode,
                table_queue_name=table_queue_name,
                sqs_queue_name=sqs_queue_name,
                ssm_parameter_credentials=ssm_parameter_credentials,
                MaxRetry=MaxRetry)

    #######
    # Program start processing here
    #######

    # Get ignore file list
    ignore_list_path = os.path.split(os.path.abspath(__file__))[0] + '/s3_migration_ignore_list.txt'
    ignore_list = []
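    # Each line of the ignore file becomes one ignore_list entry; how entries
    # are matched against source keys (exact key, prefix, or wildcard) is
    # decided by delta_job_list() in s3_migration_lib. An illustrative file
    # (assumed content):
    #   logs/
    #   tmp/cache.bin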
    try:
        with open(ignore_list_path, 'r') as f:
            ignore_list = f.read().splitlines()
        logger.info(f'Found ignore file list, length: {len(ignore_list)}, in {ignore_list_path}')
    except FileNotFoundError:
        logger.info(f'No ignore files list in {ignore_list_path}')
        print(f'No ignore files list in {ignore_list_path}')
    except Exception as e:
        logger.info(str(e))

    # Check whether the SQS queue is empty
    if check_sqs_empty(sqs, sqs_queue):
        logger.info('Job sqs queue is empty, now process comparing s3 bucket...')

        # Load bucket parameters from SSM Parameter Store
        logger.info(f'Get ssm_parameter_bucket: {ssm_parameter_bucket}')
        try:
            load_bucket_para = json.loads(ssm.get_parameter(Name=ssm_parameter_bucket)['Parameter']['Value'])
            logger.info(f'Received ssm {json.dumps(load_bucket_para)}')
        except Exception as e:
            logger.error(f'Failed to get bucket info from ssm_parameter_bucket, fix and restart Jobsender. {str(e)}')
            sys.exit(0)
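
        # The SSM parameter is expected to hold a JSON array of bucket mappings,
        # one dict per source/destination pair, with the four keys read in the
        # loop below. An illustrative value (bucket names are assumptions):
        #   [
        #     {
        #       "src_bucket": "my-source-bucket",
        #       "src_prefix": "data/",
        #       "des_bucket": "my-destination-bucket",
        #       "des_prefix": "data/"
        #     }
        #   ]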
        for bucket_para in load_bucket_para:
            src_bucket = bucket_para['src_bucket']
            src_prefix = bucket_para['src_prefix']
            des_bucket = bucket_para['des_bucket']
            des_prefix = bucket_para['des_prefix']

            # Get List on S3
            logger.info('Get source bucket')
            src_file_list = get_src_file_list(
                s3_client=s3_src_client,
                bucket=src_bucket,
                S3Prefix=src_prefix,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
            logger.info('Get destination bucket')
            des_file_list = get_des_file_list(
                s3_client=s3_des_client,
                bucket=des_bucket,
                S3Prefix=des_prefix,
                table=table,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )

            # Generate job list
            job_list, ignore_records = delta_job_list(
                src_file_list=src_file_list,
                des_file_list=des_file_list,
                src_bucket=src_bucket,
                src_prefix=src_prefix,
                des_bucket=des_bucket,
                des_prefix=des_prefix,
                ignore_list=ignore_list,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )

            # Upload jobs to SQS
            if len(job_list) != 0:
                job_upload_sqs_ddb(
                    sqs=sqs,
                    sqs_queue=sqs_queue,
                    job_list=job_list
                )
                max_object = max(job_list, key=itemgetter('Size'))
                MaxChunkSize = int(max_object['Size'] / 10000) + 1024
                if MaxChunkSize < 5 * 1024 * 1024:
                    MaxChunkSize = 5 * 1024 * 1024
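                # Rough worked example (illustrative numbers, not measurements):
                # a 50 GiB largest object gives MaxChunkSize =
                # int(53687091200 / 10000) + 1024 = 5369733 bytes (~5.1 MiB),
                # just above the 5 MiB multipart floor. With MaxThread = 50 and
                # MaxParallelFile = 1 (assumed settings), that implies roughly
                # 5369733 x 50 x 1 bytes, i.e. ~256 MiB of buffer memory.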
                logger.warning(f'Max object size in job_list: {max_object["Size"]}.\n Require instance memory'
                               f' > MaxChunkSize x MaxThread x MaxParallelFile, i.e. '
                               f'{MaxChunkSize} x {MaxThread} x {MaxParallelFile} = '
                               f'{MaxChunkSize * MaxThread * MaxParallelFile}.\n If memory is insufficient, instance may crash!')
            else:
                logger.info('Source objects are all in destination already, no job to send.')
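
            # Note: the JSON backups written below mirror what was just sent to
            # SQS. Each job entry is a dict built by delta_job_list(); the only
            # field this script relies on directly is 'Size' (used above for the
            # MaxChunkSize estimate), the remaining fields are defined in
            # s3_migration_lib.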
            # Just backup for debug
            logger.info('Writing job and ignore list to local file backup...')
            t = time.localtime()
            start_time = f'{t.tm_year}-{t.tm_mon}-{t.tm_mday}-{t.tm_hour}-{t.tm_min}-{t.tm_sec}'
            log_path = str(Path(log_file_name).parent)
            if job_list:
                local_backup_list = f'{log_path}/job-list-{src_bucket}-{start_time}.json'
                with open(local_backup_list, 'w') as f:
                    json.dump(job_list, f)
                logger.info(f'Write Job List: {os.path.abspath(local_backup_list)}')
            if ignore_records:
                local_ignore_records = f'{log_path}/ignore-records-{src_bucket}-{start_time}.json'
                with open(local_ignore_records, 'w') as f:
                    json.dump(ignore_records, f)
                logger.info(f'Write Ignore List: {os.path.abspath(local_ignore_records)}')
    else:
        logger.error('Job sqs queue is not empty, or get_queue_attributes failed. Stop process.')

    print('Completed and logged to file:', os.path.abspath(log_file_name))