app.py

# -*- coding: utf-8 -*-
"""Monitor a folder and upload media files to AWS S3.

The monitored folder path is read from the MONITORED_FOLDER environment
variable:

monitored_folder/                   : media files
monitored_folder/done/              : all files successfully uploaded
monitored_folder/error/             : orphaned JSON files
monitored_folder/logs/              : log files
monitored_folder/config/config.json : config file
"""
from __future__ import print_function
import sys
from os import path, getenv, listdir, rename
import time
import json
import logging
import logging.config
import threading
import random
import StringIO

import boto3
import botocore

CONFIG_FOLDER = "config"
CONFIG_FILENAME = "config.json"
ERROR_FOLDER = "error"
DONE_FOLDER = "done"
MONITORING_DELAY_DEFAULT = 5
FILE_MOVE_RETRIES = 3
FILE_MOVE_DELAY = 5


class ProgressPercentage(object):
    """Callback reporting cumulative upload progress for a single file."""

    def __init__(self, filename):
        self._filename = filename
        self._size = path.getsize(filename)
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify we'll assume this is hooked up
        # to a single filename.
        with self._lock:
            log = logging.getLogger(__name__)
            self._seen_so_far += bytes_amount
            percentage = (float(self._seen_so_far) / float(self._size)) * 100
            log.debug("Upload %s: %s / %s (%.2f%%)",
                      self._filename, self._seen_so_far, self._size, percentage)


def safe_move(source, destination):
    """Move a source file to destination.

    Catch and log all errors; retry when the move silently fails.

    Args:
        source: source file path
        destination: destination path to move the file to
    Returns:
        N/A
    """
    log = logging.getLogger(__name__)
    retries_left = FILE_MOVE_RETRIES
    while retries_left:
        retries_left -= 1
        try:
            rename(source, destination)
        except Exception as error:
            log.error("Error moving file %s to %s: %s", source, destination, error)
            return
        # check if the file really moved
        if path.isfile(source):
            log.error("File %s NOT moved to %s, despite no error reported, retrying",
                      source, destination)
            time.sleep(FILE_MOVE_DELAY)
        else:
            return
    log.error("File %s NOT moved to %s, despite no error reported, after %s retries",
              source, destination, FILE_MOVE_RETRIES)


def process_json_files(json_file_list, monitored_folder, config):
    """Process a list of JSON file names.

    Args:
        json_file_list: list of JSON file names relative to monitored_folder
        monitored_folder: folder to monitor
        config: config dictionary
    Returns:
        N/A
    """
    log = logging.getLogger(__name__)
    try:
        s3 = boto3.resource(
            's3',
            region_name=config["region"],
            aws_access_key_id=config["aws_key"],
            aws_secret_access_key=config["aws_secret"],
        )
    except Exception as error:
        log.error("Unable to connect to S3. Found error: %s", error)
        return
    # process all files
    for json_file_name in json_file_list:
        log.debug("Process file %s", json_file_name)
        # read the JSON file
        json_file_path = path.abspath(path.join(monitored_folder, json_file_name))
        try:
            with open(json_file_path) as data_file:
                json_file = json.load(data_file)
        except Exception as error:
            log.error("Unable to read JSON file %s. Found error: %s", json_file_path, error)
            continue
        log.debug("Read file %s", json_file_name)
        # prepare metadata
        metadata = {}
        try:
            config_metadata = config["metadata"]
            for metadata_key, metadata_name in config_metadata.iteritems():
                if json_file.get(metadata_name, None):
                    metadata[metadata_key] = json_file[metadata_name]
        except Exception as error:
            log.error("Unable to prepare metadata for JSON file %s. Found error: %s",
                      json_file_path, error)
        log.debug("Prepared metadata %s", metadata)
        # prepare media file name/path
        media_file_name = json_file["filename"] + json_file["extension"]
        media_file_path = path.abspath(path.join(monitored_folder, media_file_name))
        # check if the media file exists
        if not path.isfile(media_file_path):
            log.error("Media file %s not found.", media_file_path)
            safe_move(json_file_path,
                      path.abspath(path.join(monitored_folder, ERROR_FOLDER, json_file_name)))
            continue
        log.debug("Checked media file %s exists", media_file_path)
        # prepare upload arguments; a missing optional "s3-acl" entry must not
        # prevent bucket_key from being computed
        # on AWS, "/" is not allowed in a bucket name; any path must be part of the key name
        if config.get("bucket-path", None) is not None:
            bucket_key = config["bucket-path"] + media_file_name
        else:
            bucket_key = media_file_name
        upload_kwargs = {"Metadata": metadata}
        if "s3-acl" in config:
            upload_kwargs["ACL"] = config["s3-acl"]
        log.debug("Prepared upload arguments %s", upload_kwargs)
        # check if the media file already exists in the bucket
        log.debug("Check if media file %s exists in bucket", media_file_name)
        media_exists_in_bucket = True
        try:
            s3.Object(config["bucket"], bucket_key).load()
        except botocore.exceptions.ClientError as error:
            if error.response['Error']['Code'] == "404":
                # does not exist, all ok
                log.debug("Media file %s does not exist in bucket %s", bucket_key, config["bucket"])
                media_exists_in_bucket = False
            else:
                # something went wrong with S3, return for a future retry
                log.error("Error checking if media file %s exists in S3. Found error code %s, message: %s.",
                          media_file_path,
                          error.response['Error']['Code'],
                          error.response['Error']['Message'])
                return
        if media_exists_in_bucket:
            # key exists, move both files to the error folder
            log.error("Media file %s already exists.", media_file_name)
            safe_move(json_file_path,
                      path.abspath(path.join(monitored_folder, ERROR_FOLDER, json_file_name)))
            safe_move(media_file_path,
                      path.abspath(path.join(monitored_folder, ERROR_FOLDER, media_file_name)))
            continue
        # upload media to S3
        try:
            # http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.upload_file
            # Similar behavior to S3Transfer's upload_file() method, except that parameters are capitalized.
            s3.meta.client.upload_file(
                media_file_path,
                config["bucket"],
                bucket_key,
                Callback=ProgressPercentage(media_file_path),
                ExtraArgs=upload_kwargs,
            )
            log.debug("Media file %s uploaded successfully to key %s.", media_file_path, bucket_key)
        except botocore.exceptions.ClientError as error:
            # something went wrong with S3, return for a future retry
            log.error("Error uploading media file %s to S3. Found error code %s, message: %s.",
                      media_file_path,
                      error.response['Error']['Code'],
                      error.response['Error']['Message'])
            return
        except boto3.exceptions.S3UploadFailedError as error:
            log.error("Unable to upload file %s. Found error: %s", media_file_path, error)
            return
        log.debug("Uploaded media file %s", media_file_path)
        # clean-up: move both files to the done folder
        safe_move(json_file_path,
                  path.abspath(path.join(monitored_folder, DONE_FOLDER, json_file_name)))
        safe_move(media_file_path,
                  path.abspath(path.join(monitored_folder, DONE_FOLDER, media_file_name)))
        log.debug("Clean-up done")


