#!/usr/bin/python
#
# Author: Tobias Galitzien
# Copyright (c) 2012 Tobias Galitzien
# Berlin, Germany
#
"""Stream data between a pipe and an S3 key.

In upload mode (the default) stdin is read in chunks of --chunksize MB and
every chunk is sent as one part of an S3 multipart upload; each part is
verified by comparing its size and MD5 with what S3 reports.  In download
mode (-d) the key is read chunk by chunk and written to stdout.  Progress
messages go to stderr unless -q is given.

The script is written for Python 2 (it uses cStringIO) and the boto library.
"""

import sys
import boto.exception
import boto.s3.connection
import argparse
import cStringIO
import hashlib

MB = 1024 * 1024


def bailout(msg):
    """Write msg to stderr and terminate the process with exit status -1."""
    sys.stderr.write(msg)
    sys.exit(-1)


def _note(quiet, msg):
    """Write a progress message to stderr unless quiet is set."""
    if not quiet:
        sys.stderr.write(msg)


def _parse_args():
    """Parse the command line and return the argparse namespace.

    Exits through parser.error() when the chunk size is not positive: a
    chunk size of zero or less would make both transfer loops spin forever
    because their "short read" end condition could never become true.
    """
    parser = argparse.ArgumentParser(description='pipe streaming to s3 buckets')
    parser.add_argument('-b', '--bucket', required=True)
    parser.add_argument('-k', '--key', required=True)
    parser.add_argument('-d', '--download', action='store_true',
                        help='download mode, default is upload')
    parser.add_argument('-c', '--chunksize', type=int, default=5,
                        help='the upload chunk size in MB, default 5')
    parser.add_argument('-q', '--quiet', action='store_true', help='be quiet')
    parser.add_argument('-v', '--verbose', type=int, default=0,
                        help='set boto debug level')
    parser.add_argument('-e', '--endpoint', default='s3.amazonaws.com',
                        help='set s3 endpoint (s3-eu-west-1.amazonaws.com '
                             'for EU buckets, see '
                             'http://docs.amazonwebservices.com/general/'
                             'latest/gr/rande.html#s3_region)')
    args = parser.parse_args()
    if args.chunksize < 1:
        parser.error('--chunksize must be a positive number of MB')
    return args


def _connect(args):
    """Return an S3 connection for the configured endpoint, or bail out."""
    try:
        return boto.s3.connection.S3Connection(debug=args.verbose,
                                               host=args.endpoint)
    except boto.exception.S3ResponseError as e:
        bailout("cannot connect to S3: %s\n" % e.code)
    except boto.exception.NoAuthHandlerFound as e:
        # Missing credentials are reported with a message, not a traceback.
        bailout("cannot connect to S3: %s\n" % e)


def _open_bucket(s3, args):
    """Return the requested bucket, creating it in upload mode if missing.

    Bails out when the bucket cannot be accessed, when it is missing in
    download mode, or when it cannot be created.
    """
    try:
        return s3.get_bucket(args.bucket)
    except boto.exception.S3ResponseError as e:
        if e.code != 'NoSuchBucket':
            bailout("cannot access bucket %s: %s\n" % (args.bucket, e.code))

    # Only reached when the bucket does not exist.
    if args.download:
        bailout("bucket %s not found\n" % args.bucket)
    _note(args.quiet, "creating bucket %s\n" % args.bucket)
    try:
        return s3.create_bucket(args.bucket)
    except boto.exception.S3ResponseError as e:
        bailout("cannot create bucket %s: %s\n" % (args.bucket, e.code))


