Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
[package]
name = "fluvio-future"
version = "0.7.2"
edition = "2021"
version = "0.8.0"
edition = "2024"
authors = ["Fluvio Contributors <[email protected]>"]
description = "I/O futures for Fluvio project"
repository = "https://github.com/infinyon/future-aio"
license = "Apache-2.0"
resolver = "2"
resolver = "3"

[package.metadata.docs.rs]
all-features = true

[features]
task = ["async-std/default", "cfg-if"]
task = ["cfg-if"]
subscriber = ["tracing-subscriber", "tracing-subscriber/std", "tracing-subscriber/env-filter"]
fixture = ["subscriber", "task", "fluvio-future-derive"]
task_unstable = ["task", "async-std/unstable"]
io = ["async-std/default"]
sync = ["async-std/default"]
future = ["async-std/default"]
net = ["futures-lite", "async-net", "async-trait", "cfg-if", "futures-util/io", "socket2", "ws_stream_wasm"]
Expand All @@ -26,15 +24,15 @@ native_tls = ["net", "pin-project", "async-native-tls", "dep:native-tls", "opens
openssl_tls = ["net", "openssl", "openssl-sys", "pin-project", "futures-util/io"]
timer = ["async-io", "pin-project", "futures-lite", "fluvio-wasm-timer"]
fs = ["async-fs", "futures-lite", "pin-utils", "async-trait"]
zero_copy = ["nix", "task_unstable"]
mmap = ["fs", "memmap2", "task_unstable"]
zero_copy = ["nix"]
mmap = ["fs", "memmap2"]
retry = ["timer", "cfg-if", "async-trait", "futures-util/io"]
doomsday = ["task", "sync"]
tokio1 = ["async-std/tokio1"]
attributes = []

[dependencies]
anyhow = { version = "1.0" }
async-global-executor = { version = "3.1.0",features = ["tokio"] }
async-trait = { version = "0.1.80", optional = true }
cfg-if = { version = "1.0", optional = true }
fluvio-future-derive = { path = "fluvio-future-derive", version = "0.1.0", optional = true }
Expand All @@ -43,7 +41,7 @@ futures-util = { version = "0.3.30", optional = true }
pin-project = { version = "1.1", optional = true }
pin-utils = { version = "0.1.0", optional = true }
thiserror = "2.0.11"
tokio = { version = "1.38", default-features = false, optional = true }
tokio = { version = "1.44.2", default-features = false}
tracing = { version = "0.1.40" }
tracing-subscriber = { version = "0.3.18", optional = true }

Expand All @@ -52,7 +50,6 @@ async-fs = { version = "2.1", optional = true }
async-io = { version = "2.3", optional = true }
async-native-tls = { version = "0.5.0", optional = true }
async-net = { version = "2.0", optional = true }
async-std = { version = "1.12", default-features = false, optional = true }
futures-rustls = { version = "0.26.0", optional = true }
memmap2 = { version = "0.9.4", optional = true }
native-tls = { version = "0.2.12", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion src/fs/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ impl AsyncWrite for BoundedFileSink {
mod tests {

use std::env::temp_dir;
use std::fs::remove_file;
use std::fs::File as StdFile;
use std::fs::remove_file;
use std::io::Read;
use std::io::SeekFrom;
use std::path::Path;
Expand Down
2 changes: 0 additions & 2 deletions src/io.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ pub mod file_slice;
#[cfg(not(target_arch = "wasm32"))]
pub mod fs;

#[cfg(feature = "io")]
#[cfg(not(target_arch = "wasm32"))]
pub mod io;

#[cfg(feature = "task")]
pub mod task;

Expand Down
4 changes: 2 additions & 2 deletions src/native_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ mod connector {
use tracing::debug;

use crate::net::{
tcp_stream::{stream, stream_with_opts, SocketOpts},
AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, DomainConnector,
SplitConnection, TcpDomainConnector,
tcp_stream::{SocketOpts, stream, stream_with_opts},
};

use super::*;
Expand Down Expand Up @@ -327,9 +327,9 @@ mod test {
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;

use crate::net::TcpListener;
use crate::net::certs::CertBuilder;
use crate::net::tcp_stream::stream;
use crate::net::TcpListener;
use crate::test_async;
use crate::timer::sleep;

Expand Down
2 changes: 1 addition & 1 deletion src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ mod test {
use futures_util::AsyncReadExt;
use tracing::debug;

use crate::net::tcp_stream::stream;
use crate::net::TcpListener;
use crate::net::tcp_stream::stream;
use crate::test_async;
use crate::timer::sleep;

Expand Down
2 changes: 1 addition & 1 deletion src/net/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ mod tests {
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures_lite::future::zip;
use futures_lite::AsyncReadExt;
use futures_lite::future::zip;
use futures_util::SinkExt;
use futures_util::StreamExt;
use tokio_util::codec::BytesCodec;
Expand Down
4 changes: 2 additions & 2 deletions src/openssl/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use openssl::x509::verify::X509VerifyFlags;
use tracing::debug;

use crate::net::{
tcp_stream::{stream, stream_with_opts, SocketOpts},
AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, DomainConnector,
SplitConnection, TcpDomainConnector,
tcp_stream::{SocketOpts, stream, stream_with_opts},
};

use super::async_to_sync_wrapper::AsyncToSyncWrapper;
Expand All @@ -40,7 +40,7 @@ pub mod certs {
// copied from https://github.com/sfackler/rust-native-tls/blob/master/src/imp/openssl.rs
mod identity_impl {

use anyhow::{anyhow, Result};
use anyhow::{Result, anyhow};
use openssl::pkcs12::Pkcs12;
use openssl::pkey::{PKey, Private};
use openssl::x509::X509;
Expand Down
2 changes: 1 addition & 1 deletion src/openssl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod test;
pub use acceptor::{TlsAcceptor, TlsAcceptorBuilder};
pub use certificate::Certificate;
pub use connector::{
certs, TlsAnonymousConnector, TlsConnector, TlsConnectorBuilder, TlsDomainConnector,
TlsAnonymousConnector, TlsConnector, TlsConnectorBuilder, TlsDomainConnector, certs,
};
pub use openssl::ssl::SslVerifyMode;
pub use stream::TlsStream;
Expand Down
2 changes: 1 addition & 1 deletion src/openssl/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio_util::codec::Framed;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;

use crate::net::{tcp_stream::stream, TcpListener};
use crate::net::{TcpListener, tcp_stream::stream};
use crate::test_async;
use crate::timer::sleep;

Expand Down
34 changes: 17 additions & 17 deletions src/rust_tls.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::net::TcpStream;

pub use futures_rustls::client::TlsStream as ClientTlsStream;
pub use futures_rustls::server::TlsStream as ServerTlsStream;
pub use futures_rustls::TlsAcceptor;
pub use futures_rustls::TlsConnector;
pub use futures_rustls::client::TlsStream as ClientTlsStream;
pub use futures_rustls::server::TlsStream as ServerTlsStream;

pub type DefaultServerTlsStream = ServerTlsStream<TcpStream>;
pub type DefaultClientTlsStream = ClientTlsStream<TcpStream>;
Expand Down Expand Up @@ -40,10 +40,10 @@ mod cert {
use std::io::BufReader;
use std::path::Path;

use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result, anyhow};
use futures_rustls::rustls::RootCertStore;
use futures_rustls::rustls::pki_types::CertificateDer;
use futures_rustls::rustls::pki_types::PrivateKeyDer;
use futures_rustls::rustls::RootCertStore;
use rustls_pemfile::certs;
use rustls_pemfile::pkcs8_private_keys;

Expand Down Expand Up @@ -104,8 +104,8 @@ mod connector {
use tracing::debug;

use crate::net::{
tcp_stream::stream, AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd,
DomainConnector, SplitConnection, TcpDomainConnector,
AsConnectionFd, BoxReadConnection, BoxWriteConnection, ConnectionFd, DomainConnector,
SplitConnection, TcpDomainConnector, tcp_stream::stream,
};

use super::TlsConnector;
Expand Down Expand Up @@ -219,25 +219,25 @@ mod builder {
use std::path::Path;
use std::sync::Arc;

use futures_rustls::TlsAcceptor;
use futures_rustls::TlsConnector;
use futures_rustls::pki_types::UnixTime;
use futures_rustls::rustls::ClientConfig;
use futures_rustls::rustls::ConfigBuilder;
use futures_rustls::rustls::Error as TlsError;
use futures_rustls::rustls::RootCertStore;
use futures_rustls::rustls::ServerConfig;
use futures_rustls::rustls::SignatureScheme;
use futures_rustls::rustls::WantsVerifier;
use futures_rustls::rustls::client::WantsClientCert;
use futures_rustls::rustls::client::danger::HandshakeSignatureValid;
use futures_rustls::rustls::client::danger::ServerCertVerified;
use futures_rustls::rustls::client::danger::ServerCertVerifier;
use futures_rustls::rustls::client::WantsClientCert;
use futures_rustls::rustls::pki_types::CertificateDer;
use futures_rustls::rustls::pki_types::PrivateKeyDer;
use futures_rustls::rustls::pki_types::ServerName;
use futures_rustls::rustls::server::WantsServerCert;
use futures_rustls::rustls::server::WebPkiClientVerifier;
use futures_rustls::rustls::ClientConfig;
use futures_rustls::rustls::ConfigBuilder;
use futures_rustls::rustls::Error as TlsError;
use futures_rustls::rustls::RootCertStore;
use futures_rustls::rustls::ServerConfig;
use futures_rustls::rustls::SignatureScheme;
use futures_rustls::rustls::WantsVerifier;
use futures_rustls::TlsAcceptor;
use futures_rustls::TlsConnector;

use anyhow::{Context, Result};
use tracing::info;
Expand Down Expand Up @@ -471,8 +471,8 @@ mod test {
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;

use fluvio_future::net::tcp_stream::stream;
use fluvio_future::net::TcpListener;
use fluvio_future::net::tcp_stream::stream;
use fluvio_future::test_async;
use fluvio_future::timer::sleep;

Expand Down
29 changes: 15 additions & 14 deletions src/task.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
use std::future::Future;

use async_std::task;

#[cfg(feature = "task_unstable")]
pub use async_std::task::spawn_local;

/// run future and wait forever
/// this is typically used in the server
pub fn run<F>(spawn_closure: F)
where
F: Future<Output = ()> + Send + 'static,
{
task::block_on(spawn_closure);
async_global_executor::block_on(spawn_closure);
}

// preserve async-std spawn behavior which is always detach task.
// to fully control task life cycle, use spawn_task
cfg_if::cfg_if! {
if #[cfg(target_arch = "wasm32")] {
pub use async_std::task::spawn_local as spawn;

pub use async_global_executor::spawn_local as spawn_task;
pub fn spawn<F: Future<Output = T> + 'static, T: 'static>(future: F){
spawn_task(future).detach();
}
} else {
pub use async_std::task::spawn;
pub use async_global_executor::spawn as spawn_task;
pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) {
spawn_task(future).detach();
}
}
}

#[cfg(feature = "task_unstable")]
#[cfg(not(target_arch = "wasm32"))]
pub use async_std::task::spawn_blocking;
pub use async_global_executor::spawn_blocking;

cfg_if::cfg_if! {
if #[cfg(target_arch = "wasm32")] {
Expand All @@ -33,15 +36,13 @@ cfg_if::cfg_if! {
F: Future<Output = T> + 'static,
T: 'static,
{
task::block_on(f)
async_global_executor::block_on(f)
}
} else {
pub use async_std::task::block_on as run_block_on;
pub use async_global_executor::block_on as run_block_on;
}
}

pub use async_std::task::JoinHandle;

#[cfg(test)]
mod basic_test {

Expand Down
2 changes: 1 addition & 1 deletion src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ mod test {
use std::task::Context;
use std::task::Poll;

use futures_lite::future::poll_fn;
use futures_lite::Future;
use futures_lite::future::poll_fn;
use tracing::debug;

use crate::test_async;
Expand Down
17 changes: 5 additions & 12 deletions src/zero_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::thread::sleep;
use thiserror::Error;

use nix::Error as NixError;
#[allow(unused)]
use nix::libc::off_t;
use nix::sys::sendfile::sendfile;
use nix::Error as NixError;

use tracing::{debug, error, trace};

Expand Down Expand Up @@ -67,10 +67,7 @@ impl ZeroCopy {

trace!(
"trying: zero copy source fd: {} offset: {} len: {}, target fd: {}",
source_raw_fd,
current_offset,
to_be_transfer,
target_raw_fd
source_raw_fd, current_offset, to_be_transfer, target_raw_fd
);

match sendfile(
Expand All @@ -96,8 +93,7 @@ impl ZeroCopy {
} else {
trace!(
"actual: zero copy bytes transferred: {} out of {}, ",
bytes_transferred,
size
bytes_transferred, size
);

return Ok(total_transferred);
Expand Down Expand Up @@ -138,10 +134,7 @@ impl ZeroCopy {

trace!(
"mac zero copy source fd: {} offset: {} len: {}, target: fd{}",
source_raw_fd,
current_offset,
to_be_transfer,
target_raw_fd
source_raw_fd, current_offset, to_be_transfer, target_raw_fd
);

let (res, bytes_transferred) = sendfile(
Expand Down Expand Up @@ -203,8 +196,8 @@ mod tests {

use crate::file_slice::AsyncFileSlice;
use crate::fs::AsyncFileExtension;
use crate::net::tcp_stream::stream;
use crate::net::TcpListener;
use crate::net::tcp_stream::stream;
use crate::timer::sleep;
use crate::{fs::util as file_util, zero_copy::ZeroCopy};
use futures_lite::AsyncReadExt;
Expand Down
Loading