transfer_test_15.py 1.0 KB

12345678910111213141516171819202122232425262728
  def testAutoTransferCompressed(self):
      """Test that automatic transfers are compressed.

      Ensure uploads with the compressed, resumable, and automatic transfer
      flags set call StreamInChunks. StreamInChunks itself is tested in an
      earlier test, so it is replaced with a mock here and only the fact
      that it was invoked is checked.
      """
      # Create the upload object with gzip encoding enabled and force the
      # resumable strategy.
      upload = transfer.Upload(
          stream=self.sample_stream,
          mime_type='text/plain',
          total_size=len(self.sample_data),
          close_stream=False,
          gzip_encoded=True)
      upload.strategy = transfer.RESUMABLE_UPLOAD

      # Mock the upload to return the sample response. MakeRequest is
      # patched as well so that any request issued through http_wrapper
      # while the patches are active yields the same canned response
      # (presumably this is what InitializeUpload uses -- confirm against
      # the transfer module).
      with mock.patch.object(transfer.Upload,
                             'StreamInChunks') as mock_result, \
              mock.patch.object(http_wrapper,
                                'MakeRequest') as make_request:
          mock_result.return_value = self.response
          make_request.return_value = self.response

          # Initialization.
          upload.InitializeUpload(self.request, 'http')

          # Ensure the mock was called.
          self.assertTrue(mock_result.called)