Skip to content

Commit 45e7c74

Browse files
committed
Prefer replicas over leader for consumer connections
The Go client currently uses equal probability distribution for consumer placement, including the leader node in the candidate pool alongside replicas. This results in consumers connecting to the leader ~33% of the time (with 2 replicas), which violates RabbitMQ Streams best practices that recommend consumers connect to replicas rather than leaders. This change modifies `BrokerForConsumerWithResolver` to only include the leader in the candidate pool when no replicas are available. When replicas exist, consumers randomly select from replicas only, guaranteeing zero probability of leader placement. This matches the .NET client behavior and follows RabbitMQ Streams best practices for consumer placement. The leader is used as a fallback when `availableReplicas == 0`, ensuring consumers can still connect when replicas are unavailable. Use same retry delay as .NET stream client
1 parent b8578b7 commit 45e7c74

File tree

2 files changed

+78
-28
lines changed

2 files changed

+78
-28
lines changed

pkg/stream/client.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
677677
}
678678

679679
func (c *Client) BrokerLeader(stream string) (*Broker, error) {
680+
return c.BrokerLeaderWithResolver(stream, nil)
681+
}
682+
683+
func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
680684
streamsMetadata := c.metaData(stream)
681685
if streamsMetadata == nil {
682686
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
@@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
693697
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
694698
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
695699

700+
// If AddressResolver is configured, use it directly and skip DNS lookup
701+
if resolver != nil {
702+
streamMetadata.Leader.Host = resolver.Host
703+
streamMetadata.Leader.Port = strconv.Itoa(resolver.Port)
704+
return streamMetadata.Leader, nil
705+
}
706+
696707
res := net.Resolver{}
697708
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
698709
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -738,12 +749,30 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
738749
}
739750

740751
brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
741-
brokers = append(brokers, streamMetadata.Leader)
752+
753+
// Count available replicas
754+
availableReplicas := 0
755+
for _, replica := range streamMetadata.Replicas {
756+
if replica != nil {
757+
availableReplicas++
758+
}
759+
}
760+
761+
// Only add leader if no replicas are available
762+
if availableReplicas == 0 {
763+
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
764+
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
765+
brokers = append(brokers, streamMetadata.Leader)
766+
}
767+
768+
// Add all available replicas
742769
for idx, replica := range streamMetadata.Replicas {
743770
if replica == nil {
744771
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
745772
continue
746773
}
774+
replica.advPort = replica.Port
775+
replica.advHost = replica.Host
747776
brokers = append(brokers, replica)
748777
}
749778

pkg/stream/environment.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -554,45 +554,30 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
554554
}
555555

556556
if clientResult == nil {
557-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
557+
clientResult = cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
558558
}
559559

560+
// try to reconnect in case the socket is closed
560561
err := clientResult.connect()
561562
if err != nil {
562563
return nil, err
563564
}
564565

565-
for clientResult.connectionProperties.host != leader.advHost ||
566-
clientResult.connectionProperties.port != leader.advPort {
567-
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
568-
clientResult.connectionProperties.host,
569-
leader.advHost, leader.advPort)
570-
clientResult.Close()
571-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
572-
err = clientResult.connect()
573-
if err != nil {
574-
return nil, err
575-
}
576-
time.Sleep(1 * time.Second)
566+
clientResult, err = cc.validateBrokerConnection(clientResult, leader,
567+
func() *Client {
568+
return cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
569+
})
570+
if err != nil {
571+
return nil, err
577572
}
578573

579574
producer, err := clientResult.declarePublisher(streamName, options, cleanUp)
580-
581575
if err != nil {
582576
return nil, err
583577
}
584-
585578
return producer, nil
586579
}
587580

588-
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
589-
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
590-
cc.nextId++
591-
592-
cc.clientsPerContext.Store(cc.nextId, clientResult)
593-
return clientResult
594-
}
595-
596581
func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
597582
streamName string, messagesHandler MessagesHandler,
598583
options *ConsumerOptions, rpcTimeout time.Duration, cleanUp func()) (*Consumer, error) {
@@ -608,24 +593,60 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro
608593
return true
609594
})
610595

596+
clientProvidedName := "go-stream-consumer"
597+
if options != nil && options.ClientProvidedName != "" {
598+
clientProvidedName = options.ClientProvidedName
599+
}
600+
611601
if clientResult == nil {
612-
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
613-
cc.nextId++
614-
cc.clientsPerContext.Store(cc.nextId, clientResult)
602+
clientResult = cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
615603
}
604+
616605
// try to reconnect in case the socket is closed
617606
err := clientResult.connect()
618607
if err != nil {
619608
return nil, err
620609
}
621610

611+
clientResult, err = cc.validateBrokerConnection(clientResult, leader,
612+
func() *Client {
613+
return cc.newClientForConnection(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
614+
})
615+
if err != nil {
616+
return nil, err
617+
}
618+
622619
subscriber, err := clientResult.declareSubscriber(streamName, messagesHandler, options, cleanUp)
623620
if err != nil {
624621
return nil, err
625622
}
626623
return subscriber, nil
627624
}
628625

626+
func (cc *environmentCoordinator) validateBrokerConnection(client *Client, broker *Broker, newClientFunc func() *Client) (*Client, error) {
627+
for client.connectionProperties.host != broker.advHost ||
628+
client.connectionProperties.port != broker.advPort {
629+
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
630+
client.connectionProperties.host,
631+
broker.advHost, broker.advPort)
632+
client.Close()
633+
client = newClientFunc()
634+
err := client.connect()
635+
if err != nil {
636+
return nil, err
637+
}
638+
time.Sleep(time.Duration(500+rand.Intn(1000)) * time.Millisecond)
639+
}
640+
return client, nil
641+
}
642+
643+
func (cc *environmentCoordinator) newClientForConnection(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeout time.Duration) *Client {
644+
clientResult := newClient(connectionName, broker, tcpParameters, saslConfiguration, rpcTimeout)
645+
cc.nextId++
646+
cc.clientsPerContext.Store(cc.nextId, clientResult)
647+
return clientResult
648+
}
649+
629650
func (cc *environmentCoordinator) Close() error {
630651
cc.clientsPerContext.Range(func(_, value any) bool {
631652
value.(*Client).coordinator.Close()
@@ -664,7 +685,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
664685
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
665686
ps.mutex.Lock()
666687
defer ps.mutex.Unlock()
667-
leader, err := clientLocator.BrokerLeader(streamName)
688+
leader, err := clientLocator.BrokerLeaderWithResolver(streamName, resolver)
668689
if err != nil {
669690
return nil, err
670691
}

0 commit comments

Comments
 (0)