Skip to content

Commit 2e9a822

Browse files
committed
Basic diagnostics
1 parent 5bf297b commit 2e9a822

File tree

7 files changed

+286
-57
lines changed

7 files changed

+286
-57
lines changed

rust/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ fn tof_bin_event_linear(tof: i32, start: i32, stop: i32, step: i32) -> Option<us
2828

2929
#[pyfunction]
3030
fn bin_events_into_spectrum(
31-
mut histogram: PyReadwriteArray2<u64>,
31+
mut histogram: PyReadwriteArray2<f64>,
3232
event_tofs: PyReadonlyArray1<i32>,
3333
pixel_ids: PyReadonlyArray1<i32>,
3434
tof_bin_boundaries: PyReadonlyArray1<i32>,
@@ -42,7 +42,7 @@ fn bin_events_into_spectrum(
4242
.for_each(|(&tof, &pixel)| {
4343
if let Some(tof_bin) = tof_bin_event(tof, boundaries)
4444
&& let Some(e) = histogram.get_mut((pixel as usize, tof_bin)) {
45-
*e += 1;
45+
*e += 1.0
4646
}
4747
});
4848

@@ -51,7 +51,7 @@ fn bin_events_into_spectrum(
5151

5252
#[pyfunction]
5353
fn bin_events_into_spectrum_linear(
54-
mut histogram: PyReadwriteArray2<u64>,
54+
mut histogram: PyReadwriteArray2<f64>,
5555
event_tofs: PyReadonlyArray1<i32>,
5656
pixel_ids: PyReadonlyArray1<i32>,
5757
tof_bin_start: i32,
@@ -66,7 +66,7 @@ fn bin_events_into_spectrum_linear(
6666
.for_each(|(&tof, &pixel)| {
6767
if let Some(tof_bin) = tof_bin_event_linear(tof, tof_bin_start, tof_bin_stop, tof_bin_step)
6868
&& let Some(e) = histogram.get_mut((pixel as usize, tof_bin)) {
69-
*e += 1;
69+
*e += 1.0
7070
}
7171
});
7272

src/kafka_dae_diagnostics/_cli.py

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66

77
import numpy as np
88
from confluent_kafka.cimpl import TopicPartition
9-
from p4p.server import DynamicProvider, Server
9+
from p4p.server import DynamicProvider, Server, StaticProvider
1010
from confluent_kafka import Consumer
1111

1212
from kafka_dae_diagnostics.data import Data
1313
from kafka_dae_diagnostics.kafka_handlers import handle_event_messages, \
14-
handle_run_info_messages, handle_event_msg
14+
handle_run_info_messages
1515
from kafka_dae_diagnostics.spectrum_handlers import SpectrumHandler
16+
import p4p
17+
18+
from kafka_dae_diagnostics.static_pvs import StaticPVs
1619

1720
logger = logging.getLogger(__name__)
1821

@@ -21,31 +24,51 @@
2124

2225
def main() -> None:
2326
data = Data(
24-
spectra=np.zeros(shape=(1, 1, 1), dtype=np.uint64),
25-
spectrum_updaters=[],
27+
spectra=np.zeros(shape=(1, 1, 1), dtype=np.float64),
28+
callbacks={},
29+
bin_boundaries=np.linspace(0, 20_000_000, num=1001, dtype=np.int32)
2630
)
2731

28-
handler = SpectrumHandler("TE:NDW2922:KDAEDIAG:", data)
32+
prefix = "TE:NDW2922:KDAEDIAG:"
33+
34+
static_pvs = StaticPVs(data)
35+
static_provider = StaticProvider()
36+
static_provider.add(f"{prefix}EVENTS", static_pvs.total_events)
37+
static_provider.add(f"{prefix}MEVENTS", static_pvs.total_mevents)
38+
static_provider.add(f"{prefix}TOTALCOUNTS", static_pvs.total_events)
39+
static_provider.add(f"{prefix}TOTAL_EVENT_MESSAGES", static_pvs.total_event_messages)
40+
static_provider.add(f"{prefix}TOTAL_EVENT_MEGABYTES", static_pvs.total_event_megabytes)
41+
static_provider.add(f"{prefix}COUNTRATE", static_pvs.count_rate)
42+
static_provider.add(f"{prefix}HISTMEMORY", static_pvs.histogram_memory)
43+
44+
static_provider.add(f"{prefix}NUMPERIODS", static_pvs.num_periods)
45+
static_provider.add(f"{prefix}NUMSPECTRA", static_pvs.num_spectra)
46+
static_provider.add(f"{prefix}NUMTIMECHANNELS", static_pvs.num_time_channels)
47+
48+
static_provider.add(f"{prefix}STARTTIME", static_pvs.start_time)
49+
static_provider.add(f"{prefix}RUNDURATION", static_pvs.run_duration)
50+
static_provider.add(f"{prefix}DIAGNOSTICSLAG", static_pvs.processing_lag)
51+
52+
spectrum_handler = SpectrumHandler(prefix, data)
2953
providers = [
30-
DynamicProvider("spectra", handler=handler),
54+
DynamicProvider("spectra", handler=spectrum_handler),
55+
static_provider,
3156
]
57+
58+
data.callbacks["static-callbacks"] = static_pvs.update_all
59+
3260
server = Server(providers=providers)
3361
with server:
3462
consume_from_kafka_forever(data)
3563

3664

37-
def update_spectra(data: Data) -> None:
38-
num_updaters = len(data.spectrum_updaters)
39-
if num_updaters == 0:
40-
return
41-
42-
logger.debug("Updating %d spectra for connected clients", num_updaters)
43-
for period, detector, pv in data.spectrum_updaters:
44-
try:
45-
pv.post(data.spectra[(0, detector)].astype(np.double), timestamp=time.time())
46-
except Exception as e:
47-
logger.warning("Failed to update dynamic spectrum PV for period %d, detector %d, error: %s %s", period,
48-
detector, e.__class__.__name__, e)
65+
def callbacks(data: Data) -> None:
66+
with data.callbacks_lock:
67+
for callback_id, cb in data.callbacks.items():
68+
try:
69+
cb(data)
70+
except Exception as e:
71+
logger.warning("Callback '%s' failed, error: %s %s", callback_id, e.__class__.__name__, e)
4972

5073

5174
def make_runinfo_consumer(settings: dict[str, Any]) -> Consumer:
@@ -59,10 +82,8 @@ def make_runinfo_consumer(settings: dict[str, Any]) -> Consumer:
5982
"""
6083
runinfo_consumer = Consumer(settings)
6184

62-
start_offset = (
63-
runinfo_consumer.get_watermark_offsets(TopicPartition("NDW2922_runInfo", 0), cached=False)[1]
64-
- 2
65-
)
85+
low, high = runinfo_consumer.get_watermark_offsets(TopicPartition("NDW2922_runInfo", 0), cached=False)
86+
start_offset = max(high - 2, low)
6687
runinfo_consumer.assign([TopicPartition("NDW2922_runInfo", 0, start_offset)])
6788
return runinfo_consumer
6889

@@ -92,12 +113,15 @@ def consume_from_kafka_forever(data: Data) -> None:
92113

93114
while True:
94115
run_info_messages = runinfo_consumer.consume(num_messages=50, timeout=0.)
95-
handle_run_info_messages(run_info_messages, data=data, event_consumer=event_consumer)
116+
if run_info_messages:
117+
handle_run_info_messages(run_info_messages, data=data, event_consumer=event_consumer)
96118

97119
event_messages = event_consumer.consume(num_messages=1000, timeout=0.1)
98-
handle_event_messages(event_messages, data=data)
120+
if event_messages:
121+
start = time.time()
122+
handle_event_messages(event_messages, data=data)
123+
logger.debug("Handled %d event messages in %.3f ms", len(event_messages), ((time.time() - start) * 1000))
99124

100125
if len(event_messages) > 0 or len(run_info_messages) > 0:
101-
# If any messages arrived, spectra may have changed - update any PVs which
102-
# are connected.
103-
update_spectra(data)
126+
# If any messages arrived, data may have changed - update all subscribed callbacks
127+
callbacks(data)

src/kafka_dae_diagnostics/_kdaediag_rs.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ def bin_events_into_spectrum(
55
event_tofs: npt.NDArray,
66
pixel_ids: npt.NDArray,
77
tof_bin_boundaries: npt.NDArray,
8-
) -> npt.NDArray:
8+
) -> None:
99
pass
1010

1111

@@ -16,5 +16,5 @@ def bin_events_into_spectrum_linear(
1616
tof_bin_start: int,
1717
tof_bin_stop: int,
1818
tof_bin_step: int,
19-
) -> npt.NDArray:
19+
) -> None:
2020
pass

src/kafka_dae_diagnostics/data.py

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
"""Data being served by this IOC."""
22

33
import dataclasses
4+
import threading
5+
import time
6+
from datetime import date
7+
from typing import Callable
48

59
import numpy.typing as npt
610
import numpy as np
@@ -12,17 +16,96 @@
1216
class Data:
1317
"""A mutable object describing the data being served."""
1418

15-
spectra: npt.NDArray[np.uint64]
19+
spectra: npt.NDArray[np.uint]
1620
"""
1721
An array describing counts since last run start.
1822
1923
Has shape ``(n_periods, n_detectors, n_timechannels)`` and
20-
data-type :py:obj:`numpy.uint64`.
24+
data-type :py:obj:`numpy.uint`.
2125
"""
2226

23-
spectrum_updaters: list[tuple[int, int, SharedPV]]
27+
callbacks: dict[str, Callable[["Data"], None]]
28+
"""
29+
A list of callbacks to notify when data is updated.
2430
"""
25-
A list of callbacks to notify when a spectrum is updated.
2631

27-
Arguments are (period, detector, :py:obj:`~p4p.server.thread.SharedPV`).
32+
bin_boundaries: npt.NDArray[np.int32]
33+
"""
34+
Time-bin boundaries (ns).
35+
"""
36+
37+
callbacks_lock: threading.RLock = threading.RLock()
38+
"""
39+
Lock-object, must be taken when spectrum_updaters is iterated/mutated
40+
:meta private:
41+
"""
42+
43+
total_events: int = 0
44+
"""
45+
Total number of neutron events in this run.
46+
"""
47+
48+
total_event_messages: int = 0
49+
"""
50+
Total number of ev44 event messages in this run.
51+
"""
52+
53+
total_event_megabytes: float = 0.
54+
"""
55+
Megabytes of event messages processed in this run.
56+
"""
57+
58+
largest_kafka_timestamp: float = 0.
59+
"""
60+
Largest timestamp seen in an ev44 or pl72 message since
61+
the beginning of this run. Seconds since epoch.
2862
"""
63+
64+
most_recent_kafka_timestamp: float = 0.
65+
"""
66+
Timestamp in the most recently-processed ev44 or pl72 message.
67+
Seconds since epoch.
68+
"""
69+
70+
start_time: float = 0.
71+
"""
72+
Timestamp of the most recent pl72 run-start message.
73+
Seconds since epoch.
74+
"""
75+
76+
processing_lag: float = 0.
77+
"""
78+
Estimated time difference between an event being recorded in
79+
electronics and processed in KDAEDIAG ioc.
80+
"""
81+
82+
@property
83+
def mev(self):
84+
return self.total_events / 1_000_000
85+
86+
@property
87+
def duration(self):
88+
return max(self.largest_kafka_timestamp - self.start_time, 0)
89+
90+
@property
91+
def mev_per_hour(self):
92+
duration = self.duration
93+
if duration == 0:
94+
return 0
95+
return (self.mev / duration) * 3600
96+
97+
@property
98+
def num_periods(self) -> int:
99+
return self.spectra.shape[0]
100+
101+
@property
102+
def num_detectors(self) -> int:
103+
return self.spectra.shape[1]
104+
105+
@property
106+
def num_time_channels(self) -> int:
107+
return self.spectra.shape[2]
108+
109+
@property
110+
def histogram_megabytes(self) -> int:
111+
return self.spectra.nbytes / 1024**2

src/kafka_dae_diagnostics/kafka_handlers.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
Utilities for reacting to Kafka messages.
33
"""
44
import logging
5+
import time
6+
7+
import numpy.typing as npt
58

69
import numpy as np
710
from confluent_kafka import TopicPartition, Message, Consumer
@@ -23,7 +26,6 @@ def handle_event_messages(event_messages: list[Message], data: Data) -> None:
2326
event_messages: Messages received from Kafka event topic.
2427
data: Data served by ``kafka_dae_diagnostics``.
2528
"""
26-
logger.debug("Processing %s event messages", len(event_messages))
2729
for msg in event_messages:
2830
if msg.error():
2931
logger.warning("Kafka message error: %s", msg.error().code())
@@ -51,14 +53,22 @@ def handle_ev44(data: Data, msg: bytes):
5153
msg: Message bytes received from Kafka.
5254
"""
5355
ev44 = deserialise_ev44(msg)
54-
5556
bin_events_into_spectrum(
5657
histogram=data.spectra[0],
5758
event_tofs=ev44.time_of_flight,
5859
pixel_ids=ev44.pixel_id,
59-
tof_bin_boundaries=np.linspace(0, 20_000_000, 1_000, dtype=np.int32)
60+
tof_bin_boundaries=data.bin_boundaries
6061
)
6162

63+
data.total_events += ev44.pixel_id.size
64+
data.total_event_messages += 1
65+
data.total_event_megabytes += len(msg) / 1024**2
66+
67+
ev44_timestamp_s = ev44.reference_time[0] / 1_000_000_000
68+
data.largest_kafka_timestamp = max(data.largest_kafka_timestamp, ev44_timestamp_s)
69+
data.most_recent_kafka_timestamp = ev44_timestamp_s
70+
data.processing_lag = max(time.time() - ev44_timestamp_s, 0)
71+
6272

6373
def handle_run_info_messages(run_info_messages: list[Message], data: Data, event_consumer: Consumer):
6474
"""
@@ -103,20 +113,31 @@ def handle_pl72(data: Data, msg: bytes, event_consumer: Consumer) -> None:
103113
event_consumer: Kafka event topic consumer.
104114
"""
105115
pl72 = deserialise_pl72(msg)
106-
logger.info(f"Run start (filename='%s', start_time='%i', run_name='%s', instrument-name='%s', n_spectra='%i')", pl72.filename, pl72.start_time, pl72.run_name, pl72.instrument_name, pl72.detector_spectrum_map.n_spectra)
116+
logger.info("Run start (filename='%s', start_time='%i', run_name='%s', instrument-name='%s', n_spectra='%i')", pl72.filename, pl72.start_time, pl72.run_name, pl72.instrument_name, pl72.detector_spectrum_map.n_spectra)
107117
periods = 1 # TODO
108118
detectors = pl72.detector_spectrum_map.n_spectra
109119
time_channels = 1000 # TODO
110120

121+
data.time_bin_boundaries = np.linspace(0, 20_000_000, time_channels+1, dtype=np.int32) # TODO
122+
111123
# Only reallocate if shape has changed - otherwise zero existing array.
112-
if data.spectra.shape == [periods, detectors, time_channels]:
124+
if data.spectra.shape == (periods, detectors, time_channels):
113125
data.spectra[...] = 0
114126
else:
115127
del data.spectra
116-
data.spectra = np.zeros([periods, detectors, time_channels], dtype=np.uint64)
128+
data.spectra = np.zeros([periods, detectors, time_channels], dtype=np.float64)
117129

118130
# Assign event consumer to start at the time the run started
119131
tp = event_consumer.assignment()[0]
120132
event_consumer.seek(event_consumer.offsets_for_times(
121133
[TopicPartition(tp.topic, tp.partition, pl72.start_time)]
122134
)[0])
135+
136+
pl72_timestamp_s = pl72.start_time / 1000
137+
138+
data.total_events = 0
139+
data.total_event_messages = 0
140+
data.total_event_megabytes = 0
141+
data.largest_kafka_timestamp = pl72_timestamp_s
142+
data.most_recent_kafka_timestamp = pl72_timestamp_s
143+
data.start_time = pl72_timestamp_s

0 commit comments

Comments
 (0)