Skip to content

Commit 45a39f2

Browse files
committed
Added new class with 'use-case' testing.
- Added GlobalKTables as the main data structure to store the keys and topics for each gateway. - Added SerDe (Serializer and Deserializer) to create a local custom data structure to facilitate searching for data (will transform in hashmap/hashset in the future) that is shared by all the local streams instances appointed to it.
1 parent 94f496a commit 45a39f2

File tree

5 files changed

+273
-60
lines changed

5 files changed

+273
-60
lines changed
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
package com.isel.kafkastreamsmoduledemo.kafkaStreams
1+
package com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations
22

3-
import com.isel.kafkastreamsmoduledemo.utiils.KafkaStreamsUtils
4-
import com.isel.kafkastreamsmoduledemo.utiils.KafkaStreamsUtils.Companion.KEY_FILTER_STORE
5-
import com.isel.kafkastreamsmoduledemo.utiils.KafkaStreamsUtils.Companion.KEY_FILTER_TOPIC
6-
import com.isel.kafkastreamsmoduledemo.utiils.TopicKeys
7-
import com.isel.kafkastreamsmoduledemo.utiils.TopicKeysArraySerDe
3+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils
4+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils.Companion.KEY_FILTER_STORE
5+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils.Companion.KEY_FILTER_TOPIC
6+
import com.isel.kafkastreamsmoduledemo.utils.TopicKeys
7+
import com.isel.kafkastreamsmoduledemo.utils.TopicKeysArraySerDe
88
import org.apache.kafka.clients.consumer.KafkaConsumer
99
import org.apache.kafka.clients.producer.KafkaProducer
1010
import org.apache.kafka.clients.producer.ProducerRecord
Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
1-
package com.isel.kafkastreamsmoduledemo.kafkaStreams
1+
package com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations
22

3-
import com.isel.kafkastreamsmoduledemo.utiils.KafkaStreamsUtils
4-
import com.isel.kafkastreamsmoduledemo.utiils.KafkaStreamsUtils.Companion.KEY_FILTER_STORE
5-
import com.isel.kafkastreamsmoduledemo.utiils.TopicKeys
6-
import com.isel.kafkastreamsmoduledemo.utiils.TopicKeysArraySerDe
3+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils
4+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils.Companion.KEY_FILTER_STORE
5+
import com.isel.kafkastreamsmoduledemo.utils.TopicKeys
76
import org.apache.kafka.clients.consumer.ConsumerConfig
87
import org.apache.kafka.clients.consumer.KafkaConsumer
98
import org.apache.kafka.clients.producer.ProducerConfig
109
import org.apache.kafka.common.serialization.*
11-
import org.apache.kafka.common.utils.Bytes
1210
import org.apache.kafka.streams.KafkaStreams
13-
import org.apache.kafka.streams.StoreQueryParameters
1411
import org.apache.kafka.streams.StreamsBuilder
1512
import org.apache.kafka.streams.StreamsConfig
1613
import org.apache.kafka.streams.kstream.*
@@ -19,10 +16,6 @@ import org.apache.kafka.streams.processor.api.Processor
1916
import org.apache.kafka.streams.processor.api.ProcessorContext
2017
import org.apache.kafka.streams.processor.api.Record
2118
import org.apache.kafka.streams.state.KeyValueStore
22-
import org.apache.kafka.streams.state.QueryableStoreType
23-
import org.apache.kafka.streams.state.QueryableStoreTypes
24-
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
25-
import org.apache.kafka.streams.state.ValueAndTimestamp
2619
import org.springframework.beans.factory.annotation.Value
2720
import org.springframework.stereotype.Component
2821
import java.time.Duration
@@ -271,30 +264,7 @@ class KStreamsHandler(
271264
}
272265

273266
final fun loggerConsumer(topics: List<String> = listOf("even", "uneven")) {
274-
275-
if (utils.streamsMap.contains("logger-stream")) {
276-
utils.streamsMap.get("logger-stream")?.close()
277-
utils.streamsMap.remove("logger-stream")
278-
return
279-
}
280-
281-
val props = Properties()
282-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fu")
283-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
284-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer::class.java.name)
285-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
286-
val consumer: KafkaConsumer<Long, String> = KafkaConsumer(props)
287-
288-
consumer.subscribe(topics)
289-
290-
thread {
291-
while (true) {
292-
consumer.poll(Duration.ofSeconds(5)).forEach { record ->
293-
println("[${System.currentTimeMillis()}] - Consumer key: [${record.key()}] and value[${record.value()}] from topic:[${record.topic()}] and timestamp [${record.timestamp()}]")
294-
}
295-
}
296-
}
297-
267+
utils.loggerConsumer(topics)
298268
}
299269

