From 0cf74cd2f02b679f5ca10c66808ef38abc8dcdbb Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Fri, 18 Jul 2025 23:19:50 +0200 Subject: [PATCH 1/8] refactor: rewrite EligibleLeaderReplicasIntegrationTest with new test infra --- ...EligibleLeaderReplicasIntegrationTest.java | 159 ++++++++---------- .../kafka/common/test/ClusterInstance.java | 17 ++ 2 files changed, 85 insertions(+), 91 deletions(-) diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 28c12cf6bceea..87c9339980f8b 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -15,10 +15,7 @@ * limitations under the License. */ package kafka.server.integration; -import kafka.integration.KafkaServerTestHarness; import kafka.server.KafkaBroker; -import kafka.server.KafkaConfig; -import kafka.utils.Logging; import kafka.utils.TestUtils; import org.apache.kafka.clients.CommonClientConfigs; @@ -41,16 +38,20 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestInfo; import java.io.File; @@ -65,9 +66,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; -import scala.collection.JavaConverters; import scala.collection.Seq; -import scala.collection.mutable.HashMap; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -75,58 +74,35 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging { +public class EligibleLeaderReplicasIntegrationTest { private String bootstrapServer; private String testTopicName; private Admin adminClient; - @Override - public MetadataVersion metadataVersion() { - return MetadataVersion.IBP_4_0_IV1; - } + private final ClusterInstance clusterInstance; - @Override - public Seq generateConfigs() { - List brokerConfigs = new ArrayList<>(); - brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( - 5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests. - true, - true, - scala.Option.empty(), - scala.Option.empty(), - scala.Option.empty(), - true, - false, - false, - false, - new HashMap<>(), - 1, - false, - 1, - (short) 4, - 0, - false - ))); - List configs = new ArrayList<>(); - for (Properties props : brokerConfigs) { - configs.add(KafkaConfig.fromProps(props)); - } - return JavaConverters.asScalaBuffer(configs).toSeq(); - } + @ClusterTest( + types = {Type.KRAFT}, + metadataVersion = MetadataVersion.IBP_4_0_IV1, + brokers = 5, + serverProperties = { + @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), + } + ) @BeforeEach - @Override public void setUp(TestInfo info) { - super.setUp(info); // create adminClient Properties props = new Properties(); - bootstrapServer = bootstrapServers(listenerName()); + //bootstrapServer = bootstrapServers(listenerName()); + bootstrapServer = clusterInstance.bootstrapServers(listenerName()); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); adminClient = Admin.create(props); adminClient.updateFeatures( - Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() ); testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); } @@ -136,11 +112,12 @@ public void close() throws Exception { if (adminClient != null) adminClient.close(); } - @Test - public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { + @ClusterTest + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -180,8 +157,8 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); waitUntilOneMessageIsConsumed(consumer); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 2 && elrSize == 1; @@ -193,15 +170,15 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); // Restore the min ISR and the previous log should be visible. - startBroker(initialReplicas.get(1).id()); - startBroker(initialReplicas.get(0).id()); + clusterInstance.startBroker(initialReplicas.get(1).id()); + clusterInstance.startBroker(initialReplicas.get(0).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 4 && elrSize == 0; }); waitUntilOneMessageIsConsumed(consumer); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); if (consumer != null) consumer.close(); if (producer != null) producer.close(); } @@ -222,11 +199,12 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { ); } - @Test - public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { + @ClusterTest + public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -244,15 +222,15 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 1 && elrSize == 2; }); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 3; @@ -270,7 +248,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx int expectLeader = topicPartitionInfo.elr().stream() .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); - startBroker(expectLeader); + clusterInstance.startBroker(expectLeader); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 1 && elrSize == 2; }); @@ -282,7 +260,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx // Start another 2 brokers and the ELR fields should be cleaned. topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) - .forEach(node -> startBroker(node.id())); + .forEach(node -> clusterInstance.startBroker(node.id())); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 3 && elrSize == 0; @@ -293,15 +271,16 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } - @Test - public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { + @ClusterTest + public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -319,10 +298,10 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 3; @@ -331,9 +310,8 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = brokers().find(b -> { - return b.config().brokerId() == brokerToBeUncleanShutdown; - }).get(); + KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown).findFirst() + .orElseThrow(() -> new RuntimeException("No broker found")); Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); @@ -341,7 +319,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertDoesNotThrow(() -> handler.delete()); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. - startBroker(brokerToBeUncleanShutdown); + clusterInstance.startBroker(brokerToBeUncleanShutdown); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 2; }); @@ -350,18 +328,19 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } /* This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. */ - @Test - public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { + @ClusterTest + public void testLastKnownLeaderShouldBeElectedIfEmptyElr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -379,10 +358,10 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 3; @@ -392,20 +371,18 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); - brokers().foreach(broker -> { + for (KafkaBroker broker : clusterInstance.brokers().values()) { if (initialReplicaSet.contains(broker.config().brokerId())) { Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); assertDoesNotThrow(() -> handler.delete()); } - return true; - }); - + } // After remove the clean shutdown file, the broker should report unclean shutdown during restart. topicPartitionInfo.replicas().forEach(replica -> { - if (replica.id() != lastKnownLeader) startBroker(replica.id()); + if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize == 0 && elrSize == 1; @@ -416,7 +393,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep assertEquals(1, topicPartitionInfo.lastKnownElr().size()); // Now if the last known leader goes through unclean shutdown, it will still be elected. - startBroker(lastKnownLeader); + clusterInstance.startBroker(lastKnownLeader); waitForIsrAndElr((isrSize, elrSize) -> { return isrSize > 0 && elrSize == 0; }); @@ -436,7 +413,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep DEFAULT_MAX_WAIT_MS, 100L ); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index ceb30af6e97b6..299c4c3a8d856 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -71,7 +71,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; +import scala.collection.JavaConverters; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; @@ -444,6 +446,21 @@ default List boundPorts() { .map(KafkaBroker::socketServer) .map(s -> s.boundPort(clientListener())) .toList(); + } + + default void restartDeadBrokers() { + for (Map.Entry entry : brokers().entrySet()) { + int brokerId = entry.getKey(); + KafkaBroker broker = entry.getValue(); + + if (broker.isShutdown()) { + startBroker(brokerId); + } + } + } + default String bootstrapServers(ListenerName listenerName){ + Seq brokerSeq = new ArrayList<>(brokers().values()).asScala().toSeq(); + kafka.utils.TestUtils.bootstrapServers(brokerSeq, listenerName); } } From 9dd08e5a35ee5ea3db5c41bbaae92a902ff24fdf Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Mon, 21 Jul 2025 21:06:42 +0200 Subject: [PATCH 2/8] rewrite the logic of ClusterTestDefaults and restartDeadBrokers --- ...EligibleLeaderReplicasIntegrationTest.java | 43 ++++++++++--------- .../kafka/common/test/ClusterInstance.java | 17 +++----- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 87c9339980f8b..00b945707c2c5 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -43,15 +43,14 @@ import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; -import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; -import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import java.io.File; @@ -74,6 +73,15 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +@ClusterTestDefaults( + brokers = 5, + serverProperties = { + @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.BROKER_RACK_DOC, value = "new HashMap<>()"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), + } +) public class EligibleLeaderReplicasIntegrationTest { private String bootstrapServer; private String testTopicName; @@ -81,28 +89,21 @@ public class EligibleLeaderReplicasIntegrationTest { private final ClusterInstance clusterInstance; - @ClusterTest( - types = {Type.KRAFT}, - metadataVersion = MetadataVersion.IBP_4_0_IV1, - brokers = 5, - serverProperties = { - @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), - @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), - } - ) + EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } @BeforeEach public void setUp(TestInfo info) { // create adminClient Properties props = new Properties(); - //bootstrapServer = bootstrapServers(listenerName()); - bootstrapServer = clusterInstance.bootstrapServers(listenerName()); + bootstrapServer = clusterInstance.bootstrapServers(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); adminClient = Admin.create(props); adminClient.updateFeatures( - Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions() ); testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); } @@ -113,7 +114,7 @@ public void close() throws Exception { } @ClusterTest - public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); @@ -200,7 +201,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { } @ClusterTest - public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); @@ -276,7 +277,7 @@ public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws Ex } @ClusterTest - public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); @@ -336,7 +337,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clus This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. */ @ClusterTest - public void testLastKnownLeaderShouldBeElectedIfEmptyElr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException { + public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 299c4c3a8d856..11bcd09628924 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -35,6 +35,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; @@ -71,9 +72,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; -import scala.collection.JavaConverters; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; @@ -449,18 +448,12 @@ default List boundPorts() { } default void restartDeadBrokers() { + if (brokers().isEmpty()) + throw new KafkaException("Must supply at least one server config."); for (Map.Entry entry : brokers().entrySet()) { - int brokerId = entry.getKey(); - KafkaBroker broker = entry.getValue(); - - if (broker.isShutdown()) { - startBroker(brokerId); + if (entry.getValue().isShutdown()) { + startBroker(entry.getKey()); } } } - - default String bootstrapServers(ListenerName listenerName){ - Seq brokerSeq = new ArrayList<>(brokers().values()).asScala().toSeq(); - kafka.utils.TestUtils.bootstrapServers(brokerSeq, listenerName); - } } From 9aa594e749398fd4eabc458081739d59622269aa Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Tue, 22 Jul 2025 08:29:07 +0200 Subject: [PATCH 3/8] Rewrite the logic of restartDeadBrokers and replace JavaConverters.asScalaBuffer with CollectionConverters.asScala --- .../EligibleLeaderReplicasIntegrationTest.java | 9 +++++---- .../org/apache/kafka/common/test/ClusterInstance.java | 11 ++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 00b945707c2c5..fca1a0b8bc979 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -66,6 +66,7 @@ import java.util.stream.Collectors; import scala.collection.Seq; +import scala.jdk.javaapi.CollectionConverters; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -117,7 +118,7 @@ public void close() throws Exception { public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); @@ -204,7 +205,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); @@ -280,7 +281,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); @@ -340,7 +341,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); + Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 11bcd09628924..1d3a86e0abb42 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -448,12 +448,13 @@ default List boundPorts() { } default void restartDeadBrokers() { - if (brokers().isEmpty()) + if (brokers().isEmpty()){ throw new KafkaException("Must supply at least one server config."); - for (Map.Entry entry : brokers().entrySet()) { - if (entry.getValue().isShutdown()) { - startBroker(entry.getKey()); - } } + brokers().entrySet().foreach(entry -> { + if (!entry.getValue().isShutdown()) { + startBroker(id); + } + }); } } From eb25348a2e170aaca62b739b1bd9bbed9f836ac7 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Tue, 22 Jul 2025 23:49:30 +0200 Subject: [PATCH 4/8] Move EligibleLeaderReplicasIntegrationTest to server module, rewrite import-control-server.xml, and fix the bug in restartDeadBrokers --- checkstyle/import-control-server.xml | 5 +- ...EligibleLeaderReplicasIntegrationTest.java | 52 +++++++++---------- .../kafka/common/test/ClusterInstance.java | 4 +- 3 files changed, 28 insertions(+), 33 deletions(-) rename {core/src/test/java/kafka/server/integration => server/src/test/java/org/apache/kafka/server}/EligibleLeaderReplicasIntegrationTest.java (92%) diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index b3d1b928cc6db..fe659cc9d38fe 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -32,9 +32,8 @@ - - - + + diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java similarity index 92% rename from core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java rename to server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index fca1a0b8bc979..165f79b3a5c8b 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server.integration; +package org.apache.kafka.server; + import kafka.server.KafkaBroker; -import kafka.utils.TestUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; @@ -48,6 +48,7 @@ import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -66,7 +67,6 @@ import java.util.stream.Collectors; import scala.collection.Seq; -import scala.jdk.javaapi.CollectionConverters; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -118,8 +118,7 @@ public void close() throws Exception { public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -186,8 +185,8 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc } } - void waitUntilOneMessageIsConsumed(Consumer consumer) { - TestUtils.waitUntilTrue( + void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedException { + TestUtils.waitForCondition( () -> { try { ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); @@ -196,8 +195,8 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { return false; } }, - () -> "fail to consume messages", - DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, + () -> "fail to consume messages" ); } @@ -205,8 +204,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -281,8 +279,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -312,8 +309,8 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown).findFirst() - .orElseThrow(() -> new RuntimeException("No broker found")); + KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) + .findFirst().get(); Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); @@ -341,8 +338,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - Seq brokerSeq = CollectionConverters.asScala(new ArrayList<>(clusterInstance.brokers().values())).toSeq(); - TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -373,14 +369,14 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); - for (KafkaBroker broker : clusterInstance.brokers().values()) { - if (initialReplicaSet.contains(broker.config().brokerId())) { + clusterInstance.brokers().forEach((id, broker) -> { + if (initialReplicaSet.contains(id)) { Seq dirs = broker.logManager().liveLogDirs(); assertEquals(1, dirs.size()); CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); assertDoesNotThrow(() -> handler.delete()); } - } + }); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. topicPartitionInfo.replicas().forEach(replica -> { @@ -399,8 +395,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep waitForIsrAndElr((isrSize, elrSize) -> { return isrSize > 0 && elrSize == 0; }); - - TestUtils.waitUntilTrue( + TestUtils.waitForCondition( () -> { try { TopicPartitionInfo partition = adminClient.describeTopics(List.of(testTopicName)) @@ -411,16 +406,16 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep return false; } }, - () -> String.format("Partition metadata for %s is not correct", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not correct", testTopicName) ); } finally { clusterInstance.restartDeadBrokers(); } } - void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) { - TestUtils.waitUntilTrue( + void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) throws InterruptedException { + TestUtils.waitForCondition( () -> { try { TopicDescription topicDescription = adminClient.describeTopics(List.of(testTopicName)) @@ -431,7 +426,8 @@ void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatis return false; } }, - () -> String.format("Partition metadata for %s is not propagated", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L); + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not propagated", testTopicName) + ); } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 15a3cfcbd6136..96710f4caa616 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -451,9 +451,9 @@ default void restartDeadBrokers() { if (brokers().isEmpty()){ throw new KafkaException("Must supply at least one server config."); } - brokers().entrySet().foreach(entry -> { + brokers().entrySet().forEach(entry -> { if (!entry.getValue().isShutdown()) { - startBroker(id); + startBroker(entry.getKey()); } }); } From f3d61ef8ca6a3d8f537a2f54b1e6485bad80caf6 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Wed, 23 Jul 2025 12:29:05 +0200 Subject: [PATCH 5/8] Fix checkstyleTest error --- .../java/org/apache/kafka/common/test/api/ClusterTest.java | 3 +-- .../java/org/apache/kafka/common/test/ClusterInstance.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java index f81f2739907be..cabd2fbf60802 100644 --- a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java +++ b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java @@ -39,8 +39,7 @@ @TestTemplate @Timeout(60) @Tag("integration") -public @interface ClusterTest { - Type[] types() default {}; +public @interface ClusterTest { Type[] types() default {}; int brokers() default 0; int controllers() default 0; int disksPerBroker() default 0; diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 96710f4caa616..1f2cfa08bc26f 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -448,7 +448,7 @@ default List boundPorts() { } default void restartDeadBrokers() { - if (brokers().isEmpty()){ + if (brokers().isEmpty()) { throw new KafkaException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { From e68ee42810e0608f69b2f7bb88efeb203df029d9 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Wed, 23 Jul 2025 12:44:22 +0200 Subject: [PATCH 6/8] Add metadataVersion to ClusterTest --- .../server/EligibleLeaderReplicasIntegrationTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 165f79b3a5c8b..d4b748dc22c52 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; @@ -114,7 +115,7 @@ public void close() throws Exception { if (adminClient != null) adminClient.close(); } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); @@ -200,7 +201,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedExceptio ); } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); @@ -275,7 +276,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx } } - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); @@ -334,7 +335,7 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx /* This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. */ - @ClusterTest + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); From 4fd332343031b2c4c37a518066345d1b0e6c56b5 Mon Sep 17 00:00:00 2001 From: jim0987795064 Date: Wed, 23 Jul 2025 13:13:58 +0200 Subject: [PATCH 7/8] replace KafkaException with InterruptedException --- .../java/org/apache/kafka/common/test/api/ClusterTest.java | 3 ++- .../java/org/apache/kafka/common/test/ClusterInstance.java | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java index cabd2fbf60802..f81f2739907be 100644 --- a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java +++ b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java @@ -39,7 +39,8 @@ @TestTemplate @Timeout(60) @Tag("integration") -public @interface ClusterTest { Type[] types() default {}; +public @interface ClusterTest { + Type[] types() default {}; int brokers() default 0; int controllers() default 0; int disksPerBroker() default 0; diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 1f2cfa08bc26f..a1d36ab44dcf9 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -35,7 +35,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; @@ -447,9 +446,9 @@ default List boundPorts() { .toList(); } - default void restartDeadBrokers() { + default void restartDeadBrokers() throws InterruptedException { if (brokers().isEmpty()) { - throw new KafkaException("Must supply at least one server config."); + throw new IllegalArgumentException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { if (!entry.getValue().isShutdown()) { From 493a7c7ad52fa572c4171ac4018e0fbf3aacf628 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 24 Jul 2025 22:22:01 +0200 Subject: [PATCH 8/8] refacotr: Replace Scala Seq with Java List --- checkstyle/import-control-server.xml | 5 +- ...EligibleLeaderReplicasIntegrationTest.java | 64 ++++++------------- .../kafka/common/test/ClusterInstance.java | 2 +- 3 files changed, 24 insertions(+), 47 deletions(-) diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index fe659cc9d38fe..b3d1b928cc6db 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -32,8 +32,9 @@ - - + + + diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index d4b748dc22c52..913362e82eb3a 100644 --- a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.server; -import kafka.server.KafkaBroker; - import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; @@ -67,8 +65,6 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; -import scala.collection.Seq; - import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -162,9 +158,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc clusterInstance.shutdownBroker(initialReplicas.get(0).id()); clusterInstance.shutdownBroker(initialReplicas.get(1).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 2 && elrSize == 1; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1); // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); @@ -174,9 +168,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc // Restore the min ISR and the previous log should be visible. clusterInstance.startBroker(initialReplicas.get(1).id()); clusterInstance.startBroker(initialReplicas.get(0).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 4 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0); waitUntilOneMessageIsConsumed(consumer); } finally { @@ -227,15 +219,11 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx clusterInstance.shutdownBroker(initialReplicas.get(1).id()); clusterInstance.shutdownBroker(initialReplicas.get(2).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -250,9 +238,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); clusterInstance.startBroker(expectLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -263,9 +249,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) .forEach(node -> clusterInstance.startBroker(node.id())); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 3 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -303,26 +287,23 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) + var broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) .findFirst().get(); - Seq dirs = broker.logManager().liveLogDirs(); + List dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); assertEquals(1, dirs.size()); - CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); assertTrue(handler.exists()); - assertDoesNotThrow(() -> handler.delete()); + assertDoesNotThrow(handler::delete); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. clusterInstance.startBroker(brokerToBeUncleanShutdown); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); @@ -362,9 +343,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep clusterInstance.shutdownBroker(initialReplicas.get(2).id()); clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); @@ -372,10 +351,11 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); clusterInstance.brokers().forEach((id, broker) -> { if (initialReplicaSet.contains(id)) { - Seq dirs = broker.logManager().liveLogDirs(); + List dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); assertEquals(1, dirs.size()); - CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); - assertDoesNotThrow(() -> handler.delete()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); + assertDoesNotThrow(handler::delete); } }); @@ -383,9 +363,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep topicPartitionInfo.replicas().forEach(replica -> { if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 1; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); @@ -393,9 +371,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep // Now if the last known leader goes through unclean shutdown, it will still be elected. clusterInstance.startBroker(lastKnownLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize > 0 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0); TestUtils.waitForCondition( () -> { try { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index a1d36ab44dcf9..32871e5e2926e 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -451,7 +451,7 @@ default void restartDeadBrokers() throws InterruptedException { throw new IllegalArgumentException("Must supply at least one server config."); } brokers().entrySet().forEach(entry -> { - if (!entry.getValue().isShutdown()) { + if (entry.getValue().isShutdown()) { startBroker(entry.getKey()); } });