Source code for avl_axi_stream._rdriver

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Driver

import asyncio
import random

import avl
from cocotb.triggers import RisingEdge

from ._driver import Driver
from ._item import SequenceItem


[docs] class RecDriver(Driver):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the Receiver Driver for the AXI-STREAM agent. :param name: Name of the agent instance :type name: str :param parent: Parent component :type parent: Component """ super().__init__(name, parent) if not hasattr(self.i_f, "tready"): raise ValueError("Receiver Driver instanced on AXI-STREAM with no tready signal")
[docs] async def reset(self) -> None: """ Reset the driver by setting all signals to their default values. This method is called when the driver is reset. By default 0's all signals - can be overridden in subclasses to add randomization or other behavior. """ self.i_f.set("tready", 0)
[docs] async def drive(self, item : SequenceItem) -> None: """ Drive the signals based on the provided sequence item. This method is called to drive the signals of the AXI-STREAM interface. :param item: The sequence item containing the values to drive :type item: SequenceItem """ try: self.i_f.set("tready", 0) # Rate Limiter rate = self.rate_limit() while random.random() > rate: await RisingEdge(self.i_f.aclk) if bool(self.i_f.get("twakeup", 1)): self.i_f.set("tready", 1) # Wait for the next clock edge to drive the item while True: await RisingEdge(self.i_f.aclk) if bool(self.i_f.get("tvalid", 1)) or not bool(self.i_f.get("twakeup", 1)): break # Clear the bus await self.quiesce() except asyncio.CancelledError: raise except Exception: self.debug(f"Receiver drive task for item was cancelled by reset:\n{item}")
[docs] async def get_next_item(self, item : SequenceItem = None) -> SequenceItem: """ Get the next sequence item. The implementation ensures items are driven on the rising edge of aclk, when not in reset, while allowing for back-to-back requests if the sequencer provides them. :param item: The sequence item to retrieve, defaults to None :type item: SequenceItem, optional :return: The next sequence item :rtype: SequenceItem :raises NotImplementedError: If the method is not implemented in subclasses """ return None
__all__ = ["RecDriver"]