s3-mp-upload.py

#!/usr/bin/env python
import argparse
from cStringIO import StringIO
import logging
from math import ceil
from multiprocessing import Pool
import time
import urlparse

import boto
from boto.s3.connection import OrdinaryCallingFormat

parser = argparse.ArgumentParser(description="Transfer large files to S3",
                                 prog="s3-mp-upload")
parser.add_argument("src", type=file, help="The file to transfer")
parser.add_argument("dest", help="The S3 destination object")
parser.add_argument("-np", "--num-processes", help="Number of processes to use",
                    type=int, default=2)
parser.add_argument("-f", "--force", help="Overwrite an existing S3 key",
                    action="store_true")
parser.add_argument("-s", "--split", help="Split size, in MB", type=int, default=50)
parser.add_argument("-rrs", "--reduced-redundancy", help="Use reduced redundancy storage. Default is standard.",
                    default=False, action="store_true")
parser.add_argument("--insecure", dest='secure', help="Use HTTP for connection",
                    default=True, action="store_false")
parser.add_argument("-t", "--max-tries", help="Max allowed retries for http timeout",
                    type=int, default=5)
parser.add_argument("-v", "--verbose", help="Be more verbose", default=False, action="store_true")
parser.add_argument("-q", "--quiet", help="Be less verbose (for use in cron jobs)",
                    default=False, action="store_true")

logger = logging.getLogger("s3-mp-upload")
def do_part_upload(args):
    """
    Upload a part of a MultiPartUpload

    Open the target file and read in a chunk. Since we can't pickle
    S3Connection or MultiPartUpload objects, we have to reconnect and look up
    the MPU object with each part upload.

    :type args: tuple of (string, string, string, int, int, int, bool, int, int)
    :param args: The actual arguments of this method. Due to lameness of
                 multiprocessing, we have to extract these outside of the
                 function definition.

                 The arguments are: S3 bucket name, MultiPartUpload id, file
                 name, part number, part offset, part size, secure flag,
                 max retries, current retry count
    """
    # Unpack the single tuple argument required by multiprocessing.Pool.map
    bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
    logger.debug("do_part_upload got args: %s" % (args,))

    # Connect to S3, get the MultiPartUpload
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    mpu = None
    for mp in bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)

    # Read the chunk from the file
    fp = open(fname, 'rb')
    fp.seek(start)
    data = fp.read(size)
    fp.close()
    if not data:
        raise Exception("Unexpectedly tried to read an empty chunk")

    def progress(x, y):
        logger.debug("Part %d: %0.2f%%" % (i + 1, 100. * x / y))

    try:
        # Do the upload
        t1 = time.time()
        mpu.upload_part_from_file(StringIO(data), i + 1, cb=progress)

        # Print some timings
        t2 = time.time() - t1
        s = len(data) / 1024. / 1024.
        logger.info("Uploaded part %s (%0.2fM) in %0.2fs at %0.2fMBps" % (i + 1, s, t2, s / t2))
    except Exception as err:
        logger.debug("Retry request %d of max %d times" % (current_tries, max_tries))
        if current_tries > max_tries:
            logger.error(err)
        else:
            time.sleep(3)
            current_tries += 1
            # Retry this part by calling ourselves with an incremented retry count
            do_part_upload((bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries))
def main(src, dest, num_processes=2, split=50, force=False, reduced_redundancy=False, verbose=False, quiet=False, secure=True, max_tries=5):
    # Check that dest is a valid S3 url
    split_rs = urlparse.urlsplit(dest)
    if split_rs.scheme != "s3":
        raise ValueError("'%s' is not an S3 url" % dest)

    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(split_rs.netloc)
    if bucket is None:
        raise ValueError("'%s' is not a valid bucket" % split_rs.netloc)
    key = bucket.get_key(split_rs.path)
    # See if we're overwriting an existing key
    if key is not None:
        if not force:
            raise ValueError("'%s' already exists. Specify -f to overwrite it" % dest)

    # Determine the splits
    part_size = max(5*1024*1024, 1024*1024*split)
    src.seek(0, 2)
    size = src.tell()
    # Note: size / part_size is Python 2 integer (floor) division, so num_parts
    # counts only the full parts; gen_args() below iterates num_parts+1 indices
    # so the remaining tail also gets a part.
    num_parts = int(ceil(size / part_size))

    # If file is less than 5M, just upload it directly
    if size < 5*1024*1024:
        src.seek(0)
        t1 = time.time()
        k = boto.s3.key.Key(bucket, split_rs.path)
        k.set_contents_from_file(src)
        t2 = time.time() - t1
        s = size/1024./1024.
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
        return

    # Create the multi-part upload object
    mpu = bucket.initiate_multipart_upload(split_rs.path, reduced_redundancy=reduced_redundancy)
    logger.info("Initialized upload: %s" % mpu.id)

    # Generate arguments for invocations of do_part_upload
    def gen_args(num_parts, fold_last):
        for i in range(num_parts+1):
            part_start = part_size*i
            if i == (num_parts-1) and fold_last is True:
                # The last full part also absorbs the short tail; read() simply
                # stops at EOF, so a double-sized read covers both.
                yield (bucket.name, mpu.id, src.name, i, part_start, part_size*2, secure, max_tries, 0)
                break
            else:
                yield (bucket.name, mpu.id, src.name, i, part_start, part_size, secure, max_tries, 0)

    # If the last part is less than 5M, just fold it into the previous part
    fold_last = ((size % part_size) < 5*1024*1024)

    # Do the thing
    try:
        # Create a pool of workers
        pool = Pool(processes=num_processes)
        t1 = time.time()
        pool.map_async(do_part_upload, gen_args(num_parts, fold_last)).get(9999999)

        # Print out some timings
        t2 = time.time() - t1
        s = size/1024./1024.

        # Finalize
        src.close()
        mpu.complete_upload()
        logger.info("Finished uploading %0.2fM in %0.2fs (%0.2fMBps)" % (s, t2, s/t2))
    except KeyboardInterrupt:
        logger.warn("Received KeyboardInterrupt, canceling upload")
        pool.terminate()
        mpu.cancel_upload()
    except Exception as err:
        logger.error("Encountered an error, canceling upload")
        logger.error(err)
        mpu.cancel_upload()
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    args = parser.parse_args()
    arg_dict = vars(args)
    if arg_dict['quiet']:
        logger.setLevel(logging.WARNING)
    if arg_dict['verbose']:
        logger.setLevel(logging.DEBUG)
    logger.debug("CLI args: %s" % args)
    main(**arg_dict)
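
# A hypothetical invocation, for illustration only (the file, bucket, and key
# names below are made up and not taken from this script):
#
#   python s3-mp-upload.py /tmp/backup.tar.gz s3://my-bucket/backups/backup.tar.gz -np 4 -s 50 -v
#
# This would upload /tmp/backup.tar.gz to the given S3 key in 50 MB parts using
# four worker processes, with debug-level progress logging enabled.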