@@ -457,6 +457,10 @@ func (envOptions *EnvironmentOptions) SetRPCTimeout(timeout time.Duration) *Envi
457457 return envOptions
458458}
459459
460+ type clientOptions interface {
461+ GetClientProvidedName () string
462+ }
463+
460464type environmentCoordinator struct {
461465 mutex * sync.Mutex
462466 clientsPerContext sync.Map
@@ -535,95 +539,88 @@ func (c *Client) maybeCleanConsumers(streamName string) {
535539 })
536540}
537541
538- func (cc * environmentCoordinator ) newProducer (leader * Broker , tcpParameters * TCPParameters , saslConfiguration * SaslConfiguration , streamName string , options * ProducerOptions , rpcTimeout time.Duration , cleanUp func ()) (* Producer , error ) {
542+ func (cc * environmentCoordinator ) newClientEntity (
543+ isListFull func (int ) bool ,
544+ defaultClientName string ,
545+ leader * Broker ,
546+ tcpParameters * TCPParameters ,
547+ saslConfiguration * SaslConfiguration ,
548+ options clientOptions ,
549+ rpcTimeout time.Duration ,
550+ ) (* Client , error ) {
539551 cc .mutex .Lock ()
540552 defer cc .mutex .Unlock ()
541553 var clientResult * Client
542554
543555 cc .clientsPerContext .Range (func (key , value any ) bool {
544- if ! cc . isProducerListFull (key .(int )) {
556+ if ! isListFull (key .(int )) {
545557 clientResult = value .(* Client )
546558 return false
547559 }
548560 return true
549561 })
550562
551- clientProvidedName := "go-stream-producer"
552- if options != nil && options .ClientProvidedName != "" {
553- clientProvidedName = options .ClientProvidedName
563+ clientProvidedName := defaultClientName
564+ if options != nil && options .GetClientProvidedName () != "" {
565+ clientProvidedName = options .GetClientProvidedName ()
554566 }
555567
556568 if clientResult == nil {
557- clientResult = cc .newClientForProducer (clientProvidedName , leader , tcpParameters , saslConfiguration , rpcTimeout )
569+ clientResult = cc .newClientForConnection (clientProvidedName , leader , tcpParameters , saslConfiguration , rpcTimeout )
558570 }
559571
560572 err := clientResult .connect ()
561573 if err != nil {
562574 return nil , err
563575 }
564576
565- for clientResult .connectionProperties .host != leader .advHost ||
566- clientResult .connectionProperties .port != leader .advPort {
567- logs .LogDebug ("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry" ,
568- clientResult .connectionProperties .host ,
569- leader .advHost , leader .advPort )
570- clientResult .Close ()
571- clientResult = cc .newClientForProducer (clientProvidedName , leader , tcpParameters , saslConfiguration , rpcTimeout )
572- err = clientResult .connect ()
573- if err != nil {
574- return nil , err
575- }
576- time .Sleep (1 * time .Second )
577- }
578-
579- producer , err := clientResult .declarePublisher (streamName , options , cleanUp )
577+ return cc .validateBrokerConnection (clientResult , leader ,
578+ func () * Client {
579+ return cc .newClientForConnection (clientProvidedName , leader , tcpParameters , saslConfiguration , rpcTimeout )
580+ })
581+ }
580582
583+ func (cc * environmentCoordinator ) newProducer (leader * Broker , tcpParameters * TCPParameters , saslConfiguration * SaslConfiguration , streamName string , options * ProducerOptions , rpcTimeout time.Duration , cleanUp func ()) (* Producer , error ) {
584+ client , err := cc .newClientEntity (cc .isProducerListFull , "go-stream-producer" , leader , tcpParameters , saslConfiguration , options , rpcTimeout )
581585 if err != nil {
582586 return nil , err
583587 }
584-
585- return producer , nil
588+ return client .declarePublisher (streamName , options , cleanUp )
586589}
587590
588- func (cc * environmentCoordinator ) newClientForProducer (connectionName string , leader * Broker , tcpParameters * TCPParameters , saslConfiguration * SaslConfiguration , rpcTimeOut time.Duration ) * Client {
589- clientResult := newClient (connectionName , leader , tcpParameters , saslConfiguration , rpcTimeOut )
590- cc .nextId ++
591-
592- cc .clientsPerContext .Store (cc .nextId , clientResult )
593- return clientResult
594- }
595-
596- func (cc * environmentCoordinator ) newConsumer (connectionName string , leader * Broker , tcpParameters * TCPParameters , saslConfiguration * SaslConfiguration ,
591+ func (cc * environmentCoordinator ) newConsumer (leader * Broker , tcpParameters * TCPParameters , saslConfiguration * SaslConfiguration ,
597592 streamName string , messagesHandler MessagesHandler ,
598593 options * ConsumerOptions , rpcTimeout time.Duration , cleanUp func ()) (* Consumer , error ) {
599- cc .mutex .Lock ()
600- defer cc .mutex .Unlock ()
601- var clientResult * Client
602-
603- cc .clientsPerContext .Range (func (key , value any ) bool {
604- if ! cc .isConsumerListFull (key .(int )) {
605- clientResult = value .(* Client )
606- return false
607- }
608- return true
609- })
610-
611- if clientResult == nil {
612- clientResult = newClient (connectionName , leader , tcpParameters , saslConfiguration , rpcTimeout )
613- cc .nextId ++
614- cc .clientsPerContext .Store (cc .nextId , clientResult )
615- }
616- // try to reconnect in case the socket is closed
617- err := clientResult .connect ()
594+ client , err := cc .newClientEntity (cc .isConsumerListFull , "go-stream-consumer" , leader , tcpParameters , saslConfiguration , options , rpcTimeout )
618595 if err != nil {
619596 return nil , err
620597 }
621598
622- subscriber , err := clientResult .declareSubscriber (streamName , messagesHandler , options , cleanUp )
623- if err != nil {
624- return nil , err
599+ return client .declareSubscriber (streamName , messagesHandler , options , cleanUp )
600+ }
601+
602+ func (cc * environmentCoordinator ) validateBrokerConnection (client * Client , broker * Broker , newClientFunc func () * Client ) (* Client , error ) {
603+ for client .connectionProperties .host != broker .advHost ||
604+ client .connectionProperties .port != broker .advPort {
605+ logs .LogDebug ("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry" ,
606+ client .connectionProperties .host ,
607+ broker .advHost , broker .advPort )
608+ client .Close ()
609+ client = newClientFunc ()
610+ err := client .connect ()
611+ if err != nil {
612+ return nil , err
613+ }
614+ time .Sleep (time .Duration (500 + rand .Intn (1000 )) * time .Millisecond )
625615 }
626- return subscriber , nil
616+ return client , nil
617+ }
618+
619+ func (cc * environmentCoordinator ) newClientForConnection (connectionName string , broker * Broker , tcpParameters * TCPParameters , saslConfiguration * SaslConfiguration , rpcTimeout time.Duration ) * Client {
620+ clientResult := newClient (connectionName , broker , tcpParameters , saslConfiguration , rpcTimeout )
621+ cc .nextId ++
622+ cc .clientsPerContext .Store (cc .nextId , clientResult )
623+ return clientResult
627624}
628625
629626func (cc * environmentCoordinator ) Close () error {
@@ -664,10 +661,12 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
664661 options * ProducerOptions , resolver * AddressResolver , rpcTimeOut time.Duration ) (* Producer , error ) {
665662 ps .mutex .Lock ()
666663 defer ps .mutex .Unlock ()
667- leader , err := clientLocator .BrokerLeader (streamName )
664+
665+ leader , err := clientLocator .BrokerLeaderWithResolver (streamName , resolver )
668666 if err != nil {
669667 return nil , err
670668 }
669+
671670 coordinatorKey := leader .hostPort ()
672671 if ps .producersCoordinator [coordinatorKey ] == nil {
673672 ps .producersCoordinator [coordinatorKey ] = & environmentCoordinator {
@@ -677,13 +676,15 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
677676 nextId : 0 ,
678677 }
679678 }
679+
680680 leader .cloneFrom (clientLocator .broker , resolver )
681681
682682 cleanUp := func () {
683683 for _ , coordinator := range ps .producersCoordinator {
684684 coordinator .maybeCleanClients ()
685685 }
686686 }
687+
687688 producer , err := ps .producersCoordinator [coordinatorKey ].newProducer (leader , clientLocator .tcpParameters ,
688689 clientLocator .saslConfiguration , streamName , options , rpcTimeOut , cleanUp )
689690 if err != nil {
@@ -728,10 +729,12 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
728729 consumerOptions * ConsumerOptions , resolver * AddressResolver , rpcTimeout time.Duration ) (* Consumer , error ) {
729730 ps .mutex .Lock ()
730731 defer ps .mutex .Unlock ()
732+
731733 consumerBroker , err := clientLocator .BrokerForConsumer (streamName )
732734 if err != nil {
733735 return nil , err
734736 }
737+
735738 coordinatorKey := consumerBroker .hostPort ()
736739 if ps .consumersCoordinator [coordinatorKey ] == nil {
737740 ps .consumersCoordinator [coordinatorKey ] = & environmentCoordinator {
@@ -741,19 +744,17 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
741744 nextId : 0 ,
742745 }
743746 }
747+
744748 consumerBroker .cloneFrom (clientLocator .broker , resolver )
745- clientProvidedName := "go-stream-consumer"
746- if consumerOptions != nil && consumerOptions .ClientProvidedName != "" {
747- clientProvidedName = consumerOptions .ClientProvidedName
748- }
749+
749750 cleanUp := func () {
750751 for _ , coordinator := range ps .consumersCoordinator {
751752 coordinator .maybeCleanClients ()
752753 }
753754 }
754755
755756 consumer , err := ps .consumersCoordinator [coordinatorKey ].
756- newConsumer (clientProvidedName , consumerBroker , clientLocator .tcpParameters ,
757+ newConsumer (consumerBroker , clientLocator .tcpParameters ,
757758 clientLocator .saslConfiguration ,
758759 streamName , messagesHandler , consumerOptions , rpcTimeout , cleanUp )
759760 if err != nil {
0 commit comments