import argparse
import os
import time
from queue import Empty, Queue
from threading import Event, Thread

import boto3
from botocore.exceptions import ClientError
from dotenv import load_dotenv
class S3UploaderException(Exception):
    """Raised for invalid CLI arguments or missing inputs."""
def get_client(region):
    # Credentials are read from the environment; main() loads them
    # from the env file via load_dotenv() before any thread starts.
    return boto3.client(
        's3',
        aws_access_key_id=os.getenv('AWS_KEY'),
        aws_secret_access_key=os.getenv('AWS_SECRET'),
        region_name=region
    )
def get_queue(directory: str, base_path):
    # Walk the directory tree and enqueue (local path, object key) pairs.
    # If base_path is given, it replaces the top-level directory name in
    # the key; os.sep is normalised to '/' for S3.
    queue = Queue()
    for dir_path, _, filenames in os.walk(directory):
        for filename in filenames:
            object_key = os.path.join(
                dir_path.replace(directory, base_path, 1) if base_path else dir_path,
                filename
            ).replace(os.sep, '/')
            filepath = os.path.join(dir_path, filename)
            queue.put((filepath, object_key))
            print('discovered {} files'.format(queue.qsize()), end='\r')
    return queue
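# Illustrative example (values are placeholders, not from the source):
# with directory='photos' and base_path='backup/photos', the file
# 'photos/2021/cat.jpg' is queued as
# ('photos/2021/cat.jpg', 'backup/photos/2021/cat.jpg').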
def put_to_s3(run_event: Event, client, queue: Queue, bucket, acl, remove_files):
    # Drain the shared queue until it is empty or shutdown is requested.
    # get_nowait() avoids the race between a queue.empty() check and a
    # blocking get() when several workers share one queue.
    while run_event.is_set():
        try:
            filepath, object_key = queue.get_nowait()
        except Empty:
            break
        try:
            client.upload_file(
                filepath, bucket, object_key,
                ExtraArgs={'ACL': acl}
            )
        except ClientError as e:
            print('Error occurred while uploading: {}'.format(str(e)))
            continue
        if remove_files:
            os.remove(filepath)
        print('uploaded: {}\nkey: {}\n{}\n'.format(
            filepath,
            object_key,
            'removed: {}'.format(filepath) if remove_files else ''
        ))
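# Note: a single boto3 client is shared by all workers below. Per the
# boto3 documentation, low-level clients are generally thread-safe
# (unlike Session and resource objects), so sharing one is acceptable.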
def generate_threads(
        run_event: Event,
        directory: str,
        bucket,
        region,
        acl,
        remove_files,
        base_path,
        thread_no
):
    client = get_client(region)
    queue = get_queue(directory, base_path)
    threads = []
    for _ in range(thread_no):
        threads.append(Thread(
            target=put_to_s3,
            args=(run_event, client, queue, bucket, acl, remove_files)
        ))
    return threads
def start_threads(threads):
    for thread in threads:
        thread.start()


def has_live_threads(threads):
    return any(t.is_alive() for t in threads)
def main():
    start_time = time.time()
    run_event = Event()
    run_event.set()

    parser = argparse.ArgumentParser()
    parser.add_argument('directory', help='Directory to upload')
    parser.add_argument('bucket', help='AWS S3 bucket name')
    parser.add_argument('region', help='AWS S3 region')
    parser.add_argument('--env_file', help='Env file with AWS_KEY and AWS_SECRET', default='.env')
    parser.add_argument('--acl', help='ACL policy to be applied', default='public-read')
    parser.add_argument('--base_path', help='Base path name for object key')
    parser.add_argument('--remove_files', action='store_true', help='Delete files after uploading', default=False)
    parser.add_argument('--threads', help='No. of threads', default=5, type=int)
    args = parser.parse_args()

    try:
        if not os.path.isdir(args.directory):
            raise S3UploaderException("Directory '{}' does not exist".format(args.directory))
        if not os.path.isfile(args.env_file):
            raise S3UploaderException("Env file '{}' does not exist".format(args.env_file))
        if args.threads < 1:
            raise S3UploaderException('At least one thread is required')
        load_dotenv(args.env_file)
        threads = generate_threads(
            run_event,
            args.directory,
            args.bucket,
            args.region,
            args.acl,
            args.remove_files,
            args.base_path,
            args.threads
        )
        start_threads(threads)
        # join() with a timeout keeps the main thread responsive so a
        # KeyboardInterrupt can trigger a graceful shutdown.
        while has_live_threads(threads):
            try:
                for t in threads:
                    if t is not None and t.is_alive():
                        t.join(1)
            except KeyboardInterrupt:
                print('Please wait! Gracefully stopping...')
                run_event.clear()
    except S3UploaderException as e:
        print('Error:', str(e))
    print('--- %s seconds ---' % (time.time() - start_time))
if __name__ == '__main__':
    main()
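# Example usage sketch (assumes the script is saved as s3_uploader.py;
# the bucket, region, and credential values below are placeholders):
#
#   $ cat .env
#   AWS_KEY=AKIA...
#   AWS_SECRET=...
#
#   $ python s3_uploader.py ./photos my-bucket us-east-1 \
#       --base_path backup/photos --threads 10 --remove_files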