# PROJECT LONGBOW
# AWS LAMBDA WORKER NODE FOR TRANSMISSION BETWEEN AMAZON S3 BUCKETS
import json
import logging
import os
import ssl
import urllib.error
import urllib.parse
import urllib.request
from pathlib import PurePosixPath

import boto3
from botocore.config import Config

from s3_migration_lib import step_function, step_fn_small_file
# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
try:
    Des_bucket_default = os.environ['Des_bucket_default']
    Des_prefix_default = os.environ['Des_prefix_default']
except Exception as e:
    print('No Env Des_bucket_default/Des_prefix_default ', e)
    Des_bucket_default, Des_prefix_default = "", ""
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']
JobType = os.environ['JobType']
MaxRetry = int(os.environ['MaxRetry'])  # Maximum number of request retries
MaxThread = int(os.environ['MaxThread'])  # Maximum number of threads
MaxParallelFile = int(os.environ['MaxParallelFile'])  # Not used in Lambda for now
JobTimeout = int(os.environ['JobTimeout'])
UpdateVersionId = os.environ['UpdateVersionId'].upper() == 'TRUE'  # Get the latest version id from S3 before getting the object
GetObjectWithVersionId = os.environ['GetObjectWithVersionId'].upper() == 'TRUE'  # Get the object with its version id
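# Illustrative configuration only (the names and values below are examples,
# not project defaults):
#   table_queue_name=s3_migrate_file_list      # DynamoDB table for job status
#   StorageClass=STANDARD
#   JobType=PUT                                # or GET
#   MaxRetry=5  MaxThread=50  MaxParallelFile=1  JobTimeout=870
#   UpdateVersionId=FALSE  GetObjectWithVersionId=FALSE
#   ssm_parameter_credentials=s3_migration_credentials
#   checkip_url=https://checkip.amazonaws.com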
# Internal parameters
ResumableThreshold = 5 * 1024 * 1024  # Files larger than this take the resumable multipart path; smaller ones take the small-file path
CleanUnfinishedUpload = False  # For debug
ChunkSize = 5 * 1024 * 1024  # For debug; adjusted automatically at runtime
ifVerifyMD5Twice = False  # For debug

s3_config = Config(max_pool_connections=200,
                   retries={'max_attempts': MaxRetry})  # Maximum number of connections
# Set environment
logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)
# Fetch the other AWS account's credentials from SSM Parameter Store
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
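# The SSM parameter is expected to hold a JSON document with the keys read
# above, for example (placeholder values only):
# {
#     "aws_access_key_id": "AKIA...",
#     "aws_secret_access_key": "...",
#     "region": "us-east-1"
# }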
s3_src_client = boto3.client('s3', config=s3_config)
s3_des_client = credentials_session.client('s3', config=s3_config)
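# For PUT jobs the local role reads the source bucket and the SSM credentials
# write to the destination; for GET jobs the roles are reversed, so the two
# clients are swapped below.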
if JobType.upper() == "GET":
    s3_src_client, s3_des_client = s3_des_client, s3_src_client
try:
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except urllib.error.URLError as e:
    logger.warning(f'Failed to connect to checkip api: {checkip_url} - {str(e)}')
    instance_id = 'lambda-ip-timeout'
class TimeoutOrMaxRetry(Exception):
    pass


class WrongRecordFormat(Exception):
    pass
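# Note: raising either exception out of lambda_handler fails the invocation, so
# the SQS messages in the batch become visible again and are retried by another
# worker (or land in a dead-letter queue if one is configured).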
def lambda_handler(event, context):
    print("Lambda or NAT IP Address:", instance_id)
    logger.info(json.dumps(event, default=str))
    for trigger_record in event['Records']:
        trigger_body = trigger_record['body']
        job = json.loads(trigger_body)
        logger.info(json.dumps(job, default=str))
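        # The body is either an S3 event notification or a job written by the
        # jobsender; illustrative shapes (abridged, values made up):
        #   S3 event:  {"Records": [{"s3": {"bucket": {"name": "src-bucket"},
        #               "object": {"key": "path/file", "size": 123,
        #               "versionId": "..."}}}]}
        #   jobsender: {"Src_bucket": "src-bucket", "Src_key": "path/file",
        #               "Size": 123, "Des_bucket": "des-bucket",
        #               "Des_key": "prefix/path/file", "versionId": "null"}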
        # Skip the s3:TestEvent access-test message that S3 writes to SQS during initial setup
        if 'Event' in job:
            if job['Event'] == 's3:TestEvent':
                logger.info('Skip s3:TestEvent')
                continue
        # If the message came from S3 rather than the jobsender, convert it into a job dict
        if 'Records' in job:  # Messages from S3 carry 'Records'
            for One_record in job['Records']:
                if 's3' in One_record:
                    Src_bucket = One_record['s3']['bucket']['name']
                    Src_key = One_record['s3']['object']['key']
                    Src_key = urllib.parse.unquote_plus(Src_key)  # Turn '+' back into spaces and decode percent-encoding
                    Size = One_record['s3']['object']['size']
                    if "versionId" in One_record['s3']['object']:
                        versionId = One_record['s3']['object']['versionId']
                    else:
                        versionId = 'null'
                    Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
                    Des_key = str(PurePosixPath(Des_prefix) / Src_key)
                    if Src_key[-1] == '/':  # Preserve the trailing slash for empty-directory objects
                        Des_key += '/'
                    job = {
                        'Src_bucket': Src_bucket,
                        'Src_key': Src_key,
                        'Size': Size,
                        'Des_bucket': Des_bucket,
                        'Des_key': Des_key,
                        'versionId': versionId
                    }
        if 'Des_bucket' not in job:  # Malformed message
            logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
            logger.warning('Try to handle next message')
            raise WrongRecordFormat
        if 'versionId' not in job:
            job['versionId'] = 'null'
        # TODO: Partial failure within a multi-job batch is not handled yet, so the SQS batch size is currently set to 1
        if job['Size'] > ResumableThreshold:
            upload_etag_full = step_function(
                job=job,
                table=table,
                s3_src_client=s3_src_client,
                s3_des_client=s3_des_client,
                instance_id=instance_id,
                StorageClass=StorageClass,
                ChunkSize=ChunkSize,
                MaxRetry=MaxRetry,
                MaxThread=MaxThread,
                JobTimeout=JobTimeout,
                ifVerifyMD5Twice=ifVerifyMD5Twice,
                CleanUnfinishedUpload=CleanUnfinishedUpload,
                UpdateVersionId=UpdateVersionId,
                GetObjectWithVersionId=GetObjectWithVersionId
            )
        else:
            upload_etag_full = step_fn_small_file(
                job=job,
                table=table,
                s3_src_client=s3_src_client,
                s3_des_client=s3_des_client,
                instance_id=instance_id,
                StorageClass=StorageClass,
                MaxRetry=MaxRetry,
                UpdateVersionId=UpdateVersionId,
                GetObjectWithVersionId=GetObjectWithVersionId
            )
        if upload_etag_full != "TIMEOUT" and upload_etag_full != "ERR":
            # Delete the SQS message only on normal completion; on TIMEOUT or ERR the
            # message is kept so another worker can retry.
            # Large files report TIMEOUT when a transfer thread exits after exhausting
            # MaxRetry, while small files return "MaxRetry" instead.
            # For small files another retry is usually pointless: the failure comes from
            # permission settings rather than object size, so the SQS message is deleted
            # and DynamoDB records no completion status.
            # To let small files go back to SQS for another worker as well, add
            # upload_etag_full != "MaxRetry" to the condition above.
            continue
        else:
            raise TimeoutOrMaxRetry
    return {
        'statusCode': 200,
        'body': 'Jobs completed'
    }
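# Minimal local-test sketch (not part of the deployed worker). It assumes the
# environment variables above are exported and the SSM parameter and DynamoDB
# table exist, then feeds the handler one jobsender-style message with made-up
# bucket and key names.
if __name__ == '__main__':
    sample_event = {
        'Records': [{
            'body': json.dumps({
                'Src_bucket': 'example-src-bucket',  # hypothetical bucket
                'Src_key': 'prefix/example-object',  # hypothetical key
                'Size': 1024,
                'Des_bucket': 'example-des-bucket',  # hypothetical bucket
                'Des_key': 'prefix/example-object',
                'versionId': 'null'
            })
        }]
    }
    print(lambda_handler(sample_event, None))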