Skip to content

Commit 8968675

Browse files
tisonkunandylokandy
andcommitted
fix: patch futures buffer_by_ordered
Signed-off-by: tison <[email protected]> Co-authored-by: Andy Lok <[email protected]> Signed-off-by: tison <[email protected]>
1 parent 8de30cc commit 8968675

File tree

10 files changed

+232
-8
lines changed

10 files changed

+232
-8
lines changed

bin/oli/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ http-body = "1"
255255
log = "0.4"
256256
md-5 = "0.10"
257257
percent-encoding = "2"
258+
pin-project-lite = { version = "0.2.16"}
258259
quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
259260
reqwest = { version = "0.12.22", features = [
260261
"stream",

core/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,10 @@
153153
//! - [Performance Guide][crate::docs::performance]
154154
155155
// Make sure all our public APIs have docs.
156-
#![warn(missing_docs)]
156+
#![deny(missing_docs)]
157+
158+
// Private modules, they will not be accessed by users.
159+
mod patches;
157160

158161
// Private module with public types, they will be accessed via `opendal::Xxxx`
159162
mod types;
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::VecDeque;
19+
use std::fmt;
20+
use std::pin::Pin;
21+
use std::task::Context;
22+
use std::task::Poll;
23+
24+
use futures::future::Future;
25+
use futures::stream::Fuse;
26+
use futures::stream::FuturesOrdered;
27+
use futures::stream::Stream;
28+
use futures::stream::StreamExt;
29+
30+
use pin_project_lite::pin_project;
31+
32+
pin_project! {
33+
/// Stream for the [`buffer_by_ordered`] method.
34+
///
35+
/// [`buffer_by_ordered`]: crate::StreamExt::buffer_by_ordered
36+
#[must_use = "streams do nothing unless polled"]
37+
pub struct BufferByOrdered<St, F>
38+
where
39+
St: Stream<Item = (F, usize)>,
40+
F: Future,
41+
{
42+
#[pin]
43+
stream: Fuse<St>,
44+
in_progress_queue: FuturesOrdered<SizedFuture<F>>,
45+
ready_queue: VecDeque<(F::Output, usize)>,
46+
max_size: usize,
47+
current_size: usize,
48+
}
49+
}
50+
51+
impl<St, F> fmt::Debug for BufferByOrdered<St, F>
52+
where
53+
St: Stream<Item = (F, usize)> + fmt::Debug,
54+
F: Future,
55+
{
56+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57+
f.debug_struct("BufferByOrdered")
58+
.field("stream", &self.stream)
59+
.field("in_progress_queue", &self.in_progress_queue)
60+
.field("max_size", &self.max_size)
61+
.field("current_size", &self.current_size)
62+
.finish()
63+
}
64+
}
65+
66+
impl<St, F> BufferByOrdered<St, F>
67+
where
68+
St: Stream<Item = (F, usize)>,
69+
F: Future,
70+
{
71+
pub(crate) fn new(stream: St, max_size: usize) -> Self {
72+
Self {
73+
stream: stream.fuse(),
74+
in_progress_queue: FuturesOrdered::new(),
75+
ready_queue: VecDeque::new(),
76+
max_size,
77+
current_size: 0,
78+
}
79+
}
80+
}
81+
82+
impl<St, F> Stream for BufferByOrdered<St, F>
83+
where
84+
St: Stream<Item = (F, usize)>,
85+
F: Future,
86+
{
87+
type Item = F::Output;
88+
89+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90+
let mut this = self.project();
91+
92+
// First up, try to spawn off as many futures as possible by filling up
93+
// our queue of futures.
94+
while this.current_size < this.max_size {
95+
match this.stream.as_mut().poll_next(cx) {
96+
Poll::Ready(Some((future, size))) => {
97+
*this.current_size += size;
98+
this.in_progress_queue
99+
.push_back(SizedFuture { future, size });
100+
}
101+
Poll::Ready(None) => break,
102+
Poll::Pending => break,
103+
}
104+
}
105+
106+
// Try to poll all ready futures in the in_progress_queue.
107+
loop {
108+
match this.in_progress_queue.poll_next_unpin(cx) {
109+
Poll::Ready(Some(output)) => {
110+
this.ready_queue.push_back(output);
111+
}
112+
Poll::Ready(None) => break,
113+
Poll::Pending => break,
114+
}
115+
}
116+
117+
if let Some((output, size)) = this.ready_queue.pop_front() {
118+
// If we have any ready outputs, return the first one.
119+
*this.current_size -= size;
120+
Poll::Ready(Some(output))
121+
} else if this.stream.is_done() && this.in_progress_queue.is_empty() {
122+
Poll::Ready(None)
123+
} else {
124+
Poll::Pending
125+
}
126+
}
127+
128+
fn size_hint(&self) -> (usize, Option<usize>) {
129+
let queue_len = self.in_progress_queue.len() + self.ready_queue.len();
130+
let (lower, upper) = self.stream.size_hint();
131+
let lower = lower.saturating_add(queue_len);
132+
let upper = match upper {
133+
Some(x) => x.checked_add(queue_len),
134+
None => None,
135+
};
136+
(lower, upper)
137+
}
138+
}
139+
140+
pin_project! {
141+
struct SizedFuture<F> {
142+
#[pin]
143+
future: F,
144+
size: usize,
145+
}
146+
}
147+
148+
impl<F: Future> Future for SizedFuture<F> {
149+
type Output = (F::Output, usize);
150+
151+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152+
let this = self.project();
153+
match this.future.poll(cx) {
154+
Poll::Ready(output) => Poll::Ready((output, *this.size)),
155+
Poll::Pending => Poll::Pending,
156+
}
157+
}
158+
}

