diff --git a/dependencies/control-plane b/dependencies/control-plane index 26a162479..14e9e475b 160000 --- a/dependencies/control-plane +++ b/dependencies/control-plane @@ -1 +1 @@ -Subproject commit 26a162479a7bf55157018e2aa02e144814fb3a8a +Subproject commit 14e9e475b759b324449705a96ef33a6e43137847 diff --git a/k8s/plugin/src/resources/diskpool_cleanup.rs b/k8s/plugin/src/resources/diskpool_cleanup.rs index 7fd9fb715..a40643218 100644 --- a/k8s/plugin/src/resources/diskpool_cleanup.rs +++ b/k8s/plugin/src/resources/diskpool_cleanup.rs @@ -26,15 +26,15 @@ mod quantity { ); } +use anyhow::anyhow; use crd::DiskPool; use http::StatusCode; use kube::{ - api::{Api, DeleteParams}, + api::{Api, DeleteParams, ListParams}, runtime::wait::{await_condition, conditions::is_deleted}, Client, ResourceExt, }; use snafu::Snafu; - /// Errors that can occur during DiskPool CR cleanup. #[derive(Debug, Snafu)] pub enum CleanupError { @@ -52,6 +52,94 @@ pub enum CleanupError { }, } +/// List the names of all DiskPool CRs in `namespace` whose `spec.node` matches +/// `node_id`. +/// +/// This queries Kubernetes directly, so it works even when the control-plane +/// node spec has already been deleted (i.e. `get_node_pools` via REST would +/// return NOT_FOUND or an empty list). +/// +/// Results are fetched in pages to avoid a single large response. 
+pub async fn list_diskpool_ids_for_node( + client: Client, + namespace: &str, + node_id: &str, +) -> Result<Vec<String>, CleanupError> { + let api: Api<DiskPool> = Api::namespaced(client, namespace); + let max_entries: u32 = 500; + let mut params = ListParams::default().limit(max_entries); + let mut pool_ids = Vec::with_capacity(max_entries as usize); + + loop { + let page = api + .list(&params) + .await + .map_err(|source| CleanupError::Kube { + source, + namespace: namespace.into(), + })?; + + for pool in page.items { + if pool.spec.node == node_id { + if let Some(name) = pool.metadata.name { + pool_ids.push(name); + } + } + } + + match page.metadata.continue_.as_deref() { + Some("") | None => break, + Some(token) => params = params.continue_token(token), + } + } + + Ok(pool_ids) +} + +/// Delete a DiskPool CR, applying a timeout and not-found semantics. +/// +/// This is the high-level entry point used by the plugin's delete commands. +/// It wraps [`delete_diskpool_cr`] with: +/// +/// - A `timeout` enforced via [`tokio::time::timeout`]. +/// - `pool_deleted`: whether the backing pool spec was already removed via +/// REST. When `true`, a missing CR is acceptable (already clean). When +/// both are absent the operation is an error unless `ignore_not_found`. +/// - `ignore_not_found`: suppress the "both missing" error, matching the +/// `--ignore-not-found` CLI flag. +pub async fn cleanup_dsp( + client: Client, + namespace: &str, + pool_id: &str, + timeout: humantime::Duration, + pool_deleted: bool, + ignore_not_found: bool, +) -> Result<(), anyhow::Error> { + let cr_deleted = tokio::time::timeout(*timeout, delete_diskpool_cr(client, namespace, pool_id)) + .await + .map_err(|_| { + anyhow!( + "Timed out after {timeout} waiting for DiskPool CR {pool_id} deletion \ + in namespace {namespace}. Check that the pool operator is running." + ) + })? + .map_err(|e| anyhow!("Failed to delete DiskPool {pool_id}: {e}"))? + .is_some(); + + // Both sides missing is an error (unless suppressed). 
+ if !cr_deleted && !pool_deleted { + return if ignore_not_found { + Ok(()) + } else { + Err(anyhow!( + "Pool {pool_id} not found (pool spec and DiskPool CR are both missing)" + )) + }; + } + + Ok(()) +} + /// Delete a DiskPool CR and wait for the operator to process the /// finalizer. /// diff --git a/k8s/plugin/src/resources/mod.rs b/k8s/plugin/src/resources/mod.rs index f18ea0b6a..509f4c50c 100644 --- a/k8s/plugin/src/resources/mod.rs +++ b/k8s/plugin/src/resources/mod.rs @@ -5,7 +5,7 @@ use clap::Parser; use k8s_openapi::api::core::v1 as core_v1; use kube::api::GroupVersionKind; use openapi::{apis::StatusCode, tower::client::Url}; -use plugin::resources::{pool, snapshot, VolumeId}; +use plugin::resources::{node, pool, snapshot, VolumeId}; use plugin::{ resources::{ CordonResources, DrainResources, ExpandResources, GetResources, LabelResources, @@ -148,6 +148,8 @@ pub enum DeleteResources { Upgrade(DeleteUpgradeArgs), /// Deletes the specified pool resource. Pool(PoolDeleteArgs), + /// Deletes the specified node and all its resources (purge). + Node(NodeDeleteArgs), /// Deletes the specified volume resource. Volume { /// The id of the volume to delete. @@ -157,25 +159,46 @@ pub enum DeleteResources { VolumeSnapshot(snapshot::DelVolumeSnapshotArgs), } +/// Shared DiskPool CR cleanup arguments, flattened into delete commands that +/// support `--cleanup-dsp`. +#[derive(Debug, Clone, clap::Args)] +pub struct CleanupDspArgs { + /// Also delete the DiskPool CustomResource(s) via the Kubernetes API.{n} + /// Deletes each CR and waits for the pool operator to process the + /// finalizer before returning. + #[clap(long)] + pub cleanup_dsp: bool, + + /// Timeout per DiskPool CR deletion when using --cleanup-dsp. + #[clap(long, default_value = "60s")] + pub cleanup_dsp_timeout: humantime::Duration, +} + /// Arguments for deleting a pool. 
/// -/// Wraps the control-plane pool delete arguments and adds `--cleanup-cr` +/// Wraps the control-plane pool delete arguments and adds `--cleanup-dsp` /// for also deleting the DiskPool CustomResource via the Kubernetes API. #[derive(Debug, Clone, clap::Args)] pub struct PoolDeleteArgs { #[clap(flatten)] rest_args: pool::DeletePoolArgs, - /// Also delete the DiskPool CustomResource via the Kubernetes API.{n} - /// Deletes the CR and waits for the pool operator to process the - /// finalizer before returning. - #[arg(long)] - cleanup_cr: bool, + #[clap(flatten)] + cleanup: CleanupDspArgs, +} + +/// Arguments for deleting a node. +/// +/// Wraps the control-plane node delete arguments. When `--cleanup-dsp` is set, +/// the DiskPool CRs for all pools that were on this node are deleted +/// concurrently after the REST node delete succeeds. +#[derive(Debug, Clone, clap::Args)] +pub struct NodeDeleteArgs { + #[clap(flatten)] + rest_args: node::DeleteNodeArgs, - /// Timeout for waiting for the DiskPool CR to be deleted by the - /// operator (used with --cleanup-cr). - #[arg(long, default_value = "60s")] - cleanup_cr_timeout: humantime::Duration, + #[clap(flatten)] + cleanup: CleanupDspArgs, } #[async_trait::async_trait(?Send)] @@ -229,7 +252,7 @@ impl ExecuteOperation for Operations { match &args.resource { // todo: use generic execute trait DeleteResources::Upgrade(res) => res.delete(&cli_args.namespace).await?, - DeleteResources::Pool(pool_args) if !pool_args.cleanup_cr => { + DeleteResources::Pool(pool_args) if !pool_args.cleanup.cleanup_dsp => { // No cleanup — delegate entirely like Volume/VolumeSnapshot. plugin::resources::DeleteArgs { ignore_not_found: args.ignore_not_found, @@ -242,8 +265,8 @@ impl ExecuteOperation for Operations { .await? } DeleteResources::Pool(pool_args) => { - // --cleanup-cr: run REST delete (never swallow 404), then - // CR cleanup. + // --cleanup-dsp: run REST delete (never swallow 404), then + // DSP CR cleanup. 
let rest_result = plugin::resources::DeleteArgs { ignore_not_found: false, yes: args.yes, @@ -262,20 +285,110 @@ impl ExecuteOperation for Operations { }) if source.status() == Some(StatusCode::NOT_FOUND) => false, _ => return rest_result.map_err(Into::into), }; - cleanup_cr(cli_args, pool_args, pool_deleted, args.ignore_not_found) - .await?; + let client = cli_args.client().await?; + diskpool_cleanup::cleanup_dsp( + client, + &cli_args.namespace, + &pool_args.rest_args.pool_id, + pool_args.cleanup.cleanup_dsp_timeout, + pool_deleted, + args.ignore_not_found, + ) + .await?; + } + DeleteResources::Node(node_args) if !node_args.cleanup.cleanup_dsp => { + // No DSP cleanup — delegate entirely to the control-plane + // plugin lib. show_impact, purge, accept_* flags, and the + // confirm prompt bypass are all handled there. + plugin::resources::DeleteArgs { + ignore_not_found: args.ignore_not_found, + yes: args.yes, + resource: plugin::resources::DeleteResources::Node( + node_args.rest_args.clone(), + ), + } + .execute(cli_args) + .await? + } + DeleteResources::Node(node_args) => { + let node_id = &node_args.rest_args.node_id; + + // REST delete first. If it fails with anything other than + // NOT_FOUND we propagate the error immediately and never + // touch the K8s API. + let rest_result = plugin::resources::DeleteArgs { + ignore_not_found: false, + yes: args.yes, + resource: plugin::resources::DeleteResources::Node( + node_args.rest_args.clone(), + ), + } + .execute(cli_args) + .await; + + let node_deleted = match &rest_result { + Ok(()) => true, + Err(plugin::resources::error::Error::DeleteNodeError { + source, + .. + }) if source.status() == Some(StatusCode::NOT_FOUND) => false, + _ => return rest_result.map_err(Into::into), + }; + + // Only now pay for the K8s round-trip. List DiskPool CRs + // directly from Kubernetes so the lookup works even when + // the node spec or pool specs are already gone from the + // control plane. 
+ let client = cli_args.client().await?; + let pool_ids = diskpool_cleanup::list_diskpool_ids_for_node( + client.clone(), + &cli_args.namespace, + node_id, + ) + .await + .map_err(|e| { + anyhow!("Failed to list DiskPool CR(s) for node {node_id}: {e}") + })?; + + // Spawn one task per pool so all DSP CR deletions run + // concurrently. kube::Client is Arc-backed, cheap to clone. + let namespace = cli_args.namespace.clone(); + let timeout = node_args.cleanup.cleanup_dsp_timeout; + let ignore_not_found = args.ignore_not_found; + let mut tasks = tokio::task::JoinSet::new(); + + for pool_id in pool_ids { + let client = client.clone(); + let namespace = namespace.clone(); + tasks.spawn(async move { + diskpool_cleanup::cleanup_dsp( + client, + &namespace, + pool_id.as_str(), + timeout, + node_deleted, + ignore_not_found, + ) + .await + }); + } + + // Collect all results; first error cancels the rest. + while let Some(result) = tasks.join_next().await { + result.map_err(|e| { + anyhow!("Failed to delete DiskPool CR(s) for node {node_id}: {e}") + })??; + } } DeleteResources::Volume { id } => { // 1. 
ensure PV is not present let pv_name = format!("pvc-{id}"); let client = cli_args.pv_api().await?; let pv = client.get_opt(&pv_name).await.map_err(|error| { - anyhow::anyhow!( - "Failed to fetch PV {pv_name} from K8s api-server: {error}" - ) + anyhow!("Failed to fetch PV {pv_name} from K8s api-server: {error}") })?; if pv.is_some() { - return Err(Error::Generic(anyhow::anyhow!( + return Err(Error::Generic(anyhow!( "The volume is still being referenced by PV {pv_name}" ))); } @@ -294,12 +407,10 @@ impl ExecuteOperation for Operations { let vsc_name = format!("snapcontent-{}", snap_args.snapshot); let client = cli_args.snap_content_api().await?; let vsc = client.get_opt(&vsc_name).await.map_err(|error| { - anyhow::anyhow!( - "Failed to fetch VSC {vsc_name} from K8s api-server: {error}" - ) + anyhow!("Failed to fetch VSC {vsc_name} from K8s api-server: {error}") })?; if vsc.is_some() { - return Err(Error::Generic(anyhow::anyhow!( + return Err(Error::Generic(anyhow!( "The volume snapshot is still being referenced by VSC {vsc_name}" ))); } @@ -382,54 +493,3 @@ pub async fn init_rest(cli_args: &CliArgs) -> Result<(), Error> { } } } - -/// Delete the DiskPool CR and wait for the operator to process the -/// finalizer. -/// -/// `pool_deleted`: the plugin successfully deleted the pool spec via REST. -/// `ignore_not_found`: suppress not-found warnings and errors. -/// -/// If only one of pool spec or CR is missing, a warning is printed to -/// stderr but the operation succeeds. If both are missing, it is an -/// error. With `ignore_not_found`, all not-found cases are silent and -/// successful. 
-async fn cleanup_cr( - cli_args: &CliArgs, - pool_args: &PoolDeleteArgs, - pool_deleted: bool, - ignore_not_found: bool, -) -> Result<(), Error> { - use diskpool_cleanup::delete_diskpool_cr; - - let pool_id = &pool_args.rest_args.pool_id; - let namespace = &cli_args.namespace; - let timeout = *pool_args.cleanup_cr_timeout; - let client = cli_args.client().await?; - - let result = tokio::time::timeout(timeout, delete_diskpool_cr(client, namespace, pool_id)) - .await - .map_err(|_| { - anyhow!( - "Timed out after {} waiting for DiskPool CR {pool_id} deletion \ - in namespace {namespace}. Check that the pool operator is running.", - humantime::format_duration(timeout) - ) - })? - .map_err(|e| anyhow!("{e}"))?; - - let cr_deleted = result.is_some(); - - // Both missing is an error (unless ignored). - if !cr_deleted && !pool_deleted { - return if ignore_not_found { - Ok(()) - } else { - Err( - anyhow!("Pool {pool_id} not found (pool spec and DiskPool CR are both missing)") - .into(), - ) - }; - } - - Ok(()) -}