# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Monitor
import asyncio
import avl
import cocotb
from cocotb.triggers import FallingEdge, First, RisingEdge
from cocotb.utils import get_sim_time
from ._item import SequenceItem
[docs]
class Monitor(avl.Monitor):
[docs]
def __init__(self, name: str, parent: avl.Component) -> None:
"""
Initialize the Monitor for the AXI-STREAM agent.
:param name: Name of the agent instance
:type name: str
:param parent: Parent component
:type parent: Component
"""
super().__init__(name, parent)
self.i_f = avl.Factory.get_variable(f"{self.get_full_name()}.i_f", None)
self.wakeup = 0
[docs]
async def monitor(self) -> None:
"""
Monitor the AXI-STREAM bus signals and create sequence items based on the activity.
This method is called to monitor the bus signals and create sequence items
when there is activity on the bus.
"""
try:
item = SequenceItem(f"from_{self.name}", self)
item.wait_cycles = 0
item.time_since_wakeup = get_sim_time("ns") - self.wakeup
if hasattr(self.i_f, "tdata"):
item.set("tdata", int(self.i_f.get("tdata")))
if hasattr(self.i_f, "tstrb"):
item.set("tstrb" , int(self.i_f.get("tstrb")))
if hasattr(self.i_f, "tkeep"):
item.set("tkeep", int(self.i_f.get("tkeep")))
if hasattr(self.i_f, "tlast"):
item.set("tlast", int(self.i_f.get("tlast")))
if hasattr(self.i_f, "tid"):
item.set("tid", int(self.i_f.get("tid")))
if hasattr(self.i_f, "tdest"):
item.set("tdest", int(self.i_f.get("tdest")))
if hasattr(self.i_f, "tuser"):
item.set("tuser", int(self.i_f.get("tuser")))
while True:
if bool(self.i_f.get("tready", 1)):
break
await RisingEdge(self.i_f.aclk)
item.wait_cycles += 1
# Send to export
self.item_export.write(item)
except asyncio.CancelledError:
raise
except Exception:
self.debug(f"Drive task for item {item} was cancelled by reset")
item.set_event("done")
[docs]
async def run_phase(self):
"""
Run phase for the Requester Driver.
This method is called during the run phase of the simulation.
It is responsible for driving the request signals based on the sequencer's items.
:raises NotImplementedError: If the run phase is not implemented.
"""
async def wait_on_wakeup() -> None:
while True:
await RisingEdge(self.i_f.twakeup)
self.wakeup = get_sim_time("ns")
async def wait_on_reset() -> None:
try:
await FallingEdge(self.i_f.aresetn)
await self.reset()
except asyncio.CancelledError:
raise
except Exception:
pass
# Start Wakeup Monitor
if hasattr(self.i_f, "twakeup"):
cocotb.start_soon(wait_on_wakeup())
while True:
await RisingEdge(self.i_f.aclk)
if self.i_f.get("aresetn", default=1) == 0 or self.i_f.get("tvalid", default=1) == 0 or self.i_f.get("twakeup", 1) == 0:
continue
monitor_task = cocotb.start_soon(self.monitor())
reset_task = cocotb.start_soon(wait_on_reset())
await First(monitor_task, reset_task)
for t in [monitor_task, reset_task]:
if not t.done():
t.cancel()
__all__ = ["Monitor"]