diff --git a/amaranth/hdl/_mem.py b/amaranth/hdl/_mem.py index 20bcd9a4d..0f9c80601 100644 --- a/amaranth/hdl/_mem.py +++ b/amaranth/hdl/_mem.py @@ -25,11 +25,12 @@ def eq(self, value): class MemorySimWrite: - def __init__(self, identity, addr, data): + def __init__(self, identity, addr, data, mask = None): assert isinstance(identity, MemoryIdentity) self._identity = identity self._addr = Value.cast(addr) self._data = Value.cast(data) + self._mask = mask class MemoryInstance(Fragment): diff --git a/amaranth/sim/_pycoro.py b/amaranth/sim/_pycoro.py index 085eb009c..121576d9a 100644 --- a/amaranth/sim/_pycoro.py +++ b/amaranth/sim/_pycoro.py @@ -3,7 +3,7 @@ from ..hdl import * from ..hdl._ast import Statement, SignalSet, ValueCastable from ..hdl._mem import MemorySimRead, MemorySimWrite -from .core import Tick, Settle, Delay, Passive, Active +from .core import Tick, Settle, Delay, Passive, Active, _CombinableTrigger from ._base import BaseProcess, BaseMemoryState from ._pyrtl import _ValueCompiler, _RHSValueCompiler, _StatementCompiler @@ -104,6 +104,23 @@ def run(self): self.add_trigger(domain.rst, trigger=1) return + elif type(command) is _CombinableTrigger: + delay_interval = None + for trigger, *args in command._triggers: + if trigger == 'edge': + signal, value = args + self.add_trigger(signal, trigger=value) + elif trigger == 'changed': + for signal in args: + self.add_trigger(signal) + elif trigger == 'delay': + interval, = args + if delay_interval is None or delay_interval > interval: + delay_interval = interval + if delay_interval is not None: + self.state.wait_interval(self, delay_interval * 1e12) + return + elif type(command) is Settle: self.state.wait_interval(self, None) return @@ -136,10 +153,11 @@ def run(self): exec(_RHSValueCompiler.compile(self.state, command._data, mode="curr"), self.exec_locals) data = Const(self.exec_locals["result"], command._data.shape()).value + mask = command._mask index = self.state.memories[command._identity] state = self.state.slots[index] assert isinstance(state, BaseMemoryState) - state.write(addr, data) + state.write(addr, data, mask) elif command is None: # only possible if self.default_cmd is None raise TypeError("Received default command from process {!r} that was added " diff --git a/amaranth/sim/core.py b/amaranth/sim/core.py index 67003d1d7..eba087e5d 100644 --- a/amaranth/sim/core.py +++ b/amaranth/sim/core.py @@ -4,6 +4,7 @@ from .._utils import deprecated from ..hdl._cd import * from ..hdl._ir import * +from ..hdl._mem import MemorySimRead, MemorySimWrite from ._base import BaseEngine @@ -58,6 +59,91 @@ def __repr__(self): return "(active)" +class _AwaitableCmd: + def __init__(self, obj): + self.obj = obj + + def __await__(self): + return (yield self.obj) + + +class _DomainTrigger: + def __init__(self, sim, domain, context): + self._sim = sim + self._domain = domain + self._context = context + + def __await__(self): + yield Tick(self.domain) + + async def until(self, condition): + while not await self._sim.get(condition): + await self + + async def repeat(self, times): + for _ in range(times): + await self + + +class _CombinableTrigger: + def __init__(self, triggers=None): + self._triggers = [] if triggers is None else triggers + + def __await__(self): + yield self + + async def __aiter__(self): + while True: + yield await self + + def delay(self, interval): + return _CombinableTrigger(self._triggers + [('delay', interval)]) + + def changed(self, *signals): + return _CombinableTrigger(self._triggers + [('changed', signals)]) + + def edge(self, signal, value): + return _CombinableTrigger(self._triggers + [('edge', signal, value)]) + + def posedge(self, signal): + return self.edge(signal, 1) + + def negedge(self, signal): + return self.edge(signal, 0) + + +class SimulatorContext: + def get(self, expr): + return _AwaitableCmd(expr) + + def set(self, expr, value): + return _AwaitableCmd(expr.eq(value)) + + def memory_read(self, instance, address): + return _AwaitableCmd(MemorySimRead(instance, address)) + + def memory_write(self, instance, address, value, mask=None): + return _AwaitableCmd(MemorySimWrite(instance, address, value, mask)) + + def tick(self, domain="sync", context=None): + return _DomainTrigger(self, domain, context) + + def delay(self, interval=None): + return _CombinableTrigger().delay(interval) + + def changed(self, *signals): + return _CombinableTrigger().changed(*signals) + + def edge(self, signal, value): + return _CombinableTrigger().edge(signal, value) + + def posedge(self, signal): + return _CombinableTrigger().posedge(signal) + + def negedge(self, signal): + return _CombinableTrigger().negedge(signal) + + class Simulator: def __init__(self, fragment, *, engine="pysim"): if isinstance(engine, type) and issubclass(engine, BaseEngine): @@ -80,21 +166,36 @@ def _check_process(self, process): .format(process)) return process - def add_process(self, process): + def add_process(self, process, *, passive=False): process = self._check_process(process) def wrapper(): + if passive: + yield Passive() # Only start a bench process after comb settling, so that the initial values are correct. yield object.__new__(Settle) - yield from process() + if "sim" in inspect.signature(process).parameters: + generator = process(sim=SimulatorContext()) + else: + generator = process() + if inspect.isawaitable(generator): + generator = generator.__await__() + yield from generator self._engine.add_coroutine_process(wrapper, default_cmd=None) @deprecated("The `add_sync_process` method is deprecated per RFC 47. Use `add_process` or `add_testbench` instead.") - def add_sync_process(self, process, *, domain="sync"): + def add_sync_process(self, process, *, domain="sync", passive=False): process = self._check_process(process) def wrapper(): + if passive: + yield Passive() # Only start a sync process after the first clock edge (or reset edge, if the domain # uses an asynchronous reset). This matches the behavior of synchronous FFs. - generator = process() + if "sim" in inspect.signature(process).parameters: + generator = process(sim=SimulatorContext()) + else: + generator = process() + if inspect.isawaitable(generator): + generator = generator.__await__() result = None exception = None yield Tick(domain) @@ -114,10 +215,17 @@ def wrapper(): exception = e self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain)) - def add_testbench(self, process): + def add_testbench(self, process, *, passive=False): process = self._check_process(process) def wrapper(): - generator = process() + if passive: + yield Passive() + if "sim" in inspect.signature(process).parameters: + generator = process(sim=SimulatorContext()) + else: + generator = process() + if inspect.isawaitable(generator): + generator = generator.__await__() # Only start a bench process after power-on reset finishes. Use object.__new__ to # avoid deprecation warning. yield object.__new__(Settle)