|
1 |
| -""" |
2 |
| -file: cocotb_test.py |
3 |
| -author(s): Oliver Gurka <[email protected]> |
4 |
| -description: Simple cocotb tests for multiport BRAM implemented using XOR |
| 1 | +# SPDX-License-Identifier: BSD-3-Clause |
| 2 | +# Copyright (C) 2025 CESNET z. s. p. o. |
| 3 | +# Author(s): Ondrej Schwarz <[email protected]> |
5 | 4 |
|
6 |
| -Copyright (C) 2024 CESNET z. s. p. o. |
7 |
| -SPDX-License-Identifier: BSD-3-Clause |
8 |
| -""" |
9 | 5 |
|
10 | 6 | import cocotb
|
11 | 7 | from cocotb.clock import Clock
|
12 | 8 | from cocotb.triggers import RisingEdge, ClockCycles
|
| 9 | +from cocotbext.ofm.mp_bram.controller import MP_BRAM_Controller |
| 10 | +from cocotbext.ofm.utils.ram import VWWRAM |
| 11 | +from cocotbext.ofm.ver.generators import random_packets |
| 12 | +from cocotb.result import TestFailure, TestSuccess |
| 13 | +from random import randint |
13 | 14 |
|
14 | 15 |
|
15 |
| -async def reset(dut): |
16 |
| - # Reset the design |
17 |
| - dut.RESET.value = 1 |
18 |
| - await ClockCycles(dut.CLK, 5) |
19 |
| - dut.RESET.value = 0 |
20 |
| - await RisingEdge(dut.CLK) |
| 16 | +class testbench(): |
| 17 | + def __init__(self, dut, debug=False): |
| 18 | + self.dut = dut |
| 19 | + self.stream_in = MP_BRAM_Controller(dut, "", dut.CLK) |
21 | 20 |
|
| 21 | + self.ref_ram = VWWRAM(capacity=2**len(self.stream_in.bus.WR_ADDR[0]), |
| 22 | + word_width=len(self.stream_in.bus.WR_DATA[0]), |
| 23 | + block_width=dut.BLOCK_WIDTH.value if dut.BLOCK_ENABLE.value else None) |
22 | 24 |
|
23 |
| -async def read(dut, port, addr): |
24 |
| - re = RisingEdge(dut.CLK) |
| 25 | + if debug: |
| 26 | + self.stream_in.log.setLevel(cocotb.logging.DEBUG) |
25 | 27 |
|
26 |
| - dut.RD_ADDR[port].value = addr |
27 |
| - dut.RD_EN[port].value = 1 |
28 |
| - await re |
29 |
| - dut.RD_EN[port].value = 0 |
| 28 | + async def reset(self): |
| 29 | + self.dut.RESET.value = 1 |
| 30 | + await ClockCycles(self.dut.CLK, 2) |
| 31 | + self.dut.RESET.value = 0 |
| 32 | + await RisingEdge(self.dut.CLK) |
30 | 33 |
|
31 |
| - while dut.RD_DATA_VLD[port].value != 1: |
32 |
| - await re |
33 | 34 |
|
34 |
| - return dut.RD_DATA[port].value |
| 35 | +async def write_read_test(dut, pkt_count=1000, item_width_min=1, item_width_max=16, parallel_write=False, parallel_read=False): |
| 36 | + # Start clock generator |
| 37 | + cocotb.start_soon(Clock(dut.CLK, 5, units='ns').start()) |
| 38 | + tb = testbench(dut) |
| 39 | + await tb.reset() |
| 40 | + await tb.stream_in.clear_memory() |
35 | 41 |
|
| 42 | + addr_width = len(tb.stream_in.bus.WR_ADDR[0]) |
36 | 43 |
|
37 |
| -def start_clock(dut): |
38 |
| - # Set up the clock |
39 |
| - clock = Clock(dut.CLK, 10, units="ns") |
40 |
| - cocotb.start_soon(clock.start()) |
| 44 | + cocotb.log.info("-------------------------------------Write and immediate read---------------------------------") |
41 | 45 |
|
| 46 | + for transaction in random_packets(item_width_min, item_width_max, pkt_count): |
| 47 | + cocotb.log.info(f"generated transaction: {transaction.hex()}") |
42 | 48 |
|
43 |
| -async def setup(dut): |
44 |
| - start_clock(dut) |
45 |
| - for i in range(dut.READ_PORTS.value): |
46 |
| - dut.RD_EN[i].value = 0 |
47 |
| - for i in range(dut.WRITE_PORTS.value): |
48 |
| - dut.WR_EN[i].value = 0 |
49 |
| - await reset(dut) |
| 49 | + address = randint(0, (2**addr_width-1)-(len(transaction)*8)) |
50 | 50 |
|
| 51 | + await tb.stream_in.write(address, transaction, parallel=parallel_write) |
51 | 52 |
|
52 |
| -@cocotb.test() |
53 |
| -async def test_simple(dut): |
54 |
| - await setup(dut) |
55 |
| - re = RisingEdge(dut.CLK) |
56 |
| - # Write some data |
57 |
| - dut.WR_EN[0].value = 1 |
58 |
| - dut.WR_ADDR[0].value = 0 |
59 |
| - dut.WR_DATA[0].value = 0xAA |
60 |
| - await re |
61 |
| - |
62 |
| - dut.WR_ADDR[0].value = 1 |
63 |
| - dut.WR_DATA[0].value = 0xBB |
64 |
| - await RisingEdge(dut.CLK) |
65 |
| - |
66 |
| - dut.WR_EN.value = 0 |
67 |
| - await RisingEdge(dut.CLK) |
| 53 | + tb.ref_ram.write(address, transaction) # writting transaction to reference RAM |
68 | 54 |
|
69 |
| - assert (await read(dut, 0, 0)) == 0xAA |
70 |
| - assert (await read(dut, 1, 1)) == 0xBB |
| 55 | + output = await tb.stream_in.read(address, len(transaction), parallel=parallel_read) |
| 56 | + cocotb.log.info(f"received transaction: {output.hex()}") |
71 | 57 |
|
| 58 | + if output != transaction: |
| 59 | + raise TestFailure(f"Expected {transaction.hex()}, got {output.hex()}") |
72 | 60 |
|
73 |
| -@cocotb.test() |
74 |
| -async def test_parallel_write(dut): |
75 |
| - await setup(dut) |
76 |
| - re = RisingEdge(dut.CLK) |
77 |
| - |
78 |
| - # Write some data |
79 |
| - dut.WR_EN[0].value = 1 |
80 |
| - dut.WR_ADDR[0].value = 42 |
81 |
| - dut.WR_DATA[0].value = 0xAA |
| 61 | + cocotb.log.info("--------------------Comparing end state of reference RAM and simulated RAM--------------------") |
82 | 62 |
|
83 |
| - dut.WR_EN[1].value = 1 |
84 |
| - dut.WR_ADDR[1].value = 43 |
85 |
| - dut.WR_DATA[1].value = 0xFF |
86 |
| - await re |
87 |
| - dut.WR_EN.value = 0 |
| 63 | + for i in range(2**addr_width): |
| 64 | + ref_word = tb.ref_ram.read_word(i) |
| 65 | + sim_word = await tb.stream_in.read_word(i, 0) |
88 | 66 |
|
89 |
| - if not dut.ONE_CLK_WRITE.value: |
90 |
| - await re |
| 67 | + if ref_word != sim_word: |
| 68 | + raise TestFailure(f"On address {i} reference RAM has {ref_word}, but simulated RAM has {sim_word}.") |
91 | 69 |
|
92 |
| - assert (await read(dut, 0, 42)) == 0xAA |
93 |
| - assert (await read(dut, 1, 43)) == 0xFF |
| 70 | + raise TestSuccess() |
94 | 71 |
|
95 | 72 |
|
96 | 73 | @cocotb.test()
|
97 |
| -async def test_parallel_write_read(dut): |
98 |
| - await setup(dut) |
99 |
| - re = RisingEdge(dut.CLK) |
| 74 | +async def test_simple(dut): |
| 75 | + await write_read_test(dut) |
100 | 76 |
|
101 |
| - # Write some data |
102 |
| - dut.WR_EN[0].value = 1 |
103 |
| - dut.WR_ADDR[0].value = 52 |
104 |
| - dut.WR_DATA[0].value = 0xAA |
105 | 77 |
|
106 |
| - dut.WR_EN[1].value = 1 |
107 |
| - dut.WR_ADDR[1].value = 53 |
108 |
| - dut.WR_DATA[1].value = 0xFF |
109 |
| - await re |
110 |
| - dut.WR_EN.value = 0 |
| 78 | +@cocotb.test() |
| 79 | +async def test_parallel_write(dut): |
| 80 | + await write_read_test(dut, parallel_write=True) |
111 | 81 |
|
112 |
| - if not dut.ONE_CLK_WRITE.value: |
113 |
| - await re |
114 | 82 |
|
115 |
| - dut.RD_ADDR[0] = 52 |
116 |
| - dut.RD_ADDR[1] = 53 |
117 |
| - dut.RD_EN = 3 |
118 |
| - await re |
| 83 | +@cocotb.test() |
| 84 | +async def test_parallel_read(dut): |
| 85 | + await write_read_test(dut, parallel_read=True) |
119 | 86 |
|
120 |
| - while dut.RD_DATA_VLD[0].value != 1: |
121 |
| - await re |
122 | 87 |
|
123 |
| - assert (dut.RD_DATA[0].value == 0xAA) |
124 |
| - assert (dut.RD_DATA[1].value == 0xFF) |
| 88 | +@cocotb.test() |
| 89 | +async def test_parallel_write_read(dut): |
| 90 | + await write_read_test(dut, parallel_write=True, parallel_read=True) |
0 commit comments