s3_upload_12.py 4.4 KB

def upload_file(*, srcfile, desFilelist, UploadIdList, ChunkSize_default):  # UploadIdList is the multipart_uploaded_list
    logger.info(f'Start file: {srcfile["Key"]}')
    prefix_and_key = srcfile["Key"]
    if JobType == 'LOCAL_TO_S3':
        prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
    if srcfile['Size'] >= ChunkSize_default:
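        # Large file procedure: multipart upload with resume support and
        # optional MD5 (ETag) verification of the finished object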
        try:
            # Retry up to 3 times (in case the MD5-calculated ETag does not match)
            for md5_retry in range(3):
                # Check whether the file already exists: if it exists, skip it;
                # if it is missing and has no UploadId, create a new multipart upload;
                # if it is missing but an UploadId is known, resume with that UploadId
                response_check_upload = check_file_exist(srcfile=srcfile,
                                                         desFilelist=desFilelist,
                                                         UploadIdList=UploadIdList)
                if response_check_upload == 'UPLOAD':
                    logger.info(f'New upload: {srcfile["Key"]}')
                    response_new_upload = s3_dest_client.create_multipart_upload(
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        StorageClass=StorageClass
                    )
                    # logger.info("UploadId: " + response_new_upload["UploadId"])
                    reponse_uploadId = response_new_upload["UploadId"]
                    partnumberList = []
                elif response_check_upload == 'NEXT':
                    logger.info(f'Duplicated. {srcfile["Key"]} same size, goto next file.')
                    raise NextFile()
                else:
                    reponse_uploadId = response_check_upload
                    # Get the list of part numbers that were already uploaded
                    partnumberList = checkPartnumberList(srcfile, reponse_uploadId)
                # Get the list of part start offsets, e.g. [0, 10, 20]
                response_indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
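                # A note on split(): it appears to return one byte offset per part
                # plus a possibly enlarged chunk size - S3 caps a multipart upload
                # at 10,000 parts, so very large files need parts bigger than
                # ChunkSize_default.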
                # Upload the parts
                upload_etag_full = uploadPart(uploadId=reponse_uploadId,
                                              indexList=response_indexList,
                                              partnumberList=partnumberList,
                                              srcfile=srcfile,
                                              ChunkSize_auto=ChunkSize_auto)
                # Complete the multipart upload, merging the parts on S3
                response_complete = completeUpload(reponse_uploadId=reponse_uploadId,
                                                   srcfileKey=srcfile["Key"],
                                                   len_indexList=len(response_indexList))
                logger.info(f'FINISH: {srcfile["Key"]} TO {response_complete["Location"]}')
                # Verify the file MD5
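                # (S3 computes a multipart object's ETag as the MD5 of the
                # concatenated binary part MD5s, suffixed with "-<part count>";
                # upload_etag_full is presumably the same value computed locally.)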
                if ifVerifyMD5:
                    if response_complete["ETag"] == upload_etag_full:
                        logger.info(f'MD5 ETag Matched - {srcfile["Key"]} - {response_complete["ETag"]}')
                        break
                    else:  # ETag mismatch: delete the S3 object and retry
                        logger.warning(f'MD5 ETag NOT MATCHED {srcfile["Key"]} (Destination / Origin): '
                                       f'{response_complete["ETag"]} - {upload_etag_full}')
                        s3_dest_client.delete_object(
                            Bucket=DesBucket,
                            Key=prefix_and_key
                        )
                        UploadIdList = []
                        logger.warning(f'Deleted and retry upload {srcfile["Key"]}')
                        if md5_retry == 2:
                            logger.warning(f'MD5 ETag NOT MATCHED Exceed Max Retries - {srcfile["Key"]}')
                else:
                    # MD5 verification disabled: accept the first successful upload
                    break
        except NextFile:
            pass
    # Small file procedure
    else:
        # Check whether the file already exists at the destination
        for f in desFilelist:
            if f["Key"] == prefix_and_key and \
                    (srcfile["Size"] == f["Size"]):
                logger.info(f'Duplicated. {prefix_and_key} same size, goto next file.')
                return
        # File not found, or size differs: submit the upload
        if JobType == 'LOCAL_TO_S3':
            uploadThread_small(srcfile, prefix_and_key)
        elif JobType == 'S3_TO_S3':
            download_uploadThread_small(srcfile["Key"])
        elif JobType == 'ALIOSS_TO_S3':
            alioss_download_uploadThread_small(srcfile["Key"])
    return
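
# A minimal driver sketch (hypothetical; not part of the original script). It
# assumes the module-level configuration used above - logger, JobType, S3Prefix,
# DesBucket, StorageClass, ifVerifyMD5, s3_dest_client and the helper functions -
# has already been set up elsewhere in the file. Each srcfile entry mirrors the
# Key/Size dicts returned by S3 list_objects_v2:
#
#     src_file_list = [{"Key": "data/part1.bin", "Size": 50 * 1024 * 1024}]
#     des_file_list = []            # objects already present at the destination
#     multipart_uploaded_list = []  # unfinished multipart uploads to resume
#     for one_file in src_file_list:
#         upload_file(srcfile=one_file,
#                     desFilelist=des_file_list,
#                     UploadIdList=multipart_uploaded_list,
#                     ChunkSize_default=5 * 1024 * 1024)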