# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Unit tests for the future and executor classes of ``s3transfer.futures``."""
import os
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

from s3transfer.exceptions import (
    CancelledError,
    FatalError,
    TransferNotDoneError,
)
from s3transfer.futures import (
    BaseExecutor,
    BoundedExecutor,
    ExecutorFuture,
    NonThreadedExecutor,
    NonThreadedExecutorFuture,
    TransferCoordinator,
    TransferFuture,
    TransferMeta,
)
from s3transfer.tasks import Task
from s3transfer.utils import (
    FunctionContainer,
    NoResourcesAvailable,
    TaskSemaphore,
)
from tests import (
    RecordingExecutor,
    TransferCoordinatorWithInterrupt,
    mock,
    unittest,
)


def return_call_args(*args, **kwargs):
    """Return the positional and keyword arguments this was called with."""
    return args, kwargs


def raise_exception(exception):
    """Raise the provided exception."""
    raise exception


def get_exc_info(exception):
    """Raise ``exception`` and return the resulting ``sys.exc_info()``."""
    try:
        raise_exception(exception)
    except Exception:
        return sys.exc_info()


class RecordingTransferCoordinator(TransferCoordinator):
    """TransferCoordinator that remembers every future ever associated."""

    def __init__(self):
        # Must exist before the parent initializer runs in case it
        # associates anything during construction.
        self.all_transfer_futures_ever_associated = set()
        super().__init__()

    def add_associated_future(self, future):
        self.all_transfer_futures_ever_associated.add(future)
        super().add_associated_future(future)


class ReturnFooTask(Task):
    """Task whose main body simply returns ``'foo'``."""

    def _main(self, **kwargs):
        return 'foo'


class SleepTask(Task):
    """Task whose main body sleeps for ``sleep_time`` seconds."""

    def _main(self, sleep_time, **kwargs):
        time.sleep(sleep_time)


class TestTransferFuture(unittest.TestCase):
    """Tests for ``TransferFuture``."""

    def setUp(self):
        self.meta = TransferMeta()
        self.coordinator = TransferCoordinator()
        self.future = self._get_transfer_future()

    def _get_transfer_future(self, **kwargs):
        """Build a TransferFuture from the fixture's meta and coordinator.

        Any keyword argument overrides (or adds to) the default components
        passed to the ``TransferFuture`` constructor.
        """
        components = {
            'meta': self.meta,
            'coordinator': self.coordinator,
        }
        components.update(kwargs)
        return TransferFuture(**components)

    def test_meta(self):
        self.assertIs(self.future.meta, self.meta)

    def test_done(self):
        self.assertFalse(self.future.done())
        self.coordinator.set_result(None)
        self.assertTrue(self.future.done())

    def test_result(self):
        result = 'foo'
        self.coordinator.set_result(result)
        self.coordinator.announce_done()
        self.assertEqual(self.future.result(), result)

    def test_keyboard_interrupt_on_result_does_not_block(self):
        # This should raise a KeyboardInterrupt when result is called on it.
        self.coordinator = TransferCoordinatorWithInterrupt()
        self.future = self._get_transfer_future()

        # result() should not block and immediately raise the keyboard
        # interrupt exception.
        with self.assertRaises(KeyboardInterrupt):
            self.future.result()

    def test_cancel(self):
        self.future.cancel()
        self.assertTrue(self.future.done())
        self.assertEqual(self.coordinator.status, 'cancelled')

    def test_set_exception(self):
        # Set the result such that there is no exception
        self.coordinator.set_result('result')
        self.coordinator.announce_done()
        self.assertEqual(self.future.result(), 'result')

        self.future.set_exception(ValueError())
        with self.assertRaises(ValueError):
            self.future.result()

    def test_set_exception_only_after_done(self):
        with self.assertRaises(TransferNotDoneError):
            self.future.set_exception(ValueError())

        self.coordinator.set_result('result')
        self.coordinator.announce_done()
        self.future.set_exception(ValueError())
        with self.assertRaises(ValueError):
            self.future.result()


