upload.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. # Copyright 2017 Google Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Support for resumable uploads.
  15. Also supported here are simple (media) uploads and multipart
  16. uploads that contain both metadata and a small file as payload.
  17. """
  18. from google.resumable_media import _upload
  19. from google.resumable_media.requests import _helpers
  20. class SimpleUpload(_helpers.RequestsMixin, _upload.SimpleUpload):
  21. """Upload a resource to a Google API.
  22. A **simple** media upload sends no metadata and completes the upload
  23. in a single request.
  24. Args:
  25. upload_url (str): The URL where the content will be uploaded.
  26. headers (Optional[Mapping[str, str]]): Extra headers that should
  27. be sent with the request, e.g. headers for encrypted data.
  28. Attributes:
  29. upload_url (str): The URL where the content will be uploaded.
  30. """
  31. def transmit(self, transport, data, content_type):
  32. """Transmit the resource to be uploaded.
  33. Args:
  34. transport (~requests.Session): A ``requests`` object which can
  35. make authenticated requests.
  36. data (bytes): The resource content to be uploaded.
  37. content_type (str): The content type of the resource, e.g. a JPEG
  38. image has content type ``image/jpeg``.
  39. Returns:
  40. ~requests.Response: The HTTP response returned by ``transport``.
  41. """
  42. method, url, payload, headers = self._prepare_request(
  43. data, content_type)
  44. result = _helpers.http_request(
  45. transport, method, url, data=payload, headers=headers,
  46. retry_strategy=self._retry_strategy)
  47. self._process_response(result)
  48. return result
  49. class MultipartUpload(_helpers.RequestsMixin, _upload.MultipartUpload):
  50. """Upload a resource with metadata to a Google API.
  51. A **multipart** upload sends both metadata and the resource in a single
  52. (multipart) request.
  53. Args:
  54. upload_url (str): The URL where the content will be uploaded.
  55. headers (Optional[Mapping[str, str]]): Extra headers that should
  56. be sent with the request, e.g. headers for encrypted data.
  57. Attributes:
  58. upload_url (str): The URL where the content will be uploaded.
  59. """
  60. def transmit(self, transport, data, metadata, content_type):
  61. """Transmit the resource to be uploaded.
  62. Args:
  63. transport (~requests.Session): A ``requests`` object which can
  64. make authenticated requests.
  65. data (bytes): The resource content to be uploaded.
  66. metadata (Mapping[str, str]): The resource metadata, such as an
  67. ACL list.
  68. content_type (str): The content type of the resource, e.g. a JPEG
  69. image has content type ``image/jpeg``.
  70. Returns:
  71. ~requests.Response: The HTTP response returned by ``transport``.
  72. """
  73. method, url, payload, headers = self._prepare_request(
  74. data, metadata, content_type)
  75. result = _helpers.http_request(
  76. transport, method, url, data=payload, headers=headers,
  77. retry_strategy=self._retry_strategy)
  78. self._process_response(result)
  79. return result
  80. class ResumableUpload(_helpers.RequestsMixin, _upload.ResumableUpload):
  81. """Initiate and fulfill a resumable upload to a Google API.
  82. A **resumable** upload sends an initial request with the resource metadata
  83. and then gets assigned an upload ID / upload URL to send bytes to.
  84. Using the upload URL, the upload is then done in chunks (determined by
  85. the user) until all bytes have been uploaded.
  86. When constructing a resumable upload, only the resumable upload URL and
  87. the chunk size are required:
  88. .. testsetup:: resumable-constructor
  89. bucket = u'bucket-foo'
  90. .. doctest:: resumable-constructor
  91. >>> from google.resumable_media.requests import ResumableUpload
  92. >>>
  93. >>> url_template = (
  94. ... u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
  95. ... u'uploadType=resumable')
  96. >>> upload_url = url_template.format(bucket=bucket)
  97. >>>
  98. >>> chunk_size = 3 * 1024 * 1024 # 3MB
  99. >>> upload = ResumableUpload(upload_url, chunk_size)
  100. When initiating an upload (via :meth:`initiate`), the caller is expected
  101. to pass the resource being uploaded as a file-like ``stream``. If the size
  102. of the resource is explicitly known, it can be passed in directly:
  103. .. testsetup:: resumable-explicit-size
  104. import os
  105. import tempfile
  106. import mock
  107. import requests
  108. from six.moves import http_client
  109. from google.resumable_media.requests import ResumableUpload
  110. upload_url = u'http://test.invalid'
  111. chunk_size = 3 * 1024 * 1024 # 3MB
  112. upload = ResumableUpload(upload_url, chunk_size)
  113. file_desc, filename = tempfile.mkstemp()
  114. os.close(file_desc)
  115. data = b'some bytes!'
  116. with open(filename, u'wb') as file_obj:
  117. file_obj.write(data)
  118. fake_response = requests.Response()
  119. fake_response.status_code = int(http_client.OK)
  120. fake_response._content = b''
  121. resumable_url = u'http://test.invalid?upload_id=7up'
  122. fake_response.headers[u'location'] = resumable_url
  123. post_method = mock.Mock(return_value=fake_response, spec=[])
  124. transport = mock.Mock(request=post_method, spec=[u'request'])
  125. .. doctest:: resumable-explicit-size
  126. >>> import os
  127. >>>
  128. >>> upload.total_bytes is None
  129. True
  130. >>>
  131. >>> stream = open(filename, u'rb')
  132. >>> total_bytes = os.path.getsize(filename)
  133. >>> metadata = {u'name': filename}
  134. >>> response = upload.initiate(
  135. ... transport, stream, metadata, u'text/plain',
  136. ... total_bytes=total_bytes)
  137. >>> response
  138. <Response [200]>
  139. >>>
  140. >>> upload.total_bytes == total_bytes
  141. True
  142. .. testcleanup:: resumable-explicit-size
  143. os.remove(filename)
  144. If the stream is in a "final" state (i.e. it won't have any more bytes
  145. written to it), the total number of bytes can be determined implicitly
  146. from the ``stream`` itself:
  147. .. testsetup:: resumable-implicit-size
  148. import io
  149. import mock
  150. import requests
  151. from six.moves import http_client
  152. from google.resumable_media.requests import ResumableUpload
  153. upload_url = u'http://test.invalid'
  154. chunk_size = 3 * 1024 * 1024 # 3MB
  155. upload = ResumableUpload(upload_url, chunk_size)
  156. fake_response = requests.Response()
  157. fake_response.status_code = int(http_client.OK)
  158. fake_response._content = b''
  159. resumable_url = u'http://test.invalid?upload_id=7up'
  160. fake_response.headers[u'location'] = resumable_url
  161. post_method = mock.Mock(return_value=fake_response, spec=[])
  162. transport = mock.Mock(request=post_method, spec=[u'request'])
  163. data = b'some MOAR bytes!'
  164. metadata = {u'name': u'some-file.jpg'}
  165. content_type = u'image/jpeg'
  166. .. doctest:: resumable-implicit-size
  167. >>> stream = io.BytesIO(data)
  168. >>> response = upload.initiate(
  169. ... transport, stream, metadata, content_type)
  170. >>>
  171. >>> upload.total_bytes == len(data)
  172. True
  173. If the size of the resource is **unknown** when the upload is initiated,
  174. the ``stream_final`` argument can be used. This might occur if the
  175. resource is being dynamically created on the client (e.g. application
  176. logs). To use this argument:
  177. .. testsetup:: resumable-unknown-size
  178. import io
  179. import mock
  180. import requests
  181. from six.moves import http_client
  182. from google.resumable_media.requests import ResumableUpload
  183. upload_url = u'http://test.invalid'
  184. chunk_size = 3 * 1024 * 1024 # 3MB
  185. upload = ResumableUpload(upload_url, chunk_size)
  186. fake_response = requests.Response()
  187. fake_response.status_code = int(http_client.OK)
  188. fake_response._content = b''
  189. resumable_url = u'http://test.invalid?upload_id=7up'
  190. fake_response.headers[u'location'] = resumable_url
  191. post_method = mock.Mock(return_value=fake_response, spec=[])
  192. transport = mock.Mock(request=post_method, spec=[u'request'])
  193. metadata = {u'name': u'some-file.jpg'}
  194. content_type = u'application/octet-stream'
  195. stream = io.BytesIO(b'data')
  196. .. doctest:: resumable-unknown-size
  197. >>> response = upload.initiate(
  198. ... transport, stream, metadata, content_type,
  199. ... stream_final=False)
  200. >>>
  201. >>> upload.total_bytes is None
  202. True
  203. Args:
  204. upload_url (str): The URL where the resumable upload will be initiated.
  205. chunk_size (int): The size of each chunk used to upload the resource.
  206. headers (Optional[Mapping[str, str]]): Extra headers that should
  207. be sent with the :meth:`initiate` request, e.g. headers for
  208. encrypted data. These **will not** be sent with
  209. :meth:`transmit_next_chunk` or :meth:`recover` requests.
  210. Attributes:
  211. upload_url (str): The URL where the content will be uploaded.
  212. Raises:
  213. ValueError: If ``chunk_size`` is not a multiple of
  214. :data:`.UPLOAD_CHUNK_SIZE`.
  215. """
  216. def initiate(self, transport, stream, metadata, content_type,
  217. total_bytes=None, stream_final=True):
  218. """Initiate a resumable upload.
  219. By default, this method assumes your ``stream`` is in a "final"
  220. state ready to transmit. However, ``stream_final=False`` can be used
  221. to indicate that the size of the resource is not known. This can happen
  222. if bytes are being dynamically fed into ``stream``, e.g. if the stream
  223. is attached to application logs.
  224. If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
  225. read from the stream every time :meth:`transmit_next_chunk` is called.
  226. If one of those reads produces strictly fewer bites than the chunk
  227. size, the upload will be concluded.
  228. Args:
  229. transport (~requests.Session): A ``requests`` object which can
  230. make authenticated requests.
  231. stream (IO[bytes]): The stream (i.e. file-like object) that will
  232. be uploaded. The stream **must** be at the beginning (i.e.
  233. ``stream.tell() == 0``).
  234. metadata (Mapping[str, str]): The resource metadata, such as an
  235. ACL list.
  236. content_type (str): The content type of the resource, e.g. a JPEG
  237. image has content type ``image/jpeg``.
  238. total_bytes (Optional[int]): The total number of bytes to be
  239. uploaded. If specified, the upload size **will not** be
  240. determined from the stream (even if ``stream_final=True``).
  241. stream_final (Optional[bool]): Indicates if the ``stream`` is
  242. "final" (i.e. no more bytes will be added to it). In this case
  243. we determine the upload size from the size of the stream. If
  244. ``total_bytes`` is passed, this argument will be ignored.
  245. Returns:
  246. ~requests.Response: The HTTP response returned by ``transport``.
  247. """
  248. method, url, payload, headers = self._prepare_initiate_request(
  249. stream, metadata, content_type,
  250. total_bytes=total_bytes, stream_final=stream_final)
  251. result = _helpers.http_request(
  252. transport, method, url, data=payload, headers=headers,
  253. retry_strategy=self._retry_strategy)
  254. self._process_initiate_response(result)
  255. return result
  256. def transmit_next_chunk(self, transport):
  257. """Transmit the next chunk of the resource to be uploaded.
  258. If the current upload was initiated with ``stream_final=False``,
  259. this method will dynamically determine if the upload has completed.
  260. The upload will be considered complete if the stream produces
  261. fewer than :attr:`chunk_size` bytes when a chunk is read from it.
  262. In the case of failure, an exception is thrown that preserves the
  263. failed response:
  264. .. testsetup:: bad-response
  265. import io
  266. import mock
  267. import requests
  268. from six.moves import http_client
  269. from google import resumable_media
  270. import google.resumable_media.requests.upload as upload_mod
  271. transport = mock.Mock(spec=[u'request'])
  272. fake_response = requests.Response()
  273. fake_response.status_code = int(http_client.BAD_REQUEST)
  274. transport.request.return_value = fake_response
  275. upload_url = u'http://test.invalid'
  276. upload = upload_mod.ResumableUpload(
  277. upload_url, resumable_media.UPLOAD_CHUNK_SIZE)
  278. # Fake that the upload has been initiate()-d
  279. data = b'data is here'
  280. upload._stream = io.BytesIO(data)
  281. upload._total_bytes = len(data)
  282. upload._resumable_url = u'http://test.invalid?upload_id=nope'
  283. .. doctest:: bad-response
  284. :options: +NORMALIZE_WHITESPACE
  285. >>> error = None
  286. >>> try:
  287. ... upload.transmit_next_chunk(transport)
  288. ... except resumable_media.InvalidResponse as caught_exc:
  289. ... error = caught_exc
  290. ...
  291. >>> error
  292. InvalidResponse('Request failed with status code', 400,
  293. 'Expected one of', <HTTPStatus.OK: 200>, 308)
  294. >>> error.response
  295. <Response [400]>
  296. Args:
  297. transport (~requests.Session): A ``requests`` object which can
  298. make authenticated requests.
  299. Returns:
  300. ~requests.Response: The HTTP response returned by ``transport``.
  301. Raises:
  302. ~google.resumable_media.common.InvalidResponse: If the status
  303. code is not 200 or 308.
  304. """
  305. method, url, payload, headers = self._prepare_request()
  306. result = _helpers.http_request(
  307. transport, method, url, data=payload, headers=headers,
  308. retry_strategy=self._retry_strategy)
  309. self._process_response(result, len(payload))
  310. return result
  311. def recover(self, transport):
  312. """Recover from a failure.
  313. This method should be used when a :class:`ResumableUpload` is in an
  314. :attr:`~ResumableUpload.invalid` state due to a request failure.
  315. This will verify the progress with the server and make sure the
  316. current upload is in a valid state before :meth:`transmit_next_chunk`
  317. can be used again.
  318. Args:
  319. transport (~requests.Session): A ``requests`` object which can
  320. make authenticated requests.
  321. Returns:
  322. ~requests.Response: The HTTP response returned by ``transport``.
  323. """
  324. method, url, payload, headers = self._prepare_recover_request()
  325. # NOTE: We assume "payload is None" but pass it along anyway.
  326. result = _helpers.http_request(
  327. transport, method, url, data=payload, headers=headers,
  328. retry_strategy=self._retry_strategy)
  329. self._process_recover_response(result)
  330. return result