Skip to content

Commit b7f721a

Browse files
committed
Prefer replicas over leader for consumer connections
The Go client currently uses an equal probability distribution for consumer placement, which includes the leader node in the candidate pool alongside the replicas. As a result, consumers connect to the leader ~33% of the time (with 2 replicas), which violates the RabbitMQ Streams best practice that consumers should connect to replicas rather than to the leader. This change modifies `BrokerForConsumerWithResolver` to include the leader in the candidate pool only when no replicas are available. When replicas exist, consumers select randomly from the replicas only, guaranteeing zero probability of leader placement. This matches the .NET client behavior and follows RabbitMQ Streams best practices for consumer placement. The leader is used as a fallback when `availableReplicas == 0`, ensuring consumers can still connect when replicas are unavailable. Additionally, use the same retry delay as the .NET stream client.
1 parent b8578b7 commit b7f721a

File tree

2 files changed

+65
-18
lines changed

2 files changed

+65
-18
lines changed

pkg/stream/client.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
677677
}
678678

679679
func (c *Client) BrokerLeader(stream string) (*Broker, error) {
680+
return c.BrokerLeaderWithResolver(stream, nil)
681+
}
682+
683+
func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
680684
streamsMetadata := c.metaData(stream)
681685
if streamsMetadata == nil {
682686
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
@@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
693697
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
694698
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
695699

700+
// If AddressResolver is configured, use it directly and skip DNS lookup
701+
if resolver != nil {
702+
streamMetadata.Leader.Host = resolver.Host
703+
streamMetadata.Leader.Port = strconv.Itoa(resolver.Port)
704+
return streamMetadata.Leader, nil
705+
}
706+
696707
res := net.Resolver{}
697708
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
698709
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -738,12 +749,30 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
738749
}
739750

740751
brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
741-
brokers = append(brokers, streamMetadata.Leader)
752+
753+
// Count available replicas
754+
availableReplicas := 0
755+
for _, replica := range streamMetadata.Replicas {
756+
if replica != nil {
757+
availableReplicas++
758+
}
759+
}
760+
761+
// Only add leader if no replicas are available
762+
if availableReplicas == 0 {
763+
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
764+
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
765+
brokers = append(brokers, streamMetadata.Leader)
766+
}
767+
768+
// Add all available replicas
742769
for idx, replica := range streamMetadata.Replicas {
743770
if replica == nil {
744771
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
745772
continue
746773
}
774+
replica.advPort = replica.Port
775+
replica.advHost = replica.Host
747776
brokers = append(brokers, replica)
748777
}
749778

pkg/stream/environment.go

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -554,26 +554,20 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
554554
}
555555

556556
if clientResult == nil {
557-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
557+
clientResult = cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
558558
}
559559

560560
err := clientResult.connect()
561561
if err != nil {
562562
return nil, err
563563
}
564564

565-
for clientResult.connectionProperties.host != leader.advHost ||
566-
clientResult.connectionProperties.port != leader.advPort {
567-
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
568-
clientResult.connectionProperties.host,
569-
leader.advHost, leader.advPort)
570-
clientResult.Close()
571-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
572-
err = clientResult.connect()
573-
if err != nil {
574-
return nil, err
575-
}
576-
time.Sleep(1 * time.Second)
565+
clientResult, err = cc.validateBrokerConnection(clientResult, leader,
566+
func() *Client {
567+
return cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
568+
})
569+
if err != nil {
570+
return nil, err
577571
}
578572

579573
producer, err := clientResult.declarePublisher(streamName, options, cleanUp)
@@ -585,10 +579,26 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
585579
return producer, nil
586580
}
587581

588-
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
589-
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
590-
cc.nextId++
582+
func (cc *environmentCoordinator) validateBrokerConnection(client *Client, broker *Broker, newClientFunc func() *Client) (*Client, error) {
583+
for client.connectionProperties.host != broker.advHost ||
584+
client.connectionProperties.port != broker.advPort {
585+
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
586+
client.connectionProperties.host,
587+
broker.advHost, broker.advPort)
588+
client.Close()
589+
client = newClientFunc()
590+
err := client.connect()
591+
if err != nil {
592+
return nil, err
593+
}
594+
time.Sleep(time.Duration(500+rand.Intn(1000)) * time.Millisecond)
595+
}
596+
return client, nil
597+
}
591598

599+
func (cc *environmentCoordinator) newClientForConnection(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeout time.Duration) *Client {
600+
clientResult := newClient(connectionName, broker, tcpParameters, saslConfiguration, rpcTimeout)
601+
cc.nextId++
592602
cc.clientsPerContext.Store(cc.nextId, clientResult)
593603
return clientResult
594604
}
@@ -619,6 +629,14 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
619629
return nil, err
620630
}
621631

632+
clientResult, err = cc.validateBrokerConnection(clientResult, leader,
633+
func() *Client {
634+
return cc.newClientForConnection(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
635+
})
636+
if err != nil {
637+
return nil, err
638+
}
639+
622640
subscriber, err := clientResult.declareSubscriber(streamName, messagesHandler, options, cleanUp)
623641
if err != nil {
624642
return nil, err
@@ -664,7 +682,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
664682
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
665683
ps.mutex.Lock()
666684
defer ps.mutex.Unlock()
667-
leader, err := clientLocator.BrokerLeader(streamName)
685+
leader, err := clientLocator.BrokerLeaderWithResolver(streamName, resolver)
668686
if err != nil {
669687
return nil, err
670688
}

0 commit comments

Comments
 (0)