123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
- """
- Upload a file to a key into a bucket on GS, using GS resumable upload
- protocol.
- :type key: :class:`boto.s3.key.Key` or subclass
- :param key: The Key object to which data is to be uploaded
- :type fp: file-like object
- :param fp: The file pointer to upload
- :type headers: dict
- :param headers: The headers to pass along with the PUT request
- :type cb: function
- :param cb: a callback function that will be called to report progress on
- the upload. The callback should accept two integer parameters, the
- first representing the number of bytes that have been successfully
- transmitted to GS, and the second representing the total number of
- bytes that need to be transmitted.
- :type num_cb: int
- :param num_cb: (optional) If a callback is specified with the cb
- parameter, this parameter determines the granularity of the callback
- by defining the maximum number of times the callback will be called
- during the file transfer. Providing a negative integer will cause
- your callback to be called with each buffer read.
- :type hash_algs: dictionary
- :param hash_algs: (optional) Dictionary mapping hash algorithm
- descriptions to corresponding state-ful hashing objects that
- implement update(), digest(), and copy() (e.g. hashlib.md5()).
- Defaults to {'md5': md5()}.
- Raises ResumableUploadException if a problem occurs during the transfer.
- """
- if not headers:
- headers = {}
- # If Content-Type header is present and set to None, remove it.
- # This is gsutil's way of asking boto to refrain from auto-generating
- # that header.
- CT = 'Content-Type'
- if CT in headers and headers[CT] is None:
- del headers[CT]
- headers['User-Agent'] = UserAgent
- # Determine file size different ways for case where fp is actually a
- # wrapper around a Key vs an actual file.
- if isinstance(fp, KeyFile):
- file_length = fp.getkey().size
- else:
- fp.seek(0, os.SEEK_END)
- file_length = fp.tell()
- fp.seek(0)
- debug = key.bucket.connection.debug
- # Compute the MD5 checksum on the fly.
- if hash_algs is None:
- hash_algs = {'md5': md5}
- self.digesters = dict(
- (alg, hash_algs[alg]()) for alg in hash_algs or {})
- # Use num-retries from constructor if one was provided; else check
- # for a value specified in the boto config file; else default to 5.
- if self.num_retries is None:
- self.num_retries = config.getint('Boto', 'num_retries', 6)
- self.progress_less_iterations = 0
- while True: # Retry as long as we're making progress.
- server_had_bytes_before_attempt = self.server_has_bytes
- self.digesters_before_attempt = dict(
- (alg, self.digesters[alg].copy())
- for alg in self.digesters)
- try:
- # Save generation and metageneration in class state so caller
- # can find these values, for use in preconditions of future
- # operations on the uploaded object.
- (etag, self.generation, self.metageneration) = (
- self._attempt_resumable_upload(key, fp, file_length,
- headers, cb, num_cb))
- # Get the final digests for the uploaded content.
- for alg in self.digesters:
- key.local_hashes[alg] = self.digesters[alg].digest()
- # Upload succceded, so remove the tracker file (if have one).
- self._remove_tracker_file()
- self._check_final_md5(key, etag)
- key.generation = self.generation
- if debug >= 1:
- print('Resumable upload complete.')
- return
- except self.RETRYABLE_EXCEPTIONS as e:
- if debug >= 1:
- print('Caught exception (%s)' % e.__repr__())
- if isinstance(e, IOError) and e.errno == errno.EPIPE:
- # Broken pipe error causes httplib to immediately
- # close the socket (http://bugs.python.org/issue5542),
- # so we need to close the connection before we resume
- # the upload (which will cause a new connection to be
- # opened the next time an HTTP request is sent).
- key.bucket.connection.connection.close()
- except ResumableUploadException as e:
- self.handle_resumable_upload_exception(e, debug)
- self.track_progress_less_iterations(server_had_bytes_before_attempt,
- True, debug)
|