Skip to content

Commit faac102

Browse files
committed
Properly deregister backends from slab and mio
Signed-off-by: Eloi DEMOLIS <[email protected]>
1 parent 7f3f7f2 commit faac102

File tree

7 files changed

+119
-44
lines changed

7 files changed

+119
-44
lines changed

e2e/src/mock/async_backend.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl<A: Aggregator + Send + Sync + 'static> BackendHandle<A> {
3535
let name = name.into();
3636
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
3737
let (mut aggregator_tx, aggregator_rx) = mpsc::channel::<A>(1);
38-
let listener = TcpListener::bind(address).expect("could not bind");
38+
let listener = TcpListener::bind(address).expect(&format!("could not bind on: {address}"));
3939
let mut clients = Vec::new();
4040
let thread_name = name.to_owned();
4141

e2e/src/mock/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl Client {
3939
/// Establish a TCP connection with its address,
4040
/// register the yielded TCP stream, apply timeouts
4141
pub fn connect(&mut self) {
42-
let stream = TcpStream::connect(self.address).expect("could not connect");
42+
let stream = TcpStream::connect(self.address).expect(&format!("could not connect to: {}", self.address));
4343
stream
4444
.set_read_timeout(Some(Duration::from_millis(100)))
4545
.expect("could not set read timeout");

e2e/src/mock/sync_backend.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl Backend {
4444

4545
/// Binds itself to its address, stores the yielded TCP listener
4646
pub fn connect(&mut self) {
47-
let listener = TcpListener::bind(self.address).expect("could not bind");
47+
let listener = TcpListener::bind(self.address).expect(&format!("could not bind on: {}", self.address));
4848
let timeout = Duration::from_millis(100);
4949
let timeout = libc::timeval {
5050
tv_sec: 0,

lib/src/protocol/mux/h1.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
3939
E: Endpoint,
4040
{
4141
println_!("======= MUX H1 READABLE {:?}", self.position);
42+
self.timeout_container.reset();
4243
let stream = &mut context.streams[self.stream];
4344
let parts = stream.split(&self.position);
4445
let kawa = parts.rbuffer;
@@ -67,6 +68,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
6768
return MuxResult::Continue;
6869
}
6970
if kawa.is_terminated() {
71+
self.timeout_container.cancel();
7072
self.readiness.interest.remove(Ready::READABLE);
7173
}
7274
if kawa.is_main_phase() {
@@ -95,6 +97,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
9597
E: Endpoint,
9698
{
9799
println_!("======= MUX H1 WRITABLE {:?}", self.position);
100+
self.timeout_container.reset();
98101
let stream = &mut context.streams[self.stream];
99102
let kawa = stream.wbuffer(&self.position);
100103
kawa.prepare(&mut kawa::h1::BlockConverter);
@@ -121,20 +124,20 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
121124
match kawa.detached.status_line {
122125
kawa::StatusLine::Response { code: 101, .. } => {
123126
println!("============== HANDLE UPGRADE!");
124-
// unimplemented!();
125127
return MuxResult::Upgrade;
126128
}
127129
kawa::StatusLine::Response { code: 100, .. } => {
128130
println!("============== HANDLE CONTINUE!");
129131
// after a 100 continue, we expect the client to continue with its request
132+
self.timeout_container.reset();
130133
self.readiness.interest.insert(Ready::READABLE);
131134
kawa.clear();
132135
return MuxResult::Continue;
133136
}
134137
kawa::StatusLine::Response { code: 103, .. } => {
135138
println!("============== HANDLE EARLY HINT!");
136139
if let StreamState::Linked(token) = stream.state {
137-
// after a 103 early hints, we expect the server to send its response
140+
// after a 103 early hints, we expect the backend to send its response
138141
endpoint
139142
.readiness_mut(token)
140143
.interest
@@ -149,6 +152,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
149152
}
150153
let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
151154
if stream.context.keep_alive_frontend {
155+
self.timeout_container.reset();
152156
println!("{old_state:?} {:?}", self.readiness);
153157
if let StreamState::Linked(token) = old_state {
154158
println!("{:?}", endpoint.readiness(token));
@@ -160,7 +164,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
160164
stream.back.clear();
161165
stream.back.storage.clear();
162166
stream.front.clear();
163-
// do not clear stream.front.storage because of H1 pipelining
167+
// do not stream.front.storage.clear() because of H1 pipelining
164168
stream.attempts = 0;
165169
} else {
166170
return MuxResult::CloseSession;

lib/src/protocol/mux/h2.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, str::from_utf8_unchecked};
1+
use std::collections::HashMap;
22

33
use rusty_ulid::Ulid;
44
use sozu_command::ready::Ready;
@@ -60,7 +60,7 @@ impl Default for H2Settings {
6060
Self {
6161
settings_header_table_size: 4096,
6262
settings_enable_push: true,
63-
settings_max_concurrent_streams: 256,
63+
settings_max_concurrent_streams: 100,
6464
settings_initial_window_size: (1 << 16) - 1,
6565
settings_max_frame_size: 1 << 14,
6666
settings_max_header_list_size: u32::MAX,
@@ -128,6 +128,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
128128
E: Endpoint,
129129
{
130130
println_!("======= MUX H2 READABLE {:?}", self.position);
131+
self.timeout_container.reset();
131132
let (stream_id, kawa) = if let Some((stream_id, amount)) = self.expect_read {
132133
let kawa = match stream_id {
133134
H2StreamId::Zero => &mut self.zero,
@@ -269,7 +270,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
269270
self.expect_read = Some((stream_id, header.payload_len as usize));
270271
self.state = H2State::Frame(header);
271272
}
272-
Err(e) => panic!("stream error: {:?}", error_nom_to_h2(e)),
273+
Err(_) => return self.goaway(H2Error::ProtocolError),
273274
};
274275
}
275276
(H2State::ContinuationHeader(headers), _) => {
@@ -287,7 +288,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
287288
headers.header_block_fragment.len += header.payload_len;
288289
self.state = H2State::ContinuationFrame(headers);
289290
}
290-
Err(e) => panic!("stream error: {:?}", error_nom_to_h2(e)),
291+
Err(_) => return self.goaway(H2Error::ProtocolError),
291292
};
292293
}
293294
(H2State::Frame(header), _) => {
@@ -325,6 +326,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
325326
E: Endpoint,
326327
{
327328
println_!("======= MUX H2 WRITABLE {:?}", self.position);
329+
self.timeout_container.reset();
328330
if let Some(H2StreamId::Zero) = self.expect_write {
329331
let kawa = &mut self.zero;
330332
println_!("{:?}", kawa.storage.data());
@@ -607,7 +609,19 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
607609
rst_stream.error_code,
608610
error_code_to_str(rst_stream.error_code)
609611
);
610-
self.streams.remove(&rst_stream.stream_id);
612+
if let Some(stream_id) = self.streams.remove(&rst_stream.stream_id) {
613+
let stream = &mut context.streams[stream_id];
614+
if let StreamState::Linked(token) = stream.state {
615+
endpoint.end_stream(token, stream_id, context);
616+
}
617+
let stream = &mut context.streams[stream_id];
618+
match self.position {
619+
Position::Client(_) => {}
620+
Position::Server => {
621+
stream.state = StreamState::Recycle;
622+
}
623+
}
624+
}
611625
}
612626
Frame::Settings(settings) => {
613627
if settings.ack {

lib/src/protocol/mux/mod.rs

Lines changed: 89 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::{
22
cell::RefCell,
33
collections::HashMap,
4-
io::Write,
5-
net::SocketAddr,
4+
io::{ErrorKind, Write},
5+
net::{Shutdown, SocketAddr},
66
rc::{Rc, Weak},
77
time::Duration,
88
};
@@ -47,7 +47,7 @@ macro_rules! println_ {
4747
};
4848
}
4949
fn debug_kawa(_kawa: &GenericHttpStream) {
50-
kawa::debug_kawa(_kawa);
50+
// kawa::debug_kawa(_kawa);
5151
}
5252

5353
/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
@@ -299,18 +299,24 @@ impl<Front: SocketHandler> Connection<Front> {
299299
Connection::H2(c) => &mut c.position,
300300
}
301301
}
302-
pub fn timeout_container(&mut self) -> &mut TimeoutContainer {
303-
match self {
304-
Connection::H1(c) => &mut c.timeout_container,
305-
Connection::H2(c) => &mut c.timeout_container,
306-
}
307-
}
308302
pub fn socket(&self) -> &TcpStream {
309303
match self {
310304
Connection::H1(c) => c.socket.socket_ref(),
311305
Connection::H2(c) => c.socket.socket_ref(),
312306
}
313307
}
308+
pub fn socket_mut(&mut self) -> &mut TcpStream {
309+
match self {
310+
Connection::H1(c) => c.socket.socket_mut(),
311+
Connection::H2(c) => c.socket.socket_mut(),
312+
}
313+
}
314+
pub fn timeout_container(&mut self) -> &mut TimeoutContainer {
315+
match self {
316+
Connection::H1(c) => &mut c.timeout_container,
317+
Connection::H2(c) => &mut c.timeout_container,
318+
}
319+
}
314320
fn force_disconnect(&mut self) -> MuxResult {
315321
match self {
316322
Connection::H1(c) => c.force_disconnect(),
@@ -951,6 +957,9 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
951957
*position = Position::Client(BackendStatus::Connected(
952958
std::mem::take(cluster_id),
953959
));
960+
backend
961+
.timeout_container()
962+
.set_duration(self.router.configured_backend_timeout);
954963
}
955964
_ => {}
956965
}
@@ -981,7 +990,29 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
981990
}
982991
if !dead_backends.is_empty() {
983992
for token in &dead_backends {
984-
self.router.backends.remove(token);
993+
let proxy_borrow = proxy.borrow();
994+
if let Some(mut backend) = self.router.backends.remove(token) {
995+
backend.timeout_container().cancel();
996+
let socket = backend.socket_mut();
997+
if let Err(e) = proxy_borrow.deregister_socket(socket) {
998+
error!("error deregistering back socket({:?}): {:?}", socket, e);
999+
}
1000+
if let Err(e) = socket.shutdown(Shutdown::Both) {
1001+
if e.kind() != ErrorKind::NotConnected {
1002+
error!(
1003+
"error shutting down back socket({:?}): {:?}",
1004+
socket, e
1005+
);
1006+
}
1007+
}
1008+
} else {
1009+
error!("session {:?} has no backend!", token);
1010+
}
1011+
if !proxy_borrow.remove_session(*token) {
1012+
error!("session {:?} was already removed!", token);
1013+
} else {
1014+
println!("SUCCESS: session {token:?} was removed!");
1015+
}
9851016
}
9861017
println_!("FRONTEND: {:#?}", self.frontend);
9871018
println_!("BACKENDS: {:#?}", self.router.backends);
@@ -1013,10 +1044,15 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
10131044
}
10141045

10151046
let context = &mut self.context;
1016-
let front_readiness = self.frontend.readiness_mut();
10171047
let mut dirty = false;
10181048
for stream_id in 0..context.streams.len() {
10191049
if context.streams[stream_id].state == StreamState::Link {
1050+
// Before the first request triggers a stream Link, the frontend timeout is set
1051+
// to a shorter request_timeout, here we switch to the longer nominal timeout
1052+
self.frontend
1053+
.timeout_container()
1054+
.set_duration(self.configured_frontend_timeout);
1055+
let front_readiness = self.frontend.readiness_mut();
10201056
dirty = true;
10211057
match self.router.connect(
10221058
stream_id,
@@ -1105,9 +1141,9 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
11051141
should_write = true;
11061142
}
11071143
StreamState::Linked(_) => {
1108-
// A stream Linked to a backend is waiting for the response, not the answer.
1144+
// A stream Linked to a backend is waiting for the response, not the request.
11091145
// For streaming or malformed requests, it is possible that the request is not
1110-
// terminated at this point. For now, we do nothing and
1146+
// terminated at this point. For now, we do nothing
11111147
should_close = false;
11121148
}
11131149
StreamState::Unlinked => {
@@ -1202,27 +1238,48 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
12021238
}
12031239
}
12041240

1205-
fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1206-
let s = match &mut self.frontend {
1207-
Connection::H1(c) => &mut c.socket,
1208-
Connection::H2(c) => &mut c.socket,
1209-
};
1210-
let mut b = [0; 1024];
1211-
let (size, status) = s.socket_read(&mut b);
1212-
println_!("{size} {status:?} {:?}", &b[..size]);
1213-
for stream in &mut self.context.streams {
1214-
for kawa in [&mut stream.front, &mut stream.back] {
1215-
debug_kawa(kawa);
1216-
kawa.prepare(&mut kawa::h1::BlockConverter);
1217-
let out = kawa.as_io_slice();
1218-
let mut writer = std::io::BufWriter::new(Vec::new());
1219-
let amount = writer.write_vectored(&out).unwrap();
1220-
println_!(
1221-
"amount: {amount}\n{}",
1222-
String::from_utf8_lossy(writer.buffer())
1223-
);
1241+
fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1242+
println_!("FRONTEND: {:#?}", self.frontend);
1243+
println_!("BACKENDS: {:#?}", self.router.backends);
1244+
1245+
for (token, backend) in &mut self.router.backends {
1246+
let proxy_borrow = proxy.borrow();
1247+
backend.timeout_container().cancel();
1248+
let socket = backend.socket_mut();
1249+
if let Err(e) = proxy_borrow.deregister_socket(socket) {
1250+
error!("error deregistering back socket({:?}): {:?}", socket, e);
1251+
}
1252+
if let Err(e) = socket.shutdown(Shutdown::Both) {
1253+
if e.kind() != ErrorKind::NotConnected {
1254+
error!("error shutting down back socket({:?}): {:?}", socket, e);
1255+
}
1256+
}
1257+
if !proxy_borrow.remove_session(*token) {
1258+
error!("session {:?} was already removed!", token);
1259+
} else {
1260+
println!("SUCCESS: session {token:?} was removed!");
12241261
}
12251262
}
1263+
// let s = match &mut self.frontend {
1264+
// Connection::H1(c) => &mut c.socket,
1265+
// Connection::H2(c) => &mut c.socket,
1266+
// };
1267+
// let mut b = [0; 1024];
1268+
// let (size, status) = s.socket_read(&mut b);
1269+
// println_!("{size} {status:?} {:?}", &b[..size]);
1270+
// for stream in &mut self.context.streams {
1271+
// for kawa in [&mut stream.front, &mut stream.back] {
1272+
// debug_kawa(kawa);
1273+
// kawa.prepare(&mut kawa::h1::BlockConverter);
1274+
// let out = kawa.as_io_slice();
1275+
// let mut writer = std::io::BufWriter::new(Vec::new());
1276+
// let amount = writer.write_vectored(&out).unwrap();
1277+
// println_!(
1278+
// "amount: {amount}\n{}",
1279+
// String::from_utf8_lossy(writer.buffer())
1280+
// );
1281+
// }
1282+
// }
12261283
}
12271284

12281285
fn shutting_down(&mut self) -> SessionIsToBeClosed {

lib/src/protocol/mux/pkawa.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub fn handle_header<C>(
117117
version: Version::V20,
118118
code,
119119
status,
120-
reason: Store::Static(b"Default"),
120+
reason: Store::Static(b"FromH2"),
121121
}
122122
}
123123
};

0 commit comments

Comments
 (0)