Source code for cocotb.queue
# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import asyncio.queues
import collections
import heapq
from abc import abstractmethod
from typing import (
Deque,
Generic,
List,
Tuple,
TypeVar,
)
import cocotb
from cocotb._utils import pointer_str
from cocotb.task import Task
from cocotb.triggers import Event
__all__ = (
"AbstractQueue",
"LifoQueue",
"PriorityQueue",
"Queue",
"QueueEmpty",
"QueueFull",
)
[docs]
class QueueFull(asyncio.queues.QueueFull):
"""Raised when the :meth:`Queue.put_nowait()` method is called on a full :class:`Queue`."""
[docs]
class QueueEmpty(asyncio.queues.QueueEmpty):
"""Raised when the :meth:`Queue.get_nowait()` method is called on an empty :class:`Queue`."""
T = TypeVar("T")
[docs]
class AbstractQueue(Generic[T]):
"""A queue, useful for coordinating producer and consumer coroutines.
If *maxsize* is less than or equal to 0, the queue size is infinite. If it
is an integer greater than 0, then :meth:`put` will block when the queue
reaches *maxsize*, until an item is removed by :meth:`get`.
"""
def __init__(self, maxsize: int = 0) -> None:
self._maxsize: int = maxsize
self._getters: Deque[Tuple[Event, Task[object]]] = collections.deque()
self._putters: Deque[Tuple[Event, Task[object]]] = collections.deque()
@abstractmethod
def _get(self) -> T:
"""Remove and return the next element from the queue."""
@abstractmethod
def _put(self, item: T) -> None:
"""Place a new element on the queue."""
@abstractmethod
def _size(self) -> int:
"""Return the number of elements in the queue."""
@abstractmethod
def _repr(self) -> str:
"""Return a string representation of the state of the queue."""
def _wakeup_next(self, waiters: Deque[Tuple[Event, Task[object]]]) -> None:
while waiters:
event, task = waiters.popleft()
if not task.done():
event.set()
break
def __repr__(self) -> str:
return f"<{type(self).__name__} {self._format()} at {pointer_str(self)}>"
def __str__(self) -> str:
return f"<{type(self).__name__} {self._format()}>"
def _format(self) -> str:
result = f"maxsize={self._maxsize!r}"
if getattr(self, "_queue", None):
result += f" _queue={self._repr()}"
if self._getters:
result += f" _getters[{len(self._getters)}]"
if self._putters:
result += f" _putters[{len(self._putters)}]"
return result
[docs]
def qsize(self) -> int:
"""Number of items in the queue."""
return self._size()
@property
def maxsize(self) -> int:
"""Number of items allowed in the queue."""
return self._maxsize
[docs]
def empty(self) -> bool:
"""Return ``True`` if the queue is empty, ``False`` otherwise."""
return self._size() == 0
[docs]
def full(self) -> bool:
"""Return ``True`` if there are :meth:`maxsize` items in the queue.
.. note::
If the Queue was initialized with ``maxsize=0`` (the default), then
:meth:`full` is never ``True``.
"""
if self._maxsize <= 0:
return False
else:
return self.qsize() >= self._maxsize
[docs]
async def put(self, item: T) -> None:
"""Put an *item* into the queue.
If the queue is full, wait until a free
slot is available before adding the item.
"""
while self.full():
event = Event()
self._putters.append((event, cocotb.task.current_task()))
await event.wait()
self.put_nowait(item)
[docs]
def put_nowait(self, item: T) -> None:
"""Put an *item* into the queue without blocking.
If no free slot is immediately available, raise :exc:`~cocotb.queue.QueueFull`.
"""
if self.full():
raise QueueFull()
self._put(item)
self._wakeup_next(self._getters)
[docs]
async def get(self) -> T:
"""Remove and return an item from the queue.
If the queue is empty, wait until an item is available.
"""
while self.empty():
event = Event()
self._getters.append((event, cocotb.task.current_task()))
await event.wait()
return self.get_nowait()
[docs]
def get_nowait(self) -> T:
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise
:exc:`~cocotb.queue.QueueEmpty`.
"""
if self.empty():
raise QueueEmpty()
item = self._get()
self._wakeup_next(self._putters)
return item
[docs]
class Queue(AbstractQueue[T]):
"""A subclass of :class:`AbstractQueue`; retrieves oldest entries first (FIFO)."""
def __init__(self, maxsize: int = 0) -> None:
super().__init__(maxsize)
self._queue: Deque[T] = collections.deque()
def _put(self, item: T) -> None:
self._queue.append(item)
def _get(self) -> T:
return self._queue.popleft()
def _size(self) -> int:
return len(self._queue)
def _repr(self) -> str:
return repr(self._queue)
[docs]
class PriorityQueue(AbstractQueue[T]):
r"""A subclass of :class:`AbstractQueue`; retrieves entries in priority order (smallest item first).
Entries are typically tuples of the form ``(priority number, data)``.
"""
def __init__(self, maxsize: int = 0) -> None:
super().__init__(maxsize)
self._queue: List[T] = []
def _put(self, item: T) -> None:
heapq.heappush(self._queue, item)
def _get(self) -> T:
return heapq.heappop(self._queue)
def _size(self) -> int:
return len(self._queue)
def _repr(self) -> str:
return repr(self._queue)
[docs]
class LifoQueue(AbstractQueue[T]):
"""A subclass of :class:`AbstractQueue`; retrieves most recently added entries first (LIFO)."""
def __init__(self, maxsize: int = 0) -> None:
super().__init__(maxsize)
self._queue: Deque[T] = collections.deque()
def _put(self, item: T) -> None:
self._queue.append(item)
def _get(self) -> T:
return self._queue.pop()
def _size(self) -> int:
return len(self._queue)
def _repr(self) -> str:
return repr(self._queue)