diff --git a/build.gradle b/build.gradle index 2b4f1294e9c02..a900094251ecf 100644 --- a/build.gradle +++ b/build.gradle @@ -1038,6 +1038,8 @@ project(':core') { testImplementation project(':server-common').sourceSets.test.output testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation project(':server').sourceSets.test.output + testImplementation project(':streams').sourceSets.test.output + testImplementation project(':streams') testImplementation project(':test-common:test-common-runtime') testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-util') @@ -2271,6 +2273,7 @@ project(':storage') { testImplementation project(':server') testImplementation project(':server-common') testImplementation project(':server-common').sourceSets.test.output + testImplementation project(':streams') testImplementation project(':transaction-coordinator') testImplementation libs.hamcrest testImplementation libs.jacksonDatabindYaml diff --git a/core/src/test/resources/log4j2.yaml b/core/src/test/resources/log4j2.yaml index 016a542689b4e..91cd1530ec650 100644 --- a/core/src/test/resources/log4j2.yaml +++ b/core/src/test/resources/log4j2.yaml @@ -36,3 +36,6 @@ Configuration: - name: org.apache.kafka level: WARN + + - name: org.apache.kafka.coordinator.group + level: INFO diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 7c08dd9c3fe7d..07ab9612dc668 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -22,23 +22,26 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume import kafka.utils.TestUtils import kafka.utils.Implicits._ -import java.util.{Optional, Properties} +import java.util +import java.util.{Optional, Properties, UUID} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness import kafka.security.JaasTestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} -import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData} +import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener} import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} import org.apache.kafka.common.utils.Utils import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} + +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import scala.collection.mutable import scala.collection.Seq +import scala.jdk.CollectionConverters._ import scala.jdk.javaapi.OptionConverters /** @@ -57,6 +60,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val serverConfig = new Properties val controllerConfig = new Properties + private val consumers = mutable.Buffer[Consumer[_, _]]() private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]() private val streamsConsumers = mutable.Buffer[Consumer[_, _]]() @@ -235,6 +239,97 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { streamsConsumer } + def createStreamsGroup[K, V](configOverrides: Properties = new Properties, + configsToRemove: List[String] = List(), + inputTopic: String, + outputTopic: String, + streamsGroupId: String): AsyncKafkaConsumer[K, V] = { + val props = new Properties() + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props ++= configOverrides + configsToRemove.foreach(props.remove(_)) + + val streamsRebalanceData = new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + util.Map.of( + "subtopology-0", new StreamsRebalanceData.Subtopology( + util.Set.of(inputTopic), + util.Set.of(), + util.Map.of(), + util.Map.of(), + util.Set.of() + )), + Map.empty[String, String].asJava + ) + + val consumer = createStreamsConsumer( + keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]], + valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]], + configOverrides = props, + streamsRebalanceData = streamsRebalanceData + ) + consumer.subscribe(util.Set.of(inputTopic, outputTopic), + new StreamsRebalanceListener { + override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] = + Optional.empty() + override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = { + Optional.empty() + } + override def onAllTasksLost(): Optional[Exception] = + Optional.empty() + }) + + consumer.poll(Duration.ofMillis(500L)) + consumer + } + + + def createStreamsGroupWithAggregation[K, V](configOverrides: Properties = new Properties, + configsToRemove: List[String] = List(), + inputTopic: String, + outputTopic: String, + streamsGroupId: String, + groupProtocol: String): AsyncKafkaConsumer[K, V] = { + val props = new Properties() + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId) + props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName) + props ++= configOverrides + configsToRemove.foreach(props.remove(_)) + + val streamsRebalanceData = new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + util.Map.of( + "subtopology-0", new StreamsRebalanceData.Subtopology( + util.Set.of(inputTopic), + util.Set.of(), + util.Map.of(), + util.Map.of(outputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())), // state changelog topics for aggregation + util.Set.of() + )), + Map.empty[String, String].asJava + ) + + val consumer = createStreamsConsumer( + keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]], + valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]], + configOverrides = props, + streamsRebalanceData = streamsRebalanceData + ) + consumer.subscribe(util.Set.of(inputTopic)) + consumer.poll(Duration.ofMillis(500L)) + consumer + } + def createAdminClient( listenerName: ListenerName = listenerName, configOverrides: Properties = new Properties diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 44835885e0c34..6c1365cd2c51d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -56,6 +56,8 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} +//import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} +//import org.apache.kafka.streams.GroupProtocol.STREAMS import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator import org.junit.jupiter.api.Assertions._ @@ -177,7 +179,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { //null can create default quota val userDefaultEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> Option.empty[String].orNull).asJava) client.alterClientQuotas(util.List.of(new ClientQuotaAlteration(userDefaultEntity, util.Set.of( - new ClientQuotaAlteration.Op("consumer_byte_rate", 100D))))).all().get() + new ClientQuotaAlteration.Op("consumer_byte_rate", 100D))))).all().get() val clientDefaultEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> Option.empty[String].orNull).asJava) client.alterClientQuotas(util.List.of(new ClientQuotaAlteration(clientDefaultEntity, util.Set.of( new ClientQuotaAlteration.Op("producer_byte_rate", 100D))))).all().get() @@ -527,7 +529,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(3, client.listTransactions().all().get().size()) assertEquals(2, client.listTransactions(new ListTransactionsOptions() - .filterStates(util.List.of(TransactionState.COMPLETE_COMMIT))).all().get().size()) + .filterStates(util.List.of(TransactionState.COMPLETE_COMMIT))).all().get().size()) assertEquals(1, client.listTransactions(new ListTransactionsOptions() .filterStates(util.List.of(TransactionState.COMPLETE_ABORT))).all().get().size()) assertEquals(1, client.listTransactions(new ListTransactionsOptions() @@ -766,8 +768,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * describe should not auto create topics - */ + * describe should not auto create topics + */ @Test def testDescribeNonExistingTopic(): Unit = { client = createAdminClient @@ -1272,7 +1274,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { () => s"$desc: Expect InvalidPartitionsException when #brokers != replication factor") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " + - "consistent with previous partitions, which have 1 replica(s)." + "consistent with previous partitions, which have 1 replica(s)." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1313,7 +1315,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { () => s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists") assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException], desc) exceptionMsgStr = "The manual partition assignment includes a partition with 2 replica(s), but this is not " + - "consistent with previous partitions, which have 1 replica(s)." + "consistent with previous partitions, which have 1 replica(s)." assertEquals(exceptionMsgStr, e.getCause.getMessage, desc) assertEquals(3, numPartitions(topic1, expectedNumPartitionsOpt = Some(3)), desc) @@ -1429,7 +1431,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } catch { case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] || e.getCause.isInstanceOf[NotLeaderOrFollowerException] => false - } + } }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}") } @@ -1461,7 +1463,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = { TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition).isDefined, - "Expected follower to create replica for partition") + "Expected follower to create replica for partition") // wait until the follower discovers that log start offset moved beyond its HW TestUtils.waitUntilTrue(() => { @@ -1749,9 +1751,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, - * since they can be done within the timeout. New calls should receive exceptions. - */ + * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, + * since they can be done within the timeout. New calls should receive exceptions. + */ @Test def testDelayedClose(): Unit = { client = createAdminClient @@ -1766,9 +1768,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long - * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. - */ + * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long + * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. + */ @Test def testForceClose(): Unit = { val config = createConfig @@ -1783,9 +1785,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Check that a call with a timeout does not complete before the minimum timeout has elapsed, - * even when the default request timeout is shorter. - */ + * Check that a call with a timeout does not complete before the minimum timeout has elapsed, + * even when the default request timeout is shorter. + */ @Test def testMinimumRequestTimeouts(): Unit = { val config = createConfig @@ -1801,8 +1803,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * Test injecting timeouts for calls that are in flight. - */ + * Test injecting timeouts for calls that are in flight. + */ @Test def testCallInFlightTimeouts(): Unit = { val config = createConfig @@ -1811,7 +1813,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, - new CreateTopicsOptions().validateOnly(true)).all() + new CreateTopicsOptions().validateOnly(true)).all() assertFutureThrows(classOf[TimeoutException], future) val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1.toShort)).asJava, new CreateTopicsOptions().validateOnly(true)).all() @@ -2573,7 +2575,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val consumerGroupId = "consumer_group_id" val shareGroupId = "share_group_id" val simpleGroupId = "simple_group_id" + val streamsGroupId = "streams_group_id" val testTopicName = "test_topic" + val testStreamsOutputTopicName = "test_streams_output_topic" consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) val classicGroupConfig = new Properties(consumerConfig) @@ -2589,12 +2593,25 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") +// streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) + + val streamsGroup = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testStreamsOutputTopicName, + streamsGroupId = streamsGroupId + ) + val config = createConfig client = Admin.create(config) try { client.createTopics(util.Set.of( - new NewTopic(testTopicName, 1, 1.toShort) + new NewTopic(testTopicName, 1, 1.toShort), + new NewTopic(testStreamsOutputTopicName, 1, 1.toShort) )).all().get() + waitForTopics(client, List(testTopicName), List()) val topicPartition = new TopicPartition(testTopicName, 0) @@ -2604,6 +2621,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { consumerGroup.poll(JDuration.ofMillis(1000)) shareGroup.subscribe(util.Set.of(testTopicName)) shareGroup.poll(JDuration.ofMillis(1000)) +// streamsGroup.start() val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId, util.Map.of(topicPartition, new OffsetAndMetadata(0L))) @@ -2612,18 +2630,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val groups = client.listGroups().all().get() - groups.size() == 4 + groups.size() == 5 }, "Expected to find all groups") val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)) val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)) val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)) + // Streams group could either be in STABLE or NOT_READY state + val streamsGroupListingStable = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)) + val streamsGroupListingNotReady = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)) var listGroupsResult = client.listGroups() assertTrue(listGroupsResult.errors().get().isEmpty) - assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet) - assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + val expectedStreamListings = Set(streamsGroupListingStable, streamsGroupListingNotReady) + val expectedListings = Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing) + val actualListings = listGroupsResult.all().get().asScala.toSet + + // Check that actualListings contains all expectedListings and one of the streams listings + assertTrue(expectedListings.subsetOf(actualListings)) + assertTrue(actualListings.exists(expectedStreamListings.contains)) listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC))) assertTrue(listGroupsResult.errors().get().isEmpty) @@ -2639,11 +2666,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(listGroupsResult.errors().get().isEmpty) assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet) assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable)) || + listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady))) + assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable)) || + listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady))) + + } finally { Utils.closeQuietly(classicGroup, "classicGroup") Utils.closeQuietly(consumerGroup, "consumerGroup") Utils.closeQuietly(shareGroup, "shareGroup") Utils.closeQuietly(client, "adminClient") + Utils.closeQuietly(streamsGroup, "streamsGroup") } } @@ -3114,9 +3151,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, 1) def assertUnknownTopicOrPartition( - topicPartition: TopicPartition, - result: ElectLeadersResult - ): Unit = { + topicPartition: TopicPartition, + result: ElectLeadersResult + ): Unit = { val exception = result.partitions.get.get(topicPartition).get assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass) assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage) @@ -3153,9 +3190,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) def assertPreferredLeaderNotAvailable( - topicPartition: TopicPartition, - result: ElectLeadersResult - ): Unit = { + topicPartition: TopicPartition, + result: ElectLeadersResult + ): Unit = { val exception = result.partitions.get.get(topicPartition).get assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) assertTrue(exception.getMessage.contains( @@ -3603,7 +3640,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") client.incrementalAlterConfigs(util.Map.of(broker0Resource, util.List.of(new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "123"), - AlterConfigOp.OpType.SET), + AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "456"), AlterConfigOp.OpType.SET) ))).all().get() @@ -3809,7 +3846,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val longTopicName = String.join("", Collections.nCopies(249, "x")) val invalidTopicName = String.join("", Collections.nCopies(250, "x")) val newTopics2 = util.List.of(new NewTopic(invalidTopicName, 3, 3.toShort), - new NewTopic(longTopicName, 3, 3.toShort)) + new NewTopic(longTopicName, 3, 3.toShort)) val results = client.createTopics(newTopics2).values() assertTrue(results.containsKey(longTopicName)) results.get(longTopicName).get() @@ -3933,12 +3970,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** - * 1. Assume kafka logger == TRACE - * 2. Change kafka.server.ControllerServer logger to INFO - * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE) - * 4. Change kafka logger to ERROR - * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level) - */ + * 1. Assume kafka logger == TRACE + * 2. Change kafka.server.ControllerServer logger to INFO + * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE) + * 4. Change kafka logger to ERROR + * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level) + */ @Test def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = { client = createAdminClient @@ -4363,17 +4400,165 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } } + + @Test + def testDescribeStreamsGroups(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testOutputTopicName = "test_output_topic" + val testNumPartitions = 1 + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testOutputTopicName, + streamsGroupId = streamsGroupId + ) + + try { + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + print("Group state:",firstGroup.groupState()) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Stream group not stable yet") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + // Test describing a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all()) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } + +// @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) +// @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) +// def testDeleteStreamsGroups(groupProtocol: String): Unit = { +// val testTopicName = "test_topic" +// val testOutputTopicName = "test_output_topic" +// val testNumPartitions = 3 +// val testNumStreamsGroup = 3 +// +// val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3") +// val targetRemainingGroups = util.List.of("stream_group_id_1") +// +// val config = createConfig +// client = Admin.create(config) +// +// prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) +// prepareRecords(testTopicName) +// +// val streamsList = scala.collection.mutable.ListBuffer[(String, KafkaStreams)]() +// +// try { +// for (i <- 1 to testNumStreamsGroup) { +// val streamsGroupId = s"stream_group_id_$i" +// val streamsConfig = new Properties() +// streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") +// streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) +// val streams = createStreamsGroupWithAggregation( +// configOverrides = streamsConfig, +// inputTopic = testTopicName, +// outputTopic = testOutputTopicName, +// streamsGroupId = streamsGroupId, +// groupProtocol = groupProtocol +// ) +// +// streamsList += ((streamsGroupId, streams)) +// } +// +// TestUtils.waitUntilTrue(() => { +// val groups = client.listGroups().all().get() +// groups.stream() +// .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size() +// }, "Streams groups not ready to delete yet") +// +// // Test deletion of non-empty existing groups +// var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) +// assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) +// assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) +// +// // Stop and clean up the streams for the groups that are going to be deleted +// streamsList +// .filter { case (groupId, _) => targetDeletedGroups.contains(groupId) } +// .foreach { case (_, streams) => +// streams.close(java.time.Duration.ofSeconds(10)) +// streams.cleanUp() +// } +// +// var listTopicResult = client.listTopics() +// assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics +// +// // Test deletion of emptied existing streams groups +// deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) +// assertEquals(2, deleteStreamsGroupResult.deletedGroups().size()) +// +// // Wait for the deleted groups to be removed +// TestUtils.waitUntilTrue(() => { +// val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet +// targetDeletedGroups.asScala.forall(id => !groupIds.contains(id)) +// }, "Deleted groups not yet deleted") +// +// // Verify that the deleted groups are no longer present +// val remainingGroups = client.listGroups().all().get() +// assertEquals(targetRemainingGroups.size(), remainingGroups.size()) +// remainingGroups.stream().forEach(g => { +// assertTrue(targetRemainingGroups.contains(g.groupId())) +// }) +// +// // Verify internal topics persists after deletion of corresponding streams groups +// listTopicResult = client.listTopics() +// assertEquals(5, listTopicResult.names().get().size())// input + output topic + 3 internal topics +// +// // Test deletion of a non-existing group +// val nonExistingGroup = "non_existing_stream_group" +// val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup)) +// assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all()) +// assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1) +// +// } finally{ +// streamsList.foreach { case (_, streams) => +// streams.close(java.time.Duration.ofSeconds(10)) +// streams.cleanUp() +// } +// Utils.closeQuietly(client, "adminClient") +// } +// } } object PlaintextAdminIntegrationTest { def checkValidAlterConfigs( - admin: Admin, - test: KafkaServerTestHarness, - topicResource1: ConfigResource, - topicResource2: ConfigResource, - maxMessageBytes: String, - retentionMs: String): Unit = { + admin: Admin, + test: KafkaServerTestHarness, + topicResource1: ConfigResource, + topicResource2: ConfigResource, + maxMessageBytes: String, + retentionMs: String): Unit = { // Alter topics val alterConfigs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() alterConfigs.put(topicResource1, util.List.of(new AlterConfigOp(new ConfigEntry(TopicConfig.FLUSH_MS_CONFIG, "1000"), OpType.SET))) @@ -4421,9 +4606,9 @@ object PlaintextAdminIntegrationTest { } def checkInvalidAlterConfigs( - test: KafkaServerTestHarness, - admin: Admin - ): Unit = { + test: KafkaServerTestHarness, + admin: Admin + ): Unit = { // Create topics val topic1 = "invalid-alter-configs-topic-1" val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) @@ -4494,4 +4679,4 @@ object PlaintextAdminIntegrationTest { assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value) } -} +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 3d5837b92d0d7..f74ac79932e6f 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -44,6 +44,7 @@ import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion} import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} import org.apache.kafka.server.util.timer.SystemTimer +import org.apache.kafka.streams.GroupProtocol.STREAMS import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo} import org.junit.jupiter.params.provider.Arguments @@ -420,4 +421,11 @@ object QuorumTestHarness { Arguments.of(GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) ) } + + // For tests that only work with the streams group protocol + def getTestGroupProtocolParametersStreamsGroupProtocolOnly: java.util.stream.Stream[Arguments] = { + stream.Stream.of( + Arguments.of(STREAMS.name.toLowerCase(Locale.ROOT)) + ) + } }