Skip to content

Commit 502140e

Browse files
committed
Prefer replicas over leader for consumer connections
The Go client currently uses equal probability distribution for consumer placement, including the leader node in the candidate pool alongside replicas. This results in consumers connecting to the leader ~33% of the time (with 2 replicas), which violates RabbitMQ Streams best practices that recommend consumers connect to replicas rather than leaders. This change modifies `BrokerForConsumerWithResolver` to only include the leader in the candidate pool when no replicas are available. When replicas exist, consumers randomly select from replicas only, guaranteeing zero probability of leader placement. This matches the .NET client behavior and follows RabbitMQ Streams best practices for consumer placement. The leader is used as a fallback when `availableReplicas == 0`, ensuring consumers can still connect when replicas are unavailable. Use same retry delay as .NET stream client
1 parent b8578b7 commit 502140e

File tree

4 files changed

+89
-56
lines changed

4 files changed

+89
-56
lines changed

pkg/stream/client.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
677677
}
678678

679679
func (c *Client) BrokerLeader(stream string) (*Broker, error) {
680+
return c.BrokerLeaderWithResolver(stream, nil)
681+
}
682+
683+
func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
680684
streamsMetadata := c.metaData(stream)
681685
if streamsMetadata == nil {
682686
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
@@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
693697
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
694698
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
695699

700+
// If AddressResolver is configured, use it directly and skip DNS lookup
701+
if resolver != nil {
702+
streamMetadata.Leader.Host = resolver.Host
703+
streamMetadata.Leader.Port = strconv.Itoa(resolver.Port)
704+
return streamMetadata.Leader, nil
705+
}
706+
696707
res := net.Resolver{}
697708
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
698709
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -738,12 +749,30 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
738749
}
739750

740751
brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
741-
brokers = append(brokers, streamMetadata.Leader)
752+
753+
// Count available replicas
754+
availableReplicas := 0
755+
for _, replica := range streamMetadata.Replicas {
756+
if replica != nil {
757+
availableReplicas++
758+
}
759+
}
760+
761+
// Only add leader if no replicas are available
762+
if availableReplicas == 0 {
763+
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
764+
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
765+
brokers = append(brokers, streamMetadata.Leader)
766+
}
767+
768+
// Add all available replicas
742769
for idx, replica := range streamMetadata.Replicas {
743770
if replica == nil {
744771
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
745772
continue
746773
}
774+
replica.advPort = replica.Port
775+
replica.advHost = replica.Host
747776
brokers = append(brokers, replica)
748777
}
749778

pkg/stream/consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,10 @@ func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *Cons
312312
return c
313313
}
314314

315+
func (c *ConsumerOptions) GetClientProvidedName() string {
316+
return c.ClientProvidedName
317+
}
318+
315319
func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions {
316320
c.Filter = filter
317321
return c

pkg/stream/environment.go

Lines changed: 51 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,10 @@ func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *Envi
457457
return envOptions
458458
}
459459

460+
type clientOptions interface {
461+
GetClientProvidedName() string
462+
}
463+
460464
type environmentCoordinator struct {
461465
mutex *sync.Mutex
462466
clientsPerContext sync.Map
@@ -535,95 +539,87 @@ func (c *Client) maybeCleanConsumers(streamName string) {
535539
})
536540
}
537541

