s3-mp-upload_2.py

import logging
import time
import urlparse
from multiprocessing import Pool

import boto
import boto.s3.key
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)

def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False, quiet=False, secure=True, max_tries=5):
    # Check that dest is a valid S3 url
    split_rs = urlparse.urlsplit(dest)
    if split_rs.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % dest)

    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(split_rs.netloc)
    if bucket is None:
        raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)
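    # e.g. dest = "s3://my-bucket/backups/big.tar" (illustrative values)
    # parses to netloc "my-bucket" (the bucket) and path "/backups/big.tar";
    # note that the key name used below keeps the leading slash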
    key = bucket.get_key(split_rs.path)
    # See if we're overwriting an existing key
    if key is not None and not force:
        raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)
    # Determine the splits. S3 requires every part except the last to be at
    # least 5MB, so never let the part size drop below that.
    part_size = max(5*1024*1024, 1024*1024*split)
    src.seek(0, 2)
    size = src.tell()
    # Floor division: gen_args below emits one extra part to cover the
    # remainder, or folds a sub-5MB remainder into the final full-size part
    num_parts = size // part_size
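    # Worked example (illustrative numbers): split=50 gives part_size=50MB.
    # A 120MB file gives num_parts=2 with a 20MB remainder, so gen_args
    # (below) emits parts of 50+50+20MB. A 103MB file leaves a 3MB tail,
    # under the 5MB part minimum, so fold_last (below) folds it into a
    # double-sized final window and the parts are 50MB+53MB.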
    # If file is less than 5M, just upload it directly
    if size < 5*1024*1024:
        src.seek(0)
        t1 = time.time()
        k = boto.s3.key.Key(bucket, split_rs.path)
        k.set_contents_from_file(src)
        t2 = time.time() - t1
        s = size/1024./1024.
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
        return
    # Create the multi-part upload object
    mpu = bucket.initiate_multipart_upload(split_rs.path, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized upload: %s" % mpu.id)

    # Generate arguments for invocations of do_part_upload. File objects
    # can't be pickled across processes, so each worker gets the file name
    # plus a (start, size) window to read instead.
    def gen_args(num_parts, fold_last):
        for i in range(num_parts + 1):
            part_start = part_size * i
            if i == (num_parts - 1) and fold_last:
                # Double-sized final part absorbs the sub-5MB remainder
                yield (bucket.name, mpu.id, src.name, i, part_start, part_size*2, secure, max_tries, 0)
                break
            else:
                yield (bucket.name, mpu.id, src.name, i, part_start, part_size, secure, max_tries, 0)
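    # The trailing 0 in each tuple is presumably the initial retry count
    # (paired with max_tries): the worker sketch at the end of this file
    # re-invokes itself with it incremented on failure, so each part
    # retries independently without restarting the whole upload.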
    # If the last part is less than 5M, just fold it into the previous part
    fold_last = ((size % part_size) < 5*1024*1024)

    # Do the thing
    try:
        # Create a pool of workers
        pool = Pool(processes=num_processes)
        t1 = time.time()
        # The huge .get() timeout keeps the wait interruptible: a bare
        # .get() would block KeyboardInterrupt in the main process
        pool.map_async(do_part_upload, gen_args(num_parts, fold_last)).get(9999999)
        # Print out some timings
        t2 = time.time() - t1
        s = size/1024./1024.

        # Finalize: completing the upload stitches the parts into one key
        src.close()
        mpu.complete_upload()
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
    except KeyboardInterrupt:
        logger.warning("Received KeyboardInterrupt, canceling upload")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        # Cancel so the incomplete parts don't keep accruing storage charges
        logger.error("Encountered an error, canceling upload")
        logger.error(err)
        mpu.cancel_upload()
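
# ---------------------------------------------------------------------------
# do_part_upload is the per-part worker the Pool invokes; its definition
# lives elsewhere in the full script. Below is a minimal sketch of such a
# worker, assuming boto's standard multipart API (lookup,
# get_all_multipart_uploads, upload_part_from_file); the names and retry
# strategy here are illustrative, not the script's exact code.
from StringIO import StringIO

def do_part_upload(args):
    bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
    # Each worker opens its own connection: boto connections don't pickle
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    # Re-attach to the in-progress multipart upload by id
    mpu = None
    for mp in bucket.get_all_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)
    # Read only this part's window of the source file
    fp = open(fname, "rb")
    fp.seek(start)
    data = fp.read(size)
    fp.close()
    try:
        # S3 part numbers are 1-based
        mpu.upload_part_from_file(StringIO(data), i + 1)
    except Exception:
        if current_tries >= max_tries:
            raise
        time.sleep(3)
        do_part_upload((bucket_name, mpu_id, fname, i, start, size,
                        secure, max_tries, current_tries + 1))

# A hypothetical command-line entry point, shown only to illustrate how
# main() expects to be called: src is an open file object (gen_args needs
# src.name), dest an s3:// url. The real script's argument parser may differ.
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Parallel multipart upload to S3")
    parser.add_argument("src", type=argparse.FileType("rb"))
    parser.add_argument("dest", help="s3://bucket/key")
    parser.add_argument("-np", "--num-processes", type=int, default=2)
    parser.add_argument("-s", "--split", type=int, default=50, help="part size in MB")
    parser.add_argument("-f", "--force", action="store_true")
    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO)
    main(args.src, args.dest, num_processes=args.num_processes,
         split=args.split, force=args.force)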