import logging
import time
from math import ceil
from multiprocessing import Pool

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)

def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False):
    dest_bucket_name, dest_key_name = validate_url(dest)
    src_bucket_name, src_key_name = validate_url(src)
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    dest_bucket = s3.lookup(dest_bucket_name)
    dest_key = dest_bucket.get_key(dest_key_name)

    # Refuse to overwrite an existing key unless -f was given
    if dest_key is not None and not force:
        raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)

    # Determine the total size and calculate byte ranges
    src_bucket = s3.lookup(src_bucket_name)
    src_key = src_bucket.get_key(src_key_name)
    size = src_key.size

    # Objects under 5 GB can be copied server-side in a single request,
    # so skip the multipart machinery entirely
    if size < 5 * 1024 * 1024 * 1024:
        logger.info("Source object is %0.2fM; copying it directly" % (size / 1024. / 1024.))
        t1 = time.time()
        src_key.copy(dest_bucket_name, dest_key_name, reduced_redundancy=reduced_redundancy)
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2fMB/s)" % (s, t2, s / t2))
        return

    # `split` is the part size in MB; S3 requires parts of at least 5 MB
    part_size = max(5 * 1024 * 1024, 1024 * 1024 * split)
    num_parts = int(ceil(size / float(part_size)))
    logger.info("Source object is %0.2fM; splitting into %d parts of size %0.2fM" %
                (size / 1024. / 1024., num_parts, part_size / 1024. / 1024.))

    # Create the multipart upload object
    mpu = dest_bucket.initiate_multipart_upload(dest_key_name, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized copy: %s" % mpu.id)

    # Generate one argument tuple per part for the do_part_copy workers
    def gen_args(num_parts):
        cur_pos = 0
        for i in range(num_parts):
            part_start = cur_pos
            cur_pos = cur_pos + part_size
            part_end = min(cur_pos - 1, size - 1)
            part_num = i + 1
            yield (src_bucket_name, src_key_name, dest_bucket_name, mpu.id,
                   part_num, part_start, part_end)

    # Copy the parts in parallel
    try:
        pool = Pool(processes=num_processes)
        t1 = time.time()
        # map_async(...).get(timeout) instead of map() so that
        # KeyboardInterrupt reaches the parent process
        pool.map_async(do_part_copy, gen_args(num_parts)).get(9999999)
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        # Finalize the upload, then report timings
        mpu.complete_upload()
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2fMB/s)" % (s, t2, s / t2))
    except KeyboardInterrupt:
        logger.warning("Received KeyboardInterrupt, canceling copy")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling copy")
        logger.error(err)
        mpu.cancel_upload()
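The listing calls two helpers that are not shown here. `validate_url` presumably splits an `s3://bucket/key` URL into a bucket name and key name; a minimal sketch, assuming Python 2's `urlparse` to match the boto-era code above:

import urlparse  # urllib.parse in Python 3

def validate_url(url):
    # Split "s3://bucket/key" into (bucket_name, key_name)
    parts = urlparse.urlsplit(url)
    if parts.scheme != "s3":
        raise ValueError("'%s' is not an S3 URL" % url)
    return parts.netloc, parts.path.lstrip("/")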
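`do_part_copy` runs in the worker processes, which is why `gen_args` hands it a single tuple of plain values rather than boto objects: the arguments must be picklable. A sketch under the same assumptions, reusing the imports and `logger` from the listing above: each worker opens its own S3 connection (boto connections are not safe to share across processes), re-finds the in-progress upload by id, and issues a server-side `MultiPartUpload.copy_part_from_key` for its byte range.

def do_part_copy(args):
    # Unpack the tuple produced by gen_args in main()
    (src_bucket_name, src_key_name, dest_bucket_name,
     mpu_id, part_num, start, end) = args
    # Each worker process needs its own connection
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    dest_bucket = s3.lookup(dest_bucket_name)
    # Re-find the in-progress multipart upload by its id
    mpu = None
    for mp in dest_bucket.get_all_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)
    # Server-side copy of the byte range [start, end] into this part
    t1 = time.time()
    mpu.copy_part_from_key(src_bucket_name, src_key_name, part_num, start, end)
    t2 = time.time() - t1
    s = (end - start + 1) / 1024. / 1024.
    logger.info("Copied part %d (%0.2fM) in %0.2fs (%0.2fMB/s)" % (part_num, s, t2, s / t2))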
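For completeness, a hypothetical entry point; the bucket names, worker count, and part size below are placeholders, not values from the original script:

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Hypothetical example: 4 workers, 100 MB parts, overwrite the destination
    main("s3://example-src-bucket/big-object",
         "s3://example-dest-bucket/big-object",
         num_processes=4, split=100, force=True)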