123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- # Copyright 2017 Google Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Support for resumable uploads.
- Also supported here are simple (media) uploads and multipart
- uploads that contain both metadata and a small file as payload.
- """
- from google.resumable_media import _upload
- from google.resumable_media.requests import _helpers
- class SimpleUpload(_helpers.RequestsMixin, _upload.SimpleUpload):
- """Upload a resource to a Google API.
- A **simple** media upload sends no metadata and completes the upload
- in a single request.
- Args:
- upload_url (str): The URL where the content will be uploaded.
- headers (Optional[Mapping[str, str]]): Extra headers that should
- be sent with the request, e.g. headers for encrypted data.
- Attributes:
- upload_url (str): The URL where the content will be uploaded.
- """
- def transmit(self, transport, data, content_type):
- """Transmit the resource to be uploaded.
- Args:
- transport (~requests.Session): A ``requests`` object which can
- make authenticated requests.
- data (bytes): The resource content to be uploaded.
- content_type (str): The content type of the resource, e.g. a JPEG
- image has content type ``image/jpeg``.
- Returns:
- ~requests.Response: The HTTP response returned by ``transport``.
- """
- method, url, payload, headers = self._prepare_request(
- data, content_type)
- result = _helpers.http_request(
- transport, method, url, data=payload, headers=headers,
- retry_strategy=self._retry_strategy)
- self._process_response(result)
- return result
- class MultipartUpload(_helpers.RequestsMixin, _upload.MultipartUpload):
- """Upload a resource with metadata to a Google API.
- A **multipart** upload sends both metadata and the resource in a single
- (multipart) request.
- Args:
- upload_url (str): The URL where the content will be uploaded.
- headers (Optional[Mapping[str, str]]): Extra headers that should
- be sent with the request, e.g. headers for encrypted data.
- Attributes:
- upload_url (str): The URL where the content will be uploaded.
- """
- def transmit(self, transport, data, metadata, content_type):
- """Transmit the resource to be uploaded.
- Args:
- transport (~requests.Session): A ``requests`` object which can
- make authenticated requests.
- data (bytes): The resource content to be uploaded.
- metadata (Mapping[str, str]): The resource metadata, such as an
- ACL list.
- content_type (str): The content type of the resource, e.g. a JPEG
- image has content type ``image/jpeg``.
- Returns:
- ~requests.Response: The HTTP response returned by ``transport``.
- """
- method, url, payload, headers = self._prepare_request(
- data, metadata, content_type)
- result = _helpers.http_request(
- transport, method, url, data=payload, headers=headers,
- retry_strategy=self._retry_strategy)
- self._process_response(result)
- return result
- class ResumableUpload(_helpers.RequestsMixin, _upload.ResumableUpload):
- """Initiate and fulfill a resumable upload to a Google API.
- A **resumable** upload sends an initial request with the resource metadata
- and then gets assigned an upload ID / upload URL to send bytes to.
- Using the upload URL, the upload is then done in chunks (determined by
- the user) until all bytes have been uploaded.
- When constructing a resumable upload, only the resumable upload URL and
- the chunk size are required:
- .. testsetup:: resumable-constructor
- bucket = u'bucket-foo'
- .. doctest:: resumable-constructor
- >>> from google.resumable_media.requests import ResumableUpload
- >>>
- >>> url_template = (
- ... u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
- ... u'uploadType=resumable')
- >>> upload_url = url_template.format(bucket=bucket)
- >>>
- >>> chunk_size = 3 * 1024 * 1024 # 3MB
- >>> upload = ResumableUpload(upload_url, chunk_size)
- When initiating an upload (via :meth:`initiate`), the caller is expected
- to pass the resource being uploaded as a file-like ``stream``. If the size
- of the resource is explicitly known, it can be passed in directly:
- .. testsetup:: resumable-explicit-size
- import os
- import tempfile
- import mock
- import requests
- from six.moves import http_client
- from google.resumable_media.requests import ResumableUpload
- upload_url = u'http://test.invalid'
- chunk_size = 3 * 1024 * 1024 # 3MB
- upload = ResumableUpload(upload_url, chunk_size)
- file_desc, filename = tempfile.mkstemp()
- os.close(file_desc)
- data = b'some bytes!'
- with open(filename, u'wb') as file_obj:
- file_obj.write(data)
- fake_response = requests.Response()
- fake_response.status_code = int(http_client.OK)
- fake_response._content = b''
- resumable_url = u'http://test.invalid?upload_id=7up'
- fake_response.headers[u'location'] = resumable_url
- post_method = mock.Mock(return_value=fake_response, spec=[])
- transport = mock.Mock(request=post_method, spec=[u'request'])
- .. doctest:: resumable-explicit-size
- >>> import os
- >>>
- >>> upload.total_bytes is None
- True
- >>>
- >>> stream = open(filename, u'rb')
- >>> total_bytes = os.path.getsize(filename)
- >>> metadata = {u'name': filename}
- >>> response = upload.initiate(
- ... transport, stream, metadata, u'text/plain',
- ... total_bytes=total_bytes)
- >>> response
- <Response [200]>
- >>>
- >>> upload.total_bytes == total_bytes
- True
- .. testcleanup:: resumable-explicit-size
- os.remove(filename)
- If the stream is in a "final" state (i.e. it won't have any more bytes
- written to it), the total number of bytes can be determined implicitly
- from the ``stream`` itself:
- .. testsetup:: resumable-implicit-size
- import io
- import mock
- import requests
- from six.moves import http_client
- from google.resumable_media.requests import ResumableUpload
- upload_url = u'http://test.invalid'
- chunk_size = 3 * 1024 * 1024 # 3MB
- upload = ResumableUpload(upload_url, chunk_size)
- fake_response = requests.Response()
- fake_response.status_code = int(http_client.OK)
- fake_response._content = b''
- resumable_url = u'http://test.invalid?upload_id=7up'
- fake_response.headers[u'location'] = resumable_url
- post_method = mock.Mock(return_value=fake_response, spec=[])
- transport = mock.Mock(request=post_method, spec=[u'request'])
- data = b'some MOAR bytes!'
- metadata = {u'name': u'some-file.jpg'}
- content_type = u'image/jpeg'
- .. doctest:: resumable-implicit-size
- >>> stream = io.BytesIO(data)
- >>> response = upload.initiate(
- ... transport, stream, metadata, content_type)
- >>>
- >>> upload.total_bytes == len(data)
- True
- If the size of the resource is **unknown** when the upload is initiated,
- the ``stream_final`` argument can be used. This might occur if the
- resource is being dynamically created on the client (e.g. application
- logs). To use this argument:
- .. testsetup:: resumable-unknown-size
- import io
- import mock
- import requests
- from six.moves import http_client
- from google.resumable_media.requests import ResumableUpload
- upload_url = u'http://test.invalid'
- chunk_size = 3 * 1024 * 1024 # 3MB
- upload = ResumableUpload(upload_url, chunk_size)
- fake_response = requests.Response()
- fake_response.status_code = int(http_client.OK)
- fake_response._content = b''
- resumable_url = u'http://test.invalid?upload_id=7up'
- fake_response.headers[u'location'] = resumable_url
- post_method = mock.Mock(return_value=fake_response, spec=[])
- transport = mock.Mock(request=post_method, spec=[u'request'])
- metadata = {u'name': u'some-file.jpg'}
- content_type = u'application/octet-stream'
- stream = io.BytesIO(b'data')
- .. doctest:: resumable-unknown-size
- >>> response = upload.initiate(
- ... transport, stream, metadata, content_type,
- ... stream_final=False)
- >>>
- >>> upload.total_bytes is None
- True
- Args:
- upload_url (str): The URL where the resumable upload will be initiated.
- chunk_size (int): The size of each chunk used to upload the resource.
- headers (Optional[Mapping[str, str]]): Extra headers that should
- be sent with the :meth:`initiate` request, e.g. headers for
- encrypted data. These **will not** be sent with
- :meth:`transmit_next_chunk` or :meth:`recover` requests.
- Attributes:
- upload_url (str): The URL where the content will be uploaded.
- Raises:
- ValueError: If ``chunk_size`` is not a multiple of
- :data:`.UPLOAD_CHUNK_SIZE`.
- """
- def initiate(self, transport, stream, metadata, content_type,
- total_bytes=None, stream_final=True):
- """Initiate a resumable upload.
- By default, this method assumes your ``stream`` is in a "final"
- state ready to transmit. However, ``stream_final=False`` can be used
- to indicate that the size of the resource is not known. This can happen
- if bytes are being dynamically fed into ``stream``, e.g. if the stream
- is attached to application logs.
- If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
- read from the stream every time :meth:`transmit_next_chunk` is called.
- If one of those reads produces strictly fewer bites than the chunk
- size, the upload will be concluded.
- Args:
- transport (~requests.Session): A ``requests`` object which can
- make authenticated requests.
- stream (IO[bytes]): The stream (i.e. file-like object) that will
- be uploaded. The stream **must** be at the beginning (i.e.
- ``stream.tell() == 0``).
- metadata (Mapping[str, str]): The resource metadata, such as an
- ACL list.
- content_type (str): The content type of the resource, e.g. a JPEG
- image has content type ``image/jpeg``.
- total_bytes (Optional[int]): The total number of bytes to be
- uploaded. If specified, the upload size **will not** be
- determined from the stream (even if ``stream_final=True``).
- stream_final (Optional[bool]): Indicates if the ``stream`` is
- "final" (i.e. no more bytes will be added to it). In this case
- we determine the upload size from the size of the stream. If
- ``total_bytes`` is passed, this argument will be ignored.
- Returns:
- ~requests.Response: The HTTP response returned by ``transport``.
- """
- method, url, payload, headers = self._prepare_initiate_request(
- stream, metadata, content_type,
- total_bytes=total_bytes, stream_final=stream_final)
- result = _helpers.http_request(
- transport, method, url, data=payload, headers=headers,
- retry_strategy=self._retry_strategy)
- self._process_initiate_response(result)
- return result
- def transmit_next_chunk(self, transport):
- """Transmit the next chunk of the resource to be uploaded.
- If the current upload was initiated with ``stream_final=False``,
- this method will dynamically determine if the upload has completed.
- The upload will be considered complete if the stream produces
- fewer than :attr:`chunk_size` bytes when a chunk is read from it.
- In the case of failure, an exception is thrown that preserves the
- failed response:
- .. testsetup:: bad-response
- import io
- import mock
- import requests
- from six.moves import http_client
- from google import resumable_media
- import google.resumable_media.requests.upload as upload_mod
- transport = mock.Mock(spec=[u'request'])
- fake_response = requests.Response()
- fake_response.status_code = int(http_client.BAD_REQUEST)
- transport.request.return_value = fake_response
- upload_url = u'http://test.invalid'
- upload = upload_mod.ResumableUpload(
- upload_url, resumable_media.UPLOAD_CHUNK_SIZE)
- # Fake that the upload has been initiate()-d
- data = b'data is here'
- upload._stream = io.BytesIO(data)
- upload._total_bytes = len(data)
- upload._resumable_url = u'http://test.invalid?upload_id=nope'
- .. doctest:: bad-response
- :options: +NORMALIZE_WHITESPACE
- >>> error = None
- >>> try:
- ... upload.transmit_next_chunk(transport)
- ... except resumable_media.InvalidResponse as caught_exc:
- ... error = caught_exc
- ...
- >>> error
- InvalidResponse('Request failed with status code', 400,
- 'Expected one of', <HTTPStatus.OK: 200>, 308)
- >>> error.response
- <Response [400]>
- Args:
- transport (~requests.Session): A ``requests`` object which can
- make authenticated requests.
- Returns:
- ~requests.Response: The HTTP response returned by ``transport``.
- Raises:
- ~google.resumable_media.common.InvalidResponse: If the status
- code is not 200 or 308.
- """
- method, url, payload, headers = self._prepare_request()
- result = _helpers.http_request(
- transport, method, url, data=payload, headers=headers,
- retry_strategy=self._retry_strategy)
- self._process_response(result, len(payload))
- return result
- def recover(self, transport):
- """Recover from a failure.
- This method should be used when a :class:`ResumableUpload` is in an
- :attr:`~ResumableUpload.invalid` state due to a request failure.
- This will verify the progress with the server and make sure the
- current upload is in a valid state before :meth:`transmit_next_chunk`
- can be used again.
- Args:
- transport (~requests.Session): A ``requests`` object which can
- make authenticated requests.
- Returns:
- ~requests.Response: The HTTP response returned by ``transport``.
- """
- method, url, payload, headers = self._prepare_recover_request()
- # NOTE: We assume "payload is None" but pass it along anyway.
- result = _helpers.http_request(
- transport, method, url, data=payload, headers=headers,
- retry_strategy=self._retry_strategy)
- self._process_recover_response(result)
- return result
|