Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Add `ProbeBuilder` to build Kubernetes container probes ([#1078]).

### Changed

- BREAKING: The `ResolvedProductImage` field `app_version_label` was renamed to `app_version_label_value` to match changes to its type ([#1076]).
Expand All @@ -14,6 +18,7 @@ All notable changes to this project will be documented in this file.
This is the case when referencing custom images via a `@sha256:...` hash. As such, the `product_image_selection::resolve` function is now fallible ([#1076]).

[#1076]: https://github.com/stackabletech/operator-rs/pull/1076
[#1078]: https://github.com/stackabletech/operator-rs/pull/1078

## [0.94.0] - 2025-07-10

Expand Down
1 change: 1 addition & 0 deletions crates/stackable-operator/src/builder/pod/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
};

pub mod container;
pub mod probe;
pub mod resources;
pub mod security;
pub mod volume;
Expand Down
341 changes: 341 additions & 0 deletions crates/stackable-operator/src/builder/pod/probe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
//! Kubernetes [`Probe`] builder.
//!
//! The upstream [`Probe`] struct does not prevent invalid probe configurations
//! which leads to surprises at runtime which can be deeply hidden.
//! You need to specify at least an action and interval (in this order).
//!
//! ### Usage example
//!
//! ```
//! use stackable_operator::{
//! builder::pod::probe::ProbeBuilder,
//! time::Duration,
//! };
//! # use k8s_openapi::api::core::v1::HTTPGetAction;
//! # use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
//!
//! let probe = ProbeBuilder::http_get_port_scheme_path(8080, None, None)
//! .with_period(Duration::from_secs(10))
//! .build()
//! .expect("failed to build probe");
//!
//! assert_eq!(
//! probe.http_get,
//! Some(HTTPGetAction {
//! port: IntOrString::Int(8080),
//! ..Default::default()
//! })
//! );
//! assert_eq!(probe.period_seconds, Some(10));
//! ```

use std::num::TryFromIntError;

use k8s_openapi::{
api::core::v1::{ExecAction, GRPCAction, HTTPGetAction, Probe, TCPSocketAction},
apimachinery::pkg::util::intstr::IntOrString,
};
use snafu::{ResultExt, Snafu, ensure};

use crate::time::Duration;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"The probe's {field:?} duration of {duration} is too long, as it's seconds doesn't fit into an i32"
))]
DurationTooLong {
source: TryFromIntError,
field: String,
duration: Duration,
},

#[snafu(display("The probe period is zero, but it needs to be a positive duration"))]
PeriodIsZero {},
}

#[derive(Debug)]
pub struct ProbeBuilder<Action, Period> {
// Mandatory field
action: Action,
period: Period,

// Fields with defaults
success_threshold: i32,
failure_threshold: i32,
timeout: Duration,
initial_delay: Duration,
termination_grace_period: Duration,
}

/// Available probes
///
/// Only one probe can be configured at a time. For more details about each
/// type, see [container-probes] documentation.
///
/// [container-probes]: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#container-probes
pub enum ProbeAction {
Exec(ExecAction),
Grpc(GRPCAction),
HttpGet(HTTPGetAction),
TcpSocket(TCPSocketAction),
}

impl ProbeBuilder<(), ()> {
/// This probe action executes the specified command
pub fn exec_command(
command: impl IntoIterator<Item = impl Into<String>>,
) -> ProbeBuilder<ProbeAction, ()> {
Self::exec(ExecAction {
command: Some(command.into_iter().map(Into::into).collect()),
})
}

// Note: Ideally we also have a builder for `HTTPGetAction`, but that is lot's of effort we
// don't want to spend now.
/// This probe action does an HTTP GET request to the specified port. Optionally, you can
/// configure a scheme and path, otherwise the Kubernetes default is used.
pub fn http_get_port_scheme_path(
port: u16,
scheme: Option<String>,
path: Option<String>,
) -> ProbeBuilder<ProbeAction, ()> {
Self::http_get(HTTPGetAction {
path,
scheme,
port: IntOrString::Int(port.into()),
..Default::default()
})
}

/// Set's an [`ExecAction`] as probe.
///
/// You likely want to use [`Self::exec_command`] whenever possible.
pub fn exec(exec_action: ExecAction) -> ProbeBuilder<ProbeAction, ()> {
Self::action(ProbeAction::Exec(exec_action))
}

/// Set's an [`GRPCAction`] as probe.
pub fn grpc(grpc_action: GRPCAction) -> ProbeBuilder<ProbeAction, ()> {
Self::action(ProbeAction::Grpc(grpc_action))
}

/// Set's an [`HTTPGetAction`] as probe.
///
/// For simple cases, there is a a convenience helper: [`Self::http_get_port_scheme_path`].
pub fn http_get(http_get_action: HTTPGetAction) -> ProbeBuilder<ProbeAction, ()> {
Self::action(ProbeAction::HttpGet(http_get_action))
}

/// Set's an [`TCPSocketAction`] as probe.
pub fn tcp_socket(tcp_socket_action: TCPSocketAction) -> ProbeBuilder<ProbeAction, ()> {
Self::action(ProbeAction::TcpSocket(tcp_socket_action))
}

/// Incase you already have an [`ProbeAction`] enum variant you can pass that here.
///
/// If not, it is recommended to use one of the specialized functions such as [`Self::exec`],
/// [`Self::grpc`], [`Self::http_get`] or [`Self::tcp_socket`] or their helper functions.
pub fn action(action: ProbeAction) -> ProbeBuilder<ProbeAction, ()> {
ProbeBuilder {
action,
period: (),
// The following values match the Kubernetes defaults
success_threshold: 1,
failure_threshold: 1,
timeout: Duration::from_secs(1),
initial_delay: Duration::from_secs(0),
termination_grace_period: Duration::from_secs(0),
}
}
}

impl ProbeBuilder<ProbeAction, ()> {
/// The period/interval in which the probe should be executed.
pub fn with_period(self, period: Duration) -> ProbeBuilder<ProbeAction, Duration> {
let Self {
action,
period: (),
success_threshold,
failure_threshold,
timeout,
initial_delay,
termination_grace_period,
} = self;

ProbeBuilder {
action,
period,
success_threshold,
failure_threshold,
timeout,
initial_delay,
termination_grace_period,
}
}
}

impl ProbeBuilder<ProbeAction, Duration> {
/// How often the probe must succeed before being considered successful.
pub fn with_success_threshold(mut self, success_threshold: i32) -> Self {
self.success_threshold = success_threshold;
self
}

/// The duration the probe needs to succeed before being considered successful.
///
/// This internally calculates the needed failure threshold based on the period and passes that
/// to [`Self::with_success_threshold`].
///
/// This function returns an [`Error::PeriodIsZero`] error in case the period is zero, as it
/// can not divide by zero.
pub fn with_success_threshold_duration(
self,
success_threshold_duration: Duration,
) -> Result<Self, Error> {
ensure!(self.period.as_nanos() != 0, PeriodIsZeroSnafu);

// SAFETY: Period is checked above to be non-zero
let success_threshold = success_threshold_duration.div_duration_f32(*self.period);
Ok(self.with_success_threshold(success_threshold.ceil() as i32))
}

/// How often the probe must fail before being considered failed.
pub fn with_failure_threshold(mut self, failure_threshold: i32) -> Self {
self.failure_threshold = failure_threshold;
self
}

pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

pub fn with_initial_delay(mut self, initial_delay: Duration) -> Self {
self.initial_delay = initial_delay;
self
}

pub fn with_termination_grace_period(mut self, termination_grace_period: Duration) -> Self {
self.termination_grace_period = termination_grace_period;
self
}

/// The duration the probe needs to fail before being considered failed.
///
/// This internally calculates the needed failure threshold based on the period and passes that
/// to [`Self::with_failure_threshold`].
///
/// This function returns an [`Error::PeriodIsZero`] error in case the period is zero, as it
/// can not divide by zero.
pub fn with_failure_threshold_duration(
self,
failure_threshold_duration: Duration,
) -> Result<Self, Error> {
ensure!(self.period.as_nanos() != 0, PeriodIsZeroSnafu);

// SAFETY: Period is checked above to be non-zero
let failure_threshold = failure_threshold_duration.div_duration_f32(*self.period);
Ok(self.with_failure_threshold(failure_threshold.ceil() as i32))
}

pub fn build(self) -> Result<Probe, Error> {
let mut probe = Probe {
exec: None,
failure_threshold: Some(self.failure_threshold),
grpc: None,
http_get: None,
initial_delay_seconds: Some(self.initial_delay.as_secs().try_into().context(
DurationTooLongSnafu {
field: "initialDelay",
duration: self.initial_delay,
},
)?),
period_seconds: Some(self.period.as_secs().try_into().context(
DurationTooLongSnafu {
field: "period",
duration: self.period,
},
)?),
success_threshold: Some(self.success_threshold),
tcp_socket: None,
termination_grace_period_seconds: Some(
self.termination_grace_period.as_secs().try_into().context(
DurationTooLongSnafu {
field: "terminationGracePeriod",
duration: self.termination_grace_period,
},
)?,
),
timeout_seconds: Some(self.timeout.as_secs().try_into().context(
DurationTooLongSnafu {
field: "timeout",
duration: self.timeout,
},
)?),
};

match self.action {
ProbeAction::Exec(exec_action) => probe.exec = Some(exec_action),
ProbeAction::Grpc(grpc_action) => probe.grpc = Some(grpc_action),
ProbeAction::HttpGet(http_get_action) => probe.http_get = Some(http_get_action),
ProbeAction::TcpSocket(tcp_socket_action) => probe.tcp_socket = Some(tcp_socket_action),
}

Ok(probe)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_probe_builder_minimal() {
let probe = ProbeBuilder::http_get_port_scheme_path(8080, None, None)
.with_period(Duration::from_secs(10))
.build()
.expect("Valid inputs must produce a Probe");

assert_eq!(
probe.http_get,
Some(HTTPGetAction {
port: IntOrString::Int(8080),
..Default::default()
})
);
assert_eq!(probe.period_seconds, Some(10));
}

#[test]
fn test_probe_builder_complex() {
let probe = ProbeBuilder::exec_command(["sleep", "1"])
.with_period(Duration::from_secs(5))
.with_success_threshold(2)
.with_failure_threshold_duration(Duration::from_secs(33))
.expect("The period is always non-zero")
.with_timeout(Duration::from_secs(3))
.with_initial_delay(Duration::from_secs(7))
.with_termination_grace_period(Duration::from_secs(4))
.build()
.expect("Valid inputs must produce a Probe");

assert_eq!(
probe,
Probe {
exec: Some(ExecAction {
command: Some(vec!["sleep".to_owned(), "1".to_owned()])
}),
failure_threshold: Some(7),
grpc: None,
http_get: None,
initial_delay_seconds: Some(7),
period_seconds: Some(5),
success_threshold: Some(2),
tcp_socket: None,
termination_grace_period_seconds: Some(4),
timeout_seconds: Some(3),
}
);
}
}