From 3b244db9357ad14734e7a6856413ea52b07c667e Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Mon, 7 Jul 2025 23:13:28 +0000 Subject: [PATCH 1/8] added exit idle --- grpc/src/client/load_balancing/child_manager.rs | 4 ++++ grpc/src/client/load_balancing/mod.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index 0d4af6542..4d59aa43d 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -262,6 +262,10 @@ impl LbPolicy for ChildManager self.resolve_child_controller(channel_controller, child_idx); } } + + fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) { + // TODO(cjzhao): add exit idle logic once everything has been merged in + } } struct WrappedController<'a> { diff --git a/grpc/src/client/load_balancing/mod.rs b/grpc/src/client/load_balancing/mod.rs index b91950c31..4275d0c10 100644 --- a/grpc/src/client/load_balancing/mod.rs +++ b/grpc/src/client/load_balancing/mod.rs @@ -169,6 +169,10 @@ pub trait LbPolicy: Send { /// Called by the channel in response to a call from the LB policy to the /// WorkScheduler's request_work method. fn work(&mut self, channel_controller: &mut dyn ChannelController); + + /// Called by policy when it needs to exit the idle state and start connecting + /// to subchannels. + fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController); } /// Controls channel behaviors. From a6372a66372463fb8fa37d0e2dcf3c4ec2ad8188 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Mon, 7 Jul 2025 23:21:12 +0000 Subject: [PATCH 2/8] added todo --- grpc/src/client/load_balancing/child_manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index 4d59aa43d..5c53c635d 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -264,6 +264,7 @@ impl LbPolicy for ChildManager } fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) { + todo!() // TODO(cjzhao): add exit idle logic once everything has been merged in } } From 5116e245013d7b72b4cd684bd45d159d7d2f3998 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Tue, 8 Jul 2025 23:58:26 +0000 Subject: [PATCH 3/8] implemented review comments --- grpc/src/client/load_balancing/child_manager.rs | 4 ++-- grpc/src/client/load_balancing/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index 5c53c635d..ed6e621ba 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -263,8 +263,8 @@ impl LbPolicy for ChildManager } } - fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) { - todo!() + fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) { + todo!("implement exit_idle") // TODO(cjzhao): add exit idle logic once everything has been merged in } } diff --git a/grpc/src/client/load_balancing/mod.rs b/grpc/src/client/load_balancing/mod.rs index 4275d0c10..18048bdfe 100644 --- a/grpc/src/client/load_balancing/mod.rs +++ b/grpc/src/client/load_balancing/mod.rs @@ -170,8 +170,8 @@ pub trait LbPolicy: Send { /// WorkScheduler's request_work method. fn work(&mut self, channel_controller: &mut dyn ChannelController); - /// Called by policy when it needs to exit the idle state and start connecting - /// to subchannels. + /// Called by the channel when a LbPolicy goes Idle and the channel + /// wants it to start connecting to subchannels again. fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController); } From 27ed8327f4c489d48a9f18b01d1ca7d3eb7243e4 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Thu, 7 Aug 2025 16:16:38 +0000 Subject: [PATCH 4/8] implemented comment fixes --- grpc/src/client/load_balancing/child_manager.rs | 1 - grpc/src/client/load_balancing/pick_first.rs | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index ed6e621ba..fea17bfe5 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -265,7 +265,6 @@ impl LbPolicy for ChildManager fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) { todo!("implement exit_idle") - // TODO(cjzhao): add exit idle logic once everything has been merged in } } diff --git a/grpc/src/client/load_balancing/pick_first.rs b/grpc/src/client/load_balancing/pick_first.rs index ed7ae76f6..54ae78711 100644 --- a/grpc/src/client/load_balancing/pick_first.rs +++ b/grpc/src/client/load_balancing/pick_first.rs @@ -99,6 +99,10 @@ impl LbPolicy for PickFirstPolicy { } fn work(&mut self, channel_controller: &mut dyn ChannelController) {} + + fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) { + todo!("implement exit_idle") + } } struct OneSubchannelPicker { From 8b24ab161cd8fb47fe65d4e383b54adb038adea4 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Thu, 7 Aug 2025 18:13:21 +0000 Subject: [PATCH 5/8] fix comment --- grpc/src/client/load_balancing/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/src/client/load_balancing/mod.rs b/grpc/src/client/load_balancing/mod.rs index 18048bdfe..7cb562660 100644 --- a/grpc/src/client/load_balancing/mod.rs +++ b/grpc/src/client/load_balancing/mod.rs @@ -170,7 +170,7 @@ pub trait LbPolicy: Send { /// WorkScheduler's request_work method. fn work(&mut self, channel_controller: &mut dyn ChannelController); - /// Called by the channel when a LbPolicy goes Idle and the channel + /// Called by the channel when an LbPolicy goes idle and the channel /// wants it to start connecting to subchannels again. fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController); } From b0aa1ee8f438a4d57a42f387740490e53c52e986 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Mon, 7 Jul 2025 18:18:29 +0000 Subject: [PATCH 6/8] sync master branch --- .../client/load_balancing/child_manager.rs | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index fea17bfe5..c0c48a222 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -32,10 +32,20 @@ use std::collections::HashSet; use std::sync::Mutex; use std::{collections::HashMap, error::Error, hash::Hash, mem, sync::Arc}; +use std::{ + fmt::Debug, + sync::atomic::{AtomicUsize, Ordering}, +}; use crate::client::load_balancing::{ +<<<<<<< HEAD ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, WeakSubchannel, WorkScheduler, +======= + ChannelController, ConnectivityState, ExternalSubchannel, Failing, LbConfig, LbPolicy, + LbPolicyBuilder, LbPolicyOptions, LbState, ParsedJsonLbConfig, PickResult, Picker, + QueuingPicker, Subchannel, SubchannelState, WeakSubchannel, WorkScheduler, GLOBAL_LB_REGISTRY, +>>>>>>> f7537e6 (fixed some logic for review) }; use crate::client::name_resolution::{Address, ResolverUpdate}; @@ -47,8 +57,18 @@ pub struct ChildManager { children: Vec>, update_sharder: Box>, pending_work: Arc>>, +<<<<<<< HEAD +} + +======= + updated: bool, // true if a child has updated its state since the last call to has_updated. + prev_state: ConnectivityState, + last_ready_pickers: Vec>, } +pub trait ChildIdentifier: PartialEq + Hash + Eq + Send + Sync + Debug + 'static {} + +>>>>>>> f7537e6 (fixed some logic for review) struct Child { identifier: T, policy: Box, @@ -87,6 +107,12 @@ impl ChildManager { subchannel_child_map: Default::default(), children: Default::default(), pending_work: Default::default(), +<<<<<<< HEAD +======= + updated: false, + prev_state: ConnectivityState::Idle, + last_ready_pickers: Vec::new(), +>>>>>>> f7537e6 (fixed some logic for review) } } @@ -97,6 +123,14 @@ impl ChildManager { .map(|child| (&child.identifier, &child.state)) } +<<<<<<< HEAD +======= + /// Returns true if a child has produced an update and resets flag to false. + pub fn has_updated(&mut self) -> bool { + mem::take(&mut self.updated) + } + +>>>>>>> f7537e6 (fixed some logic for review) // Called to update all accounting in the ChildManager from operations // performed by a child policy on the WrappedController that was created for // it. child_idx is an index into the children map for the relevant child. @@ -116,7 +150,135 @@ impl ChildManager { // Update the tracked state if the child produced an update. if let Some(state) = channel_controller.picker_update { self.children[child_idx].state = state; +<<<<<<< HEAD + }; +======= + self.updated = true; + }; + } + + /// Called to aggregate states from children policies then returns a update. + pub fn aggregate_states(&mut self) -> Option { + let current_connectivity_state = self.prev_state.clone(); + let child_states_vec = self.child_states(); + + // Construct pickers to return. + let mut ready_pickers = RoundRobinPicker::new(); + + let mut has_connecting = false; + let mut has_ready = false; + let mut is_transient_failure = true; + + for (child_id, state) in child_states_vec { + match state.connectivity_state { + ConnectivityState::Idle => { + has_connecting = true; + is_transient_failure = false; + } + ConnectivityState::Connecting => { + has_connecting = true; + is_transient_failure = false; + } + ConnectivityState::Ready => { + ready_pickers.add_picker(state.picker.clone()); + is_transient_failure = false; + has_ready = true; + } + _ => {} + } + } + + // Decide the new aggregate state. + let new_state = if has_ready { + ConnectivityState::Ready + } else if has_connecting { + ConnectivityState::Connecting + } else if is_transient_failure { + ConnectivityState::TransientFailure + } else { + ConnectivityState::Connecting }; + + // Now update state and send picker as appropriate. + match new_state { + ConnectivityState::Ready => { + let pickers_vec = ready_pickers.pickers.clone(); + let picker: Arc = Arc::new(ready_pickers); + let should_update = + !self.compare_prev_to_new_pickers(&self.last_ready_pickers, &pickers_vec); + + if should_update || self.prev_state != ConnectivityState::Ready { + self.prev_state = ConnectivityState::Ready; + self.last_ready_pickers = pickers_vec; + return Some(LbState { + connectivity_state: ConnectivityState::Ready, + picker, + }); + } else { + return None; + } + } + ConnectivityState::Connecting => { + if self.prev_state == ConnectivityState::TransientFailure + && new_state != ConnectivityState::Ready + { + return None; + } + if self.prev_state != ConnectivityState::Connecting { + let picker = Arc::new(QueuingPicker {}); + self.prev_state = ConnectivityState::Connecting; + return Some(LbState { + connectivity_state: ConnectivityState::Connecting, + picker, + }); + } else { + return None; + } + } + ConnectivityState::Idle => { + let picker = Arc::new(QueuingPicker {}); + self.prev_state = ConnectivityState::Connecting; + return Some(LbState { + connectivity_state: ConnectivityState::Connecting, + picker, + }); + } + ConnectivityState::TransientFailure => { + if current_connectivity_state != ConnectivityState::TransientFailure { + self.prev_state = ConnectivityState::TransientFailure; + let picker = Arc::new(Failing { + error: "No children available".to_string(), + }); + return Some(LbState { + connectivity_state: ConnectivityState::TransientFailure, + picker: picker, + }); + } else { + return None; + } + } + } + } +} + +impl ChildManager { + fn compare_prev_to_new_pickers( + &self, + old_pickers: &[Arc], + new_pickers: &[Arc], + ) -> bool { + // If length is different, then definitely not the same picker. + if old_pickers.len() != new_pickers.len() { + return false; + } + // Compares two vectors of pickers by pointer equality and returns true if all pickers are the same. + for (x, y) in old_pickers.iter().zip(new_pickers.iter()) { + if !Arc::ptr_eq(x, y) { + return false; + } + } + true +>>>>>>> f7537e6 (fixed some logic for review) } } @@ -263,8 +425,20 @@ impl LbPolicy for ChildManager } } +<<<<<<< HEAD fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) { todo!("implement exit_idle") +======= + fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) { + let child_idxes = mem::take(&mut *self.pending_work.lock().unwrap()); + for child_idx in child_idxes { + let mut channel_controller = WrappedController::new(channel_controller); + self.children[child_idx] + .policy + .exit_idle(&mut channel_controller); + self.resolve_child_controller(channel_controller, child_idx); + } +>>>>>>> f7537e6 (fixed some logic for review) } } @@ -292,7 +466,11 @@ impl ChannelController for WrappedController<'_> { } fn update_picker(&mut self, update: LbState) { +<<<<<<< HEAD self.picker_update = Some(update); +======= + self.picker_update = Some(update.clone()); +>>>>>>> f7537e6 (fixed some logic for review) } fn request_resolution(&mut self) { @@ -313,3 +491,35 @@ impl WorkScheduler for ChildWorkScheduler { } } } +<<<<<<< HEAD +======= + +struct RoundRobinPicker { + pickers: Vec>, + next: AtomicUsize, +} + +impl RoundRobinPicker { + fn new() -> Self { + Self { + pickers: vec![], + next: AtomicUsize::new(0), + } + } + + fn add_picker(&mut self, picker: Arc) { + self.pickers.push(picker); + } +} + +impl Picker for RoundRobinPicker { + fn pick(&self, request: &Request) -> PickResult { + let len = self.pickers.len(); + if len == 0 { + return PickResult::Queue; + } + let idx = self.next.fetch_add(1, Ordering::Relaxed) % len; + self.pickers[idx].pick(request) + } +} +>>>>>>> f7537e6 (fixed some logic for review) From 0ad44d2e96dbcbf1ec4ba26f14331c5c2ba70318 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Fri, 8 Aug 2025 20:48:22 +0000 Subject: [PATCH 7/8] fix conflicts --- .../client/load_balancing/child_manager.rs | 204 ------------------ 1 file changed, 204 deletions(-) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index c0c48a222..78b689842 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -32,20 +32,10 @@ use std::collections::HashSet; use std::sync::Mutex; use std::{collections::HashMap, error::Error, hash::Hash, mem, sync::Arc}; -use std::{ - fmt::Debug, - sync::atomic::{AtomicUsize, Ordering}, -}; use crate::client::load_balancing::{ -<<<<<<< HEAD ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, WeakSubchannel, WorkScheduler, -======= - ChannelController, ConnectivityState, ExternalSubchannel, Failing, LbConfig, LbPolicy, - LbPolicyBuilder, LbPolicyOptions, LbState, ParsedJsonLbConfig, PickResult, Picker, - QueuingPicker, Subchannel, SubchannelState, WeakSubchannel, WorkScheduler, GLOBAL_LB_REGISTRY, ->>>>>>> f7537e6 (fixed some logic for review) }; use crate::client::name_resolution::{Address, ResolverUpdate}; @@ -57,18 +47,8 @@ pub struct ChildManager { children: Vec>, update_sharder: Box>, pending_work: Arc>>, -<<<<<<< HEAD } -======= - updated: bool, // true if a child has updated its state since the last call to has_updated. - prev_state: ConnectivityState, - last_ready_pickers: Vec>, -} - -pub trait ChildIdentifier: PartialEq + Hash + Eq + Send + Sync + Debug + 'static {} - ->>>>>>> f7537e6 (fixed some logic for review) struct Child { identifier: T, policy: Box, @@ -123,14 +103,6 @@ impl ChildManager { .map(|child| (&child.identifier, &child.state)) } -<<<<<<< HEAD -======= - /// Returns true if a child has produced an update and resets flag to false. - pub fn has_updated(&mut self) -> bool { - mem::take(&mut self.updated) - } - ->>>>>>> f7537e6 (fixed some logic for review) // Called to update all accounting in the ChildManager from operations // performed by a child policy on the WrappedController that was created for // it. child_idx is an index into the children map for the relevant child. @@ -150,136 +122,8 @@ impl ChildManager { // Update the tracked state if the child produced an update. if let Some(state) = channel_controller.picker_update { self.children[child_idx].state = state; -<<<<<<< HEAD - }; -======= - self.updated = true; }; } - - /// Called to aggregate states from children policies then returns a update. - pub fn aggregate_states(&mut self) -> Option { - let current_connectivity_state = self.prev_state.clone(); - let child_states_vec = self.child_states(); - - // Construct pickers to return. - let mut ready_pickers = RoundRobinPicker::new(); - - let mut has_connecting = false; - let mut has_ready = false; - let mut is_transient_failure = true; - - for (child_id, state) in child_states_vec { - match state.connectivity_state { - ConnectivityState::Idle => { - has_connecting = true; - is_transient_failure = false; - } - ConnectivityState::Connecting => { - has_connecting = true; - is_transient_failure = false; - } - ConnectivityState::Ready => { - ready_pickers.add_picker(state.picker.clone()); - is_transient_failure = false; - has_ready = true; - } - _ => {} - } - } - - // Decide the new aggregate state. - let new_state = if has_ready { - ConnectivityState::Ready - } else if has_connecting { - ConnectivityState::Connecting - } else if is_transient_failure { - ConnectivityState::TransientFailure - } else { - ConnectivityState::Connecting - }; - - // Now update state and send picker as appropriate. - match new_state { - ConnectivityState::Ready => { - let pickers_vec = ready_pickers.pickers.clone(); - let picker: Arc = Arc::new(ready_pickers); - let should_update = - !self.compare_prev_to_new_pickers(&self.last_ready_pickers, &pickers_vec); - - if should_update || self.prev_state != ConnectivityState::Ready { - self.prev_state = ConnectivityState::Ready; - self.last_ready_pickers = pickers_vec; - return Some(LbState { - connectivity_state: ConnectivityState::Ready, - picker, - }); - } else { - return None; - } - } - ConnectivityState::Connecting => { - if self.prev_state == ConnectivityState::TransientFailure - && new_state != ConnectivityState::Ready - { - return None; - } - if self.prev_state != ConnectivityState::Connecting { - let picker = Arc::new(QueuingPicker {}); - self.prev_state = ConnectivityState::Connecting; - return Some(LbState { - connectivity_state: ConnectivityState::Connecting, - picker, - }); - } else { - return None; - } - } - ConnectivityState::Idle => { - let picker = Arc::new(QueuingPicker {}); - self.prev_state = ConnectivityState::Connecting; - return Some(LbState { - connectivity_state: ConnectivityState::Connecting, - picker, - }); - } - ConnectivityState::TransientFailure => { - if current_connectivity_state != ConnectivityState::TransientFailure { - self.prev_state = ConnectivityState::TransientFailure; - let picker = Arc::new(Failing { - error: "No children available".to_string(), - }); - return Some(LbState { - connectivity_state: ConnectivityState::TransientFailure, - picker: picker, - }); - } else { - return None; - } - } - } - } -} - -impl ChildManager { - fn compare_prev_to_new_pickers( - &self, - old_pickers: &[Arc], - new_pickers: &[Arc], - ) -> bool { - // If length is different, then definitely not the same picker. - if old_pickers.len() != new_pickers.len() { - return false; - } - // Compares two vectors of pickers by pointer equality and returns true if all pickers are the same. - for (x, y) in old_pickers.iter().zip(new_pickers.iter()) { - if !Arc::ptr_eq(x, y) { - return false; - } - } - true ->>>>>>> f7537e6 (fixed some logic for review) - } } impl LbPolicy for ChildManager { @@ -425,20 +269,8 @@ impl LbPolicy for ChildManager } } -<<<<<<< HEAD fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) { todo!("implement exit_idle") -======= - fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) { - let child_idxes = mem::take(&mut *self.pending_work.lock().unwrap()); - for child_idx in child_idxes { - let mut channel_controller = WrappedController::new(channel_controller); - self.children[child_idx] - .policy - .exit_idle(&mut channel_controller); - self.resolve_child_controller(channel_controller, child_idx); - } ->>>>>>> f7537e6 (fixed some logic for review) } } @@ -466,11 +298,7 @@ impl ChannelController for WrappedController<'_> { } fn update_picker(&mut self, update: LbState) { -<<<<<<< HEAD self.picker_update = Some(update); -======= - self.picker_update = Some(update.clone()); ->>>>>>> f7537e6 (fixed some logic for review) } fn request_resolution(&mut self) { @@ -491,35 +319,3 @@ impl WorkScheduler for ChildWorkScheduler { } } } -<<<<<<< HEAD -======= - -struct RoundRobinPicker { - pickers: Vec>, - next: AtomicUsize, -} - -impl RoundRobinPicker { - fn new() -> Self { - Self { - pickers: vec![], - next: AtomicUsize::new(0), - } - } - - fn add_picker(&mut self, picker: Arc) { - self.pickers.push(picker); - } -} - -impl Picker for RoundRobinPicker { - fn pick(&self, request: &Request) -> PickResult { - let len = self.pickers.len(); - if len == 0 { - return PickResult::Queue; - } - let idx = self.next.fetch_add(1, Ordering::Relaxed) % len; - self.pickers[idx].pick(request) - } -} ->>>>>>> f7537e6 (fixed some logic for review) From ed8f68f30e5a546e9b4d4536f8d5af291813d344 Mon Sep 17 00:00:00 2001 From: Cathy Zhao Date: Fri, 8 Aug 2025 20:49:22 +0000 Subject: [PATCH 8/8] fix more conflicts --- grpc/src/client/load_balancing/child_manager.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index 78b689842..fea17bfe5 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -87,12 +87,6 @@ impl ChildManager { subchannel_child_map: Default::default(), children: Default::default(), pending_work: Default::default(), -<<<<<<< HEAD -======= - updated: false, - prev_state: ConnectivityState::Idle, - last_ready_pickers: Vec::new(), ->>>>>>> f7537e6 (fixed some logic for review) } }