test_processpool.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
  13. import glob
  14. import os
  15. import time
  16. from s3transfer.processpool import ProcessPoolDownloader, ProcessTransferConfig
  17. from tests import assert_files_equal
  18. from tests.integration import BaseTransferManagerIntegTest
  19. class TestProcessPoolDownloader(BaseTransferManagerIntegTest):
  20. def setUp(self):
  21. super().setUp()
  22. self.multipart_threshold = 5 * 1024 * 1024
  23. self.config = ProcessTransferConfig(
  24. multipart_threshold=self.multipart_threshold
  25. )
  26. self.client_kwargs = {'region_name': self.region}
  27. def create_process_pool_downloader(self, client_kwargs=None, config=None):
  28. if client_kwargs is None:
  29. client_kwargs = self.client_kwargs
  30. if config is None:
  31. config = self.config
  32. return ProcessPoolDownloader(
  33. client_kwargs=client_kwargs, config=config
  34. )
  35. def test_below_threshold(self):
  36. downloader = self.create_process_pool_downloader()
  37. filename = self.files.create_file_with_size(
  38. 'foo.txt', filesize=1024 * 1024
  39. )
  40. self.upload_file(filename, '1mb.txt')
  41. download_path = os.path.join(self.files.rootdir, '1mb.txt')
  42. with downloader:
  43. downloader.download_file(
  44. self.bucket_name, '1mb.txt', download_path
  45. )
  46. assert_files_equal(filename, download_path)
  47. def test_above_threshold(self):
  48. downloader = self.create_process_pool_downloader()
  49. filename = self.files.create_file_with_size(
  50. 'foo.txt', filesize=20 * 1024 * 1024
  51. )
  52. self.upload_file(filename, '20mb.txt')
  53. download_path = os.path.join(self.files.rootdir, '20mb.txt')
  54. with downloader:
  55. downloader.download_file(
  56. self.bucket_name, '20mb.txt', download_path
  57. )
  58. assert_files_equal(filename, download_path)
  59. def test_large_download_exits_quickly_on_exception(self):
  60. downloader = self.create_process_pool_downloader()
  61. filename = self.files.create_file_with_size(
  62. 'foo.txt', filesize=60 * 1024 * 1024
  63. )
  64. self.upload_file(filename, '60mb.txt')
  65. download_path = os.path.join(self.files.rootdir, '60mb.txt')
  66. sleep_time = 0.2
  67. try:
  68. with downloader:
  69. downloader.download_file(
  70. self.bucket_name, '60mb.txt', download_path
  71. )
  72. # Sleep for a little to get the transfer process going
  73. time.sleep(sleep_time)
  74. # Raise an exception which should cause the preceding
  75. # download to cancel and exit quickly
  76. start_time = time.time()
  77. raise KeyboardInterrupt()
  78. except KeyboardInterrupt:
  79. pass
  80. end_time = time.time()
  81. # The maximum time allowed for the transfer manager to exit.
  82. # This means that it should take less than a couple second after
  83. # sleeping to exit.
  84. max_allowed_exit_time = 5
  85. self.assertLess(
  86. end_time - start_time,
  87. max_allowed_exit_time,
  88. "Failed to exit under {}. Instead exited in {}.".format(
  89. max_allowed_exit_time, end_time - start_time
  90. ),
  91. )
  92. # Make sure the actual file and the temporary do not exist
  93. # by globbing for the file and any of its extensions
  94. possible_matches = glob.glob('%s*' % download_path)
  95. self.assertEqual(possible_matches, [])
  96. def test_many_files_exits_quickly_on_exception(self):
  97. downloader = self.create_process_pool_downloader()
  98. filename = self.files.create_file_with_size(
  99. '1mb.txt', filesize=1024 * 1024
  100. )
  101. self.upload_file(filename, '1mb.txt')
  102. filenames = []
  103. base_filename = os.path.join(self.files.rootdir, 'file')
  104. for i in range(10):
  105. filenames.append(base_filename + str(i))
  106. try:
  107. with downloader:
  108. start_time = time.time()
  109. for filename in filenames:
  110. downloader.download_file(
  111. self.bucket_name, '1mb.txt', filename
  112. )
  113. # Raise an exception which should cause the preceding
  114. # transfer to cancel and exit quickly
  115. raise KeyboardInterrupt()
  116. except KeyboardInterrupt:
  117. pass
  118. end_time = time.time()
  119. # The maximum time allowed for the transfer manager to exit.
  120. # This means that it should take less than a couple seconds to exit.
  121. max_allowed_exit_time = 5
  122. self.assertLess(
  123. end_time - start_time,
  124. max_allowed_exit_time,
  125. "Failed to exit under {}. Instead exited in {}.".format(
  126. max_allowed_exit_time, end_time - start_time
  127. ),
  128. )
  129. # For the transfer that did get cancelled, make sure the temporary
  130. # file got removed.
  131. possible_matches = glob.glob('%s*' % base_filename)
  132. self.assertEqual(possible_matches, [])