538-
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, options *ProducerOptions, rpcTimeout time.Duration, cleanUp func()) (*Producer, error) {
542+
func (cc *environmentCoordinator) newClientEntity(
543+
isListFull func(int) bool,
544+
defaultClientName string,
545+
leader *Broker,
546+
tcpParameters *TCPParameters,
547+
saslConfiguration *SaslConfiguration,
548+
options clientOptions,
549+
rpcTimeout time.Duration,
550+
) (*Client, error) {
539551
cc.mutex.Lock()
540552
defer cc.mutex.Unlock()
541553
var clientResult *Client
542554

543555
cc.clientsPerContext.Range(func(key, value any) bool {
544-
if !cc.isProducerListFull(key.(int)) {
556+
if !isListFull(key.(int)) {
545557
clientResult = value.(*Client)
546558
return false
547559
}
548560
return true
549561
})
550562

551-
clientProvidedName := "go-stream-producer"
552-
if options != nil && options.ClientProvidedName != "" {
553-
clientProvidedName = options.ClientProvidedName
563+
clientProvidedName := defaultClientName
564+
if options != nil && options.GetClientProvidedName() != "" {
565+
clientProvidedName = options.GetClientProvidedName()
554566
}
555567

556568
if clientResult == nil {
557-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
569+
clientResult = cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
558570
}
559571

560572
err := clientResult.connect()
561573
if err != nil {
562574
return nil, err
563575
}
564576

565-
for clientResult.connectionProperties.host != leader.advHost ||
566-
clientResult.connectionProperties.port != leader.advPort {
567-
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
568-
clientResult.connectionProperties.host,
569-
leader.advHost, leader.advPort)
570-
clientResult.Close()
571-
clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
572-
err = clientResult.connect()
573-
if err != nil {
574-
return nil, err
575-
}
576-
time.Sleep(1 * time.Second)
577-
}
578-
579-
producer, err := clientResult.declarePublisher(streamName, options, cleanUp)
577+
return cc.validateBrokerConnection(clientResult, leader,
578+
func() *Client {
579+
return cc.newClientForConnection(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout)
580+
})
581+
}
580582

583+
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, options *ProducerOptions, rpcTimeout time.Duration, cleanUp func()) (*Producer, error) {
584+
client, err := cc.newClientEntity(cc.isProducerListFull, "go-stream-producer", leader, tcpParameters, saslConfiguration, options, rpcTimeout)
581585
if err != nil {
582586
return nil, err
583587
}
584-
585-
return producer, nil
586-
}
587-
588-
func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client {
589-
clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut)
590-
cc.nextId++
591-
592-
cc.clientsPerContext.Store(cc.nextId, clientResult)
593-
return clientResult
588+
return client.declarePublisher(streamName, options, cleanUp)
594589
}
595590

596591
func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
597592
streamName string, messagesHandler MessagesHandler,
598593
options *ConsumerOptions, rpcTimeout time.Duration, cleanUp func()) (*Consumer, error) {
599-
cc.mutex.Lock()
600-
defer cc.mutex.Unlock()
601-
var clientResult *Client
602-
603-
cc.clientsPerContext.Range(func(key, value any) bool {
604-
if !cc.isConsumerListFull(key.(int)) {
605-
clientResult = value.(*Client)
606-
return false
607-
}
608-
return true
609-
})
610-
611-
if clientResult == nil {
612-
clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout)
613-
cc.nextId++
614-
cc.clientsPerContext.Store(cc.nextId, clientResult)
615-
}
616-
// try to reconnect in case the socket is closed
617-
err := clientResult.connect()
594+
client, err := cc.newClientEntity(cc.isConsumerListFull, "go-stream-consumer", leader, tcpParameters, saslConfiguration, options, rpcTimeout)
618595
if err != nil {
619596
return nil, err
620597
}
598+
return client.declareSubscriber(streamName, messagesHandler, options, cleanUp)
599+
}
621600

622-
subscriber, err := clientResult.declareSubscriber(streamName, messagesHandler, options, cleanUp)
623-
if err != nil {
624-
return nil, err
601+
func (cc *environmentCoordinator) validateBrokerConnection(client *Client, broker *Broker, newClientFunc func() *Client) (*Client, error) {
602+
for client.connectionProperties.host != broker.advHost ||
603+
client.connectionProperties.port != broker.advPort {
604+
logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry",
605+
client.connectionProperties.host,
606+
broker.advHost, broker.advPort)
607+
client.Close()
608+
client = newClientFunc()
609+
err := client.connect()
610+
if err != nil {
611+
return nil, err
612+
}
613+
time.Sleep(time.Duration(500+rand.Intn(1000)) * time.Millisecond)
625614
}
626-
return subscriber, nil
615+
return client, nil
616+
}
617+
618+
func (cc *environmentCoordinator) newClientForConnection(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeout time.Duration) *Client {
619+
clientResult := newClient(connectionName, broker, tcpParameters, saslConfiguration, rpcTimeout)
620+
cc.nextId++
621+
cc.clientsPerContext.Store(cc.nextId, clientResult)
622+
return clientResult
627623
}
628624

629625
func (cc *environmentCoordinator) Close() error {
@@ -664,7 +660,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
664660
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
665661
ps.mutex.Lock()
666662
defer ps.mutex.Unlock()
667-
leader, err := clientLocator.BrokerLeader(streamName)
663+
leader, err := clientLocator.BrokerLeaderWithResolver(streamName, resolver)
668664
if err != nil {
669665
return nil, err
670666
}

pkg/stream/producer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions {
189189
return po
190190
}
191191

192+
func (po *ProducerOptions) GetClientProvidedName() string {
193+
return po.ClientProvidedName
194+
}
195+
192196
// SetFilter sets the filter for the producer. See ProducerOptions.Filter for more details
193197
func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions {
194198
po.Filter = filter

0 commit comments

Comments
 (0)