# test_manager.py
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
  13. import time
  14. from concurrent.futures import ThreadPoolExecutor
  15. from s3transfer.exceptions import CancelledError, FatalError
  16. from s3transfer.futures import TransferCoordinator
  17. from s3transfer.manager import TransferConfig, TransferCoordinatorController
  18. from tests import TransferCoordinatorWithInterrupt, unittest
  19. class FutureResultException(Exception):
  20. pass
  21. class TestTransferConfig(unittest.TestCase):
  22. def test_exception_on_zero_attr_value(self):
  23. with self.assertRaises(ValueError):
  24. TransferConfig(max_request_queue_size=0)
  25. class TestTransferCoordinatorController(unittest.TestCase):
  26. def setUp(self):
  27. self.coordinator_controller = TransferCoordinatorController()
  28. def sleep_then_announce_done(self, transfer_coordinator, sleep_time):
  29. time.sleep(sleep_time)
  30. transfer_coordinator.set_result('done')
  31. transfer_coordinator.announce_done()
  32. def assert_coordinator_is_cancelled(self, transfer_coordinator):
  33. self.assertEqual(transfer_coordinator.status, 'cancelled')
  34. def test_add_transfer_coordinator(self):
  35. transfer_coordinator = TransferCoordinator()
  36. # Add the transfer coordinator
  37. self.coordinator_controller.add_transfer_coordinator(
  38. transfer_coordinator
  39. )
  40. # Ensure that is tracked.
  41. self.assertEqual(
  42. self.coordinator_controller.tracked_transfer_coordinators,
  43. {transfer_coordinator},
  44. )
  45. def test_remove_transfer_coordinator(self):
  46. transfer_coordinator = TransferCoordinator()
  47. # Add the coordinator
  48. self.coordinator_controller.add_transfer_coordinator(
  49. transfer_coordinator
  50. )
  51. # Now remove the coordinator
  52. self.coordinator_controller.remove_transfer_coordinator(
  53. transfer_coordinator
  54. )
  55. # Make sure that it is no longer getting tracked.
  56. self.assertEqual(
  57. self.coordinator_controller.tracked_transfer_coordinators, set()
  58. )
  59. def test_cancel(self):
  60. transfer_coordinator = TransferCoordinator()
  61. # Add the transfer coordinator
  62. self.coordinator_controller.add_transfer_coordinator(
  63. transfer_coordinator
  64. )
  65. # Cancel with the canceler
  66. self.coordinator_controller.cancel()
  67. # Check that coordinator got canceled
  68. self.assert_coordinator_is_cancelled(transfer_coordinator)
  69. def test_cancel_with_message(self):
  70. message = 'my cancel message'
  71. transfer_coordinator = TransferCoordinator()
  72. self.coordinator_controller.add_transfer_coordinator(
  73. transfer_coordinator
  74. )
  75. self.coordinator_controller.cancel(message)
  76. transfer_coordinator.announce_done()
  77. with self.assertRaisesRegex(CancelledError, message):
  78. transfer_coordinator.result()
  79. def test_cancel_with_provided_exception(self):
  80. message = 'my cancel message'
  81. transfer_coordinator = TransferCoordinator()
  82. self.coordinator_controller.add_transfer_coordinator(
  83. transfer_coordinator
  84. )
  85. self.coordinator_controller.cancel(message, exc_type=FatalError)
  86. transfer_coordinator.announce_done()
  87. with self.assertRaisesRegex(FatalError, message):
  88. transfer_coordinator.result()
  89. def test_wait_for_done_transfer_coordinators(self):
  90. # Create a coordinator and add it to the canceler
  91. transfer_coordinator = TransferCoordinator()
  92. self.coordinator_controller.add_transfer_coordinator(
  93. transfer_coordinator
  94. )
  95. sleep_time = 0.02
  96. with ThreadPoolExecutor(max_workers=1) as executor:
  97. # In a separate thread sleep and then set the transfer coordinator
  98. # to done after sleeping.
  99. start_time = time.time()
  100. executor.submit(
  101. self.sleep_then_announce_done, transfer_coordinator, sleep_time
  102. )
  103. # Now call wait to wait for the transfer coordinator to be done.
  104. self.coordinator_controller.wait()
  105. end_time = time.time()
  106. wait_time = end_time - start_time
  107. # The time waited should not be less than the time it took to sleep in
  108. # the separate thread because the wait ending should be dependent on
  109. # the sleeping thread announcing that the transfer coordinator is done.
  110. self.assertTrue(sleep_time <= wait_time)
  111. def test_wait_does_not_propogate_exceptions_from_result(self):
  112. transfer_coordinator = TransferCoordinator()
  113. transfer_coordinator.set_exception(FutureResultException())
  114. transfer_coordinator.announce_done()
  115. try:
  116. self.coordinator_controller.wait()
  117. except FutureResultException as e:
  118. self.fail('%s should not have been raised.' % e)
  119. def test_wait_can_be_interrupted(self):
  120. inject_interrupt_coordinator = TransferCoordinatorWithInterrupt()
  121. self.coordinator_controller.add_transfer_coordinator(
  122. inject_interrupt_coordinator
  123. )
  124. with self.assertRaises(KeyboardInterrupt):
  125. self.coordinator_controller.wait()