Source code for cocotb._base_triggers

# Copyright cocotb contributors
# Copyright (c) 2013 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

"""A collection of triggers which a testbench can :keyword:`await`."""

import logging
import warnings
from typing import (
    AsyncContextManager,
    Callable,
    Generator,
    List,
    Optional,
    Union,
)

from cocotb._deprecation import deprecated
from cocotb._py_compat import Self, cached_property
from cocotb._utils import pointer_str


class Trigger:
    """A future event that a Task can wait upon."""

    def __init__(self) -> None:
        self._primed = False

    @cached_property
    def _log(self) -> logging.Logger:
        return logging.getLogger(f"cocotb.{type(self).__qualname__}.0x{id(self):x}")

    def _prime(self, callback: Callable[["Self"], None]) -> None:
        """Set a callback to be invoked when the trigger fires.

        The callback will be invoked with a single argument, `self`.

        Sub-classes must override this, but should end by calling the base class
        method.

        .. warning::
            Do not call this directly within a :term:`task`. It is intended to be used
            only by the scheduler.
        """
        # Set _primed so the trigger can test if it's already been primed and behave appropriately.
        self._primed = True

    def _unprime(self) -> None:
        """Remove the callback, and perform cleanup if necessary.

        After being un-primed, a Trigger may be re-primed again in the future.
        Calling `_unprime` multiple times is allowed, subsequent calls should be
        a no-op.

        Sub-classes may override this, but should end by calling the base class
        method.

        .. warning::
            Do not call this directly within a :term:`task`. It is intended to be used
            only by the scheduler.
        """
        self._cleanup()

    def _cleanup(self) -> None:
        # Clear _primed so this Trigger can be re-primed.
        self._primed = False

    def __await__(self) -> Generator["Self", None, "Self"]:
        yield self
        return self


class _Event(Trigger):
    """Unique instance used by the Event object.

    One created for each attempt to wait on the event so that the scheduler
    can maintain a unique mapping of triggers to tasks.
    """

    _callback: Callable[["_Event"], None]

    def __init__(self, parent: "Event") -> None:
        super().__init__()
        self._parent = parent

    def _prime(self, callback: Callable[["_Event"], None]) -> None:
        if self._primed:
            return
        if self._parent.is_set():
            # If the event is already set, we need to call the callback
            # immediately, so we don't need to wait for the scheduler.
            callback(self)
            return
        self._callback = callback
        return super()._prime(callback)

    def _unprime(self) -> None:
        if not self._primed:
            return
        return super()._unprime()

    def _set(self) -> None:
        if self._primed:
            self._callback(self)

    def __repr__(self) -> str:
        return f"<{self._parent!r}.wait() at {pointer_str(self)}>"


class Event:
    r"""A way to signal an event across :class:`~cocotb.task.Task`\ s.

    :keyword:`await`\ ing the result of :meth:`wait()` will block the :keyword:`await`\ ing :class:`~cocotb.task.Task`
    until :meth:`set` is called.

    Args:
        name: Name for the Event.

    Usage:
        .. code-block:: python

            e = Event()


            async def task1():
                await e.wait()
                print("resuming!")


            cocotb.start_soon(task1())
            # do stuff
            e.set()
            await NullTrigger()  # allows task1 to execute
            # resuming!

    .. versionremoved:: 2.0

        Removed the undocumented *data* attribute and argument to :meth:`set`,
        and the *name* attribute and argument to the constructor.
    """

    def __init__(self, name: Optional[str] = None) -> None:
        self._event: _Event = _Event(self)
        self._name: Union[str, None] = None
        if name is not None:
            warnings.warn(
                "The 'name' argument will be removed in a future release.",
                DeprecationWarning,
                stacklevel=2,
            )
            self.name = name
        self._fired: bool = False
        self._data: object = None

    @property
    @deprecated("The 'name' field will be removed in a future release.")
    def name(self) -> Union[str, None]:
        """Name of the Event.

        .. deprecated:: 2.0
            The *name* field will be removed in a future release.
        """
        return self._name

    @name.setter
    @deprecated("The 'name' field will be removed in a future release.")
    def name(self, new_name: Union[str, None]) -> None:
        self._name = new_name

    @property
    @deprecated("The data field will be removed in a future release.")
    def data(self) -> object:
        """The data associated with the Event.

        .. deprecated:: 2.0
            The data field will be removed in a future release.
            Use a separate variable to store the data instead.
        """
        return self._data

    @data.setter
    @deprecated("The data field will be removed in a future release.")
    def data(self, new_data: object) -> None:
        self._data = new_data

