s3-mp-upload_1.py

import logging
import time
from cStringIO import StringIO

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)


def do_part_upload(args):
    """
    Upload a part of a MultiPartUpload

    Open the target file and read in a chunk. Since we can't pickle
    S3Connection or MultiPartUpload objects, we have to reconnect and look up
    the MPU object with each part upload.

    :type args: tuple of (string, string, string, int, int, int, bool, int, int)
    :param args: The actual arguments of this method. Due to lameness of
                 multiprocessing, we have to extract these outside of the
                 function definition.

                 The arguments are: S3 bucket name, MultiPartUpload id, file
                 name, part number, part offset, part size, whether to use
                 HTTPS, max retries, and the current retry count.
    """
    # Multiprocessing args lameness
    bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
    logger.debug("do_part_upload got args: %s" % (args,))

    # Connect to S3 and find the MultiPartUpload by its id
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    mpu = None
    for mp in bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)

    # Read this part's chunk from the file
    fp = open(fname, 'rb')
    fp.seek(start)
    data = fp.read(size)
    fp.close()
    if not data:
        raise Exception("Unexpectedly tried to read an empty chunk")

    def progress(x, y):
        logger.debug("Part %d: %0.2f%%" % (i + 1, 100. * x / y))

    try:
        # Do the upload (part numbers are 1-based)
        t1 = time.time()
        mpu.upload_part_from_file(StringIO(data), i + 1, cb=progress)

        # Log some timings
        t2 = time.time() - t1
        s = len(data) / 1024. / 1024.
        logger.info("Uploaded part %s (%0.2fM) in %0.2fs at %0.2fMBps" % (i + 1, s, t2, s / t2))
    except Exception as err:
        logger.debug("Retry request %d of max %d times" % (current_tries, max_tries))
        if current_tries > max_tries:
            logger.error(err)
            # Re-raise so a permanently failed part isn't silently dropped
            raise
        else:
            time.sleep(3)
            current_tries += 1
            # Retry this part by re-invoking ourselves with a single args tuple
            do_part_upload((bucket_name, mpu_id, fname, i, start, size,
                            secure, max_tries, current_tries))
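
# --- Usage sketch (not part of the original snippet) -----------------------
# A minimal illustration of how do_part_upload is meant to be driven: the
# parent process initiates the multipart upload, then fans the parts out over
# a multiprocessing.Pool, handing each worker only picklable values (see the
# docstring above). The helper name upload_file and its defaults are
# assumptions for illustration, not the original script's main().
import os
from math import ceil
from multiprocessing import Pool


def upload_file(bucket_name, fname, part_size=50 * 1024 * 1024,
                num_processes=2, secure=True, max_tries=5):
    # Initiate the multipart upload from the parent process
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    mpu = bucket.initiate_multipart_upload(os.path.basename(fname))

    # Build one picklable args tuple per part, each starting at 0 retries
    file_size = os.stat(fname).st_size
    num_parts = int(ceil(file_size / float(part_size)))
    part_args = [(bucket_name, mpu.id, fname, i, part_size * i,
                  min(part_size, file_size - part_size * i),
                  secure, max_tries, 0)
                 for i in range(num_parts)]

    pool = Pool(processes=num_processes)
    try:
        pool.map(do_part_upload, part_args)
        mpu.complete_upload()
    except KeyboardInterrupt:
        # Abort the MPU so S3 doesn't keep billing for orphaned parts
        mpu.cancel_upload()
        raise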