s3_download.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. # -*- coding: utf-8 -*-
  2. # PROJECT LONGBOW - AMAZON S3 DOWNLOAD TOOL WITH BREAK-POINT RESUMING
  3. import os
  4. import sys
  5. import json
  6. from boto3.session import Session
  7. from botocore.client import Config
  8. from concurrent import futures
  9. from configparser import ConfigParser, RawConfigParser, NoOptionError
  10. import uuid
  11. import datetime
  12. import logging
  13. from pathlib import PurePosixPath, Path
  14. import platform
  15. import codecs
  16. import sqlite3
  17. import time
  18. os.system("") # workaround for some windows system to print color
  19. global SrcBucket, S3Prefix, SrcFileIndex, SrcProfileName, DesDir, MaxRetry, MaxThread, MaxParallelFile, LoggingLevel
  20. # Read config.ini with GUI
  21. def set_config():
  22. sys_para = sys.argv
  23. file_path = os.path.split(sys_para[0])[0]
  24. gui = False
  25. if platform.uname()[0] == 'Windows': # Win默认打开
  26. gui = True
  27. if platform.uname()[0] == 'Linux': # Linux 默认关闭
  28. gui = False
  29. if '--gui' in sys.argv: # 指定 gui 模式
  30. gui = True
  31. if '--nogui' in sys.argv: # 带 nogui 就覆盖前面Win打开要求
  32. gui = False
  33. config_file = os.path.join(file_path, 's3_download_config.ini')
  34. # If no config file, read the default config
  35. if not os.path.exists(config_file):
  36. config_file += '.default'
  37. print("No customized config, use the default config")
  38. cfg = ConfigParser()
  39. print(f'Reading config file: {config_file}')
  40. try:
  41. global SrcBucket, S3Prefix, SrcFileIndex, SrcProfileName, DesDir, MaxRetry, MaxThread, MaxParallelFile, LoggingLevel
  42. cfg.read(config_file, encoding='utf-8-sig')
  43. SrcBucket = cfg.get('Basic', 'SrcBucket')
  44. S3Prefix = cfg.get('Basic', 'S3Prefix')
  45. SrcFileIndex = cfg.get('Basic', 'SrcFileIndex')
  46. SrcProfileName = cfg.get('Basic', 'SrcProfileName')
  47. DesDir = cfg.get('Basic', 'DesDir')
  48. Megabytes = 1024 * 1024
  49. ChunkSize = cfg.getint('Advanced', 'ChunkSize') * Megabytes
  50. MaxRetry = cfg.getint('Advanced', 'MaxRetry')
  51. MaxThread = cfg.getint('Advanced', 'MaxThread')
  52. MaxParallelFile = cfg.getint('Advanced', 'MaxParallelFile')
  53. LoggingLevel = cfg.get('Advanced', 'LoggingLevel')
  54. except Exception as e:
  55. print("ERR loading s3_download_config.ini", str(e))
  56. input('PRESS ENTER TO QUIT')
  57. sys.exit(0)
  58. if gui:
  59. # For GUI
  60. from tkinter import Tk, filedialog, END, StringVar, BooleanVar, messagebox
  61. from tkinter.ttk import Combobox, Label, Button, Entry, Spinbox, Checkbutton
  62. # get profile name list in ./aws/credentials
  63. pro_conf = RawConfigParser()
  64. pro_path = os.path.join(os.path.expanduser("~"), ".aws")
  65. cre_path = os.path.join(pro_path, "credentials")
  66. if os.path.exists(cre_path):
  67. pro_conf.read(cre_path)
  68. profile_list = pro_conf.sections()
  69. else:
  70. print(f"There is no aws_access_key in {cre_path}, please input for S3 Bucket: ")
  71. os.mkdir(pro_path)
  72. aws_access_key_id = input('aws_access_key_id: ')
  73. aws_secret_access_key = input('aws_secret_access_key: ')
  74. region = input('region: ')
  75. pro_conf.add_section('default')
  76. pro_conf['default']['aws_access_key_id'] = aws_access_key_id
  77. pro_conf['default']['aws_secret_access_key'] = aws_secret_access_key
  78. pro_conf['default']['region'] = region
  79. profile_list = ['default']
  80. with open(cre_path, 'w') as f:
  81. print(f"Saving credentials to {cre_path}")
  82. pro_conf.write(f)
  83. # Click Select Folder
  84. def browse_folder():
  85. local_dir = filedialog.askdirectory(initialdir=os.path.dirname(__file__))
  86. url_txt.delete(0, END)
  87. url_txt.insert(0, local_dir)
  88. file_txt.delete(0, END)
  89. file_txt.insert(0, "*")
  90. # Finsih browse folder
  91. # Click List Buckets
  92. def ListBuckets(*args):
  93. SrcProfileName = SrcProfileName_txt.get()
  94. client = Session(profile_name=SrcProfileName).client('s3')
  95. bucket_list = []
  96. try:
  97. response = client.list_buckets()
  98. if 'Buckets' in response:
  99. bucket_list = [b['Name'] for b in response['Buckets']]
  100. except Exception as e:
  101. messagebox.showerror('Error', f'Failt to List buckets. \n'
  102. f'Please verify your aws_access_key of profile: [{SrcProfileName}]\n'
  103. f'{str(e)}')
  104. bucket_list = ['CAN_NOT_GET_BUCKET_LIST']
  105. SrcBucket_txt['values'] = bucket_list
  106. SrcBucket_txt.current(0)
  107. # Finish ListBuckets
  108. # Click List Prefix
  109. def ListPrefix(*args):
  110. SrcProfileName = SrcProfileName_txt.get()
  111. client = Session(profile_name=SrcProfileName).client('s3')
  112. prefix_list = []
  113. this_bucket = SrcBucket_txt.get()
  114. max_get = 100
  115. try:
  116. response = client.list_objects_v2(
  117. Bucket=this_bucket,
  118. Delimiter='/'
  119. ) # Only get the max 1000 prefix for simply list
  120. if 'CommonPrefixes' in response:
  121. prefix_list = [c['Prefix'] for c in response['CommonPrefixes']]
  122. if not prefix_list:
  123. messagebox.showinfo('Message', f'There is no "/" Prefix in: {this_bucket}')
  124. if response['IsTruncated']:
  125. messagebox.showinfo('Message', f'More than {max_get} Prefix, cannot fully list here.')
  126. except Exception as e:
  127. messagebox.showinfo('Error', f'Cannot get prefix list from bucket: {this_bucket}, {str(e)}')
  128. S3Prefix_txt['values'] = prefix_list
  129. S3Prefix_txt.current(0)
  130. # Finish list prefix
  131. def browse_file(*args):
  132. SrcProfileName = SrcProfileName_txt.get()
  133. S3Prefix = S3Prefix_txt.get()
  134. client = Session(profile_name=SrcProfileName).client('s3')
  135. file_list = []
  136. this_bucket = SrcBucket_txt.get()
  137. max_get = 100
  138. try:
  139. response = client.list_objects_v2(
  140. Bucket=this_bucket,
  141. Prefix=str(PurePosixPath(S3Prefix))+'/',
  142. Delimiter='/'
  143. ) # Only get the max 1000 files for simply list
  144. # For delete prefix in des_prefix
  145. if S3Prefix == '' or S3Prefix == '/':
  146. # 目的bucket没有设置 Prefix
  147. dp_len = 0
  148. else:
  149. # 目的bucket的 "prefix/"长度
  150. dp_len = len(str(PurePosixPath(S3Prefix)))+1
  151. if 'Contents' in response:
  152. file_list = [c['Key'][dp_len:] for c in response['Contents']] # 去掉Prefix
  153. if not file_list:
  154. messagebox.showinfo('Message', f'There is no files in s3://{this_bucket}/{S3Prefix}')
  155. if response['IsTruncated']:
  156. messagebox.showinfo('Message', f'More than {max_get} files, cannot fully list here.')
  157. except Exception as e:
  158. messagebox.showinfo('Error', f'Cannot get file list from bucket s3://{this_bucket}/{S3Prefix}, {str(e)}')
  159. file_txt['values'] = file_list
  160. file_txt.current(0)
  161. # Finish list files
  162. # Click START button
  163. def close():
  164. window.withdraw()
  165. ok = messagebox.askokcancel('Start downloading job',
  166. f'DOWNLOAD FROM s3://{SrcBucket_txt.get()}/{S3Prefix_txt.get()}\n'
  167. f'TO LOCAL {url_txt.get()}\n'
  168. f'Click OK to START')
  169. if not ok:
  170. window.deiconify()
  171. return
  172. window.quit()
  173. return
  174. # Finish close()
  175. # Start GUI
  176. window = Tk()
  177. window.title("LONGBOW - AMAZON S3 DOWNLOAD TOOL WITH BREAK-POINT RESUMING")
  178. window.geometry('705x350')
  179. window.configure(background='#ECECEC')
  180. window.protocol("WM_DELETE_WINDOW", sys.exit)
  181. Label(window, text="S3 Bucket").grid(column=0, row=1, sticky='w', padx=2, pady=2)
  182. SrcBucket_txt = Combobox(window, width=48)
  183. SrcBucket_txt.grid(column=1, row=1, sticky='w', padx=2, pady=2)
  184. SrcBucket_txt['values'] = SrcBucket
  185. SrcBucket_txt.current(0)
  186. Button(window, text="List Buckets", width=10, command=ListBuckets) \
  187. .grid(column=2, row=1, sticky='w', padx=2, pady=2)
  188. Label(window, text="S3 Prefix").grid(column=0, row=2, sticky='w', padx=2, pady=2)
  189. S3Prefix_txt = Combobox(window, width=48)
  190. S3Prefix_txt.grid(column=1, row=2, sticky='w', padx=2, pady=2)
  191. S3Prefix_txt['values'] = S3Prefix
  192. if S3Prefix != '':
  193. S3Prefix_txt.current(0)
  194. Button(window, text="List Prefix", width=10, command=ListPrefix) \
  195. .grid(column=2, row=2, sticky='w', padx=2, pady=2)
  196. Label(window, text="Filename or *").grid(column=0, row=3, sticky='w', padx=2, pady=2)
  197. file_txt = Combobox(window, width=48)
  198. file_txt.grid(column=1, row=3, sticky='w', padx=2, pady=2)
  199. file_txt['values'] = SrcFileIndex
  200. if SrcFileIndex != '':
  201. file_txt.current(0)
  202. Button(window, text="Select File", width=10, command=browse_file) \
  203. .grid(column=2, row=3, sticky='w', padx=2, pady=2)
  204. Label(window, text="AWS Profile").grid(column=0, row=4, sticky='w', padx=2, pady=2)
  205. SrcProfileName_txt = Combobox(window, width=15, state="readonly")
  206. SrcProfileName_txt['values'] = tuple(profile_list)
  207. SrcProfileName_txt.grid(column=1, row=4, sticky='w', padx=2, pady=2)
  208. if SrcProfileName in profile_list:
  209. position = profile_list.index(SrcProfileName)
  210. SrcProfileName_txt.current(position)
  211. else:
  212. SrcProfileName_txt.current(0)
  213. SrcProfileName = SrcProfileName_txt.get()
  214. SrcProfileName_txt.bind("<<ComboboxSelected>>", ListBuckets)
  215. Label(window, text="Folder").grid(column=0, row=5, sticky='w', padx=2, pady=2)
  216. url_txt = Entry(window, width=50)
  217. url_txt.grid(column=1, row=5, sticky='w', padx=2, pady=2)
  218. url_btn = Button(window, text="Select Folder", width=10, command=browse_folder)
  219. url_btn.grid(column=2, row=5, sticky='w', padx=2, pady=2)
  220. url_txt.insert(0, DesDir)
  221. Label(window, text="MaxThread/File").grid(column=0, row=6, sticky='w', padx=2, pady=2)
  222. if MaxThread < 1 or MaxThread > 100:
  223. MaxThread = 5
  224. var_t = StringVar()
  225. var_t.set(str(MaxThread))
  226. MaxThread_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_t)
  227. MaxThread_txt.grid(column=1, row=6, sticky='w', padx=2, pady=2)
  228. Label(window, text="MaxParallelFile").grid(column=0, row=7, sticky='w', padx=2, pady=2)
  229. if MaxParallelFile < 1 or MaxParallelFile > 100:
  230. MaxParallelFile = 5
  231. var_f = StringVar()
  232. var_f.set(str(MaxParallelFile))
  233. MaxParallelFile_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_f)
  234. MaxParallelFile_txt.grid(column=1, row=7, sticky='w', padx=2, pady=2)
  235. save_config = BooleanVar()
  236. save_config.set(True)
  237. save_config_txt = Checkbutton(window, text="Save to s3_download_config.ini", var=save_config)
  238. save_config_txt.grid(column=1, row=9, padx=2, pady=2)
  239. Button(window, text="Start Download", width=15, command=close).grid(column=1, row=10, padx=5, pady=5)
  240. window.mainloop()
  241. DesDir = url_txt.get()
  242. SrcFileIndex = file_txt.get()
  243. SrcBucket = SrcBucket_txt.get()
  244. S3Prefix = S3Prefix_txt.get()
  245. SrcProfileName = SrcProfileName_txt.get()
  246. MaxThread = int(MaxThread_txt.get())
  247. MaxParallelFile = int(MaxParallelFile_txt.get())
  248. if save_config:
  249. cfg['Basic']['SrcBucket'] = SrcBucket
  250. cfg['Basic']['S3Prefix'] = S3Prefix
  251. cfg['Basic']['SrcFileIndex'] = SrcFileIndex
  252. cfg['Basic']['SrcProfileName'] = SrcProfileName
  253. cfg['Basic']['DesDir'] = DesDir
  254. cfg['Advanced']['MaxThread'] = str(MaxThread)
  255. cfg['Advanced']['MaxParallelFile'] = str(MaxParallelFile)
  256. config_file = os.path.join(file_path, 's3_download_config.ini')
  257. with codecs.open(config_file, 'w', 'utf-8') as f:
  258. cfg.write(f)
  259. print(f"Save config to {config_file}")
  260. # GUI window finish
  261. if S3Prefix == '/':
  262. S3Prefix = ''
  263. # Finish set_config()
  264. return ChunkSize
  265. # Configure logging
  266. def set_log():
  267. logger = logging.getLogger()
  268. # File logging
  269. if not os.path.exists("./log"):
  270. os.mkdir("log")
  271. this_file_name = os.path.splitext(os.path.basename(__file__))[0]
  272. file_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
  273. log_file_name = './log/' + this_file_name + '-' + file_time + '.log'
  274. print('Logging to file:', os.path.abspath(log_file_name))
  275. print('Logging level:', LoggingLevel)
  276. fileHandler = logging.FileHandler(filename=log_file_name, encoding='utf-8')
  277. fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
  278. logger.addHandler(fileHandler)
  279. # Screen stream logging
  280. streamHandler = logging.StreamHandler()
  281. streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
  282. logger.addHandler(streamHandler)
  283. # Loggin Level
  284. logger.setLevel(logging.WARNING)
  285. if LoggingLevel == 'INFO':
  286. logger.setLevel(logging.INFO)
  287. elif LoggingLevel == 'DEBUG':
  288. logger.setLevel(logging.DEBUG)
  289. return logger, log_file_name
  290. # Get object list on S3
  291. def get_s3_file_list(s3_client, bucket):
  292. logger.info('Get s3 file list ' + bucket)
  293. paginator = s3_client.get_paginator('list_objects_v2')
  294. __des_file_list = []
  295. try:
  296. response_iterator = paginator.paginate(
  297. Bucket=bucket,
  298. Prefix=S3Prefix
  299. )
  300. for page in response_iterator:
  301. if "Contents" in page:
  302. for n in page["Contents"]:
  303. key = n["Key"]
  304. __des_file_list.append({
  305. "Key": key,
  306. "Size": n["Size"]
  307. })
  308. logger.info(f'Bucket list length:{str(len(__des_file_list))}')
  309. except Exception as err:
  310. logger.error(str(err))
  311. input('PRESS ENTER TO QUIT')
  312. sys.exit(0)
  313. return __des_file_list
  314. # Check single file on S3
  315. def head_s3_single_file(s3_client, bucket):
  316. try:
  317. response_fileList = s3_client.head_object(
  318. Bucket=bucket,
  319. Key=str(PurePosixPath(S3Prefix)/SrcFileIndex)
  320. )
  321. file = [{
  322. "Key": str(PurePosixPath(S3Prefix)/SrcFileIndex),
  323. "Size": response_fileList["ContentLength"]
  324. }]
  325. except Exception as err:
  326. logger.error(str(err))
  327. input('PRESS ENTER TO QUIT')
  328. sys.exit(0)
  329. return file
  330. # split the file into a virtual part list of index, each index is the start point of the file
  331. def split(srcfile, ChunkSize):
  332. partnumber = 1
  333. indexList = [0]
  334. if int(srcfile["Size"] / ChunkSize) + 1 > 10000:
  335. ChunkSize = int(srcfile["Size"] / 10000) + 1024 # 对于大于10000分片的大文件,自动调整Chunksize
  336. logger.info(f'Size excess 10000 parts limit. Auto change ChunkSize to {ChunkSize}')
  337. while ChunkSize * partnumber < srcfile["Size"]: # 如果刚好是"=",则无需再分下一part,所以这里不能用"<="
  338. indexList.append(ChunkSize * partnumber)
  339. partnumber += 1
  340. return indexList, ChunkSize
  341. def size_to_str(size):
  342. def loop(integer, remainder, level):
  343. if integer >= 1024:
  344. remainder = integer % 1024
  345. integer //= 1024
  346. level += 1
  347. return loop(integer, remainder, level)
  348. else:
  349. return integer, round(remainder / 1024, 1), level
  350. units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
  351. integer, remainder, level = loop(int(size), 0, 0)
  352. if level+1 > len(units):
  353. level = -1
  354. return f'{integer+remainder} {units[level]}'
  355. def download_thread(partnumber, partStartIndex, srcfileKey, total, complete_list, ChunkSize, wfile):
  356. try:
  357. logger.info(f'Downloading {srcfileKey} - {partnumber}/{total}')
  358. pstart_time = time.time()
  359. response_get_object = s3_src_client.get_object(
  360. Bucket=SrcBucket,
  361. Key=srcfileKey,
  362. Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
  363. )
  364. getBody = response_get_object["Body"].read()
  365. complete_list.append(partnumber)
  366. pload_time = time.time() - pstart_time
  367. pload_bytes = len(getBody)
  368. pload_speed = size_to_str(int(pload_bytes/pload_time)) + "/s"
  369. # 写入文件
  370. wfile.seek(partStartIndex)
  371. wfile.write(getBody)
  372. print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
  373. f'- {partnumber}/{total}\033[0;34;1m {len(complete_list) / total:.2%} - {pload_speed}\033[0m')
  374. # 写入partnumber数据库
  375. dir_and_key = Path(DesDir) / srcfileKey
  376. try:
  377. with sqlite3.connect('s3_download.db') as db:
  378. cursor = db.cursor()
  379. uuid1 = uuid.uuid1()
  380. cursor.execute(f"INSERT INTO S3P (ID, BUCKET, KEY, PARTNUMBER) "
  381. f"VALUES ('{uuid1}', '{SrcBucket}', '{dir_and_key.as_uri()}', {partnumber})")
  382. db.commit()
  383. logger.info(f'Download part completed. Write to DB {srcfileKey} - {partnumber}/{total}')
  384. except Exception as e:
  385. logger.warning(f'Fail to insert DB: {dir_and_key.as_uri()}, {str(e)}')
  386. except Exception as e:
  387. logger.warning(f'Fail to download {srcfileKey} - {partnumber}/{total}. {str(e)}')
  388. return
  389. def download_part(indexList, partnumberList, srcfile, ChunkSize_auto, wfile):
  390. partnumber = 1 # 当前循环要上传的Partnumber
  391. total = len(indexList)
  392. complete_list = []
  393. # 线程池Start
  394. with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
  395. for partStartIndex in indexList:
  396. # start to download part
  397. if partnumber not in partnumberList:
  398. pool.submit(download_thread, partnumber, partStartIndex, srcfile["Key"], total,
  399. complete_list, ChunkSize_auto, wfile)
  400. else:
  401. complete_list.append(partnumber)
  402. partnumber += 1
  403. # 线程池End
  404. logger.info(f'All parts downloaded - {srcfile["Key"]} - size: {srcfile["Size"]}')
  405. return
  406. # Create the directory structure for a file
  407. def create_dir(file_dir):
  408. parent = file_dir.parent
  409. if not Path.exists(parent):
  410. create_dir(parent)
  411. try:
  412. Path.mkdir(file_dir)
  413. except Exception as e:
  414. logger.error(f'Fail to mkdir {str(e)}')
  415. # Download file
  416. def download_file(srcfile, ChunkSize_default):
  417. logger.info(f'Start file: {srcfile["Key"]}')
  418. dir_and_key = Path(DesDir) / srcfile["Key"]
  419. if Path.exists(dir_and_key):
  420. if dir_and_key.stat().st_size == srcfile["Size"] or dir_and_key.is_dir():
  421. logger.info(f'Duplicated: {dir_and_key.as_uri()} same size, goto next file.')
  422. return
  423. # 创建文件目录结构
  424. path = dir_and_key.parent
  425. if not Path.exists(path):
  426. create_dir(path)
  427. # 如果是子目录就跳过下载
  428. if srcfile["Key"][-1] == '/':
  429. Path.mkdir(dir_and_key)
  430. logger.info(f'Create empty subfolder: {dir_and_key.as_uri()}')
  431. return
  432. # 获取已下载的 part number list
  433. partnumberList = []
  434. try:
  435. with sqlite3.connect('s3_download.db') as db:
  436. cursor = db.cursor()
  437. p_sql = cursor.execute(f"SELECT PARTNUMBER FROM S3P WHERE BUCKET='{SrcBucket}' AND KEY='{dir_and_key.as_uri()}'")
  438. db.commit()
  439. partnumberList = [d[0] for d in p_sql]
  440. logger.info(f'Got partnumberList {dir_and_key.as_uri()} - {json.dumps(partnumberList)}')
  441. except Exception as e:
  442. logger.error(f'Fail to select partnumber from DB. {str(e)}')
  443. # 获取索引列表,例如[0, 10, 20]
  444. indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
  445. # 执行download
  446. s3tmp_name = dir_and_key.with_suffix('.s3tmp')
  447. if Path.exists(s3tmp_name):
  448. mode = 'r+b'
  449. else:
  450. # 如果没有临时文件,或被删除了,则新建文件并将partnumberList清空
  451. mode = 'wb'
  452. partnumberList = []
  453. with open(s3tmp_name, mode) as wfile:
  454. download_part(indexList, partnumberList, srcfile, ChunkSize_auto, wfile)
  455. # 修改文件名.s3part,清理partnumber数据库
  456. s3tmp_name.rename(dir_and_key)
  457. try:
  458. with sqlite3.connect('s3_download.db') as db:
  459. cursor = db.cursor()
  460. cursor.execute(f"DELETE FROM S3P WHERE BUCKET='{SrcBucket}' AND KEY='{dir_and_key.as_uri()}'")
  461. db.commit()
  462. except Exception as e:
  463. logger.warning(f'Fail to clean DB: {dir_and_key.as_uri()}. {str(e)}')
  464. logger.info(f'Finsh: {srcfile["Key"]} TO {dir_and_key.as_uri()}')
  465. return
  466. # Compare local file list and s3 list
  467. def compare_local_to_s3():
  468. logger.info('Comparing destination and source ...')
  469. if SrcFileIndex == "*":
  470. s3Filelist = get_s3_file_list(s3_src_client, SrcBucket)
  471. else:
  472. s3Filelist = head_s3_single_file(s3_src_client, SrcBucket)
  473. deltaList = []
  474. for srcfile in s3Filelist:
  475. dir_and_key = Path(DesDir) / srcfile["Key"]
  476. # 文件不存在
  477. if not Path.exists(dir_and_key):
  478. deltaList.append(srcfile)
  479. continue
  480. # 文件大小
  481. if srcfile["Key"][-1] != '/':
  482. if srcfile["Size"] != dir_and_key.stat().st_size:
  483. deltaList.append(srcfile)
  484. continue
  485. if not deltaList:
  486. logger.info('All source files are in destination, job well done.')
  487. else:
  488. logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
  489. logger.warning(str(deltaList))
  490. return
  491. # Main
