Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ project(':core') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server').sourceSets.test.output
testImplementation project(':streams').sourceSets.test.output
testImplementation project(':streams')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-util')
Expand Down Expand Up @@ -2271,6 +2273,7 @@ project(':storage') {
testImplementation project(':server')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':streams')
testImplementation project(':transaction-coordinator')
testImplementation libs.hamcrest
testImplementation libs.jacksonDatabindYaml
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/resources/log4j2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ Configuration:

- name: org.apache.kafka
level: WARN

- name: org.apache.kafka.coordinator.group
level: INFO
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
import kafka.utils.TestUtils
import kafka.utils.Implicits._

import java.util.{Optional, Properties}
import java.util
import java.util.{Optional, Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData}
import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

import scala.collection.mutable
import scala.collection.Seq
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters

/**
Expand All @@ -57,6 +60,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val serverConfig = new Properties
val controllerConfig = new Properties


private val consumers = mutable.Buffer[Consumer[_, _]]()
private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]()
private val streamsConsumers = mutable.Buffer[Consumer[_, _]]()
Expand Down Expand Up @@ -235,6 +239,97 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
streamsConsumer
}

/**
 * Creates and returns an [[AsyncKafkaConsumer]] that joins a streams group using the
 * new streams rebalance protocol, with a single-subtopology topology reading from
 * `inputTopic`.
 *
 * The consumer is registered via `createStreamsConsumer` (defined elsewhere in this
 * harness), so it is presumably closed automatically on tearDown — confirm against
 * the harness's cleanup logic.
 *
 * NOTE(review): parameters with default values (`configOverrides`, `configsToRemove`)
 * precede required parameters, so every call site must use named arguments.
 *
 * @param configOverrides extra consumer properties applied on top of the defaults below
 * @param configsToRemove property keys removed AFTER overrides are applied
 * @param inputTopic      source topic declared in the subtopology
 * @param outputTopic     also subscribed below — see NOTE(review) at the subscribe call
 * @param streamsGroupId  the streams group id (used as the consumer group id)
 * @return the subscribed consumer, after one initial poll
 */
def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
inputTopic: String,
outputTopic: String,
streamsGroupId: String): AsyncKafkaConsumer[K, V] = {
// Base config; overrides are applied after these puts, removals last, so
// configOverrides can replace any default and configsToRemove wins overall.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props ++= configOverrides
configsToRemove.foreach(props.remove(_))

// Minimal topology metadata: one subtopology ("subtopology-0") whose only source
// topic is inputTopic; no repartition topics, no changelog topics, no copartition
// groups (contrast with createStreamsGroupWithAggregation, which declares a
// changelog topic).
val streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
util.Set.of(inputTopic),
util.Set.of(),
util.Map.of(),
util.Map.of(),
util.Set.of()
)),
Map.empty[String, String].asJava
)

// Fixed byte-array deserializers are cast to the caller's K/V type parameters;
// callers requesting anything other than Array[Byte] will fail at runtime.
val consumer = createStreamsConsumer(
keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]],
valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]],
configOverrides = props,
streamsRebalanceData = streamsRebalanceData
)
// NOTE(review): subscribes to outputTopic as well, even though the subtopology above
// lists only inputTopic as a source topic — confirm this is intended and not a
// copy/paste slip (the sibling createStreamsGroupWithAggregation subscribes to
// inputTopic only).
consumer.subscribe(util.Set.of(inputTopic, outputTopic),
// No-op listener: all task-lifecycle callbacks report success (empty Optional).
new StreamsRebalanceListener {
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
Optional.empty()
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = {
Optional.empty()
}
override def onAllTasksLost(): Optional[Exception] =
Optional.empty()
})

// Single short poll, presumably to kick off the group join / first rebalance
// before returning — TODO confirm 500ms is sufficient on slow CI.
consumer.poll(Duration.ofMillis(500L))
consumer
}


/**
 * Creates and returns an [[AsyncKafkaConsumer]] that joins a streams group whose
 * single subtopology models an aggregation: it reads from `inputTopic` and declares
 * a state-changelog topic (named from `outputTopic`).
 *
 * Differences from `createStreamsGroup` — confirm each is intentional:
 *  - sets `group.protocol` from the `groupProtocol` parameter;
 *  - declares a changelog topic (`<outputTopic>-store-changelog`, 1 partition);
 *  - subscribes to `inputTopic` only, and WITHOUT a `StreamsRebalanceListener`.
 *
 * NOTE(review): parameters with default values precede required parameters, so
 * call sites must use named arguments.
 *
 * @param configOverrides extra consumer properties applied on top of the defaults below
 * @param configsToRemove property keys removed AFTER overrides are applied
 * @param inputTopic      source topic declared in the subtopology
 * @param outputTopic     used only to derive the changelog topic name below
 * @param streamsGroupId  the streams group id (used as the consumer group id)
 * @param groupProtocol   value for `ConsumerConfig.GROUP_PROTOCOL_CONFIG`
 * @return the subscribed consumer, after one initial poll
 */
def createStreamsGroupWithAggregation[K, V](configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
inputTopic: String,
outputTopic: String,
streamsGroupId: String,
groupProtocol: String): AsyncKafkaConsumer[K, V] = {
// Base config; overrides are applied after these puts, removals last, so
// configOverrides can replace any default and configsToRemove wins overall.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props ++= configOverrides
configsToRemove.foreach(props.remove(_))

// One subtopology with inputTopic as its source and a single-partition changelog
// topic for the aggregation's state store; no repartition topics, no copartition
// groups.
val streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
util.Set.of(inputTopic),
util.Set.of(),
util.Map.of(),
util.Map.of(outputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())), // state changelog topics for aggregation
util.Set.of()
)),
Map.empty[String, String].asJava
)

// Fixed byte-array deserializers are cast to the caller's K/V type parameters;
// callers requesting anything other than Array[Byte] will fail at runtime.
val consumer = createStreamsConsumer(
keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]],
valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]],
configOverrides = props,
streamsRebalanceData = streamsRebalanceData
)
// NOTE(review): unlike createStreamsGroup, this subscribe passes no
// StreamsRebalanceListener — confirm the streams-protocol subscribe overload
// without a listener behaves as expected here.
consumer.subscribe(util.Set.of(inputTopic))
// Single short poll, presumably to kick off the group join / first rebalance
// before returning — TODO confirm 500ms is sufficient on slow CI.
consumer.poll(Duration.ofMillis(500L))
consumer
}

def createAdminClient(
listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties
Expand Down
Loading