# s3_upload_22.py (1.7 KB)

  1. def completeUpload(*, reponse_uploadId, srcfileKey, len_indexList):
  2. # 查询S3的所有Part列表uploadedListParts构建completeStructJSON
  3. prefix_and_key = srcfileKey
  4. if JobType == 'LOCAL_TO_S3':
  5. prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
  6. uploadedListPartsClean = []
  7. PartNumberMarker = 0
  8. IsTruncated = True
  9. while IsTruncated:
  10. response_uploadedList = s3_dest_client.list_parts(
  11. Bucket=DesBucket,
  12. Key=prefix_and_key,
  13. UploadId=reponse_uploadId,
  14. MaxParts=1000,
  15. PartNumberMarker=PartNumberMarker
  16. )
  17. NextPartNumberMarker = response_uploadedList['NextPartNumberMarker']
  18. IsTruncated = response_uploadedList['IsTruncated']
  19. if NextPartNumberMarker > 0:
  20. for partObject in response_uploadedList["Parts"]:
  21. ETag = partObject["ETag"]
  22. PartNumber = partObject["PartNumber"]
  23. addup = {
  24. "ETag": ETag,
  25. "PartNumber": PartNumber
  26. }
  27. uploadedListPartsClean.append(addup)
  28. PartNumberMarker = NextPartNumberMarker
  29. if len(uploadedListPartsClean) != len_indexList:
  30. logger.warning(f'Uploaded parts size not match - {srcfileKey}')
  31. input('PRESS ENTER TO QUIT')
  32. sys.exit(0)
  33. completeStructJSON = {"Parts": uploadedListPartsClean}
  34. # S3合并multipart upload任务
  35. response_complete = s3_dest_client.complete_multipart_upload(
  36. Bucket=DesBucket,
  37. Key=prefix_and_key,
  38. UploadId=reponse_uploadId,
  39. MultipartUpload=completeStructJSON
  40. )
  41. logger.info(f'Complete merge file {srcfileKey}')
  42. return response_complete