123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576 |
- # -*- coding: utf-8 -*-
- #
- # Copyright 2015 Google Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Tests for transfer.py."""
- import string
- import httplib2
- import mock
- import six
- from six.moves import http_client
- import unittest2
- from apitools.base.py import base_api
- from apitools.base.py import exceptions
- from apitools.base.py import gzip
- from apitools.base.py import http_wrapper
- from apitools.base.py import transfer
- class TransferTest(unittest2.TestCase):
- def assertRangeAndContentRangeCompatible(self, request, response):
- request_prefix = 'bytes='
- self.assertIn('range', request.headers)
- self.assertTrue(request.headers['range'].startswith(request_prefix))
- request_range = request.headers['range'][len(request_prefix):]
- response_prefix = 'bytes '
- self.assertIn('content-range', response.info)
- response_header = response.info['content-range']
- self.assertTrue(response_header.startswith(response_prefix))
- response_range = (
- response_header[len(response_prefix):].partition('/')[0])
- msg = ('Request range ({0}) not a prefix of '
- 'response_range ({1})').format(
- request_range, response_range)
- self.assertTrue(response_range.startswith(request_range), msg=msg)
- def testComputeEndByte(self):
- total_size = 100
- chunksize = 10
- download = transfer.Download.FromStream(
- six.StringIO(), chunksize=chunksize, total_size=total_size)
- self.assertEqual(chunksize - 1,
- download._Download__ComputeEndByte(0, end=50))
- def testComputeEndByteReturnNone(self):
- download = transfer.Download.FromStream(six.StringIO())
- self.assertIsNone(
- download._Download__ComputeEndByte(0, use_chunks=False))
- def testComputeEndByteNoChunks(self):
- total_size = 100
- download = transfer.Download.FromStream(
- six.StringIO(), chunksize=10, total_size=total_size)
- for end in (None, 1000):
- self.assertEqual(
- total_size - 1,
- download._Download__ComputeEndByte(0, end=end,
- use_chunks=False),
- msg='Failed on end={0}'.format(end))
- def testComputeEndByteNoTotal(self):
- download = transfer.Download.FromStream(six.StringIO())
- default_chunksize = download.chunksize
- for chunksize in (100, default_chunksize):
- download.chunksize = chunksize
- for start in (0, 10):
- self.assertEqual(
- download.chunksize + start - 1,
- download._Download__ComputeEndByte(start),
- msg='Failed on start={0}, chunksize={1}'.format(
- start, chunksize))
- def testComputeEndByteSmallTotal(self):
- total_size = 100
- download = transfer.Download.FromStream(six.StringIO(),
- total_size=total_size)
- for start in (0, 10):
- self.assertEqual(total_size - 1,
- download._Download__ComputeEndByte(start),
- msg='Failed on start={0}'.format(start))
- def testDownloadThenStream(self):
- bytes_http = object()
- http = object()
- download_stream = six.StringIO()
- download = transfer.Download.FromStream(download_stream,
- total_size=26)
- download.bytes_http = bytes_http
- base_url = 'https://part.one/'
- with mock.patch.object(http_wrapper, 'MakeRequest',
- autospec=True) as make_request:
- make_request.return_value = http_wrapper.Response(
- info={
- 'content-range': 'bytes 0-25/26',
- 'status': http_client.OK,
- },
- content=string.ascii_lowercase,
- request_url=base_url,
- )
- request = http_wrapper.Request(url='https://part.one/')
- download.InitializeDownload(request, http=http)
- self.assertEqual(1, make_request.call_count)
- received_request = make_request.call_args[0][1]
- self.assertEqual(base_url, received_request.url)
- self.assertRangeAndContentRangeCompatible(
- received_request, make_request.return_value)
- with mock.patch.object(http_wrapper, 'MakeRequest',
- autospec=True) as make_request:
- make_request.return_value = http_wrapper.Response(
- info={
- 'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
- },
- content='error',
- request_url=base_url,
- )
- download.StreamInChunks()
- self.assertEqual(1, make_request.call_count)
- received_request = make_request.call_args[0][1]
- self.assertEqual('bytes=26-', received_request.headers['range'])
- def testGetRange(self):
- for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]:
- bytes_http = object()
- http = object()
- download_stream = six.StringIO()
- download = transfer.Download.FromStream(download_stream,
- total_size=26,
- auto_transfer=False)
- download.bytes_http = bytes_http
- base_url = 'https://part.one/'
- with mock.patch.object(http_wrapper, 'MakeRequest',
- autospec=True) as make_request:
- make_request.return_value = http_wrapper.Response(
- info={
- 'content-range': 'bytes %d-%d/26' %
- (start_byte, end_byte),
- 'status': http_client.OK,
- },
- content=string.ascii_lowercase[start_byte:end_byte + 1],
- request_url=base_url,
- )
- request = http_wrapper.Request(url='https://part.one/')
- download.InitializeDownload(request, http=http)
- download.GetRange(start_byte, end_byte)
- self.assertEqual(1, make_request.call_count)
- received_request = make_request.call_args[0][1]
- self.assertEqual(base_url, received_request.url)
- self.assertRangeAndContentRangeCompatible(
- received_request, make_request.return_value)
- def testNonChunkedDownload(self):
- bytes_http = object()
- http = object()
- download_stream = six.StringIO()
- download = transfer.Download.FromStream(download_stream, total_size=52)
- download.bytes_http = bytes_http
- base_url = 'https://part.one/'
- with mock.patch.object(http_wrapper, 'MakeRequest',
- autospec=True) as make_request:
- make_request.return_value = http_wrapper.Response(
- info={
- 'content-range': 'bytes 0-51/52',
- 'status': http_client.OK,
- },
- content=string.ascii_lowercase * 2,
- request_url=base_url,
- )
- request = http_wrapper.Request(url='https://part.one/')
- download.InitializeDownload(request, http=http)
- self.assertEqual(1, make_request.call_count)
- received_request = make_request.call_args[0][1]
- self.assertEqual(base_url, received_request.url)
- self.assertRangeAndContentRangeCompatible(
- received_request, make_request.return_value)
- download_stream.seek(0)
- self.assertEqual(string.ascii_lowercase * 2,
- download_stream.getvalue())
- def testChunkedDownload(self):
- bytes_http = object()
- http = object()
- download_stream = six.StringIO()
- download = transfer.Download.FromStream(
- download_stream, chunksize=26, total_size=52)
- download.bytes_http = bytes_http
- # Setting autospec on a mock with an iterable side_effect is
- # currently broken (http://bugs.python.org/issue17826), so
- # instead we write a little function.
- def _ReturnBytes(unused_http, http_request,
- *unused_args, **unused_kwds):
- url = http_request.url
- if url == 'https://part.one/':
- return http_wrapper.Response(
- info={
- 'content-location': 'https://part.two/',
- 'content-range': 'bytes 0-25/52',
- 'status': http_client.PARTIAL_CONTENT,
- },
- content=string.ascii_lowercase,
- request_url='https://part.one/',
- )
- elif url == 'https://part.two/':
- return http_wrapper.Response(
- info={
- 'content-range': 'bytes 26-51/52',
- 'status': http_client.OK,
- },
- content=string.ascii_uppercase,
- request_url='https://part.two/',
- )
- else:
- self.fail('Unknown URL requested: %s' % url)
- with mock.patch.object(http_wrapper, 'MakeRequest',
- autospec=True) as make_request:
- make_request.side_effect = _ReturnBytes
- request = http_wrapper.Request(url='https://part.one/')
- download.InitializeDownload(request, http=http)
- self.assertEqual(2, make_request.call_count)
- for call in make_request.call_args_list:
- self.assertRangeAndContentRangeCompatible(
- call[0][1], _ReturnBytes(*call[0]))
- download_stream.seek(0)
- self.assertEqual(string.ascii_lowercase + string.ascii_uppercase,
- download_stream.getvalue())
- def testMultipartEncoding(self):
- # This is really a table test for various issues we've seen in
- # the past; see notes below for particular histories.
- test_cases = [
- # Python's mime module by default encodes lines that start
- # with "From " as ">From ", which we need to make sure we
- # don't run afoul of when sending content that isn't
- # intended to be so encoded. This test calls out that we
- # get this right. We test for both the multipart and
- # non-multipart case.
- 'line one\nFrom \nline two',
- # We had originally used a `six.StringIO` to hold the http
- # request body in the case of a multipart upload; for
- # bytes being uploaded in Python3, however, this causes
- # issues like this:
- # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760
- # We test below to ensure that we don't end up mangling
- # the body before sending.
- u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd',
- ]
- for upload_contents in test_cases:
- multipart_body = '{"body_field_one": 7}'
- upload_bytes = upload_contents.encode('ascii', 'backslashreplace')
- upload_config = base_api.ApiUploadInfo(
- accept=['*/*'],
- max_size=None,
- resumable_multipart=True,
- resumable_path=u'/resumable/upload',
- simple_multipart=True,
- simple_path=u'/upload',
- )
- url_builder = base_api._UrlBuilder('http://www.uploads.com')
- # Test multipart: having a body argument in http_request forces
- # multipart here.
- upload = transfer.Upload.FromStream(
- six.BytesIO(upload_bytes),
- 'text/plain',
- total_size=len(upload_bytes))
- http_request = http_wrapper.Request(
- 'http://www.uploads.com',
- headers={'content-type': 'text/plain'},
- body=multipart_body)
- upload.ConfigureRequest(upload_config, http_request, url_builder)
- self.assertEqual(
- 'multipart', url_builder.query_params['uploadType'])
- rewritten_upload_contents = b'\n'.join(
- http_request.body.split(b'--')[2].splitlines()[1:])
- self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))
- # Test non-multipart (aka media): no body argument means this is
- # sent as media.
- upload = transfer.Upload.FromStream(
- six.BytesIO(upload_bytes),
- 'text/plain',
- total_size=len(upload_bytes))
- http_request = http_wrapper.Request(
- 'http://www.uploads.com',
- headers={'content-type': 'text/plain'})
- upload.ConfigureRequest(upload_config, http_request, url_builder)
- self.assertEqual(url_builder.query_params['uploadType'], 'media')
- rewritten_upload_contents = http_request.body
- self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))
- class UploadTest(unittest2.TestCase):
- def setUp(self):
- # Sample highly compressible data.
- self.sample_data = b'abc' * 200
- # Stream of the sample data.
- self.sample_stream = six.BytesIO(self.sample_data)
- # Sample url_builder.
- self.url_builder = base_api._UrlBuilder('http://www.uploads.com')
- # Sample request.
- self.request = http_wrapper.Request(
- 'http://www.uploads.com',
- headers={'content-type': 'text/plain'})
- # Sample successful response.
- self.response = http_wrapper.Response(
- info={'status': http_client.OK,
- 'location': 'http://www.uploads.com'},
- content='',
- request_url='http://www.uploads.com',)
- # Sample failure response.
- self.fail_response = http_wrapper.Response(
- info={'status': http_client.SERVICE_UNAVAILABLE,
- 'location': 'http://www.uploads.com'},
- content='',
- request_url='http://www.uploads.com',)
- def testStreamInChunksCompressed(self):
- """Test that StreamInChunks will handle compression correctly."""
- # Create and configure the upload object.
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- gzip_encoded=True)
- upload.strategy = transfer.RESUMABLE_UPLOAD
- # Set the chunk size so the entire stream is uploaded.
- upload.chunksize = len(self.sample_data)
- # Mock the upload to return the sample response.
- with mock.patch.object(transfer.Upload,
- '_Upload__SendMediaRequest') as mock_result, \
- mock.patch.object(http_wrapper,
- 'MakeRequest') as make_request:
- mock_result.return_value = self.response
- make_request.return_value = self.response
- # Initialization.
- upload.InitializeUpload(self.request, 'http')
- upload.StreamInChunks()
- # Get the uploaded request and end position of the stream.
- (request, _), _ = mock_result.call_args_list[0]
- # Ensure the mock was called.
- self.assertTrue(mock_result.called)
- # Ensure the correct content encoding was set.
- self.assertEqual(request.headers['Content-Encoding'], 'gzip')
- # Ensure the stream was compresed.
- self.assertLess(len(request.body), len(self.sample_data))
- def testStreamMediaCompressedFail(self):
- """Test that non-chunked uploads raise an exception.
- Ensure uploads with the compressed and resumable flags set called from
- StreamMedia raise an exception. Those uploads are unsupported.
- """
- # Create the upload object.
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- auto_transfer=True,
- gzip_encoded=True)
- upload.strategy = transfer.RESUMABLE_UPLOAD
- # Mock the upload to return the sample response.
- with mock.patch.object(http_wrapper,
- 'MakeRequest') as make_request:
- make_request.return_value = self.response
- # Initialization.
- upload.InitializeUpload(self.request, 'http')
- # Ensure stream media raises an exception when the upload is
- # compressed. Compression is not supported on non-chunked uploads.
- with self.assertRaises(exceptions.InvalidUserInputError):
- upload.StreamMedia()
- def testAutoTransferCompressed(self):
- """Test that automatic transfers are compressed.
- Ensure uploads with the compressed, resumable, and automatic transfer
- flags set call StreamInChunks. StreamInChunks is tested in an earlier
- test.
- """
- # Create the upload object.
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- gzip_encoded=True)
- upload.strategy = transfer.RESUMABLE_UPLOAD
- # Mock the upload to return the sample response.
- with mock.patch.object(transfer.Upload,
- 'StreamInChunks') as mock_result, \
- mock.patch.object(http_wrapper,
- 'MakeRequest') as make_request:
- mock_result.return_value = self.response
- make_request.return_value = self.response
- # Initialization.
- upload.InitializeUpload(self.request, 'http')
- # Ensure the mock was called.
- self.assertTrue(mock_result.called)
- def testMultipartCompressed(self):
- """Test that multipart uploads are compressed."""
- # Create the multipart configuration.
- upload_config = base_api.ApiUploadInfo(
- accept=['*/*'],
- max_size=None,
- simple_multipart=True,
- simple_path=u'/upload',)
- # Create the upload object.
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- gzip_encoded=True)
- # Set a body to trigger multipart configuration.
- self.request.body = '{"body_field_one": 7}'
- # Configure the request.
- upload.ConfigureRequest(upload_config, self.request, self.url_builder)
- # Ensure the request is a multipart request now.
- self.assertEqual(
- self.url_builder.query_params['uploadType'], 'multipart')
- # Ensure the request is gzip encoded.
- self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
- # Ensure data is compressed
- self.assertLess(len(self.request.body), len(self.sample_data))
- # Ensure uncompressed data includes the sample data.
- with gzip.GzipFile(fileobj=self.request.body) as f:
- original = f.read()
- self.assertTrue(self.sample_data in original)
- def testMediaCompressed(self):
- """Test that media uploads are compressed."""
- # Create the media configuration.
- upload_config = base_api.ApiUploadInfo(
- accept=['*/*'],
- max_size=None,
- simple_multipart=True,
- simple_path=u'/upload',)
- # Create the upload object.
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- gzip_encoded=True)
- # Configure the request.
- upload.ConfigureRequest(upload_config, self.request, self.url_builder)
- # Ensure the request is a media request now.
- self.assertEqual(self.url_builder.query_params['uploadType'], 'media')
- # Ensure the request is gzip encoded.
- self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
- # Ensure data is compressed
- self.assertLess(len(self.request.body), len(self.sample_data))
- # Ensure uncompressed data includes the sample data.
- with gzip.GzipFile(fileobj=self.request.body) as f:
- original = f.read()
- self.assertTrue(self.sample_data in original)
- def HttpRequestSideEffect(self, responses=None):
- responses = [(response.info, response.content)
- for response in responses]
- def _side_effect(uri, **kwargs): # pylint: disable=unused-argument
- body = kwargs['body']
- read_func = getattr(body, 'read', None)
- if read_func:
- # If the body is a stream, consume the stream.
- body = read_func()
- self.assertEqual(int(kwargs['headers']['content-length']),
- len(body))
- return responses.pop(0)
- return _side_effect
- def testRetryRequestChunks(self):
- """Test that StreamInChunks will retry correctly."""
- refresh_response = http_wrapper.Response(
- info={'status': http_wrapper.RESUME_INCOMPLETE,
- 'location': 'http://www.uploads.com'},
- content='',
- request_url='http://www.uploads.com',)
- # Create and configure the upload object.
- bytes_http = httplib2.Http()
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- http=bytes_http)
- upload.strategy = transfer.RESUMABLE_UPLOAD
- # Set the chunk size so the entire stream is uploaded.
- upload.chunksize = len(self.sample_data)
- # Mock the upload to return the sample response.
- with mock.patch.object(bytes_http,
- 'request') as make_request:
- # This side effect also checks the request body.
- responses = [
- self.response, # Initial request in InitializeUpload().
- self.fail_response, # 503 status code from server.
- refresh_response, # Refresh upload progress.
- self.response, # Successful request.
- ]
- make_request.side_effect = self.HttpRequestSideEffect(responses)
- # Initialization.
- upload.InitializeUpload(self.request, bytes_http)
- upload.StreamInChunks()
- # Ensure the mock was called the correct number of times.
- self.assertEquals(make_request.call_count, len(responses))
- def testStreamInChunks(self):
- """Test StreamInChunks."""
- resume_incomplete_responses = [http_wrapper.Response(
- info={'status': http_wrapper.RESUME_INCOMPLETE,
- 'location': 'http://www.uploads.com',
- 'range': '0-{}'.format(end)},
- content='',
- request_url='http://www.uploads.com',) for end in [199, 399, 599]]
- responses = [
- self.response # Initial request in InitializeUpload().
- ] + resume_incomplete_responses + [
- self.response, # Successful request.
- ]
- # Create and configure the upload object.
- bytes_http = httplib2.Http()
- upload = transfer.Upload(
- stream=self.sample_stream,
- mime_type='text/plain',
- total_size=len(self.sample_data),
- close_stream=False,
- http=bytes_http)
- upload.strategy = transfer.RESUMABLE_UPLOAD
- # Set the chunk size so the entire stream is uploaded.
- upload.chunksize = 200
- # Mock the upload to return the sample response.
- with mock.patch.object(bytes_http,
- 'request') as make_request:
- # This side effect also checks the request body.
- make_request.side_effect = self.HttpRequestSideEffect(responses)
- # Initialization.
- upload.InitializeUpload(self.request, bytes_http)
- upload.StreamInChunks()
- # Ensure the mock was called the correct number of times.
- self.assertEquals(make_request.call_count, len(responses))
|