class TestTransferMeta(unittest.TestCase):
    """Tests for ``TransferMeta``."""

    def setUp(self):
        self.transfer_meta = TransferMeta()

    def test_size(self):
        self.assertIsNone(self.transfer_meta.size)
        self.transfer_meta.provide_transfer_size(5)
        self.assertEqual(self.transfer_meta.size, 5)

    def test_call_args(self):
        call_args = object()
        transfer_meta = TransferMeta(call_args)
        # Assert that the call args provided is the same as is returned
        self.assertIs(transfer_meta.call_args, call_args)

    def test_transfer_id(self):
        transfer_meta = TransferMeta(transfer_id=1)
        self.assertEqual(transfer_meta.transfer_id, 1)

    def test_user_context(self):
        self.transfer_meta.user_context['foo'] = 'bar'
        self.assertEqual(self.transfer_meta.user_context, {'foo': 'bar'})


class TestTransferCoordinator(unittest.TestCase):
    """Tests for ``TransferCoordinator``."""

    def setUp(self):
        self.transfer_coordinator = TransferCoordinator()

    def test_transfer_id(self):
        transfer_coordinator = TransferCoordinator(transfer_id=1)
        self.assertEqual(transfer_coordinator.transfer_id, 1)

    def test_repr(self):
        transfer_coordinator = TransferCoordinator(transfer_id=1)
        self.assertEqual(
            repr(transfer_coordinator), 'TransferCoordinator(transfer_id=1)'
        )

    def test_initial_status(self):
        # A TransferCoordinator with no progress should have the status
        # of not-started
        self.assertEqual(self.transfer_coordinator.status, 'not-started')

    def test_set_status_to_queued(self):
        self.transfer_coordinator.set_status_to_queued()
        self.assertEqual(self.transfer_coordinator.status, 'queued')

    def test_cannot_set_status_to_queued_from_done_state(self):
        self.transfer_coordinator.set_exception(RuntimeError)
        with self.assertRaises(RuntimeError):
            self.transfer_coordinator.set_status_to_queued()

    def test_status_running(self):
        self.transfer_coordinator.set_status_to_running()
        self.assertEqual(self.transfer_coordinator.status, 'running')

    def test_cannot_set_status_to_running_from_done_state(self):
        self.transfer_coordinator.set_exception(RuntimeError)
        with self.assertRaises(RuntimeError):
            self.transfer_coordinator.set_status_to_running()

    def test_set_result(self):
        success_result = 'foo'
        self.transfer_coordinator.set_result(success_result)
        self.transfer_coordinator.announce_done()
        # Setting result should result in a success state and the return value
        # that was set.
        self.assertEqual(self.transfer_coordinator.status, 'success')
        self.assertEqual(self.transfer_coordinator.result(), success_result)

    def test_set_exception(self):
        exception_result = RuntimeError
        self.transfer_coordinator.set_exception(exception_result)
        self.transfer_coordinator.announce_done()
        # Setting an exception should result in a failed state and the return
        # value should be the raised exception
        self.assertEqual(self.transfer_coordinator.status, 'failed')
        self.assertEqual(self.transfer_coordinator.exception, exception_result)
        with self.assertRaises(exception_result):
            self.transfer_coordinator.result()

    def test_exception_cannot_override_done_state(self):
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.set_exception(RuntimeError)
        # Its status should be success even after the exception is set
        # because success is a done state.
        self.assertEqual(self.transfer_coordinator.status, 'success')

    def test_exception_can_override_done_state_with_override_flag(self):
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.set_exception(RuntimeError, override=True)
        self.assertEqual(self.transfer_coordinator.status, 'failed')

    def test_cancel(self):
        self.assertEqual(self.transfer_coordinator.status, 'not-started')
        self.transfer_coordinator.cancel()
        # This should set the state to cancelled and raise the CancelledError
        # exception and should have also set the done event so that result()
        # no longer blocks.
        self.assertEqual(self.transfer_coordinator.status, 'cancelled')
        with self.assertRaises(CancelledError):
            self.transfer_coordinator.result()

    def test_cancel_can_run_done_callbacks_that_uses_result(self):
        exceptions = []

        def capture_exception(transfer_coordinator, captured_exceptions):
            try:
                transfer_coordinator.result()
            except Exception as e:
                captured_exceptions.append(e)

        self.assertEqual(self.transfer_coordinator.status, 'not-started')
        self.transfer_coordinator.add_done_callback(
            capture_exception, self.transfer_coordinator, exceptions
        )
        self.transfer_coordinator.cancel()

        self.assertEqual(len(exceptions), 1)
        self.assertIsInstance(exceptions[0], CancelledError)

    def test_cancel_with_message(self):
        message = 'my message'
        self.transfer_coordinator.cancel(message)
        self.transfer_coordinator.announce_done()
        with self.assertRaisesRegex(CancelledError, message):
            self.transfer_coordinator.result()

    def test_cancel_with_provided_exception(self):
        message = 'my message'
        self.transfer_coordinator.cancel(message, exc_type=FatalError)
        self.transfer_coordinator.announce_done()
        with self.assertRaisesRegex(FatalError, message):
            self.transfer_coordinator.result()

    def test_cancel_cannot_override_done_state(self):
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.cancel()
        # Its status should be success even after cancel is called because
        # success is a done state.
        self.assertEqual(self.transfer_coordinator.status, 'success')

    def test_set_result_can_override_cancel(self):
        self.transfer_coordinator.cancel()
        # Result setting should override any cancel or set exception as this
        # is always invoked by the final task.
        self.transfer_coordinator.set_result('foo')
        self.transfer_coordinator.announce_done()
        self.assertEqual(self.transfer_coordinator.status, 'success')

    def test_submit(self):
        # Submit a callable to the transfer coordinator. It should submit it
        # to the executor.
        executor = RecordingExecutor(
            BoundedExecutor(1, 1, {'my-tag': TaskSemaphore(1)})
        )
        task = ReturnFooTask(self.transfer_coordinator)
        future = self.transfer_coordinator.submit(executor, task, tag='my-tag')
        executor.shutdown()
        # Make sure the future got submitted and executed as well by checking
        # its result value which should include the provided future tag.
        self.assertEqual(
            executor.submissions,
            [{'block': True, 'tag': 'my-tag', 'task': task}],
        )
        self.assertEqual(future.result(), 'foo')

    def test_association_and_disassociation_on_submit(self):
        self.transfer_coordinator = RecordingTransferCoordinator()

        # Submit a callable to the transfer coordinator.
        executor = BoundedExecutor(1, 1)
        task = ReturnFooTask(self.transfer_coordinator)
        future = self.transfer_coordinator.submit(executor, task)
        executor.shutdown()

        # Make sure the future that got submitted was associated to the
        # transfer future at some point.
        self.assertEqual(
            self.transfer_coordinator.all_transfer_futures_ever_associated,
            {future},
        )

        # Make sure the future got disassociated once the future is now done
        # by looking at the currently associated futures.
        self.assertEqual(self.transfer_coordinator.associated_futures, set())

    def test_done(self):
        # These should result in not done state:
        # not-started
        self.assertFalse(self.transfer_coordinator.done())
        # running
        self.transfer_coordinator.set_status_to_running()
        self.assertFalse(self.transfer_coordinator.done())

        # These should result in done state:
        # failed
        self.transfer_coordinator.set_exception(Exception)
        self.assertTrue(self.transfer_coordinator.done())

        # success
        self.transfer_coordinator.set_result('foo')
        self.assertTrue(self.transfer_coordinator.done())

        # cancelled
        self.transfer_coordinator.cancel()
        self.assertTrue(self.transfer_coordinator.done())

    def test_result_waits_until_done(self):
        execution_order = []

        def sleep_then_set_result(transfer_coordinator, execution_order):
            time.sleep(0.05)
            execution_order.append('setting_result')
            transfer_coordinator.set_result(None)
            transfer_coordinator.announce_done()

        with ThreadPoolExecutor(max_workers=1) as executor:
            executor.submit(
                sleep_then_set_result,
                self.transfer_coordinator,
                execution_order,
            )
            self.transfer_coordinator.result()
            execution_order.append('after_result')

        # The result() call should have waited until the other thread set
        # the result after sleeping for 0.05 seconds.
        # NOTE: this must be assertEqual; assertTrue would treat the expected
        # list as the failure message and only check for a non-empty list.
        self.assertEqual(execution_order, ['setting_result', 'after_result'])

    def test_failure_cleanups(self):
        args = (1, 2)
        kwargs = {'foo': 'bar'}

        second_args = (2, 4)
        second_kwargs = {'biz': 'baz'}

        self.transfer_coordinator.add_failure_cleanup(
            return_call_args, *args, **kwargs
        )
        self.transfer_coordinator.add_failure_cleanup(
            return_call_args, *second_args, **second_kwargs
        )

        # Ensure the callbacks got added.
        self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 2)

        # Ensure they will get called in the correct order.
        result_list = [
            cleanup()
            for cleanup in self.transfer_coordinator.failure_cleanups
        ]
        self.assertEqual(
            result_list, [(args, kwargs), (second_args, second_kwargs)]
        )

    def test_associated_futures(self):
        first_future = object()
        # Associate one future to the transfer
        self.transfer_coordinator.add_associated_future(first_future)
        associated_futures = self.transfer_coordinator.associated_futures
        # The first future should be in the returned list of futures.
        self.assertEqual(associated_futures, {first_future})

        second_future = object()
        # Associate another future to the transfer.
        self.transfer_coordinator.add_associated_future(second_future)
        # The association should not have mutated the returned list from
        # before.
        self.assertEqual(associated_futures, {first_future})

        # Both futures should be in the returned list.
        self.assertEqual(
            self.transfer_coordinator.associated_futures,
            {first_future, second_future},
        )

    def test_done_callbacks_on_done(self):
        done_callback_invocations = []
        callback = FunctionContainer(
            done_callback_invocations.append, 'done callback called'
        )

        # Add the done callback to the transfer.
        self.transfer_coordinator.add_done_callback(callback)

        # Announce that the transfer is done. This should invoke the done
        # callback.
        self.transfer_coordinator.announce_done()
        self.assertEqual(done_callback_invocations, ['done callback called'])

        # If done is announced again, we should not invoke the callback again
        # because done has already been announced and thus the callback has
        # been run as well.
        self.transfer_coordinator.announce_done()
        self.assertEqual(done_callback_invocations, ['done callback called'])

    def test_failure_cleanups_on_done(self):
        cleanup_invocations = []
        callback = FunctionContainer(
            cleanup_invocations.append, 'cleanup called'
        )

        # Add the failure cleanup to the transfer.
        self.transfer_coordinator.add_failure_cleanup(callback)

        # Announce that the transfer is done. This should invoke the failure
        # cleanup.
        self.transfer_coordinator.announce_done()
        self.assertEqual(cleanup_invocations, ['cleanup called'])

        # If done is announced again, we should not invoke the cleanup again
        # because done has already been announced and thus the cleanup has
        # been run as well.
        self.transfer_coordinator.announce_done()
        self.assertEqual(cleanup_invocations, ['cleanup called'])


