# Imports used by the function below (Python 2 / boto 2 style, matching the code).
# do_part_download and gen_byte_ranges are helpers defined elsewhere in the script.
import logging
import os
import time
import urlparse
from multiprocessing import Pool

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)


def main(src, dest, num_processes=2, split=32, force=False, verbose=False, quiet=False, secure=True, max_tries=5):
    # Check that src is a valid S3 url
    split_rs = urlparse.urlsplit(src)
    if split_rs.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % src)
    # Check that dest does not exist
    if os.path.isdir(dest):
        filename = split_rs.path.split('/')[-1]
        dest = os.path.join(dest, filename)
    if os.path.exists(dest):
        if force:
            os.remove(dest)
        else:
            raise ValueError("Destination file '%s' exists, specify -f to"
                             " overwrite" % dest)
    # Split out the bucket and the key
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    logger.debug("split_rs: %s" % str(split_rs))
    bucket = s3.lookup(split_rs.netloc)
    if bucket is None:
        raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)
    key = bucket.get_key(split_rs.path)
    if key is None:
        raise ValueError("'%s' does not exist." % split_rs.path)
    # Determine the total size and calculate byte ranges
    resp = s3.make_request("HEAD", bucket=bucket, key=key)
    if resp is None:
        raise ValueError("response is invalid.")

    size = int(resp.getheader("content-length"))
    logger.debug("Got headers: %s" % resp.getheaders())
    # Skip multipart if the file is smaller than 1 MB
    if size < 1024 * 1024:
        t1 = time.time()
        key.get_contents_to_filename(dest)
        t2 = time.time() - t1
        size_mb = size / (1024.0 * 1024.0)  # float division so the log shows fractional MB
        logger.info("Finished single-part download of %0.2fM in %0.2fs (%0.2fMBps)" %
                    (size_mb, t2, size_mb / t2))
    else:
        # Touch the file so every worker can open it for writing
        fd = os.open(dest, os.O_CREAT)
        os.close(fd)

        # Round the size in MB up to a multiple of `split`, then derive the part count
        size_mb = size // (1024 * 1024)
        num_parts = (size_mb + (-size_mb % split)) // split

        def arg_iterator(num_parts):
            for min_byte, max_byte in gen_byte_ranges(size, num_parts):
                yield (bucket.name, key.name, dest, min_byte, max_byte, split,
                       secure, max_tries, 0)

        s = size / (1024.0 * 1024.0)
        try:
            t1 = time.time()
            pool = Pool(processes=num_processes)
            # map_async().get() with a long timeout (instead of map()) keeps the
            # parent process responsive to KeyboardInterrupt
            pool.map_async(do_part_download, arg_iterator(num_parts)).get(9999999)
            t2 = time.time() - t1
            logger.info("Finished downloading %0.2fM in %0.2fs (%0.2fMBps)" %
                        (s, t2, s / t2))
        except KeyboardInterrupt:
            logger.warning("User terminated")
        except Exception as err:
            logger.error(err)
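The arg_iterator above depends on a gen_byte_ranges(size, num_parts) helper that is defined elsewhere in the script and not shown in this excerpt. As a minimal sketch of the contract it has to satisfy (an assumption, not the script's actual implementation), it only needs to yield inclusive (first_byte, last_byte) pairs that cover the whole object:

def gen_byte_ranges(size, num_parts):
    # Assumed contract: yield inclusive (first_byte, last_byte) offsets covering [0, size).
    part_size = -(-size // num_parts)  # ceiling division without importing math
    for i in range(num_parts):
        yield (part_size * i, min(part_size * (i + 1) - 1, size - 1))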
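For completeness, a hypothetical invocation of main; the script itself presumably builds these arguments from the command line, and the bucket and paths below are placeholders:

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Placeholder values for illustration only.
    main("s3://example-bucket/path/to/large-file.bin", "/tmp/", num_processes=4, split=32)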