multipart_upload.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. #!usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. @file: multipart_upload.py
  5. """
  6. import sys
  7. import time
  8. import botocore.exceptions
  9. from concurrent import futures
  10. from multipart_config import *
  11. class MultiPartUpload(object):
  12. def __init__(self):
  13. self.s3_client = self.get_s3_session()
  14. @staticmethod
  15. def get_s3_session():
  16. """
  17. 获取 S3 客户端
  18. :return: s3 client
  19. """
  20. s3_client = boto3.client(
  21. service_name="s3",
  22. aws_access_key_id=ACCESS_KEY,
  23. aws_secret_access_key=SECRET_KEY,
  24. endpoint_url=ENDPOINT_URL,
  25. verify=False
  26. )
  27. return s3_client
  28. def check_access(self):
  29. """
  30. 检查目标 s3 是否能写入
  31. :return:
  32. """
  33. bucket_flag = True
  34. try:
  35. res = self.s3_client.list_buckets()
  36. for bucket in res["Buckets"]:
  37. if bucket["Name"] == DESTINATION_BUCKET:
  38. bucket_flag = False
  39. if bucket_flag:
  40. self.s3_client.create_bucket(Bucket=DESTINATION_BUCKET)
  41. self.s3_client.put_object(
  42. Bucket=DESTINATION_BUCKET,
  43. Key="access_test",
  44. Body="This is a access test"
  45. )
  46. except Exception as e:
  47. print (e.message)
  48. def get_object_info(self):
  49. """
  50. 查看桶内是否以存在同名对象
  51. :return:
  52. """
  53. try:
  54. res = self.s3_client.head_object(
  55. Bucket=DESTINATION_BUCKET,
  56. Key=MULTIPART_UPLOAD_FILE_NAME
  57. )
  58. object_size = res["ContentLength"]
  59. except botocore.exceptions.ClientError:
  60. object_size = -1
  61. return object_size
  62. def get_upload_id_list(self):
  63. """
  64. 获取桶内是否存在未完成的multipart
  65. :return: 未完成 upload id list
  66. """
  67. upload_id_list = []
  68. res = self.s3_client.list_multipart_uploads(Bucket=DESTINATION_BUCKET)
  69. try:
  70. # 如果桶内不存在未完成的分片上传,response里就不存在 Uploads 字段
  71. for upload_info in res["Uploads"]:
  72. upload_id_list.append(
  73. {
  74. "Key": upload_info["Key"],
  75. "Initiated": upload_info["Initiated"],
  76. "UploadId": upload_info["UploadId"]
  77. }
  78. )
  79. print ("存在未完成分片 对象名 : {}, 初始化上传时间 : {}".format(upload_info["Key"], upload_info["Initiated"]))
  80. except Exception as e:
  81. # TODO 后面需要确定是哪种异常类型
  82. print ("桶 {} 没有未完成的分片上传".format(DESTINATION_BUCKET))
  83. return upload_id_list
  84. def clean_unfinished_upload(self, upload_id_list):
  85. """
  86. 清理(abort)未完成的分片上传
  87. :param upload_id_list: 未完成分片上传的 upload id
  88. :return:
  89. """
  90. if upload_id_list:
  91. keyboard_input = raw_input("是否清理未完成分片,输入 yes 完成清理,其他代表不清理\n")
  92. if keyboard_input == "yes":
  93. for upload_info in upload_id_list:
  94. self.s3_client.abort_multipart_upload(
  95. Bucket=DESTINATION_BUCKET,
  96. Key=upload_info["Key"],
  97. UploadId=upload_info["UploadId"]
  98. )
  99. print ("清理分片对象 :{}, UploadId : {}".format(upload_info["Key"], upload_info["UploadId"]))
  100. return True
  101. else:
  102. print ("不清理未完成分片上传信息")
  103. return False
  104. def check_object_exist(self, upload_id_list, clean_flag):
  105. """
  106. 检查源文件是否存在于对象存储中
  107. :param clean_flag 是否清理了未完成的分片上传
  108. :param upload_id_list: 未完成的 upload id
  109. :return:
  110. """
  111. object_size = self.get_object_info()
  112. if object_size == -1:
  113. if upload_id_list and (not clean_flag):
  114. # 查看对象是否存在于未完成的分片上传中,如果不存在,全新上传。如果在继续完成分片上传
  115. for upload_info in upload_id_list:
  116. if MULTIPART_UPLOAD_FILE_NAME in upload_info["Key"]:
  117. # 表示存在该对象未完成上传的分片信息,返回该分片信息
  118. print ("存在对象 {} 分片上传信息".format(MULTIPART_UPLOAD_FILE_NAME))
  119. return upload_info["UploadId"]
  120. else:
  121. # 表示这个对象不存在,也不存在分片信息。
  122. print ("不存在对象 {} 分片上传信息".format(MULTIPART_UPLOAD_FILE_NAME))
  123. return self.init_multipart_upload()
  124. elif object_size == MULTIPART_UPLOAD_FILE_SIZE:
  125. # 表示对象已经存在,不用重复上传
  126. print ("文件 {} 已存在,请检查后再上传!".format(MULTIPART_UPLOAD_FILE))
  127. sys.exit(-1)
  128. def check_part_number_list(self, upload_id):
  129. """
  130. 检查是否存在以上传的分片
  131. :param upload_id: 未完成分片上传文件的upload_id
  132. :return:
  133. """
  134. try:
  135. is_truncated = True
  136. part_number_marker = 0
  137. part_number_list = []
  138. while is_truncated:
  139. res = self.s3_client.list_parts(
  140. Bucket=DESTINATION_BUCKET,
  141. Key=MULTIPART_UPLOAD_FILE_NAME,
  142. UploadId=upload_id,
  143. MaxParts=1000,
  144. PartNumberMarker=part_number_marker
  145. )
  146. next_part_number_marker = res["NextPartNumberMarker"]
  147. is_truncated = res["IsTruncated"]
  148. if next_part_number_marker > 0:
  149. for part_number_object in res["Parts"]:
  150. part_number_list.append(part_number_object["PartNumber"])
  151. part_number_marker = next_part_number_marker
  152. if part_number_list:
  153. print ("已存在上传分片 : {}".format(part_number_list))
  154. except Exception as e:
  155. print ("Exception error {}, quit \n".format(e.message))
  156. sys.exit(-1)
  157. return part_number_list
  158. @staticmethod
  159. def split_source_file():
  160. """
  161. 对文件进行分片
  162. :return:
  163. """
  164. part_number = 1
  165. index_list = [0]
  166. while CHUNK_SIZE * part_number < MULTIPART_UPLOAD_FILE_SIZE:
  167. index_list.append(CHUNK_SIZE * part_number)
  168. part_number += 1
  169. if part_number > 10000:
  170. print ("分片数 %s 已超过最大允许分片数 10000,请调整分片大小." % part_number)
  171. sys.exit(-1)
  172. return index_list
  173. def upload_thread(self, upload_id, part_number, part_start_index, total):
  174. """
  175. :param upload_id: multipart upload id
  176. :param part_number: 分片数
  177. :param part_start_index: 分片在文件的偏移位置
  178. :param total:总分片量
  179. :return:
  180. """
  181. print ("Uploading {} / {}".format(part_number, total))
  182. with open(MULTIPART_UPLOAD_FILE, "rb") as data:
  183. retry_time = 0
  184. while retry_time <= MAX_RETRY_TIME:
  185. try:
  186. data.seek(part_start_index)
  187. self.s3_client.upload_part(
  188. Body=data.read(CHUNK_SIZE),
  189. Bucket=DESTINATION_BUCKET,
  190. Key=MULTIPART_UPLOAD_FILE_NAME,
  191. PartNumber=part_number,
  192. UploadId=upload_id
  193. )
  194. break
  195. except Exception as e:
  196. retry_time += 1
  197. if retry_time > MAX_RETRY_TIME:
  198. sys.exit(-1)
  199. time.sleep(5)
  200. print (" Complete {} / {} ".format(part_number, total))
  201. def upload_virtual_file_thread(self, upload_id, part_number, upload_body, total):
  202. """
  203. :param upload_id: multipart upload id
  204. :param part_number: 分片数
  205. :param upload_body: 虚拟文件分片内容
  206. :param total:总分片量
  207. :return:
  208. """
  209. print( "Uploading {} / {}".format(part_number, total))
  210. retry_time = 0
  211. while retry_time <= MAX_RETRY_TIME:
  212. try:
  213. self.s3_client.upload_part(
  214. Body=upload_body,
  215. Bucket=DESTINATION_BUCKET,
  216. Key=MULTIPART_UPLOAD_FILE_NAME,
  217. PartNumber=part_number,
  218. UploadId=upload_id
  219. )
  220. break
  221. except Exception as e:
  222. retry_time += 1
  223. if retry_time > MAX_RETRY_TIME:
  224. print ("已达最大重传次数 {},退出".format(retry_time))
  225. sys.exit(-1)
  226. time.sleep(5)
  227. print (" Complete {} / {} ".format(part_number, total))
  228. def upload_part(self, upload_id, index_list, part_number_list):
  229. """
  230. 多线程上传分片
  231. :param upload_id: 分片上传的upload id
  232. :param index_list: 所有分片的
  233. :param part_number_list:
  234. :return:
  235. """
  236. part_number = 1
  237. total = len(index_list)
  238. with futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
  239. for part_start_index in index_list:
  240. if part_number not in part_number_list:
  241. pool.submit(self.upload_thread, upload_id, part_number, part_start_index, total)
  242. part_number += 1
  243. print ("所有分片文件已上传, 文件名: {}, 文件大小:{}".format(MULTIPART_UPLOAD_FILE_NAME, MULTIPART_UPLOAD_FILE_SIZE))
  244. return part_number - 1
  245. def upload_virtual_file_part(self, upload_id, index_list, part_number_list):
  246. """
  247. 多线程上传虚拟文件分片
  248. :param upload_id: 分片上传的upload id
  249. :param index_list: 所有分片的
  250. :param part_number_list:
  251. :return:
  252. """
  253. part_number = 1
  254. total = len(index_list)
  255. part_data_body = CHUNK_SIZE * "a"
  256. with futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
  257. for part_start_index in index_list:
  258. if part_number not in part_number_list:
  259. pool.submit(self.upload_virtual_file_thread, upload_id, part_number, part_data_body, total)
  260. part_number += 1
  261. print ("所有分片文件已上传, 文件名: {}, 文件大小:{}".format(MULTIPART_UPLOAD_FILE_NAME, MULTIPART_UPLOAD_FILE_SIZE))
  262. return part_number - 1
  263. def complete_upload(self, upload_id, len_index_list):
  264. """
  265. 合并分片
  266. :param upload_id:
  267. :param len_index_list:
  268. :return:
  269. """
  270. uploaded_list_parts_clean = []
  271. part_number_marker = 0
  272. is_truncated = True
  273. while is_truncated:
  274. res = self.s3_client.list_parts(
  275. Bucket=DESTINATION_BUCKET,
  276. Key=MULTIPART_UPLOAD_FILE_NAME,
  277. UploadId=upload_id,
  278. MaxParts=1000,
  279. PartNumberMarker=part_number_marker
  280. )
  281. next_part_number_marker = res["NextPartNumberMarker"]
  282. is_truncated = res["IsTruncated"]
  283. if next_part_number_marker > 0:
  284. for part_object in res["Parts"]:
  285. e_tag = part_object["ETag"]
  286. part_number = part_object["PartNumber"]
  287. add_up = {
  288. "ETag": e_tag,
  289. "PartNumber": part_number
  290. }
  291. uploaded_list_parts_clean.append(add_up)
  292. part_number_marker = next_part_number_marker
  293. if len(uploaded_list_parts_clean) != len_index_list:
  294. print ("以上传分片数量与源文件分片数量不匹配!!!")
  295. sys.exit(-1)
  296. complete_struck_json = {
  297. "Parts": uploaded_list_parts_clean
  298. }
  299. res = self.s3_client.complete_multipart_upload(
  300. Bucket=DESTINATION_BUCKET,
  301. Key=MULTIPART_UPLOAD_FILE_NAME,
  302. UploadId=upload_id,
  303. MultipartUpload=complete_struck_json
  304. )
  305. def init_multipart_upload(self):
  306. res = self.s3_client.create_multipart_upload(
  307. Bucket=DESTINATION_BUCKET,
  308. Key=MULTIPART_UPLOAD_FILE_NAME
  309. )
  310. print ("文件 {} 初始化分片上传 upload_id {}".format(MULTIPART_UPLOAD_FILE, res["UploadId"]))
  311. return res["UploadId"]
  312. def start_upload(self):
  313. clean_flag = False
  314. # 检查是否能够正常接入
  315. self.check_access()
  316. # 获取桶内是否存在分片上传信息
  317. upload_id_list = self.get_upload_id_list()
  318. # 是否清理未完成的分片
  319. if upload_id_list:
  320. clean_flag = self.clean_unfinished_upload(upload_id_list)
  321. upload_id = self.check_object_exist(upload_id_list, clean_flag)
  322. part_number_list = self.check_part_number_list(upload_id)
  323. file_index_list = self.split_source_file()
  324. # 分片上传文件
  325. if VIRTUAL_FILE_FLAG:
  326. self.upload_virtual_file_part(upload_id, file_index_list, part_number_list)
  327. else:
  328. self.upload_part(upload_id, file_index_list, part_number_list)
  329. # 合并文件
  330. self.complete_upload(upload_id, len(file_index_list))
  331. if __name__ == "__main__":
  332. mp = MultiPartUpload()
  333. mp.start_upload()