queue2blob.py
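"""Poll an AWS SQS queue for S3 event notification messages and copy each
referenced S3 object into an Azure Blob Storage container using the
server-side async copy API of the legacy azure-storage SDK."""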

import argparse
import os
import sys
import boto3
import json
import time
import logging
import gc

from azure.storage.blob import BlockBlobService

WAIT_TIME = 60
blob_service = None
debug = False

# set up logging; silence the chatty AWS SDK loggers
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT, stream=sys.stdout, level=logging.INFO)
logging.getLogger("botocore").setLevel(logging.CRITICAL)
logging.getLogger("boto3").setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
def parse_config_file(config_file):
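    """Load settings from a JSON config file, local or on S3.

    If ``config_file`` does not exist locally it is treated as an S3
    ``bucket/key`` pair and downloaded to /tmp. Expected keys: QUEUE,
    REGION, S3REGION, STORAGE_ACCOUNT, CONTAINER and STORAGE_KEY (the
    STORAGE_KEY environment variable takes precedence over the file).
    """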
    if not os.path.exists(config_file):
        # not a local file: treat the value as an S3 "bucket/key" pair
        s3_client = boto3.client('s3')
        local_path = "/tmp/config.json"
        s3bucket, s3config = config_file.split('/')
        s3_client.download_file(s3bucket, s3config, local_path)
        logger.info('Loaded configuration from S3')
        profile = None
    else:
        local_path = config_file
        logger.debug('Local path: %s', local_path)
        profile = "dev2"
        logger.info('Loaded configuration from local file system')
    with open(local_path) as stream:
        try:
            config = json.load(stream)
        except ValueError as e:
            logger.error('Could not parse config file: %s', str(e))
            sys.exit(1)
    if os.environ.get('STORAGE_KEY'):
        key = os.environ.get('STORAGE_KEY')
    else:
        key = config['STORAGE_KEY']
    queue = config['QUEUE']
    region = config['REGION']
    s3_region = config['S3REGION']
    storage_account = config['STORAGE_ACCOUNT']
    container = config['CONTAINER']
    logger.debug('Values: %s,%s,%s,%s,%s', queue, region, storage_account, container, key)
    return queue, region, s3_region, profile, storage_account, container, key
def init_blob_service(storage_account, region, profile, key):
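    """Create the global BlockBlobService client for the storage account.

    ``region`` and ``profile`` are accepted for a uniform call signature
    but are not used here. Returns True on success, False otherwise.
    """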
    global blob_service
    try:
        blob_service = BlockBlobService(account_name=storage_account, account_key=key)
    except Exception as e:
        logger.error("Could not connect to Azure Storage: %s", str(e))
        return False
    return True
def upload_to_azure(source_files, container_name):
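    """Server-side copy each source URL into the blob container.

    ``source_files`` maps blob names to publicly readable source URLs.
    copy_blob() starts an asynchronous copy, so each copy is polled until
    it reports 'success'. Returns False on error or timeout.
    """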
    for fileobject in source_files:
        logger.info('Uploading fileobject: %s source url: %s', fileobject, source_files[fileobject])
        try:
            blob_service.copy_blob(container_name, fileobject, source_files[fileobject])
        except Exception as e:
            # source does not exist or the copy could not be started
            logger.error('Could not upload to container: %s', str(e))
            return False
        # poll the async copy status, giving up after 50 attempts (~250s)
        count = 0
        copy = blob_service.get_blob_properties(container_name, fileobject).properties.copy
        while copy.status != 'success':
            count = count + 1
            if count > 50:
                logger.error('Timed out waiting for async copy to complete.')
                return False
            logger.info('Copying to blob storage: %s', fileobject)
            time.sleep(5)
            copy = blob_service.get_blob_properties(container_name, fileobject).properties.copy
        logger.info('Upload complete: %s', fileobject)
    return True
def poll_queue(queue_name, region, s3_region, profile, container):
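    """Drain the SQS queue, copying each S3 object it references to Azure.

    Successfully handled messages (and non-S3 messages) are deleted from
    the queue in batches. Returns True when the queue has been drained
    without upload errors, False otherwise.
    """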
    try:
        if profile is not None:
            session = boto3.session.Session(profile_name=profile, region_name=region)
        else:
            session = boto3.session.Session(region_name=region)
        sqs = session.resource('sqs')
    except Exception as e:
        logger.error("Could not connect to AWS: %s", str(e))
        return False
    try:
        queue = sqs.get_queue_by_name(QueueName=queue_name)
    except Exception as e:
        logger.error("Could not load queue %s: %s", queue_name, str(e))
        return False
    # process messages in batches of up to ten
    max_queue_messages = 10
    success = True
    while True:
        messages_to_delete = []
        for message in queue.receive_messages(MaxNumberOfMessages=max_queue_messages):
            # parse the message body
            try:
                body = json.loads(message.body)
            except Exception as e:
                logger.error("Could not load message body: %s", str(e))
                return False
            to_upload = {}
            if len(body) > 0:
                logger.debug('Message found')
                try:
                    logger.debug('Region: %s Bucket: %s File: %s', body['Records'][0]['awsRegion'], body['Records'][0]['s3']['bucket']['name'], body['Records'][0]['s3']['object']['key'])
                    file_object = body['Records'][0]['s3']['object']['key']
                except KeyError:
                    logger.info('Found non-S3 upload message, removing')
                    messages_to_delete.append({
                        'Id': message.message_id,
                        'ReceiptHandle': message.receipt_handle
                    })
                except IndexError as e:
                    logger.error('IndexError received: %s', str(e))
                    return False
                else:
                    message_url = "https://" + s3_region + "/" + body['Records'][0]['s3']['bucket']['name'] + "/" + file_object
                    to_upload.update({file_object: message_url})
                    upload_result = upload_to_azure(to_upload, container)
                    if upload_result:
                        # upload succeeded, mark the message for deletion
                        messages_to_delete.append({
                            'Id': message.message_id,
                            'ReceiptHandle': message.receipt_handle
                        })
                    else:
                        # an exception occurred on upload; keep the message
                        success = False
        if len(messages_to_delete) == 0:
            # queue is drained
            break
        # delete handled messages to remove them from the SQS queue
        try:
            queue.delete_messages(Entries=messages_to_delete)
        except Exception as e:
            logger.error("Could not delete messages from queue: %s", str(e))
            return False
    return success
if __name__ == "__main__":
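    # Configuration precedence: --config-file flag, then the CONFIG_FILE
    # environment variable, then individual command-line flags.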
    parser = argparse.ArgumentParser(description='queue2blob.py')
    parser.add_argument("--config-file", help="config file containing required config", dest="config_file")
    parser.add_argument("--queue", help="Queue name", dest="queue_name")
    parser.add_argument("--region", help="AWS region that the queue is in", dest="region")
    parser.add_argument("--s3region", help="The region prefix for S3 downloads", dest="s3_region")
    parser.add_argument("--profile", help="The name of an AWS CLI profile to use.", dest='profile', required=False)
    parser.add_argument("--storage", help="The name of the storage account to use.", dest='storage_account', required=False)
    parser.add_argument("--key", help="The key for the storage account", dest='storage_key', required=False)
    parser.add_argument("--container", help="The container for the blob.", dest='container', required=False)
    parser.add_argument("--debug", help="Set debug flag", action='store_true', dest='debug', required=False)
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    if args.config_file:
        queue_name, region, s3_region, profile, storage_account, container, key = parse_config_file(args.config_file)
    elif os.environ.get('CONFIG_FILE') is not None:
        queue_name, region, s3_region, profile, storage_account, container, key = parse_config_file(os.environ.get('CONFIG_FILE'))
    else:
        queue_name = args.queue_name
        region = args.region
        profile = args.profile
        storage_account = args.storage_account
        s3_region = args.s3_region
        container = args.container
        key = args.storage_key or os.environ.get('STORAGE_KEY')
    if not init_blob_service(storage_account, region, profile, key):
        sys.exit(1)
    # poll forever, sleeping WAIT_TIME seconds between runs
    while True:
        logger.info('Starting run')
        result = poll_queue(queue_name, region, s3_region, profile, container)
        if result:
            logger.info('Completed run')
        else:
            logger.info('Completed run with errors')
        del result
        gc.collect()
        time.sleep(WAIT_TIME)