1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189 |
- # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://aws.amazon.com/apache2.0/
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import io
- import os.path
- import random
- import re
- import shutil
- import tempfile
- import threading
- import time
- from io import BytesIO, StringIO
- from s3transfer.futures import TransferFuture, TransferMeta
- from s3transfer.utils import (
- MAX_PARTS,
- MAX_SINGLE_UPLOAD_SIZE,
- MIN_UPLOAD_CHUNKSIZE,
- CallArgs,
- ChunksizeAdjuster,
- CountCallbackInvoker,
- DeferredOpenFile,
- FunctionContainer,
- NoResourcesAvailable,
- OSUtils,
- ReadFileChunk,
- SlidingWindowSemaphore,
- StreamReaderProgress,
- TaskSemaphore,
- calculate_num_parts,
- calculate_range_parameter,
- get_callbacks,
- get_filtered_dict,
- invoke_progress_callbacks,
- random_file_extension,
- )
- from tests import NonSeekableWriter, RecordingSubscriber, mock, unittest
- class TestGetCallbacks(unittest.TestCase):
- def setUp(self):
- self.subscriber = RecordingSubscriber()
- self.second_subscriber = RecordingSubscriber()
- self.call_args = CallArgs(
- subscribers=[self.subscriber, self.second_subscriber]
- )
- self.transfer_meta = TransferMeta(self.call_args)
- self.transfer_future = TransferFuture(self.transfer_meta)
- def test_get_callbacks(self):
- callbacks = get_callbacks(self.transfer_future, 'queued')
- # Make sure two callbacks were added as both subscribers had
- # an on_queued method.
- self.assertEqual(len(callbacks), 2)
- # Ensure that the callback was injected with the future by calling
- # one of them and checking that the future was used in the call.
- callbacks[0]()
- self.assertEqual(
- self.subscriber.on_queued_calls, [{'future': self.transfer_future}]
- )
- def test_get_callbacks_for_missing_type(self):
- callbacks = get_callbacks(self.transfer_future, 'fake_state')
- # There should be no callbacks as the subscribers will not have the
- # on_fake_state method
- self.assertEqual(len(callbacks), 0)
- class TestGetFilteredDict(unittest.TestCase):
- def test_get_filtered_dict(self):
- original = {'Include': 'IncludeValue', 'NotInlude': 'NotIncludeValue'}
- whitelist = ['Include']
- self.assertEqual(
- get_filtered_dict(original, whitelist), {'Include': 'IncludeValue'}
- )
- class TestCallArgs(unittest.TestCase):
- def test_call_args(self):
- call_args = CallArgs(foo='bar', biz='baz')
- self.assertEqual(call_args.foo, 'bar')
- self.assertEqual(call_args.biz, 'baz')
- class TestFunctionContainer(unittest.TestCase):
- def get_args_kwargs(self, *args, **kwargs):
- return args, kwargs
- def test_call(self):
- func_container = FunctionContainer(
- self.get_args_kwargs, 'foo', bar='baz'
- )
- self.assertEqual(func_container(), (('foo',), {'bar': 'baz'}))
- def test_repr(self):
- func_container = FunctionContainer(
- self.get_args_kwargs, 'foo', bar='baz'
- )
- self.assertEqual(
- str(func_container),
- 'Function: {} with args {} and kwargs {}'.format(
- self.get_args_kwargs, ('foo',), {'bar': 'baz'}
- ),
- )
- class TestCountCallbackInvoker(unittest.TestCase):
- def invoke_callback(self):
- self.ref_results.append('callback invoked')
- def assert_callback_invoked(self):
- self.assertEqual(self.ref_results, ['callback invoked'])
- def assert_callback_not_invoked(self):
- self.assertEqual(self.ref_results, [])
- def setUp(self):
- self.ref_results = []
- self.invoker = CountCallbackInvoker(self.invoke_callback)
- def test_increment(self):
- self.invoker.increment()
- self.assertEqual(self.invoker.current_count, 1)
- def test_decrement(self):
- self.invoker.increment()
- self.invoker.increment()
- self.invoker.decrement()
- self.assertEqual(self.invoker.current_count, 1)
- def test_count_cannot_go_below_zero(self):
- with self.assertRaises(RuntimeError):
- self.invoker.decrement()
- def test_callback_invoked_only_once_finalized(self):
- self.invoker.increment()
- self.invoker.decrement()
- self.assert_callback_not_invoked()
- self.invoker.finalize()
- # Callback should only be invoked once finalized
- self.assert_callback_invoked()
- def test_callback_invoked_after_finalizing_and_count_reaching_zero(self):
- self.invoker.increment()
- self.invoker.finalize()
- # Make sure that it does not get invoked immediately after
- # finalizing as the count is currently one
- self.assert_callback_not_invoked()
- self.invoker.decrement()
- self.assert_callback_invoked()
- def test_cannot_increment_after_finalization(self):
- self.invoker.finalize()
- with self.assertRaises(RuntimeError):
- self.invoker.increment()
- class TestRandomFileExtension(unittest.TestCase):
- def test_has_proper_length(self):
- self.assertEqual(len(random_file_extension(num_digits=4)), 4)
- class TestInvokeProgressCallbacks(unittest.TestCase):
- def test_invoke_progress_callbacks(self):
- recording_subscriber = RecordingSubscriber()
- invoke_progress_callbacks([recording_subscriber.on_progress], 2)
- self.assertEqual(recording_subscriber.calculate_bytes_seen(), 2)
- def test_invoke_progress_callbacks_with_no_progress(self):
- recording_subscriber = RecordingSubscriber()
- invoke_progress_callbacks([recording_subscriber.on_progress], 0)
- self.assertEqual(len(recording_subscriber.on_progress_calls), 0)
- class TestCalculateNumParts(unittest.TestCase):
- def test_calculate_num_parts_divisible(self):
- self.assertEqual(calculate_num_parts(size=4, part_size=2), 2)
- def test_calculate_num_parts_not_divisible(self):
- self.assertEqual(calculate_num_parts(size=3, part_size=2), 2)
- class TestCalculateRangeParameter(unittest.TestCase):
- def setUp(self):
- self.part_size = 5
- self.part_index = 1
- self.num_parts = 3
- def test_calculate_range_paramter(self):
- range_val = calculate_range_parameter(
- self.part_size, self.part_index, self.num_parts
- )
- self.assertEqual(range_val, 'bytes=5-9')
- def test_last_part_with_no_total_size(self):
- range_val = calculate_range_parameter(
- self.part_size, self.part_index, num_parts=2
- )
- self.assertEqual(range_val, 'bytes=5-')
- def test_last_part_with_total_size(self):
- range_val = calculate_range_parameter(
- self.part_size, self.part_index, num_parts=2, total_size=8
- )
- self.assertEqual(range_val, 'bytes=5-7')
- class BaseUtilsTest(unittest.TestCase):
- def setUp(self):
- self.tempdir = tempfile.mkdtemp()
- self.filename = os.path.join(self.tempdir, 'foo')
- self.content = b'abc'
- with open(self.filename, 'wb') as f:
- f.write(self.content)
- self.amounts_seen = []
- self.num_close_callback_calls = 0
- def tearDown(self):
- shutil.rmtree(self.tempdir)
- def callback(self, bytes_transferred):
- self.amounts_seen.append(bytes_transferred)
- def close_callback(self):
- self.num_close_callback_calls += 1
- class TestOSUtils(BaseUtilsTest):
- def test_get_file_size(self):
- self.assertEqual(
- OSUtils().get_file_size(self.filename), len(self.content)
- )
- def test_open_file_chunk_reader(self):
- reader = OSUtils().open_file_chunk_reader(
- self.filename, 0, 3, [self.callback]
- )
- # The returned reader should be a ReadFileChunk.
- self.assertIsInstance(reader, ReadFileChunk)
- # The content of the reader should be correct.
- self.assertEqual(reader.read(), self.content)
- # Callbacks should be disabled depspite being passed in.
- self.assertEqual(self.amounts_seen, [])
- def test_open_file_chunk_reader_from_fileobj(self):
- with open(self.filename, 'rb') as f:
- reader = OSUtils().open_file_chunk_reader_from_fileobj(
- f, len(self.content), len(self.content), [self.callback]
- )
- # The returned reader should be a ReadFileChunk.
- self.assertIsInstance(reader, ReadFileChunk)
- # The content of the reader should be correct.
- self.assertEqual(reader.read(), self.content)
- reader.close()
- # Callbacks should be disabled depspite being passed in.
- self.assertEqual(self.amounts_seen, [])
- self.assertEqual(self.num_close_callback_calls, 0)
- def test_open_file(self):
- fileobj = OSUtils().open(os.path.join(self.tempdir, 'foo'), 'w')
- self.assertTrue(hasattr(fileobj, 'write'))
- def test_remove_file_ignores_errors(self):
- non_existent_file = os.path.join(self.tempdir, 'no-exist')
- # This should not exist to start.
- self.assertFalse(os.path.exists(non_existent_file))
- try:
- OSUtils().remove_file(non_existent_file)
- except OSError as e:
- self.fail('OSError should have been caught: %s' % e)
- def test_remove_file_proxies_remove_file(self):
- OSUtils().remove_file(self.filename)
- self.assertFalse(os.path.exists(self.filename))
- def test_rename_file(self):
- new_filename = os.path.join(self.tempdir, 'newfoo')
- OSUtils().rename_file(self.filename, new_filename)
- self.assertFalse(os.path.exists(self.filename))
- self.assertTrue(os.path.exists(new_filename))
- def test_is_special_file_for_normal_file(self):
- self.assertFalse(OSUtils().is_special_file(self.filename))
- def test_is_special_file_for_non_existant_file(self):
- non_existant_filename = os.path.join(self.tempdir, 'no-exist')
- self.assertFalse(os.path.exists(non_existant_filename))
- self.assertFalse(OSUtils().is_special_file(non_existant_filename))
- def test_get_temp_filename(self):
- filename = 'myfile'
- self.assertIsNotNone(
- re.match(
- r'%s\.[0-9A-Fa-f]{8}$' % filename,
- OSUtils().get_temp_filename(filename),
- )
- )
- def test_get_temp_filename_len_255(self):
- filename = 'a' * 255
- temp_filename = OSUtils().get_temp_filename(filename)
- self.assertLessEqual(len(temp_filename), 255)
- def test_get_temp_filename_len_gt_255(self):
- filename = 'a' * 280
- temp_filename = OSUtils().get_temp_filename(filename)
- self.assertLessEqual(len(temp_filename), 255)
- def test_allocate(self):
- truncate_size = 1
- OSUtils().allocate(self.filename, truncate_size)
- with open(self.filename, 'rb') as f:
- self.assertEqual(len(f.read()), truncate_size)
- @mock.patch('s3transfer.utils.fallocate')
- def test_allocate_with_io_error(self, mock_fallocate):
- mock_fallocate.side_effect = IOError()
- with self.assertRaises(IOError):
- OSUtils().allocate(self.filename, 1)
- self.assertFalse(os.path.exists(self.filename))
- @mock.patch('s3transfer.utils.fallocate')
- def test_allocate_with_os_error(self, mock_fallocate):
- mock_fallocate.side_effect = OSError()
- with self.assertRaises(OSError):
- OSUtils().allocate(self.filename, 1)
- self.assertFalse(os.path.exists(self.filename))
- class TestDeferredOpenFile(BaseUtilsTest):
- def setUp(self):
- super().setUp()
- self.filename = os.path.join(self.tempdir, 'foo')
- self.contents = b'my contents'
- with open(self.filename, 'wb') as f:
- f.write(self.contents)
- self.deferred_open_file = DeferredOpenFile(
- self.filename, open_function=self.recording_open_function
- )
- self.open_call_args = []
- def tearDown(self):
- self.deferred_open_file.close()
- super().tearDown()
- def recording_open_function(self, filename, mode):
- self.open_call_args.append((filename, mode))
- return open(filename, mode)
- def open_nonseekable(self, filename, mode):
- self.open_call_args.append((filename, mode))
- return NonSeekableWriter(BytesIO(self.content))
- def test_instantiation_does_not_open_file(self):
- DeferredOpenFile(
- self.filename, open_function=self.recording_open_function
- )
- self.assertEqual(len(self.open_call_args), 0)
- def test_name(self):
- self.assertEqual(self.deferred_open_file.name, self.filename)
- def test_read(self):
- content = self.deferred_open_file.read(2)
- self.assertEqual(content, self.contents[0:2])
- content = self.deferred_open_file.read(2)
- self.assertEqual(content, self.contents[2:4])
- self.assertEqual(len(self.open_call_args), 1)
- def test_write(self):
- self.deferred_open_file = DeferredOpenFile(
- self.filename,
- mode='wb',
- open_function=self.recording_open_function,
- )
- write_content = b'foo'
- self.deferred_open_file.write(write_content)
- self.deferred_open_file.write(write_content)
- self.deferred_open_file.close()
- # Both of the writes should now be in the file.
- with open(self.filename, 'rb') as f:
- self.assertEqual(f.read(), write_content * 2)
- # Open should have only been called once.
- self.assertEqual(len(self.open_call_args), 1)
- def test_seek(self):
- self.deferred_open_file.seek(2)
- content = self.deferred_open_file.read(2)
- self.assertEqual(content, self.contents[2:4])
- self.assertEqual(len(self.open_call_args), 1)
- def test_open_does_not_seek_with_zero_start_byte(self):
- self.deferred_open_file = DeferredOpenFile(
- self.filename,
- mode='wb',
- start_byte=0,
- open_function=self.open_nonseekable,
- )
- try:
- # If this seeks, an UnsupportedOperation error will be raised.
- self.deferred_open_file.write(b'data')
- except io.UnsupportedOperation:
- self.fail('DeferredOpenFile seeked upon opening')
- def test_open_seeks_with_nonzero_start_byte(self):
- self.deferred_open_file = DeferredOpenFile(
- self.filename,
- mode='wb',
- start_byte=5,
- open_function=self.open_nonseekable,
- )
- # Since a non-seekable file is being opened, calling Seek will raise
- # an UnsupportedOperation error.
- with self.assertRaises(io.UnsupportedOperation):
- self.deferred_open_file.write(b'data')
- def test_tell(self):
- self.deferred_open_file.tell()
- # tell() should not have opened the file if it has not been seeked
- # or read because we know the start bytes upfront.
- self.assertEqual(len(self.open_call_args), 0)
- self.deferred_open_file.seek(2)
- self.assertEqual(self.deferred_open_file.tell(), 2)
- self.assertEqual(len(self.open_call_args), 1)
- def test_open_args(self):
- self.deferred_open_file = DeferredOpenFile(
- self.filename,
- mode='ab+',
- open_function=self.recording_open_function,
- )
- # Force an open
- self.deferred_open_file.write(b'data')
- self.assertEqual(len(self.open_call_args), 1)
- self.assertEqual(self.open_call_args[0], (self.filename, 'ab+'))
- def test_context_handler(self):
- with self.deferred_open_file:
- self.assertEqual(len(self.open_call_args), 1)
- class TestReadFileChunk(BaseUtilsTest):
- def test_read_entire_chunk(self):
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(b'onetwothreefourfivesixseveneightnineten')
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=0, chunk_size=3
- )
- self.assertEqual(chunk.read(), b'one')
- self.assertEqual(chunk.read(), b'')
- def test_read_with_amount_size(self):
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(b'onetwothreefourfivesixseveneightnineten')
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=11, chunk_size=4
- )
- self.assertEqual(chunk.read(1), b'f')
- self.assertEqual(chunk.read(1), b'o')
- self.assertEqual(chunk.read(1), b'u')
- self.assertEqual(chunk.read(1), b'r')
- self.assertEqual(chunk.read(1), b'')
- def test_reset_stream_emulation(self):
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(b'onetwothreefourfivesixseveneightnineten')
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=11, chunk_size=4
- )
- self.assertEqual(chunk.read(), b'four')
- chunk.seek(0)
- self.assertEqual(chunk.read(), b'four')
- def test_read_past_end_of_file(self):
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(b'onetwothreefourfivesixseveneightnineten')
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=36, chunk_size=100000
- )
- self.assertEqual(chunk.read(), b'ten')
- self.assertEqual(chunk.read(), b'')
- self.assertEqual(len(chunk), 3)
- def test_tell_and_seek(self):
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(b'onetwothreefourfivesixseveneightnineten')
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=36, chunk_size=100000
- )
- self.assertEqual(chunk.tell(), 0)
- self.assertEqual(chunk.read(), b'ten')
- self.assertEqual(chunk.tell(), 3)
- chunk.seek(0)
- self.assertEqual(chunk.tell(), 0)
- chunk.seek(1, whence=1)
- self.assertEqual(chunk.tell(), 1)
- chunk.seek(-1, whence=1)
- self.assertEqual(chunk.tell(), 0)
- chunk.seek(-1, whence=2)
- self.assertEqual(chunk.tell(), 2)
- def test_tell_and_seek_boundaries(self):
- # Test to ensure ReadFileChunk behaves the same as the
- # Python standard library around seeking and reading out
- # of bounds in a file object.
- data = b'abcdefghij12345678klmnopqrst'
- start_pos = 10
- chunk_size = 8
- # Create test file
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(data)
- # ReadFileChunk should be a substring of only numbers
- file_objects = [
- ReadFileChunk.from_filename(
- filename, start_byte=start_pos, chunk_size=chunk_size
- )
- ]
- # Uncomment next line to validate we match Python's io.BytesIO
- # file_objects.append(io.BytesIO(data[start_pos:start_pos+chunk_size]))
- for obj in file_objects:
- self._assert_whence_start_behavior(obj)
- self._assert_whence_end_behavior(obj)
- self._assert_whence_relative_behavior(obj)
- self._assert_boundary_behavior(obj)
- def _assert_whence_start_behavior(self, file_obj):
- self.assertEqual(file_obj.tell(), 0)
- file_obj.seek(1, 0)
- self.assertEqual(file_obj.tell(), 1)
- file_obj.seek(1)
- self.assertEqual(file_obj.tell(), 1)
- self.assertEqual(file_obj.read(), b'2345678')
- file_obj.seek(3, 0)
- self.assertEqual(file_obj.tell(), 3)
- file_obj.seek(0, 0)
- self.assertEqual(file_obj.tell(), 0)
- def _assert_whence_relative_behavior(self, file_obj):
- self.assertEqual(file_obj.tell(), 0)
- file_obj.seek(2, 1)
- self.assertEqual(file_obj.tell(), 2)
- file_obj.seek(1, 1)
- self.assertEqual(file_obj.tell(), 3)
- self.assertEqual(file_obj.read(), b'45678')
- file_obj.seek(20, 1)
- self.assertEqual(file_obj.tell(), 28)
- file_obj.seek(-30, 1)
- self.assertEqual(file_obj.tell(), 0)
- self.assertEqual(file_obj.read(), b'12345678')
- file_obj.seek(-8, 1)
- self.assertEqual(file_obj.tell(), 0)
- def _assert_whence_end_behavior(self, file_obj):
- self.assertEqual(file_obj.tell(), 0)
- file_obj.seek(-1, 2)
- self.assertEqual(file_obj.tell(), 7)
- file_obj.seek(1, 2)
- self.assertEqual(file_obj.tell(), 9)
- file_obj.seek(3, 2)
- self.assertEqual(file_obj.tell(), 11)
- self.assertEqual(file_obj.read(), b'')
- file_obj.seek(-15, 2)
- self.assertEqual(file_obj.tell(), 0)
- self.assertEqual(file_obj.read(), b'12345678')
- file_obj.seek(-8, 2)
- self.assertEqual(file_obj.tell(), 0)
- def _assert_boundary_behavior(self, file_obj):
- # Verify we're at the start
- self.assertEqual(file_obj.tell(), 0)
- # Verify we can't move backwards beyond start of file
- file_obj.seek(-10, 1)
- self.assertEqual(file_obj.tell(), 0)
- # Verify we *can* move after end of file, but return nothing
- file_obj.seek(10, 2)
- self.assertEqual(file_obj.tell(), 18)
- self.assertEqual(file_obj.read(), b'')
- self.assertEqual(file_obj.read(10), b'')
- # Verify we can partially rewind
- file_obj.seek(-12, 1)
- self.assertEqual(file_obj.tell(), 6)
- self.assertEqual(file_obj.read(), b'78')
- self.assertEqual(file_obj.tell(), 8)
- # Verify we can rewind to start
- file_obj.seek(0)
- self.assertEqual(file_obj.tell(), 0)
- def test_file_chunk_supports_context_manager(self):
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(b'abc')
- with ReadFileChunk.from_filename(
- filename, start_byte=0, chunk_size=2
- ) as chunk:
- val = chunk.read()
- self.assertEqual(val, b'ab')
- def test_iter_is_always_empty(self):
- # This tests the workaround for the httplib bug (see
- # the source for more info).
- filename = os.path.join(self.tempdir, 'foo')
- open(filename, 'wb').close()
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=0, chunk_size=10
- )
- self.assertEqual(list(chunk), [])
- def test_callback_is_invoked_on_read(self):
- chunk = ReadFileChunk.from_filename(
- self.filename,
- start_byte=0,
- chunk_size=3,
- callbacks=[self.callback],
- )
- chunk.read(1)
- chunk.read(1)
- chunk.read(1)
- self.assertEqual(self.amounts_seen, [1, 1, 1])
- def test_all_callbacks_invoked_on_read(self):
- chunk = ReadFileChunk.from_filename(
- self.filename,
- start_byte=0,
- chunk_size=3,
- callbacks=[self.callback, self.callback],
- )
- chunk.read(1)
- chunk.read(1)
- chunk.read(1)
- # The list should be twice as long because there are two callbacks
- # recording the amount read.
- self.assertEqual(self.amounts_seen, [1, 1, 1, 1, 1, 1])
- def test_callback_can_be_disabled(self):
- chunk = ReadFileChunk.from_filename(
- self.filename,
- start_byte=0,
- chunk_size=3,
- callbacks=[self.callback],
- )
- chunk.disable_callback()
- # Now reading from the ReadFileChunk should not invoke
- # the callback.
- chunk.read()
- self.assertEqual(self.amounts_seen, [])
- def test_callback_will_also_be_triggered_by_seek(self):
- chunk = ReadFileChunk.from_filename(
- self.filename,
- start_byte=0,
- chunk_size=3,
- callbacks=[self.callback],
- )
- chunk.read(2)
- chunk.seek(0)
- chunk.read(2)
- chunk.seek(1)
- chunk.read(2)
- self.assertEqual(self.amounts_seen, [2, -2, 2, -1, 2])
- def test_callback_triggered_by_out_of_bound_seeks(self):
- data = b'abcdefghij1234567890klmnopqr'
- # Create test file
- filename = os.path.join(self.tempdir, 'foo')
- with open(filename, 'wb') as f:
- f.write(data)
- chunk = ReadFileChunk.from_filename(
- filename, start_byte=10, chunk_size=10, callbacks=[self.callback]
- )
- # Seek calls that generate "0" progress are skipped by
- # invoke_progress_callbacks and won't appear in the list.
- expected_callback_prog = [10, -5, 5, -1, 1, -1, 1, -5, 5, -10]
- self._assert_out_of_bound_start_seek(chunk, expected_callback_prog)
- self._assert_out_of_bound_relative_seek(chunk, expected_callback_prog)
- self._assert_out_of_bound_end_seek(chunk, expected_callback_prog)
- def _assert_out_of_bound_start_seek(self, chunk, expected):
- # clear amounts_seen
- self.amounts_seen = []
- self.assertEqual(self.amounts_seen, [])
- # (position, change)
- chunk.seek(20) # (20, 10)
- chunk.seek(5) # (5, -5)
- chunk.seek(20) # (20, 5)
- chunk.seek(9) # (9, -1)
- chunk.seek(20) # (20, 1)
- chunk.seek(11) # (11, 0)
- chunk.seek(20) # (20, 0)
- chunk.seek(9) # (9, -1)
- chunk.seek(20) # (20, 1)
- chunk.seek(5) # (5, -5)
- chunk.seek(20) # (20, 5)
- chunk.seek(0) # (0, -10)
- chunk.seek(0) # (0, 0)
- self.assertEqual(self.amounts_seen, expected)
- def _assert_out_of_bound_relative_seek(self, chunk, expected):
- # clear amounts_seen
- self.amounts_seen = []
- self.assertEqual(self.amounts_seen, [])
- # (position, change)
- chunk.seek(20, 1) # (20, 10)
- chunk.seek(-15, 1) # (5, -5)
- chunk.seek(15, 1) # (20, 5)
- chunk.seek(-11, 1) # (9, -1)
- chunk.seek(11, 1) # (20, 1)
- chunk.seek(-9, 1) # (11, 0)
- chunk.seek(9, 1) # (20, 0)
- chunk.seek(-11, 1) # (9, -1)
- chunk.seek(11, 1) # (20, 1)
- chunk.seek(-15, 1) # (5, -5)
- chunk.seek(15, 1) # (20, 5)
- chunk.seek(-20, 1) # (0, -10)
- chunk.seek(-1000, 1) # (0, 0)
- self.assertEqual(self.amounts_seen, expected)
- def _assert_out_of_bound_end_seek(self, chunk, expected):
- # clear amounts_seen
- self.amounts_seen = []
- self.assertEqual(self.amounts_seen, [])
- # (position, change)
- chunk.seek(10, 2) # (20, 10)
- chunk.seek(-5, 2) # (5, -5)
- chunk.seek(10, 2) # (20, 5)
- chunk.seek(-1, 2) # (9, -1)
- chunk.seek(10, 2) # (20, 1)
- chunk.seek(1, 2) # (11, 0)
- chunk.seek(10, 2) # (20, 0)
- chunk.seek(-1, 2) # (9, -1)
- chunk.seek(10, 2) # (20, 1)
- chunk.seek(-5, 2) # (5, -5)
- chunk.seek(10, 2) # (20, 5)
- chunk.seek(-10, 2) # (0, -10)
- chunk.seek(-1000, 2) # (0, 0)
- self.assertEqual(self.amounts_seen, expected)
- def test_close_callbacks(self):
- with open(self.filename) as f:
- chunk = ReadFileChunk(
- f,
- chunk_size=1,
- full_file_size=3,
- close_callbacks=[self.close_callback],
- )
- chunk.close()
- self.assertEqual(self.num_close_callback_calls, 1)
- def test_close_callbacks_when_not_enabled(self):
- with open(self.filename) as f:
- chunk = ReadFileChunk(
- f,
- chunk_size=1,
- full_file_size=3,
- enable_callbacks=False,
- close_callbacks=[self.close_callback],
- )
- chunk.close()
- self.assertEqual(self.num_close_callback_calls, 0)
- def test_close_callbacks_when_context_handler_is_used(self):
- with open(self.filename) as f:
- with ReadFileChunk(
- f,
- chunk_size=1,
- full_file_size=3,
- close_callbacks=[self.close_callback],
- ) as chunk:
- chunk.read(1)
- self.assertEqual(self.num_close_callback_calls, 1)
- def test_signal_transferring(self):
- chunk = ReadFileChunk.from_filename(
- self.filename,
- start_byte=0,
- chunk_size=3,
- callbacks=[self.callback],
- )
- chunk.signal_not_transferring()
- chunk.read(1)
- self.assertEqual(self.amounts_seen, [])
- chunk.signal_transferring()
- chunk.read(1)
- self.assertEqual(self.amounts_seen, [1])
- def test_signal_transferring_to_underlying_fileobj(self):
- underlying_stream = mock.Mock()
- underlying_stream.tell.return_value = 0
- chunk = ReadFileChunk(underlying_stream, 3, 3)
- chunk.signal_transferring()
- self.assertTrue(underlying_stream.signal_transferring.called)
- def test_no_call_signal_transferring_to_underlying_fileobj(self):
- underlying_stream = mock.Mock(io.RawIOBase)
- underlying_stream.tell.return_value = 0
- chunk = ReadFileChunk(underlying_stream, 3, 3)
- try:
- chunk.signal_transferring()
- except AttributeError:
- self.fail(
- 'The stream should not have tried to call signal_transferring '
- 'to the underlying stream.'
- )
- def test_signal_not_transferring_to_underlying_fileobj(self):
- underlying_stream = mock.Mock()
- underlying_stream.tell.return_value = 0
- chunk = ReadFileChunk(underlying_stream, 3, 3)
- chunk.signal_not_transferring()
- self.assertTrue(underlying_stream.signal_not_transferring.called)
- def test_no_call_signal_not_transferring_to_underlying_fileobj(self):
- underlying_stream = mock.Mock(io.RawIOBase)
- underlying_stream.tell.return_value = 0
- chunk = ReadFileChunk(underlying_stream, 3, 3)
- try:
- chunk.signal_not_transferring()
- except AttributeError:
- self.fail(
- 'The stream should not have tried to call '
- 'signal_not_transferring to the underlying stream.'
- )
- class TestStreamReaderProgress(BaseUtilsTest):
- def test_proxies_to_wrapped_stream(self):
- original_stream = StringIO('foobarbaz')
- wrapped = StreamReaderProgress(original_stream)
- self.assertEqual(wrapped.read(), 'foobarbaz')
- def test_callback_invoked(self):
- original_stream = StringIO('foobarbaz')
- wrapped = StreamReaderProgress(
- original_stream, [self.callback, self.callback]
- )
- self.assertEqual(wrapped.read(), 'foobarbaz')
- self.assertEqual(self.amounts_seen, [9, 9])
- class TestTaskSemaphore(unittest.TestCase):
- def setUp(self):
- self.semaphore = TaskSemaphore(1)
- def test_should_block_at_max_capacity(self):
- self.semaphore.acquire('a', blocking=False)
- with self.assertRaises(NoResourcesAvailable):
- self.semaphore.acquire('a', blocking=False)
- def test_release_capacity(self):
- acquire_token = self.semaphore.acquire('a', blocking=False)
- self.semaphore.release('a', acquire_token)
- try:
- self.semaphore.acquire('a', blocking=False)
- except NoResourcesAvailable:
- self.fail(
- 'The release of the semaphore should have allowed for '
- 'the second acquire to not be blocked'
- )
- class TestSlidingWindowSemaphore(unittest.TestCase):
- # These tests use block=False to tests will fail
- # instead of hang the test runner in the case of x
- # incorrect behavior.
- def test_acquire_release_basic_case(self):
- sem = SlidingWindowSemaphore(1)
- # Count is 1
- num = sem.acquire('a', blocking=False)
- self.assertEqual(num, 0)
- sem.release('a', 0)
- # Count now back to 1.
- def test_can_acquire_release_multiple_times(self):
- sem = SlidingWindowSemaphore(1)
- num = sem.acquire('a', blocking=False)
- self.assertEqual(num, 0)
- sem.release('a', num)
- num = sem.acquire('a', blocking=False)
- self.assertEqual(num, 1)
- sem.release('a', num)
- def test_can_acquire_a_range(self):
- sem = SlidingWindowSemaphore(3)
- self.assertEqual(sem.acquire('a', blocking=False), 0)
- self.assertEqual(sem.acquire('a', blocking=False), 1)
- self.assertEqual(sem.acquire('a', blocking=False), 2)
- sem.release('a', 0)
- sem.release('a', 1)
- sem.release('a', 2)
- # Now we're reset so we should be able to acquire the same
- # sequence again.
- self.assertEqual(sem.acquire('a', blocking=False), 3)
- self.assertEqual(sem.acquire('a', blocking=False), 4)
- self.assertEqual(sem.acquire('a', blocking=False), 5)
- self.assertEqual(sem.current_count(), 0)
- def test_counter_release_only_on_min_element(self):
- sem = SlidingWindowSemaphore(3)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- # The count only increases when we free the min
- # element. This means if we're currently failing to
- # acquire now:
- with self.assertRaises(NoResourcesAvailable):
- sem.acquire('a', blocking=False)
- # Then freeing a non-min element:
- sem.release('a', 1)
- # doesn't change anything. We still fail to acquire.
- with self.assertRaises(NoResourcesAvailable):
- sem.acquire('a', blocking=False)
- self.assertEqual(sem.current_count(), 0)
- def test_raises_error_when_count_is_zero(self):
- sem = SlidingWindowSemaphore(3)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- # Count is now 0 so trying to acquire should fail.
- with self.assertRaises(NoResourcesAvailable):
- sem.acquire('a', blocking=False)
- def test_release_counters_can_increment_counter_repeatedly(self):
- sem = SlidingWindowSemaphore(3)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- # These two releases don't increment the counter
- # because we're waiting on 0.
- sem.release('a', 1)
- sem.release('a', 2)
- self.assertEqual(sem.current_count(), 0)
- # But as soon as we release 0, we free up 0, 1, and 2.
- sem.release('a', 0)
- self.assertEqual(sem.current_count(), 3)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- def test_error_to_release_unknown_tag(self):
- sem = SlidingWindowSemaphore(3)
- with self.assertRaises(ValueError):
- sem.release('a', 0)
- def test_can_track_multiple_tags(self):
- sem = SlidingWindowSemaphore(3)
- self.assertEqual(sem.acquire('a', blocking=False), 0)
- self.assertEqual(sem.acquire('b', blocking=False), 0)
- self.assertEqual(sem.acquire('a', blocking=False), 1)
- # We're at our max of 3 even though 2 are for A and 1 is for B.
- with self.assertRaises(NoResourcesAvailable):
- sem.acquire('a', blocking=False)
- with self.assertRaises(NoResourcesAvailable):
- sem.acquire('b', blocking=False)
- def test_can_handle_multiple_tags_released(self):
- sem = SlidingWindowSemaphore(4)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('b', blocking=False)
- sem.acquire('b', blocking=False)
- sem.release('b', 1)
- sem.release('a', 1)
- self.assertEqual(sem.current_count(), 0)
- sem.release('b', 0)
- self.assertEqual(sem.acquire('a', blocking=False), 2)
- sem.release('a', 0)
- self.assertEqual(sem.acquire('b', blocking=False), 2)
- def test_is_error_to_release_unknown_sequence_number(self):
- sem = SlidingWindowSemaphore(3)
- sem.acquire('a', blocking=False)
- with self.assertRaises(ValueError):
- sem.release('a', 1)
- def test_is_error_to_double_release(self):
- # This is different than other error tests because
- # we're verifying we can reset the state after an
- # acquire/release cycle.
- sem = SlidingWindowSemaphore(2)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.release('a', 0)
- sem.release('a', 1)
- self.assertEqual(sem.current_count(), 2)
- with self.assertRaises(ValueError):
- sem.release('a', 0)
- def test_can_check_in_partial_range(self):
- sem = SlidingWindowSemaphore(4)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- sem.release('a', 1)
- sem.release('a', 3)
- sem.release('a', 0)
- self.assertEqual(sem.current_count(), 2)
- class TestThreadingPropertiesForSlidingWindowSemaphore(unittest.TestCase):
- # These tests focus on mutithreaded properties of the range
- # semaphore. Basic functionality is tested in TestSlidingWindowSemaphore.
- def setUp(self):
- self.threads = []
- def tearDown(self):
- self.join_threads()
- def join_threads(self):
- for thread in self.threads:
- thread.join()
- self.threads = []
- def start_threads(self):
- for thread in self.threads:
- thread.start()
- def test_acquire_blocks_until_release_is_called(self):
- sem = SlidingWindowSemaphore(2)
- sem.acquire('a', blocking=False)
- sem.acquire('a', blocking=False)
- def acquire():
- # This next call to acquire will block.
- self.assertEqual(sem.acquire('a', blocking=True), 2)
- t = threading.Thread(target=acquire)
- self.threads.append(t)
- # Starting the thread will block the sem.acquire()
- # in the acquire function above.
- t.start()
- # This still will keep the thread blocked.
- sem.release('a', 1)
- # Releasing the min element will unblock the thread.
- sem.release('a', 0)
- t.join()
- sem.release('a', 2)
- def test_stress_invariants_random_order(self):
- sem = SlidingWindowSemaphore(100)
- for _ in range(10):
- recorded = []
- for _ in range(100):
- recorded.append(sem.acquire('a', blocking=False))
- # Release them in randomized order. As long as we
- # eventually free all 100, we should have all the
- # resources released.
- random.shuffle(recorded)
- for i in recorded:
- sem.release('a', i)
- # Everything's freed so should be back at count == 100
- self.assertEqual(sem.current_count(), 100)
- def test_blocking_stress(self):
- sem = SlidingWindowSemaphore(5)
- num_threads = 10
- num_iterations = 50
- def acquire():
- for _ in range(num_iterations):
- num = sem.acquire('a', blocking=True)
- time.sleep(0.001)
- sem.release('a', num)
- for i in range(num_threads):
- t = threading.Thread(target=acquire)
- self.threads.append(t)
- self.start_threads()
- self.join_threads()
- # Should have all the available resources freed.
- self.assertEqual(sem.current_count(), 5)
- # Should have acquired num_threads * num_iterations
- self.assertEqual(
- sem.acquire('a', blocking=False), num_threads * num_iterations
- )
- class TestAdjustChunksize(unittest.TestCase):
- def setUp(self):
- self.adjuster = ChunksizeAdjuster()
- def test_valid_chunksize(self):
- chunksize = 7 * (1024**2)
- file_size = 8 * (1024**2)
- new_size = self.adjuster.adjust_chunksize(chunksize, file_size)
- self.assertEqual(new_size, chunksize)
- def test_chunksize_below_minimum(self):
- chunksize = MIN_UPLOAD_CHUNKSIZE - 1
- file_size = 3 * MIN_UPLOAD_CHUNKSIZE
- new_size = self.adjuster.adjust_chunksize(chunksize, file_size)
- self.assertEqual(new_size, MIN_UPLOAD_CHUNKSIZE)
- def test_chunksize_above_maximum(self):
- chunksize = MAX_SINGLE_UPLOAD_SIZE + 1
- file_size = MAX_SINGLE_UPLOAD_SIZE * 2
- new_size = self.adjuster.adjust_chunksize(chunksize, file_size)
- self.assertEqual(new_size, MAX_SINGLE_UPLOAD_SIZE)
- def test_chunksize_too_small(self):
- chunksize = 7 * (1024**2)
- file_size = 5 * (1024**4)
- # If we try to upload a 5TB file, we'll need to use 896MB part
- # sizes.
- new_size = self.adjuster.adjust_chunksize(chunksize, file_size)
- self.assertEqual(new_size, 896 * (1024**2))
- num_parts = file_size / new_size
- self.assertLessEqual(num_parts, MAX_PARTS)
- def test_unknown_file_size_with_valid_chunksize(self):
- chunksize = 7 * (1024**2)
- new_size = self.adjuster.adjust_chunksize(chunksize)
- self.assertEqual(new_size, chunksize)
- def test_unknown_file_size_below_minimum(self):
- chunksize = MIN_UPLOAD_CHUNKSIZE - 1
- new_size = self.adjuster.adjust_chunksize(chunksize)
- self.assertEqual(new_size, MIN_UPLOAD_CHUNKSIZE)
- def test_unknown_file_size_above_maximum(self):
- chunksize = MAX_SINGLE_UPLOAD_SIZE + 1
- new_size = self.adjuster.adjust_chunksize(chunksize)
- self.assertEqual(new_size, MAX_SINGLE_UPLOAD_SIZE)
|