class TestBoundedExecutor(unittest.TestCase):
    """Tests for ``BoundedExecutor``."""

    def setUp(self):
        self.coordinator = TransferCoordinator()
        self.tag_semaphores = {}
        self.executor = self.get_executor()

    def get_executor(self, max_size=1, max_num_threads=1):
        """Create a BoundedExecutor sharing this fixture's tag semaphores."""
        return BoundedExecutor(max_size, max_num_threads, self.tag_semaphores)

    def get_task(self, task_cls, main_kwargs=None):
        """Instantiate ``task_cls`` bound to this fixture's coordinator."""
        return task_cls(self.coordinator, main_kwargs=main_kwargs)

    def get_sleep_task(self, sleep_time=0.01):
        """Create a SleepTask that sleeps for ``sleep_time`` seconds."""
        return self.get_task(SleepTask, main_kwargs={'sleep_time': sleep_time})

    def add_semaphore(self, task_tag, count):
        """Register a TaskSemaphore of size ``count`` under ``task_tag``."""
        self.tag_semaphores[task_tag] = TaskSemaphore(count)

    def assert_submit_would_block(self, task, tag=None):
        """Assert a non-blocking submit raises NoResourcesAvailable."""
        with self.assertRaises(NoResourcesAvailable):
            self.executor.submit(task, tag=tag, block=False)

    def assert_submit_would_not_block(self, task, tag=None, **kwargs):
        """Assert a non-blocking submit succeeds.

        ``**kwargs`` is accepted but not used by this helper.
        """
        try:
            self.executor.submit(task, tag=tag, block=False)
        except NoResourcesAvailable:
            self.fail(
                'Task {} should not have been blocked. Caused by:\n{}'.format(
                    task, traceback.format_exc()
                )
            )

    def add_done_callback_to_future(self, future, fn, *args, **kwargs):
        """Attach ``fn(*args, **kwargs)`` to ``future`` as a done callback."""
        callback_for_future = FunctionContainer(fn, *args, **kwargs)
        future.add_done_callback(callback_for_future)

    def test_submit_single_task(self):
        # Ensure we can submit a task to the executor
        task = self.get_task(ReturnFooTask)
        future = self.executor.submit(task)

        # Ensure what we get back is a Future
        self.assertIsInstance(future, ExecutorFuture)
        # Ensure the callable got executed.
        self.assertEqual(future.result(), 'foo')

    @unittest.skipIf(
        os.environ.get('USE_SERIAL_EXECUTOR'),
        "Not supported with serial executor tests",
    )
    def test_executor_blocks_on_full_capacity(self):
        first_task = self.get_sleep_task()
        second_task = self.get_sleep_task()
        self.executor.submit(first_task)
        # The first task should be sleeping for a substantial period of
        # time such that on the submission of the second task, it will
        # raise an error saying that it cannot be submitted as the max
        # capacity of the semaphore is one.
        self.assert_submit_would_block(second_task)

    def test_executor_clears_capacity_on_done_tasks(self):
        first_task = self.get_sleep_task()
        second_task = self.get_task(ReturnFooTask)

        # Submit a task.
        future = self.executor.submit(first_task)

        # Submit a new task when the first task finishes. This should not get
        # blocked because the first task should have finished clearing up
        # capacity.
        self.add_done_callback_to_future(
            future, self.assert_submit_would_not_block, second_task
        )

        # Wait for it to complete.
        self.executor.shutdown()

    @unittest.skipIf(
        os.environ.get('USE_SERIAL_EXECUTOR'),
        "Not supported with serial executor tests",
    )
    def test_would_not_block_when_full_capacity_in_other_semaphore(self):
        first_task = self.get_sleep_task()

        # Now let's create a new task with a tag and so it uses different
        # semaphore.
        task_tag = 'other'
        other_task = self.get_sleep_task()
        self.add_semaphore(task_tag, 1)

        # Submit the normal first task
        self.executor.submit(first_task)

        # Even though the first task should be sleeping for a substantial
        # period of time, the submission of the second task should not
        # raise an error because it should use a different semaphore
        self.assert_submit_would_not_block(other_task, task_tag)

        # Another submission of the other task though should raise
        # an exception as the capacity is equal to one for that tag.
        self.assert_submit_would_block(other_task, task_tag)

    def test_shutdown(self):
        slow_task = self.get_sleep_task()
        future = self.executor.submit(slow_task)
        self.executor.shutdown()
        # Ensure that the shutdown waits until the task is done
        self.assertTrue(future.done())

    @unittest.skipIf(
        os.environ.get('USE_SERIAL_EXECUTOR'),
        "Not supported with serial executor tests",
    )
    def test_shutdown_no_wait(self):
        slow_task = self.get_sleep_task()
        future = self.executor.submit(slow_task)
        self.executor.shutdown(False)
        # Ensure that the shutdown returns immediately even if the task is
        # not done, which it should not be because it is slow.
        self.assertFalse(future.done())

    def test_replace_underlying_executor(self):
        mocked_executor_cls = mock.Mock(BaseExecutor)
        executor = BoundedExecutor(10, 1, {}, mocked_executor_cls)
        executor.submit(self.get_task(ReturnFooTask))
        self.assertTrue(mocked_executor_cls.return_value.submit.called)


