Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fuzz/fuzz_targets/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ fuzz_target!(|parts: (
root_variance_cubic: 0.0,
leap_indicator: NtpLeapIndicator::NoWarning,
accumulated_steps: NtpDuration::from_seconds(0.0),
accumulated_steps_threshold: None,
},
})),
keyset,
Expand Down
6 changes: 5 additions & 1 deletion ntp-proto/src/algorithm/kalman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ impl<C: NtpClock> InternalTimeSyncController for KalmanClockController<C> {
algo_config,
freq_offset,
desired_freq: 0.0,
timedata: TimeSnapshot::default(),
timedata: TimeSnapshot {
accumulated_steps_threshold: synchronization_config
.accumulated_step_panic_threshold,
..TimeSnapshot::default()
},
in_startup: true,
})
}
Expand Down
129 changes: 91 additions & 38 deletions ntp-proto/src/algorithm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ pub trait TimeSyncController: Sized + Send + Sync + 'static {
) -> Result<Self, <Self::Clock as NtpClock>::Error>;

/// Take control of the clock (should not be done in new!)
///
/// Should be callable multiple times, with subsequent calls not
/// doing anything.
fn take_control(&self) -> Result<(), <Self::Clock as NtpClock>::Error>;

/// Create a new source with given identity
Expand All @@ -182,11 +185,6 @@ pub trait TimeSyncController: Sized + Send + Sync + 'static {
measurement_noise_estimate: f64,
period: Option<f64>,
) -> Self::OneWaySourceController;
/// Notify the controller that a previous source has gone
fn remove_source(&self, id: ClockId);
/// Notify the controller that the status of a source (whether
/// or not it is usable for synchronization) has changed.
fn source_update(&self, id: ClockId, usable: bool);
/// Current synchronization state
fn synchronization_state(&self) -> (TimeSnapshot, Vec<ClockId>);
/// Run the internal watchdog and messaging.
Expand All @@ -196,13 +194,16 @@ pub trait TimeSyncController: Sized + Send + Sync + 'static {
pub struct TimeSyncControllerWrapper<T: InternalTimeSyncController> {
inner: Mutex<T>,
#[expect(clippy::type_complexity)]
messages_for_system:
Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<(ClockId, T::SourceMessage)>>>,
messages_for_system_sender: tokio::sync::mpsc::UnboundedSender<(ClockId, T::SourceMessage)>,
messages_for_system: Mutex<
Option<tokio::sync::mpsc::UnboundedReceiver<(ClockId, WrapperMessage<T::SourceMessage>)>>,
>,
messages_for_system_sender:
tokio::sync::mpsc::UnboundedSender<(ClockId, WrapperMessage<T::SourceMessage>)>,
oneway_sources: Mutex<Vec<Weak<Mutex<T::OneWaySourceController>>>>,
twoway_sources: Mutex<Vec<Weak<Mutex<T::NtpSourceController>>>>,
snapshot: Mutex<TimeSnapshot>,
used_sources: Mutex<Vec<ClockId>>,
has_taken_control: Mutex<bool>,
}

impl<T: InternalTimeSyncController> TimeSyncController for TimeSyncControllerWrapper<T> {
Expand All @@ -227,11 +228,17 @@ impl<T: InternalTimeSyncController> TimeSyncController for TimeSyncControllerWra
twoway_sources: Mutex::new(Vec::new()),
snapshot: Mutex::new(TimeSnapshot::default()),
used_sources: Mutex::new(Vec::new()),
has_taken_control: Mutex::new(false),
})
}

fn take_control(&self) -> Result<(), <Self::Clock as NtpClock>::Error> {
self.inner.lock().unwrap().take_control()
let mut has_taken_control = self.has_taken_control.lock().unwrap();
if !*has_taken_control {
self.inner.lock().unwrap().take_control()?;
*has_taken_control = true;
}
Ok(())
}

fn add_source(&self, id: ClockId, source_config: SourceConfig) -> Self::NtpSourceController {
Expand Down Expand Up @@ -274,14 +281,6 @@ impl<T: InternalTimeSyncController> TimeSyncController for TimeSyncControllerWra
wrapper
}

fn remove_source(&self, id: ClockId) {
self.inner.lock().unwrap().remove_source(id);
}

fn source_update(&self, id: ClockId, usable: bool) {
self.inner.lock().unwrap().source_update(id, usable);
}

