# lambda_function_worker.py

# PROJECT LONGBOW
# AWS LAMBDA WORKER NODE FOR TRANSFER BETWEEN AMAZON S3 BUCKETS
import json, os, urllib, ssl, logging
import boto3
from s3_migration_lib import step_function, step_fn_small_file
from botocore.config import Config
from pathlib import PurePosixPath
import urllib.request
import urllib.parse

# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
try:
    Des_bucket_default = os.environ['Des_bucket_default']
    Des_prefix_default = os.environ['Des_prefix_default']
except Exception as e:
    print('No Env Des_bucket_default/Des_prefix_default ', e)
    Des_bucket_default, Des_prefix_default = "", ""
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']

# Internal parameters
JobType = "PUT"
MaxRetry = 10  # Maximum number of retries per request
MaxThread = 50  # Maximum number of threads
MaxParallelFile = 1  # Not used in Lambda for now
JobTimeout = 900
ResumableThreshold = 5 * 1024 * 1024  # Files no larger than this skip the resumable multipart path
CleanUnfinishedUpload = False  # For debug
ChunkSize = 5 * 1024 * 1024  # For debug, will be adjusted automatically
ifVerifyMD5Twice = False  # For debug
s3_config = Config(max_pool_connections=30)  # Maximum number of connections

# Set environment
logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)

# Fetch the credentials of the other account from SSM Parameter Store
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
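
# The decrypted SSM parameter value is expected to be a JSON document carrying
# the three keys read below; the values shown are placeholders for illustration:
# {
#     "aws_access_key_id": "AKIAXXXXXXXXXXXXXXXX",
#     "aws_secret_access_key": "****************",
#     "region": "us-west-2"
# }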
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
s3_src_client = boto3.client('s3', config=s3_config)
s3_des_client = credentials_session.client('s3', config=s3_config)
if JobType.upper() == "GET":
    # In GET mode the source and destination clients swap roles
    s3_src_client, s3_des_client = s3_des_client, s3_src_client

# Look up this worker's public (NAT) IP for logging
try:
    context = ssl._create_unverified_context()
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except Exception as e:
    logger.warning(f'Fail to connect to {checkip_url}')
    instance_id = 'lambda-ip-timeout'


class TimeoutOrMaxRetry(Exception):
    pass


class WrongRecordFormat(Exception):
    pass


def lambda_handler(event, context):
    print("Lambda or NAT IP Address:", instance_id)
    logger.info(json.dumps(event, default=str))

    for trigger_record in event['Records']:
        trigger_body = trigger_record['body']
        job = json.loads(trigger_body)
        logger.info(json.dumps(job, default=str))

        # Skip the access-test record that S3 writes to SQS automatically
        # during initial configuration
        if 'Event' in job:
            if job['Event'] == 's3:TestEvent':
                logger.info('Skip s3:TestEvent')
                continue

        # If the message comes from S3 rather than from the jobsender,
        # convert it into a job; messages from S3 carry 'Records'
        if 'Records' in job:
            for One_record in job['Records']:
                if 's3' in One_record:
                    Src_bucket = One_record['s3']['bucket']['name']
                    Src_key = One_record['s3']['object']['key']
                    Src_key = urllib.parse.unquote_plus(Src_key)
                    Size = One_record['s3']['object']['size']
                    Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
                    Des_key = str(PurePosixPath(Des_prefix) / Src_key)
                    if Src_key[-1] == '/':  # Handle empty-directory objects
                        Des_key += '/'
                    job = {
                        'Src_bucket': Src_bucket,
                        'Src_key': Src_key,
                        'Size': Size,
                        'Des_bucket': Des_bucket,
                        'Des_key': Des_key
                    }
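
        # Illustrative shape of the normalized job after the conversion above
        # (bucket, key, and size values are assumptions for illustration only);
        # jobs sent directly by the jobsender already arrive in this form:
        # {
        #     "Src_bucket": "my-src-bucket",
        #     "Src_key": "data/file.bin",
        #     "Size": 10485760,
        #     "Des_bucket": "my-des-bucket",
        #     "Des_key": "prefix/data/file.bin"
        # }
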
        if 'Des_bucket' not in job:  # Malformed message structure
            logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
            logger.warning('Try to handle next message')
            raise WrongRecordFormat

        # TODO: Multiple jobs arriving in one batch are not processed
        # concurrently here, and partial failures are not handled, so do not
        # enable SQS batch processing for now
        if job['Size'] > ResumableThreshold:
            upload_etag_full = step_function(job, table, s3_src_client, s3_des_client, instance_id,
                                             StorageClass, ChunkSize, MaxRetry, MaxThread,
                                             JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload)
        else:
            upload_etag_full = step_fn_small_file(job, table, s3_src_client, s3_des_client, instance_id,
                                                  StorageClass, MaxRetry)
        if upload_etag_full != "TIMEOUT" and upload_etag_full != "ERR":
            # Keep the SQS message when the result is TIMEOUT or ERR; delete it
            # (by finishing normally) only when the job completed.
            # Large files report TIMEOUT when a thread exits after reaching
            # MaxRetry; small files return "MaxRetry" instead. For a small file
            # that usually means a permissions problem rather than an oversized
            # download, so there is no point letting the next worker retry it:
            # the SQS message is deleted and DynamoDB records no completion
            # status. To let small files also return the SQS message for the
            # next worker to retry, add upload_etag_full != "MaxRetry" to the
            # condition above.
            continue
        else:
            raise TimeoutOrMaxRetry

    return {
        'statusCode': 200,
        'body': 'Jobs completed'
    }
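

if __name__ == "__main__":
    # Local smoke-test sketch: assumes the environment variables above are set
    # and AWS credentials are available; bucket, key, and size values are
    # illustrative placeholders, not real resources.
    test_job = {
        "Src_bucket": "my-src-bucket",
        "Src_key": "data/file.bin",
        "Size": 10 * 1024 * 1024,
        "Des_bucket": "my-des-bucket",
        "Des_key": "prefix/data/file.bin"
    }
    test_event = {"Records": [{"body": json.dumps(test_job)}]}
    print(lambda_handler(test_event, None))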