class TestExecutorFuture(unittest.TestCase):
    """Tests for ``ExecutorFuture`` wrapping a ThreadPoolExecutor future."""

    def test_result(self):
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(return_call_args, 'foo', biz='baz')
            wrapped_future = ExecutorFuture(future)
        self.assertEqual(wrapped_future.result(), (('foo',), {'biz': 'baz'}))

    def test_done(self):
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(return_call_args, 'foo', biz='baz')
            wrapped_future = ExecutorFuture(future)
        # Checked after the pool has shut down so the work has finished.
        self.assertTrue(wrapped_future.done())

    def test_add_done_callback(self):
        done_callbacks = []
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(return_call_args, 'foo', biz='baz')
            wrapped_future = ExecutorFuture(future)
            wrapped_future.add_done_callback(
                FunctionContainer(done_callbacks.append, 'called')
            )
        self.assertEqual(done_callbacks, ['called'])


class TestNonThreadedExecutor(unittest.TestCase):
    """Tests for ``NonThreadedExecutor``."""

    def test_submit(self):
        executor = NonThreadedExecutor()
        future = executor.submit(return_call_args, 1, 2, foo='bar')
        self.assertIsInstance(future, NonThreadedExecutorFuture)
        self.assertEqual(future.result(), ((1, 2), {'foo': 'bar'}))

    def test_submit_with_exception(self):
        executor = NonThreadedExecutor()
        future = executor.submit(raise_exception, RuntimeError())
        self.assertIsInstance(future, NonThreadedExecutorFuture)
        with self.assertRaises(RuntimeError):
            future.result()

    def test_submit_with_exception_and_captures_info(self):
        exception = ValueError('message')
        tb = get_exc_info(exception)[2]
        future = NonThreadedExecutor().submit(raise_exception, exception)
        try:
            future.result()
            # An exception should have been raised
            self.fail('Future should have raised a ValueError')
        except ValueError:
            actual_tb = sys.exc_info()[2]
            last_frame = traceback.extract_tb(actual_tb)[-1]
            last_expected_frame = traceback.extract_tb(tb)[-1]
            self.assertEqual(last_frame, last_expected_frame)


