123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- #!/usr/bin/python
- #
- # Author: Tobias Galitzien <tg@trusttheadmin.de>
- # Copyright (c) 2012 Tobias Galitzien <tg@trusttheadmin.de>
- # Berlin, Germany
- #
- import sys
- import boto.s3.connection
- import argparse
- import cStringIO
- import hashlib
# Bytes per megabyte (2**20); the --chunksize option is expressed in this unit.
MB = 1 << 20
def bailout(msg):
    """Write *msg* to stderr and terminate the process with exit status -1.

    The message is written verbatim, so callers supply their own trailing
    newline.
    """
    sys.stderr.write(msg)
    raise SystemExit(-1)
# parse command line
parser = argparse.ArgumentParser(description='pipe streaming to s3 buckets')
parser.add_argument('-b', '--bucket', required=True)
parser.add_argument('-k', '--key', required=True)
parser.add_argument('-d', '--download', action='store_true',
                    help='download mode, default is upload')
parser.add_argument('-c', '--chunksize', type=int, default=5,
                    help='the upload chunk size in MB, default 5')
parser.add_argument('-q', '--quiet', action='store_true', help='be quiet')
parser.add_argument('-v', '--verbose', type=int, default=0,
                    help='set boto debug level')
parser.add_argument('-e', '--endpoint', default='s3.amazonaws.com',
                    help='set s3 endpoint (s3-eu-west-1.amazonaws.com for EU buckets, see '
                         'http://docs.amazonwebservices.com/general/latest/gr/rande.html#s3_region)')
args = parser.parse_args()
# Reject non-positive chunk sizes up front: a 0-byte chunk cannot make
# progress through the stream, and a negative size has no meaning here.
if args.chunksize < 1:
    parser.error('--chunksize must be a positive number of MB')
# connect to s3
try:
    s3 = boto.s3.connection.S3Connection(debug=args.verbose, host=args.endpoint)
except boto.exception.NoAuthHandlerFound as e:
    # Raised when boto cannot find usable credentials; this exception has no
    # S3 error code, so report its message instead.
    bailout("cannot connect to S3: %s\n" % e)
except boto.exception.S3ResponseError as e:
    bailout("cannot connect to S3: %s\n" % e.code)
# open bucket or create it if necessary
try:
    bucket = s3.get_bucket(args.bucket)
except boto.exception.S3ResponseError as lookup_err:
    # Anything other than a missing bucket is fatal.
    if lookup_err.code != 'NoSuchBucket':
        bailout("cannot access bucket %s: %s\n" % (args.bucket, lookup_err.code))
    # A missing bucket cannot be downloaded from; it is only created for uploads.
    if args.download:
        bailout("bucket %s not found\n" % args.bucket)
    if not args.quiet:
        sys.stderr.write("creating bucket %s\n" % args.bucket)
    try:
        bucket = s3.create_bucket(args.bucket)
    except boto.exception.S3ResponseError as create_err:
        bailout("cannot create bucket %s: %s\n" % (args.bucket, create_err.code))
# Transfer state used by the download and upload code below.
chunksize = args.chunksize * MB  # bytes per read / per uploaded part
sum = 0       # running total of bytes transferred
part_num = 1  # 1-based number of the part currently being processed
# handle download mode: stream the key to stdout in chunksize pieces, then exit
if args.download:
    key = bucket.get_key(args.key)
    if not key:
        bailout("cannot find key %s in bucket %s\n" % (args.key, args.bucket))
    if not args.quiet:
        sys.stderr.write("download key %s from bucket %s\n" % (args.key, args.bucket))
    total_bytes = 0  # bytes written to stdout so far
    while True:
        if not args.quiet:
            sys.stderr.write("part %d: going to read %.1f MB from key %s\n"
                             % (part_num, float(chunksize) / MB, args.key))
        buf = key.read(chunksize)
        # Only an empty read marks the end of the object; a read that merely
        # returns fewer bytes than requested is not treated as end-of-data.
        if not buf:
            break
        part_size = len(buf)
        total_bytes += part_size
        if not args.quiet:
            sys.stderr.write("part %d: %.1f MB (%d bytes) read, sum: %.1f MB (%d bytes)\n"
                             % (part_num, float(part_size) / MB, part_size,
                                float(total_bytes) / MB, total_bytes))
        sys.stdout.write(buf)
        part_num += 1
    key.close()
    sys.exit()
# handle upload mode: open a multipart upload that the loop below feeds
if not args.quiet:
    sys.stderr.write("initiating multipart upload to S3 bucket %s, key %s\n"
                     % (args.bucket, args.key))
try:
    mp = bucket.initiate_multipart_upload(args.key)
except boto.exception.S3ResponseError as init_err:
    bailout("cannot initiate multipart upload: %s\n" % init_err.code)
# Read stdin in chunksize pieces, upload each piece as one part and verify
# the size and MD5 that S3 reports for it before moving on.
total_bytes = 0  # bytes read from stdin so far
while True:
    if not args.quiet:
        sys.stderr.write("reading from stdin...")
    buf = sys.stdin.read(chunksize)
    part_size = len(buf)
    # stdin ended exactly on a chunk boundary: everything has already been
    # uploaded, so do not append an empty trailing part. An empty *first*
    # part is still uploaded, which keeps the original behaviour for empty
    # input.
    if part_size == 0 and part_num > 1:
        if not args.quiet:
            sys.stderr.write("\n")
        break
    total_bytes += part_size
    if not args.quiet:
        sys.stderr.write("\npart %d: %.1f MB (%d bytes) read, sum: %.1f MB (%d bytes)\n"
                         % (part_num, float(part_size) / MB, part_size,
                            float(total_bytes) / MB, total_bytes))
    my_md5 = hashlib.md5(buf).hexdigest()
    sio = cStringIO.StringIO(buf)
    try:
        mp.upload_part_from_file(sio, part_num)
        # Ask only for the part just uploaded. Listing every part and taking
        # the last one costs a growing listing per part, and a single
        # listing is capped (1000 parts), after which the last entry is no
        # longer the newest part.
        parts = mp.get_all_parts(max_parts=1, part_number_marker=part_num - 1)
    except boto.exception.S3ResponseError as e:
        # Abort so the parts stored so far do not linger in the bucket.
        mp.cancel_upload()
        bailout("cannot upload part %d: %s\n" % (part_num, e.code))
    finally:
        sio.close()
    part = parts[0] if parts else None
    # The ETag is returned wrapped in double quotes; strip them to compare.
    if (part is not None
            and part.part_number == part_num
            and part.size == part_size
            and part.etag[1:-1] == my_md5):
        if not args.quiet:
            sys.stderr.write("part %d: %.1f MB (%d bytes) written, checksum ok\n"
                             % (part.part_number, float(part.size) / MB, part.size))
    else:
        mp.cancel_upload()
        bailout("checksum error, upload canceled\n")
    part_num += 1
    # A short read means stdin is exhausted.
    if part_size < chunksize:
        break
# Ask S3 to assemble the uploaded parts into the final object.
try:
    mp.complete_upload()
except boto.exception.S3ResponseError as e:
    bailout("multipart complete error: %s\n" % e.code)
else:
    # Success message kept outside the try so only the S3 call is guarded.
    if not args.quiet:
        sys.stderr.write("ok\n")
|