# Copyright cocotb contributors
# Copyright (c) 2013, 2018 Potential Ventures Ltd
# Copyright (c) 2013 SolarFlare Communications Inc
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
"""All things relating to regression capabilities."""
import functools
import hashlib
import inspect
import logging
import os
import random
import re
import time
import warnings
from enum import auto
from importlib import import_module
from typing import (
Callable,
Coroutine,
List,
Union,
)
import cocotb
import cocotb._gpi_triggers
import cocotb.handle
from cocotb import _ANSI, simulator
from cocotb._base_triggers import Trigger
from cocotb._decorators import Parameterized, Test
from cocotb._extended_awaitables import SimTimeoutError, with_timeout
from cocotb._gpi_triggers import GPITrigger, Timer
from cocotb._outcomes import Error, Outcome
from cocotb._test import RunningTest
from cocotb._test_factory import TestFactory
from cocotb._test_functions import Failed
from cocotb._utils import (
DocEnum,
remove_traceback_frames,
safe_divide,
want_color_output,
)
from cocotb._xunit_reporter import XUnitReporter
from cocotb.task import Task
from cocotb.utils import get_sim_time
__all__ = (
"Parameterized",
"RegressionManager",
"RegressionMode",
"SimFailure",
"Test",
"TestFactory",
)
# Set __module__ on re-exports
Parameterized.__module__ = __name__
Test.__module__ = __name__
TestFactory.__module__ = __name__
[docs]
class SimFailure(BaseException):
"""A Test failure due to simulator failure."""
_logger = logging.getLogger(__name__)
def _format_doc(docstring: Union[str, None]) -> str:
if docstring is None:
return ""
else:
brief = docstring.split("\n")[0]
return f"\n {brief}"
[docs]
class RegressionMode(DocEnum):
"""The mode of the :class:`RegressionManager`."""
REGRESSION = (
auto(),
"""Tests are run if included. Skipped tests are skipped, expected failures and errors are respected.""",
)
TESTCASE = (
auto(),
"""Like :attr:`REGRESSION`, but skipped tests are *not* skipped if included.""",
)
class _TestResults:
# TODO Replace with dataclass in Python 3.7+
def __init__(
self,
test_fullname: str,
passed: Union[None, bool],
wall_time_s: float,
sim_time_ns: float,
) -> None:
self.test_fullname = test_fullname
self.passed = passed
self.wall_time_s = wall_time_s
self.sim_time_ns = sim_time_ns
@property
def ratio(self) -> float:
return safe_divide(self.sim_time_ns, self.wall_time_s)
[docs]
class RegressionManager:
"""Object which manages tests.
This object uses the builder pattern to build up a regression.
Tests are added using :meth:`register_test` or :meth:`discover_tests`.
Inclusion filters for tests can be added using :meth:`add_filters`.
The "mode" of the regression can be controlled using :meth:`set_mode`.
These methods can be called in any order any number of times before :meth:`start_regression` is called,
and should not be called again after that.
Once all the tests, filters, and regression behavior configuration is done,
the user starts the regression with :meth:`start_regression`.
This method must be called exactly once.
Until the regression is started, :attr:`total_tests`, :attr:`count`, :attr:`passed`,
:attr:`skipped`, and :attr:`failures` hold placeholder values.
"""
_timer1 = Timer(1)
def __init__(self) -> None:
self._test: Test
self._running_test: RunningTest
self.log = _logger
self._regression_start_time: float
self._test_results: List[_TestResults] = []
self.total_tests = 0
"""Total number of tests that will be run or skipped."""
self.count = 0
"""The current test count."""
self.passed = 0
"""The current number of passed tests."""
self.skipped = 0
"""The current number of skipped tests."""
self.failures = 0
"""The current number of failed tests."""
self._tearing_down = False
self._test_queue: List[Test] = []
self._filters: List[re.Pattern[str]] = []
self._mode = RegressionMode.REGRESSION
self._included: List[bool]
self._sim_failure: Union[Error[None], None] = None
# Setup XUnit
###################
results_filename = os.getenv("COCOTB_RESULTS_FILE", "results.xml")
suite_name = os.getenv("COCOTB_RESULT_TESTSUITE", "all")
package_name = os.getenv("COCOTB_RESULT_TESTPACKAGE", "all")
self.xunit = XUnitReporter(filename=results_filename)
self.xunit.add_testsuite(name=suite_name, package=package_name)
self.xunit.add_property(name="random_seed", value=str(cocotb.RANDOM_SEED))
[docs]
def discover_tests(self, *modules: str) -> None:
"""Discover tests in files automatically.
Should be called before :meth:`start_regression` is called.
Args:
modules: Each argument given is the name of a module where tests are found.
"""
for module_name in modules:
mod = import_module(module_name)
found_test: bool = False
for obj_name, obj in vars(mod).items():
if isinstance(obj, Test):
found_test = True
self.register_test(obj)
elif isinstance(obj, Parameterized):
found_test = True
generated_tests: bool = False
for test in obj.generate_tests():
generated_tests = True
self.register_test(test)
if not generated_tests:
warnings.warn(
f"Parametrize object generated no tests: {module_name}.{obj_name}",
stacklevel=2,
)
if not found_test:
warnings.warn(
f"No tests were discovered in module: {module_name}", stacklevel=2
)
# error if no tests were discovered
if not self._test_queue:
modules_str = ", ".join(repr(m) for m in modules)
raise RuntimeError(f"No tests were discovered in any module: {modules_str}")
[docs]
def add_filters(self, *filters: str) -> None:
"""Add regular expressions to filter-in registered tests.
Only those tests which match at least one of the given filters are included;
the rest are excluded.
Should be called before :meth:`start_regression` is called.
Args:
filters: Each argument given is a regex pattern for test names.
A match *includes* the test.
"""
for filter in filters:
compiled_filter = re.compile(filter)
self._filters.append(compiled_filter)
[docs]
def set_mode(self, mode: RegressionMode) -> None:
"""Set the regression mode.
See :class:`RegressionMode` for more details on how each mode affects :class:`RegressionManager` behavior.
Should be called before :meth:`start_regression` is called.
Args:
mode: The regression mode to set.
"""
self._mode = mode
[docs]
def register_test(self, test: Test) -> None:
"""Register a test with the :class:`RegressionManager`.
Should be called before :meth:`start_regression` is called.
Args:
test: The test object to register.
"""
self.log.debug("Registered test %r", test.fullname)
self._test_queue.append(test)
[docs]
@classmethod
def setup_pytest_assertion_rewriting(cls) -> None:
"""Configure pytest to rewrite assertions for better failure messages.
Must be called before all modules containing tests are imported.
"""
try:
import pytest # noqa: PLC0415
except ImportError:
_logger.info(
"pytest not found, install it to enable better AssertionError messages"
)
return
try:
# Install the assertion rewriting hook, which must be done before we
# import the test modules.
from _pytest.assertion import install_importhook # noqa: PLC0415
from _pytest.config import Config # noqa: PLC0415
python_files = os.getenv("COCOTB_REWRITE_ASSERTION_FILES", "*.py").strip()
if not python_files:
# Even running the hook causes exceptions in some cases, so if the user
# selects nothing, don't install the hook at all.
return
pytest_conf = Config.fromdictargs(
{}, ["--capture=no", "-o", f"python_files={python_files}"]
)
install_importhook(pytest_conf)
except Exception:
_logger.exception(
"Configuring the assertion rewrite hook using pytest %s failed. "
"Please file a bug report!",
pytest.__version__,
)
[docs]
def start_regression(self) -> None:
"""Start the regression."""
# sort tests into stages
self._test_queue.sort(key=lambda test: test.stage)
# mark tests for running
if self._filters:
self._included = [False] * len(self._test_queue)
for i, test in enumerate(self._test_queue):
for filter in self._filters:
if filter.search(test.fullname):
self._included[i] = True
else:
self._included = [True] * len(self._test_queue)
# compute counts
self.count = 1
self.total_tests = sum(self._included)
if self.total_tests == 0:
self.log.warning(
"No tests left after filtering with: %s",
", ".join(f.pattern for f in self._filters),
)
# start write scheduler
cocotb.handle._start_write_scheduler()
# start test loop
self._regression_start_time = time.time()
self._first_test = True
self._execute()
def _execute(self) -> None:
"""Run the main regression loop.
Used by :meth:`start_regression` and :meth:`_test_complete` to continue to the main test running loop,
and by :meth:`_fail_regression` to shutdown the regression when a simulation failure occurs.
"""
while self._test_queue:
self._test = self._test_queue.pop(0)
included = self._included.pop(0)
# if the test is not included, record and continue
if not included:
self._record_test_excluded()
continue
# if the test is skipped, record and continue
if self._test.skip and self._mode != RegressionMode.TESTCASE:
self._record_test_skipped()
continue
# if the test should be run, but the simulator has failed, record and continue
if self._sim_failure is not None:
self._score_test(
self._sim_failure,
0,
0,
)
continue
# initialize the test, if it fails, record and continue
try:
self._running_test = self._init_test()
except Exception:
self._record_test_init_failed()
continue
self._log_test_start()
if self._first_test:
self._first_test = False
return self._schedule_next_test()
else:
return self._timer1._prime(self._schedule_next_test)
return self._tear_down()
def _init_test(self) -> RunningTest:
# wrap test function in timeout
func: Callable[..., Coroutine[Trigger, None, None]]
timeout = self._test.timeout_time
if timeout is not None:
f = self._test.func
@functools.wraps(f)
async def func(*args: object, **kwargs: object) -> None:
running_co = Task(f(*args, **kwargs))
try:
await with_timeout(running_co, timeout, self._test.timeout_unit)
except SimTimeoutError:
running_co.cancel()
raise
else:
func = self._test.func
main_task = Task(func(cocotb.top), name=f"Test {self._test.name}")
return RunningTest(self._test_complete, main_task)
def _schedule_next_test(self, trigger: Union[GPITrigger, None] = None) -> None:
if trigger is not None:
# TODO move to Trigger object
cocotb._gpi_triggers._current_gpi_trigger = trigger
trigger._cleanup()
# seed random number generator based on test module, name, and COCOTB_RANDOM_SEED
hasher = hashlib.sha1()
hasher.update(self._test.fullname.encode())
seed = cocotb.RANDOM_SEED + int(hasher.hexdigest(), 16)
random.seed(seed)
self._start_sim_time = get_sim_time("ns")
self._start_time = time.time()
self._running_test.start()
def _tear_down(self) -> None:
"""Called by :meth:`_execute` when there are no more tests to run to finalize the regression."""
# prevent re-entering the tear down procedure
if not self._tearing_down:
self._tearing_down = True
else:
return
assert not self._test_queue
# stop the write scheduler
cocotb.handle._stop_write_scheduler()
# Write out final log messages
self._log_test_summary()
# Generate output reports
self.xunit.write()
# TODO refactor initialization and finalization into their own module
# to prevent circular imports requiring local imports
from cocotb._init import _shutdown_testbench # noqa: PLC0415
_shutdown_testbench()
# Setup simulator finalization
simulator.stop_simulator()
def _test_complete(self) -> None:
"""Callback given to the test to be called when the test finished."""
# compute wall time
wall_time = time.time() - self._start_time
sim_time_ns = get_sim_time("ns") - self._start_sim_time
# Judge and record pass/fail.
self._score_test(
self._running_test.result(),
wall_time,
sim_time_ns,
)
# Run next test.
return self._execute()
def _score_test(
self,
outcome: Outcome[None],
wall_time_s: float,
sim_time_ns: float,
) -> None:
test = self._test
# score test
passed: bool
msg: Union[str, None]
exc: Union[BaseException, None]
try:
outcome.get()
except BaseException as e:
passed, msg = False, None
exc = remove_traceback_frames(e, ["_score_test", "get"])
else:
passed, msg, exc = True, None, None
if passed:
if test.expect_error:
self._record_test_failed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=exc,
msg="passed but we expected an error",
)
passed = False
elif test.expect_fail:
self._record_test_failed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=exc,
msg="passed but we expected a failure",
)
passed = False
else:
self._record_test_passed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=None,
msg=msg,
)
elif test.expect_fail:
if isinstance(exc, (AssertionError, Failed)):
self._record_test_passed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=None,
msg="failed as expected",
)
else:
self._record_test_failed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=exc,
msg="expected failure, but errored with unexpected type",
)
passed = False
elif test.expect_error:
if isinstance(exc, test.expect_error):
self._record_test_passed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=None,
msg="errored as expected",
)
else:
self._record_test_failed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=exc,
msg="errored with unexpected type",
)
passed = False
else:
self._record_test_failed(
wall_time_s=wall_time_s,
sim_time_ns=sim_time_ns,
result=exc,
msg=msg,
)
def _get_lineno(self, test: Test) -> int:
try:
return inspect.getsourcelines(test.func)[1]
except OSError:
return 1
def _log_test_start(self) -> None:
"""Called by :meth:`_execute` to log that a test is starting."""
hilight_start = _ANSI.COLOR_TEST if want_color_output() else ""
hilight_end = _ANSI.COLOR_DEFAULT if want_color_output() else ""
self.log.info(
"%srunning%s %s (%d/%d)%s",
hilight_start,
hilight_end,
self._test.fullname,
self.count,
self.total_tests,
_format_doc(self._test.doc),
)
def _record_test_excluded(self) -> None:
"""Called by :meth:`_execute` when a test is excluded by filters."""
# write out xunit results
lineno = self._get_lineno(self._test)
self.xunit.add_testcase(
name=self._test.name,
classname=self._test.module,
file=inspect.getfile(self._test.func),
lineno=repr(lineno),
time=repr(0),
sim_time_ns=repr(0),
ratio_time=repr(0),
)
self.xunit.add_skipped()
# do not log anything, nor save details for the summary
def _record_test_skipped(self) -> None:
"""Called by :meth:`_execute` when a test is skipped."""
# log test results
hilight_start = _ANSI.COLOR_SKIPPED if want_color_output() else ""
hilight_end = _ANSI.COLOR_DEFAULT if want_color_output() else ""
self.log.info(
"%sskipping%s %s (%d/%d)%s",
hilight_start,
hilight_end,
self._test.fullname,
self.count,
self.total_tests,
_format_doc(self._test.doc),
)
# write out xunit results
lineno = self._get_lineno(self._test)
self.xunit.add_testcase(
name=self._test.name,
classname=self._test.module,
file=inspect.getfile(self._test.func),
lineno=repr(lineno),
time=repr(0),
sim_time_ns=repr(0),
ratio_time=repr(0),
)
self.xunit.add_skipped()
# save details for summary
self._test_results.append(
_TestResults(
test_fullname=self._test.fullname,
passed=None,
sim_time_ns=0,
wall_time_s=0,
)
)
# update running passed/failed/skipped counts
self.skipped += 1
self.count += 1
def _record_test_init_failed(self) -> None:
"""Called by :meth:`_execute` when a test initialization fails."""
# log test results
hilight_start = _ANSI.COLOR_FAILED if want_color_output() else ""
hilight_end = _ANSI.COLOR_DEFAULT if want_color_output() else ""
self.log.exception(
"%sFailed to initialize%s %s! (%d/%d)%s",
hilight_start,
hilight_end,
self._test.fullname,
self.count,
self.total_tests,
_format_doc(self._test.doc),
)
# write out xunit results
lineno = self._get_lineno(self._test)
self.xunit.add_testcase(
name=self._test.name,
classname=self._test.module,
file=inspect.getfile(self._test.func),
lineno=repr(lineno),
time=repr(0),
sim_time_ns=repr(0),
ratio_time=repr(0),
)
self.xunit.add_failure(msg="Test initialization failed")
# save details for summary
self._test_results.append(
_TestResults(
test_fullname=self._test.fullname,
passed=False,
sim_time_ns=0,
wall_time_s=0,
)
)
# update running passed/failed/skipped counts
self.failures += 1
self.count += 1
def _record_test_passed(
self,
wall_time_s: float,
sim_time_ns: float,
result: Union[Exception, None],
msg: Union[str, None],
) -> None:
start_hilight = _ANSI.COLOR_PASSED if want_color_output() else ""
stop_hilight = _ANSI.COLOR_DEFAULT if want_color_output() else ""
if msg is None:
rest = ""
else:
rest = f": {msg}"
if result is None:
result_was = ""
else:
result_was = f" (result was {type(result).__qualname__})"
self.log.info(
"%s %spassed%s%s%s",
self._test.fullname,
start_hilight,
stop_hilight,
rest,
result_was,
)
# write out xunit results
ratio_time = safe_divide(sim_time_ns, wall_time_s)
lineno = self._get_lineno(self._test)
self.xunit.add_testcase(
name=self._test.name,
classname=self._test.module,
file=inspect.getfile(self._test.func),
lineno=repr(lineno),
time=repr(wall_time_s),
sim_time_ns=repr(sim_time_ns),
ratio_time=repr(ratio_time),
)
# update running passed/failed/skipped counts
self.passed += 1
self.count += 1
# save details for summary
self._test_results.append(
_TestResults(
test_fullname=self._test.fullname,
passed=True,
sim_time_ns=sim_time_ns,
wall_time_s=wall_time_s,
)
)
def _record_test_failed(
self,
wall_time_s: float,
sim_time_ns: float,
result: Union[BaseException, None],
msg: Union[str, None],
) -> None:
start_hilight = _ANSI.COLOR_FAILED if want_color_output() else ""
stop_hilight = _ANSI.COLOR_DEFAULT if want_color_output() else ""
if msg is None:
rest = ""
else:
rest = f": {msg}"
self.log.warning(
"%s%s %sfailed%s%s",
stop_hilight,
self._test.fullname,
start_hilight,
stop_hilight,
rest,
)
# write out xunit results
ratio_time = safe_divide(sim_time_ns, wall_time_s)
lineno = self._get_lineno(self._test)
self.xunit.add_testcase(
name=self._test.name,
classname=self._test.module,
file=inspect.getfile(self._test.func),
lineno=repr(lineno),
time=repr(wall_time_s),
sim_time_ns=repr(sim_time_ns),
ratio_time=repr(ratio_time),
)
self.xunit.add_failure(error_type=type(result).__name__, error_msg=str(result))
# update running passed/failed/skipped counts
self.failures += 1
self.count += 1
# save details for summary
self._test_results.append(
_TestResults(
test_fullname=self._test.fullname,
passed=False,
sim_time_ns=sim_time_ns,
wall_time_s=wall_time_s,
)
)
def _log_test_summary(self) -> None:
"""Called by :meth:`_tear_down` to log the test summary."""
real_time = time.time() - self._regression_start_time
sim_time_ns = get_sim_time("ns")
ratio_time = safe_divide(sim_time_ns, real_time)
if len(self._test_results) == 0:
return
TEST_FIELD = "TEST"
RESULT_FIELD = "STATUS"
SIM_FIELD = "SIM TIME (ns)"
REAL_FIELD = "REAL TIME (s)"
RATIO_FIELD = "RATIO (ns/s)"
TOTAL_NAME = f"TESTS={self.total_tests} PASS={self.passed} FAIL={self.failures} SKIP={self.skipped}"
TEST_FIELD_LEN = max(
len(TEST_FIELD),
len(TOTAL_NAME),
len(max([x.test_fullname for x in self._test_results], key=len)),
)
RESULT_FIELD_LEN = len(RESULT_FIELD)
SIM_FIELD_LEN = len(SIM_FIELD)
REAL_FIELD_LEN = len(REAL_FIELD)
RATIO_FIELD_LEN = len(RATIO_FIELD)
header_dict = {
"a": TEST_FIELD,
"b": RESULT_FIELD,
"c": SIM_FIELD,
"d": REAL_FIELD,
"e": RATIO_FIELD,
"a_len": TEST_FIELD_LEN,
"b_len": RESULT_FIELD_LEN,
"c_len": SIM_FIELD_LEN,
"d_len": REAL_FIELD_LEN,
"e_len": RATIO_FIELD_LEN,
}
LINE_LEN = (
3
+ TEST_FIELD_LEN
+ 2
+ RESULT_FIELD_LEN
+ 2
+ SIM_FIELD_LEN
+ 2
+ REAL_FIELD_LEN
+ 2
+ RATIO_FIELD_LEN
+ 3
)
LINE_SEP = "*" * LINE_LEN + "\n"
summary = ""
summary += LINE_SEP
summary += "** {a:<{a_len}} {b:^{b_len}} {c:>{c_len}} {d:>{d_len}} {e:>{e_len}} **\n".format(
**header_dict
)
summary += LINE_SEP
test_line = "** {a:<{a_len}} {start}{b:^{b_len}}{end} {c:>{c_len}.2f} {d:>{d_len}.2f} {e:>{e_len}} **\n"
for result in self._test_results:
hilite = ""
lolite = ""
if result.passed is None:
ratio = "-.--"
pass_fail_str = "SKIP"
if want_color_output():
hilite = _ANSI.COLOR_SKIPPED
lolite = _ANSI.COLOR_DEFAULT
elif result.passed:
ratio = format(result.ratio, "0.2f")
pass_fail_str = "PASS"
if want_color_output():
hilite = _ANSI.COLOR_PASSED
lolite = _ANSI.COLOR_DEFAULT
else:
ratio = format(result.ratio, "0.2f")
pass_fail_str = "FAIL"
if want_color_output():
hilite = _ANSI.COLOR_FAILED
lolite = _ANSI.COLOR_DEFAULT
test_dict = {
"a": result.test_fullname,
"b": pass_fail_str,
"c": result.sim_time_ns,
"d": result.wall_time_s,
"e": ratio,
"a_len": TEST_FIELD_LEN,
"b_len": RESULT_FIELD_LEN,
"c_len": SIM_FIELD_LEN - 1,
"d_len": REAL_FIELD_LEN - 1,
"e_len": RATIO_FIELD_LEN - 1,
"start": hilite,
"end": lolite,
}
summary += test_line.format(**test_dict)
summary += LINE_SEP
summary += test_line.format(
a=TOTAL_NAME,
b="",
c=sim_time_ns,
d=real_time,
e=format(ratio_time, "0.2f"),
a_len=TEST_FIELD_LEN,
b_len=RESULT_FIELD_LEN,
c_len=SIM_FIELD_LEN - 1,
d_len=REAL_FIELD_LEN - 1,
e_len=RATIO_FIELD_LEN - 1,
start="",
end="",
)
summary += LINE_SEP
self.log.info(summary)
def _fail_simulation(self, msg: str) -> None:
self._sim_failure = Error(SimFailure(msg))
self._running_test.abort(self._sim_failure)
cocotb._scheduler_inst._event_loop()