class TestNonThreadedExecutorFuture(unittest.TestCase):
    """Tests for ``NonThreadedExecutorFuture``."""

    def setUp(self):
        self.future = NonThreadedExecutorFuture()

    def test_done_starts_false(self):
        self.assertFalse(self.future.done())

    def test_done_after_setting_result(self):
        self.future.set_result('result')
        self.assertTrue(self.future.done())

    def test_done_after_setting_exception(self):
        self.future.set_exception_info(Exception(), None)
        self.assertTrue(self.future.done())

    def test_result(self):
        self.future.set_result('result')
        self.assertEqual(self.future.result(), 'result')

    def test_exception_result(self):
        exception = ValueError('message')
        self.future.set_exception_info(exception, None)
        with self.assertRaisesRegex(ValueError, 'message'):
            self.future.result()

    def test_exception_result_doesnt_modify_last_frame(self):
        exception = ValueError('message')
        tb = get_exc_info(exception)[2]
        self.future.set_exception_info(exception, tb)
        try:
            self.future.result()
            # An exception should have been raised
            self.fail()
        except ValueError:
            actual_tb = sys.exc_info()[2]
            last_frame = traceback.extract_tb(actual_tb)[-1]
            last_expected_frame = traceback.extract_tb(tb)[-1]
            self.assertEqual(last_frame, last_expected_frame)

    def test_done_callback(self):
        done_futures = []
        self.future.add_done_callback(done_futures.append)
        self.assertEqual(done_futures, [])
        self.future.set_result('result')
        self.assertEqual(done_futures, [self.future])

    def test_done_callback_after_done(self):
        self.future.set_result('result')
        done_futures = []
        self.future.add_done_callback(done_futures.append)
        self.assertEqual(done_futures, [self.future])