# PROJECT LONGBOW - JOBSENDER FOR COMPARE AMAZON S3 AND CREATE DELTA JOB LIST TO SQS

import json
import logging
import os
import ssl
import urllib.request
import urllib.error
from operator import itemgetter

import boto3
from botocore.config import Config

from s3_migration_lib import get_des_file_list, get_src_file_list, job_upload_sqs_ddb, delta_job_list, check_sqs_empty

# Environment variables
table_queue_name = os.environ['table_queue_name']
StorageClass = os.environ['StorageClass']
ssm_parameter_credentials = os.environ['ssm_parameter_credentials']
checkip_url = os.environ['checkip_url']
sqs_queue_name = os.environ['sqs_queue']
ssm_parameter_ignore_list = os.environ['ssm_parameter_ignore_list']
ssm_parameter_bucket = os.environ['ssm_parameter_bucket']
JobType = os.environ['JobType']
MaxRetry = int(os.environ['MaxRetry'])  # Maximum number of request retries
JobsenderCompareVersionId = os.environ['JobsenderCompareVersionId'].upper() == 'TRUE'

# Set environment
s3_config = Config(max_pool_connections=50, retries={'max_attempts': MaxRetry})  # Maximum number of pooled connections
logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_queue_name)
sqs = boto3.client('sqs')
sqs_queue = sqs.get_queue_url(QueueName=sqs_queue_name)['QueueUrl']

# Get credentials of the other account
ssm = boto3.client('ssm')
logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
credentials = json.loads(ssm.get_parameter(
    Name=ssm_parameter_credentials,
    WithDecryption=True
)['Parameter']['Value'])
credentials_session = boto3.session.Session(
    aws_access_key_id=credentials["aws_access_key_id"],
    aws_secret_access_key=credentials["aws_secret_access_key"],
    region_name=credentials["region"]
)
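
# The SSM parameter above is expected to hold a JSON document with exactly the
# keys read here, e.g. (illustrative values only):
# {
#     "aws_access_key_id": "AKIA...",
#     "aws_secret_access_key": "...",
#     "region": "us-west-2"
# }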

# Get buckets information
logger.info(f'Get ssm_parameter_bucket: {ssm_parameter_bucket}')
load_bucket_para = json.loads(ssm.get_parameter(Name=ssm_parameter_bucket)['Parameter']['Value'])
logger.info(f'Received ssm {json.dumps(load_bucket_para)}')
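
# load_bucket_para is expected to be a JSON list of bucket-pair mappings with
# the keys consumed in lambda_handler below, e.g. (illustrative values only):
# [
#     {
#         "src_bucket": "my-source-bucket",
#         "src_prefix": "data/",
#         "des_bucket": "my-destination-bucket",
#         "des_prefix": "data/"
#     }
# ]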

# Default JobType is PUT: the local account is the source and the credentialed
# session is the destination; GET swaps the two directions.
s3_src_client = boto3.client('s3', config=s3_config)
s3_des_client = credentials_session.client('s3', config=s3_config)
if JobType.upper() == "GET":
    s3_src_client, s3_des_client = s3_des_client, s3_src_client

# Get the public IP of this Lambda to tag jobs with an instance id
try:
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    response = urllib.request.urlopen(
        urllib.request.Request(checkip_url), timeout=3, context=context
    ).read()
    instance_id = "lambda-" + response.decode('utf-8')
except urllib.error.URLError as e:
    logger.warning(f'Fail to connect to checkip api: {checkip_url} - {str(e)}')
    instance_id = 'lambda-ip-timeout'

# handler
def lambda_handler(event, context):
    # Get ignore file list
    ignore_list = []
    try:
        logger.info('Try to get ignore list from ssm parameter')
        ignore_list = ssm.get_parameter(Name=ssm_parameter_ignore_list)['Parameter']['Value'].splitlines()
        logger.info(f'Get ignore list: {str(ignore_list)}')
    except Exception as e:
        logger.info(f'No ignore list in ssm parameter - {str(e)}')
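
    # The ignore-list parameter is plain text with one entry per line (hence
    # splitlines() above), e.g. (illustrative values only):
    #   test/do_not_copy.ini
    #   logs/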

    # Check whether the SQS job queue is empty
    if check_sqs_empty(sqs, sqs_queue):
        logger.info('Job sqs queue is empty, now process comparing s3 bucket...')
        for bucket_para in load_bucket_para:
            src_bucket = bucket_para['src_bucket']
            src_prefix = bucket_para['src_prefix']
            des_bucket = bucket_para['des_bucket']
            des_prefix = bucket_para['des_prefix']

            # Get object lists on S3
            logger.info('Get source bucket')
            src_file_list = get_src_file_list(
                s3_client=s3_src_client,
                bucket=src_bucket,
                S3Prefix=src_prefix,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
            logger.info('Get destination bucket')
            des_file_list = get_des_file_list(
                s3_client=s3_des_client,
                bucket=des_bucket,
                S3Prefix=des_prefix,
                table=table,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
            # Generate delta job list
            job_list, ignore_records = delta_job_list(
                src_file_list=src_file_list,
                des_file_list=des_file_list,
                src_bucket=src_bucket,
                src_prefix=src_prefix,
                des_bucket=des_bucket,
                des_prefix=des_prefix,
                ignore_list=ignore_list,
                JobsenderCompareVersionId=JobsenderCompareVersionId
            )
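
            # The delta is presumably the set difference of the two listings:
            # objects whose Key (and Size, plus VersionId when
            # JobsenderCompareVersionId is set) already match in the destination
            # produce no job, and entries matching ignore_list come back in
            # ignore_records instead of job_list.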

            # Upload jobs to SQS
            if len(job_list) != 0:
                job_upload_sqs_ddb(
                    sqs=sqs,
                    sqs_queue=sqs_queue,
                    job_list=job_list
                )
                max_object = max(job_list, key=itemgetter('Size'))
                MaxChunkSize = int(max_object['Size'] / 10000) + 1024
                if MaxChunkSize < 5 * 1024 * 1024:
                    MaxChunkSize = 5 * 1024 * 1024
                logger.warning(f'Max object size is {max_object["Size"]}. Require AWS Lambda memory > '
                               f'MaxChunkSize({MaxChunkSize}) x MaxThread(default: 1) x MaxParallelFile(default: 50)')
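
                # Sizing note: S3 multipart uploads allow at most 10,000 parts,
                # so the chunk size must be at least Size/10000; the 5 MiB floor
                # is S3's minimum part size. Worked example: a 100 GiB object
                # gives int(107374182400 / 10000) + 1024 = 10738442 bytes,
                # about 10.2 MiB per chunk.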
            else:
                logger.info('Source objects are all in destination, no job to send.')
    else:
        logger.error('Job sqs queue is not empty or fail to get_queue_attributes. Stop process.')

    # print('Completed and logged to file:', os.path.abspath(log_file_name))
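
# A minimal sketch (an assumption, not the actual s3_migration_lib code) of how
# an SQS emptiness check like check_sqs_empty can be built on the
# get_queue_attributes call referenced in the error log above. The helper name
# is illustrative; both visible and in-flight messages are checked.
def example_check_sqs_empty(sqs_client, queue_url):
    attr = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['ApproximateNumberOfMessages',
                        'ApproximateNumberOfMessagesNotVisible']
    )['Attributes']
    return (int(attr['ApproximateNumberOfMessages']) == 0
            and int(attr['ApproximateNumberOfMessagesNotVisible']) == 0)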