def upload_file(*, srcfile, desFilelist, UploadIdList, ChunkSize_default):
    """Upload one source file to the destination S3 bucket, resuming if possible.

    Large files (Size >= ChunkSize_default) go through the multipart-upload
    path with optional MD5/ETag verification and up to 3 retries on mismatch;
    smaller files are handed off to the single-shot small-file uploaders.

    Parameters (keyword-only):
        srcfile: dict with at least "Key" and "Size" describing the source object.
        desFilelist: list of destination-object dicts ({"Key", "Size", ...}) used
            to skip files that already exist with the same size.
        UploadIdList: list of in-progress multipart uploads on the destination
            (i.e. the multipart_uploaded_list) used to resume interrupted uploads.
        ChunkSize_default: threshold (bytes) and initial chunk size for multipart.

    Returns:
        None. Progress and results are reported through the module logger.

    Relies on module-level configuration/helpers: JobType, S3Prefix, DesBucket,
    StorageClass, ifVerifyMD5, s3_dest_client, check_file_exist,
    checkPartnumberList, split, uploadPart, completeUpload, NextFile and the
    *_small upload helpers.
    """
    logger.info(f'Start file: {srcfile["Key"]}')
    prefix_and_key = srcfile["Key"]
    if JobType == 'LOCAL_TO_S3':
        # Local uploads are placed under the configured destination prefix.
        prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
    if srcfile['Size'] >= ChunkSize_default:
        try:
            # Retry up to 3 times if the locally computed ETag does not match
            # the ETag S3 reports after completing the multipart upload.
            for md5_retry in range(3):
                # Check upload state: file already present (skip), no UploadId
                # (start a new multipart upload), or an existing UploadId to resume.
                response_check_upload = check_file_exist(srcfile=srcfile,
                                                         desFilelist=desFilelist,
                                                         UploadIdList=UploadIdList)
                if response_check_upload == 'UPLOAD':
                    logger.info(f'New upload: {srcfile["Key"]}')
                    response_new_upload = s3_dest_client.create_multipart_upload(
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        StorageClass=StorageClass
                    )
                    # logger.info("UploadId: "+response_new_upload["UploadId"])
                    response_uploadId = response_new_upload["UploadId"]
                    partnumberList = []
                elif response_check_upload == 'NEXT':
                    logger.info(f'Duplicated. {srcfile["Key"]} same size, goto next file.')
                    raise NextFile()
                else:
                    # Resuming: response_check_upload is the existing UploadId.
                    response_uploadId = response_check_upload
                    # Fetch the part numbers already uploaded so they are skipped.
                    partnumberList = checkPartnumberList(srcfile, response_uploadId)
                # Build the byte-offset index list, e.g. [0, 10, 20].
                response_indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
                # Upload the parts (skipping those in partnumberList).
                upload_etag_full = uploadPart(uploadId=response_uploadId,
                                              indexList=response_indexList,
                                              partnumberList=partnumberList,
                                              srcfile=srcfile,
                                              ChunkSize_auto=ChunkSize_auto)
                # Complete the multipart upload on S3.
                # NOTE: the callee's keyword is spelled "reponse_uploadId" — keep it.
                response_complete = completeUpload(reponse_uploadId=response_uploadId,
                                                   srcfileKey=srcfile["Key"],
                                                   len_indexList=len(response_indexList))
                logger.info(f'FINISH: {srcfile["Key"]} TO {response_complete["Location"]}')
                # Verify the file's MD5-based ETag if configured.
                if ifVerifyMD5:
                    if response_complete["ETag"] == upload_etag_full:
                        logger.info(f'MD5 ETag Matched - {srcfile["Key"]} - {response_complete["ETag"]}')
                        break
                    else:
                        # ETag mismatch: delete the destination object and retry.
                        logger.warning(f'MD5 ETag NOT MATCHED {srcfile["Key"]}( Destination / Origin ): '
                                       f'{response_complete["ETag"]} - {upload_etag_full}')
                        s3_dest_client.delete_object(
                            Bucket=DesBucket,
                            Key=prefix_and_key
                        )
                        # Drop stale UploadIds so the retry starts a fresh upload.
                        UploadIdList = []
                        # BUGFIX: these two messages were plain strings missing the
                        # f-prefix, so the literal '{srcfile["Key"]}' was logged.
                        logger.warning(f'Deleted and retry upload {srcfile["Key"]}')
                        if md5_retry == 2:
                            logger.warning(f'MD5 ETag NOT MATCHED Exceed Max Retries - {srcfile["Key"]}')
                else:
                    # MD5 verification disabled — one pass is enough.
                    break
        except NextFile:
            pass
    # Small file procedure
    else:
        # Skip if an identically sized object already exists at the destination.
        for f in desFilelist:
            if f["Key"] == prefix_and_key and \
                    (srcfile["Size"] == f["Size"]):
                logger.info(f'Duplicated. {prefix_and_key} same size, goto next file.')
                return
        # Not found (or size differs) — submit the small-file upload.
        if JobType == 'LOCAL_TO_S3':
            uploadThread_small(srcfile, prefix_and_key)
        elif JobType == 'S3_TO_S3':
            download_uploadThread_small(srcfile["Key"])
        elif JobType == 'ALIOSS_TO_S3':
            alioss_download_uploadThread_small(srcfile["Key"])
    return