Source code for cocotb._extended_awaitables

# Copyright cocotb contributors
# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

"""A collection of triggers which a testbench can :keyword:`await`."""

from abc import abstractmethod
from decimal import Decimal
from typing import (
    Any,
    Awaitable,
    Coroutine,
    Generator,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
    cast,
    overload,
)

import cocotb.handle
from cocotb._base_triggers import NullTrigger, Trigger, _InternalEvent
from cocotb._gpi_triggers import FallingEdge, RisingEdge, Timer, ValueChange
from cocotb._typing import RoundMode, TimeUnit
from cocotb.task import Task

T = TypeVar("T")


class Waitable(Awaitable[T]):
    """A Trigger-like object that can be implemented using coroutines.

    This converts a ``_wait`` abstract method into a suitable ``__await__``.
    """

[docs] @abstractmethod async def _wait(self) -> T: """The coroutine function which implements the functionality of the Waitable."""
def __await__(self) -> Generator[Trigger, None, T]: return self._wait().__await__() class _AggregateWaitable(Waitable[T]): """Base class for :class:`Combine` and :class:`First`.""" def __init__(self, *trigger: Union[Trigger, Waitable[Any], Task[Any]]) -> None: self._triggers = trigger # Do some basic type-checking up front, rather than waiting until we # await them. allowed_types = (Trigger, Waitable, Task) for t in self._triggers: if not isinstance(t, allowed_types): raise TypeError( f"All triggers must be instances of Trigger! Got: {type(t).__qualname__}" ) def __repr__(self) -> str: # no _pointer_str here, since this is not a trigger, so identity # doesn't matter. return "{}({})".format( type(self).__qualname__, ", ".join(repr(t) for t in self._triggers), ) async def _wait_callback(trigger: Awaitable[T]) -> T: return await trigger class Combine(_AggregateWaitable["Combine"]): r"""Trigger that fires when all *triggers* have fired. :keyword:`await`\ ing this returns the :class:`Combine` object. This is similar to Verilog's ``join``. See :ref:`combine-tutorial` for an example. Args: trigger: One or more :keyword:`await`\ able objects. Raises: TypeError: When an unsupported *trigger* object is passed. """ async def _wait(self) -> "Combine": if len(self._triggers) == 0: await NullTrigger() elif len(self._triggers) == 1: await self._triggers[0] else: waiters: List[Task[object]] = [] completed: List[Task[object]] = [] done = _InternalEvent(self) exception: Union[BaseException, None] = None def on_done( task: Task[object], ) -> None: # have to check cancelled first otherwise exception() will throw if task.cancelled(): completed.append(task) if len(completed) == len(waiters): done.set() return e = task.exception() if e is not None: nonlocal exception exception = e done.set() else: completed.append(task) if len(completed) == len(waiters): done.set() # start a parallel task for each trigger for t in self._triggers: task = Task[object](_wait_callback(t)) task._add_done_callback(on_done) cocotb.start_soon(task) waiters.append(task) try: # wait for the last waiter to complete await done finally: # kill remaining waiters for w in waiters: w.cancel() if exception is not None: raise exception return self class First(_AggregateWaitable[object]): r"""Fires when the first trigger in *triggers* fires. :keyword:`await`\ ing this object returns the result of the first trigger that fires. This is similar to Verilog's ``join_any``. See :ref:`first-tutorial` for an example. Args: trigger: One or more :keyword:`await`\ able objects. Raises: TypeError: When an unsupported *trigger* object is passed. ValueError: When no triggers are passed. .. note:: The event loop is single threaded, so while events may be simultaneous in simulation time, they can never be simultaneous in real time. For this reason, the value of ``t_ret is t1`` in the following example is implementation-defined, and will vary by simulator:: t1 = Timer(10, unit="ps") t2 = Timer(10, unit="ps") t_ret = await First(t1, t2) .. note:: In the old-style :ref:`generator-based coroutines <yield-syntax>`, ``t = yield [a, b]`` was another spelling of ``t = yield First(a, b)``. This spelling is no longer available when using :keyword:`await`-based coroutines. """ def __init__(self, *trigger: Union[Trigger, Waitable[Any], Task[Any]]) -> None: if not trigger: raise ValueError("First() requires at least one Trigger or Task argument") super().__init__(*trigger) async def _wait(self) -> object: if len(self._triggers) == 1: return await self._triggers[0] waiters: List[Task[object]] = [] done = _InternalEvent(self) completed: List[Task[object]] = [] def on_done(task: Task[object]) -> None: completed.append(task) done.set() # start a parallel task for each trigger for t in self._triggers: task = Task[object](_wait_callback(t)) task._add_done_callback(on_done) cocotb.start_soon(task) waiters.append(task) try: # wait for a waiter to complete await done finally: # kill all the other waiters for w in waiters: w.cancel() return completed[0].result() class ClockCycles(Waitable["ClockCycles"]): r"""Finishes after *num_cycles* transitions of *signal*. :keyword:`await`\ ing this Trigger returns the ClockCycle object. Args: signal: The signal to monitor. num_cycles: The number of cycles to count. rising: If ``True``, count rising edges; if ``False``, count falling edges. edge_type: The kind of :ref:`edge-triggers` to count. .. warning:: On many simulators transitions occur when the signal changes value from non-``0`` to ``0`` or non-``1`` to ``1``, not just from ``1`` to ``0`` or ``0`` to ``1``. .. versionadded:: 2.0 Passing the edge trigger type: :class:`.RisingEdge`, :class:`.FallingEdge`, or :class:`.ValueChange` as the third positional argument or by the keyword *edge_type*. """ @overload def __init__( self, signal: "cocotb.handle.LogicObject", num_cycles: int, ) -> None: ... @overload def __init__( self, signal: "cocotb.handle.LogicObject", num_cycles: int, edge_type: Union[ Type[RisingEdge], Type[FallingEdge], Type[ValueChange], None ] = None, ) -> None: ... @overload def __init__( self, signal: "cocotb.handle.LogicObject", num_cycles: int, *, rising: bool ) -> None: ... def __init__( self, signal: "cocotb.handle.LogicObject", num_cycles: int, edge_type: Union[ bool, Type[RisingEdge], Type[FallingEdge], Type[ValueChange], None ] = None, *, rising: Union[bool, None] = None, ) -> None: self._signal = signal self._num_cycles = num_cycles self._edge_type: Union[Type[RisingEdge], Type[FallingEdge], Type[ValueChange]] if edge_type is not None and rising is not None: raise TypeError("Passed more than one edge selection argument.") elif edge_type is True: self._edge_type = RisingEdge elif edge_type is False: self._edge_type = FallingEdge elif edge_type is not None: self._edge_type = edge_type elif rising is not None: self._edge_type = RisingEdge if rising else FallingEdge else: # default if no argument is passed self._edge_type = RisingEdge @property def signal(self) -> "cocotb.handle.LogicObject": """The signal being monitored.""" return self._signal @property def num_cycles(self) -> int: """The number of cycles to wait.""" return self._num_cycles @property def edge_type( self, ) -> Union[Type[RisingEdge], Type[FallingEdge], Type[ValueChange]]: """The type of edge trigger used.""" return self._edge_type async def _wait(self) -> "ClockCycles": trigger = self._edge_type(self._signal) for _ in range(self._num_cycles): await trigger return self def __repr__(self) -> str: return f"{type(self).__qualname__}({self._signal._path}, {self._num_cycles}, {self._edge_type.__qualname__})" class SimTimeoutError(TimeoutError): """Exception thrown when a timeout, in terms of simulation time, occurs.""" TriggerT = TypeVar("TriggerT", bound=Trigger) @overload async def with_timeout( trigger: TriggerT, timeout_time: Union[float, Decimal], timeout_unit: TimeUnit = "step", round_mode: Optional[RoundMode] = None, ) -> TriggerT: ... @overload async def with_timeout( trigger: Waitable[T], timeout_time: Union[float, Decimal], timeout_unit: TimeUnit = "step", round_mode: Optional[RoundMode] = None, ) -> T: ... @overload async def with_timeout( trigger: Task[T], timeout_time: Union[float, Decimal], timeout_unit: TimeUnit = "step", round_mode: Optional[RoundMode] = None, ) -> T: ... @overload async def with_timeout( trigger: Coroutine[Trigger, None, T], timeout_time: Union[float, Decimal], timeout_unit: TimeUnit = "step", round_mode: Optional[RoundMode] = None, ) -> T: ... async def with_timeout( trigger: Union[TriggerT, Waitable[T], Task[T], Coroutine[Trigger, None, T]], timeout_time: Union[float, Decimal], timeout_unit: TimeUnit = "step", round_mode: Optional[RoundMode] = None, ) -> Union[T, TriggerT]: r"""Wait on triggers or coroutines, throw an exception if it waits longer than the given time. When a :term:`python:coroutine` is passed, the callee coroutine is started, the caller blocks until the callee completes, and the callee's result is returned to the caller. If timeout occurs, the callee is killed and :exc:`SimTimeoutError` is raised. When a :term:`task` is passed, the caller blocks until the callee completes and the callee's result is returned to the caller. If timeout occurs, the callee `continues to run` and :exc:`SimTimeoutError` is raised. If a :class:`~cocotb.triggers.Trigger` or :class:`~cocotb.triggers.Waitable` is passed, the caller blocks until the trigger fires, and the trigger is returned to the caller. If timeout occurs, the trigger is cancelled and :exc:`SimTimeoutError` is raised. Usage: .. code-block:: python await with_timeout(coro, 100, "ns") await with_timeout(First(coro, event.wait()), 100, "ns") Args: trigger: A single object that could be right of an :keyword:`await` expression in cocotb. timeout_time: Simulation time duration before timeout occurs. timeout_unit: Unit of timeout_time, accepts any unit that :class:`~cocotb.triggers.Timer` does. round_mode: String specifying how to handle time values that sit between time steps (one of ``'error'``, ``'round'``, ``'ceil'``, ``'floor'``, ``None``). A ``None`` argument is converted to the current value of :attr:`.Timer.round_mode`. Returns: First trigger that completed if timeout did not occur. Raises: :exc:`SimTimeoutError`: If timeout occurs. .. versionadded:: 1.3 .. versionchanged:: 1.7 Support passing :term:`python:coroutine`\ s. .. versionchanged:: 2.0 Passing ``None`` as the *timeout_unit* argument was removed, use ``'step'`` instead. """ if isinstance(trigger, Coroutine): trigger = cocotb.start_soon(trigger) shielded = False else: shielded = True timeout_timer = Timer(timeout_time, timeout_unit, round_mode=round_mode) res = await First(timeout_timer, trigger) if res is timeout_timer: if not shielded: # shielded = False only when trigger is a Task created to wrap a Coroutine task = cast("Task[object]", trigger) task.cancel() raise SimTimeoutError else: return cast("T | TriggerT", res)