def completeUpload(*, reponse_uploadId, srcfileKey, len_indexList):
    """Merge the uploaded parts of a multipart upload into the final object.

    Lists every part recorded on the destination bucket for the given upload,
    checks that the number of parts equals ``len_indexList`` and then asks S3
    to complete the multipart upload.

    Args:
        reponse_uploadId: Upload id passed as ``UploadId`` to both
            ``list_parts`` and ``complete_multipart_upload``.
        srcfileKey: Source file key. Used as the destination key as-is, or
            joined under ``S3Prefix`` when ``JobType`` is ``'LOCAL_TO_S3'``.
        len_indexList: Number of parts expected to be present on S3.

    Returns:
        The response returned by ``s3_dest_client.complete_multipart_upload``.

    Note:
        If the listed part count differs from ``len_indexList`` a warning is
        logged, the process waits for ENTER on stdin and then terminates via
        ``sys.exit(0)``.
    """
    # Destination key: under S3Prefix for local-to-S3 jobs, otherwise the
    # source key unchanged.
    if JobType == 'LOCAL_TO_S3':
        dest_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    else:
        dest_key = srcfileKey

    # Query S3 for the full list of uploaded parts, one page (up to 1000
    # parts) at a time, to build the structure for the completion request.
    collected_parts = []
    marker = 0
    while True:
        page = s3_dest_client.list_parts(
            Bucket=DesBucket,
            Key=dest_key,
            UploadId=reponse_uploadId,
            MaxParts=1000,
            PartNumberMarker=marker,
        )
        next_marker = page['NextPartNumberMarker']
        has_more = page['IsTruncated']

        # "Parts" is only read when the next marker is positive.
        if next_marker > 0:
            collected_parts.extend(
                {"ETag": part["ETag"], "PartNumber": part["PartNumber"]}
                for part in page["Parts"]
            )

        marker = next_marker
        if not has_more:
            break

    # Refuse to merge when the part count on S3 differs from what is expected.
    if len(collected_parts) != len_indexList:
        logger.warning(f'Uploaded parts size not match - {srcfileKey}')
        input('PRESS ENTER TO QUIT')
        sys.exit(0)

    # Ask S3 to merge the parts of the multipart upload.
    merge_response = s3_dest_client.complete_multipart_upload(
        Bucket=DesBucket,
        Key=dest_key,
        UploadId=reponse_uploadId,
        MultipartUpload={"Parts": collected_parts},
    )
    logger.info(f'Complete merge file {srcfileKey}')
    return merge_response