Skip to content

Commit

Permalink
[TB] Use single TB for verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
egorman44 committed Jun 23, 2024
1 parent 46ba029 commit f67a2ec
Show file tree
Hide file tree
Showing 4 changed files with 442 additions and 0 deletions.
151 changes: 151 additions & 0 deletions coco_sim/new/rs_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# All tests are lacoted here:
import os
import argparse
from config import RsConfig
import cocotb
from cocotb.runner import get_runner
from config import PRJ_DIR, N_LEN, K_LEN, T_LEN, REDUNDANCY, FCR
import random
from rs_packets_builder import RsPacketsBuilder
from rs_interface import RsIfBuilder
from rs_env import RsEnv

class IfContainer():

def __init__(self):
self.if_name = ''
self.if_ptr = None
self.if_packets = []

def parse_command_line():

args = argparse.ArgumentParser(add_help=True)

args.add_argument("-l", "--hdl_toplevel",
dest="hdl_toplevel",
required=True,
help='Set hdl top level. For Example RsSynd, RsBm, RsChien, RsForney, RsDecTop.'
)

args.add_argument("-t", "--testcase",
dest="testcase",
required=False,
default=None,
help='Set testcases you want to run. Check available test in rs_testcases.py.'
)

args.add_argument("-s", "--seed",
dest="seed",
required=False,
default=None,
help='Set run seed.'
)

return args.parse_args()

def build_and_run():
sim = os.getenv("SIM", "verilator")
runner = get_runner(sim)
sv_top = args.hdl_toplevel+".sv"
runner.build(
defines={},
parameters={},
verilog_sources=[PRJ_DIR / sv_top],
includes={},
hdl_toplevel=args.hdl_toplevel,
build_args=[ '--timing', '--assert' , '--trace' , '--trace-structs', '--trace-max-array', '512', '--trace-max-width', '512'],
always=True,
)

runner.test(hdl_toplevel=args.hdl_toplevel,
test_module='rs_decoder',
testcase=args.testcase,
)

if __name__ == "__main__":
args = parse_command_line()
if args.seed:
os.environ["RANDOM_SEED"] = args.seed

#positions = [0,1,2,3]
#pkt_builder.corrupt_msg(positions)
#for s_if in env_cfg['s_if']:
# print(f"s_if = {s_if}")
# env_cfg['s_if'][s_if] = pkt_builder.get_pkt(s_if)
#for m_if in env_cfg['m_if']:
# print(f"m_if = {m_if}")
# env_cfg['m_if'][m_if] = pkt_builder.get_pkt(m_if)

#for s_if in env_cfg['s_if']:
# gen_drv_pkt = pkt_builder.get_pkt(s_if)
#drv_pkt = gen_pkt()
#drv_pkt.print_pkt()
build_and_run()

'''
TESTS:
'''
def get_if(top_level):
if top_level == 'RsSynd':
s_if = ['sAxisIf']
m_if = ['syndIf']
elif top_level == 'RsBm':
s_if = ['syndIf']
m_if = ['errLocIf']
elif top_level == 'RsChien':
s_if = ['errLocIf']
m_if = ['errPosIf']
elif top_level == 'RsForney':
s_if = ['errPosIf', 'syndIf']
m_if = ['errValIf']
elif top_level == 'RsDecoder':
s_if = ['sAxisIf']
m_if = ['errValIf', 'errPosOutIf']
else:
raise ValueError(f"Not expected value for top_level = {top_level}")
return s_if, m_if

@cocotb.test()
async def random_error(dut):
pkt_num = 10
s_if_containers = []
m_if_containers = []

