s3bundler.py

##
## Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
##
## Licensed under the Amazon Software License (the "License"). You may not use this
## file except in compliance with the License. A copy of the License is located at
##
## http://aws.amazon.com/asl/
##
## or in the "license" file accompanying this file. This file is distributed on an
## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
## See the License for the specific language governing permissions and limitations
## under the License.
##
import boto3
import botocore
import tarfile
import csv
import tempfile
from io import BytesIO, StringIO, TextIOWrapper
import hashlib
import json
import sys
import atexit
import argparse
from urllib.parse import urlparse, unquote_plus
import os.path
import logging
import threading

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.ERROR)
class Heartbeat(threading.Timer):
    "Run a function on a timer repeatedly until canceled"
    def run(self):
        while not self.finished.is_set():
            logger.info("sending heartbeat")
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)
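# Manifest (below) starts a Heartbeat when it is given an SQS message so the
# message's visibility timeout keeps getting extended while the manifest is
# being archived; the timer is cancelled at exit and once the manifest has
# been processed successfully.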
class Manifest(object):
    "Downloads and interprets manifests and creates an iterable object for processing S3 objects."
    def __init__(self, bucket, key, message=None, fieldnames=['Bucket', 'Key', 'Size']):
        self.fieldnames = fieldnames
        self.s3 = boto3.client('s3')
        self.message = message
        if message is not None:
            # Keep the SQS message invisible to other consumers while this
            # manifest is processed by periodically extending its visibility.
            self.hb = Heartbeat(int(message.Queue().attributes.get('VisibilityTimeout', 30)) - 10,
                                message.change_visibility,
                                kwargs={'VisibilityTimeout': int(message.Queue().attributes.get('VisibilityTimeout', 30))})
            atexit.register(self.hb.cancel)
            self.hb.start()
        self.bucket = bucket
        self.key = key
        if key[-1:].isdigit():
            self.name = os.path.basename(key)
        else:
            self.name = os.path.splitext(os.path.basename(key))[0]
        self.manifestcsv = tempfile.TemporaryFile()
        try:
            self.s3.download_fileobj(self.bucket, self.key, self.manifestcsv)
        except botocore.exceptions.ClientError as e:
            logger.error("ERROR: Failed to download manifest: s3://{}/{}".format(
                self.bucket, self.key))
            logger.debug("Exception: %s", e, exc_info=True)
            sys.exit(5)
        self.manifestcsv.seek(0)
        TextIOmanifestcsv = TextIOWrapper(self.manifestcsv)
        try:
            self.manifestreader = csv.DictReader(TextIOmanifestcsv, fieldnames=fieldnames)
        except csv.Error as e:
            logger.error("ERROR: Failed to open manifest: s3://{}/{}".format(
                self.bucket, self.key))
            logger.debug("Exception: %s", e, exc_info=True)
            sys.exit(3)
        # Peek at the first row to learn the source bucket, then rewind so
        # iteration starts from the beginning of the manifest again.
        firstline = next(self.manifestreader)
        self.sourcebucket = firstline['Bucket']
        TextIOmanifestcsv.seek(0)
        logger.error("begin processing s3://{}/{}".format(self.bucket, self.key))
    def __iter__(self):
        return self
    def __next__(self):
        return next(self.manifestreader)
    def __del__(self):
        try:
            self.hb.cancel()
            logger.debug("cancelling heartbeat")
        except AttributeError:
            # not using SQS, so there is no heartbeat thread
            pass
    def success(self):
        "Cleanup after successfully processing a manifest"
        try:
            self.hb.cancel()
        except AttributeError:
            # not using SQS, so there is no heartbeat thread
            pass
        try:
            self.message.delete()
        except AttributeError:
            # not using SQS, no need to delete message from queue
            pass
        logger.error("successfully archived s3://{}/{}".format(self.bucket, self.key))
        self.manifestcsv.close()
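# Index records one CSV line per archived object:
# Bucket,Key,ContentLength,LastModified,checksum,metadata,tagset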
class Index(tempfile.SpooledTemporaryFile):
    "Helper to handle creation of indices"
    def __init__(self, name, s3, bucket, key):
        tempfile.SpooledTemporaryFile.__init__(self, max_size=8*1024*1024)
        self.indexname = name
        self.s3 = s3
        self.bucket = bucket
        self.key = key
    def line(self, obj, o):
        if o.get('TagCount', 0) >= 1:
            logger.info("get tags s3://{}/{}".format(obj['Bucket'], obj['Key']))
            tags = self.s3.get_object_tagging(Bucket=obj['Bucket'],
                                              Key=obj['Key'])['TagSet']
            tagset = ';'.join(("Key={};Value={}".format(t['Key'], t['Value']) for t in tags))
            logger.info("Tags: {}".format(tagset))
        else:
            tagset = ''
        metadata = ';'.join(("{}={}".format(k, v) for k, v in o.get('Metadata', {}).items()))
        if o.get("PartsCount", 1) == 1:
            checksum = o.get('ETag', "")
        else:
            logger.info("calculate checksum s3://{}/{}".format(obj['Bucket'], obj['Key']))
            # Multipart objects have no whole-object MD5 ETag. The intent here is to
            # compute one from the body (ideally in a separate thread), but the body
            # is not passed into this method, so the NameError on bodyobj below is
            # caught and the checksum is left empty.
            try:
                m = hashlib.md5()
                for chunk in iter(lambda: bodyobj.read(4096), b''):
                    m.update(chunk)
                checksum = m.hexdigest()
            except Exception as e:
                logger.error("ERROR: Failed to calculate checksum. continuing. s3://{}/{}".format(
                    obj['Bucket'], obj['Key']))
                logger.debug("Exception: %s", e, exc_info=True)
                checksum = ""
        try:
            self.write(','.join((
                obj['Bucket'],
                obj['Key'],
                str(o['ContentLength']),
                str(o['LastModified']),
                str(checksum),
                metadata,
                tagset)).encode() + "\n".encode())
        except Exception as e:
            logger.error("ERROR: Failed to add metadata to index. s3://{}/{}".format(obj['Bucket'], obj['Key']))
            logger.debug("Exception: %s", e, exc_info=True)
            sys.exit(2)
    def push(self):
        self.seek(0)
        try:
            logger.info("uploading {} to s3://{}/{}".format(self.indexname, self.bucket,
                                                            self.key))
            self.s3.upload_fileobj(self, self.bucket, self.key)
        except botocore.exceptions.ClientError as e:
            logger.error("ERROR: Failed to upload index: s3://{}/{}".format(self.bucket,
                                                                            self.key))
            logger.debug("Exception: %s", e, exc_info=True)
            sys.exit(4)
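# The dead letter queue index records one CSV line per skipped object:
# Bucket,Key,Size,LastModifiedDate,reason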
class DLQ(Index):
    "The dead letter queue index needs a reason instead of metadata"
    def line(self, obj, reason):
        try:
            self.write(','.join((
                obj['Bucket'],
                obj['Key'],
                obj['Size'],
                obj.get('LastModifiedDate', ''),
                reason)).encode() + "\n".encode())
        except Exception as e:
            logger.error("ERROR: Failed to write to dead letter queue. s3://{}/{}".format(obj['Bucket'], obj['Key']))
            logger.debug("Exception: %s", e, exc_info=True)
            sys.exit(2)
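# Archiver writes three kinds of objects:
#   <prefix>/<hashkey>/<sourcebucket>/<manifest name>.tar[.gz]   the archive
#   <prefix>/<hashkey>/<sourcebucket>/<manifest name>.index      per-object metadata
#   <manifest key>.dlq (in the manifest bucket)                  skipped objects
# Objects larger than maxsize are instead copied to
# <prefix>/<hashkey>/<sourcebucket>/bigobjects/ with metadata stored alongside
# as <key>.metadata.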
class Archiver(object):
    "Handler to create tar archives of S3 objects in a manifest"
    def __init__(self, bucket, prefix, manifest, maxsize=2*1024*1024, compress=False):
        self.s3 = boto3.client('s3')
        if compress:
            self.tarmode = 'w:gz'
            self.extension = 'tar.gz'
        else:
            self.tarmode = 'w'
            self.extension = 'tar'
        self.bucket = bucket
        self.prefix = prefix
        self.manifest = manifest
        self.maxsize = maxsize
        self.compress = compress
        self.tarobj = tempfile.TemporaryFile()
        self.target = tarfile.open(mode=self.tarmode, fileobj=self.tarobj)
        hk = hashkey(self.manifest.sourcebucket, self.manifest.name)
        self.indexkey = "{}/{}/{}/{}.index".format(self.prefix.strip('/'),
                                                   hk,
                                                   self.manifest.sourcebucket,
                                                   self.manifest.name)
        self.index = Index("index", self.s3, self.bucket, self.indexkey)
        self.dlq = DLQ("dlq", self.s3, self.manifest.bucket, self.manifest.key + ".dlq")
        self.archivekey = "{}/{}/{}/{}.{}".format(self.prefix.strip('/'),
                                                  hk,
                                                  self.manifest.sourcebucket,
                                                  self.manifest.name,
                                                  self.extension)
    def __del__(self):
        logger.info("cleaning up archiver")
        self.tarobj.close()
        self.index.close()
        self.dlq.close()
    def S3ErrorHandler(self, obj, e):
        "Handle S3 exceptions to see if we can recover or need to fail"
        code = e.response["Error"]["Code"]
        logger.error("DEBUG: s3://{}/{} request-id: {} x-amz-id-2: {}".format(obj['Bucket'],
            obj['Key'], e.response['ResponseMetadata']['HTTPHeaders']['x-amz-request-id'],
            e.response['ResponseMetadata']['HTTPHeaders']['x-amz-id-2']))
        if code in ("InternalError", "OperationAborted", "RequestTimeout", "ServiceUnavailable", "SlowDown"):
            # transient error; record it for a retry
            logger.error("ERROR: Skipping {} on {}. Possible throttling. object: s3://{}/{}".format(code,
                e.operation_name, obj['Bucket'], obj['Key']))
            self.dlq.line(obj, "Retry")
        elif str(code) in ("NoSuchKey", "404"):
            # deleted; log and continue
            logger.error("ERROR: Skipping deleted object: s3://{}/{}".format(obj['Bucket'], obj['Key']))
            self.dlq.line(obj, "NoSuchKey")
        elif code == "InvalidObjectState":
            # archived to Glacier; log and continue
            logger.error("ERROR: Skipping glaciered object: s3://{}/{}".format(obj['Bucket'], obj['Key']))
            self.dlq.line(obj, "Glacier")
        elif code == "AccessDenied":
            # maybe it is an object ACL? Log and continue.
            logger.error("ERROR: Skipping AccessDenied object: s3://{}/{}".format(obj['Bucket'], obj['Key']))
            self.dlq.line(obj, "AccessDenied")
        else:
            # log and quit
            logger.error("ERROR: Failed to {} object: s3://{}/{}".format(
                e.operation_name, obj['Bucket'], obj['Key']))
            logger.error("Exception: %s", e, exc_info=True)
            sys.exit(5)
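    # process() chooses a path based on the object's size from the manifest:
    # objects larger than maxsize are copied straight to the destination bucket,
    # objects under 8 MB are fetched into memory, and everything else is
    # downloaded to a temporary file before being added to the tar archive.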
    def process(self, obj):
        """Download objects and add them to the archive.
        Really big objects are copied without modification."""
        # S3 Storage Inventory encodes keys as application/x-www-form-urlencoded,
        # so convert '%2F' and friends back to literal characters.
        decodedkey = unquote_plus(obj['Key'])
        if int(obj["Size"]) > self.maxsize:
            # This object is too big to bundle: copy it to the new bucket and
            # write its metadata as <key>.metadata.
            try:
                hk = hashkey(self.manifest.sourcebucket, obj['Key'])
                targetkey = '/'.join((self.prefix.strip('/'),
                                      hk,
                                      self.manifest.sourcebucket,
                                      'bigobjects', decodedkey))
                metadatakey = targetkey + '.metadata'
                with Index("metadata", self.s3, self.bucket, metadatakey) as metadata:
                    logger.info("Copying big object s3://{}/{} to s3://{}/{}".format(
                        obj['Bucket'], decodedkey,
                        self.bucket,
                        targetkey))
                    o = self.s3.head_object(Bucket=obj['Bucket'], Key=decodedkey)
                    self.s3.copy({'Bucket': obj['Bucket'], 'Key': decodedkey},
                                 self.bucket,
                                 targetkey)
                    metadata.line(obj, o)
                    metadata.push()
                return
            except botocore.exceptions.ClientError as e:
                self.S3ErrorHandler(obj, e)
                return
        elif int(obj["Size"]) < 8*1024*1024:
            try:
                logger.info("Downloading to memory s3://{}/{}".format(
                    obj['Bucket'], decodedkey))
                o = self.s3.get_object(Bucket=obj['Bucket'], Key=decodedkey)
            except botocore.exceptions.ClientError as e:
                self.S3ErrorHandler(obj, e)
                return
            bodyobj = o['Body']
        else:
            try:
                logger.info("Downloading to file s3://{}/{}".format(
                    obj['Bucket'], decodedkey))
                o = self.s3.head_object(Bucket=obj['Bucket'], Key=decodedkey)
                bodyobj = tempfile.TemporaryFile()
                self.s3.download_fileobj(obj['Bucket'], decodedkey, bodyobj)
            except botocore.exceptions.ClientError as e:
                self.S3ErrorHandler(obj, e)
                return
            bodyobj.seek(0)
        self.index.line(obj, o)
        info = tarfile.TarInfo(name='/'.join((obj['Bucket'],
                                              decodedkey)))
        info.size = o['ContentLength']
        info.mtime = o['LastModified'].timestamp()
        info.type = tarfile.REGTYPE
        try:
            self.target.addfile(info, bodyobj)
        except Exception as e:
            logger.error("ERROR: Failed to add object to archive. s3://{}/{}".format(obj['Bucket'], decodedkey))
            logger.error("Exception: %s", e, exc_info=True)
            sys.exit(2)
        bodyobj.close()
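    # commit() uploads the DLQ and index only when they contain data, then the
    # tarball itself, and finally lets the manifest clean up (which deletes the
    # SQS message when one was used).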
    def commit(self):
        "Close archive and write index, dlq and tarball to S3"
        self.target.close()
        self.tarobj.seek(0)
        if self.dlq.tell() > 0:
            self.dlq.push()
        if self.index.tell() > 0:
            self.index.push()
        logger.info("uploading archive to s3://{bucket}/{key}".format(
            bucket=self.bucket, key=self.archivekey))
        try:
            self.s3.upload_fileobj(self.tarobj, self.bucket, self.archivekey)
        except botocore.exceptions.ClientError as e:
            logger.error("ERROR: Failed to upload archive: s3://{}/{}".format(self.bucket,
                self.archivekey))
            logger.error("Exception: %s", e, exc_info=True)
            sys.exit(4)
        self.manifest.success()
def s3uri(uri):
    if not uri.startswith('s3://'):
        raise argparse.ArgumentTypeError("manifest uri is not an s3uri: s3://bucket/key")
    else:
        o = urlparse(uri)
        return {'bucket': o.netloc,
                'key': o.path.lstrip('/')}
def Size(size):
    import re
    sizere = re.compile(r'^(\d+)(gb|g|mb|m|kb|k)?$', re.IGNORECASE)
    m = sizere.match(size)
    if m is None:
        raise argparse.ArgumentTypeError("maxsize must be a number possibly followed by GB|G|MB|M|KB|K")
    elif m.groups()[1] is None:
        return int(m.groups()[0])
    elif m.groups()[1].lower() in ("gb", "g"):
        return int(m.groups()[0])*1024*1024*1024
    elif m.groups()[1].lower() in ("mb", "m"):
        return int(m.groups()[0])*1024*1024
    elif m.groups()[1].lower() in ("kb", "k"):
        return int(m.groups()[0])*1024
    else:
        raise argparse.ArgumentTypeError("Invalid size")
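# Examples: Size("2GB") == 2*1024**3, Size("512m") == 512*1024**2,
# Size("1048576") == 1048576 (bare numbers are taken as bytes).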
def parsemsg(message):
    "Accepts JSON messages from S3 Events or with Bucket and Key keys"
    try:
        mb = json.loads(message.body)
        if mb.get("Records", None) is not None:
            for event in mb["Records"]:
                if (event.get('eventSource') == 'aws:s3' and
                        event.get('eventName') == 'ObjectCreated:Put'):
                    bucket = event['s3']['bucket']['name']
                    key = event['s3']['object']['key']
                    yield (bucket, key)
        elif (mb.get('Bucket', None) is not None and
              mb.get('Key', None) is not None):
            yield (mb['Bucket'], mb['Key'])
        else:
            raise ValueError
    except json.decoder.JSONDecodeError as e:
        logger.error("ERROR: Invalid message: {}".format(message.body))
        logger.error("ERROR: Invalid JSON in message: %s", e, exc_info=False)
        sys.exit(2)
    except ValueError:
        logger.error("ERROR: Invalid message: {}".format(message.body))
        sys.exit(2)
def hashkey(bucket, key):
    """create a key to use with a prepartitioned bucket:
    https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html"""
    return hashlib.md5(os.path.join(bucket, key).encode('utf-8')).hexdigest()[0:6]
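# When invoked with -q/--queue, manifests arrive as SQS messages (either raw
# {"Bucket": ..., "Key": ...} JSON or S3 ObjectCreated:Put event notifications);
# with -m/--manifest, a single manifest S3 URI is processed directly.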
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Bundle S3 objects from an inventory into an archive")
    parser.add_argument("-q", "--queue", type=str,
                        help="SQS S3Bundler manifest queue.")
    parser.add_argument("-m", "--manifest", metavar="s3://bucket/key", type=s3uri,
                        help="Manifest produced by s3grouper")
    parser.add_argument("-b", "--bucket", metavar="BUCKET", type=str,
                        help="S3 bucket to write archives to")
    parser.add_argument("-p", "--prefix", metavar="PREFIX", type=str,
                        help="Target S3 prefix")
    parser.add_argument("-f", "--fieldnames", metavar="F", type=str, nargs="*",
                        help="Field names in order used by s3grouper",
                        default=["Bucket",
                                 "Key",
                                 "Size"])
    parser.add_argument("-s", "--maxsize", metavar="BYTES", type=Size,
                        default=2*1024*1024*1024,
                        help="Objects greater than maxsize will be copied " +
                             "directly to the destination bucket. Metadata will be" +
                             " stored alongside them. Checksums will not be" +
                             " calculated. Default: 2GB")
    parser.add_argument("-c", "--compress", action="store_true", default=False,
                        help="Compress archives with gzip")
    parser.add_argument("-v", "--verbose", action="store_true", default=False,
                        help="Enable verbose messages")
    parser.add_argument("-d", "--debug", action="store_true", default=False,
                        help="Enable debug messages")
    args = parser.parse_args()
    if args.verbose:
        logger.setLevel(logging.INFO)
    if args.debug:
        logger.setLevel(logging.DEBUG)
    if args.queue is not None:
        sqs = boto3.resource('sqs')
        try:
            queue = sqs.get_queue_by_name(QueueName=args.queue)
        except botocore.exceptions.ClientError as e:
            logger.error("ERROR: Failed to open queue {}".format(args.queue))
            logger.debug("Exception: %s", e, exc_info=True)
            sys.exit(2)
        count = 0
        while count < 2:
            try:
                for message in queue.receive_messages(WaitTimeSeconds=20,
                                                      MaxNumberOfMessages=1):
                    for (bucket, key) in parsemsg(message):
                        logger.info("Received manifest from queue: s3://{}/{}".format(bucket, key))
                        manifest = Manifest(bucket, key, message=message, fieldnames=args.fieldnames)
                        archiver = Archiver(args.bucket, args.prefix, manifest,
                                            maxsize=args.maxsize, compress=args.compress)
                        for obj in manifest:
                            archiver.process(obj)
                        archiver.commit()
                        count += 1
            except botocore.exceptions.ClientError as e:
                logger.error("ERROR: Failed to get message from queue {}".format(queue.url))
                logger.debug("Exception: %s", e, exc_info=True)
                sys.exit(2)
    else:
        manifest = Manifest(args.manifest['bucket'], args.manifest['key'], fieldnames=args.fieldnames)
        archiver = Archiver(args.bucket, args.prefix, manifest,
                            maxsize=args.maxsize, compress=args.compress)
        for obj in manifest:
            archiver.process(obj)
        archiver.commit()
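# Example invocations (queue, bucket, and key names below are illustrative):
#   python3 s3bundler.py -q s3bundler-manifests -b my-archive-bucket -p archives -c
#   python3 s3bundler.py -m s3://my-manifest-bucket/manifests/part-0 -b my-archive-bucket -p archives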