# lambda_function_worker.py

# PROJECT LONGBOW
# AWS LAMBDA WORKER NODE FOR TRANSFER BETWEEN AMAZON S3 BUCKETS
import json
import logging
import os
import ssl
import urllib
import urllib.error
import urllib.parse
import urllib.request
from pathlib import PurePosixPath

import boto3
from botocore.config import Config

from s3_migration_lib import step_function, step_fn_small_file
# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
try:
    Des_bucket_default = os.environ['Des_bucket_default']
    Des_prefix_default = os.environ['Des_prefix_default']
except Exception as e:
    print('No Env Des_bucket_default/Des_prefix_default ', e)
    Des_bucket_default, Des_prefix_default = "", ""
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']
JobType = os.environ['JobType']
MaxRetry = int(os.environ['MaxRetry'])  # Max request retries
MaxThread = int(os.environ['MaxThread'])  # Max threads
MaxParallelFile = int(os.environ['MaxParallelFile'])  # Not used in Lambda for now
JobTimeout = int(os.environ['JobTimeout'])
UpdateVersionId = os.environ['UpdateVersionId'].upper() == 'TRUE'  # Get latest version id from S3 before GetObject
GetObjectWithVersionId = os.environ['GetObjectWithVersionId'].upper() == 'TRUE'  # Call GetObject with the version id
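# A minimal sketch of the Lambda environment this worker expects. The variable names are
# exactly the ones read above; the values are illustrative assumptions, not defaults
# shipped with any particular stack:
#   table_queue_name=s3_migrate_file_list     # DynamoDB table used for job status
#   StorageClass=STANDARD
#   Des_bucket_default=my-des-bucket          # only used for S3-triggered messages
#   Des_prefix_default=prefix/
#   ssm_parameter_credentials=s3_migration_credentials
#   checkip_url=https://checkip.amazonaws.com
#   JobType=PUT                               # 'GET' swaps source/destination clients below
#   MaxRetry=10
#   MaxThread=50
#   MaxParallelFile=1
#   JobTimeout=870
#   UpdateVersionId=False
#   GetObjectWithVersionId=False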
# Internal parameters
ResumableThreshold = 5 * 1024 * 1024  # Files below this size skip the resumable multipart path
CleanUnfinishedUpload = False  # For debug
ChunkSize = 5 * 1024 * 1024  # For debug, will be auto-adjusted
ifVerifyMD5Twice = False  # For debug
s3_config = Config(max_pool_connections=200,
                   retries={'max_attempts': MaxRetry})  # Max connections
# Set environment
logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)

# Get the other account's credentials from SSM Parameter Store
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
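# The SSM parameter is expected to hold a JSON document with the three keys read below;
# a sketch with placeholder values (not real credentials):
# {
#     "aws_access_key_id": "AKIAXXXXXXXXXXXXXXXX",
#     "aws_secret_access_key": "****************",
#     "region": "us-west-2"
# }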
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
s3_src_client = boto3.client('s3', config=s3_config)
s3_des_client = credentials_session.client('s3', config=s3_config)
# For GET jobs the source bucket is in the other account, so swap the two clients
if JobType.upper() == "GET":
    s3_src_client, s3_des_client = s3_des_client, s3_src_client
# Record the public IP this Lambda (or its NAT) egresses from; fall back if the check fails
try:
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except urllib.error.URLError as e:
    logger.warning(f'Failed to connect to checkip API: {checkip_url} - {str(e)}')
    instance_id = 'lambda-ip-timeout'
class TimeoutOrMaxRetry(Exception):
    pass


class WrongRecordFormat(Exception):
    pass
def lambda_handler(event, context):
    print("Lambda or NAT IP Address:", instance_id)
    logger.info(json.dumps(event, default=str))

    for trigger_record in event['Records']:
        trigger_body = trigger_record['body']
        job = json.loads(trigger_body)
        logger.info(json.dumps(job, default=str))
        # Skip the s3:TestEvent access-test record that S3 writes to SQS during initial setup
        if 'Event' in job:
            if job['Event'] == 's3:TestEvent':
                logger.info('Skip s3:TestEvent')
                continue
        # If the message came from S3 (it carries 'Records') rather than from the jobsender,
        # convert it into the jobsender job format
        if 'Records' in job:
            for One_record in job['Records']:
                if 's3' in One_record:
                    Src_bucket = One_record['s3']['bucket']['name']
                    Src_key = One_record['s3']['object']['key']
                    Src_key = urllib.parse.unquote_plus(Src_key)  # Turn '+' back into spaces
                    Size = One_record['s3']['object']['size']
                    if "versionId" in One_record['s3']['object']:
                        versionId = One_record['s3']['object']['versionId']
                    else:
                        versionId = 'null'
                    Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
                    Des_key = str(PurePosixPath(Des_prefix) / Src_key)
                    if Src_key[-1] == '/':  # Handle empty-folder objects
                        Des_key += '/'
                    job = {
                        'Src_bucket': Src_bucket,
                        'Src_key': Src_key,
                        'Size': Size,
                        'Des_bucket': Des_bucket,
                        'Des_key': Des_key,
                        'versionId': versionId
                    }
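        # For reference, a trimmed sketch of an S3-notification message body, reduced to the
        # fields this branch actually reads (bucket, key, size and versionId are illustrative):
        # {
        #     "Records": [
        #         {
        #             "s3": {
        #                 "bucket": {"name": "my-src-bucket"},
        #                 "object": {"key": "prefix/file+name.bin", "size": 1048576,
        #                            "versionId": "3HL4kqtJlcpXroDTDmjVBH40Nrjfkd"}
        #             }
        #         }
        #     ]
        # }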
        if 'Des_bucket' not in job:  # Malformed message
            logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
            logger.warning('Try to handle next message')
            raise WrongRecordFormat
        if 'versionId' not in job:
            job['versionId'] = 'null'

        # TODO: Partial failure within a multi-job batch is not handled yet, so keep the SQS batch size at 1
        if job['Size'] > ResumableThreshold:
            upload_etag_full = step_function(
                job=job,
                table=table,
                s3_src_client=s3_src_client,
                s3_des_client=s3_des_client,
                instance_id=instance_id,
                StorageClass=StorageClass,
                ChunkSize=ChunkSize,
                MaxRetry=MaxRetry,
                MaxThread=MaxThread,
                JobTimeout=JobTimeout,
                ifVerifyMD5Twice=ifVerifyMD5Twice,
                CleanUnfinishedUpload=CleanUnfinishedUpload,
                UpdateVersionId=UpdateVersionId,
                GetObjectWithVersionId=GetObjectWithVersionId
            )
        else:
            upload_etag_full = step_fn_small_file(
                job=job,
                table=table,
                s3_src_client=s3_src_client,
                s3_des_client=s3_des_client,
                instance_id=instance_id,
                StorageClass=StorageClass,
                MaxRetry=MaxRetry,
                UpdateVersionId=UpdateVersionId,
                GetObjectWithVersionId=GetObjectWithVersionId
            )
        if upload_etag_full != "TIMEOUT" and upload_etag_full != "ERR":
            # Delete the SQS message only on a normal finish; on TIMEOUT or ERR leave it so it
            # becomes visible again.
            # Big files report TIMEOUT when the transfer threads exit on timeout; small files
            # return the string "MaxRetry" instead.
            # A small file hitting MaxRetry is usually caused by permission settings rather than
            # the download being too large, so there is no point letting the next worker retry:
            # the SQS message is deleted and DynamoDB does not record a finished state.
            # To let small files be retried by the next worker as well, add
            # upload_etag_full != "MaxRetry" to the condition above.
            continue
        else:
            raise TimeoutOrMaxRetry
    return {
        'statusCode': 200,
        'body': 'Jobs completed'
    }
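
# A minimal local-invocation sketch, assuming the environment variables above are set and
# AWS credentials with access to both buckets are available. The bucket and key names are
# placeholders; running this would attempt a real copy via s3_migration_lib.
if __name__ == '__main__':
    sample_event = {
        'Records': [
            {
                # SQS delivers the job as a JSON string in 'body' (jobsender format)
                'body': json.dumps({
                    'Src_bucket': 'my-src-bucket',
                    'Src_key': 'prefix/file.bin',
                    'Size': 1024,
                    'Des_bucket': 'my-des-bucket',
                    'Des_key': 'prefix/file.bin',
                    'versionId': 'null'
                })
            }
        ]
    }
    print(lambda_handler(sample_event, None))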