Source code for avl_axi_stream._tsequence

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Transmitter Sequence

import math
import random

import avl

from ._item import SequenceItem


[docs] class TransSequence(avl.Sequence):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the sequence Sequence of independently randomized transactions :param name: Name of the sequence item :param parent: Parent component of the sequence item """ super().__init__(name, parent) self.i_f = avl.Factory.get_variable(f"{self.get_full_name()}.i_f", None) """Handle to interface - defines capabilities and parameters""" self.n_items = avl.Factory.get_variable(f"{self.get_full_name()}.n_items", 1) """Number of items in the sequence (default 1)"""
[docs] async def write(self, randomize: bool = True, **kwargs) -> SequenceItem: """ Write a single transaction item Creates, constrains, and sends a single AXI Stream transaction. Can either randomize the item or set specific values via kwargs. :param randomize: If True, randomizes the item. If False, uses kwargs to set values. :type randomize: bool :param kwargs: Field values to set when randomize=False (e.g., tdata=0x1234, tkeep=0xF) :return: The sequence item that was sent :rtype: SequenceItem """ item = SequenceItem(f"from_{self.name}", self) await self.start_item(item) if randomize: # Randomize all fields item.randomize() else: # Set fields from kwargs for field_name, value in kwargs.items(): if hasattr(item, field_name): item.set(field_name, value) await self.finish_item(item) return item
[docs] async def write_stream(self, stream: list, randomize: bool = False) -> list[SequenceItem]: """ Write a stream of data transactions Accepts a list of data entries and writes them sequentially. The last entry automatically sets tlast=1. :param stream: List of data entries. Each entry can be: - int: Value for tdata field - dict: Field names and values (e.g., {'tdata': 0x1234, 'tkeep': 0xF}) :type stream: list :param randomize: If True, randomizes the item. If False, uses kwargs to set values. :type randomize: bool :return: List of sequence items that were sent :rtype: list[SequenceItem] """ items = [] for i, entry in enumerate(stream): tlast = 1 if (i == len(stream) - 1) else 0 if isinstance(entry, dict): # Entry is a dictionary of field values item = await self.write(tlast=tlast, randomize=randomize, **entry) else: # Entry is a single value for tdata - simplified version! item = await self.write(tlast=tlast, randomize=randomize, tdata=entry) items.append(item) return items
[docs] async def body(self) -> None: """ Body of the sequence Generates n_items transactions by repeatedly calling write() """ self.info(f"Starting transaction sequence {self.get_full_name()} with {self.n_items} items") for i in range(self.n_items): tlast = 1 if (i == self.n_items - 1) else 0 await self.write(tlast=tlast)
[docs] class PacketSequence(TransSequence):
[docs] def __init__(self, name: str, parent: avl.Component) -> None: """ Initialize the sequence Sequence of packets :param name: Name of the sequence item :param parent: Parent component of the sequence item """ super().__init__(name, parent) self.packet_length = avl.Factory.get_variable(f"{self.get_full_name()}.packet_length", lambda : 1) """Function to return packet length (in bytes)""" self.keep_rate = avl.Factory.get_variable(f"{self.get_full_name()}.keep_rate", lambda : 1.0) """Function to determine rate of keep trasactions""" self.sleep_rate = avl.Factory.get_variable(f"{self.get_full_name()}.sleep_rate", lambda : 0.0) """Function to determine rate of sleep transactions""" self.tid =avl.Factory.get_variable(f"{self.get_full_name()}.tid", lambda : 0) """Function o determine Stream idetifier"""
[docs] async def body(self) -> None: """ Body of the sequence """ self.info(f"Starting packet sequence {self.get_full_name()} with {self.n_items} items") all_bytes = (2**self.i_f.TSTRB_WIDTH)-1 for _ in range(self.n_items): packet_length = self.packet_length() transactions_in_packet = math.ceil(packet_length / self.i_f.TDATA_WIDTH) last_bytes = (1 << ((packet_length % self.i_f.TDATA_WIDTH) // 8)) - 1 if not hasattr(self.i_f, "tstrb"): if last_bytes != 0: raise ValueError("Packet must be multiple of TDATA_WITH if no tstrb") i = 0 while i < transactions_in_packet: item = SequenceItem(f"from_{self.name}", self) await self.start_item(item) # Add constraints if hasattr(item, "tid"): item.add_constraint("_c_tid", lambda x: x == self.tid(), item.tid) if random.random() > self.keep_rate(): if hasattr(item, "tkeep"): item.add_constraint("_c_tkeep", lambda x: x == 0, item.tkeep) else: if hasattr(item, "tkeep"): if i == (transactions_in_packet - 1) and last_bytes != 0: item.add_constraint("_c_tkeep", lambda x, y=last_bytes: x == y, item.tkeep) else: item.add_constraint("_c_tkeep", lambda x, y=all_bytes: x == y, item.tkeep) if hasattr(item, "tlast"): if i == (transactions_in_packet - 1): item.add_constraint("_c_tlast", lambda x: x == 1, item.tlast) else: item.add_constraint("_c_tlast", lambda x: x == 0, item.tlast) if hasattr(item, "tstrb"): if i == (transactions_in_packet - 1) and last_bytes != 0: item.add_constraint("_c_tstrb", lambda x, y=last_bytes: x == y, item.tstrb) else: item.add_constraint("_c_tstrb", lambda x, y=all_bytes: x == y, item.tstrb) if hasattr(item, "goto_sleep"): if i == (transactions_in_packet - 1): item.add_constraint("_c_goto_sleep", lambda x: x == 1, item.goto_sleep) else: item.add_constraint("_c_goto_sleep", lambda x: x == 0, item.goto_sleep) item.randomize() await self.finish_item(item) if bool(item.get("tkeep", 1)): i += 1
__all__ = ["TransSequence", "PacketSequence"]