s3-multiputter.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #!/usr/bin/python
  2. '''
  3. s3-multiputter.py
  4. Description:
  5. Reads a huge file (or device) and uploads to Amazon S3.
  6. Multiple workers are launched which read & send in parallel.
  7. Workers are allocated one chunk of the file at a time.
  8. Usage:
  9. s3-multiputter.py <BUCKET> <FILE> <THREADS> <CHUNKSIZE>
  10. BUCKET: The S3 bucket name to upload to
  11. FILE: The source file to upload
  12. THREADS: Number of parallel uploader worker processes
  13. CHUNKSIZE: Size (MB) of each chunk
  14. Prerequisites:
  15. Boto library must be installed & configured with AWS creds
  16. http://code.google.com/p/boto/wiki/BotoConfig
  17. Created on Aug 30, 2011
  18. @author: louis zuckerman
  19. '''
  20. import boto
  21. import sys
  22. import io
  23. import cStringIO
  24. from multiprocessing import Process, Queue
  25. from Queue import Empty
  26. import time
  27. class MPUploader(Process):
  28. def __init__(self, queue, donequeue, srcfile, chunksize, bucket):
  29. self.s3c = boto.connect_s3()
  30. for mpu in self.s3c.lookup(bucket).list_multipart_uploads():
  31. if mpu.key_name == srcfile:
  32. self.multipart = mpu
  33. break
  34. self.work = queue
  35. self.donework = donequeue
  36. self.infilename = srcfile
  37. self.chunksize = chunksize
  38. Process.__init__(self)
  39. def send_part(self, partno):
  40. while True:
  41. try:
  42. self.buffer.seek(0)
  43. self.multipart.upload_part_from_file(self.buffer, partno+1)
  44. break
  45. except Exception as s3err:
  46. print ("S3 ERROR IN PART ", partno, ": ", type(s3err), "\n\t", s3err, "\n\t\t(retrying)")
  47. continue
  48. def read_part(self, partno):
  49. while True:
  50. try:
  51. self.infile = open(self.infilename, "rb", 0)
  52. self.infile.seek(partno*self.chunksize)
  53. self.buffer.write(self.infile.read(self.chunksize))
  54. self.infile.close()
  55. return self.buffer.tell() #return number of bytes read, or 0 if EOF
  56. except Exception as ioerr:
  57. return 0
  58. def run(self):
  59. while True:
  60. uppart = self.work.get()
  61. self.buffer = cStringIO.StringIO()
  62. if self.read_part(uppart) == 0:
  63. break #past EOF
  64. self.send_part(uppart)
  65. self.buffer.close()
  66. self.donework.put(uppart)
  67. if __name__ == '__main__':
  68. buck = sys.argv[1]
  69. srcfile = sys.argv[2]
  70. workers = int(sys.argv[3])
  71. chunksize = int(sys.argv[4])*1024*1024
  72. s3c = boto.connect_s3()
  73. mpu = s3c.lookup(buck).initiate_multipart_upload(srcfile)
  74. work = Queue()
  75. donework = Queue()
  76. starttime = time.clock()
  77. chunkstep = 100
  78. chunkserving = 0
  79. chunksdone = 0
  80. if (chunkserving - chunksdone) < (chunkstep / 2):
  81. for c in range(chunkserving, chunkserving+chunkstep):
  82. work.put(c)
  83. chunkserving += chunkstep
  84. uploader = range(workers)
  85. print ("Launching workers ",)
  86. for i in range(workers):
  87. uploader[i] = MPUploader(work, donework, srcfile, chunksize, buck)
  88. uploader[i].start()
  89. print( ".",)
  90. sys.stdout.flush()
  91. print (len(uploader))
  92. def childrenExist(kids):
  93. for child in kids:
  94. if child.is_alive():
  95. return True
  96. return False
  97. while childrenExist(uploader):
  98. try:
  99. while True:
  100. try:
  101. donework.get_nowait()
  102. chunksdone += 1
  103. except Empty:
  104. break
  105. if (chunkserving - chunksdone) < (chunkstep / 2):
  106. for c in range(chunkserving, chunkserving+chunkstep):
  107. work.put(c)
  108. chunkserving += chunkstep
  109. print ((chunksdone*chunksize)/(1024*1024), " ",)
  110. sys.stdout.flush()
  111. time.sleep(1)
  112. except KeyboardInterrupt:
  113. for w in uploader:
  114. w.terminate()
  115. time.sleep(1)
  116. mpu.cancel_upload()
  117. print ("ABORTED!")
  118. exit()
  119. complete = mpu.complete_upload()
  120. print ("COMPLETED IN ", time.clock()-starttime, "s")
  121. exit()