def monitor_folder(monitored_folder, config):
    """Monitor a folder in an endless loop.

    Args:
        monitored_folder: folder to monitor
        config: config dictionary
    Returns:
        N/A
    """
    log = logging.getLogger(__name__)
    monitored_folder = path.abspath(monitored_folder)
    log.debug("Monitoring %s", monitored_folder)
    monitoring_delay = config.get("monitoring-delay", MONITORING_DELAY_DEFAULT)
    while True:
        # get the list of JSON files to process
        try:
            file_list = [name for name in listdir(monitored_folder)
                         if name.endswith("." + config["watch-extension"])]
        except Exception as error:
            log.error("Unable to get JSON file list. Will retry in %s seconds. Found error: %s",
                      monitoring_delay, error)
            time.sleep(monitoring_delay)
            continue
        # process the files in random order
        random.shuffle(file_list)
        log.debug("Got file list %s", file_list)
        process_json_files(file_list, monitored_folder, config)
        time.sleep(monitoring_delay)


################################################################################
def main():
    """Main script routine.

    Perform initializations:
    - read config file
    - init logging
    - check AWS credentials
    - start monitoring loop
    """
    # get the monitored folder from an environment variable
    if getenv('MONITORED_FOLDER', None) is None:
        print("Environment variable MONITORED_FOLDER is not defined", file=sys.stderr)
        return
    monitored_folder = getenv('MONITORED_FOLDER')
    # read the config file
    try:
        config_path = path.abspath(path.join(monitored_folder, CONFIG_FOLDER, CONFIG_FILENAME))
        with open(config_path) as data_file:
            config = json.load(data_file)
    except Exception:
        print("Unable to read config file %s" % CONFIG_FILENAME, file=sys.stderr)
        raise
    # initialize logging
    try:
        logging_config = config["log-config"]
        # make the logging handlers' filenames absolute
        for handler in ["file_all", "file_error"]:
            log_file_name = logging_config["handlers"][handler]["filename"]
            log_file_path = path.abspath(path.join(monitored_folder, log_file_name))
            logging_config["handlers"][handler]["filename"] = log_file_path
        logging.config.dictConfig(logging_config)
        log = logging.getLogger(__name__)
        log.info("Logging successfully initialized")
    except Exception:
        print("Unable to initialize logging", file=sys.stderr)
        raise
    # check AWS credentials by uploading and deleting a small test object
    try:
        log.debug("Checking AWS credentials")
        s3 = boto3.resource(
            's3',
            region_name=config["region"],
            aws_access_key_id=config["aws_key"],
            aws_secret_access_key=config["aws_secret"],
        )
        bucket = s3.Bucket(config["bucket"])
        data = StringIO.StringIO("Test content")
        bucket.upload_fileobj(data, 'credentials_test_object')
        bucket.delete_objects(
            Delete={
                'Objects': [
                    {
                        'Key': 'credentials_test_object',
                    },
                ],
                'Quiet': True,
            },
        )
        log.debug("AWS credentials ok")
    except Exception as error:
        log.error("Unable to connect to S3. Found error: %s", error)
        return
    # start monitoring the folder
    monitor_folder(monitored_folder, config)


if __name__ == '__main__':
    main()