requestPayer-exampleCodeFrom-丁可_s3_download.py

# -*- coding: utf-8 -*-
# PROJECT LONGBOW - AMAZON S3 DOWNLOAD TOOL WITH BREAK-POINT RESUMING
import os
import sys
import json
from boto3.session import Session
from botocore.client import Config
from concurrent import futures
from configparser import ConfigParser, RawConfigParser, NoOptionError
import uuid
import datetime
import logging
from pathlib import PurePosixPath, Path
import platform
import codecs
import sqlite3
import time
os.system("")  # workaround so some Windows terminals print ANSI color
global SrcBucket, S3Prefix, SrcFileIndex, SrcProfileName, DesDir, MaxRetry, MaxThread, MaxParallelFile, LoggingLevel


# Read config.ini with GUI
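# An illustrative s3_download_config.ini (assumed layout, reconstructed from the
# cfg.get() calls below; the values are made-up examples, not shipped defaults):
#
#   [Basic]
#   SrcBucket = my-example-bucket
#   S3Prefix = some/prefix
#   SrcFileIndex = *
#   SrcProfileName = default
#   DesDir = /tmp/s3-download
#
#   [Advanced]
#   ChunkSize = 5            ; in MB; multiplied by 1024 * 1024 below
#   MaxRetry = 5
#   MaxThread = 5
#   MaxParallelFile = 5
#   LoggingLevel = INFO      ; WARNING, INFO or DEBUG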
def set_config():
    sys_para = sys.argv
    file_path = os.path.split(sys_para[0])[0]
    gui = False
    if platform.uname()[0] == 'Windows':  # GUI on by default on Windows
        gui = True
    if platform.uname()[0] == 'Linux':  # GUI off by default on Linux
        gui = False
    if '--gui' in sys.argv:  # force GUI mode
        gui = True
    if '--nogui' in sys.argv:  # --nogui overrides the Windows default above
        gui = False
    config_file = os.path.join(file_path, 's3_download_config.ini')
    # If no config file, read the default config
    if not os.path.exists(config_file):
        config_file += '.default'
        print("No customized config, use the default config")
    cfg = ConfigParser()
    print(f'Reading config file: {config_file}')
    try:
        global SrcBucket, S3Prefix, SrcFileIndex, SrcProfileName, DesDir, MaxRetry, MaxThread, MaxParallelFile, LoggingLevel
        cfg.read(config_file, encoding='utf-8-sig')
        SrcBucket = cfg.get('Basic', 'SrcBucket')
        S3Prefix = cfg.get('Basic', 'S3Prefix')
        SrcFileIndex = cfg.get('Basic', 'SrcFileIndex')
        SrcProfileName = cfg.get('Basic', 'SrcProfileName')
        DesDir = cfg.get('Basic', 'DesDir')
        Megabytes = 1024 * 1024
        ChunkSize = cfg.getint('Advanced', 'ChunkSize') * Megabytes
        MaxRetry = cfg.getint('Advanced', 'MaxRetry')
        MaxThread = cfg.getint('Advanced', 'MaxThread')
        MaxParallelFile = cfg.getint('Advanced', 'MaxParallelFile')
        LoggingLevel = cfg.get('Advanced', 'LoggingLevel')
    except Exception as e:
        print("ERR loading s3_download_config.ini", str(e))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    if gui:
        # For GUI
        from tkinter import Tk, filedialog, END, StringVar, BooleanVar, messagebox
        from tkinter.ttk import Combobox, Label, Button, Entry, Spinbox, Checkbutton
        # get profile name list in ~/.aws/credentials
        pro_conf = RawConfigParser()
        pro_path = os.path.join(os.path.expanduser("~"), ".aws")
        cre_path = os.path.join(pro_path, "credentials")
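        # For reference, a minimal ~/.aws/credentials profile as this script
        # writes it below (placeholder values; note it stores region here too,
        # although region normally lives in ~/.aws/config):
        #   [default]
        #   aws_access_key_id = AKIAxxxxxxxx
        #   aws_secret_access_key = xxxxxxxx
        #   region = us-east-1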
        if os.path.exists(cre_path):
            pro_conf.read(cre_path)
            profile_list = pro_conf.sections()
        else:
            print(f"No aws_access_key found in {cre_path}, please enter credentials for the S3 bucket:")
            os.makedirs(pro_path, exist_ok=True)  # ~/.aws may already exist without a credentials file
            aws_access_key_id = input('aws_access_key_id: ')
            aws_secret_access_key = input('aws_secret_access_key: ')
            region = input('region: ')
            pro_conf.add_section('default')
            pro_conf['default']['aws_access_key_id'] = aws_access_key_id
            pro_conf['default']['aws_secret_access_key'] = aws_secret_access_key
            pro_conf['default']['region'] = region
            profile_list = ['default']
            with open(cre_path, 'w') as f:
                print(f"Saving credentials to {cre_path}")
                pro_conf.write(f)
        # Click Select Folder
        def browse_folder():
            local_dir = filedialog.askdirectory(initialdir=os.path.dirname(__file__))
            url_txt.delete(0, END)
            url_txt.insert(0, local_dir)
            file_txt.delete(0, END)
            file_txt.insert(0, "*")
        # Finish browse_folder

        # Click List Buckets
        def ListBuckets(*args):
            SrcProfileName = SrcProfileName_txt.get()
            client = Session(profile_name=SrcProfileName).client('s3')
            bucket_list = []
            try:
                response = client.list_buckets()
                if 'Buckets' in response:
                    bucket_list = [b['Name'] for b in response['Buckets']]
            except Exception as e:
                messagebox.showerror('Error', f'Failed to list buckets.\n'
                                              f'Please verify the aws_access_key of profile: [{SrcProfileName}]\n'
                                              f'{str(e)}')
                bucket_list = ['CAN_NOT_GET_BUCKET_LIST']
            SrcBucket_txt['values'] = bucket_list
            if bucket_list:  # current(0) raises on an empty values list
                SrcBucket_txt.current(0)
        # Finish ListBuckets
        # Click List Prefix
        def ListPrefix(*args):
            SrcProfileName = SrcProfileName_txt.get()
            client = Session(profile_name=SrcProfileName).client('s3')
            prefix_list = []
            this_bucket = SrcBucket_txt.get()
            max_get = 1000  # one list_objects_v2 call returns at most 1000 keys
            try:
                response = client.list_objects_v2(
                    Bucket=this_bucket,
                    Delimiter='/',
                    RequestPayer='requester'
                )  # Only the first page is fetched, to keep the list simple
                if 'CommonPrefixes' in response:
                    prefix_list = [c['Prefix'] for c in response['CommonPrefixes']]
                if not prefix_list:
                    messagebox.showinfo('Message', f'There is no "/" prefix in: {this_bucket}')
                if response['IsTruncated']:
                    messagebox.showinfo('Message', f'More than {max_get} prefixes, cannot fully list here.')
            except Exception as e:
                messagebox.showinfo('Error', f'Cannot get prefix list from bucket: {this_bucket}, {str(e)}')
            S3Prefix_txt['values'] = prefix_list
            if prefix_list:
                S3Prefix_txt.current(0)
        # Finish list prefix
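        # Note: with Delimiter='/', S3 collapses all keys that share the same
        # leading "segment/" into one CommonPrefixes entry, which is what makes
        # the folder-style drop-down above possible without listing every object.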
        def browse_file(*args):
            SrcProfileName = SrcProfileName_txt.get()
            S3Prefix = S3Prefix_txt.get()
            client = Session(profile_name=SrcProfileName).client('s3')
            file_list = []
            this_bucket = SrcBucket_txt.get()
            max_get = 1000  # one list_objects_v2 call returns at most 1000 keys
            try:
                response = client.list_objects_v2(
                    Bucket=this_bucket,
                    Prefix=str(PurePosixPath(S3Prefix)) + '/',
                    RequestPayer='requester',
                    Delimiter='/'
                )  # Only the first page is fetched, to keep the list simple
                # Work out how much of each key is the prefix, so it can be stripped
                if S3Prefix == '' or S3Prefix == '/':
                    # No prefix set on the bucket
                    dp_len = 0
                else:
                    # Length of the bucket's "prefix/"
                    dp_len = len(str(PurePosixPath(S3Prefix))) + 1
                if 'Contents' in response:
                    file_list = [c['Key'][dp_len:] for c in response['Contents']]  # drop the prefix
                if not file_list:
                    messagebox.showinfo('Message', f'There are no files in s3://{this_bucket}/{S3Prefix}')
                if response['IsTruncated']:
                    messagebox.showinfo('Message', f'More than {max_get} files, cannot fully list here.')
            except Exception as e:
                messagebox.showinfo('Error', f'Cannot get file list from bucket s3://{this_bucket}/{S3Prefix}, {str(e)}')
            file_txt['values'] = file_list
            if file_list:
                file_txt.current(0)
        # Finish list files
        # Click START button
        def close():
            window.withdraw()
            ok = messagebox.askokcancel('Start downloading job',
                                        f'DOWNLOAD FROM s3://{SrcBucket_txt.get()}/{S3Prefix_txt.get()}\n'
                                        f'TO LOCAL {url_txt.get()}\n'
                                        f'Click OK to START')
            if not ok:
                window.deiconify()
                return
            window.quit()
            return
        # Finish close()
        # Start GUI
        window = Tk()
        window.title("LONGBOW - AMAZON S3 DOWNLOAD TOOL WITH BREAK-POINT RESUMING")
        window.geometry('705x350')
        window.configure(background='#ECECEC')
        window.protocol("WM_DELETE_WINDOW", sys.exit)
        Label(window, text="S3 Bucket").grid(column=0, row=1, sticky='w', padx=2, pady=2)
        SrcBucket_txt = Combobox(window, width=48)
        SrcBucket_txt.grid(column=1, row=1, sticky='w', padx=2, pady=2)
        SrcBucket_txt['values'] = SrcBucket
        SrcBucket_txt.current(0)
        Button(window, text="List Buckets", width=10, command=ListBuckets) \
            .grid(column=2, row=1, sticky='w', padx=2, pady=2)
        Label(window, text="S3 Prefix").grid(column=0, row=2, sticky='w', padx=2, pady=2)
        S3Prefix_txt = Combobox(window, width=48)
        S3Prefix_txt.grid(column=1, row=2, sticky='w', padx=2, pady=2)
        S3Prefix_txt['values'] = S3Prefix
        if S3Prefix != '':
            S3Prefix_txt.current(0)
        Button(window, text="List Prefix", width=10, command=ListPrefix) \
            .grid(column=2, row=2, sticky='w', padx=2, pady=2)
        Label(window, text="Filename or *").grid(column=0, row=3, sticky='w', padx=2, pady=2)
        file_txt = Combobox(window, width=48)
        file_txt.grid(column=1, row=3, sticky='w', padx=2, pady=2)
        file_txt['values'] = SrcFileIndex
        if SrcFileIndex != '':
            file_txt.current(0)
        Button(window, text="Select File", width=10, command=browse_file) \
            .grid(column=2, row=3, sticky='w', padx=2, pady=2)
        Label(window, text="AWS Profile").grid(column=0, row=4, sticky='w', padx=2, pady=2)
        SrcProfileName_txt = Combobox(window, width=15, state="readonly")
        SrcProfileName_txt['values'] = tuple(profile_list)
        SrcProfileName_txt.grid(column=1, row=4, sticky='w', padx=2, pady=2)
        if SrcProfileName in profile_list:
            position = profile_list.index(SrcProfileName)
            SrcProfileName_txt.current(position)
        else:
            SrcProfileName_txt.current(0)
        SrcProfileName = SrcProfileName_txt.get()
        SrcProfileName_txt.bind("<<ComboboxSelected>>", ListBuckets)
        Label(window, text="Folder").grid(column=0, row=5, sticky='w', padx=2, pady=2)
        url_txt = Entry(window, width=50)
        url_txt.grid(column=1, row=5, sticky='w', padx=2, pady=2)
        url_btn = Button(window, text="Select Folder", width=10, command=browse_folder)
        url_btn.grid(column=2, row=5, sticky='w', padx=2, pady=2)
        url_txt.insert(0, DesDir)
        Label(window, text="MaxThread/File").grid(column=0, row=6, sticky='w', padx=2, pady=2)
        if MaxThread < 1 or MaxThread > 100:
            MaxThread = 5
        var_t = StringVar()
        var_t.set(str(MaxThread))
        MaxThread_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_t)
        MaxThread_txt.grid(column=1, row=6, sticky='w', padx=2, pady=2)
        Label(window, text="MaxParallelFile").grid(column=0, row=7, sticky='w', padx=2, pady=2)
        if MaxParallelFile < 1 or MaxParallelFile > 100:
            MaxParallelFile = 5
        var_f = StringVar()
        var_f.set(str(MaxParallelFile))
        MaxParallelFile_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_f)
        MaxParallelFile_txt.grid(column=1, row=7, sticky='w', padx=2, pady=2)
        save_config = BooleanVar()
        save_config.set(True)
        save_config_txt = Checkbutton(window, text="Save to s3_download_config.ini", variable=save_config)
        save_config_txt.grid(column=1, row=9, padx=2, pady=2)
        Button(window, text="Start Download", width=15, command=close).grid(column=1, row=10, padx=5, pady=5)
        window.mainloop()
        DesDir = url_txt.get()
        SrcFileIndex = file_txt.get()
        SrcBucket = SrcBucket_txt.get()
        S3Prefix = S3Prefix_txt.get()
        SrcProfileName = SrcProfileName_txt.get()
        MaxThread = int(MaxThread_txt.get())
        MaxParallelFile = int(MaxParallelFile_txt.get())
        if save_config.get():  # a BooleanVar itself is always truthy; read its value
            cfg['Basic']['SrcBucket'] = SrcBucket
            cfg['Basic']['S3Prefix'] = S3Prefix
            cfg['Basic']['SrcFileIndex'] = SrcFileIndex
            cfg['Basic']['SrcProfileName'] = SrcProfileName
            cfg['Basic']['DesDir'] = DesDir
            cfg['Advanced']['MaxThread'] = str(MaxThread)
            cfg['Advanced']['MaxParallelFile'] = str(MaxParallelFile)
            config_file = os.path.join(file_path, 's3_download_config.ini')
            with codecs.open(config_file, 'w', 'utf-8') as f:
                cfg.write(f)
            print(f"Save config to {config_file}")
    # GUI window finish
    if S3Prefix == '/':
        S3Prefix = ''
    # Finish set_config()
    return ChunkSize


# Configure logging
def set_log():
    logger = logging.getLogger()
    # File logging
    if not os.path.exists("./log"):
        os.mkdir("log")
    this_file_name = os.path.splitext(os.path.basename(__file__))[0]
    file_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
    log_file_name = './log/' + this_file_name + '-' + file_time + '.log'
    print('Logging to file:', os.path.abspath(log_file_name))
    print('Logging level:', LoggingLevel)
    fileHandler = logging.FileHandler(filename=log_file_name, encoding='utf-8')
    fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
    logger.addHandler(fileHandler)
    # Screen stream logging
    streamHandler = logging.StreamHandler()
    streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
    logger.addHandler(streamHandler)
    # Logging level
    logger.setLevel(logging.WARNING)
    if LoggingLevel == 'INFO':
        logger.setLevel(logging.INFO)
    elif LoggingLevel == 'DEBUG':
        logger.setLevel(logging.DEBUG)
    return logger, log_file_name


# Get object list on S3
def get_s3_file_list(s3_client, bucket):
    logger.info('Get s3 file list ' + bucket)
    paginator = s3_client.get_paginator('list_objects_v2')
    __des_file_list = []
    try:
        response_iterator = paginator.paginate(
            Bucket=bucket,
            Prefix=S3Prefix,
            RequestPayer='requester'
        )
        for page in response_iterator:
            if "Contents" in page:
                for n in page["Contents"]:
                    key = n["Key"]
                    __des_file_list.append({
                        "Key": key,
                        "Size": n["Size"]
                    })
        logger.info(f'Bucket list length:{str(len(__des_file_list))}')
    except Exception as err:
        logger.error(str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return __des_file_list
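# Unlike the one-shot list_objects_v2 calls in the GUI helpers, the paginator
# above keeps issuing continuation requests, so a prefix holding more than
# 1000 objects is enumerated completely.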


# Check single file on S3
def head_s3_single_file(s3_client, bucket):
    try:
        response_fileList = s3_client.head_object(
            Bucket=bucket,
            Key=str(PurePosixPath(S3Prefix)/SrcFileIndex),
            RequestPayer='requester'  # added to match the other calls; head_object also needs it on requester-pays buckets
        )
        file = [{
            "Key": str(PurePosixPath(S3Prefix)/SrcFileIndex),
            "Size": response_fileList["ContentLength"]
        }]
    except Exception as err:
        logger.error(str(err))
        input('PRESS ENTER TO QUIT')
        sys.exit(0)
    return file


# Split the file into a virtual part list of indexes; each index is a part's start offset in the file
def split(srcfile, ChunkSize):
    partnumber = 1
    indexList = [0]
    if int(srcfile["Size"] / ChunkSize) + 1 > 10000:
        ChunkSize = int(srcfile["Size"] / 10000) + 1024  # for files that would exceed 10000 parts, enlarge ChunkSize automatically
        logger.info(f'Size exceeds 10000 parts limit. Auto change ChunkSize to {ChunkSize}')
    while ChunkSize * partnumber < srcfile["Size"]:  # if it lands exactly on the file size, no further part is needed, hence "<" not "<="
        indexList.append(ChunkSize * partnumber)
        partnumber += 1
    return indexList, ChunkSize
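# Worked example (illustrative): a 25 MiB object with ChunkSize = 10 MiB needs
# 3 parts, so split() returns indexList = [0, 10485760, 20971520] and leaves
# ChunkSize unchanged; the 10000-part rescaling only kicks in for huge files.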


def size_to_str(size):
    def loop(integer, remainder, level):
        if integer >= 1024:
            remainder = integer % 1024
            integer //= 1024
            level += 1
            return loop(integer, remainder, level)
        else:
            return integer, round(remainder / 1024, 1), level

    units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    integer, remainder, level = loop(int(size), 0, 0)
    if level + 1 > len(units):
        level = -1
    return f'{integer + remainder} {units[level]}'
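# Worked example (illustrative): size_to_str(2621440) walks 2621440 B -> 2560 KB
# (remainder 0) -> 2 MB (remainder 512), then returns '2.5 MB' (2 + 512/1024).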


def download_thread(partnumber, partStartIndex, srcfileKey, total, complete_list, ChunkSize, wfile):
    try:
        logger.info(f'Downloading {srcfileKey} - {partnumber}/{total}')
        pstart_time = time.time()
        response_get_object = s3_src_client.get_object(
            Bucket=SrcBucket,
            Key=srcfileKey,
            RequestPayer='requester',
            Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
        )
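        # For the last part the Range end may point past the end of the object;
        # S3 then returns only the bytes that exist, so len(getBody) can be
        # shorter than ChunkSize, which is fine for the seek/write below.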
        getBody = response_get_object["Body"].read()
        complete_list.append(partnumber)
        pload_time = time.time() - pstart_time
        pload_bytes = len(getBody)
        pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
        # Write this part at its offset in the file
        wfile.seek(partStartIndex)
        wfile.write(getBody)
        print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
              f'- {partnumber}/{total}\033[0;34;1m {len(complete_list) / total:.2%} - {pload_speed}\033[0m')
        # Record the finished part number in the database
        dir_and_key = Path(DesDir) / srcfileKey
        try:
            with sqlite3.connect('s3_download.db') as db:
                cursor = db.cursor()
                uuid1 = uuid.uuid1()
                # parameterized SQL, so keys containing quotes cannot break the statement
                cursor.execute("INSERT INTO S3P (ID, BUCKET, KEY, PARTNUMBER) VALUES (?, ?, ?, ?)",
                               (str(uuid1), SrcBucket, dir_and_key.as_uri(), partnumber))
                db.commit()
            logger.info(f'Download part completed. Write to DB {srcfileKey} - {partnumber}/{total}')
        except Exception as e:
            logger.warning(f'Fail to insert DB: {dir_and_key.as_uri()}, {str(e)}')
    except Exception as e:
        logger.warning(f'Fail to download {srcfileKey} - {partnumber}/{total}. {str(e)}')
    return


def download_part(indexList, partnumberList, srcfile, ChunkSize_auto, wfile):
    partnumber = 1  # part number handled in the current loop iteration
    total = len(indexList)
    complete_list = []
    # Thread pool start
    with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
        for partStartIndex in indexList:
            # start to download part
            if partnumber not in partnumberList:
                pool.submit(download_thread, partnumber, partStartIndex, srcfile["Key"], total,
                            complete_list, ChunkSize_auto, wfile)
            else:
                complete_list.append(partnumber)
            partnumber += 1
    # Thread pool end
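    # Leaving the with-block waits for every submitted part (ThreadPoolExecutor
    # shutdown blocks), but download_thread swallows its own exceptions, so the
    # log line below means "all parts attempted"; compare_local_to_s3() is the
    # real success check.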
    logger.info(f'All parts downloaded - {srcfile["Key"]} - size: {srcfile["Size"]}')
    return


# Create the destination directory tree (recursively)
def create_dir(file_dir):
    parent = file_dir.parent
    if not parent.exists():
        create_dir(parent)
    try:
        file_dir.mkdir()
    except Exception as e:
        logger.error(f'Fail to mkdir {str(e)}')


# Download file
def download_file(srcfile, ChunkSize_default):
    logger.info(f'Start file: {srcfile["Key"]}')
    dir_and_key = Path(DesDir) / srcfile["Key"]
    if dir_and_key.exists():
        if dir_and_key.stat().st_size == srcfile["Size"] or dir_and_key.is_dir():
            logger.info(f'Duplicated: {dir_and_key.as_uri()} same size, goto next file.')
            return
    # Create the directory tree for this file
    path = dir_and_key.parent
    if not path.exists():
        create_dir(path)
    # Skip downloading if the key is a subfolder
    if srcfile["Key"][-1] == '/':
        dir_and_key.mkdir()
        logger.info(f'Create empty subfolder: {dir_and_key.as_uri()}')
        return
    # Fetch the part numbers already downloaded
    partnumberList = []
    try:
        with sqlite3.connect('s3_download.db') as db:
            cursor = db.cursor()
            p_sql = cursor.execute("SELECT PARTNUMBER FROM S3P WHERE BUCKET=? AND KEY=?",
                                   (SrcBucket, dir_and_key.as_uri()))  # parameterized, matching the INSERT
            db.commit()
            partnumberList = [d[0] for d in p_sql]
            logger.info(f'Got partnumberList {dir_and_key.as_uri()} - {json.dumps(partnumberList)}')
    except Exception as e:
        logger.error(f'Fail to select partnumber from DB. {str(e)}')
    # Get the list of part start offsets, e.g. [0, 10, 20]
    indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
    # Run the download
    s3tmp_name = dir_and_key.with_suffix('.s3tmp')
    if s3tmp_name.exists():
        mode = 'r+b'
    else:
        # No temp file (or it was deleted): create a new one and clear partnumberList
        mode = 'wb'
        partnumberList = []
    with open(s3tmp_name, mode) as wfile:
        download_part(indexList, partnumberList, srcfile, ChunkSize_auto, wfile)
    # Rename the .s3tmp file to its final name and clean the partnumber database
    s3tmp_name.rename(dir_and_key)
    try:
        with sqlite3.connect('s3_download.db') as db:
            cursor = db.cursor()
            cursor.execute("DELETE FROM S3P WHERE BUCKET=? AND KEY=?", (SrcBucket, dir_and_key.as_uri()))
            db.commit()
    except Exception as e:
        logger.warning(f'Fail to clean DB: {dir_and_key.as_uri()}. {str(e)}')
    logger.info(f'Finish: {srcfile["Key"]} TO {dir_and_key.as_uri()}')
    return


# Compare local file list and s3 list
def compare_local_to_s3():
    logger.info('Comparing destination and source ...')
    if SrcFileIndex == "*":
        s3Filelist = get_s3_file_list(s3_src_client, SrcBucket)
    else:
        s3Filelist = head_s3_single_file(s3_src_client, SrcBucket)
    deltaList = []
    for srcfile in s3Filelist:
        dir_and_key = Path(DesDir) / srcfile["Key"]
        # File does not exist locally
        if not dir_and_key.exists():
            deltaList.append(srcfile)
            continue
        # File size differs
        if srcfile["Key"][-1] != '/':
            if srcfile["Size"] != dir_and_key.stat().st_size:
                deltaList.append(srcfile)
                continue
    if not deltaList:
        logger.info('All source files are in destination, job well done.')
    else:
        logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
        logger.warning(str(deltaList))
    return


# Main
if __name__ == '__main__':
    start_time = datetime.datetime.now()
    ChunkSize_default = set_config()
    logger, log_file_name = set_log()
    # Define s3 client
    s3_config = Config(max_pool_connections=100, retries={'max_attempts': MaxRetry})
    s3_src_client = Session(profile_name=SrcProfileName).client('s3', config=s3_config)
    # Define DB table
    with sqlite3.connect('s3_download.db') as db:
        cursor = db.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS S3P "
                       "(ID TEXT PRIMARY KEY, "
                       "BUCKET TEXT, "
                       "KEY TEXT, "
                       "PARTNUMBER INTEGER)")
        db.commit()
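    # Each finished part becomes one row keyed by a fresh UUID; KEY stores the
    # local destination URI (dir_and_key.as_uri()), so the same S3 object
    # downloaded into a different folder keeps an independent resume state.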
    # Get the source file list
    logger.info('Get source file list')
    if SrcFileIndex == "*":
        src_file_list = get_s3_file_list(s3_src_client, SrcBucket)
    else:
        src_file_list = head_s3_single_file(s3_src_client, SrcBucket)
    # Download every file in the list
    with futures.ThreadPoolExecutor(max_workers=MaxParallelFile) as file_pool:
        for src_file in src_file_list:
            file_pool.submit(download_file, src_file, ChunkSize_default)
    # Fetch the source list again, compare each file size against the destination folder, and report the result
    time_str = str(datetime.datetime.now() - start_time)
    compare_local_to_s3()
    print(f'\033[0;34;1mMISSION ACCOMPLISHED - Time: {time_str} \033[0m - FROM: {SrcBucket}/{S3Prefix} TO {DesDir}')
    print('Logged to file:', os.path.abspath(log_file_name))
    input('PRESS ENTER TO QUIT')
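
# Typical invocation (inferred from the argument handling in set_config; the
# script filename here is an example):
#   python s3_download.py            # GUI by default on Windows
#   python s3_download.py --nogui    # force console mode, read the config file only
#   python s3_download.py --gui      # force GUI, e.g. on a Linux desktop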