s3_upload_16.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  def uploadPart(*, uploadId, indexList, partnumberList, srcfile, ChunkSize_auto):
      """Submit one worker task per part to a thread pool and return the combined ETag.

      Parts are numbered from 1 in the order of ``indexList``. A part whose
      number appears in ``partnumberList`` is submitted with ``dryrun=True``;
      every other part is submitted with ``dryrun=False``. (Per the original
      comment, a dry run only calculates the MD5 instead of uploading --
      confirm against the worker implementations.)

      The worker function is chosen from the module-level ``JobType``. An
      unrecognised ``JobType`` submits nothing. The pool size comes from the
      module-level ``MaxThread``.

      Args:
          uploadId: Multipart upload id, passed through to each worker.
          indexList: One start index per part; its length is the part count.
          partnumberList: Part numbers to submit as dry runs.
          srcfile: Mapping that provides at least ``"Key"`` and ``"Size"``.
          ChunkSize_auto: Chunk size, passed through to each worker.

      Returns:
          A string of the form ``'"<hex>-<count>"'`` where ``<hex>`` is the MD5
          of the concatenated per-part MD5 digests and ``<count>`` is the
          number of entries in the MD5 list.
      """
      total = len(indexList)
      # Placeholder hashes, one slot per part. The list is handed to every
      # worker; presumably each worker stores its part's hash in its own slot
      # -- TODO confirm against the worker implementations.
      md5list = [hashlib.md5(b'')] * total
      complete_list = []
      # Build the lookup once: set membership is O(1) per part, while scanning
      # the list for every part is O(parts^2).
      dryrun_parts = set(partnumberList)

      # Choose the worker once instead of repeating the branch per part.
      extra_kwargs = {}
      if JobType == 'LOCAL_TO_S3':
          worker = uploadThread
      elif JobType == 'S3_TO_S3':
          worker = download_uploadThread
      elif JobType == 'ALIOSS_TO_S3':
          worker = alioss_download_uploadThread
          # Only the AliOSS worker is given the source object size.
          extra_kwargs["srcfileSize"] = srcfile["Size"]
      else:
          # Unknown job type: nothing is submitted (same as the original).
          worker = None

      submitted = []
      # Thread pool start
      with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
          if worker is not None:
              for partnumber, partStartIndex in enumerate(indexList, start=1):
                  # One part per task: upload it, or dry-run it.
                  submitted.append(pool.submit(
                      worker,
                      uploadId=uploadId,
                      partnumber=partnumber,
                      partStartIndex=partStartIndex,
                      srcfileKey=srcfile["Key"],
                      total=total,
                      md5list=md5list,
                      dryrun=partnumber in dryrun_parts,
                      complete_list=complete_list,
                      ChunkSize=ChunkSize_auto,
                      **extra_kwargs))
      # Thread pool end: leaving the ``with`` block waits for all tasks.

      # An exception raised inside a worker is stored on its Future and is
      # lost unless someone asks for it. Report it instead of dropping it.
      for partnumber, task in enumerate(submitted, start=1):
          error = task.exception()
          if error is not None:
              logger.error(
                  f'Part {partnumber} worker failed: {error!r} - {srcfile["Key"]}')

      logger.info(f'All parts uploaded - {srcfile["Key"]} - size: {srcfile["Size"]}')

      # For local uploads the file may change during the transfer, so rescan
      # the local file for its MD5s rather than reuse the hashes computed from
      # the bodies that were read earlier.
      if ifVerifyMD5 and JobType == 'LOCAL_TO_S3':
          md5list = cal_md5list(indexList=indexList,
                                srcfileKey=srcfile["Key"],
                                ChunkSize=ChunkSize_auto)

      # Combined ETag: MD5 over the concatenated per-part digests, then
      # "-<number of parts>".
      digests = b"".join(m.digest() for m in md5list)
      md5full = hashlib.md5(digests)
      return f'"{md5full.hexdigest()}-{len(md5list)}"'