From 6c4fba4400a2f1fc2c85fd73c2d70e94b930e065 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 25 Jul 2025 15:51:26 -0500 Subject: [PATCH 01/14] fix indentation --- build.gradle | 1 + .../kafka/api/IntegrationTestHarness.scala | 8 +- .../api/PlaintextAdminIntegrationTest.scala | 85 +++++++++++++++++++ .../kafka/server/QuorumTestHarness.scala | 8 ++ 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 5f9e81789cc2b..7d41c91963a1d 100644 --- a/build.gradle +++ b/build.gradle @@ -1038,6 +1038,7 @@ project(':core') { testImplementation project(':server-common').sourceSets.test.output testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation project(':server').sourceSets.test.output + testImplementation project(':streams').sourceSets.test.output testImplementation project(':test-common:test-common-runtime') testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-util') diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 7c08dd9c3fe7d..0522f6e145860 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -30,11 +30,12 @@ import kafka.security.JaasTestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData} import org.apache.kafka.common.network.{ConnectionMode, ListenerName} -import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serdes, Serializer} import org.apache.kafka.common.utils.Utils import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} +import org.apache.kafka.streams.StreamsConfig import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import scala.collection.mutable @@ -56,6 +57,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val superuserClientConfig = new Properties val serverConfig = new Properties val controllerConfig = new Properties + var streamsGroupConfig = new Properties private val consumers = mutable.Buffer[Consumer[_, _]]() private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]() @@ -160,6 +162,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + streamsGroupConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + streamsGroupConfig.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[Serdes.ByteArraySerde].getName) + streamsGroupConfig.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[Serdes.ByteArraySerde].getName) + doSuperuserSetup(testInfo) if (createOffsetsTopic) { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 44835885e0c34..2857966c321ff 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -56,6 +56,7 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} +import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator import org.junit.jupiter.api.Assertions._ @@ -4363,6 +4364,90 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) + def testDescribeStreamsGroups(groupProtocol: String): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testOutputTopicName = "test_output_topic" + val testNumPartitions = 1 + + val config = createConfig + client = Admin.create(config) + + val featureUpdates = Collections.singletonMap("streams.version", new FeatureUpdate(1.toShort, FeatureUpdate.UpgradeType.UPGRADE)) + client.updateFeatures(featureUpdates, new UpdateFeaturesOptions()).all.get() + + client.createTopics(util.Set.of( + new NewTopic(testTopicName, testNumPartitions, 1.toShort), + new NewTopic(testOutputTopicName, testNumPartitions, 1.toShort) + )).all().get() + waitForTopics(client, Seq(testTopicName, testOutputTopicName), List()) + + val producer = createProducer() + try { + producer.send(new ProducerRecord(testTopicName, 0, "key".getBytes, "value".getBytes)).get() + } finally { + Utils.closeQuietly(producer, "producer") + } + + val streamsConfig = new Properties(streamsGroupConfig) + streamsConfig.putAll(streamsGroupConfig) + streamsConfig.put( StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) + + val builder = new StreamsBuilder() + builder.stream[String, String](testTopicName).to(testOutputTopicName) + val streams = new KafkaStreams(builder.build(), streamsConfig) + + try { + streams.cleanUp() + streams.start() + + TestUtils.waitUntilTrue(() => streams.state() == KafkaStreams.State.RUNNING, "Streams not in RUNNING state") + + TestUtils.waitUntilTrue(() => { + client.listGroups().all().get().stream() + .anyMatch(g => g.groupId() == streamsGroupId) + }, "Streams group not ready to describe yet") + + TestUtils.waitUntilTrue(() => { + try { + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + if (group != null) { + group.groupState() == GroupState.STABLE && !group.subtopologies().isEmpty + } else { + false + } + } catch { + case _: Exception => false + } + }, "Stream group not fully initialized with topology") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } } object PlaintextAdminIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 3d5837b92d0d7..f74ac79932e6f 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -44,6 +44,7 @@ import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion} import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.kafka.server.util.timer.SystemTimer +import org.apache.kafka.streams.GroupProtocol.STREAMS import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo} import org.junit.jupiter.params.provider.Arguments @@ -420,4 +421,11 @@ object QuorumTestHarness { Arguments.of(GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) ) } + + // For tests that only work with the streams group protocol + def getTestGroupProtocolParametersStreamsGroupProtocolOnly: java.util.stream.Stream[Arguments] = { + stream.Stream.of( + Arguments.of(STREAMS.name.toLowerCase(Locale.ROOT)) + ) + } } From c272552e1ae69eb45641c2da9da0693d5f512b2f Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 25 Jul 2025 16:55:29 -0500 Subject: [PATCH 02/14] clean up test --- .../api/PlaintextAdminIntegrationTest.scala | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 2857966c321ff..af58d6cc5fd26 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4376,24 +4376,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val config = createConfig client = Admin.create(config) - val featureUpdates = Collections.singletonMap("streams.version", new FeatureUpdate(1.toShort, FeatureUpdate.UpgradeType.UPGRADE)) - client.updateFeatures(featureUpdates, new UpdateFeaturesOptions()).all.get() - - client.createTopics(util.Set.of( - new NewTopic(testTopicName, testNumPartitions, 1.toShort), - new NewTopic(testOutputTopicName, testNumPartitions, 1.toShort) - )).all().get() - waitForTopics(client, Seq(testTopicName, testOutputTopicName), List()) - - val producer = createProducer() - try { - producer.send(new ProducerRecord(testTopicName, 0, "key".getBytes, "value".getBytes)).get() - } finally { - Utils.closeQuietly(producer, "producer") - } + prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) + prepareRecords(testTopicName) val streamsConfig = new Properties(streamsGroupConfig) - streamsConfig.putAll(streamsGroupConfig) streamsConfig.put( StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") From 503483cebf3563ca12700e8ca39620fe80694838 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 25 Jul 2025 23:44:17 -0500 Subject: [PATCH 03/14] add depency --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index 7d41c91963a1d..e16512b558abf 100644 --- a/build.gradle +++ b/build.gradle @@ -1039,6 +1039,7 @@ project(':core') { testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation project(':server').sourceSets.test.output testImplementation project(':streams').sourceSets.test.output + testImplementation project(':streams') testImplementation project(':test-common:test-common-runtime') testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-util') From 71e0ab08df650bdd073c40aaae35c1c6b1def11b Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 28 Jul 2025 13:02:06 -0500 Subject: [PATCH 04/14] add deleteStreamsGroup integration test --- .../api/PlaintextAdminIntegrationTest.scala | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index af58d6cc5fd26..300121c7b7ffc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4434,6 +4434,98 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(client, "adminClient") } } + + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) + def testDeleteStreamsGroups(groupProtocol: String): Unit = { + val testTopicName = "test_topic" + val testOutputTopicName = "test_output_topic" + val testNumPartitions = 3 + val testNumStreamsGroup = 3 + + val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3") + val targetRemainingGroups = util.List.of("stream_group_id_1") + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() + + + try { + for (i <- 1 to testNumStreamsGroup) { + val streamsGroupId = s"stream_group_id_$i" + val streamsConfig = new Properties(streamsGroupConfig) + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) + + val builder = new StreamsBuilder() + builder.stream[String, String](testTopicName).to(testOutputTopicName) + val streams = new KafkaStreams(builder.build(), streamsConfig) + streams.cleanUp() + streams.start() + streamsList += ((streamsGroupId, streams)) + } + + TestUtils.waitUntilTrue(() => { + client.listGroups().all().get().stream() + .anyMatch(g => g.groupId().startsWith("stream_group_id_")) + }, "Streams groups not ready to delete yet") + + // Verify that there are 3 groups created + val groups = client.listGroups().all().get() + assertEquals(testNumStreamsGroup, groups.size()) + + // Test deletion of non-empty existing groups + var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) + assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) + assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) + + // Stop and clean up the streams for the groups that are going to be deleted + streamsList + .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) } + .foreach { case (_, streams) => + streams.close(java.time.Duration.ofSeconds(10)) + streams.cleanUp() + } + + // Test deletion of emptied existing streams groups + deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) + assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) + + // Wait for the deleted groups to be removed + TestUtils.waitUntilTrue(() => { + val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet + targetDeletedGroups.asScala.forall(id => !groupIds.contains(id)) + }, "Deleted groups not yet deleted") + + // Verify that the deleted groups are no longer present + val remainingGroups = client.listGroups().all().get() + assertEquals(targetRemainingGroups.size(), remainingGroups.size()) + remainingGroups.stream().forEach(g => { + assertTrue(targetRemainingGroups.contains(g.groupId())) + }) + + // Test deletion of a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all()) + assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1) + + } finally{ + streamsList.foreach { case (_, streams) => + streams.close(java.time.Duration.ofSeconds(10)) + streams.cleanUp() + } + Utils.closeQuietly(client, "adminClient") + } + } } object PlaintextAdminIntegrationTest { From 6524912702c1951f51d9e38fd011e707f55c014a Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 28 Jul 2025 18:10:33 -0500 Subject: [PATCH 05/14] remove redundant code --- .../kafka/api/IntegrationTestHarness.scala | 15 +++++++++++++- .../api/PlaintextAdminIntegrationTest.scala | 20 +++++++++++-------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 0522f6e145860..6573a88a5707c 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} -import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import scala.collection.mutable @@ -241,6 +241,19 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { streamsConsumer } + def createStreamsGroup[K, V](configOverrides: Properties = new Properties, + configsToRemove: List[String] = List(), + inputTopic: String, + outputTopic: String): KafkaStreams = { + val streamsConfig = new Properties(streamsGroupConfig) + streamsConfig ++= configOverrides + configsToRemove.foreach(streamsConfig.remove(_)) + val builder = new StreamsBuilder() + builder.stream[K, V](inputTopic).to(outputTopic) + val streams = new KafkaStreams(builder.build(), streamsConfig) + streams + } + def createAdminClient( listenerName: ListenerName = listenerName, configOverrides: Properties = new Properties diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 300121c7b7ffc..35443cf1670f5 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -56,7 +56,7 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} -import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig} +import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator import org.junit.jupiter.api.Assertions._ @@ -4385,9 +4385,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val builder = new StreamsBuilder() - builder.stream[String, String](testTopicName).to(testOutputTopicName) - val streams = new KafkaStreams(builder.build(), streamsConfig) + val streams = createStreamsGroup(configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName) try { streams.cleanUp() @@ -4400,6 +4398,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { .anyMatch(g => g.groupId() == streamsGroupId) }, "Streams group not ready to describe yet") + Thread.sleep(10000) + TestUtils.waitUntilTrue(() => { try { val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() @@ -4455,7 +4455,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() - try { for (i <- 1 to testNumStreamsGroup) { val streamsGroupId = s"stream_group_id_$i" @@ -4465,9 +4464,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val builder = new StreamsBuilder() - builder.stream[String, String](testTopicName).to(testOutputTopicName) - val streams = new KafkaStreams(builder.build(), streamsConfig) + val streams = createStreamsGroup(configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName) streams.cleanUp() streams.start() streamsList += ((streamsGroupId, streams)) @@ -4526,6 +4523,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(client, "adminClient") } } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) + def testListStreamsGroupOffsets(groupProtocol: String): Unit = { + + } + } object PlaintextAdminIntegrationTest { From d923d0146ea3583c5b7d4c7ebc7b7fc576746f7b Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 28 Jul 2025 18:20:55 -0500 Subject: [PATCH 06/14] remove parts that shouldn't be included --- .../kafka/api/PlaintextAdminIntegrationTest.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 35443cf1670f5..4b6c6ba9ad13e 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4523,13 +4523,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(client, "adminClient") } } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) - def testListStreamsGroupOffsets(groupProtocol: String): Unit = { - - } - } object PlaintextAdminIntegrationTest { From 41964340a9bbae5b27d0f9345216681478e203c0 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 28 Jul 2025 18:26:51 -0500 Subject: [PATCH 07/14] cleanup spacing --- .../integration/kafka/api/PlaintextAdminIntegrationTest.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 4b6c6ba9ad13e..f53c801bff585 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4384,7 +4384,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val streams = createStreamsGroup(configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName) try { @@ -4435,7 +4434,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) def testDeleteStreamsGroups(groupProtocol: String): Unit = { @@ -4463,7 +4461,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val streams = createStreamsGroup(configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName) streams.cleanUp() streams.start() From 316a417e872922fe5d1bf18d0afb0284a64e4250 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 28 Jul 2025 22:29:28 -0500 Subject: [PATCH 08/14] reorganize code --- .../kafka/api/IntegrationTestHarness.scala | 6 ++++- .../api/PlaintextAdminIntegrationTest.scala | 25 +++++++++++++------ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 6573a88a5707c..d9391fd323f1e 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -244,8 +244,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { def createStreamsGroup[K, V](configOverrides: Properties = new Properties, configsToRemove: List[String] = List(), inputTopic: String, - outputTopic: String): KafkaStreams = { + outputTopic: String, + streamsGroupId: String, + groupProtocol: String): KafkaStreams = { val streamsConfig = new Properties(streamsGroupConfig) + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) streamsConfig ++= configOverrides configsToRemove.foreach(streamsConfig.remove(_)) val builder = new StreamsBuilder() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index f53c801bff585..7b35da4400556 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4379,12 +4379,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) prepareRecords(testTopicName) - val streamsConfig = new Properties(streamsGroupConfig) - streamsConfig.put( StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) - streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val streamsConfig = new Properties() streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val streams = createStreamsGroup(configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName) + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testOutputTopicName, + streamsGroupId = streamsGroupId, + groupProtocol = groupProtocol + ) try { streams.cleanUp() @@ -4456,12 +4460,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { try { for (i <- 1 to testNumStreamsGroup) { val streamsGroupId = s"stream_group_id_$i" - val streamsConfig = new Properties(streamsGroupConfig) - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) - streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val streams = createStreamsGroup(configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName) + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testOutputTopicName, + streamsGroupId = streamsGroupId, + groupProtocol = groupProtocol + ) streams.cleanUp() streams.start() streamsList += ((streamsGroupId, streams)) From a50ed7ee5dc0d5d13d5f399c26139054604df51d Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 28 Jul 2025 22:46:04 -0500 Subject: [PATCH 09/14] reorganize code --- .../integration/kafka/api/PlaintextAdminIntegrationTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 7b35da4400556..846bccdd683ee 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4461,7 +4461,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { for (i <- 1 to testNumStreamsGroup) { val streamsGroupId = s"stream_group_id_$i" val streamsConfig = new Properties() - streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) val streams = createStreamsGroup( From ad59b001cc16224ce9bcd94f838ada5a57564f01 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 4 Aug 2025 17:49:05 -0500 Subject: [PATCH 10/14] revise describeStreamsGroup wait until stable --- .../api/PlaintextAdminIntegrationTest.scala | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 846bccdd683ee..500cd1bfa2576 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4401,26 +4401,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { .anyMatch(g => g.groupId() == streamsGroupId) }, "Streams group not ready to describe yet") - Thread.sleep(10000) - TestUtils.waitUntilTrue(() => { - try { - val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() - val group = describedGroups.get(streamsGroupId) - if (group != null) { - group.groupState() == GroupState.STABLE && !group.subtopologies().isEmpty - } else { - false - } - } catch { - case _: Exception => false - } - }, "Stream group not fully initialized with topology") + val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Stream group not stable yet") // Verify the describe call works correctly val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() val group = describedGroups.get(streamsGroupId) - assertNotNull(group) assertEquals(streamsGroupId, group.groupId()) assertFalse(group.members().isEmpty) From 03f64e438b1199e624735cd430f912d064e71d2b Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 4 Aug 2025 22:48:23 -0500 Subject: [PATCH 11/14] add integration test for listgroups for streams protocol --- build.gradle | 1 + .../api/PlaintextAdminIntegrationTest.scala | 65 +++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/build.gradle b/build.gradle index c71a46ac5ee81..a900094251ecf 100644 --- a/build.gradle +++ b/build.gradle @@ -2273,6 +2273,7 @@ project(':storage') { testImplementation project(':server') testImplementation project(':server-common') testImplementation project(':server-common').sourceSets.test.output + testImplementation project(':streams') testImplementation project(':transaction-coordinator') testImplementation libs.hamcrest testImplementation libs.jacksonDatabindYaml diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 500cd1bfa2576..23608dceea355 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4365,6 +4365,71 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) + def testListStreamsGroups(groupProtocol: String): Unit = { + val streamsGroupIdPrefix = "stream_group_id" + val testInputTopicNamePrefix = "test_topic" + val testOutputTopicNamePrefix = "test_output_topic" + val testNumPartitions = 1 + val testNumStreamsGroup = 3 + + val config = createConfig + client = Admin.create(config) + val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() + + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) + + for (i <- 1 to testNumStreamsGroup) { + val streamsGroupId = s"$streamsGroupIdPrefix-$i" + val inputTopicName = s"$testInputTopicNamePrefix-$i" + val outputTopicName = s"$testOutputTopicNamePrefix-$i" + prepareTopics(List(inputTopicName, outputTopicName), testNumPartitions) + + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = inputTopicName, + outputTopic = outputTopicName, + streamsGroupId = streamsGroupId, + groupProtocol = groupProtocol + ) + streamsList += ((streamsGroupId, streams)) + } + + try { + for ((streamsGroupId, streams) <- streamsList) { + streams.cleanUp() + streams.start() + TestUtils.waitUntilTrue(() => streams.state() == KafkaStreams.State.RUNNING, "Streams not in RUNNING state") + } + + TestUtils.waitUntilTrue(() => { + client.listGroups().all().get().stream() != null + }, "Streams group not ready to describe yet") + + val groups = client.listGroups().all().get() + assertEquals(testNumStreamsGroup, groups.size()) + + val groupsSeq = groups.asScala.toSeq + val expected = (1 to testNumStreamsGroup).map { i => + (s"stream_group_id-$i", s"test_topic-$i", s"test_output_topic-$i") + } + expected.foreach { case (expectedGroupId, expectedInputTopic, expectedOutputTopic) => + assert(groupsSeq.exists { group => + group.groupId() == expectedGroupId && + group.protocol() == groupProtocol + }) + } + } finally { + for ((_, streams) <- streamsList) { + Utils.closeQuietly(streams, "streams") + } + Utils.closeQuietly(client, "adminClient") + } + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) def testDescribeStreamsGroups(groupProtocol: String): Unit = { From f33fddb2da5091102ac5741a2e2fa64f1024090e Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Tue, 5 Aug 2025 15:50:05 -0500 Subject: [PATCH 12/14] move list streams group test into existing listGroups test --- .../api/PlaintextAdminIntegrationTest.scala | 110 +++++++----------- 1 file changed, 41 insertions(+), 69 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 23608dceea355..6f32ec608e6e2 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -57,6 +57,7 @@ import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConf import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} +import org.apache.kafka.streams.GroupProtocol.STREAMS import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator import org.junit.jupiter.api.Assertions._ @@ -2574,7 +2575,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val consumerGroupId = "consumer_group_id" val shareGroupId = "share_group_id" val simpleGroupId = "simple_group_id" + val streamsGroupId = "streams_group_id" val testTopicName = "test_topic" + val testStreamsOutputTopicName = "test_streams_output_topic" consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) val classicGroupConfig = new Properties(consumerConfig) @@ -2590,12 +2593,26 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) + + val streamsGroup = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testStreamsOutputTopicName, + streamsGroupId = streamsGroupId, + groupProtocol = STREAMS.toString + ) + val config = createConfig client = Admin.create(config) try { client.createTopics(util.Set.of( - new NewTopic(testTopicName, 1, 1.toShort) + new NewTopic(testTopicName, 1, 1.toShort), + new NewTopic(testStreamsOutputTopicName, 1, 1.toShort) )).all().get() + waitForTopics(client, List(testTopicName), List()) val topicPartition = new TopicPartition(testTopicName, 0) @@ -2605,6 +2622,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { consumerGroup.poll(JDuration.ofMillis(1000)) shareGroup.subscribe(util.Set.of(testTopicName)) shareGroup.poll(JDuration.ofMillis(1000)) + streamsGroup.start() val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId, util.Map.of(topicPartition, new OffsetAndMetadata(0L))) @@ -2613,18 +2631,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val groups = client.listGroups().all().get() - groups.size() == 4 + groups.size() == 5 }, "Expected to find all groups") val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)) val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)) val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)) + // Streams group could either be in STABLE or NOT_READY state + val streamsGroupListingStable = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)) + val streamsGroupListingNotReady = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)) var listGroupsResult = client.listGroups() assertTrue(listGroupsResult.errors().get().isEmpty) - assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet) - assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + val expectedStreamListings = Set(streamsGroupListingStable, streamsGroupListingNotReady) + val expectedListings = Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing) + val actualListings = listGroupsResult.all().get().asScala.toSet + + // Check that actualListings contains all expectedListings and one of the streams listings + assertTrue(expectedListings.subsetOf(actualListings)) + assertTrue(actualListings.exists(expectedStreamListings.contains)) listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC))) assertTrue(listGroupsResult.errors().get().isEmpty) @@ -2640,11 +2667,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(listGroupsResult.errors().get().isEmpty) assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet) assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable)) || + listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady))) + assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable)) || + listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady))) + + } finally { Utils.closeQuietly(classicGroup, "classicGroup") Utils.closeQuietly(consumerGroup, "consumerGroup") Utils.closeQuietly(shareGroup, "shareGroup") Utils.closeQuietly(client, "adminClient") + Utils.closeQuietly(streamsGroup, "streamsGroup") } } @@ -4365,71 +4402,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) - def testListStreamsGroups(groupProtocol: String): Unit = { - val streamsGroupIdPrefix = "stream_group_id" - val testInputTopicNamePrefix = "test_topic" - val testOutputTopicNamePrefix = "test_output_topic" - val testNumPartitions = 1 - val testNumStreamsGroup = 3 - - val config = createConfig - client = Admin.create(config) - val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() - - val streamsConfig = new Properties() - streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - - for (i <- 1 to testNumStreamsGroup) { - val streamsGroupId = s"$streamsGroupIdPrefix-$i" - val inputTopicName = s"$testInputTopicNamePrefix-$i" - val outputTopicName = s"$testOutputTopicNamePrefix-$i" - prepareTopics(List(inputTopicName, outputTopicName), testNumPartitions) - - val streams = createStreamsGroup( - configOverrides = streamsConfig, - inputTopic = inputTopicName, - outputTopic = outputTopicName, - streamsGroupId = streamsGroupId, - groupProtocol = groupProtocol - ) - streamsList += ((streamsGroupId, streams)) - } - - try { - for ((streamsGroupId, streams) <- streamsList) { - streams.cleanUp() - streams.start() - TestUtils.waitUntilTrue(() => streams.state() == KafkaStreams.State.RUNNING, "Streams not in RUNNING state") - } - - TestUtils.waitUntilTrue(() => { - client.listGroups().all().get().stream() != null - }, "Streams group not ready to describe yet") - - val groups = client.listGroups().all().get() - assertEquals(testNumStreamsGroup, groups.size()) - - val groupsSeq = groups.asScala.toSeq - val expected = (1 to testNumStreamsGroup).map { i => - (s"stream_group_id-$i", s"test_topic-$i", s"test_output_topic-$i") - } - expected.foreach { case (expectedGroupId, expectedInputTopic, expectedOutputTopic) => - assert(groupsSeq.exists { group => - group.groupId() == expectedGroupId && - group.protocol() == groupProtocol - }) - } - } finally { - for ((_, streams) <- streamsList) { - Utils.closeQuietly(streams, "streams") - } - Utils.closeQuietly(client, "adminClient") - } - } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) def testDescribeStreamsGroups(groupProtocol: String): Unit = { From 069a406a9f9ce2a43dc97d983a0ef8a80d575cb8 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Tue, 26 Aug 2025 12:32:15 -0500 Subject: [PATCH 13/14] modify tests --- .../kafka/api/IntegrationTestHarness.scala | 18 +++++++++++ .../api/PlaintextAdminIntegrationTest.scala | 30 +++++++++++-------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index d9391fd323f1e..7c2d3e6b687f4 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -258,6 +258,24 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { streams } + def createStreamsGroupWithAggregation[K, V](configOverrides: Properties = new Properties, + configsToRemove: List[String] = List(), + inputTopic: String, + outputTopic: String, + streamsGroupId: String, + groupProtocol: String): KafkaStreams = { + val streamsConfig = new Properties(streamsGroupConfig) + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) + streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + streamsConfig ++= configOverrides + configsToRemove.foreach(streamsConfig.remove(_)) + val builder = new StreamsBuilder() + + builder.stream[K, V](inputTopic).groupByKey.count().toStream.to(outputTopic) + val streams = new KafkaStreams(builder.build(), streamsConfig) + streams + } + def createAdminClient( listenerName: ListenerName = listenerName, configOverrides: Properties = new Properties diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 6f32ec608e6e2..7156c38df9d51 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4433,11 +4433,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => streams.state() == KafkaStreams.State.RUNNING, "Streams not in RUNNING state") - TestUtils.waitUntilTrue(() => { - client.listGroups().all().get().stream() - .anyMatch(g => g.groupId() == streamsGroupId) - }, "Streams group not ready to describe yet") - TestUtils.waitUntilTrue(() => { val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId @@ -4457,6 +4452,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(subtopologies.exists(subtopology => subtopology.sourceTopics().contains(testTopicName))) + // Test describing a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all()) + } finally { Utils.closeQuietly(streams, "streams") Utils.closeQuietly(client, "adminClient") @@ -4488,7 +4488,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val streamsConfig = new Properties() streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val streams = createStreamsGroup( + val streams = createStreamsGroupWithAggregation( configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName, @@ -4501,14 +4501,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } TestUtils.waitUntilTrue(() => { - client.listGroups().all().get().stream() - .anyMatch(g => g.groupId().startsWith("stream_group_id_")) + val groups = client.listGroups().all().get() + groups.stream() + .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size() }, "Streams groups not ready to delete yet") - // Verify that there are 3 groups created - val groups = client.listGroups().all().get() - assertEquals(testNumStreamsGroup, groups.size()) - // Test deletion of non-empty existing groups var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) @@ -4522,9 +4519,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { streams.cleanUp() } + var listTopicResult = client.listTopics() + assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics + // Test deletion of emptied existing streams groups deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) - assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) + assertEquals(2, deleteStreamsGroupResult.deletedGroups().size()) // Wait for the deleted groups to be removed TestUtils.waitUntilTrue(() => { @@ -4539,6 +4539,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(targetRemainingGroups.contains(g.groupId())) }) + // Verify internal topics persists after deletion of corresponding streams groups + listTopicResult = client.listTopics() + assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics + // Test deletion of a non-existing group val nonExistingGroup = "non_existing_stream_group" val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup)) From d199beb9d3766b44ed01ca9a5855d41a61b20cb5 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 29 Aug 2025 10:13:02 -0500 Subject: [PATCH 14/14] debug --- core/src/test/resources/log4j2.yaml | 3 + .../kafka/api/IntegrationTestHarness.scala | 118 +++++-- .../api/PlaintextAdminIntegrationTest.scala | 301 +++++++++--------- 3 files changed, 235 insertions(+), 187 deletions(-) diff --git a/core/src/test/resources/log4j2.yaml b/core/src/test/resources/log4j2.yaml index 016a542689b4e..91cd1530ec650 100644 --- a/core/src/test/resources/log4j2.yaml +++ b/core/src/test/resources/log4j2.yaml @@ -36,3 +36,6 @@ Configuration: - name: org.apache.kafka level: WARN + + - name: org.apache.kafka.coordinator.group + level: INFO diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 7c2d3e6b687f4..07ab9612dc668 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -22,24 +22,26 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import kafka.utils.TestUtils import kafka.utils.Implicits._ -import java.util.{Optional, Properties} +import java.util +import java.util.{Optional, Properties, UUID} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness import kafka.security.JaasTestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} -import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData} +import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener} import org.apache.kafka.common.network.{ConnectionMode, ListenerName} -import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serdes, Serializer} +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} import org.apache.kafka.common.utils.Utils import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} -import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig} -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} + +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import scala.collection.mutable import scala.collection.Seq +import scala.jdk.CollectionConverters._ import scala.jdk.javaapi.OptionConverters /** @@ -57,7 +59,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val superuserClientConfig = new Properties val serverConfig = new Properties val controllerConfig = new Properties - var streamsGroupConfig = new Properties + private val consumers = mutable.Buffer[Consumer[_, _]]() private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]() @@ -162,10 +164,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - streamsGroupConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - streamsGroupConfig.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[Serdes.ByteArraySerde].getName) - streamsGroupConfig.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[Serdes.ByteArraySerde].getName) - doSuperuserSetup(testInfo) if (createOffsetsTopic) { @@ -245,35 +243,91 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { configsToRemove: List[String] = List(), inputTopic: String, outputTopic: String, - streamsGroupId: String, - groupProtocol: String): KafkaStreams = { - val streamsConfig = new Properties(streamsGroupConfig) - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) - streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) - streamsConfig ++= configOverrides - configsToRemove.foreach(streamsConfig.remove(_)) - val builder = new StreamsBuilder() - builder.stream[K, V](inputTopic).to(outputTopic) - val streams = new KafkaStreams(builder.build(), streamsConfig) - streams + streamsGroupId: String): AsyncKafkaConsumer[K, V] = { + val props = new Properties() + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props ++= configOverrides + configsToRemove.foreach(props.remove(_)) + + val streamsRebalanceData = new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + util.Map.of( + "subtopology-0", new StreamsRebalanceData.Subtopology( + util.Set.of(inputTopic), + util.Set.of(), + util.Map.of(), + util.Map.of(), + util.Set.of() + )), + Map.empty[String, String].asJava + ) + + val consumer = createStreamsConsumer( + keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]], + valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]], + configOverrides = props, + streamsRebalanceData = streamsRebalanceData + ) + consumer.subscribe(util.Set.of(inputTopic, outputTopic), + new StreamsRebalanceListener { + override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] = + Optional.empty() + override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = { + Optional.empty() + } + override def onAllTasksLost(): Optional[Exception] = + Optional.empty() + }) + + consumer.poll(Duration.ofMillis(500L)) + consumer } + def createStreamsGroupWithAggregation[K, V](configOverrides: Properties = new Properties, configsToRemove: List[String] = List(), inputTopic: String, outputTopic: String, streamsGroupId: String, - groupProtocol: String): KafkaStreams = { - val streamsConfig = new Properties(streamsGroupConfig) - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, streamsGroupId) - streamsConfig.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) - streamsConfig ++= configOverrides - configsToRemove.foreach(streamsConfig.remove(_)) - val builder = new StreamsBuilder() - - builder.stream[K, V](inputTopic).groupByKey.count().toStream.to(outputTopic) - val streams = new KafkaStreams(builder.build(), streamsConfig) - streams + groupProtocol: String): AsyncKafkaConsumer[K, V] = { + val props = new Properties() + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId) + props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props ++= configOverrides + configsToRemove.foreach(props.remove(_)) + + val streamsRebalanceData = new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + util.Map.of( + "subtopology-0", new StreamsRebalanceData.Subtopology( + util.Set.of(inputTopic), + util.Set.of(), + util.Map.of(), + util.Map.of(outputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())), // state changelog topics for aggregation + util.Set.of() + )), + Map.empty[String, String].asJava + ) + + val consumer = createStreamsConsumer( + keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]], + valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]], + configOverrides = props, + streamsRebalanceData = streamsRebalanceData + ) + consumer.subscribe(util.Set.of(inputTopic)) + consumer.poll(Duration.ofMillis(500L)) + consumer } def createAdminClient( diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 7156c38df9d51..6c1365cd2c51d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -56,8 +56,8 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} -import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} -import org.apache.kafka.streams.GroupProtocol.STREAMS +//import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} +//import org.apache.kafka.streams.GroupProtocol.STREAMS import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator import org.junit.jupiter.api.Assertions._ @@ -179,7 +179,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { //null can create default quota val userDefaultEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> Option.empty[String].orNull).asJava) client.alterClientQuotas(util.List.of(new ClientQuotaAlteration(userDefaultEntity, util.Set.of( - new ClientQuotaAlteration.Op("consumer_byte_rate", 100D))))).all().get() + new ClientQuotaAlteration.Op("consumer_byte_rate", 100D))))).all().get() val clientDefaultEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> Option.empty[String].orNull).asJava) client.alterClientQuotas(util.List.of(new ClientQuotaAlteration(clientDefaultEntity, util.Set.of( new ClientQuotaAlteration.Op("producer_byte_rate", 100D))))).all().get() @@ -529,7 +529,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(3, client.listTransactions().all().get().size()) assertEquals(2, client.listTransactions(new ListTransactionsOptions() - .filterStates(util.List.of(TransactionState.COMPLETE_COMMIT))).all().get().size()) + .filterStates(util.List.of(TransactionState.COMPLETE_COMMIT))).all().get().size()) assertEquals(1, client.listTransactions(new ListTransactionsOptions() .filterStates(util.List.of(TransactionState.COMPLETE_ABORT))).all().get().size()) assertEquals(1, client.listTransactions(new ListTransactionsOptions() @@ -768,8 +768,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * describe should not auto create topics - */ + * describe should not auto create topics + */ @Test def testDescribeNonExistingTopic(): Unit = { client = createAdminClient @@ -1274,7 +1274,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { () => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " + - "consistent with previous partitions, which have 1 replica(s)." + "consistent with previous partitions, which have 1 replica(s)." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1315,7 +1315,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { () => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " + - "consistent with previous partitions, which have 1 replica(s)." + "consistent with previous partitions, which have 1 replica(s)." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1431,7 +1431,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } catch { case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] || e.getCause.isInstanceOf[NotLeaderOrFollowerException] => false - } + } }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}") } @@ -1463,7 +1463,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = { TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition).isDefined, - "Expected follower to create replica for partition") + "Expected follower to create replica for partition") // wait until the follower discovers that log start offset moved beyond its HW TestUtils.waitUntilTrue(() => { @@ -1751,9 +1751,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, - * since they can be done within the timeout. New calls should receive exceptions. - */ + * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, + * since they can be done within the timeout. New calls should receive exceptions. + */ @Test def testDelayedClose(): Unit = { client = createAdminClient @@ -1768,9 +1768,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long - * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. - */ + * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long + * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. + */ @Test def testForceClose(): Unit = { val config = createConfig @@ -1785,9 +1785,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Check that a call with a timeout does not complete before the minimum timeout has elapsed, - * even when the default request timeout is shorter. - */ + * Check that a call with a timeout does not complete before the minimum timeout has elapsed, + * even when the default request timeout is shorter. + */ @Test def testMinimumRequestTimeouts(): Unit = { val config = createConfig @@ -1803,8 +1803,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Test injecting timeouts for calls that are in flight. - */ + * Test injecting timeouts for calls that are in flight. + */ @Test def testCallInFlightTimeouts(): Unit = { val config = createConfig @@ -1813,7 +1813,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, - new CreateTopicsOptions().validateOnly(true)).all() + new CreateTopicsOptions().validateOnly(true)).all() assertFutureThrows(classOf[TimeoutException], future) val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1.toShort)).asJava, new CreateTopicsOptions().validateOnly(true)).all() @@ -2595,14 +2595,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val streamsConfig = new Properties() streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) +// streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) val streamsGroup = createStreamsGroup( configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testStreamsOutputTopicName, - streamsGroupId = streamsGroupId, - groupProtocol = STREAMS.toString + streamsGroupId = streamsGroupId ) val config = createConfig @@ -2622,7 +2621,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { consumerGroup.poll(JDuration.ofMillis(1000)) shareGroup.subscribe(util.Set.of(testTopicName)) shareGroup.poll(JDuration.ofMillis(1000)) - streamsGroup.start() +// streamsGroup.start() val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId, util.Map.of(topicPartition, new OffsetAndMetadata(0L))) @@ -3152,9 +3151,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, 1) def assertUnknownTopicOrPartition( - topicPartition: TopicPartition, - result: ElectLeadersResult - ): Unit = { + topicPartition: TopicPartition, + result: ElectLeadersResult + ): Unit = { val exception = result.partitions.get.get(topicPartition).get assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass) assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage) @@ -3191,9 +3190,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) def assertPreferredLeaderNotAvailable( - topicPartition: TopicPartition, - result: ElectLeadersResult - ): Unit = { + topicPartition: TopicPartition, + result: ElectLeadersResult + ): Unit = { val exception = result.partitions.get.get(topicPartition).get assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) assertTrue(exception.getMessage.contains( @@ -3641,7 +3640,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") client.incrementalAlterConfigs(util.Map.of(broker0Resource, util.List.of(new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "123"), - AlterConfigOp.OpType.SET), + AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "456"), AlterConfigOp.OpType.SET) ))).all().get() @@ -3847,7 +3846,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val longTopicName = String.join("", Collections.nCopies(249, "x")) val invalidTopicName = String.join("", Collections.nCopies(250, "x")) val newTopics2 = util.List.of(new NewTopic(invalidTopicName, 3, 3.toShort), - new NewTopic(longTopicName, 3, 3.toShort)) + new NewTopic(longTopicName, 3, 3.toShort)) val results = client.createTopics(newTopics2).values() assertTrue(results.containsKey(longTopicName)) results.get(longTopicName).get() @@ -3971,12 +3970,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * 1. Assume kafka logger == TRACE - * 2. Change kafka.server.ControllerServer logger to INFO - * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE) - * 4. Change kafka logger to ERROR - * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level) - */ + * 1. Assume kafka logger == TRACE + * 2. Change kafka.server.ControllerServer logger to INFO + * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE) + * 4. Change kafka logger to ERROR + * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level) + */ @Test def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = { client = createAdminClient @@ -4402,9 +4401,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) - def testDescribeStreamsGroups(groupProtocol: String): Unit = { + @Test + def testDescribeStreamsGroups(): Unit = { val streamsGroupId = "stream_group_id" val testTopicName = "test_topic" val testOutputTopicName = "test_output_topic" @@ -4418,23 +4416,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val streamsConfig = new Properties() streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) val streams = createStreamsGroup( configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testOutputTopicName, - streamsGroupId = streamsGroupId, - groupProtocol = groupProtocol + streamsGroupId = streamsGroupId ) try { - streams.cleanUp() - streams.start() - - TestUtils.waitUntilTrue(() => streams.state() == KafkaStreams.State.RUNNING, "Streams not in RUNNING state") - TestUtils.waitUntilTrue(() => { val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + print("Group state:",firstGroup.groupState()) firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId }, "Stream group not stable yet") @@ -4463,111 +4455,110 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) - def testDeleteStreamsGroups(groupProtocol: String): Unit = { - val testTopicName = "test_topic" - val testOutputTopicName = "test_output_topic" - val testNumPartitions = 3 - val testNumStreamsGroup = 3 - - val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3") - val targetRemainingGroups = util.List.of("stream_group_id_1") - - val config = createConfig - client = Admin.create(config) - - prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) - prepareRecords(testTopicName) - - val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() - - try { - for (i <- 1 to testNumStreamsGroup) { - val streamsGroupId = s"stream_group_id_$i" - val streamsConfig = new Properties() - streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) - val streams = createStreamsGroupWithAggregation( - configOverrides = streamsConfig, - inputTopic = testTopicName, - outputTopic = testOutputTopicName, - streamsGroupId = streamsGroupId, - groupProtocol = groupProtocol - ) - streams.cleanUp() - streams.start() - streamsList += ((streamsGroupId, streams)) - } - - TestUtils.waitUntilTrue(() => { - val groups = client.listGroups().all().get() - groups.stream() - .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size() - }, "Streams groups not ready to delete yet") - - // Test deletion of non-empty existing groups - var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) - assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) - assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) - - // Stop and clean up the streams for the groups that are going to be deleted - streamsList - .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) } - .foreach { case (_, streams) => - streams.close(java.time.Duration.ofSeconds(10)) - streams.cleanUp() - } - - var listTopicResult = client.listTopics() - assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics - - // Test deletion of emptied existing streams groups - deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) - assertEquals(2, deleteStreamsGroupResult.deletedGroups().size()) - - // Wait for the deleted groups to be removed - TestUtils.waitUntilTrue(() => { - val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet - targetDeletedGroups.asScala.forall(id => !groupIds.contains(id)) - }, "Deleted groups not yet deleted") - - // Verify that the deleted groups are no longer present - val remainingGroups = client.listGroups().all().get() - assertEquals(targetRemainingGroups.size(), remainingGroups.size()) - remainingGroups.stream().forEach(g => { - assertTrue(targetRemainingGroups.contains(g.groupId())) - }) - - // Verify internal topics persists after deletion of corresponding streams groups - listTopicResult = client.listTopics() - assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics - - // Test deletion of a non-existing group - val nonExistingGroup = "non_existing_stream_group" - val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup)) - assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all()) - assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1) - - } finally{ - streamsList.foreach { case (_, streams) => - streams.close(java.time.Duration.ofSeconds(10)) - streams.cleanUp() - } - Utils.closeQuietly(client, "adminClient") - } - } +// @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) +// @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) +// def testDeleteStreamsGroups(groupProtocol: String): Unit = { +// val testTopicName = "test_topic" +// val testOutputTopicName = "test_output_topic" +// val testNumPartitions = 3 +// val testNumStreamsGroup = 3 +// +// val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3") +// val targetRemainingGroups = util.List.of("stream_group_id_1") +// +// val config = createConfig +// client = Admin.create(config) +// +// prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) +// prepareRecords(testTopicName) +// +// val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() +// +// try { +// for (i <- 1 to testNumStreamsGroup) { +// val streamsGroupId = s"stream_group_id_$i" +// val streamsConfig = new Properties() +// streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") +// streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) +// val streams = createStreamsGroupWithAggregation( +// configOverrides = streamsConfig, +// inputTopic = testTopicName, +// outputTopic = testOutputTopicName, +// streamsGroupId = streamsGroupId, +// groupProtocol = groupProtocol +// ) +// +// streamsList += ((streamsGroupId, streams)) +// } +// +// TestUtils.waitUntilTrue(() => { +// val groups = client.listGroups().all().get() +// groups.stream() +// .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size() +// }, "Streams groups not ready to delete yet") +// +// // Test deletion of non-empty existing groups +// var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) +// assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) +// assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) +// +// // Stop and clean up the streams for the groups that are going to be deleted +// streamsList +// .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) } +// .foreach { case (_, streams) => +// streams.close(java.time.Duration.ofSeconds(10)) +// streams.cleanUp() +// } +// +// var listTopicResult = client.listTopics() +// assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics +// +// // Test deletion of emptied existing streams groups +// deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) +// assertEquals(2, deleteStreamsGroupResult.deletedGroups().size()) +// +// // Wait for the deleted groups to be removed +// TestUtils.waitUntilTrue(() => { +// val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet +// targetDeletedGroups.asScala.forall(id => !groupIds.contains(id)) +// }, "Deleted groups not yet deleted") +// +// // Verify that the deleted groups are no longer present +// val remainingGroups = client.listGroups().all().get() +// assertEquals(targetRemainingGroups.size(), remainingGroups.size()) +// remainingGroups.stream().forEach(g => { +// assertTrue(targetRemainingGroups.contains(g.groupId())) +// }) +// +// // Verify internal topics persists after deletion of corresponding streams groups +// listTopicResult = client.listTopics() +// assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics +// +// // Test deletion of a non-existing group +// val nonExistingGroup = "non_existing_stream_group" +// val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup)) +// assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all()) +// assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1) +// +// } finally{ +// streamsList.foreach { case (_, streams) => +// streams.close(java.time.Duration.ofSeconds(10)) +// streams.cleanUp() +// } +// Utils.closeQuietly(client, "adminClient") +// } +// } } object PlaintextAdminIntegrationTest { def checkValidAlterConfigs( - admin: Admin, - test: KafkaServerTestHarness, - topicResource1: ConfigResource, - topicResource2: ConfigResource, - maxMessageBytes: String, - retentionMs: String): Unit = { + admin: Admin, + test: KafkaServerTestHarness, + topicResource1: ConfigResource, + topicResource2: ConfigResource, + maxMessageBytes: String, + retentionMs: String): Unit = { // Alter topics val alterConfigs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() alterConfigs.put(topicResource1, util.List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.FLUSH_MS_CONFIG, "1000"), OpType.SET))) @@ -4615,9 +4606,9 @@ object PlaintextAdminIntegrationTest { } def checkInvalidAlterConfigs( - test: KafkaServerTestHarness, - admin: Admin - ): Unit = { + test: KafkaServerTestHarness, + admin: Admin + ): Unit = { // Create topics val topic1 = "invalid-alter-configs-topic-1" val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) @@ -4688,4 +4679,4 @@ object PlaintextAdminIntegrationTest { assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value) } -} +} \ No newline at end of file