import logging
import time
from math import ceil
from multiprocessing import Pool

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)


def main(src, dest, num_processes=2, split=50, force=False,
         reduced_redundancy=False, verbose=False):
    dest_bucket_name, dest_key_name = validate_url(dest)
    src_bucket_name, src_key_name = validate_url(src)

    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    dest_bucket = s3.lookup(dest_bucket_name)
    dest_key = dest_bucket.get_key(dest_key_name)

    # See if we're overwriting an existing key
    if dest_key is not None:
        if not force:
            raise ValueError(
                "'%s' already exists. Specify -f to overwrite it" % dest)

    # Determine the total size and calculate byte ranges
    src_bucket = s3.lookup(src_bucket_name)
    src_key = src_bucket.get_key(src_key_name)
    size = src_key.size

    # If the object is smaller than 5GB, copy it directly; S3 allows
    # single-operation copies only up to 5GB
    if size < 5 * 1024 * 1024 * 1024:
        logger.info("Source object is %0.2fM; copying it directly" %
                    (size / 1024. / 1024.))
        t1 = time.time()
        src_key.copy(dest_bucket_name, dest_key_name,
                     reduced_redundancy=reduced_redundancy)
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2fMB/s)" %
                    (s, t2, s / t2))
        return

    # Parts must be at least 5MB; `split` is the desired part size in MB
    part_size = max(5 * 1024 * 1024, 1024 * 1024 * split)
    num_parts = int(ceil(size / float(part_size)))
    logger.info(
        "Source object is %0.2fM; splitting into %d parts of size %0.2fM" %
        (size / 1024. / 1024., num_parts, part_size / 1024. / 1024.))

    # Create the multipart upload object
    mpu = dest_bucket.initiate_multipart_upload(
        dest_key_name, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized copy: %s" % mpu.id)

    # Generate arguments for invocations of do_part_copy
    def gen_args(num_parts):
        cur_pos = 0
        for i in range(num_parts):
            part_start = cur_pos
            cur_pos = cur_pos + part_size
            part_end = min(cur_pos - 1, size - 1)
            part_num = i + 1
            yield (src_bucket_name, src_key_name, dest_bucket_name,
                   mpu.id, part_num, part_start, part_end)

    # Do the copy in parallel
    try:
        # Create a pool of workers
        pool = Pool(processes=num_processes)
        t1 = time.time()
        # The huge timeout on .get() is a workaround so KeyboardInterrupt
        # reaches the parent process instead of being swallowed by the pool
        pool.map_async(do_part_copy, gen_args(num_parts)).get(9999999)
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        # Finalize the multipart upload
        mpu.complete_upload()
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2fMB/s)" %
                    (s, t2, s / t2))
    except KeyboardInterrupt:
        logger.warn("Received KeyboardInterrupt, canceling copy")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling copy")
        logger.error(err)
        mpu.cancel_upload()
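
# main() relies on two helpers, validate_url and do_part_copy, that are not
# shown in this section. The sketches below are illustrative assumptions, not
# the script's actual definitions: validate_url is assumed to split an
# s3://bucket/key URL into (bucket, key), and do_part_copy is assumed to do a
# server-side range copy via boto's MultiPartUpload.copy_part_from_key.
# Adjust them to match the real helpers.

from urlparse import urlparse  # Python 2, matching boto 2; urllib.parse on Python 3


def validate_url(url):
    """Split an s3://bucket/key URL into (bucket_name, key_name).

    Hypothetical sketch: rejects anything that is not a well-formed S3 URL.
    """
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc or not parsed.path:
        raise ValueError("'%s' is not a valid S3 URL" % url)
    return parsed.netloc, parsed.path.lstrip("/")


def do_part_copy(args):
    """Copy one byte range of the source object into the multipart upload.

    Runs in a worker process, so it opens its own S3 connection; boto
    connections are not safe to share across processes.
    """
    (src_bucket_name, src_key_name, dest_bucket_name,
     mpu_id, part_num, part_start, part_end) = args
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    dest_bucket = s3.lookup(dest_bucket_name)
    # Re-attach to the in-progress multipart upload by its id
    mpu = None
    for mp in dest_bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find multipart upload %s" % mpu_id)
    # Server-side copy of bytes [part_start, part_end] as part part_num;
    # no data passes through this machine
    mpu.copy_part_from_key(src_bucket_name, src_key_name, part_num,
                           part_start, part_end)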