Skip to content

Commit e86fc49

Browse files
committed
Change need_read and need_write to return an error
This commit is targeted at solving tokio-rs/tokio-core#12 and incorporates the solution from tokio-rs/tokio-core#17. Namely the `need_read` and `need_write` functions on `PollEvented` now return an error when the connected reactor has gone away and the task cannot be blocked. This will typically naturally translate to errors being returned by various connected I/O objects and should help tear down the world in a clean-ish fashion.
1 parent 8fcce95 commit e86fc49

File tree

6 files changed

+97
-23
lines changed

6 files changed

+97
-23
lines changed

src/net/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl TcpListener {
6464
match self.io.get_ref().accept() {
6565
Err(e) => {
6666
if e.kind() == io::ErrorKind::WouldBlock {
67-
self.io.need_read();
67+
self.io.need_read()?;
6868
}
6969
Err(e)
7070
},
@@ -576,7 +576,7 @@ impl<'a> AsyncRead for &'a TcpStream {
576576
Ok(Async::Ready(n))
577577
}
578578
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
579-
self.io.need_read();
579+
self.io.need_read()?;
580580
Ok(Async::NotReady)
581581
}
582582
Err(e) => Err(e),
@@ -614,7 +614,7 @@ impl<'a> AsyncWrite for &'a TcpStream {
614614
Ok(Async::Ready(n))
615615
}
616616
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
617-
self.io.need_write();
617+
self.io.need_write()?;
618618
Ok(Async::NotReady)
619619
}
620620
Err(e) => Err(e),