if_builder = RsIfBuilder(dut)
pkt_builder = RsPacketsBuilder(K_LEN, REDUNDANCY, FCR, 'increment')
# Get interfaces
s_if_list, m_if_list = get_if(dut._name)
for if_name in s_if_list:
if_container = IfContainer()
if_container.if_name = if_name
if_container.if_ptr = if_builder.get_if(if_name)
s_if_containers.append(if_container)
for if_name in m_if_list:
if_container = IfContainer()
if_container.if_name = if_name
if_container.if_ptr = if_builder.get_if(if_name)
m_if_containers.append(if_container)
# Generate packets
for i in range(pkt_num):
err_num = 3
err_pos = random.sample(range(0, N_LEN-1), err_num)
pkt_builder.generate_msg()
pkt_builder.encode_msg()
pkt_builder.corrupt_msg(err_pos)
for i in range(len(s_if_containers)):
s_if_containers[i].if_packets.append(pkt_builder.get_pkt(s_if_containers[i].if_name))
for i in range (len(m_if_containers)):
#m_if_containers[i].if_packets.append(pkt_builder.get_pkt(m_if_containers[i].if_name))
mon_pkt = pkt_builder.get_pkt(m_if_containers[i].if_name)
print("PRINT_MON_PKT")
mon_pkt.print_pkt()
m_if_containers[i].if_packets.append(mon_pkt)


# Build environment
env = RsEnv(dut)
env.build_env(s_if_containers, m_if_containers)
await env.run()
env.post_run()

74 changes: 74 additions & 0 deletions coco_sim/new/rs_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import cocotb
from cocotb.triggers import Timer
from cocotb.triggers import Join
from cocotb.triggers import RisingEdge
from cocotb.triggers import FallingEdge
from cocotb.triggers import with_timeout

from tb_utils import reset_dut
from tb_utils import custom_clock
from tb_utils import watchdog_set

from scoreboard import Comparator
from axis import AxisDriver
from axis import AxisResponder
from axis import AxisMonitor

from config import ENCODE_MSG_DURATION

class RsEnv():

def __init__(self, dut):
self.s_drivers = []
self.m_monitors = []
self.comparators = []
self.dut = dut

def build_env(self, s_if_containers, m_if_containers):
self.s_if_containers = s_if_containers
self.m_if_containers = m_if_containers
for i in range (len(self.s_if_containers)):
self.s_drivers.append(AxisDriver(name=f's_drv{i}',
axis_if=self.s_if_containers[i].if_ptr))
for i in range (len(self.m_if_containers)):
self.comparators.append(Comparator(name=f'comp_{self.m_if_containers[i].if_name}'))

self.m_monitors.append(AxisMonitor(name=f'm_mon_{self.m_if_containers[i].if_name}',
axis_if=self.m_if_containers[i].if_ptr,
aport=self.comparators[i].port_out))

self.comparators[i].port_prd = self.m_if_containers[i].if_packets.copy()


async def run(self):
await cocotb.start(reset_dut(self.dut.reset,200,1))
await Timer(50, units = "ns")

await cocotb.start(custom_clock(self.dut.clock, 10))
for m_mon in self.m_monitors:
await cocotb.start(m_mon.mon_if())

await FallingEdge(self.dut.reset)
for i in range(10):
await RisingEdge(self.dut.clock)

# Send packets
s_coroutings = []
for pkt_num in range(len(self.s_if_containers[0].if_packets)):
for i in range(len(self.s_drivers)):
s_coroutings.append(cocotb.start_soon(with_timeout(self.s_drivers[i].send_pkt(self.s_if_containers[i].if_packets[pkt_num]), 10_000, 'us')))
for corouting in s_coroutings:
await Join(corouting)
for i in range(ENCODE_MSG_DURATION):
await RisingEdge(self.dut.clock)

for i in range(ENCODE_MSG_DURATION+100):
await RisingEdge(self.dut.clock)

#await watchdog_set(self.dut.clock, self.comparators)

def post_run(self):
print("post_run()")
for comp in self.comparators:
print(type(comp))
comp.compare()
118 changes: 118 additions & 0 deletions coco_sim/new/rs_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from axis import AxisIf
from config import BUS_WIDTH, T_LEN, REDUNDANCY

class RsIfBuilder():

