diff --git a/amaranth/hdl/__init__.py b/amaranth/hdl/__init__.py index 6ae76f862..769694946 100644 --- a/amaranth/hdl/__init__.py +++ b/amaranth/hdl/__init__.py @@ -4,7 +4,7 @@ from ._dsl import SyntaxError, SyntaxWarning, Module from ._cd import DomainError, ClockDomain from ._ir import UnusedElaboratable, Elaboratable, DriverConflict, Fragment, Instance -from ._mem import Memory, ReadPort, WritePort, DummyPort +from ._mem import MemoryIdentity, MemoryInstance, Memory, ReadPort, WritePort, DummyPort from ._rec import Record from ._xfrm import DomainRenamer, ResetInserter, EnableInserter @@ -21,7 +21,7 @@ # _ir "UnusedElaboratable", "Elaboratable", "DriverConflict", "Fragment", "Instance", # _mem - "Memory", "ReadPort", "WritePort", "DummyPort", + "MemoryIdentity", "MemoryInstance", "Memory", "ReadPort", "WritePort", "DummyPort", # _rec "Record", # _xfrm diff --git a/amaranth/hdl/_ir.py b/amaranth/hdl/_ir.py index 8575e89fc..968b3724b 100644 --- a/amaranth/hdl/_ir.py +++ b/amaranth/hdl/_ir.py @@ -1109,7 +1109,7 @@ def emit_read_port(self, module_idx: int, fragment: '_mem.MemoryInstance', en=en, clk=clk, clk_edge=cd.clk_edge, - transparent_for=tuple(write_ports[idx] for idx in port._transparency), + transparent_for=tuple(write_ports[idx] for idx in port._transparent_for), src_loc=port._data.src_loc, ) data = self.netlist.add_value_cell(len(port._data), cell) diff --git a/amaranth/hdl/_mem.py b/amaranth/hdl/_mem.py index b8bca04eb..20bcd9a4d 100644 --- a/amaranth/hdl/_mem.py +++ b/amaranth/hdl/_mem.py @@ -5,6 +5,7 @@ from ._ast import * from ._ir import Elaboratable, Fragment from ..utils import ceil_log2 +from .._utils import deprecated __all__ = ["Memory", "ReadPort", "WritePort", "DummyPort"] @@ -33,18 +34,19 @@ def __init__(self, identity, addr, data): class MemoryInstance(Fragment): class _ReadPort: - def __init__(self, *, domain, addr, data, en, transparency): + def __init__(self, *, domain, addr, data, en, transparent_for): assert isinstance(domain, str) self._domain = domain self._addr = Value.cast(addr) self._data = Value.cast(data) self._en = Value.cast(en) - self._transparency = tuple(transparency) + self._transparent_for = tuple(transparent_for) assert len(self._en) == 1 if domain == "comb": assert isinstance(self._en, Const) assert self._en.width == 1 assert self._en.value == 1 + assert not self._transparent_for class _WritePort: def __init__(self, *, domain, addr, data, en): @@ -70,22 +72,24 @@ def __init__(self, *, identity, width, depth, init=None, attrs=None, src_loc=Non self._identity = identity self._width = operator.index(width) self._depth = operator.index(depth) - self._init = tuple(init) if init is not None else () + mask = (1 << self._width) - 1 + self._init = tuple(item & mask for item in init) if init is not None else () assert len(self._init) <= self._depth self._init += (0,) * (self._depth - len(self._init)) for x in self._init: assert isinstance(x, int) self._attrs = attrs or {} - self._read_ports = [] - self._write_ports = [] + self._read_ports: "list[MemoryInstance._ReadPort]" = [] + self._write_ports: "list[MemoryInstance._WritePort]" = [] - def read_port(self, *, domain, addr, data, en, transparency): - port = self._ReadPort(domain=domain, addr=addr, data=data, en=en, transparency=transparency) + def read_port(self, *, domain, addr, data, en, transparent_for): + port = self._ReadPort(domain=domain, addr=addr, data=data, en=en, transparent_for=transparent_for) assert len(port._data) == self._width assert len(port._addr) == ceil_log2(self._depth) - for x in port._transparency: - assert isinstance(x, int) - assert x in range(len(self._write_ports)) + for idx in port._transparent_for: + assert isinstance(idx, int) + assert idx in range(len(self._write_ports)) + assert self._write_ports[idx]._domain == port._domain for signal in port._data._rhs_signals(): self.add_driver(signal, port._domain) self._read_ports.append(port) @@ -124,6 +128,8 @@ class Memory(Elaboratable): init : list of int attrs : dict """ + # TODO(amaranth-0.6): remove + @deprecated("`amaranth.hdl.Memory` is deprecated, use `amaranth.lib.memory.Memory` instead") def __init__(self, *, width, depth, init=None, name=None, attrs=None, simulate=True): if not isinstance(width, int) or width < 0: raise TypeError("Memory width must be a non-negative integer, not {!r}" @@ -132,8 +138,8 @@ def __init__(self, *, width, depth, init=None, name=None, attrs=None, simulate=T raise TypeError("Memory depth must be a non-negative integer, not {!r}" .format(depth)) - self.name = name or tracer.get_var_name(depth=2, default="$memory") - self.src_loc = tracer.get_src_loc() + self.name = name or tracer.get_var_name(depth=3, default="$memory") + self.src_loc = tracer.get_src_loc(src_loc_at=1) self.width = width self.depth = depth @@ -208,12 +214,12 @@ def elaborate(self, platform): for port in self._read_ports: port._MustUse__used = True if port.domain == "comb": - f.read_port(domain="comb", addr=port.addr, data=port.data, en=Const(1), transparency=()) + f.read_port(domain="comb", addr=port.addr, data=port.data, en=Const(1), transparent_for=()) else: - transparency = [] + transparent_for = [] if port.transparent: - transparency = write_ports.get(port.domain, []) - f.read_port(domain=port.domain, addr=port.addr, data=port.data, en=port.en, transparency=transparency) + transparent_for = write_ports.get(port.domain, []) + f.read_port(domain=port.domain, addr=port.addr, data=port.data, en=port.en, transparent_for=transparent_for) return f @@ -346,13 +352,15 @@ class DummyPort: It does not include any read/write port specific attributes, i.e. none besides ``"domain"``; any such attributes may be set manually. """ + # TODO(amaranth-0.6): remove + @deprecated("`DummyPort` is deprecated, use `amaranth.lib.memory.ReadPort` or `amaranth.lib.memory.WritePort` instead") def __init__(self, *, data_width, addr_width, domain="sync", name=None, granularity=None): self.domain = domain if granularity is None: granularity = data_width if name is None: - name = tracer.get_var_name(depth=2, default="dummy") + name = tracer.get_var_name(depth=3, default="dummy") self.addr = Signal(addr_width, name=f"{name}_addr", src_loc_at=1) diff --git a/amaranth/hdl/_xfrm.py b/amaranth/hdl/_xfrm.py index e3050e40d..054ff4757 100644 --- a/amaranth/hdl/_xfrm.py +++ b/amaranth/hdl/_xfrm.py @@ -263,7 +263,7 @@ def on_fragment(self, fragment): addr=port._addr, data=port._data, en=port._en, - transparency=port._transparency, + transparent_for=port._transparent_for, ) for port in fragment._read_ports ] diff --git a/amaranth/lib/fifo.py b/amaranth/lib/fifo.py index 48023bb01..e376ea181 100644 --- a/amaranth/lib/fifo.py +++ b/amaranth/lib/fifo.py @@ -1,12 +1,11 @@ """First-in first-out queues.""" -import warnings - from .. import * from ..asserts import * from ..utils import ceil_log2 from .coding import GrayEncoder, GrayDecoder from .cdc import FFSynchronizer, AsyncFFSynchronizer +from .memory import Memory __all__ = ["FIFOInterface", "SyncFIFO", "SyncFIFOBuffered", "AsyncFIFO", "AsyncFIFOBuffered"] @@ -130,7 +129,7 @@ def elaborate(self, platform): do_read = self.r_rdy & self.r_en do_write = self.w_rdy & self.w_en - storage = m.submodules.storage = Memory(width=self.width, depth=self.depth) + storage = m.submodules.storage = Memory(shape=self.width, depth=self.depth, init=[]) w_port = storage.write_port() r_port = storage.read_port(domain="comb") produce = Signal(range(self.depth)) @@ -257,9 +256,9 @@ def elaborate(self, platform): do_inner_read = inner_r_rdy & (~self.r_rdy | self.r_en) - storage = m.submodules.storage = Memory(width=self.width, depth=inner_depth) + storage = m.submodules.storage = Memory(shape=self.width, depth=inner_depth, init=[]) w_port = storage.write_port() - r_port = storage.read_port(domain="sync", transparent=False) + r_port = storage.read_port(domain="sync") produce = Signal(range(inner_depth)) consume = Signal(range(inner_depth)) @@ -438,9 +437,9 @@ def elaborate(self, platform): m.d[self._w_domain] += self.w_level.eq(produce_w_bin - consume_w_bin) m.d.comb += self.r_level.eq(produce_r_bin - consume_r_bin) - storage = m.submodules.storage = Memory(width=self.width, depth=self.depth) + storage = m.submodules.storage = Memory(shape=self.width, depth=self.depth, init=[]) w_port = storage.write_port(domain=self._w_domain) - r_port = storage.read_port (domain=self._r_domain, transparent=False) + r_port = storage.read_port (domain=self._r_domain) m.d.comb += [ w_port.addr.eq(produce_w_bin[:-1]), w_port.data.eq(self.w_data), diff --git a/amaranth/lib/memory.py b/amaranth/lib/memory.py new file mode 100644 index 000000000..a1a69f253 --- /dev/null +++ b/amaranth/lib/memory.py @@ -0,0 +1,430 @@ +import operator +from collections import OrderedDict +from collections.abc import MutableSequence + +from ..hdl import MemoryIdentity, MemoryInstance, Shape, ShapeCastable, Const +from ..hdl._mem import MemorySimRead +from ..utils import ceil_log2 +from .data import ArrayLayout +from . import wiring +from .. import tracer + + +__all__ = ["WritePort", "ReadPort", "Memory"] + + +class WritePort: + """A memory write port. + + Parameters + ---------- + signature : :class:`WritePort.Signature` + The signature of the port. + memory : :class:`Memory` or ``None`` + Memory associated with the port. + domain : str + Clock domain. Defaults to ``"sync"``. Writes have a latency of 1 clock cycle. + + Attributes + ---------- + signature : :class:`WritePort.Signature` + memory : :class:`Memory` + domain : str + """ + + class Signature(wiring.Signature): + """A signature of a write port. + + Parameters + ---------- + addr_width : int + Address width in bits. If the port is associated with a :class:`Memory`, + it must be equal to :py:`ceil_log2(memory.depth)`. + shape : :ref:`shape-like ` object + The shape of the port data. If the port is associated with a :class:`Memory`, + it must be equal to its element shape. + granularity : int or ``None`` + Port granularity. If ``None``, the entire storage element is written at once. + Otherwise, determines the size of access covered by a single bit of ``en``. + One of the following must hold: + + - ``granularity is None``, in which case ``en_width == 1``, or + - ``shape == unsigned(data_width)`` and ``data_width == 0 or data_width % granularity == 0`` in which case ``en_width == data_width // granularity`` (or 0 if ``data_width == 0``) + - ``shape == amaranth.lib.data.ArrayLayout(_, elem_count)`` and ``elem_count == 0 or elem_count % granularity == 0`` in which case ``en_width == elem_count // granularity`` (or 0 if ``elem_count == 0``) + + Members + ------- + addr: :py:`unsigned(data_width)` + data: ``shape`` + en: :py:`unsigned(en_width)` + """ + + def __init__(self, *, addr_width, shape, granularity=None): + if not isinstance(addr_width, int) or addr_width < 0: + raise TypeError(f"`addr_width` must be a non-negative int, not {addr_width!r}") + self._addr_width = addr_width + self._shape = shape + self._granularity = granularity + if granularity is None: + en_width = 1 + elif not isinstance(granularity, int) or granularity < 0: + raise TypeError(f"Granularity must be a non-negative int or None, not {granularity!r}") + elif not isinstance(shape, ShapeCastable): + actual_shape = Shape.cast(shape) + if actual_shape.signed: + raise ValueError("Granularity cannot be specified with signed shape") + elif actual_shape.width == 0: + en_width = 0 + elif granularity == 0: + raise ValueError("Granularity must be positive") + elif actual_shape.width % granularity != 0: + raise ValueError("Granularity must divide data width") + else: + en_width = actual_shape.width // granularity + elif isinstance(shape, ArrayLayout): + if shape.length == 0: + en_width = 0 + elif granularity == 0: + raise ValueError("Granularity must be positive") + elif shape.length % granularity != 0: + raise ValueError("Granularity must divide data array length") + else: + en_width = shape.length // granularity + else: + raise TypeError("Granularity can only be specified for plain unsigned `Shape` or `ArrayLayout`") + super().__init__({ + "addr": wiring.In(addr_width), + "data": wiring.In(shape), + "en": wiring.In(en_width), + }) + + @property + def addr_width(self): + return self._addr_width + + @property + def shape(self): + return self._shape + + @property + def granularity(self): + return self._granularity + + def __repr__(self): + granularity = f", granularity={self.granularity}" if self.granularity is not None else "" + return f"WritePort.Signature(addr_width={self.addr_width}, shape={self.shape}{granularity})" + + + def __init__(self, signature, *, memory, domain): + if not isinstance(signature, WritePort.Signature): + raise TypeError(f"Expected `WritePort.Signature`, not {signature!r}") + if memory is not None: + if not isinstance(memory, Memory): + raise TypeError(f"Expected `Memory` or `None`, not {memory!r}") + if signature.shape != memory.shape or Shape.cast(signature.shape) != Shape.cast(memory.shape): + raise ValueError(f"Memory shape {memory.shape!r} doesn't match port shape {signature.shape!r}") + if signature.addr_width != ceil_log2(memory.depth): + raise ValueError(f"Memory address width {ceil_log2(memory.depth)!r} doesn't match port address width {signature.addr_width!r}") + if not isinstance(domain, str): + raise TypeError(f"Domain has to be a string, not {domain!r}") + if domain == "comb": + raise ValueError("Write port domain cannot be \"comb\"") + self._signature = signature + self._memory = memory + self._domain = domain + self.__dict__.update(signature.members.create()) + if memory is not None: + memory._w_ports.append(self) + + @property + def signature(self): + return self._signature + + @property + def memory(self): + return self._memory + + @property + def domain(self): + return self._domain + + +class ReadPort: + """A memory read port. + + Parameters + ---------- + signature : :class:`ReadPort.Signature` + The signature of the port. + memory : :class:`Memory` + Memory associated with the port. + domain : str + Clock domain. Defaults to ``"sync"``. If set to ``"comb"``, the port is asynchronous. + Otherwise, the read data becomes available on the next clock cycle. + transparent_for : iterable of :class:`WritePort` + The set of write ports that this read port should be transparent with. All ports + must belong to the same memory and the same clock domain. + + Attributes + ---------- + signature : :class:`ReadPort.Signature` + memory : :class:`Memory` + domain : str + transparent_for : tuple of :class:`WritePort` + """ + + class Signature(wiring.Signature): + """A signature of a read port. + + Parameters + ---------- + addr_width : int + Address width in bits. If the port is associated with a :class:`Memory`, + it must be equal to :py:`ceil_log2(memory.depth)`. + shape : :ref:`shape-like ` object + The shape of the port data. If the port is associated with a :class:`Memory`, + it must be equal to its element shape. + + Members + ------- + addr: :py:`unsigned(data_width)` + data: ``shape`` + en: :py:`unsigned(1)` + The enable signal. If ``domain == "comb"``, this is tied to ``Const(1)``. + Otherwise it is a signal with ``init=1``. + """ + + def __init__(self, *, addr_width, shape): + if not isinstance(addr_width, int) or addr_width < 0: + raise TypeError(f"`addr_width` must be a non-negative int, not {addr_width!r}") + self._addr_width = addr_width + self._shape = shape + super().__init__({ + "addr": wiring.In(addr_width), + "data": wiring.Out(shape), + "en": wiring.In(1, init=1), + }) + + @property + def addr_width(self): + return self._addr_width + + @property + def shape(self): + return self._shape + + def __repr__(self): + return f"ReadPort.Signature(addr_width={self.addr_width}, shape={self.shape})" + + + def __init__(self, signature, *, memory, domain, transparent_for=()): + if not isinstance(signature, ReadPort.Signature): + raise TypeError(f"Expected `ReadPort.Signature`, not {signature!r}") + if memory is not None: + if not isinstance(memory, Memory): + raise TypeError(f"Expected `Memory` or `None`, not {memory!r}") + if signature.shape != memory.shape or Shape.cast(signature.shape) != Shape.cast(memory.shape): + raise ValueError(f"Memory shape {memory.shape!r} doesn't match port shape {signature.shape!r}") + if signature.addr_width != ceil_log2(memory.depth): + raise ValueError(f"Memory address width {ceil_log2(memory.depth)!r} doesn't match port address width {signature.addr_width!r}") + if not isinstance(domain, str): + raise TypeError(f"Domain has to be a string, not {domain!r}") + transparent_for = tuple(transparent_for) + for port in transparent_for: + if not isinstance(port, WritePort): + raise TypeError("`transparent_for` must contain only `WritePort` instances") + if memory is not None and port not in memory._w_ports: + raise ValueError("Transparent write ports must belong to the same memory") + if port.domain != domain: + raise ValueError("Transparent write ports must belong to the same domain") + self._signature = signature + self._memory = memory + self._domain = domain + self._transparent_for = transparent_for + self.__dict__.update(signature.members.create()) + if domain == "comb": + self.en = Const(1) + if memory is not None: + memory._r_ports.append(self) + + @property + def signature(self): + return self._signature + + @property + def memory(self): + return self._memory + + @property + def domain(self): + return self._domain + + @property + def transparent_for(self): + return self._transparent_for + + +class Memory(wiring.Component): + """A word addressable storage. + + Parameters + ---------- + shape : :ref:`shape-like ` object + The shape of a single element of the storage. + depth : int + Word count. This memory contains ``depth`` storage elements. + init : iterable of int or of any objects accepted by ``shape.const()`` + Initial values. At power on, each storage element in this memory is initialized to + the corresponding element of ``init``, if any, or to the default value of ``shape`` otherwise. + Uninitialized memories are not currently supported. + attrs : dict + Dictionary of synthesis attributes. + + Attributes + ---------- + shape : :ref:`shape-like ` + depth : int + init : :class:`Memory.Init` + attrs : dict + r_ports : tuple of :class:`ReadPort` + w_ports : tuple of :class:`WritePort` + """ + + class Init(MutableSequence): + """Initial data of a :class:`Memory`. + + This is a container implementing the ``MutableSequence`` protocol, enforcing two constraints: + + - the length is immutable and must equal ``depth`` + - if ``shape`` is a :class:`ShapeCastable`, each element can be cast to ``shape`` via :py:`shape.const()` + - otherwise, each element is an :py:`int` + """ + def __init__(self, items, *, shape, depth): + Shape.cast(shape) + if not isinstance(depth, int) or depth < 0: + raise TypeError("Memory depth must be a non-negative integer, not {!r}" + .format(depth)) + self._shape = shape + self._depth = depth + if isinstance(shape, ShapeCastable): + self._items = [None] * depth + default = Const.cast(shape.const(None)).value + self._raw = [default] * depth + else: + self._raw = self._items = [0] * depth + try: + for idx, item in enumerate(items): + self[idx] = item + except (TypeError, ValueError) as e: + raise type(e)("Memory initialization value at address {:x}: {}" + .format(idx, e)) from None + + def __getitem__(self, index): + return self._items[index] + + def __setitem__(self, index, value): + if isinstance(index, slice): + start, stop, step = index.indices(len(self._items)) + indices = range(start, stop, step) + if len(value) != len(indices): + raise ValueError("Changing length of Memory.init is not allowed") + for actual_index, actual_value in zip(indices, value): + self[actual_index] = actual_value + else: + if isinstance(self._shape, ShapeCastable): + self._raw[index] = Const.cast(self._shape.const(value)).value + else: + value = operator.index(value) + self._items[index] = value + + def __delitem__(self, index): + raise TypeError("Deleting items from Memory.init is not allowed") + + def insert(self, index, value): + raise TypeError("Inserting items into Memory.init is not allowed") + + def __len__(self): + return self._depth + + @property + def depth(self): + return self._depth + + @property + def shape(self): + return self._shape + + def __repr__(self): + return f"Memory.Init({self._items!r})" + + def __init__(self, *, depth, shape, init, attrs=None, src_loc_at=0, src_loc=None): + # shape and depth validation performed in Memory.Init constructor. + self._depth = depth + self._shape = shape + self._init = Memory.Init(init, shape=shape, depth=depth) + self._attrs = {} if attrs is None else dict(attrs) + self.src_loc = src_loc or tracer.get_src_loc(src_loc_at=src_loc_at) + self._identity = MemoryIdentity() + self._r_ports: "list[ReadPort]" = [] + self._w_ports: "list[WritePort]" = [] + super().__init__(wiring.Signature({})) + + def read_port(self, *, domain="sync", transparent_for=()): + """Adds a new read port and returns it. + + Equivalent to creating a :class:`ReadPort` with a signature of :py:`ReadPort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape)` + """ + signature = ReadPort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape) + return ReadPort(signature, memory=self, domain=domain, transparent_for=transparent_for) + + def write_port(self, *, domain="sync", granularity=None): + """Adds a new write port and returns it. + + Equivalent to creating a :class:`WritePort` with a signature of :py:`WritePort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape, granularity=granularity)` + """ + signature = WritePort.Signature(addr_width=ceil_log2(self.depth), shape=self.shape, granularity=granularity) + return WritePort(signature, memory=self, domain=domain) + + @property + def depth(self): + return self._depth + + @property + def shape(self): + return self._shape + + @property + def init(self): + return self._init + + @property + def attrs(self): + return self._attrs + + @property + def w_ports(self): + """Returns a tuple of all write ports defined so far.""" + return tuple(self._w_ports) + + @property + def r_ports(self): + """Returns a tuple of all read ports defined so far.""" + return tuple(self._r_ports) + + def elaborate(self, platform): + if hasattr(platform, "get_memory"): + return platform.get_memory(self) + shape = Shape.cast(self.shape) + instance = MemoryInstance(identity=self._identity, width=shape.width, depth=self.depth, init=self.init._raw, attrs=self.attrs, src_loc=self.src_loc) + w_ports = {} + for port in self._w_ports: + idx = instance.write_port(domain=port.domain, addr=port.addr, data=port.data, en=port.en) + w_ports[port] = idx + for port in self._r_ports: + transparent_for = [w_ports[write_port] for write_port in port.transparent_for] + instance.read_port(domain=port.domain, data=port.data, addr=port.addr, en=port.en, transparent_for=transparent_for) + return instance + + def __getitem__(self, index): + """Simulation only.""" + return MemorySimRead(self._identity, index) diff --git a/amaranth/sim/_pyrtl.py b/amaranth/sim/_pyrtl.py index d9d11fca4..19666139e 100644 --- a/amaranth/sim/_pyrtl.py +++ b/amaranth/sim/_pyrtl.py @@ -505,7 +505,7 @@ def __call__(self, fragment): addr = emitter.def_var("read_addr", f"({(1 << len(port._addr)) - 1:#x} & {addr})") data = emitter.def_var("read_data", f"slots[{memory_index}].read({addr})") - for idx in port._transparency: + for idx in port._transparent_for: waddr, wdata, wen = write_vals[idx] emitter.append(f"if {addr} == {waddr}:") with emitter.indent(): diff --git a/amaranth/sim/pysim.py b/amaranth/sim/pysim.py index bedd951f2..affc3ebc7 100644 --- a/amaranth/sim/pysim.py +++ b/amaranth/sim/pysim.py @@ -85,7 +85,7 @@ def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()): trace_names[trace_signal] = {("bench", name)} assigned_names.add(name) self.traces.append(trace_signal) - elif isinstance(trace, (MemoryInstance, Memory)): + elif hasattr(trace, "_identity") and isinstance(trace._identity, MemoryIdentity): if not trace._identity in memories: raise ValueError(f"{trace!r} is a memory not part of the elaborated design") self.traces.append(trace._identity) diff --git a/docs/changes.rst b/docs/changes.rst index d7e122bac..2b1e6da5c 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -32,6 +32,7 @@ Apply the following changes to code written against Amaranth 0.4 to migrate it t * Update uses of ``reset=`` keyword argument to ``init=`` * Convert uses of ``Simulator.add_sync_process`` used as testbenches to ``Simulator.add_testbench`` * Convert other uses of ``Simulator.add_sync_process`` to ``Simulator.add_process`` +* Replace uses of ``amaranth.hdl.Memory`` with ``amaranth.lib.memory.Memory`` Implemented RFCs @@ -41,12 +42,14 @@ Implemented RFCs .. _RFC 27: https://amaranth-lang.org/rfcs/0027-simulator-testbenches.html .. _RFC 39: https://amaranth-lang.org/rfcs/0039-empty-case.html .. _RFC 43: https://amaranth-lang.org/rfcs/0043-rename-reset-to-init.html +.. _RFC 45: https://amaranth-lang.org/rfcs/0045-lib-memory.html .. _RFC 46: https://amaranth-lang.org/rfcs/0046-shape-range-1.html * `RFC 17`_: Remove ``log2_int`` * `RFC 27`_: Testbench processes for the simulator * `RFC 39`_: Change semantics of no-argument ``m.Case()`` * `RFC 43`_: Rename ``reset=`` to ``init=`` +* `RFC 45`_: Move ``hdl.Memory`` to ``lib.Memory`` * `RFC 46`_: Change ``Shape.cast(range(1))`` to ``unsigned(0)`` @@ -65,6 +68,7 @@ Language changes * Changed: the ``reset=`` argument of :class:`Signal`, :meth:`Signal.like`, :class:`amaranth.lib.wiring.Member`, :class:`amaranth.lib.cdc.FFSynchronizer`, and ``m.FSM()`` has been renamed to ``init=``. (`RFC 43`_) * Changed: :class:`Shape` has been made immutable and hashable. * Deprecated: :func:`amaranth.utils.log2_int`. (`RFC 17`_) +* Deprecated: :class:`amaranth.hdl.Memory`. (`RFC 45`_) * Removed: (deprecated in 0.4) :meth:`Const.normalize`. (`RFC 5`_) * Removed: (deprecated in 0.4) :class:`Repl`. (`RFC 10`_) * Removed: (deprecated in 0.4) :class:`ast.Sample`, :class:`ast.Past`, :class:`ast.Stable`, :class:`ast.Rose`, :class:`ast.Fell`. @@ -75,6 +79,7 @@ Standard library changes .. currentmodule:: amaranth.lib +* Added: :mod:`amaranth.lib.memory`. (`RFC 45`_) * Removed: (deprecated in 0.4) :mod:`amaranth.lib.scheduler`. (`RFC 19`_) * Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.FIFOInterface` with ``fwft=False``. (`RFC 20`_) * Removed: (deprecated in 0.4) :class:`amaranth.lib.fifo.SyncFIFO` with ``fwft=False``. (`RFC 20`_) diff --git a/docs/stdlib.rst b/docs/stdlib.rst index 2badf819b..53e0b17d0 100644 --- a/docs/stdlib.rst +++ b/docs/stdlib.rst @@ -4,7 +4,7 @@ Standard library The :mod:`amaranth.lib` module, also known as the standard library, provides modules that falls into one of the three categories: 1. Modules that will used by essentially all idiomatic Amaranth code, and are necessary for interoperability. This includes :mod:`amaranth.lib.enum` (enumerations), :mod:`amaranth.lib.data` (data structures), and :mod:`amaranth.lib.wiring` (interfaces and components). -2. Modules that abstract common functionality whose implementation differs between hardware platforms. This includes :mod:`amaranth.lib.cdc`. +2. Modules that abstract common functionality whose implementation differs between hardware platforms. This includes :mod:`amaranth.lib.cdc`, :mod:`amaranth.lib.memory`. 3. Modules that have essentially one correct implementation and are of broad utility in digital designs. This includes :mod:`amaranth.lib.coding`, :mod:`amaranth.lib.fifo`, and :mod:`amaranth.lib.crc`. As part of the Amaranth backwards compatibility guarantee, any behaviors described in these documents will not change from a version to another without at least one version including a warning about the impending change. Any nontrivial change to these behaviors must also go through the public review as a part of the `Amaranth Request for Comments process `_. diff --git a/examples/basic/mem.py b/examples/basic/mem.py index 14bb66159..2a0f6711c 100644 --- a/examples/basic/mem.py +++ b/examples/basic/mem.py @@ -1,4 +1,5 @@ from amaranth import * +from amaranth.lib.memory import Memory from amaranth.cli import main @@ -8,12 +9,13 @@ def __init__(self): self.dat_r = Signal(8) self.dat_w = Signal(8) self.we = Signal() - self.mem = Memory(width=8, depth=16, init=[0xaa, 0x55]) + self.mem = Memory(shape=8, depth=16, init=[0xaa, 0x55]) def elaborate(self, platform): m = Module() - m.submodules.rdport = rdport = self.mem.read_port() - m.submodules.wrport = wrport = self.mem.write_port() + m.submodules.mem = self.mem + rdport = self.mem.read_port() + wrport = self.mem.write_port() m.d.comb += [ rdport.addr.eq(self.adr), self.dat_r.eq(rdport.data), diff --git a/tests/test_hdl_mem.py b/tests/test_hdl_mem.py index e0ec47658..e4f0f3382 100644 --- a/tests/test_hdl_mem.py +++ b/tests/test_hdl_mem.py @@ -2,113 +2,128 @@ from amaranth.hdl._ast import * from amaranth.hdl._mem import * +from amaranth._utils import _ignore_deprecated from .utils import * class MemoryTestCase(FHDLTestCase): def test_name(self): - m1 = Memory(width=8, depth=4) - self.assertEqual(m1.name, "m1") - m2 = [Memory(width=8, depth=4)][0] - self.assertEqual(m2.name, "$memory") - m3 = Memory(width=8, depth=4, name="foo") - self.assertEqual(m3.name, "foo") + with _ignore_deprecated(): + m1 = Memory(width=8, depth=4) + self.assertEqual(m1.name, "m1") + m2 = [Memory(width=8, depth=4)][0] + self.assertEqual(m2.name, "$memory") + m3 = Memory(width=8, depth=4, name="foo") + self.assertEqual(m3.name, "foo") def test_geometry(self): - m = Memory(width=8, depth=4) - self.assertEqual(m.width, 8) - self.assertEqual(m.depth, 4) + with _ignore_deprecated(): + m = Memory(width=8, depth=4) + self.assertEqual(m.width, 8) + self.assertEqual(m.depth, 4) def test_geometry_wrong(self): - with self.assertRaisesRegex(TypeError, - r"^Memory width must be a non-negative integer, not -1$"): - m = Memory(width=-1, depth=4) - with self.assertRaisesRegex(TypeError, - r"^Memory depth must be a non-negative integer, not -1$"): - m = Memory(width=8, depth=-1) + with _ignore_deprecated(): + with self.assertRaisesRegex(TypeError, + r"^Memory width must be a non-negative integer, not -1$"): + m = Memory(width=-1, depth=4) + with self.assertRaisesRegex(TypeError, + r"^Memory depth must be a non-negative integer, not -1$"): + m = Memory(width=8, depth=-1) def test_init(self): - m = Memory(width=8, depth=4, init=range(4)) - self.assertEqual(m.init, [0, 1, 2, 3]) + with _ignore_deprecated(): + m = Memory(width=8, depth=4, init=range(4)) + self.assertEqual(m.init, [0, 1, 2, 3]) def test_init_wrong_count(self): - with self.assertRaisesRegex(ValueError, - r"^Memory initialization value count exceed memory depth \(8 > 4\)$"): - m = Memory(width=8, depth=4, init=range(8)) + with _ignore_deprecated(): + with self.assertRaisesRegex(ValueError, + r"^Memory initialization value count exceed memory depth \(8 > 4\)$"): + m = Memory(width=8, depth=4, init=range(8)) def test_init_wrong_type(self): - with self.assertRaisesRegex(TypeError, - (r"^Memory initialization value at address 1: " - r"'str' object cannot be interpreted as an integer$")): - m = Memory(width=8, depth=4, init=[1, "0"]) + with _ignore_deprecated(): + with self.assertRaisesRegex(TypeError, + (r"^Memory initialization value at address 1: " + r"'str' object cannot be interpreted as an integer$")): + m = Memory(width=8, depth=4, init=[1, "0"]) def test_attrs(self): - m1 = Memory(width=8, depth=4) - self.assertEqual(m1.attrs, {}) - m2 = Memory(width=8, depth=4, attrs={"ram_block": True}) - self.assertEqual(m2.attrs, {"ram_block": True}) + with _ignore_deprecated(): + m1 = Memory(width=8, depth=4) + self.assertEqual(m1.attrs, {}) + m2 = Memory(width=8, depth=4, attrs={"ram_block": True}) + self.assertEqual(m2.attrs, {"ram_block": True}) def test_read_port_transparent(self): - mem = Memory(width=8, depth=4) - rdport = mem.read_port() - self.assertEqual(rdport.memory, mem) - self.assertEqual(rdport.domain, "sync") - self.assertEqual(rdport.transparent, True) - self.assertEqual(len(rdport.addr), 2) - self.assertEqual(len(rdport.data), 8) - self.assertEqual(len(rdport.en), 1) - self.assertIsInstance(rdport.en, Signal) - self.assertEqual(rdport.en.init, 1) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) + rdport = mem.read_port() + self.assertEqual(rdport.memory, mem) + self.assertEqual(rdport.domain, "sync") + self.assertEqual(rdport.transparent, True) + self.assertEqual(len(rdport.addr), 2) + self.assertEqual(len(rdport.data), 8) + self.assertEqual(len(rdport.en), 1) + self.assertIsInstance(rdport.en, Signal) + self.assertEqual(rdport.en.init, 1) def test_read_port_non_transparent(self): - mem = Memory(width=8, depth=4) - rdport = mem.read_port(transparent=False) - self.assertEqual(rdport.memory, mem) - self.assertEqual(rdport.domain, "sync") - self.assertEqual(rdport.transparent, False) - self.assertEqual(len(rdport.en), 1) - self.assertIsInstance(rdport.en, Signal) - self.assertEqual(rdport.en.init, 1) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) + rdport = mem.read_port(transparent=False) + self.assertEqual(rdport.memory, mem) + self.assertEqual(rdport.domain, "sync") + self.assertEqual(rdport.transparent, False) + self.assertEqual(len(rdport.en), 1) + self.assertIsInstance(rdport.en, Signal) + self.assertEqual(rdport.en.init, 1) def test_read_port_asynchronous(self): - mem = Memory(width=8, depth=4) - rdport = mem.read_port(domain="comb") - self.assertEqual(rdport.memory, mem) - self.assertEqual(rdport.domain, "comb") - self.assertEqual(rdport.transparent, True) - self.assertEqual(len(rdport.en), 1) - self.assertIsInstance(rdport.en, Const) - self.assertEqual(rdport.en.value, 1) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) + rdport = mem.read_port(domain="comb") + self.assertEqual(rdport.memory, mem) + self.assertEqual(rdport.domain, "comb") + self.assertEqual(rdport.transparent, True) + self.assertEqual(len(rdport.en), 1) + self.assertIsInstance(rdport.en, Const) + self.assertEqual(rdport.en.value, 1) def test_read_port_wrong(self): - mem = Memory(width=8, depth=4) - with self.assertRaisesRegex(ValueError, - r"^Read port cannot be simultaneously asynchronous and non-transparent$"): - mem.read_port(domain="comb", transparent=False) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) + with self.assertRaisesRegex(ValueError, + r"^Read port cannot be simultaneously asynchronous and non-transparent$"): + mem.read_port(domain="comb", transparent=False) def test_write_port(self): - mem = Memory(width=8, depth=4) - wrport = mem.write_port() - self.assertEqual(wrport.memory, mem) - self.assertEqual(wrport.domain, "sync") - self.assertEqual(wrport.granularity, 8) - self.assertEqual(len(wrport.addr), 2) - self.assertEqual(len(wrport.data), 8) - self.assertEqual(len(wrport.en), 1) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) + wrport = mem.write_port() + self.assertEqual(wrport.memory, mem) + self.assertEqual(wrport.domain, "sync") + self.assertEqual(wrport.granularity, 8) + self.assertEqual(len(wrport.addr), 2) + self.assertEqual(len(wrport.data), 8) + self.assertEqual(len(wrport.en), 1) def test_write_port_granularity(self): - mem = Memory(width=8, depth=4) - wrport = mem.write_port(granularity=2) - self.assertEqual(wrport.memory, mem) - self.assertEqual(wrport.domain, "sync") - self.assertEqual(wrport.granularity, 2) - self.assertEqual(len(wrport.addr), 2) - self.assertEqual(len(wrport.data), 8) - self.assertEqual(len(wrport.en), 4) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) + wrport = mem.write_port(granularity=2) + self.assertEqual(wrport.memory, mem) + self.assertEqual(wrport.domain, "sync") + self.assertEqual(wrport.granularity, 2) + self.assertEqual(len(wrport.addr), 2) + self.assertEqual(len(wrport.data), 8) + self.assertEqual(len(wrport.en), 4) def test_write_port_granularity_wrong(self): - mem = Memory(width=8, depth=4) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) with self.assertRaisesRegex(TypeError, r"^Write port granularity must be a non-negative integer, not -1$"): mem.write_port(granularity=-1) @@ -119,20 +134,32 @@ def test_write_port_granularity_wrong(self): r"^Write port granularity must divide memory width evenly$"): mem.write_port(granularity=3) + def test_deprecated(self): + with self.assertWarnsRegex(DeprecationWarning, + r"^`amaranth.hdl.Memory` is deprecated.*$"): + mem = Memory(width=8, depth=4) + class DummyPortTestCase(FHDLTestCase): def test_name(self): - p1 = DummyPort(data_width=8, addr_width=2) - self.assertEqual(p1.addr.name, "p1_addr") - p2 = [DummyPort(data_width=8, addr_width=2)][0] - self.assertEqual(p2.addr.name, "dummy_addr") - p3 = DummyPort(data_width=8, addr_width=2, name="foo") - self.assertEqual(p3.addr.name, "foo_addr") + with _ignore_deprecated(): + p1 = DummyPort(data_width=8, addr_width=2) + self.assertEqual(p1.addr.name, "p1_addr") + p2 = [DummyPort(data_width=8, addr_width=2)][0] + self.assertEqual(p2.addr.name, "dummy_addr") + p3 = DummyPort(data_width=8, addr_width=2, name="foo") + self.assertEqual(p3.addr.name, "foo_addr") def test_sizes(self): - p1 = DummyPort(data_width=8, addr_width=2) - self.assertEqual(p1.addr.width, 2) - self.assertEqual(p1.data.width, 8) - self.assertEqual(p1.en.width, 1) - p2 = DummyPort(data_width=8, addr_width=2, granularity=2) - self.assertEqual(p2.en.width, 4) + with _ignore_deprecated(): + p1 = DummyPort(data_width=8, addr_width=2) + self.assertEqual(p1.addr.width, 2) + self.assertEqual(p1.data.width, 8) + self.assertEqual(p1.en.width, 1) + p2 = DummyPort(data_width=8, addr_width=2, granularity=2) + self.assertEqual(p2.en.width, 4) + + def test_deprecated(self): + with self.assertWarnsRegex(DeprecationWarning, + r"^`DummyPort` is deprecated.*$"): + DummyPort(data_width=8, addr_width=2) diff --git a/tests/test_hdl_xfrm.py b/tests/test_hdl_xfrm.py index d253f0dff..0fc7fb7bc 100644 --- a/tests/test_hdl_xfrm.py +++ b/tests/test_hdl_xfrm.py @@ -128,7 +128,8 @@ def test_rename_cd_subfragment(self): def test_rename_mem_ports(self): m = Module() - mem = Memory(depth=4, width=16) + with _ignore_deprecated(): + mem = Memory(depth=4, width=16) m.submodules.mem = mem mem.read_port(domain="a") mem.read_port(domain="b") @@ -397,7 +398,8 @@ def test_enable_subfragment(self): """) def test_enable_read_port(self): - mem = Memory(width=8, depth=4) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) mem.read_port(transparent=False) f = EnableInserter(self.c1)(mem).elaborate(platform=None) self.assertRepr(f._read_ports[0]._en, """ @@ -405,7 +407,8 @@ def test_enable_read_port(self): """) def test_enable_write_port(self): - mem = Memory(width=8, depth=4) + with _ignore_deprecated(): + mem = Memory(width=8, depth=4) mem.write_port(granularity=2) f = EnableInserter(self.c1)(mem).elaborate(platform=None) self.assertRepr(f._write_ports[0]._en, """ diff --git a/tests/test_lib_fifo.py b/tests/test_lib_fifo.py index 5ea9e48a3..126d71f5a 100644 --- a/tests/test_lib_fifo.py +++ b/tests/test_lib_fifo.py @@ -6,6 +6,7 @@ from amaranth.asserts import * from amaranth.sim import * from amaranth.lib.fifo import * +from amaranth.lib.memory import * from .utils import * from amaranth._utils import _ignore_deprecated @@ -81,9 +82,9 @@ def __init__(self, *, width, depth, r_domain, w_domain): def elaborate(self, platform): m = Module() - storage = Memory(width=self.width, depth=self.depth) - w_port = m.submodules.w_port = storage.write_port(domain=self.w_domain) - r_port = m.submodules.r_port = storage.read_port (domain="comb") + storage = m.submodules.storage = Memory(shape=self.width, depth=self.depth, init=[]) + w_port = storage.write_port(domain=self.w_domain) + r_port = storage.read_port (domain="comb") produce = Signal(range(self.depth)) consume = Signal(range(self.depth)) diff --git a/tests/test_lib_memory.py b/tests/test_lib_memory.py new file mode 100644 index 000000000..917097f3e --- /dev/null +++ b/tests/test_lib_memory.py @@ -0,0 +1,369 @@ +# amaranth: UnusedElaboratable=no + +from amaranth.hdl._ast import * +from amaranth.hdl._mem import MemoryInstance +from amaranth.lib.memory import * +from amaranth.lib.data import * +from amaranth.lib.wiring import In, Out, SignatureMembers + +from .utils import * + +class MyStruct(Struct): + a: unsigned(3) + b: signed(2) + + +class WritePortTestCase(FHDLTestCase): + def test_signature(self): + sig = WritePort.Signature(addr_width=2, shape=signed(4)) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, signed(4)) + self.assertEqual(sig.granularity, None) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": In(signed(4)), + "en": In(1), + })) + sig = WritePort.Signature(addr_width=2, shape=8, granularity=2) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, 8) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": In(8), + "en": In(4), + })) + sig = WritePort.Signature(addr_width=2, shape=ArrayLayout(9, 8), granularity=2) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, ArrayLayout(9, 8)) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": In(ArrayLayout(9, 8)), + "en": In(4), + })) + sig = WritePort.Signature(addr_width=2, shape=0, granularity=0) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, 0) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": In(0), + "en": In(0), + })) + sig = WritePort.Signature(addr_width=2, shape=ArrayLayout(9, 0), granularity=0) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, ArrayLayout(9, 0)) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": In(ArrayLayout(9, 0)), + "en": In(0), + })) + + def test_signature_wrong(self): + with self.assertRaisesRegex(TypeError, + "^`addr_width` must be a non-negative int, not -2$"): + WritePort.Signature(addr_width=-2, shape=8) + with self.assertRaisesRegex(TypeError, + "^Granularity must be a non-negative int or None, not -2$"): + WritePort.Signature(addr_width=4, shape=8, granularity=-2) + with self.assertRaisesRegex(ValueError, + "^Granularity cannot be specified with signed shape$"): + WritePort.Signature(addr_width=2, shape=signed(8), granularity=2) + with self.assertRaisesRegex(TypeError, + "^Granularity can only be specified for plain unsigned `Shape` or `ArrayLayout`$"): + WritePort.Signature(addr_width=2, shape=MyStruct, granularity=2) + with self.assertRaisesRegex(ValueError, + "^Granularity must be positive$"): + WritePort.Signature(addr_width=2, shape=8, granularity=0) + with self.assertRaisesRegex(ValueError, + "^Granularity must be positive$"): + WritePort.Signature(addr_width=2, shape=ArrayLayout(8, 8), granularity=0) + with self.assertRaisesRegex(ValueError, + "^Granularity must divide data width$"): + WritePort.Signature(addr_width=2, shape=8, granularity=3) + with self.assertRaisesRegex(ValueError, + "^Granularity must divide data array length$"): + WritePort.Signature(addr_width=2, shape=ArrayLayout(8, 8), granularity=3) + + def test_constructor(self): + signature = WritePort.Signature(shape=MyStruct, addr_width=4) + port = WritePort(signature, memory=None, domain="sync") + self.assertEqual(port.signature, signature) + self.assertIsNone(port.memory) + self.assertEqual(port.domain, "sync") + self.assertIsInstance(port.addr, Signal) + self.assertEqual(port.addr.shape(), unsigned(4)) + self.assertIsInstance(port.data, View) + self.assertEqual(port.data.shape(), MyStruct) + self.assertIsInstance(port.en, Signal) + self.assertEqual(port.en.shape(), unsigned(1)) + + signature = WritePort.Signature(shape=8, addr_width=4, granularity=2) + port = WritePort(signature, memory=None, domain="sync") + self.assertEqual(port.signature, signature) + self.assertIsNone(port.memory) + self.assertEqual(port.domain, "sync") + self.assertIsInstance(port.addr, Signal) + self.assertEqual(port.addr.shape(), unsigned(4)) + self.assertIsInstance(port.data, Signal) + self.assertEqual(port.data.shape(), unsigned(8)) + self.assertIsInstance(port.en, Signal) + self.assertEqual(port.en.shape(), unsigned(4)) + + m = Memory(depth=16, shape=8, init=[]) + port = WritePort(signature, memory=m, domain="sync") + self.assertIs(port.memory, m) + self.assertEqual(m.w_ports, (port,)) + + def test_constructor_wrong(self): + signature = ReadPort.Signature(shape=8, addr_width=4) + with self.assertRaisesRegex(TypeError, + r"^Expected `WritePort.Signature`, not ReadPort.Signature\(.*\)$"): + WritePort(signature, memory=None, domain="sync") + signature = WritePort.Signature(shape=8, addr_width=4, granularity=2) + with self.assertRaisesRegex(TypeError, + r"^Domain has to be a string, not None$"): + WritePort(signature, memory=None, domain=None) + with self.assertRaisesRegex(TypeError, + r"^Expected `Memory` or `None`, not 'a'$"): + WritePort(signature, memory="a", domain="sync") + with self.assertRaisesRegex(ValueError, + r"^Write port domain cannot be \"comb\"$"): + WritePort(signature, memory=None, domain="comb") + signature = WritePort.Signature(shape=8, addr_width=4) + m = Memory(depth=8, shape=8, init=[]) + with self.assertRaisesRegex(ValueError, + r"^Memory address width 3 doesn't match port address width 4$"): + WritePort(signature, memory=m, domain="sync") + m = Memory(depth=16, shape=signed(8), init=[]) + with self.assertRaisesRegex(ValueError, + r"^Memory shape signed\(8\) doesn't match port shape 8$"): + WritePort(signature, memory=m, domain="sync") + + +class ReadPortTestCase(FHDLTestCase): + def test_signature(self): + sig = ReadPort.Signature(addr_width=2, shape=signed(4)) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, signed(4)) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": Out(signed(4)), + "en": In(1, init=1), + })) + sig = ReadPort.Signature(addr_width=2, shape=8) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, 8) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": Out(8), + "en": In(1, init=1), + })) + sig = ReadPort.Signature(addr_width=2, shape=MyStruct) + self.assertEqual(sig.addr_width, 2) + self.assertEqual(sig.shape, MyStruct) + self.assertEqual(sig.members, SignatureMembers({ + "addr": In(2), + "data": Out(MyStruct), + "en": In(1, init=1), + })) + + def test_signature_wrong(self): + with self.assertRaisesRegex(TypeError, + "^`addr_width` must be a non-negative int, not -2$"): + ReadPort.Signature(addr_width=-2, shape=8) + + def test_constructor(self): + signature = ReadPort.Signature(shape=MyStruct, addr_width=4) + port = ReadPort(signature, memory=None, domain="sync") + self.assertEqual(port.signature, signature) + self.assertIsNone(port.memory) + self.assertEqual(port.domain, "sync") + self.assertIsInstance(port.addr, Signal) + self.assertEqual(port.addr.shape(), unsigned(4)) + self.assertIsInstance(port.data, View) + self.assertEqual(port.data.shape(), MyStruct) + self.assertIsInstance(port.en, Signal) + self.assertEqual(port.en.shape(), unsigned(1)) + self.assertEqual(port.transparent_for, ()) + + signature = ReadPort.Signature(shape=8, addr_width=4) + port = ReadPort(signature, memory=None, domain="comb") + self.assertEqual(port.signature, signature) + self.assertIsNone(port.memory) + self.assertEqual(port.domain, "comb") + self.assertIsInstance(port.addr, Signal) + self.assertEqual(port.addr.shape(), unsigned(4)) + self.assertIsInstance(port.data, Signal) + self.assertEqual(port.data.shape(), unsigned(8)) + self.assertIsInstance(port.en, Const) + self.assertEqual(port.en.shape(), unsigned(1)) + self.assertEqual(port.en.value, 1) + self.assertEqual(port.transparent_for, ()) + + m = Memory(depth=16, shape=8, init=[]) + port = ReadPort(signature, memory=m, domain="sync") + self.assertIs(port.memory, m) + self.assertEqual(m.r_ports, (port,)) + write_port = m.write_port() + port = ReadPort(signature, memory=m, domain="sync", transparent_for=[write_port]) + self.assertIs(port.memory, m) + self.assertEqual(port.transparent_for, (write_port,)) + + def test_constructor_wrong(self): + signature = WritePort.Signature(shape=8, addr_width=4) + with self.assertRaisesRegex(TypeError, + r"^Expected `ReadPort.Signature`, not WritePort.Signature\(.*\)$"): + ReadPort(signature, memory=None, domain="sync") + signature = ReadPort.Signature(shape=8, addr_width=4) + with self.assertRaisesRegex(TypeError, + r"^Domain has to be a string, not None$"): + ReadPort(signature, memory=None, domain=None) + with self.assertRaisesRegex(TypeError, + r"^Expected `Memory` or `None`, not 'a'$"): + ReadPort(signature, memory="a", domain="sync") + signature = ReadPort.Signature(shape=8, addr_width=4) + m = Memory(depth=8, shape=8, init=[]) + with self.assertRaisesRegex(ValueError, + r"^Memory address width 3 doesn't match port address width 4$"): + ReadPort(signature, memory=m, domain="sync") + m = Memory(depth=16, shape=signed(8), init=[]) + with self.assertRaisesRegex(ValueError, + r"^Memory shape signed\(8\) doesn't match port shape 8$"): + ReadPort(signature, memory=m, domain="sync") + m = Memory(depth=16, shape=8, init=[]) + port = m.read_port() + with self.assertRaisesRegex(TypeError, + r"^`transparent_for` must contain only `WritePort` instances$"): + ReadPort(signature, memory=m, domain="sync", transparent_for=[port]) + write_port = m.write_port() + m2 = Memory(depth=16, shape=8, init=[]) + with self.assertRaisesRegex(ValueError, + r"^Transparent write ports must belong to the same memory$"): + ReadPort(signature, memory=m2, domain="sync", transparent_for=[write_port]) + with self.assertRaisesRegex(ValueError, + r"^Transparent write ports must belong to the same domain$"): + ReadPort(signature, memory=m, domain="other", transparent_for=[write_port]) + + +class MemoryTestCase(FHDLTestCase): + def test_constructor(self): + m = Memory(shape=8, depth=4, init=[1, 2, 3]) + self.assertEqual(m.shape, 8) + self.assertEqual(m.depth, 4) + self.assertEqual(m.init.shape, 8) + self.assertEqual(m.init.depth, 4) + self.assertEqual(m.attrs, {}) + self.assertIsInstance(m.init, Memory.Init) + self.assertEqual(list(m.init), [1, 2, 3, 0]) + self.assertEqual(m.init._raw, [1, 2, 3, 0]) + self.assertRepr(m.init, "Memory.Init([1, 2, 3, 0])") + self.assertEqual(m.r_ports, ()) + self.assertEqual(m.w_ports, ()) + + def test_constructor_shapecastable(self): + init = [ + {"a": 0, "b": 1}, + {"a": 2, "b": 3}, + ] + m = Memory(shape=MyStruct, depth=4, init=init, attrs={"ram_style": "block"}) + self.assertEqual(m.shape, MyStruct) + self.assertEqual(m.depth, 4) + self.assertEqual(m.attrs, {"ram_style": "block"}) + self.assertIsInstance(m.init, Memory.Init) + self.assertEqual(list(m.init), [{"a": 0, "b": 1}, {"a": 2, "b": 3}, None, None]) + self.assertEqual(m.init._raw, [8, 0x1a, 0, 0]) + + def test_constructor_wrong(self): + with self.assertRaisesRegex(TypeError, + r"^Memory depth must be a non-negative integer, not 'a'$"): + Memory(shape=8, depth="a", init=[]) + with self.assertRaisesRegex(TypeError, + r"^Memory depth must be a non-negative integer, not -1$"): + Memory(shape=8, depth=-1, init=[]) + with self.assertRaisesRegex(TypeError, + r"^Object 'a' cannot be converted to an Amaranth shape$"): + Memory(shape="a", depth=3, init=[]) + with self.assertRaisesRegex(TypeError, + (r"^Memory initialization value at address 1: " + r"'str' object cannot be interpreted as an integer$")): + Memory(shape=8, depth=4, init=[1, "0"]) + + def test_init_set(self): + m = Memory(shape=8, depth=4, init=[]) + m.init[1] = 2 + self.assertEqual(list(m.init), [0, 2, 0, 0]) + self.assertEqual(m.init._raw, [0, 2, 0, 0]) + m.init[2:] = [4, 5] + self.assertEqual(list(m.init), [0, 2, 4, 5]) + + def test_init_set_shapecastable(self): + m = Memory(shape=MyStruct, depth=4, init=[]) + m.init[1] = {"a": 1, "b": 2} + self.assertEqual(list(m.init), [None, {"a": 1, "b": 2}, None, None]) + self.assertEqual(m.init._raw, [0, 0x11, 0, 0]) + + def test_init_set_wrong(self): + m = Memory(shape=8, depth=4, init=[]) + with self.assertRaisesRegex(TypeError, + r"^'str' object cannot be interpreted as an integer$"): + m.init[0] = "a" + m = Memory(shape=MyStruct, depth=4, init=[]) + # underlying TypeError message differs between PyPy and CPython + with self.assertRaises(TypeError): + m.init[0] = 1 + + def test_init_set_slice_wrong(self): + m = Memory(shape=8, depth=4, init=[]) + with self.assertRaisesRegex(ValueError, + r"^Changing length of Memory.init is not allowed$"): + m.init[1:] = [1, 2] + with self.assertRaisesRegex(TypeError, + r"^Deleting items from Memory.init is not allowed$"): + del m.init[1:2] + with self.assertRaisesRegex(TypeError, + r"^Inserting items into Memory.init is not allowed$"): + m.init.insert(1, 3) + + def test_port(self): + for depth, addr_width in [ + (0, 0), + (1, 0), + (3, 2), + (4, 2), + (5, 3), + ]: + m = Memory(shape=8, depth=depth, init=[]) + rp = m.read_port() + self.assertEqual(rp.signature.addr_width, addr_width) + self.assertEqual(rp.signature.shape, 8) + wp = m.write_port() + self.assertEqual(wp.signature.addr_width, addr_width) + self.assertEqual(wp.signature.shape, 8) + self.assertEqual(m.r_ports, (rp,)) + self.assertEqual(m.w_ports, (wp,)) + + def test_elaborate(self): + m = Memory(shape=MyStruct, depth=4, init=[{"a": 1, "b": 2}]) + wp = m.write_port() + rp0 = m.read_port(domain="sync", transparent_for=[wp]) + rp1 = m.read_port(domain="comb") + f = m.elaborate(None) + self.assertIsInstance(f, MemoryInstance) + self.assertIs(f._identity, m._identity) + self.assertEqual(f._depth, 4) + self.assertEqual(f._width, 5) + self.assertEqual(f._init, (0x11, 0, 0, 0)) + self.assertEqual(f._write_ports[0]._domain, "sync") + self.assertEqual(f._write_ports[0]._granularity, 5) + self.assertIs(f._write_ports[0]._addr, wp.addr) + self.assertIs(f._write_ports[0]._data, wp.data.as_value()) + self.assertIs(f._write_ports[0]._en, wp.en) + self.assertEqual(f._read_ports[0]._domain, "sync") + self.assertEqual(f._read_ports[0]._transparent_for, (0,)) + self.assertIs(f._read_ports[0]._addr, rp0.addr) + self.assertIs(f._read_ports[0]._data, rp0.data.as_value()) + self.assertIs(f._read_ports[0]._en, rp0.en) + self.assertEqual(f._read_ports[1]._domain, "comb") + self.assertEqual(f._read_ports[1]._transparent_for, ()) + self.assertIs(f._read_ports[1]._addr, rp1.addr) + self.assertIs(f._read_ports[1]._data, rp1.data.as_value()) + self.assertIs(f._read_ports[1]._en, rp1.en) diff --git a/tests/test_sim.py b/tests/test_sim.py index 8cac7f81d..1afe63b50 100644 --- a/tests/test_sim.py +++ b/tests/test_sim.py @@ -5,13 +5,13 @@ from amaranth._utils import flatten from amaranth.hdl._ast import * from amaranth.hdl._cd import * -from amaranth.hdl._mem import * with warnings.catch_warnings(): warnings.filterwarnings(action="ignore", category=DeprecationWarning) from amaranth.hdl.rec import * from amaranth.hdl._dsl import * from amaranth.hdl._ir import * from amaranth.sim import * +from amaranth.lib.memory import Memory from .utils import * from amaranth._utils import _ignore_deprecated @@ -752,14 +752,12 @@ def process(): sim.add_testbench(process) self.assertTrue(survived) - def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None): + def setUp_memory(self, rd_synchronous=True, rd_transparent=False, wr_granularity=None): self.m = Module() - self.memory = Memory(width=8, depth=4, init=[0xaa, 0x55]) - self.m.submodules.rdport = self.rdport = \ - self.memory.read_port(domain="sync" if rd_synchronous else "comb", - transparent=rd_transparent) - self.m.submodules.wrport = self.wrport = \ - self.memory.write_port(granularity=wr_granularity) + self.memory = self.m.submodules.memory = Memory(shape=8, depth=4, init=[0xaa, 0x55]) + self.wrport = self.memory.write_port(granularity=wr_granularity) + self.rdport = self.memory.read_port(domain="sync" if rd_synchronous else "comb", + transparent_for=[self.wrport] if rd_transparent else []) def test_memory_init(self): self.setUp_memory() @@ -862,8 +860,8 @@ def process(): def test_memory_read_only(self): self.m = Module() - self.memory = Memory(width=8, depth=4, init=[0xaa, 0x55]) - self.m.submodules.rdport = self.rdport = self.memory.read_port() + self.m.submodules.memory = self.memory = Memory(shape=8, depth=4, init=[0xaa, 0x55]) + self.rdport = self.memory.read_port() with self.assertSimulation(self.m) as sim: def process(): yield Tick() @@ -920,9 +918,9 @@ def process(): def test_memory_transparency_simple(self): m = Module() init = [0x11, 0x22, 0x33, 0x44] - m.submodules.memory = memory = Memory(width=8, depth=4, init=init) - rdport = memory.read_port() + m.submodules.memory = memory = Memory(shape=8, depth=4, init=init) wrport = memory.write_port(granularity=8) + rdport = memory.read_port(transparent_for=[wrport]) with self.assertSimulation(m) as sim: def process(): yield rdport.addr.eq(0) @@ -959,9 +957,9 @@ def process(): def test_memory_transparency_multibit(self): m = Module() init = [0x11111111, 0x22222222, 0x33333333, 0x44444444] - m.submodules.memory = memory = Memory(width=32, depth=4, init=init) - rdport = memory.read_port() + m.submodules.memory = memory = Memory(shape=32, depth=4, init=init) wrport = memory.write_port(granularity=8) + rdport = memory.read_port(transparent_for=[wrport]) with self.assertSimulation(m) as sim: def process(): yield rdport.addr.eq(0)