12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
def upload_file(*, srcfile, desFilelist, UploadIdList, ChunkSize_default):
    """Upload one file to the destination S3 bucket.

    Large files (Size >= ChunkSize_default) go through S3 multipart upload
    with resume support and optional MD5/ETag verification; smaller files are
    handed to the single-shot upload helpers.

    Relies on module-level configuration and helpers defined elsewhere in this
    module (JobType, S3Prefix, DesBucket, StorageClass, ifVerifyMD5,
    s3_dest_client, logger, check_file_exist, checkPartnumberList, split,
    uploadPart, completeUpload, uploadThread_small, ...).

    :param srcfile: dict with at least "Key" and "Size" describing the source file
    :param desFilelist: list of {"Key", "Size"} dicts already present at destination
    :param UploadIdList: list of in-progress multipart uploads (multipart_uploaded_list)
    :param ChunkSize_default: part-size threshold in bytes for multipart upload
    """
    logger.info(f'Start file: {srcfile["Key"]}')
    prefix_and_key = srcfile["Key"]
    if JobType == 'LOCAL_TO_S3':
        # Local uploads get the configured destination prefix prepended
        prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
    if srcfile['Size'] >= ChunkSize_default:
        try:
            # Retry up to 3 times if the computed MD5 ETag does not match
            for md5_retry in range(3):
                # Check destination state:
                #   'NEXT'   -> identical file already exists, skip
                #   'UPLOAD' -> no file and no UploadId, start a new upload
                #   other    -> an existing UploadId to resume from
                response_check_upload = check_file_exist(srcfile=srcfile,
                                                         desFilelist=desFilelist,
                                                         UploadIdList=UploadIdList)
                if response_check_upload == 'UPLOAD':
                    logger.info(f'New upload: {srcfile["Key"]}')
                    response_new_upload = s3_dest_client.create_multipart_upload(
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        StorageClass=StorageClass
                    )
                    response_uploadId = response_new_upload["UploadId"]
                    partnumberList = []
                elif response_check_upload == 'NEXT':
                    logger.info(f'Duplicated. {srcfile["Key"]} same size, goto next file.')
                    raise NextFile()
                else:
                    # Resume: reuse the returned UploadId
                    response_uploadId = response_check_upload
                    # Fetch the part numbers already uploaded for this UploadId
                    partnumberList = checkPartnumberList(srcfile, response_uploadId)
                # Byte-offset index list for the parts, e.g. [0, 10, 20]
                response_indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
                # Upload the individual parts
                upload_etag_full = uploadPart(uploadId=response_uploadId,
                                              indexList=response_indexList,
                                              partnumberList=partnumberList,
                                              srcfile=srcfile,
                                              ChunkSize_auto=ChunkSize_auto)
                # Complete the multipart upload on S3.
                # NOTE: the keyword 'reponse_uploadId' (sic) is the callee's
                # declared parameter name and must stay spelled this way.
                response_complete = completeUpload(reponse_uploadId=response_uploadId,
                                                   srcfileKey=srcfile["Key"],
                                                   len_indexList=len(response_indexList))
                logger.info(f'FINISH: {srcfile["Key"]} TO {response_complete["Location"]}')
                # Verify the MD5-based ETag if configured
                if ifVerifyMD5:
                    if response_complete["ETag"] == upload_etag_full:
                        logger.info(f'MD5 ETag Matched - {srcfile["Key"]} - {response_complete["ETag"]}')
                        break
                    else:  # ETag mismatch: delete the S3 object and retry
                        logger.warning(f'MD5 ETag NOT MATCHED {srcfile["Key"]}( Destination / Origin ): '
                                       f'{response_complete["ETag"]} - {upload_etag_full}')
                        s3_dest_client.delete_object(
                            Bucket=DesBucket,
                            Key=prefix_and_key
                        )
                        # Clear the resume list so the retry starts a fresh upload
                        UploadIdList = []
                        # BUGFIX: these two messages were plain strings missing
                        # the f-prefix, so the key placeholder was logged literally
                        logger.warning(f'Deleted and retry upload {srcfile["Key"]}')
                        if md5_retry == 2:
                            logger.warning(f'MD5 ETag NOT MATCHED Exceed Max Retries - {srcfile["Key"]}')
                else:
                    # Verification disabled: one pass is enough
                    break
        except NextFile:
            pass
    # Small file procedure
    else:
        # Skip if an object with the same key and size already exists
        for f in desFilelist:
            if f["Key"] == prefix_and_key and \
                    (srcfile["Size"] == f["Size"]):
                logger.info(f'Duplicated. {prefix_and_key} same size, goto next file.')
                return
        # Not found, or size differs: submit the single-shot upload
        if JobType == 'LOCAL_TO_S3':
            uploadThread_small(srcfile, prefix_and_key)
        elif JobType == 'S3_TO_S3':
            download_uploadThread_small(srcfile["Key"])
        elif JobType == 'ALIOSS_TO_S3':
            alioss_download_uploadThread_small(srcfile["Key"])
    return
|