fn synchronization_state(&self) -> (TimeSnapshot, Vec<ClockId>) {
(
*self.snapshot.lock().unwrap(),
Expand All @@ -295,23 +294,33 @@ impl<T: InternalTimeSyncController> TimeSyncController for TimeSyncControllerWra
loop {
tokio::select! {
Some((clock_id, message)) = messages_for_system.recv() => {
let update = self.inner.lock().unwrap().source_message(clock_id, message);
if let Some(source_message) = update.source_message {
for source in self.oneway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) {
source.lock().unwrap().handle_message(source_message.clone());
}
for source in self.twoway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) {
source.lock().unwrap().handle_message(source_message.clone());
}
}
if let Some(time_snapshot) = update.time_snapshot {
*self.snapshot.lock().unwrap() = time_snapshot;
}
if let Some(used_sources) = update.used_sources {
*self.used_sources.lock().unwrap() = used_sources;
}
if let Some(next_update) = update.next_update {
sleeper.as_mut().reset(tokio::time::Instant::now() + next_update);
match message {
WrapperMessage::SourceMessage(message) => {
let update = self.inner.lock().unwrap().source_message(clock_id, message);
if let Some(source_message) = update.source_message {
for source in self.oneway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) {
source.lock().unwrap().handle_message(source_message.clone());
}
for source in self.twoway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) {
source.lock().unwrap().handle_message(source_message.clone());
}
}
if let Some(time_snapshot) = update.time_snapshot {
*self.snapshot.lock().unwrap() = time_snapshot;
}
if let Some(used_sources) = update.used_sources {
*self.used_sources.lock().unwrap() = used_sources;
}
if let Some(next_update) = update.next_update {
sleeper.as_mut().reset(tokio::time::Instant::now() + next_update);
}
},
WrapperMessage::UsabilityChange(usable) => {
self.inner.lock().unwrap().source_update(clock_id, usable);
},
WrapperMessage::Dropped => {
self.inner.lock().unwrap().remove_source(clock_id);
},
}
},
_ = sleeper.as_mut() => {
Expand Down Expand Up @@ -342,15 +351,32 @@ impl<T: InternalTimeSyncController> TimeSyncController for TimeSyncControllerWra
pub trait SourceController: Sized + Send + 'static {
fn handle_measurement(&mut self, measurement: Measurement);

fn set_usable(&mut self, usable: bool);

fn desired_poll_interval(&self) -> PollInterval;

fn observe(&self) -> ObservableSourceTimedata;
}

enum WrapperMessage<SourceMessage> {
SourceMessage(SourceMessage),
UsabilityChange(bool),
Dropped,
}

pub struct OneWaySourceControllerWrapper<T: InternalSourceController<MeasurementDelay = ()>> {
id: ClockId,
inner: Arc<Mutex<T>>,
messages_for_system: tokio::sync::mpsc::UnboundedSender<(ClockId, T::SourceMessage)>,
messages_for_system:
tokio::sync::mpsc::UnboundedSender<(ClockId, WrapperMessage<T::SourceMessage>)>,
}

impl<T: InternalSourceController<MeasurementDelay = ()>> Drop for OneWaySourceControllerWrapper<T> {
fn drop(&mut self) {
self.messages_for_system
.send((self.id, WrapperMessage::Dropped))
.ok();
}
}

impl<T: InternalSourceController<MeasurementDelay = ()>> SourceController
Expand All @@ -372,10 +398,18 @@ impl<T: InternalSourceController<MeasurementDelay = ()>> SourceController
precision: measurement.precision,
})
{
self.messages_for_system.send((self.id, message)).ok();
self.messages_for_system
.send((self.id, WrapperMessage::SourceMessage(message)))
.ok();
}
}

fn set_usable(&mut self, usable: bool) {
self.messages_for_system
.send((self.id, WrapperMessage::UsabilityChange(usable)))
.ok();
}

fn desired_poll_interval(&self) -> PollInterval {
self.inner.lock().unwrap().desired_poll_interval()
}
Expand All @@ -391,7 +425,18 @@ pub struct TwoWaySourceControllerWrapper<
id: ClockId,
inner: Arc<Mutex<T>>,
last_outgoing_measurement: Option<Measurement>,
messages_for_system: tokio::sync::mpsc::UnboundedSender<(ClockId, T::SourceMessage)>,
messages_for_system:
tokio::sync::mpsc::UnboundedSender<(ClockId, WrapperMessage<T::SourceMessage>)>,
}

impl<T: InternalSourceController<MeasurementDelay = NtpDuration>> Drop
for TwoWaySourceControllerWrapper<T>
{
fn drop(&mut self) {
self.messages_for_system
.send((self.id, WrapperMessage::Dropped))
.ok();
}
}

impl<T: InternalSourceController<MeasurementDelay = NtpDuration>> SourceController
Expand Down Expand Up @@ -423,11 +468,19 @@ impl<T: InternalSourceController<MeasurementDelay = NtpDuration>> SourceControll
precision: measurement.precision,
})
{
self.messages_for_system.send((self.id, message)).ok();
self.messages_for_system
.send((self.id, WrapperMessage::SourceMessage(message)))
.ok();
}
}
}

fn set_usable(&mut self, usable: bool) {
self.messages_for_system
.send((self.id, WrapperMessage::UsabilityChange(usable)))
.ok();
}

fn desired_poll_interval(&self) -> PollInterval {
self.inner.lock().unwrap().desired_poll_interval()
}
Expand Down
8 changes: 5 additions & 3 deletions ntp-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,12 @@ mod exports {
pub use super::source::source_snapshot;
pub use super::source::{
AcceptSynchronizationError, NtpSource, NtpSourceAction, NtpSourceActionIterator,
NtpSourceSnapshot, NtpSourceUpdate, ObservableSourceState, OneWaySource, ProtocolVersion,
Reach, SourceNtsData,
NtpSourceSnapshot, ObservableSourceState, OneWaySource, ProtocolVersion, Reach,
SourceNtsData,
};
pub use super::system::{
NtpManager, NtpServerInfo, NtpSnapshot, SourceType, SystemSnapshot, TimeSnapshot,
};
pub use super::system::{NtpServerInfo, NtpSnapshot, System, SystemSnapshot, TimeSnapshot};

#[cfg(feature = "__internal-fuzz")]
pub use super::time_types::fuzz_duration_from_seconds;
Expand Down
Loading
Loading