diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index ffc91d80fda4..030721d19d24 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -16,10 +16,11 @@ // under the License. use crate::raw::oio::FlatLister; -use crate::raw::oio::PrefixLister; +use crate::raw::oio::{PrefixLister, RecursiveDeleter}; use crate::raw::*; use crate::*; use std::cmp::Ordering; +use std::collections::HashSet; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -343,7 +344,7 @@ impl LayeredAccess for CompleteAccessor { type BlockingWriter = CompleteWriter; type Lister = CompleteLister; type BlockingLister = CompleteLister; - type Deleter = A::Deleter; + type Deleter = CompleteDeleter; type BlockingDeleter = A::BlockingDeleter; fn inner(&self) -> &Self::Inner { @@ -377,7 +378,9 @@ impl LayeredAccess for CompleteAccessor { } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - self.inner().delete().await + let (rp, d) = self.inner().delete().await?; + let d = CompleteDeleter::new(self.inner.clone(), d); + Ok((rp, d)) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { @@ -609,3 +612,44 @@ where Ok(ret) } } + +pub struct CompleteDeleter { + inner: RecursiveDeleter, + buffer: HashSet<(String, OpDelete)>, +} + +impl CompleteDeleter { + pub fn new(acc: Arc, deleter: D) -> Self { + Self { + inner: RecursiveDeleter::new(acc, deleter), + buffer: HashSet::default(), + } + } +} + +impl oio::Delete for CompleteDeleter { + fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.buffer.insert((path.to_string(), args)); + Ok(()) + } + + async fn flush(&mut self) -> Result { + if self.buffer.is_empty() { + return Ok(0); + } + + let size = self.buffer.len(); + for (path, args) in self.buffer.clone() { + if args.recursive() { + self.inner.recursive_delete(path.as_str()).await? + } else { + self.inner.delete(path.as_str(), args.clone()).await?; + } + + self.buffer.remove(&(path, args)); + } + + self.inner.flush_all().await?; + Ok(size) + } +} diff --git a/core/src/raw/oio/delete/mod.rs b/core/src/raw/oio/delete/mod.rs index 192a4235b05d..b57bc884ef11 100644 --- a/core/src/raw/oio/delete/mod.rs +++ b/core/src/raw/oio/delete/mod.rs @@ -31,3 +31,6 @@ mod one_shot_delete; pub use one_shot_delete::BlockingOneShotDelete; pub use one_shot_delete::OneShotDelete; pub use one_shot_delete::OneShotDeleter; + +mod recursive_delete; +pub use recursive_delete::RecursiveDeleter; diff --git a/core/src/raw/oio/delete/recursive_delete.rs b/core/src/raw/oio/delete/recursive_delete.rs new file mode 100644 index 000000000000..36068b6357c4 --- /dev/null +++ b/core/src/raw/oio/delete/recursive_delete.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::oio::{Delete, FlatLister, List, PrefixLister}; +use crate::raw::*; +use crate::*; +use std::sync::Arc; + +/// RecursiveDeleter is designed to recursively delete content from storage +pub struct RecursiveDeleter { + acc: Arc, + recursive: bool, + deleter: D, + max_size: usize, + cur_size: usize, +} + +impl RecursiveDeleter { + /// Create a new recursive deleter. + pub(crate) fn new(acc: Arc, deleter: D) -> Self { + let recursive = acc.info().native_capability().delete_with_recursive; + let max_size = acc.info().native_capability().delete_max_size.unwrap_or(1); + Self { + acc, + recursive, + deleter, + max_size, + cur_size: 0, + } + } + /// Recursively delete a path. + pub async fn recursive_delete(&mut self, path: &str) -> Result<()> { + if path.ends_with('/') { + return self.delete_dir(path).await; + } + + let parent = get_parent(path); + let (_, lister) = self.acc.list(parent, OpList::default()).await?; + let mut lister = PrefixLister::new(lister, path); + while let Some(entry) = lister.next().await? { + println!("Deleting {}", entry.path()); + if entry.mode().is_dir() { + self.delete_dir(entry.path()).await? + } else { + self.delete(entry.path(), OpDelete::default()).await?; + } + } + + Ok(()) + } + + async fn delete_dir(&mut self, dir: &str) -> Result<()> { + if self.recursive { + return self + .delete(dir, OpDelete::default().with_recursive(true)) + .await; + } + + let mut lister = FlatLister::new(self.acc.clone(), dir); + while let Some(entry) = lister.next().await? { + self.delete(entry.path(), OpDelete::default()).await? + } + + Ok(()) + } + + /// Delete a path + pub async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + if self.cur_size >= self.max_size { + let deleted = self.deleter.flush().await?; + self.cur_size -= deleted; + } + + self.deleter.delete(path, args)?; + self.cur_size += 1; + + Ok(()) + } + + /// Flush the deleter, returns the number of deleted items. + async fn flush(&mut self) -> Result { + let deleted = self.deleter.flush().await?; + self.cur_size -= deleted; + + Ok(deleted) + } + + /// Flush all items in the deleter. + pub async fn flush_all(&mut self) -> Result<()> { + loop { + self.flush().await?; + if self.cur_size == 0 { + break; + } + } + + Ok(()) + } +} diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 4c07d9e92d23..96ddfcf265ba 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -43,6 +43,7 @@ impl OpCreateDir { #[derive(Debug, Clone, Default, Eq, Hash, PartialEq)] pub struct OpDelete { version: Option, + recursive: bool, } impl OpDelete { @@ -63,6 +64,17 @@ impl OpDelete { pub fn version(&self) -> Option<&str> { self.version.as_deref() } + + /// Change the recursive flag of this delete operation. + pub fn with_recursive(mut self, recursive: bool) -> Self { + self.recursive = recursive; + self + } + + /// Get the recursive flag of this delete operation. + pub fn recursive(&self) -> bool { + self.recursive + } } /// Args for `delete` operation. diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 39811394cc1d..0d3a06eebfdf 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -167,6 +167,8 @@ pub struct Capability { pub delete_with_version: bool, /// Maximum size supported for single delete operations. pub delete_max_size: Option, + /// Indicates if recursive delete operations are supported. + pub delete_with_recursive: bool, /// Indicates if copy operations are supported. pub copy: bool, diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 72ef6bd62049..d07e7fecc06e 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -1539,6 +1539,11 @@ impl>> FutureDelete { pub fn version(self, v: &str) -> Self { self.map(|args| args.with_version(v)) } + + /// Change the recursive flag of this delete operation. + pub fn recursive(self, v: bool) -> Self { + self.map(|args| args.with_recursive(v)) + } } /// Future that generated by [`Operator::deleter_with`]. diff --git a/core/tests/behavior/async_delete.rs b/core/tests/behavior/async_delete.rs index c88ecd66ecb2..22331de982d7 100644 --- a/core/tests/behavior/async_delete.rs +++ b/core/tests/behavior/async_delete.rs @@ -32,16 +32,16 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_delete_with_special_chars, test_delete_not_existing, test_delete_stream, - test_remove_one_file, + test_delete_one_file, test_delete_with_version, test_delete_with_not_existing_version, test_batch_delete, test_batch_delete_with_version )); - if cap.list_with_recursive { - tests.extend(async_trials!(op, test_remove_all_basic)); + if cap.list { + tests.extend(async_trials!(op, test_delete_all_basic)); if !cap.create_dir { - tests.extend(async_trials!(op, test_remove_all_with_prefix_exists)); + tests.extend(async_trials!(op, test_delete_all_with_prefix_exists)); } } } @@ -106,8 +106,8 @@ pub async fn test_delete_not_existing(op: Operator) -> Result<()> { Ok(()) } -/// Remove one file -pub async fn test_remove_one_file(op: Operator) -> Result<()> { +/// Delete one file +pub async fn test_delete_one_file(op: Operator) -> Result<()> { let (path, content, _) = TEST_FIXTURE.new_file(op.clone()); op.write(&path, content.clone()) @@ -180,7 +180,7 @@ async fn test_blocking_remove_all_with_objects( op.write(&path, content).await.expect("write must succeed"); } - op.remove_all(&parent).await?; + op.delete_with(&parent).recursive(true).await?; let found = op .lister_with(&format!("{parent}/")) @@ -197,14 +197,14 @@ async fn test_blocking_remove_all_with_objects( Ok(()) } -/// Remove all under a prefix -pub async fn test_remove_all_basic(op: Operator) -> Result<()> { +/// Delete all under a prefix +pub async fn test_delete_all_basic(op: Operator) -> Result<()> { let parent = uuid::Uuid::new_v4().to_string(); test_blocking_remove_all_with_objects(op, parent, ["a/b", "a/c", "a/d/e"]).await } -/// Remove all under a prefix, while the prefix itself is also an object -pub async fn test_remove_all_with_prefix_exists(op: Operator) -> Result<()> { +/// Delete all under a prefix, while the prefix itself is also an object +pub async fn test_delete_all_with_prefix_exists(op: Operator) -> Result<()> { let parent = uuid::Uuid::new_v4().to_string(); let (content, _) = gen_bytes(op.info().full_capability()); op.write(&parent, content) diff --git a/core/tests/behavior/async_list.rs b/core/tests/behavior/async_list.rs index 0667a914a5fe..607be4cbece4 100644 --- a/core/tests/behavior/async_list.rs +++ b/core/tests/behavior/async_list.rs @@ -47,7 +47,6 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_list_dir_with_recursive_no_trailing_slash, test_list_file_with_recursive, test_list_root_with_recursive, - test_remove_all, test_list_files_with_versions, test_list_with_versions_and_limit, test_list_with_versions_and_start_after, @@ -541,35 +540,6 @@ pub async fn test_list_file_with_recursive(op: Operator) -> Result<()> { Ok(()) } -// Remove all should remove all in this path. -pub async fn test_remove_all(op: Operator) -> Result<()> { - let parent = uuid::Uuid::new_v4().to_string(); - - let expected = [ - "x/", "x/y", "x/x/", "x/x/y", "x/x/x/", "x/x/x/y", "x/x/x/x/", - ]; - for path in expected.iter() { - if path.ends_with('/') { - op.create_dir(&format!("{parent}/{path}")).await?; - } else { - op.write(&format!("{parent}/{path}"), "test_scan").await?; - } - } - - op.remove_all(&format!("{parent}/x/")).await?; - - for path in expected.iter() { - if path.ends_with('/') { - continue; - } - assert!( - !op.exists(&format!("{parent}/{path}")).await?, - "{parent}/{path} should be removed" - ) - } - Ok(()) -} - /// Stat normal file and dir should return metadata pub async fn test_list_only(op: Operator) -> Result<()> { let mut entries = HashMap::new();