def _download(bucket, args, chunksize):
    """Copy the key's content to stdout in reads of at most chunksize bytes."""
    try:
        key = bucket.get_key(args.key)
    except boto.exception.S3ResponseError as e:
        bailout("cannot access key %s in bucket %s: %s\n"
                % (args.key, args.bucket, e.code))
    if not key:
        bailout("cannot find key %s in bucket %s\n" % (args.key, args.bucket))
    _note(args.quiet, "download key %s from bucket %s\n"
          % (args.key, args.bucket))

    total = 0
    part_num = 1
    while True:
        _note(args.quiet, "part %d: going to read %.1f MB from key %s\n"
              % (part_num, float(chunksize) / MB, args.key))
        buf = key.read(chunksize)
        part_size = len(buf)
        total += part_size
        _note(args.quiet,
              "part %d: %.1f MB (%d bytes) read, sum: %.1f MB (%d bytes)\n"
              % (part_num, float(part_size) / MB, part_size,
                 float(total) / MB, total))
        # Stop only on an empty read: a short read is not treated as end of
        # data, so a partially filled read cannot truncate the output.
        if not buf:
            break
        sys.stdout.write(buf)
        part_num += 1
    key.close()


def _abort_upload(mp):
    """Cancel a multipart upload, reporting (not raising) a failure to do so.

    This is best-effort cleanup on an error path; a failing cancel must not
    hide the error that triggered it.
    """
    try:
        mp.cancel_upload()
    except Exception as e:
        sys.stderr.write("warning: cannot cancel multipart upload: %s\n" % e)


def _send_parts(mp, chunksize, quiet):
    """Upload stdin as consecutive parts of the multipart upload mp.

    Returns True when every part was uploaded and verified, False as soon
    as the size or MD5 reported by S3 for a part does not match the data
    that was sent.
    """
    total = 0
    part_num = 1
    while True:
        _note(quiet, "reading from stdin...")
        buf = sys.stdin.read(chunksize)
        part_size = len(buf)
        total += part_size
        _note(quiet,
              "\npart %d: %.1f MB (%d bytes) read, sum: %.1f MB (%d bytes)\n"
              % (part_num, float(part_size) / MB, part_size,
                 float(total) / MB, total))

        # Input that is an exact multiple of the chunk size ends with an
        # empty read; do not send that as an extra zero-byte part.  An empty
        # first read is still uploaded so that empty input yields a part.
        if part_size == 0 and part_num > 1:
            break

        my_md5 = hashlib.md5(buf).hexdigest()
        sio = cStringIO.StringIO(buf)
        try:
            mp.upload_part_from_file(sio, part_num)
        finally:
            sio.close()

        # Ask for exactly the part just sent instead of listing all parts
        # and taking the last one: the unbounded listing is a single,
        # size-limited page, so its last entry stops being the current part
        # on large uploads, and it re-reads every earlier part each time.
        parts = mp.get_all_parts(max_parts=1,
                                 part_number_marker=part_num - 1)
        part = parts[0] if parts else None
        # The etag is compared without its surrounding quote characters.
        if (part is None
                or part.part_number != part_num
                or part.size != part_size
                or part.etag[1:-1] != my_md5):
            return False
        _note(quiet, "part %d: %.1f MB (%d bytes) written, checksum ok\n"
              % (part.part_number, float(part.size) / MB, part.size))

        part_num += 1
        if part_size < chunksize:
            break
    return True


def _upload(bucket, args, chunksize):
    """Stream stdin into the key as a multipart upload and complete it."""
    _note(args.quiet,
          "initiating multipart upload to S3 bucket %s, key %s\n"
          % (args.bucket, args.key))
    try:
        mp = bucket.initiate_multipart_upload(args.key)
    except boto.exception.S3ResponseError as e:
        bailout("cannot initiate multipart upload: %s\n" % e.code)

    try:
        verified = _send_parts(mp, chunksize, args.quiet)
    except (Exception, KeyboardInterrupt):
        # Do not leave an unfinished upload (and its stored parts) behind.
        _abort_upload(mp)
        raise
    if not verified:
        _abort_upload(mp)
        bailout("checksum error, upload canceled\n")

    try:
        mp.complete_upload()
    except boto.exception.S3ResponseError as e:
        _abort_upload(mp)
        bailout("multipart complete error: %s\n" % e.code)
    _note(args.quiet, "ok\n")


def main():
    """Run the transfer selected on the command line."""
    args = _parse_args()
    s3 = _connect(args)
    bucket = _open_bucket(s3, args)
    chunksize = args.chunksize * MB
    if args.download:
        _download(bucket, args, chunksize)
    else:
        _upload(bucket, args, chunksize)


if __name__ == '__main__':
    main()