Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pgdog/src/backend/pool/connection/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl Binding {
match self {
Binding::Admin(admin) => admin.done(),
Binding::Server(Some(server)) => server.done(),
Binding::MultiShard(servers, _state) => servers.iter().all(|s| s.done()),
Binding::MultiShard(servers, state) => servers.iter().all(|s| s.done()) && state.done(),
Binding::Replication(Some(server), _) => server.done(),
_ => true,
}
Expand Down
28 changes: 27 additions & 1 deletion pgdog/src/backend/pool/connection/multi_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod context;
#[cfg(test)]
mod test;

#[derive(Default, Debug)]
#[derive(Default, Debug, PartialEq)]
struct Counters {
rows: usize,
ready_for_query: usize,
Expand Down Expand Up @@ -224,4 +224,30 @@ impl MultiShard {
Context::RowDescription(rd) => self.decoder.row_description(rd),
}
}

pub(super) fn done(&self) -> bool {
// All or none.
let ready_for_query = self.counters.ready_for_query % self.shards == 0;
let command_complete_count = self.counters.command_complete_count % self.shards == 0;
let empty_query_response = self.counters.empty_query_response % self.shards == 0;
let copy_in = self.counters.copy_in % self.shards == 0;
let parse_complete = self.counters.parse_complete % self.shards == 0;
let parameter_description = self.counters.parameter_description % self.shards == 0;
let no_data = self.counters.no_data % self.shards == 0;
let row_description = self.counters.row_description % self.shards == 0;
let close_complete = self.counters.close_complete % self.shards == 0;
let bind_complete = self.counters.bind_complete % self.shards == 0;
self.buffer.is_empty()
&& self.counters.command_complete.is_none()
&& ready_for_query
&& command_complete_count
&& empty_query_response
&& copy_in
&& parse_complete
&& parameter_description
&& no_data
&& row_description
&& close_complete
&& bind_complete
}
}
6 changes: 6 additions & 0 deletions pgdog/src/backend/pool/connection/multi_shard/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@ fn test_rd_before_dr() {
.forward(rd.message().unwrap().backend())
.unwrap();
assert!(result.is_none()); // dropped
assert!(!multi_shard.done());
let result = multi_shard
.forward(dr.message().unwrap().backend())
.unwrap();
assert!(result.is_none()); // buffered.
assert!(!multi_shard.done());
}

let result = multi_shard.forward(rd.message().unwrap()).unwrap();
assert_eq!(result, Some(rd.message().unwrap()));
let result = multi_shard.message();
// Waiting for command complete
assert!(result.is_none());
assert!(!multi_shard.done());

for _ in 0..3 {
let result = multi_shard
Expand All @@ -35,6 +38,7 @@ fn test_rd_before_dr() {
)
.unwrap();
assert!(result.is_none());
assert!(!multi_shard.done());
}

for _ in 0..2 {
Expand All @@ -43,6 +47,7 @@ fn test_rd_before_dr() {
result.map(|m| m.backend()),
Some(dr.message().unwrap().backend())
);
assert!(!multi_shard.done());
}

let result = multi_shard.message().map(|m| m.backend());
Expand All @@ -58,4 +63,5 @@ fn test_rd_before_dr() {

// Buffer is empty.
assert!(multi_shard.message().is_none());
assert!(multi_shard.done());
}
Loading