Skip to content

Commit d2bc46f

Browse files
committed
h2spec ETA: 140/147
- h2 converter takes into account the window of the converted stream - update windows upon receiving a new initial_window setting - track negative windows Signed-off-by: Eloi DEMOLIS <[email protected]>
1 parent 69241af commit d2bc46f

File tree

6 files changed

+86
-52
lines changed

6 files changed

+86
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ hdrhistogram = "^7.5.4"
3434
hex = "^0.4.3"
3535
hpack = "^0.3.0"
3636
idna = "^0.5.0"
37-
kawa = { version = "^0.6.6", default-features = false }
37+
# kawa = { version = "^0.6.6", default-features = false }
38+
kawa = { git = "https://github.com/CleverCloud/kawa", branch = "stoppable_converter" }
39+
# kawa = { path = "/home/wonshtrum/Perso/sozu/htx", default-features = false }
3840
libc = "^0.2.153"
3941
memchr = "^2.7.2"
4042
mio = { version = "^0.8.11", features = ["os-poll", "os-ext", "net"] }

lib/src/protocol/mux/converter.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::protocol::{
1515
};
1616

1717
pub struct H2BlockConverter<'a> {
18+
pub window: i32,
1819
pub stream_id: StreamId,
1920
pub encoder: &'a mut hpack::Encoder<'static>,
2021
pub out: Vec<u8>,
@@ -41,7 +42,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
4142
_ => {}
4243
}
4344
}
44-
fn call(&mut self, block: Block, kawa: &mut Kawa<T>) {
45+
fn call(&mut self, block: Block, kawa: &mut Kawa<T>) -> bool {
4546
let buffer = kawa.storage.buffer();
4647
match block {
4748
Block::StatusLine => match kawa.detached.status_line.pop() {
@@ -73,7 +74,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
7374
},
7475
Block::Cookies => {
7576
if kawa.detached.jar.is_empty() {
76-
return;
77+
return true;
7778
}
7879
for cookie in kawa
7980
.detached
@@ -107,7 +108,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
107108
|| compare_no_case(key, b"upgrade")
108109
{
109110
println!("Elided H2 header: {}", unsafe { from_utf8_unchecked(key) });
110-
return;
111+
return true;
111112
}
112113
}
113114
self.encoder
@@ -122,12 +123,21 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
122123
}
123124
Block::Chunk(Chunk { data }) => {
124125
let mut header = [0; 9];
125-
let payload_len = match &data {
126-
Store::Empty => 0,
127-
Store::Detached(s) | Store::Slice(s) => s.len,
128-
Store::Static(s) => s.len() as u32,
129-
Store::Alloc(a, i) => a.len() as u32 - i,
126+
let payload_len = data.len();
127+
let (data, payload_len, can_continue) = if self.window >= payload_len as i32 {
128+
// the window is wide enought to send the entire chunk
129+
(data, payload_len as u32, true)
130+
} else if self.window > 0 {
131+
// we split the chunk to fit in the window
132+
let (before, after) = data.split(self.window as usize);
133+
kawa.blocks.push_front(Block::Chunk(Chunk { data: after }));
134+
(before, self.window as u32, false)
135+
} else {
136+
// the window can't take any more bytes, return the chunk to the blocks
137+
kawa.blocks.push_front(Block::Chunk(Chunk { data }));
138+
return false;
130139
};
140+
self.window -= payload_len as i32;
131141
gen_frame_header(
132142
&mut header,
133143
&FrameHeader {
@@ -140,7 +150,8 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
140150
.unwrap();
141151
kawa.push_out(Store::from_slice(&header));
142152
kawa.push_out(data);
143-
kawa.push_delimiter()
153+
kawa.push_delimiter();
154+
return can_continue;
144155
}
145156
Block::Flags(Flags {
146157
end_header,
@@ -182,6 +193,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
182193
}
183194
}
184195
}
196+
true
185197
}
186198
fn finalize(&mut self, _kawa: &mut Kawa<T>) {
187199
assert!(self.out.is_empty());

lib/src/protocol/mux/h2.rs

Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::HashMap;
1+
use std::{cmp::min, collections::HashMap};
22

33
use rusty_ulid::Ulid;
44
use sozu_command::ready::Ready;
@@ -9,7 +9,7 @@ use crate::{
99
converter, debug_kawa, forcefully_terminate_answer,
1010
parser::{
1111
self, error_code_to_str, Frame, FrameHeader, FrameType, H2Error, Headers, ParserError,
12-
ParserErrorKind, StreamDependency, WindowUpdate,
12+
ParserErrorKind, WindowUpdate,
1313
},
1414
pkawa, serializer, set_default_answer, update_readiness_after_read,
1515
update_readiness_after_write, BackendStatus, Context, Endpoint, GenericHttpStream,
@@ -127,13 +127,13 @@ pub struct ConnectionH2<Front: SocketHandler> {
127127
impl<Front: SocketHandler> std::fmt::Debug for ConnectionH2<Front> {
128128
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129129
f.debug_struct("ConnectionH2")
130-
.field("expect", &self.expect_read)
131130
.field("position", &self.position)
131+
.field("state", &self.state)
132+
.field("expect", &self.expect_read)
132133
.field("readiness", &self.readiness)
133134
.field("local_settings", &self.local_settings)
134135
.field("peer_settings", &self.peer_settings)
135136
.field("socket", &self.socket.socket_ref())
136-
.field("state", &self.state)
137137
.field("streams", &self.streams)
138138
.field("zero", &self.zero.storage.meter(20))
139139
.field("window", &self.window)
@@ -185,15 +185,6 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
185185
return self.force_disconnect();
186186
}
187187
}
188-
// (
189-
// H2State::Frame(FrameHeader {
190-
// payload_len,
191-
// frame_type: FrameType::Data,
192-
// flags,
193-
// stream_id,
194-
// }),
195-
// _,
196-
// ) => {}
197188
_ => {}
198189
}
199190
return MuxResult::Continue;
@@ -340,9 +331,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
340331
Some(_) => {}
341332
None => return self.goaway(H2Error::InternalError),
342333
}
343-
} else if header.frame_type != FrameType::Priority
344-
&& header.frame_type != FrameType::WindowUpdate
345-
{
334+
} else if header.frame_type != FrameType::Priority {
346335
println_!(
347336
"ONLY HEADERS AND PRIORITY CAN BE RECEIVED ON IDLE/CLOSED STREAMS"
348337
);
@@ -546,6 +535,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
546535
}
547536

548537
let mut converter = converter::H2BlockConverter {
538+
window: 0,
549539
stream_id: 0,
550540
encoder: &mut self.encoder,
551541
out: Vec::new(),
@@ -557,10 +547,16 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
557547
'outer: for stream_id in priorities {
558548
let global_stream_id = *self.streams.get(stream_id).unwrap();
559549
let stream = &mut context.streams[global_stream_id];
560-
let kawa = stream.wbuffer(&self.position);
550+
let parts = stream.split(&self.position);
551+
let kawa = parts.wbuffer;
561552
if kawa.is_main_phase() || kawa.is_error() {
553+
let window = min(*parts.window, self.window);
554+
converter.window = window;
562555
converter.stream_id = *stream_id;
563556
kawa.prepare(&mut converter);
557+
let consumed = window - converter.window;
558+
*parts.window -= consumed;
559+
self.window -= consumed;
564560
debug_kawa(kawa);
565561
}
566562
while !kawa.out.is_empty() {
@@ -828,7 +824,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
828824
1 => { self.peer_settings.settings_header_table_size = v },
829825
2 => { self.peer_settings.settings_enable_push = v == 1; is_error |= v > 1 },
830826
3 => { self.peer_settings.settings_max_concurrent_streams = v },
831-
4 => { self.peer_settings.settings_initial_window_size = v; is_error |= v >= 1<<31 },
827+
4 => { is_error |= self.update_initial_window_size(v, context) },
832828
5 => { self.peer_settings.settings_max_frame_size = v; is_error |= v >= 1<<24 || v < 1<<14 },
833829
6 => { self.peer_settings.settings_max_header_list_size = v },
834830
8 => { self.peer_settings.settings_enable_connect_protocol = v == 1; is_error |= v > 1 },
@@ -881,23 +877,29 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
881877
}) => {
882878
let increment = increment as i32;
883879
if stream_id == 0 {
884-
if increment > i32::MAX - self.window {
885-
return self.goaway(H2Error::FlowControlError);
880+
if let Some(window) = self.window.checked_add(increment) {
881+
if self.window <= 0 && window > 0 {
882+
self.readiness.interest.insert(Ready::WRITABLE);
883+
}
884+
self.window = window;
886885
} else {
887-
self.window += increment;
886+
return self.goaway(H2Error::FlowControlError);
888887
}
889888
} else {
890889
if let Some(global_stream_id) = self.streams.get(&stream_id) {
891890
let stream = &mut context.streams[*global_stream_id];
892-
if increment > i32::MAX - stream.window {
891+
if let Some(window) = stream.window.checked_add(increment) {
892+
if stream.window <= 0 && window > 0 {
893+
self.readiness.interest.insert(Ready::WRITABLE);
894+
}
895+
stream.window = window;
896+
} else {
893897
return self.reset_stream(
894898
*global_stream_id,
895899
context,
896900
endpoint,
897901
H2Error::FlowControlError,
898902
);
899-
} else {
900-
stream.window += increment;
901903
}
902904
} else {
903905
println_!(
@@ -911,6 +913,36 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
911913
MuxResult::Continue
912914
}
913915

916+
fn update_initial_window_size<L>(&mut self, value: u32, context: &mut Context<L>) -> bool
917+
where
918+
L: ListenerHandler + L7ListenerHandler,
919+
{
920+
if value >= 1 << 31 {
921+
return true;
922+
}
923+
let delta = value as i32 - self.peer_settings.settings_initial_window_size as i32;
924+
println!(
925+
"INITIAL_WINDOW_SIZE: {} -> {} => {}",
926+
self.peer_settings.settings_initial_window_size, value, delta
927+
);
928+
let mut open_window = false;
929+
for (i, stream) in context.streams.iter_mut().enumerate() {
930+
println!(
931+
" - stream_{i}: {} -> {}",
932+
stream.window,
933+
stream.window + delta
934+
);
935+
open_window |= stream.window <= 0 && stream.window + delta > 0;
936+
stream.window += delta;
937+
}
938+
println_!("UPDATE INIT WINDOW: {open_window} {:?}", self.readiness);
939+
if open_window {
940+
self.readiness.interest.insert(Ready::WRITABLE);
941+
}
942+
self.peer_settings.settings_initial_window_size = value;
943+
false
944+
}
945+
914946
pub fn force_disconnect(&mut self) -> MuxResult {
915947
self.state = H2State::Error;
916948
match self.position {

lib/src/protocol/mux/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ impl<Front: SocketHandler> Connection<Front> {
275275
state: H2State::ClientPreface,
276276
streams: HashMap::new(),
277277
timeout_container,
278-
window: 1 << 16,
278+
window: (1 << 16) - 1,
279279
zero: kawa::Kawa::new(kawa::Kind::Request, kawa::Buffer::new(buffer)),
280280
}))
281281
}
@@ -306,7 +306,7 @@ impl<Front: SocketHandler> Connection<Front> {
306306
state: H2State::ClientPreface,
307307
streams: HashMap::new(),
308308
timeout_container,
309-
window: 1 << 16,
309+
window: (1 << 16) - 1,
310310
zero: kawa::Kawa::new(kawa::Kind::Request, kawa::Buffer::new(buffer)),
311311
}))
312312
}
@@ -586,6 +586,7 @@ pub struct Stream {
586586
/// This struct allows to mutably borrow the read and write buffers (dependant on the position)
587587
/// as well as the context of a Stream at the same time
588588
pub struct StreamParts<'a> {
589+
pub window: &'a mut i32,
589590
pub rbuffer: &'a mut GenericHttpStream,
590591
pub wbuffer: &'a mut GenericHttpStream,
591592
pub context: &'a mut HttpContext,
@@ -616,11 +617,13 @@ impl Stream {
616617
pub fn split(&mut self, position: &Position) -> StreamParts<'_> {
617618
match position {
618619
Position::Client(_) => StreamParts {
620+
window: &mut self.window,
619621
rbuffer: &mut self.back,
620622
wbuffer: &mut self.front,
621623
context: &mut self.context,
622624
},
623625
Position::Server => StreamParts {
626+
window: &mut self.window,
624627
rbuffer: &mut self.front,
625628
wbuffer: &mut self.back,
626629
context: &mut self.context,

lib/src/protocol/mux/pkawa.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,6 @@ use crate::{
1717
},
1818
};
1919

20-
trait AdHocStore {
21-
fn len(&self) -> usize;
22-
}
23-
impl AdHocStore for Store {
24-
fn len(&self) -> usize {
25-
match self {
26-
Store::Empty => 0,
27-
Store::Slice(slice) | Store::Detached(slice) => slice.len(),
28-
Store::Static(s) => s.len(),
29-
Store::Alloc(a, i) => a.len() - *i as usize,
30-
}
31-
}
32-
}
33-
3420
pub fn handle_header<C>(
3521
decoder: &mut hpack::Decoder,
3622
prioriser: &mut Prioriser,

0 commit comments

Comments
 (0)