test_crt.py 9.8 KB


  1. # Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import fnmatch
  14. import threading
  15. import time
  16. from concurrent.futures import Future
  17. from botocore.session import Session
  18. from s3transfer.subscribers import BaseSubscriber
  19. from tests import HAS_CRT, FileCreator, mock, requires_crt, unittest
  20. if HAS_CRT:
  21. import awscrt
  22. import s3transfer.crt
  23. class submitThread(threading.Thread):
  24. def __init__(self, transfer_manager, futures, callargs):
  25. threading.Thread.__init__(self)
  26. self._transfer_manager = transfer_manager
  27. self._futures = futures
  28. self._callargs = callargs
  29. def run(self):
  30. self._futures.append(self._transfer_manager.download(*self._callargs))
  31. class RecordingSubscriber(BaseSubscriber):
  32. def __init__(self):
  33. self.on_queued_called = False
  34. self.on_done_called = False
  35. self.bytes_transferred = 0
  36. self.on_queued_future = None
  37. self.on_done_future = None
  38. def on_queued(self, future, **kwargs):
  39. self.on_queued_called = True
  40. self.on_queued_future = future
  41. def on_done(self, future, **kwargs):
  42. self.on_done_called = True
  43. self.on_done_future = future
  44. @requires_crt
  45. class TestCRTTransferManager(unittest.TestCase):
  46. def setUp(self):
  47. self.region = 'us-west-2'
  48. self.bucket = "test_bucket"
  49. self.key = "test_key"
  50. self.files = FileCreator()
  51. self.filename = self.files.create_file('myfile', 'my content')
  52. self.expected_path = "/" + self.bucket + "/" + self.key
  53. self.expected_host = "s3.%s.amazonaws.com" % (self.region)
  54. self.s3_request = mock.Mock(awscrt.s3.S3Request)
  55. self.s3_crt_client = mock.Mock(awscrt.s3.S3Client)
  56. self.s3_crt_client.make_request.return_value = self.s3_request
  57. self.session = Session()
  58. self.session.set_config_variable('region', self.region)
  59. self.request_serializer = s3transfer.crt.BotocoreCRTRequestSerializer(
  60. self.session
  61. )
  62. self.transfer_manager = s3transfer.crt.CRTTransferManager(
  63. crt_s3_client=self.s3_crt_client,
  64. crt_request_serializer=self.request_serializer,
  65. )
  66. self.record_subscriber = RecordingSubscriber()
  67. def tearDown(self):
  68. self.files.remove_all()
  69. def _assert_subscribers_called(self, expected_future=None):
  70. self.assertTrue(self.record_subscriber.on_queued_called)
  71. self.assertTrue(self.record_subscriber.on_done_called)
  72. if expected_future:
  73. self.assertIs(
  74. self.record_subscriber.on_queued_future, expected_future
  75. )
  76. self.assertIs(
  77. self.record_subscriber.on_done_future, expected_future
  78. )
  79. def _invoke_done_callbacks(self, **kwargs):
  80. callargs = self.s3_crt_client.make_request.call_args
  81. callargs_kwargs = callargs[1]
  82. on_done = callargs_kwargs["on_done"]
  83. on_done(error=None)
  84. def _simulate_file_download(self, recv_filepath):
  85. self.files.create_file(recv_filepath, "fake response")
  86. def _simulate_make_request_side_effect(self, **kwargs):
  87. if kwargs.get('recv_filepath'):
  88. self._simulate_file_download(kwargs['recv_filepath'])
  89. self._invoke_done_callbacks()
  90. return mock.DEFAULT
  91. def test_upload(self):
  92. self.s3_crt_client.make_request.side_effect = (
  93. self._simulate_make_request_side_effect
  94. )
  95. future = self.transfer_manager.upload(
  96. self.filename, self.bucket, self.key, {}, [self.record_subscriber]
  97. )
  98. future.result()
  99. callargs = self.s3_crt_client.make_request.call_args
  100. callargs_kwargs = callargs[1]
  101. self.assertEqual(callargs_kwargs["send_filepath"], self.filename)
  102. self.assertIsNone(callargs_kwargs["recv_filepath"])
  103. self.assertEqual(
  104. callargs_kwargs["type"], awscrt.s3.S3RequestType.PUT_OBJECT
  105. )
  106. crt_request = callargs_kwargs["request"]
  107. self.assertEqual("PUT", crt_request.method)
  108. self.assertEqual(self.expected_path, crt_request.path)
  109. self.assertEqual(self.expected_host, crt_request.headers.get("host"))
  110. self._assert_subscribers_called(future)
  111. def test_download(self):
  112. self.s3_crt_client.make_request.side_effect = (
  113. self._simulate_make_request_side_effect
  114. )
  115. future = self.transfer_manager.download(
  116. self.bucket, self.key, self.filename, {}, [self.record_subscriber]
  117. )
  118. future.result()
  119. callargs = self.s3_crt_client.make_request.call_args
  120. callargs_kwargs = callargs[1]
  121. # the recv_filepath will be set to a temporary file path with some
  122. # random suffix
  123. self.assertTrue(
  124. fnmatch.fnmatch(
  125. callargs_kwargs["recv_filepath"],
  126. f'{self.filename}.*',
  127. )
  128. )
  129. self.assertIsNone(callargs_kwargs["send_filepath"])
  130. self.assertEqual(
  131. callargs_kwargs["type"], awscrt.s3.S3RequestType.GET_OBJECT
  132. )
  133. crt_request = callargs_kwargs["request"]
  134. self.assertEqual("GET", crt_request.method)
  135. self.assertEqual(self.expected_path, crt_request.path)
  136. self.assertEqual(self.expected_host, crt_request.headers.get("host"))
  137. self._assert_subscribers_called(future)
  138. with open(self.filename, 'rb') as f:
  139. # Check the fake response overwrites the file because of download
  140. self.assertEqual(f.read(), b'fake response')
  141. def test_delete(self):
  142. self.s3_crt_client.make_request.side_effect = (
  143. self._simulate_make_request_side_effect
  144. )
  145. future = self.transfer_manager.delete(
  146. self.bucket, self.key, {}, [self.record_subscriber]
  147. )
  148. future.result()
  149. callargs = self.s3_crt_client.make_request.call_args
  150. callargs_kwargs = callargs[1]
  151. self.assertIsNone(callargs_kwargs["send_filepath"])
  152. self.assertIsNone(callargs_kwargs["recv_filepath"])
  153. self.assertEqual(
  154. callargs_kwargs["type"], awscrt.s3.S3RequestType.DEFAULT
  155. )
  156. crt_request = callargs_kwargs["request"]
  157. self.assertEqual("DELETE", crt_request.method)
  158. self.assertEqual(self.expected_path, crt_request.path)
  159. self.assertEqual(self.expected_host, crt_request.headers.get("host"))
  160. self._assert_subscribers_called(future)
  161. def test_blocks_when_max_requests_processes_reached(self):
  162. futures = []
  163. callargs = (self.bucket, self.key, self.filename, {}, [])
  164. max_request_processes = 128 # the hard coded max processes
  165. all_concurrent = max_request_processes + 1
  166. threads = []
  167. for i in range(0, all_concurrent):
  168. thread = submitThread(self.transfer_manager, futures, callargs)
  169. thread.start()
  170. threads.append(thread)
  171. # Sleep until the expected max requests has been reached
  172. while len(futures) < max_request_processes:
  173. time.sleep(0.05)
  174. self.assertLessEqual(
  175. self.s3_crt_client.make_request.call_count, max_request_processes
  176. )
  177. # Release lock
  178. callargs = self.s3_crt_client.make_request.call_args
  179. callargs_kwargs = callargs[1]
  180. on_done = callargs_kwargs["on_done"]
  181. on_done(error=None)
  182. for thread in threads:
  183. thread.join()
  184. self.assertEqual(
  185. self.s3_crt_client.make_request.call_count, all_concurrent
  186. )
  187. def _cancel_function(self):
  188. self.cancel_called = True
  189. self.s3_request.finished_future.set_exception(
  190. awscrt.exceptions.from_code(0)
  191. )
  192. self._invoke_done_callbacks()
  193. def test_cancel(self):
  194. self.s3_request.finished_future = Future()
  195. self.cancel_called = False
  196. self.s3_request.cancel = self._cancel_function
  197. try:
  198. with self.transfer_manager:
  199. future = self.transfer_manager.upload(
  200. self.filename, self.bucket, self.key, {}, []
  201. )
  202. raise KeyboardInterrupt()
  203. except KeyboardInterrupt:
  204. pass
  205. with self.assertRaises(awscrt.exceptions.AwsCrtError):
  206. future.result()
  207. self.assertTrue(self.cancel_called)
  208. def test_serializer_error_handling(self):
  209. class SerializationException(Exception):
  210. pass
  211. class ExceptionRaisingSerializer(
  212. s3transfer.crt.BaseCRTRequestSerializer
  213. ):
  214. def serialize_http_request(self, transfer_type, future):
  215. raise SerializationException()
  216. not_impl_serializer = ExceptionRaisingSerializer()
  217. transfer_manager = s3transfer.crt.CRTTransferManager(
  218. crt_s3_client=self.s3_crt_client,
  219. crt_request_serializer=not_impl_serializer,
  220. )
  221. future = transfer_manager.upload(
  222. self.filename, self.bucket, self.key, {}, []
  223. )
  224. with self.assertRaises(SerializationException):
  225. future.result()
  226. def test_crt_s3_client_error_handling(self):
  227. self.s3_crt_client.make_request.side_effect = (
  228. awscrt.exceptions.from_code(0)
  229. )
  230. future = self.transfer_manager.upload(
  231. self.filename, self.bucket, self.key, {}, []
  232. )
  233. with self.assertRaises(awscrt.exceptions.AwsCrtError):
  234. future.result()