def uploadPart(*, uploadId, indexList, partnumberList, srcfile, ChunkSize_auto):
    partnumber = 1  # Part number to upload in the current loop iteration
    total = len(indexList)
    # Placeholder MD5 slots; each worker thread overwrites its own index (partnumber - 1)
    md5list = [hashlib.md5(b'')] * total
    complete_list = []
    # Thread pool - start
    with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
        for partStartIndex in indexList:
            # Parts already in partnumberList were uploaded in a previous (resumed) run,
            # so they are submitted as dryrun: skip the upload, only recalculate the MD5
            if partnumber not in partnumberList:
                dryrun = False
            else:
                dryrun = True
            # Upload one part per thread, or dryrun to only calculate md5
            if JobType == 'LOCAL_TO_S3':
                pool.submit(uploadThread, uploadId=uploadId, partnumber=partnumber,
                            partStartIndex=partStartIndex, srcfileKey=srcfile["Key"], total=total,
                            md5list=md5list, dryrun=dryrun, complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            elif JobType == 'S3_TO_S3':
                pool.submit(download_uploadThread, uploadId=uploadId, partnumber=partnumber,
                            partStartIndex=partStartIndex, srcfileKey=srcfile["Key"], total=total,
                            md5list=md5list, dryrun=dryrun, complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            elif JobType == 'ALIOSS_TO_S3':
                pool.submit(alioss_download_uploadThread, uploadId=uploadId, partnumber=partnumber,
                            partStartIndex=partStartIndex, srcfileKey=srcfile["Key"],
                            srcfileSize=srcfile["Size"], total=total, md5list=md5list,
                            dryrun=dryrun, complete_list=complete_list, ChunkSize=ChunkSize_auto)
            partnumber += 1
    # Thread pool - end
    logger.info(f'All parts uploaded - {srcfile["Key"]} - size: {srcfile["Size"]}')

    # For local uploads, the file may have changed while the transfer was in flight,
    # so re-scan the local file for fresh MD5s instead of reusing the md5list built
    # from the part bodies read earlier.
    if ifVerifyMD5 and JobType == 'LOCAL_TO_S3':
        md5list = cal_md5list(indexList=indexList, srcfileKey=srcfile["Key"], ChunkSize=ChunkSize_auto)

    # Combine the per-part MD5s into the aggregate multipart ETag: cal_etag
    digests = b"".join(m.digest() for m in md5list)
    md5full = hashlib.md5(digests)
    cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
    return cal_etag
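
# A minimal standalone sketch of the same aggregate-ETag math applied to a local
# file, useful for checking a completed multipart upload against the ETag S3
# returns. The name verify_s3_etag and its parameters are illustrative, not part
# of this module; it assumes every part except possibly the last is exactly
# chunk_size bytes, matching how indexList slices the source file above.
def verify_s3_etag(filepath, s3_etag, chunk_size):
    md5s = []
    with open(filepath, 'rb') as f:
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            md5s.append(hashlib.md5(data))
    if len(md5s) == 1:
        # Single-part objects carry a plain MD5 ETag with no "-N" suffix
        return s3_etag == '"%s"' % md5s[0].hexdigest()
    # Multipart ETag: MD5 of the concatenated binary part digests, then "-<part count>"
    digests = b"".join(m.digest() for m in md5s)
    return s3_etag == '"%s-%s"' % (hashlib.md5(digests).hexdigest(), len(md5s))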