# -*- coding: utf-8 -*-
# PROJECT LONGBOW - AMAZON S3 UPLOAD TOOL WITH BREAK-POINT RESUMING
import os
import sys
import json
import base64
from boto3.session import Session
from botocore.client import Config
from concurrent import futures
from configparser import ConfigParser, RawConfigParser, NoOptionError
import time
import datetime
import hashlib
import logging
from pathlib import PurePosixPath, Path
import platform
import codecs
os.system("")  # workaround for some Windows systems to print color

global JobType, SrcFileIndex, DesProfileName, DesBucket, S3Prefix, MaxRetry, MaxThread, \
    MaxParallelFile, StorageClass, ifVerifyMD5, DontAskMeToClean, LoggingLevel, \
    SrcDir, SrcBucket, SrcProfileName, ali_SrcBucket, ali_access_key_id, ali_access_key_secret, ali_endpoint


# Read config.ini with GUI
def set_config():
    sys_para = sys.argv
    file_path = os.path.split(sys_para[0])[0]
    gui = False
    if platform.uname()[0] == 'Windows':  # GUI on by default for Windows
        gui = True
    if platform.uname()[0] == 'Linux':  # GUI off by default for Linux
        gui = False
    if '--gui' in sys.argv:  # Explicitly request GUI mode
        gui = True
    if '--nogui' in sys.argv:  # --nogui overrides the Windows default above
        gui = False
    JobType_list = ['LOCAL_TO_S3', 'S3_TO_S3', 'ALIOSS_TO_S3']
    StorageClass_list = ['STANDARD', 'REDUCED_REDUNDANCY', 'STANDARD_IA', 'ONEZONE_IA', 'INTELLIGENT_TIERING',
                         'GLACIER', 'DEEP_ARCHIVE']
    config_file = os.path.join(file_path, 's3_upload_config.ini')
    # If no config file, read the default config
    if not os.path.exists(config_file):
        config_file += '.default'
        print("No customized config found, using the default config")
    cfg = ConfigParser()
    print(f'Reading config file: {config_file}')

    # Get local config value
    try:
        global JobType, SrcFileIndex, DesProfileName, DesBucket, S3Prefix, MaxRetry, MaxThread, \
            MaxParallelFile, StorageClass, ifVerifyMD5, DontAskMeToClean, LoggingLevel, \
            SrcDir, SrcBucket, SrcProfileName, ali_SrcBucket, ali_access_key_id, ali_access_key_secret, ali_endpoint
        cfg.read(config_file, encoding='utf-8-sig')
        JobType = cfg.get('Basic', 'JobType')
        SrcFileIndex = cfg.get('Basic', 'SrcFileIndex')
        DesProfileName = cfg.get('Basic', 'DesProfileName')
        DesBucket = cfg.get('Basic', 'DesBucket')
        S3Prefix = cfg.get('Basic', 'S3Prefix')
        Megabytes = 1024 * 1024
        ChunkSize = cfg.getint('Advanced', 'ChunkSize') * Megabytes
        MaxRetry = cfg.getint('Advanced', 'MaxRetry')
        MaxThread = cfg.getint('Advanced', 'MaxThread')
        MaxParallelFile = cfg.getint('Advanced', 'MaxParallelFile')
        StorageClass = cfg.get('Advanced', 'StorageClass')
        ifVerifyMD5 = cfg.getboolean('Advanced', 'ifVerifyMD5')
        DontAskMeToClean = cfg.getboolean('Advanced', 'DontAskMeToClean')
        LoggingLevel = cfg.get('Advanced', 'LoggingLevel')
        try:
            SrcDir = cfg.get('LOCAL_TO_S3', 'SrcDir')
        except NoOptionError:
            SrcDir = ''
        try:
            SrcBucket = cfg.get('S3_TO_S3', 'SrcBucket')
            SrcProfileName = cfg.get('S3_TO_S3', 'SrcProfileName')
        except NoOptionError:
            SrcBucket = ''
            SrcProfileName = ''
        try:
            ali_SrcBucket = cfg.get('ALIOSS_TO_S3', 'ali_SrcBucket')
            ali_access_key_id = cfg.get('ALIOSS_TO_S3', 'ali_access_key_id')
            ali_access_key_secret = cfg.get('ALIOSS_TO_S3', 'ali_access_key_secret')
            ali_endpoint = cfg.get('ALIOSS_TO_S3', 'ali_endpoint')
        except NoOptionError:
            ali_SrcBucket = ""
            ali_access_key_id = ""
            ali_access_key_secret = ""
            ali_endpoint = ""
    except Exception as e:
        print("ERR loading s3_upload_config.ini", str(e))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)

    # The GUI only supports LOCAL_TO_S3 mode well; start with the --gui option.
    # For the other JobTypes the GUI is not the preferred option, since they are better run on EC2 Linux.
    if gui:
        # For GUI
        from tkinter import Tk, filedialog, END, StringVar, BooleanVar, messagebox
        from tkinter.ttk import Combobox, Label, Button, Entry, Spinbox, Checkbutton
        # get profile name list in ~/.aws/credentials
        pro_conf = RawConfigParser()
        pro_path = os.path.join(os.path.expanduser("~"), ".aws")
        cre_path = os.path.join(pro_path, "credentials")
        if os.path.exists(cre_path):
            pro_conf.read(cre_path)
            profile_list = pro_conf.sections()
        else:
            print(f"There is no aws_access_key in {cre_path}, please input the one for the Destination S3 Bucket: ")
            os.mkdir(pro_path)
            aws_access_key_id = input('aws_access_key_id: ')
            aws_secret_access_key = input('aws_secret_access_key: ')
            region = input('region: ')
            pro_conf.add_section('default')
            pro_conf['default']['aws_access_key_id'] = aws_access_key_id
            pro_conf['default']['aws_secret_access_key'] = aws_secret_access_key
            pro_conf['default']['region'] = region
            profile_list = ['default']
            with open(cre_path, 'w') as f:
                print(f"Saving credentials to {cre_path}")
                pro_conf.write(f)

        # Click Select Folder
        def browse_folder():
            local_dir = filedialog.askdirectory(initialdir=os.path.dirname(__file__))
            url_txt.delete(0, END)
            url_txt.insert(0, local_dir)
            file_txt.delete(0, END)
            file_txt.insert(0, "*")
            # Finish browse folder

        # Click Select File
        def browse_file():
            local_file = filedialog.askopenfilename()
            url_txt.delete(0, END)
            url_txt.insert(0, os.path.split(local_file)[0])
            file_txt.delete(0, END)
            file_txt.insert(0, os.path.split(local_file)[1])
            # Finish browse file

        # Click List Buckets
        def ListBuckets(*args):
            DesProfileName = DesProfileName_txt.get()
            client = Session(profile_name=DesProfileName).client('s3')
            bucket_list = []
            try:
                response = client.list_buckets()
                if 'Buckets' in response:
                    bucket_list = [b['Name'] for b in response['Buckets']]
            except Exception as e:
                messagebox.showerror('Error', f'Failed to list buckets. \n'
                                              f'Please verify your aws_access_key of profile: [{DesProfileName}]\n'
                                              f'{str(e)}')
                bucket_list = ['CAN_NOT_GET_BUCKET_LIST']
            DesBucket_txt['values'] = bucket_list
            DesBucket_txt.current(0)
            # Finish ListBuckets

        # Click List Prefix
        def ListPrefix(*args):
            DesProfileName = DesProfileName_txt.get()
            client = Session(profile_name=DesProfileName).client('s3')
            prefix_list = []
            this_bucket = DesBucket_txt.get()
            max_get = 100
            try:
                response = client.list_objects_v2(
                    Bucket=this_bucket,
                    Delimiter='/'
                )  # Only fetch the first page of prefixes, to keep the list simple
                if 'CommonPrefixes' in response:
                    prefix_list = [c['Prefix'] for c in response['CommonPrefixes']]
                if not prefix_list:
                    messagebox.showinfo('Message', f'There is no "/" Prefix in: {this_bucket}')
                if response['IsTruncated']:
                    messagebox.showinfo('Message', f'More than {max_get} Prefix, cannot fully list here.')
            except Exception as e:
                messagebox.showinfo('Error', f'Cannot get prefix list from bucket: {this_bucket}, {str(e)}')
            S3Prefix_txt['values'] = prefix_list
            S3Prefix_txt.current(0)
            # Finish list prefix

        # Change JobType
        def job_change(*args):
            if JobType_mode.get() != 'LOCAL_TO_S3':
                messagebox.showinfo('Notice', 'S3_TO_S3 or OSS_TO_S3. \n'
                                              'Please configure the remaining hidden parameters in s3_upload_config.ini')
            # Finish JobType change message

        # Click START button
        def close():
            window.withdraw()
            ok = messagebox.askokcancel('Start uploading job',
                                        f'Upload from Local to \ns3://{DesBucket_txt.get()}/{S3Prefix_txt.get()}\n'
                                        f'Click OK to START')
            if not ok:
                window.deiconify()
                return
            window.quit()
            return
            # Finish close()

        # Start GUI
        window = Tk()
        window.title("LONGBOW - AMAZON S3 UPLOAD TOOL WITH BREAK-POINT RESUMING")
        window.geometry('705x350')
        window.configure(background='#ECECEC')
        window.protocol("WM_DELETE_WINDOW", sys.exit)

        Label(window, text='Job Type').grid(column=0, row=0, sticky='w', padx=2, pady=2)
        JobType_mode = Combobox(window, width=15, state="readonly")
        JobType_mode['values'] = tuple(JobType_list)
        JobType_mode.grid(column=1, row=0, sticky='w', padx=2, pady=2)
        if JobType in JobType_list:
            position = JobType_list.index(JobType)
            JobType_mode.current(position)
        else:
            JobType_mode.current(0)
        JobType_mode.bind("<<ComboboxSelected>>", job_change)

        Label(window, text="Folder").grid(column=0, row=1, sticky='w', padx=2, pady=2)
        url_txt = Entry(window, width=50)
        url_txt.grid(column=1, row=1, sticky='w', padx=2, pady=2)
        url_btn = Button(window, text="Select Folder", width=10, command=browse_folder)
        url_btn.grid(column=2, row=1, sticky='w', padx=2, pady=2)
        url_txt.insert(0, SrcDir)

        Label(window, text="Filename or *").grid(column=0, row=2, sticky='w', padx=2, pady=2)
        file_txt = Entry(window, width=50)
        file_txt.grid(column=1, row=2, sticky='w', padx=2, pady=2)
        file_btn = Button(window, text="Select File", width=10, command=browse_file)
        file_btn.grid(column=2, row=2, sticky='w', padx=2, pady=2)
        file_txt.insert(0, SrcFileIndex)

        Label(window, text="AWS Profile").grid(column=0, row=3, sticky='w', padx=2, pady=2)
        DesProfileName_txt = Combobox(window, width=15, state="readonly")
        DesProfileName_txt['values'] = tuple(profile_list)
        DesProfileName_txt.grid(column=1, row=3, sticky='w', padx=2, pady=2)
        if DesProfileName in profile_list:
            position = profile_list.index(DesProfileName)
            DesProfileName_txt.current(position)
        else:
            DesProfileName_txt.current(0)
        DesProfileName = DesProfileName_txt.get()
        DesProfileName_txt.bind("<<ComboboxSelected>>", ListBuckets)

        Label(window, text="S3 Bucket").grid(column=0, row=4, sticky='w', padx=2, pady=2)
        DesBucket_txt = Combobox(window, width=48)
        DesBucket_txt.grid(column=1, row=4, sticky='w', padx=2, pady=2)
        DesBucket_txt['values'] = DesBucket
        DesBucket_txt.current(0)
        Button(window, text="List Buckets", width=10, command=ListBuckets) \
            .grid(column=2, row=4, sticky='w', padx=2, pady=2)

        Label(window, text="S3 Prefix").grid(column=0, row=5, sticky='w', padx=2, pady=2)
        S3Prefix_txt = Combobox(window, width=48)
        S3Prefix_txt.grid(column=1, row=5, sticky='w', padx=2, pady=2)
        S3Prefix_txt['values'] = S3Prefix
        if S3Prefix != '':
            S3Prefix_txt.current(0)
        Button(window, text="List Prefix", width=10, command=ListPrefix) \
            .grid(column=2, row=5, sticky='w', padx=2, pady=2)

        Label(window, text="MaxThread/File").grid(column=0, row=6, sticky='w', padx=2, pady=2)
        if MaxThread < 1 or MaxThread > 100:
            MaxThread = 5
        var_t = StringVar()
        var_t.set(str(MaxThread))
        MaxThread_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_t)
        MaxThread_txt.grid(column=1, row=6, sticky='w', padx=2, pady=2)

        Label(window, text="MaxParallelFile").grid(column=0, row=7, sticky='w', padx=2, pady=2)
        if MaxParallelFile < 1 or MaxParallelFile > 100:
            MaxParallelFile = 5
        var_f = StringVar()
        var_f.set(str(MaxParallelFile))
        MaxParallelFile_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_f)
        MaxParallelFile_txt.grid(column=1, row=7, sticky='w', padx=2, pady=2)

        Label(window, text="S3 StorageClass").grid(column=0, row=8, sticky='w', padx=2, pady=2)
        StorageClass_txt = Combobox(window, width=15, state="readonly")
        StorageClass_txt['values'] = tuple(StorageClass_list)
        StorageClass_txt.grid(column=1, row=8, sticky='w', padx=2, pady=2)
        if StorageClass in StorageClass_list:
            position = StorageClass_list.index(StorageClass)
            StorageClass_txt.current(position)
        else:
            StorageClass_txt.current(0)

        save_config = BooleanVar()
        save_config.set(True)
        save_config_txt = Checkbutton(window, text="Save to s3_upload_config.ini", variable=save_config)
        save_config_txt.grid(column=1, row=9, padx=2, pady=2)

        Button(window, text="Start Upload", width=15, command=close).grid(column=1, row=10, padx=5, pady=5)
        window.mainloop()

        JobType = JobType_mode.get()
        SrcDir = url_txt.get()
        SrcFileIndex = file_txt.get()
        DesBucket = DesBucket_txt.get()
        S3Prefix = S3Prefix_txt.get()
        DesProfileName = DesProfileName_txt.get()
        StorageClass = StorageClass_txt.get()
        MaxThread = int(MaxThread_txt.get())
        MaxParallelFile = int(MaxParallelFile_txt.get())

        if save_config.get():
            cfg['Basic']['JobType'] = JobType
            cfg['Basic']['DesProfileName'] = DesProfileName
            cfg['Basic']['DesBucket'] = DesBucket
            cfg['Basic']['S3Prefix'] = S3Prefix
            cfg['Advanced']['MaxThread'] = str(MaxThread)
            cfg['Advanced']['MaxParallelFile'] = str(MaxParallelFile)
            cfg['Advanced']['StorageClass'] = StorageClass
            cfg['LOCAL_TO_S3']['SrcDir'] = SrcDir
            cfg['Basic']['SrcFileIndex'] = SrcFileIndex
            config_file = os.path.join(file_path, 's3_upload_config.ini')
            with codecs.open(config_file, 'w', 'utf-8') as f:
                cfg.write(f)
            print(f"Save config to {config_file}")
        # GUI window finish

    S3Prefix = str(PurePosixPath(S3Prefix))  # Strip the trailing '/', if any
    if S3Prefix == '/' or S3Prefix == '.':
        S3Prefix = ''
    # Validate JobType
    if JobType not in JobType_list:
        print(f'ERR JobType: {JobType}, check config file: {config_file}')
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    # Finish set_config()
    return ChunkSize
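
# Note: unlike the other settings, which are stored as globals, set_config() returns ChunkSize
# to the caller, because the effective chunk size may later be auto-adjusted per file in split()
# when a file would otherwise exceed the multipart part-count limit.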

# Configure logging
def set_log():
    logger = logging.getLogger()
    # File logging
    if not os.path.exists("./log"):
        os.system("mkdir log")
    this_file_name = os.path.splitext(os.path.basename(__file__))[0]
    file_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
    log_file_name = './log/' + this_file_name + '-' + file_time + '.log'
    print('Logging to file:', os.path.abspath(log_file_name))
    print('Logging level:', LoggingLevel)
    fileHandler = logging.FileHandler(filename=log_file_name, encoding='utf-8')
    fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
    logger.addHandler(fileHandler)
    # Screen stream logging
    streamHandler = logging.StreamHandler()
    streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
    logger.addHandler(streamHandler)
    # Logging level
    logger.setLevel(logging.WARNING)
    if LoggingLevel == 'INFO':
        logger.setLevel(logging.INFO)
    elif LoggingLevel == 'DEBUG':
        logger.setLevel(logging.DEBUG)
    return logger, log_file_name

# Get local file list
def get_local_file_list(str_key=False):
    __src_file_list = []
    try:
        if SrcFileIndex == "*":
            for parent, dirnames, filenames in os.walk(SrcDir):
                for filename in filenames:  # Walk the tree and collect each file's info
                    file_absPath = os.path.join(parent, filename)
                    file_relativePath = file_absPath[len(SrcDir) + 1:]
                    file_size = os.path.getsize(file_absPath)
                    key = Path(file_relativePath)
                    if str_key:
                        key = str(key)
                    __src_file_list.append({
                        "Key": key,
                        "Size": file_size
                    })
        else:
            join_path = os.path.join(SrcDir, SrcFileIndex)
            file_size = os.path.getsize(join_path)
            __src_file_list = [{
                "Key": SrcFileIndex,
                "Size": file_size
            }]
    except Exception as err:
        logger.error('Can not get source files. ERR: ' + str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    if not __src_file_list:
        logger.error('Source file empty.')
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return __src_file_list

# Get object list on S3
def get_s3_file_list(*, s3_client, bucket, S3Prefix, no_prefix=False):
    logger.info('Get s3 file list ' + bucket)
    # For stripping the prefix from the destination keys
    if S3Prefix == '':
        # Destination bucket has no Prefix configured
        dp_len = 0
    else:
        # Length of the destination bucket's "prefix/"
        dp_len = len(S3Prefix) + 1
    paginator = s3_client.get_paginator('list_objects_v2')
    __des_file_list = []
    try:
        response_iterator = paginator.paginate(
            Bucket=bucket,
            Prefix=S3Prefix
        )
        for page in response_iterator:
            if "Contents" in page:
                for n in page["Contents"]:
                    key = n["Key"]
                    if no_prefix:
                        key = key[dp_len:]
                    __des_file_list.append({
                        "Key": key,
                        "Size": n["Size"]
                    })
        logger.info(f'Bucket list length:{str(len(__des_file_list))}')
    except Exception as err:
        logger.error(str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return __des_file_list

# Check single file on S3
def head_s3_single_file(s3_client, bucket):
    try:
        response_fileList = s3_client.head_object(
            Bucket=bucket,
            Key=str(Path(S3Prefix) / SrcFileIndex)
        )
        file = [{
            "Key": str(Path(S3Prefix) / SrcFileIndex),
            "Size": response_fileList["ContentLength"]
        }]
    except Exception as err:
        logger.error(str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return file


# Check single file on OSS
def head_oss_single_file(__ali_bucket):
    try:
        response_fileList = __ali_bucket.head_object(
            key=S3Prefix + SrcFileIndex
        )
        file = [{
            "Key": S3Prefix + SrcFileIndex,
            "Size": response_fileList.content_length
        }]
    except Exception as err:
        logger.error(str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return file

# Get object list on OSS
def get_ali_oss_file_list(__ali_bucket):
    logger.info('Get oss file list ' + ali_SrcBucket)
    __des_file_list = []
    try:
        response_fileList = __ali_bucket.list_objects(
            prefix=S3Prefix,
            max_keys=1000
        )
        if len(response_fileList.object_list) != 0:
            for n in response_fileList.object_list:
                __des_file_list.append({
                    "Key": n.key,
                    "Size": n.size
                })
            while response_fileList.is_truncated:
                response_fileList = __ali_bucket.list_objects(
                    prefix=S3Prefix,
                    max_keys=1000,
                    marker=response_fileList.next_marker
                )
                for n in response_fileList.object_list:
                    __des_file_list.append({
                        "Key": n.key,
                        "Size": n.size
                    })
        else:
            logger.info('File list is empty in the ali_oss bucket')
    except Exception as err:
        logger.error(str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return __des_file_list

# Get all unfinished multipart uploads on the destination S3
def get_uploaded_list(s3_client):
    logger.info('Get unfinished multipart upload')
    NextKeyMarker = ''
    IsTruncated = True
    __multipart_uploaded_list = []
    while IsTruncated:
        list_multipart_uploads = s3_client.list_multipart_uploads(
            Bucket=DesBucket,
            Prefix=S3Prefix,
            MaxUploads=1000,
            KeyMarker=NextKeyMarker
        )
        IsTruncated = list_multipart_uploads["IsTruncated"]
        NextKeyMarker = list_multipart_uploads["NextKeyMarker"]
        if NextKeyMarker != '':
            for i in list_multipart_uploads["Uploads"]:
                __multipart_uploaded_list.append({
                    "Key": i["Key"],
                    "Initiated": i["Initiated"],
                    "UploadId": i["UploadId"]
                })
                logger.info(f'Unfinished upload, Key: {i["Key"]}, Time: {i["Initiated"]}')
    return __multipart_uploaded_list
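
# The unfinished multipart uploads collected above are what make break-point resuming possible:
# for a file that matches an existing UploadId, only the missing part numbers are re-uploaded
# (see check_file_exist() and checkPartnumberList() below). list_multipart_uploads() pages through
# results up to 1000 at a time via KeyMarker/NextKeyMarker.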

# Jump to handle next file
class NextFile(Exception):
    pass


def uploadThread_small(srcfile, prefix_and_key):
    print(f'\033[0;32;1m--->Uploading\033[0m {srcfile["Key"]} - small file')
    with open(os.path.join(SrcDir, srcfile["Key"]), 'rb') as data:
        for retryTime in range(MaxRetry + 1):
            try:
                pstart_time = time.time()
                chunkdata = data.read()
                chunkdata_md5 = hashlib.md5(chunkdata)
                s3_dest_client.put_object(
                    Body=chunkdata,
                    Bucket=DesBucket,
                    Key=prefix_and_key,
                    ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8'),
                    StorageClass=StorageClass
                )
                pload_time = time.time() - pstart_time
                pload_bytes = len(chunkdata)
                pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
                print(f'\033[0;34;1m --->Complete\033[0m {srcfile["Key"]} - small file - {pload_speed}')
                break
            except Exception as e:
                logger.warning(f'Upload small file Fail: {srcfile["Key"]}, '
                               f'{str(e)}, Attempts: {retryTime}')
                if retryTime >= MaxRetry:
                    logger.error(f'Fail MaxRetry Upload small file: {srcfile["Key"]}')
                    return "MaxRetry"
                else:
                    time.sleep(5 * retryTime)
    return

def download_uploadThread_small(srcfileKey):
    for retryTime in range(MaxRetry + 1):
        try:
            pstart_time = time.time()
            # Get object
            print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - small file")
            response_get_object = s3_src_client.get_object(
                Bucket=SrcBucket,
                Key=srcfileKey
            )
            getBody = response_get_object["Body"].read()
            chunkdata_md5 = hashlib.md5(getBody)
            ContentMD5 = base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
            # Put object
            print(f'\033[0;32;1m --->Uploading\033[0m {srcfileKey} - small file')
            s3_dest_client.put_object(
                Body=getBody,
                Bucket=DesBucket,
                Key=srcfileKey,
                ContentMD5=ContentMD5,
                StorageClass=StorageClass
            )
            # Download/upload finished
            pload_time = time.time() - pstart_time
            pload_bytes = len(getBody)
            pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
            print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} - small file - {pload_speed}')
            break
        except Exception as e:
            logger.warning(f'Download/Upload small file Fail: {srcfileKey}, '
                           f'{str(e)}, Attempts: {retryTime}')
            if retryTime >= MaxRetry:
                logger.error(f'Fail MaxRetry Download/Upload small file: {srcfileKey}')
                return "MaxRetry"
            else:
                time.sleep(5 * retryTime)
    return

def alioss_download_uploadThread_small(srcfileKey):
    for retryTime in range(MaxRetry + 1):
        try:
            pstart_time = time.time()
            # Get object
            print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - small file")
            response_get_object = ali_bucket.get_object(
                key=srcfileKey
            )
            getBody = b''
            for chunk in response_get_object:
                if chunk != '':
                    getBody += chunk
            chunkdata_md5 = hashlib.md5(getBody)
            # Put object
            print(f"\033[0;32;1m --->Uploading\033[0m {srcfileKey} - small file")
            s3_dest_client.put_object(
                Body=getBody,
                Bucket=DesBucket,
                Key=srcfileKey,
                ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8'),
                StorageClass=StorageClass
            )
            pload_time = time.time() - pstart_time
            pload_bytes = len(getBody)
            pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
            print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} - small file - {pload_speed}')
            break
        except Exception as e:
            logger.warning(f'Download/Upload small file Fail: {srcfileKey} - small file, '
                           f'{str(e)}, Attempts: {retryTime}')
            if retryTime >= MaxRetry:
                logger.error(f'Fail MaxRetry Download/Upload small file: {srcfileKey} - small file')
                return "MaxRetry"
            else:
                time.sleep(5 * retryTime)
    return

# Upload file with different JobType
def upload_file(*, srcfile, desFilelist, UploadIdList, ChunkSize_default):  # UploadIdList is the multipart_uploaded_list
    logger.info(f'Start file: {srcfile["Key"]}')
    prefix_and_key = srcfile["Key"]
    if JobType == 'LOCAL_TO_S3':
        prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
    if srcfile['Size'] >= ChunkSize_default:
        try:
            # Retry up to 3 times if the calculated MD5 ETag does not match
            for md5_retry in range(3):
                # Check whether the file already exists: if it does, skip it; if it does not and there is
                # no UploadId, create a new upload; if it does not but an UploadId exists, reuse that UploadId
                response_check_upload = check_file_exist(srcfile=srcfile,
                                                         desFilelist=desFilelist,
                                                         UploadIdList=UploadIdList)
                if response_check_upload == 'UPLOAD':
                    logger.info(f'New upload: {srcfile["Key"]}')
                    response_new_upload = s3_dest_client.create_multipart_upload(
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        StorageClass=StorageClass
                    )
                    # logger.info("UploadId: "+response_new_upload["UploadId"])
                    reponse_uploadId = response_new_upload["UploadId"]
                    partnumberList = []
                elif response_check_upload == 'NEXT':
                    logger.info(f'Duplicated. {srcfile["Key"]} same size, goto next file.')
                    raise NextFile()
                else:
                    reponse_uploadId = response_check_upload
                    # Get the list of part numbers already uploaded
                    partnumberList = checkPartnumberList(srcfile, reponse_uploadId)
                # Get the list of part start indexes, e.g. [0, 10, 20]
                response_indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
                # Upload the parts
                upload_etag_full = uploadPart(uploadId=reponse_uploadId,
                                              indexList=response_indexList,
                                              partnumberList=partnumberList,
                                              srcfile=srcfile,
                                              ChunkSize_auto=ChunkSize_auto)
                # Complete (merge) the multipart upload on S3
                response_complete = completeUpload(reponse_uploadId=reponse_uploadId,
                                                   srcfileKey=srcfile["Key"],
                                                   len_indexList=len(response_indexList))
                logger.info(f'FINISH: {srcfile["Key"]} TO {response_complete["Location"]}')
                # Verify the file MD5
                if ifVerifyMD5:
                    if response_complete["ETag"] == upload_etag_full:
                        logger.info(f'MD5 ETag Matched - {srcfile["Key"]} - {response_complete["ETag"]}')
                        break
                    else:  # ETag mismatch: delete the S3 object and retry
                        logger.warning(f'MD5 ETag NOT MATCHED {srcfile["Key"]}( Destination / Origin ): '
                                       f'{response_complete["ETag"]} - {upload_etag_full}')
                        s3_dest_client.delete_object(
                            Bucket=DesBucket,
                            Key=prefix_and_key
                        )
                        UploadIdList = []
                        logger.warning(f'Deleted and retry upload {srcfile["Key"]}')
                        if md5_retry == 2:
                            logger.warning(f'MD5 ETag NOT MATCHED Exceed Max Retries - {srcfile["Key"]}')
                else:
                    break
        except NextFile:
            pass

    # Small file procedure
    else:
        # Check file exist
        for f in desFilelist:
            if f["Key"] == prefix_and_key and \
                    (srcfile["Size"] == f["Size"]):
                logger.info(f'Duplicated. {prefix_and_key} same size, goto next file.')
                return
        # File not found, or size differs: submit the upload
        if JobType == 'LOCAL_TO_S3':
            uploadThread_small(srcfile, prefix_and_key)
        elif JobType == 'S3_TO_S3':
            download_uploadThread_small(srcfile["Key"])
        elif JobType == 'ALIOSS_TO_S3':
            alioss_download_uploadThread_small(srcfile["Key"])
    return

# Check whether the file already exists on the destination bucket
def check_file_exist(*, srcfile, desFilelist, UploadIdList):
    # Check whether the source file is already in the destination prefix
    prefix_and_key = srcfile["Key"]
    if JobType == 'LOCAL_TO_S3':
        prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
    for f in desFilelist:
        if f["Key"] == prefix_and_key and \
                (srcfile["Size"] == f["Size"]):
            return 'NEXT'  # Identical file (same key and size)
    # File not found, or size differs, so it needs to be transferred again.
    # Check whether this Key has an unfinished UploadId
    keyIDList = []
    for u in UploadIdList:
        if u["Key"] == prefix_and_key:
            keyIDList.append(u)
    # No previous upload found for this Key, start from scratch
    if not keyIDList:
        return 'UPLOAD'
    # Among multiple Uploads of the same Key (file), pick the most recently initiated one
    UploadID_latest = keyIDList[0]
    for u in keyIDList:
        if u["Initiated"] > UploadID_latest["Initiated"]:
            UploadID_latest = u
    return UploadID_latest["UploadId"]
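
# To summarize the contract above: check_file_exist() returns 'NEXT' (identical key and size already
# in the destination, skip the file), 'UPLOAD' (no existing object and no unfinished upload, start a
# new multipart upload), or the UploadId string of the most recently initiated unfinished upload for
# this key, which upload_file() then resumes.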

# Check which part numbers already exist on S3
def checkPartnumberList(srcfile, uploadId):
    try:
        prefix_and_key = srcfile["Key"]
        if JobType == 'LOCAL_TO_S3':
            prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
        partnumberList = []
        PartNumberMarker = 0
        IsTruncated = True
        while IsTruncated:
            response_uploadedList = s3_dest_client.list_parts(
                Bucket=DesBucket,
                Key=prefix_and_key,
                UploadId=uploadId,
                MaxParts=1000,
                PartNumberMarker=PartNumberMarker
            )
            NextPartNumberMarker = response_uploadedList['NextPartNumberMarker']
            IsTruncated = response_uploadedList['IsTruncated']
            if NextPartNumberMarker > 0:
                for partnumberObject in response_uploadedList["Parts"]:
                    partnumberList.append(partnumberObject["PartNumber"])
            PartNumberMarker = NextPartNumberMarker
        if partnumberList:  # An empty list means no uploaded parts were found
            logger.info("Found uploaded partnumber: " + json.dumps(partnumberList))
    except Exception as checkPartnumberList_err:
        logger.error("checkPartnumberList_err: " + str(checkPartnumberList_err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return partnumberList

# Split the file into a virtual list of part indexes; each index is the start offset of a part in the file
def split(srcfile, ChunkSize):
    partnumber = 1
    indexList = [0]
    if int(srcfile["Size"] / ChunkSize) + 1 > 10000:
        ChunkSize = int(srcfile["Size"] / 10000) + 1024  # For files over 10,000 parts, auto-adjust ChunkSize
        logger.info(f'Size excess 10000 parts limit. Auto change ChunkSize to {ChunkSize}')
    while ChunkSize * partnumber < srcfile["Size"]:  # If it lands exactly on the boundary, no further part is needed, hence "<" rather than "<="
        indexList.append(ChunkSize * partnumber)
        partnumber += 1
    return indexList, ChunkSize
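
# Background for the adjustment above: Amazon S3 allows at most 10,000 parts per multipart upload,
# so for very large files the chunk size is grown until the part count fits under that limit.
# The extra 1024 bytes of headroom keep integer rounding from pushing the count back over 10,000.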

# Upload the parts in the index list
def uploadPart(*, uploadId, indexList, partnumberList, srcfile, ChunkSize_auto):
    partnumber = 1  # Part number to upload in the current loop iteration
    total = len(indexList)
    md5list = [hashlib.md5(b'')] * total
    complete_list = []
    # Thread pool start
    with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
        for partStartIndex in indexList:
            # start to upload part
            if partnumber not in partnumberList:
                dryrun = False
            else:
                dryrun = True
            # Upload one part per thread, or dryrun to only calculate the MD5
            if JobType == 'LOCAL_TO_S3':
                pool.submit(uploadThread,
                            uploadId=uploadId,
                            partnumber=partnumber,
                            partStartIndex=partStartIndex,
                            srcfileKey=srcfile["Key"],
                            total=total,
                            md5list=md5list,
                            dryrun=dryrun,
                            complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            elif JobType == 'S3_TO_S3':
                pool.submit(download_uploadThread,
                            uploadId=uploadId,
                            partnumber=partnumber,
                            partStartIndex=partStartIndex,
                            srcfileKey=srcfile["Key"],
                            total=total,
                            md5list=md5list,
                            dryrun=dryrun,
                            complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            elif JobType == 'ALIOSS_TO_S3':
                pool.submit(alioss_download_uploadThread,
                            uploadId=uploadId,
                            partnumber=partnumber,
                            partStartIndex=partStartIndex,
                            srcfileKey=srcfile["Key"],
                            srcfileSize=srcfile["Size"],
                            total=total,
                            md5list=md5list,
                            dryrun=dryrun,
                            complete_list=complete_list,
                            ChunkSize=ChunkSize_auto)
            partnumber += 1
    # Thread pool end
    logger.info(f'All parts uploaded - {srcfile["Key"]} - size: {srcfile["Size"]}')
    # For local uploads the file may change while it is being transferred, so re-scan the local file
    # to recalculate the MD5s, instead of reusing the md5list built from the bodies read earlier
    if ifVerifyMD5 and JobType == 'LOCAL_TO_S3':
        md5list = cal_md5list(indexList=indexList,
                              srcfileKey=srcfile["Key"],
                              ChunkSize=ChunkSize_auto)
    # Calculate the overall ETag of all parts: cal_etag
    digests = b"".join(m.digest() for m in md5list)
    md5full = hashlib.md5(digests)
    cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
    return cal_etag
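
# ETag note: for a multipart upload (without SSE-KMS style encryption) S3 normally sets the object's
# ETag to the MD5 of the concatenated binary MD5 digests of each part, followed by '-' and the part
# count. cal_etag reproduces that convention above so it can be compared against the ETag returned
# by complete_multipart_upload() when ifVerifyMD5 is enabled.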

# Convert bytes to a human readable string
def size_to_str(size):
    def loop(integer, remainder, level):
        if integer >= 1024:
            remainder = integer % 1024
            integer //= 1024
            level += 1
            return loop(integer, remainder, level)
        else:
            return integer, round(remainder / 1024, 1), level

    units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    integer, remainder, level = loop(int(size), 0, 0)
    if level + 1 > len(units):
        level = -1
    return f'{integer + remainder} {units[level]}'

# Re-calculate the MD5 of the local file
def cal_md5list(*, indexList, srcfileKey, ChunkSize):
    logger.info(f'Re-read local file to calculate MD5 again: {srcfileKey}')
    md5list = []
    with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
        for partStartIndex in indexList:
            data.seek(partStartIndex)
            chunkdata = data.read(ChunkSize)
            chunkdata_md5 = hashlib.md5(chunkdata)
            md5list.append(chunkdata_md5)
    return md5list

# Single thread to upload one part, from local to S3
def uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list, ChunkSize):
    prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    if not dryrun:
        print(f'\033[0;32;1m--->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
    pstart_time = time.time()
    with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                data.seek(partStartIndex)
                chunkdata = data.read(ChunkSize)
                chunkdata_md5 = hashlib.md5(chunkdata)
                md5list[partnumber - 1] = chunkdata_md5
                if not dryrun:
                    s3_dest_client.upload_part(
                        Body=chunkdata,
                        Bucket=DesBucket,
                        Key=prefix_and_key,
                        PartNumber=partnumber,
                        UploadId=uploadId,
                        ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                    )
                # The single part is MD5-verified here; the whole file is verified again when the parts are merged
                break
            except Exception as err:
                retryTime += 1
                logger.info(f'UploadThreadFunc log: {srcfileKey} - {str(err)}')
                logger.info(f'Upload Fail - {srcfileKey} - Retry part - {partnumber} - Attempt - {retryTime}')
                if retryTime > MaxRetry:
                    logger.error(f'Quit for Max retries: {retryTime}')
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # Increasing delay between retries
    complete_list.append(partnumber)
    pload_time = time.time() - pstart_time
    pload_bytes = len(chunkdata)
    pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
    if not dryrun:
        print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
    return

# Download a part from the source S3 and upload it to the destination S3
def download_uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list,
                          ChunkSize):
    pstart_time = time.time()
    getBody, chunkdata_md5 = b'', b''  # init
    if ifVerifyMD5 or not dryrun:
        # Download the part
        if not dryrun:
            print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - {partnumber}/{total}")
        else:
            print(f"\033[0;33;40m--->Downloading for verify MD5\033[0m {srcfileKey} - {partnumber}/{total}")
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                response_get_object = s3_src_client.get_object(
                    Bucket=SrcBucket,
                    Key=srcfileKey,
                    Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
                )
                getBody = response_get_object["Body"].read()
                chunkdata_md5 = hashlib.md5(getBody)
                md5list[partnumber - 1] = chunkdata_md5
                break
            except Exception as err:
                retryTime += 1
                logger.warning(f"DownloadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
                logger.warning(f"Download part fail, retry part: {partnumber} Attempts: {retryTime}")
                if retryTime > MaxRetry:
                    logger.error(f"Quit for Max Download retries: {retryTime}")
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # Increasing delay between retries
    if not dryrun:
        # Upload the part
        print(f'\033[0;32;1m --->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                s3_dest_client.upload_part(
                    Body=getBody,
                    Bucket=DesBucket,
                    Key=srcfileKey,
                    PartNumber=partnumber,
                    UploadId=uploadId,
                    ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                )
                break
            except Exception as err:
                retryTime += 1
                logger.warning(f"UploadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
                logger.warning(f"Upload part fail, retry part: {partnumber} Attempts: {retryTime}")
                if retryTime > MaxRetry:
                    logger.error(f"Quit for Max Upload retries: {retryTime}")
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # Increasing delay between retries
    complete_list.append(partnumber)
    pload_time = time.time() - pstart_time
    pload_bytes = len(getBody)
    pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
    if not dryrun:
        print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
    return

# Download a part from the source Ali OSS and upload it to the destination S3
def alioss_download_uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, srcfileSize, total, md5list,
                                 dryrun, complete_list, ChunkSize):
    pstart_time = time.time()
    getBody, chunkdata_md5 = b'', b''  # init
    if ifVerifyMD5 or not dryrun:
        # Download the part
        if not dryrun:
            print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - {partnumber}/{total}")
        else:
            print(f"\033[0;33;40m--->Downloading for verify MD5\033[0m {srcfileKey} - {partnumber}/{total}")
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                partEndIndex = partStartIndex + ChunkSize - 1
                if partEndIndex >= srcfileSize:
                    partEndIndex = srcfileSize - 1
                # If the end of the range exceeds the object size, Ali OSS downloads the whole object
                # from the beginning, so the end must be clamped to FileSize-1 manually.
                # S3 and local disks simply cap the end pointer at the last byte instead.
                response_get_object = ali_bucket.get_object(
                    key=srcfileKey,
                    byte_range=(partStartIndex, partEndIndex)
                )
                getBody = b''
                for chunk in response_get_object:
                    if chunk != '':
                        getBody += chunk
                chunkdata_md5 = hashlib.md5(getBody)
                md5list[partnumber - 1] = chunkdata_md5
                break
            except Exception as err:
                retryTime += 1
                logger.warning(f"DownloadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
                logger.warning(f"Download part fail, retry part: {partnumber} Attempts: {retryTime}")
                if retryTime > MaxRetry:
                    logger.error(f"Quit for Max Download retries: {retryTime}")
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # Increasing delay between retries
    if not dryrun:
        # Upload the part
        print(f'\033[0;32;1m --->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
        retryTime = 0
        while retryTime <= MaxRetry:
            try:
                s3_dest_client.upload_part(
                    Body=getBody,
                    Bucket=DesBucket,
                    Key=srcfileKey,
                    PartNumber=partnumber,
                    UploadId=uploadId,
                    ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
                )
                break
            except Exception as err:
                retryTime += 1
                logger.warning(f"UploadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
                logger.warning(f"Upload part fail, retry part: {partnumber} Attempts: {retryTime}")
                if retryTime > MaxRetry:
                    logger.error(f"Quit for Max Upload retries: {retryTime}")
                    input('PRESS ENTER TO QUIT')
                    sys.exit(0)
                time.sleep(5 * retryTime)  # Increasing delay between retries
    complete_list.append(partnumber)
    pload_time = time.time() - pstart_time
    pload_bytes = len(getBody)
    pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
    if not dryrun:
        print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
    return

# Complete multipart upload: get uploadedListParts from S3 and construct completeStructJSON
def completeUpload(*, reponse_uploadId, srcfileKey, len_indexList):
    # Query all uploaded parts on S3 (uploadedListParts) to build completeStructJSON
    prefix_and_key = srcfileKey
    if JobType == 'LOCAL_TO_S3':
        prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
    uploadedListPartsClean = []
    PartNumberMarker = 0
    IsTruncated = True
    while IsTruncated:
        response_uploadedList = s3_dest_client.list_parts(
            Bucket=DesBucket,
            Key=prefix_and_key,
            UploadId=reponse_uploadId,
            MaxParts=1000,
            PartNumberMarker=PartNumberMarker
        )
        NextPartNumberMarker = response_uploadedList['NextPartNumberMarker']
        IsTruncated = response_uploadedList['IsTruncated']
        if NextPartNumberMarker > 0:
            for partObject in response_uploadedList["Parts"]:
                ETag = partObject["ETag"]
                PartNumber = partObject["PartNumber"]
                addup = {
                    "ETag": ETag,
                    "PartNumber": PartNumber
                }
                uploadedListPartsClean.append(addup)
        PartNumberMarker = NextPartNumberMarker
    if len(uploadedListPartsClean) != len_indexList:
        logger.warning(f'Uploaded parts size not match - {srcfileKey}')
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    completeStructJSON = {"Parts": uploadedListPartsClean}
    # Merge the multipart upload on S3
    response_complete = s3_dest_client.complete_multipart_upload(
        Bucket=DesBucket,
        Key=prefix_and_key,
        UploadId=reponse_uploadId,
        MultipartUpload=completeStructJSON
    )
    logger.info(f'Complete merge file {srcfileKey}')
    return response_complete

# Compare local file list and s3 list
def compare_local_to_s3():
    logger.info('Comparing destination and source ...')
    fileList = get_local_file_list(str_key=True)
    desFilelist = get_s3_file_list(s3_client=s3_dest_client,
                                   bucket=DesBucket,
                                   S3Prefix=S3Prefix,
                                   no_prefix=True)
    deltaList = []
    for source_file in fileList:
        if source_file not in desFilelist:
            deltaList.append(source_file)
    if not deltaList:
        logger.warning('All source files are in destination Bucket/Prefix. Job well done.')
    else:
        logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
        for delta_file in deltaList:
            logger.warning(str(delta_file))
    return


# Compare S3 buckets
def compare_buckets():
    logger.info('Comparing destination and source ...')
    deltaList = []
    desFilelist = get_s3_file_list(s3_client=s3_dest_client,
                                   bucket=DesBucket,
                                   S3Prefix=S3Prefix)
    if JobType == 'S3_TO_S3':
        if SrcFileIndex == "*":
            fileList = get_s3_file_list(s3_client=s3_src_client,
                                        bucket=SrcBucket,
                                        S3Prefix=S3Prefix)
        else:
            fileList = head_s3_single_file(s3_src_client, SrcBucket)
    elif JobType == 'ALIOSS_TO_S3':
        if SrcFileIndex == "*":
            fileList = get_ali_oss_file_list(ali_bucket)
        else:
            fileList = head_oss_single_file(ali_bucket)
    else:
        return
    for source_file in fileList:
        if source_file not in desFilelist:
            deltaList.append(source_file)
    if not deltaList:
        logger.warning('All source files are in destination Bucket/Prefix. Job well done.')
    else:
        logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
        for delta_file in deltaList:
            logger.warning(json.dumps(delta_file))
    return

# Main
if __name__ == '__main__':
    start_time = datetime.datetime.now()
    ChunkSize_default = set_config()
    logger, log_file_name = set_log()

    # Define s3 client
    s3_config = Config(max_pool_connections=200)
    s3_dest_client = Session(profile_name=DesProfileName).client('s3', config=s3_config)

    # Check destination S3 writable
    try:
        logger.info(f'Checking write permission for: {DesBucket}')
        s3_dest_client.put_object(
            Bucket=DesBucket,
            Key=str(PurePosixPath(S3Prefix) / 'access_test'),
            Body='access_test_content'
        )
    except Exception as e:
        logger.error(f'Can not write to {DesBucket}/{S3Prefix}, {str(e)}')
        input('PRESS ENTER TO QUIT')
        sys.exit(0)

    # Get the source file list
    logger.info('Get source file list')
    src_file_list = []
    if JobType == "LOCAL_TO_S3":
        SrcDir = str(Path(SrcDir))
        src_file_list = get_local_file_list()
    elif JobType == "S3_TO_S3":
        s3_src_client = Session(profile_name=SrcProfileName).client('s3', config=s3_config)
        if SrcFileIndex == "*":
            src_file_list = get_s3_file_list(s3_client=s3_src_client,
                                             bucket=SrcBucket,
                                             S3Prefix=S3Prefix)
        else:
            src_file_list = head_s3_single_file(s3_src_client, SrcBucket)
    elif JobType == 'ALIOSS_TO_S3':
        import oss2
        ali_bucket = oss2.Bucket(oss2.Auth(ali_access_key_id, ali_access_key_secret), ali_endpoint, ali_SrcBucket)
        if SrcFileIndex == "*":
            src_file_list = get_ali_oss_file_list(ali_bucket)
        else:
            src_file_list = head_oss_single_file(ali_bucket)

    # Get the list of objects that already exist on the destination S3
    des_file_list = get_s3_file_list(s3_client=s3_dest_client,
                                     bucket=DesBucket,
                                     S3Prefix=S3Prefix)

    # Get all unfinished multipart uploads in the bucket
    multipart_uploaded_list = get_uploaded_list(s3_dest_client)

    # Ask whether to clean all unfinished multipart uploads (forces a full re-upload)
    if multipart_uploaded_list:
        logger.warning(f'{len(multipart_uploaded_list)} Unfinished upload, clean them and restart?')
        logger.warning('NOTICE: IF CLEAN, YOU CANNOT RESUME ANY UNFINISHED UPLOAD')
        if not DontAskMeToClean:
            keyboard_input = input("CLEAN unfinished upload and restart(input CLEAN) or resume loading(press enter)? "
                                   "Please confirm: (n/CLEAN)")
        else:
            keyboard_input = 'no'
        if keyboard_input == 'CLEAN':
            # Abort all unfinished uploads
            for clean_i in multipart_uploaded_list:
                s3_dest_client.abort_multipart_upload(
                    Bucket=DesBucket,
                    Key=clean_i["Key"],
                    UploadId=clean_i["UploadId"]
                )
            multipart_uploaded_list = []
            logger.info('CLEAN FINISHED')
        else:
            logger.info('You choose not to clean, now try to resume unfinished upload')

    # Upload every file in the source file list
    with futures.ThreadPoolExecutor(max_workers=MaxParallelFile) as file_pool:
        for src_file in src_file_list:
            file_pool.submit(upload_file,
                             srcfile=src_file,
                             desFilelist=des_file_list,
                             UploadIdList=multipart_uploaded_list,
                             ChunkSize_default=ChunkSize_default)

    # Fetch the source and destination file lists again, compare each file's size, and print the result
    time_str = str(datetime.datetime.now() - start_time)
    if JobType == 'S3_TO_S3':
        str_from = f'{SrcBucket}/{S3Prefix}'
        compare_buckets()
    elif JobType == 'ALIOSS_TO_S3':
        str_from = f'{ali_SrcBucket}/{S3Prefix}'
        compare_buckets()
    elif JobType == 'LOCAL_TO_S3':
        str_from = f'{SrcDir}'
        compare_local_to_s3()
    else:
        str_from = ""
    print(f'\033[0;34;1mMISSION ACCOMPLISHED - Time: {time_str} \033[0m - FROM: {str_from} TO {DesBucket}/{S3Prefix}')
    print('Logged to file:', os.path.abspath(log_file_name))
    input('PRESS ENTER TO QUIT')