1+ package com.isel.kafkastreamsmoduledemo.kafkaStreamsExperimentations
2+
3+ import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils
4+ import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils.Companion.KEY_FILTER_STORE
5+ import com.isel.kafkastreamsmoduledemo.utils.KafkaStreamsUtils.Companion.KEY_FILTER_TOPIC
6+ import com.isel.kafkastreamsmoduledemo.utils.TopicKeys
7+ import com.isel.kafkastreamsmoduledemo.utils.TopicKeysArraySerDe
8+ import org.apache.kafka.clients.consumer.KafkaConsumer
9+ import org.apache.kafka.clients.producer.KafkaProducer
10+ import org.apache.kafka.clients.producer.ProducerRecord
11+ import org.apache.kafka.common.serialization.Serdes
12+ import org.apache.kafka.common.utils.Bytes
13+ import org.apache.kafka.streams.KafkaStreams
14+ import org.apache.kafka.streams.StoreQueryParameters
15+ import org.apache.kafka.streams.StreamsBuilder
16+ import org.apache.kafka.streams.kstream.Consumed
17+ import org.apache.kafka.streams.kstream.GlobalKTable
18+ import org.apache.kafka.streams.kstream.KStream
19+ import org.apache.kafka.streams.kstream.Materialized
20+ import org.apache.kafka.streams.state.KeyValueStore
21+ import org.apache.kafka.streams.state.QueryableStoreTypes
22+ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
23+ import org.apache.kafka.streams.state.ValueAndTimestamp
24+ import org.springframework.beans.factory.annotation.Value
25+ import org.springframework.stereotype.Component
26+ import java.time.Duration
27+ import kotlin.concurrent.thread
28+
29+ @Component
30+ class GlobalKTables (
31+ @Value(" \$ {spring.kafka.bootstrap-servers}" )
32+ private val bootstrapServers : String , // TODO: maybe change to db or kafka topic
33+ private val utils : KafkaStreamsUtils
34+ ) {
35+ private lateinit var tableStore: ReadOnlyKeyValueStore <String , ValueAndTimestamp <Array <TopicKeys >>>
36+
37+ fun globalTable () {
38+
39+ val builder = StreamsBuilder ()
40+
41+
42+ // Create a global table for gateways topics keys. The data from this global table
43+ // will be fully replicated on each instance of this application.
44+ val gatewaysKeys: GlobalKTable <String , Array <TopicKeys >> = builder.globalTable(
45+ KEY_FILTER_STORE ,
46+ Materialized .`as `<String , Array <TopicKeys >, KeyValueStore <Bytes , ByteArray >>(KEY_FILTER_STORE )
47+ .withKeySerde(Serdes .String ())
48+ .withValueSerde(TopicKeysArraySerDe ())
49+ )
50+
51+ val stream = KafkaStreams (builder.build(), utils.getStreamDefaultProperties())
52+ utils.streamsMap.put(KEY_FILTER_TOPIC ,stream)
53+
54+ stream.start()
55+
56+ Runtime .getRuntime().addShutdownHook(Thread (stream::close))
57+ }
58+
59+ fun accessGlobalTable () {
60+ val builder = StreamsBuilder ()
61+
62+ val inputStream: KStream <String , Array <TopicKeys >> = builder.stream(KEY_FILTER_TOPIC , Consumed .with (Serdes .String (), TopicKeysArraySerDe ()))
63+
64+ inputStream.peek { key, value ->
65+ println (" record key[${key} ]" )
66+ println (" record values:" )
67+ for (topicKeys in value) {
68+ println (" TOPIC=[${topicKeys.topic} ]" )
69+ for (topicKey in topicKeys.keys) {
70+ println (" key=[$topicKey ]" )
71+ }
72+ }
73+ println (" ----------------------------" )
74+ }
75+
76+
77+ val stream = KafkaStreams (builder.build(), utils.getStreamDefaultProperties())
78+ stream.start()
79+ tableStore = stream.store(
80+ StoreQueryParameters .fromNameAndType(KEY_FILTER_STORE , QueryableStoreTypes .timestampedKeyValueStore())
81+ )
82+ }
83+
84+ fun populateGlobalTable () {
85+ val producer: KafkaProducer <String , Array <TopicKeys >> = KafkaProducer (utils.getProducerDefaultProperties())
86+ val value = arrayOf(
87+ TopicKeys (" topicA" , arrayOf(
88+ 3L , 0L
89+ ))
90+ )
91+ producer.send(ProducerRecord (KEY_FILTER_TOPIC , " gateway01" , value))
92+ }
93+
94+ fun consumeSaidTablesTopic () {
95+ val consumer: KafkaConsumer <String , Array <TopicKeys >> = KafkaConsumer (utils.getConsumerDefaultProperties())
96+ consumer.subscribe(arrayListOf (KEY_FILTER_TOPIC ))
97+
98+ thread {
99+ while (true ) {
100+ consumer.poll(Duration .ofSeconds(5 )).forEach { record ->
101+ println (" record key[${record.key()} ]" )
102+ println (" record values:" )
103+ for (topicKeys in record.value()) {
104+ println (" TOPIC=[${topicKeys.topic} ]" )
105+ for (key in topicKeys.keys) {
106+ println (" key=[$key ]" )
107+ }
108+ }
109+ println (" ----------------------------" )
110+ }
111+
112+ }
113+ }
114+ }
115+
116+ fun printGlobalTable () {
117+ tableStore.all().forEach { record ->
118+ println (" record key[${record.key} ]" )
119+ println (" record values:" )
120+ for (topicKeys in record.value.value()) {
121+ println (" TOPIC=[${topicKeys.topic} ]" )
122+ for (key in topicKeys.keys) {
123+ println (" key=[$key ]" )
124+ }
125+ }
126+ println (" ----------------------------" )
127+ }
128+
129+ }
130+ }
0 commit comments