300270
fun closeConsumerLogger() {
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations
2+
3+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils
4+
import com.isel.kafkastreamsmoduledemo.utils.TopicKeys
5+
import com.isel.kafkastreamsmoduledemo.utils.TopicKeysArraySerDe
6+
import org.apache.kafka.clients.producer.KafkaProducer
7+
import org.apache.kafka.clients.producer.ProducerRecord
8+
import org.apache.kafka.common.MetricName
9+
import org.apache.kafka.common.serialization.Serdes
10+
import org.apache.kafka.common.utils.Bytes
11+
import org.apache.kafka.streams.KafkaStreams
12+
import org.apache.kafka.streams.StoreQueryParameters
13+
import org.apache.kafka.streams.StreamsBuilder
14+
import org.apache.kafka.streams.kstream.Consumed
15+
import org.apache.kafka.streams.kstream.GlobalKTable
16+
import org.apache.kafka.streams.kstream.KStream
17+
import org.apache.kafka.streams.kstream.Materialized
18+
import org.apache.kafka.streams.state.KeyValueStore
19+
import org.apache.kafka.streams.state.QueryableStoreTypes
20+
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
21+
import org.apache.kafka.streams.state.ValueAndTimestamp
22+
import org.springframework.stereotype.Component
23+
24+
@Component
25+
class UseCase(
26+
val utils: KafkaStreamsUtils,
27+
val kStreamsHandler: KStreamsHandler,
28+
val globalKTables: GlobalKTables
29+
) {
30+
private val keyStorage: ReadOnlyKeyValueStore<String, ValueAndTimestamp<Array<TopicKeys>>> = startStore()
31+
companion object {
32+
private val STORE_TOPIC: String = "store-topic"
33+
private val STORE_NAME: String = "store-name"
34+
}
35+
36+
fun populateStore() {
37+
val producer: KafkaProducer<String, Array<TopicKeys>> = KafkaProducer(utils.getProducerDefaultProperties())
38+
39+
producer.send(ProducerRecord(
40+
STORE_TOPIC, "gateway-a", arrayOf(
41+
TopicKeys("use-case-a", arrayOf(
42+
1L, 0L
43+
)),
44+
TopicKeys("use-case-b", arrayOf(
45+
10L, 0L
46+
))
47+
)))
48+
producer.send(ProducerRecord(
49+
STORE_TOPIC, "gateway-b", arrayOf(
50+
TopicKeys("use-case-a", arrayOf(
51+
2L, 0L
52+
))
53+
)))
54+
producer.send(ProducerRecord(
55+
STORE_TOPIC, "gateway-c", arrayOf(
56+
TopicKeys("use-case-a", arrayOf(
57+
3L, 300L
58+
)),
59+
TopicKeys("use-case-b", arrayOf(
60+
30L, 300L
61+
))
62+
)))
63+
}
64+
65+
/**
66+
* Creates or accesses the GlobalKTable responsible for storing the keys and topics to know to which gateway topic it goes.
67+
*
68+
*/
69+
final fun startStore(): ReadOnlyKeyValueStore<String, ValueAndTimestamp<Array<TopicKeys>>> {
70+
loggerConsumer()
71+
val builder = StreamsBuilder()
72+
73+
val gatewaysKeys: GlobalKTable<String, Array<TopicKeys>> = builder.globalTable(
74+
STORE_TOPIC,
75+
Materialized.`as`<String, Array<TopicKeys>, KeyValueStore<Bytes, ByteArray>>(STORE_NAME)
76+
.withKeySerde(Serdes.String())
77+
.withValueSerde(TopicKeysArraySerDe())
78+
)
79+
80+
val stream = KafkaStreams(builder.build(), utils.getStreamDefaultProperties())
81+
utils.streamsMap.put(STORE_TOPIC,stream)
82+
83+
stream.cleanUp()
84+
stream.start()
85+
86+
// Creates a local state store (a file/db) in the current machine. Also good for querying.
87+
val store: ReadOnlyKeyValueStore<String, ValueAndTimestamp<Array<TopicKeys>>> = stream.store(
88+
StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore())
89+
)
90+
91+
store.all().forEach { record ->
92+
println("record key[${record.key}]")
93+
println("record values:")
94+
for (topicKeys in record.value.value()) {
95+
println("TOPIC=[${topicKeys.topic}]")
96+
for (key in topicKeys.keys) {
97+
println("key=[$key]")
98+
}
99+
}
100+
println("----------------------------")
101+
}
102+
103+
104+
Runtime.getRuntime().addShutdownHook(Thread(stream::close))
105+
106+
return store
107+
}
108+
109+
fun startStreamUsingStore(inputTopic: String) {
110+
val builder = StreamsBuilder()
111+
val inputStream: KStream<Long, String> = builder.stream(inputTopic, Consumed.with(Serdes.Long(), Serdes.String()))
112+
113+
for (gatewayEntry in keyStorage.all().iterator()) {
114+
println("${KafkaStreamsUtils.YELLOW_TEXT}************************************************************************************************************************${KafkaStreamsUtils.RESET_TEXT_COLOR}")
115+
KafkaStreamsUtils.printlnBetweenColoredLines("gateway entry key[${gatewayEntry.key}]", KafkaStreamsUtils.PURPLE_TEXT)
116+
println("${KafkaStreamsUtils.YELLOW_TEXT}************************************************************************************************************************${KafkaStreamsUtils.RESET_TEXT_COLOR}")
117+
118+
inputStream.filter {key, value -> isKeyForGateway(key, gatewayEntry.key, inputTopic)}.to(gatewayEntry.key)
119+
}
120+
121+
val stream = KafkaStreams(builder.build(), utils.getStreamDefaultProperties())
122+
utils.streamsMap.put(inputTopic ,stream)
123+
124+
stream.start()
125+
126+
KafkaStreamsUtils.getTopologyMetrics(stream)
127+
128+
Runtime.getRuntime().addShutdownHook(Thread(stream::close))
129+
}
130+
131+
private fun isKeyForGateway(key: Long, gateway: String, inputTopic: String): Boolean {
132+
KafkaStreamsUtils.printlnBetweenColoredLines("isKeyForGateway key[$key] gateway[$gateway] inputTopic[$inputTopic]", KafkaStreamsUtils.PURPLE_TEXT)
133+
for (topicKeys in keyStorage.get(gateway).value()) {
134+
if (topicKeys.topic == inputTopic) {
135+
for (topicKey in topicKeys.keys) {
136+
if (topicKey == key) {
137+
return true
138+
}
139+
}
140+
}
141+
}
142+
return false
143+
}
144+
145+
fun checkStreamMetadata(topic: String) {
146+
val stream: KafkaStreams? = utils.streamsMap.get(topic)
147+
if (stream == null) {
148+
println("checkStreamMetadata for stream of topic [$topic] failed.")
149+
return
150+
}
151+
checkStreamMetadata(stream)
152+
}
153+
154+
fun checkStreamMetadata(stream: KafkaStreams) {
155+
156+
stream.metrics().forEach {
157+
println("metric name [${it.key}]")
158+
println("metric value name [${it.value.metricName().name()}], group [${it.value.metricName().group()}], description [${it.value.metricName().description()}] and tags [${it.value.metricName().tags().toString()}]")
159+
println("metric value:")
160+
println(it.value.metricValue().toString())
161+
}
162+
163+
}
164+
165+
/**
166+
* Function responsible for logging to the console the received records
167+
* in the output topics (for testing purposes).
168+
*/
169+
fun loggerConsumer() {
170+
utils.loggerConsumer(listOf("gateway-a", "gateway-b", "gateway-c"))
171+
}
172+
173+
174+
}