src/net/udp/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl UdpSocket {
9393
Ok(n) => Ok(n),
9494
Err(e) => {
9595
if e.kind() == io::ErrorKind::WouldBlock {
96-
self.io.need_write();
96+
self.io.need_write()?;
9797
}
9898
Err(e)
9999
}
@@ -115,7 +115,7 @@ impl UdpSocket {
115115
Ok(n) => Ok(n),
116116
Err(e) => {
117117
if e.kind() == io::ErrorKind::WouldBlock {
118-
self.io.need_read();
118+
self.io.need_read()?;
119119
}
120120
Err(e)
121121
}
@@ -163,7 +163,7 @@ impl UdpSocket {
163163
Ok(n) => Ok(n),
164164
Err(e) => {
165165
if e.kind() == io::ErrorKind::WouldBlock {
166-
self.io.need_write();
166+
self.io.need_write()?;
167167
}
168168
Err(e)
169169
}
@@ -205,7 +205,7 @@ impl UdpSocket {
205205
Ok(n) => Ok(n),
206206
Err(e) => {
207207
if e.kind() == io::ErrorKind::WouldBlock {
208-
self.io.need_read();
208+
self.io.need_read()?;
209209
}
210210
Err(e)
211211
}

src/reactor/io_token.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,14 @@ impl IoToken {
9292
///
9393
/// This function will also panic if there is not a currently running future
9494
/// task.
95-
pub fn schedule_read(&self) {
95+
pub fn schedule_read(&self) -> io::Result<()> {
9696
let inner = match self.handle.inner.upgrade() {
9797
Some(inner) => inner,
98-
None => return,
98+
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
9999
};
100100

101101
inner.schedule(self.token, Direction::Read);
102+
Ok(())
102103
}
103104

104105
/// Schedule the current future task to receive a notification when the
@@ -124,13 +125,14 @@ impl IoToken {
124125
///
125126
/// This function will also panic if there is not a currently running future
126127
/// task.
127-
pub fn schedule_write(&self) {
128+
pub fn schedule_write(&self) -> io::Result<()> {
128129
let inner = match self.handle.inner.upgrade() {
129130
Some(inner) => inner,
130-
None => return,
131+
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
131132
};
132133

133134
inner.schedule(self.token, Direction::Write);
135+
Ok(())
134136
}
135137

136138
/// Unregister all information associated with a token on an event loop,

src/reactor/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,19 @@ impl fmt::Debug for Core {
231231
}
232232
}
233233

234+
impl Drop for Inner {
235+
fn drop(&mut self) {
236+
// When a reactor is dropped it needs to wake up all blocked tasks as
237+
// they'll never receive a notification, and all connected I/O objects
238+
// will start returning errors pretty quickly.
239+
let io = self.io_dispatch.read().unwrap();
240+
for (_, io) in io.iter() {
241+
io.writer.notify();
242+
io.reader.notify();
243+
}
244+
}
245+
}
246+
234247
impl Inner {
235248
/// Register an I/O resource with the reactor.
236249
///

src/reactor/poll_evented.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,13 @@ impl<E> PollEvented<E> {
184184
match self.readiness.load(Ordering::SeqCst) & bits {
185185
0 => {
186186
if mask.is_writable() {
187-
self.need_write();
187+
if self.need_write().is_err() {
188+
return Async::Ready(mask)
189+
}
188190
} else {
189-
self.need_read();
191+
if self.need_read().is_err() {
192+
return Async::Ready(mask)
193+
}
190194
}
191195
Async::NotReady
192196
}
@@ -213,14 +217,22 @@ impl<E> PollEvented<E> {
213217
/// previously indicated that the object is readable. That is, this function
214218
/// must always be paired with calls to `poll_read` previously.
215219
///
220+
/// # Errors
221+
///
222+
/// This function will return an error if the `Core` that this `PollEvented`
223+
/// is associated with has gone away (been destroyed). The error means that
224+
/// the ambient futures task could not be scheduled to receive a
225+
/// notification and typically means that the error should be propagated
226+
/// outwards.
227+
///
216228
/// # Panics
217229
///
218230
/// This function will panic if called outside the context of a future's
219231
/// task.
220-
pub fn need_read(&self) {
232+
pub fn need_read(&self) -> io::Result<()> {
221233
let bits = super::ready2usize(super::read_ready());
222234
self.readiness.fetch_and(!bits, Ordering::SeqCst);
223-
self.token.schedule_read();
235+
self.token.schedule_read()
224236
}
225237

226238
/// Indicates to this source of events that the corresponding I/O object is
@@ -239,14 +251,22 @@ impl<E> PollEvented<E> {
239251
/// previously indicated that the object is writable. That is, this function
240252
/// must always be paired with calls to `poll_write` previously.
241253
///
254+
/// # Errors
255+
///
256+
/// This function will return an error if the `Core` that this `PollEvented`
257+
/// is associated with has gone away (been destroyed). The error means that
258+
/// the ambient futures task could not be scheduled to receive a
259+
/// notification and typically means that the error should be propagated
260+
/// outwards.
261+
///
242262
/// # Panics
243263
///
244264
/// This function will panic if called outside the context of a future's
245265
/// task.
246-
pub fn need_write(&self) {
266+
pub fn need_write(&self) -> io::Result<()> {
247267
let bits = super::ready2usize(Ready::writable());
248268
self.readiness.fetch_and(!bits, Ordering::SeqCst);
249-
self.token.schedule_write();
269+
self.token.schedule_write()
250270
}
251271

252272
/// Returns a reference to the event loop handle that this readiness stream
@@ -275,7 +295,7 @@ impl<E: Read> Read for PollEvented<E> {
275295
}
276296
let r = self.get_mut().read(buf);
277297
if is_wouldblock(&r) {
278-
self.need_read();
298+
self.need_read()?;
279299
}
280300
return r
281301
}
@@ -288,7 +308,7 @@ impl<E: Write> Write for PollEvented<E> {
288308
}
289309
let r = self.get_mut().write(buf);
290310
if is_wouldblock(&r) {
291-
self.need_write();
311+
self.need_write()?;
292312
}
293313
return r
294314
}
@@ -299,7 +319,7 @@ impl<E: Write> Write for PollEvented<E> {
299319
}
300320
let r = self.get_mut().flush();
301321
if is_wouldblock(&r) {
302-
self.need_write();
322+
self.need_write()?;
303323
}
304324
return r
305325
}
@@ -323,7 +343,7 @@ impl<'a, E> Read for &'a PollEvented<E>
323343
}
324344
let r = self.get_ref().read(buf);
325345
if is_wouldblock(&r) {
326-
self.need_read();
346+
self.need_read()?;
327347
}
328348
return r
329349
}
@@ -338,7 +358,7 @@ impl<'a, E> Write for &'a PollEvented<E>
338358
}
339359
let r = self.get_ref().write(buf);
340360
if is_wouldblock(&r) {
341-
self.need_write();
361+
self.need_write()?;
342362
}
343363
return r
344364
}
@@ -349,7 +369,7 @@ impl<'a, E> Write for &'a PollEvented<E>
349369
}
350370
let r = self.get_ref().flush();
351371
if is_wouldblock(&r) {
352-
self.need_write();
372+
self.need_write()?;
353373
}
354374
return r
355375
}

tests/drop-core.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
extern crate tokio;
2+
extern crate futures;
3+
4+
use std::thread;
5+
6+
use futures::future;
7+
use futures::prelude::*;
8+
use futures::sync::oneshot;
9+
use tokio::net::TcpListener;
10+
use tokio::reactor::Core;
11+
12+
#[test]
13+
fn tcp_doesnt_block() {
14+
let core = Core::new().unwrap();
15+
let handle = core.handle();
16+
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
17+
drop(core);
18+
assert!(listener.incoming().wait().next().unwrap().is_err());
19+
}
20+
21+
#[test]
22+
fn drop_wakes() {
23+
let core = Core::new().unwrap();
24+
let handle = core.handle();
25+
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
26+
let (tx, rx) = oneshot::channel::<()>();
27+
let t = thread::spawn(move || {
28+
let incoming = listener.incoming();
29+
let new_socket = incoming.into_future().map_err(|_| ());
30+
let drop_tx = future::lazy(|| {
31+
drop(tx);
32+
future::ok(())
33+
});
34+
assert!(new_socket.join(drop_tx).wait().is_err());
35+
});
36+
drop(rx.wait());
37+
drop(core);
38+
t.join().unwrap();
39+
}

0 commit comments

Comments
 (0)