Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ All notable changes to this project will be documented in this file.

- Remove ZooKeeper 3.8.4 from tests and docs ([#857]).

### Fixed

- Failing to parse one `ZookeeperCluster`/`ZookeeperZnode` should no longer cause the whole operator to stop functioning ([#872]).

[#853]: https://github.com/stackabletech/zookeeper-operator/pull/853
[#857]: https://github.com/stackabletech/zookeeper-operator/pull/857
[#872]: https://github.com/stackabletech/zookeeper-operator/pull/872

## [24.7.0] - 2024-07-24

Expand Down
38 changes: 25 additions & 13 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use stackable_operator::{
apps::v1::StatefulSet,
core::v1::{ConfigMap, Endpoints, Service},
},
kube::runtime::{reflector::ObjectRef, watcher, Controller},
kube::{
core::DeserializeGuard,
runtime::{reflector::ObjectRef, watcher, Controller},
Resource,
},
logging::controller::report_controller_reconciled,
CustomResourceExt,
};
Expand Down Expand Up @@ -71,36 +75,40 @@ async fn main() -> anyhow::Result<()> {
let client =
stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?;
let zk_controller_builder = Controller::new(
watch_namespace.get_api::<ZookeeperCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<ZookeeperCluster>>(&client),
watcher::Config::default(),
);

let zk_store = zk_controller_builder.store();
let zk_controller = zk_controller_builder
.owns(
watch_namespace.get_api::<Service>(&client),
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
watcher::Config::default(),
)
.watches(
watch_namespace.get_api::<Endpoints>(&client),
watch_namespace.get_api::<DeserializeGuard<Endpoints>>(&client),
watcher::Config::default(),
move |endpoints| {
zk_store
.state()
.into_iter()
.filter(move |zk| {
zk.metadata.namespace == endpoints.metadata.namespace
&& zk.server_role_service_name() == endpoints.metadata.name
let Ok(zk) = &zk.0 else {
return false;
};
let endpoints_meta = endpoints.meta();
zk.metadata.namespace == endpoints_meta.namespace
&& zk.server_role_service_name() == endpoints_meta.name
})
.map(|zk| ObjectRef::from_obj(&*zk))
},
)
.owns(
watch_namespace.get_api::<StatefulSet>(&client),
watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<ConfigMap>(&client),
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
Expand All @@ -120,25 +128,29 @@ async fn main() -> anyhow::Result<()> {
);
});
let znode_controller_builder = Controller::new(
watch_namespace.get_api::<ZookeeperZnode>(&client),
watch_namespace.get_api::<DeserializeGuard<ZookeeperZnode>>(&client),
watcher::Config::default(),
);
let znode_store = znode_controller_builder.store();
let znode_controller = znode_controller_builder
.owns(
watch_namespace.get_api::<ConfigMap>(&client),
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
)
.watches(
watch_namespace.get_api::<ZookeeperCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<ZookeeperCluster>>(&client),
watcher::Config::default(),
move |zk| {
znode_store
.state()
.into_iter()
.filter(move |znode| {
zk.metadata.namespace == znode.spec.cluster_ref.namespace
&& zk.metadata.name == znode.spec.cluster_ref.name
let Ok(znode) = &znode.0 else {
return false;
};
let zk_meta = zk.meta();
zk_meta.namespace == znode.spec.cluster_ref.namespace
&& zk_meta.name == znode.spec.cluster_ref.name
})
.map(|znode| ObjectRef::from_obj(&*znode))
},
Expand Down
62 changes: 41 additions & 21 deletions rust/operator-binary/src/zk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ use stackable_operator::{
apimachinery::pkg::apis::meta::v1::LabelSelector,
DeepMerge,
},
kube::{api::DynamicObject, runtime::controller, Resource},
kube::{
api::DynamicObject,
core::{error_boundary, DeserializeGuard},
runtime::controller,
Resource,
},
kvp::{Label, LabelError, Labels},
logging::controller::ReconcilerError,
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
Expand Down Expand Up @@ -89,6 +94,11 @@ type Result<T, E = Error> = std::result::Result<T, E>;
#[strum_discriminants(derive(IntoStaticStr))]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("ZookeeperCluster object is invalid"))]
InvalidZookeeperCluster {
source: error_boundary::InvalidObject,
},

#[snafu(display("crd validation failure"))]
CrdValidationFailure {
source: stackable_zookeeper_crd::Error,
Expand Down Expand Up @@ -253,6 +263,7 @@ impl ReconcilerError for Error {
}
fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
match self {
Error::InvalidZookeeperCluster { source: _ } => None,
Error::CrdValidationFailure { .. } => None,
Error::NoServerRole => None,
Error::RoleParseFailure { .. } => None,
Expand Down Expand Up @@ -289,8 +300,15 @@ impl ReconcilerError for Error {
}
}

pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<controller::Action> {
pub async fn reconcile_zk(
zk: Arc<DeserializeGuard<ZookeeperCluster>>,
ctx: Arc<Ctx>,
) -> Result<controller::Action> {
tracing::info!("Starting reconcile");
let zk =
zk.0.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidZookeeperClusterSnafu)?;
let client = &ctx.client;

let resolved_product_image = zk
Expand All @@ -310,7 +328,7 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
let validated_config = validate_all_roles_and_groups_config(
&resolved_product_image.app_version_label,
&transform_all_roles_to_config(
zk.as_ref(),
zk,
[(
ZookeeperRole::Server.to_string(),
(
Expand All @@ -336,16 +354,16 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
.map(Cow::Borrowed)
.unwrap_or_default();

let vector_aggregator_address = resolve_vector_aggregator_address(&zk, client)
let vector_aggregator_address = resolve_vector_aggregator_address(zk, client)
.await
.context(ResolveVectorAggregatorAddressSnafu)?;

let zookeeper_security = ZookeeperSecurity::new_from_zookeeper_cluster(client, &zk)
let zookeeper_security = ZookeeperSecurity::new_from_zookeeper_cluster(client, zk)
.await
.context(FailedToInitializeSecurityContextSnafu)?;

let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
zk.as_ref(),
zk,
APP_NAME,
cluster_resources
.get_required_labels()
Expand All @@ -366,7 +384,7 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
let server_role_service = cluster_resources
.add(
client,
build_server_role_service(&zk, &resolved_product_image, &zookeeper_security)?,
build_server_role_service(zk, &resolved_product_image, &zookeeper_security)?,
)
.await
.context(ApplyRoleServiceSnafu)?;
Expand All @@ -381,21 +399,21 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
.context(FailedToResolveConfigSnafu)?;

let rg_service = build_server_rolegroup_service(
&zk,
zk,
&rolegroup,
&resolved_product_image,
&zookeeper_security,
)?;
let rg_configmap = build_server_rolegroup_config_map(
&zk,
zk,
&rolegroup,
rolegroup_config,
&resolved_product_image,
vector_aggregator_address.as_deref(),
&zookeeper_security,
)?;
let rg_statefulset = build_server_rolegroup_statefulset(
&zk,
zk,
&zk_role,
&rolegroup,
rolegroup_config,
Expand Down Expand Up @@ -431,7 +449,7 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
pod_disruption_budget: pdb,
}) = role_config
{
add_pdbs(pdb, &zk, &zk_role, client, &mut cluster_resources)
add_pdbs(pdb, zk, &zk_role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}
Expand All @@ -440,8 +458,8 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
// We don't /need/ stability, but it's still nice to avoid spurious changes where possible.
let mut discovery_hash = FnvHasher::with_key(0);
for discovery_cm in build_discovery_configmaps(
&zk,
&*zk,
zk,
zk,
client,
ZK_CONTROLLER_NAME,
&server_role_service,
Expand All @@ -468,18 +486,15 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
// Serialize as a string to discourage users from trying to parse the value,
// and to keep things flexible if we end up changing the hasher at some point.
discovery_hash: Some(discovery_hash.finish().to_string()),
conditions: compute_conditions(
zk.as_ref(),
&[&ss_cond_builder, &cluster_operation_cond_builder],
),
conditions: compute_conditions(zk, &[&ss_cond_builder, &cluster_operation_cond_builder]),
};

cluster_resources
.delete_orphaned_resources(client)
.await
.context(DeleteOrphansSnafu)?;
client
.apply_patch_status(OPERATOR_NAME, &*zk, &status)
.apply_patch_status(OPERATOR_NAME, zk, &status)
.await
.context(ApplyStatusSnafu)?;

Expand Down Expand Up @@ -1004,11 +1019,16 @@ fn build_server_rolegroup_statefulset(
}

pub fn error_policy(
_obj: Arc<ZookeeperCluster>,
_error: &Error,
_obj: Arc<DeserializeGuard<ZookeeperCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> controller::Action {
controller::Action::requeue(*Duration::from_secs(5))
match error {
// root object is invalid, will be requeued when modified anyway
Error::InvalidZookeeperCluster { .. } => controller::Action::await_change(),

_ => controller::Action::requeue(*Duration::from_secs(5)),
}
}

#[cfg(test)]
Expand Down
23 changes: 17 additions & 6 deletions rust/operator-binary/src/znode_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use stackable_operator::{
kube::{
self,
api::ObjectMeta,
core::DynamicObject,
core::{error_boundary, DeserializeGuard, DynamicObject},
runtime::{controller, finalizer, reflector::ObjectRef},
Resource,
},
Expand Down Expand Up @@ -40,6 +40,11 @@ pub struct Ctx {
#[strum_discriminants(derive(IntoStaticStr))]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("ZookeeperZnode object is invalid"))]
InvalidZookeeperZnode {
source: error_boundary::InvalidObject,
},

#[snafu(display(
"object is missing metadata that should be created by the Kubernetes cluster",
))]
Expand Down Expand Up @@ -147,6 +152,7 @@ impl ReconcilerError for Error {

fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
match self {
Error::InvalidZookeeperZnode { source: _ } => None,
Error::ObjectMissingMetadata => None,
Error::InvalidZkReference => None,
Error::FindZk { zk, .. } => Some(zk.clone().erase()),
Expand All @@ -168,10 +174,15 @@ impl ReconcilerError for Error {
}

pub async fn reconcile_znode(
znode: Arc<ZookeeperZnode>,
znode: Arc<DeserializeGuard<ZookeeperZnode>>,
ctx: Arc<Ctx>,
) -> Result<controller::Action> {
tracing::info!("Starting reconcile");
let znode = znode
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidZookeeperZnodeSnafu)?;
let (ns, uid) = if let ObjectMeta {
namespace: Some(ns),
uid: Some(uid),
Expand All @@ -184,7 +195,7 @@ pub async fn reconcile_znode(
};
let client = &ctx.client;

let zk = find_zk_of_znode(client, &znode).await;
let zk = find_zk_of_znode(client, znode).await;
let mut default_status_updates: Option<ZookeeperZnodeStatus> = None;
// Store the znode path in the status rather than the object itself, to ensure that only K8s administrators can override it
let znode_path = match znode.status.as_ref().and_then(|s| s.znode_path.as_deref()) {
Expand All @@ -210,15 +221,15 @@ pub async fn reconcile_znode(
if let Some(status) = default_status_updates {
info!("Writing default configuration to status");
ctx.client
.merge_patch_status(&*znode, &status)
.merge_patch_status(znode, &status)
.await
.context(ApplyStatusSnafu)?;
}

finalizer(
&client.get_api::<ZookeeperZnode>(&ns),
&format!("{OPERATOR_NAME}/znode"),
znode.clone(),
Arc::new(znode.clone()),
|ev| async {
match ev {
finalizer::Event::Apply(znode) => {
Expand Down Expand Up @@ -381,7 +392,7 @@ async fn find_zk_of_znode(
}

pub fn error_policy(
_obj: Arc<ZookeeperZnode>,
_obj: Arc<DeserializeGuard<ZookeeperZnode>>,
_error: &Error,
_ctx: Arc<Ctx>,
) -> controller::Action {
Expand Down
Loading