Skip to content

[WIP]wishbone.Connector class #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 263 additions & 0 deletions amaranth_soc/test/test_wishbone_bus.py
Original file line number Diff line number Diff line change
@@ -123,6 +123,269 @@ def test_set_map_wrong_addr_width(self):
iface.memory_map = MemoryMap(addr_width=30, data_width=8)


class ConnectorTestCase(unittest.TestCase):
def test_wrong_intr(self):
sub_bus = Interface(addr_width=10, data_width=8)
with self.assertRaisesRegex(TypeError,
r"Initiator bus must be an instance of wishbone.Interface, not 'foo'"):
Connector(intr_bus="foo", sub_bus=sub_bus)

def test_wrong_sub(self):
intr_bus = Interface(addr_width=10, data_width=8)
with self.assertRaisesRegex(TypeError,
r"Subordinate bus must be an instance of wishbone.Interface, not 'foo'"):
Connector(intr_bus=intr_bus, sub_bus="foo")

def test_wrong_bitsize(self):
intr_bus = Interface(addr_width=10, data_width=32)
sub_bus = Interface(addr_width=10, data_width=8)
with self.assertRaisesRegex(ValueError,
r"Total bit size of initiator and subordinate bus have to be the same"):
Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def test_wrong_granularity(self):
intr_bus = Interface(addr_width=12, data_width=8)
sub_bus = Interface(addr_width=10, data_width=32)
with self.assertRaisesRegex(ValueError,
r"Granularity of subordinate bus has to be smaller or equal to "
r"granulariy of initiator bus"):
Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def test_lock_mismatch(self):
intr_bus = Interface(addr_width=10, data_width=8, features={"lock"})
sub_bus = Interface(addr_width=10, data_width=8)
with self.assertRaisesRegex(ValueError,
r"Initiator bus has optional output 'lock', but the subordinate bus "
r"does not have a corresponding input"):
Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def test_err_mismatch(self):
intr_bus = Interface(addr_width=10, data_width=8)
sub_bus = Interface(addr_width=10, data_width=8, features={"err"})
with self.assertRaisesRegex(ValueError,
r"Subordinate bus has optional output 'err', but the initiator bus "
r"does not have a corresponding input"):
Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def test_rty_mismatch(self):
intr_bus = Interface(addr_width=10, data_width=8)
sub_bus = Interface(addr_width=10, data_width=8, features={"rty"})
with self.assertRaisesRegex(ValueError,
r"Subordinate bus has optional output 'rty', but the initiator bus "
r"does not have a corresponding input"):
Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def test_not_implemented_multicycle(self):
intr_bus = Interface(addr_width=10, data_width=32)
sub_bus = Interface(addr_width=12, data_width=8)
with self.assertRaisesRegex(NotImplementedError,
r"Support for multi-cycle bus operation when initiator data_width is"
r"bigger than the subordinate one is not implemented."):
Connector(intr_bus=intr_bus, sub_bus=sub_bus)


