- # -*- coding: utf-8 -*-
- # PROJECT LONGBOW - AMAZON S3 DOWNLOAD TOOL WITH BREAK-POINT RESUMING
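- # How it works: read the config (optionally via a Tkinter GUI), list the source objects on S3,
- # then download each file in ranged-GET parts with a thread pool. Completed part numbers are
- # recorded in a local SQLite database (s3_download.db), so an interrupted download can resume
- # from the last finished part instead of starting over.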
- import os
- import sys
- import json
- from boto3.session import Session
- from botocore.client import Config
- from concurrent import futures
- from configparser import ConfigParser, RawConfigParser, NoOptionError
- import uuid
- import datetime
- import logging
- from pathlib import PurePosixPath, Path
- import platform
- import codecs
- import sqlite3
- import time
- os.system("") # workaround for some windows system to print color
- global SrcBucket, S3Prefix, SrcFileIndex, SrcProfileName, DesDir, MaxRetry, MaxThread, MaxParallelFile, LoggingLevel
- # Read config.ini with GUI
- def set_config():
- sys_para = sys.argv
- file_path = os.path.split(sys_para[0])[0]
- gui = False
- if platform.uname()[0] == 'Windows':  # GUI enabled by default on Windows
- gui = True
- if platform.uname()[0] == 'Linux':  # GUI disabled by default on Linux
- gui = False
- if '--gui' in sys.argv:  # force GUI mode
- gui = True
- if '--nogui' in sys.argv:  # --nogui overrides the Windows default above
- gui = False
- config_file = os.path.join(file_path, 's3_download_config.ini')
- # If no config file, read the default config
- if not os.path.exists(config_file):
- config_file += '.default'
- print("No customized config, use the default config")
- cfg = ConfigParser()
- print(f'Reading config file: {config_file}')
- try:
- global SrcBucket, S3Prefix, SrcFileIndex, SrcProfileName, DesDir, MaxRetry, MaxThread, MaxParallelFile, LoggingLevel
- cfg.read(config_file, encoding='utf-8-sig')
- SrcBucket = cfg.get('Basic', 'SrcBucket')
- S3Prefix = cfg.get('Basic', 'S3Prefix')
- SrcFileIndex = cfg.get('Basic', 'SrcFileIndex')
- SrcProfileName = cfg.get('Basic', 'SrcProfileName')
- DesDir = cfg.get('Basic', 'DesDir')
- Megabytes = 1024 * 1024
- ChunkSize = cfg.getint('Advanced', 'ChunkSize') * Megabytes
- MaxRetry = cfg.getint('Advanced', 'MaxRetry')
- MaxThread = cfg.getint('Advanced', 'MaxThread')
- MaxParallelFile = cfg.getint('Advanced', 'MaxParallelFile')
- LoggingLevel = cfg.get('Advanced', 'LoggingLevel')
- except Exception as e:
- print("ERR loading s3_download_config.ini", str(e))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- if gui:
- # For GUI
- from tkinter import Tk, filedialog, END, StringVar, BooleanVar, messagebox
- from tkinter.ttk import Combobox, Label, Button, Entry, Spinbox, Checkbutton
- # get profile name list in ./aws/credentials
- pro_conf = RawConfigParser()
- pro_path = os.path.join(os.path.expanduser("~"), ".aws")
- cre_path = os.path.join(pro_path, "credentials")
- if os.path.exists(cre_path):
- pro_conf.read(cre_path)
- profile_list = pro_conf.sections()
- else:
- print(f"There is no aws_access_key in {cre_path}, please input for S3 Bucket: ")
- os.mkdir(pro_path)
- aws_access_key_id = input('aws_access_key_id: ')
- aws_secret_access_key = input('aws_secret_access_key: ')
- region = input('region: ')
- pro_conf.add_section('default')
- pro_conf['default']['aws_access_key_id'] = aws_access_key_id
- pro_conf['default']['aws_secret_access_key'] = aws_secret_access_key
- pro_conf['default']['region'] = region
- profile_list = ['default']
- with open(cre_path, 'w') as f:
- print(f"Saving credentials to {cre_path}")
- pro_conf.write(f)
- # Click Select Folder
- def browse_folder():
- local_dir = filedialog.askdirectory(initialdir=os.path.dirname(__file__))
- url_txt.delete(0, END)
- url_txt.insert(0, local_dir)
- file_txt.delete(0, END)
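- # Default the file selector to "*", i.e. download every object under the chosen prefix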
- file_txt.insert(0, "*")
- # Finish browse folder
- # Click List Buckets
- def ListBuckets(*args):
- SrcProfileName = SrcProfileName_txt.get()
- client = Session(profile_name=SrcProfileName).client('s3')
- bucket_list = []
- try:
- response = client.list_buckets()
- if 'Buckets' in response:
- bucket_list = [b['Name'] for b in response['Buckets']]
- except Exception as e:
- messagebox.showerror('Error', f'Failed to list buckets.\n'
- f'Please verify the aws_access_key of profile: [{SrcProfileName}]\n'
- f'{str(e)}')
- bucket_list = ['CAN_NOT_GET_BUCKET_LIST']
- SrcBucket_txt['values'] = bucket_list
- SrcBucket_txt.current(0)
- # Finish ListBuckets
- # Click List Prefix
- def ListPrefix(*args):
- SrcProfileName = SrcProfileName_txt.get()
- client = Session(profile_name=SrcProfileName).client('s3')
- prefix_list = []
- this_bucket = SrcBucket_txt.get()
- max_get = 1000
- try:
- response = client.list_objects_v2(
- Bucket=this_bucket,
- Delimiter='/'
- )  # list_objects_v2 returns at most 1000 keys per call, enough for a simple listing
- if 'CommonPrefixes' in response:
- prefix_list = [c['Prefix'] for c in response['CommonPrefixes']]
- if not prefix_list:
- messagebox.showinfo('Message', f'There is no "/" Prefix in: {this_bucket}')
- if response['IsTruncated']:
- messagebox.showinfo('Message', f'More than {max_get} Prefix, cannot fully list here.')
- except Exception as e:
- messagebox.showinfo('Error', f'Cannot get prefix list from bucket: {this_bucket}, {str(e)}')
- S3Prefix_txt['values'] = prefix_list
- S3Prefix_txt.current(0)
- # Finish list prefix
- def browse_file(*args):
- SrcProfileName = SrcProfileName_txt.get()
- S3Prefix = S3Prefix_txt.get()
- client = Session(profile_name=SrcProfileName).client('s3')
- file_list = []
- this_bucket = SrcBucket_txt.get()
- max_get = 1000
- try:
- response = client.list_objects_v2(
- Bucket=this_bucket,
- Prefix=str(PurePosixPath(S3Prefix))+'/',
- Delimiter='/'
- )  # list_objects_v2 returns at most 1000 keys per call, enough for a simple listing
- # Work out how many characters of the prefix to strip from each key
- if S3Prefix == '' or S3Prefix == '/':
- # No prefix configured
- dp_len = 0
- else:
- # Length of "prefix/" to strip
- dp_len = len(str(PurePosixPath(S3Prefix)))+1
- if 'Contents' in response:
- file_list = [c['Key'][dp_len:] for c in response['Contents']]  # strip the prefix from each key
- if not file_list:
- messagebox.showinfo('Message', f'There are no files in s3://{this_bucket}/{S3Prefix}')
- if response['IsTruncated']:
- messagebox.showinfo('Message', f'More than {max_get} files, cannot fully list here.')
- except Exception as e:
- messagebox.showinfo('Error', f'Cannot get file list from bucket s3://{this_bucket}/{S3Prefix}, {str(e)}')
- file_txt['values'] = file_list
- file_txt.current(0)
- # Finish list files
- # Click START button
- def close():
- window.withdraw()
- ok = messagebox.askokcancel('Start downloading job',
- f'DOWNLOAD FROM s3://{SrcBucket_txt.get()}/{S3Prefix_txt.get()}\n'
- f'TO LOCAL {url_txt.get()}\n'
- f'Click OK to START')
- if not ok:
- window.deiconify()
- return
- window.quit()
- return
- # Finish close()
- # Start GUI
- window = Tk()
- window.title("LONGBOW - AMAZON S3 DOWNLOAD TOOL WITH BREAK-POINT RESUMING")
- window.geometry('705x350')
- window.configure(background='#ECECEC')
- window.protocol("WM_DELETE_WINDOW", sys.exit)
- Label(window, text="S3 Bucket").grid(column=0, row=1, sticky='w', padx=2, pady=2)
- SrcBucket_txt = Combobox(window, width=48)
- SrcBucket_txt.grid(column=1, row=1, sticky='w', padx=2, pady=2)
- SrcBucket_txt['values'] = SrcBucket
- SrcBucket_txt.current(0)
- Button(window, text="List Buckets", width=10, command=ListBuckets) \
- .grid(column=2, row=1, sticky='w', padx=2, pady=2)
- Label(window, text="S3 Prefix").grid(column=0, row=2, sticky='w', padx=2, pady=2)
- S3Prefix_txt = Combobox(window, width=48)
- S3Prefix_txt.grid(column=1, row=2, sticky='w', padx=2, pady=2)
- S3Prefix_txt['values'] = S3Prefix
- if S3Prefix != '':
- S3Prefix_txt.current(0)
- Button(window, text="List Prefix", width=10, command=ListPrefix) \
- .grid(column=2, row=2, sticky='w', padx=2, pady=2)
- Label(window, text="Filename or *").grid(column=0, row=3, sticky='w', padx=2, pady=2)
- file_txt = Combobox(window, width=48)
- file_txt.grid(column=1, row=3, sticky='w', padx=2, pady=2)
- file_txt['values'] = SrcFileIndex
- if SrcFileIndex != '':
- file_txt.current(0)
- Button(window, text="Select File", width=10, command=browse_file) \
- .grid(column=2, row=3, sticky='w', padx=2, pady=2)
- Label(window, text="AWS Profile").grid(column=0, row=4, sticky='w', padx=2, pady=2)
- SrcProfileName_txt = Combobox(window, width=15, state="readonly")
- SrcProfileName_txt['values'] = tuple(profile_list)
- SrcProfileName_txt.grid(column=1, row=4, sticky='w', padx=2, pady=2)
- if SrcProfileName in profile_list:
- position = profile_list.index(SrcProfileName)
- SrcProfileName_txt.current(position)
- else:
- SrcProfileName_txt.current(0)
- SrcProfileName = SrcProfileName_txt.get()
- SrcProfileName_txt.bind("<<ComboboxSelected>>", ListBuckets)
- Label(window, text="Folder").grid(column=0, row=5, sticky='w', padx=2, pady=2)
- url_txt = Entry(window, width=50)
- url_txt.grid(column=1, row=5, sticky='w', padx=2, pady=2)
- url_btn = Button(window, text="Select Folder", width=10, command=browse_folder)
- url_btn.grid(column=2, row=5, sticky='w', padx=2, pady=2)
- url_txt.insert(0, DesDir)
- Label(window, text="MaxThread/File").grid(column=0, row=6, sticky='w', padx=2, pady=2)
- if MaxThread < 1 or MaxThread > 100:
- MaxThread = 5
- var_t = StringVar()
- var_t.set(str(MaxThread))
- MaxThread_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_t)
- MaxThread_txt.grid(column=1, row=6, sticky='w', padx=2, pady=2)
- Label(window, text="MaxParallelFile").grid(column=0, row=7, sticky='w', padx=2, pady=2)
- if MaxParallelFile < 1 or MaxParallelFile > 100:
- MaxParallelFile = 5
- var_f = StringVar()
- var_f.set(str(MaxParallelFile))
- MaxParallelFile_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_f)
- MaxParallelFile_txt.grid(column=1, row=7, sticky='w', padx=2, pady=2)
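- # MaxThread is the number of parts downloaded concurrently per file; MaxParallelFile is the
- # number of files downloaded concurrently, so roughly MaxThread * MaxParallelFile connections
- # can be open at once (the S3 client below is configured with max_pool_connections=200)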
- save_config = BooleanVar()
- save_config.set(True)
- save_config_txt = Checkbutton(window, text="Save to s3_download_config.ini", variable=save_config)
- save_config_txt.grid(column=1, row=9, padx=2, pady=2)
- Button(window, text="Start Download", width=15, command=close).grid(column=1, row=10, padx=5, pady=5)
- window.mainloop()
- DesDir = url_txt.get()
- SrcFileIndex = file_txt.get()
- SrcBucket = SrcBucket_txt.get()
- S3Prefix = S3Prefix_txt.get()
- SrcProfileName = SrcProfileName_txt.get()
- MaxThread = int(MaxThread_txt.get())
- MaxParallelFile = int(MaxParallelFile_txt.get())
- if save_config.get():
- cfg['Basic']['SrcBucket'] = SrcBucket
- cfg['Basic']['S3Prefix'] = S3Prefix
- cfg['Basic']['SrcFileIndex'] = SrcFileIndex
- cfg['Basic']['SrcProfileName'] = SrcProfileName
- cfg['Basic']['DesDir'] = DesDir
- cfg['Advanced']['MaxThread'] = str(MaxThread)
- cfg['Advanced']['MaxParallelFile'] = str(MaxParallelFile)
- config_file = os.path.join(file_path, 's3_download_config.ini')
- with codecs.open(config_file, 'w', 'utf-8') as f:
- cfg.write(f)
- print(f"Save config to {config_file}")
- # GUI window finish
- if S3Prefix == '/':
- S3Prefix = ''
- # Finish set_config()
- return ChunkSize
- # Configure logging
- def set_log():
- logger = logging.getLogger()
- # File logging
- if not os.path.exists("./log"):
- os.mkdir("log")
- this_file_name = os.path.splitext(os.path.basename(__file__))[0]
- file_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
- log_file_name = './log/' + this_file_name + '-' + file_time + '.log'
- print('Logging to file:', os.path.abspath(log_file_name))
- print('Logging level:', LoggingLevel)
- fileHandler = logging.FileHandler(filename=log_file_name, encoding='utf-8')
- fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- # Screen stream logging
- streamHandler = logging.StreamHandler()
- streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
- logger.addHandler(streamHandler)
- # Logging level
- logger.setLevel(logging.WARNING)
- if LoggingLevel == 'INFO':
- logger.setLevel(logging.INFO)
- elif LoggingLevel == 'DEBUG':
- logger.setLevel(logging.DEBUG)
- return logger, log_file_name
- # Get object list on S3
- def get_s3_file_list(s3_client, bucket):
- logger.info('Get s3 file list ' + bucket)
- paginator = s3_client.get_paginator('list_objects_v2')
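- # The paginator follows continuation tokens automatically, so prefixes with more than
- # 1000 objects are still listed in full (unlike the single list_objects_v2 calls in the GUI)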
- __des_file_list = []
- try:
- response_iterator = paginator.paginate(
- Bucket=bucket,
- Prefix=S3Prefix
- )
- for page in response_iterator:
- if "Contents" in page:
- for n in page["Contents"]:
- key = n["Key"]
- __des_file_list.append({
- "Key": key,
- "Size": n["Size"]
- })
- logger.info(f'Bucket list length:{str(len(__des_file_list))}')
- except Exception as err:
- logger.error(str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return __des_file_list
- # Check single file on S3
- def head_s3_single_file(s3_client, bucket):
- try:
- response_fileList = s3_client.head_object(
- Bucket=bucket,
- Key=str(PurePosixPath(S3Prefix)/SrcFileIndex)
- )
- file = [{
- "Key": str(PurePosixPath(S3Prefix)/SrcFileIndex),
- "Size": response_fileList["ContentLength"]
- }]
- except Exception as err:
- logger.error(str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return file
- # Split the file into a virtual list of part start offsets; each index is the byte offset where a part begins
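- # For example (illustrative numbers): a 25 MiB object with ChunkSize = 10 MiB produces
- # indexList [0, 10485760, 20971520], i.e. three parts fetched as separate ranged GETs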
- def split(srcfile, ChunkSize):
- partnumber = 1
- indexList = [0]
- if int(srcfile["Size"] / ChunkSize) + 1 > 10000:
- ChunkSize = int(srcfile["Size"] / 10000) + 1024  # Auto-adjust ChunkSize for files that would exceed 10,000 parts
- logger.info(f'Size exceeds 10000 parts limit. Auto change ChunkSize to {ChunkSize}')
- while ChunkSize * partnumber < srcfile["Size"]:  # If the size is an exact multiple, no extra part is needed, so use "<" rather than "<="
- indexList.append(ChunkSize * partnumber)
- partnumber += 1
- return indexList, ChunkSize
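- # Convert a byte count to a human-readable string, e.g. size_to_str(1536) returns '1.5 KB'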
- def size_to_str(size):
- def loop(integer, remainder, level):
- if integer >= 1024:
- remainder = integer % 1024
- integer //= 1024
- level += 1
- return loop(integer, remainder, level)
- else:
- return integer, round(remainder / 1024, 1), level
- units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
- integer, remainder, level = loop(int(size), 0, 0)
- if level+1 > len(units):
- level = -1
- return f'{integer+remainder} {units[level]}'
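- # Download a single part with a ranged GET, write it at its byte offset in the shared temp file,
- # then record the completed part number in SQLite so it is skipped on resume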
- def download_thread(partnumber, partStartIndex, srcfileKey, total, complete_list, ChunkSize, wfile):
- try:
- logger.info(f'Downloading {srcfileKey} - {partnumber}/{total}')
- pstart_time = time.time()
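- # Fetch only this part via an HTTP Range request; for the last part the requested range may
- # run past the end of the object, in which case S3 simply returns the remaining bytes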
- response_get_object = s3_src_client.get_object(
- Bucket=SrcBucket,
- Key=srcfileKey,
- Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
- )
- getBody = response_get_object["Body"].read()
- complete_list.append(partnumber)
- pload_time = time.time() - pstart_time
- pload_bytes = len(getBody)
- pload_speed = size_to_str(int(pload_bytes/pload_time)) + "/s"
- # Write the part data into the file at its offset
- wfile.seek(partStartIndex)
- wfile.write(getBody)
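- # Print per-part progress; the \033[...m sequences are ANSI color codes (blue/bold),
- # which the os.system("") call at the top enables on some Windows terminals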
- print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
- f'- {partnumber}/{total}\033[0;34;1m {len(complete_list) / total:.2%} - {pload_speed}\033[0m')
- # Record the completed part number in the local database
- dir_and_key = Path(DesDir) / srcfileKey
- try:
- with sqlite3.connect('s3_download.db') as db:
- cursor = db.cursor()
- uuid1 = uuid.uuid1()
- cursor.execute(f"INSERT INTO S3P (ID, BUCKET, KEY, PARTNUMBER) "
- f"VALUES ('{uuid1}', '{SrcBucket}', '{dir_and_key.as_uri()}', {partnumber})")
- db.commit()
- logger.info(f'Download part completed. Write to DB {srcfileKey} - {partnumber}/{total}')
- except Exception as e:
- logger.warning(f'Fail to insert DB: {dir_and_key.as_uri()}, {str(e)}')
- except Exception as e:
- logger.warning(f'Fail to download {srcfileKey} - {partnumber}/{total}. {str(e)}')
- return
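- # Submit every part of one file to a thread pool, skipping parts already recorded as downloaded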
- def download_part(indexList, partnumberList, srcfile, ChunkSize_auto, wfile):
- partnumber = 1  # part number for the current loop iteration
- total = len(indexList)
- complete_list = []
- # Thread pool start
- with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
- for partStartIndex in indexList:
- # start to download part
- if partnumber not in partnumberList:
- pool.submit(download_thread, partnumber, partStartIndex, srcfile["Key"], total,
- complete_list, ChunkSize_auto, wfile)
- else:
- complete_list.append(partnumber)
- partnumber += 1
- # Thread pool end (all submitted part downloads have returned)
- logger.info(f'All parts downloaded - {srcfile["Key"]} - size: {srcfile["Size"]}')
- return
- # Recursively create the local directory structure
- def create_dir(file_dir):
- parent = file_dir.parent
- if not Path.exists(parent):
- create_dir(parent)
- try:
- Path.mkdir(file_dir)
- except Exception as e:
- logger.error(f'Fail to mkdir {str(e)}')
- # Download file
- def download_file(srcfile, ChunkSize_default):
- logger.info(f'Start file: {srcfile["Key"]}')
- dir_and_key = Path(DesDir) / srcfile["Key"]
- if Path.exists(dir_and_key):
- if dir_and_key.stat().st_size == srcfile["Size"] or dir_and_key.is_dir():
- logger.info(f'Duplicated: {dir_and_key.as_uri()} same size, skip to next file.')
- return
- # Create the directory structure for this file
- path = dir_and_key.parent
- if not Path.exists(path):
- create_dir(path)
- # If the key is a folder placeholder (ends with "/"), just create the directory and skip downloading
- if srcfile["Key"][-1] == '/':
- Path.mkdir(dir_and_key)
- logger.info(f'Create empty subfolder: {dir_and_key.as_uri()}')
- return
- # Get the list of part numbers already downloaded (for break-point resuming)
- partnumberList = []
- try:
- with sqlite3.connect('s3_download.db') as db:
- cursor = db.cursor()
- p_sql = cursor.execute(f"SELECT PARTNUMBER FROM S3P WHERE BUCKET='{SrcBucket}' AND KEY='{dir_and_key.as_uri()}'")
- db.commit()
- partnumberList = [d[0] for d in p_sql]
- logger.info(f'Got partnumberList {dir_and_key.as_uri()} - {json.dumps(partnumberList)}')
- except Exception as e:
- logger.error(f'Fail to select partnumber from DB. {str(e)}')
- # Build the list of part start offsets, e.g. [0, 10, 20]
- indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
- # Run the download
- s3tmp_name = dir_and_key.with_suffix('.s3tmp')
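- # Parts are written into a temp file named by swapping the extension for .s3tmp
- # (note: with_suffix() replaces any existing extension), then renamed on completion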
- if Path.exists(s3tmp_name):
- mode = 'r+b'
- else:
- # If there is no temp file (or it was deleted), create a new one and reset partnumberList
- mode = 'wb'
- partnumberList = []
- with open(s3tmp_name, mode) as wfile:
- download_part(indexList, partnumberList, srcfile, ChunkSize_auto, wfile)
- # Rename the .s3tmp file to the final filename and clean up the part-number records in the database
- s3tmp_name.rename(dir_and_key)
- try:
- with sqlite3.connect('s3_download.db') as db:
- cursor = db.cursor()
- cursor.execute(f"DELETE FROM S3P WHERE BUCKET='{SrcBucket}' AND KEY='{dir_and_key.as_uri()}'")
- db.commit()
- except Exception as e:
- logger.warning(f'Fail to clean DB: {dir_and_key.as_uri()}. {str(e)}')
- logger.info(f'Finish: {srcfile["Key"]} TO {dir_and_key.as_uri()}')
- return
- # Compare local file list and s3 list
- def compare_local_to_s3():
- logger.info('Comparing destination and source ...')
- if SrcFileIndex == "*":
- s3Filelist = get_s3_file_list(s3_src_client, SrcBucket)
- else:
- s3Filelist = head_s3_single_file(s3_src_client, SrcBucket)
- deltaList = []
- for srcfile in s3Filelist:
- dir_and_key = Path(DesDir) / srcfile["Key"]
- # File does not exist locally
- if not Path.exists(dir_and_key):
- deltaList.append(srcfile)
- continue
- # Compare file size (skip folder placeholder keys)
- if srcfile["Key"][-1] != '/':
- if srcfile["Size"] != dir_and_key.stat().st_size:
- deltaList.append(srcfile)
- continue
- if not deltaList:
- logger.info('All source files are in destination, job well done.')
- else:
- logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
- logger.warning(str(deltaList))
- return
- # Main
- if __name__ == '__main__':
- start_time = datetime.datetime.now()
- ChunkSize_default = set_config()
- logger, log_file_name = set_log()
- # Define s3 client
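- # max_pool_connections should cover up to MaxThread * MaxParallelFile concurrent ranged GETs;
- # botocore handles retries of failed API calls, capped by MaxRetry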
- s3_config = Config(max_pool_connections=200, retries={'max_attempts': MaxRetry})
- s3_src_client = Session(profile_name=SrcProfileName).client('s3', config=s3_config)
- # Define DB table
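- # Each row in S3P records one completed part: (uuid, bucket, local file URI, part number).
- # This is what allows an interrupted download to resume without re-fetching finished parts.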
- with sqlite3.connect('s3_download.db') as db:
- cursor = db.cursor()
- cursor.execute("CREATE TABLE IF NOT EXISTS S3P "
- "(ID TEXT PRIMARY KEY, "
- "BUCKET TEXT, "
- "KEY TEXT, "
- "PARTNUMBER INTEGER)")
- db.commit()
- # Get the source file list
- logger.info('Get source file list')
- if SrcFileIndex == "*":
- src_file_list = get_s3_file_list(s3_src_client, SrcBucket)
- else:
- src_file_list = head_s3_single_file(s3_src_client, SrcBucket)
- # Download each file in the list
- with futures.ThreadPoolExecutor(max_workers=MaxParallelFile) as file_pool:
- for src_file in src_file_list:
- file_pool.submit(download_file, src_file, ChunkSize_default)
- # Fetch the source file list again and compare it with the destination folder; report any file that is missing or whose size does not match
- time_str = str(datetime.datetime.now() - start_time)
- compare_local_to_s3()
- print(f'\033[0;34;1mMISSION ACCOMPLISHED - Time: {time_str} \033[0m - FROM: {SrcBucket}/{S3Prefix} TO {DesDir}')
- print('Logged to file:', os.path.abspath(log_file_name))
- input('PRESS ENTER TO QUIT')