if __name__ == '__main__':
    start_time = datetime.datetime.now()
    # Load the job settings (returns chunk size in bytes) and set up logging
    ChunkSize_default = set_config()
    logger, log_file_name = set_log()
    # Define s3 client
    s3_config = Config(max_pool_connections=200, retries={'max_attempts': MaxRetry})
    s3_src_client = Session(profile_name=SrcProfileName).client('s3', config=s3_config)
    # Define DB table: one row per finished part, used for break-point resuming
    with sqlite3.connect('s3_download.db') as db:
        cursor = db.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS S3P "
                       "(ID TEXT PRIMARY KEY, "
                       "BUCKET TEXT, "
                       "KEY TEXT, "
                       "PARTNUMBER INTEGER)")
        db.commit()
    # Get the source file list (single object or everything under the prefix)
    logger.info('Get source file list')
    if SrcFileIndex == "*":
        src_file_list = get_s3_file_list(s3_src_client, SrcBucket)
    else:
        src_file_list = head_s3_single_file(s3_src_client, SrcBucket)
    # Download the listed files, up to MaxParallelFile files in parallel
    with futures.ThreadPoolExecutor(max_workers=MaxParallelFile) as file_pool:
        for src_file in src_file_list:
            file_pool.submit(download_file, src_file, ChunkSize_default)
    # List the source again and compare each file's size with the destination
    # folder, logging the comparison result
    time_str = str(datetime.datetime.now() - start_time)
    compare_local_to_s3()
    print(f'\033[0;34;1mMISSION ACCOMPLISHED - Time: {time_str} \033[0m - FROM: {SrcBucket}/{S3Prefix} TO {DesDir}')
    print('Logged to file:', os.path.abspath(log_file_name))
    input('PRESS ENTER TO QUIT')