diff --git a/core/src/lib.rs b/core/src/lib.rs index 864dfb4527a9..f7b490f9dd62 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -153,7 +153,7 @@ //! - [Performance Guide][crate::docs::performance] // Make sure all our public APIs have docs. -#![warn(missing_docs)] +#![deny(missing_docs)] // Private module with public types, they will be accessed via `opendal::Xxxx` mod types; diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 44deab8d9016..1c6cd9386d18 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -25,19 +25,17 @@ use futures::FutureExt; use crate::*; /// BoxedFuture is the type alias of [`futures::future::BoxFuture`]. -/// -/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target. #[cfg(not(target_arch = "wasm32"))] pub type BoxedFuture<'a, T> = futures::future::BoxFuture<'a, T>; #[cfg(target_arch = "wasm32")] +/// BoxedFuture is the type alias of [`futures::future::LocalBoxFuture`]. pub type BoxedFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>; /// BoxedStaticFuture is the type alias of [`futures::future::BoxFuture`]. -/// -/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target. #[cfg(not(target_arch = "wasm32"))] pub type BoxedStaticFuture = futures::future::BoxFuture<'static, T>; #[cfg(target_arch = "wasm32")] +/// BoxedStaticFuture is the type alias of [`futures::future::LocalBoxFuture`]. pub type BoxedStaticFuture = futures::future::LocalBoxFuture<'static, T>; /// MaybeSend is a marker to determine whether a type is `Send` or not. @@ -49,6 +47,14 @@ pub type BoxedStaticFuture = futures::future::LocalBoxFuture<'static, T>; /// And it's empty trait on wasm32 target to indicate that a type is not `Send`. #[cfg(not(target_arch = "wasm32"))] pub trait MaybeSend: Send {} + +/// MaybeSend is a marker to determine whether a type is `Send` or not. +/// We use this trait to wrap the `Send` requirement for wasm32 target. +/// +/// # Safety +/// +/// [`MaybeSend`] is equivalent to `Send` on non-wasm32 target. +/// And it's empty trait on wasm32 target to indicate that a type is not `Send`. #[cfg(target_arch = "wasm32")] pub trait MaybeSend {} diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs index 2e146e33ce64..efdd826e3c23 100644 --- a/core/src/raw/oio/list/page_list.rs +++ b/core/src/raw/oio/list/page_list.rs @@ -36,6 +36,7 @@ pub trait PageList: Send + Sync + Unpin + 'static { #[cfg(not(target_arch = "wasm32"))] fn next_page(&self, ctx: &mut PageContext) -> impl Future> + MaybeSend; #[cfg(target_arch = "wasm32")] + /// next_page is used to fetch next page of entries from underlying storage. fn next_page(&self, ctx: &mut PageContext) -> impl Future>; } diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index 34e8cc67fb49..15c1cd46b1c8 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -20,10 +20,10 @@ use std::ops::RangeBounds; use std::sync::Arc; use bytes::BufMut; -use futures::stream; -use futures::StreamExt; use futures::TryStreamExt; +use crate::raw::Access; +use crate::raw::ConcurrentTasks; use crate::*; /// Reader is designed to read data from given path in an asynchronous @@ -146,11 +146,32 @@ impl Reader { pub async fn fetch(&self, ranges: Vec>) -> Result> { let merged_ranges = self.merge_ranges(ranges.clone()); - let merged_bufs: Vec<_> = - stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v))) - .buffered(self.ctx.options().concurrent()) - .try_collect() - .await?; + #[derive(Clone)] + struct FetchInput { + reader: Reader, + range: Range, + } + + let mut tasks = ConcurrentTasks::new( + self.ctx.accessor().info().executor(), + self.ctx.options().concurrent(), + |input: FetchInput| { + Box::pin(async move { + let FetchInput { range, reader } = input.clone(); + (input, reader.read(range).await) + }) + }, + ); + + for range in merged_ranges.clone() { + let reader = self.clone(); + tasks.execute(FetchInput { reader, range }).await?; + } + + let mut merged_bufs = vec![]; + while let Some(b) = tasks.next().await { + merged_bufs.push(b?); + } let mut bufs = Vec::with_capacity(ranges.len()); for range in ranges {