Skip to content

[WIP/PoC] Add async testbench functions #990

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion amaranth/hdl/_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ def eq(self, value):


class MemorySimWrite:
def __init__(self, identity, addr, data):
def __init__(self, identity, addr, data, mask = None):
assert isinstance(identity, MemoryIdentity)
self._identity = identity
self._addr = Value.cast(addr)
self._data = Value.cast(data)
self._mask = mask


class MemoryInstance(Fragment):
Expand Down
22 changes: 20 additions & 2 deletions amaranth/sim/_pycoro.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from ..hdl import *
from ..hdl._ast import Statement, SignalSet, ValueCastable
from ..hdl._mem import MemorySimRead, MemorySimWrite
from .core import Tick, Settle, Delay, Passive, Active
from .core import Tick, Settle, Delay, Passive, Active, _CombinableTrigger
from ._base import BaseProcess, BaseMemoryState
from ._pyrtl import _ValueCompiler, _RHSValueCompiler, _StatementCompiler

Expand Down Expand Up @@ -104,6 +104,23 @@ def run(self):
self.add_trigger(domain.rst, trigger=1)
return

elif type(command) is _CombinableTrigger:
delay_interval = None
for trigger, *args in command._triggers:
if trigger == 'edge':
signal, value = args
self.add_trigger(signal, trigger=value)
elif trigger == 'changed':
for signal in args:
self.add_trigger(signal)
elif trigger == 'delay':
interval, = args
if delay_interval is None or delay_interval > interval:
delay_interval = interval
if delay_interval is not None:
self.state.wait_interval(self, delay_interval * 1e12)
return

elif type(command) is Settle:
self.state.wait_interval(self, None)
return
Expand Down Expand Up @@ -136,10 +153,11 @@ def run(self):
exec(_RHSValueCompiler.compile(self.state, command._data, mode="curr"),
self.exec_locals)
data = Const(self.exec_locals["result"], command._data.shape()).value
mask = command._mask
index = self.state.memories[command._identity]
state = self.state.slots[index]
assert isinstance(state, BaseMemoryState)
state.write(addr, data)
state.write(addr, data, mask)

elif command is None: # only possible if self.default_cmd is None
raise TypeError("Received default command from process {!r} that was added "
Expand Down
120 changes: 114 additions & 6 deletions amaranth/sim/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .._utils import deprecated
from ..hdl._cd import *
from ..hdl._ir import *
from ..hdl._mem import MemorySimRead, MemorySimWrite
from ._base import BaseEngine


Expand Down Expand Up @@ -58,6 +59,91 @@ def __repr__(self):
return "(active)"


class _AwaitableCmd:
def __init__(self, obj):
self.obj = obj

def __await__(self):
return (yield self.obj)


class _DomainTrigger:
def __init__(self, sim, domain, context):
self._sim = sim
self._domain = domain
self._context = context

def __await__(self):
yield Tick(self.domain)

async def until(self, condition):
while not await self._sim.get(condition):
await self

async def repeat(self, times):
for _ in range(times):
await self


class _CombinableTrigger:
def __init__(self, triggers=None):
self._triggers = [] if triggers is None else triggers

def __await__(self):
yield self

async def __aiter__(self):
while True:
yield await self

def delay(self, interval):
return _CombinableTrigger(self._triggers + [('delay', interval)])

def changed(self, *signals):
return _CombinableTrigger(self._triggers + [('changed', signals)])

def edge(self, signal, value):
return _CombinableTrigger(self._triggers + [('edge', signal, value)])

def posedge(self, signal):
return self.edge(signal, 1)

def negedge(self, signal):
return self.edge(signal, 0)


class SimulatorContext:
def get(self, expr):
return _AwaitableCmd(expr)

def set(self, expr, value):
return _AwaitableCmd(expr.eq(value))

def memory_read(self, instance, address):
return _AwaitableCmd(MemorySimRead(instance, address))

def memory_write(self, instance, address, value, mask=None):
return _AwaitableCmd(MemorySimWrite(instance, address, value, mask))

def tick(self, domain="sync", context=None):
return _DomainTrigger(self, domain, context)

def delay(self, interval=None):
return _CombinableTrigger().delay(interval)

def changed(self, *signals):
return _CombinableTrigger().changed(*signals)

def edge(self, signal, value):
return _CombinableTrigger().edge(signal, value)

def posedge(self, signal):
return _CombinableTrigger().posedge(signal)

def negedge(self, signal):
return _CombinableTrigger().negedge(signal)


class Simulator:
def __init__(self, fragment, *, engine="pysim"):
if isinstance(engine, type) and issubclass(engine, BaseEngine):
Expand All @@ -80,21 +166,36 @@ def _check_process(self, process):
.format(process))
return process

def add_process(self, process):
def add_process(self, process, *, passive=False):
process = self._check_process(process)
def wrapper():
if passive:
yield Passive()
# Only start a bench process after comb settling, so that the initial values are correct.
yield object.__new__(Settle)
yield from process()
if "sim" in inspect.signature(process).parameters:
generator = process(sim=SimulatorContext())
else:
generator = process()
if inspect.isawaitable(generator):
generator = generator.__await__()
yield from generator
self._engine.add_coroutine_process(wrapper, default_cmd=None)

@deprecated("The `add_sync_process` method is deprecated per RFC 47. Use `add_process` or `add_testbench` instead.")
def add_sync_process(self, process, *, domain="sync"):
def add_sync_process(self, process, *, domain="sync", passive=False):
process = self._check_process(process)
def wrapper():
if passive:
yield Passive()
# Only start a sync process after the first clock edge (or reset edge, if the domain
# uses an asynchronous reset). This matches the behavior of synchronous FFs.
generator = process()
if "sim" in inspect.signature(process).parameters:
generator = process(sim=SimulatorContext())
else:
generator = process()
if inspect.isawaitable(generator):
generator = generator.__await__()
result = None
exception = None
yield Tick(domain)
Expand All @@ -114,10 +215,17 @@ def wrapper():
exception = e
self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain))

def add_testbench(self, process):
def add_testbench(self, process, *, passive=False):
process = self._check_process(process)
def wrapper():
generator = process()
if passive:
yield Passive()
if "sim" in inspect.signature(process).parameters:
generator = process(sim=SimulatorContext())
else:
generator = process()
if inspect.isawaitable(generator):
generator = generator.__await__()
# Only start a bench process after power-on reset finishes. Use object.__new__ to
# avoid deprecation warning.
yield object.__new__(Settle)
Expand Down