Skip to content

Commit 3bbed3f

Browse files
committed
Prefer replicas over leader for consumer connections
The Go client currently uses equal probability distribution for consumer placement, including the leader node in the candidate pool alongside replicas. This results in consumers connecting to the leader ~33% of the time (with 2 replicas), which violates RabbitMQ Streams best practices that recommend consumers connect to replicas rather than leaders. This change modifies `BrokerForConsumer` to only include the leader in the candidate pool when no replicas are available. When replicas exist, consumers randomly select from replicas only, guaranteeing zero probability of leader placement. This matches the .NET client behavior and follows RabbitMQ Streams best practices for consumer placement. The leader is used as a fallback when `availableReplicas == 0`, ensuring consumers can still connect when replicas are unavailable.
1 parent b8578b7 commit 3bbed3f

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

pkg/stream/client.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
677677
}
678678

679679
func (c *Client) BrokerLeader(stream string) (*Broker, error) {
680+
return c.BrokerLeaderWithResolver(stream, nil)
681+
}
682+
683+
func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
680684
streamsMetadata := c.metaData(stream)
681685
if streamsMetadata == nil {
682686
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
@@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
693697
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
694698
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
695699

700+
// If AddressResolver is configured, use it directly and skip DNS lookup
701+
if resolver != nil {
702+
streamMetadata.Leader.Host = resolver.Host
703+
streamMetadata.Leader.Port = strconv.Itoa(resolver.Port)
704+
return streamMetadata.Leader, nil
705+
}
706+
696707
res := net.Resolver{}
697708
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
698709
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -738,12 +749,30 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
738749
}
739750

740751
brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
741-
brokers = append(brokers, streamMetadata.Leader)
752+
753+
// Count available replicas
754+
availableReplicas := 0
755+
for _, replica := range streamMetadata.Replicas {
756+
if replica != nil {
757+
availableReplicas++
758+
}
759+
}
760+
761+
// Only add leader if no replicas are available
762+
if availableReplicas == 0 {
763+
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
764+
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
765+
brokers = append(brokers, streamMetadata.Leader)
766+
}
767+
768+
// Add all available replicas
742769
for idx, replica := range streamMetadata.Replicas {
743770
if replica == nil {
744771
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
745772
continue
746773
}
774+
replica.advPort = replica.Port
775+
replica.advHost = replica.Host
747776
brokers = append(brokers, replica)
748777
}
749778

pkg/stream/environment.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,13 +619,36 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
619619
return nil, err
620620
}
621621

622+
// Validate that connection reached the intended broker (replica vs leader)
623+
// This is critical when using load balancers to ensure proper consumer placement
624+
for clientResult.connectionProperties.host != leader.advHost ||
625+
clientResult.connectionProperties.port != leader.advPort {
626+
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
627+
clientResult.connectionProperties.host,
628+
leader.advHost, leader.advPort)
629+
clientResult.Close()
630+
clientResult = cc.newClientForConsumer(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
631+
err = clientResult.connect()
632+
if err != nil {
633+
return nil, err
634+
}
635+
time.Sleep(1 * time.Second)
636+
}
637+
622638
subscriber, err := clientResult.declareSubscriber(streamName, messagesHandler, options, cleanUp)
623639
if err != nil {
624640
return nil, err
625641
}
626642
return subscriber, nil
627643
}
628644

645+
func (cc *environmentCoordinator) newClientForConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeout time.Duration) *Client {
646+
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
647+
cc.nextId++
648+
cc.clientsPerContext.Store(cc.nextId, clientResult)
649+
return clientResult
650+
}
651+
629652
func (cc *environmentCoordinator) Close() error {
630653
cc.clientsPerContext.Range(func(_, value any) bool {
631654
value.(*Client).coordinator.Close()
@@ -664,7 +687,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
664687
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
665688
ps.mutex.Lock()
666689
defer ps.mutex.Unlock()
667-
leader, err := clientLocator.BrokerLeader(streamName)
690+
leader, err := clientLocator.BrokerLeaderWithResolver(streamName, resolver)
668691
if err != nil {
669692
return nil, err
670693
}

0 commit comments

Comments
 (0)