test_compat.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import os
  14. import shutil
  15. import signal
  16. import tempfile
  17. from io import BytesIO
  18. from s3transfer.compat import BaseManager, readable, seekable
  19. from tests import skip_if_windows, unittest
  20. class ErrorRaisingSeekWrapper:
  21. """An object wrapper that throws an error when seeked on
  22. :param fileobj: The fileobj that it wraps
  23. :param exception: The exception to raise when seeked on.
  24. """
  25. def __init__(self, fileobj, exception):
  26. self._fileobj = fileobj
  27. self._exception = exception
  28. def seek(self, offset, whence=0):
  29. raise self._exception
  30. def tell(self):
  31. return self._fileobj.tell()
  32. class TestSeekable(unittest.TestCase):
  33. def setUp(self):
  34. self.tempdir = tempfile.mkdtemp()
  35. self.filename = os.path.join(self.tempdir, 'foo')
  36. def tearDown(self):
  37. shutil.rmtree(self.tempdir)
  38. def test_seekable_fileobj(self):
  39. with open(self.filename, 'w') as f:
  40. self.assertTrue(seekable(f))
  41. def test_non_file_like_obj(self):
  42. # Fails because there is no seekable(), seek(), nor tell()
  43. self.assertFalse(seekable(object()))
  44. def test_non_seekable_ioerror(self):
  45. # Should return False if IOError is thrown.
  46. with open(self.filename, 'w') as f:
  47. self.assertFalse(seekable(ErrorRaisingSeekWrapper(f, IOError())))
  48. def test_non_seekable_oserror(self):
  49. # Should return False if OSError is thrown.
  50. with open(self.filename, 'w') as f:
  51. self.assertFalse(seekable(ErrorRaisingSeekWrapper(f, OSError())))
  52. class TestReadable(unittest.TestCase):
  53. def test_readable_fileobj(self):
  54. with tempfile.TemporaryFile() as f:
  55. self.assertTrue(readable(f))
  56. def test_readable_file_like_obj(self):
  57. self.assertTrue(readable(BytesIO()))
  58. def test_non_file_like_obj(self):
  59. self.assertFalse(readable(object()))
  60. class TestBaseManager(unittest.TestCase):
  61. def create_pid_manager(self):
  62. class PIDManager(BaseManager):
  63. pass
  64. PIDManager.register('getpid', os.getpid)
  65. return PIDManager()
  66. def get_pid(self, pid_manager):
  67. pid = pid_manager.getpid()
  68. # A proxy object is returned back. The needed value can be acquired
  69. # from the repr and converting that to an integer
  70. return int(str(pid))
  71. @skip_if_windows('os.kill() with SIGINT not supported on Windows')
  72. def test_can_provide_signal_handler_initializers_to_start(self):
  73. manager = self.create_pid_manager()
  74. manager.start(signal.signal, (signal.SIGINT, signal.SIG_IGN))
  75. pid = self.get_pid(manager)
  76. try:
  77. os.kill(pid, signal.SIGINT)
  78. except KeyboardInterrupt:
  79. pass
  80. # Try using the manager after the os.kill on the parent process. The
  81. # manager should not have died and should still be usable.
  82. self.assertEqual(pid, self.get_pid(manager))