core/src/patches/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
pub(crate) mod buffer_by_ordered;
19+
pub(crate) mod stream;

core/src/patches/stream.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::patches::buffer_by_ordered::BufferByOrdered;
19+
20+
use futures::Stream;
21+
use std::future::Future;
22+
23+
pub trait StreamExt: Stream {
24+
fn buffer_by_ordered<F>(self, max_size: usize) -> BufferByOrdered<Self, F>
25+
where
26+
Self: Sized,
27+
Self: Stream<Item = (F, usize)>,
28+
F: Future,
29+
{
30+
BufferByOrdered::new(self, max_size)
31+
}
32+
}
33+
34+
impl<T: ?Sized> StreamExt for T where T: Stream {}

core/src/raw/futures_util.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,17 @@ use futures::FutureExt;
2525
use crate::*;
2626

2727
/// BoxedFuture is the type alias of [`futures::future::BoxFuture`].
28-
///
29-
/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target.
3028
#[cfg(not(target_arch = "wasm32"))]
3129
pub type BoxedFuture<'a, T> = futures::future::BoxFuture<'a, T>;
3230
#[cfg(target_arch = "wasm32")]
31+
/// BoxedFuture is the type alias of [`futures::future::LocalBoxFuture`].
3332
pub type BoxedFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;
3433

3534
/// BoxedStaticFuture is the type alias of [`futures::future::BoxFuture`].
36-
///
37-
/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target.
3835
#[cfg(not(target_arch = "wasm32"))]
3936
pub type BoxedStaticFuture<T> = futures::future::BoxFuture<'static, T>;
4037
#[cfg(target_arch = "wasm32")]
38+
/// BoxedStaticFuture is the type alias of [`futures::future::LocalBoxFuture`].
4139
pub type BoxedStaticFuture<T> = futures::future::LocalBoxFuture<'static, T>;
4240

4341
/// MaybeSend is a marker to determine whether a type is `Send` or not.
@@ -49,6 +47,14 @@ pub type BoxedStaticFuture<T> = futures::future::LocalBoxFuture<'static, T>;
4947
/// And it's empty trait on wasm32 target to indicate that a type is not `Send`.
5048
#[cfg(not(target_arch = "wasm32"))]
5149
pub trait MaybeSend: Send {}
50+
51+
/// MaybeSend is a marker to determine whether a type is `Send` or not.
52+
/// We use this trait to wrap the `Send` requirement for wasm32 target.
53+
///
54+
/// # Safety
55+
///
56+
/// [`MaybeSend`] is equivalent to `Send` on non-wasm32 target.
57+
/// And it's empty trait on wasm32 target to indicate that a type is not `Send`.
5258
#[cfg(target_arch = "wasm32")]
5359
pub trait MaybeSend {}
5460

core/src/raw/oio/list/page_list.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub trait PageList: Send + Sync + Unpin + 'static {
3636
#[cfg(not(target_arch = "wasm32"))]
3737
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>> + MaybeSend;
3838
#[cfg(target_arch = "wasm32")]
39+
/// next_page is used to fetch next page of entries from underlying storage.
3940
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>>;
4041
}
4142

core/src/types/read/reader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ use std::sync::Arc;
2121

2222
use bytes::BufMut;
2323
use futures::stream;
24-
use futures::StreamExt;
2524
use futures::TryStreamExt;
2625

26+
use crate::patches::stream::StreamExt;
2727
use crate::*;
2828

2929
/// Reader is designed to read data from given path in an asynchronous
@@ -147,8 +147,8 @@ impl Reader {
147147
let merged_ranges = self.merge_ranges(ranges.clone());
148148

149149
let merged_bufs: Vec<_> =
150-
stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v)))
151-
.buffered(self.ctx.options().concurrent())
150+
stream::iter(merged_ranges.clone().into_iter().map(|v| (self.read(v), 1)))
151+
.buffer_by_ordered(self.ctx.options().concurrent())
152152
.try_collect()
153153
.await?;
154154

0 commit comments

Comments
 (0)