Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 90 additions & 2 deletions k8s/plugin/src/resources/diskpool_cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ mod quantity {
);
}

use anyhow::anyhow;
use crd::DiskPool;
use http::StatusCode;
use kube::{
api::{Api, DeleteParams},
api::{Api, DeleteParams, ListParams},
runtime::wait::{await_condition, conditions::is_deleted},
Client, ResourceExt,
};
use snafu::Snafu;

/// Errors that can occur during DiskPool CR cleanup.
#[derive(Debug, Snafu)]
pub enum CleanupError {
Expand All @@ -52,6 +52,94 @@ pub enum CleanupError {
},
}

/// List the names of all DiskPool CRs in `namespace` whose `spec.node` matches
/// `node_id`.
///
/// This queries Kubernetes directly, so it works even when the control-plane
/// node spec has already been deleted (i.e. `get_node_pools` via REST would
/// return NOT_FOUND or an empty list).
///
/// Results are fetched in pages to avoid a single large response.
pub async fn list_diskpool_ids_for_node(
    client: Client,
    namespace: &str,
    node_id: &str,
) -> Result<Vec<String>, CleanupError> {
    let api: Api<DiskPool> = Api::namespaced(client, namespace);
    let max_entries: u32 = 500;
    let mut params = ListParams::default().limit(max_entries);
    let mut pool_ids = Vec::with_capacity(max_entries as usize);

    loop {
        let page = api
            .list(&params)
            .await
            .map_err(|source| CleanupError::Kube {
                source,
                namespace: namespace.into(),
            })?;

        // Keep only the pools scheduled on the requested node; CRs without a
        // metadata name (shouldn't occur in practice) are skipped.
        pool_ids.extend(
            page.items
                .into_iter()
                .filter(|pool| pool.spec.node == node_id)
                .filter_map(|pool| pool.metadata.name),
        );

        // A missing or empty continue token marks the final page.
        match page.metadata.continue_.as_deref() {
            None | Some("") => break,
            Some(token) => params = params.continue_token(token),
        }
    }

    Ok(pool_ids)
}

/// Delete a DiskPool CR, applying a timeout and not-found semantics.
///
/// This is the high-level entry point used by the plugin's delete commands.
/// It wraps [`delete_diskpool_cr`] with:
///
/// - A `timeout` enforced via [`tokio::time::timeout`].
/// - `pool_deleted`: whether the backing pool spec was already removed via
///   REST. When `true`, a missing CR is acceptable (already clean). When
///   both are absent the operation is an error unless `ignore_not_found`.
/// - `ignore_not_found`: suppress the "both missing" error, matching the
///   `--ignore-not-found` CLI flag.
pub async fn cleanup_dsp(
    client: Client,
    namespace: &str,
    pool_id: &str,
    timeout: humantime::Duration,
    pool_deleted: bool,
    ignore_not_found: bool,
) -> Result<(), anyhow::Error> {
    let deletion = delete_diskpool_cr(client, namespace, pool_id);
    let outcome = match tokio::time::timeout(*timeout, deletion).await {
        // The operator never finished processing the finalizer in time.
        Err(_elapsed) => {
            return Err(anyhow!(
                "Timed out after {timeout} waiting for DiskPool CR {pool_id} deletion \
                 in namespace {namespace}. Check that the pool operator is running."
            ))
        }
        Ok(result) => result.map_err(|e| anyhow!("Failed to delete DiskPool {pool_id}: {e}"))?,
    };

    // The CR existed (and was deleted) iff the inner call returned Some.
    let cr_deleted = outcome.is_some();

    // Neither the pool spec nor the CR existed: report not-found unless the
    // caller asked for that case to be ignored.
    if !cr_deleted && !pool_deleted && !ignore_not_found {
        return Err(anyhow!(
            "Pool {pool_id} not found (pool spec and DiskPool CR are both missing)"
        ));
    }

    Ok(())
}

