def main(src, dest, num_processes=2, split=32, force=False, verbose=False,
         quiet=False, secure=True, max_tries=5):
    """Download the S3 object at ``src`` to the local path ``dest``.

    Objects smaller than 1 MiB are fetched with a single request. Larger
    objects are split into byte ranges of at most ``split`` MiB, and those
    ranges are handed to ``do_part_download`` through a process pool.

    :param src: source URL; must use the ``s3://bucket/key`` scheme.
    :param dest: destination file path, or an existing directory (the key's
        basename is then appended).
    :param num_processes: number of pool worker processes for multipart
        downloads.
    :param split: target part size in MiB; also forwarded to each part job.
    :param force: remove an existing ``dest`` instead of raising.
    :param verbose: accepted for interface compatibility; not read here.
    :param quiet: accepted for interface compatibility; not read here.
    :param secure: connect to S3 over HTTPS when true; forwarded to workers.
    :param max_tries: forwarded to each part job (presumably a retry limit --
        confirm against ``do_part_download``).
    :raises ValueError: if ``src`` is not an S3 URL, ``dest`` exists and
        ``force`` is false, the bucket or key is not found, or the HEAD
        response is missing or lacks a content-length header.

    Errors during the multipart phase (including Ctrl-C) are logged rather
    than raised, as before.
    """
    def _rate(megabytes, seconds):
        # time.time() can report zero elapsed time for very fast transfers.
        return megabytes / seconds if seconds > 0 else float("inf")

    # Check that src is a valid S3 url
    split_rs = urlparse.urlsplit(src)
    if split_rs.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % src)

    # A directory destination becomes <dir>/<key basename>; an existing file
    # is only replaced when forced.
    if os.path.isdir(dest):
        filename = split_rs.path.split('/')[-1]
        dest = os.path.join(dest, filename)

    if os.path.exists(dest):
        if force:
            os.remove(dest)
        else:
            raise ValueError("Destination file '%s' exists, specify -f to"
                             " overwrite" % dest)

    # Connect once. is_secure goes to the constructor because boto derives
    # the connection's protocol and default port from it there; assigning
    # s3.is_secure afterwards left secure=False still aimed at HTTPS/443.
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat(),
                         is_secure=secure)
    logger.debug("split_rs: %s", split_rs)
    bucket = s3.lookup(split_rs.netloc)
    if bucket is None:
        raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)
    key = bucket.get_key(split_rs.path)
    if key is None:
        raise ValueError("'%s' does not exist." % split_rs.path)

    # Determine the total size of the object.
    resp = s3.make_request("HEAD", bucket=bucket, key=key)
    if resp is None:
        raise ValueError("response is invalid.")
    content_length = resp.getheader("content-length")
    if content_length is None:
        raise ValueError("response is missing a content-length header.")
    size = int(content_length)
    logger.debug("Got headers: %s", resp.getheaders())

    # Float MiB for reporting; true division on both Python 2 and 3 (integer
    # division made small downloads always report 0.00M on Python 2).
    size_mb = size / (1024.0 * 1024.0)

    # Skip multipart if the file is less than 1 MiB.
    if size < 1024 * 1024:
        t1 = time.time()
        key.get_contents_to_filename(dest)
        t2 = time.time() - t1
        logger.info("Finished single-part download of %0.2fM in %0.2fs (%0.2fMBps)",
                    size_mb, t2, _rate(size_mb, t2))
        return

    # Touch the file so it exists before the workers start (presumably each
    # do_part_download opens it and writes its own byte range -- confirm).
    fd = os.open(dest, os.O_CREAT)
    os.close(fd)

    # Ceiling division in integer bytes: the part count is an int on Python 2
    # and 3 alike (``/`` is true division on Python 3), and no part is
    # larger than ``split`` MiB. size >= 1 MiB here, so num_parts >= 1.
    part_bytes = split * 1024 * 1024
    num_parts = int(-(-size // part_bytes))

    # NOTE(review): on failure a partially written dest is left behind.
    pool = None
    try:
        t1 = time.time()
        pool = Pool(processes=num_processes)
        # Built inside the try so errors from gen_byte_ranges are logged like
        # any other multipart failure. The trailing 0 is presumably the
        # initial attempt counter for do_part_download -- confirm.
        part_args = [
            (bucket.name, key.name, dest, min_byte, max_byte,
             split, secure, max_tries, 0)
            for min_byte, max_byte in gen_byte_ranges(size, num_parts)
        ]
        # A finite timeout on get() keeps the parent responsive to Ctrl-C;
        # an untimed get() can swallow KeyboardInterrupt on Python 2.
        pool.map_async(do_part_download, part_args).get(9999999)
        t2 = time.time() - t1
        logger.info("Finished downloading %0.2fM in %0.2fs (%0.2fMBps)",
                    size_mb, t2, _rate(size_mb, t2))
    except KeyboardInterrupt:
        logger.warning("User terminated")
    except Exception as err:
        logger.error(err)
    finally:
        # Reap the workers on every path. Previously the pool was never
        # closed, leaking worker processes (still running after Ctrl-C).
        # On success all results have been collected, so terminate() is safe.
        if pool is not None:
            pool.terminate()
            pool.join()