Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Add `ProbeBuilder` to build Kubernetes container probes ([#1078]).

### Changed

- BREAKING: The `ResolvedProductImage` field `app_version_label` was renamed to `app_version_label_value` to match changes to its type ([#1076]).
Expand All @@ -14,6 +18,7 @@ All notable changes to this project will be documented in this file.
This is the case when referencing custom images via a `@sha256:...` hash. As such, the `product_image_selection::resolve` function is now fallible ([#1076]).

[#1076]: https://github.com/stackabletech/operator-rs/pull/1076
[#1078]: https://github.com/stackabletech/operator-rs/pull/1078

## [0.94.0] - 2025-07-10

Expand Down
1 change: 1 addition & 0 deletions crates/stackable-operator/src/builder/pod/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
};

pub mod container;
pub mod probe;
pub mod resources;
pub mod security;
pub mod volume;
Expand Down
330 changes: 330 additions & 0 deletions crates/stackable-operator/src/builder/pod/probe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
use std::num::TryFromIntError;

use k8s_openapi::{
api::core::v1::{ExecAction, GRPCAction, HTTPGetAction, Probe, TCPSocketAction},
apimachinery::pkg::util::intstr::IntOrString,
};
use snafu::{ResultExt, Snafu, ensure};

use crate::time::Duration;

/// Errors that can occur while configuring or building a [`Probe`] with [`ProbeBuilder`].
#[derive(Debug, Snafu)]
pub enum Error {
/// A configured duration has more whole seconds than fit into an `i32`.
///
/// `field` names the probe setting that overflowed, `duration` is the offending value and
/// `source` is the failed integer conversion.
#[snafu(display(
"The probe's {field:?} duration of {duration} is too long, as it's seconds doesn't fit into an i32"
))]
DurationTooLong {
source: TryFromIntError,
field: String,
duration: Duration,
},

/// The probe period is zero, so a threshold cannot be derived by dividing a duration by it.
#[snafu(display("The probe period is zero, but it needs to be a positive duration"))]
PeriodIsZero {},
}

/// Builder for a Kubernetes container [`Probe`].
///
/// The two type parameters record at compile time whether the mandatory action and period have
/// been supplied. Both start out as `()` (see the [`Default`] implementation); `build` is only
/// available once they are [`ProbeAction`] and [`Duration`] respectively.
#[derive(Debug)]
pub struct ProbeBuilder<Action, Period> {
// Mandatory fields: `()` until set, afterwards the probe action / the probe period.
action: Action,
period: Period,

// Fields with defaults (the defaults are assigned in the `Default` implementation)
success_threshold: i32,
failure_threshold: i32,
timeout: Duration,
initial_delay: Duration,
termination_grace_period: Duration,
}

impl Default for ProbeBuilder<(), ()> {
    /// Creates a builder that has neither an action nor a period configured yet.
    ///
    /// The optional settings start at the values Kubernetes itself applies when a field is
    /// omitted from the probe, with the exception noted on the grace period below.
    fn default() -> Self {
        Self {
            action: (),
            period: (),
            // Kubernetes default for `successThreshold`.
            success_threshold: 1,
            // Kubernetes default for `failureThreshold` is 3 (not 1): a single failed probe
            // must not immediately mark the container as failed.
            failure_threshold: 3,
            // Kubernetes default for `timeoutSeconds`.
            timeout: Duration::from_secs(1),
            // Kubernetes default for `initialDelaySeconds`.
            initial_delay: Duration::from_secs(0),
            // Not a Kubernetes default: upstream leaves `terminationGracePeriodSeconds` unset
            // and falls back to the pod-level grace period.
            // NOTE(review): confirm that zero is the intended starting value here.
            termination_grace_period: Duration::from_secs(0),
        }
    }
}

/// The check a probe performs; exactly one of the action kinds Kubernetes supports.
///
/// Deriving `Debug` is required for the derived `Debug` implementation of
/// [`ProbeBuilder`] to be usable once an action has been set.
#[derive(Clone, Debug, PartialEq)]
pub enum ProbeAction {
    /// Probe by executing a command, see [`ExecAction`].
    Exec(ExecAction),
    /// Probe via gRPC, see [`GRPCAction`].
    Grpc(GRPCAction),
    /// Probe via an HTTP GET request, see [`HTTPGetAction`].
    HttpGet(HTTPGetAction),
    /// Probe by opening a TCP socket, see [`TCPSocketAction`].
    TcpSocket(TCPSocketAction),
}

impl ProbeBuilder<(), ()> {
    /// Uses an exec action that runs the given command.
    ///
    /// Every item of `command` is converted into a `String`; the resulting [`ExecAction`] is
    /// handed to [`Self::with_exec_action`].
    pub fn with_exec_action_helper(
        self,
        command: impl IntoIterator<Item = impl Into<String>>,
    ) -> ProbeBuilder<ProbeAction, ()> {
        let argv: Vec<String> = command.into_iter().map(Into::into).collect();

        self.with_exec_action(ExecAction {
            command: Some(argv),
        })
    }

    /// Uses the given [`ExecAction`] as probe action.
    ///
    /// [`Self::with_exec_action_helper`] is a shortcut that builds the action from a command.
    pub fn with_exec_action(self, exec_action: ExecAction) -> ProbeBuilder<ProbeAction, ()> {
        let action = ProbeAction::Exec(exec_action);
        self.with_action(action)
    }

    /// Uses the given [`GRPCAction`] as probe action.
    pub fn with_grpc_action(self, grpc_action: GRPCAction) -> ProbeBuilder<ProbeAction, ()> {
        let action = ProbeAction::Grpc(grpc_action);
        self.with_action(action)
    }

    // Note: Ideally there would also be a dedicated builder for `HTTPGetAction`, but that is
    // a lot of effort we don't want to spend now.
    /// Uses an HTTP GET action against the given `port`.
    ///
    /// `scheme` and `path` are optional and passed through unchanged; all remaining fields of
    /// the [`HTTPGetAction`] keep their `Default` value. The action is handed to
    /// [`Self::with_http_get_action`].
    pub fn with_http_get_action_helper(
        self,
        port: u16,
        scheme: Option<String>,
        path: Option<String>,
    ) -> ProbeBuilder<ProbeAction, ()> {
        let http_get = HTTPGetAction {
            port: IntOrString::Int(i32::from(port)),
            scheme,
            path,
            ..Default::default()
        };

        self.with_http_get_action(http_get)
    }

    /// Uses the given [`HTTPGetAction`] as probe action.
    ///
    /// [`Self::with_http_get_action_helper`] is a shortcut for the common port/scheme/path case.
    pub fn with_http_get_action(
        self,
        http_get_action: HTTPGetAction,
    ) -> ProbeBuilder<ProbeAction, ()> {
        let action = ProbeAction::HttpGet(http_get_action);
        self.with_action(action)
    }

    /// Uses the given [`TCPSocketAction`] as probe action.
    pub fn with_tcp_socket_action(
        self,
        tcp_socket_action: TCPSocketAction,
    ) -> ProbeBuilder<ProbeAction, ()> {
        let action = ProbeAction::TcpSocket(tcp_socket_action);
        self.with_action(action)
    }

    /// Sets an arbitrary [`ProbeAction`] and moves the builder into the "action set" state.
    ///
    /// Prefer the action-specific functions (e.g. [`Self::with_exec_action`] or
    /// [`Self::with_http_get_action`]).
    pub fn with_action(self, action: ProbeAction) -> ProbeBuilder<ProbeAction, ()> {
        // All other settings are carried over untouched; only the action slot changes type.
        ProbeBuilder {
            action,
            period: self.period,
            success_threshold: self.success_threshold,
            failure_threshold: self.failure_threshold,
            timeout: self.timeout,
            initial_delay: self.initial_delay,
            termination_grace_period: self.termination_grace_period,
        }
    }
}

impl ProbeBuilder<ProbeAction, ()> {
    /// Sets the period (interval) in which the probe is executed and moves the builder into
    /// its final, buildable state.
    pub fn with_period(self, period: Duration) -> ProbeBuilder<ProbeAction, Duration> {
        // All other settings are carried over untouched; only the period slot changes type.
        ProbeBuilder {
            action: self.action,
            period,
            success_threshold: self.success_threshold,
            failure_threshold: self.failure_threshold,
            timeout: self.timeout,
            initial_delay: self.initial_delay,
            termination_grace_period: self.termination_grace_period,
        }
    }
}

impl ProbeBuilder<ProbeAction, Duration> {
/// How often the probe must succeed before being considered successful.
pub fn with_success_threshold(mut self, success_threshold: i32) -> Self {
self.success_threshold = success_threshold;
self
}

/// The duration the probe needs to succeed before being considered successful.
///
/// This internally calculates the needed failure threshold based on the period and passes that
/// to [`Self::with_success_threshold`].
///
/// This function returns an [`Error::PeriodIsZero`] error in case the period is zero, as it
/// can not divide by zero.
pub fn with_success_threshold_duration(
self,
success_threshold_duration: Duration,
) -> Result<Self, Error> {
ensure!(self.period.as_nanos() != 0, PeriodIsZeroSnafu);

// SAFETY: Period is checked above to be non-zero
let success_threshold = success_threshold_duration.div_duration_f32(*self.period);
Ok(self.with_success_threshold(success_threshold.ceil() as i32))
}

/// How often the probe must fail before being considered failed.
pub fn with_failure_threshold(mut self, failure_threshold: i32) -> Self {
self.failure_threshold = failure_threshold;
self
}

pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
self.initial_delay = initial_delay;
self
}

pub fn with_termination_grace_period(mut self, termination_grace_period: Duration) -> Self {
self.termination_grace_period = termination_grace_period;
self
}

/// The duration the probe needs to fail before being considered failed.
///
/// This internally calculates the needed failure threshold based on the period and passes that
/// to [`Self::with_failure_threshold`].
///
/// This function returns an [`Error::PeriodIsZero`] error in case the period is zero, as it
/// can not divide by zero.
pub fn with_failure_threshold_duration(
self,
failure_threshold_duration: Duration,
) -> Result<Self, Error> {
ensure!(self.period.as_nanos() != 0, PeriodIsZeroSnafu);

// SAFETY: Period is checked above to be non-zero
let failure_threshold = failure_threshold_duration.div_duration_f32(*self.period);
Ok(self.with_failure_threshold(failure_threshold.ceil() as i32))
}

pub fn build(self) -> Result<Probe, Error> {
let mut probe = Probe {
exec: None,
failure_threshold: Some(self.failure_threshold),
grpc: None,
http_get: None,
initial_delay_seconds: Some(self.initial_delay.as_secs().try_into().context(
DurationTooLongSnafu {
field: "initialDelay",
duration: self.initial_delay,
},
)?),
period_seconds: Some(self.period.as_secs().try_into().context(
DurationTooLongSnafu {
field: "period",
duration: self.period,
},
)?),
success_threshold: Some(self.success_threshold),
tcp_socket: None,
termination_grace_period_seconds: Some(
self.termination_grace_period.as_secs().try_into().context(
DurationTooLongSnafu {
field: "terminationGracePeriod",
duration: self.termination_grace_period,
},
)?,
),
timeout_seconds: Some(self.timeout.as_secs().try_into().context(
DurationTooLongSnafu {
field: "timeout",
duration: self.timeout,
},
)?),
};

match self.action {
ProbeAction::Exec(exec_action) => probe.exec = Some(exec_action),
ProbeAction::Grpc(grpc_action) => probe.grpc = Some(grpc_action),
ProbeAction::HttpGet(http_get_action) => probe.http_get = Some(http_get_action),
ProbeAction::TcpSocket(tcp_socket_action) => probe.tcp_socket = Some(tcp_socket_action),
}

Ok(probe)
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Only the mandatory action and period are configured.
    #[test]
    fn test_probe_builder_minimal() {
        let builder = ProbeBuilder::default()
            .with_http_get_action_helper(8080, None, None)
            .with_period(Duration::from_secs(10));
        let probe = builder
            .build()
            .expect("Valid inputs must produce a Probe");

        let expected_http_get = HTTPGetAction {
            port: IntOrString::Int(8080),
            ..Default::default()
        };
        assert_eq!(probe.http_get, Some(expected_http_get));
        assert_eq!(probe.period_seconds, Some(10));
    }

    /// Every optional setting is overridden and the complete probe is compared.
    #[test]
    fn test_probe_builder_complex() {
        let with_thresholds = ProbeBuilder::default()
            .with_exec_action_helper(["sleep", "1"])
            .with_period(Duration::from_secs(5))
            .with_success_threshold(2)
            // 33s with a period of 5s needs 7 periods (6.6 rounded up)
            .with_failure_threshold_duration(Duration::from_secs(33))
            .expect("The period is always non-zero");

        let probe = with_thresholds
            .with_timeout(Duration::from_secs(3))
            .with_initial_delay(Duration::from_secs(7))
            .with_termination_grace_period(Duration::from_secs(4))
            .build()
            .expect("Valid inputs must produce a Probe");

        let expected = Probe {
            exec: Some(ExecAction {
                command: Some(vec!["sleep".to_owned(), "1".to_owned()]),
            }),
            failure_threshold: Some(7),
            grpc: None,
            http_get: None,
            initial_delay_seconds: Some(7),
            period_seconds: Some(5),
            success_threshold: Some(2),
            tcp_socket: None,
            termination_grace_period_seconds: Some(4),
            timeout_seconds: Some(3),
        };
        assert_eq!(probe, expected);
    }
}