s3_migration_lib.py

# PROJECT LONGBOW - LIB FOR TRANSMISSION BETWEEN AMAZON S3
import datetime
import logging
import hashlib
import concurrent.futures
import threading
import base64
import urllib.request
import urllib.parse
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
import json
import os
import sys
import time
from fnmatch import fnmatchcase
from pathlib import PurePosixPath

logger = logging.getLogger()


# Configure logging
def set_log(LoggingLevel, this_file_name):
    logger.setLevel(logging.WARNING)
    if LoggingLevel == 'INFO':
        logger.setLevel(logging.INFO)
    elif LoggingLevel == 'DEBUG':
        logger.setLevel(logging.DEBUG)
    # File logging
    file_path = os.path.split(os.path.abspath(__file__))[0]
    log_path = file_path + '/s3_migration_log'
    if not os.path.exists(log_path):
        os.makedirs(log_path)  # create the log folder without shelling out to "mkdir"
    start_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
    _log_file_name = f'{log_path}/{this_file_name}-{start_time}.log'
    print('Log file:', _log_file_name)
    fileHandler = logging.FileHandler(filename=_log_file_name)
    fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
    logger.addHandler(fileHandler)
    return logger, _log_file_name


# Set environment
def set_env(JobType, LocalProfileMode, table_queue_name, sqs_queue_name, ssm_parameter_credentials):
    s3_config = Config(max_pool_connections=200)  # boto3 default is 10
    if os.uname()[0] == 'Linux' and not LocalProfileMode:  # on EC2, use the EC2 instance role
        logger.info('Get instance-id and running region')
        instance_id = urllib.request.urlopen(urllib.request.Request(
            "http://169.254.169.254/latest/meta-data/instance-id"
        )).read().decode('utf-8')
        region = json.loads(urllib.request.urlopen(urllib.request.Request(
            "http://169.254.169.254/latest/dynamic/instance-identity/document"
        )).read().decode('utf-8'))['region']
        sqs = boto3.client('sqs', region)
        dynamodb = boto3.resource('dynamodb', region)
        ssm = boto3.client('ssm', region)

        # Fetch the credentials of the other account
        logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
        try:
            credentials = json.loads(ssm.get_parameter(
                Name=ssm_parameter_credentials,
                WithDecryption=True
            )['Parameter']['Value'])
        except Exception as e:
            logger.error(f'Fail to get {ssm_parameter_credentials} in SSM Parameter store. '
                         f'Fix and restart Jobsender. {str(e)}')
            sys.exit(0)
        credentials_session = boto3.session.Session(
            aws_access_key_id=credentials["aws_access_key_id"],
            aws_secret_access_key=credentials["aws_secret_access_key"],
            region_name=credentials["region"]
        )
        if JobType.upper() == "PUT":
            s3_src_client = boto3.client('s3', region, config=s3_config)
            s3_des_client = credentials_session.client('s3', config=s3_config)
        elif JobType.upper() == "GET":
            s3_des_client = boto3.client('s3', region, config=s3_config)
            s3_src_client = credentials_session.client('s3', config=s3_config)
        else:
            logger.error('Wrong JobType setting in config.ini file')
            sys.exit(0)
    # Running without an instance role, e.g. local test on a Mac
    else:
        instance_id = "local"
        src_session = boto3.session.Session(profile_name='iad')
        des_session = boto3.session.Session(profile_name='zhy')
        sqs = src_session.client('sqs')
        dynamodb = src_session.resource('dynamodb')
        ssm = src_session.client('ssm')
        s3_src_client = src_session.client('s3', config=s3_config)
        s3_des_client = des_session.client('s3', config=s3_config)
        # Also log to the console, which is convenient for local debugging
        streamHandler = logging.StreamHandler()
        streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
        logger.addHandler(streamHandler)

    table = dynamodb.Table(table_queue_name)
    table.wait_until_exists()
    sqs_queue = wait_sqs_available(sqs, sqs_queue_name)
    return sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id, ssm
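
# Usage sketch (illustrative only; the concrete names below are assumptions, normally
# supplied by the caller's config.ini, which is not part of this file):
#   logger, log_file = set_log('INFO', 'worker')
#   sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id, ssm = set_env(
#       JobType='PUT',
#       LocalProfileMode=False,
#       table_queue_name='s3_migrate_file_list',            # assumed DynamoDB table name
#       sqs_queue_name='s3_migrate_file_list',               # assumed SQS queue name
#       ssm_parameter_credentials='s3_migrate_credentials')  # assumed SSM parameter name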


def wait_sqs_available(sqs, sqs_queue_name):
    while True:
        try:
            return sqs.get_queue_url(QueueName=sqs_queue_name)['QueueUrl']
        except Exception as e:
            logger.warning(f'Waiting for SQS availability. {str(e)}')
            time.sleep(10)


def check_sqs_empty(sqs, sqs_queue):
    try:
        sqs_in_flight = sqs.get_queue_attributes(
            QueueUrl=sqs_queue,
            AttributeNames=['ApproximateNumberOfMessagesNotVisible', 'ApproximateNumberOfMessages']
        )
    except Exception as e:
        logger.error(f'Fail to get_queue_attributes: {str(e)}')
        return False  # Can't get SQS status, so consider the queue not empty
    NotVisible = sqs_in_flight['Attributes']['ApproximateNumberOfMessagesNotVisible']
    Visible = sqs_in_flight['Attributes']['ApproximateNumberOfMessages']
    logger.info(f'ApproximateNumberOfMessagesNotVisible: {NotVisible}, ApproximateNumberOfMessages: {Visible}')
    if NotVisible == '0' and (Visible == '0' or Visible == '1'):
        # In the initial state, the newly created bucket trigger sends one test message to SQS,
        # so the case Visible == '1' is ignored here.
        return True  # SQS is empty
    return False  # SQS is not empty


def get_s3_file_list(s3_client, bucket, S3Prefix, del_prefix=False):
    # Used to strip the prefix from des_prefix
    if S3Prefix == '' or S3Prefix == '/':
        # The destination bucket has no prefix set
        dp_len = 0
    else:
        # Length of the destination bucket's "prefix/"
        dp_len = len(S3Prefix) + 1
    # Get the S3 file list, retrying in a loop every 5 seconds
    des_file_list = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for retry in range(5):
        try:
            if S3Prefix == '/':
                S3Prefix = ''
            logger.info(f'Get s3 file list from: {bucket}/{S3Prefix}')
            response_iterator = paginator.paginate(
                Bucket=bucket,
                Prefix=S3Prefix
            )
            for page in response_iterator:
                if "Contents" in page:
                    for n in page["Contents"]:
                        key = n["Key"]
                        # Strip the prefix from des_prefix
                        if del_prefix:
                            key = key[dp_len:]
                        des_file_list.append({
                            "Key": key,
                            "Size": n["Size"]
                        })
            break
        except Exception as err:
            logger.warning(f'Fail to get s3 list objects: {str(err)}')
            time.sleep(5)
    logger.info(f'Bucket list length: {str(len(des_file_list))}')
    return des_file_list
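
# Shape of the returned list (illustrative values, not from any real bucket):
#   get_s3_file_list(s3_client, 'my-src-bucket', 'data', del_prefix=False)
#   -> [{"Key": "data/file1.bin", "Size": 1048576}, {"Key": "data/dir/", "Size": 0}, ...]
# With del_prefix=True the leading "data/" would be stripped from each Key.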


def job_upload_sqs_ddb(sqs, sqs_queue, table, job_list, MaxRetry=30):
    sqs_batch = 0
    sqs_message = []
    logger.info(f'Start uploading jobs to queue: {sqs_queue}')

    # Create a DDB batch writer. Writing to the table here gives a record to reconcile against
    # when hundreds of thousands of jobs are sent to SQS in one go.
    with table.batch_writer() as ddb_batch:
        for job in job_list:
            # Write to DDB (batched automatically)
            for retry in range(MaxRetry + 1):
                try:
                    ddb_key = str(PurePosixPath(job["Src_bucket"]) / job["Src_key"])
                    if job["Src_key"][-1] == '/':
                        ddb_key += '/'
                    ddb_batch.put_item(Item={
                        "Key": ddb_key,
                        # "Src_bucket": job["Src_bucket"],
                        # "Des_bucket": job["Des_bucket"],
                        # "Des_key": job["Des_key"],
                        "Size": job["Size"]
                    })
                    break
                except Exception as e:
                    logger.warning(f'Fail to write to DDB: {ddb_key}, {str(e)}')
                    if retry >= MaxRetry:
                        logger.error(f'Fail writing to DDB: {ddb_key}')
                    else:
                        time.sleep(5 * retry)

            # Construct SQS messages
            sqs_message.append({
                "Id": str(sqs_batch),
                "MessageBody": json.dumps(job),
            })
            sqs_batch += 1

            # Send to SQS in batches of 10 (the send_message_batch limit), or when this is the last job
            if sqs_batch == 10 or job == job_list[-1]:
                for retry in range(MaxRetry + 1):
                    try:
                        sqs.send_message_batch(QueueUrl=sqs_queue, Entries=sqs_message)
                        break
                    except Exception as e:
                        logger.warning(f'Fail to send sqs message: {str(sqs_message)}, {str(e)}')
                        if retry >= MaxRetry:
                            logger.error(f'Fail MaxRetry {MaxRetry} send sqs message: {str(sqs_message)}')
                        else:
                            time.sleep(5 * retry)
                sqs_batch = 0
                sqs_message = []

    logger.info(f'Complete upload job to queue: {sqs_queue}')
    return


def delta_job_list(src_file_list, des_file_list, src_bucket, src_prefix, des_bucket, des_prefix, ignore_list):
    # Build the delta list
    logger.info(f'Compare source s3://{src_bucket}/{src_prefix} and '
                f'destination s3://{des_bucket}/{des_prefix}')
    start_time = int(time.time())
    job_list = []
    ignore_records = []
    for src in src_file_list:
        # Skip any bucket/key listed in ignore_list
        src_bucket_key = src_bucket + '/' + src['Key']
        ignore_match = False
        # Try each ignore key; if any pattern matches, skip this src key
        for ignore_key in ignore_list:
            if fnmatchcase(src_bucket_key, ignore_key):
                ignore_match = True
                break
            # No match, move on to the next ignore_key
        if ignore_match:
            ignore_records.append(src_bucket_key)
            continue  # Skip the current src

        # Check whether the source file already exists in the destination
        if src in des_file_list:
            # Move on to the next source file
            continue
        # Otherwise add the source file to the job list
        else:
            Des_key = str(PurePosixPath(des_prefix) / src["Key"])
            if src["Key"][-1] == '/':  # The source key is a directory object, so append a trailing /
                Des_key += '/'
            job_list.append(
                {
                    "Src_bucket": src_bucket,
                    "Src_key": src["Key"],  # Src_key already contains the prefix
                    "Des_bucket": des_bucket,
                    "Des_key": Des_key,
                    "Size": src["Size"],
                }
            )
    spent_time = int(time.time()) - start_time
    logger.info(f'Delta list: {len(job_list)}, ignore list: {len(ignore_records)} - SPENT TIME: {spent_time}S')
    return job_list, ignore_records
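
# Ignore-list matching uses case-sensitive shell-style globbing (fnmatchcase) against the
# string "src_bucket/key"; note that '*' also matches '/' here. Illustrative patterns only:
#   'my-src-bucket/logs/*'   skips everything under logs/ in that bucket
#   '*.tmp'                  skips any key ending in .tmp, in any bucket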


# Split one file size into a list of part start byte positions
def split(Size, ChunkSize):
    partnumber = 1
    indexList = [0]
    if int(Size / ChunkSize) + 1 > 10000:
        ChunkSize = int(Size / 10000) + 1024  # For files over 10,000 parts, auto-adjust ChunkSize
        logger.info(f'Size excess 10000 parts limit. Auto change ChunkSize to {ChunkSize}')
    while ChunkSize * partnumber < Size:  # If it lands exactly on "=", no extra part is needed, hence "<" not "<="
        indexList.append(ChunkSize * partnumber)
        partnumber += 1
    return indexList, ChunkSize
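
# Worked example (illustrative): a 25 MiB object with a 10 MiB chunk size
#   split(25 * 1024 ** 2, 10 * 1024 ** 2)
#   -> ([0, 10485760, 20971520], 10485760)   # three parts: 10 MiB + 10 MiB + 5 MiB
# An object too large for 10,000 parts of the configured size gets a larger ChunkSize back,
# which is why callers use the returned ChunkSize rather than the one they passed in.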


# Get unfinished multipart upload ids from S3
def get_uploaded_list(s3_client, Des_bucket, Des_key, MaxRetry):
    NextKeyMarker = ''
    IsTruncated = True
    multipart_uploaded_list = []
    while IsTruncated:
        IsTruncated = False
        for retry in range(MaxRetry + 1):
            try:
                logger.info(f'Getting unfinished upload id list {retry} retry {Des_bucket}/{Des_key}...')
                list_multipart_uploads = s3_client.list_multipart_uploads(
                    Bucket=Des_bucket,
                    Prefix=Des_key,
                    MaxUploads=1000,
                    KeyMarker=NextKeyMarker
                )
                IsTruncated = list_multipart_uploads["IsTruncated"]
                NextKeyMarker = list_multipart_uploads["NextKeyMarker"]
                if "Uploads" in list_multipart_uploads:
                    for i in list_multipart_uploads["Uploads"]:
                        if i["Key"] == Des_key:
                            multipart_uploaded_list.append({
                                "Key": i["Key"],
                                "Initiated": i["Initiated"],
                                "UploadId": i["UploadId"]
                            })
                            logger.info(f'Unfinished upload, Key: {i["Key"]}, Time: {i["Initiated"]}')
                break  # Exit the retry loop
            except Exception as e:
                logger.warning(f'Fail to list multipart upload {str(e)}')
                if retry >= MaxRetry:
                    logger.error(f'Fail MaxRetry list multipart upload {str(e)}')
                    return []
                else:
                    time.sleep(5 * retry)
    return multipart_uploaded_list


# Check whether the file is on the list from get_uploaded_list and get the created multipart upload id
def check_file_exist(prefix_and_key, UploadIdList):
    # Check whether the key has an unfinished UploadId
    keyIDList = []
    for u in UploadIdList:
        if u["Key"] == prefix_and_key:
            keyIDList.append(u)
    # If no previous upload is found, start uploading from scratch
    if not keyIDList:
        return 'UPLOAD'
    # For multiple uploads of the same key (file), pick the one with the latest Initiated time
    UploadID_latest = keyIDList[0]
    for u in keyIDList:
        if u["Initiated"] > UploadID_latest["Initiated"]:
            UploadID_latest = u
    logger.info(f"Pick UploadId Initiated Time: {UploadID_latest['Initiated']}")
    return UploadID_latest["UploadId"]
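
# Behaviour sketch (illustrative keys and timestamps, not real upload ids):
#   check_file_exist('data/file1.bin', [])  ->  'UPLOAD'
#   check_file_exist('data/file1.bin',
#       [{"Key": "data/file1.bin", "Initiated": t1, "UploadId": "A"},
#        {"Key": "data/file1.bin", "Initiated": t2, "UploadId": "B"}])
#   ->  "B" when t2 > t1, i.e. the most recently initiated upload id wins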


# Check the uploaded part number list on Des_bucket
def checkPartnumberList(Des_bucket, Des_key, uploadId, s3_des_client, MaxRetry=10):
    partnumberList = []
    PartNumberMarker = 0
    IsTruncated = True
    while IsTruncated:
        IsTruncated = False
        for retry in range(MaxRetry + 1):
            try:
                logger.info(f'Get partnumber list {retry} retry, PartNumberMarker: {PartNumberMarker}...')
                response_uploadedList = s3_des_client.list_parts(
                    Bucket=Des_bucket,
                    Key=Des_key,
                    UploadId=uploadId,
                    MaxParts=1000,
                    PartNumberMarker=PartNumberMarker
                )
                PartNumberMarker = response_uploadedList['NextPartNumberMarker']
                IsTruncated = response_uploadedList['IsTruncated']
                if 'Parts' in response_uploadedList:
                    logger.info(f'Response part number list len: {len(response_uploadedList["Parts"])}')
                    for partnumberObject in response_uploadedList["Parts"]:
                        partnumberList.append(partnumberObject["PartNumber"])
                break
            except Exception as e:
                logger.warning(f'Fail to list parts in checkPartnumberList. {str(e)}')
                if retry >= MaxRetry:
                    logger.error(f'Fail MaxRetry list parts in checkPartnumberList. {str(e)}')
                    return []
                else:
                    time.sleep(5 * retry)
    # The loop above keeps fetching until the full list is retrieved
    if partnumberList:  # An empty list means no uploaded parts were found
        logger.info(f"Found uploaded partnumber: {len(partnumberList)} - {json.dumps(partnumberList)}")
    else:
        logger.info('Part number list is empty')
    return partnumberList


# Process one job
def job_processor(uploadId, indexList, partnumberList, job, s3_src_client, s3_des_client,
                  MaxThread, ChunkSize, MaxRetry, JobTimeout, ifVerifyMD5Twice):
    # Thread generator: pairs each part with a future from the thread pool so timeout control can be applied
    def thread_gen(woker_thread, pool,
                   stop_signal, partnumber, total, md5list, partnumberList, complete_list):
        for partStartIndex in indexList:
            # Start uploading this part.
            # dryrun reuses the existing flow to build the completion list, making the later MD5 calculation easier
            if partnumber not in partnumberList:
                dryrun = False
            else:
                dryrun = True
            th = pool.submit(woker_thread, stop_signal, partnumber, partStartIndex,
                             total, md5list, dryrun, complete_list)
            partnumber += 1
            yield th

    # Download a part from the source S3 and upload it to the destination S3
    def woker_thread(stop_signal, partnumber, partStartIndex, total, md5list, dryrun, complete_list):
        if stop_signal.is_set():
            return "TIMEOUT"
        Src_bucket = job['Src_bucket']
        Src_key = job['Src_key']
        Des_bucket = job['Des_bucket']
        Des_key = job['Des_key']

        # Download the part
        if ifVerifyMD5Twice or not dryrun:
            # With ifVerifyMD5Twice, re-download even already-uploaded parts to verify the whole file
            if not dryrun:
                logger.info(f"--->Downloading {ChunkSize} Bytes {Src_bucket}/{Src_key} - {partnumber}/{total}")
            else:
                logger.info(
                    f"--->Downloading {ChunkSize} Bytes for verify MD5 {Src_bucket}/{Src_key} - {partnumber}/{total}")
            retryTime = 0
            # Under normal operation, exit the thread once stop_signal is set
            while retryTime <= MaxRetry and not stop_signal.is_set():
                retryTime += 1
                try:
                    response_get_object = s3_src_client.get_object(
                        Bucket=Src_bucket,
                        Key=Src_key,
                        Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
                    )
                    getBody = response_get_object["Body"].read()
                    chunkdata_md5 = hashlib.md5(getBody)
                    md5list[partnumber - 1] = chunkdata_md5
                    break  # Download finished, no retry needed
                except ClientError as err:
                    if err.response['Error']['Code'] in ['NoSuchKey', 'AccessDenied']:
                        # No such key: the file was deleted, or access is denied
                        logger.error(f"Fail to access {Src_bucket}/{Src_key} - ERR: {str(err)}.")
                        stop_signal.set()
                        return "QUIT"
                    logger.warning(f"Fail to download {Src_bucket}/{Src_key} - ERR: {str(err)}. "
                                   f"Retry part: {partnumber} - Attempts: {retryTime}")
                    if retryTime > MaxRetry:  # Retries exhausted, quit
                        logger.error(f"Quit for Max Download retries: {retryTime} - {Src_bucket}/{Src_key}")
                        stop_signal.set()
                        return "TIMEOUT"  # Exit the thread
                    else:
                        time.sleep(5 * retryTime)
                        # Increasing backoff, then retry
                except Exception as e:
                    logger.error(f'Fail Downloading {str(e)}')

        # Upload the part
        if not dryrun:  # ifVerifyMD5Twice does not matter here
            retryTime = 0
            while retryTime <= MaxRetry and not stop_signal.is_set():
                retryTime += 1
                try:
                    logger.info(f'--->Uploading {ChunkSize} Bytes {Des_bucket}/{Des_key} - {partnumber}/{total}')
                    s3_des_client.upload_part(
                        Body=getBody,
                        Bucket=Des_bucket,
                        Key=Des_key,
                        PartNumber=partnumber,
                        UploadId=uploadId,
                        ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                    )
                    # The request carries the MD5; if the S3-side check fails, an exception is raised
                    break
                except ClientError as err:
                    if err.response['Error']['Code'] == 'NoSuchUpload':
                        # No such upload id: someone else has already finished this job
                        logger.warning(f'ClientError: Fail to upload part - might be duplicated job:'
                                       f' {Des_bucket}/{Des_key}, {str(err)}')
                        stop_signal.set()
                        return "QUIT"
                    logger.warning(f"ClientError: Fail to upload part - {Des_bucket}/{Des_key} - {str(err)}, "
                                   f"retry part: {partnumber} Attempts: {retryTime}")
                    if retryTime > MaxRetry:
                        logger.error(f"ClientError: Quit for Max Upload retries: {retryTime} - {Des_bucket}/{Des_key}")
                        # Skip to the next file instead
                        stop_signal.set()
                        return "TIMEOUT"
                    else:
                        time.sleep(5 * retryTime)  # Increasing backoff, then retry
                except Exception as e:
                    logger.error(f'Fail Uploading {str(e)}')

        if not stop_signal.is_set():
            complete_list.append(partnumber)
            if not dryrun:
                logger.info(
                    f'--->Complete {ChunkSize} Bytes {Src_bucket}/{Src_key} - {partnumber}/{total} {len(complete_list) / total:.2%}')
        else:
            return "TIMEOUT"
        return "COMPLETE"
    # woker_thread END
    # job_processor main
    partnumber = 1  # Part number to upload in the current loop
    total = len(indexList)
    md5list = [hashlib.md5(b'')] * total
    complete_list = []

    # Thread pool
    try:
        stop_signal = threading.Event()  # Used by JobTimeout to stop all threads of the current file
        with concurrent.futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
            # Iterate the generator here to materialize the thread (future) objects
            threads = list(thread_gen(woker_thread, pool, stop_signal,
                                      partnumber, total, md5list, partnumberList, complete_list))
            result = concurrent.futures.wait(threads, timeout=JobTimeout, return_when="ALL_COMPLETED")

            # Abnormal exit
            if "QUIT" in [t.result() for t in result[0]]:  # result[0] is the "done" set
                logger.warning(f'QUIT. Canceling {len(result[1])} waiting threads in pool ...')
                stop_signal.set()
                for t in result[1]:
                    t.cancel()
                logger.warning(f'QUIT Job: {job["Src_bucket"]}/{job["Src_key"]}')
                return "QUIT"
            # Timeout
            if len(result[1]) > 0:  # result[1] is the "not_done" set, i.e. futures still unfinished on timeout
                logger.warning(f'TIMEOUT. Canceling {len(result[1])} waiting threads in pool ...')
                stop_signal.set()
                for t in result[1]:
                    t.cancel()
                logger.warning(f'TIMEOUT {JobTimeout}S Job: {job["Src_bucket"]}/{job["Src_key"]}')
                return "TIMEOUT"
        # Thread pool end

        logger.info(f'All parts uploaded: {job["Src_bucket"]}/{job["Src_key"]} - Size:{job["Size"]}')
        # Compute the aggregate ETag over all part MD5s: cal_etag
        digests = b"".join(m.digest() for m in md5list)
        md5full = hashlib.md5(digests)
        cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
    except Exception as e:
        logger.error(f'Exception in job_processor: {str(e)}')
        return "ERR"
    return cal_etag
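
# The calculated ETag follows the S3 multipart convention: MD5 of the concatenated binary
# MD5 digests of all parts, followed by "-<part count>". A minimal sketch of the same
# calculation for two in-memory parts (illustrative only):
#   parts = [b'a' * 8, b'b' * 8]
#   digests = b''.join(hashlib.md5(p).digest() for p in parts)
#   etag = '"%s-%d"' % (hashlib.md5(digests).hexdigest(), len(parts))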


# Complete the multipart upload.
# Build completeStructJSON from the full part list (uploadedListParts) queried back from S3.
def completeUpload(uploadId, Des_bucket, Des_key, len_indexList, s3_des_client, MaxRetry):
    # Query S3 for the full part list uploadedListParts to build completeStructJSON.
    # Similar to checkPartnumberList, but the ETag handling differs, so the two are kept separate :)
    uploadedListPartsClean = []
    PartNumberMarker = 0
    IsTruncated = True
    while IsTruncated:
        IsTruncated = False
        for retryTime in range(MaxRetry + 1):
            try:
                logger.info(f'Get complete partnumber list {retryTime} retry, PartNumberMarker: {PartNumberMarker}...')
                response_uploadedList = s3_des_client.list_parts(
                    Bucket=Des_bucket,
                    Key=Des_key,
                    UploadId=uploadId,
                    MaxParts=1000,
                    PartNumberMarker=PartNumberMarker
                )
                NextPartNumberMarker = response_uploadedList['NextPartNumberMarker']
                IsTruncated = response_uploadedList['IsTruncated']
                # Add the ETag of each part to the part list
                for partObject in response_uploadedList["Parts"]:
                    ETag = partObject["ETag"]
                    PartNumber = partObject["PartNumber"]
                    addup = {
                        "ETag": ETag,
                        "PartNumber": PartNumber
                    }
                    uploadedListPartsClean.append(addup)
                PartNumberMarker = NextPartNumberMarker
                break
            except ClientError as e:
                if e.response['Error']['Code'] == 'NoSuchUpload':
                    # Fail to list the parts: no such upload id, so someone else has already finished this job
                    logger.warning(f'Fail to list parts while completeUpload, might be duplicated job:'
                                   f' {Des_bucket}/{Des_key}, {str(e)}')
                    return "ERR"
                logger.warning(f'Fail to list parts while completeUpload {Des_bucket}/{Des_key}, {str(e)}')
                if retryTime >= MaxRetry:
                    logger.error(f'Fail MaxRetry list parts while completeUpload {Des_bucket}/{Des_key}')
                    return "ERR"
                else:
                    time.sleep(5 * retryTime)
            except Exception as e:
                logger.warning(f'Fail to list parts while completeUpload {Des_bucket}/{Des_key}, {str(e)}')
                if retryTime >= MaxRetry:
                    logger.error(f'Fail MaxRetry list parts while completeUpload {Des_bucket}/{Des_key}')
                    return "ERR"
                else:
                    time.sleep(5 * retryTime)
    # The loop above keeps fetching until all parts have been retrieved

    if len(uploadedListPartsClean) != len_indexList:
        logger.warning(f'Uploaded parts size not match - {Des_bucket}/{Des_key}')
        return "ERR"
    completeStructJSON = {"Parts": uploadedListPartsClean}

    # Ask S3 to merge the multipart upload
    for retryTime in range(MaxRetry + 1):
        try:
            logger.info(f'Try to merge multipart upload {Des_bucket}/{Des_key}')
            response_complete = s3_des_client.complete_multipart_upload(
                Bucket=Des_bucket,
                Key=Des_key,
                UploadId=uploadId,
                MultipartUpload=completeStructJSON
            )
            result = response_complete['ETag']
            break
        except Exception as e:
            logger.warning(f'Fail to complete multipart upload {Des_bucket}/{Des_key}, {str(e)}')
            if retryTime >= MaxRetry:
                logger.error(f'Fail MaxRetry complete multipart upload {Des_bucket}/{Des_key}')
                return "ERR"
            else:
                time.sleep(5 * retryTime)
    logger.info(f'Complete merge file {Des_bucket}/{Des_key}')
    return result
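
# Shape of the completeStructJSON passed to complete_multipart_upload (illustrative values):
#   {"Parts": [{"ETag": "\"9b2cf535f27731c974343645a3985328\"", "PartNumber": 1},
#              {"ETag": "\"7d865e959b2466918c9863afca942d0f\"", "PartNumber": 2}]}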


# Continuously fetch job messages and invoke one processor per job
def job_looper(sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id,
               StorageClass, ChunkSize, MaxRetry, MaxThread, ResumableThreshold,
               JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload,
               Des_bucket_default, Des_prefix_default):
    while True:
        # Get a job from SQS
        try:
            logger.info('Get Job from sqs queue...')
            sqs_job_get = sqs.receive_message(QueueUrl=sqs_queue)

            # No message available in the queue
            if 'Messages' not in sqs_job_get:
                logger.info('No message in queue available, wait...')
                time.sleep(60)
            # Got a job message
            else:
                # TODO: The case where SQS returns multiple messages is not fully handled yet;
                # in practice only one SQS message is fetched at a time.
                for sqs_job in sqs_job_get["Messages"]:
                    job = json.loads(sqs_job["Body"])
                    job_receipt = sqs_job["ReceiptHandle"]  # Used later to delete the message

                    # If the message came from S3 rather than from the Jobsender, convert it
                    if 'Records' in job:  # Messages from S3 carry 'Records'
                        for One_record in job['Records']:
                            if 's3' in One_record:
                                Src_bucket = One_record['s3']['bucket']['name']
                                Src_key = One_record['s3']['object']['key']
                                Src_key = urllib.parse.unquote_plus(Src_key)
                                Size = One_record['s3']['object']['size']
                                Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
                                Des_key = str(PurePosixPath(Des_prefix) / Src_key)
                                if Src_key[-1] == '/':  # Empty directory object
                                    Des_key += '/'
                                job = {
                                    'Src_bucket': Src_bucket,
                                    'Src_key': Src_key,
                                    'Size': Size,
                                    'Des_bucket': Des_bucket,
                                    'Des_key': Des_key
                                }
                    if 'Des_bucket' not in job and 'Event' not in job:
                        logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
                        logger.warning('Try to handle next message')
                        time.sleep(1)
                        continue

                    # Main flow
                    if 'Event' not in job:
                        if job['Size'] > ResumableThreshold:
                            upload_etag_full = step_function(job, table, s3_src_client, s3_des_client, instance_id,
                                                             StorageClass, ChunkSize, MaxRetry, MaxThread,
                                                             JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload)
                        else:
                            upload_etag_full = step_fn_small_file(job, table, s3_src_client, s3_des_client,
                                                                  instance_id, StorageClass, MaxRetry)
                    else:
                        if job['Event'] == 's3:TestEvent':
                            logger.info('Skip s3:TestEvent')
                            upload_etag_full = "s3:TestEvent"
                        else:
                            upload_etag_full = "OtherEvent"

                    # Delete the job from SQS
                    logger.info(f'upload_etag_full={upload_etag_full}, job={str(sqs_job)}')
                    if upload_etag_full != "TIMEOUT":
                        # On timeout the SQS message is kept; on normal completion or QUIT it is deleted.
                        # QUIT means NoSuchUpload, NoSuchKey or AccessDenied, so there is no point letting
                        # the next worker retry: delete the SQS message directly, and DDB will not record
                        # a completion status.
                        for retry in range(MaxRetry + 1):
                            try:
                                logger.info(f'Try to finish job message on sqs. {str(sqs_job)}')
                                sqs.delete_message(
                                    QueueUrl=sqs_queue,
                                    ReceiptHandle=job_receipt
                                )
                                break
                            except Exception as e:
                                logger.warning(f'Fail to delete sqs message: {str(sqs_job)}, {str(e)}')
                                if retry >= MaxRetry:
                                    logger.error(f'Fail MaxRetry delete sqs message: {str(sqs_job)}, {str(e)}')
                                else:
                                    time.sleep(5 * retry)
        except Exception as e:
            logger.error(f'Fail. Wait for 5 seconds. ERR: {str(e)}')
            time.sleep(5)
        # Finish this job, then go back and get the next job from the queue


def step_function(job, table, s3_src_client, s3_des_client, instance_id,
                  StorageClass, ChunkSize, MaxRetry, MaxThread,
                  JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload):
    # Normal processing starts here
    Src_bucket = job['Src_bucket']
    Src_key = job['Src_key']
    Size = job['Size']
    Des_bucket = job['Des_bucket']
    Des_key = job['Des_key']
    logger.info(f'Start: {Src_bucket}/{Src_key}, Size: {Size}')

    # Get the destination S3's unfinished multipart uploads of this file
    multipart_uploaded_list = get_uploaded_list(s3_des_client, Des_bucket, Des_key, MaxRetry)

    # For debugging: clean the existing unfinished multipart upload IDs on S3
    # (not only the current job's, but all of them on the destination bucket)
    if multipart_uploaded_list and CleanUnfinishedUpload:
        logger.warning(f'You set CleanUnfinishedUpload. There are {len(multipart_uploaded_list)}.'
                       f' Now clean them and restart!')
        multipart_uploaded_list = get_uploaded_list(s3_des_client, Des_bucket, "", MaxRetry)
        for clean_i in multipart_uploaded_list:
            try:
                s3_des_client.abort_multipart_upload(
                    Bucket=Des_bucket,
                    Key=clean_i["Key"],
                    UploadId=clean_i["UploadId"]
                )
            except Exception as e:
                logger.error(f'Fail to clean {str(e)}')
        multipart_uploaded_list = []
        logger.info('CLEAN FINISHED')

    # Start the job steps.
    # Retry the loop up to 3 times (if the MD5-calculated ETag does not match).
    for md5_retry in range(3):
        # Job preparation:
        # if the file has no multipart UploadId, create a new one; otherwise return the existing UploadId
        response_check_upload = check_file_exist(
            Des_key,
            multipart_uploaded_list
        )
        if response_check_upload == 'UPLOAD':
            try:
                logger.info(f'Create multipart upload: {Des_bucket}/{Des_key}')
                response_new_upload = s3_des_client.create_multipart_upload(
                    Bucket=Des_bucket,
                    Key=Des_key,
                    StorageClass=StorageClass
                )
                # Write log to DDB in the first round of the job
                ddb_first_round(table, Src_bucket, Src_key, Size, MaxRetry)
            except Exception as e:
                logger.warning(f'Fail to create new multipart upload. {str(e)}')
                if md5_retry >= 2:
                    upload_etag_full = "ERR"
                    break
                else:
                    time.sleep(5 * md5_retry)
                    continue
            # logger.info("UploadId: " + response_new_upload["UploadId"])
            reponse_uploadId = response_new_upload["UploadId"]
            partnumberList = []
        else:
            reponse_uploadId = response_check_upload
            logger.info(f'Resume upload id: {Des_bucket}/{Des_key}')
            # Get the list of already uploaded part numbers
            partnumberList = checkPartnumberList(
                Des_bucket,
                Des_key,
                reponse_uploadId,
                s3_des_client,
                MaxRetry
            )

        # Get the list of part start indexes for splitting the file, e.g. [0, 10, 20]
        indexList, ChunkSize_auto = split(
            Size,
            ChunkSize
        )  # For files larger than 10,000 parts, the chunk size is auto-adjusted to ChunkSize_auto

        # Write log to DDB for this round of the job
        percent = int(len(partnumberList) / len(indexList) * 100)
        ddb_this_round(table, percent, Src_bucket, Src_key, instance_id, MaxRetry)

        # Job threads: uploadPart; returns TIMEOUT/QUIT on timeout or a bad key
        upload_etag_full = job_processor(
            reponse_uploadId,
            indexList,
            partnumberList,
            job,
            s3_src_client,
            s3_des_client,
            MaxThread,
            ChunkSize_auto,  # Use the auto-adjusted ChunkSize_auto for this single file
            MaxRetry,
            JobTimeout,
            ifVerifyMD5Twice
        )
        if upload_etag_full == "TIMEOUT" or upload_etag_full == "QUIT":
            break  # Stop processing this job
        elif upload_etag_full == "ERR":
            multipart_uploaded_list = []  # Clear the uploaded-id list so the upload restarts
            continue  # Retry in the loop

        # Merge the parts on S3
        complete_etag = completeUpload(reponse_uploadId, Des_bucket, Des_key,
                                       len(indexList), s3_des_client, MaxRetry)
        logger.info(f'Merged: {Des_bucket}/{Des_key}')
        if complete_etag == "ERR":
            multipart_uploaded_list = []  # Clear the uploaded-id list so the upload restarts
            continue  # Retry in the loop

        # Verify the file MD5
        if ifVerifyMD5Twice:
            if complete_etag == upload_etag_full:
                logger.info(f'MD5 ETag Matched - {Des_bucket}/{Des_key} - {complete_etag}')
                break  # Finish this file, move on to the next SQS job
            else:  # ETag mismatch: delete the file on the destination S3 and retry
                logger.warning(f'MD5 ETag NOT MATCHED {Des_bucket}/{Des_key}( Destination / Origin ): '
                               f'{complete_etag} - {upload_etag_full}')
                try:
                    s3_des_client.delete_object(
                        Bucket=Des_bucket,
                        Key=Des_key
                    )
                except Exception as e:
                    logger.warning(f'Fail to delete on S3. {str(e)}')
                multipart_uploaded_list = []
                if md5_retry >= 2:
                    logger.error(f'MD5 ETag NOT MATCHED Exceed Max Retries - {Des_bucket}/{Des_key}')
                    upload_etag_full = "ERR"
                else:
                    logger.warning(f'Retry {Des_bucket}/{Des_key}')
                    continue

        # Normal end of the md5_retry loop
        break
    # END of md5_retry loop (or retries exceeded)

    # DynamoDB log: ADD status: DONE/ERR(upload_etag_full)
    ddb_complete(upload_etag_full, table, Src_bucket, Src_key, MaxRetry)
    # Complete one job
    return upload_etag_full


# Write log to DDB in the first round of the job
def ddb_first_round(table, Src_bucket, Src_key, Size, MaxRetry):
    for retry in range(MaxRetry + 1):
        try:
            logger.info(f'Write log to DDB in first round of job: {Src_bucket}/{Src_key}')
            cur_time = time.time()
            table_key = str(PurePosixPath(Src_bucket) / Src_key)
            if Src_key[-1] == '/':  # Empty directory object
                table_key += '/'
            table.update_item(
                Key={
                    "Key": table_key
                },
                UpdateExpression="SET firstTime=:s, firstTime_f=:s_format, Size=:size",
                ExpressionAttributeValues={
                    ":s": int(cur_time),
                    ":s_format": time.asctime(time.localtime(cur_time)),
                    ":size": Size
                }
            )
            break
        except Exception as e:
            # Could not write the log
            logger.warning(f'Fail to put log to DDB at starting this round: {Src_bucket}/{Src_key}, {str(e)}')
            if retry >= MaxRetry:
                logger.error(f'Fail MaxRetry put log to DDB at start {Src_bucket}/{Src_key}')
            else:
                time.sleep(5 * retry)


# Write log to DDB at the start of a small-file job
def ddb_start_small(table, Src_bucket, Src_key, Size, MaxRetry, instance_id):
    for retry in range(MaxRetry + 1):
        try:
            logger.info(f'Write log to DDB start small file job: {Src_bucket}/{Src_key}')
            cur_time = time.time()
            table_key = str(PurePosixPath(Src_bucket) / Src_key)
            if Src_key[-1] == '/':  # Empty directory object
                table_key += '/'
            table.update_item(
                Key={
                    "Key": table_key
                },
                UpdateExpression="ADD instanceID :id, tryTimes :t "
                                 "SET firstTime=:s, firstTime_f=:s_format, Size=:size",
                ExpressionAttributeValues={
                    ":t": 1,
                    ":id": {instance_id},
                    ":s": int(cur_time),
                    ":s_format": time.asctime(time.localtime(cur_time)),
                    ":size": Size
                }
            )
            break
        except Exception as e:
            # Could not write the log
            logger.warning(f'Fail to put log to DDB at starting small file job: {Src_bucket}/{Src_key}, {str(e)}')
            if retry >= MaxRetry:
                logger.error(f'Fail MaxRetry put log to DDB at start small file: {Src_bucket}/{Src_key}')
            else:
                time.sleep(5 * retry)


# DynamoDB log: ADD retry times and instance-id list, SET startTime of this round
def ddb_this_round(table, percent, Src_bucket, Src_key, instance_id, MaxRetry):
    for retry in range(MaxRetry + 1):
        try:
            logger.info(f'Write log to DDB via start this round of job: {Src_bucket}/{Src_key}')
            cur_time = time.time()
            table_key = str(PurePosixPath(Src_bucket) / Src_key)
            if Src_key[-1] == '/':  # Empty directory object
                table_key += '/'
            table.update_item(
                Key={
                    "Key": table_key
                },
                UpdateExpression="ADD instanceID :id, tryTimes :t "
                                 "SET thisRoundStart=:s, thisRoundStart_f=:s_format, lastTimeProgress=:p",
                ExpressionAttributeValues={
                    ":t": 1,
                    ":id": {instance_id},
                    ":s": int(cur_time),
                    ":s_format": time.asctime(time.localtime(cur_time)),
                    ":p": percent
                }
            )
            break
        except Exception as e:
            # Could not write the log
            logger.warning(f'Fail to put log to DDB at starting this round: {Src_bucket}/{Src_key}, {str(e)}')
            if retry >= MaxRetry:
                logger.error(f'Fail MaxRetry put log to DDB at start {Src_bucket}/{Src_key}')
            else:
                time.sleep(5 * retry)


# DynamoDB log: ADD status: DONE/ERR(upload_etag_full)
def ddb_complete(upload_etag_full, table, Src_bucket, Src_key, MaxRetry):
    status = "DONE"
    if upload_etag_full == "TIMEOUT":
        status = "TIMEOUT_or_MaxRetry"
    elif upload_etag_full == "ERR":
        status = "ERR"
    logger.info(f'Write job complete status to DDB: {status}')
    cur_time = time.time()
    table_key = str(PurePosixPath(Src_bucket) / Src_key)
    if Src_key[-1] == '/':  # Empty directory object
        table_key += '/'
    if status == "DONE":  # Normal DDB write
        UpdateExpression = "SET totalSpentTime=:s-firstTime, lastTimeProgress=:p, endTime=:s, endTime_f=:e" \
                           " ADD jobStatus :done"
        ExpressionAttributeValues = {
            ":done": {status},
            ":s": int(cur_time),
            ":p": 100,
            ":e": time.asctime(time.localtime(cur_time))
        }
    else:  # Abnormal status: write to DDB without overwriting lastTimeProgress
        UpdateExpression = "SET totalSpentTime=:s-firstTime, endTime=:s, endTime_f=:e" \
                           " ADD jobStatus :done"
        ExpressionAttributeValues = {
            ":done": {status},
            ":s": int(cur_time),
            ":e": time.asctime(time.localtime(cur_time))
        }
    for retry in range(MaxRetry + 1):
        try:
            table.update_item(
                Key={"Key": table_key},
                UpdateExpression=UpdateExpression,
                ExpressionAttributeValues=ExpressionAttributeValues
            )
            break
        except Exception as e:
            logger.warning(f'Fail to put log to DDB at end:{Src_bucket}/{Src_key} {str(e)}')
            if retry >= MaxRetry:
                logger.error(f'Fail MaxRetry to put log to DDB at end of job:{Src_bucket}/{Src_key} {str(e)}')
            else:
                time.sleep(5 * retry)


def step_fn_small_file(job, table, s3_src_client, s3_des_client, instance_id,
                       StorageClass, MaxRetry):
    # Start processing a small file
    Src_bucket = job['Src_bucket']
    Src_key = job['Src_key']
    Size = job['Size']
    Des_bucket = job['Des_bucket']
    Des_key = job['Des_key']
    logger.info(f'Start small file procedure: {Src_bucket}/{Src_key}, Size: {Size}')

    # Write the DDB log for the first round
    ddb_start_small(table, Src_bucket, Src_key, Size, MaxRetry, instance_id)

    upload_etag_full = []
    for retryTime in range(MaxRetry + 1):
        try:
            # Get object
            logger.info(f'--->Downloading {Size} Bytes {Src_bucket}/{Src_key} - Small file 1/1')
            response_get_object = s3_src_client.get_object(
                Bucket=Src_bucket,
                Key=Src_key
            )
            getBody = response_get_object["Body"].read()
            chunkdata_md5 = hashlib.md5(getBody)
            ContentMD5 = base64.b64encode(chunkdata_md5.digest()).decode('utf-8')

            # Put object
            logger.info(f'--->Uploading {Size} Bytes {Des_bucket}/{Des_key} - Small file 1/1')
            response_put_object = s3_des_client.put_object(
                Body=getBody,
                Bucket=Des_bucket,
                Key=Des_key,
                ContentMD5=ContentMD5,
                StorageClass=StorageClass
            )
            # The request carries the MD5; if the S3-side check fails, an exception is raised
            upload_etag_full = response_put_object['ETag']
            # Upload/download finished
            break
        except ClientError as e:
            if e.response['Error']['Code'] in ['NoSuchKey', 'AccessDenied']:
                logger.error(f"Fail to access {Src_bucket}/{Src_key} - ERR: {str(e)}.")
                return "QUIT"
            logger.warning(f'Download/Upload small file Fail: {Src_bucket}/{Src_key}, '
                           f'{str(e)}, Attempts: {retryTime}')
            if retryTime >= MaxRetry:
                logger.error(f'Fail MaxRetry Download/Upload small file: {Des_bucket}/{Des_key}')
                return "TIMEOUT"
            else:
                time.sleep(5 * retryTime)
        except Exception as e:
            logger.error(f'Fail in step_fn_small {str(e)}')

    # Write the DDB log for completion
    ddb_complete(upload_etag_full, table, Src_bucket, Src_key, MaxRetry)
    # Complete one job
    return upload_etag_full
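
# A minimal sketch of a worker main loop under assumed parameter values (the real entry
# scripts read these from config.ini, which is not part of this file):
#   job_looper(sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id,
#              StorageClass='STANDARD', ChunkSize=5 * 1024 ** 2, MaxRetry=10, MaxThread=5,
#              ResumableThreshold=5 * 1024 ** 2, JobTimeout=3600, ifVerifyMD5Twice=False,
#              CleanUnfinishedUpload=False, Des_bucket_default='my-des-bucket',
#              Des_prefix_default='')
# where the first seven arguments come from set_env() above.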