Skip to content

Commit 03e40e5

Browse files
author
Jean THOMAS
committed
cores.mech: Add Endless Potentiometer Decoder
1 parent bd22bcb commit 03e40e5

File tree

2 files changed

+398
-0
lines changed

2 files changed

+398
-0
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
# Endless potentiometer decoding into relative rotation
2+
# 2025 - LambdaConcept <[email protected]>
3+
from amaranth import *
4+
from ...interface import stream
5+
6+
__all__ = ["EndlessPotentiometerDecoder"]
7+
8+
9+
class _ThresholdDetector(Elaboratable):
10+
"""Detects when a value changes above/below a threshold"""
11+
def __init__(self, width, threshold):
12+
self._width = width
13+
self._threshold = threshold
14+
15+
self.readout = stream.Endpoint([
16+
("value", width),
17+
("previous_value", width),
18+
])
19+
20+
self.detection = stream.Endpoint([
21+
("up", 1),
22+
("down", 1),
23+
("value", width), # Readout value passthrough
24+
("delta", signed(width + 1)), # value - previous_value
25+
])
26+
27+
def elaborate(self, platform):
28+
m = Module()
29+
30+
low_threshold = Signal(signed(self._width + 1))
31+
high_threshold = Signal(signed(self._width + 1))
32+
m.d.comb += [
33+
low_threshold.eq(self.readout.previous_value - self._threshold),
34+
high_threshold.eq(self.readout.previous_value + self._threshold),
35+
]
36+
37+
with m.If(self.detection.ready | ~self.detection.valid):
38+
m.d.sync += [
39+
self.detection.valid.eq(self.readout.valid),
40+
self.detection.up.eq(self.readout.value > high_threshold),
41+
self.detection.down.eq(self.readout.value < low_threshold),
42+
self.detection.value.eq(self.readout.value),
43+
self.detection.delta.eq(self.readout.value - self.readout.previous_value),
44+
]
45+
m.d.comb += self.readout.ready.eq(self.detection.ready | ~self.detection.valid)
46+
47+
return m
48+
49+
50+
class _DirectionDecoding(Elaboratable):
51+
def __init__(self, width):
52+
self._width = width
53+
54+
self.dir_a = stream.Endpoint([
55+
("up", 1),
56+
("down", 1),
57+
("value", width),
58+
("delta", signed(width + 1)),
59+
])
60+
self.dir_b = stream.Endpoint([
61+
("up", 1),
62+
("down", 1),
63+
("value", width),
64+
("delta", signed(width + 1)),
65+
])
66+
67+
self.direction = stream.Endpoint([
68+
("clockwise", 1),
69+
("counterclockwise", 1),
70+
("value_a", width),
71+
("delta_a", signed(width + 1)),
72+
("value_b", width),
73+
("delta_b", signed(width + 1)),
74+
])
75+
76+
def elaborate(self, platform):
77+
m = Module()
78+
79+
m.d.comb += [
80+
self.dir_a.ready.eq(self.direction.ready & self.dir_b.valid),
81+
self.dir_b.ready.eq(self.direction.ready & self.dir_a.valid),
82+
]
83+
84+
with m.If(self.direction.ready | ~self.direction.valid):
85+
m.d.sync += [
86+
self.direction.valid.eq(self.dir_a.valid & self.dir_b.valid),
87+
self.direction.value_a.eq(self.dir_a.value),
88+
self.direction.value_b.eq(self.dir_b.value),
89+
self.direction.delta_a.eq(self.dir_a.delta),
90+
self.direction.delta_b.eq(self.dir_b.delta),
91+
]
92+
93+
a_above_b = Signal()
94+
a_above_mid = Signal()
95+
b_above_mid = Signal()
96+
m.d.comb += [
97+
a_above_b.eq(self.dir_a.value > self.dir_b.value),
98+
a_above_mid.eq(self.dir_a.value > (1 << self._width) // 2),
99+
b_above_mid.eq(self.dir_b.value > (1 << self._width) // 2),
100+
]
101+
102+
with m.If(self.direction.ready | ~self.direction.valid):
103+
with m.If(self.dir_a.down & self.dir_b.down):
104+
with m.If(a_above_b):
105+
m.d.sync += self.direction.clockwise.eq(1)
106+
with m.Else():
107+
m.d.sync += self.direction.counterclockwise.eq(1)
108+
with m.Elif(self.dir_a.up & self.dir_b.up):
109+
with m.If(~a_above_b):
110+
m.d.sync += self.direction.clockwise.eq(1)
111+
with m.Else():
112+
m.d.sync += self.direction.counterclockwise.eq(1)
113+
with m.Elif(self.dir_a.up & self.dir_b.down):
114+
with m.If(a_above_mid | b_above_mid):
115+
m.d.sync += self.direction.clockwise.eq(1)
116+
with m.Else():
117+
m.d.sync += self.direction.counterclockwise.eq(1)
118+
with m.Elif(self.dir_a.down & self.dir_b.up):
119+
with m.If(~a_above_mid | ~b_above_mid):
120+
m.d.sync += self.direction.clockwise.eq(1)
121+
with m.Else():
122+
m.d.sync += self.direction.counterclockwise.eq(1)
123+
with m.Else():
124+
m.d.sync += [
125+
self.direction.clockwise.eq(0),
126+
self.direction.counterclockwise.eq(0),
127+
]
128+
129+
return m
130+
131+
132+
class _ReadoutDeadzoneMuxer(Elaboratable):
133+
def __init__(self, width, deadzone=0.8):
134+
self._width = width
135+
self._deadzone = deadzone
136+
137+
self.direction = stream.Endpoint([
138+
("clockwise", 1),
139+
("counterclockwise", 1),
140+
("value_a", width),
141+
("delta_a", signed(width + 1)),
142+
("value_b", width),
143+
("delta_b", signed(width + 1)),
144+
])
145+
146+
self.position = stream.Endpoint([
147+
("diff", signed(width + 1)),
148+
])
149+
150+
def elaborate(self, platform):
151+
m = Module()
152+
153+
deadzone_max = int((1 << self._width) * self._deadzone)
154+
deadzone_min = int((1 << self._width) * (1 - self._deadzone))
155+
156+
value = Signal(signed(self._width + 1))
157+
with m.If((self.direction.value_a < deadzone_max) & (self.direction.value_a > deadzone_min)):
158+
with m.If(self.direction.clockwise):
159+
m.d.comb += value.eq(abs(self.direction.delta_a))
160+
with m.Elif(self.direction.counterclockwise):
161+
m.d.comb += value.eq(-abs(self.direction.delta_a))
162+
with m.Else():
163+
m.d.comb += value.eq(0)
164+
with m.Else():
165+
with m.If(self.direction.clockwise):
166+
m.d.comb += value.eq(abs(self.direction.delta_b))
167+
with m.Elif(self.direction.counterclockwise):
168+
m.d.comb += value.eq(-abs(self.direction.delta_b))
169+
with m.Else():
170+
m.d.comb += value.eq(0)
171+
172+
with m.If(self.position.ready | ~self.position.valid):
173+
m.d.sync += [
174+
self.position.valid.eq(self.direction.valid),
175+
self.position.diff.eq(value),
176+
]
177+
m.d.comb += self.direction.ready.eq(self.position.ready | ~self.position.valid)
178+
179+
return m
180+
181+
182+
class EndlessPotentiometerDecoder(Elaboratable):
183+
def __init__(self, width, threshold, deadzone):
184+
self._width = width
185+
self._threshold = threshold
186+
self._deadzone = deadzone
187+
188+
self.ch_a = stream.Endpoint([
189+
("value", width),
190+
("previous_value", width),
191+
])
192+
self.ch_b = stream.Endpoint([
193+
("value", width),
194+
("previous_value", width),
195+
])
196+
197+
self.position = stream.Endpoint([
198+
("diff", signed(width + 1)),
199+
])
200+
201+
def elaborate(self, platform):
202+
m = Module()
203+
204+
m.submodules.thres_det_a = thres_det_a = _ThresholdDetector(self._width, self._threshold)
205+
m.submodules.thres_det_b = thres_det_b = _ThresholdDetector(self._width, self._threshold)
206+
m.submodules.dir_decoding = dir_decoding = _DirectionDecoding(self._width)
207+
m.submodules.deadzone_mux = deadzone_mux = _ReadoutDeadzoneMuxer(self._width, self._deadzone)
208+
m.d.comb += [
209+
self.ch_a.connect(thres_det_a.readout),
210+
self.ch_b.connect(thres_det_b.readout),
211+
212+
thres_det_a.detection.connect(dir_decoding.dir_a),
213+
thres_det_b.detection.connect(dir_decoding.dir_b),
214+
215+
dir_decoding.direction.connect(deadzone_mux.direction),
216+
deadzone_mux.position.connect(self.position),
217+
]
218+
219+
return m
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
from amaranth.sim import *
2+
from lambdalib.interface.stream_sim import *
3+
from lambdalib.cores.mech.endless_potentiometer import (
4+
EndlessPotentiometerDecoder,
5+
_ThresholdDetector,
6+
_DirectionDecoding,
7+
)
8+
9+
10+
def _wiper_to_adc(angular_position, phase_shift, adc_resolution):
11+
"""Emulate sampled wiper output
12+
13+
:param angular_position: Wiper absolute position in deg
14+
:param phase_shift: Phase shift in deg
15+
:param adc_resolution: ADC resolution in bits"""
16+
adc_max = (1 << adc_resolution) - 1
17+
18+
angular_position += phase_shift
19+
20+
# Make it 0-180-0
21+
periodic_angle = angular_position % 180
22+
if (angular_position // 180) & 1:
23+
periodic_angle = 180 - periodic_angle
24+
25+
return int(adc_max * (periodic_angle / 180))
26+
27+
28+
29+
def test_wiper_to_adc():
30+
# W/o phase quadrature
31+
assert _wiper_to_adc(0, 0, 10) == 0
32+
assert _wiper_to_adc(90, 0, 10) == 1023//2
33+
assert _wiper_to_adc(180, 0, 10) == 1023
34+
assert _wiper_to_adc(270, 0, 10) == 1023//2
35+
assert _wiper_to_adc(360, 0, 10) == 0
36+
37+
# W/ phase quadrature
38+
assert _wiper_to_adc(0, 90, 10) == 1023//2
39+
assert _wiper_to_adc(90, 90, 10) == 1023
40+
assert _wiper_to_adc(180, 90, 10) == 1023//2
41+
assert _wiper_to_adc(270, 90, 10) == 0
42+
assert _wiper_to_adc(360, 90, 10) == 1023//2
43+
44+
45+
def test_threshold_detector():
46+
dut = _ThresholdDetector(width=8, threshold=16)
47+
sim = Simulator(dut)
48+
49+
data = {
50+
"value": [
51+
0, 32, 0,
52+
],
53+
"previous_value": [
54+
0, 0, 32,
55+
]
56+
}
57+
58+
tx = StreamSimSender(dut.readout, data, speed=0.3)
59+
rx = StreamSimReceiver(dut.detection, length=len(data["value"]), speed=0.8, verbose=True)
60+
61+
sim.add_clock(1e-6)
62+
sim.add_sync_process(tx.sync_process)
63+
sim.add_sync_process(rx.sync_process)
64+
with sim.write_vcd("tests/test_threshold_detector.vcd"):
65+
sim.run()
66+
67+
rx.verify({
68+
"up": [0, 1, 0],
69+
"down": [0, 0, 1],
70+
"value": [0, 32, 0],
71+
"delta": [0, 32, -32],
72+
})
73+
74+
75+
def test_direction_decoding():
76+
dut = _DirectionDecoding(width=8)
77+
sim = Simulator(dut)
78+
79+
ch_a = {
80+
"up": [
81+
1,
82+
],
83+
"down": [
84+
0,
85+
],
86+
"value": [
87+
0,
88+
],
89+
"delta": [
90+
0,
91+
],
92+
}
93+
ch_b = {
94+
"up": [
95+
1,
96+
],
97+
"down": [
98+
0,
99+
],
100+
"value": [
101+
0,
102+
],
103+
"delta": [
104+
0,
105+
],
106+
}
107+
108+
tx_a = StreamSimSender(dut.dir_a, ch_a, speed=0.3)
109+
tx_b = StreamSimSender(dut.dir_b, ch_b, speed=0.3)
110+
rx = StreamSimReceiver(dut.direction, length=len(ch_a["value"]), speed=0.8, verbose=True)
111+
112+
sim.add_clock(1e-6)
113+
sim.add_sync_process(tx_a.sync_process)
114+
sim.add_sync_process(tx_b.sync_process)
115+
sim.add_sync_process(rx.sync_process)
116+
with sim.write_vcd("tests/test_direction_decoding.vcd"):
117+
sim.run()
118+
119+
120+
def test_endless_potentiometer_decoder_single():
121+
adc_resolution = 10 # bits
122+
123+
dut = EndlessPotentiometerDecoder(adc_resolution, 5, 0.8)
124+
sim = Simulator(dut)
125+
126+
ch_a = {
127+
"value": [_wiper_to_adc(90, 0, adc_resolution)],
128+
"previous_value": [_wiper_to_adc(0, 0, adc_resolution)],
129+
}
130+
ch_b = {
131+
"value": [_wiper_to_adc(90, 90, adc_resolution)],
132+
"previous_value": [_wiper_to_adc(90, 90, adc_resolution)],
133+
}
134+
135+
tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3)
136+
tx_b = StreamSimSender(dut.ch_b, ch_b, speed=0.3)
137+
rx = StreamSimReceiver(dut.position, 1, speed=0.8, verbose=True)
138+
139+
sim.add_clock(1e-6)
140+
sim.add_sync_process(tx_a.sync_process)
141+
sim.add_sync_process(tx_b.sync_process)
142+
sim.add_sync_process(rx.sync_process)
143+
with sim.write_vcd("tests/test_endless_potentiometer_decoder_single.vcd"):
144+
sim.run()
145+
146+
assert rx.data['diff'][0] == _wiper_to_adc(90, 0, adc_resolution)-_wiper_to_adc(0, 0, adc_resolution)
147+
148+
149+
def test_endless_potentiometer_decoder():
150+
adc_resolution = 10 # bits
151+
152+
dut = EndlessPotentiometerDecoder(adc_resolution, 2, 0.8)
153+
sim = Simulator(dut)
154+
155+
wiper_a = [_wiper_to_adc(x, 0, adc_resolution) for x in range(720)]
156+
wiper_b = [_wiper_to_adc(x, 90, adc_resolution) for x in range(720)]
157+
158+
ch_a = {
159+
"value": wiper_a[1:],
160+
"previous_value": wiper_a[:-1],
161+
}
162+
ch_b = {
163+
"value": wiper_b[1:],
164+
"previous_value": wiper_b[:-1],
165+
}
166+
167+
tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3)
168+
tx_b = StreamSimSender(dut.ch_b, ch_b, speed=0.3)
169+
rx = StreamSimReceiver(dut.position, length=len(ch_a["value"]), speed=0.8, verbose=True)
170+
171+
sim.add_clock(1e-6)
172+
sim.add_sync_process(tx_a.sync_process)
173+
sim.add_sync_process(tx_b.sync_process)
174+
sim.add_sync_process(rx.sync_process)
175+
with sim.write_vcd("tests/test_endless_potentiometer_decoder.vcd"):
176+
sim.run()
177+
178+
for x in rx.data['diff']:
179+
assert x in [5, 6]

0 commit comments

Comments
 (0)