diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 8445596a..4d6b82a4 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -202,7 +202,7 @@ impl Binding { match self { Binding::Admin(admin) => admin.done(), Binding::Server(Some(server)) => server.done(), - Binding::MultiShard(servers, _state) => servers.iter().all(|s| s.done()), + Binding::MultiShard(servers, state) => servers.iter().all(|s| s.done()) && state.done(), Binding::Replication(Some(server), _) => server.done(), _ => true, } diff --git a/pgdog/src/backend/pool/connection/multi_shard/mod.rs b/pgdog/src/backend/pool/connection/multi_shard/mod.rs index e6399f7a..cb289700 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/mod.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/mod.rs @@ -19,7 +19,7 @@ mod context; #[cfg(test)] mod test; -#[derive(Default, Debug)] +#[derive(Default, Debug, PartialEq)] struct Counters { rows: usize, ready_for_query: usize, @@ -224,4 +224,30 @@ impl MultiShard { Context::RowDescription(rd) => self.decoder.row_description(rd), } } + + pub(super) fn done(&self) -> bool { + // All or none. + let ready_for_query = self.counters.ready_for_query % self.shards == 0; + let command_complete_count = self.counters.command_complete_count % self.shards == 0; + let empty_query_response = self.counters.empty_query_response % self.shards == 0; + let copy_in = self.counters.copy_in % self.shards == 0; + let parse_complete = self.counters.parse_complete % self.shards == 0; + let parameter_description = self.counters.parameter_description % self.shards == 0; + let no_data = self.counters.no_data % self.shards == 0; + let row_description = self.counters.row_description % self.shards == 0; + let close_complete = self.counters.close_complete % self.shards == 0; + let bind_complete = self.counters.bind_complete % self.shards == 0; + self.buffer.is_empty() + && self.counters.command_complete.is_none() + && ready_for_query + && command_complete_count + && empty_query_response + && copy_in + && parse_complete + && parameter_description + && no_data + && row_description + && close_complete + && bind_complete + } } diff --git a/pgdog/src/backend/pool/connection/multi_shard/test.rs b/pgdog/src/backend/pool/connection/multi_shard/test.rs index 9b4bf7a0..064336a3 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/test.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/test.rs @@ -13,10 +13,12 @@ fn test_rd_before_dr() { .forward(rd.message().unwrap().backend()) .unwrap(); assert!(result.is_none()); // dropped + assert!(!multi_shard.done()); let result = multi_shard .forward(dr.message().unwrap().backend()) .unwrap(); assert!(result.is_none()); // buffered. + assert!(!multi_shard.done()); } let result = multi_shard.forward(rd.message().unwrap()).unwrap(); @@ -24,6 +26,7 @@ fn test_rd_before_dr() { let result = multi_shard.message(); // Waiting for command complete assert!(result.is_none()); + assert!(!multi_shard.done()); for _ in 0..3 { let result = multi_shard @@ -35,6 +38,7 @@ fn test_rd_before_dr() { ) .unwrap(); assert!(result.is_none()); + assert!(!multi_shard.done()); } for _ in 0..2 { @@ -43,6 +47,7 @@ fn test_rd_before_dr() { result.map(|m| m.backend()), Some(dr.message().unwrap().backend()) ); + assert!(!multi_shard.done()); } let result = multi_shard.message().map(|m| m.backend()); @@ -58,4 +63,5 @@ fn test_rd_before_dr() { // Buffer is empty. assert!(multi_shard.message().is_none()); + assert!(multi_shard.done()); }