/// Delete a DiskPool CR and wait for the operator to process the
/// finalizer.
///
Expand Down
210 changes: 135 additions & 75 deletions k8s/plugin/src/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use clap::Parser;
use k8s_openapi::api::core::v1 as core_v1;
use kube::api::GroupVersionKind;
use openapi::{apis::StatusCode, tower::client::Url};
use plugin::resources::{pool, snapshot, VolumeId};
use plugin::resources::{node, pool, snapshot, VolumeId};
use plugin::{
resources::{
CordonResources, DrainResources, ExpandResources, GetResources, LabelResources,
Expand Down Expand Up @@ -148,6 +148,8 @@ pub enum DeleteResources {
Upgrade(DeleteUpgradeArgs),
/// Deletes the specified pool resource.
Pool(PoolDeleteArgs),
/// Deletes the specified node and all its resources (purge).
Node(NodeDeleteArgs),
/// Deletes the specified volume resource.
Volume {
/// The id of the volume to delete.
Expand All @@ -157,25 +159,46 @@ pub enum DeleteResources {
VolumeSnapshot(snapshot::DelVolumeSnapshotArgs),
}

/// Shared DiskPool CR cleanup arguments, flattened into delete commands that
/// support `--cleanup-dsp`.
// NOTE(review): the `///` doc comments on the fields below double as the clap
// help text shown to users, so they are user-visible output — do not reword
// them casually.
#[derive(Debug, Clone, clap::Args)]
pub struct CleanupDspArgs {
    /// Also delete the DiskPool CustomResource(s) via the Kubernetes API.{n}
    /// Deletes each CR and waits for the pool operator to process the
    /// finalizer before returning.
    #[clap(long)]
    pub cleanup_dsp: bool,

    /// Timeout per DiskPool CR deletion when using --cleanup-dsp.
    #[clap(long, default_value = "60s")]
    pub cleanup_dsp_timeout: humantime::Duration,
}

/// Arguments for deleting a pool.
///
/// Wraps the control-plane pool delete arguments and adds `--cleanup-cr`
/// Wraps the control-plane pool delete arguments and adds `--cleanup-dsp`
/// for also deleting the DiskPool CustomResource via the Kubernetes API.
#[derive(Debug, Clone, clap::Args)]
pub struct PoolDeleteArgs {
#[clap(flatten)]
rest_args: pool::DeletePoolArgs,

/// Also delete the DiskPool CustomResource via the Kubernetes API.{n}
/// Deletes the CR and waits for the pool operator to process the
/// finalizer before returning.
#[arg(long)]
cleanup_cr: bool,
#[clap(flatten)]
cleanup: CleanupDspArgs,
}

/// Arguments for deleting a node.
///
/// Wraps the control-plane node delete arguments. When `--cleanup-dsp` is set,
Comment thread
niladrih marked this conversation as resolved.
/// the DiskPool CRs for all pools that were on this node are deleted
/// concurrently after the REST node delete succeeds.
#[derive(Debug, Clone, clap::Args)]
pub struct NodeDeleteArgs {
#[clap(flatten)]
rest_args: node::DeleteNodeArgs,

/// Timeout for waiting for the DiskPool CR to be deleted by the
/// operator (used with --cleanup-cr).
#[arg(long, default_value = "60s")]
cleanup_cr_timeout: humantime::Duration,
#[clap(flatten)]
cleanup: CleanupDspArgs,
}

#[async_trait::async_trait(?Send)]
Expand Down Expand Up @@ -229,7 +252,7 @@ impl ExecuteOperation for Operations {
match &args.resource {
// todo: use generic execute trait
DeleteResources::Upgrade(res) => res.delete(&cli_args.namespace).await?,
DeleteResources::Pool(pool_args) if !pool_args.cleanup_cr => {
DeleteResources::Pool(pool_args) if !pool_args.cleanup.cleanup_dsp => {
// No cleanup — delegate entirely like Volume/VolumeSnapshot.
plugin::resources::DeleteArgs {
ignore_not_found: args.ignore_not_found,
Expand All @@ -242,8 +265,8 @@ impl ExecuteOperation for Operations {
.await?
}
DeleteResources::Pool(pool_args) => {
// --cleanup-cr: run REST delete (never swallow 404), then
// CR cleanup.
// --cleanup-dsp: run REST delete (never swallow 404), then
// DSP CR cleanup.
let rest_result = plugin::resources::DeleteArgs {
ignore_not_found: false,
yes: args.yes,
Expand All @@ -262,20 +285,110 @@ impl ExecuteOperation for Operations {
}) if source.status() == Some(StatusCode::NOT_FOUND) => false,
_ => return rest_result.map_err(Into::into),
};
cleanup_cr(cli_args, pool_args, pool_deleted, args.ignore_not_found)
.await?;
let client = cli_args.client().await?;
diskpool_cleanup::cleanup_dsp(
client,
&cli_args.namespace,
&pool_args.rest_args.pool_id,
pool_args.cleanup.cleanup_dsp_timeout,
pool_deleted,
args.ignore_not_found,
)
.await?;
}
DeleteResources::Node(node_args) if !node_args.cleanup.cleanup_dsp => {
// No DSP cleanup — delegate entirely to the control-plane
// plugin lib. show_impact, purge, accept_* flags, and the
// confirm prompt bypass are all handled there.
plugin::resources::DeleteArgs {
ignore_not_found: args.ignore_not_found,
yes: args.yes,
resource: plugin::resources::DeleteResources::Node(
node_args.rest_args.clone(),
),
}
.execute(cli_args)
.await?
}
DeleteResources::Node(node_args) => {
let node_id = &node_args.rest_args.node_id;

// REST delete first. If it fails with anything other than
// NOT_FOUND we propagate the error immediately and never
// touch the K8s API.
let rest_result = plugin::resources::DeleteArgs {
ignore_not_found: false,
yes: args.yes,
resource: plugin::resources::DeleteResources::Node(
node_args.rest_args.clone(),
),
}
.execute(cli_args)
.await;

let node_deleted = match &rest_result {
Ok(()) => true,
Err(plugin::resources::error::Error::DeleteNodeError {
source,
..
}) if source.status() == Some(StatusCode::NOT_FOUND) => false,
_ => return rest_result.map_err(Into::into),
};
Comment thread
niladrih marked this conversation as resolved.

// Only now pay for the K8s round-trip. List DiskPool CRs
// directly from Kubernetes so the lookup works even when
// the node spec or pool specs are already gone from the
// control plane.
let client = cli_args.client().await?;
let pool_ids = diskpool_cleanup::list_diskpool_ids_for_node(
client.clone(),
&cli_args.namespace,
node_id,
)
.await
.map_err(|e| {
anyhow!("Failed to list DiskPool CR(s) for node {node_id}: {e}")
})?;

// Spawn one task per pool so all DSP CR deletions run
// concurrently. kube::Client is Arc-backed, cheap to clone.
let namespace = cli_args.namespace.clone();
let timeout = node_args.cleanup.cleanup_dsp_timeout;
let ignore_not_found = args.ignore_not_found;
let mut tasks = tokio::task::JoinSet::new();

for pool_id in pool_ids {
let client = client.clone();
let namespace = namespace.clone();
tasks.spawn(async move {
diskpool_cleanup::cleanup_dsp(
client,
&namespace,
pool_id.as_str(),
timeout,
node_deleted,
ignore_not_found,
)
.await
});
}

// Collect all results; first error cancels the rest.
while let Some(result) = tasks.join_next().await {
result.map_err(|e| {
anyhow!("Failed to delete DiskPool CR(s) for node {node_id}: {e}")
})??;
}
}
DeleteResources::Volume { id } => {
// 1. ensure PV is not present
let pv_name = format!("pvc-{id}");
let client = cli_args.pv_api().await?;
let pv = client.get_opt(&pv_name).await.map_err(|error| {
anyhow::anyhow!(
"Failed to fetch PV {pv_name} from K8s api-server: {error}"
)
anyhow!("Failed to fetch PV {pv_name} from K8s api-server: {error}")
})?;
if pv.is_some() {
return Err(Error::Generic(anyhow::anyhow!(
return Err(Error::Generic(anyhow!(
"The volume is still being referenced by PV {pv_name}"
)));
}
Expand All @@ -294,12 +407,10 @@ impl ExecuteOperation for Operations {
let vsc_name = format!("snapcontent-{}", snap_args.snapshot);
let client = cli_args.snap_content_api().await?;
let vsc = client.get_opt(&vsc_name).await.map_err(|error| {
anyhow::anyhow!(
"Failed to fetch VSC {vsc_name} from K8s api-server: {error}"
)
anyhow!("Failed to fetch VSC {vsc_name} from K8s api-server: {error}")
})?;
if vsc.is_some() {
return Err(Error::Generic(anyhow::anyhow!(
return Err(Error::Generic(anyhow!(
"The volume snapshot is still being referenced by VSC {vsc_name}"
)));
}
Expand Down Expand Up @@ -382,54 +493,3 @@ pub async fn init_rest(cli_args: &CliArgs) -> Result<(), Error> {
}
}
}

/// Delete the DiskPool CR and wait for the operator to process the
/// finalizer.
///
/// `pool_deleted`: the plugin successfully deleted the pool spec via REST.
/// `ignore_not_found`: suppress not-found warnings and errors.
///
/// If only one of pool spec or CR is missing, a warning is printed to
/// stderr but the operation succeeds. If both are missing, it is an
/// error. With `ignore_not_found`, all not-found cases are silent and
/// successful.
async fn cleanup_cr(
    cli_args: &CliArgs,
    pool_args: &PoolDeleteArgs,
    pool_deleted: bool,
    ignore_not_found: bool,
) -> Result<(), Error> {
    use diskpool_cleanup::delete_diskpool_cr;

    let pool_id = &pool_args.rest_args.pool_id;
    let namespace = &cli_args.namespace;
    let timeout = *pool_args.cleanup_cr_timeout;
    let client = cli_args.client().await?;

    let deletion = delete_diskpool_cr(client, namespace, pool_id);
    let cr_deleted = match tokio::time::timeout(timeout, deletion).await {
        // Timed out waiting for the operator to process the finalizer.
        Err(_elapsed) => {
            return Err(anyhow!(
                "Timed out after {} waiting for DiskPool CR {pool_id} deletion \
                 in namespace {namespace}. Check that the pool operator is running.",
                humantime::format_duration(timeout)
            )
            .into())
        }
        // `Some(..)` means the CR existed and was deleted.
        Ok(result) => result.map_err(|e| anyhow!("{e}"))?.is_some(),
    };

    // Both missing is an error (unless ignored).
    if !cr_deleted && !pool_deleted && !ignore_not_found {
        return Err(
            anyhow!("Pool {pool_id} not found (pool spec and DiskPool CR are both missing)")
                .into(),
        );
    }

    Ok(())
}
Loading