diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java similarity index 73% rename from core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java rename to server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java index 28c12cf6bceea..913362e82eb3a 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -14,12 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server.integration; -import kafka.integration.KafkaServerTestHarness; -import kafka.server.KafkaBroker; -import kafka.server.KafkaConfig; -import kafka.utils.Logging; -import kafka.utils.TestUtils; +package org.apache.kafka.server; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; @@ -41,16 +36,21 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; import 
org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import java.io.File; @@ -65,62 +65,36 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; -import scala.collection.JavaConverters; -import scala.collection.Seq; -import scala.collection.mutable.HashMap; - import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging { +@ClusterTestDefaults( + brokers = 5, + serverProperties = { + @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"), + } +) +public class EligibleLeaderReplicasIntegrationTest { private String bootstrapServer; private String testTopicName; private Admin adminClient; - @Override - public MetadataVersion metadataVersion() { - return MetadataVersion.IBP_4_0_IV1; - } + private final ClusterInstance clusterInstance; - @Override - public Seq generateConfigs() { - List brokerConfigs = new ArrayList<>(); - brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( - 5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests. 
- true, - true, - scala.Option.empty(), - scala.Option.empty(), - scala.Option.empty(), - true, - false, - false, - false, - new HashMap<>(), - 1, - false, - 1, - (short) 4, - 0, - false - ))); - List configs = new ArrayList<>(); - for (Properties props : brokerConfigs) { - configs.add(KafkaConfig.fromProps(props)); - } - return JavaConverters.asScalaBuffer(configs).toSeq(); + EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; } @BeforeEach - @Override public void setUp(TestInfo info) { - super.setUp(info); // create adminClient Properties props = new Properties(); - bootstrapServer = bootstrapServers(listenerName()); + bootstrapServer = clusterInstance.bootstrapServers(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); adminClient = Admin.create(props); adminClient.updateFeatures( @@ -136,11 +111,11 @@ public void close() throws Exception { if (adminClient != null) adminClient.close(); } - @Test + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -180,12 +155,10 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); waitUntilOneMessageIsConsumed(consumer); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return 
isrSize == 2 && elrSize == 1; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1); // Now the partition is under min ISR. HWM should not advance. producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); @@ -193,22 +166,20 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); // Restore the min ISR and the previous log should be visible. - startBroker(initialReplicas.get(1).id()); - startBroker(initialReplicas.get(0).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 4 && elrSize == 0; - }); + clusterInstance.startBroker(initialReplicas.get(1).id()); + clusterInstance.startBroker(initialReplicas.get(0).id()); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0); waitUntilOneMessageIsConsumed(consumer); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); if (consumer != null) consumer.close(); if (producer != null) producer.close(); } } - void waitUntilOneMessageIsConsumed(Consumer consumer) { - TestUtils.waitUntilTrue( + void waitUntilOneMessageIsConsumed(Consumer consumer) throws InterruptedException { + TestUtils.waitForCondition( () -> { try { ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); @@ -217,16 +188,16 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) { return false; } }, - () -> "fail to consume messages", - DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, + () -> "fail to consume messages" ); } - @Test + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -244,19 +215,15 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -270,10 +237,8 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx int expectLeader = topicPartitionInfo.elr().stream() .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); - startBroker(expectLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); + clusterInstance.startBroker(expectLeader); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); @@ -282,26 +247,24 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx // Start another 2 brokers and the ELR fields should be cleaned. 
topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) - .forEach(node -> startBroker(node.id())); + .forEach(node -> clusterInstance.startBroker(node.id())); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 3 && elrSize == 0; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } - @Test + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -319,49 +282,45 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize 
== 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = brokers().find(b -> { - return b.config().brokerId() == brokerToBeUncleanShutdown; - }).get(); - Seq dirs = broker.logManager().liveLogDirs(); + var broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) + .findFirst().get(); + List dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); assertEquals(1, dirs.size()); - CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); assertTrue(handler.exists()); - assertDoesNotThrow(() -> handler.delete()); + assertDoesNotThrow(handler::delete); // After remove the clean shutdown file, the broker should report unclean shutdown during restart. - startBroker(brokerToBeUncleanShutdown); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 2; - }); + clusterInstance.startBroker(brokerToBeUncleanShutdown); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } /* This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. 
*/ - @Test + @ClusterTest(metadataVersion = MetadataVersion.IBP_4_0_IV1) public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { adminClient.createTopics( List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); + clusterInstance.waitTopicCreation(testTopicName, 1); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Collection ops = new ArrayList<>(); @@ -379,49 +338,41 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep assertEquals(0, topicPartitionInfo.elr().size()); assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); - brokers().foreach(broker -> { - if (initialReplicaSet.contains(broker.config().brokerId())) { - Seq dirs = broker.logManager().liveLogDirs(); + clusterInstance.brokers().forEach((id, broker) -> { + if (initialReplicaSet.contains(id)) { + List dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); assertEquals(1, dirs.size()); - 
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); - assertDoesNotThrow(() -> handler.delete()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); + assertDoesNotThrow(handler::delete); } - return true; }); - // After remove the clean shutdown file, the broker should report unclean shutdown during restart. topicPartitionInfo.replicas().forEach(replica -> { - if (replica.id() != lastKnownLeader) startBroker(replica.id()); - }); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 1; + if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1); topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) .allTopicNames().get().get(testTopicName).partitions().get(0); assertNull(topicPartitionInfo.leader()); assertEquals(1, topicPartitionInfo.lastKnownElr().size()); // Now if the last known leader goes through unclean shutdown, it will still be elected. 
- startBroker(lastKnownLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize > 0 && elrSize == 0; - }); - - TestUtils.waitUntilTrue( + clusterInstance.startBroker(lastKnownLeader); + waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0); + TestUtils.waitForCondition( () -> { try { TopicPartitionInfo partition = adminClient.describeTopics(List.of(testTopicName)) @@ -432,16 +383,16 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep return false; } }, - () -> String.format("Partition metadata for %s is not correct", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not correct", testTopicName) ); } finally { - restartDeadBrokers(false); + clusterInstance.restartDeadBrokers(); } } - void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) { - TestUtils.waitUntilTrue( + void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) throws InterruptedException { + TestUtils.waitForCondition( () -> { try { TopicDescription topicDescription = adminClient.describeTopics(List.of(testTopicName)) @@ -452,7 +403,8 @@ void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatis return false; } }, - () -> String.format("Partition metadata for %s is not propagated", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L); + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not propagated", testTopicName) + ); } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java index 243ca5394d54f..32871e5e2926e 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java @@ -444,6 +444,16 @@ default List boundPorts() { .map(KafkaBroker::socketServer) .map(s -> 
s.boundPort(clientListener())) .toList(); + } + default void restartDeadBrokers() throws InterruptedException { + if (brokers().isEmpty()) { + throw new IllegalArgumentException("Must supply at least one server config."); + } + brokers().entrySet().forEach(entry -> { + if (entry.getValue().isShutdown()) { + startBroker(entry.getKey()); + } + }); } }