# Python 2 / boto 2-era script
import logging
import time
import urlparse
from multiprocessing import Pool

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger("s3-mp-upload")  # logger name assumed


def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False, quiet=False, secure=True, max_tries=5):
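    """Upload src to the S3 url dest as a multipart upload.

    src must be an open, seekable file object backed by a named file on
    disk: the worker processes reopen it by src.name. split is the part
    size in MB (floored at S3's 5MB minimum), num_processes is the number
    of parallel workers, and max_tries is the per-part retry budget passed
    through to do_part_upload. Set force to overwrite an existing key and
    reduced_redundancy to use S3 reduced-redundancy storage. verbose and
    quiet are accepted but unused in this function.
    """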
    # Check that dest is a valid S3 url
    split_rs = urlparse.urlsplit(dest)
    if split_rs.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % dest)

    # Path-style addressing; likely chosen because virtual-hosted style
    # breaks SSL certificate matching for bucket names containing dots
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(split_rs.netloc)
    if bucket is None:
        raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)

    # Drop the URL path's leading "/" so the key name doesn't start with a slash
    key_path = split_rs.path.lstrip("/")

    # See if we're overwriting an existing key
    key = bucket.get_key(key_path)
    if key is not None and not force:
        raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)
    # Determine the split sizes. S3 requires every part except the last to
    # be at least 5MB, so clamp the part size to that floor.
    part_size = max(5*1024*1024, 1024*1024*split)
    src.seek(0, 2)
    size = src.tell()
    # num_parts counts the full-size parts; any remainder becomes one extra
    # part (or is folded into the last part) in gen_args below
    num_parts = size // part_size

    # If the file is less than 5M, skip multipart and upload it directly
    if size < 5*1024*1024:
        src.seek(0)
        t1 = time.time()
        k = boto.s3.key.Key(bucket, key_path)
        k.set_contents_from_file(src)
        t2 = time.time() - t1
        s = size/1024./1024.
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
        return
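    # S3 multipart flow from here on: initiate an upload (which returns an
    # upload id), have the worker processes upload each part tagged with
    # that id, then complete the upload so S3 assembles the parts into a
    # single object.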
    # Create the multi-part upload object
    mpu = bucket.initiate_multipart_upload(key_path, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized upload: %s" % mpu.id)

    # Generate argument tuples for each invocation of do_part_upload.
    # Parts 0..num_parts-1 are full-size; the remainder of the file becomes
    # one extra part, unless fold_last is set, in which case it is folded
    # into the final full-size part (yielded with twice the part size so
    # the worker reads through to EOF).
    def gen_args(num_parts, fold_last):
        for i in range(num_parts+1):  # one extra slot for the remainder part
            part_start = part_size*i
            if i == (num_parts-1) and fold_last:
                yield (bucket.name, mpu.id, src.name, i, part_start, part_size*2, secure, max_tries, 0)
                break
            else:
                yield (bucket.name, mpu.id, src.name, i, part_start, part_size, secure, max_tries, 0)

    # If the last part would be less than S3's 5MB minimum, fold it into
    # the previous part instead of uploading it on its own
    fold_last = ((size % part_size) < 5*1024*1024)
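    # Worked example of the split math, assuming split=50 (part_size of
    # 50MB): a 120MB file gives num_parts=2 with a 20MB remainder; 20MB is
    # above the 5MB floor, so fold_last is False and gen_args yields parts
    # of 50MB, 50MB, and 20MB. A 52MB file gives num_parts=1 with a 2MB
    # remainder; 2MB is below the floor, so fold_last is True and gen_args
    # yields a single part spanning all 52MB.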
    # Upload the parts in parallel. The pool is created before the try
    # block so the except handlers can safely reference it.
    pool = Pool(processes=num_processes)
    try:
        t1 = time.time()
        # The huge timeout on .get() is a workaround: without it, the parent
        # process never receives KeyboardInterrupt while blocked on the
        # result (a long-standing Python 2 multiprocessing quirk)
        pool.map_async(do_part_upload, gen_args(num_parts, fold_last)).get(9999999)
        # Print out some timings
        t2 = time.time() - t1
        s = size/1024./1024.
        # Finalize
        src.close()
        mpu.complete_upload()
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
    except KeyboardInterrupt:
        logger.warn("Received KeyboardInterrupt, canceling upload")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling upload")
        logger.error(err)
        pool.terminate()
        mpu.cancel_upload()
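

# A minimal usage sketch with hypothetical names: upload a local file with
# four workers and 25MB parts. Assumes do_part_upload, the worker function
# referenced above, is defined elsewhere in this module, and that boto can
# find AWS credentials in its usual places (environment or ~/.boto).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with open("backup.tar.gz", "rb") as src:  # hypothetical input file
        main(src, "s3://my-bucket/backups/backup.tar.gz",
             num_processes=4, split=25, force=True)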