Skip to content

Commit cae173e

Browse files
authored
Migrate to DeserializeGuard (#872)
* Migrate to DeserializeGuard Part of stackabletech/issues#211 * Changelog
1 parent dc42c9b commit cae173e

File tree

4 files changed

+88
-40
lines changed

4 files changed

+88
-40
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -14,8 +14,13 @@ All notable changes to this project will be documented in this file.
1414

1515
- Remove ZooKeeper 3.8.4 from tests and docs ([#857]).
1616

17+
### Fixed
18+
19+
- Failing to parse one `ZookeeperCluster`/`ZookeeperZnode` should no longer cause the whole operator to stop functioning ([#872]).
20+
1721
[#853]: https://github.com/stackabletech/zookeeper-operator/pull/853
1822
[#857]: https://github.com/stackabletech/zookeeper-operator/pull/857
23+
[#872]: https://github.com/stackabletech/zookeeper-operator/pull/872
1924

2025
## [24.7.0] - 2024-07-24
2126

rust/operator-binary/src/main.rs

Lines changed: 25 additions & 13 deletions
Original file line number · Diff line number · Diff line change
@@ -8,7 +8,11 @@ use stackable_operator::{
88
apps::v1::StatefulSet,
99
core::v1::{ConfigMap, Endpoints, Service},
1010
},
11-
kube::runtime::{reflector::ObjectRef, watcher, Controller},
11+
kube::{
12+
core::DeserializeGuard,
13+
runtime::{reflector::ObjectRef, watcher, Controller},
14+
Resource,
15+
},
1216
logging::controller::report_controller_reconciled,
1317
CustomResourceExt,
1418
};
@@ -71,36 +75,40 @@ async fn main() -> anyhow::Result<()> {
7175
let client =
7276
stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?;
7377
let zk_controller_builder = Controller::new(
74-
watch_namespace.get_api::<ZookeeperCluster>(&client),
78+
watch_namespace.get_api::<DeserializeGuard<ZookeeperCluster>>(&client),
7579
watcher::Config::default(),
7680
);
7781

7882
let zk_store = zk_controller_builder.store();
7983
let zk_controller = zk_controller_builder
8084
.owns(
81-
watch_namespace.get_api::<Service>(&client),
85+
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
8286
watcher::Config::default(),
8387
)
8488
.watches(
85-
watch_namespace.get_api::<Endpoints>(&client),
89+
watch_namespace.get_api::<DeserializeGuard<Endpoints>>(&client),
8690
watcher::Config::default(),
8791
move |endpoints| {
8892
zk_store
8993
.state()
9094
.into_iter()
9195
.filter(move |zk| {
92-
zk.metadata.namespace == endpoints.metadata.namespace
93-
&& zk.server_role_service_name() == endpoints.metadata.name
96+
let Ok(zk) = &zk.0 else {
97+
return false;
98+
};
99+
let endpoints_meta = endpoints.meta();
100+
zk.metadata.namespace == endpoints_meta.namespace
101+
&& zk.server_role_service_name() == endpoints_meta.name
94102
})
95103
.map(|zk| ObjectRef::from_obj(&*zk))
96104
},
97105
)
98106
.owns(
99-
watch_namespace.get_api::<StatefulSet>(&client),
107+
watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
100108
watcher::Config::default(),
101109
)
102110
.owns(
103-
watch_namespace.get_api::<ConfigMap>(&client),
111+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
104112
watcher::Config::default(),
105113
)
106114
.shutdown_on_signal()
@@ -120,25 +128,29 @@ async fn main() -> anyhow::Result<()> {
120128
);
121129
});
122130
let znode_controller_builder = Controller::new(
123-
watch_namespace.get_api::<ZookeeperZnode>(&client),
131+
watch_namespace.get_api::<DeserializeGuard<ZookeeperZnode>>(&client),
124132
watcher::Config::default(),
125133
);
126134
let znode_store = znode_controller_builder.store();
127135
let znode_controller = znode_controller_builder
128136
.owns(
129-
watch_namespace.get_api::<ConfigMap>(&client),
137+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
130138
watcher::Config::default(),
131139
)
132140
.watches(
133-
watch_namespace.get_api::<ZookeeperCluster>(&client),
141+
watch_namespace.get_api::<DeserializeGuard<ZookeeperCluster>>(&client),
134142
watcher::Config::default(),
135143
move |zk| {
136144
znode_store
137145
.state()
138146
.into_iter()
139147
.filter(move |znode| {
140-
zk.metadata.namespace == znode.spec.cluster_ref.namespace
141-
&& zk.metadata.name == znode.spec.cluster_ref.name
148+
let Ok(znode) = &znode.0 else {
149+
return false;
150+
};
151+
let zk_meta = zk.meta();
152+
zk_meta.namespace == znode.spec.cluster_ref.namespace
153+
&& zk_meta.name == znode.spec.cluster_ref.name
142154
})
143155
.map(|znode| ObjectRef::from_obj(&*znode))
144156
},

rust/operator-binary/src/zk_controller.rs

Lines changed: 41 additions & 21 deletions
Original file line number · Diff line number · Diff line change
@@ -38,7 +38,12 @@ use stackable_operator::{
3838
apimachinery::pkg::apis::meta::v1::LabelSelector,
3939
DeepMerge,
4040
},
41-
kube::{api::DynamicObject, runtime::controller, Resource},
41+
kube::{
42+
api::DynamicObject,
43+
core::{error_boundary, DeserializeGuard},
44+
runtime::controller,
45+
Resource,
46+
},
4247
kvp::{Label, LabelError, Labels},
4348
logging::controller::ReconcilerError,
4449
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
@@ -89,6 +94,11 @@ type Result<T, E = Error> = std::result::Result<T, E>;
8994
#[strum_discriminants(derive(IntoStaticStr))]
9095
#[allow(clippy::enum_variant_names)]
9196
pub enum Error {
97+
#[snafu(display("ZookeeperCluster object is invalid"))]
98+
InvalidZookeeperCluster {
99+
source: error_boundary::InvalidObject,
100+
},
101+
92102
#[snafu(display("crd validation failure"))]
93103
CrdValidationFailure {
94104
source: stackable_zookeeper_crd::Error,
@@ -253,6 +263,7 @@ impl ReconcilerError for Error {
253263
}
254264
fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
255265
match self {
266+
Error::InvalidZookeeperCluster { source: _ } => None,
256267
Error::CrdValidationFailure { .. } => None,
257268
Error::NoServerRole => None,
258269
Error::RoleParseFailure { .. } => None,
@@ -289,8 +300,15 @@ impl ReconcilerError for Error {
289300
}
290301
}
291302

292-
pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<controller::Action> {
303+
pub async fn reconcile_zk(
304+
zk: Arc<DeserializeGuard<ZookeeperCluster>>,
305+
ctx: Arc<Ctx>,
306+
) -> Result<controller::Action> {
293307
tracing::info!("Starting reconcile");
308+
let zk =
309+
zk.0.as_ref()
310+
.map_err(error_boundary::InvalidObject::clone)
311+
.context(InvalidZookeeperClusterSnafu)?;
294312
let client = &ctx.client;
295313

296314
let resolved_product_image = zk
@@ -310,7 +328,7 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
310328
let validated_config = validate_all_roles_and_groups_config(
311329
&resolved_product_image.app_version_label,
312330
&transform_all_roles_to_config(
313-
zk.as_ref(),
331+
zk,
314332
[(
315333
ZookeeperRole::Server.to_string(),
316334
(
@@ -336,16 +354,16 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
336354
.map(Cow::Borrowed)
337355
.unwrap_or_default();
338356

339-
let vector_aggregator_address = resolve_vector_aggregator_address(&zk, client)
357+
let vector_aggregator_address = resolve_vector_aggregator_address(zk, client)
340358
.await
341359
.context(ResolveVectorAggregatorAddressSnafu)?;
342360

343-
let zookeeper_security = ZookeeperSecurity::new_from_zookeeper_cluster(client, &zk)
361+
let zookeeper_security = ZookeeperSecurity::new_from_zookeeper_cluster(client, zk)
344362
.await
345363
.context(FailedToInitializeSecurityContextSnafu)?;
346364

347365
let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
348-
zk.as_ref(),
366+
zk,
349367
APP_NAME,
350368
cluster_resources
351369
.get_required_labels()
@@ -366,7 +384,7 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
366384
let server_role_service = cluster_resources
367385
.add(
368386
client,
369-
build_server_role_service(&zk, &resolved_product_image, &zookeeper_security)?,
387+
build_server_role_service(zk, &resolved_product_image, &zookeeper_security)?,
370388
)
371389
.await
372390
.context(ApplyRoleServiceSnafu)?;
@@ -381,21 +399,21 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
381399
.context(FailedToResolveConfigSnafu)?;
382400

383401
let rg_service = build_server_rolegroup_service(
384-
&zk,
402+
zk,
385403
&rolegroup,
386404
&resolved_product_image,
387405
&zookeeper_security,
388406
)?;
389407
let rg_configmap = build_server_rolegroup_config_map(
390-
&zk,
408+
zk,
391409
&rolegroup,
392410
rolegroup_config,
393411
&resolved_product_image,
394412
vector_aggregator_address.as_deref(),
395413
&zookeeper_security,
396414
)?;
397415
let rg_statefulset = build_server_rolegroup_statefulset(
398-
&zk,
416+
zk,
399417
&zk_role,
400418
&rolegroup,
401419
rolegroup_config,
@@ -431,7 +449,7 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
431449
pod_disruption_budget: pdb,
432450
}) = role_config
433451
{
434-
add_pdbs(pdb, &zk, &zk_role, client, &mut cluster_resources)
452+
add_pdbs(pdb, zk, &zk_role, client, &mut cluster_resources)
435453
.await
436454
.context(FailedToCreatePdbSnafu)?;
437455
}
@@ -440,8 +458,8 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
440458
// We don't /need/ stability, but it's still nice to avoid spurious changes where possible.
441459
let mut discovery_hash = FnvHasher::with_key(0);
442460
for discovery_cm in build_discovery_configmaps(
443-
&zk,
444-
&*zk,
461+
zk,
462+
zk,
445463
client,
446464
ZK_CONTROLLER_NAME,
447465
&server_role_service,
@@ -468,18 +486,15 @@ pub async fn reconcile_zk(zk: Arc<ZookeeperCluster>, ctx: Arc<Ctx>) -> Result<co
468486
// Serialize as a string to discourage users from trying to parse the value,
469487
// and to keep things flexible if we end up changing the hasher at some point.
470488
discovery_hash: Some(discovery_hash.finish().to_string()),
471-
conditions: compute_conditions(
472-
zk.as_ref(),
473-
&[&ss_cond_builder, &cluster_operation_cond_builder],
474-
),
489+
conditions: compute_conditions(zk, &[&ss_cond_builder, &cluster_operation_cond_builder]),
475490
};
476491

477492
cluster_resources
478493
.delete_orphaned_resources(client)
479494
.await
480495
.context(DeleteOrphansSnafu)?;
481496
client
482-
.apply_patch_status(OPERATOR_NAME, &*zk, &status)
497+
.apply_patch_status(OPERATOR_NAME, zk, &status)
483498
.await
484499
.context(ApplyStatusSnafu)?;
485500

@@ -1004,11 +1019,16 @@ fn build_server_rolegroup_statefulset(
10041019
}
10051020

10061021
pub fn error_policy(
1007-
_obj: Arc<ZookeeperCluster>,
1008-
_error: &Error,
1022+
_obj: Arc<DeserializeGuard<ZookeeperCluster>>,
1023+
error: &Error,
10091024
_ctx: Arc<Ctx>,
10101025
) -> controller::Action {
1011-
controller::Action::requeue(*Duration::from_secs(5))
1026+
match error {
1027+
// root object is invalid, will be requeued when modified anyway
1028+
Error::InvalidZookeeperCluster { .. } => controller::Action::await_change(),
1029+
1030+
_ => controller::Action::requeue(*Duration::from_secs(5)),
1031+
}
10121032
}
10131033

10141034
#[cfg(test)]

rust/operator-binary/src/znode_controller.rs

Lines changed: 17 additions & 6 deletions
Original file line number · Diff line number · Diff line change
@@ -11,7 +11,7 @@ use stackable_operator::{
1111
kube::{
1212
self,
1313
api::ObjectMeta,
14-
core::DynamicObject,
14+
core::{error_boundary, DeserializeGuard, DynamicObject},
1515
runtime::{controller, finalizer, reflector::ObjectRef},
1616
Resource,
1717
},
@@ -40,6 +40,11 @@ pub struct Ctx {
4040
#[strum_discriminants(derive(IntoStaticStr))]
4141
#[allow(clippy::enum_variant_names)]
4242
pub enum Error {
43+
#[snafu(display("ZookeeperZnode object is invalid"))]
44+
InvalidZookeeperZnode {
45+
source: error_boundary::InvalidObject,
46+
},
47+
4348
#[snafu(display(
4449
"object is missing metadata that should be created by the Kubernetes cluster",
4550
))]
@@ -147,6 +152,7 @@ impl ReconcilerError for Error {
147152

148153
fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
149154
match self {
155+
Error::InvalidZookeeperZnode { source: _ } => None,
150156
Error::ObjectMissingMetadata => None,
151157
Error::InvalidZkReference => None,
152158
Error::FindZk { zk, .. } => Some(zk.clone().erase()),
@@ -168,10 +174,15 @@ impl ReconcilerError for Error {
168174
}
169175

170176
pub async fn reconcile_znode(
171-
znode: Arc<ZookeeperZnode>,
177+
znode: Arc<DeserializeGuard<ZookeeperZnode>>,
172178
ctx: Arc<Ctx>,
173179
) -> Result<controller::Action> {
174180
tracing::info!("Starting reconcile");
181+
let znode = znode
182+
.0
183+
.as_ref()
184+
.map_err(error_boundary::InvalidObject::clone)
185+
.context(InvalidZookeeperZnodeSnafu)?;
175186
let (ns, uid) = if let ObjectMeta {
176187
namespace: Some(ns),
177188
uid: Some(uid),
@@ -184,7 +195,7 @@ pub async fn reconcile_znode(
184195
};
185196
let client = &ctx.client;
186197

187-
let zk = find_zk_of_znode(client, &znode).await;
198+
let zk = find_zk_of_znode(client, znode).await;
188199
let mut default_status_updates: Option<ZookeeperZnodeStatus> = None;
189200
// Store the znode path in the status rather than the object itself, to ensure that only K8s administrators can override it
190201
let znode_path = match znode.status.as_ref().and_then(|s| s.znode_path.as_deref()) {
@@ -210,15 +221,15 @@ pub async fn reconcile_znode(
210221
if let Some(status) = default_status_updates {
211222
info!("Writing default configuration to status");
212223
ctx.client
213-
.merge_patch_status(&*znode, &status)
224+
.merge_patch_status(znode, &status)
214225
.await
215226
.context(ApplyStatusSnafu)?;
216227
}
217228

218229
finalizer(
219230
&client.get_api::<ZookeeperZnode>(&ns),
220231
&format!("{OPERATOR_NAME}/znode"),
221-
znode.clone(),
232+
Arc::new(znode.clone()),
222233
|ev| async {
223234
match ev {
224235
finalizer::Event::Apply(znode) => {
@@ -381,7 +392,7 @@ async fn find_zk_of_znode(
381392
}
382393

383394
pub fn error_policy(
384-
_obj: Arc<ZookeeperZnode>,
395+
_obj: Arc<DeserializeGuard<ZookeeperZnode>>,
385396
_error: &Error,
386397
_ctx: Arc<Ctx>,
387398
) -> controller::Action {

0 commit comments

Comments
 (0)