lambda_function_jobsender.py
# PROJECT LONGBOW - JOBSENDER FOR COMPARE AMAZON S3 AND CREATE DELTA JOB LIST TO SQS

import json
import logging
import os
import ssl
import urllib.request
from operator import itemgetter

import boto3

from s3_migration_lib import get_s3_file_list, job_upload_sqs_ddb, delta_job_list, check_sqs_empty

# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']
sqs_queue_name = os.environ['sqs_queue']
ssm_parameter_ignore_list = os.environ['ssm_parameter_ignore_list']
ssm_parameter_bucket = os.environ['ssm_parameter_bucket']

# Internal parameters
JobType = "PUT"

# Set environment
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)
sqs = boto3.client('sqs')
sqs_queue = sqs.get_queue_url(QueueName=sqs_queue_name)['QueueUrl']

# Get credentials of the other account
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
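
# The credentials parameter is expected to be a SecureString containing JSON
# shaped like this sketch (keys inferred from the reads above; values are
# placeholders):
# {
#     "aws_access_key_id": "AKIA...",
#     "aws_secret_access_key": "...",
#     "region": "us-east-1"
# }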

# Get buckets information
logger.info(f'Get ssm_parameter_bucket: {ssm_parameter_bucket}')
load_bucket_para = json.loads(ssm.get_parameter(Name=ssm_parameter_bucket)['Parameter']['Value'])
logger.info(f'Received ssm {json.dumps(load_bucket_para)}')
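
# The bucket parameter is expected to be a JSON list shaped like this sketch
# (keys inferred from the reads in lambda_handler below; names are placeholders):
# [
#     {"src_bucket": "your-src-bucket", "src_prefix": "",
#      "des_bucket": "your-des-bucket", "des_prefix": ""}
# ]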

# Default JobType is PUT: the source client is this account and the destination
# client uses the other account's credentials; "GET" swaps the two.
s3_src_client = boto3.client('s3')
s3_des_client = credentials_session.client('s3')
if JobType.upper() == "GET":
    s3_src_client, s3_des_client = s3_des_client, s3_src_client

# Get the public IP of this Lambda from the check-ip endpoint.
# NOTE: certificate verification is disabled for this call.
try:
    ssl_context = ssl._create_unverified_context()
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=ssl_context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except Exception as e:
    logger.warning(f'Fail to connect to checkip url {checkip_url}: {e}')
    instance_id = 'lambda-ip-timeout'


# handler
def lambda_handler(event, context):
    # Get ignore file list
    ignore_list = []
    try:
        logger.info('Try to get ignore list from ssm parameter')
        ignore_list = ssm.get_parameter(Name=ssm_parameter_ignore_list)['Parameter']['Value'].splitlines()
        logger.info(f'Get ignore list: {str(ignore_list)}')
    except Exception:
        logger.info('No ignore list in ssm parameter')
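    # The ignore list is read one entry per line (hence splitlines()); how each
    # entry is matched against object keys is defined by delta_job_list in
    # s3_migration_lib.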

    # Check whether the SQS queue is empty before generating new jobs
    if check_sqs_empty(sqs, sqs_queue):
        logger.info('Job sqs queue is empty, now process comparing s3 bucket...')
        for bucket_para in load_bucket_para:
            src_bucket = bucket_para['src_bucket']
            src_prefix = bucket_para['src_prefix']
            des_bucket = bucket_para['des_bucket']
            des_prefix = bucket_para['des_prefix']

            # Get the object lists from S3
            logger.info('Get source bucket')
            src_file_list = get_s3_file_list(s3_src_client, src_bucket, src_prefix)
            logger.info('Get destination bucket')
            des_file_list = get_s3_file_list(s3_des_client, des_bucket, des_prefix, True)

            # Generate job list
            job_list, ignore_records = delta_job_list(src_file_list, des_file_list,
                                                      src_bucket, src_prefix, des_bucket, des_prefix, ignore_list)
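            # delta_job_list is expected to return the transfer jobs still
            # needed (roughly, source objects missing from the destination)
            # plus the records skipped because of ignore_list.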

            # Output for debug
            print("job_list: ")
            if job_list:
                for n in job_list:
                    print(str(n))
            print("ignore_records: ")
            if ignore_records:
                for n in ignore_records:
                    print(str(n))

            # Upload jobs to sqs
            if len(job_list) != 0:
                job_upload_sqs_ddb(sqs, sqs_queue, table, job_list)
                max_object = max(job_list, key=itemgetter('Size'))
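                # S3 multipart uploads allow at most 10,000 parts per object,
                # so the chunk size must be at least Size/10000; the +1024 here
                # is presumably headroom to stay safely under that limit.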
                MaxChunkSize = int(max_object['Size'] / 10000) + 1024
                if max_object['Size'] >= 50 * 1024 * 1024 * 1024:
                    logger.warning(f'Max object in job_list is {str(max_object)}. Remember to check instance memory >= '
                                   f'MaxChunkSize({MaxChunkSize}) x MaxThread x MaxParallelFile')
            else:
                logger.info('Source list is fully covered by destination, no job to send.')
    else:
        logger.error('Job sqs queue is not empty, or get_queue_attributes failed. Stop process.')
    # print('Completed and logged to file:', os.path.abspath(log_file_name))
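

# Local smoke test (a sketch: it assumes the environment variables above are
# set and AWS credentials with access to SSM/SQS/DynamoDB are available before
# this module is imported).
if __name__ == '__main__':
    lambda_handler({}, None)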