def uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list,
                 ChunkSize):
    prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    if not dryrun:
        print(f'\033[0;32;1m--->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
    pstart_time = time.time()
    with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                data.seek(partStartIndex)
                chunkdata = data.read(ChunkSize)
                chunkdata_md5 = hashlib.md5(chunkdata)
                md5list[partnumber - 1] = chunkdata_md5
                if not dryrun:
                    s3_dest_client.upload_part(
                        Body=chunkdata,
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        PartNumber=partnumber,
                        UploadId=uploadId,
                        ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                    )
                    # Each part upload is MD5-checked here; the whole file is verified
                    # again later, when the parts are merged.
                break
            except Exception as err:
                retryTime += 1
                logger.info(f'UploadThreadFunc log: {srcfileKey} - {str(err)}')
                logger.info(f'Upload Fail - {srcfileKey} - Retry part - {partnumber} - Attempt - {retryTime}')
                if retryTime > MaxRetry:
                    logger.error(f'Quit for Max retries: {retryTime}')
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
    complete_list.append(partnumber)
    pload_time = time.time() - pstart_time
    pload_bytes = len(chunkdata)
    pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
    if not dryrun:
        print(f'\033[0;34;1m    --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
    return
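
# The comment in uploadThread notes that each part's MD5 is checked on upload and the
# whole file is verified again when the parts are merged. The sketch below illustrates
# that merge-time check, assuming the md5list of hashlib.md5 objects filled in by
# uploadThread; complete_upload_sketch is a hypothetical helper, not this project's
# actual completion code. (For plain multipart uploads, S3 reports an ETag of the form
# md5(concatenated part digests)-partcount; SSE-KMS objects do not follow this rule.)
def complete_upload_sketch(*, uploadId, srcfileKey, md5list):
    prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    # Recompute the expected multipart ETag from the per-part digests.
    whole_md5 = hashlib.md5(b''.join(m.digest() for m in md5list)).hexdigest()
    expected_etag = f'"{whole_md5}-{len(md5list)}"'
    response = s3_dest_client.complete_multipart_upload(
        Bucket=DesBucket,
        Key=prefix_and_key,
        UploadId=uploadId,
        MultipartUpload={
            'Parts': [{'ETag': '"' + m.hexdigest() + '"', 'PartNumber': i + 1}
                      for i, m in enumerate(md5list)]
        }
    )
    if response['ETag'] != expected_etag:
        logger.error(f'Whole-file MD5 mismatch for {srcfileKey}: '
                     f'{response["ETag"]} != {expected_etag}')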