transfer_test.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. # -*- coding: utf-8 -*-
  2. #
  3. # Copyright 2015 Google Inc.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """Tests for transfer.py."""
  17. import string
  18. import httplib2
  19. import mock
  20. import six
  21. from six.moves import http_client
  22. import unittest2
  23. from apitools.base.py import base_api
  24. from apitools.base.py import exceptions
  25. from apitools.base.py import gzip
  26. from apitools.base.py import http_wrapper
  27. from apitools.base.py import transfer
  28. class TransferTest(unittest2.TestCase):
  29. def assertRangeAndContentRangeCompatible(self, request, response):
  30. request_prefix = 'bytes='
  31. self.assertIn('range', request.headers)
  32. self.assertTrue(request.headers['range'].startswith(request_prefix))
  33. request_range = request.headers['range'][len(request_prefix):]
  34. response_prefix = 'bytes '
  35. self.assertIn('content-range', response.info)
  36. response_header = response.info['content-range']
  37. self.assertTrue(response_header.startswith(response_prefix))
  38. response_range = (
  39. response_header[len(response_prefix):].partition('/')[0])
  40. msg = ('Request range ({0}) not a prefix of '
  41. 'response_range ({1})').format(
  42. request_range, response_range)
  43. self.assertTrue(response_range.startswith(request_range), msg=msg)
  44. def testComputeEndByte(self):
  45. total_size = 100
  46. chunksize = 10
  47. download = transfer.Download.FromStream(
  48. six.StringIO(), chunksize=chunksize, total_size=total_size)
  49. self.assertEqual(chunksize - 1,
  50. download._Download__ComputeEndByte(0, end=50))
  51. def testComputeEndByteReturnNone(self):
  52. download = transfer.Download.FromStream(six.StringIO())
  53. self.assertIsNone(
  54. download._Download__ComputeEndByte(0, use_chunks=False))
  55. def testComputeEndByteNoChunks(self):
  56. total_size = 100
  57. download = transfer.Download.FromStream(
  58. six.StringIO(), chunksize=10, total_size=total_size)
  59. for end in (None, 1000):
  60. self.assertEqual(
  61. total_size - 1,
  62. download._Download__ComputeEndByte(0, end=end,
  63. use_chunks=False),
  64. msg='Failed on end={0}'.format(end))
  65. def testComputeEndByteNoTotal(self):
  66. download = transfer.Download.FromStream(six.StringIO())
  67. default_chunksize = download.chunksize
  68. for chunksize in (100, default_chunksize):
  69. download.chunksize = chunksize
  70. for start in (0, 10):
  71. self.assertEqual(
  72. download.chunksize + start - 1,
  73. download._Download__ComputeEndByte(start),
  74. msg='Failed on start={0}, chunksize={1}'.format(
  75. start, chunksize))
  76. def testComputeEndByteSmallTotal(self):
  77. total_size = 100
  78. download = transfer.Download.FromStream(six.StringIO(),
  79. total_size=total_size)
  80. for start in (0, 10):
  81. self.assertEqual(total_size - 1,
  82. download._Download__ComputeEndByte(start),
  83. msg='Failed on start={0}'.format(start))
  84. def testDownloadThenStream(self):
  85. bytes_http = object()
  86. http = object()
  87. download_stream = six.StringIO()
  88. download = transfer.Download.FromStream(download_stream,
  89. total_size=26)
  90. download.bytes_http = bytes_http
  91. base_url = 'https://part.one/'
  92. with mock.patch.object(http_wrapper, 'MakeRequest',
  93. autospec=True) as make_request:
  94. make_request.return_value = http_wrapper.Response(
  95. info={
  96. 'content-range': 'bytes 0-25/26',
  97. 'status': http_client.OK,
  98. },
  99. content=string.ascii_lowercase,
  100. request_url=base_url,
  101. )
  102. request = http_wrapper.Request(url='https://part.one/')
  103. download.InitializeDownload(request, http=http)
  104. self.assertEqual(1, make_request.call_count)
  105. received_request = make_request.call_args[0][1]
  106. self.assertEqual(base_url, received_request.url)
  107. self.assertRangeAndContentRangeCompatible(
  108. received_request, make_request.return_value)
  109. with mock.patch.object(http_wrapper, 'MakeRequest',
  110. autospec=True) as make_request:
  111. make_request.return_value = http_wrapper.Response(
  112. info={
  113. 'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
  114. },
  115. content='error',
  116. request_url=base_url,
  117. )
  118. download.StreamInChunks()
  119. self.assertEqual(1, make_request.call_count)
  120. received_request = make_request.call_args[0][1]
  121. self.assertEqual('bytes=26-', received_request.headers['range'])
  122. def testGetRange(self):
  123. for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]:
  124. bytes_http = object()
  125. http = object()
  126. download_stream = six.StringIO()
  127. download = transfer.Download.FromStream(download_stream,
  128. total_size=26,
  129. auto_transfer=False)
  130. download.bytes_http = bytes_http
  131. base_url = 'https://part.one/'
  132. with mock.patch.object(http_wrapper, 'MakeRequest',
  133. autospec=True) as make_request:
  134. make_request.return_value = http_wrapper.Response(
  135. info={
  136. 'content-range': 'bytes %d-%d/26' %
  137. (start_byte, end_byte),
  138. 'status': http_client.OK,
  139. },
  140. content=string.ascii_lowercase[start_byte:end_byte + 1],
  141. request_url=base_url,
  142. )
  143. request = http_wrapper.Request(url='https://part.one/')
  144. download.InitializeDownload(request, http=http)
  145. download.GetRange(start_byte, end_byte)
  146. self.assertEqual(1, make_request.call_count)
  147. received_request = make_request.call_args[0][1]
  148. self.assertEqual(base_url, received_request.url)
  149. self.assertRangeAndContentRangeCompatible(
  150. received_request, make_request.return_value)
  151. def testNonChunkedDownload(self):
  152. bytes_http = object()
  153. http = object()
  154. download_stream = six.StringIO()
  155. download = transfer.Download.FromStream(download_stream, total_size=52)
  156. download.bytes_http = bytes_http
  157. base_url = 'https://part.one/'
  158. with mock.patch.object(http_wrapper, 'MakeRequest',
  159. autospec=True) as make_request:
  160. make_request.return_value = http_wrapper.Response(
  161. info={
  162. 'content-range': 'bytes 0-51/52',
  163. 'status': http_client.OK,
  164. },
  165. content=string.ascii_lowercase * 2,
  166. request_url=base_url,
  167. )
  168. request = http_wrapper.Request(url='https://part.one/')
  169. download.InitializeDownload(request, http=http)
  170. self.assertEqual(1, make_request.call_count)
  171. received_request = make_request.call_args[0][1]
  172. self.assertEqual(base_url, received_request.url)
  173. self.assertRangeAndContentRangeCompatible(
  174. received_request, make_request.return_value)
  175. download_stream.seek(0)
  176. self.assertEqual(string.ascii_lowercase * 2,
  177. download_stream.getvalue())
  178. def testChunkedDownload(self):
  179. bytes_http = object()
  180. http = object()
  181. download_stream = six.StringIO()
  182. download = transfer.Download.FromStream(
  183. download_stream, chunksize=26, total_size=52)
  184. download.bytes_http = bytes_http
  185. # Setting autospec on a mock with an iterable side_effect is
  186. # currently broken (http://bugs.python.org/issue17826), so
  187. # instead we write a little function.
  188. def _ReturnBytes(unused_http, http_request,
  189. *unused_args, **unused_kwds):
  190. url = http_request.url
  191. if url == 'https://part.one/':
  192. return http_wrapper.Response(
  193. info={
  194. 'content-location': 'https://part.two/',
  195. 'content-range': 'bytes 0-25/52',
  196. 'status': http_client.PARTIAL_CONTENT,
  197. },
  198. content=string.ascii_lowercase,
  199. request_url='https://part.one/',
  200. )
  201. elif url == 'https://part.two/':
  202. return http_wrapper.Response(
  203. info={
  204. 'content-range': 'bytes 26-51/52',
  205. 'status': http_client.OK,
  206. },
  207. content=string.ascii_uppercase,
  208. request_url='https://part.two/',
  209. )
  210. else:
  211. self.fail('Unknown URL requested: %s' % url)
  212. with mock.patch.object(http_wrapper, 'MakeRequest',
  213. autospec=True) as make_request:
  214. make_request.side_effect = _ReturnBytes
  215. request = http_wrapper.Request(url='https://part.one/')
  216. download.InitializeDownload(request, http=http)
  217. self.assertEqual(2, make_request.call_count)
  218. for call in make_request.call_args_list:
  219. self.assertRangeAndContentRangeCompatible(
  220. call[0][1], _ReturnBytes(*call[0]))
  221. download_stream.seek(0)
  222. self.assertEqual(string.ascii_lowercase + string.ascii_uppercase,
  223. download_stream.getvalue())
  224. def testMultipartEncoding(self):
  225. # This is really a table test for various issues we've seen in
  226. # the past; see notes below for particular histories.
  227. test_cases = [
  228. # Python's mime module by default encodes lines that start
  229. # with "From " as ">From ", which we need to make sure we
  230. # don't run afoul of when sending content that isn't
  231. # intended to be so encoded. This test calls out that we
  232. # get this right. We test for both the multipart and
  233. # non-multipart case.
  234. 'line one\nFrom \nline two',
  235. # We had originally used a `six.StringIO` to hold the http
  236. # request body in the case of a multipart upload; for
  237. # bytes being uploaded in Python3, however, this causes
  238. # issues like this:
  239. # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760
  240. # We test below to ensure that we don't end up mangling
  241. # the body before sending.
  242. u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd',
  243. ]
  244. for upload_contents in test_cases:
  245. multipart_body = '{"body_field_one": 7}'
  246. upload_bytes = upload_contents.encode('ascii', 'backslashreplace')
  247. upload_config = base_api.ApiUploadInfo(
  248. accept=['*/*'],
  249. max_size=None,
  250. resumable_multipart=True,
  251. resumable_path=u'/resumable/upload',
  252. simple_multipart=True,
  253. simple_path=u'/upload',
  254. )
  255. url_builder = base_api._UrlBuilder('http://www.uploads.com')
  256. # Test multipart: having a body argument in http_request forces
  257. # multipart here.
  258. upload = transfer.Upload.FromStream(
  259. six.BytesIO(upload_bytes),
  260. 'text/plain',
  261. total_size=len(upload_bytes))
  262. http_request = http_wrapper.Request(
  263. 'http://www.uploads.com',
  264. headers={'content-type': 'text/plain'},
  265. body=multipart_body)
  266. upload.ConfigureRequest(upload_config, http_request, url_builder)
  267. self.assertEqual(
  268. 'multipart', url_builder.query_params['uploadType'])
  269. rewritten_upload_contents = b'\n'.join(
  270. http_request.body.split(b'--')[2].splitlines()[1:])
  271. self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))
  272. # Test non-multipart (aka media): no body argument means this is
  273. # sent as media.
  274. upload = transfer.Upload.FromStream(
  275. six.BytesIO(upload_bytes),
  276. 'text/plain',
  277. total_size=len(upload_bytes))
  278. http_request = http_wrapper.Request(
  279. 'http://www.uploads.com',
  280. headers={'content-type': 'text/plain'})
  281. upload.ConfigureRequest(upload_config, http_request, url_builder)
  282. self.assertEqual(url_builder.query_params['uploadType'], 'media')
  283. rewritten_upload_contents = http_request.body
  284. self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))
  285. class UploadTest(unittest2.TestCase):
  286. def setUp(self):
  287. # Sample highly compressible data.
  288. self.sample_data = b'abc' * 200
  289. # Stream of the sample data.
  290. self.sample_stream = six.BytesIO(self.sample_data)
  291. # Sample url_builder.
  292. self.url_builder = base_api._UrlBuilder('http://www.uploads.com')
  293. # Sample request.
  294. self.request = http_wrapper.Request(
  295. 'http://www.uploads.com',
  296. headers={'content-type': 'text/plain'})
  297. # Sample successful response.
  298. self.response = http_wrapper.Response(
  299. info={'status': http_client.OK,
  300. 'location': 'http://www.uploads.com'},
  301. content='',
  302. request_url='http://www.uploads.com',)
  303. # Sample failure response.
  304. self.fail_response = http_wrapper.Response(
  305. info={'status': http_client.SERVICE_UNAVAILABLE,
  306. 'location': 'http://www.uploads.com'},
  307. content='',
  308. request_url='http://www.uploads.com',)
  309. def testStreamInChunksCompressed(self):
  310. """Test that StreamInChunks will handle compression correctly."""
  311. # Create and configure the upload object.
  312. upload = transfer.Upload(
  313. stream=self.sample_stream,
  314. mime_type='text/plain',
  315. total_size=len(self.sample_data),
  316. close_stream=False,
  317. gzip_encoded=True)
  318. upload.strategy = transfer.RESUMABLE_UPLOAD
  319. # Set the chunk size so the entire stream is uploaded.
  320. upload.chunksize = len(self.sample_data)
  321. # Mock the upload to return the sample response.
  322. with mock.patch.object(transfer.Upload,
  323. '_Upload__SendMediaRequest') as mock_result, \
  324. mock.patch.object(http_wrapper,
  325. 'MakeRequest') as make_request:
  326. mock_result.return_value = self.response
  327. make_request.return_value = self.response
  328. # Initialization.
  329. upload.InitializeUpload(self.request, 'http')
  330. upload.StreamInChunks()
  331. # Get the uploaded request and end position of the stream.
  332. (request, _), _ = mock_result.call_args_list[0]
  333. # Ensure the mock was called.
  334. self.assertTrue(mock_result.called)
  335. # Ensure the correct content encoding was set.
  336. self.assertEqual(request.headers['Content-Encoding'], 'gzip')
  337. # Ensure the stream was compresed.
  338. self.assertLess(len(request.body), len(self.sample_data))
  339. def testStreamMediaCompressedFail(self):
  340. """Test that non-chunked uploads raise an exception.
  341. Ensure uploads with the compressed and resumable flags set called from
  342. StreamMedia raise an exception. Those uploads are unsupported.
  343. """
  344. # Create the upload object.
  345. upload = transfer.Upload(
  346. stream=self.sample_stream,
  347. mime_type='text/plain',
  348. total_size=len(self.sample_data),
  349. close_stream=False,
  350. auto_transfer=True,
  351. gzip_encoded=True)
  352. upload.strategy = transfer.RESUMABLE_UPLOAD
  353. # Mock the upload to return the sample response.
  354. with mock.patch.object(http_wrapper,
  355. 'MakeRequest') as make_request:
  356. make_request.return_value = self.response
  357. # Initialization.
  358. upload.InitializeUpload(self.request, 'http')
  359. # Ensure stream media raises an exception when the upload is
  360. # compressed. Compression is not supported on non-chunked uploads.
  361. with self.assertRaises(exceptions.InvalidUserInputError):
  362. upload.StreamMedia()
  363. def testAutoTransferCompressed(self):
  364. """Test that automatic transfers are compressed.
  365. Ensure uploads with the compressed, resumable, and automatic transfer
  366. flags set call StreamInChunks. StreamInChunks is tested in an earlier
  367. test.
  368. """
  369. # Create the upload object.
  370. upload = transfer.Upload(
  371. stream=self.sample_stream,
  372. mime_type='text/plain',
  373. total_size=len(self.sample_data),
  374. close_stream=False,
  375. gzip_encoded=True)
  376. upload.strategy = transfer.RESUMABLE_UPLOAD
  377. # Mock the upload to return the sample response.
  378. with mock.patch.object(transfer.Upload,
  379. 'StreamInChunks') as mock_result, \
  380. mock.patch.object(http_wrapper,
  381. 'MakeRequest') as make_request:
  382. mock_result.return_value = self.response
  383. make_request.return_value = self.response
  384. # Initialization.
  385. upload.InitializeUpload(self.request, 'http')
  386. # Ensure the mock was called.
  387. self.assertTrue(mock_result.called)
  388. def testMultipartCompressed(self):
  389. """Test that multipart uploads are compressed."""
  390. # Create the multipart configuration.
  391. upload_config = base_api.ApiUploadInfo(
  392. accept=['*/*'],
  393. max_size=None,
  394. simple_multipart=True,
  395. simple_path=u'/upload',)
  396. # Create the upload object.
  397. upload = transfer.Upload(
  398. stream=self.sample_stream,
  399. mime_type='text/plain',
  400. total_size=len(self.sample_data),
  401. close_stream=False,
  402. gzip_encoded=True)
  403. # Set a body to trigger multipart configuration.
  404. self.request.body = '{"body_field_one": 7}'
  405. # Configure the request.
  406. upload.ConfigureRequest(upload_config, self.request, self.url_builder)
  407. # Ensure the request is a multipart request now.
  408. self.assertEqual(
  409. self.url_builder.query_params['uploadType'], 'multipart')
  410. # Ensure the request is gzip encoded.
  411. self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
  412. # Ensure data is compressed
  413. self.assertLess(len(self.request.body), len(self.sample_data))
  414. # Ensure uncompressed data includes the sample data.
  415. with gzip.GzipFile(fileobj=self.request.body) as f:
  416. original = f.read()
  417. self.assertTrue(self.sample_data in original)
  418. def testMediaCompressed(self):
  419. """Test that media uploads are compressed."""
  420. # Create the media configuration.
  421. upload_config = base_api.ApiUploadInfo(
  422. accept=['*/*'],
  423. max_size=None,
  424. simple_multipart=True,
  425. simple_path=u'/upload',)
  426. # Create the upload object.
  427. upload = transfer.Upload(
  428. stream=self.sample_stream,
  429. mime_type='text/plain',
  430. total_size=len(self.sample_data),
  431. close_stream=False,
  432. gzip_encoded=True)
  433. # Configure the request.
  434. upload.ConfigureRequest(upload_config, self.request, self.url_builder)
  435. # Ensure the request is a media request now.
  436. self.assertEqual(self.url_builder.query_params['uploadType'], 'media')
  437. # Ensure the request is gzip encoded.
  438. self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
  439. # Ensure data is compressed
  440. self.assertLess(len(self.request.body), len(self.sample_data))
  441. # Ensure uncompressed data includes the sample data.
  442. with gzip.GzipFile(fileobj=self.request.body) as f:
  443. original = f.read()
  444. self.assertTrue(self.sample_data in original)
  445. def HttpRequestSideEffect(self, responses=None):
  446. responses = [(response.info, response.content)
  447. for response in responses]
  448. def _side_effect(uri, **kwargs): # pylint: disable=unused-argument
  449. body = kwargs['body']
  450. read_func = getattr(body, 'read', None)
  451. if read_func:
  452. # If the body is a stream, consume the stream.
  453. body = read_func()
  454. self.assertEqual(int(kwargs['headers']['content-length']),
  455. len(body))
  456. return responses.pop(0)
  457. return _side_effect
  458. def testRetryRequestChunks(self):
  459. """Test that StreamInChunks will retry correctly."""
  460. refresh_response = http_wrapper.Response(
  461. info={'status': http_wrapper.RESUME_INCOMPLETE,
  462. 'location': 'http://www.uploads.com'},
  463. content='',
  464. request_url='http://www.uploads.com',)
  465. # Create and configure the upload object.
  466. bytes_http = httplib2.Http()
  467. upload = transfer.Upload(
  468. stream=self.sample_stream,
  469. mime_type='text/plain',
  470. total_size=len(self.sample_data),
  471. close_stream=False,
  472. http=bytes_http)
  473. upload.strategy = transfer.RESUMABLE_UPLOAD
  474. # Set the chunk size so the entire stream is uploaded.
  475. upload.chunksize = len(self.sample_data)
  476. # Mock the upload to return the sample response.
  477. with mock.patch.object(bytes_http,
  478. 'request') as make_request:
  479. # This side effect also checks the request body.
  480. responses = [
  481. self.response, # Initial request in InitializeUpload().
  482. self.fail_response, # 503 status code from server.
  483. refresh_response, # Refresh upload progress.
  484. self.response, # Successful request.
  485. ]
  486. make_request.side_effect = self.HttpRequestSideEffect(responses)
  487. # Initialization.
  488. upload.InitializeUpload(self.request, bytes_http)
  489. upload.StreamInChunks()
  490. # Ensure the mock was called the correct number of times.
  491. self.assertEquals(make_request.call_count, len(responses))
  492. def testStreamInChunks(self):
  493. """Test StreamInChunks."""
  494. resume_incomplete_responses = [http_wrapper.Response(
  495. info={'status': http_wrapper.RESUME_INCOMPLETE,
  496. 'location': 'http://www.uploads.com',
  497. 'range': '0-{}'.format(end)},
  498. content='',
  499. request_url='http://www.uploads.com',) for end in [199, 399, 599]]
  500. responses = [
  501. self.response # Initial request in InitializeUpload().
  502. ] + resume_incomplete_responses + [
  503. self.response, # Successful request.
  504. ]
  505. # Create and configure the upload object.
  506. bytes_http = httplib2.Http()
  507. upload = transfer.Upload(
  508. stream=self.sample_stream,
  509. mime_type='text/plain',
  510. total_size=len(self.sample_data),
  511. close_stream=False,
  512. http=bytes_http)
  513. upload.strategy = transfer.RESUMABLE_UPLOAD
  514. # Set the chunk size so the entire stream is uploaded.
  515. upload.chunksize = 200
  516. # Mock the upload to return the sample response.
  517. with mock.patch.object(bytes_http,
  518. 'request') as make_request:
  519. # This side effect also checks the request body.
  520. make_request.side_effect = self.HttpRequestSideEffect(responses)
  521. # Initialization.
  522. upload.InitializeUpload(self.request, bytes_http)
  523. upload.StreamInChunks()
  524. # Ensure the mock was called the correct number of times.
  525. self.assertEquals(make_request.call_count, len(responses))