From c4b77d86f0087ad4f25c06c79f59da3d0acdf7b7 Mon Sep 17 00:00:00 2001 From: Sehyo Chang Date: Thu, 24 Apr 2025 16:43:55 -0700 Subject: [PATCH] set tokio as default runtime, we will add support for different runtime --- Cargo.toml | 19 ++++++++----------- src/fs/bounded.rs | 2 +- src/io.rs | 2 -- src/lib.rs | 4 ---- src/native_tls.rs | 4 ++-- src/net/mod.rs | 2 +- src/net/tcp_stream.rs | 2 +- src/openssl/connector.rs | 4 ++-- src/openssl/mod.rs | 2 +- src/openssl/test.rs | 2 +- src/rust_tls.rs | 34 +++++++++++++++++----------------- src/task.rs | 29 +++++++++++++++-------------- src/test_util.rs | 2 +- src/zero_copy.rs | 17 +++++------------ 14 files changed, 55 insertions(+), 70 deletions(-) delete mode 100644 src/io.rs diff --git a/Cargo.toml b/Cargo.toml index 8f69cd97..ab1c6bc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,22 +1,20 @@ [package] name = "fluvio-future" -version = "0.7.2" -edition = "2021" +version = "0.8.0" +edition = "2024" authors = ["Fluvio Contributors "] description = "I/O futures for Fluvio project" repository = "https://github.com/infinyon/future-aio" license = "Apache-2.0" -resolver = "2" +resolver = "3" [package.metadata.docs.rs] all-features = true [features] -task = ["async-std/default", "cfg-if"] +task = ["cfg-if"] subscriber = ["tracing-subscriber", "tracing-subscriber/std", "tracing-subscriber/env-filter"] fixture = ["subscriber", "task", "fluvio-future-derive"] -task_unstable = ["task", "async-std/unstable"] -io = ["async-std/default"] sync = ["async-std/default"] future = ["async-std/default"] net = ["futures-lite", "async-net", "async-trait", "cfg-if", "futures-util/io", "socket2", "ws_stream_wasm"] @@ -26,15 +24,15 @@ native_tls = ["net", "pin-project", "async-native-tls", "dep:native-tls", "opens openssl_tls = ["net", "openssl", "openssl-sys", "pin-project", "futures-util/io"] timer = ["async-io", "pin-project", "futures-lite", "fluvio-wasm-timer"] fs = ["async-fs", "futures-lite", "pin-utils", "async-trait"] -zero_copy = ["nix", "task_unstable"] -mmap = ["fs", "memmap2", "task_unstable"] +zero_copy = ["nix"] +mmap = ["fs", "memmap2"] retry = ["timer", "cfg-if", "async-trait", "futures-util/io"] doomsday = ["task", "sync"] -tokio1 = ["async-std/tokio1"] attributes = [] [dependencies] anyhow = { version = "1.0" } +async-global-executor = { version = "3.1.0",features = ["tokio"] } async-trait = { version = "0.1.80", optional = true } cfg-if = { version = "1.0", optional = true } fluvio-future-derive = { path = "fluvio-future-derive", version = "0.1.0", optional = true } @@ -43,7 +41,7 @@ futures-util = { version = "0.3.30", optional = true } pin-project = { version = "1.1", optional = true } pin-utils = { version = "0.1.0", optional = true } thiserror = "2.0.11" -tokio = { version = "1.38", default-features = false, optional = true } +tokio = { version = "1.44.2", default-features = false} tracing = { version = "0.1.40" } tracing-subscriber = { version = "0.3.18", optional = true } @@ -52,7 +50,6 @@ async-fs = { version = "2.1", optional = true } async-io = { version = "2.3", optional = true } async-native-tls = { version = "0.5.0", optional = true } async-net = { version = "2.0", optional = true } -async-std = { version = "1.12", default-features = false, optional = true } futures-rustls = { version = "0.26.0", optional = true } memmap2 = { version = "0.9.4", optional = true } native-tls = { version = "0.2.12", optional = true } diff --git a/src/fs/bounded.rs b/src/fs/bounded.rs index 409c0a65..becddf0c 100644 --- a/src/fs/bounded.rs +++ b/src/fs/bounded.rs @@ -184,8 +184,8 @@ impl AsyncWrite for BoundedFileSink { mod tests { use std::env::temp_dir; - use std::fs::remove_file; use std::fs::File as StdFile; + use std::fs::remove_file; use std::io::Read; use std::io::SeekFrom; use std::path::Path; diff --git a/src/io.rs b/src/io.rs deleted file mode 100644 index db385e5d..00000000 --- a/src/io.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub use async_std::io::*; -pub use async_std::prelude::*; diff --git a/src/lib.rs b/src/lib.rs index 18f297d5..86ea808c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,10 +5,6 @@ pub mod file_slice; #[cfg(not(target_arch = "wasm32"))] pub mod fs; -#[cfg(feature = "io")] -#[cfg(not(target_arch = "wasm32"))] -pub mod io; - #[cfg(feature = "task")] pub mod task; diff --git a/src/native_tls.rs b/src/native_tls.rs index 7ea74463..2c6d699d 100644 --- a/src/native_tls.rs +++ b/src/native_tls.rs @@ -37,9 +37,9 @@ mod connector { use tracing::debug; use crate::net::{ - tcp_stream::{stream, stream_with_opts, SocketOpts}, AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, DomainConnector, SplitConnection, TcpDomainConnector, + tcp_stream::{SocketOpts, stream, stream_with_opts}, }; use super::*; @@ -327,9 +327,9 @@ mod test { use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::debug; + use crate::net::TcpListener; use crate::net::certs::CertBuilder; use crate::net::tcp_stream::stream; - use crate::net::TcpListener; use crate::test_async; use crate::timer::sleep; diff --git a/src/net/mod.rs b/src/net/mod.rs index 3d64284c..1507f6f7 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -222,8 +222,8 @@ mod test { use futures_util::AsyncReadExt; use tracing::debug; - use crate::net::tcp_stream::stream; use crate::net::TcpListener; + use crate::net::tcp_stream::stream; use crate::test_async; use crate::timer::sleep; diff --git a/src/net/tcp_stream.rs b/src/net/tcp_stream.rs index 3ba84dae..f16dd8e9 100644 --- a/src/net/tcp_stream.rs +++ b/src/net/tcp_stream.rs @@ -117,8 +117,8 @@ mod tests { use bytes::BufMut; use bytes::Bytes; use bytes::BytesMut; - use futures_lite::future::zip; use futures_lite::AsyncReadExt; + use futures_lite::future::zip; use futures_util::SinkExt; use futures_util::StreamExt; use tokio_util::codec::BytesCodec; diff --git a/src/openssl/connector.rs b/src/openssl/connector.rs index 6169aaa5..a302fed1 100644 --- a/src/openssl/connector.rs +++ b/src/openssl/connector.rs @@ -11,9 +11,9 @@ use openssl::x509::verify::X509VerifyFlags; use tracing::debug; use crate::net::{ - tcp_stream::{stream, stream_with_opts, SocketOpts}, AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, DomainConnector, SplitConnection, TcpDomainConnector, + tcp_stream::{SocketOpts, stream, stream_with_opts}, }; use super::async_to_sync_wrapper::AsyncToSyncWrapper; @@ -40,7 +40,7 @@ pub mod certs { // copied from https://github.com/sfackler/rust-native-tls/blob/master/src/imp/openssl.rs mod identity_impl { - use anyhow::{anyhow, Result}; + use anyhow::{Result, anyhow}; use openssl::pkcs12::Pkcs12; use openssl::pkey::{PKey, Private}; use openssl::x509::X509; diff --git a/src/openssl/mod.rs b/src/openssl/mod.rs index fd34b7ba..e86d1e50 100644 --- a/src/openssl/mod.rs +++ b/src/openssl/mod.rs @@ -11,7 +11,7 @@ mod test; pub use acceptor::{TlsAcceptor, TlsAcceptorBuilder}; pub use certificate::Certificate; pub use connector::{ - certs, TlsAnonymousConnector, TlsConnector, TlsConnectorBuilder, TlsDomainConnector, + TlsAnonymousConnector, TlsConnector, TlsConnectorBuilder, TlsDomainConnector, certs, }; pub use openssl::ssl::SslVerifyMode; pub use stream::TlsStream; diff --git a/src/openssl/test.rs b/src/openssl/test.rs index c3b614ec..791b833d 100644 --- a/src/openssl/test.rs +++ b/src/openssl/test.rs @@ -13,7 +13,7 @@ use tokio_util::codec::Framed; use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::debug; -use crate::net::{tcp_stream::stream, TcpListener}; +use crate::net::{TcpListener, tcp_stream::stream}; use crate::test_async; use crate::timer::sleep; diff --git a/src/rust_tls.rs b/src/rust_tls.rs index 98b538da..242849f4 100644 --- a/src/rust_tls.rs +++ b/src/rust_tls.rs @@ -1,9 +1,9 @@ use crate::net::TcpStream; -pub use futures_rustls::client::TlsStream as ClientTlsStream; -pub use futures_rustls::server::TlsStream as ServerTlsStream; pub use futures_rustls::TlsAcceptor; pub use futures_rustls::TlsConnector; +pub use futures_rustls::client::TlsStream as ClientTlsStream; +pub use futures_rustls::server::TlsStream as ServerTlsStream; pub type DefaultServerTlsStream = ServerTlsStream; pub type DefaultClientTlsStream = ClientTlsStream; @@ -40,10 +40,10 @@ mod cert { use std::io::BufReader; use std::path::Path; - use anyhow::{anyhow, Context, Result}; + use anyhow::{Context, Result, anyhow}; + use futures_rustls::rustls::RootCertStore; use futures_rustls::rustls::pki_types::CertificateDer; use futures_rustls::rustls::pki_types::PrivateKeyDer; - use futures_rustls::rustls::RootCertStore; use rustls_pemfile::certs; use rustls_pemfile::pkcs8_private_keys; @@ -104,8 +104,8 @@ mod connector { use tracing::debug; use crate::net::{ - tcp_stream::stream, AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, - DomainConnector, SplitConnection, TcpDomainConnector, + AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, DomainConnector, + SplitConnection, TcpDomainConnector, tcp_stream::stream, }; use super::TlsConnector; @@ -219,25 +219,25 @@ mod builder { use std::path::Path; use std::sync::Arc; + use futures_rustls::TlsAcceptor; + use futures_rustls::TlsConnector; use futures_rustls::pki_types::UnixTime; + use futures_rustls::rustls::ClientConfig; + use futures_rustls::rustls::ConfigBuilder; + use futures_rustls::rustls::Error as TlsError; + use futures_rustls::rustls::RootCertStore; + use futures_rustls::rustls::ServerConfig; + use futures_rustls::rustls::SignatureScheme; + use futures_rustls::rustls::WantsVerifier; + use futures_rustls::rustls::client::WantsClientCert; use futures_rustls::rustls::client::danger::HandshakeSignatureValid; use futures_rustls::rustls::client::danger::ServerCertVerified; use futures_rustls::rustls::client::danger::ServerCertVerifier; - use futures_rustls::rustls::client::WantsClientCert; use futures_rustls::rustls::pki_types::CertificateDer; use futures_rustls::rustls::pki_types::PrivateKeyDer; use futures_rustls::rustls::pki_types::ServerName; use futures_rustls::rustls::server::WantsServerCert; use futures_rustls::rustls::server::WebPkiClientVerifier; - use futures_rustls::rustls::ClientConfig; - use futures_rustls::rustls::ConfigBuilder; - use futures_rustls::rustls::Error as TlsError; - use futures_rustls::rustls::RootCertStore; - use futures_rustls::rustls::ServerConfig; - use futures_rustls::rustls::SignatureScheme; - use futures_rustls::rustls::WantsVerifier; - use futures_rustls::TlsAcceptor; - use futures_rustls::TlsConnector; use anyhow::{Context, Result}; use tracing::info; @@ -471,8 +471,8 @@ mod test { use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::debug; - use fluvio_future::net::tcp_stream::stream; use fluvio_future::net::TcpListener; + use fluvio_future::net::tcp_stream::stream; use fluvio_future::test_async; use fluvio_future::timer::sleep; diff --git a/src/task.rs b/src/task.rs index c215f65e..30e865ae 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,30 +1,33 @@ use std::future::Future; -use async_std::task; - -#[cfg(feature = "task_unstable")] -pub use async_std::task::spawn_local; - /// run future and wait forever /// this is typically used in the server pub fn run(spawn_closure: F) where F: Future + Send + 'static, { - task::block_on(spawn_closure); + async_global_executor::block_on(spawn_closure); } +// preserve async-std spawn behavior which is always detach task. +// to fully control task life cycle, use spawn_task cfg_if::cfg_if! { if #[cfg(target_arch = "wasm32")] { - pub use async_std::task::spawn_local as spawn; + + pub use async_global_executor::spawn_local as spawn_task; + pub fn spawn + 'static, T: 'static>(future: F){ + spawn_task(future).detach(); + } } else { - pub use async_std::task::spawn; + pub use async_global_executor::spawn as spawn_task; + pub fn spawn + Send + 'static, T: Send + 'static>(future: F) { + spawn_task(future).detach(); + } } } -#[cfg(feature = "task_unstable")] #[cfg(not(target_arch = "wasm32"))] -pub use async_std::task::spawn_blocking; +pub use async_global_executor::spawn_blocking; cfg_if::cfg_if! { if #[cfg(target_arch = "wasm32")] { @@ -33,15 +36,13 @@ cfg_if::cfg_if! { F: Future + 'static, T: 'static, { - task::block_on(f) + async_global_executor::block_on(f) } } else { - pub use async_std::task::block_on as run_block_on; + pub use async_global_executor::block_on as run_block_on; } } -pub use async_std::task::JoinHandle; - #[cfg(test)] mod basic_test { diff --git a/src/test_util.rs b/src/test_util.rs index 4f1f0453..5f8a67fc 100644 --- a/src/test_util.rs +++ b/src/test_util.rs @@ -22,8 +22,8 @@ mod test { use std::task::Context; use std::task::Poll; - use futures_lite::future::poll_fn; use futures_lite::Future; + use futures_lite::future::poll_fn; use tracing::debug; use crate::test_async; diff --git a/src/zero_copy.rs b/src/zero_copy.rs index 585c9c87..a0a98c53 100644 --- a/src/zero_copy.rs +++ b/src/zero_copy.rs @@ -4,10 +4,10 @@ use std::os::unix::io::{AsRawFd, RawFd}; use std::thread::sleep; use thiserror::Error; +use nix::Error as NixError; #[allow(unused)] use nix::libc::off_t; use nix::sys::sendfile::sendfile; -use nix::Error as NixError; use tracing::{debug, error, trace}; @@ -67,10 +67,7 @@ impl ZeroCopy { trace!( "trying: zero copy source fd: {} offset: {} len: {}, target fd: {}", - source_raw_fd, - current_offset, - to_be_transfer, - target_raw_fd + source_raw_fd, current_offset, to_be_transfer, target_raw_fd ); match sendfile( @@ -96,8 +93,7 @@ impl ZeroCopy { } else { trace!( "actual: zero copy bytes transferred: {} out of {}, ", - bytes_transferred, - size + bytes_transferred, size ); return Ok(total_transferred); @@ -138,10 +134,7 @@ impl ZeroCopy { trace!( "mac zero copy source fd: {} offset: {} len: {}, target: fd{}", - source_raw_fd, - current_offset, - to_be_transfer, - target_raw_fd + source_raw_fd, current_offset, to_be_transfer, target_raw_fd ); let (res, bytes_transferred) = sendfile( @@ -203,8 +196,8 @@ mod tests { use crate::file_slice::AsyncFileSlice; use crate::fs::AsyncFileExtension; - use crate::net::tcp_stream::stream; use crate::net::TcpListener; + use crate::net::tcp_stream::stream; use crate::timer::sleep; use crate::{fs::util as file_util, zero_copy::ZeroCopy}; use futures_lite::AsyncReadExt;