@@ -286,6 +286,15 @@ class XdsOverrideHostLb final : public LoadBalancingPolicy {
286
286
address_list_ = std::move (address_list);
287
287
}
288
288
289
+ const ChannelArgs& per_endpoint_args () const
290
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
291
+ return per_endpoint_args_;
292
+ }
293
+ void set_per_endpoint_args (ChannelArgs per_endpoint_args)
294
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
295
+ per_endpoint_args_ = std::move (per_endpoint_args);
296
+ }
297
+
289
298
Timestamp last_used_time () const
290
299
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
291
300
return last_used_time_;
@@ -304,6 +313,7 @@ class XdsOverrideHostLb final : public LoadBalancingPolicy {
304
313
&XdsOverrideHostLb::mu_) = XdsHealthStatus(XdsHealthStatus::kUnknown );
305
314
RefCountedStringValue address_list_
306
315
ABSL_GUARDED_BY (&XdsOverrideHostLb::mu_);
316
+ ChannelArgs per_endpoint_args_ ABSL_GUARDED_BY (&XdsOverrideHostLb::mu_);
307
317
Timestamp last_used_time_ ABSL_GUARDED_BY (&XdsOverrideHostLb::mu_) =
308
318
Timestamp::InfPast();
309
319
};
@@ -793,8 +803,12 @@ void XdsOverrideHostLb::UpdateAddressMap(
793
803
struct AddressInfo {
794
804
XdsHealthStatus eds_health_status;
795
805
RefCountedStringValue address_list;
796
- AddressInfo (XdsHealthStatus status, RefCountedStringValue addresses)
797
- : eds_health_status(status), address_list(std::move(addresses)) {}
806
+ ChannelArgs per_endpoint_args;
807
+ AddressInfo (XdsHealthStatus status, RefCountedStringValue addresses,
808
+ ChannelArgs args)
809
+ : eds_health_status(status),
810
+ address_list (std::move(addresses)),
811
+ per_endpoint_args(std::move(args)) {}
798
812
};
799
813
std::map<const std::string, AddressInfo> addresses_for_map;
800
814
endpoints.ForEach([&](const EndpointAddresses& endpoint) {
@@ -830,7 +844,8 @@ void XdsOverrideHostLb::UpdateAddressMap(
830
844
(end.empty () ? " " : " ," ), end));
831
845
addresses_for_map.emplace (
832
846
std::piecewise_construct, std::forward_as_tuple (addresses[i]),
833
- std::forward_as_tuple (status, std::move (address_list)));
847
+ std::forward_as_tuple (status, std::move (address_list),
848
+ endpoint.args ()));
834
849
}
835
850
});
836
851
// Now grab the lock and update subchannel_map_ from addresses_for_map.
@@ -865,9 +880,12 @@ void XdsOverrideHostLb::UpdateAddressMap(
865
880
<< " [xds_override_host_lb " << this << " ] map key " << address
866
881
<< " : setting "
867
882
<< " eds_health_status=" << address_info.eds_health_status .ToString ()
868
- << " address_list=" << address_info.address_list .c_str ();
883
+ << " address_list=" << address_info.address_list .c_str ()
884
+ << " per_endpoint_args=" << address_info.per_endpoint_args .ToString ();
869
885
it->second ->set_eds_health_status (address_info.eds_health_status );
870
886
it->second ->set_address_list (std::move (address_info.address_list ));
887
+ it->second ->set_per_endpoint_args (
888
+ std::move (address_info.per_endpoint_args ));
871
889
// Check the entry's last_used_time to determine the next time at
872
890
// which the timer needs to run.
873
891
if (it->second ->last_used_time () > idle_threshold) {
@@ -908,14 +926,38 @@ void XdsOverrideHostLb::CreateSubchannelForAddress(absl::string_view address) {
908
926
<< address;
909
927
auto addr = StringToSockaddr (address);
910
928
CHECK (addr.ok ());
911
- // Note: We don't currently have any cases where per_address_args need to
912
- // be passed through. If we encounter any such cases in the future, we
913
- // will need to change this to store those attributes from the resolver
914
- // update in the map entry.
929
+ // We need to do 3 things here:
930
+ // 1. Get the per-endpoint args from the entry in subchannel_map_.
931
+ // 2. Create the subchannel using those per-endpoint args.
932
+ // 3. Wrap the subchannel and store the wrapper in subchannel_map_.
933
+ //
934
+ // Steps 1 and 3 require holding the lock, but we don't want to hold
935
+ // the lock in step 2, since we're calling into arbitrary code in the
936
+ // channel. Unfortunately, this means we need to grab and release the
937
+ // lock twice -- and each time, we need to check if some other thread
938
+ // has preempted us.
939
+ //
940
+ // Step 1.
941
+ ChannelArgs per_endpoint_args;
942
+ {
943
+ MutexLock lock (&mu_);
944
+ auto it = subchannel_map_.find (address);
945
+ // This can happen if the map entry was removed between the time that
946
+ // the picker requested the subchannel creation and the time that we got
947
+ // here. In that case, we can just make it a no-op, since the update
948
+ // that removed the entry will have generated a new picker already.
949
+ if (it == subchannel_map_.end ()) return ;
950
+ // This can happen if the picker requests subchannel creation for
951
+ // the same address multiple times.
952
+ if (it->second ->HasOwnedSubchannel ()) return ;
953
+ per_endpoint_args = it->second ->per_endpoint_args ();
954
+ }
955
+ // Step 2.
915
956
auto subchannel = channel_control_helper ()->CreateSubchannel (
916
- *addr, /* per_address_args= */ ChannelArgs () , args_);
957
+ *addr, per_endpoint_args , args_);
917
958
auto wrapper = MakeRefCounted<SubchannelWrapper>(
918
959
std::move (subchannel), RefAsSubclass<XdsOverrideHostLb>());
960
+ // Step 3.
919
961
{
920
962
MutexLock lock (&mu_);
921
963
auto it = subchannel_map_.find (address);
0 commit comments