def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
    """
    Upload a file to a key into a bucket on GS, using the GS resumable
    upload protocol.

    :type key: :class:`boto.s3.key.Key` or subclass
    :param key: The Key object to which data is to be uploaded

    :type fp: file-like object
    :param fp: The file pointer to upload

    :type headers: dict
    :param headers: The headers to pass along with the PUT request

    :type cb: function
    :param cb: a callback function that will be called to report progress
        on the upload. The callback should accept two integer parameters,
        the first representing the number of bytes that have been
        successfully transmitted to GS, and the second representing the
        total number of bytes that need to be transmitted.

    :type num_cb: int
    :param num_cb: (optional) If a callback is specified with the cb
        parameter, this parameter determines the granularity of the
        callback by defining the maximum number of times the callback
        will be called during the file transfer. Providing a negative
        integer will cause your callback to be called with each buffer
        read.

    :type hash_algs: dictionary
    :param hash_algs: (optional) Dictionary mapping hash algorithm
        descriptions to corresponding stateful hashing objects that
        implement update(), digest(), and copy() (e.g. hashlib.md5()).
        Defaults to {'md5': md5()}.

    Raises ResumableUploadException if a problem occurs during the
    transfer.
    """
    if not headers:
        headers = {}
    # If the Content-Type header is present and set to None, remove it.
    # This is gsutil's way of asking boto to refrain from auto-generating
    # that header.
    CT = 'Content-Type'
    if CT in headers and headers[CT] is None:
        del headers[CT]

    headers['User-Agent'] = UserAgent

    # Determine the file size in different ways for the case where fp is
    # actually a wrapper around a Key vs. an actual file.
    if isinstance(fp, KeyFile):
        file_length = fp.getkey().size
    else:
        fp.seek(0, os.SEEK_END)
        file_length = fp.tell()
        fp.seek(0)
    debug = key.bucket.connection.debug

    # Compute the checksums (MD5 by default) on the fly.
    if hash_algs is None:
        hash_algs = {'md5': md5}
    self.digesters = dict(
        (alg, hash_algs[alg]()) for alg in hash_algs)

    # Use num_retries from the constructor if one was provided; else check
    # for a value specified in the boto config file; else default to 6.
    if self.num_retries is None:
        self.num_retries = config.getint('Boto', 'num_retries', 6)
    self.progress_less_iterations = 0

    while True:  # Retry as long as we're making progress.
        server_had_bytes_before_attempt = self.server_has_bytes
        self.digesters_before_attempt = dict(
            (alg, self.digesters[alg].copy()) for alg in self.digesters)
        try:
            # Save generation and metageneration in class state so the
            # caller can find these values, for use in preconditions of
            # future operations on the uploaded object.
            (etag, self.generation, self.metageneration) = (
                self._attempt_resumable_upload(key, fp, file_length,
                                               headers, cb, num_cb))

            # Get the final digests for the uploaded content.
            for alg in self.digesters:
                key.local_hashes[alg] = self.digesters[alg].digest()

            # Upload succeeded, so remove the tracker file (if we have one).
            self._remove_tracker_file()
            self._check_final_md5(key, etag)
            key.generation = self.generation
            if debug >= 1:
                print('Resumable upload complete.')
            return
        except self.RETRYABLE_EXCEPTIONS as e:
            if debug >= 1:
                print('Caught exception (%s)' % repr(e))
            if isinstance(e, IOError) and e.errno == errno.EPIPE:
                # A broken pipe error causes httplib to immediately close
                # the socket (http://bugs.python.org/issue5542), so we need
                # to close the connection before we resume the upload
                # (which will cause a new connection to be opened the next
                # time an HTTP request is sent).
                key.bucket.connection.connection.close()
        except ResumableUploadException as e:
            self.handle_resumable_upload_exception(e, debug)

        self.track_progress_less_iterations(
            server_had_bytes_before_attempt, True, debug)
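
# Example usage (a minimal sketch, not part of this module): callers normally
# do not invoke send_file() directly; instead they pass the handler to the GS
# Key's set_contents_from_file(), which calls send_file() internally. Here
# `key` is assumed to be an existing boto GS Key, and the tracker-file path
# and local file name are illustrative placeholders.
#
#     from boto.gs.resumable_upload_handler import ResumableUploadHandler
#
#     def progress_cb(bytes_transmitted, total_bytes):
#         # Matches the cb contract above: two integer parameters.
#         print('%d of %d bytes uploaded' % (bytes_transmitted, total_bytes))
#
#     handler = ResumableUploadHandler(
#         tracker_file_name='/tmp/upload.tracker', num_retries=6)
#     with open('local_file', 'rb') as fp:
#         key.set_contents_from_file(fp, headers={}, cb=progress_cb,
#                                    num_cb=20, res_upload_handler=handler)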
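#
# Callers that invoke send_file() directly can supply hash_algs to compute
# additional digests during the upload. A sketch, assuming a fresh handler
# and the same `key` as above; the extra sha256 entry is purely illustrative:
#
#     import hashlib
#
#     with open('local_file', 'rb') as fp:
#         handler.send_file(key, fp, headers={}, cb=progress_cb,
#                           hash_algs={'md5': hashlib.md5,
#                                      'sha256': hashlib.sha256})
#
#     # The computed digests end up in key.local_hashes, keyed by algorithm.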