lambda_function_jobsender.py

# PROJECT LONGBOW - JOBSENDER FOR COMPARE AMAZON S3 AND CREATE DELTA JOB LIST TO SQS
import json
import logging
import os
import ssl
import urllib.request
import urllib.error
from operator import itemgetter
from s3_migration_lib import get_des_file_list, get_src_file_list, job_upload_sqs_ddb, delta_job_list, check_sqs_empty
from botocore.config import Config
import boto3

# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']
sqs_queue_name = os.environ['sqs_queue']
ssm_parameter_ignore_list = os.environ['ssm_parameter_ignore_list']
ssm_parameter_bucket = os.environ['ssm_parameter_bucket']
JobType = os.environ['JobType']
MaxRetry = int(os.environ['MaxRetry'])  # Max retry attempts per request
JobsenderCompareVersionId = os.environ['JobsenderCompareVersionId'].upper() == 'TRUE'

# Set environment
s3_config = Config(max_pool_connections=50, retries={'max_attempts': MaxRetry})  # Max pool connections
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)
sqs = boto3.client('sqs')
sqs_queue = sqs.get_queue_url(QueueName=sqs_queue_name)['QueueUrl']

# Get credentials of the other account
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
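
# Note (illustrative, not from the original source): the code above reads the keys
# "aws_access_key_id", "aws_secret_access_key" and "region" from the decrypted SSM
# parameter, so that parameter is assumed to hold a JSON document along these lines
# (values below are placeholders only):
#   {
#     "aws_access_key_id": "AKIAXXXXXXXXEXAMPLE",
#     "aws_secret_access_key": "<secret>",
#     "region": "us-east-1"
#   }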

# Get buckets information
logger.info(f'Get ssm_parameter_bucket: {ssm_parameter_bucket}')
load_bucket_para = json.loads(ssm.get_parameter(Name=ssm_parameter_bucket)['Parameter']['Value'])
logger.info(f'Received ssm {json.dumps(load_bucket_para)}')
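
# Note (illustrative, not from the original source): lambda_handler below iterates over
# load_bucket_para and reads src_bucket, src_prefix, des_bucket and des_prefix, so the
# SSM parameter named by ssm_parameter_bucket is assumed to hold a JSON list such as
# the following (bucket names and prefixes are placeholders only):
#   [
#     {"src_bucket": "src-bucket-name", "src_prefix": "project1/",
#      "des_bucket": "des-bucket-name", "des_prefix": "project1/"}
#   ]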

# Default JobType is PUT: the source client uses this account, the destination client
# uses the other account's credentials; GET swaps them.
s3_src_client = boto3.client('s3', config=s3_config)
s3_des_client = credentials_session.client('s3', config=s3_config)
if JobType.upper() == "GET":
    s3_src_client, s3_des_client = s3_des_client, s3_src_client

# Query checkip_url to get this Lambda's public egress IP and use it as an instance identifier
try:
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except urllib.error.URLError as e:
    logger.warning(f'Fail to connect to checkip api: {checkip_url} - {str(e)}')
    instance_id = 'lambda-ip-timeout'


# handler
def lambda_handler(event, context):
    # Get ignore file list
    ignore_list = []
    try:
        logger.info('Try to get ignore list from ssm parameter')
        ignore_list = ssm.get_parameter(Name=ssm_parameter_ignore_list)['Parameter']['Value'].splitlines()
        logger.info(f'Get ignore list: {str(ignore_list)}')
    except Exception as e:
        logger.info(f'No ignore list in ssm parameter - {str(e)}')

    # Check whether the SQS queue is empty
    if check_sqs_empty(sqs, sqs_queue):
        logger.info('Job sqs queue is empty, now process comparing s3 bucket...')
        for bucket_para in load_bucket_para:
            src_bucket = bucket_para['src_bucket']
            src_prefix = bucket_para['src_prefix']
            des_bucket = bucket_para['des_bucket']
            des_prefix = bucket_para['des_prefix']

            # Get object lists on S3
            logger.info('Get source bucket')
            src_file_list = get_src_file_list(
                s3_client=s3_src_client,
                bucket=src_bucket,
                S3Prefix=src_prefix,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
            logger.info('Get destination bucket')
            des_file_list = get_des_file_list(
                s3_client=s3_des_client,
                bucket=des_bucket,
                S3Prefix=des_prefix,
                table=table,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
            # Generate job list
            job_list, ignore_records = delta_job_list(
                src_file_list=src_file_list,
                des_file_list=des_file_list,
                src_bucket=src_bucket,
                src_prefix=src_prefix,
                des_bucket=des_bucket,
                des_prefix=des_prefix,
                ignore_list=ignore_list,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
            # Upload jobs to sqs
            if len(job_list) != 0:
                job_upload_sqs_ddb(
                    sqs=sqs,
                    sqs_queue=sqs_queue,
                    job_list=job_list
                )
                max_object = max(job_list, key=itemgetter('Size'))
                MaxChunkSize = int(max_object['Size'] / 10000) + 1024
                if MaxChunkSize < 5 * 1024 * 1024:
                    MaxChunkSize = 5 * 1024 * 1024
                logger.warning(f'Max object size is {max_object["Size"]}. Require AWS Lambda memory > '
                               f'MaxChunkSize({MaxChunkSize}) x MaxThread(default: 1) x MaxParallelFile(default: 50)')
            else:
                logger.info('Source list is all in Destination, no job to send.')
    else:
        logger.error('Job sqs queue is not empty or fail to get_queue_attributes. Stop process.')

    # print('Completed and logged to file:', os.path.abspath(log_file_name))