# PROJECT LONGBOW
# AWS LAMBDA WORKER NODE FOR TRANSFER BETWEEN AMAZON S3 BUCKETS

import json, os, urllib, ssl, logging
import boto3
from s3_migration_lib import step_function, step_fn_small_file
from botocore.config import Config
from pathlib import PurePosixPath
import urllib.request
import urllib.parse
# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
try:
    Des_bucket_default = os.environ['Des_bucket_default']
    Des_prefix_default = os.environ['Des_prefix_default']
except Exception as e:
    print('No Env Des_bucket_default/Des_prefix_default ', e)
    Des_bucket_default, Des_prefix_default = "", ""
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']
# Internal parameters
JobType = "PUT"
MaxRetry = 10  # Max retries per request
MaxThread = 50  # Max number of threads
MaxParallelFile = 1  # Not used in Lambda for now
JobTimeout = 900
ResumableThreshold = 5 * 1024 * 1024  # Files at or below this size skip the resumable multipart path
CleanUnfinishedUpload = False  # For debug
ChunkSize = 5 * 1024 * 1024  # For debug; will be adjusted automatically
ifVerifyMD5Twice = False  # For debug
s3_config = Config(max_pool_connections=30)  # Max pool connections
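# With a 5 MB part size, S3's 10,000-part limit caps one multipart upload at roughly 48.8 GB,
# so ChunkSize is assumed to be scaled up automatically by the library for larger objects
# (e.g. doubled until Size / ChunkSize <= 10000).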
# Set environment
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)
# Get the other account's credentials from SSM Parameter Store
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
s3_src_client = boto3.client('s3', config=s3_config)
s3_des_client = credentials_session.client('s3', config=s3_config)
if JobType.upper() == "GET":
    s3_src_client, s3_des_client = s3_des_client, s3_src_client
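# With JobType "PUT" the Lambda execution role reads the source bucket and the credentials
# fetched from SSM write to the destination account; "GET" swaps the two clients so the
# SSM-credential side becomes the source instead.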
try:
    context = ssl._create_unverified_context()
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except Exception as e:
    logger.warning(f'Fail to connect to checkip url: {checkip_url} - {e}')
    instance_id = 'lambda-ip-timeout'
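# instance_id identifies this worker (by its egress IP) in logs and, presumably, in the
# DynamoDB job records written by s3_migration_lib; transfers still proceed if the lookup fails.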
class TimeoutOrMaxRetry(Exception):
    pass


class WrongRecordFormat(Exception):
    pass
def lambda_handler(event, context):
    print("Lambda or NAT IP Address:", instance_id)
    logger.info(json.dumps(event, default=str))

    for trigger_record in event['Records']:
        trigger_body = trigger_record['body']
        job = json.loads(trigger_body)
        logger.info(json.dumps(job, default=str))

        # Skip the s3:TestEvent access-test record that S3 writes to SQS during initial setup
        if 'Event' in job:
            if job['Event'] == 's3:TestEvent':
                logger.info('Skip s3:TestEvent')
                continue
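        # For reference, an S3 event notification delivered via SQS looks roughly like
        # (only the fields read below are shown; bucket and key values are illustrative):
        # {"Records": [{"s3": {"bucket": {"name": "src-bucket"},
        #                      "object": {"key": "prefix/file.bin", "size": 1048576}}}]}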
        # Messages from S3 carry 'Records' and are converted into a job dict here;
        # messages from the jobsender already have the job fields and pass through unchanged.
        if 'Records' in job:
            for One_record in job['Records']:
                if 's3' in One_record:
                    Src_bucket = One_record['s3']['bucket']['name']
                    Src_key = One_record['s3']['object']['key']
                    Src_key = urllib.parse.unquote_plus(Src_key)
                    Size = One_record['s3']['object']['size']
                    Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
                    Des_key = str(PurePosixPath(Des_prefix) / Src_key)
                    if Src_key[-1] == '/':  # Empty directory object
                        Des_key += '/'
                    job = {
                        'Src_bucket': Src_bucket,
                        'Src_key': Src_key,
                        'Size': Size,
                        'Des_bucket': Des_bucket,
                        'Des_key': Des_key
                    }
        if 'Des_bucket' not in job:  # Malformed message structure
            logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
            logger.warning('Try to handle next message')
            raise WrongRecordFormat
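        # Dispatch by object size: step_function is expected to run a chunked, resumable
        # multipart copy tracked in the DynamoDB table, while step_fn_small_file copies a
        # small object in a single request (behavior assumed from s3_migration_lib naming).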
        # TODO: Jobs arriving in one SQS batch are not processed concurrently here, and partial
        # failures within a batch are not handled, so do not enable SQS batch for now.
        if job['Size'] > ResumableThreshold:
            upload_etag_full = step_function(job, table, s3_src_client, s3_des_client, instance_id,
                                             StorageClass, ChunkSize, MaxRetry, MaxThread,
                                             JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload)
        else:
            upload_etag_full = step_fn_small_file(job, table, s3_src_client, s3_des_client, instance_id,
                                                  StorageClass, MaxRetry)
        if upload_etag_full != "TIMEOUT" and upload_etag_full != "ERR":
            # Delete the SQS message only on normal completion; on TIMEOUT or ERR keep it so
            # another worker can retry.
            # Large files return "TIMEOUT" when the worker thread exits on timeout; small files
            # return "MaxRetry" after exhausting retries. For small files that usually means a
            # permission problem rather than an oversized download, so there is no point letting
            # the next worker retry: the SQS message is deleted and DynamoDB does not record a
            # finished status.
            # To let small files be retried by the next worker as well, add
            # upload_etag_full != "MaxRetry" to the condition above.
            continue
        else:
            raise TimeoutOrMaxRetry
    return {
        'statusCode': 200,
        'body': 'Jobs completed'
    }
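

# Local test sketch: assumes the environment variables above are set before import and that
# the named buckets/keys exist; bucket and key names below are hypothetical placeholders.
if __name__ == '__main__':
    test_job = {
        'Src_bucket': 'source-bucket',       # hypothetical source bucket
        'Src_key': 'prefix/file.bin',        # hypothetical object key
        'Size': 100 * 1024 * 1024,
        'Des_bucket': 'destination-bucket',  # hypothetical destination bucket
        'Des_key': 'prefix/file.bin'
    }
    # Mimic an SQS-triggered event carrying a jobsender-style message body
    lambda_handler({'Records': [{'body': json.dumps(test_job)}]}, None)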