class ConnectorSimulationTestCase(unittest.TestCase):
def test_same(self):
intr_bus = Interface(addr_width=10, data_width=32, granularity=8)
sub_bus = Interface(addr_width=10, data_width=32, granularity=8)
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.adr.eq(1)
yield intr_bus.we.eq(0)
yield intr_bus.cyc.eq(1)
yield intr_bus.stb.eq(1)
yield intr_bus.sel.eq(5)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.adr), 1)
self.assertEqual((yield sub_bus.we), 0)
self.assertEqual((yield sub_bus.cyc), 1)
self.assertEqual((yield sub_bus.stb), 1)
self.assertEqual((yield sub_bus.sel), 5)
yield sub_bus.ack.eq(1)
yield Delay(1e-6)
self.assertEqual((yield intr_bus.ack), 1)
yield intr_bus.adr.eq(127)
yield intr_bus.we.eq(1)
yield intr_bus.cyc.eq(1)
yield intr_bus.stb.eq(0)
yield intr_bus.sel.eq(10)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.adr), 127)
self.assertEqual((yield sub_bus.we), 1)
self.assertEqual((yield sub_bus.cyc), 1)
self.assertEqual((yield sub_bus.stb), 0)
self.assertEqual((yield sub_bus.sel), 10)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_same_pipelined(self):
intr_bus = Interface(addr_width=10, data_width=8, features={"stall"})
sub_bus = Interface(addr_width=10, data_width=8, features={"stall"})
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.adr.eq(1)
yield intr_bus.we.eq(0)
yield intr_bus.cyc.eq(1)
yield intr_bus.stb.eq(1)
yield intr_bus.sel.eq(1)
yield sub_bus.stall.eq(1)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.adr), 1)
self.assertEqual((yield sub_bus.we), 0)
self.assertEqual((yield sub_bus.cyc), 1)
self.assertEqual((yield sub_bus.stb), 1)
self.assertEqual((yield sub_bus.sel), 1)
self.assertEqual((yield intr_bus.stall), 1)
yield sub_bus.stall.eq(0)
yield Delay(1e-6)
self.assertEqual((yield intr_bus.stall), 0)
yield sub_bus.ack.eq(1)
yield Delay(1e-6)
self.assertEqual((yield intr_bus.ack), 1)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_default(self):
intr_bus = Interface(addr_width=10, data_width=8, features={"err", "rty"})
sub_bus = Interface(addr_width=10, data_width=8, features={"lock", "cti", "bte"})
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield Delay(1e-6)
self.assertEqual((yield intr_bus.err), 0)
self.assertEqual((yield intr_bus.rty), 0)
self.assertEqual((yield sub_bus.lock), 0)
self.assertEqual((yield sub_bus.cti), CycleType.CLASSIC.value)
self.assertEqual((yield sub_bus.bte), BurstTypeExt.LINEAR.value)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_conv_granularity(self):
intr_bus = Interface(addr_width=10, data_width=32)
sub_bus = Interface(addr_width=10, data_width=32, granularity=8)
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.sel.eq(1)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.sel), 0b1111)
yield intr_bus.sel.eq(0)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.sel), 0b0000)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_conv_addr_width(self):
intr_bus = Interface(addr_width=12, data_width=8)
sub_bus = Interface(addr_width=10, data_width=32, granularity=8)
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.adr.eq(1)
yield intr_bus.sel.eq(1)
yield intr_bus.dat_w.eq(0xA5)
yield sub_bus.dat_r.eq(0x03020100)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.sel), 0b0010)
self.assertEqual((yield sub_bus.dat_w), 0xA5A5A5A5)
self.assertEqual((yield intr_bus.dat_r), 0x01)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_conv_granularity_addr_width(self):
intr_bus = Interface(addr_width=11, data_width=16)
sub_bus = Interface(addr_width=10, data_width=32, granularity=8)
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.adr.eq(3)
yield intr_bus.sel.eq(1)
yield intr_bus.dat_w.eq(0xA55A)
yield sub_bus.dat_r.eq(0x03020100)
yield Delay(1e-6)
self.assertEqual((yield sub_bus.sel), 0b1100)
self.assertEqual((yield sub_bus.dat_w), 0xA55AA55A)
self.assertEqual((yield intr_bus.dat_r), 0x0302)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_pipelined_initiator(self):
intr_bus = Interface(addr_width=10, data_width=8, features={"stall"})
sub_bus = Interface(addr_width=10, data_width=8)
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.adr.eq(1)
yield intr_bus.sel.eq(1)
yield intr_bus.cyc.eq(1)
yield intr_bus.stb.eq(1)
yield Delay(1e-7)
self.assertEqual((yield sub_bus.cyc), 1)
self.assertEqual((yield sub_bus.stb), 1)
self.assertEqual((yield intr_bus.ack), 0)
self.assertEqual((yield intr_bus.stall), 1)
yield Delay(1e-7)
yield sub_bus.ack.eq(1)
yield Delay(1e-7)
self.assertEqual((yield intr_bus.stall), 0)