def __init__(self, dut):
self._builder = {}
self._dut = dut
self.register_if('sAxisIf', self.gen_sAxisIf)
self.register_if('syndIf', self.gen_syndIf)
self.register_if('errLocIf', self.gen_errLocIf)
self.register_if('errPosIf', self.gen_errPosIf)
self.register_if('errValIf', self.gen_errValIf)
self.register_if('errPosOutIf', self.gen_errPosOutIf)

def register_if(self, key, value):
self._builder[key] = value

def get_if(self, if_name):
# Gen function that builds an interface
gen_if_func = self._builder.get(if_name)
if not gen_if_func:
raise ValueError(f"Not expected value for if_name = {if_name}.")
return gen_if_func()

'''
Interface generation methods
'''

def gen_sAxisIf(self):
tdata = []
for i in range(BUS_WIDTH):
tdata.append(eval(f"self._dut.io_sAxisIf_bits_tdata_{i}"))
if_inst = AxisIf(name='sAxisIf',
aclk=self._dut.clock,
tdata=tdata,
tvalid=self._dut.io_sAxisIf_valid,
tkeep=self._dut.io_sAxisIf_bits_tkeep,
tlast=self._dut.io_sAxisIf_bits_tlast,
unpack='chisel_vec',
width=BUS_WIDTH)

return if_inst

def gen_syndIf(self):
tdata = []
for i in range(REDUNDANCY):
tdata.append(eval(f"self._dut.io_syndIf_bits_{i}"))

if_inst = AxisIf(name='syndIf',
aclk=self._dut.clock,
tvalid=self._dut.io_syndIf_valid,
tlast=self._dut.io_syndIf_valid,
tdata=tdata,
unpack='chisel_vec',
width=REDUNDANCY)
return if_inst

def gen_errLocIf(self):
tdata = []
for i in range(T_LEN+1):
tdata.append(eval(f"self._dut.io_errLocIf_bits_vec_{i}"))
if_inst = AxisIf(name='errLocIf',
aclk=self._dut.clock,
tdata=tdata,
tvalid=self._dut.io_errLocIf_valid,
tlast=self._dut.io_errLocIf_valid,
tkeep=self._dut.io_errLocIf_bits_ffs,
tkeep_type='ffs',
unpack='chisel_vec',
width=T_LEN+1)
return if_inst

def gen_errPosIf(self):
t_data = []
for i in range(T_LEN):
t_data.append(eval(f"self._dut.io_errPosIf_bits_vec_{i}"))
if_inst = AxisIf(name='errPosIf',
aclk=self._dut.clock,
tdata=t_data,
tvalid=self._dut.io_errPosIf_valid,
tlast=self._dut.io_errPosIf_valid,
tkeep=self._dut.io_errPosIf_bits_ffs,
unpack='chisel_vec',
width=T_LEN,
tkeep_type='ffs')
return if_inst

def gen_errValIf(self):
tdata = []
for i in range(T_LEN):
tdata.append(eval(f"self._dut.io_errValIf_bits_vec_{i}"))
if_inst = AxisIf(name='errValIf',
aclk=self._dut.clock,
tdata=tdata,
tvalid=self._dut.io_errValIf_valid,
tlast=self._dut.io_errValIf_valid,
tkeep=self._dut.io_errValIf_bits_ffs,
unpack='chisel_vec',
width=T_LEN,
tkeep_type='ffs')
return if_inst

def gen_errPosOutIf(self):
t_data = []
for i in range(T_LEN):
t_data.append(eval(f"self._dut.io_errPosIf_bits_vec_{i}"))
if_inst = AxisIf(name='errPosIf',
aclk=self._dut.clock,
tdata=t_data,
tvalid=self._dut.io_errValIf_valid,
tlast=self._dut.io_errValIf_valid,
tkeep=self._dut.io_errPosIf_bits_ffs,
unpack='chisel_vec',
width=T_LEN,
tkeep_type='ffs')
return if_inst

Loading

0 comments on commit f67a2ec

Please sign in to comment.