[docs] def set(self, data: Optional[object] = None) -> None: """Set the Event and unblock all Tasks blocked on this Event.""" self._fired = True if data is not None: warnings.warn( "The data field will be removed in a future release.", DeprecationWarning, stacklevel=2, ) self._data = data self._event._set()
[docs] def wait(self) -> Trigger: """Block the current Task until the Event is set. If the event has already been set, the trigger will fire immediately. To set the Event call :meth:`set`. To reset the Event (and enable the use of :meth:`wait` again), call :meth:`clear`. """ return self._event
[docs] def clear(self) -> None: """Clear this event that has been set. Subsequent calls to :meth:`~cocotb.triggers.Event.wait` will block until :meth:`~cocotb.triggers.Event.set` is called again. """ self._fired = False
[docs] def is_set(self) -> bool: """Return ``True`` if event has been set.""" return self._fired
def __repr__(self) -> str: if self._name is None: fmt = "<{0} at {2}>" else: fmt = "<{0} for {1} at {2}>" return fmt.format(type(self).__qualname__, self._name, pointer_str(self)) class _InternalEvent(Trigger): """Event used internally for triggers that need cross-:class:`~cocotb.task.Task` synchronization. This Event can only be waited on once, by a single :class:`~cocotb.task.Task`. Provides transparent :func`repr` pass-through to the :class:`Trigger` using this event, providing a better debugging experience. """ def __init__(self, parent: object) -> None: super().__init__() self._parent = parent self._callback: Optional[Callable[[_InternalEvent], None]] = None self.fired: bool = False def _prime(self, callback: Callable[["_InternalEvent"], None]) -> None: if self._primed: raise RuntimeError("This Trigger may only be awaited once") self._callback = callback super()._prime(callback) if self.fired: self._callback(self) def _cleanup(self) -> None: # Don't clear _primed so a second call to _prime() fails. pass def set(self) -> None: """Wake up coroutine blocked on this event.""" self.fired = True if self._callback is not None: self._callback(self) def is_set(self) -> bool: """Return true if event has been set.""" return self.fired def __await__( self, ) -> Generator["Self", None, "Self"]: if self._primed: raise RuntimeError("Only one Task may await this Trigger") yield self return self def __repr__(self) -> str: return repr(self._parent) class _Lock(Trigger): """Unique instance used by the Lock object. One created for each attempt to acquire the Lock so that the scheduler can maintain a unique mapping of triggers to tasks. """ def __init__(self, parent: "Lock") -> None: super().__init__() self._parent = parent def _prime(self, callback: Callable[["Self"], None]) -> None: if self._primed: raise RuntimeError( "Lock.acquire() result can only be used by one task at a time" ) self._callback = callback self._parent._prime_lock(self) return super()._prime(callback) def _unprime(self) -> None: if not self._primed: return self._parent._unprime_lock(self) return super()._unprime() def __repr__(self) -> str: return f"<{self._parent!r}.acquire() at {pointer_str(self)}>" class Lock(AsyncContextManager[None]): """A mutual exclusion lock. Guarantees fair scheduling. Lock acquisition is given in order of attempted lock acquisition. Usage: By directly calling :meth:`acquire` and :meth:`release`. .. code-block:: python lock = Lock() ... await lock.acquire() try: # do some stuff ... finally: lock.release() Or... .. code-block:: python async with Lock(): # do some stuff ... .. versionchanged:: 1.4 The lock can be used as an asynchronous context manager in an :keyword:`async with` statement """ def __init__(self, name: Optional[str] = None) -> None: self._pending_primed: List[_Lock] = [] self._name: Union[str, None] = None if name is not None: warnings.warn( "The 'name' argument will be removed in a future release.", DeprecationWarning, stacklevel=2, ) self._name = name self._locked: bool = False @property @deprecated("The 'name' field will be removed in a future release.") def name(self) -> Union[str, None]: """Name of the Lock. .. deprecated:: 2.0 The *name* field will be removed in a future release. """ return self._name @name.setter @deprecated("The 'name' field will be removed in a future release.") def name(self, new_name: Union[str, None]) -> None: self._name = new_name
[docs] def locked(self) -> bool: """Return ``True`` if the lock has been acquired. .. versionchanged:: 2.0 This is now a method to match :meth:`asyncio.Lock.locked`, rather than an attribute. """ return self._locked
def _acquire_and_fire(self, lock: _Lock) -> None: self._locked = True lock._callback(lock) def _prime_lock(self, lock: _Lock) -> None: if not self._locked: self._acquire_and_fire(lock) else: self._pending_primed.append(lock) def _unprime_lock(self, lock: _Lock) -> None: if lock in self._pending_primed: self._pending_primed.remove(lock)
[docs] def acquire(self) -> Trigger: """Produce a trigger which fires when the lock is acquired.""" trig = _Lock(self) return trig
[docs] def release(self) -> None: """Release the lock.""" if not self._locked: raise RuntimeError(f"Attempt to release an unacquired Lock {self!s}") self._locked = False # nobody waiting for this lock if not self._pending_primed: return lock = self._pending_primed.pop(0) self._acquire_and_fire(lock)
def __repr__(self) -> str: if self._name is None: fmt = "<{0} [{2} waiting] at {3}>" else: fmt = "<{0} for {1} [{2} waiting] at {3}>" return fmt.format( type(self).__qualname__, self._name, len(self._pending_primed), pointer_str(self), ) async def __aenter__(self) -> None: await self.acquire() async def __aexit__(self, *args: object) -> None: self.release() class NullTrigger(Trigger): """Trigger that fires immediately. Mostly useful when building or using higher-order functions which need to take or return Triggers. The scheduling order of the Task awaiting this Trigger with respect to any other Task is not deterministic and should generally not be relied upon. Instead of using this Trigger to push the Task until "after" another Task has run, use other synchronization techniques, such as using an :class:`.Event`. **Do not** do this: .. code-block:: python transaction_data = None def monitor(): while dut.valid.value != 1 and dut.ready.value != 1: await RisingEdge(dut.clk) transaction_data = dut.data.value def use_transaction(): while True: await RisingEdge(dut.clk) # We need the NullTrigger here because both Tasks react to RisingEdge, # but there's no guarantee about which Task is run first, # so we need to force this one to run "later" using NullTrigger. await NullTrigger() if transaction_data is not None: process(transaction_data) use_task = cocotb.start_soon(use_transaction()) monitor_task = cocotb.start_soon(monitor()) Instead use an :class:`!.Event` to explicitly synchronize the two Tasks, like so: .. code-block:: python transaction_data = None transaction_event = Event() def monitor(): while dut.valid.value != 1 and dut.ready.value != 1: await RisingEdge(dut.clk) transaction_data = dut.data.value transaction_event.set() def use_transaction(): # Now we don't need the NullTrigger. # This Task will wake up *strictly* after `monitor_task` sets the transaction. await transaction_event.wait() process(transaction_data) use_task = cocotb.start_soon(use_transaction()) monitor_task = cocotb.start_soon(monitor()) .. versionremoved:: 2.0 The *outcome* parameter was removed. There is no alternative. """ def __init__(self, name: Optional[str] = None) -> None: super().__init__() self.name = name def _prime(self, callback: Callable[["Self"], None]) -> None: if self._primed: return callback(self) return super()._prime(callback) def __repr__(self) -> str: if self.name is None: fmt = "<{0} at {2}>" else: fmt = "<{0} for {1} at {2}>" return fmt.format(type(self).__qualname__, self.name, pointer_str(self))