sim = Simulator(dut)
sim.add_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()

def test_pipelined_subordinate(self):
intr_bus = Interface(addr_width=10, data_width=8)
sub_bus = Interface(addr_width=10, data_width=8, features={"stall"})
dut = Connector(intr_bus=intr_bus, sub_bus=sub_bus)

def sim_test():
yield intr_bus.adr.eq(1)
yield intr_bus.sel.eq(1)
yield intr_bus.cyc.eq(1)
yield intr_bus.stb.eq(1)
yield Delay(1e-8)
self.assertEqual((yield sub_bus.cyc), 1)
self.assertEqual((yield sub_bus.stb), 1)
self.assertEqual((yield intr_bus.ack), 0)
yield
yield sub_bus.ack.eq(1)
yield Delay(1e-8)
self.assertEqual((yield intr_bus.ack), 1)
yield intr_bus.stb.eq(0)
yield
self.assertEqual((yield intr_bus.ack), 1)
yield sub_bus.ack.eq(0)
yield
self.assertEqual((yield intr_bus.ack), 0)

sim = Simulator(dut)
sim.add_clock(1e-6)
sim.add_sync_process(sim_test)
with sim.write_vcd(vcd_file=open("test.vcd", "w")):
sim.run()


class DecoderTestCase(unittest.TestCase):
def setUp(self):
self.dut = Decoder(addr_width=31, data_width=32, granularity=16)
119 changes: 118 additions & 1 deletion amaranth_soc/wishbone/bus.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from enum import Enum
from amaranth import *
from amaranth.lib import coding
from amaranth.hdl.rec import Direction
from amaranth.utils import log2_int

from ..memory import MemoryMap


__all__ = ["CycleType", "BurstTypeExt", "Interface", "Decoder", "Arbiter"]
__all__ = ["CycleType", "BurstTypeExt", "Interface", "Connector", "Decoder", "Arbiter"]


class CycleType(Enum):
@@ -169,6 +170,122 @@ def memory_map(self, memory_map):
self._map = memory_map


class Connector(Elaboratable):
"""Module to connect one Wishbone initiator bus to one Wishbone subordinate bus
This class also handles data_width conversion if the two buses have compatible
granularity. This means that granularity of subordinate bus has to be smaller or
equal to the granularity of the initiator bus.
Currently initiating multiple subordinate bus cycles for initiator bus cycle is
not implemented. As a consequence an initiator bus data width bigger than the
subordinate bus data width is not supported.
Parameters:
-----------
intr_bus : :class:`Interface`
The initiator bus
sub_bus : :class:`Interface`
The subordinate bus
"""
def __init__(self, *, intr_bus, sub_bus):
if not isinstance(intr_bus, Interface):
raise TypeError("Initiator bus must be an instance of wishbone.Interface, not {!r}"
.format(intr_bus))
if not isinstance(sub_bus, Interface):
raise TypeError("Subordinate bus must be an instance of wishbone.Interface, not {!r}"
.format(sub_bus))
intr_size = (2**intr_bus.addr_width)*intr_bus.data_width
sub_size = (2**sub_bus.addr_width)*sub_bus.data_width
if intr_size != sub_size:
raise ValueError("Total bit size of initiator and subordinate bus have to be the same")
if sub_bus.granularity > intr_bus.granularity:
raise ValueError(
"Granularity of subordinate bus has to be smaller or equal to "
"granulariy of initiator bus"
)
for opt_output in {"lock"}:
if hasattr(intr_bus, opt_output) and not hasattr(sub_bus, opt_output):
raise ValueError("Initiator bus has optional output {!r}, but the subordinate bus "
"does not have a corresponding input"
.format(opt_output))
for opt_output in {"err", "rty"}:
if hasattr(sub_bus, opt_output) and not hasattr(intr_bus, opt_output):
raise ValueError("Subordinate bus has optional output {!r}, but the initiator bus "
"does not have a corresponding input"
.format(opt_output))
if intr_bus.data_width > sub_bus.data_width:
raise NotImplementedError(
"Support for multi-cycle bus operation when initiator data_width is"
"bigger than the subordinate one is not implemented."
)

