123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- #!/usr/bin/env python
- import argparse
- from cStringIO import StringIO
- import logging
- from math import ceil
- from multiprocessing import Pool
- import time
- import urlparse
- import boto
- from boto.s3.connection import OrdinaryCallingFormat
- parser = argparse.ArgumentParser(description="Transfer large files to S3",
- prog="s3-mp-upload")
- parser.add_argument("src", type=file, help="The file to transfer")
- parser.add_argument("dest", help="The S3 destination object")
- parser.add_argument("-np", "--num-processes", help="Number of processors to use",
- type=int, default=2)
- parser.add_argument("-f", "--force", help="Overwrite an existing S3 key",
- action="store_true")
- parser.add_argument("-s", "--split", help="Split size, in Mb", type=int, default=50)
- parser.add_argument("-rrs", "--reduced-redundancy", help="Use reduced redundancy storage. Default is standard.", default=False, action="store_true")
- parser.add_argument("--insecure", dest='secure', help="Use HTTP for connection",
- default=True, action="store_false")
- parser.add_argument("-t", "--max-tries", help="Max allowed retries for http timeout", type=int, default=5)
- parser.add_argument("-v", "--verbose", help="Be more verbose", default=False, action="store_true")
- parser.add_argument("-q", "--quiet", help="Be less verbose (for use in cron jobs)", default=False, action="store_true")
- logger = logging.getLogger("s3-mp-upload")
- def do_part_upload(args):
- """
- Upload a part of a MultiPartUpload
- Open the target file and read in a chunk. Since we can't pickle
- S3Connection or MultiPartUpload objects, we have to reconnect and lookup
- the MPU object with each part upload.
- :type args: tuple of (string, string, string, int, int, int)
- :param args: The actual arguments of this method. Due to lameness of
- multiprocessing, we have to extract these outside of the
- function definition.
- The arguments are: S3 Bucket name, MultiPartUpload id, file
- name, the part number, part offset, part size
- """
- # Multiprocessing args lameness
- bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
- logger.debug("do_part_upload got args: %s" % (args,))
- # Connect to S3, get the MultiPartUpload
- s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
- s3.is_secure = secure
- bucket = s3.lookup(bucket_name)
- mpu = None
- for mp in bucket.list_multipart_uploads():
- if mp.id == mpu_id:
- mpu = mp
- break
- if mpu is None:
- raise Exception("Could not find MultiPartUpload %s" % mpu_id)
- # Read the chunk from the file
- fp = open(fname, 'rb')
- fp.seek(start)
- data = fp.read(size)
- fp.close()
- if not data:
- raise Exception("Unexpectedly tried to read an empty chunk")
- def progress(x,y):
- logger.debug("Part %d: %0.2f%%" % (i+1, 100.*x/y))
- try:
- # Do the upload
- t1 = time.time()
- mpu.upload_part_from_file(StringIO(data), i+1, cb=progress)
- # Print some timings
- t2 = time.time() - t1
- s = len(data)/1024./1024.
- logger.info("Uploaded part %s (%0.2fM) in %0.2fs at %0.2fMBps" % (i+1, s, t2, s/t2))
- except Exception, err:
- logger.debug("Retry request %d of max %d times" % (current_tries, max_tries))
- if (current_tries > max_tries):
- logger.error(err)
- else:
- time.sleep(3)
- current_tries += 1
- do_part_download(bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries)
- def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False, quiet=False, secure=True, max_tries=5):
- # Check that dest is a valid S3 url
- split_rs = urlparse.urlsplit(dest)
- if split_rs.scheme != "s3":
- raise ValueError("'%s' is not an S3 url" % dest)
- s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
- s3.is_secure = secure
- bucket = s3.lookup(split_rs.netloc)
- if bucket == None:
- raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)
- key = bucket.get_key(split_rs.path)
- # See if we're overwriting an existing key
- if key is not None:
- if not force:
- raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)
- # Determine the splits
- part_size = max(5*1024*1024, 1024*1024*split)
- src.seek(0,2)
- size = src.tell()
- num_parts = int(ceil(size / part_size))
- # If file is less than 5M, just upload it directly
- if size < 5*1024*1024:
- src.seek(0)
- t1 = time.time()
- k = boto.s3.key.Key(bucket,split_rs.path)
- k.set_contents_from_file(src)
- t2 = time.time() - t1
- s = size/1024./1024.
- logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
- return
- # Create the multi-part upload object
- mpu = bucket.initiate_multipart_upload(split_rs.path, reduced_redundancy=reduced_redundancy)
- logger.info("Initialized upload: %s" % mpu.id)
- # Generate arguments for invocations of do_part_upload
- def gen_args(num_parts, fold_last):
- for i in range(num_parts+1):
- part_start = part_size*i
- if i == (num_parts-1) and fold_last is True:
- yield (bucket.name, mpu.id, src.name, i, part_start, part_size*2, secure, max_tries, 0)
- break
- else:
- yield (bucket.name, mpu.id, src.name, i, part_start, part_size, secure, max_tries, 0)
- # If the last part is less than 5M, just fold it into the previous part
- fold_last = ((size % part_size) < 5*1024*1024)
- # Do the thing
- try:
- # Create a pool of workers
- pool = Pool(processes=num_processes)
- t1 = time.time()
- pool.map_async(do_part_upload, gen_args(num_parts, fold_last)).get(9999999)
- # Print out some timings
- t2 = time.time() - t1
- s = size/1024./1024.
- # Finalize
- src.close()
- mpu.complete_upload()
- logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
- except KeyboardInterrupt:
- logger.warn("Received KeyboardInterrupt, canceling upload")
- pool.terminate()
- mpu.cancel_upload()
- except Exception, err:
- logger.error("Encountered an error, canceling upload")
- logger.error(err)
- mpu.cancel_upload()
- if __name__ == "__main__":
- logging.basicConfig(level=logging.INFO)
- args = parser.parse_args()
- arg_dict = vars(args)
- if arg_dict['quiet'] == True:
- logger.setLevel(logging.WARNING)
- if arg_dict['verbose'] == True:
- logger.setLevel(logging.DEBUG)
- logger.debug("CLI args: %s" % args)
- main(**arg_dict)
|