test_tasks.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from concurrent import futures
  14. from functools import partial
  15. from threading import Event
  16. from s3transfer.futures import BoundedExecutor, TransferCoordinator
  17. from s3transfer.subscribers import BaseSubscriber
  18. from s3transfer.tasks import (
  19. CompleteMultipartUploadTask,
  20. CreateMultipartUploadTask,
  21. SubmissionTask,
  22. Task,
  23. )
  24. from s3transfer.utils import CallArgs, FunctionContainer, get_callbacks
  25. from tests import (
  26. BaseSubmissionTaskTest,
  27. BaseTaskTest,
  28. RecordingSubscriber,
  29. unittest,
  30. )
  31. class TaskFailureException(Exception):
  32. pass
  33. class SuccessTask(Task):
  34. def _main(
  35. self, return_value='success', callbacks=None, failure_cleanups=None
  36. ):
  37. if callbacks:
  38. for callback in callbacks:
  39. callback()
  40. if failure_cleanups:
  41. for failure_cleanup in failure_cleanups:
  42. self._transfer_coordinator.add_failure_cleanup(failure_cleanup)
  43. return return_value
  44. class FailureTask(Task):
  45. def _main(self, exception=TaskFailureException):
  46. raise exception()
  47. class ReturnKwargsTask(Task):
  48. def _main(self, **kwargs):
  49. return kwargs
  50. class SubmitMoreTasksTask(Task):
  51. def _main(self, executor, tasks_to_submit):
  52. for task_to_submit in tasks_to_submit:
  53. self._transfer_coordinator.submit(executor, task_to_submit)
  54. class NOOPSubmissionTask(SubmissionTask):
  55. def _submit(self, transfer_future, **kwargs):
  56. pass
  57. class ExceptionSubmissionTask(SubmissionTask):
  58. def _submit(
  59. self,
  60. transfer_future,
  61. executor=None,
  62. tasks_to_submit=None,
  63. additional_callbacks=None,
  64. exception=TaskFailureException,
  65. ):
  66. if executor and tasks_to_submit:
  67. for task_to_submit in tasks_to_submit:
  68. self._transfer_coordinator.submit(executor, task_to_submit)
  69. if additional_callbacks:
  70. for callback in additional_callbacks:
  71. callback()
  72. raise exception()
  73. class StatusRecordingTransferCoordinator(TransferCoordinator):
  74. def __init__(self, transfer_id=None):
  75. super().__init__(transfer_id)
  76. self.status_changes = [self._status]
  77. def set_status_to_queued(self):
  78. super().set_status_to_queued()
  79. self._record_status_change()
  80. def set_status_to_running(self):
  81. super().set_status_to_running()
  82. self._record_status_change()
  83. def _record_status_change(self):
  84. self.status_changes.append(self._status)
  85. class RecordingStateSubscriber(BaseSubscriber):
  86. def __init__(self, transfer_coordinator):
  87. self._transfer_coordinator = transfer_coordinator
  88. self.status_during_on_queued = None
  89. def on_queued(self, **kwargs):
  90. self.status_during_on_queued = self._transfer_coordinator.status
  91. class TestSubmissionTask(BaseSubmissionTaskTest):
  92. def setUp(self):
  93. super().setUp()
  94. self.executor = BoundedExecutor(1000, 5)
  95. self.call_args = CallArgs(subscribers=[])
  96. self.transfer_future = self.get_transfer_future(self.call_args)
  97. self.main_kwargs = {'transfer_future': self.transfer_future}
  98. def test_transitions_from_not_started_to_queued_to_running(self):
  99. self.transfer_coordinator = StatusRecordingTransferCoordinator()
  100. submission_task = self.get_task(
  101. NOOPSubmissionTask, main_kwargs=self.main_kwargs
  102. )
  103. # Status should be queued until submission task has been ran.
  104. self.assertEqual(self.transfer_coordinator.status, 'not-started')
  105. submission_task()
  106. # Once submission task has been ran, the status should now be running.
  107. self.assertEqual(self.transfer_coordinator.status, 'running')
  108. # Ensure the transitions were as expected as well.
  109. self.assertEqual(
  110. self.transfer_coordinator.status_changes,
  111. ['not-started', 'queued', 'running'],
  112. )
  113. def test_on_queued_callbacks(self):
  114. submission_task = self.get_task(
  115. NOOPSubmissionTask, main_kwargs=self.main_kwargs
  116. )
  117. subscriber = RecordingSubscriber()
  118. self.call_args.subscribers.append(subscriber)
  119. submission_task()
  120. # Make sure the on_queued callback of the subscriber is called.
  121. self.assertEqual(
  122. subscriber.on_queued_calls, [{'future': self.transfer_future}]
  123. )
  124. def test_on_queued_status_in_callbacks(self):
  125. submission_task = self.get_task(
  126. NOOPSubmissionTask, main_kwargs=self.main_kwargs
  127. )
  128. subscriber = RecordingStateSubscriber(self.transfer_coordinator)
  129. self.call_args.subscribers.append(subscriber)
  130. submission_task()
  131. # Make sure the status was queued during on_queued callback.
  132. self.assertEqual(subscriber.status_during_on_queued, 'queued')
  133. def test_sets_exception_from_submit(self):
  134. submission_task = self.get_task(
  135. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  136. )
  137. submission_task()
  138. # Make sure the status of the future is failed
  139. self.assertEqual(self.transfer_coordinator.status, 'failed')
  140. # Make sure the future propagates the exception encountered in the
  141. # submission task.
  142. with self.assertRaises(TaskFailureException):
  143. self.transfer_future.result()
  144. def test_catches_and_sets_keyboard_interrupt_exception_from_submit(self):
  145. self.main_kwargs['exception'] = KeyboardInterrupt
  146. submission_task = self.get_task(
  147. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  148. )
  149. submission_task()
  150. self.assertEqual(self.transfer_coordinator.status, 'failed')
  151. with self.assertRaises(KeyboardInterrupt):
  152. self.transfer_future.result()
  153. def test_calls_done_callbacks_on_exception(self):
  154. submission_task = self.get_task(
  155. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  156. )
  157. subscriber = RecordingSubscriber()
  158. self.call_args.subscribers.append(subscriber)
  159. # Add the done callback to the callbacks to be invoked when the
  160. # transfer is done.
  161. done_callbacks = get_callbacks(self.transfer_future, 'done')
  162. for done_callback in done_callbacks:
  163. self.transfer_coordinator.add_done_callback(done_callback)
  164. submission_task()
  165. # Make sure the task failed to start
  166. self.assertEqual(self.transfer_coordinator.status, 'failed')
  167. # Make sure the on_done callback of the subscriber is called.
  168. self.assertEqual(
  169. subscriber.on_done_calls, [{'future': self.transfer_future}]
  170. )
  171. def test_calls_failure_cleanups_on_exception(self):
  172. submission_task = self.get_task(
  173. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  174. )
  175. # Add the callback to the callbacks to be invoked when the
  176. # transfer fails.
  177. invocations_of_cleanup = []
  178. cleanup_callback = FunctionContainer(
  179. invocations_of_cleanup.append, 'cleanup happened'
  180. )
  181. self.transfer_coordinator.add_failure_cleanup(cleanup_callback)
  182. submission_task()
  183. # Make sure the task failed to start
  184. self.assertEqual(self.transfer_coordinator.status, 'failed')
  185. # Make sure the cleanup was called.
  186. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  187. def test_cleanups_only_ran_once_on_exception(self):
  188. # We want to be able to handle the case where the final task completes
  189. # and anounces done but there is an error in the submission task
  190. # which will cause it to need to announce done as well. In this case,
  191. # we do not want the done callbacks to be invoke more than once.
  192. final_task = self.get_task(FailureTask, is_final=True)
  193. self.main_kwargs['executor'] = self.executor
  194. self.main_kwargs['tasks_to_submit'] = [final_task]
  195. submission_task = self.get_task(
  196. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  197. )
  198. subscriber = RecordingSubscriber()
  199. self.call_args.subscribers.append(subscriber)
  200. # Add the done callback to the callbacks to be invoked when the
  201. # transfer is done.
  202. done_callbacks = get_callbacks(self.transfer_future, 'done')
  203. for done_callback in done_callbacks:
  204. self.transfer_coordinator.add_done_callback(done_callback)
  205. submission_task()
  206. # Make sure the task failed to start
  207. self.assertEqual(self.transfer_coordinator.status, 'failed')
  208. # Make sure the on_done callback of the subscriber is called only once.
  209. self.assertEqual(
  210. subscriber.on_done_calls, [{'future': self.transfer_future}]
  211. )
  212. def test_done_callbacks_only_ran_once_on_exception(self):
  213. # We want to be able to handle the case where the final task completes
  214. # and anounces done but there is an error in the submission task
  215. # which will cause it to need to announce done as well. In this case,
  216. # we do not want the failure cleanups to be invoked more than once.
  217. final_task = self.get_task(FailureTask, is_final=True)
  218. self.main_kwargs['executor'] = self.executor
  219. self.main_kwargs['tasks_to_submit'] = [final_task]
  220. submission_task = self.get_task(
  221. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  222. )
  223. # Add the callback to the callbacks to be invoked when the
  224. # transfer fails.
  225. invocations_of_cleanup = []
  226. cleanup_callback = FunctionContainer(
  227. invocations_of_cleanup.append, 'cleanup happened'
  228. )
  229. self.transfer_coordinator.add_failure_cleanup(cleanup_callback)
  230. submission_task()
  231. # Make sure the task failed to start
  232. self.assertEqual(self.transfer_coordinator.status, 'failed')
  233. # Make sure the cleanup was called only once.
  234. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  235. def test_handles_cleanups_submitted_in_other_tasks(self):
  236. invocations_of_cleanup = []
  237. event = Event()
  238. cleanup_callback = FunctionContainer(
  239. invocations_of_cleanup.append, 'cleanup happened'
  240. )
  241. # We want the cleanup to be added in the execution of the task and
  242. # still be executed by the submission task when it fails.
  243. task = self.get_task(
  244. SuccessTask,
  245. main_kwargs={
  246. 'callbacks': [event.set],
  247. 'failure_cleanups': [cleanup_callback],
  248. },
  249. )
  250. self.main_kwargs['executor'] = self.executor
  251. self.main_kwargs['tasks_to_submit'] = [task]
  252. self.main_kwargs['additional_callbacks'] = [event.wait]
  253. submission_task = self.get_task(
  254. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  255. )
  256. submission_task()
  257. self.assertEqual(self.transfer_coordinator.status, 'failed')
  258. # Make sure the cleanup was called even though the callback got
  259. # added in a completely different task.
  260. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  261. def test_waits_for_tasks_submitted_by_other_tasks_on_exception(self):
  262. # In this test, we want to make sure that any tasks that may be
  263. # submitted in another task complete before we start performing
  264. # cleanups.
  265. #
  266. # This is tested by doing the following:
  267. #
  268. # ExecutionSubmissionTask
  269. # |
  270. # +--submits-->SubmitMoreTasksTask
  271. # |
  272. # +--submits-->SuccessTask
  273. # |
  274. # +-->sleeps-->adds failure cleanup
  275. #
  276. # In the end, the failure cleanup of the SuccessTask should be ran
  277. # when the ExecutionSubmissionTask fails. If the
  278. # ExeceptionSubmissionTask did not run the failure cleanup it is most
  279. # likely that it did not wait for the SuccessTask to complete, which
  280. # it needs to because the ExeceptionSubmissionTask does not know
  281. # what failure cleanups it needs to run until all spawned tasks have
  282. # completed.
  283. invocations_of_cleanup = []
  284. event = Event()
  285. cleanup_callback = FunctionContainer(
  286. invocations_of_cleanup.append, 'cleanup happened'
  287. )
  288. cleanup_task = self.get_task(
  289. SuccessTask,
  290. main_kwargs={
  291. 'callbacks': [event.set],
  292. 'failure_cleanups': [cleanup_callback],
  293. },
  294. )
  295. task_for_submitting_cleanup_task = self.get_task(
  296. SubmitMoreTasksTask,
  297. main_kwargs={
  298. 'executor': self.executor,
  299. 'tasks_to_submit': [cleanup_task],
  300. },
  301. )
  302. self.main_kwargs['executor'] = self.executor
  303. self.main_kwargs['tasks_to_submit'] = [
  304. task_for_submitting_cleanup_task
  305. ]
  306. self.main_kwargs['additional_callbacks'] = [event.wait]
  307. submission_task = self.get_task(
  308. ExceptionSubmissionTask, main_kwargs=self.main_kwargs
  309. )
  310. submission_task()
  311. self.assertEqual(self.transfer_coordinator.status, 'failed')
  312. self.assertEqual(invocations_of_cleanup, ['cleanup happened'])
  313. def test_submission_task_announces_done_if_cancelled_before_main(self):
  314. invocations_of_done = []
  315. done_callback = FunctionContainer(
  316. invocations_of_done.append, 'done announced'
  317. )
  318. self.transfer_coordinator.add_done_callback(done_callback)
  319. self.transfer_coordinator.cancel()
  320. submission_task = self.get_task(
  321. NOOPSubmissionTask, main_kwargs=self.main_kwargs
  322. )
  323. submission_task()
  324. # Because the submission task was cancelled before being run
  325. # it did not submit any extra tasks so a result it is responsible
  326. # for making sure it announces done as nothing else will.
  327. self.assertEqual(invocations_of_done, ['done announced'])
  328. class TestTask(unittest.TestCase):
  329. def setUp(self):
  330. self.transfer_id = 1
  331. self.transfer_coordinator = TransferCoordinator(
  332. transfer_id=self.transfer_id
  333. )
  334. def test_repr(self):
  335. main_kwargs = {'bucket': 'mybucket', 'param_to_not_include': 'foo'}
  336. task = ReturnKwargsTask(
  337. self.transfer_coordinator, main_kwargs=main_kwargs
  338. )
  339. # The repr should not include the other parameter because it is not
  340. # a desired parameter to include.
  341. self.assertEqual(
  342. repr(task),
  343. 'ReturnKwargsTask(transfer_id={}, {})'.format(
  344. self.transfer_id, {'bucket': 'mybucket'}
  345. ),
  346. )
  347. def test_transfer_id(self):
  348. task = SuccessTask(self.transfer_coordinator)
  349. # Make sure that the id is the one provided to the id associated
  350. # to the transfer coordinator.
  351. self.assertEqual(task.transfer_id, self.transfer_id)
  352. def test_context_status_transitioning_success(self):
  353. # The status should be set to running.
  354. self.transfer_coordinator.set_status_to_running()
  355. self.assertEqual(self.transfer_coordinator.status, 'running')
  356. # If a task is called, the status still should be running.
  357. SuccessTask(self.transfer_coordinator)()
  358. self.assertEqual(self.transfer_coordinator.status, 'running')
  359. # Once the final task is called, the status should be set to success.
  360. SuccessTask(self.transfer_coordinator, is_final=True)()
  361. self.assertEqual(self.transfer_coordinator.status, 'success')
  362. def test_context_status_transitioning_failed(self):
  363. self.transfer_coordinator.set_status_to_running()
  364. SuccessTask(self.transfer_coordinator)()
  365. self.assertEqual(self.transfer_coordinator.status, 'running')
  366. # A failure task should result in the failed status
  367. FailureTask(self.transfer_coordinator)()
  368. self.assertEqual(self.transfer_coordinator.status, 'failed')
  369. # Even if the final task comes in and succeeds, it should stay failed.
  370. SuccessTask(self.transfer_coordinator, is_final=True)()
  371. self.assertEqual(self.transfer_coordinator.status, 'failed')
  372. def test_result_setting_for_success(self):
  373. override_return = 'foo'
  374. SuccessTask(self.transfer_coordinator)()
  375. SuccessTask(
  376. self.transfer_coordinator,
  377. main_kwargs={'return_value': override_return},
  378. is_final=True,
  379. )()
  380. # The return value for the transfer future should be of the final
  381. # task.
  382. self.assertEqual(self.transfer_coordinator.result(), override_return)
  383. def test_result_setting_for_error(self):
  384. FailureTask(self.transfer_coordinator)()
  385. # If another failure comes in, the result should still throw the
  386. # original exception when result() is eventually called.
  387. FailureTask(
  388. self.transfer_coordinator, main_kwargs={'exception': Exception}
  389. )()
  390. # Even if a success task comes along, the result of the future
  391. # should be the original exception
  392. SuccessTask(self.transfer_coordinator, is_final=True)()
  393. with self.assertRaises(TaskFailureException):
  394. self.transfer_coordinator.result()
  395. def test_done_callbacks_success(self):
  396. callback_results = []
  397. SuccessTask(
  398. self.transfer_coordinator,
  399. done_callbacks=[
  400. partial(callback_results.append, 'first'),
  401. partial(callback_results.append, 'second'),
  402. ],
  403. )()
  404. # For successful tasks, the done callbacks should get called.
  405. self.assertEqual(callback_results, ['first', 'second'])
  406. def test_done_callbacks_failure(self):
  407. callback_results = []
  408. FailureTask(
  409. self.transfer_coordinator,
  410. done_callbacks=[
  411. partial(callback_results.append, 'first'),
  412. partial(callback_results.append, 'second'),
  413. ],
  414. )()
  415. # For even failed tasks, the done callbacks should get called.
  416. self.assertEqual(callback_results, ['first', 'second'])
  417. # Callbacks should continue to be called even after a related failure
  418. SuccessTask(
  419. self.transfer_coordinator,
  420. done_callbacks=[
  421. partial(callback_results.append, 'third'),
  422. partial(callback_results.append, 'fourth'),
  423. ],
  424. )()
  425. self.assertEqual(
  426. callback_results, ['first', 'second', 'third', 'fourth']
  427. )
  428. def test_failure_cleanups_on_failure(self):
  429. callback_results = []
  430. self.transfer_coordinator.add_failure_cleanup(
  431. callback_results.append, 'first'
  432. )
  433. self.transfer_coordinator.add_failure_cleanup(
  434. callback_results.append, 'second'
  435. )
  436. FailureTask(self.transfer_coordinator)()
  437. # The failure callbacks should have not been called yet because it
  438. # is not the last task
  439. self.assertEqual(callback_results, [])
  440. # Now the failure callbacks should get called.
  441. SuccessTask(self.transfer_coordinator, is_final=True)()
  442. self.assertEqual(callback_results, ['first', 'second'])
  443. def test_no_failure_cleanups_on_success(self):
  444. callback_results = []
  445. self.transfer_coordinator.add_failure_cleanup(
  446. callback_results.append, 'first'
  447. )
  448. self.transfer_coordinator.add_failure_cleanup(
  449. callback_results.append, 'second'
  450. )
  451. SuccessTask(self.transfer_coordinator, is_final=True)()
  452. # The failure cleanups should not have been called because no task
  453. # failed for the transfer context.
  454. self.assertEqual(callback_results, [])
  455. def test_passing_main_kwargs(self):
  456. main_kwargs = {'foo': 'bar', 'baz': 'biz'}
  457. ReturnKwargsTask(
  458. self.transfer_coordinator, main_kwargs=main_kwargs, is_final=True
  459. )()
  460. # The kwargs should have been passed to the main()
  461. self.assertEqual(self.transfer_coordinator.result(), main_kwargs)
  462. def test_passing_pending_kwargs_single_futures(self):
  463. pending_kwargs = {}
  464. ref_main_kwargs = {'foo': 'bar', 'baz': 'biz'}
  465. # Pass some tasks to an executor
  466. with futures.ThreadPoolExecutor(1) as executor:
  467. pending_kwargs['foo'] = executor.submit(
  468. SuccessTask(
  469. self.transfer_coordinator,
  470. main_kwargs={'return_value': ref_main_kwargs['foo']},
  471. )
  472. )
  473. pending_kwargs['baz'] = executor.submit(
  474. SuccessTask(
  475. self.transfer_coordinator,
  476. main_kwargs={'return_value': ref_main_kwargs['baz']},
  477. )
  478. )
  479. # Create a task that depends on the tasks passed to the executor
  480. ReturnKwargsTask(
  481. self.transfer_coordinator,
  482. pending_main_kwargs=pending_kwargs,
  483. is_final=True,
  484. )()
  485. # The result should have the pending keyword arg values flushed
  486. # out.
  487. self.assertEqual(self.transfer_coordinator.result(), ref_main_kwargs)
  488. def test_passing_pending_kwargs_list_of_futures(self):
  489. pending_kwargs = {}
  490. ref_main_kwargs = {'foo': ['first', 'second']}
  491. # Pass some tasks to an executor
  492. with futures.ThreadPoolExecutor(1) as executor:
  493. first_future = executor.submit(
  494. SuccessTask(
  495. self.transfer_coordinator,
  496. main_kwargs={'return_value': ref_main_kwargs['foo'][0]},
  497. )
  498. )
  499. second_future = executor.submit(
  500. SuccessTask(
  501. self.transfer_coordinator,
  502. main_kwargs={'return_value': ref_main_kwargs['foo'][1]},
  503. )
  504. )
  505. # Make the pending keyword arg value a list
  506. pending_kwargs['foo'] = [first_future, second_future]
  507. # Create a task that depends on the tasks passed to the executor
  508. ReturnKwargsTask(
  509. self.transfer_coordinator,
  510. pending_main_kwargs=pending_kwargs,
  511. is_final=True,
  512. )()
  513. # The result should have the pending keyword arg values flushed
  514. # out in the expected order.
  515. self.assertEqual(self.transfer_coordinator.result(), ref_main_kwargs)
  516. def test_passing_pending_and_non_pending_kwargs(self):
  517. main_kwargs = {'nonpending_value': 'foo'}
  518. pending_kwargs = {}
  519. ref_main_kwargs = {
  520. 'nonpending_value': 'foo',
  521. 'pending_value': 'bar',
  522. 'pending_list': ['first', 'second'],
  523. }
  524. # Create the pending tasks
  525. with futures.ThreadPoolExecutor(1) as executor:
  526. pending_kwargs['pending_value'] = executor.submit(
  527. SuccessTask(
  528. self.transfer_coordinator,
  529. main_kwargs={
  530. 'return_value': ref_main_kwargs['pending_value']
  531. },
  532. )
  533. )
  534. first_future = executor.submit(
  535. SuccessTask(
  536. self.transfer_coordinator,
  537. main_kwargs={
  538. 'return_value': ref_main_kwargs['pending_list'][0]
  539. },
  540. )
  541. )
  542. second_future = executor.submit(
  543. SuccessTask(
  544. self.transfer_coordinator,
  545. main_kwargs={
  546. 'return_value': ref_main_kwargs['pending_list'][1]
  547. },
  548. )
  549. )
  550. # Make the pending keyword arg value a list
  551. pending_kwargs['pending_list'] = [first_future, second_future]
  552. # Create a task that depends on the tasks passed to the executor
  553. # and just regular nonpending kwargs.
  554. ReturnKwargsTask(
  555. self.transfer_coordinator,
  556. main_kwargs=main_kwargs,
  557. pending_main_kwargs=pending_kwargs,
  558. is_final=True,
  559. )()
  560. # The result should have all of the kwargs (both pending and
  561. # nonpending)
  562. self.assertEqual(self.transfer_coordinator.result(), ref_main_kwargs)
  563. def test_single_failed_pending_future(self):
  564. pending_kwargs = {}
  565. # Pass some tasks to an executor. Make one successful and the other
  566. # a failure.
  567. with futures.ThreadPoolExecutor(1) as executor:
  568. pending_kwargs['foo'] = executor.submit(
  569. SuccessTask(
  570. self.transfer_coordinator,
  571. main_kwargs={'return_value': 'bar'},
  572. )
  573. )
  574. pending_kwargs['baz'] = executor.submit(
  575. FailureTask(self.transfer_coordinator)
  576. )
  577. # Create a task that depends on the tasks passed to the executor
  578. ReturnKwargsTask(
  579. self.transfer_coordinator,
  580. pending_main_kwargs=pending_kwargs,
  581. is_final=True,
  582. )()
  583. # The end result should raise the exception from the initial
  584. # pending future value
  585. with self.assertRaises(TaskFailureException):
  586. self.transfer_coordinator.result()
  587. def test_single_failed_pending_future_in_list(self):
  588. pending_kwargs = {}
  589. # Pass some tasks to an executor. Make one successful and the other
  590. # a failure.
  591. with futures.ThreadPoolExecutor(1) as executor:
  592. first_future = executor.submit(
  593. SuccessTask(
  594. self.transfer_coordinator,
  595. main_kwargs={'return_value': 'bar'},
  596. )
  597. )
  598. second_future = executor.submit(
  599. FailureTask(self.transfer_coordinator)
  600. )
  601. pending_kwargs['pending_list'] = [first_future, second_future]
  602. # Create a task that depends on the tasks passed to the executor
  603. ReturnKwargsTask(
  604. self.transfer_coordinator,
  605. pending_main_kwargs=pending_kwargs,
  606. is_final=True,
  607. )()
  608. # The end result should raise the exception from the initial
  609. # pending future value in the list
  610. with self.assertRaises(TaskFailureException):
  611. self.transfer_coordinator.result()
  612. class BaseMultipartTaskTest(BaseTaskTest):
  613. def setUp(self):
  614. super().setUp()
  615. self.bucket = 'mybucket'
  616. self.key = 'foo'
  617. class TestCreateMultipartUploadTask(BaseMultipartTaskTest):
  618. def test_main(self):
  619. upload_id = 'foo'
  620. extra_args = {'Metadata': {'foo': 'bar'}}
  621. response = {'UploadId': upload_id}
  622. task = self.get_task(
  623. CreateMultipartUploadTask,
  624. main_kwargs={
  625. 'client': self.client,
  626. 'bucket': self.bucket,
  627. 'key': self.key,
  628. 'extra_args': extra_args,
  629. },
  630. )
  631. self.stubber.add_response(
  632. method='create_multipart_upload',
  633. service_response=response,
  634. expected_params={
  635. 'Bucket': self.bucket,
  636. 'Key': self.key,
  637. 'Metadata': {'foo': 'bar'},
  638. },
  639. )
  640. result_id = task()
  641. self.stubber.assert_no_pending_responses()
  642. # Ensure the upload id returned is correct
  643. self.assertEqual(upload_id, result_id)
  644. # Make sure that the abort was added as a cleanup failure
  645. self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 1)
  646. # Make sure if it is called, it will abort correctly
  647. self.stubber.add_response(
  648. method='abort_multipart_upload',
  649. service_response={},
  650. expected_params={
  651. 'Bucket': self.bucket,
  652. 'Key': self.key,
  653. 'UploadId': upload_id,
  654. },
  655. )
  656. self.transfer_coordinator.failure_cleanups[0]()
  657. self.stubber.assert_no_pending_responses()
  658. class TestCompleteMultipartUploadTask(BaseMultipartTaskTest):
  659. def test_main(self):
  660. upload_id = 'my-id'
  661. parts = [{'ETag': 'etag', 'PartNumber': 0}]
  662. task = self.get_task(
  663. CompleteMultipartUploadTask,
  664. main_kwargs={
  665. 'client': self.client,
  666. 'bucket': self.bucket,
  667. 'key': self.key,
  668. 'upload_id': upload_id,
  669. 'parts': parts,
  670. 'extra_args': {},
  671. },
  672. )
  673. self.stubber.add_response(
  674. method='complete_multipart_upload',
  675. service_response={},
  676. expected_params={
  677. 'Bucket': self.bucket,
  678. 'Key': self.key,
  679. 'UploadId': upload_id,
  680. 'MultipartUpload': {'Parts': parts},
  681. },
  682. )
  683. task()
  684. self.stubber.assert_no_pending_responses()
  685. def test_includes_extra_args(self):
  686. upload_id = 'my-id'
  687. parts = [{'ETag': 'etag', 'PartNumber': 0}]
  688. task = self.get_task(
  689. CompleteMultipartUploadTask,
  690. main_kwargs={
  691. 'client': self.client,
  692. 'bucket': self.bucket,
  693. 'key': self.key,
  694. 'upload_id': upload_id,
  695. 'parts': parts,
  696. 'extra_args': {'RequestPayer': 'requester'},
  697. },
  698. )
  699. self.stubber.add_response(
  700. method='complete_multipart_upload',
  701. service_response={},
  702. expected_params={
  703. 'Bucket': self.bucket,
  704. 'Key': self.key,
  705. 'UploadId': upload_id,
  706. 'MultipartUpload': {'Parts': parts},
  707. 'RequestPayer': 'requester',
  708. },
  709. )
  710. task()
  711. self.stubber.assert_no_pending_responses()