import hashlib
from concurrent import futures

# Note: MaxThread, JobType, ifVerifyMD5, logger and the per-part worker functions
# (uploadThread, download_uploadThread, alioss_download_uploadThread, cal_md5list)
# are defined elsewhere in this module.
def uploadPart(*, uploadId, indexList, partnumberList, srcfile, ChunkSize_auto):
    partnumber = 1  # Part number to upload in the current loop iteration
    total = len(indexList)
    md5list = [hashlib.md5(b'')] * total
    complete_list = []
    # Thread pool start
    with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
        for partStartIndex in indexList:
            # Start to upload this part; parts already listed in partnumberList
            # are handled as a dryrun (MD5 calculation only, no re-upload)
            if partnumber not in partnumberList:
                dryrun = False
            else:
                dryrun = True
            # Upload one part per thread, or dryrun to only calculate the MD5
            if JobType == 'LOCAL_TO_S3':
                pool.submit(uploadThread,
                            uploadId=uploadId,
                            partnumber=partnumber,
                            partStartIndex=partStartIndex,
                            srcfileKey=srcfile["Key"],
                            total=total,
                            md5list=md5list,
                            dryrun=dryrun,
                            complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            elif JobType == 'S3_TO_S3':
                pool.submit(download_uploadThread,
                            uploadId=uploadId,
                            partnumber=partnumber,
                            partStartIndex=partStartIndex,
                            srcfileKey=srcfile["Key"],
                            total=total,
                            md5list=md5list,
                            dryrun=dryrun,
                            complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            elif JobType == 'ALIOSS_TO_S3':
                pool.submit(alioss_download_uploadThread,
                            uploadId=uploadId,
                            partnumber=partnumber,
                            partStartIndex=partStartIndex,
                            srcfileKey=srcfile["Key"],
                            srcfileSize=srcfile["Size"],
                            total=total,
                            md5list=md5list,
                            dryrun=dryrun,
                            complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            partnumber += 1
    # Thread pool end
    logger.info(f'All parts uploaded - {srcfile["Key"]} - size: {srcfile["Size"]}')
    # For a local upload the file may change while it is being transferred, so
    # re-scan the local file to compute the MD5s instead of reusing the md5list
    # built from the bodies read earlier.
    if ifVerifyMD5 and JobType == 'LOCAL_TO_S3':
        md5list = cal_md5list(indexList=indexList,
                              srcfileKey=srcfile["Key"],
                              ChunkSize=ChunkSize_auto)
    # Combine the MD5s of all parts into the overall ETag: cal_etag
    digests = b"".join(m.digest() for m in md5list)
    md5full = hashlib.md5(digests)
    cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
    return cal_etag
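
# ----------------------------------------------------------------------------
# Illustration only (not part of the tool above): a minimal sketch of the
# multipart-ETag formula that uploadPart() returns, applied to an in-memory
# buffer. The names below (verify_multipart_etag, data, chunk_size) are
# hypothetical; in the real flow the per-part MD5s come from the upload
# threads. The value can then be compared with the ETag Amazon S3 reports for
# the completed multipart upload: a mismatch indicates the uploaded bytes
# differ from the source.
import hashlib


def verify_multipart_etag(data: bytes, chunk_size: int) -> str:
    # MD5 each chunk, then MD5 the concatenation of the binary digests and
    # append "-<number of parts>", matching the cal_etag format above.
    part_digests = [hashlib.md5(data[i:i + chunk_size]).digest()
                    for i in range(0, len(data), chunk_size)]
    md5full = hashlib.md5(b"".join(part_digests))
    return '"%s-%s"' % (md5full.hexdigest(), len(part_digests))


# Example: two 5 MiB parts of a 10 MiB buffer yield an ETag ending in "-2"
print(verify_multipart_etag(b'x' * (10 * 1024 * 1024), 5 * 1024 * 1024))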