s3-mp-copy_3.py

import logging
import time
from math import ceil
from multiprocessing import Pool

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger("s3-mp-copy")


# validate_url() and do_part_copy() are defined elsewhere in the script; see the sketch after main().
def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False):
    dest_bucket_name, dest_key_name = validate_url(dest)
    src_bucket_name, src_key_name = validate_url(src)

    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    dest_bucket = s3.lookup(dest_bucket_name)
    dest_key = dest_bucket.get_key(dest_key_name)

    # See if we're overwriting an existing key
    if dest_key is not None:
        if not force:
            raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)

    # Determine the total size and calculate byte ranges
    src_bucket = s3.lookup(src_bucket_name)
    src_key = src_bucket.get_key(src_key_name)
    size = src_key.size

    # If the object is smaller than 5 GB, copy it directly in a single request
    if size < 5 * 1024 * 1024 * 1024:
        logger.info("Source object is %0.2fM, copying it directly" % (size / 1024. / 1024.))
        t1 = time.time()
        src_key.copy(dest_bucket_name, dest_key_name, reduced_redundancy=reduced_redundancy)
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2f MB/s)" % (s, t2, s / t2))
        return

    # Otherwise split the copy into parts of at least 5 MB (the S3 minimum part size)
    part_size = max(5 * 1024 * 1024, 1024 * 1024 * split)
    num_parts = int(ceil(size / float(part_size)))
    logger.info("Source object is %0.2fM, splitting into %d parts of size %0.2fM"
                % (size / 1024. / 1024., num_parts, part_size / 1024. / 1024.))

    # Create the multipart upload object
    mpu = dest_bucket.initiate_multipart_upload(dest_key_name, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized copy: %s" % mpu.id)

    # Generate arguments for invocations of do_part_copy
    def gen_args(num_parts):
        cur_pos = 0
        for i in range(num_parts):
            part_start = cur_pos
            cur_pos = cur_pos + part_size
            part_end = min(cur_pos - 1, size - 1)
            part_num = i + 1
            yield (src_bucket_name, src_key_name, dest_bucket_name, mpu.id, part_num, part_start, part_end)

    # Do the thing
    try:
        # Create a pool of workers
        pool = Pool(processes=num_processes)
        t1 = time.time()
        # Using .get() with a long timeout keeps the parent process responsive to KeyboardInterrupt
        pool.map_async(do_part_copy, gen_args(num_parts)).get(9999999)
        # Print out some timings
        t2 = time.time() - t1
        s = size / 1024. / 1024.
        # Finalize
        mpu.complete_upload()
        logger.info("Finished copying %0.2fM in %0.2fs (%0.2f MB/s)" % (s, t2, s / t2))
    except KeyboardInterrupt:
        logger.warning("Received KeyboardInterrupt, canceling copy")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling copy")
        logger.error(err)
        mpu.cancel_upload()
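
main() relies on two helpers that are not shown in this excerpt: validate_url(), which splits an s3:// URL into a bucket name and key, and do_part_copy(), which each worker process runs to copy one byte range into the multipart upload. A minimal sketch of both is below, assuming the same boto 2 / Python 2 environment the rest of the script uses; the exact bodies in the full script may differ.

import urlparse


def validate_url(url):
    # Accepts URLs of the form s3://bucket/key and returns (bucket, key)
    parts = urlparse.urlsplit(url)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError("'%s' is not an S3 url" % url)
    return parts.netloc, parts.path.lstrip("/")


def do_part_copy(args):
    # Runs in a worker process; args is one tuple produced by gen_args()
    src_bucket_name, src_key_name, dest_bucket_name, mpu_id, part_num, start_pos, end_pos = args

    # Each worker needs its own S3 connection
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    dest_bucket = s3.lookup(dest_bucket_name)

    # Find the in-progress multipart upload by its id
    mpu = None
    for mp in dest_bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)

    # Server-side copy of one byte range of the source key into this part
    t1 = time.time()
    mpu.copy_part_from_key(src_bucket_name, src_key_name, part_num, start_pos, end_pos)
    t2 = time.time() - t1
    s = (end_pos - start_pos + 1) / 1024. / 1024.
    logger.info("Copied part %d (%0.2fM) in %0.2fs (%0.2f MB/s)" % (part_num, s, t2, s / t2))

In the full script, main() is typically wired to an argparse command line that takes the source and destination s3:// URLs plus flags such as the -f (force) option referenced in the error message above; the flag names here are an assumption, not confirmed by this excerpt.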