test_processpool.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728
  1. # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import os
  14. import queue
  15. import signal
  16. import threading
  17. import time
  18. from io import BytesIO
  19. from botocore.client import BaseClient
  20. from botocore.config import Config
  21. from botocore.exceptions import ClientError, ReadTimeoutError
  22. from s3transfer.constants import PROCESS_USER_AGENT
  23. from s3transfer.exceptions import CancelledError, RetriesExceededError
  24. from s3transfer.processpool import (
  25. SHUTDOWN_SIGNAL,
  26. ClientFactory,
  27. DownloadFileRequest,
  28. GetObjectJob,
  29. GetObjectSubmitter,
  30. GetObjectWorker,
  31. ProcessPoolDownloader,
  32. ProcessPoolTransferFuture,
  33. ProcessPoolTransferMeta,
  34. ProcessTransferConfig,
  35. TransferMonitor,
  36. TransferState,
  37. ignore_ctrl_c,
  38. )
  39. from s3transfer.utils import CallArgs, OSUtils
  40. from tests import (
  41. FileCreator,
  42. StreamWithError,
  43. StubbedClientTest,
  44. mock,
  45. skip_if_windows,
  46. unittest,
  47. )
  class RenameFailingOSUtils(OSUtils):
      """OSUtils variant whose ``rename_file`` always raises.

      The exception instance given to the constructor is raised on every
      ``rename_file`` call, whatever filenames are passed in.
      """

      def __init__(self, exception):
          # Keep the exact instance so a test can compare it with whatever
          # exception ends up being reported.
          self.exception = exception

      def rename_file(self, current_filename, new_filename):
          """Raise the stored exception instead of renaming anything."""
          raise self.exception
  class TestIgnoreCtrlC(unittest.TestCase):
      """Checks that ``ignore_ctrl_c`` keeps SIGINT from surfacing."""

      @skip_if_windows('os.kill() with SIGINT not supported on Windows')
      def test_ignore_ctrl_c(self):
          # Deliver SIGINT to this very process while the context manager is
          # active; landing in the except branch means it leaked through.
          own_pid = os.getpid()
          with ignore_ctrl_c():
              try:
                  os.kill(own_pid, signal.SIGINT)
              except KeyboardInterrupt:
                  self.fail(
                      'The ignore_ctrl_c context manager should have '
                      'ignored the KeyboardInterrupt exception'
                  )
  class TestProcessPoolDownloader(unittest.TestCase):
      """Checks how ProcessPoolDownloader wires up its ClientFactory."""

      def test_uses_client_kwargs(self):
          # ClientFactory is patched out, so only the arguments it was
          # constructed with are inspected.
          client_kwargs = {'region_name': 'myregion'}
          with mock.patch('s3transfer.processpool.ClientFactory') as factory:
              ProcessPoolDownloader(client_kwargs=client_kwargs)
              positional_args = factory.call_args[0]
              self.assertEqual(
                  positional_args[0], {'region_name': 'myregion'}
              )
  class TestProcessPoolTransferFuture(unittest.TestCase):
      """Exercises ProcessPoolTransferFuture backed by a TransferMonitor."""

      def setUp(self):
          monitor = TransferMonitor()
          transfer_id = monitor.notify_new_transfer()
          meta = ProcessPoolTransferMeta(
              transfer_id=transfer_id, call_args=CallArgs()
          )
          self.monitor = monitor
          self.transfer_id = transfer_id
          self.meta = meta
          self.future = ProcessPoolTransferFuture(monitor=monitor, meta=meta)

      def _finish_transfer(self):
          """Tell the monitor that the fixture's transfer is done."""
          self.monitor.notify_done(self.transfer_id)

      def test_meta(self):
          self.assertEqual(self.future.meta, self.meta)

      def test_done(self):
          # The future only reports done after the monitor was notified.
          self.assertFalse(self.future.done())
          self._finish_transfer()
          self.assertTrue(self.future.done())

      def test_result(self):
          self._finish_transfer()
          outcome = self.future.result()
          self.assertIsNone(outcome)

      def test_result_with_exception(self):
          self.monitor.notify_exception(self.transfer_id, RuntimeError())
          self._finish_transfer()
          with self.assertRaises(RuntimeError):
              self.future.result()

      def test_result_with_keyboard_interrupt(self):
          # Use a mocked monitor so polling can be made to raise
          # KeyboardInterrupt on demand.
          mock_monitor = mock.Mock(TransferMonitor)
          mock_monitor._connect = mock.Mock()
          mock_monitor.poll_for_result.side_effect = KeyboardInterrupt()
          interrupted_future = ProcessPoolTransferFuture(
              monitor=mock_monitor, meta=self.meta
          )

          with self.assertRaises(KeyboardInterrupt):
              interrupted_future.result()

          self.assertTrue(mock_monitor._connect.called)
          self.assertTrue(mock_monitor.notify_exception.called)
          # The monitor must have been handed a CancelledError for this
          # transfer.
          positional_args = mock_monitor.notify_exception.call_args[0]
          self.assertEqual(positional_args[0], self.transfer_id)
          self.assertIsInstance(positional_args[1], CancelledError)

      def test_cancel(self):
          self.future.cancel()
          self._finish_transfer()
          with self.assertRaises(CancelledError):
              self.future.result()
  class TestProcessPoolTransferMeta(unittest.TestCase):
      """Checks the attributes exposed by ProcessPoolTransferMeta."""

      def test_transfer_id(self):
          transfer_id = 1
          meta = ProcessPoolTransferMeta(transfer_id, CallArgs())
          self.assertEqual(meta.transfer_id, 1)

      def test_call_args(self):
          expected_call_args = CallArgs()
          meta = ProcessPoolTransferMeta(1, expected_call_args)
          self.assertEqual(meta.call_args, expected_call_args)

      def test_user_context(self):
          meta = ProcessPoolTransferMeta(1, CallArgs())
          # Starts out empty ...
          self.assertEqual(meta.user_context, {})
          # ... and keeps whatever gets stored in it.
          meta.user_context['mykey'] = 'myvalue'
          self.assertEqual(meta.user_context, {'mykey': 'myvalue'})
  class TestClientFactory(unittest.TestCase):
      """Checks the clients produced by ClientFactory.create_client()."""

      def _assert_has_process_user_agent(self, client):
          """Assert PROCESS_USER_AGENT is part of the client's user agent."""
          self.assertIn(PROCESS_USER_AGENT, client.meta.config.user_agent)

      def test_create_client(self):
          factory = ClientFactory()
          client = factory.create_client()
          self.assertIsInstance(client, BaseClient)
          self.assertEqual(client.meta.service_model.service_name, 's3')
          self._assert_has_process_user_agent(client)

      def test_create_client_with_client_kwargs(self):
          factory = ClientFactory({'region_name': 'myregion'})
          client = factory.create_client()
          self.assertEqual(client.meta.region_name, 'myregion')

      def test_user_agent_with_config(self):
          factory = ClientFactory({'config': Config()})
          self._assert_has_process_user_agent(factory.create_client())

      def test_user_agent_with_existing_user_agent_extra(self):
          config = Config(user_agent_extra='foo/1.0')
          factory = ClientFactory({'config': config})
          self._assert_has_process_user_agent(factory.create_client())

      def test_user_agent_with_existing_user_agent(self):
          config = Config(user_agent='foo/1.0')
          factory = ClientFactory({'config': config})
          self._assert_has_process_user_agent(factory.create_client())
  class TestTransferMonitor(unittest.TestCase):
      """Exercises TransferMonitor's per-transfer bookkeeping.

      ``setUp`` creates a monitor with one registered transfer; tests that
      need a pristine monitor (e.g. to check id numbering) build their own.
      """

      def setUp(self):
          self.monitor = TransferMonitor()
          self.transfer_id = self.monitor.notify_new_transfer()

      def test_notify_new_transfer_creates_new_state(self):
          monitor = TransferMonitor()
          transfer_id = monitor.notify_new_transfer()
          self.assertFalse(monitor.is_done(transfer_id))
          self.assertIsNone(monitor.get_exception(transfer_id))

      def test_notify_new_transfer_increments_transfer_id(self):
          monitor = TransferMonitor()
          self.assertEqual(monitor.notify_new_transfer(), 0)
          self.assertEqual(monitor.notify_new_transfer(), 1)

      def test_notify_get_exception(self):
          exception = Exception()
          self.monitor.notify_exception(self.transfer_id, exception)
          self.assertEqual(
              self.monitor.get_exception(self.transfer_id), exception
          )

      def test_get_no_exception(self):
          self.assertIsNone(self.monitor.get_exception(self.transfer_id))

      def test_notify_jobs(self):
          # Each completed job returns the number of jobs still outstanding.
          self.monitor.notify_expected_jobs_to_complete(self.transfer_id, 2)
          self.assertEqual(
              self.monitor.notify_job_complete(self.transfer_id), 1
          )
          self.assertEqual(
              self.monitor.notify_job_complete(self.transfer_id), 0
          )

      def test_notify_jobs_for_multiple_transfers(self):
          # Job counters are tracked independently per transfer id.
          self.monitor.notify_expected_jobs_to_complete(self.transfer_id, 2)
          other_transfer_id = self.monitor.notify_new_transfer()
          self.monitor.notify_expected_jobs_to_complete(other_transfer_id, 2)
          self.assertEqual(
              self.monitor.notify_job_complete(self.transfer_id), 1
          )
          self.assertEqual(
              self.monitor.notify_job_complete(other_transfer_id), 1
          )

      def test_done(self):
          self.assertFalse(self.monitor.is_done(self.transfer_id))
          self.monitor.notify_done(self.transfer_id)
          self.assertTrue(self.monitor.is_done(self.transfer_id))

      def test_poll_for_result(self):
          self.monitor.notify_done(self.transfer_id)
          self.assertIsNone(self.monitor.poll_for_result(self.transfer_id))

      def test_poll_for_result_raises_error(self):
          self.monitor.notify_exception(self.transfer_id, RuntimeError())
          self.monitor.notify_done(self.transfer_id)
          with self.assertRaises(RuntimeError):
              self.monitor.poll_for_result(self.transfer_id)

      def test_poll_for_result_waits_till_done(self):
          event_order = []

          def sleep_then_notify_done():
              time.sleep(0.05)
              event_order.append('notify_done')
              self.monitor.notify_done(self.transfer_id)

          t = threading.Thread(target=sleep_then_notify_done)
          t.start()
          try:
              self.monitor.poll_for_result(self.transfer_id)
              event_order.append('done_polling')
          finally:
              # Always reap the helper thread so it cannot outlive this test,
              # even when polling raises.
              t.join()
          self.assertEqual(event_order, ['notify_done', 'done_polling'])

      def test_notify_cancel_all_in_progress(self):
          monitor = TransferMonitor()
          transfer_ids = [monitor.notify_new_transfer() for _ in range(10)]
          monitor.notify_cancel_all_in_progress()
          for transfer_id in transfer_ids:
              self.assertIsInstance(
                  monitor.get_exception(transfer_id), CancelledError
              )
              # Cancelling a transfer does not mean it is done as there may
              # be cleanup work left to do.
              self.assertFalse(monitor.is_done(transfer_id))

      def test_notify_cancel_does_not_affect_done_transfers(self):
          self.monitor.notify_done(self.transfer_id)
          self.monitor.notify_cancel_all_in_progress()
          self.assertTrue(self.monitor.is_done(self.transfer_id))
          self.assertIsNone(self.monitor.get_exception(self.transfer_id))
  class TestTransferState(unittest.TestCase):
      """Exercises the done flag, exception slot and job counter of
      TransferState."""

      def setUp(self):
          self.state = TransferState()

      def test_done(self):
          self.assertFalse(self.state.done)
          self.state.set_done()
          self.assertTrue(self.state.done)

      def test_waits_till_done_is_set(self):
          event_order = []

          def sleep_then_set_done():
              time.sleep(0.05)
              event_order.append('set_done')
              self.state.set_done()

          t = threading.Thread(target=sleep_then_set_done)
          t.start()
          try:
              self.state.wait_till_done()
              event_order.append('done_waiting')
          finally:
              # Always reap the helper thread so it cannot outlive this test,
              # even when waiting raises.
              t.join()
          self.assertEqual(event_order, ['set_done', 'done_waiting'])

      def test_exception(self):
          exception = RuntimeError()
          self.state.exception = exception
          self.assertEqual(self.state.exception, exception)

      def test_jobs_to_complete(self):
          self.state.jobs_to_complete = 5
          self.assertEqual(self.state.jobs_to_complete, 5)

      def test_decrement_jobs_to_complete(self):
          # Decrementing returns the remaining count.
          self.state.jobs_to_complete = 5
          self.assertEqual(self.state.decrement_jobs_to_complete(), 4)
  class TestGetObjectSubmitter(StubbedClientTest):
      """Runs GetObjectSubmitter in-process against in-memory queues.

      The client factory and OSUtils are mocks; the factory hands out the
      stubbed client provided by StubbedClientTest.
      """

      def setUp(self):
          super().setUp()
          # Default request parameters used by add_download_file_request().
          self.bucket = 'bucket'
          self.key = 'key'
          self.filename = 'myfile'
          self.temp_filename = 'myfile.temp'
          self.extra_args = {}
          self.expected_size = None

          self.transfer_config = ProcessTransferConfig()
          self.client_factory = mock.Mock(ClientFactory)
          self.client_factory.create_client.return_value = self.client
          self.transfer_monitor = TransferMonitor()
          self.osutil = mock.Mock(OSUtils)
          self.osutil.get_temp_filename.return_value = self.temp_filename
          self.download_request_queue = queue.Queue()
          self.worker_queue = queue.Queue()
          self.submitter = GetObjectSubmitter(
              transfer_config=self.transfer_config,
              client_factory=self.client_factory,
              transfer_monitor=self.transfer_monitor,
              osutil=self.osutil,
              download_request_queue=self.download_request_queue,
              worker_queue=self.worker_queue,
          )
          self.transfer_id = self.transfer_monitor.notify_new_transfer()

      def add_download_file_request(self, **override_kwargs):
          """Queue a DownloadFileRequest built from the fixture defaults.

          Any keyword passed in replaces the corresponding default.
          """
          defaults = {
              'transfer_id': self.transfer_id,
              'bucket': self.bucket,
              'key': self.key,
              'filename': self.filename,
              'extra_args': self.extra_args,
              'expected_size': self.expected_size,
          }
          request = DownloadFileRequest(**{**defaults, **override_kwargs})
          self.download_request_queue.put(request)

      def add_shutdown(self):
          """Queue the shutdown signal on the download request queue."""
          self.download_request_queue.put(SHUTDOWN_SIGNAL)

      def assert_submitted_get_object_jobs(self, expected_jobs):
          """Drain the worker queue and compare it with ``expected_jobs``."""
          submitted = []
          while not self.worker_queue.empty():
              submitted.append(self.worker_queue.get())
          self.assertEqual(submitted, expected_jobs)

      def _expected_job(self, offset=0, extra_args=None):
          """Build the GetObjectJob expected for the fixture's transfer."""
          return GetObjectJob(
              transfer_id=self.transfer_id,
              bucket=self.bucket,
              key=self.key,
              temp_filename=self.temp_filename,
              offset=offset,
              extra_args={} if extra_args is None else extra_args,
              filename=self.filename,
          )

      def _submit_and_run(self, **request_overrides):
          """Queue one request plus the shutdown signal, then run."""
          self.add_download_file_request(**request_overrides)
          self.add_shutdown()
          self.submitter.run()

      def test_run_for_non_ranged_download(self):
          self._submit_and_run(expected_size=1)
          self.osutil.allocate.assert_called_with(self.temp_filename, 1)
          self.assert_submitted_get_object_jobs([self._expected_job()])

      def test_run_for_ranged_download(self):
          self.transfer_config.multipart_chunksize = 2
          self.transfer_config.multipart_threshold = 4
          self._submit_and_run(expected_size=4)
          self.osutil.allocate.assert_called_with(self.temp_filename, 4)
          first_half = self._expected_job(
              offset=0, extra_args={'Range': 'bytes=0-1'}
          )
          second_half = self._expected_job(
              offset=2, extra_args={'Range': 'bytes=2-'}
          )
          self.assert_submitted_get_object_jobs([first_half, second_half])

      def test_run_when_expected_size_not_provided(self):
          # With no expected size the stubbed head_object response is
          # consumed.
          self.stubber.add_response(
              'head_object',
              {'ContentLength': 1},
              expected_params={'Bucket': self.bucket, 'Key': self.key},
          )
          self._submit_and_run(expected_size=None)
          self.stubber.assert_no_pending_responses()
          self.osutil.allocate.assert_called_with(self.temp_filename, 1)
          self.assert_submitted_get_object_jobs([self._expected_job()])

      def test_run_with_extra_args(self):
          head_object_params = {
              'Bucket': self.bucket,
              'Key': self.key,
              'VersionId': 'versionid',
          }
          self.stubber.add_response(
              'head_object',
              {'ContentLength': 1},
              expected_params=head_object_params,
          )
          self._submit_and_run(
              extra_args={'VersionId': 'versionid'}, expected_size=None
          )
          self.stubber.assert_no_pending_responses()
          self.osutil.allocate.assert_called_with(self.temp_filename, 1)
          self.assert_submitted_get_object_jobs(
              [self._expected_job(extra_args={'VersionId': 'versionid'})]
          )

      def test_run_with_exception(self):
          self.stubber.add_client_error('head_object', 'NoSuchKey', 404)
          self._submit_and_run(expected_size=None)
          self.stubber.assert_no_pending_responses()
          # Nothing is handed to the workers; the error lands on the monitor.
          self.assert_submitted_get_object_jobs([])
          recorded = self.transfer_monitor.get_exception(self.transfer_id)
          self.assertIsInstance(recorded, ClientError)

      def test_run_with_error_in_allocating_temp_file(self):
          self.osutil.allocate.side_effect = OSError()
          self._submit_and_run(expected_size=1)
          self.assert_submitted_get_object_jobs([])
          recorded = self.transfer_monitor.get_exception(self.transfer_id)
          self.assertIsInstance(recorded, OSError)

      @skip_if_windows('os.kill() with SIGINT not supported on Windows')
      def test_submitter_cannot_be_killed(self):
          self.add_download_file_request(expected_size=None)
          self.add_shutdown()

          def raise_ctrl_c(**kwargs):
              os.kill(os.getpid(), signal.SIGINT)

          # head_object sends SIGINT to this process while the submitter
          # runs.
          interrupting_client = mock.Mock()
          interrupting_client.head_object = raise_ctrl_c
          self.client_factory.create_client.return_value = interrupting_client

          try:
              self.submitter.run()
          except KeyboardInterrupt:
              self.fail(
                  'The submitter should have not been killed by the '
                  'KeyboardInterrupt'
              )
  class TestGetObjectWorker(StubbedClientTest):
      """Runs GetObjectWorker in-process against an in-memory queue.

      Files are created through FileCreator and removed again in tearDown.
      The client factory is a mock that hands out the stubbed client
      provided by StubbedClientTest.
      """

      def setUp(self):
          super().setUp()
          self.files = FileCreator()
          self.queue = queue.Queue()
          self.client_factory = mock.Mock(ClientFactory)
          self.client_factory.create_client.return_value = self.client
          self.transfer_monitor = TransferMonitor()
          self.osutil = OSUtils()
          self.worker = self._make_worker(self.osutil)
          self.transfer_id = self.transfer_monitor.notify_new_transfer()

          # Default job parameters used by add_get_object_job().
          self.bucket = 'bucket'
          self.key = 'key'
          self.remote_contents = b'my content'
          self.temp_filename = self.files.create_file('tempfile', '')
          self.extra_args = {}
          self.offset = 0
          self.final_filename = self.files.full_path('final_filename')
          self.stream = BytesIO(self.remote_contents)

          # Default to a large job count; tests that need the job to be the
          # final one lower it to 1.
          self._expect_jobs(1000)

      def tearDown(self):
          super().tearDown()
          self.files.remove_all()

      def _make_worker(self, osutil):
          """Create a GetObjectWorker on the fixture queue/factory/monitor."""
          return GetObjectWorker(
              queue=self.queue,
              client_factory=self.client_factory,
              transfer_monitor=self.transfer_monitor,
              osutil=osutil,
          )

      def _expect_jobs(self, count):
          """Tell the monitor how many jobs the transfer still expects."""
          self.transfer_monitor.notify_expected_jobs_to_complete(
              self.transfer_id, count
          )

      def _enqueue_job_then_shutdown(self, **job_overrides):
          """Queue one GetObjectJob followed by the shutdown signal."""
          self.add_get_object_job(**job_overrides)
          self.add_shutdown()

      def _recorded_exception(self):
          """Return the exception the monitor holds for the transfer."""
          return self.transfer_monitor.get_exception(self.transfer_id)

      def add_get_object_job(self, **override_kwargs):
          """Queue a GetObjectJob built from the fixture defaults.

          Any keyword passed in replaces the corresponding default.
          """
          defaults = {
              'transfer_id': self.transfer_id,
              'bucket': self.bucket,
              'key': self.key,
              'temp_filename': self.temp_filename,
              'extra_args': self.extra_args,
              'offset': self.offset,
              'filename': self.final_filename,
          }
          job = GetObjectJob(**{**defaults, **override_kwargs})
          self.queue.put(job)

      def add_shutdown(self):
          """Queue the shutdown signal on the worker queue."""
          self.queue.put(SHUTDOWN_SIGNAL)

      def add_stubbed_get_object_response(
          self, body=None, expected_params=None
      ):
          """Stub one get_object response.

          ``body`` defaults to the fixture stream and ``expected_params``
          to the fixture bucket and key.
          """
          response_body = self.stream if body is None else body
          if expected_params is None:
              params = {'Bucket': self.bucket, 'Key': self.key}
          else:
              params = expected_params
          self.stubber.add_response(
              'get_object', {'Body': response_body}, params
          )

      def assert_contents(self, filename, contents):
          """Assert ``filename`` exists and holds exactly ``contents``."""
          self.assertTrue(os.path.exists(filename))
          with open(filename, 'rb') as f:
              on_disk = f.read()
          self.assertEqual(on_disk, contents)

      def assert_does_not_exist(self, filename):
          """Assert nothing exists at ``filename``."""
          self.assertFalse(os.path.exists(filename))

      def test_run_is_final_job(self):
          self._enqueue_job_then_shutdown()
          self.add_stubbed_get_object_response()
          self._expect_jobs(1)
          self.worker.run()
          self.stubber.assert_no_pending_responses()
          # The temp file is gone and the final file holds the contents.
          self.assert_does_not_exist(self.temp_filename)
          self.assert_contents(self.final_filename, self.remote_contents)

      def test_run_jobs_is_not_final_job(self):
          self._enqueue_job_then_shutdown()
          self.add_stubbed_get_object_response()
          self._expect_jobs(1000)
          self.worker.run()
          self.stubber.assert_no_pending_responses()
          # The data is still in the temp file; no final file yet.
          self.assert_contents(self.temp_filename, self.remote_contents)
          self.assert_does_not_exist(self.final_filename)

      def test_run_with_extra_args(self):
          self._enqueue_job_then_shutdown(
              extra_args={'VersionId': 'versionid'}
          )
          get_object_params = {
              'Bucket': self.bucket,
              'Key': self.key,
              'VersionId': 'versionid',
          }
          self.add_stubbed_get_object_response(
              expected_params=get_object_params
          )
          self.worker.run()
          self.stubber.assert_no_pending_responses()

      def test_run_with_offset(self):
          offset = 1
          self._enqueue_job_then_shutdown(offset=offset)
          self.add_stubbed_get_object_response()
          self.worker.run()
          # The contents must start at the requested offset in the temp file.
          with open(self.temp_filename, 'rb') as f:
              f.seek(offset)
              self.assertEqual(f.read(), self.remote_contents)

      def test_run_error_in_get_object(self):
          self._enqueue_job_then_shutdown()
          self.stubber.add_client_error('get_object', 'NoSuchKey', 404)
          self.add_stubbed_get_object_response()
          self.worker.run()
          self.assertIsInstance(self._recorded_exception(), ClientError)

      def test_run_does_retries_for_get_object(self):
          self._enqueue_job_then_shutdown()
          # First response fails while streaming, the second one succeeds.
          failing_body = StreamWithError(
              self.stream, ReadTimeoutError(endpoint_url='')
          )
          self.add_stubbed_get_object_response(body=failing_body)
          self.add_stubbed_get_object_response()
          self.worker.run()
          self.stubber.assert_no_pending_responses()
          self.assert_contents(self.temp_filename, self.remote_contents)

      def test_run_can_exhaust_retries_for_get_object(self):
          self._enqueue_job_then_shutdown()
          # 5 is the current setting for max number of GetObject attempts
          for _ in range(5):
              failing_body = StreamWithError(
                  self.stream, ReadTimeoutError(endpoint_url='')
              )
              self.add_stubbed_get_object_response(body=failing_body)
          self.worker.run()
          self.stubber.assert_no_pending_responses()
          self.assertIsInstance(
              self._recorded_exception(), RetriesExceededError
          )

      def test_run_skips_get_object_on_previous_exception(self):
          self._enqueue_job_then_shutdown()
          self.transfer_monitor.notify_exception(
              self.transfer_id, Exception()
          )
          self.worker.run()
          # Note we did not add a stubbed response for get_object
          self.stubber.assert_no_pending_responses()

      def test_run_final_job_removes_file_on_previous_exception(self):
          self._enqueue_job_then_shutdown()
          self.transfer_monitor.notify_exception(
              self.transfer_id, Exception()
          )
          self._expect_jobs(1)
          self.worker.run()
          self.stubber.assert_no_pending_responses()
          self.assert_does_not_exist(self.temp_filename)
          self.assert_does_not_exist(self.final_filename)

      def test_run_fails_to_rename_file(self):
          exception = OSError()
          # Swap in a worker whose OSUtils raises on rename_file.
          self.worker = self._make_worker(RenameFailingOSUtils(exception))
          self._enqueue_job_then_shutdown()
          self.add_stubbed_get_object_response()
          self._expect_jobs(1)
          self.worker.run()
          self.assertEqual(self._recorded_exception(), exception)
          self.assert_does_not_exist(self.temp_filename)
          self.assert_does_not_exist(self.final_filename)

      @skip_if_windows('os.kill() with SIGINT not supported on Windows')
      def test_worker_cannot_be_killed(self):
          self._enqueue_job_then_shutdown()
          self._expect_jobs(1)

          def raise_ctrl_c(**kwargs):
              os.kill(os.getpid(), signal.SIGINT)

          # get_object sends SIGINT to this process while the worker runs.
          interrupting_client = mock.Mock()
          interrupting_client.get_object = raise_ctrl_c
          self.client_factory.create_client.return_value = interrupting_client

          try:
              self.worker.run()
          except KeyboardInterrupt:
              self.fail(
                  'The worker should have not been killed by the '
                  'KeyboardInterrupt'
              )