import logging
import time
import urlparse
from math import ceil
from multiprocessing import Pool

import boto
import boto.s3.key
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)


def main(src, dest, num_processes=2, split=50, force=False,
         reduced_redundancy=False, verbose=False, quiet=False, secure=True,
         max_tries=5):
    # Check that dest is a valid S3 url
    split_rs = urlparse.urlsplit(dest)
    if split_rs.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % dest)

    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(split_rs.netloc)
    if bucket is None:
        raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)

    # See if we're overwriting an existing key
    key = bucket.get_key(split_rs.path)
    if key is not None and not force:
        raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)

    # Determine the splits. S3 requires every part except the last to be at
    # least 5MB, so clamp the part size to that floor.
    part_size = max(5 * 1024 * 1024, 1024 * 1024 * split)
    src.seek(0, 2)
    size = src.tell()
    # Force float division so ceil rounds up under Python 2 as well
    num_parts = int(ceil(size / float(part_size)))

    # If the file is less than 5M, just upload it directly
    if size < 5 * 1024 * 1024:
        src.seek(0)
        t1 = time.time()
        k = boto.s3.key.Key(bucket, split_rs.path)
        k.set_contents_from_file(src)
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s / t2))
        return

    # Create the multi-part upload object
    mpu = bucket.initiate_multipart_upload(split_rs.path,
                                           reduced_redundancy=reduced_redundancy)
    logger.info("Initialized upload: %s" % mpu.id)

    # If the last part is less than 5M, just fold it into the previous part
    fold_last = 0 < (size % part_size) < 5 * 1024 * 1024

    # Generate arguments for invocations of do_part_upload
    # (a sketch of do_part_upload follows this function)
    def gen_args(num_parts, fold_last):
        for i in range(num_parts):
            part_start = part_size * i
            if fold_last and i == num_parts - 2:
                # Second-to-last part absorbs the short final part;
                # part_size*2 is an upper bound on how many bytes it reads
                yield (bucket.name, mpu.id, src.name, i, part_start,
                       part_size * 2, secure, max_tries, 0)
                break
            yield (bucket.name, mpu.id, src.name, i, part_start, part_size,
                   secure, max_tries, 0)

    # Do the thing
    try:
        # Create a pool of workers
        pool = Pool(processes=num_processes)
        t1 = time.time()
        pool.map_async(do_part_upload, gen_args(num_parts, fold_last)).get(9999999)
        # Print out some timings
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        # Finalize
        src.close()
        mpu.complete_upload()
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s / t2))
    except KeyboardInterrupt:
        logger.warn("Received KeyboardInterrupt, canceling upload")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling upload")
        logger.error(err)
        mpu.cancel_upload()
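
# do_part_upload is referenced above but defined elsewhere in this module.
# What follows is a minimal sketch of one possible implementation, assuming
# boto's get_all_multipart_uploads/upload_part_from_file API. The retry
# scheme and names here are illustrative, not the canonical implementation.
from cStringIO import StringIO


def do_part_upload(args):
    """Upload a single part. Runs in a worker process, so it opens its own
    S3 connection and re-resolves the multipart upload handle by id."""
    (bucket_name, mpu_id, fname, i, start, size, secure, max_tries,
     current_tries) = args
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    # Find the MultiPartUpload matching the id handed to this worker
    mpu = None
    for mp in bucket.get_all_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)
    # Read this worker's slice of the source file; read() returns fewer
    # bytes at EOF, which handles the folded/short final part
    fp = open(fname, "rb")
    fp.seek(start)
    data = fp.read(size)
    fp.close()
    try:
        # S3 part numbers are 1-based
        mpu.upload_part_from_file(StringIO(data), i + 1)
    except Exception as err:
        logger.warn("Part %d failed (%s), retry %d/%d" %
                    (i, err, current_tries + 1, max_tries))
        if current_tries + 1 >= max_tries:
            raise
        do_part_upload((bucket_name, mpu_id, fname, i, start, size, secure,
                        max_tries, current_tries + 1))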