s3_upload_19.py 2.1 KB

import base64
import hashlib
import os
import sys
import time
from pathlib import PurePosixPath

# Relies on module-level configuration defined elsewhere in the script:
# S3Prefix, SrcDir, DesBucket, MaxRetry, s3_dest_client, logger, size_to_str.


def uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list, ChunkSize):
    prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    if not dryrun:
        print(f'\033[0;32;1m--->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
    pstart_time = time.time()
    with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                data.seek(partStartIndex)
                chunkdata = data.read(ChunkSize)
                chunkdata_md5 = hashlib.md5(chunkdata)
                md5list[partnumber - 1] = chunkdata_md5
                if not dryrun:
                    s3_dest_client.upload_part(
                        Body=chunkdata,
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        PartNumber=partnumber,
                        UploadId=uploadId,
                        ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                    )
                    # Each part is MD5-checked on upload here; the whole file
                    # is verified again later when the parts are merged.
                break
            except Exception as err:
                retryTime += 1
                logger.info(f'UploadThreadFunc log: {srcfileKey} - {str(err)}')
                logger.info(f'Upload Fail - {srcfileKey} - Retry part - {partnumber} - Attempt - {retryTime}')
                if retryTime > MaxRetry:
                    logger.error(f'Quit for Max retries: {retryTime}')
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
    complete_list.append(partnumber)
    pload_time = time.time() - pstart_time
    pload_bytes = len(chunkdata)
    pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
    if not dryrun:
        print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
    return
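

# --- Illustrative sketch (not part of the original file) ---------------------
# The comment inside uploadThread notes that each part is MD5-checked on upload
# and the whole object is verified again after the parts are merged. Below is a
# minimal sketch of that completion step, assuming the same module-level names
# (s3_dest_client, DesBucket, logger) and that md5list holds one hashlib.md5
# object per part, in part order. The helper name completeUpload is
# hypothetical, not taken from the original script.
def completeUpload(*, uploadId, prefix_and_key, md5list, total):
    # Part list for CompleteMultipartUpload: part numbers with per-part ETags
    # (S3 sets each part's ETag to the MD5 hex digest of that part's data).
    parts = [
        {'PartNumber': i + 1, 'ETag': md5list[i].hexdigest()}
        for i in range(total)
    ]
    response = s3_dest_client.complete_multipart_upload(
        Bucket=DesBucket,
        Key=prefix_and_key,
        UploadId=uploadId,
        MultipartUpload={'Parts': parts},
    )
    # For a multipart object, S3 computes the final ETag as the MD5 of the
    # concatenated binary digests of every part, followed by '-<part count>'.
    # Recomputing it locally from md5list gives a whole-file integrity check.
    digest_of_digests = hashlib.md5(b''.join(m.digest() for m in md5list)).hexdigest()
    expected_etag = f'"{digest_of_digests}-{total}"'
    if response['ETag'] != expected_etag:
        logger.error(f'ETag mismatch for {prefix_and_key}: '
                     f'{response["ETag"]} != {expected_etag}')
    return response['ETag']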