s3upload.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import os
  2. import argparse
  3. import time
  4. import boto3
  5. from dotenv import load_dotenv
  6. from botocore.exceptions import ClientError
  7. from queue import Queue
  8. from threading import Thread, Event
  9. class S3UploaderException(Exception):
  10. pass
  11. def get_client(region):
  12. return boto3.client(
  13. 's3',
  14. aws_access_key_id=os.getenv('AWS_KEY'),
  15. aws_secret_access_key=os.getenv('AWS_SECRET'),
  16. region_name=region
  17. )
  18. def get_queue(directory: str, base_path):
  19. queue = Queue()
  20. for dir_path, dir_names, filenames in os.walk(directory):
  21. for filename in filenames:
  22. object_key = os.path.join(
  23. dir_path.replace(directory, base_path, 1) if base_path else dir_path,
  24. filename
  25. ).replace(os.sep, '/')
  26. filepath = os.path.join(dir_path, filename)
  27. queue.put((filepath, object_key))
  28. print('discovered {} files'.format(queue.qsize()), end='\r')
  29. return queue
  30. def put_to_s3(run_event: Event, client, queue: Queue, bucket, acl, remove_files):
  31. while not queue.empty() and run_event.is_set():
  32. filepath, object_key = queue.get()
  33. try:
  34. client.upload_file(
  35. filepath, bucket, object_key,
  36. ExtraArgs={'ACL': acl}
  37. )
  38. except ClientError as e:
  39. print('Error occurred while uploading: {}'.format(str(e)))
  40. continue
  41. if remove_files:
  42. os.remove(filepath)
  43. print('uploaded: {}\nkey: {}\n{}\n'.format(
  44. filepath,
  45. object_key,
  46. 'removed: {}'.format(filepath) if remove_files else ''
  47. ))
  48. def generate_threads(
  49. run_event: Event,
  50. directory: str,
  51. bucket,
  52. region,
  53. acl,
  54. remove_files,
  55. base_path,
  56. thread_no
  57. ):
  58. client = get_client(region)
  59. queue = get_queue(directory, base_path)
  60. threads = []
  61. for i in range(thread_no):
  62. threads.append(Thread(
  63. target=put_to_s3,
  64. args=(run_event, client, queue, bucket, acl, remove_files)
  65. ))
  66. return threads
  67. def start_threads(threads):
  68. for thread in threads:
  69. thread.start()
  70. def has_live_threads(threads):
  71. return True in [t.is_alive() for t in threads]
  72. def main():
  73. start_time = time.time()
  74. parser = argparse.ArgumentParser()
  75. run_event = Event()
  76. run_event.set()
  77. parser.add_argument('directory', help='Directory to upload')
  78. parser.add_argument('bucket', help='AWS S3 bucket name')
  79. parser.add_argument('region', help='AWS S3 region')
  80. parser.add_argument('--env_file', help='Env file with AWS_KEY and AWS_SECRET', default='.env')
  81. parser.add_argument('--acl', help='ACL Policy to be applied', default='public-read')
  82. parser.add_argument('--base_path', help='Base path name for object key')
  83. parser.add_argument('--remove_files', action='store_true', help='Delete files after uploading', default=False)
  84. parser.add_argument('--threads', help="No. of threads", default=5, type=int)
  85. args = parser.parse_args()
  86. try:
  87. if not os.path.isdir(args.directory):
  88. raise S3UploaderException('Directory \'{}\'does not exists'.format(args.directory))
  89. if not os.path.isfile(args.env_file):
  90. raise S3UploaderException('Env file {} does not exists'.format(args.env_file))
  91. if args.threads < 1:
  92. raise S3UploaderException('At least one thread is required')
  93. load_dotenv(args.env_file)
  94. threads = generate_threads(
  95. run_event,
  96. args.directory,
  97. args.bucket,
  98. args.region,
  99. args.acl,
  100. args.remove_files,
  101. args.base_path,
  102. args.threads
  103. )
  104. start_threads(threads)
  105. while has_live_threads(threads):
  106. try:
  107. [t.join(1) for t in threads
  108. if t is not None and t.is_alive()]
  109. except KeyboardInterrupt:
  110. print('Please wait! gracefully stopping...')
  111. run_event.clear()
  112. except S3UploaderException as e:
  113. print('Error: ', str(e))
  114. print("--- %s seconds ---" % (time.time() - start_time))
  115. if __name__ == "__main__":
  116. main()