self.intr_bus = intr_bus
self.sub_bus = sub_bus

def elaborate(self, platform):
m = Module()

common_addr_width = min(self.intr_bus.addr_width, self.sub_bus.addr_width)
m.d.comb += [
self.sub_bus.cyc.eq(self.intr_bus.cyc),
self.sub_bus.we.eq(self.intr_bus.we),
self.sub_bus.adr[(self.sub_bus.addr_width-common_addr_width):self.sub_bus.addr_width].eq(
self.intr_bus.adr[(self.intr_bus.addr_width-common_addr_width):self.intr_bus.addr_width]
),
self.intr_bus.ack.eq(self.sub_bus.ack),
]
if hasattr(self.intr_bus, "err"):
m.d.comb += self.intr_bus.err.eq(getattr(self.sub_bus, "err", 0))
if hasattr(self.intr_bus, "rty"):
m.d.comb += self.intr_bus.rty.eq(getattr(self.sub_bus, "rty", 0))
if hasattr(self.sub_bus, "lock"):
m.d.comb += self.sub_bus.lock.eq(getattr(self.intr_bus, "lock", 0))
if hasattr(self.sub_bus, "cti"):
m.d.comb += self.sub_bus.cti.eq(getattr(self.intr_bus, "cti", CycleType.CLASSIC))
if hasattr(self.sub_bus, "bte"):
m.d.comb += self.sub_bus.bte.eq(getattr(self.intr_bus, "bte", BurstTypeExt.LINEAR))

# stb and stall for different pipeline combinations of initiator and subordinate bus
if hasattr(self.intr_bus, "stall") == hasattr(self.sub_bus, "stall"):
m.d.comb += self.sub_bus.stb.eq(self.intr_bus.stb)
if hasattr(self.intr_bus, "stall"):
m.d.comb += self.intr_bus.stall.eq(self.sub_bus.stall)
elif hasattr(self.intr_bus, "stall"):
# See Wishbone B4 spec: 5.2 Pipelined master connected to standard slave
m.d.comb += [
self.sub_bus.stb.eq(self.intr_bus.stb),
self.intr_bus.stall.eq(self.intr_bus.cyc & ~self.sub_bus.ack),
]
else:
# See Wishbone B4 spec: 5.1 Standard master connected to pipelined slave
wait4ack = Signal()
m.d.sync += wait4ack.eq(self.intr_bus.stb & ~(wait4ack & self.sub_bus.ack))
m.d.comb += self.sub_bus.stb.eq(self.intr_bus.stb & ~wait4ack)

# Data and sel width conversion
m.submodules.sel_decoder = sel_decoder = coding.Decoder(width=1<<(self.intr_bus.addr_width-self.sub_bus.addr_width))
sel_from_granularity = Const(-1, self.intr_bus.granularity//self.sub_bus.granularity)
m.d.comb += [
sel_decoder.i.eq(self.intr_bus.adr[:self.intr_bus.addr_width-self.sub_bus.addr_width]),
self.sub_bus.dat_w.eq(Repl(self.intr_bus.dat_w, self.sub_bus.data_width//self.intr_bus.data_width)),
self.intr_bus.dat_r.eq(self.sub_bus.dat_r.word_select(sel_decoder.i, self.intr_bus.data_width)),
self.sub_bus.sel.eq(
Cat(
Cat(
Cat(
sel_a & sel_i & sel_g
for sel_g in sel_from_granularity
)
for sel_i in self.intr_bus.sel
)
for sel_a in sel_decoder.o
)
),
]

return m


class Decoder(Elaboratable):
"""Wishbone bus decoder.