#!/usr/bin/python
#
# s3pipe - pipe streaming to s3 buckets
#
# Author: Tobias Galitzien <tg@trusttheadmin.de>
# Copyright (c) 2012 Tobias Galitzien <tg@trusttheadmin.de>
# Berlin, Germany
#
  7. import sys
  8. import boto.s3.connection
  9. import argparse
  10. import cStringIO
  11. import hashlib
  12. MB = 1024 * 1024
  13. def bailout(msg):
  14. sys.stderr.write(msg)
  15. sys.exit(-1)
  16. # parse command line
  17. parser = argparse.ArgumentParser(description = 'pipe streaming to s3 buckets')
  18. parser.add_argument('-b','--bucket', required = True)
  19. parser.add_argument('-k','--key', required = True)
  20. parser.add_argument('-d','--download', action = 'store_true', help = 'download mode, default is upload')
  21. parser.add_argument('-c','--chunksize', type = int, default = 5, help = 'the upload chunk size in MB, default 5')
  22. parser.add_argument('-q','--quiet', action = 'store_true', help = 'be quiet')
  23. parser.add_argument('-v','--verbose', type = int, default = 0, help = 'set boto debug level')
  24. parser.add_argument('-e','--endpoint', default = 's3.amazonaws.com', help = 'set s3 endpoint (s3-eu-west-1.amazonaws.com for EU buckets, see http://docs.amazonwebservices.com/general/latest/gr/rande.html#s3_region)')
  25. args = parser.parse_args()
  26. # connect to s3
  27. try:
  28. s3 = boto.s3.connection.S3Connection(debug = args.verbose, host = args.endpoint)
  29. except boto.exception.S3ResponseError, e:
  30. bailout("cannot connect to S3: %s\n" % e.code)
  31. # open bucket or create it if necessary
  32. try:
  33. bucket = s3.get_bucket(args.bucket)
  34. except boto.exception.S3ResponseError, e:
  35. if e.code == 'NoSuchBucket':
  36. if args.download:
  37. bailout("bucket %s not found\n" % args.bucket)
  38. if not args.quiet: sys.stderr.write("creating bucket %s\n" % args.bucket)
  39. try:
  40. bucket = s3.create_bucket(args.bucket)
  41. except boto.exception.S3ResponseError, e:
  42. bailout("cannot create bucket %s: %s\n" % (args.bucket,e.code))
  43. else:
  44. bailout("cannot access bucket %s: %s\n" % (args.bucket,e.code))
  45. chunksize = args.chunksize * MB
  46. buf = str(chunksize)
  47. sum = 0
  48. part_num = 1
  49. # handle download mode
  50. if args.download:
  51. key = bucket.get_key(args.key)
  52. if not key:
  53. bailout("cannot find key %s in bucket %s\n" % (args.key, args.bucket))
  54. if not args.quiet: sys.stderr.write("download key %s from bucket %s\n" % (args.key, args.bucket))
  55. while True:
  56. if not args.quiet: sys.stderr.write("part %d: going to read %.1f MB from key %s\n" % ( part_num, float(chunksize)/MB, args.key))
  57. buf = key.read(chunksize)
  58. part_size = len(buf)
  59. sum += part_size
  60. if not args.quiet: sys.stderr.write("part %d: %.1f MB (%d bytes) read, sum: %.1f MB (%d bytes)\n" % (part_num, float(part_size)/MB, part_size, float(sum)/MB, sum))
  61. sys.stdout.write(buf)
  62. part_num += 1
  63. if part_size < chunksize: break
  64. key.close()
  65. sys.exit()
  66. # handle upload mode
  67. if not args.quiet: sys.stderr.write("initiating multipart upload to S3 bucket %s, key %s\n" % (args.bucket, args.key))
  68. try:
  69. mp = bucket.initiate_multipart_upload(args.key)
  70. except boto.exception.S3ResponseError, e:
  71. bailout("cannot initiate multipart upload: %s\n" % e.code)
  72. while True:
  73. if not args.quiet: sys.stderr.write("reading from stdin...")
  74. buf = sys.stdin.read(chunksize)
  75. part_size = len(buf)
  76. sum += part_size
  77. if not args.quiet: sys.stderr.write("\npart %d: %.1f MB (%d bytes) read, sum: %.1f MB (%d bytes)\n" % (part_num, float(part_size)/MB, part_size, float(sum)/MB, sum))
  78. my_md5 = hashlib.md5(buf).hexdigest()
  79. sio = cStringIO.StringIO(buf)
  80. mp.upload_part_from_file(sio,part_num)
  81. part = mp.get_all_parts()[-1]
  82. their_md5 = part.etag[1:-1]
  83. if part.size == part_size and their_md5 == my_md5:
  84. if not args.quiet: sys.stderr.write("part %d: %.1f MB (%d bytes) written, checksum ok\n" % (part.part_number, float(part.size)/MB, part.size))
  85. else:
  86. mp.cancel_upload()
  87. bailout("checksum error, upload canceled\n")
  88. sio.close()
  89. part_num += 1
  90. if part_size < chunksize: break
  91. try:
  92. cu = mp.complete_upload()
  93. if not args.quiet: sys.stderr.write("ok\n")
  94. except boto.exception.S3ResponseError, e:
  95. bailout("multipart complete error: %s\n" % e.code)