| | import unittest |
| | import asyncio |
| |
|
| | from iterators import AsyncTimeoutIterator |
| |
|
| |
|
async def iter_simple():
    """Async generator fixture that yields 1 and then 2, with no awaits in between."""
    yield 1
    yield 2
| |
|
| |
|
async def iter_with_sleep():
    """Async generator fixture that yields 1, 2, 3 with pauses between items.

    It sleeps 0.6 s before producing 2 and 0.4 s before producing 3. The tests
    in this module pick their timeouts relative to these two pauses, so the
    values must be kept in sync with the expectations there.
    """
    yield 1
    await asyncio.sleep(0.6)
    yield 2
    await asyncio.sleep(0.4)
    yield 3
| |
|
| |
|
async def iter_with_exception():
    """Async generator fixture that yields 1 and 2, then raises ``Exception``.

    Raises:
        Exception: on the third ``__anext__`` call, after 1 and 2 were yielded.
    """
    yield 1
    yield 2
    raise Exception
    # Unreachable: the raise above terminates the generator before this line.
    yield 3
| |
|
| |
|
class TestTimeoutIterator(unittest.TestCase):
    """Tests for ``AsyncTimeoutIterator`` driven by this module's async generator fixtures.

    Each test writes its scenario as a nested coroutine and runs it with
    ``asyncio.run`` (Python 3.7+), so every test gets its own fresh event loop
    instead of sharing the process-wide one.

    Timing reference for ``iter_with_sleep``: it pauses 0.6 s before yielding 2
    and 0.4 s before yielding 3.
    """

    def test_normal_iteration(self):
        """Without a timeout, ``__anext__`` returns each item, then raises StopAsyncIteration."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_simple())

            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), 2)

            # Exhaustion is expected to be sticky: a second call raises again.
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_normal_iteration_for_loop(self):
        """Without a timeout, ``async for`` collects every item of ``iter_simple``."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_simple())
            results = []
            async for x in it:
                results.append(x)
            self.assertEqual(results, [1, 2])

        asyncio.run(scenario())

    def test_timeout_block(self):
        """With no timeout argument, ``__anext__`` is expected to wait through the pauses."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep())
            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), 2)
            self.assertEqual(await it.__anext__(), 3)

            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_timeout_block_for_loop(self):
        """With no timeout argument, ``async for`` yields all items and no sentinel."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep())
            results = []
            async for x in it:
                results.append(x)
            self.assertEqual(results, [1, 2, 3])

        asyncio.run(scenario())

    def test_fixed_timeout(self):
        """With ``timeout=0.5``, a sentinel is expected in place of the item delayed by 0.6 s."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.5)

            self.assertEqual(await it.__anext__(), 1)
            # The 0.6 s pause before 2 exceeds the 0.5 s timeout.
            self.assertEqual(await it.__anext__(), it.get_sentinel())
            self.assertEqual(await it.__anext__(), 2)
            # The 0.4 s pause before 3 fits within the timeout.
            self.assertEqual(await it.__anext__(), 3)
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_fixed_timeout_for_loop(self):
        """With ``timeout=0.5``, ``async for`` is expected to yield the sentinel inline.

        This method previously shared the name ``test_fixed_timeout`` with the
        test above, which replaced that earlier definition in the class
        namespace so it never ran.
        """

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.5)
            results = []
            async for x in it:
                results.append(x)
            self.assertEqual(results, [1, it.get_sentinel(), 2, 3])

        asyncio.run(scenario())

    def test_timeout_update(self):
        """``set_timeout`` is expected to change the timeout for subsequent ``__anext__`` calls."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.5)

            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), it.get_sentinel())

            # With the shorter 0.3 s timeout the 0.4 s pause before 3 is
            # expected to produce a sentinel as well.
            it.set_timeout(0.3)
            self.assertEqual(await it.__anext__(), 2)
            self.assertEqual(await it.__anext__(), it.get_sentinel())

            self.assertEqual(await it.__anext__(), 3)

            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_custom_sentinel(self):
        """A ``sentinel`` passed to the constructor is what a timed-out ``__anext__`` returns."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.5, sentinel="END")
            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), "END")

            self.assertEqual(await it.__anext__(), 2)
            self.assertEqual(await it.__anext__(), 3)

            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_feature_timeout_reset(self):
        """With ``reset_on_next=True`` and ``timeout=0.5``, no sentinel is expected."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.5, reset_on_next=True)

            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), 2)
            self.assertEqual(await it.__anext__(), 3)

            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_function_set_reset_on_next(self):
        """``set_reset_on_next(True)`` mid-iteration: no further sentinels are expected."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.35, reset_on_next=False)

            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), it.get_sentinel())
            it.set_reset_on_next(True)
            self.assertEqual(await it.__anext__(), 2)
            self.assertEqual(await it.__anext__(), 3)

            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_iterator_raises_exception(self):
        """An exception raised by the wrapped generator is expected to surface from ``__anext__``."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_exception(), timeout=0.5, sentinel="END")
            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), 2)

            # NOTE(review): StopAsyncIteration is itself an Exception subclass,
            # so this check alone cannot tell the two apart.
            with self.assertRaises(Exception):
                await it.__anext__()
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())

    def test_interrupt_thread(self):
        """After ``interrupt()``, one more item (2) is expected, then StopAsyncIteration."""

        async def scenario():
            it = AsyncTimeoutIterator(iter_with_sleep(), timeout=0.5, sentinel="END")
            self.assertEqual(await it.__anext__(), 1)
            self.assertEqual(await it.__anext__(), it.get_sentinel())
            it.interrupt()
            self.assertEqual(await it.__anext__(), 2)

            # Item 3 is never requested from the iterator in this scenario.
            with self.assertRaises(StopAsyncIteration):
                await it.__anext__()

        asyncio.run(scenario())
| |
|