From bd22bcba441525c3c350f3c55426019a8b1954aa Mon Sep 17 00:00:00 2001 From: Jean THOMAS Date: Fri, 21 Mar 2025 11:13:38 +0100 Subject: [PATCH 1/3] CI: Add timeout --- .github/workflows/python-tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index d8ab16a..e313599 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -13,6 +13,7 @@ jobs: build: runs-on: ubuntu-latest + timeout-minutes: 10 steps: - uses: actions/checkout@v4 From 03e40e5f3cec0742a2eb218ce26c4fdab7d2e6fa Mon Sep 17 00:00:00 2001 From: Jean THOMAS Date: Thu, 20 Mar 2025 19:04:06 +0100 Subject: [PATCH 2/3] cores.mech: Add Endless Potentiometer Decoder --- lambdalib/cores/mech/endless_potentiometer.py | 219 ++++++++++++++++++ .../tests/test_cores_endless_potentiometer.py | 179 ++++++++++++++ 2 files changed, 398 insertions(+) create mode 100644 lambdalib/cores/mech/endless_potentiometer.py create mode 100644 lambdalib/tests/test_cores_endless_potentiometer.py diff --git a/lambdalib/cores/mech/endless_potentiometer.py b/lambdalib/cores/mech/endless_potentiometer.py new file mode 100644 index 0000000..1f993d4 --- /dev/null +++ b/lambdalib/cores/mech/endless_potentiometer.py @@ -0,0 +1,219 @@ +# Endless potentiometer decoding into relative rotation +# 2025 - LambdaConcept +from amaranth import * +from ...interface import stream + +__all__ = ["EndlessPotentiometerDecoder"] + + +class _ThresholdDetector(Elaboratable): + """Detects when a value changes above/below a threshold""" + def __init__(self, width, threshold): + self._width = width + self._threshold = threshold + + self.readout = stream.Endpoint([ + ("value", width), + ("previous_value", width), + ]) + + self.detection = stream.Endpoint([ + ("up", 1), + ("down", 1), + ("value", width), # Readout value passthrough + ("delta", signed(width + 1)), # value - previous_value + ]) + + def elaborate(self, platform): + m = Module() + + low_threshold = Signal(signed(self._width + 1)) + high_threshold = Signal(signed(self._width + 1)) + m.d.comb += [ + low_threshold.eq(self.readout.previous_value - self._threshold), + high_threshold.eq(self.readout.previous_value + self._threshold), + ] + + with m.If(self.detection.ready | ~self.detection.valid): + m.d.sync += [ + self.detection.valid.eq(self.readout.valid), + self.detection.up.eq(self.readout.value > high_threshold), + self.detection.down.eq(self.readout.value < low_threshold), + self.detection.value.eq(self.readout.value), + self.detection.delta.eq(self.readout.value - self.readout.previous_value), + ] + m.d.comb += self.readout.ready.eq(self.detection.ready | ~self.detection.valid) + + return m + + +class _DirectionDecoding(Elaboratable): + def __init__(self, width): + self._width = width + + self.dir_a = stream.Endpoint([ + ("up", 1), + ("down", 1), + ("value", width), + ("delta", signed(width + 1)), + ]) + self.dir_b = stream.Endpoint([ + ("up", 1), + ("down", 1), + ("value", width), + ("delta", signed(width + 1)), + ]) + + self.direction = stream.Endpoint([ + ("clockwise", 1), + ("counterclockwise", 1), + ("value_a", width), + ("delta_a", signed(width + 1)), + ("value_b", width), + ("delta_b", signed(width + 1)), + ]) + + def elaborate(self, platform): + m = Module() + + m.d.comb += [ + self.dir_a.ready.eq(self.direction.ready & self.dir_b.valid), + self.dir_b.ready.eq(self.direction.ready & self.dir_a.valid), + ] + + with m.If(self.direction.ready | ~self.direction.valid): + m.d.sync += [ + self.direction.valid.eq(self.dir_a.valid & self.dir_b.valid), + self.direction.value_a.eq(self.dir_a.value), + self.direction.value_b.eq(self.dir_b.value), + self.direction.delta_a.eq(self.dir_a.delta), + self.direction.delta_b.eq(self.dir_b.delta), + ] + + a_above_b = Signal() + a_above_mid = Signal() + b_above_mid = Signal() + m.d.comb += [ + a_above_b.eq(self.dir_a.value > self.dir_b.value), + a_above_mid.eq(self.dir_a.value > (1 << self._width) // 2), + b_above_mid.eq(self.dir_b.value > (1 << self._width) // 2), + ] + + with m.If(self.direction.ready | ~self.direction.valid): + with m.If(self.dir_a.down & self.dir_b.down): + with m.If(a_above_b): + m.d.sync += self.direction.clockwise.eq(1) + with m.Else(): + m.d.sync += self.direction.counterclockwise.eq(1) + with m.Elif(self.dir_a.up & self.dir_b.up): + with m.If(~a_above_b): + m.d.sync += self.direction.clockwise.eq(1) + with m.Else(): + m.d.sync += self.direction.counterclockwise.eq(1) + with m.Elif(self.dir_a.up & self.dir_b.down): + with m.If(a_above_mid | b_above_mid): + m.d.sync += self.direction.clockwise.eq(1) + with m.Else(): + m.d.sync += self.direction.counterclockwise.eq(1) + with m.Elif(self.dir_a.down & self.dir_b.up): + with m.If(~a_above_mid | ~b_above_mid): + m.d.sync += self.direction.clockwise.eq(1) + with m.Else(): + m.d.sync += self.direction.counterclockwise.eq(1) + with m.Else(): + m.d.sync += [ + self.direction.clockwise.eq(0), + self.direction.counterclockwise.eq(0), + ] + + return m + + +class _ReadoutDeadzoneMuxer(Elaboratable): + def __init__(self, width, deadzone=0.8): + self._width = width + self._deadzone = deadzone + + self.direction = stream.Endpoint([ + ("clockwise", 1), + ("counterclockwise", 1), + ("value_a", width), + ("delta_a", signed(width + 1)), + ("value_b", width), + ("delta_b", signed(width + 1)), + ]) + + self.position = stream.Endpoint([ + ("diff", signed(width + 1)), + ]) + + def elaborate(self, platform): + m = Module() + + deadzone_max = int((1 << self._width) * self._deadzone) + deadzone_min = int((1 << self._width) * (1 - self._deadzone)) + + value = Signal(signed(self._width + 1)) + with m.If((self.direction.value_a < deadzone_max) & (self.direction.value_a > deadzone_min)): + with m.If(self.direction.clockwise): + m.d.comb += value.eq(abs(self.direction.delta_a)) + with m.Elif(self.direction.counterclockwise): + m.d.comb += value.eq(-abs(self.direction.delta_a)) + with m.Else(): + m.d.comb += value.eq(0) + with m.Else(): + with m.If(self.direction.clockwise): + m.d.comb += value.eq(abs(self.direction.delta_b)) + with m.Elif(self.direction.counterclockwise): + m.d.comb += value.eq(-abs(self.direction.delta_b)) + with m.Else(): + m.d.comb += value.eq(0) + + with m.If(self.position.ready | ~self.position.valid): + m.d.sync += [ + self.position.valid.eq(self.direction.valid), + self.position.diff.eq(value), + ] + m.d.comb += self.direction.ready.eq(self.position.ready | ~self.position.valid) + + return m + + +class EndlessPotentiometerDecoder(Elaboratable): + def __init__(self, width, threshold, deadzone): + self._width = width + self._threshold = threshold + self._deadzone = deadzone + + self.ch_a = stream.Endpoint([ + ("value", width), + ("previous_value", width), + ]) + self.ch_b = stream.Endpoint([ + ("value", width), + ("previous_value", width), + ]) + + self.position = stream.Endpoint([ + ("diff", signed(width + 1)), + ]) + + def elaborate(self, platform): + m = Module() + + m.submodules.thres_det_a = thres_det_a = _ThresholdDetector(self._width, self._threshold) + m.submodules.thres_det_b = thres_det_b = _ThresholdDetector(self._width, self._threshold) + m.submodules.dir_decoding = dir_decoding = _DirectionDecoding(self._width) + m.submodules.deadzone_mux = deadzone_mux = _ReadoutDeadzoneMuxer(self._width, self._deadzone) + m.d.comb += [ + self.ch_a.connect(thres_det_a.readout), + self.ch_b.connect(thres_det_b.readout), + + thres_det_a.detection.connect(dir_decoding.dir_a), + thres_det_b.detection.connect(dir_decoding.dir_b), + + dir_decoding.direction.connect(deadzone_mux.direction), + deadzone_mux.position.connect(self.position), + ] + + return m diff --git a/lambdalib/tests/test_cores_endless_potentiometer.py b/lambdalib/tests/test_cores_endless_potentiometer.py new file mode 100644 index 0000000..5820edc --- /dev/null +++ b/lambdalib/tests/test_cores_endless_potentiometer.py @@ -0,0 +1,179 @@ +from amaranth.sim import * +from lambdalib.interface.stream_sim import * +from lambdalib.cores.mech.endless_potentiometer import ( + EndlessPotentiometerDecoder, + _ThresholdDetector, + _DirectionDecoding, +) + + +def _wiper_to_adc(angular_position, phase_shift, adc_resolution): + """Emulate sampled wiper output + + :param angular_position: Wiper absolute position in deg + :param phase_shift: Phase shift in deg + :param adc_resolution: ADC resolution in bits""" + adc_max = (1 << adc_resolution) - 1 + + angular_position += phase_shift + + # Make it 0-180-0 + periodic_angle = angular_position % 180 + if (angular_position // 180) & 1: + periodic_angle = 180 - periodic_angle + + return int(adc_max * (periodic_angle / 180)) + + + +def test_wiper_to_adc(): + # W/o phase quadrature + assert _wiper_to_adc(0, 0, 10) == 0 + assert _wiper_to_adc(90, 0, 10) == 1023//2 + assert _wiper_to_adc(180, 0, 10) == 1023 + assert _wiper_to_adc(270, 0, 10) == 1023//2 + assert _wiper_to_adc(360, 0, 10) == 0 + + # W/ phase quadrature + assert _wiper_to_adc(0, 90, 10) == 1023//2 + assert _wiper_to_adc(90, 90, 10) == 1023 + assert _wiper_to_adc(180, 90, 10) == 1023//2 + assert _wiper_to_adc(270, 90, 10) == 0 + assert _wiper_to_adc(360, 90, 10) == 1023//2 + + +def test_threshold_detector(): + dut = _ThresholdDetector(width=8, threshold=16) + sim = Simulator(dut) + + data = { + "value": [ + 0, 32, 0, + ], + "previous_value": [ + 0, 0, 32, + ] + } + + tx = StreamSimSender(dut.readout, data, speed=0.3) + rx = StreamSimReceiver(dut.detection, length=len(data["value"]), speed=0.8, verbose=True) + + sim.add_clock(1e-6) + sim.add_sync_process(tx.sync_process) + sim.add_sync_process(rx.sync_process) + with sim.write_vcd("tests/test_threshold_detector.vcd"): + sim.run() + + rx.verify({ + "up": [0, 1, 0], + "down": [0, 0, 1], + "value": [0, 32, 0], + "delta": [0, 32, -32], + }) + + +def test_direction_decoding(): + dut = _DirectionDecoding(width=8) + sim = Simulator(dut) + + ch_a = { + "up": [ + 1, + ], + "down": [ + 0, + ], + "value": [ + 0, + ], + "delta": [ + 0, + ], + } + ch_b = { + "up": [ + 1, + ], + "down": [ + 0, + ], + "value": [ + 0, + ], + "delta": [ + 0, + ], + } + + tx_a = StreamSimSender(dut.dir_a, ch_a, speed=0.3) + tx_b = StreamSimSender(dut.dir_b, ch_b, speed=0.3) + rx = StreamSimReceiver(dut.direction, length=len(ch_a["value"]), speed=0.8, verbose=True) + + sim.add_clock(1e-6) + sim.add_sync_process(tx_a.sync_process) + sim.add_sync_process(tx_b.sync_process) + sim.add_sync_process(rx.sync_process) + with sim.write_vcd("tests/test_direction_decoding.vcd"): + sim.run() + + +def test_endless_potentiometer_decoder_single(): + adc_resolution = 10 # bits + + dut = EndlessPotentiometerDecoder(adc_resolution, 5, 0.8) + sim = Simulator(dut) + + ch_a = { + "value": [_wiper_to_adc(90, 0, adc_resolution)], + "previous_value": [_wiper_to_adc(0, 0, adc_resolution)], + } + ch_b = { + "value": [_wiper_to_adc(90, 90, adc_resolution)], + "previous_value": [_wiper_to_adc(90, 90, adc_resolution)], + } + + tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3) + tx_b = StreamSimSender(dut.ch_b, ch_b, speed=0.3) + rx = StreamSimReceiver(dut.position, 1, speed=0.8, verbose=True) + + sim.add_clock(1e-6) + sim.add_sync_process(tx_a.sync_process) + sim.add_sync_process(tx_b.sync_process) + sim.add_sync_process(rx.sync_process) + with sim.write_vcd("tests/test_endless_potentiometer_decoder_single.vcd"): + sim.run() + + assert rx.data['diff'][0] == _wiper_to_adc(90, 0, adc_resolution)-_wiper_to_adc(0, 0, adc_resolution) + + +def test_endless_potentiometer_decoder(): + adc_resolution = 10 # bits + + dut = EndlessPotentiometerDecoder(adc_resolution, 2, 0.8) + sim = Simulator(dut) + + wiper_a = [_wiper_to_adc(x, 0, adc_resolution) for x in range(720)] + wiper_b = [_wiper_to_adc(x, 90, adc_resolution) for x in range(720)] + + ch_a = { + "value": wiper_a[1:], + "previous_value": wiper_a[:-1], + } + ch_b = { + "value": wiper_b[1:], + "previous_value": wiper_b[:-1], + } + + tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3) + tx_b = StreamSimSender(dut.ch_b, ch_b, speed=0.3) + rx = StreamSimReceiver(dut.position, length=len(ch_a["value"]), speed=0.8, verbose=True) + + sim.add_clock(1e-6) + sim.add_sync_process(tx_a.sync_process) + sim.add_sync_process(tx_b.sync_process) + sim.add_sync_process(rx.sync_process) + with sim.write_vcd("tests/test_endless_potentiometer_decoder.vcd"): + sim.run() + + for x in rx.data['diff']: + assert x in [5, 6] From 6ddc77514d4c15235b220917959b4bb3e90b8fea Mon Sep 17 00:00:00 2001 From: Jean THOMAS Date: Mon, 24 Mar 2025 16:23:27 +0100 Subject: [PATCH 3/3] core.mech: Simplify EndlessPotentiometerDecoder, add id passthrough --- lambdalib/cores/mech/endless_potentiometer.py | 58 +++++++++++++------ .../tests/test_cores_endless_potentiometer.py | 38 +++++------- 2 files changed, 56 insertions(+), 40 deletions(-) diff --git a/lambdalib/cores/mech/endless_potentiometer.py b/lambdalib/cores/mech/endless_potentiometer.py index 1f993d4..e8bfd08 100644 --- a/lambdalib/cores/mech/endless_potentiometer.py +++ b/lambdalib/cores/mech/endless_potentiometer.py @@ -8,16 +8,18 @@ class _ThresholdDetector(Elaboratable): """Detects when a value changes above/below a threshold""" - def __init__(self, width, threshold): + def __init__(self, width, threshold, id_width=0): self._width = width self._threshold = threshold self.readout = stream.Endpoint([ + ("id", id_width), ("value", width), ("previous_value", width), ]) self.detection = stream.Endpoint([ + ("id", id_width), ("up", 1), ("down", 1), ("value", width), # Readout value passthrough @@ -41,6 +43,7 @@ def elaborate(self, platform): self.detection.down.eq(self.readout.value < low_threshold), self.detection.value.eq(self.readout.value), self.detection.delta.eq(self.readout.value - self.readout.previous_value), + self.detection.id.eq(self.readout.id), ] m.d.comb += self.readout.ready.eq(self.detection.ready | ~self.detection.valid) @@ -48,16 +51,18 @@ def elaborate(self, platform): class _DirectionDecoding(Elaboratable): - def __init__(self, width): + def __init__(self, width, id_width=0): self._width = width self.dir_a = stream.Endpoint([ + ("id", id_width), ("up", 1), ("down", 1), ("value", width), ("delta", signed(width + 1)), ]) self.dir_b = stream.Endpoint([ + ("id", id_width), ("up", 1), ("down", 1), ("value", width), @@ -65,6 +70,7 @@ def __init__(self, width): ]) self.direction = stream.Endpoint([ + ("id", id_width), ("clockwise", 1), ("counterclockwise", 1), ("value_a", width), @@ -130,11 +136,12 @@ def elaborate(self, platform): class _ReadoutDeadzoneMuxer(Elaboratable): - def __init__(self, width, deadzone=0.8): + def __init__(self, width, deadzone=0.8, id_width=0): self._width = width self._deadzone = deadzone self.direction = stream.Endpoint([ + ("id", id_width), ("clockwise", 1), ("counterclockwise", 1), ("value_a", width), @@ -144,7 +151,10 @@ def __init__(self, width, deadzone=0.8): ]) self.position = stream.Endpoint([ + ("id", id_width), ("diff", signed(width + 1)), + ("value_a", width), + ("value_b", width), ]) def elaborate(self, platform): @@ -172,7 +182,10 @@ def elaborate(self, platform): with m.If(self.position.ready | ~self.position.valid): m.d.sync += [ self.position.valid.eq(self.direction.valid), + self.position.value_a.eq(self.direction.value_a), + self.position.value_b.eq(self.direction.value_b), self.position.diff.eq(value), + self.position.id.eq(self.direction.id), ] m.d.comb += self.direction.ready.eq(self.position.ready | ~self.position.valid) @@ -180,34 +193,45 @@ def elaborate(self, platform): class EndlessPotentiometerDecoder(Elaboratable): - def __init__(self, width, threshold, deadzone): + def __init__(self, width, threshold, deadzone, id_width=0): self._width = width self._threshold = threshold self._deadzone = deadzone + self._id_width = id_width - self.ch_a = stream.Endpoint([ - ("value", width), - ("previous_value", width), - ]) - self.ch_b = stream.Endpoint([ - ("value", width), - ("previous_value", width), + self.adc_readout = stream.Endpoint([ + ("id", id_width), + ("value_a", width), + ("previous_value_a", width), + ("value_b", width), + ("previous_value_b", width), ]) self.position = stream.Endpoint([ + ("id", id_width), ("diff", signed(width + 1)), + ("value_a", width), + ("value_b", width), ]) def elaborate(self, platform): m = Module() - m.submodules.thres_det_a = thres_det_a = _ThresholdDetector(self._width, self._threshold) - m.submodules.thres_det_b = thres_det_b = _ThresholdDetector(self._width, self._threshold) - m.submodules.dir_decoding = dir_decoding = _DirectionDecoding(self._width) - m.submodules.deadzone_mux = deadzone_mux = _ReadoutDeadzoneMuxer(self._width, self._deadzone) + m.submodules.thres_det_a = thres_det_a = _ThresholdDetector(self._width, self._threshold, self._id_width) + m.submodules.thres_det_b = thres_det_b = _ThresholdDetector(self._width, self._threshold, self._id_width) + m.submodules.dir_decoding = dir_decoding = _DirectionDecoding(self._width, self._id_width) + m.submodules.deadzone_mux = deadzone_mux = _ReadoutDeadzoneMuxer(self._width, self._deadzone, self._id_width) m.d.comb += [ - self.ch_a.connect(thres_det_a.readout), - self.ch_b.connect(thres_det_b.readout), + self.adc_readout.ready.eq(thres_det_a.readout.ready & thres_det_b.readout.ready), + + thres_det_a.readout.valid.eq(self.adc_readout.valid & thres_det_b.readout.ready), + thres_det_a.readout.id.eq(self.adc_readout.id), + thres_det_a.readout.value.eq(self.adc_readout.value_a), + thres_det_a.readout.previous_value.eq(self.adc_readout.previous_value_a), + thres_det_b.readout.valid.eq(self.adc_readout.valid & thres_det_a.readout.ready), + thres_det_b.readout.id.eq(self.adc_readout.id), + thres_det_b.readout.value.eq(self.adc_readout.value_b), + thres_det_b.readout.previous_value.eq(self.adc_readout.previous_value_b), thres_det_a.detection.connect(dir_decoding.dir_a), thres_det_b.detection.connect(dir_decoding.dir_b), diff --git a/lambdalib/tests/test_cores_endless_potentiometer.py b/lambdalib/tests/test_cores_endless_potentiometer.py index 5820edc..5c4f33e 100644 --- a/lambdalib/tests/test_cores_endless_potentiometer.py +++ b/lambdalib/tests/test_cores_endless_potentiometer.py @@ -123,22 +123,18 @@ def test_endless_potentiometer_decoder_single(): dut = EndlessPotentiometerDecoder(adc_resolution, 5, 0.8) sim = Simulator(dut) - ch_a = { - "value": [_wiper_to_adc(90, 0, adc_resolution)], - "previous_value": [_wiper_to_adc(0, 0, adc_resolution)], - } - ch_b = { - "value": [_wiper_to_adc(90, 90, adc_resolution)], - "previous_value": [_wiper_to_adc(90, 90, adc_resolution)], + adc_readout = { + "value_a": [_wiper_to_adc(90, 0, adc_resolution)], + "previous_value_a": [_wiper_to_adc(0, 0, adc_resolution)], + "value_b": [_wiper_to_adc(90, 90, adc_resolution)], + "previous_value_b": [_wiper_to_adc(90, 90, adc_resolution)], } - tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3) - tx_b = StreamSimSender(dut.ch_b, ch_b, speed=0.3) + tx = StreamSimSender(dut.adc_readout, adc_readout, speed=0.3) rx = StreamSimReceiver(dut.position, 1, speed=0.8, verbose=True) sim.add_clock(1e-6) - sim.add_sync_process(tx_a.sync_process) - sim.add_sync_process(tx_b.sync_process) + sim.add_sync_process(tx.sync_process) sim.add_sync_process(rx.sync_process) with sim.write_vcd("tests/test_endless_potentiometer_decoder_single.vcd"): sim.run() @@ -155,22 +151,18 @@ def test_endless_potentiometer_decoder(): wiper_a = [_wiper_to_adc(x, 0, adc_resolution) for x in range(720)] wiper_b = [_wiper_to_adc(x, 90, adc_resolution) for x in range(720)] - ch_a = { - "value": wiper_a[1:], - "previous_value": wiper_a[:-1], - } - ch_b = { - "value": wiper_b[1:], - "previous_value": wiper_b[:-1], + adc_readout = { + "value_a": wiper_a[1:], + "previous_value_a": wiper_a[:-1], + "value_b": wiper_b[1:], + "previous_value_b": wiper_b[:-1], } - tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3) - tx_b = StreamSimSender(dut.ch_b, ch_b, speed=0.3) - rx = StreamSimReceiver(dut.position, length=len(ch_a["value"]), speed=0.8, verbose=True) + tx = StreamSimSender(dut.adc_readout, adc_readout, speed=0.3) + rx = StreamSimReceiver(dut.position, length=len(adc_readout["value_a"]), speed=0.8, verbose=True) sim.add_clock(1e-6) - sim.add_sync_process(tx_a.sync_process) - sim.add_sync_process(tx_b.sync_process) + sim.add_sync_process(tx.sync_process) sim.add_sync_process(rx.sync_process) with sim.write_vcd("tests/test_endless_potentiometer_decoder.vcd"): sim.run()