# -*- coding: utf-8 -*- # # Copyright 2015 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for transfer.py.""" import string import httplib2 import mock import six from six.moves import http_client import unittest2 from apitools.base.py import base_api from apitools.base.py import exceptions from apitools.base.py import gzip from apitools.base.py import http_wrapper from apitools.base.py import transfer class TransferTest(unittest2.TestCase): def assertRangeAndContentRangeCompatible(self, request, response): request_prefix = 'bytes=' self.assertIn('range', request.headers) self.assertTrue(request.headers['range'].startswith(request_prefix)) request_range = request.headers['range'][len(request_prefix):] response_prefix = 'bytes ' self.assertIn('content-range', response.info) response_header = response.info['content-range'] self.assertTrue(response_header.startswith(response_prefix)) response_range = ( response_header[len(response_prefix):].partition('/')[0]) msg = ('Request range ({0}) not a prefix of ' 'response_range ({1})').format( request_range, response_range) self.assertTrue(response_range.startswith(request_range), msg=msg) def testComputeEndByte(self): total_size = 100 chunksize = 10 download = transfer.Download.FromStream( six.StringIO(), chunksize=chunksize, total_size=total_size) self.assertEqual(chunksize - 1, download._Download__ComputeEndByte(0, end=50)) def testComputeEndByteReturnNone(self): download = transfer.Download.FromStream(six.StringIO()) self.assertIsNone( download._Download__ComputeEndByte(0, use_chunks=False)) def testComputeEndByteNoChunks(self): total_size = 100 download = transfer.Download.FromStream( six.StringIO(), chunksize=10, total_size=total_size) for end in (None, 1000): self.assertEqual( total_size - 1, download._Download__ComputeEndByte(0, end=end, use_chunks=False), msg='Failed on end={0}'.format(end)) def testComputeEndByteNoTotal(self): download = transfer.Download.FromStream(six.StringIO()) default_chunksize = download.chunksize for chunksize in (100, default_chunksize): download.chunksize = chunksize for start in (0, 10): self.assertEqual( download.chunksize + start - 1, download._Download__ComputeEndByte(start), msg='Failed on start={0}, chunksize={1}'.format( start, chunksize)) def testComputeEndByteSmallTotal(self): total_size = 100 download = transfer.Download.FromStream(six.StringIO(), total_size=total_size) for start in (0, 10): self.assertEqual(total_size - 1, download._Download__ComputeEndByte(start), msg='Failed on start={0}'.format(start)) def testDownloadThenStream(self): bytes_http = object() http = object() download_stream = six.StringIO() download = transfer.Download.FromStream(download_stream, total_size=26) download.bytes_http = bytes_http base_url = 'https://part.one/' with mock.patch.object(http_wrapper, 'MakeRequest', autospec=True) as make_request: make_request.return_value = http_wrapper.Response( info={ 'content-range': 'bytes 0-25/26', 'status': http_client.OK, }, content=string.ascii_lowercase, request_url=base_url, ) request = http_wrapper.Request(url='https://part.one/') download.InitializeDownload(request, http=http) self.assertEqual(1, make_request.call_count) received_request = make_request.call_args[0][1] self.assertEqual(base_url, received_request.url) self.assertRangeAndContentRangeCompatible( received_request, make_request.return_value) with mock.patch.object(http_wrapper, 'MakeRequest', autospec=True) as make_request: make_request.return_value = http_wrapper.Response( info={ 'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE, }, content='error', request_url=base_url, ) download.StreamInChunks() self.assertEqual(1, make_request.call_count) received_request = make_request.call_args[0][1] self.assertEqual('bytes=26-', received_request.headers['range']) def testGetRange(self): for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]: bytes_http = object() http = object() download_stream = six.StringIO() download = transfer.Download.FromStream(download_stream, total_size=26, auto_transfer=False) download.bytes_http = bytes_http base_url = 'https://part.one/' with mock.patch.object(http_wrapper, 'MakeRequest', autospec=True) as make_request: make_request.return_value = http_wrapper.Response( info={ 'content-range': 'bytes %d-%d/26' % (start_byte, end_byte), 'status': http_client.OK, }, content=string.ascii_lowercase[start_byte:end_byte + 1], request_url=base_url, ) request = http_wrapper.Request(url='https://part.one/') download.InitializeDownload(request, http=http) download.GetRange(start_byte, end_byte) self.assertEqual(1, make_request.call_count) received_request = make_request.call_args[0][1] self.assertEqual(base_url, received_request.url) self.assertRangeAndContentRangeCompatible( received_request, make_request.return_value) def testNonChunkedDownload(self): bytes_http = object() http = object() download_stream = six.StringIO() download = transfer.Download.FromStream(download_stream, total_size=52) download.bytes_http = bytes_http base_url = 'https://part.one/' with mock.patch.object(http_wrapper, 'MakeRequest', autospec=True) as make_request: make_request.return_value = http_wrapper.Response( info={ 'content-range': 'bytes 0-51/52', 'status': http_client.OK, }, content=string.ascii_lowercase * 2, request_url=base_url, ) request = http_wrapper.Request(url='https://part.one/') download.InitializeDownload(request, http=http) self.assertEqual(1, make_request.call_count) received_request = make_request.call_args[0][1] self.assertEqual(base_url, received_request.url) self.assertRangeAndContentRangeCompatible( received_request, make_request.return_value) download_stream.seek(0) self.assertEqual(string.ascii_lowercase * 2, download_stream.getvalue()) def testChunkedDownload(self): bytes_http = object() http = object() download_stream = six.StringIO() download = transfer.Download.FromStream( download_stream, chunksize=26, total_size=52) download.bytes_http = bytes_http # Setting autospec on a mock with an iterable side_effect is # currently broken (http://bugs.python.org/issue17826), so # instead we write a little function. def _ReturnBytes(unused_http, http_request, *unused_args, **unused_kwds): url = http_request.url if url == 'https://part.one/': return http_wrapper.Response( info={ 'content-location': 'https://part.two/', 'content-range': 'bytes 0-25/52', 'status': http_client.PARTIAL_CONTENT, }, content=string.ascii_lowercase, request_url='https://part.one/', ) elif url == 'https://part.two/': return http_wrapper.Response( info={ 'content-range': 'bytes 26-51/52', 'status': http_client.OK, }, content=string.ascii_uppercase, request_url='https://part.two/', ) else: self.fail('Unknown URL requested: %s' % url) with mock.patch.object(http_wrapper, 'MakeRequest', autospec=True) as make_request: make_request.side_effect = _ReturnBytes request = http_wrapper.Request(url='https://part.one/') download.InitializeDownload(request, http=http) self.assertEqual(2, make_request.call_count) for call in make_request.call_args_list: self.assertRangeAndContentRangeCompatible( call[0][1], _ReturnBytes(*call[0])) download_stream.seek(0) self.assertEqual(string.ascii_lowercase + string.ascii_uppercase, download_stream.getvalue()) def testMultipartEncoding(self): # This is really a table test for various issues we've seen in # the past; see notes below for particular histories. test_cases = [ # Python's mime module by default encodes lines that start # with "From " as ">From ", which we need to make sure we # don't run afoul of when sending content that isn't # intended to be so encoded. This test calls out that we # get this right. We test for both the multipart and # non-multipart case. 'line one\nFrom \nline two', # We had originally used a `six.StringIO` to hold the http # request body in the case of a multipart upload; for # bytes being uploaded in Python3, however, this causes # issues like this: # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760 # We test below to ensure that we don't end up mangling # the body before sending. u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd', ] for upload_contents in test_cases: multipart_body = '{"body_field_one": 7}' upload_bytes = upload_contents.encode('ascii', 'backslashreplace') upload_config = base_api.ApiUploadInfo( accept=['*/*'], max_size=None, resumable_multipart=True, resumable_path=u'/resumable/upload', simple_multipart=True, simple_path=u'/upload', ) url_builder = base_api._UrlBuilder('http://www.uploads.com') # Test multipart: having a body argument in http_request forces # multipart here. upload = transfer.Upload.FromStream( six.BytesIO(upload_bytes), 'text/plain', total_size=len(upload_bytes)) http_request = http_wrapper.Request( 'http://www.uploads.com', headers={'content-type': 'text/plain'}, body=multipart_body) upload.ConfigureRequest(upload_config, http_request, url_builder) self.assertEqual( 'multipart', url_builder.query_params['uploadType']) rewritten_upload_contents = b'\n'.join( http_request.body.split(b'--')[2].splitlines()[1:]) self.assertTrue(rewritten_upload_contents.endswith(upload_bytes)) # Test non-multipart (aka media): no body argument means this is # sent as media. upload = transfer.Upload.FromStream( six.BytesIO(upload_bytes), 'text/plain', total_size=len(upload_bytes)) http_request = http_wrapper.Request( 'http://www.uploads.com', headers={'content-type': 'text/plain'}) upload.ConfigureRequest(upload_config, http_request, url_builder) self.assertEqual(url_builder.query_params['uploadType'], 'media') rewritten_upload_contents = http_request.body self.assertTrue(rewritten_upload_contents.endswith(upload_bytes)) class UploadTest(unittest2.TestCase): def setUp(self): # Sample highly compressible data. self.sample_data = b'abc' * 200 # Stream of the sample data. self.sample_stream = six.BytesIO(self.sample_data) # Sample url_builder. self.url_builder = base_api._UrlBuilder('http://www.uploads.com') # Sample request. self.request = http_wrapper.Request( 'http://www.uploads.com', headers={'content-type': 'text/plain'}) # Sample successful response. self.response = http_wrapper.Response( info={'status': http_client.OK, 'location': 'http://www.uploads.com'}, content='', request_url='http://www.uploads.com',) # Sample failure response. self.fail_response = http_wrapper.Response( info={'status': http_client.SERVICE_UNAVAILABLE, 'location': 'http://www.uploads.com'}, content='', request_url='http://www.uploads.com',) def testStreamInChunksCompressed(self): """Test that StreamInChunks will handle compression correctly.""" # Create and configure the upload object. upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, gzip_encoded=True) upload.strategy = transfer.RESUMABLE_UPLOAD # Set the chunk size so the entire stream is uploaded. upload.chunksize = len(self.sample_data) # Mock the upload to return the sample response. with mock.patch.object(transfer.Upload, '_Upload__SendMediaRequest') as mock_result, \ mock.patch.object(http_wrapper, 'MakeRequest') as make_request: mock_result.return_value = self.response make_request.return_value = self.response # Initialization. upload.InitializeUpload(self.request, 'http') upload.StreamInChunks() # Get the uploaded request and end position of the stream. (request, _), _ = mock_result.call_args_list[0] # Ensure the mock was called. self.assertTrue(mock_result.called) # Ensure the correct content encoding was set. self.assertEqual(request.headers['Content-Encoding'], 'gzip') # Ensure the stream was compresed. self.assertLess(len(request.body), len(self.sample_data)) def testStreamMediaCompressedFail(self): """Test that non-chunked uploads raise an exception. Ensure uploads with the compressed and resumable flags set called from StreamMedia raise an exception. Those uploads are unsupported. """ # Create the upload object. upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, auto_transfer=True, gzip_encoded=True) upload.strategy = transfer.RESUMABLE_UPLOAD # Mock the upload to return the sample response. with mock.patch.object(http_wrapper, 'MakeRequest') as make_request: make_request.return_value = self.response # Initialization. upload.InitializeUpload(self.request, 'http') # Ensure stream media raises an exception when the upload is # compressed. Compression is not supported on non-chunked uploads. with self.assertRaises(exceptions.InvalidUserInputError): upload.StreamMedia() def testAutoTransferCompressed(self): """Test that automatic transfers are compressed. Ensure uploads with the compressed, resumable, and automatic transfer flags set call StreamInChunks. StreamInChunks is tested in an earlier test. """ # Create the upload object. upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, gzip_encoded=True) upload.strategy = transfer.RESUMABLE_UPLOAD # Mock the upload to return the sample response. with mock.patch.object(transfer.Upload, 'StreamInChunks') as mock_result, \ mock.patch.object(http_wrapper, 'MakeRequest') as make_request: mock_result.return_value = self.response make_request.return_value = self.response # Initialization. upload.InitializeUpload(self.request, 'http') # Ensure the mock was called. self.assertTrue(mock_result.called) def testMultipartCompressed(self): """Test that multipart uploads are compressed.""" # Create the multipart configuration. upload_config = base_api.ApiUploadInfo( accept=['*/*'], max_size=None, simple_multipart=True, simple_path=u'/upload',) # Create the upload object. upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, gzip_encoded=True) # Set a body to trigger multipart configuration. self.request.body = '{"body_field_one": 7}' # Configure the request. upload.ConfigureRequest(upload_config, self.request, self.url_builder) # Ensure the request is a multipart request now. self.assertEqual( self.url_builder.query_params['uploadType'], 'multipart') # Ensure the request is gzip encoded. self.assertEqual(self.request.headers['Content-Encoding'], 'gzip') # Ensure data is compressed self.assertLess(len(self.request.body), len(self.sample_data)) # Ensure uncompressed data includes the sample data. with gzip.GzipFile(fileobj=self.request.body) as f: original = f.read() self.assertTrue(self.sample_data in original) def testMediaCompressed(self): """Test that media uploads are compressed.""" # Create the media configuration. upload_config = base_api.ApiUploadInfo( accept=['*/*'], max_size=None, simple_multipart=True, simple_path=u'/upload',) # Create the upload object. upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, gzip_encoded=True) # Configure the request. upload.ConfigureRequest(upload_config, self.request, self.url_builder) # Ensure the request is a media request now. self.assertEqual(self.url_builder.query_params['uploadType'], 'media') # Ensure the request is gzip encoded. self.assertEqual(self.request.headers['Content-Encoding'], 'gzip') # Ensure data is compressed self.assertLess(len(self.request.body), len(self.sample_data)) # Ensure uncompressed data includes the sample data. with gzip.GzipFile(fileobj=self.request.body) as f: original = f.read() self.assertTrue(self.sample_data in original) def HttpRequestSideEffect(self, responses=None): responses = [(response.info, response.content) for response in responses] def _side_effect(uri, **kwargs): # pylint: disable=unused-argument body = kwargs['body'] read_func = getattr(body, 'read', None) if read_func: # If the body is a stream, consume the stream. body = read_func() self.assertEqual(int(kwargs['headers']['content-length']), len(body)) return responses.pop(0) return _side_effect def testRetryRequestChunks(self): """Test that StreamInChunks will retry correctly.""" refresh_response = http_wrapper.Response( info={'status': http_wrapper.RESUME_INCOMPLETE, 'location': 'http://www.uploads.com'}, content='', request_url='http://www.uploads.com',) # Create and configure the upload object. bytes_http = httplib2.Http() upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, http=bytes_http) upload.strategy = transfer.RESUMABLE_UPLOAD # Set the chunk size so the entire stream is uploaded. upload.chunksize = len(self.sample_data) # Mock the upload to return the sample response. with mock.patch.object(bytes_http, 'request') as make_request: # This side effect also checks the request body. responses = [ self.response, # Initial request in InitializeUpload(). self.fail_response, # 503 status code from server. refresh_response, # Refresh upload progress. self.response, # Successful request. ] make_request.side_effect = self.HttpRequestSideEffect(responses) # Initialization. upload.InitializeUpload(self.request, bytes_http) upload.StreamInChunks() # Ensure the mock was called the correct number of times. self.assertEquals(make_request.call_count, len(responses)) def testStreamInChunks(self): """Test StreamInChunks.""" resume_incomplete_responses = [http_wrapper.Response( info={'status': http_wrapper.RESUME_INCOMPLETE, 'location': 'http://www.uploads.com', 'range': '0-{}'.format(end)}, content='', request_url='http://www.uploads.com',) for end in [199, 399, 599]] responses = [ self.response # Initial request in InitializeUpload(). ] + resume_incomplete_responses + [ self.response, # Successful request. ] # Create and configure the upload object. bytes_http = httplib2.Http() upload = transfer.Upload( stream=self.sample_stream, mime_type='text/plain', total_size=len(self.sample_data), close_stream=False, http=bytes_http) upload.strategy = transfer.RESUMABLE_UPLOAD # Set the chunk size so the entire stream is uploaded. upload.chunksize = 200 # Mock the upload to return the sample response. with mock.patch.object(bytes_http, 'request') as make_request: # This side effect also checks the request body. make_request.side_effect = self.HttpRequestSideEffect(responses) # Initialization. upload.InitializeUpload(self.request, bytes_http) upload.StreamInChunks() # Ensure the mock was called the correct number of times. self.assertEquals(make_request.call_count, len(responses))