@@ -806,120 +806,37 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
806
806
"Topic has an existing exclusive producer: " + exclusiveProducerName ));
807
807
} else if (!producers .isEmpty ()) {
808
808
return FutureUtil .failedFuture (new ProducerFencedException ("Topic has existing shared producers" ));
809
- } else if (producer .getTopicEpoch ().isPresent ()
810
- && producer .getTopicEpoch ().get () < topicEpoch .orElse (-1L )) {
811
- // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
812
- // to be fenced, because a new producer had been present in between.
813
- return FutureUtil .failedFuture (new ProducerFencedException (
814
- String .format ("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d" ,
815
- topicEpoch .get (), producer .getTopicEpoch ().get ())));
816
- } else {
817
- // There are currently no existing producers
818
- hasExclusiveProducer = true ;
819
- exclusiveProducerName = producer .getProducerName ();
820
-
821
- CompletableFuture <Long > future ;
822
- if (producer .getTopicEpoch ().isPresent ()) {
823
- future = setTopicEpoch (producer .getTopicEpoch ().get ());
824
- } else {
825
- future = incrementTopicEpoch (topicEpoch );
826
- }
827
- future .exceptionally (ex -> {
828
- hasExclusiveProducer = false ;
829
- exclusiveProducerName = null ;
830
- return null ;
809
+ }
810
+ return handleTopicEpochForExclusiveProducer (producer );
811
+ case ExclusiveWithFencing :
812
+ if (hasExclusiveProducer || !producers .isEmpty ()) {
813
+ // clear all waiting producers
814
+ // otherwise closing any producer will trigger the promotion
815
+ // of the next pending producer
816
+ List <Pair <Producer , CompletableFuture <Optional <Long >>>> waitingExclusiveProducersCopy =
817
+ new ArrayList <>(waitingExclusiveProducers );
818
+ waitingExclusiveProducers .clear ();
819
+ waitingExclusiveProducersCopy .forEach ((Pair <Producer ,
820
+ CompletableFuture <Optional <Long >>> handle ) -> {
821
+ log .info ("[{}] Failing waiting producer {}" , topic , handle .getKey ());
822
+ handle .getValue ().completeExceptionally (new ProducerFencedException ("Fenced out" ));
823
+ handle .getKey ().close (true );
831
824
});
832
-
833
- return future .thenApply (epoch -> {
834
- topicEpoch = Optional .of (epoch );
835
- return topicEpoch ;
825
+ producers .forEach ((k , currentProducer ) -> {
826
+ log .info ("[{}] Fencing out producer {}" , topic , currentProducer );
827
+ currentProducer .close (true );
836
828
});
837
829
}
838
- case ExclusiveWithFencing :
839
- if (hasExclusiveProducer || !producers .isEmpty ()) {
840
- // clear all waiting producers
841
- // otherwise closing any producer will trigger the promotion
842
- // of the next pending producer
843
- List <Pair <Producer , CompletableFuture <Optional <Long >>>> waitingExclusiveProducersCopy =
844
- new ArrayList <>(waitingExclusiveProducers );
845
- waitingExclusiveProducers .clear ();
846
- waitingExclusiveProducersCopy .forEach ((Pair <Producer ,
847
- CompletableFuture <Optional <Long >>> handle ) -> {
848
- log .info ("[{}] Failing waiting producer {}" , topic , handle .getKey ());
849
- handle .getValue ().completeExceptionally (new ProducerFencedException ("Fenced out" ));
850
- handle .getKey ().close (true );
851
- });
852
- producers .forEach ((k , currentProducer ) -> {
853
- log .info ("[{}] Fencing out producer {}" , topic , currentProducer );
854
- currentProducer .close (true );
855
- });
856
- }
857
- if (producer .getTopicEpoch ().isPresent ()
858
- && producer .getTopicEpoch ().get () < topicEpoch .orElse (-1L )) {
859
- // If a producer reconnects, but all the topic epoch has already moved forward,
860
- // this producer needs to be fenced, because a new producer had been present in between.
861
- hasExclusiveProducer = false ;
862
- return FutureUtil .failedFuture (new ProducerFencedException (
863
- String .format ("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d" ,
864
- topicEpoch .get (), producer .getTopicEpoch ().get ())));
865
- } else {
866
- // There are currently no existing producers
867
- hasExclusiveProducer = true ;
868
- exclusiveProducerName = producer .getProducerName ();
869
-
870
- CompletableFuture <Long > future ;
871
- if (producer .getTopicEpoch ().isPresent ()) {
872
- future = setTopicEpoch (producer .getTopicEpoch ().get ());
873
- } else {
874
- future = incrementTopicEpoch (topicEpoch );
875
- }
876
- future .exceptionally (ex -> {
877
- hasExclusiveProducer = false ;
878
- exclusiveProducerName = null ;
879
- return null ;
880
- });
881
-
882
- return future .thenApply (epoch -> {
883
- topicEpoch = Optional .of (epoch );
884
- return topicEpoch ;
885
- });
886
- }
830
+ return handleTopicEpochForExclusiveProducer (producer );
887
831
case WaitForExclusive : {
888
832
if (hasExclusiveProducer || !producers .isEmpty ()) {
889
833
CompletableFuture <Optional <Long >> future = new CompletableFuture <>();
890
834
log .info ("[{}] Queuing producer {} since there's already a producer" , topic , producer );
891
835
waitingExclusiveProducers .add (Pair .of (producer , future ));
892
836
producerQueuedFuture .complete (null );
893
837
return future ;
894
- } else if (producer .getTopicEpoch ().isPresent ()
895
- && producer .getTopicEpoch ().get () < topicEpoch .orElse (-1L )) {
896
- // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
897
- // to be fenced, because a new producer had been present in between.
898
- return FutureUtil .failedFuture (new ProducerFencedException (
899
- String .format ("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d" ,
900
- topicEpoch .get (), producer .getTopicEpoch ().get ())));
901
- } else {
902
- // There are currently no existing producers
903
- hasExclusiveProducer = true ;
904
- exclusiveProducerName = producer .getProducerName ();
905
-
906
- CompletableFuture <Long > future ;
907
- if (producer .getTopicEpoch ().isPresent ()) {
908
- future = setTopicEpoch (producer .getTopicEpoch ().get ());
909
- } else {
910
- future = incrementTopicEpoch (topicEpoch );
911
- }
912
- future .exceptionally (ex -> {
913
- hasExclusiveProducer = false ;
914
- exclusiveProducerName = null ;
915
- return null ;
916
- });
917
-
918
- return future .thenApply (epoch -> {
919
- topicEpoch = Optional .of (epoch );
920
- return topicEpoch ;
921
- });
922
838
}
839
+ return handleTopicEpochForExclusiveProducer (producer );
923
840
}
924
841
925
842
default :
@@ -935,6 +852,37 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
935
852
}
936
853
}
937
854
855
+ private CompletableFuture <Optional <Long >> handleTopicEpochForExclusiveProducer (Producer producer ) {
856
+ if (producer .getTopicEpoch ().isPresent ()
857
+ && producer .getTopicEpoch ().get () < topicEpoch .orElse (-1L )) {
858
+ // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
859
+ // to be fenced, because a new producer had been present in between.
860
+ return FutureUtil .failedFuture (new ProducerFencedException (
861
+ String .format ("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d" ,
862
+ topicEpoch .get (), producer .getTopicEpoch ().get ())));
863
+ }
864
+ // There are currently no existing producers
865
+ hasExclusiveProducer = true ;
866
+ exclusiveProducerName = producer .getProducerName ();
867
+
868
+ CompletableFuture <Long > future ;
869
+ if (producer .getTopicEpoch ().isPresent ()) {
870
+ future = setTopicEpoch (producer .getTopicEpoch ().get ());
871
+ } else {
872
+ future = incrementTopicEpoch (topicEpoch );
873
+ }
874
+ future .exceptionally (ex -> {
875
+ hasExclusiveProducer = false ;
876
+ exclusiveProducerName = null ;
877
+ return null ;
878
+ });
879
+
880
+ return future .thenApply (epoch -> {
881
+ topicEpoch = Optional .of (epoch );
882
+ return topicEpoch ;
883
+ });
884
+ }
885
+
938
886
protected abstract CompletableFuture <Long > setTopicEpoch (long newEpoch );
939
887
940
888
protected abstract CompletableFuture <Long > incrementTopicEpoch (Optional <Long > currentEpoch );
0 commit comments