Skip to content

Commit 68f559e

Browse files
committed
rt: add sharding support for uring
1 parent 933fa49 commit 68f559e

File tree

7 files changed

+100
-34
lines changed

7 files changed

+100
-34
lines changed

tokio/src/runtime/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,7 +1457,7 @@ impl Builder {
14571457
use crate::runtime::scheduler;
14581458
use crate::runtime::Config;
14591459

1460-
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1460+
let (driver, driver_handle) = driver::Driver::new(self.get_cfg(), 1)?;
14611461

14621462
// Blocking pool
14631463
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
@@ -1612,7 +1612,7 @@ cfg_rt_multi_thread! {
16121612

16131613
let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
16141614

1615-
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1615+
let (driver, driver_handle) = driver::Driver::new(self.get_cfg(), worker_threads)?;
16161616

16171617
// Create the blocking pool
16181618
let blocking_pool =

tokio/src/runtime/driver.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ pub(crate) struct Cfg {
4343
}
4444

4545
impl Driver {
46-
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
47-
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
46+
pub(crate) fn new(cfg: Cfg, num_workers: usize) -> io::Result<(Self, Handle)> {
47+
let (io_stack, io_handle, signal_handle) =
48+
create_io_stack(cfg.enable_io, cfg.nevents, num_workers)?;
4849

4950
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
5051

@@ -136,12 +137,12 @@ cfg_io_driver! {
136137
Disabled(UnparkThread),
137138
}
138139

139-
fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
140+
fn create_io_stack(enabled: bool, nevents: usize, num_workers: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
140141
#[cfg(loom)]
141142
assert!(!enabled);
142143

143144
let ret = if enabled {
144-
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
145+
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents,num_workers)?;
145146

146147
let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
147148
let process_driver = create_process_driver(signal_driver);
@@ -202,7 +203,7 @@ cfg_not_io_driver! {
202203
#[derive(Debug)]
203204
pub(crate) struct IoStack(ParkThread);
204205

205-
fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
206+
fn create_io_stack(_enabled: bool, _nevents: usize, _num_worker: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
206207
let park_thread = ParkThread::new();
207208
let unpark_thread = park_thread.unpark();
208209
Ok((IoStack(park_thread), unpark_thread, Default::default()))

tokio/src/runtime/driver/op.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::runtime::Handle;
2+
use crate::runtime::OpId;
23
use io_uring::cqueue;
34
use io_uring::squeue::Entry;
45
use std::future::Future;
@@ -35,6 +36,8 @@ pub(crate) enum State {
3536
}
3637

3738
pub(crate) struct Op<T: Cancellable> {
39+
// Uring shard id this operation belongs to
40+
pub(crate) shard_id: usize,
3841
// Handle to the runtime
3942
handle: Handle,
4043
// State of this Op
@@ -51,7 +54,9 @@ impl<T: Cancellable> Op<T> {
5154
#[allow(dead_code)]
5255
pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
5356
let handle = Handle::current();
57+
let shard_size = handle.inner.driver().io().uring_context.len();
5458
Self {
59+
shard_id: OpId::next().as_u64() as usize % shard_size,
5560
handle,
5661
data: Some(data),
5762
state: State::Initialize(Some(entry)),
@@ -71,7 +76,11 @@ impl<T: Cancellable> Drop for Op<T> {
7176
State::Polled(index) => {
7277
let data = self.take_data();
7378
let handle = &mut self.handle;
74-
handle.inner.driver().io().cancel_op(index, data);
79+
handle
80+
.inner
81+
.driver()
82+
.io()
83+
.cancel_op(self.shard_id, index, data);
7584
}
7685
// This Op has not been polled yet.
7786
// We don't need to do anything here.
@@ -124,13 +133,13 @@ impl<T: Cancellable + Completable + Send> Future for Op<T> {
124133
let entry = entry_opt.take().expect("Entry must be present");
125134
let waker = cx.waker().clone();
126135
// SAFETY: entry is valid for the entire duration of the operation
127-
let idx = unsafe { driver.register_op(entry, waker)? };
136+
let idx = unsafe { driver.register_op(this.shard_id, entry, waker)? };
128137
this.state = State::Polled(idx);
129138
Poll::Pending
130139
}
131140

132141
State::Polled(idx) => {
133-
let mut ctx = driver.get_uring().lock();
142+
let mut ctx = driver.get_uring(this.shard_id).lock();
134143
let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
135144

136145
match mem::replace(lifecycle, Lifecycle::Submitted) {

tokio/src/runtime/io/driver.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ pub(crate) struct Handle {
5252
pub(crate) metrics: IoDriverMetrics,
5353

5454
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
55-
pub(crate) uring_context: Mutex<UringContext>,
55+
pub(crate) uring_context: Box<[Mutex<UringContext>]>,
5656

5757
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
58-
pub(crate) uring_state: AtomicUsize,
58+
pub(crate) uring_state: Box<[AtomicUsize]>,
5959
}
6060

6161
#[derive(Debug)]
@@ -102,7 +102,10 @@ fn _assert_kinds() {
102102
impl Driver {
103103
/// Creates a new event loop, returning any error that happened during the
104104
/// creation.
105-
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
105+
pub(crate) fn new(
106+
nevents: usize,
107+
#[allow(unused)] num_workers: usize,
108+
) -> io::Result<(Driver, Handle)> {
106109
let poll = mio::Poll::new()?;
107110
#[cfg(not(target_os = "wasi"))]
108111
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
@@ -124,9 +127,15 @@ impl Driver {
124127
waker,
125128
metrics: IoDriverMetrics::default(),
126129
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
127-
uring_context: Mutex::new(UringContext::new()),
130+
uring_context: (0..num_workers)
131+
.map(|_| Mutex::new(UringContext::new()))
132+
.collect::<Vec<_>>()
133+
.into_boxed_slice(),
128134
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
129-
uring_state: AtomicUsize::new(0),
135+
uring_state: (0..num_workers)
136+
.map(|_| AtomicUsize::new(0))
137+
.collect::<Vec<_>>()
138+
.into_boxed_slice(),
130139
};
131140

132141
Ok((driver, handle))
@@ -200,9 +209,13 @@ impl Driver {
200209

201210
#[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
202211
{
203-
let mut guard = handle.get_uring().lock();
204-
let ctx = &mut *guard;
205-
ctx.dispatch_completions();
212+
let shard_size = handle.uring_context.len();
213+
214+
for shard_id in 0..shard_size {
215+
let mut guard = handle.get_uring(shard_id).lock();
216+
let ctx = &mut *guard;
217+
ctx.dispatch_completions();
218+
}
206219
}
207220

208221
handle.metrics.incr_ready_count_by(ready_count);

tokio/src/runtime/io/driver/uring.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -188,25 +188,26 @@ impl Handle {
188188
.register(&mut source, TOKEN_WAKEUP, Interest::READABLE.to_mio())
189189
}
190190

191-
pub(crate) fn get_uring(&self) -> &Mutex<UringContext> {
192-
&self.uring_context
191+
pub(crate) fn get_uring(&self, shard_id: usize) -> &Mutex<UringContext> {
192+
let shard_id = shard_id % self.uring_context.len();
193+
&self.uring_context[shard_id]
193194
}
194195

195-
fn set_uring_state(&self, state: State) {
196-
self.uring_state.store(state.as_usize(), Ordering::Release);
196+
fn set_uring_state(&self, shard_id: usize, state: State) {
197+
self.uring_state[shard_id].store(state.as_usize(), Ordering::Release);
197198
}
198199

199200
/// Check if the io_uring context is initialized. If not, it will try to initialize it.
200-
pub(crate) fn check_and_init(&self) -> io::Result<bool> {
201-
match State::from_usize(self.uring_state.load(Ordering::Acquire)) {
202-
State::Uninitialized => match self.try_init() {
201+
pub(crate) fn check_and_init(&self, shard_id: usize) -> io::Result<bool> {
202+
match State::from_usize(self.uring_state[shard_id].load(Ordering::Acquire)) {
203+
State::Uninitialized => match self.try_init(shard_id) {
203204
Ok(()) => {
204-
self.set_uring_state(State::Initialized);
205+
self.set_uring_state(shard_id, State::Initialized);
205206
Ok(true)
206207
}
207208
// If the system doesn't support io_uring, we set the state to Unsupported.
208209
Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => {
209-
self.set_uring_state(State::Unsupported);
210+
self.set_uring_state(shard_id, State::Unsupported);
210211
Ok(false)
211212
}
212213
// For other system errors, we just return it.
@@ -218,8 +219,8 @@ impl Handle {
218219
}
219220

220221
/// Initialize the io_uring context if it hasn't been initialized yet.
221-
fn try_init(&self) -> io::Result<()> {
222-
let mut guard = self.get_uring().lock();
222+
fn try_init(&self, shard_id: usize) -> io::Result<()> {
223+
let mut guard = self.get_uring(shard_id).lock();
223224
if guard.try_init()? {
224225
self.add_uring_source(guard.ring().as_raw_fd())?;
225226
}
@@ -237,15 +238,20 @@ impl Handle {
237238
///
238239
/// Callers must ensure that parameters of the entry (such as buffer) are valid and will
239240
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
240-
pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
241+
pub(crate) unsafe fn register_op(
242+
&self,
243+
shard_id: usize,
244+
entry: Entry,
245+
waker: Waker,
246+
) -> io::Result<usize> {
241247
// Note: Maybe this check can be removed if upstream callers consistently use `check_and_init`.
242-
if !self.check_and_init()? {
248+
if !self.check_and_init(shard_id)? {
243249
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
244250
}
245251

246252
// Uring is initialized.
247253

248-
let mut guard = self.get_uring().lock();
254+
let mut guard = self.get_uring(shard_id).lock();
249255
let ctx = &mut *guard;
250256
let index = ctx.ops.insert(Lifecycle::Waiting(waker));
251257
let entry = entry.user_data(index as u64);
@@ -273,8 +279,8 @@ impl Handle {
273279

274280
// TODO: Remove this annotation when operations are actually supported
275281
#[allow(unused_variables, unreachable_code)]
276-
pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
277-
let mut guard = self.get_uring().lock();
282+
pub(crate) fn cancel_op<T: Cancellable>(&self, shard_id: usize, index: usize, data: Option<T>) {
283+
let mut guard = self.get_uring(shard_id).lock();
278284
let ctx = &mut *guard;
279285
let ops = &mut ctx.ops;
280286
let Some(lifecycle) = ops.get_mut(index) else {

tokio/src/runtime/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,3 +421,8 @@ cfg_rt! {
421421
/// After thread starts / before thread stops
422422
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
423423
}
424+
425+
cfg_tokio_uring! {
426+
pub(crate) mod op_id;
427+
pub(crate) use op_id::OpId;
428+
}

tokio/src/runtime/op_id.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// TODO: Put together with other similar id related utils.
2+
3+
use std::num::NonZeroU64;
4+
5+
#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
6+
pub(crate) struct OpId(NonZeroU64);
7+
8+
impl OpId {
9+
pub(crate) fn next() -> Self {
10+
use crate::loom::sync::atomic::Ordering::Relaxed;
11+
use crate::loom::sync::atomic::StaticAtomicU64;
12+
13+
#[cfg(all(test, loom))]
14+
crate::loom::lazy_static! {
15+
static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
16+
}
17+
18+
#[cfg(not(all(test, loom)))]
19+
static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
20+
21+
loop {
22+
let id = NEXT_ID.fetch_add(1, Relaxed);
23+
if let Some(id) = NonZeroU64::new(id) {
24+
return Self(id);
25+
}
26+
}
27+
}
28+
29+
pub(crate) fn as_u64(&self) -> u64 {
30+
self.0.get()
31+
}
32+
}

0 commit comments

Comments
 (0)