12345678910111213141516171819202122232425262728293031323334353637383940414243 |
def completeUpload(*, reponse_uploadId, srcfileKey, len_indexList):
    """Merge the uploaded parts of a multipart upload into the final S3 object.

    Pages through ``s3_dest_client.list_parts`` to collect the ETag and
    PartNumber of every part recorded for the upload, compares the number
    of collected parts with ``len_indexList`` and, when they agree, calls
    ``s3_dest_client.complete_multipart_upload``.

    Args:
        reponse_uploadId: UploadId of the multipart upload to complete.
        srcfileKey: Key of the source file. When ``JobType`` is
            ``'LOCAL_TO_S3'`` the destination key is ``S3Prefix`` joined
            with this value (POSIX-style path join).
        len_indexList: Number of parts expected to be listed.

    Returns:
        The response returned by ``complete_multipart_upload``.

    Note:
        If the number of listed parts differs from ``len_indexList`` a
        warning is logged, the function waits for ENTER on stdin and then
        terminates the process via ``sys.exit(0)``.
    """
    # Work out the destination key for this job type.
    if JobType == 'LOCAL_TO_S3':
        dest_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    else:
        dest_key = srcfileKey

    # Query S3 for the full list of uploaded parts, one page at a time.
    parts = []
    marker = 0
    while True:
        page = s3_dest_client.list_parts(
            Bucket=DesBucket,
            Key=dest_key,
            UploadId=reponse_uploadId,
            MaxParts=1000,
            PartNumberMarker=marker,
        )
        marker = page['NextPartNumberMarker']
        truncated = page['IsTruncated']
        # "Parts" is only read when the next marker is positive
        # (presumably the key is absent when no parts are listed - verify).
        if marker > 0:
            parts.extend(
                {"ETag": part["ETag"], "PartNumber": part["PartNumber"]}
                for part in page["Parts"]
            )
        if not truncated:
            break

    # Refuse to merge when the listed part count is not the expected one.
    if len(parts) != len_indexList:
        logger.warning(f'Uploaded parts size not match - {srcfileKey}')
        input('PRESS ENTER TO QUIT')
        sys.exit(0)

    # Ask S3 to merge the parts of the multipart upload.
    result = s3_dest_client.complete_multipart_upload(
        Bucket=DesBucket,
        Key=dest_key,
        UploadId=reponse_uploadId,
        MultipartUpload={"Parts": parts},
    )
    logger.info(f'Complete merge file {srcfileKey}')
    return result
|