| import queue |
| import asyncio |
|
|
|
|
| class IteratorPipe: |
| """ |
| Iterator Pipe creates an iterator that can be fed in data from another block of code or thread of execution |
| """ |
|
|
| def __init__(self, sentinel=object()): |
| self._q = queue.Queue() |
| self._sentinel = sentinel |
| self._sentinel_pushed = False |
| self._closed = False |
|
|
| def __iter__(self): |
| return self |
|
|
| def __next__(self): |
| if self._closed: |
| raise StopIteration |
|
|
| data = self._q.get(block=True) |
| if data is self._sentinel: |
| self._closed = True |
| raise StopIteration |
|
|
| return data |
|
|
| def put(self, data) -> bool: |
| """ |
| Pushes next item to Iterator and returns True |
| If iterator has been closed via close(), doesn't push anything and returns False |
| """ |
| if self._sentinel_pushed: |
| return False |
|
|
| self._q.put(data) |
| return True |
|
|
| def close(self): |
| """ |
| Close is idempotent. Calling close multiple times is safe |
| Iterator will raise StopIteration only after all elements pushed before close have been iterated |
| """ |
| |
| if not self._sentinel_pushed: |
| self._sentinel_pushed = True |
| self._q.put(self._sentinel) |
|
|
|
|
| class AsyncIteratorPipe: |
|
|
| def __init__(self, sentinel=object()): |
| self._q = asyncio.Queue() |
| self._sentinel = sentinel |
| self._sentinel_pushed = False |
| self._closed = False |
|
|
| def __aiter__(self): |
| return self |
|
|
| async def __anext__(self): |
| if self._closed: |
| raise StopAsyncIteration |
|
|
| data = await self._q.get() |
| if data is self._sentinel: |
| self._closed = True |
| raise StopAsyncIteration |
|
|
| return data |
|
|
| async def put(self, data) -> bool: |
| """ |
| Pushes next item to Iterator and returns True |
| If iterator has been closed via close(), doesn't push anything and returns False |
| """ |
| if self._sentinel_pushed: |
| return False |
|
|
| await self._q.put(data) |
| return True |
|
|
| async def close(self): |
| """ |
| Close is idempotent. Calling close multiple times is safe |
| Iterator will raise StopIteration only after all elements pushed before close have been iterated |
| """ |
| |
| if not self._sentinel_pushed: |
| self._sentinel_pushed = True |
| await self._q.put(self._sentinel) |
|
|