- # -*- coding: utf-8 -*-
- # PROJECT LONGBOW - AMAZON S3 UPLOAD TOOL WITH BREAK-POINT RESUMING
- import os
- import sys
- import json
- import base64
- from boto3.session import Session
- from botocore.client import Config
- from concurrent import futures
- from configparser import ConfigParser, RawConfigParser, NoOptionError
- import time
- import datetime
- import hashlib
- import logging
- from pathlib import PurePosixPath, Path
- import platform
- import codecs
- os.system("")  # workaround for some Windows consoles to enable ANSI color output
- global JobType, SrcFileIndex, DesProfileName, DesBucket, S3Prefix, MaxRetry, MaxThread, \
- MaxParallelFile, StorageClass, ifVerifyMD5, DontAskMeToClean, LoggingLevel, \
- SrcDir, SrcBucket, SrcProfileName, ali_SrcBucket, ali_access_key_id, ali_access_key_secret, ali_endpoint
- # Read config.ini with GUI
- def set_config():
- sys_para = sys.argv
- file_path = os.path.split(sys_para[0])[0]
- gui = False
- if platform.uname()[0] == 'Windows':  # GUI on by default on Windows
- gui = True
- if platform.uname()[0] == 'Linux':  # GUI off by default on Linux
- gui = False
- if '--gui' in sys.argv:  # force GUI mode
- gui = True
- if '--nogui' in sys.argv:  # --nogui overrides the Windows default above
- gui = False
- JobType_list = ['LOCAL_TO_S3', 'S3_TO_S3', 'ALIOSS_TO_S3']
- StorageClass_list = ['STANDARD', 'REDUCED_REDUNDANCY', 'STANDARD_IA', 'ONEZONE_IA', 'INTELLIGENT_TIERING',
- 'GLACIER', 'DEEP_ARCHIVE']
- config_file = os.path.join(file_path, 's3_upload_config.ini')
- # If no config file, read the default config
- if not os.path.exists(config_file):
- config_file += '.default'
- print("No customized config, use the default config")
- cfg = ConfigParser()
- print(f'Reading config file: {config_file}')
- # Get local config value
- try:
- global JobType, SrcFileIndex, DesProfileName, DesBucket, S3Prefix, MaxRetry, MaxThread, \
- MaxParallelFile, StorageClass, ifVerifyMD5, DontAskMeToClean, LoggingLevel, \
- SrcDir, SrcBucket, SrcProfileName, ali_SrcBucket, ali_access_key_id, ali_access_key_secret, ali_endpoint
- cfg.read(config_file, encoding='utf-8-sig')
- JobType = cfg.get('Basic', 'JobType')
- SrcFileIndex = cfg.get('Basic', 'SrcFileIndex')
- DesProfileName = cfg.get('Basic', 'DesProfileName')
- DesBucket = cfg.get('Basic', 'DesBucket')
- S3Prefix = cfg.get('Basic', 'S3Prefix')
- Megabytes = 1024 * 1024
- ChunkSize = cfg.getint('Advanced', 'ChunkSize') * Megabytes
- MaxRetry = cfg.getint('Advanced', 'MaxRetry')
- MaxThread = cfg.getint('Advanced', 'MaxThread')
- MaxParallelFile = cfg.getint('Advanced', 'MaxParallelFile')
- StorageClass = cfg.get('Advanced', 'StorageClass')
- ifVerifyMD5 = cfg.getboolean('Advanced', 'ifVerifyMD5')
- DontAskMeToClean = cfg.getboolean('Advanced', 'DontAskMeToClean')
- LoggingLevel = cfg.get('Advanced', 'LoggingLevel')
- try:
- SrcDir = cfg.get('LOCAL_TO_S3', 'SrcDir')
- except NoOptionError:
- SrcDir = ''
- try:
- SrcBucket = cfg.get('S3_TO_S3', 'SrcBucket')
- SrcProfileName = cfg.get('S3_TO_S3', 'SrcProfileName')
- except NoOptionError:
- SrcBucket = ''
- SrcProfileName = ''
- try:
- ali_SrcBucket = cfg.get('ALIOSS_TO_S3', 'ali_SrcBucket')
- ali_access_key_id = cfg.get('ALIOSS_TO_S3', 'ali_access_key_id')
- ali_access_key_secret = cfg.get('ALIOSS_TO_S3', 'ali_access_key_secret')
- ali_endpoint = cfg.get('ALIOSS_TO_S3', 'ali_endpoint')
- except NoOptionError:
- ali_SrcBucket = ""
- ali_access_key_id = ""
- ali_access_key_secret = ""
- ali_endpoint = ""
- except Exception as e:
- print("ERR loading s3_upload_config.ini", str(e))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- # The GUI only fully supports LOCAL_TO_S3 mode; start it with the --gui option
- # For other JobTypes the GUI is not preferred, since they are better run on EC2 Linux
- if gui:
- # For GUI
- from tkinter import Tk, filedialog, END, StringVar, BooleanVar, messagebox
- from tkinter.ttk import Combobox, Label, Button, Entry, Spinbox, Checkbutton
- # get profile name list in ./aws/credentials
- pro_conf = RawConfigParser()
- pro_path = os.path.join(os.path.expanduser("~"), ".aws")
- cre_path = os.path.join(pro_path, "credentials")
- if os.path.exists(cre_path):
- pro_conf.read(cre_path)
- profile_list = pro_conf.sections()
- else:
- print(f"There is no aws_access_key in {cre_path}, please input credentials for the destination S3 bucket: ")
- os.makedirs(pro_path, exist_ok=True)
- aws_access_key_id = input('aws_access_key_id: ')
- aws_secret_access_key = input('aws_secret_access_key: ')
- region = input('region: ')
- pro_conf.add_section('default')
- pro_conf['default']['aws_access_key_id'] = aws_access_key_id
- pro_conf['default']['aws_secret_access_key'] = aws_secret_access_key
- pro_conf['default']['region'] = region
- profile_list = ['default']
- with open(cre_path, 'w') as f:
- print(f"Saving credentials to {cre_path}")
- pro_conf.write(f)
- # Click Select Folder
- def browse_folder():
- local_dir = filedialog.askdirectory(initialdir=os.path.dirname(__file__))
- url_txt.delete(0, END)
- url_txt.insert(0, local_dir)
- file_txt.delete(0, END)
- file_txt.insert(0, "*")
- # Finish browse_folder
- # Click Select File
- def browse_file():
- local_file = filedialog.askopenfilename()
- url_txt.delete(0, END)
- url_txt.insert(0, os.path.split(local_file)[0])
- file_txt.delete(0, END)
- file_txt.insert(0, os.path.split(local_file)[1])
- # Finish browse_file
- # Click List Buckets
- def ListBuckets(*args):
- DesProfileName = DesProfileName_txt.get()
- client = Session(profile_name=DesProfileName).client('s3')
- bucket_list = []
- try:
- response = client.list_buckets()
- if 'Buckets' in response:
- bucket_list = [b['Name'] for b in response['Buckets']]
- except Exception as e:
- messagebox.showerror('Error', f'Failed to list buckets. \n'
- f'Please verify your aws_access_key of profile: [{DesProfileName}]\n'
- f'{str(e)}')
- bucket_list = ['CAN_NOT_GET_BUCKET_LIST']
- DesBucket_txt['values'] = bucket_list
- DesBucket_txt.current(0)
- # Finish ListBuckets
- # Click List Prefix
- def ListPrefix(*args):
- DesProfileName = DesProfileName_txt.get()
- client = Session(profile_name=DesProfileName).client('s3')
- prefix_list = []
- this_bucket = DesBucket_txt.get()
- max_get = 1000  # list_objects_v2 returns at most 1000 keys per request
- try:
- response = client.list_objects_v2(
- Bucket=this_bucket,
- Delimiter='/'
- )  # Only fetch up to 1000 prefixes, for a simple list
- if 'CommonPrefixes' in response:
- prefix_list = [c['Prefix'] for c in response['CommonPrefixes']]
- if not prefix_list:
- messagebox.showinfo('Message', f'There is no "/" Prefix in: {this_bucket}')
- if response['IsTruncated']:
- messagebox.showinfo('Message', f'More than {max_get} Prefix, cannot fully list here.')
- except Exception as e:
- messagebox.showinfo('Error', f'Cannot get prefix list from bucket: {this_bucket}, {str(e)}')
- S3Prefix_txt['values'] = prefix_list
- S3Prefix_txt.current(0)
- # Finish list prefix
- # Change JobType
- def job_change(*args):
- if JobType_mode.get() != 'LOCAL_TO_S3':
- messagebox.showinfo('Notice', 'For S3_TO_S3 or ALIOSS_TO_S3, \n'
- 'please configure the remaining hidden parameters in s3_upload_config.ini')
- # Finish JobType change message
- # Click START button
- def close():
- window.withdraw()
- ok = messagebox.askokcancel('Start uploading job',
- f'Upload from Local to \ns3://{DesBucket_txt.get()}/{S3Prefix_txt.get()}\n'
- f'Click OK to START')
- if not ok:
- window.deiconify()
- return
- window.quit()
- return
- # Finish close()
- # Start GUI
- window = Tk()
- window.title("LONGBOW - AMAZON S3 UPLOAD TOOL WITH BREAK-POINT RESUMING")
- window.geometry('705x350')
- window.configure(background='#ECECEC')
- window.protocol("WM_DELETE_WINDOW", sys.exit)
- Label(window, text='Job Type').grid(column=0, row=0, sticky='w', padx=2, pady=2)
- JobType_mode = Combobox(window, width=15, state="readonly")
- JobType_mode['values'] = tuple(JobType_list)
- JobType_mode.grid(column=1, row=0, sticky='w', padx=2, pady=2)
- if JobType in JobType_list:
- position = JobType_list.index(JobType)
- JobType_mode.current(position)
- else:
- JobType_mode.current(0)
- JobType_mode.bind("<<ComboboxSelected>>", job_change)
- Label(window, text="Folder").grid(column=0, row=1, sticky='w', padx=2, pady=2)
- url_txt = Entry(window, width=50)
- url_txt.grid(column=1, row=1, sticky='w', padx=2, pady=2)
- url_btn = Button(window, text="Select Folder", width=10, command=browse_folder)
- url_btn.grid(column=2, row=1, sticky='w', padx=2, pady=2)
- url_txt.insert(0, SrcDir)
- Label(window, text="Filename or *").grid(column=0, row=2, sticky='w', padx=2, pady=2)
- file_txt = Entry(window, width=50)
- file_txt.grid(column=1, row=2, sticky='w', padx=2, pady=2)
- file_btn = Button(window, text="Select File", width=10, command=browse_file)
- file_btn.grid(column=2, row=2, sticky='w', padx=2, pady=2)
- file_txt.insert(0, SrcFileIndex)
- Label(window, text="AWS Profile").grid(column=0, row=3, sticky='w', padx=2, pady=2)
- DesProfileName_txt = Combobox(window, width=15, state="readonly")
- DesProfileName_txt['values'] = tuple(profile_list)
- DesProfileName_txt.grid(column=1, row=3, sticky='w', padx=2, pady=2)
- if DesProfileName in profile_list:
- position = profile_list.index(DesProfileName)
- DesProfileName_txt.current(position)
- else:
- DesProfileName_txt.current(0)
- DesProfileName = DesProfileName_txt.get()
- DesProfileName_txt.bind("<<ComboboxSelected>>", ListBuckets)
- Label(window, text="S3 Bucket").grid(column=0, row=4, sticky='w', padx=2, pady=2)
- DesBucket_txt = Combobox(window, width=48)
- DesBucket_txt.grid(column=1, row=4, sticky='w', padx=2, pady=2)
- DesBucket_txt['values'] = DesBucket
- DesBucket_txt.current(0)
- Button(window, text="List Buckets", width=10, command=ListBuckets) \
- .grid(column=2, row=4, sticky='w', padx=2, pady=2)
- Label(window, text="S3 Prefix").grid(column=0, row=5, sticky='w', padx=2, pady=2)
- S3Prefix_txt = Combobox(window, width=48)
- S3Prefix_txt.grid(column=1, row=5, sticky='w', padx=2, pady=2)
- S3Prefix_txt['values'] = S3Prefix
- if S3Prefix != '':
- S3Prefix_txt.current(0)
- Button(window, text="List Prefix", width=10, command=ListPrefix) \
- .grid(column=2, row=5, sticky='w', padx=2, pady=2)
- Label(window, text="MaxThread/File").grid(column=0, row=6, sticky='w', padx=2, pady=2)
- if MaxThread < 1 or MaxThread > 100:
- MaxThread = 5
- var_t = StringVar()
- var_t.set(str(MaxThread))
- MaxThread_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_t)
- MaxThread_txt.grid(column=1, row=6, sticky='w', padx=2, pady=2)
- Label(window, text="MaxParallelFile").grid(column=0, row=7, sticky='w', padx=2, pady=2)
- if MaxParallelFile < 1 or MaxParallelFile > 100:
- MaxParallelFile = 5
- var_f = StringVar()
- var_f.set(str(MaxParallelFile))
- MaxParallelFile_txt = Spinbox(window, from_=1, to=100, width=15, textvariable=var_f)
- MaxParallelFile_txt.grid(column=1, row=7, sticky='w', padx=2, pady=2)
- Label(window, text="S3 StorageClass").grid(column=0, row=8, sticky='w', padx=2, pady=2)
- StorageClass_txt = Combobox(window, width=15, state="readonly")
- StorageClass_txt['values'] = tuple(StorageClass_list)
- StorageClass_txt.grid(column=1, row=8, sticky='w', padx=2, pady=2)
- if StorageClass in StorageClass_list:
- position = StorageClass_list.index(StorageClass)
- StorageClass_txt.current(position)
- else:
- StorageClass_txt.current(0)
- save_config = BooleanVar()
- save_config.set(True)
- save_config_txt = Checkbutton(window, text="Save to s3_upload_config.ini", var=save_config)
- save_config_txt.grid(column=1, row=9, padx=2, pady=2)
- Button(window, text="Start Upload", width=15, command=close).grid(column=1, row=10, padx=5, pady=5)
- window.mainloop()
- JobType = JobType_mode.get()
- SrcDir = url_txt.get()
- SrcFileIndex = file_txt.get()
- DesBucket = DesBucket_txt.get()
- S3Prefix = S3Prefix_txt.get()
- DesProfileName = DesProfileName_txt.get()
- StorageClass = StorageClass_txt.get()
- MaxThread = int(MaxThread_txt.get())
- MaxParallelFile = int(MaxParallelFile_txt.get())
- if save_config.get():
- cfg['Basic']['JobType'] = JobType
- cfg['Basic']['DesProfileName'] = DesProfileName
- cfg['Basic']['DesBucket'] = DesBucket
- cfg['Basic']['S3Prefix'] = S3Prefix
- cfg['Advanced']['MaxThread'] = str(MaxThread)
- cfg['Advanced']['MaxParallelFile'] = str(MaxParallelFile)
- cfg['Advanced']['StorageClass'] = StorageClass
- cfg['LOCAL_TO_S3']['SrcDir'] = SrcDir
- cfg['Basic']['SrcFileIndex'] = SrcFileIndex
- config_file = os.path.join(file_path, 's3_upload_config.ini')
- with codecs.open(config_file, 'w', 'utf-8') as f:
- cfg.write(f)
- print(f"Save config to {config_file}")
- # GUI window finish
- S3Prefix = str(PurePosixPath(S3Prefix))  # strip the trailing '/', if any
- if S3Prefix == '/' or S3Prefix == '.':
- S3Prefix = ''
- # Validate JobType
- if JobType not in JobType_list:
- print(f'ERR JobType: {JobType}, check config file: {config_file}')
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- # Finish set_config()
- return ChunkSize
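- # Illustrative sketch of the s3_upload_config.ini layout read above; the section and
- # key names come from the cfg.get() calls, the values are only placeholders:
- #   [Basic]
- #   JobType = LOCAL_TO_S3
- #   SrcFileIndex = *
- #   DesProfileName = default
- #   DesBucket = my-dest-bucket
- #   S3Prefix = backup
- #   [Advanced]
- #   ChunkSize = 5
- #   MaxRetry = 3
- #   MaxThread = 5
- #   MaxParallelFile = 5
- #   StorageClass = STANDARD
- #   ifVerifyMD5 = True
- #   DontAskMeToClean = False
- #   LoggingLevel = INFO
- #   [LOCAL_TO_S3]
- #   SrcDir = /data/to_upload
- #   [S3_TO_S3]
- #   SrcBucket = my-src-bucket
- #   SrcProfileName = src
- #   [ALIOSS_TO_S3]
- #   ali_SrcBucket = my-oss-bucket
- #   ali_access_key_id = EXAMPLE_KEY_ID
- #   ali_access_key_secret = EXAMPLE_KEY_SECRET
- #   ali_endpoint = oss-cn-hangzhou.aliyuncs.com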
- # Configure logging
- def set_log():
- logger = logging.getLogger()
- # File logging
- if not os.path.exists("./log"):
- os.mkdir("./log")
- this_file_name = os.path.splitext(os.path.basename(__file__))[0]
- file_time = datetime.datetime.now().isoformat().replace(':', '-')[:19]
- log_file_name = './log/' + this_file_name + '-' + file_time + '.log'
- print('Logging to file:', os.path.abspath(log_file_name))
- print('Logging level:', LoggingLevel)
- fileHandler = logging.FileHandler(filename=log_file_name, encoding='utf-8')
- fileHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
- logger.addHandler(fileHandler)
- # Screen stream logging
- streamHandler = logging.StreamHandler()
- streamHandler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
- logger.addHandler(streamHandler)
- # Logging level
- logger.setLevel(logging.WARNING)
- if LoggingLevel == 'INFO':
- logger.setLevel(logging.INFO)
- elif LoggingLevel == 'DEBUG':
- logger.setLevel(logging.DEBUG)
- return logger, log_file_name
- # Get local file list
- def get_local_file_list(str_key=False):
- __src_file_list = []
- try:
- if SrcFileIndex == "*":
- for parent, dirnames, filenames in os.walk(SrcDir):
- for filename in filenames:  # walk through the files and collect their info
- file_absPath = os.path.join(parent, filename)
- file_relativePath = file_absPath[len(SrcDir) + 1:]
- file_size = os.path.getsize(file_absPath)
- key = Path(file_relativePath)
- if str_key:
- key = str(key)
- __src_file_list.append({
- "Key": key,
- "Size": file_size
- })
- else:
- join_path = os.path.join(SrcDir, SrcFileIndex)
- file_size = os.path.getsize(join_path)
- __src_file_list = [{
- "Key": SrcFileIndex,
- "Size": file_size
- }]
- except Exception as err:
- logger.error('Can not get source files. ERR: ' + str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- if not __src_file_list:
- logger.error('Source file empty.')
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return __src_file_list
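- # Example of the returned structure (illustrative values), assuming SrcDir contains a
- # 1 MiB file at sub/dir/file.bin:
- #   get_local_file_list(str_key=True)
- #   -> [{"Key": "sub/dir/file.bin", "Size": 1048576}]
- # With str_key=False the "Key" values are pathlib.Path objects instead of strings.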
- # Get object list on S3
- def get_s3_file_list(*, s3_client, bucket, S3Prefix, no_prefix=False):
- logger.info('Get s3 file list ' + bucket)
- # Length of prefix to strip from the destination keys
- if S3Prefix == '':
- # the destination bucket has no Prefix set
- dp_len = 0
- else:
- # length of the destination bucket's "prefix/"
- dp_len = len(S3Prefix) + 1
- paginator = s3_client.get_paginator('list_objects_v2')
- __des_file_list = []
- try:
- response_iterator = paginator.paginate(
- Bucket=bucket,
- Prefix=S3Prefix
- )
- for page in response_iterator:
- if "Contents" in page:
- for n in page["Contents"]:
- key = n["Key"]
- if no_prefix:
- key = key[dp_len:]
- __des_file_list.append({
- "Key": key,
- "Size": n["Size"]
- })
- logger.info(f'Bucket list length:{str(len(__des_file_list))}')
- except Exception as err:
- logger.error(str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return __des_file_list
- # Check single file on S3
- def head_s3_single_file(s3_client, bucket):
- try:
- response_fileList = s3_client.head_object(
- Bucket=bucket,
- Key=str(PurePosixPath(S3Prefix) / SrcFileIndex)
- )
- file = [{
- "Key": str(PurePosixPath(S3Prefix) / SrcFileIndex),
- "Size": response_fileList["ContentLength"]
- }]
- except Exception as err:
- logger.error(str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return file
- # Check single file on OSS
- def head_oss_single_file(__ali_bucket):
- try:
- response_fileList = __ali_bucket.head_object(
- key=S3Prefix + SrcFileIndex
- )
- file = [{
- "Key": S3Prefix + SrcFileIndex,
- "Size": response_fileList.content_length
- }]
- except Exception as err:
- logger.error(str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return file
- # Get object list on OSS
- def get_ali_oss_file_list(__ali_bucket):
- logger.info('Get oss file list ' + ali_SrcBucket)
- __des_file_list = []
- try:
- response_fileList = __ali_bucket.list_objects(
- prefix=S3Prefix,
- max_keys=1000
- )
- if len(response_fileList.object_list) != 0:
- for n in response_fileList.object_list:
- __des_file_list.append({
- "Key": n.key,
- "Size": n.size
- })
- while response_fileList.is_truncated:
- response_fileList = __ali_bucket.list_objects(
- prefix=S3Prefix,
- max_keys=1000,
- marker=response_fileList.next_marker
- )
- for n in response_fileList.object_list:
- __des_file_list.append({
- "Key": n.key,
- "Size": n.size
- })
- else:
- logger.info('File list is empty in the ali_oss bucket')
- except Exception as err:
- logger.error(str(err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return __des_file_list
- # Get all exist object list on S3
- def get_uploaded_list(s3_client):
- logger.info('Get unfinished multipart upload')
- NextKeyMarker = ''
- IsTruncated = True
- __multipart_uploaded_list = []
- while IsTruncated:
- list_multipart_uploads = s3_client.list_multipart_uploads(
- Bucket=DesBucket,
- Prefix=S3Prefix,
- MaxUploads=1000,
- KeyMarker=NextKeyMarker
- )
- IsTruncated = list_multipart_uploads["IsTruncated"]
- NextKeyMarker = list_multipart_uploads["NextKeyMarker"]
- if NextKeyMarker != '':
- for i in list_multipart_uploads["Uploads"]:
- __multipart_uploaded_list.append({
- "Key": i["Key"],
- "Initiated": i["Initiated"],
- "UploadId": i["UploadId"]
- })
- logger.info(f'Unfinished upload, Key: {i["Key"]}, Time: {i["Initiated"]}')
- return __multipart_uploaded_list
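- # Example of one returned entry (illustrative values); the fields mirror the
- # list_multipart_uploads response handled above:
- #   {"Key": "backup/bigfile.bin",
- #    "Initiated": datetime.datetime(2020, 1, 1, 12, 0, tzinfo=tzutc()),
- #    "UploadId": "EXAMPLE_UPLOAD_ID"}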
- # Jump to handle next file
- class NextFile(Exception):
- pass
- def uploadThread_small(srcfile, prefix_and_key):
- print(f'\033[0;32;1m--->Uploading\033[0m {srcfile["Key"]} - small file')
- with open(os.path.join(SrcDir, srcfile["Key"]), 'rb') as data:
- for retryTime in range(MaxRetry + 1):
- try:
- pstart_time = time.time()
- chunkdata = data.read()
- chunkdata_md5 = hashlib.md5(chunkdata)
- s3_dest_client.put_object(
- Body=chunkdata,
- Bucket=DesBucket,
- Key=prefix_and_key,
- ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8'),
- StorageClass=StorageClass
- )
- pload_time = time.time() - pstart_time
- pload_bytes = len(chunkdata)
- pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
- print(f'\033[0;34;1m --->Complete\033[0m {srcfile["Key"]} - small file - {pload_speed}')
- break
- except Exception as e:
- logger.warning(f'Upload small file Fail: {srcfile["Key"]}, '
- f'{str(e)}, Attempts: {retryTime}')
- if retryTime >= MaxRetry:
- logger.error(f'Fail MaxRetry Upload small file: {srcfile["Key"]}')
- return "MaxRetry"
- else:
- time.sleep(5 * retryTime)
- return
- def download_uploadThread_small(srcfileKey):
- for retryTime in range(MaxRetry + 1):
- try:
- pstart_time = time.time()
- # Get object
- print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - small file")
- response_get_object = s3_src_client.get_object(
- Bucket=SrcBucket,
- Key=srcfileKey
- )
- getBody = response_get_object["Body"].read()
- chunkdata_md5 = hashlib.md5(getBody)
- ContentMD5 = base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
- # Put object
- print(f'\033[0;32;1m --->Uploading\033[0m {srcfileKey} - small file')
- s3_dest_client.put_object(
- Body=getBody,
- Bucket=DesBucket,
- Key=srcfileKey,
- ContentMD5=ContentMD5,
- StorageClass=StorageClass
- )
- # Finish upload/download
- pload_time = time.time() - pstart_time
- pload_bytes = len(getBody)
- pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
- print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} - small file - {pload_speed}')
- break
- except Exception as e:
- logger.warning(f'Download/Upload small file Fail: {srcfileKey}, '
- f'{str(e)}, Attempts: {retryTime}')
- if retryTime >= MaxRetry:
- logger.error(f'Fail MaxRetry Download/Upload small file: {srcfileKey}')
- return "MaxRetry"
- else:
- time.sleep(5 * retryTime)
- return
- def alioss_download_uploadThread_small(srcfileKey):
- for retryTime in range(MaxRetry + 1):
- try:
- pstart_time = time.time()
- # Get object
- print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - small file")
- response_get_object = ali_bucket.get_object(
- key=srcfileKey
- )
- getBody = b''
- for chunk in response_get_object:
- if chunk != '':
- getBody += chunk
- chunkdata_md5 = hashlib.md5(getBody)
- # Put Object
- print(f"\033[0;32;1m --->Uploading\033[0m {srcfileKey} - small file")
- s3_dest_client.put_object(
- Body=getBody,
- Bucket=DesBucket,
- Key=srcfileKey,
- ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8'),
- StorageClass=StorageClass
- )
- pload_time = time.time() - pstart_time
- pload_bytes = len(getBody)
- pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
- print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} - small file - {pload_speed}')
- break
- except Exception as e:
- logger.warning(f'Download/Upload small file Fail: {srcfileKey} - small file, '
- f'{str(e)}, Attempts: {retryTime}')
- if retryTime >= MaxRetry:
- logger.error(f'Fail MaxRetry Download/Upload small file: {srcfileKey} - small file')
- return "MaxRetry"
- else:
- time.sleep(5 * retryTime)
- return
- # Upload file with different JobType
- def upload_file(*, srcfile, desFilelist, UploadIdList, ChunkSize_default):  # UploadIdList is the multipart_uploaded_list
- logger.info(f'Start file: {srcfile["Key"]}')
- prefix_and_key = srcfile["Key"]
- if JobType == 'LOCAL_TO_S3':
- prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
- if srcfile['Size'] >= ChunkSize_default:
- try:
- # Retry up to 3 times if the MD5-calculated ETag does not match
- for md5_retry in range(3):
- # Check whether the file already exists: if it does, skip it; if not and there is no UploadId, create a new upload; if not but an UploadId exists, reuse the returned UploadId
- response_check_upload = check_file_exist(srcfile=srcfile,
- desFilelist=desFilelist,
- UploadIdList=UploadIdList)
- if response_check_upload == 'UPLOAD':
- logger.info(f'New upload: {srcfile["Key"]}')
- response_new_upload = s3_dest_client.create_multipart_upload(
- Bucket=DesBucket,
- Key=prefix_and_key,
- StorageClass=StorageClass
- )
- # logger.info("UploadId: "+response_new_upload["UploadId"])
- reponse_uploadId = response_new_upload["UploadId"]
- partnumberList = []
- elif response_check_upload == 'NEXT':
- logger.info(f'Duplicated. {srcfile["Key"]} same size, goto next file.')
- raise NextFile()
- else:
- reponse_uploadId = response_check_upload
- # Get the list of part numbers already uploaded
- partnumberList = checkPartnumberList(srcfile, reponse_uploadId)
- # Get the list of part start offsets, e.g. [0, 10, 20]
- response_indexList, ChunkSize_auto = split(srcfile, ChunkSize_default)
- # Upload the parts
- upload_etag_full = uploadPart(uploadId=reponse_uploadId,
- indexList=response_indexList,
- partnumberList=partnumberList,
- srcfile=srcfile,
- ChunkSize_auto=ChunkSize_auto)
- # Merge the parts on S3
- response_complete = completeUpload(reponse_uploadId=reponse_uploadId,
- srcfileKey=srcfile["Key"],
- len_indexList=len(response_indexList))
- logger.info(f'FINISH: {srcfile["Key"]} TO {response_complete["Location"]}')
- # Verify the file MD5
- if ifVerifyMD5:
- if response_complete["ETag"] == upload_etag_full:
- logger.info(f'MD5 ETag Matched - {srcfile["Key"]} - {response_complete["ETag"]}')
- break
- else:  # ETag mismatch: delete the object on S3 and retry
- logger.warning(f'MD5 ETag NOT MATCHED {srcfile["Key"]}( Destination / Origin ): '
- f'{response_complete["ETag"]} - {upload_etag_full}')
- s3_dest_client.delete_object(
- Bucket=DesBucket,
- Key=prefix_and_key
- )
- UploadIdList = []
- logger.warning(f'Deleted and retry upload {srcfile["Key"]}')
- if md5_retry == 2:
- logger.warning(f'MD5 ETag NOT MATCHED Exceed Max Retries - {srcfile["Key"]}')
- else:
- break
- except NextFile:
- pass
- # Small file procedure
- else:
- # Check file exist
- for f in desFilelist:
- if f["Key"] == prefix_and_key and \
- (srcfile["Size"] == f["Size"]):
- logger.info(f'Duplicated. {prefix_and_key} same size, goto next file.')
- return
- # File not found, or the size differs: submit the upload
- if JobType == 'LOCAL_TO_S3':
- uploadThread_small(srcfile, prefix_and_key)
- elif JobType == 'S3_TO_S3':
- download_uploadThread_small(srcfile["Key"])
- elif JobType == 'ALIOSS_TO_S3':
- alioss_download_uploadThread_small(srcfile["Key"])
- return
- # Check whether the file already exists in the destination bucket
- def check_file_exist(*, srcfile, desFilelist, UploadIdList):
- # Check whether the source file is already in the destination
- prefix_and_key = srcfile["Key"]
- if JobType == 'LOCAL_TO_S3':
- prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
- for f in desFilelist:
- if f["Key"] == prefix_and_key and \
- (srcfile["Size"] == f["Size"]):
- return 'NEXT'  # identical file
- # File not found, or it differs, so it needs to be re-uploaded
- # Check whether this Key has an unfinished UploadId
- keyIDList = []
- for u in UploadIdList:
- if u["Key"] == prefix_and_key:
- keyIDList.append(u)
- # If no previous upload is found, start from scratch
- if not keyIDList:
- return 'UPLOAD'
- # Among the uploads for the same Key (file), pick the most recently initiated one
- UploadID_latest = keyIDList[0]
- for u in keyIDList:
- if u["Initiated"] > UploadID_latest["Initiated"]:
- UploadID_latest = u
- return UploadID_latest["UploadId"]
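- # Possible return values, summarizing the branches above:
- #   'NEXT'            - an object with the same key and size already exists, skip this file
- #   'UPLOAD'          - no matching object and no unfinished upload, start a new upload
- #   <UploadId string> - resume the most recently initiated unfinished upload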
- # Check which part numbers already exist on S3
- def checkPartnumberList(srcfile, uploadId):
- try:
- prefix_and_key = srcfile["Key"]
- if JobType == 'LOCAL_TO_S3':
- prefix_and_key = str(PurePosixPath(S3Prefix) / srcfile["Key"])
- partnumberList = []
- PartNumberMarker = 0
- IsTruncated = True
- while IsTruncated:
- response_uploadedList = s3_dest_client.list_parts(
- Bucket=DesBucket,
- Key=prefix_and_key,
- UploadId=uploadId,
- MaxParts=1000,
- PartNumberMarker=PartNumberMarker
- )
- NextPartNumberMarker = response_uploadedList['NextPartNumberMarker']
- IsTruncated = response_uploadedList['IsTruncated']
- if NextPartNumberMarker > 0:
- for partnumberObject in response_uploadedList["Parts"]:
- partnumberList.append(partnumberObject["PartNumber"])
- PartNumberMarker = NextPartNumberMarker
- if partnumberList:  # an empty list means no uploaded parts were found
- logger.info("Found uploaded partnumber: " + json.dumps(partnumberList))
- except Exception as checkPartnumberList_err:
- logger.error("checkPartnumberList_err: " + str(checkPartnumberList_err))
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- return partnumberList
- # Split the file into a virtual list of part start offsets; each index is the byte offset where a part begins
- def split(srcfile, ChunkSize):
- partnumber = 1
- indexList = [0]
- if int(srcfile["Size"] / ChunkSize) + 1 > 10000:
- ChunkSize = int(srcfile["Size"] / 10000) + 1024  # For files that would exceed 10000 parts, automatically enlarge ChunkSize
- logger.info(f'Size exceeds the 10000-part limit. Auto-adjusting ChunkSize to {ChunkSize}')
- while ChunkSize * partnumber < srcfile["Size"]:  # If the size is an exact multiple, no extra part is needed, so use "<" rather than "<="
- indexList.append(ChunkSize * partnumber)
- partnumber += 1
- return indexList, ChunkSize
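- # Worked example (illustrative): a 25 MiB file with a 10 MiB ChunkSize yields three
- # part start offsets and leaves the ChunkSize unchanged:
- #   split({"Key": "a.bin", "Size": 26214400}, 10485760)
- #   -> ([0, 10485760, 20971520], 10485760)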
- # upload parts in the list
- def uploadPart(*, uploadId, indexList, partnumberList, srcfile, ChunkSize_auto):
- partnumber = 1  # part number to upload in the current loop iteration
- total = len(indexList)
- md5list = [hashlib.md5(b'')] * total
- complete_list = []
- # Thread pool start
- with futures.ThreadPoolExecutor(max_workers=MaxThread) as pool:
- for partStartIndex in indexList:
- # start to upload part
- if partnumber not in partnumberList:
- dryrun = False
- else:
- dryrun = True
- # upload one part per thread, or dryrun to only calculate md5
- if JobType == 'LOCAL_TO_S3':
- pool.submit(uploadThread,
- uploadId=uploadId,
- partnumber=partnumber,
- partStartIndex=partStartIndex,
- srcfileKey=srcfile["Key"],
- total=total,
- md5list=md5list,
- dryrun=dryrun,
- complete_list=complete_list,
- ChunkSize=ChunkSize_auto)
- elif JobType == 'S3_TO_S3':
- pool.submit(download_uploadThread,
- uploadId=uploadId,
- partnumber=partnumber,
- partStartIndex=partStartIndex,
- srcfileKey=srcfile["Key"],
- total=total,
- md5list=md5list,
- dryrun=dryrun,
- complete_list=complete_list,
- ChunkSize=ChunkSize_auto)
- elif JobType == 'ALIOSS_TO_S3':
- pool.submit(alioss_download_uploadThread,
- uploadId=uploadId,
- partnumber=partnumber,
- partStartIndex=partStartIndex,
- srcfileKey=srcfile["Key"],
- srcfileSize=srcfile["Size"],
- total=total,
- md5list=md5list,
- dryrun=dryrun,
- complete_list=complete_list,
- ChunkSize=ChunkSize_auto)
- partnumber += 1
- # Thread pool end
- logger.info(f'All parts uploaded - {srcfile["Key"]} - size: {srcfile["Size"]}')
- # For local uploads the file may change during transfer, so re-read the local file to recalculate the MD5 instead of using the md5list built from the bodies read earlier
- if ifVerifyMD5 and JobType == 'LOCAL_TO_S3':
- md5list = cal_md5list(indexList=indexList,
- srcfileKey=srcfile["Key"],
- ChunkSize=ChunkSize_auto)
- # Calculate the overall ETag across all parts: cal_etag
- digests = b"".join(m.digest() for m in md5list)
- md5full = hashlib.md5(digests)
- cal_etag = '"%s-%s"' % (md5full.hexdigest(), len(md5list))
- return cal_etag
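- # Note on cal_etag: for a multipart upload, S3 reports an ETag of the form
- # "<md5-of-the-concatenated-part-md5-digests>-<number-of-parts>", e.g.
- # "9b2cf535f27731c974343645a3985328-3" (hex value illustrative), which is what the
- # digest concatenation above reproduces and what ifVerifyMD5 compares against.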
- # convert bytes to human readable string
- def size_to_str(size):
- def loop(integer, remainder, level):
- if integer >= 1024:
- remainder = integer % 1024
- integer //= 1024
- level += 1
- return loop(integer, remainder, level)
- else:
- return integer, round(remainder / 1024, 1), level
- units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
- integer, remainder, level = loop(int(size), 0, 0)
- if level+1 > len(units):
- level = -1
- return f'{integer+remainder} {units[level]}'
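- # Worked examples: size_to_str(1536) walks one level down (1536 = 1 * 1024 + 512) and
- # returns '1.5 KB'; size_to_str(500) stays at level 0 and returns '500.0 B'.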
- # Recalculate the MD5 from the local file
- def cal_md5list(*, indexList, srcfileKey, ChunkSize):
- logger.info(f'Re-read local file to calculate MD5 again: {srcfileKey}')
- md5list = []
- with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
- for partStartIndex in indexList:
- data.seek(partStartIndex)
- chunkdata = data.read(ChunkSize)
- chunkdata_md5 = hashlib.md5(chunkdata)
- md5list.append(chunkdata_md5)
- return md5list
- # Single Thread Upload one part, from local to s3
- def uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list, ChunkSize):
- prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
- if not dryrun:
- print(f'\033[0;32;1m--->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
- pstart_time = time.time()
- with open(os.path.join(SrcDir, srcfileKey), 'rb') as data:
- retryTime = 0
- while retryTime <= MaxRetry:
- try:
- data.seek(partStartIndex)
- chunkdata = data.read(ChunkSize)
- chunkdata_md5 = hashlib.md5(chunkdata)
- md5list[partnumber - 1] = chunkdata_md5
- if not dryrun:
- s3_dest_client.upload_part(
- Body=chunkdata,
- Bucket=DesBucket,
- Key=prefix_and_key,
- PartNumber=partnumber,
- UploadId=uploadId,
- ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
- )
- # MD5 is verified here for each uploaded part; the whole file is verified again when the parts are merged
- break
- except Exception as err:
- retryTime += 1
- logger.info(f'UploadThreadFunc log: {srcfileKey} - {str(err)}')
- logger.info(f'Upload Fail - {srcfileKey} - Retry part - {partnumber} - Attempt - {retryTime}')
- if retryTime > MaxRetry:
- logger.error(f'Quit for Max retries: {retryTime}')
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
- complete_list.append(partnumber)
- pload_time = time.time() - pstart_time
- pload_bytes = len(chunkdata)
- pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
- if not dryrun:
- print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
- f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
- return
- # download part from src. s3 and upload to dest. s3
- def download_uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, total, md5list, dryrun, complete_list,
- ChunkSize):
- pstart_time = time.time()
- getBody, chunkdata_md5 = b'', b'' # init
- if ifVerifyMD5 or not dryrun:
- # Download the part
- if not dryrun:
- print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - {partnumber}/{total}")
- else:
- print(f"\033[0;33;40m--->Downloading for verify MD5\033[0m {srcfileKey} - {partnumber}/{total}")
- retryTime = 0
- while retryTime <= MaxRetry:
- try:
- response_get_object = s3_src_client.get_object(
- Bucket=SrcBucket,
- Key=srcfileKey,
- Range="bytes=" + str(partStartIndex) + "-" + str(partStartIndex + ChunkSize - 1)
- )
- getBody = response_get_object["Body"].read()
- chunkdata_md5 = hashlib.md5(getBody)
- md5list[partnumber - 1] = chunkdata_md5
- break
- except Exception as err:
- retryTime += 1
- logger.warning(f"DownloadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
- logger.warning(f"Download part fail, retry part: {partnumber} Attempts: {retryTime}")
- if retryTime > MaxRetry:
- logger.error(f"Quit for Max Download retries: {retryTime}")
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
- if not dryrun:
- # Upload the part
- print(f'\033[0;32;1m --->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
- retryTime = 0
- while retryTime <= MaxRetry:
- try:
- s3_dest_client.upload_part(
- Body=getBody,
- Bucket=DesBucket,
- Key=srcfileKey,
- PartNumber=partnumber,
- UploadId=uploadId,
- ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
- )
- break
- except Exception as err:
- retryTime += 1
- logger.warning(f"UploadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
- logger.warning(f"Upload part fail, retry part: {partnumber} Attempts: {retryTime}")
- if retryTime > MaxRetry:
- logger.error(f"Quit for Max Upload retries: {retryTime}")
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
- complete_list.append(partnumber)
- pload_time = time.time() - pstart_time
- pload_bytes = len(getBody)
- pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
- if not dryrun:
- print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
- f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
- return
- # download part from src. ali_oss and upload to dest. s3
- def alioss_download_uploadThread(*, uploadId, partnumber, partStartIndex, srcfileKey, srcfileSize, total, md5list,
- dryrun, complete_list, ChunkSize):
- pstart_time = time.time()
- getBody, chunkdata_md5 = b'', b'' # init
- if ifVerifyMD5 or not dryrun:
- # Download the part
- if not dryrun:
- print(f"\033[0;33;1m--->Downloading\033[0m {srcfileKey} - {partnumber}/{total}")
- else:
- print(f"\033[0;33;40m--->Downloading for verify MD5\033[0m {srcfileKey} - {partnumber}/{total}")
- retryTime = 0
- while retryTime <= MaxRetry:
- try:
- partEndIndex = partStartIndex + ChunkSize - 1
- if partEndIndex >= srcfileSize:
- partEndIndex = srcfileSize - 1
- # If the end of the range exceeds the object size, Ali OSS downloads the whole object from the beginning, so the end must be clamped to FileSize-1 manually
- # S3 or a local disk would simply clamp the end pointer to the last byte
- response_get_object = ali_bucket.get_object(
- key=srcfileKey,
- byte_range=(partStartIndex, partEndIndex)
- )
- getBody = b''
- for chunk in response_get_object:
- if chunk != '':
- getBody += chunk
- chunkdata_md5 = hashlib.md5(getBody)
- md5list[partnumber - 1] = chunkdata_md5
- break
- except Exception as err:
- retryTime += 1
- logger.warning(f"DownloadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
- logger.warning(f"Download part fail, retry part: {partnumber} Attempts: {retryTime}")
- if retryTime > MaxRetry:
- logger.error(f"Quit for Max Download retries: {retryTime}")
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
- if not dryrun:
- # Upload the part
- print(f'\033[0;32;1m --->Uploading\033[0m {srcfileKey} - {partnumber}/{total}')
- retryTime = 0
- while retryTime <= MaxRetry:
- try:
- s3_dest_client.upload_part(
- Body=getBody,
- Bucket=DesBucket,
- Key=srcfileKey,
- PartNumber=partnumber,
- UploadId=uploadId,
- ContentMD5=base64.b64encode(chunkdata_md5.digest()).decode('utf-8')
- )
- break
- except Exception as err:
- retryTime += 1
- logger.warning(f"UploadThreadFunc - {srcfileKey} - Exception log: {str(err)}")
- logger.warning(f"Upload part fail, retry part: {partnumber} Attempts: {retryTime}")
- if retryTime > MaxRetry:
- logger.error(f"Quit for Max Upload retries: {retryTime}")
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- time.sleep(5 * retryTime)  # back off with an increasing delay before retrying
- complete_list.append(partnumber)
- pload_time = time.time() - pstart_time
- pload_bytes = len(getBody)
- pload_speed = size_to_str(int(pload_bytes / pload_time)) + "/s"
- if not dryrun:
- print(f'\033[0;34;1m --->Complete\033[0m {srcfileKey} '
- f'- {partnumber}/{total} \033[0;34;1m{len(complete_list) / total:.2%} - {pload_speed}\033[0m')
- return
- # Complete multipart upload, get uploadedListParts from S3 and construct completeStructJSON
- def completeUpload(*, reponse_uploadId, srcfileKey, len_indexList):
- # Query all uploaded parts (uploadedListParts) from S3 and build completeStructJSON
- prefix_and_key = srcfileKey
- if JobType == 'LOCAL_TO_S3':
- prefix_and_key = str(PurePosixPath(S3Prefix) / srcfileKey)
- uploadedListPartsClean = []
- PartNumberMarker = 0
- IsTruncated = True
- while IsTruncated:
- response_uploadedList = s3_dest_client.list_parts(
- Bucket=DesBucket,
- Key=prefix_and_key,
- UploadId=reponse_uploadId,
- MaxParts=1000,
- PartNumberMarker=PartNumberMarker
- )
- NextPartNumberMarker = response_uploadedList['NextPartNumberMarker']
- IsTruncated = response_uploadedList['IsTruncated']
- if NextPartNumberMarker > 0:
- for partObject in response_uploadedList["Parts"]:
- ETag = partObject["ETag"]
- PartNumber = partObject["PartNumber"]
- addup = {
- "ETag": ETag,
- "PartNumber": PartNumber
- }
- uploadedListPartsClean.append(addup)
- PartNumberMarker = NextPartNumberMarker
- if len(uploadedListPartsClean) != len_indexList:
- logger.warning(f'Number of uploaded parts does not match - {srcfileKey}')
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- completeStructJSON = {"Parts": uploadedListPartsClean}
- # Complete (merge) the multipart upload on S3
- response_complete = s3_dest_client.complete_multipart_upload(
- Bucket=DesBucket,
- Key=prefix_and_key,
- UploadId=reponse_uploadId,
- MultipartUpload=completeStructJSON
- )
- logger.info(f'Complete merge file {srcfileKey}')
- return response_complete
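- # Shape of the completeStructJSON passed to complete_multipart_upload above
- # (ETag values illustrative):
- #   {"Parts": [{"ETag": '"9b2cf535f27731c974343645a3985328"', "PartNumber": 1},
- #              {"ETag": '"6d0bb00954ceb7fbee436bb55a8397a9"', "PartNumber": 2}]}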
- # Compare local file list and s3 list
- def compare_local_to_s3():
- logger.info('Comparing destination and source ...')
- fileList = get_local_file_list(str_key=True)
- desFilelist = get_s3_file_list(s3_client=s3_dest_client,
- bucket=DesBucket,
- S3Prefix=S3Prefix,
- no_prefix=True)
- deltaList = []
- for source_file in fileList:
- if source_file not in desFilelist:
- deltaList.append(source_file)
- if not deltaList:
- logger.warning('All source files are in destination Bucket/Prefix. Job well done.')
- else:
- logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
- for delta_file in deltaList:
- logger.warning(str(delta_file))
- return
- # Compare S3 buckets
- def compare_buckets():
- logger.info('Comparing destination and source ...')
- deltaList = []
- desFilelist = get_s3_file_list(s3_client=s3_dest_client,
- bucket=DesBucket,
- S3Prefix=S3Prefix)
- if JobType == 'S3_TO_S3':
- if SrcFileIndex == "*":
- fileList = get_s3_file_list(s3_client=s3_src_client,
- bucket=SrcBucket,
- S3Prefix=S3Prefix)
- else:
- fileList = head_s3_single_file(s3_src_client, SrcBucket)
- elif JobType == 'ALIOSS_TO_S3':
- if SrcFileIndex == "*":
- fileList = get_ali_oss_file_list(ali_bucket)
- else:
- fileList = head_oss_single_file(ali_bucket)
- else:
- return
- for source_file in fileList:
- if source_file not in desFilelist:
- deltaList.append(source_file)
- if not deltaList:
- logger.warning('All source files are in destination Bucket/Prefix. Job well done.')
- else:
- logger.warning(f'There are {len(deltaList)} files not in destination or not the same size. List:')
- for delta_file in deltaList:
- logger.warning(json.dumps(delta_file))
- return
- # Main
- if __name__ == '__main__':
- start_time = datetime.datetime.now()
- ChunkSize_default = set_config()
- logger, log_file_name = set_log()
- # Define s3 client
- s3_config = Config(max_pool_connections=200)
- s3_dest_client = Session(profile_name=DesProfileName).client('s3', config=s3_config)
- # Check destination S3 writable
- try:
- logger.info(f'Checking write permission for: {DesBucket}')
- s3_dest_client.put_object(
- Bucket=DesBucket,
- Key=str(PurePosixPath(S3Prefix) / 'access_test'),
- Body='access_test_content'
- )
- except Exception as e:
- logger.error(f'Can not write to {DesBucket}/{S3Prefix}, {str(e)}')
- input('PRESS ENTER TO QUIT')
- sys.exit(0)
- # Get the source file list
- logger.info('Get source file list')
- src_file_list = []
- if JobType == "LOCAL_TO_S3":
- SrcDir = str(Path(SrcDir))
- src_file_list = get_local_file_list()
- elif JobType == "S3_TO_S3":
- s3_src_client = Session(profile_name=SrcProfileName).client('s3', config=s3_config)
- if SrcFileIndex == "*":
- src_file_list = get_s3_file_list(s3_client=s3_src_client,
- bucket=SrcBucket,
- S3Prefix=S3Prefix)
- else:
- src_file_list = head_s3_single_file(s3_src_client, SrcBucket)
- elif JobType == 'ALIOSS_TO_S3':
- import oss2
- ali_bucket = oss2.Bucket(oss2.Auth(ali_access_key_id, ali_access_key_secret), ali_endpoint, ali_SrcBucket)
- if SrcFileIndex == "*":
- src_file_list = get_ali_oss_file_list(ali_bucket)
- else:
- src_file_list = head_oss_single_file(ali_bucket)
- # Get the list of files already in the destination S3
- des_file_list = get_s3_file_list(s3_client=s3_dest_client,
- bucket=DesBucket,
- S3Prefix=S3Prefix)
- # Get all unfinished multipart uploads in the bucket
- multipart_uploaded_list = get_uploaded_list(s3_dest_client)
- # Optionally clean all unfinished multipart uploads, to force a full re-upload
- if multipart_uploaded_list:
- logger.warning(f'{len(multipart_uploaded_list)} unfinished uploads found. Clean them and restart?')
- logger.warning('NOTICE: IF CLEAN, YOU CANNOT RESUME ANY UNFINISHED UPLOAD')
- if not DontAskMeToClean:
- keyboard_input = input("CLEAN unfinished uploads and restart (type CLEAN) or resume uploading (press enter)? "
- "Please confirm: (n/CLEAN)")
- else:
- keyboard_input = 'no'
- if keyboard_input == 'CLEAN':
- # Abort all unfinished uploads
- for clean_i in multipart_uploaded_list:
- s3_dest_client.abort_multipart_upload(
- Bucket=DesBucket,
- Key=clean_i["Key"],
- UploadId=clean_i["UploadId"]
- )
- multipart_uploaded_list = []
- logger.info('CLEAN FINISHED')
- else:
- logger.info('You choose not to clean, now try to resume unfinished upload')
- # Upload each file in the file list
- with futures.ThreadPoolExecutor(max_workers=MaxParallelFile) as file_pool:
- for src_file in src_file_list:
- file_pool.submit(upload_file,
- srcfile=src_file,
- desFilelist=des_file_list,
- UploadIdList=multipart_uploaded_list,
- ChunkSize_default=ChunkSize_default)
- # Fetch the source and destination file lists again, compare each file's size, and report the result
- time_str = str(datetime.datetime.now() - start_time)
- if JobType == 'S3_TO_S3':
- str_from = f'{SrcBucket}/{S3Prefix}'
- compare_buckets()
- elif JobType == 'ALIOSS_TO_S3':
- str_from = f'{ali_SrcBucket}/{S3Prefix}'
- compare_buckets()
- elif JobType == 'LOCAL_TO_S3':
- str_from = f'{SrcDir}'
- compare_local_to_s3()
- else:
- str_from = ""
- print(f'\033[0;34;1mMISSION ACCOMPLISHED - Time: {time_str} \033[0m - FROM: {str_from} TO {DesBucket}/{S3Prefix}')
- print('Logged to file:', os.path.abspath(log_file_name))
- input('PRESS ENTER TO QUIT')