- # PROJECT LONGBOW - LIBRARY FOR TRANSMISSION BETWEEN AMAZON S3 BUCKETS
- import datetime
- import logging
- import hashlib
- import concurrent.futures
- import threading
- import base64
- import urllib.request
- import urllib.parse
- import boto3
- from botocore.config import Config
- from botocore.exceptions import ClientError
- # from boto3.dynamodb import conditions
- import json
- import os
- import sys
- import time
- from fnmatch import fnmatchcase
- from pathlib import PurePosixPath, Path
- logger = logging.getLogger()
- Max_md5_retry = 2
- # Configure logging
- def set_log(LoggingLevel, this_file_name):
- logger.setLevel(logging.WARNING)
- if LoggingLevel == 'INFO':
- logger.setLevel(logging.INFO)
- elif LoggingLevel == 'DEBUG':
- logger.setLevel(logging.DEBUG)
- # File logging
- log_path = Path(__file__).parent.parent / 'amazon-s3-migration-log'
- if not log_path.exists():
- log_path.mkdir()
- start_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
- _log_file_name = str(log_path / f'{this_file_name}-{start_time}.log')
- print('Log file:', _log_file_name)
- fileHandler = logging.FileHandler(filename=_log_file_name)
- fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- return logger, _log_file_name
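- # Usage sketch (illustrative only; the level and file name below are assumptions, not values defined by this library):
- #   logger, log_file_name = set_log('INFO', 'jobsender')
- #   logger.info('Logging initialized, writing to ' + log_file_name)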
- # Set environment
- def set_env(*, JobType, LocalProfileMode, table_queue_name, sqs_queue_name, ssm_parameter_credentials, MaxRetry):
- s3_config = Config(max_pool_connections=200, retries={'max_attempts': MaxRetry}) # boto default 10
- if os.uname()[0] == 'Linux' and not LocalProfileMode: # on EC2, use EC2 role
- logger.info('Get instance-id and running region')
- instance_id = urllib.request.urlopen(urllib.request.Request(
- "http://169.254.169.254/latest/meta-data/instance-id"
- )).read().decode('utf-8')
- region = json.loads(urllib.request.urlopen(urllib.request.Request(
- "http://169.254.169.254/latest/dynamic/instance-identity/document"
- )).read().decode('utf-8'))['region']
- sqs = boto3.client('sqs', region)
- dynamodb = boto3.resource('dynamodb', region)
- ssm = boto3.client('ssm', region)
- # Get the credentials of the other account
- logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
- try:
- credentials = json.loads(ssm.get_parameter(
- Name=ssm_parameter_credentials,
- WithDecryption=True
- )['Parameter']['Value'])
- except Exception as e:
- logger.error(f'Fail to get {ssm_parameter_credentials} in SSM Parameter store. '
- f'Fix and restart Jobsender. {str(e)}')
- sys.exit(0)
- credentials_session = boto3.session.Session(
- aws_access_key_id=credentials["aws_access_key_id"],
- aws_secret_access_key=credentials["aws_secret_access_key"],
- region_name=credentials["region"]
- )
- if JobType.upper() == "PUT":
- s3_src_client = boto3.client('s3', region, config=s3_config)
- s3_des_client = credentials_session.client('s3', config=s3_config)
- elif JobType.upper() == "GET":
- s3_des_client = boto3.client('s3', region, config=s3_config)
- s3_src_client = credentials_session.client('s3', config=s3_config)
- else:
- logger.error('Wrong JobType setting in config.ini file')
- sys.exit(0)
- # Running in an environment without an EC2 role, e.g. local testing on a Mac
- else:
- instance_id = "local"
- src_session = boto3.session.Session(profile_name='iad')
- des_session = boto3.session.Session(profile_name='zhy')
- sqs = src_session.client('sqs')
- dynamodb = src_session.resource('dynamodb')
- ssm = src_session.client('ssm')
- s3_src_client = src_session.client('s3', config=s3_config)
- s3_des_client = des_session.client('s3', config=s3_config)
- # Also output to the console to make local debugging easier
- streamHandler = logging.StreamHandler()
- streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
- logger.addHandler(streamHandler)
- table = dynamodb.Table(table_queue_name)
- table.wait_until_exists()
- sqs_queue = wait_sqs_available(
- sqs=sqs,
- sqs_queue_name=sqs_queue_name
- )
- return sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id, ssm
- def wait_sqs_available(*, sqs, sqs_queue_name):
- while True:
- try:
- return sqs.get_queue_url(QueueName=sqs_queue_name)['QueueUrl']
- except Exception as e:
- logger.warning(f'Waiting for SQS availability. {str(e)}')
- time.sleep(10)
- def check_sqs_empty(sqs, sqs_queue):
- try:
- sqs_in_flight = sqs.get_queue_attributes(
- QueueUrl=sqs_queue,
- AttributeNames=['ApproximateNumberOfMessagesNotVisible', 'ApproximateNumberOfMessages']
- )
- except Exception as e:
- logger.error(f'Fail to get_queue_attributes: {str(e)}')
- return False # cannot get SQS status, so treat the queue as not empty
- NotVisible = sqs_in_flight['Attributes']['ApproximateNumberOfMessagesNotVisible']
- Visible = sqs_in_flight['Attributes']['ApproximateNumberOfMessages']
- logger.info(f'ApproximateNumberOfMessagesNotVisible: {NotVisible}, ApproximateNumberOfMessages: {Visible}')
- if NotVisible == '0' and (Visible == '0' or Visible == '1'):
- # In the initial state, the newly created bucket trigger sends one test message to SQS,
- # so the case Visible == '1' is ignored here.
- return True # sqs is empty
- return False # sqs is not empty
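- # Behaviour sketch (attribute values are made up): {'ApproximateNumberOfMessagesNotVisible': '0',
- # 'ApproximateNumberOfMessages': '1'} returns True, the single message being treated as the bucket-trigger
- # test event; any other combination, e.g. NotVisible '2' and Messages '0', returns False.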
- # Get source S3 bucket file list with versionId
- def get_src_file_list(*, s3_client, bucket, S3Prefix, JobsenderCompareVersionId):
- # get s3 file list
- file_list = []
- if JobsenderCompareVersionId:
- paginator = s3_client.get_paginator('list_object_versions')
- else:
- paginator = s3_client.get_paginator('list_objects_v2') # much faster than list_object_versions
- try:
- if S3Prefix == '/':
- S3Prefix = ''
- logger.info(f'Get s3 file list from: {bucket}/{S3Prefix}')
- response_iterator = paginator.paginate(
- Bucket=bucket,
- Prefix=S3Prefix
- )
- for page in response_iterator:
- if "Versions" in page: # JobsenderCompareVersionId==True
- logger.info(f'Got list_object_versions {bucket}/{S3Prefix}: {len(page["Versions"])}')
- for n in page["Versions"]:
- # Only take the latest version of each object
- if n["IsLatest"]:
- file_list.append({
- "Key": n["Key"],
- "Size": n["Size"],
- "versionId": n["VersionId"] if JobsenderCompareVersionId else 'null'
- })
- elif "Contents" in page: # JobsenderCompareVersionId==False
- logger.info(f'Got list_objects_v2 {bucket}/{S3Prefix}: {len(page["Contents"])}')
- for n in page["Contents"]:
- file_list.append({
- "Key": n["Key"],
- "Size": n["Size"],
- "versionId": 'null'
- })
- except Exception as err:
- logger.error(f'Fail to get s3 list versions {bucket}/{S3Prefix}: {str(err)}')
- logger.info(f'Bucket list length(get_src_file_list):{str(len(file_list))}')
- return file_list
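- # Shape of the returned records, for reference (example values are made up):
- #   [{"Key": "prefix/file1.bin", "Size": 123456, "versionId": "3HL4kqtJlcpXrof3vjVBH40Nrjfkd91y"},
- #    {"Key": "prefix/file2.bin", "Size": 789, "versionId": "null"}]
- # "versionId" holds a real S3 version only when JobsenderCompareVersionId is enabled; otherwise it is the string 'null'.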
- # Get Destination S3 bucket file list with versionId from DDB
- def get_des_file_list(*, s3_client, bucket, S3Prefix, table, JobsenderCompareVersionId):
- if JobsenderCompareVersionId:
- ver_list = get_versionid_from_ddb(
- des_bucket=bucket,
- table=table
- )
- else:
- ver_list = {}
- # Compute the prefix length to strip from destination keys
- if S3Prefix == '' or S3Prefix == '/':
- # The destination bucket has no prefix set
- dp_len = 0
- else:
- # Length of the destination bucket's "prefix/"
- dp_len = len(S3Prefix) + 1
- # get s3 file list
- file_list = []
- paginator = s3_client.get_paginator('list_objects_v2')
- try:
- if S3Prefix == '/':
- S3Prefix = ''
- logger.info(f'Get s3 file list from: {bucket}/{S3Prefix}')
- response_iterator = paginator.paginate(
- Bucket=bucket,
- Prefix=S3Prefix
- )
- for page in response_iterator:
- if "Contents" in page:
- logger.info(f'Got list_objects_v2 {bucket}/{S3Prefix}: {len(page["Contents"])}')
- for n in page["Contents"]:
- # Get the versionId of this key; if ver_list is empty, every lookup returns None
- ver = ver_list.get(n["Key"])
- # Strip the prefix from the destination key
- file_list.append({
- "Key": n["Key"][dp_len:],
- "Size": n["Size"],
- "versionId": ver if (ver is not None and JobsenderCompareVersionId) else 'null'
- })
- except Exception as err:
- logger.error(f'Fail to get s3 list_objects_v2 {bucket}/{S3Prefix}: {str(err)}')
- logger.info(f'Bucket list length(get_des_file_list):{str(len(file_list))}')
- return file_list
- # Get s3 object versionId record in DDB
- def get_versionid_from_ddb(*, des_bucket, table):
- logger.info(f'Get des_bucket versionId list from DDB')
- ver_list = {}
- try:
- r = table.query(
- IndexName='desBucket-index',
- KeyConditionExpression='desBucket=:b',
- ExpressionAttributeValues={":b": des_bucket}
- )
- if 'Items' in r:
- for i in r['Items']:
- ver_list[i['desKey']] = i['versionId']
- logger.info(f'Got versionId list {des_bucket}: {len(ver_list)}')
- except Exception as e:
- logger.error(f'Fail to query DDB for versionId {des_bucket}- {str(e)}')
- return ver_list
- # Jobsender compare source and destination bucket list
- def delta_job_list(*, src_file_list, des_file_list, src_bucket, src_prefix, des_bucket, des_prefix, ignore_list,
- JobsenderCompareVersionId):
- # Delta list used only to send jobs; each source record is compared against the destination list as a whole (Key, Size, and versionId when enabled)
- logger.info(f'Compare source s3://{src_bucket}/{src_prefix} and destination s3://{des_bucket}/{des_prefix}')
- start_time = int(time.time())
- job_list = []
- ignore_records = []
- for src in src_file_list:
- # Exclude bucket/key entries listed in ignore_list
- src_bucket_key = src_bucket + '/' + src['Key']
- ignore_match = False
- # Try each ignore pattern in turn; if any pattern matches, skip this src key
- for ignore_key in ignore_list:
- if fnmatchcase(src_bucket_key, ignore_key):
- ignore_match = True
- break
- # No match: continue with the next ignore_key
- if ignore_match:
- ignore_records.append(src_bucket_key)
- continue # skip the current src
- # Check whether the source record exists in the destination list (versionId is compared as part of the record)
- if src in des_file_list:
- continue # already in the destination list, next source file
- # Not in the destination list: add the source file to the job list
- else:
- Des_key = str(PurePosixPath(des_prefix) / src["Key"])
- if src["Key"][-1] == '/': # the source key is a directory, so an extra / is needed
- Des_key += '/'
- job_list.append(
- {
- "Src_bucket": src_bucket,
- "Src_key": src["Key"], # Src_key已经包含了Prefix
- "Des_bucket": des_bucket,
- "Des_key": Des_key,
- "Size": src["Size"],
- "versionId": src['versionId']
- }
- )
- spent_time = int(time.time()) - start_time
- if JobsenderCompareVersionId:
- logger.info(f'Finished comparing key/size/versionId in {spent_time} seconds (JobsenderCompareVersionId is enabled)')
- else:
- logger.info(f'Finished comparing key/size in {spent_time} seconds (JobsenderCompareVersionId is disabled)')
- logger.info(f'Delta Job List: {len(job_list)} - Ignore List: {len(ignore_records)}')
- return job_list, ignore_records
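- # ignore_list entries are shell-style patterns matched with fnmatchcase against "src_bucket/key",
- # e.g. the hypothetical patterns 'my-src-bucket/logs/*' or '*/*.tmp'. A minimal sketch of the comparison,
- # assuming two source records of which only the first already exists in the destination list:
- #   src_file_list = [{"Key": "a.txt", "Size": 1, "versionId": "null"},
- #                    {"Key": "b.txt", "Size": 2, "versionId": "null"}]
- #   des_file_list = [{"Key": "a.txt", "Size": 1, "versionId": "null"}]
- #   -> job_list contains only the record for "b.txt"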
- def job_upload_sqs_ddb(*, sqs, sqs_queue, job_list):
- sqs_batch = 0
- sqs_message = []
- logger.info(f'Start uploading jobs to queue: {sqs_queue}')
- # create ddb writer
- # with table.batch_writer() as ddb_batch:
- for job in job_list:
- # construct sqs messages
- sqs_message.append({
- "Id": str(sqs_batch),
- "MessageBody": json.dumps(job),
- })
- sqs_batch += 1
- # send to SQS in batches of 10, or when this is the last job
- if sqs_batch == 10 or job == job_list[-1]:
- try:
- sqs.send_message_batch(QueueUrl=sqs_queue, Entries=sqs_message)
- except Exception as e:
- logger.error(f'Fail to send sqs message: {str(sqs_message)}, {str(e)}')
- sqs_batch = 0
- sqs_message = []
- logger.info(f'Complete upload job to queue: {sqs_queue}')
- return
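- # Note: SQS send_message_batch accepts at most 10 entries per call, which is why the batch is flushed at
- # sqs_batch == 10. Minimal usage sketch (the arguments are whatever set_env and delta_job_list returned):
- #   job_upload_sqs_ddb(sqs=sqs, sqs_queue=sqs_queue, job_list=job_list)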
- # Split one file size into a list of part start byte positions
- def split(Size, ChunkSize):
- partnumber = 1
- indexList = [0]
- if int(Size / ChunkSize) + 1 > 10000:
- ChunkSize = int(Size / 10000) + 1024 # auto-adjust ChunkSize for large files that would exceed 10000 parts
- logger.info(f'Size exceeds the 10000-part limit. Auto changing ChunkSize to {ChunkSize}')
- while ChunkSize * partnumber < Size: # if it is exactly equal, no further part is needed, so "<=" must not be used here
- indexList.append(ChunkSize * partnumber)
- partnumber += 1
- return indexList, ChunkSize
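- # Worked example (sizes are illustrative): Size = 25 MiB with ChunkSize = 10 MiB gives
- #   indexList = [0, 10485760, 20971520] and ChunkSize unchanged,
- # i.e. three parts starting at those byte offsets. For a 200 GiB file with a 5 MiB ChunkSize the naive split
- # would exceed 10000 parts, so ChunkSize is first raised to int(Size / 10000) + 1024 bytes.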
- # Get S3 versionID
- def head_s3_version(*, s3_src_client, Src_bucket, Src_key):
- logger.info(f'Try to update VersionId: {Src_bucket}/{Src_key}')
- try:
- head = s3_src_client.head_object(
- Bucket=Src_bucket,
- Key=Src_key
- )
- versionId = head['VersionId']
- Size = head['ContentLength']
- logger.info(f'Got VersionId: {versionId} - {Src_bucket}/{Src_key} - Size:{Size}')
- except Exception as e:
- logger.error(f'Fail to head s3 - {Src_bucket}/{Src_key}, {str(e)}')
- return "ERR", 0
- return versionId, Size
- # Get unfinished multipart upload id from s3
- def get_uploaded_list(*, s3_client, Des_bucket, Des_key):
- multipart_uploaded_list = []
- paginator = s3_client.get_paginator('list_multipart_uploads')
- try:
- logger.info(f'Getting unfinished upload id list - {Des_bucket}/{Des_key}...')
- response_iterator = paginator.paginate(
- Bucket=Des_bucket,
- Prefix=Des_key
- )
- for page in response_iterator:
- if "Uploads" in page:
- for i in page["Uploads"]:
- if i["Key"] == Des_key or Des_key == '':
- multipart_uploaded_list.append({
- "Key": i["Key"],
- "Initiated": i["Initiated"],
- "UploadId": i["UploadId"]
- })
- logger.info(f'Unfinished upload, Key: {i["Key"]}, Time: {i["Initiated"]}')
- except Exception as e:
- logger.error(f'Fail to list multipart upload - {Des_bucket}/{Des_key} - {str(e)}')
- return multipart_uploaded_list
- # Check whether the file appears in the list from get_uploaded_list and return its multipart upload id
- def check_file_exist(*, prefix_and_key, UploadIdList):
- # Check whether this Key has an unfinished UploadId
- keyIDList = []
- for u in UploadIdList:
- if u["Key"] == prefix_and_key:
- keyIDList.append(u)
- # If no previous upload is found, start the upload from scratch
- if not keyIDList:
- return 'UPLOAD'
- # For multiple uploads of the same Key (file), find the one with the latest Initiated time
- UploadID_latest = keyIDList[0]
- for u in keyIDList:
- if u["Initiated"] > UploadID_latest["Initiated"]:
- UploadID_latest = u
- # pick last one upload id with latest Initiated time
- logger.info(f"Pick UploadId Initiated Time: {UploadID_latest['Initiated']}")
- return UploadID_latest["UploadId"]
- # Check uploaded part number list on Des_bucket
- def checkPartnumberList(*, Des_bucket, Des_key, uploadId, s3_des_client):
- partnumberList = []
- logger.info(f'Get partnumber list - {Des_bucket}/{Des_key}')
- paginator = s3_des_client.get_paginator('list_parts')
- try:
- response_iterator = paginator.paginate(
- Bucket=Des_bucket,
- Key=Des_key,
- UploadId=uploadId
- )
- for page in response_iterator:
- if "Parts" in page:
- logger.info(f'Got list_parts: {len(page["Parts"])} - {Des_bucket}/{Des_key}')
- for p in page["Parts"]:
- partnumberList.append(p["PartNumber"])
- except Exception as e:
- logger.error(f'Fail to list parts in checkPartnumberList - {Des_bucket}/{Des_key} - {str(e)}')
- return []
- if partnumberList: # empty means no uploaded parts were found
- logger.info(f"Found uploaded partnumber {len(partnumberList)} - {json.dumps(partnumberList)}"
- f" - {Des_bucket}/{Des_key}")
- else:
- logger.info(f'Part number list is empty - {Des_bucket}/{Des_key}')
- return partnumberList
- # Process one job
- def job_processor(*, uploadId, indexList, partnumberList, job, s3_src_client, s3_des_client,
- MaxThread, ChunkSize, MaxRetry, JobTimeout, ifVerifyMD5Twice, GetObjectWithVersionId):
- # Thread generator: pairs each part with a thread in the pool so that timeout control can be applied
- def thread_gen(woker_thread, pool,
- stop_signal, partnumber, total, md5list, partnumberList, complete_list):
- for partStartIndex in indexList:
- # start to upload part
- if partnumber not in partnumberList:
- dryrun = False # dryrun follows the normal flow to build the completion list, which makes the later MD5 calculation easier
- else:
- dryrun = True
- th = pool.submit(woker_thread,
- stop_signal=stop_signal,
- partnumber=partnumber,
- partStartIndex=partStartIndex,
- total=total,
- md5list=md5list,
- dryrun=dryrun,
- complete_list=complete_list
- )
- partnumber += 1
- yield th
- # download part from src. s3 and upload to dest. s3
- def woker_thread(*, stop_signal, partnumber, partStartIndex, total, md5list, dryrun, complete_list):
- if stop_signal.is_set():
- return "TIMEOUT"
- Src_bucket = job['Src_bucket']
- Src_key = job['Src_key']
- Des_bucket = job['Des_bucket']
- Des_key = job['Des_key']
- versionId = job['versionId']
- getBody, chunkdata_md5 = b'', b'' # init
- # Download the part
- if ifVerifyMD5Twice or not dryrun: # with ifVerifyMD5Twice, re-download even already-uploaded parts to verify the whole file
- if GetObjectWithVersionId:
- logger.info(f"--->Downloading {ChunkSize} Bytes {Src_bucket}/{Src_key} - {partnumber}/{total}"
- f" - versionId: {versionId}")
- else:
- logger.info(f"--->Downloading {ChunkSize} Bytes {Src_bucket}/{Src_key} - {partnumber}/{total}")
- retryTime = 0
- # If stop_signal is set during normal operation, this thread must exit
- while retryTime <= MaxRetry and not stop_signal.is_set():
- retryTime += 1
- try:
- if GetObjectWithVersionId: # get the object by its VersionId
- response_get_object = s3_src_client.get_object(
- Bucket=Src_bucket,
- Key=Src_key,
- VersionId=versionId,
- Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
- )
- else: # without VersionId, i.e. get the latest object
- response_get_object = s3_src_client.get_object(
- Bucket=Src_bucket,
- Key=Src_key,
- Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
- )
- getBody = response_get_object["Body"].read()
- chunkdata_md5 = hashlib.md5(getBody)
- md5list[partnumber - 1] = chunkdata_md5
- break # download finished, no retry needed
- except ClientError as err:
- if err.response['Error']['Code'] in ['NoSuchKey', 'AccessDenied']:
- # No such versionId: the file has been deleted, or access is denied
- logger.error(f"ClientError: Fail to access {Src_bucket}/{Src_key} - ERR: {str(err)}.")
- stop_signal.set()
- return "QUIT"
- logger.warning(f"ClientError: Fail to download {Src_bucket}/{Src_key} - ERR: {str(err)}. "
- f"Retry part: {partnumber} - Attempts: {retryTime}")
- if retryTime >= MaxRetry: # quit after exceeding the retry limit
- logger.error(f"ClientError: Quit for Max Download retries: {retryTime} - "
- f"{Src_bucket}/{Src_key}")
- stop_signal.set()
- return "TIMEOUT" # 退出Thread
- else:
- time.sleep(5 * retryTime)
- continue
- # back off with an increasing delay, then retry
- except Exception as e:
- logger.warning(f"Fail to download {Src_bucket}/{Src_key} - ERR: {str(e)}. "
- f"Retry part: {partnumber} - Attempts: {retryTime}")
- if retryTime >= MaxRetry: # quit after exceeding the retry limit
- logger.error(f"Quit for Max Download retries: {retryTime} - {Src_bucket}/{Src_key}")
- stop_signal.set()
- return "TIMEOUT" # 退出Thread
- else:
- time.sleep(5 * retryTime)
- continue
- # Upload the part
- if not dryrun: # ifVerifyMD5Twice does not need to be considered here
- retryTime = 0
- while retryTime <= MaxRetry and not stop_signal.is_set():
- retryTime += 1
- try:
- logger.info(f'--->Uploading {len(getBody)} Bytes {Des_bucket}/{Des_key} - {partnumber}/{total}')
- s3_des_client.upload_part(
- Body=getBody,
- Bucket=Des_bucket,
- Key=Des_key,
- PartNumber=partnumber,
- UploadId=uploadId,
- ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
- )
- # The request carries the MD5; S3 raises an exception if its checksum check fails
- break
- except ClientError as err:
- if err.response['Error']['Code'] == 'NoSuchUpload':
- # No such UploadId: another worker has already completed this job.
- logger.warning(f'ClientError: Fail to upload part - might be duplicated job:'
- f' {Des_bucket}/{Des_key}, {str(err)}')
- stop_signal.set()
- return "QUIT"
- logger.warning(f"ClientError: Fail to upload part - {Des_bucket}/{Des_key} - {str(err)}, "
- f"retry part: {partnumber} Attempts: {retryTime}")
- if retryTime >= MaxRetry:
- logger.error(f"ClientError: Quit for Max Upload retries: {retryTime} - {Des_bucket}/{Des_key}")
- # skip to the next file instead
- stop_signal.set()
- return "TIMEOUT"
- else:
- time.sleep(5 * retryTime) # back off with an increasing delay and retry
- continue
- except Exception as e:
- logger.warning(f"Fail to upload part - {Des_bucket}/{Des_key} - {str(e)}, "
- f"retry part: {partnumber} Attempts: {retryTime}")
- if retryTime >= MaxRetry:
- logger.error(f"Quit for Max Upload retries: {retryTime} - {Des_bucket}/{Des_key}")
- # skip to the next file instead
- stop_signal.set()
- return "TIMEOUT"
- else:
- time.sleep(5 * retryTime)
- continue
- if not stop_signal.is_set():
- complete_list.append(partnumber)
- if not dryrun:
- logger.info(
- f'--->Complete {len(getBody)} Bytes {Src_bucket}/{Src_key}'
- f' - {partnumber}/{total} {len(complete_list) / total:.2%}')
- else:
- return "TIMEOUT"
- return "COMPLETE"
- # woker_thread END
- # job_processor Main
- partnumber = 1 # part number to be uploaded in the current loop
- total = len(indexList)
- md5list = [hashlib.md5(b'')] * total
- complete_list = []
- # Thread pool
- try:
- stop_signal = threading.Event() # used on JobTimeout to stop all threads of the current file
- with concurrent.futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
- # materialize the generator here to obtain the thread futures
- threads = list(thread_gen(woker_thread, pool, stop_signal,
- partnumber, total, md5list, partnumberList, complete_list))
- result = concurrent.futures.wait(threads, timeout=JobTimeout, return_when="ALL_COMPLETED")
- # Abnormal exit
- if "QUIT" in [t.result() for t in result[0]]: # result[0] is the set of completed (done) futures
- logger.warning(f'QUIT. Canceling {len(result[1])} waiting threads in pool ...')
- stop_signal.set()
- for t in result[1]:
- t.cancel()
- logger.warning(f'QUIT Job: {job["Src_bucket"]}/{job["Src_key"]}')
- return "QUIT"
- # Timeout
- if len(result[1]) > 0: # result[1] is the set of not_done futures, i.e. some were still unfinished at timeout
- logger.warning(f'TIMEOUT. Canceling {len(result[1])} waiting threads in pool ...')
- stop_signal.set()
- for t in result[1]:
- t.cancel()
- logger.warning(f'TIMEOUT {JobTimeout}S Job: {job["Src_bucket"]}/{job["Src_key"]}')
- return "TIMEOUT"
- # End of thread pool
- logger.info(f'All parts uploaded: {job["Src_bucket"]}/{job["Src_key"]} - Size:{job["Size"]}')
- # Compute the overall ETag (cal_etag) from the list of part MD5s
- digests = b"".join(m.digest() for m in md5list)
- md5full = hashlib.md5(digests)
- cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
- except Exception as e:
- logger.error(f'Exception in job_processor: {str(e)}')
- return "ERR"
- return cal_etag
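- # How cal_etag relates to S3: for a multipart upload of a plaintext or SSE-S3 object, S3's ETag is the MD5 of
- # the concatenated binary MD5 digests of every part, followed by '-' and the part count. A minimal sketch of
- # the same calculation (the part data is made up):
- #   parts = [b'part-one-bytes', b'part-two-bytes']
- #   digests = b''.join(hashlib.md5(p).digest() for p in parts)
- #   etag = '"%s-%s"' % (hashlib.md5(digests).hexdigest(), len(parts))
- # job_processor builds md5list the same way, so cal_etag can be compared with the ETag returned by
- # complete_multipart_upload when ifVerifyMD5Twice is enabled.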
- # Complete multipart upload
- # Build completeStructJSON from the full part list (uploadedListParts) queried back from S3
- def completeUpload(*, uploadId, Des_bucket, Des_key, len_indexList, s3_des_client):
- # Query all uploaded parts (uploadedListParts) from S3 to build completeStructJSON
- uploadedListPartsClean = []
- logger.info(f'Get partnumber list - {Des_bucket}/{Des_key}')
- paginator = s3_des_client.get_paginator('list_parts')
- try:
- response_iterator = paginator.paginate(
- Bucket=Des_bucket,
- Key=Des_key,
- UploadId=uploadId
- )
- # Add each part's ETag to the part list
- for page in response_iterator:
- if "Parts" in page:
- logger.info(f'Got list_parts: {len(page["Parts"])} - {Des_bucket}/{Des_key}')
- for p in page["Parts"]:
- uploadedListPartsClean.append({
- "ETag": p["ETag"],
- "PartNumber": p["PartNumber"]
- })
- except ClientError as e:
- if e.response['Error']['Code'] == 'NoSuchUpload':
- # No such UploadId: another worker has already completed this job.
- logger.warning(f'ClientError: Fail to list parts while completeUpload - might be duplicated job:'
- f' {Des_bucket}/{Des_key}, {str(e)}')
- return "QUIT"
- logger.error(f'ClientError: Fail to list parts while completeUpload - {Des_bucket}/{Des_key} - {str(e)}')
- return "ERR"
- except Exception as e:
- logger.error(f'Fail to list parts while completeUpload - {Des_bucket}/{Des_key} - {str(e)}')
- return "ERR"
- if len(uploadedListPartsClean) != len_indexList:
- logger.error(f'Uploaded parts size not match - {Des_bucket}/{Des_key}')
- return "ERR"
- completeStructJSON = {"Parts": uploadedListPartsClean}
- # Ask S3 to merge the multipart upload
- try:
- logger.info(f'Try to merge multipart upload {Des_bucket}/{Des_key}')
- response_complete = s3_des_client.complete_multipart_upload(
- Bucket=Des_bucket,
- Key=Des_key,
- UploadId=uploadId,
- MultipartUpload=completeStructJSON
- )
- result = response_complete['ETag']
- logger.info(f'Merged: {Des_bucket}/{Des_key}')
- except Exception as e:
- logger.error(f'Fail to complete multipart upload {Des_bucket}/{Des_key}, {str(e)}')
- return "ERR"
- logger.info(f'Complete merge file {Des_bucket}/{Des_key}')
- return result
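- # Shape of completeStructJSON passed to complete_multipart_upload, for reference (ETags are made up):
- #   {"Parts": [{"ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"", "PartNumber": 1},
- #              {"ETag": "\"9e107d9d372bb6826bd81d3542a419d6\"", "PartNumber": 2}]}
- # Every uploaded part must be listed, which is why the length check against len_indexList is done first.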
- # Continuously get job messages and invoke one processor per job
- def job_looper(*, sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id,
- StorageClass, ChunkSize, MaxRetry, MaxThread, ResumableThreshold,
- JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload,
- Des_bucket_default, Des_prefix_default, UpdateVersionId, GetObjectWithVersionId):
- while True:
- # Get Job from sqs
- try:
- logger.info('Get Job from sqs queue...')
- sqs_job_get = sqs.receive_message(QueueUrl=sqs_queue)
- # Check whether the queue returned any message
- if 'Messages' not in sqs_job_get: # No message on sqs queue
- logger.info('No message in queue available, wait...')
- time.sleep(60)
- # Got a job message
- else:
- # TODO: handling of unexpected interruption when SQS returns multiple messages is not fully implemented; it is recommended to fetch only one SQS message at a time
- for sqs_job in sqs_job_get["Messages"]:
- job = json.loads(sqs_job["Body"])
- job_receipt = sqs_job["ReceiptHandle"] # 用于后面删除message
- # 判断是S3来的消息,而不是jodsender来的就转换一下
- if 'Records' in job: # S3来的消息带着'Records'
- for One_record in job['Records']:
- if 's3' in One_record:
- Src_bucket = One_record['s3']['bucket']['name']
- Src_key = One_record['s3']['object']['key']
- Src_key = urllib.parse.unquote_plus(Src_key)
- Size = One_record['s3']['object']['size']
- if "versionId" in One_record['s3']['object']:
- versionId = One_record['s3']['object']['versionId']
- else:
- versionId = 'null'
- Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
- Des_key = str(PurePosixPath(Des_prefix) / Src_key)
- if Src_key[-1] == '/': # handle empty directory objects
- Des_key += '/'
- job = {
- 'Src_bucket': Src_bucket,
- 'Src_key': Src_key,
- 'Size': Size,
- 'Des_bucket': Des_bucket,
- 'Des_key': Des_key,
- 'versionId': versionId
- }
- if 'Des_bucket' not in job and 'Event' not in job:
- logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
- logger.warning('Try to handle next message')
- time.sleep(1)
- continue
- if 'versionId' not in job:
- job['versionId'] = 'null'
- # Main flow
- if 'Event' not in job:
- if job['Size'] > ResumableThreshold:
- upload_etag_full = step_function(
- job=job,
- table=table,
- s3_src_client=s3_src_client,
- s3_des_client=s3_des_client,
- instance_id=instance_id,
- StorageClass=StorageClass,
- ChunkSize=ChunkSize,
- MaxRetry=MaxRetry,
- MaxThread=MaxThread,
- JobTimeout=JobTimeout,
- ifVerifyMD5Twice=ifVerifyMD5Twice,
- CleanUnfinishedUpload=CleanUnfinishedUpload,
- UpdateVersionId=UpdateVersionId,
- GetObjectWithVersionId=GetObjectWithVersionId
- )
- else:
- upload_etag_full = step_fn_small_file(
- job=job,
- table=table,
- s3_src_client=s3_src_client,
- s3_des_client=s3_des_client,
- instance_id=instance_id,
- StorageClass=StorageClass,
- MaxRetry=MaxRetry,
- UpdateVersionId=UpdateVersionId,
- GetObjectWithVersionId=GetObjectWithVersionId
- )
- else:
- if job['Event'] == 's3:TestEvent':
- logger.info('Skip s3:TestEvent')
- upload_etag_full = "s3:TestEvent"
- else:
- upload_etag_full = "OtherEvent"
- # Del Job on sqs
- logger.info(f'upload_etag_full={upload_etag_full}, job={str(job)}')
- if upload_etag_full != "TIMEOUT":
- # Keep the SQS message on TIMEOUT; delete it on normal completion or QUIT.
- # QUIT means NoSuchUpload, NoSuchKey or AccessDenied, so there is no point letting the next worker retry.
- # The SQS message is deleted directly and DDB does not record a completion status.
- try:
- logger.info(f'Try to finish job message on sqs. {str(job)}')
- sqs.delete_message(
- QueueUrl=sqs_queue,
- ReceiptHandle=job_receipt
- )
- except Exception as e:
- logger.error(f'Fail to delete sqs message: {str(sqs_job)}, {str(e)}')
- except Exception as e:
- logger.error(f'Fail. Wait for 5 seconds. ERR: {str(e)}')
- time.sleep(5)
- # Finish Job, go back to get next job in queue
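- # For reference, the S3 event message converted above looks roughly like this (trimmed to the fields this code
- # reads; the values are made up):
- #   {"Records": [{"s3": {"bucket": {"name": "my-src-bucket"},
- #                        "object": {"key": "prefix/file.bin", "size": 1048576, "versionId": "null"}}}]}
- # Messages without 'Records' are expected to be Jobsender jobs or an 'Event' such as 's3:TestEvent'.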
- # Clean up the unfinished multipart upload IDs on S3 for the current job
- def clean_multipart_upload(*, s3_client, multipart_uploaded_list, Des_bucket):
- for clean_i in multipart_uploaded_list:
- try:
- s3_client.abort_multipart_upload(
- Bucket=Des_bucket,
- Key=clean_i["Key"],
- UploadId=clean_i["UploadId"]
- )
- logger.info(f'CLEAN FINISHED - {str(multipart_uploaded_list)}')
- except Exception as e:
- logger.error(f'Fail to clean - {str(multipart_uploaded_list)} - {str(e)}')
- # Steps func for multipart upload for one job
- def step_function(*, job, table, s3_src_client, s3_des_client, instance_id,
- StorageClass, ChunkSize, MaxRetry, MaxThread,
- JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload, UpdateVersionId, GetObjectWithVersionId):
- # Start normal processing
- Src_bucket = job['Src_bucket']
- Src_key = job['Src_key']
- Size = job['Size']
- Des_bucket = job['Des_bucket']
- Des_key = job['Des_key']
- versionId = job['versionId']
- upload_etag_full = ""
- logger.info(f'Start multipart: {Src_bucket}/{Src_key}, Size: {Size}, versionId: {versionId}')
- # Get the unfinished multipart uploads of this file on the destination S3
- multipart_uploaded_list = get_uploaded_list(
- s3_client=s3_des_client,
- Des_bucket=Des_bucket,
- Des_key=Des_key
- )
- # If CleanUnfinishedUpload is set, clean up the existing unfinished multipart upload IDs on S3 (current job) instead of resuming
- if multipart_uploaded_list and CleanUnfinishedUpload:
- logger.warning(f'You set CleanUnfinishedUpload. Now clean: {str(multipart_uploaded_list)}')
- clean_multipart_upload(
- s3_client=s3_des_client,
- multipart_uploaded_list=multipart_uploaded_list,
- Des_bucket=Des_bucket
- )
- multipart_uploaded_list = []
- # Start the job steps
- # Loop and retry if the MD5-calculated ETag or the VersionId does not match
- for md5_retry in range(Max_md5_retry + 1):
- # Job preparation
- # If the file has no multipart UploadId yet, create one; otherwise return the existing UploadId
- response_check_upload = check_file_exist(
- prefix_and_key=Des_key,
- UploadIdList=multipart_uploaded_list
- )
- if response_check_upload == 'UPLOAD':
- try:
- logger.info(f'Create multipart upload - {Des_bucket}/{Des_key}')
- response_new_upload = s3_des_client.create_multipart_upload(
- Bucket=Des_bucket,
- Key=Des_key,
- StorageClass=StorageClass
- )
- # If update versionID enabled, update s3 versionID
- # A resumed transfer may fetch a newer version, leaving the file half old and half new, so the versionId must be verified again at completion
- if UpdateVersionId:
- versionId, Size = head_s3_version(
- s3_src_client=s3_src_client,
- Src_bucket=Src_bucket,
- Src_key=Src_key
- )
- if versionId == "ERR":
- break
- job['versionId'] = versionId
- job['Size'] = Size
- reponse_uploadId = response_new_upload["UploadId"]
- partnumberList = []
- # Write log to DDB in first round of job
- # ddb_first_round(table, Src_bucket, Src_key, Size, versionId)
- except Exception as e:
- logger.error(f'Fail to create new multipart upload - {Des_bucket}/{Des_key} - {str(e)}')
- upload_etag_full = "ERR"
- break
- else:
- reponse_uploadId = response_check_upload
- logger.info(f'Resume upload id: {Des_bucket}/{Des_key}')
- # Get the list of already uploaded part numbers
- partnumberList = checkPartnumberList(
- Des_bucket=Des_bucket,
- Des_key=Des_key,
- uploadId=reponse_uploadId,
- s3_des_client=s3_des_client
- )
- # Get versionId/Size from DDB; if the file was replaced before transfer or after an interruption, Size may have changed, so re-read the versionId and Size recorded when the job started
- if UpdateVersionId:
- versionId, Size = ddb_get(
- table=table,
- Src_bucket=Src_bucket,
- Src_key=Src_key
- )
- if versionId == "ERR":
- break
- job['versionId'] = versionId
- job['Size'] = Size
- # Get the list of part start indexes for the file, e.g. [0, 10, 20]
- indexList, ChunkSize_auto = split(
- Size,
- ChunkSize
- ) # for files exceeding 10000 parts, ChunkSize is auto-adjusted to ChunkSize_auto
- # Write log to DDB in first round of job
- percent = int(len(partnumberList) / len(indexList) * 100)
- # ddb_this_round(table, percent, Src_bucket, Src_key, instance_id)
- ddb_start(table=table,
- percent=percent,
- job=job,
- instance_id=instance_id,
- new_upload=(response_check_upload == 'UPLOAD'))
- # Job threads: upload parts; returns TIMEOUT/QUIT on timeout or a wrong key
- upload_etag_full = job_processor(
- uploadId=reponse_uploadId,
- indexList=indexList,
- partnumberList=partnumberList,
- job=job,
- s3_src_client=s3_src_client,
- s3_des_client=s3_des_client,
- MaxThread=MaxThread,
- ChunkSize=ChunkSize_auto, # use the auto-adjusted ChunkSize_auto for this single file
- MaxRetry=MaxRetry,
- JobTimeout=JobTimeout,
- ifVerifyMD5Twice=ifVerifyMD5Twice,
- GetObjectWithVersionId=GetObjectWithVersionId
- )
- if upload_etag_full == "TIMEOUT" or upload_etag_full == "QUIT":
- logger.warning(f'Quit job upload_etag_full == {upload_etag_full} - {str(job)}')
- break # stop processing this job
- elif upload_etag_full == "ERR":
- # Clear the uploaded IDs so the upload can start over
- logger.error(f'upload_etag_full ERR - {str(job)}')
- clean_multipart_upload(
- s3_client=s3_des_client,
- multipart_uploaded_list=multipart_uploaded_list,
- Des_bucket=Des_bucket
- )
- multipart_uploaded_list = []
- if md5_retry >= Max_md5_retry:
- logger.error(f'Quit job upload_etag_full ERR - {upload_etag_full} - {str(job)}')
- return 'ERR'
- continue
- # loop back and retry the job
- # Merge the parts on S3
- complete_etag = completeUpload(
- uploadId=reponse_uploadId,
- Des_bucket=Des_bucket,
- Des_key=Des_key,
- len_indexList=len(indexList),
- s3_des_client=s3_des_client
- )
- if complete_etag == "ERR":
- # Clear the uploaded IDs so the upload can start over
- logger.error(f'complete_etag ERR - {str(job)}')
- clean_multipart_upload(
- s3_client=s3_des_client,
- multipart_uploaded_list=multipart_uploaded_list,
- Des_bucket=Des_bucket
- )
- multipart_uploaded_list = []
- if md5_retry >= Max_md5_retry:
- logger.error(f'Quit job complete_etag - {upload_etag_full} - {str(job)}')
- return 'ERR'
- continue
- # loop back and retry the job
- # Verify the file MD5
- if ifVerifyMD5Twice and complete_etag != "QUIT": # QUIT means a duplicated UploadId, so skip the check
- if complete_etag == upload_etag_full:
- logger.info(f'MD5 ETag Matched - {Des_bucket}/{Des_key} - {complete_etag}')
- break # finished this file, move on to the next SQS job
- else: # ETag mismatch: delete the destination S3 object and retry
- logger.error(f'MD5 ETag NOT MATCHED {Des_bucket}/{Des_key}( Destination / Origin ): '
- f'{complete_etag} - {upload_etag_full}')
- del_des_s3_object(
- s3_des_client=s3_des_client,
- Des_bucket=Des_bucket,
- Des_key=Des_key
- )
- # Clear the uploaded IDs so the upload can start over
- logger.error(f'complete_etag != upload_etag_full ERR - {str(job)}')
- clean_multipart_upload(
- s3_client=s3_des_client,
- multipart_uploaded_list=multipart_uploaded_list,
- Des_bucket=Des_bucket
- )
- multipart_uploaded_list = []
- if md5_retry >= Max_md5_retry:
- logger.error(f'Quit job ifVerifyMD5Twice - {upload_etag_full} - {str(job)}')
- return 'ERR'
- continue
- # re-run the job
- # DynamoDB log: ADD status: DONE/ERR(upload_etag_full)
- ddb_complete(
- upload_etag_full=upload_etag_full,
- table=table,
- Src_bucket=Src_bucket,
- Src_key=Src_key
- )
- # Normal end of the md5_retry loop
- break
- # End of the md5_retry loop (or retry count exceeded)
- # complete one job
- return upload_etag_full
- # delete des s3 object
- def del_des_s3_object(*, s3_des_client, Des_bucket, Des_key):
- logger.info(f'Delete {Des_bucket}/{Des_key}')
- try:
- s3_des_client.delete_object(
- Bucket=Des_bucket,
- Key=Des_key
- )
- except Exception as e:
- logger.error(f'Fail to delete S3 object - {Des_bucket}/{Des_key} - {str(e)}')
- # Get item versionId and Size from DDB
- def ddb_get(*, table, Src_bucket, Src_key):
- logger.info(f'Get versionId and Size from DDB - {Src_bucket}/{Src_key}')
- table_key = str(PurePosixPath(Src_bucket) / Src_key)
- if Src_key[-1] == '/': # handle empty directory objects
- table_key += '/'
- try:
- response = table.get_item(
- Key={"Key": table_key},
- AttributesToGet=["versionId", "Size"]
- )
- versionId = response['Item']['versionId']
- Size = response['Item']['Size']
- logger.info(f'Got VersionId: {versionId} - {Src_bucket}/{Src_key} - Size:{Size}')
- except Exception as e:
- logger.error(f'Fail to Get versionId and Size from DDB - {Src_bucket}/{Src_key} - {str(e)}')
- return 'ERR', 0
- return versionId, Size
- # Write log to DDB in first round of job
- def ddb_start(*, table, percent, job, instance_id, new_upload):
- Src_bucket = job['Src_bucket']
- Src_key = job['Src_key']
- Size = job['Size']
- Des_bucket = job['Des_bucket']
- Des_key = job['Des_key']
- versionId = job['versionId']
- logger.info(f'Write log to DDB start job - {Src_bucket}/{Src_key}')
- cur_time = time.time()
- table_key = str(PurePosixPath(Src_bucket) / Src_key)
- if Src_key[-1] == '/': # handle empty directory objects
- table_key += '/'
- UpdateExpression = "ADD instanceID :id, tryTimes :t, startTime_f :s_format " \
- "SET lastTimeProgress=:p, Size=:size, desBucket=:b, desKey=:k"
- ExpressionAttributeValues = {
- ":t": 1,
- ":id": {instance_id},
- ":s_format": {time.asctime(time.localtime(cur_time))},
- ":size": Size,
- ":p": percent,
- ":b": Des_bucket,
- ":k": Des_key
- }
- if new_upload:
- logger.info(f'Update DDB <firstTime> - {Src_bucket}/{Src_key}')
- ExpressionAttributeValues[":s"] = int(cur_time)
- ExpressionAttributeValues[":v"] = versionId
- UpdateExpression += ", firstTime =:s, versionId =:v"
- try:
- table.update_item(
- Key={"Key": table_key},
- UpdateExpression=UpdateExpression,
- ExpressionAttributeValues=ExpressionAttributeValues
- )
- except Exception as e:
- # Failed to write the log entry
- logger.error(f'Fail to put log to DDB at start job - {Src_bucket}/{Src_key} - {str(e)}')
- return
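- # Note on the braces above: {instance_id} and {time.asctime(...)} are Python sets, which the boto3 DynamoDB
- # resource stores as string sets; combined with the ADD action this appends each new value rather than
- # overwriting it, while ADD on the numeric tryTimes simply increments it. ddb_complete below uses the same
- # pattern for jobStatus.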
- # DynamoDB log: ADD status: DONE/ERR(upload_etag_full)
- def ddb_complete(*, upload_etag_full, table, Src_bucket, Src_key):
- if upload_etag_full not in ["TIMEOUT", "ERR", "QUIT"]:
- status = "DONE"
- else:
- status = upload_etag_full
- cur_time = time.time()
- table_key = str(PurePosixPath(Src_bucket) / Src_key)
- if Src_key[-1] == '/': # handle empty directory objects
- table_key += '/'
- UpdateExpression = "ADD jobStatus :done SET totalSpentTime=:s-firstTime, endTime=:s, endTime_f=:e"
- ExpressionAttributeValues = {
- ":done": {status},
- ":s": int(cur_time),
- ":e": time.asctime(time.localtime(cur_time))
- }
- # Normal DDB write; lastTimeProgress=100 is only added when the status is not an error
- if status == "DONE":
- UpdateExpression += ", lastTimeProgress=:p"
- ExpressionAttributeValues[":p"] = 100
- # update DDB
- logger.info(f'Write job complete status to DDB: {status} - {Src_bucket}/{Src_key}')
- try:
- table.update_item(
- Key={"Key": table_key},
- UpdateExpression=UpdateExpression,
- ExpressionAttributeValues=ExpressionAttributeValues
- )
- except Exception as e:
- logger.error(f'Fail to put log to DDB at end - {Src_bucket}/{Src_key} - {str(e)}')
- return
- def step_fn_small_file(*, job, table, s3_src_client, s3_des_client, instance_id, StorageClass, MaxRetry,
- UpdateVersionId, GetObjectWithVersionId):
- # Start processing a small file
- Src_bucket = job['Src_bucket']
- Src_key = job['Src_key']
- Size = job['Size']
- Des_bucket = job['Des_bucket']
- Des_key = job['Des_key']
- versionId = job['versionId']
- # If update versionID enabled, update s3 versionID
- if UpdateVersionId:
- versionId, Size = head_s3_version( # head_s3_version returns a (versionId, Size) tuple
- s3_src_client=s3_src_client,
- Src_bucket=Src_bucket,
- Src_key=Src_key
- )
- job['versionId'] = versionId
- logger.info(f'Start small: {Src_bucket}/{Src_key}, Size: {Size}, versionId: {versionId}')
- # Write DDB log for first round
- ddb_start(table=table,
- percent=0,
- job=job,
- instance_id=instance_id,
- new_upload=True)
- upload_etag_full = []
- for retryTime in range(MaxRetry + 1):
- try:
- # Get object
- logger.info(f'--->Downloading {Size} Bytes {Src_bucket}/{Src_key} - Small file 1/1')
- if GetObjectWithVersionId:
- response_get_object = s3_src_client.get_object(
- Bucket=Src_bucket,
- Key=Src_key,
- VersionId=versionId
- )
- else:
- response_get_object = s3_src_client.get_object(
- Bucket=Src_bucket,
- Key=Src_key
- )
- getBody = response_get_object["Body"].read()
- chunkdata_md5 = hashlib.md5(getBody)
- ContentMD5 = base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
- # Put object
- logger.info(f'--->Uploading {Size} Bytes {Des_bucket}/{Des_key} - Small file 1/1')
- response_put_object = s3_des_client.put_object(
- Body=getBody,
- Bucket=Des_bucket,
- Key=Des_key,
- ContentMD5=ContentMD5,
- StorageClass=StorageClass
- )
- # The request carries the MD5; S3 raises an exception if its checksum check fails
- upload_etag_full = response_put_object['ETag']
- # Upload/download finished
- break
- except ClientError as e:
- if e.response['Error']['Code'] in ['NoSuchKey', 'AccessDenied']:
- logger.error(f"Fail to access {Src_bucket}/{Src_key} - ERR: {str(e)}.")
- return "QUIT"
- logger.warning(f'Download/Upload small file Fail: {Src_bucket}/{Src_key}, '
- f'{str(e)}, Attempts: {retryTime}')
- if retryTime >= MaxRetry:
- logger.error(f'Fail MaxRetry Download/Upload small file: {Des_bucket}/{Des_key}')
- return "TIMEOUT"
- else:
- time.sleep(5 * retryTime)
- except Exception as e:
- logger.error(f'Fail in step_fn_small - {Des_bucket}/{Des_key} - {str(e)}')
- return "TIMEOUT"
- # Write DDB log for complete
- ddb_complete(
- upload_etag_full=upload_etag_full,
- table=table,
- Src_bucket=Src_bucket,
- Src_key=Src_key
- )
- # complete one job
- return upload_etag_full
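- # Minimal wiring sketch for a worker process. The parameter values below are placeholders chosen for the
- # example, not defaults defined by this library:
- #   logger, _ = set_log('INFO', 'worker')
- #   sqs, sqs_queue, table, s3_src, s3_des, instance_id, ssm = set_env(
- #       JobType='PUT', LocalProfileMode=False, table_queue_name='s3_migrate_table',
- #       sqs_queue_name='s3_migrate_queue', ssm_parameter_credentials='s3_migrate_credentials', MaxRetry=5)
- #   job_looper(sqs=sqs, sqs_queue=sqs_queue, table=table, s3_src_client=s3_src, s3_des_client=s3_des,
- #              instance_id=instance_id, StorageClass='STANDARD', ChunkSize=5 * 1024 * 1024, MaxRetry=5,
- #              MaxThread=5, ResumableThreshold=5 * 1024 * 1024, JobTimeout=3600, ifVerifyMD5Twice=False,
- #              CleanUnfinishedUpload=False, Des_bucket_default='my-des-bucket', Des_prefix_default='',
- #              UpdateVersionId=False, GetObjectWithVersionId=False)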