#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@file: multipart_upload.py
"""
- import sys
- import time
- import botocore.exceptions
- from concurrent import futures
- from multipart_config import *
class MultiPartUpload(object):
    """Upload one file to an S3 bucket with a resumable multipart upload.

    All settings (credentials, endpoint, bucket, object name, file path and
    size, chunk size, retry and thread limits, virtual-file switch) are
    module-level names that this file obtains through
    ``from multipart_config import *``.
    """

    # Upper bound on the number of parts enforced before uploading.
    MAX_PART_COUNT = 10000
    # Page size requested from list_parts.
    LIST_PARTS_PAGE_SIZE = 1000
    # Pause between two attempts of the same part.
    RETRY_DELAY_SECONDS = 5

    def __init__(self):
        self.s3_client = self.get_s3_session()

    @staticmethod
    def get_s3_session():
        """Build the S3 client used for every request.

        :return: s3 client
        """
        # NOTE(review): boto3 is not imported by name in this file; presumably
        # it is re-exported by the star import of multipart_config -- confirm.
        # SECURITY: verify=False turns off TLS certificate verification.  Kept
        # because the endpoint is configurable and may rely on it, but it
        # should be reviewed before use against a production endpoint.
        return boto3.client(
            service_name="s3",
            aws_access_key_id=ACCESS_KEY,
            aws_secret_access_key=SECRET_KEY,
            endpoint_url=ENDPOINT_URL,
            verify=False
        )

    def check_access(self):
        """Check that the destination bucket can be written to.

        Creates DESTINATION_BUCKET when it is not in the bucket listing and
        then writes a small probe object named ``access_test``.  This is a
        best-effort check: any failure is printed and not re-raised.

        :return: None
        """
        try:
            res = self.s3_client.list_buckets()
            bucket_exists = any(
                bucket["Name"] == DESTINATION_BUCKET for bucket in res["Buckets"]
            )
            if not bucket_exists:
                self.s3_client.create_bucket(Bucket=DESTINATION_BUCKET)
            self.s3_client.put_object(
                Bucket=DESTINATION_BUCKET,
                Key="access_test",
                Body="This is a access test"
            )
        except Exception as e:
            # Print the exception itself: the ``message`` attribute does not
            # exist on Python 3 exceptions.
            print(e)

    def get_object_info(self):
        """Look up the size of the target object in the destination bucket.

        :return: ContentLength of the object, or -1 when head_object raises
                 a ClientError (the object is then treated as missing)
        """
        try:
            res = self.s3_client.head_object(
                Bucket=DESTINATION_BUCKET,
                Key=MULTIPART_UPLOAD_FILE_NAME
            )
        except botocore.exceptions.ClientError:
            return -1
        return res["ContentLength"]

    def get_upload_id_list(self):
        """Collect the unfinished multipart uploads of the destination bucket.

        :return: list of dicts with the keys ``Key``, ``Initiated`` and
                 ``UploadId``; empty when nothing is pending
        """
        res = self.s3_client.list_multipart_uploads(Bucket=DESTINATION_BUCKET)
        # The response carries no "Uploads" field when nothing is pending.
        uploads = res.get("Uploads", [])
        if not uploads:
            print("桶 {} 没有未完成的分片上传".format(DESTINATION_BUCKET))
            return []

        upload_id_list = []
        for upload_info in uploads:
            upload_id_list.append(
                {
                    "Key": upload_info["Key"],
                    "Initiated": upload_info["Initiated"],
                    "UploadId": upload_info["UploadId"]
                }
            )
            print("存在未完成分片 对象名 : {}, 初始化上传时间 : {}".format(upload_info["Key"], upload_info["Initiated"]))
        return upload_id_list

    def clean_unfinished_upload(self, upload_id_list):
        """Ask the user whether to abort the unfinished multipart uploads.

        :param upload_id_list: entries as returned by get_upload_id_list
        :return: True when the uploads were aborted, False otherwise
                 (including when the list is empty)
        """
        if not upload_id_list:
            return False

        # raw_input only exists on Python 2; fall back to input on Python 3.
        try:
            read_line = raw_input
        except NameError:
            read_line = input

        keyboard_input = read_line("是否清理未完成分片,输入 yes 完成清理,其他代表不清理\n")
        if keyboard_input != "yes":
            print("不清理未完成分片上传信息")
            return False

        for upload_info in upload_id_list:
            self.s3_client.abort_multipart_upload(
                Bucket=DESTINATION_BUCKET,
                Key=upload_info["Key"],
                UploadId=upload_info["UploadId"]
            )
            print("清理分片对象 :{}, UploadId : {}".format(upload_info["Key"], upload_info["UploadId"]))
        return True

    def check_object_exist(self, upload_id_list, clean_flag):
        """Decide which upload id the transfer has to use.

        Exits the process with status -1 when the object is already stored
        in the bucket.  Otherwise an unfinished upload of exactly this key is
        resumed (unless the unfinished uploads were just aborted), and a new
        multipart upload is started when there is nothing to resume.

        :param upload_id_list: unfinished uploads from get_upload_id_list
        :param clean_flag: True when the unfinished uploads were aborted
        :return: upload id to use for the transfer
        """
        object_size = self.get_object_info()
        if object_size == MULTIPART_UPLOAD_FILE_SIZE:
            # Same name and same size: nothing to upload.
            print("文件 {} 已存在,请检查后再上传!".format(MULTIPART_UPLOAD_FILE))
            sys.exit(-1)
        if object_size != -1:
            # An object with this key but another size exists.  Stop with an
            # explicit message rather than handing an empty upload id to the
            # following steps.
            print("对象 {} 已存在但大小不一致 (桶内 {} / 本地 {}),请检查后再上传!".format(
                MULTIPART_UPLOAD_FILE_NAME, object_size, MULTIPART_UPLOAD_FILE_SIZE))
            sys.exit(-1)

        if not clean_flag:
            for upload_info in upload_id_list:
                # Exact comparison: a substring test would also pick up the
                # unfinished upload of a different key containing this name.
                if upload_info["Key"] == MULTIPART_UPLOAD_FILE_NAME:
                    print("存在对象 {} 分片上传信息".format(MULTIPART_UPLOAD_FILE_NAME))
                    return upload_info["UploadId"]

        # Neither the object nor a resumable upload exists: start from scratch.
        print("不存在对象 {} 分片上传信息".format(MULTIPART_UPLOAD_FILE_NAME))
        return self.init_multipart_upload()

    def _list_uploaded_parts(self, upload_id):
        """Return all list_parts entries of *upload_id*, following pagination."""
        parts = []
        part_number_marker = 0
        while True:
            res = self.s3_client.list_parts(
                Bucket=DESTINATION_BUCKET,
                Key=MULTIPART_UPLOAD_FILE_NAME,
                UploadId=upload_id,
                MaxParts=self.LIST_PARTS_PAGE_SIZE,
                PartNumberMarker=part_number_marker
            )
            parts.extend(res.get("Parts", []))
            if not res.get("IsTruncated", False):
                break
            next_part_number_marker = res.get("NextPartNumberMarker", 0)
            if next_part_number_marker <= part_number_marker:
                # The marker did not advance; stop instead of looping forever.
                break
            part_number_marker = next_part_number_marker
        return parts

    def check_part_number_list(self, upload_id):
        """List the part numbers already uploaded for *upload_id*.

        Exits the process with status -1 when the parts cannot be listed.

        :param upload_id: upload id of the (possibly resumed) upload
        :return: list of part numbers that are already stored
        """
        try:
            part_number_list = [
                part["PartNumber"] for part in self._list_uploaded_parts(upload_id)
            ]
        except Exception as e:
            print("Exception error {}, quit \n".format(e))
            sys.exit(-1)
        if part_number_list:
            print("已存在上传分片 : {}".format(part_number_list))
        return part_number_list

    @staticmethod
    def split_source_file(file_size=None, chunk_size=None):
        """Compute the start offset of every part of the file.

        Exits the process with status -1 when the chunk size is not positive
        or when more than MAX_PART_COUNT parts would be needed.

        :param file_size: size of the file in bytes, defaults to
                          MULTIPART_UPLOAD_FILE_SIZE
        :param chunk_size: size of one part in bytes, defaults to CHUNK_SIZE
        :return: list of part start offsets; always holds at least ``0``
        """
        if file_size is None:
            file_size = MULTIPART_UPLOAD_FILE_SIZE
        if chunk_size is None:
            chunk_size = CHUNK_SIZE

        if chunk_size <= 0:
            print("分片大小 %s 无效,请调整分片大小." % chunk_size)
            sys.exit(-1)

        # Ceiling division; an empty file still yields one part.
        part_count = max(1, -(-file_size // chunk_size))
        if part_count > MultiPartUpload.MAX_PART_COUNT:
            print("分片数 %s 已超过最大允许分片数 10000,请调整分片大小." % part_count)
            sys.exit(-1)
        return list(range(0, file_size, chunk_size)) or [0]

    def _upload_part_with_retry(self, upload_id, part_number, body, total):
        """Upload one part and retry failed attempts up to MAX_RETRY_TIME times.

        Calls sys.exit(-1) once the retries are used up.  Inside a pool
        worker the resulting SystemExit is stored on the future and is
        re-raised by _run_jobs in the calling thread.
        """
        print("Uploading {} / {}".format(part_number, total))
        retry_time = 0
        while True:
            try:
                self.s3_client.upload_part(
                    Body=body,
                    Bucket=DESTINATION_BUCKET,
                    Key=MULTIPART_UPLOAD_FILE_NAME,
                    PartNumber=part_number,
                    UploadId=upload_id
                )
                break
            except Exception as e:
                retry_time += 1
                if retry_time > MAX_RETRY_TIME:
                    print("已达最大重传次数 {},退出".format(retry_time))
                    sys.exit(-1)
                print("分片 {} 上传失败: {},{} 秒后重试".format(part_number, e, self.RETRY_DELAY_SECONDS))
                time.sleep(self.RETRY_DELAY_SECONDS)
        print(" Complete {} / {} ".format(part_number, total))

    def upload_thread(self, upload_id, part_number, part_start_index, total):
        """Worker: upload one part read from the source file.

        :param upload_id: multipart upload id
        :param part_number: number of the part to upload
        :param part_start_index: offset of the part inside the file
        :param total: total number of parts (progress output only)
        :return: None
        """
        # Read the chunk once; every retry re-sends the same bytes.
        with open(MULTIPART_UPLOAD_FILE, "rb") as data:
            data.seek(part_start_index)
            body = data.read(CHUNK_SIZE)
        self._upload_part_with_retry(upload_id, part_number, body, total)

    def upload_virtual_file_thread(self, upload_id, part_number, upload_body, total):
        """Worker: upload one part of generated (virtual) content.

        :param upload_id: multipart upload id
        :param part_number: number of the part to upload
        :param upload_body: content of the part
        :param total: total number of parts (progress output only)
        :return: None
        """
        self._upload_part_with_retry(upload_id, part_number, upload_body, total)

    def _run_jobs(self, worker, jobs):
        """Run ``worker(*job)`` for every job on a thread pool.

        Re-raises in the calling thread whatever a worker raised, so that a
        part that ran out of retries stops the whole transfer.
        """
        with futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
            pending = [pool.submit(worker, *job) for job in jobs]
        # Leaving the with-block has waited for every worker to finish.
        for future in pending:
            future.result()

    def upload_part(self, upload_id, index_list, part_number_list):
        """Upload every missing part of the source file using a thread pool.

        :param upload_id: multipart upload id
        :param index_list: start offset of every part
        :param part_number_list: part numbers that are already uploaded
        :return: total number of parts of the file
        """
        total = len(index_list)
        already_uploaded = set(part_number_list)
        jobs = [
            (upload_id, part_number, part_start_index, total)
            for part_number, part_start_index in enumerate(index_list, 1)
            if part_number not in already_uploaded
        ]
        self._run_jobs(self.upload_thread, jobs)
        print("所有分片文件已上传, 文件名: {}, 文件大小:{}".format(MULTIPART_UPLOAD_FILE_NAME, MULTIPART_UPLOAD_FILE_SIZE))
        return total

    def upload_virtual_file_part(self, upload_id, index_list, part_number_list):
        """Upload every missing part of a generated file using a thread pool.

        Each part consists of the byte ``a`` repeated; the last part is cut
        so that the object ends up with MULTIPART_UPLOAD_FILE_SIZE bytes.

        :param upload_id: multipart upload id
        :param index_list: start offset of every part
        :param part_number_list: part numbers that are already uploaded
        :return: total number of parts of the file
        """
        total = len(index_list)
        already_uploaded = set(part_number_list)
        full_body = b"a" * CHUNK_SIZE
        jobs = []
        for part_number, part_start_index in enumerate(index_list, 1):
            if part_number in already_uploaded:
                continue
            remaining = MULTIPART_UPLOAD_FILE_SIZE - part_start_index
            if remaining >= CHUNK_SIZE:
                body = full_body
            else:
                body = full_body[:max(remaining, 0)]
            jobs.append((upload_id, part_number, body, total))
        self._run_jobs(self.upload_virtual_file_thread, jobs)
        print("所有分片文件已上传, 文件名: {}, 文件大小:{}".format(MULTIPART_UPLOAD_FILE_NAME, MULTIPART_UPLOAD_FILE_SIZE))
        return total

    def complete_upload(self, upload_id, len_index_list):
        """Merge the uploaded parts into the final object.

        Exits the process with status -1 when the number of stored parts
        differs from the number of parts of the source file.

        :param upload_id: multipart upload id
        :param len_index_list: expected number of parts
        :return: None
        """
        uploaded_parts = [
            {"ETag": part["ETag"], "PartNumber": part["PartNumber"]}
            for part in self._list_uploaded_parts(upload_id)
        ]
        if len(uploaded_parts) != len_index_list:
            print("以上传分片数量与源文件分片数量不匹配!!!")
            sys.exit(-1)
        self.s3_client.complete_multipart_upload(
            Bucket=DESTINATION_BUCKET,
            Key=MULTIPART_UPLOAD_FILE_NAME,
            UploadId=upload_id,
            MultipartUpload={"Parts": uploaded_parts}
        )

    def init_multipart_upload(self):
        """Start a new multipart upload for the target object.

        :return: upload id of the new multipart upload
        """
        res = self.s3_client.create_multipart_upload(
            Bucket=DESTINATION_BUCKET,
            Key=MULTIPART_UPLOAD_FILE_NAME
        )
        print("文件 {} 初始化分片上传 upload_id {}".format(MULTIPART_UPLOAD_FILE, res["UploadId"]))
        return res["UploadId"]

    def start_upload(self):
        """Run the whole transfer: check, resume or start, upload, merge."""
        # Make sure the destination can be reached and written to.
        self.check_access()
        # Unfinished multipart uploads in the bucket, optionally aborted.
        upload_id_list = self.get_upload_id_list()
        clean_flag = self.clean_unfinished_upload(upload_id_list)
        upload_id = self.check_object_exist(upload_id_list, clean_flag)
        part_number_list = self.check_part_number_list(upload_id)
        file_index_list = self.split_source_file()
        # Upload the parts that are still missing.
        if VIRTUAL_FILE_FLAG:
            self.upload_virtual_file_part(upload_id, file_index_list, part_number_list)
        else:
            self.upload_part(upload_id, file_index_list, part_number_list)
        # Merge the parts into the final object.
        self.complete_upload(upload_id, len(file_index_list))
if __name__ == "__main__":
    # Script entry point: build the uploader and run the full transfer.
    MultiPartUpload().start_upload()
|