s3_migration_lib.py

# PROJECT LONGBOW - LIB FOR TRANSMISSION BETWEEN AMAZON S3
import datetime
import logging
import hashlib
import concurrent.futures
import threading
import base64
import urllib.request
import urllib.parse
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
# from boto3.dynamodb import conditions
import json
import os
import sys
import time
from fnmatch import fnmatchcase
from pathlib import PurePosixPath, Path

logger = logging.getLogger()
Max_md5_retry = 2


# Configure logging
def set_log(LoggingLevel, this_file_name):
    logger.setLevel(logging.WARNING)
    if LoggingLevel == 'INFO':
        logger.setLevel(logging.INFO)
    elif LoggingLevel == 'DEBUG':
        logger.setLevel(logging.DEBUG)
    # File logging
    log_path = Path(__file__).parent.parent / 'amazon-s3-migration-log'
    if not log_path.exists():
        log_path.mkdir()
    start_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
    _log_file_name = str(log_path / f'{this_file_name}-{start_time}.log')
    print('Log file:', _log_file_name)
    fileHandler = logging.FileHandler(filename=_log_file_name)
    fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
    logger.addHandler(fileHandler)
    return logger, _log_file_name
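

# Usage sketch (illustrative, not part of the library): 'jobsender' below is a
# made-up caller name, only used to tag the log file name.
#     logger, log_file_name = set_log('INFO', 'jobsender')
#     logger.info('logging to %s', log_file_name)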


# Set environment
def set_env(*, JobType, LocalProfileMode, table_queue_name, sqs_queue_name, ssm_parameter_credentials, MaxRetry):
    s3_config = Config(max_pool_connections=200, retries={'max_attempts': MaxRetry})  # boto default 10
    if os.uname()[0] == 'Linux' and not LocalProfileMode:  # on EC2, use the EC2 instance role
        logger.info('Get instance-id and running region')
        instance_id = urllib.request.urlopen(urllib.request.Request(
            "http://169.254.169.254/latest/meta-data/instance-id"
        )).read().decode('utf-8')
        region = json.loads(urllib.request.urlopen(urllib.request.Request(
            "http://169.254.169.254/latest/dynamic/instance-identity/document"
        )).read().decode('utf-8'))['region']
        sqs = boto3.client('sqs', region)
        dynamodb = boto3.resource('dynamodb', region)
        ssm = boto3.client('ssm', region)
        # Get the credentials of the other account
        logger.info(f'Get ssm_parameter_credentials: {ssm_parameter_credentials}')
        try:
            credentials = json.loads(ssm.get_parameter(
                Name=ssm_parameter_credentials,
                WithDecryption=True
            )['Parameter']['Value'])
        except Exception as e:
            logger.error(f'Fail to get {ssm_parameter_credentials} in SSM Parameter store. '
                         f'Fix and restart Jobsender. {str(e)}')
            sys.exit(0)
        credentials_session = boto3.session.Session(
            aws_access_key_id=credentials["aws_access_key_id"],
            aws_secret_access_key=credentials["aws_secret_access_key"],
            region_name=credentials["region"]
        )
        if JobType.upper() == "PUT":
            s3_src_client = boto3.client('s3', region, config=s3_config)
            s3_des_client = credentials_session.client('s3', config=s3_config)
        elif JobType.upper() == "GET":
            s3_des_client = boto3.client('s3', region, config=s3_config)
            s3_src_client = credentials_session.client('s3', config=s3_config)
        else:
            logger.error('Wrong JobType setting in config.ini file')
            sys.exit(0)
    # Running in an environment without an EC2 role, e.g. local test on a Mac
    else:
        instance_id = "local"
        src_session = boto3.session.Session(profile_name='iad')
        des_session = boto3.session.Session(profile_name='zhy')
        sqs = src_session.client('sqs')
        dynamodb = src_session.resource('dynamodb')
        ssm = src_session.client('ssm')
        s3_src_client = src_session.client('s3', config=s3_config)
        s3_des_client = des_session.client('s3', config=s3_config)
        # Also output to the console, convenient for local debug monitoring
        streamHandler = logging.StreamHandler()
        streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
        logger.addHandler(streamHandler)
    table = dynamodb.Table(table_queue_name)
    table.wait_until_exists()
    sqs_queue = wait_sqs_available(
        sqs=sqs,
        sqs_queue_name=sqs_queue_name
    )
    return sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id, ssm


def wait_sqs_available(*, sqs, sqs_queue_name):
    while True:
        try:
            return sqs.get_queue_url(QueueName=sqs_queue_name)['QueueUrl']
        except Exception as e:
            logger.warning(f'Waiting for SQS availability. {str(e)}')
            time.sleep(10)


def check_sqs_empty(sqs, sqs_queue):
    try:
        sqs_in_flight = sqs.get_queue_attributes(
            QueueUrl=sqs_queue,
            AttributeNames=['ApproximateNumberOfMessagesNotVisible', 'ApproximateNumberOfMessages']
        )
    except Exception as e:
        logger.error(f'Fail to get_queue_attributes: {str(e)}')
        return False  # Can't get the SQS status, so treat the queue as not empty
    NotVisible = sqs_in_flight['Attributes']['ApproximateNumberOfMessagesNotVisible']
    Visible = sqs_in_flight['Attributes']['ApproximateNumberOfMessages']
    logger.info(f'ApproximateNumberOfMessagesNotVisible: {NotVisible}, ApproximateNumberOfMessages: {Visible}')
    if NotVisible == '0' and (Visible == '0' or Visible == '1'):
        # In the initial state, the newly created bucket trigger sends one test message to SQS,
        # so the case Visible == '1' is ignored here.
        return True  # SQS is empty
    return False  # SQS is not empty
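
# Usage sketch (illustrative, not part of the library): a jobsender loop would
# typically poll this before pushing a fresh delta job list, e.g.
#     if check_sqs_empty(sqs, sqs_queue):
#         ...  # safe to compare buckets and send new jobs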


# Get the source S3 bucket file list with versionId
def get_src_file_list(*, s3_client, bucket, S3Prefix, JobsenderCompareVersionId):
    # Get the S3 file list
    file_list = []
    if JobsenderCompareVersionId:
        paginator = s3_client.get_paginator('list_object_versions')
    else:
        paginator = s3_client.get_paginator('list_objects_v2')  # much faster than list_object_versions
    try:
        if S3Prefix == '/':
            S3Prefix = ''
        logger.info(f'Get s3 file list from: {bucket}/{S3Prefix}')
        response_iterator = paginator.paginate(
            Bucket=bucket,
            Prefix=S3Prefix
        )
        for page in response_iterator:
            if "Versions" in page:  # JobsenderCompareVersionId == True
                logger.info(f'Got list_object_versions {bucket}/{S3Prefix}: {len(page["Versions"])}')
                for n in page["Versions"]:
                    # Only take the latest version of each key
                    if n["IsLatest"]:
                        file_list.append({
                            "Key": n["Key"],
                            "Size": n["Size"],
                            "versionId": n["VersionId"] if JobsenderCompareVersionId else 'null'
                        })
            elif "Contents" in page:  # JobsenderCompareVersionId == False
                logger.info(f'Got list_objects_v2 {bucket}/{S3Prefix}: {len(page["Contents"])}')
                for n in page["Contents"]:
                    file_list.append({
                        "Key": n["Key"],
                        "Size": n["Size"],
                        "versionId": 'null'
                    })
    except Exception as err:
        logger.error(f'Fail to get s3 list versions {bucket}/{S3Prefix}: {str(err)}')
    logger.info(f'Bucket list length(get_src_file_list): {str(len(file_list))}')
    return file_list
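
# Illustrative sketch (not part of the library): bucket and prefix names below are
# made-up placeholders; each returned entry has this shape.
#     get_src_file_list(s3_client=s3_src_client, bucket='my-src-bucket',
#                       S3Prefix='data/', JobsenderCompareVersionId=False)
#     # -> [{"Key": "data/file1.bin", "Size": 1048576, "versionId": "null"}, ...]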


# Get the destination S3 bucket file list, with versionId taken from DDB
def get_des_file_list(*, s3_client, bucket, S3Prefix, table, JobsenderCompareVersionId):
    if JobsenderCompareVersionId:
        ver_list = get_versionid_from_ddb(
            des_bucket=bucket,
            table=table
        )
    else:
        ver_list = {}
    # Length of the destination prefix to strip from each key
    if S3Prefix == '' or S3Prefix == '/':
        # The destination bucket has no prefix set
        dp_len = 0
    else:
        # Length of the destination bucket's "prefix/"
        dp_len = len(S3Prefix) + 1
    # Get the S3 file list
    file_list = []
    paginator = s3_client.get_paginator('list_objects_v2')
    try:
        if S3Prefix == '/':
            S3Prefix = ''
        logger.info(f'Get s3 file list from: {bucket}/{S3Prefix}')
        response_iterator = paginator.paginate(
            Bucket=bucket,
            Prefix=S3Prefix
        )
        for page in response_iterator:
            if "Contents" in page:
                logger.info(f'Got list_objects_v2 {bucket}/{S3Prefix}: {len(page["Contents"])}')
                for n in page["Contents"]:
                    # Look up this key's versionId; if ver_list is empty, every lookup returns None
                    ver = ver_list.get(n["Key"])
                    # Strip the prefix from the destination key
                    file_list.append({
                        "Key": n["Key"][dp_len:],
                        "Size": n["Size"],
                        "versionId": ver if (ver is not None and JobsenderCompareVersionId) else 'null'
                    })
    except Exception as err:
        logger.error(f'Fail to get s3 list_objects_v2 {bucket}/{S3Prefix}: {str(err)}')
    logger.info(f'Bucket list length(get_des_file_list): {str(len(file_list))}')
    return file_list


# Get the S3 object versionId records from DDB
def get_versionid_from_ddb(*, des_bucket, table):
    logger.info('Get des_bucket versionId list from DDB')
    ver_list = {}
    try:
        r = table.query(
            IndexName='desBucket-index',
            KeyConditionExpression='desBucket=:b',
            ExpressionAttributeValues={":b": des_bucket}
        )
        if 'Items' in r:
            for i in r['Items']:
                ver_list[i['desKey']] = i['versionId']
        logger.info(f'Got versionId list {des_bucket}: {len(ver_list)}')
    except Exception as e:
        logger.error(f'Fail to query DDB for versionId {des_bucket} - {str(e)}')
    return ver_list


# Jobsender: compare the source and destination bucket lists
def delta_job_list(*, src_file_list, des_file_list, src_bucket, src_prefix, des_bucket, des_prefix, ignore_list,
                   JobsenderCompareVersionId):
    # Delta list: only key & size are verified here; versionId is not checked at this stage, it is just carried in the Job
    logger.info(f'Compare source s3://{src_bucket}/{src_prefix} and destination s3://{des_bucket}/{des_prefix}')
    start_time = int(time.time())
    job_list = []
    ignore_records = []
    for src in src_file_list:
        # Skip the bucket/key entries listed in ignore_list
        src_bucket_key = src_bucket + '/' + src['Key']
        ignore_match = False
        # Try every ignore pattern; if any one matches, skip this src key
        for ignore_key in ignore_list:
            if fnmatchcase(src_bucket_key, ignore_key):
                ignore_match = True
                break
            # No match, try the next ignore_key
        if ignore_match:
            ignore_records.append(src_bucket_key)
            continue  # Skip the current src
        # Check whether the source file is already in the destination list; versionId is compared as part of the dict
        if src in des_file_list:
            continue  # Already in the list, move on to the next source file
        # Not in the list, so add the source file to the job list
        else:
            Des_key = str(PurePosixPath(des_prefix) / src["Key"])
            if src["Key"][-1] == '/':  # The source key is a directory, append the trailing '/'
                Des_key += '/'
            job_list.append(
                {
                    "Src_bucket": src_bucket,
                    "Src_key": src["Key"],  # Src_key already contains the prefix
                    "Des_bucket": des_bucket,
                    "Des_key": Des_key,
                    "Size": src["Size"],
                    "versionId": src['versionId']
                }
            )
    spent_time = int(time.time()) - start_time
    if JobsenderCompareVersionId:
        logger.info(f'Finish compare key/size/versionId in {spent_time} Seconds (JobsenderCompareVersionId is enabled)')
    else:
        logger.info(f'Finish compare key/size in {spent_time} Seconds (JobsenderCompareVersionId is disabled)')
    logger.info(f'Delta Job List: {len(job_list)} - Ignore List: {len(ignore_records)}')
    return job_list, ignore_records
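
# Illustrative sketch (not part of the library), with made-up buckets and keys:
#     src = [{"Key": "a.txt", "Size": 1, "versionId": "null"},
#            {"Key": "b.txt", "Size": 2, "versionId": "null"}]
#     des = [{"Key": "a.txt", "Size": 1, "versionId": "null"}]
#     job_list, ignored = delta_job_list(src_file_list=src, des_file_list=des,
#                                        src_bucket="src", src_prefix="", des_bucket="des",
#                                        des_prefix="", ignore_list=[], JobsenderCompareVersionId=False)
#     # -> job_list holds one job for "b.txt"; an ignore pattern such as "src/*.log"
#     #    would route matching keys to ignore_records instead.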


def job_upload_sqs_ddb(*, sqs, sqs_queue, job_list):
    sqs_batch = 0
    sqs_message = []
    logger.info(f'Start uploading jobs to queue: {sqs_queue}')
    # create ddb writer
    # with table.batch_writer() as ddb_batch:
    for job in job_list:
        # Construct SQS messages
        sqs_message.append({
            "Id": str(sqs_batch),
            "MessageBody": json.dumps(job),
        })
        sqs_batch += 1
        # Send to SQS in batches of 10, or when this is the last job
        if sqs_batch == 10 or job == job_list[-1]:
            try:
                sqs.send_message_batch(QueueUrl=sqs_queue, Entries=sqs_message)
            except Exception as e:
                logger.error(f'Fail to send sqs message: {str(sqs_message)}, {str(e)}')
            sqs_batch = 0
            sqs_message = []
    logger.info(f'Complete upload job to queue: {sqs_queue}')
    return


# Split one file size into a list of part start byte positions
def split(Size, ChunkSize):
    partnumber = 1
    indexList = [0]
    if int(Size / ChunkSize) + 1 > 10000:
        ChunkSize = int(Size / 10000) + 1024  # For files with more than 10000 parts, auto-adjust ChunkSize
        logger.info(f'Size exceeds 10000 parts limit. Auto change ChunkSize to {ChunkSize}')
    while ChunkSize * partnumber < Size:  # If it lands exactly on "==", no further part is needed, so "<=" must not be used here
        indexList.append(ChunkSize * partnumber)
        partnumber += 1
    return indexList, ChunkSize
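
# Usage sketch (illustrative, not part of the library): for a 25 MiB object and a
# 10 MiB ChunkSize, split() returns the start offset of each of the 3 parts and
# leaves ChunkSize unchanged (well below the 10000-part limit).
#     indexList, chunk = split(25 * 1024 * 1024, 10 * 1024 * 1024)
#     # indexList == [0, 10485760, 20971520], chunk == 10485760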


# Get S3 versionId and size
def head_s3_version(*, s3_src_client, Src_bucket, Src_key):
    logger.info(f'Try to update VersionId: {Src_bucket}/{Src_key}')
    try:
        head = s3_src_client.head_object(
            Bucket=Src_bucket,
            Key=Src_key
        )
        versionId = head['VersionId']
        Size = head['ContentLength']
        logger.info(f'Got VersionId: {versionId} - {Src_bucket}/{Src_key} - Size:{Size}')
    except Exception as e:
        logger.error(f'Fail to head s3 - {Src_bucket}/{Src_key}, {str(e)}')
        return "ERR", 0
    return versionId, Size


# Get the unfinished multipart upload ids from S3
def get_uploaded_list(*, s3_client, Des_bucket, Des_key):
    multipart_uploaded_list = []
    paginator = s3_client.get_paginator('list_multipart_uploads')
    try:
        logger.info(f'Getting unfinished upload id list - {Des_bucket}/{Des_key}...')
        response_iterator = paginator.paginate(
            Bucket=Des_bucket,
            Prefix=Des_key
        )
        for page in response_iterator:
            if "Uploads" in page:
                for i in page["Uploads"]:
                    if i["Key"] == Des_key or Des_key == '':
                        multipart_uploaded_list.append({
                            "Key": i["Key"],
                            "Initiated": i["Initiated"],
                            "UploadId": i["UploadId"]
                        })
                        logger.info(f'Unfinished upload, Key: {i["Key"]}, Time: {i["Initiated"]}')
    except Exception as e:
        logger.error(f'Fail to list multipart upload - {Des_bucket}/{Des_key} - {str(e)}')
    return multipart_uploaded_list


# Check whether the file is in the list from get_uploaded_list and return the multipart upload id to resume
def check_file_exist(*, prefix_and_key, UploadIdList):
    # Check whether the key has an unfinished UploadId
    keyIDList = []
    for u in UploadIdList:
        if u["Key"] == prefix_and_key:
            keyIDList.append(u)
    # If no previous upload is found, start the upload from scratch
    if not keyIDList:
        return 'UPLOAD'
    # Among the different uploads of the same key (file), find the one with the latest time
    UploadID_latest = keyIDList[0]
    for u in keyIDList:
        if u["Initiated"] > UploadID_latest["Initiated"]:
            UploadID_latest = u
    # Pick the upload id with the latest Initiated time
    logger.info(f"Pick UploadId Initiated Time: {UploadID_latest['Initiated']}")
    return UploadID_latest["UploadId"]
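
# Illustrative sketch (not part of the library), with made-up UploadIds; t1/t2 stand
# for the Initiated datetimes returned by S3, with t2 > t1:
#     uploads = [{"Key": "a/b.bin", "Initiated": t1, "UploadId": "old-id"},
#                {"Key": "a/b.bin", "Initiated": t2, "UploadId": "new-id"}]
#     check_file_exist(prefix_and_key="a/b.bin", UploadIdList=uploads)    # -> "new-id"
#     check_file_exist(prefix_and_key="other.bin", UploadIdList=uploads)  # -> "UPLOAD"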


# Check the list of already uploaded part numbers on Des_bucket
def checkPartnumberList(*, Des_bucket, Des_key, uploadId, s3_des_client):
    partnumberList = []
    logger.info(f'Get partnumber list - {Des_bucket}/{Des_key}')
    paginator = s3_des_client.get_paginator('list_parts')
    try:
        response_iterator = paginator.paginate(
            Bucket=Des_bucket,
            Key=Des_key,
            UploadId=uploadId
        )
        for page in response_iterator:
            if "Parts" in page:
                logger.info(f'Got list_parts: {len(page["Parts"])} - {Des_bucket}/{Des_key}')
                for p in page["Parts"]:
                    partnumberList.append(p["PartNumber"])
    except Exception as e:
        logger.error(f'Fail to list parts in checkPartnumberList - {Des_bucket}/{Des_key} - {str(e)}')
        return []
    if partnumberList:  # An empty list means no uploaded part was found
        logger.info(f"Found uploaded partnumber {len(partnumberList)} - {json.dumps(partnumberList)}"
                    f" - {Des_bucket}/{Des_key}")
    else:
        logger.info(f'Part number list is empty - {Des_bucket}/{Des_key}')
    return partnumberList


# Process one job
def job_processor(*, uploadId, indexList, partnumberList, job, s3_src_client, s3_des_client,
                  MaxThread, ChunkSize, MaxRetry, JobTimeout, ifVerifyMD5Twice, GetObjectWithVersionId):
    # Thread generator: works with the thread pool to keep track of each thread, so timeout control can be applied
    def thread_gen(worker_thread, pool,
                   stop_signal, partnumber, total, md5list, partnumberList, complete_list):
        for partStartIndex in indexList:
            # Start to upload the part
            if partnumber not in partnumberList:
                dryrun = False  # dryrun reuses the existing flow to build the completion list, which makes the later MD5 calculation easier
            else:
                dryrun = True
            th = pool.submit(worker_thread,
                             stop_signal=stop_signal,
                             partnumber=partnumber,
                             partStartIndex=partStartIndex,
                             total=total,
                             md5list=md5list,
                             dryrun=dryrun,
                             complete_list=complete_list
                             )
            partnumber += 1
            yield th
    # Download a part from the source S3 and upload it to the destination S3
    def worker_thread(*, stop_signal, partnumber, partStartIndex, total, md5list, dryrun, complete_list):
        if stop_signal.is_set():
            return "TIMEOUT"
        Src_bucket = job['Src_bucket']
        Src_key = job['Src_key']
        Des_bucket = job['Des_bucket']
        Des_key = job['Des_key']
        versionId = job['versionId']
        getBody, chunkdata_md5 = b'', b''  # init
        # Download the part
        if ifVerifyMD5Twice or not dryrun:  # With ifVerifyMD5Twice, re-download even already uploaded parts to verify the whole file
            if GetObjectWithVersionId:
                logger.info(f"--->Downloading {ChunkSize} Bytes {Src_bucket}/{Src_key} - {partnumber}/{total}"
                            f" - versionId: {versionId}")
            else:
                logger.info(f"--->Downloading {ChunkSize} Bytes {Src_bucket}/{Src_key} - {partnumber}/{total}")
            retryTime = 0
            # If stop_signal is set during normal operation, the thread needs to exit
            while retryTime <= MaxRetry and not stop_signal.is_set():
                retryTime += 1
                try:
                    if GetObjectWithVersionId:  # Get the object by VersionId
                        response_get_object = s3_src_client.get_object(
                            Bucket=Src_bucket,
                            Key=Src_key,
                            VersionId=versionId,
                            Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
                        )
                    else:  # Without VersionId, i.e. get the latest object
                        response_get_object = s3_src_client.get_object(
                            Bucket=Src_bucket,
                            Key=Src_key,
                            Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
                        )
                    getBody = response_get_object["Body"].read()
                    chunkdata_md5 = hashlib.md5(getBody)
                    md5list[partnumber - 1] = chunkdata_md5
                    break  # Download finished, no retry needed
                except ClientError as err:
                    if err.response['Error']['Code'] in ['NoSuchKey', 'AccessDenied']:
                        # No such versionId, the file has been deleted, or no permission to access it
                        logger.error(f"ClientError: Fail to access {Src_bucket}/{Src_key} - ERR: {str(err)}.")
                        stop_signal.set()
                        return "QUIT"
                    logger.warning(f"ClientError: Fail to download {Src_bucket}/{Src_key} - ERR: {str(err)}. "
                                   f"Retry part: {partnumber} - Attempts: {retryTime}")
                    if retryTime >= MaxRetry:  # Retries exceeded, exit
                        logger.error(f"ClientError: Quit for Max Download retries: {retryTime} - "
                                     f"{Src_bucket}/{Src_key}")
                        stop_signal.set()
                        return "TIMEOUT"  # Exit the thread
                    else:
                        time.sleep(5 * retryTime)
                        continue
                        # Increasing delay, then back to retry
                except Exception as e:
                    logger.warning(f"Fail to download {Src_bucket}/{Src_key} - ERR: {str(e)}. "
                                   f"Retry part: {partnumber} - Attempts: {retryTime}")
                    if retryTime >= MaxRetry:  # Retries exceeded, exit
                        logger.error(f"Quit for Max Download retries: {retryTime} - {Src_bucket}/{Src_key}")
                        stop_signal.set()
                        return "TIMEOUT"  # Exit the thread
                    else:
                        time.sleep(5 * retryTime)
                        continue
        # Upload the part
        if not dryrun:  # No need to consider ifVerifyMD5Twice here
            retryTime = 0
            while retryTime <= MaxRetry and not stop_signal.is_set():
                retryTime += 1
                try:
                    logger.info(f'--->Uploading {len(getBody)} Bytes {Des_bucket}/{Des_key} - {partnumber}/{total}')
                    s3_des_client.upload_part(
                        Body=getBody,
                        Bucket=Des_bucket,
                        Key=Des_key,
                        PartNumber=partnumber,
                        UploadId=uploadId,
                        ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                    )
                    # The request carries the MD5; if the S3 check fails it raises an Exception
                    break
                except ClientError as err:
                    if err.response['Error']['Code'] == 'NoSuchUpload':
                        # No such UploadId, so someone else has already completed this job
                        logger.warning(f'ClientError: Fail to upload part - might be duplicated job:'
                                       f' {Des_bucket}/{Des_key}, {str(err)}')
                        stop_signal.set()
                        return "QUIT"
                    logger.warning(f"ClientError: Fail to upload part - {Des_bucket}/{Des_key} - {str(err)}, "
                                   f"retry part: {partnumber} Attempts: {retryTime}")
                    if retryTime >= MaxRetry:
                        logger.error(f"ClientError: Quit for Max Upload retries: {retryTime} - {Des_bucket}/{Des_key}")
                        # Skip to the next file instead
                        stop_signal.set()
                        return "TIMEOUT"
                    else:
                        time.sleep(5 * retryTime)  # Increasing delay before retry
                        continue
                except Exception as e:
                    logger.warning(f"Fail to upload part - {Des_bucket}/{Des_key} - {str(e)}, "
                                   f"retry part: {partnumber} Attempts: {retryTime}")
                    if retryTime >= MaxRetry:
                        logger.error(f"Quit for Max Upload retries: {retryTime} - {Des_bucket}/{Des_key}")
                        # Skip to the next file instead
                        stop_signal.set()
                        return "TIMEOUT"
                    else:
                        time.sleep(5 * retryTime)
                        continue
        if not stop_signal.is_set():
            complete_list.append(partnumber)
            if not dryrun:
                logger.info(
                    f'--->Complete {len(getBody)} Bytes {Src_bucket}/{Src_key}'
                    f' - {partnumber}/{total} {len(complete_list) / total:.2%}')
        else:
            return "TIMEOUT"
        return "COMPLETE"
    # worker_thread END
    # job_processor main
    partnumber = 1  # Part number to upload in the current loop
    total = len(indexList)
    md5list = [hashlib.md5(b'')] * total
    complete_list = []
    # Thread pool
    try:
        stop_signal = threading.Event()  # Used on JobTimeout to terminate all threads of the current file
        with concurrent.futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
            # Materialize the generator here to get the thread (Future) objects
            threads = list(thread_gen(worker_thread, pool, stop_signal,
                                      partnumber, total, md5list, partnumberList, complete_list))
            result = concurrent.futures.wait(threads, timeout=JobTimeout, return_when="ALL_COMPLETED")
            # Abnormal exit
            if "QUIT" in [t.result() for t in result[0]]:  # result[0] is the done set
                logger.warning(f'QUIT. Canceling {len(result[1])} waiting threads in pool ...')
                stop_signal.set()
                for t in result[1]:
                    t.cancel()
                logger.warning(f'QUIT Job: {job["Src_bucket"]}/{job["Src_key"]}')
                return "QUIT"
            # Timeout
            if len(result[1]) > 0:  # result[1] is the not_done set, i.e. the timeout left unfinished threads
                logger.warning(f'TIMEOUT. Canceling {len(result[1])} waiting threads in pool ...')
                stop_signal.set()
                for t in result[1]:
                    t.cancel()
                logger.warning(f'TIMEOUT {JobTimeout}S Job: {job["Src_bucket"]}/{job["Src_key"]}')
                return "TIMEOUT"
        # Thread pool end
        logger.info(f'All parts uploaded: {job["Src_bucket"]}/{job["Src_key"]} - Size:{job["Size"]}')
        # Calculate the overall ETag from all part MD5s: cal_etag
        digests = b"".join(m.digest() for m in md5list)
        md5full = hashlib.md5(digests)
        cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
    except Exception as e:
        logger.error(f'Exception in job_processor: {str(e)}')
        return "ERR"
    return cal_etag


# Complete the multipart upload
# Build completeStructJSON from the full part list (uploadedListParts) queried back from S3
def completeUpload(*, uploadId, Des_bucket, Des_key, len_indexList, s3_des_client):
    # Query all parts on S3 (uploadedListParts) to build completeStructJSON
    uploadedListPartsClean = []
    logger.info(f'Get partnumber list - {Des_bucket}/{Des_key}')
    paginator = s3_des_client.get_paginator('list_parts')
    try:
        response_iterator = paginator.paginate(
            Bucket=Des_bucket,
            Key=Des_key,
            UploadId=uploadId
        )
        # Add the ETag of every part to the part list
        for page in response_iterator:
            if "Parts" in page:
                logger.info(f'Got list_parts: {len(page["Parts"])} - {Des_bucket}/{Des_key}')
                for p in page["Parts"]:
                    uploadedListPartsClean.append({
                        "ETag": p["ETag"],
                        "PartNumber": p["PartNumber"]
                    })
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchUpload':
            # No such UploadId, so someone else has already completed this job
            logger.warning(f'ClientError: Fail to list parts while completeUpload - might be duplicated job:'
                           f' {Des_bucket}/{Des_key}, {str(e)}')
            return "QUIT"
        logger.error(f'ClientError: Fail to list parts while completeUpload - {Des_bucket}/{Des_key} - {str(e)}')
        return "ERR"
    except Exception as e:
        logger.error(f'Fail to list parts while completeUpload - {Des_bucket}/{Des_key} - {str(e)}')
        return "ERR"
    if len(uploadedListPartsClean) != len_indexList:
        logger.error(f'Uploaded parts size not match - {Des_bucket}/{Des_key}')
        return "ERR"
    completeStructJSON = {"Parts": uploadedListPartsClean}
    # Ask S3 to merge the multipart upload
    try:
        logger.info(f'Try to merge multipart upload {Des_bucket}/{Des_key}')
        response_complete = s3_des_client.complete_multipart_upload(
            Bucket=Des_bucket,
            Key=Des_key,
            UploadId=uploadId,
            MultipartUpload=completeStructJSON
        )
        result = response_complete['ETag']
        logger.info(f'Merged: {Des_bucket}/{Des_key}')
    except Exception as e:
        logger.error(f'Fail to complete multipart upload {Des_bucket}/{Des_key}, {str(e)}')
        return "ERR"
    logger.info(f'Complete merge file {Des_bucket}/{Des_key}')
    return result
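
# Illustrative sketch (not part of the library): completeStructJSON built above has the
# shape expected by complete_multipart_upload's MultipartUpload parameter, e.g. with
# made-up ETags:
#     {"Parts": [{"ETag": '"0cc175b9c0f1b6a831c399e269772661"', "PartNumber": 1},
#                {"ETag": '"92eb5ffee6ae2fec3ad71c777531578f"', "PartNumber": 2}]}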


# Continuously fetch job messages and invoke one processor per job
def job_looper(*, sqs, sqs_queue, table, s3_src_client, s3_des_client, instance_id,
               StorageClass, ChunkSize, MaxRetry, MaxThread, ResumableThreshold,
               JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload,
               Des_bucket_default, Des_prefix_default, UpdateVersionId, GetObjectWithVersionId):
    while True:
        # Get a job from the SQS queue
        try:
            logger.info('Get Job from sqs queue...')
            sqs_job_get = sqs.receive_message(QueueUrl=sqs_queue)
            # No message available in the queue
            if 'Messages' not in sqs_job_get:  # No message on sqs queue
                logger.info('No message in queue available, wait...')
                time.sleep(60)
            # Got a job message
            else:
                # TODO: Interrupt handling for multiple SQS messages in one receive is not fully implemented yet;
                # it is recommended to fetch only one SQS message at a time.
                for sqs_job in sqs_job_get["Messages"]:
                    job = json.loads(sqs_job["Body"])
                    job_receipt = sqs_job["ReceiptHandle"]  # Used later to delete the message
                    # If the message comes from S3 rather than from the jobsender, convert it
                    if 'Records' in job:  # Messages from S3 carry 'Records'
                        for One_record in job['Records']:
                            if 's3' in One_record:
                                Src_bucket = One_record['s3']['bucket']['name']
                                Src_key = One_record['s3']['object']['key']
                                Src_key = urllib.parse.unquote_plus(Src_key)
                                Size = One_record['s3']['object']['size']
                                if "versionId" in One_record['s3']['object']:
                                    versionId = One_record['s3']['object']['versionId']
                                else:
                                    versionId = 'null'
                                Des_bucket, Des_prefix = Des_bucket_default, Des_prefix_default
                                Des_key = str(PurePosixPath(Des_prefix) / Src_key)
                                if Src_key[-1] == '/':  # Handle empty directory objects
                                    Des_key += '/'
                                job = {
                                    'Src_bucket': Src_bucket,
                                    'Src_key': Src_key,
                                    'Size': Size,
                                    'Des_bucket': Des_bucket,
                                    'Des_key': Des_key,
                                    'versionId': versionId
                                }
                    if 'Des_bucket' not in job and 'Event' not in job:
                        logger.warning(f'Wrong sqs job: {json.dumps(job, default=str)}')
                        logger.warning('Try to handle next message')
                        time.sleep(1)
                        continue
                    if 'versionId' not in job:
                        job['versionId'] = 'null'
                    # Main flow
                    if 'Event' not in job:
                        if job['Size'] > ResumableThreshold:
                            upload_etag_full = step_function(
                                job=job,
                                table=table,
                                s3_src_client=s3_src_client,
                                s3_des_client=s3_des_client,
                                instance_id=instance_id,
                                StorageClass=StorageClass,
                                ChunkSize=ChunkSize,
                                MaxRetry=MaxRetry,
                                MaxThread=MaxThread,
                                JobTimeout=JobTimeout,
                                ifVerifyMD5Twice=ifVerifyMD5Twice,
                                CleanUnfinishedUpload=CleanUnfinishedUpload,
                                UpdateVersionId=UpdateVersionId,
                                GetObjectWithVersionId=GetObjectWithVersionId
                            )
                        else:
                            upload_etag_full = step_fn_small_file(
                                job=job,
                                table=table,
                                s3_src_client=s3_src_client,
                                s3_des_client=s3_des_client,
                                instance_id=instance_id,
                                StorageClass=StorageClass,
                                MaxRetry=MaxRetry,
                                UpdateVersionId=UpdateVersionId,
                                GetObjectWithVersionId=GetObjectWithVersionId
                            )
                    else:
                        if job['Event'] == 's3:TestEvent':
                            logger.info('Skip s3:TestEvent')
                            upload_etag_full = "s3:TestEvent"
                        else:
                            upload_etag_full = "OtherEvent"
                    # Delete the job from SQS
                    logger.info(f'upload_etag_full={upload_etag_full}, job={str(job)}')
                    if upload_etag_full != "TIMEOUT":
                        # On TIMEOUT the SQS message is kept; on normal completion or QUIT it is deleted.
                        # QUIT means NoSuchUpload, NoSuchKey or AccessDenied, so there is no point letting the next
                        # worker retry: delete the SQS message directly. DDB does not record a finish status here.
                        try:
                            logger.info(f'Try to finish job message on sqs. {str(job)}')
                            sqs.delete_message(
                                QueueUrl=sqs_queue,
                                ReceiptHandle=job_receipt
                            )
                        except Exception as e:
                            logger.error(f'Fail to delete sqs message: {str(sqs_job)}, {str(e)}')
        except Exception as e:
            logger.error(f'Fail. Wait for 5 seconds. ERR: {str(e)}')
            time.sleep(5)
        # Finish the job, go back and get the next job in the queue


# Clean up the existing unfinished multipart upload IDs on S3 (for the current job)
def clean_multipart_upload(*, s3_client, multipart_uploaded_list, Des_bucket):
    for clean_i in multipart_uploaded_list:
        try:
            s3_client.abort_multipart_upload(
                Bucket=Des_bucket,
                Key=clean_i["Key"],
                UploadId=clean_i["UploadId"]
            )
            logger.info(f'CLEAN FINISHED - {str(multipart_uploaded_list)}')
        except Exception as e:
            logger.error(f'Fail to clean - {str(multipart_uploaded_list)} - {str(e)}')


# Step function for the multipart upload of one job
def step_function(*, job, table, s3_src_client, s3_des_client, instance_id,
                  StorageClass, ChunkSize, MaxRetry, MaxThread,
                  JobTimeout, ifVerifyMD5Twice, CleanUnfinishedUpload, UpdateVersionId, GetObjectWithVersionId):
    # Start normal processing
    Src_bucket = job['Src_bucket']
    Src_key = job['Src_key']
    Size = job['Size']
    Des_bucket = job['Des_bucket']
    Des_key = job['Des_key']
    versionId = job['versionId']
    upload_etag_full = ""
    logger.info(f'Start multipart: {Src_bucket}/{Src_key}, Size: {Size}, versionId: {versionId}')
    # Get the unfinished multipart uploads of this file on the destination S3
    multipart_uploaded_list = get_uploaded_list(
        s3_client=s3_des_client,
        Des_bucket=Des_bucket,
        Des_key=Des_key
    )
    # If CleanUnfinishedUpload is set, clean the existing unfinished multipart upload IDs on S3 (for the current job)
    # instead of resuming them
    if multipart_uploaded_list and CleanUnfinishedUpload:
        logger.warning(f'You set CleanUnfinishedUpload. Now clean: {str(multipart_uploaded_list)}')
        clean_multipart_upload(
            s3_client=s3_des_client,
            multipart_uploaded_list=multipart_uploaded_list,
            Des_bucket=Des_bucket
        )
        multipart_uploaded_list = []
    # Start the job steps
    # Retry the loop up to Max_md5_retry times (if the MD5-calculated ETag or the versionId does not match)
    for md5_retry in range(Max_md5_retry + 1):
        # Job preparation
        # If the file has no multipart UploadId, create a new one; otherwise return the existing UploadId
        response_check_upload = check_file_exist(
            prefix_and_key=Des_key,
            UploadIdList=multipart_uploaded_list
        )
        if response_check_upload == 'UPLOAD':
            try:
                logger.info(f'Create multipart upload - {Des_bucket}/{Des_key}')
                response_new_upload = s3_des_client.create_multipart_upload(
                    Bucket=Des_bucket,
                    Key=Des_key,
                    StorageClass=StorageClass
                )
                # If UpdateVersionId is enabled, refresh the S3 versionId here.
                # An interrupted and resumed transfer could otherwise pick up a newer version and end up with a
                # half-old, half-new file, so the versionId is verified once more when the job completes.
                if UpdateVersionId:
                    versionId, Size = head_s3_version(
                        s3_src_client=s3_src_client,
                        Src_bucket=Src_bucket,
                        Src_key=Src_key
                    )
                    if versionId == "ERR":
                        break
                    job['versionId'] = versionId
                    job['Size'] = Size
                response_uploadId = response_new_upload["UploadId"]
                partnumberList = []
                # Write log to DDB in first round of job
                # ddb_first_round(table, Src_bucket, Src_key, Size, versionId)
            except Exception as e:
                logger.error(f'Fail to create new multipart upload - {Des_bucket}/{Des_key} - {str(e)}')
                upload_etag_full = "ERR"
                break
        else:
            response_uploadId = response_check_upload
            logger.info(f'Resume upload id: {Des_bucket}/{Des_key}')
            # Get the list of already uploaded part numbers
            partnumberList = checkPartnumberList(
                Des_bucket=Des_bucket,
                Des_key=Des_key,
                uploadId=response_uploadId,
                s3_des_client=s3_des_client
            )
            # Get versionId/Size from DDB. If the file was replaced before or during an interrupted transfer,
            # the Size may have changed, so re-read the versionId and Size recorded when the job started.
            if UpdateVersionId:
                versionId, Size = ddb_get(
                    table=table,
                    Src_bucket=Src_bucket,
                    Src_key=Src_key
                )
                if versionId == "ERR":
                    break
                job['versionId'] = versionId
                job['Size'] = Size
        # Get the list of part start indexes for the file, e.g. [0, 10, 20]
        indexList, ChunkSize_auto = split(
            Size,
            ChunkSize
        )  # For files with more than 10000 parts, ChunkSize is auto-adjusted to ChunkSize_auto
        # Write log to DDB in first round of job
        percent = int(len(partnumberList) / len(indexList) * 100)
        # ddb_this_round(table, percent, Src_bucket, Src_key, instance_id)
        ddb_start(table=table,
                  percent=percent,
                  job=job,
                  instance_id=instance_id,
                  new_upload=(response_check_upload == 'UPLOAD'))
        # Job threads: uploadPart. Returns TIMEOUT/QUIT on timeout or wrong key
        upload_etag_full = job_processor(
            uploadId=response_uploadId,
            indexList=indexList,
            partnumberList=partnumberList,
            job=job,
            s3_src_client=s3_src_client,
            s3_des_client=s3_des_client,
            MaxThread=MaxThread,
            ChunkSize=ChunkSize_auto,  # Use the auto-adjusted ChunkSize_auto for this single file
            MaxRetry=MaxRetry,
            JobTimeout=JobTimeout,
            ifVerifyMD5Twice=ifVerifyMD5Twice,
            GetObjectWithVersionId=GetObjectWithVersionId
        )
        if upload_etag_full == "TIMEOUT" or upload_etag_full == "QUIT":
            logger.warning(f'Quit job upload_etag_full == {upload_etag_full} - {str(job)}')
            break  # Stop processing this job
        elif upload_etag_full == "ERR":
            # Clear the uploaded id so the file can be re-uploaded
            logger.error(f'upload_etag_full ERR - {str(job)}')
            clean_multipart_upload(
                s3_client=s3_des_client,
                multipart_uploaded_list=multipart_uploaded_list,
                Des_bucket=Des_bucket
            )
            multipart_uploaded_list = []
            if md5_retry >= Max_md5_retry:
                logger.error(f'Quit job upload_etag_full ERR - {upload_etag_full} - {str(job)}')
                return 'ERR'
            continue
            # Retry the job in the loop
        # Merge the parts on S3
        complete_etag = completeUpload(
            uploadId=response_uploadId,
            Des_bucket=Des_bucket,
            Des_key=Des_key,
            len_indexList=len(indexList),
            s3_des_client=s3_des_client
        )
        if complete_etag == "ERR":
            # Clear the uploaded id so the file can be re-uploaded
            logger.error(f'complete_etag ERR - {str(job)}')
            clean_multipart_upload(
                s3_client=s3_des_client,
                multipart_uploaded_list=multipart_uploaded_list,
                Des_bucket=Des_bucket
            )
            multipart_uploaded_list = []
            if md5_retry >= Max_md5_retry:
                logger.error(f'Quit job complete_etag - {upload_etag_full} - {str(job)}')
                return 'ERR'
            continue
            # Retry the job in the loop
        # Verify the file MD5
        if ifVerifyMD5Twice and complete_etag != "QUIT":  # QUIT means a duplicated UploadId, so skip the check
            if complete_etag == upload_etag_full:
                logger.info(f'MD5 ETag Matched - {Des_bucket}/{Des_key} - {complete_etag}')
                break  # Finish this file, move on to the next SQS job
            else:  # ETag does not match: delete the destination S3 object and retry
                logger.error(f'MD5 ETag NOT MATCHED {Des_bucket}/{Des_key}( Destination / Origin ): '
                             f'{complete_etag} - {upload_etag_full}')
                del_des_s3_object(
                    s3_des_client=s3_des_client,
                    Des_bucket=Des_bucket,
                    Des_key=Des_key
                )
                # Clear the uploaded id so the file can be re-uploaded
                logger.error(f'complete_etag != upload_etag_full ERR - {str(job)}')
                clean_multipart_upload(
                    s3_client=s3_des_client,
                    multipart_uploaded_list=multipart_uploaded_list,
                    Des_bucket=Des_bucket
                )
                multipart_uploaded_list = []
                if md5_retry >= Max_md5_retry:
                    logger.error(f'Quit job ifVerifyMD5Twice - {upload_etag_full} - {str(job)}')
                    return 'ERR'
                continue
                # Re-run the job
        # DynamoDB log: ADD status: DONE/ERR(upload_etag_full)
        ddb_complete(
            upload_etag_full=upload_etag_full,
            table=table,
            Src_bucket=Src_bucket,
            Src_key=Src_key
        )
        # Normal end of the md5_retry loop
        break
    # END of the md5_retry loop (or retries exhausted)
    # Complete one job
    return upload_etag_full


# Delete the destination S3 object
def del_des_s3_object(*, s3_des_client, Des_bucket, Des_key):
    logger.info(f'Delete {Des_bucket}/{Des_key}')
    try:
        s3_des_client.delete_object(
            Bucket=Des_bucket,
            Key=Des_key
        )
    except Exception as e:
        logger.error(f'Fail to delete S3 object - {Des_bucket}/{Des_key} - {str(e)}')


# Get the item's versionId and Size from DDB
def ddb_get(*, table, Src_bucket, Src_key):
    logger.info(f'Get versionId and Size from DDB - {Src_bucket}/{Src_key}')
    table_key = str(PurePosixPath(Src_bucket) / Src_key)
    if Src_key[-1] == '/':  # Handle empty directory objects
        table_key += '/'
    try:
        response = table.get_item(
            Key={"Key": table_key},
            AttributesToGet=["versionId", "Size"]
        )
        versionId = response['Item']['versionId']
        Size = response['Item']['Size']
        logger.info(f'Got VersionId: {versionId} - {Src_bucket}/{Src_key} - Size:{Size}')
    except Exception as e:
        logger.error(f'Fail to Get versionId and Size from DDB - {Src_bucket}/{Src_key} - {str(e)}')
        return 'ERR', 0
    return versionId, Size


# Write log to DDB in the first round of the job
def ddb_start(*, table, percent, job, instance_id, new_upload):
    Src_bucket = job['Src_bucket']
    Src_key = job['Src_key']
    Size = job['Size']
    Des_bucket = job['Des_bucket']
    Des_key = job['Des_key']
    versionId = job['versionId']
    logger.info(f'Write log to DDB start job - {Src_bucket}/{Src_key}')
    cur_time = time.time()
    table_key = str(PurePosixPath(Src_bucket) / Src_key)
    if Src_key[-1] == '/':  # Handle empty directory objects
        table_key += '/'
    UpdateExpression = "ADD instanceID :id, tryTimes :t, startTime_f :s_format " \
                       "SET lastTimeProgress=:p, Size=:size, desBucket=:b, desKey=:k"
    ExpressionAttributeValues = {
        ":t": 1,
        ":id": {instance_id},  # Python set, stored as a DDB string set via ADD
        ":s_format": {time.asctime(time.localtime(cur_time))},  # Python set, stored as a DDB string set via ADD
        ":size": Size,
        ":p": percent,
        ":b": Des_bucket,
        ":k": Des_key
    }
    if new_upload:
        logger.info(f'Update DDB <firstTime> - {Src_bucket}/{Src_key}')
        ExpressionAttributeValues[":s"] = int(cur_time)
        ExpressionAttributeValues[":v"] = versionId
        UpdateExpression += ", firstTime =:s, versionId =:v"
    try:
        table.update_item(
            Key={"Key": table_key},
            UpdateExpression=UpdateExpression,
            ExpressionAttributeValues=ExpressionAttributeValues
        )
    except Exception as e:
        # The log could not be written
        logger.error(f'Fail to put log to DDB at start job - {Src_bucket}/{Src_key} - {str(e)}')
    return


# DynamoDB log: ADD status: DONE/ERR(upload_etag_full)
def ddb_complete(*, upload_etag_full, table, Src_bucket, Src_key):
    if upload_etag_full not in ["TIMEOUT", "ERR", "QUIT"]:
        status = "DONE"
    else:
        status = upload_etag_full
    cur_time = time.time()
    table_key = str(PurePosixPath(Src_bucket) / Src_key)
    if Src_key[-1] == '/':  # Handle empty directory objects
        table_key += '/'
    UpdateExpression = "ADD jobStatus :done SET totalSpentTime=:s-firstTime, endTime=:s, endTime_f=:e"
    ExpressionAttributeValues = {
        ":done": {status},
        ":s": int(cur_time),
        ":e": time.asctime(time.localtime(cur_time))
    }
    # Normal DDB write; lastTimeProgress=100 is only added when the job finished without error
    if status == "DONE":
        UpdateExpression += ", lastTimeProgress=:p"
        ExpressionAttributeValues[":p"] = 100
    # Update DDB
    logger.info(f'Write job complete status to DDB: {status} - {Src_bucket}/{Src_key}')
    try:
        table.update_item(
            Key={"Key": table_key},
            UpdateExpression=UpdateExpression,
            ExpressionAttributeValues=ExpressionAttributeValues
        )
    except Exception as e:
        logger.error(f'Fail to put log to DDB at end - {Src_bucket}/{Src_key} - {str(e)}')
    return


def step_fn_small_file(*, job, table, s3_src_client, s3_des_client, instance_id, StorageClass, MaxRetry,
                       UpdateVersionId, GetObjectWithVersionId):
    # Start processing a small file
    Src_bucket = job['Src_bucket']
    Src_key = job['Src_key']
    Size = job['Size']
    Des_bucket = job['Des_bucket']
    Des_key = job['Des_key']
    versionId = job['versionId']
    # If UpdateVersionId is enabled, refresh the S3 versionId
    if UpdateVersionId:
        versionId, Size = head_s3_version(
            s3_src_client=s3_src_client,
            Src_bucket=Src_bucket,
            Src_key=Src_key
        )
        job['versionId'] = versionId
    logger.info(f'Start small: {Src_bucket}/{Src_key}, Size: {Size}, versionId: {versionId}')
    # Write the DDB log for the first round
    ddb_start(table=table,
              percent=0,
              job=job,
              instance_id=instance_id,
              new_upload=True)
    upload_etag_full = []
    for retryTime in range(MaxRetry + 1):
        try:
            # Get the object
            logger.info(f'--->Downloading {Size} Bytes {Src_bucket}/{Src_key} - Small file 1/1')
            if GetObjectWithVersionId:
                response_get_object = s3_src_client.get_object(
                    Bucket=Src_bucket,
                    Key=Src_key,
                    VersionId=versionId
                )
            else:
                response_get_object = s3_src_client.get_object(
                    Bucket=Src_bucket,
                    Key=Src_key
                )
            getBody = response_get_object["Body"].read()
            chunkdata_md5 = hashlib.md5(getBody)
            ContentMD5 = base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
            # Put the object
            logger.info(f'--->Uploading {Size} Bytes {Des_bucket}/{Des_key} - Small file 1/1')
            response_put_object = s3_des_client.put_object(
                Body=getBody,
                Bucket=Des_bucket,
                Key=Des_key,
                ContentMD5=ContentMD5,
                StorageClass=StorageClass
            )
            # The request carries the MD5; if the S3 check fails it raises an Exception
            upload_etag_full = response_put_object['ETag']
            # Download/upload finished
            break
        except ClientError as e:
            if e.response['Error']['Code'] in ['NoSuchKey', 'AccessDenied']:
                logger.error(f"Fail to access {Src_bucket}/{Src_key} - ERR: {str(e)}.")
                return "QUIT"
            logger.warning(f'Download/Upload small file Fail: {Src_bucket}/{Src_key}, '
                           f'{str(e)}, Attempts: {retryTime}')
            if retryTime >= MaxRetry:
                logger.error(f'Fail MaxRetry Download/Upload small file: {Des_bucket}/{Des_key}')
                return "TIMEOUT"
            else:
                time.sleep(5 * retryTime)
        except Exception as e:
            logger.error(f'Fail in step_fn_small_file - {Des_bucket}/{Des_key} - {str(e)}')
            return "TIMEOUT"
    # Write the DDB log for completion
    ddb_complete(
        upload_etag_full=upload_etag_full,
        table=table,
        Src_bucket=Src_bucket,
        Src_key=Src_key
    )
    # Complete one job
    return upload_etag_full