def uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list, ChunkSize):
    # Relies on module-level names defined elsewhere in the script:
    # s3_dest_client, DesBucket, S3Prefix, SrcDir, MaxRetry, logger, size_to_str
    prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    if not dryrun:
        print(f'\033[0;32;1m--->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
    pstart_time = time.time()
    with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                data.seek(partStartIndex)
                chunkdata = data.read(ChunkSize)
                chunkdata_md5 = hashlib.md5(chunkdata)
                md5list[partnumber - 1] = chunkdata_md5
                if not dryrun:
                    s3_dest_client.upload_part(
                        Body=chunkdata,
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        PartNumber=partnumber,
                        UploadId=uploadId,
                        ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                    )
                # Each individual part is MD5-verified here; the whole file is verified
                # again later when the parts are merged
                break
            except Exception as err:
                retryTime += 1
                logger.info(f'UploadThreadFunc log: {srcfileKey} - {str(err)}')
                logger.info(f'Upload Fail - {srcfileKey} - Retry part - {partnumber} - Attempt - {retryTime}')
                if retryTime > MaxRetry:
                    logger.error(f'Quit for Max retries: {retryTime}')
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
    complete_list.append(partnumber)
    pload_time = time.time() - pstart_time
    pload_bytes = len(chunkdata)
    pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
    if not dryrun:
        print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
    return
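For context, here is a minimal sketch of how a per-part worker like `uploadThread` is typically driven: create the multipart upload, fan the parts out over a thread pool, then complete the upload from the parts S3 reports. This is not the tool's actual driver; the helper name `upload_file_multipart`, the `MaxThread` pool size, and the 5 MB default chunk size are assumptions, while `s3_dest_client`, `DesBucket`, `S3Prefix`, and `SrcDir` are the same module-level names the function above relies on.

```python
import os
from concurrent import futures
from pathlib import PurePosixPath

def upload_file_multipart(srcfile_key, chunk_size=5 * 1024 * 1024):
    # Hypothetical driver: names and structure are illustrative, not the original tool's code.
    key = str(PurePosixPath(S3Prefix) / srcfile_key)
    file_size = os.path.getsize(os.path.join(SrcDir, srcfile_key))
    part_starts = list(range(0, file_size, chunk_size))  # byte offset of each part
    total = len(part_starts)

    mpu = s3_dest_client.create_multipart_upload(Bucket=DesBucket, Key=key)
    upload_id = mpu['UploadId']

    md5list = [None] * total
    complete_list = []
    # Fan the parts out to worker threads; leaving the with-block waits for all of them.
    with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
        for partnumber, start in enumerate(part_starts, start=1):
            pool.submit(uploadThread,
                        uploadId=upload_id,
                        partnumber=partnumber,
                        partStartIndex=start,
                        srcfileKey=srcfile_key,
                        total=total,
                        md5list=md5list,
                        dryrun=False,
                        complete_list=complete_list,
                        ChunkSize=chunk_size)

    # Ask S3 for the ETag of every uploaded part, then close the multipart upload.
    parts = s3_dest_client.list_parts(Bucket=DesBucket, Key=key, UploadId=upload_id)['Parts']
    s3_dest_client.complete_multipart_upload(
        Bucket=DesBucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={'Parts': [{'ETag': p['ETag'], 'PartNumber': p['PartNumber']} for p in parts]}
    )
```

Note that every part except the last must be at least 5 MB, and `list_parts` returns at most 1000 parts per call, so a real driver would paginate for very large files.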