resumable_upload_handler_5.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
  2. """
  3. Upload a file to a key into a bucket on GS, using GS resumable upload
  4. protocol.
  5. :type key: :class:`boto.s3.key.Key` or subclass
  6. :param key: The Key object to which data is to be uploaded
  7. :type fp: file-like object
  8. :param fp: The file pointer to upload
  9. :type headers: dict
  10. :param headers: The headers to pass along with the PUT request
  11. :type cb: function
  12. :param cb: a callback function that will be called to report progress on
  13. the upload. The callback should accept two integer parameters, the
  14. first representing the number of bytes that have been successfully
  15. transmitted to GS, and the second representing the total number of
  16. bytes that need to be transmitted.
  17. :type num_cb: int
  18. :param num_cb: (optional) If a callback is specified with the cb
  19. parameter, this parameter determines the granularity of the callback
  20. by defining the maximum number of times the callback will be called
  21. during the file transfer. Providing a negative integer will cause
  22. your callback to be called with each buffer read.
  23. :type hash_algs: dictionary
  24. :param hash_algs: (optional) Dictionary mapping hash algorithm
  25. descriptions to corresponding state-ful hashing objects that
  26. implement update(), digest(), and copy() (e.g. hashlib.md5()).
  27. Defaults to {'md5': md5()}.
  28. Raises ResumableUploadException if a problem occurs during the transfer.
  29. """
  30. if not headers:
  31. headers = {}
  32. # If Content-Type header is present and set to None, remove it.
  33. # This is gsutil's way of asking boto to refrain from auto-generating
  34. # that header.
  35. CT = 'Content-Type'
  36. if CT in headers and headers[CT] is None:
  37. del headers[CT]
  38. headers['User-Agent'] = UserAgent
  39. # Determine file size different ways for case where fp is actually a
  40. # wrapper around a Key vs an actual file.
  41. if isinstance(fp, KeyFile):
  42. file_length = fp.getkey().size
  43. else:
  44. fp.seek(0, os.SEEK_END)
  45. file_length = fp.tell()
  46. fp.seek(0)
  47. debug = key.bucket.connection.debug
  48. # Compute the MD5 checksum on the fly.
  49. if hash_algs is None:
  50. hash_algs = {'md5': md5}
  51. self.digesters = dict(
  52. (alg, hash_algs[alg]()) for alg in hash_algs or {})
  53. # Use num-retries from constructor if one was provided; else check
  54. # for a value specified in the boto config file; else default to 5.
  55. if self.num_retries is None:
  56. self.num_retries = config.getint('Boto', 'num_retries', 6)
  57. self.progress_less_iterations = 0
  58. while True: # Retry as long as we're making progress.
  59. server_had_bytes_before_attempt = self.server_has_bytes
  60. self.digesters_before_attempt = dict(
  61. (alg, self.digesters[alg].copy())
  62. for alg in self.digesters)
  63. try:
  64. # Save generation and metageneration in class state so caller
  65. # can find these values, for use in preconditions of future
  66. # operations on the uploaded object.
  67. (etag, self.generation, self.metageneration) = (
  68. self._attempt_resumable_upload(key, fp, file_length,
  69. headers, cb, num_cb))
  70. # Get the final digests for the uploaded content.
  71. for alg in self.digesters:
  72. key.local_hashes[alg] = self.digesters[alg].digest()
  73. # Upload succceded, so remove the tracker file (if have one).
  74. self._remove_tracker_file()
  75. self._check_final_md5(key, etag)
  76. key.generation = self.generation
  77. if debug >= 1:
  78. print('Resumable upload complete.')
  79. return
  80. except self.RETRYABLE_EXCEPTIONS as e:
  81. if debug >= 1:
  82. print('Caught exception (%s)' % e.__repr__())
  83. if isinstance(e, IOError) and e.errno == errno.EPIPE:
  84. # Broken pipe error causes httplib to immediately
  85. # close the socket (http://bugs.python.org/issue5542),
  86. # so we need to close the connection before we resume
  87. # the upload (which will cause a new connection to be
  88. # opened the next time an HTTP request is sent).
  89. key.bucket.connection.connection.close()
  90. except ResumableUploadException as e:
  91. self.handle_resumable_upload_exception(e, debug)
  92. self.track_progress_less_iterations(server_had_bytes_before_attempt,
  93. True, debug)