code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo/rest/Controller.kt

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,37 @@
11
package com.isel.kafkastreamsmoduledemo.rest
22

3-
import com.isel.kafkastreamsmoduledemo.kafkaStreams.GlobalKTables
4-
import com.isel.kafkastreamsmoduledemo.kafkaStreams.KStreamsHandler
5-
import com.isel.kafkastreamsmoduledemo.utiils.KafkaStreamsUtils.Companion.DEFAULT_STREAM_ID
6-
import org.apache.kafka.streams.Topology
3+
import com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations.GlobalKTables
4+
import com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations.KStreamsHandler
5+
import com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations.UseCase
6+
import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils.Companion.DEFAULT_STREAM_ID
77
import org.springframework.web.bind.annotation.*
8-
import java.lang.Long
9-
import java.lang.Long.parseLong
108

119
@RestController
12-
class Controller(private val streamsHandler: KStreamsHandler, private val globalKTables: GlobalKTables) {
10+
class Controller(
11+
private val streamsHandler: KStreamsHandler,
12+
private val globalKTables: GlobalKTables,
13+
private val useCase: UseCase
14+
) {
1315

1416
@GetMapping("/test")
1517
fun testEndpoint(): String {
1618
println("this is the controller")
1719
return "yes, hello"
1820
}
1921

22+
@PostMapping("/use-case-start")
23+
fun useCaseStart() {
24+
println("Starting use case")
25+
useCase.startStreamUsingStore("use-case-a")
26+
useCase.startStreamUsingStore("use-case-b")
27+
println("Ending use case")
28+
}
29+
30+
@PostMapping("/use-case-stream-metrics")
31+
fun useCaseStreamMetrics() {
32+
33+
}
34+
2035
@PostMapping("/start-stream-evens/{id}")
2136
fun startStreamEvens(@PathVariable id: String): String {
2237
return streamsHandler.startStreaming(id)
@@ -80,4 +95,6 @@ class Controller(private val streamsHandler: KStreamsHandler, private val global
8095
globalKTables.printGlobalTable()
8196
}
8297

98+
99+
83100
}

0 commit comments

Comments
 (0)