test_futures.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import os
  14. import sys
  15. import time
  16. import traceback
  17. from concurrent.futures import ThreadPoolExecutor
  18. from s3transfer.exceptions import (
  19. CancelledError,
  20. FatalError,
  21. TransferNotDoneError,
  22. )
  23. from s3transfer.futures import (
  24. BaseExecutor,
  25. BoundedExecutor,
  26. ExecutorFuture,
  27. NonThreadedExecutor,
  28. NonThreadedExecutorFuture,
  29. TransferCoordinator,
  30. TransferFuture,
  31. TransferMeta,
  32. )
  33. from s3transfer.tasks import Task
  34. from s3transfer.utils import (
  35. FunctionContainer,
  36. NoResourcesAvailable,
  37. TaskSemaphore,
  38. )
  39. from tests import (
  40. RecordingExecutor,
  41. TransferCoordinatorWithInterrupt,
  42. mock,
  43. unittest,
  44. )
  45. def return_call_args(*args, **kwargs):
  46. return args, kwargs
  47. def raise_exception(exception):
  48. raise exception
  49. def get_exc_info(exception):
  50. try:
  51. raise_exception(exception)
  52. except Exception:
  53. return sys.exc_info()
  54. class RecordingTransferCoordinator(TransferCoordinator):
  55. def __init__(self):
  56. self.all_transfer_futures_ever_associated = set()
  57. super().__init__()
  58. def add_associated_future(self, future):
  59. self.all_transfer_futures_ever_associated.add(future)
  60. super().add_associated_future(future)
  61. class ReturnFooTask(Task):
  62. def _main(self, **kwargs):
  63. return 'foo'
  64. class SleepTask(Task):
  65. def _main(self, sleep_time, **kwargs):
  66. time.sleep(sleep_time)
  67. class TestTransferFuture(unittest.TestCase):
  68. def setUp(self):
  69. self.meta = TransferMeta()
  70. self.coordinator = TransferCoordinator()
  71. self.future = self._get_transfer_future()
  72. def _get_transfer_future(self, **kwargs):
  73. components = {
  74. 'meta': self.meta,
  75. 'coordinator': self.coordinator,
  76. }
  77. for component_name, component in kwargs.items():
  78. components[component_name] = component
  79. return TransferFuture(**components)
  80. def test_meta(self):
  81. self.assertIs(self.future.meta, self.meta)
  82. def test_done(self):
  83. self.assertFalse(self.future.done())
  84. self.coordinator.set_result(None)
  85. self.assertTrue(self.future.done())
  86. def test_result(self):
  87. result = 'foo'
  88. self.coordinator.set_result(result)
  89. self.coordinator.announce_done()
  90. self.assertEqual(self.future.result(), result)
  91. def test_keyboard_interrupt_on_result_does_not_block(self):
  92. # This should raise a KeyboardInterrupt when result is called on it.
  93. self.coordinator = TransferCoordinatorWithInterrupt()
  94. self.future = self._get_transfer_future()
  95. # result() should not block and immediately raise the keyboard
  96. # interrupt exception.
  97. with self.assertRaises(KeyboardInterrupt):
  98. self.future.result()
  99. def test_cancel(self):
  100. self.future.cancel()
  101. self.assertTrue(self.future.done())
  102. self.assertEqual(self.coordinator.status, 'cancelled')
  103. def test_set_exception(self):
  104. # Set the result such that there is no exception
  105. self.coordinator.set_result('result')
  106. self.coordinator.announce_done()
  107. self.assertEqual(self.future.result(), 'result')
  108. self.future.set_exception(ValueError())
  109. with self.assertRaises(ValueError):
  110. self.future.result()
  111. def test_set_exception_only_after_done(self):
  112. with self.assertRaises(TransferNotDoneError):
  113. self.future.set_exception(ValueError())
  114. self.coordinator.set_result('result')
  115. self.coordinator.announce_done()
  116. self.future.set_exception(ValueError())
  117. with self.assertRaises(ValueError):
  118. self.future.result()
  119. class TestTransferMeta(unittest.TestCase):
  120. def setUp(self):
  121. self.transfer_meta = TransferMeta()
  122. def test_size(self):
  123. self.assertEqual(self.transfer_meta.size, None)
  124. self.transfer_meta.provide_transfer_size(5)
  125. self.assertEqual(self.transfer_meta.size, 5)
  126. def test_call_args(self):
  127. call_args = object()
  128. transfer_meta = TransferMeta(call_args)
  129. # Assert the that call args provided is the same as is returned
  130. self.assertIs(transfer_meta.call_args, call_args)
  131. def test_transfer_id(self):
  132. transfer_meta = TransferMeta(transfer_id=1)
  133. self.assertEqual(transfer_meta.transfer_id, 1)
  134. def test_user_context(self):
  135. self.transfer_meta.user_context['foo'] = 'bar'
  136. self.assertEqual(self.transfer_meta.user_context, {'foo': 'bar'})
  137. class TestTransferCoordinator(unittest.TestCase):
  138. def setUp(self):
  139. self.transfer_coordinator = TransferCoordinator()
  140. def test_transfer_id(self):
  141. transfer_coordinator = TransferCoordinator(transfer_id=1)
  142. self.assertEqual(transfer_coordinator.transfer_id, 1)
  143. def test_repr(self):
  144. transfer_coordinator = TransferCoordinator(transfer_id=1)
  145. self.assertEqual(
  146. repr(transfer_coordinator), 'TransferCoordinator(transfer_id=1)'
  147. )
  148. def test_initial_status(self):
  149. # A TransferCoordinator with no progress should have the status
  150. # of not-started
  151. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  152. def test_set_status_to_queued(self):
  153. self.transfer_coordinator.set_status_to_queued()
  154. self.assertEqual(self.transfer_coordinator.status, 'queued')
  155. def test_cannot_set_status_to_queued_from_done_state(self):
  156. self.transfer_coordinator.set_exception(RuntimeError)
  157. with self.assertRaises(RuntimeError):
  158. self.transfer_coordinator.set_status_to_queued()
  159. def test_status_running(self):
  160. self.transfer_coordinator.set_status_to_running()
  161. self.assertEqual(self.transfer_coordinator.status, 'running')
  162. def test_cannot_set_status_to_running_from_done_state(self):
  163. self.transfer_coordinator.set_exception(RuntimeError)
  164. with self.assertRaises(RuntimeError):
  165. self.transfer_coordinator.set_status_to_running()
  166. def test_set_result(self):
  167. success_result = 'foo'
  168. self.transfer_coordinator.set_result(success_result)
  169. self.transfer_coordinator.announce_done()
  170. # Setting result should result in a success state and the return value
  171. # that was set.
  172. self.assertEqual(self.transfer_coordinator.status, 'success')
  173. self.assertEqual(self.transfer_coordinator.result(), success_result)
  174. def test_set_exception(self):
  175. exception_result = RuntimeError
  176. self.transfer_coordinator.set_exception(exception_result)
  177. self.transfer_coordinator.announce_done()
  178. # Setting an exception should result in a failed state and the return
  179. # value should be the raised exception
  180. self.assertEqual(self.transfer_coordinator.status, 'failed')
  181. self.assertEqual(self.transfer_coordinator.exception, exception_result)
  182. with self.assertRaises(exception_result):
  183. self.transfer_coordinator.result()
  184. def test_exception_cannot_override_done_state(self):
  185. self.transfer_coordinator.set_result('foo')
  186. self.transfer_coordinator.set_exception(RuntimeError)
  187. # It status should be success even after the exception is set because
  188. # success is a done state.
  189. self.assertEqual(self.transfer_coordinator.status, 'success')
  190. def test_exception_can_override_done_state_with_override_flag(self):
  191. self.transfer_coordinator.set_result('foo')
  192. self.transfer_coordinator.set_exception(RuntimeError, override=True)
  193. self.assertEqual(self.transfer_coordinator.status, 'failed')
  194. def test_cancel(self):
  195. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  196. self.transfer_coordinator.cancel()
  197. # This should set the state to cancelled and raise the CancelledError
  198. # exception and should have also set the done event so that result()
  199. # is no longer set.
  200. self.assertEqual(self.transfer_coordinator.status, 'cancelled')
  201. with self.assertRaises(CancelledError):
  202. self.transfer_coordinator.result()
  203. def test_cancel_can_run_done_callbacks_that_uses_result(self):
  204. exceptions = []
  205. def capture_exception(transfer_coordinator, captured_exceptions):
  206. try:
  207. transfer_coordinator.result()
  208. except Exception as e:
  209. captured_exceptions.append(e)
  210. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  211. self.transfer_coordinator.add_done_callback(
  212. capture_exception, self.transfer_coordinator, exceptions
  213. )
  214. self.transfer_coordinator.cancel()
  215. self.assertEqual(len(exceptions), 1)
  216. self.assertIsInstance(exceptions[0], CancelledError)
  217. def test_cancel_with_message(self):
  218. message = 'my message'
  219. self.transfer_coordinator.cancel(message)
  220. self.transfer_coordinator.announce_done()
  221. with self.assertRaisesRegex(CancelledError, message):
  222. self.transfer_coordinator.result()
  223. def test_cancel_with_provided_exception(self):
  224. message = 'my message'
  225. self.transfer_coordinator.cancel(message, exc_type=FatalError)
  226. self.transfer_coordinator.announce_done()
  227. with self.assertRaisesRegex(FatalError, message):
  228. self.transfer_coordinator.result()
  229. def test_cancel_cannot_override_done_state(self):
  230. self.transfer_coordinator.set_result('foo')
  231. self.transfer_coordinator.cancel()
  232. # It status should be success even after cancel is called because
  233. # success is a done state.
  234. self.assertEqual(self.transfer_coordinator.status, 'success')
  235. def test_set_result_can_override_cancel(self):
  236. self.transfer_coordinator.cancel()
  237. # Result setting should override any cancel or set exception as this
  238. # is always invoked by the final task.
  239. self.transfer_coordinator.set_result('foo')
  240. self.transfer_coordinator.announce_done()
  241. self.assertEqual(self.transfer_coordinator.status, 'success')
  242. def test_submit(self):
  243. # Submit a callable to the transfer coordinator. It should submit it
  244. # to the executor.
  245. executor = RecordingExecutor(
  246. BoundedExecutor(1, 1, {'my-tag': TaskSemaphore(1)})
  247. )
  248. task = ReturnFooTask(self.transfer_coordinator)
  249. future = self.transfer_coordinator.submit(executor, task, tag='my-tag')
  250. executor.shutdown()
  251. # Make sure the future got submit and executed as well by checking its
  252. # result value which should include the provided future tag.
  253. self.assertEqual(
  254. executor.submissions,
  255. [{'block': True, 'tag': 'my-tag', 'task': task}],
  256. )
  257. self.assertEqual(future.result(), 'foo')
  258. def test_association_and_disassociation_on_submit(self):
  259. self.transfer_coordinator = RecordingTransferCoordinator()
  260. # Submit a callable to the transfer coordinator.
  261. executor = BoundedExecutor(1, 1)
  262. task = ReturnFooTask(self.transfer_coordinator)
  263. future = self.transfer_coordinator.submit(executor, task)
  264. executor.shutdown()
  265. # Make sure the future that got submitted was associated to the
  266. # transfer future at some point.
  267. self.assertEqual(
  268. self.transfer_coordinator.all_transfer_futures_ever_associated,
  269. {future},
  270. )
  271. # Make sure the future got disassociated once the future is now done
  272. # by looking at the currently associated futures.
  273. self.assertEqual(self.transfer_coordinator.associated_futures, set())
  274. def test_done(self):
  275. # These should result in not done state:
  276. # queued
  277. self.assertFalse(self.transfer_coordinator.done())
  278. # running
  279. self.transfer_coordinator.set_status_to_running()
  280. self.assertFalse(self.transfer_coordinator.done())
  281. # These should result in done state:
  282. # failed
  283. self.transfer_coordinator.set_exception(Exception)
  284. self.assertTrue(self.transfer_coordinator.done())
  285. # success
  286. self.transfer_coordinator.set_result('foo')
  287. self.assertTrue(self.transfer_coordinator.done())
  288. # cancelled
  289. self.transfer_coordinator.cancel()
  290. self.assertTrue(self.transfer_coordinator.done())
  291. def test_result_waits_until_done(self):
  292. execution_order = []
  293. def sleep_then_set_result(transfer_coordinator, execution_order):
  294. time.sleep(0.05)
  295. execution_order.append('setting_result')
  296. transfer_coordinator.set_result(None)
  297. self.transfer_coordinator.announce_done()
  298. with ThreadPoolExecutor(max_workers=1) as executor:
  299. executor.submit(
  300. sleep_then_set_result,
  301. self.transfer_coordinator,
  302. execution_order,
  303. )
  304. self.transfer_coordinator.result()
  305. execution_order.append('after_result')
  306. # The result() call should have waited until the other thread set
  307. # the result after sleeping for 0.05 seconds.
  308. self.assertTrue(execution_order, ['setting_result', 'after_result'])
  309. def test_failure_cleanups(self):
  310. args = (1, 2)
  311. kwargs = {'foo': 'bar'}
  312. second_args = (2, 4)
  313. second_kwargs = {'biz': 'baz'}
  314. self.transfer_coordinator.add_failure_cleanup(
  315. return_call_args, *args, **kwargs
  316. )
  317. self.transfer_coordinator.add_failure_cleanup(
  318. return_call_args, *second_args, **second_kwargs
  319. )
  320. # Ensure the callbacks got added.
  321. self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 2)
  322. result_list = []
  323. # Ensure they will get called in the correct order.
  324. for cleanup in self.transfer_coordinator.failure_cleanups:
  325. result_list.append(cleanup())
  326. self.assertEqual(
  327. result_list, [(args, kwargs), (second_args, second_kwargs)]
  328. )
  329. def test_associated_futures(self):
  330. first_future = object()
  331. # Associate one future to the transfer
  332. self.transfer_coordinator.add_associated_future(first_future)
  333. associated_futures = self.transfer_coordinator.associated_futures
  334. # The first future should be in the returned list of futures.
  335. self.assertEqual(associated_futures, {first_future})
  336. second_future = object()
  337. # Associate another future to the transfer.
  338. self.transfer_coordinator.add_associated_future(second_future)
  339. # The association should not have mutated the returned list from
  340. # before.
  341. self.assertEqual(associated_futures, {first_future})
  342. # Both futures should be in the returned list.
  343. self.assertEqual(
  344. self.transfer_coordinator.associated_futures,
  345. {first_future, second_future},
  346. )
  347. def test_done_callbacks_on_done(self):
  348. done_callback_invocations = []
  349. callback = FunctionContainer(
  350. done_callback_invocations.append, 'done callback called'
  351. )
  352. # Add the done callback to the transfer.
  353. self.transfer_coordinator.add_done_callback(callback)
  354. # Announce that the transfer is done. This should invoke the done
  355. # callback.
  356. self.transfer_coordinator.announce_done()
  357. self.assertEqual(done_callback_invocations, ['done callback called'])
  358. # If done is announced again, we should not invoke the callback again
  359. # because done has already been announced and thus the callback has
  360. # been ran as well.
  361. self.transfer_coordinator.announce_done()
  362. self.assertEqual(done_callback_invocations, ['done callback called'])
  363. def test_failure_cleanups_on_done(self):
  364. cleanup_invocations = []
  365. callback = FunctionContainer(
  366. cleanup_invocations.append, 'cleanup called'
  367. )
  368. # Add the failure cleanup to the transfer.
  369. self.transfer_coordinator.add_failure_cleanup(callback)
  370. # Announce that the transfer is done. This should invoke the failure
  371. # cleanup.
  372. self.transfer_coordinator.announce_done()
  373. self.assertEqual(cleanup_invocations, ['cleanup called'])
  374. # If done is announced again, we should not invoke the cleanup again
  375. # because done has already been announced and thus the cleanup has
  376. # been ran as well.
  377. self.transfer_coordinator.announce_done()
  378. self.assertEqual(cleanup_invocations, ['cleanup called'])
  379. class TestBoundedExecutor(unittest.TestCase):
  380. def setUp(self):
  381. self.coordinator = TransferCoordinator()
  382. self.tag_semaphores = {}
  383. self.executor = self.get_executor()
  384. def get_executor(self, max_size=1, max_num_threads=1):
  385. return BoundedExecutor(max_size, max_num_threads, self.tag_semaphores)
  386. def get_task(self, task_cls, main_kwargs=None):
  387. return task_cls(self.coordinator, main_kwargs=main_kwargs)
  388. def get_sleep_task(self, sleep_time=0.01):
  389. return self.get_task(SleepTask, main_kwargs={'sleep_time': sleep_time})
  390. def add_semaphore(self, task_tag, count):
  391. self.tag_semaphores[task_tag] = TaskSemaphore(count)
  392. def assert_submit_would_block(self, task, tag=None):
  393. with self.assertRaises(NoResourcesAvailable):
  394. self.executor.submit(task, tag=tag, block=False)
  395. def assert_submit_would_not_block(self, task, tag=None, **kwargs):
  396. try:
  397. self.executor.submit(task, tag=tag, block=False)
  398. except NoResourcesAvailable:
  399. self.fail(
  400. 'Task {} should not have been blocked. Caused by:\n{}'.format(
  401. task, traceback.format_exc()
  402. )
  403. )
  404. def add_done_callback_to_future(self, future, fn, *args, **kwargs):
  405. callback_for_future = FunctionContainer(fn, *args, **kwargs)
  406. future.add_done_callback(callback_for_future)
  407. def test_submit_single_task(self):
  408. # Ensure we can submit a task to the executor
  409. task = self.get_task(ReturnFooTask)
  410. future = self.executor.submit(task)
  411. # Ensure what we get back is a Future
  412. self.assertIsInstance(future, ExecutorFuture)
  413. # Ensure the callable got executed.
  414. self.assertEqual(future.result(), 'foo')
  415. @unittest.skipIf(
  416. os.environ.get('USE_SERIAL_EXECUTOR'),
  417. "Not supported with serial executor tests",
  418. )
  419. def test_executor_blocks_on_full_capacity(self):
  420. first_task = self.get_sleep_task()
  421. second_task = self.get_sleep_task()
  422. self.executor.submit(first_task)
  423. # The first task should be sleeping for a substantial period of
  424. # time such that on the submission of the second task, it will
  425. # raise an error saying that it cannot be submitted as the max
  426. # capacity of the semaphore is one.
  427. self.assert_submit_would_block(second_task)
  428. def test_executor_clears_capacity_on_done_tasks(self):
  429. first_task = self.get_sleep_task()
  430. second_task = self.get_task(ReturnFooTask)
  431. # Submit a task.
  432. future = self.executor.submit(first_task)
  433. # Submit a new task when the first task finishes. This should not get
  434. # blocked because the first task should have finished clearing up
  435. # capacity.
  436. self.add_done_callback_to_future(
  437. future, self.assert_submit_would_not_block, second_task
  438. )
  439. # Wait for it to complete.
  440. self.executor.shutdown()
  441. @unittest.skipIf(
  442. os.environ.get('USE_SERIAL_EXECUTOR'),
  443. "Not supported with serial executor tests",
  444. )
  445. def test_would_not_block_when_full_capacity_in_other_semaphore(self):
  446. first_task = self.get_sleep_task()
  447. # Now let's create a new task with a tag and so it uses different
  448. # semaphore.
  449. task_tag = 'other'
  450. other_task = self.get_sleep_task()
  451. self.add_semaphore(task_tag, 1)
  452. # Submit the normal first task
  453. self.executor.submit(first_task)
  454. # Even though The first task should be sleeping for a substantial
  455. # period of time, the submission of the second task should not
  456. # raise an error because it should use a different semaphore
  457. self.assert_submit_would_not_block(other_task, task_tag)
  458. # Another submission of the other task though should raise
  459. # an exception as the capacity is equal to one for that tag.
  460. self.assert_submit_would_block(other_task, task_tag)
  461. def test_shutdown(self):
  462. slow_task = self.get_sleep_task()
  463. future = self.executor.submit(slow_task)
  464. self.executor.shutdown()
  465. # Ensure that the shutdown waits until the task is done
  466. self.assertTrue(future.done())
  467. @unittest.skipIf(
  468. os.environ.get('USE_SERIAL_EXECUTOR'),
  469. "Not supported with serial executor tests",
  470. )
  471. def test_shutdown_no_wait(self):
  472. slow_task = self.get_sleep_task()
  473. future = self.executor.submit(slow_task)
  474. self.executor.shutdown(False)
  475. # Ensure that the shutdown returns immediately even if the task is
  476. # not done, which it should not be because it it slow.
  477. self.assertFalse(future.done())
  478. def test_replace_underlying_executor(self):
  479. mocked_executor_cls = mock.Mock(BaseExecutor)
  480. executor = BoundedExecutor(10, 1, {}, mocked_executor_cls)
  481. executor.submit(self.get_task(ReturnFooTask))
  482. self.assertTrue(mocked_executor_cls.return_value.submit.called)
  483. class TestExecutorFuture(unittest.TestCase):
  484. def test_result(self):
  485. with ThreadPoolExecutor(max_workers=1) as executor:
  486. future = executor.submit(return_call_args, 'foo', biz='baz')
  487. wrapped_future = ExecutorFuture(future)
  488. self.assertEqual(wrapped_future.result(), (('foo',), {'biz': 'baz'}))
  489. def test_done(self):
  490. with ThreadPoolExecutor(max_workers=1) as executor:
  491. future = executor.submit(return_call_args, 'foo', biz='baz')
  492. wrapped_future = ExecutorFuture(future)
  493. self.assertTrue(wrapped_future.done())
  494. def test_add_done_callback(self):
  495. done_callbacks = []
  496. with ThreadPoolExecutor(max_workers=1) as executor:
  497. future = executor.submit(return_call_args, 'foo', biz='baz')
  498. wrapped_future = ExecutorFuture(future)
  499. wrapped_future.add_done_callback(
  500. FunctionContainer(done_callbacks.append, 'called')
  501. )
  502. self.assertEqual(done_callbacks, ['called'])
  503. class TestNonThreadedExecutor(unittest.TestCase):
  504. def test_submit(self):
  505. executor = NonThreadedExecutor()
  506. future = executor.submit(return_call_args, 1, 2, foo='bar')
  507. self.assertIsInstance(future, NonThreadedExecutorFuture)
  508. self.assertEqual(future.result(), ((1, 2), {'foo': 'bar'}))
  509. def test_submit_with_exception(self):
  510. executor = NonThreadedExecutor()
  511. future = executor.submit(raise_exception, RuntimeError())
  512. self.assertIsInstance(future, NonThreadedExecutorFuture)
  513. with self.assertRaises(RuntimeError):
  514. future.result()
  515. def test_submit_with_exception_and_captures_info(self):
  516. exception = ValueError('message')
  517. tb = get_exc_info(exception)[2]
  518. future = NonThreadedExecutor().submit(raise_exception, exception)
  519. try:
  520. future.result()
  521. # An exception should have been raised
  522. self.fail('Future should have raised a ValueError')
  523. except ValueError:
  524. actual_tb = sys.exc_info()[2]
  525. last_frame = traceback.extract_tb(actual_tb)[-1]
  526. last_expected_frame = traceback.extract_tb(tb)[-1]
  527. self.assertEqual(last_frame, last_expected_frame)
  528. class TestNonThreadedExecutorFuture(unittest.TestCase):
  529. def setUp(self):
  530. self.future = NonThreadedExecutorFuture()
  531. def test_done_starts_false(self):
  532. self.assertFalse(self.future.done())
  533. def test_done_after_setting_result(self):
  534. self.future.set_result('result')
  535. self.assertTrue(self.future.done())
  536. def test_done_after_setting_exception(self):
  537. self.future.set_exception_info(Exception(), None)
  538. self.assertTrue(self.future.done())
  539. def test_result(self):
  540. self.future.set_result('result')
  541. self.assertEqual(self.future.result(), 'result')
  542. def test_exception_result(self):
  543. exception = ValueError('message')
  544. self.future.set_exception_info(exception, None)
  545. with self.assertRaisesRegex(ValueError, 'message'):
  546. self.future.result()
  547. def test_exception_result_doesnt_modify_last_frame(self):
  548. exception = ValueError('message')
  549. tb = get_exc_info(exception)[2]
  550. self.future.set_exception_info(exception, tb)
  551. try:
  552. self.future.result()
  553. # An exception should have been raised
  554. self.fail()
  555. except ValueError:
  556. actual_tb = sys.exc_info()[2]
  557. last_frame = traceback.extract_tb(actual_tb)[-1]
  558. last_expected_frame = traceback.extract_tb(tb)[-1]
  559. self.assertEqual(last_frame, last_expected_frame)
  560. def test_done_callback(self):
  561. done_futures = []
  562. self.future.add_done_callback(done_futures.append)
  563. self.assertEqual(done_futures, [])
  564. self.future.set_result('result')
  565. self.assertEqual(done_futures, [self.future])
  566. def test_done_callback_after_done(self):
  567. self.future.set_result('result')
  568. done_futures = []
  569. self.future.add_done_callback(done_futures.append)
  570. self.assertEqual(done_futures, [self.future])