 package com.isel.kafkastreamsmoduledemo.kafkaStreams

-import org.apache.kafka.common.serialization.Serdes
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.isel.kafkastreamsmoduledemo.utiils.TopicKeys
+import com.isel.kafkastreamsmoduledemo.utiils.TopicKeysArraySerDe
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.serialization.*
+import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.KafkaStreams
 import org.apache.kafka.streams.StreamsBuilder
 import org.apache.kafka.streams.StreamsConfig
 import org.apache.kafka.streams.kstream.Consumed
 import org.apache.kafka.streams.kstream.KStream
 import org.apache.kafka.streams.processor.RecordContext
+import org.apache.kafka.streams.processor.To
+import org.apache.kafka.streams.processor.api.Processor
+import org.apache.kafka.streams.processor.api.ProcessorContext
+import org.apache.kafka.streams.processor.api.Record
+import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.streams.state.Stores
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.stereotype.Component
+import java.time.Duration
 import java.util.*
 import java.util.concurrent.ConcurrentHashMap

+
 @Component
 class KStreamsHandler(
     @Value("\${spring.kafka.bootstrap-servers}")
     private val bootstrapServers: String // TODO: maybe read this from a DB or a Kafka topic instead
 ) {

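+    // Shared state: streamsMap tracks one KafkaStreams instance per topic, and
+    // testGatewayKeys is assumed to map a gateway id to the topic/key filters it
+    // subscribes to (see gatewayKeyFilter below).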
+    private lateinit var plannerStream: KafkaStreams
+    private val streamsMap: ConcurrentHashMap<String, KafkaStreams> = ConcurrentHashMap()
+    private var testGatewayKeys: ConcurrentHashMap<String, Array<TopicKeys>> = ConcurrentHashMap()
+
     companion object {
-        private val streamsMap: ConcurrentHashMap<String, KafkaStreams> = ConcurrentHashMap()
+        private val KEY_FILTER_STORE = "key-filter-store"
+        private val KEY_FILTER_TOPIC = "key-filter-topic"
+        private val DEFAULT_STREAM_ID = "gateway-topics-filter-stream"
+
+        // Processor that keeps the global store updated. Currently unused: the store
+        // can only be accessed from inside processors, and the wiring in init below
+        // is commented out.
+        private class GlobalStoresUpdater<K, V>(
+            private val storeName: String
+        ) : Processor<K, V, Void, Void> {
+
+            private var store: KeyValueStore<K, V>? = null
+
+            override fun init(context: ProcessorContext<Void, Void>?) {
+                store = context?.getStateStore(storeName)
+            }
+
+            override fun process(record: Record<K, V>?) {
+                store!!.put(record!!.key(), record.value())
+            }
+        }
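+
+        // A custom global store registered with StreamsBuilder.addGlobalStore is not
+        // populated by Streams itself: the supplied processor (like the one above) must
+        // copy every record from the store's source topic into the store.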
+    }
+
+    init {
+        /* val builder = StreamsBuilder()
+
+        builder.addGlobalStore(
+            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(KEY_FILTER_STORE), Serdes.String(), TopicKeysArraySerDe()),
+            KEY_FILTER_TOPIC,
+            Consumed.with(Serdes.String(), TopicKeysArraySerDe())
+        ) {
+            GlobalStoresUpdater<String, Bytes>(KEY_FILTER_STORE)
+        }
+
+        val inputStream = builder.stream(KEY_FILTER_TOPIC, Consumed.with(Serdes.String(), TopicKeysArraySerDe()))
+        inputStream.peek { key, value ->
+            plannerStream.store(KEY_FILTER_STORE, QueryableStoreTypes.keyValueStore())
+        }
+        plannerStream = KafkaStreams(builder.build(), getStreamDefaultProperties()) */
+    }
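+
+    // Re-enabling the block above would also require importing
+    // org.apache.kafka.streams.state.QueryableStoreTypes and starting plannerStream
+    // before querying it; store() throws while the instance is not running.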
+
+    class IsNewTopicProcessor : Processor<String, Array<TopicKeys>, String, Array<TopicKeys>> {
+
+        var store: KeyValueStore<String, Array<TopicKeys>>? = null
+
+        override fun init(context: ProcessorContext<String, Array<TopicKeys>>?) {
+            this.store = context?.getStateStore(KEY_FILTER_STORE)
+        }
+
+        // TODO: check whether the received record refers to a topic not yet in the store.
+        override fun process(record: Record<String, Array<TopicKeys>>?) {
+
+        }
+    }
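+
+    // A minimal sketch of what process() might eventually do, assuming a record should
+    // only be forwarded when its key is not yet in the store (illustrative only; the
+    // class would also need to keep a reference to its ProcessorContext to forward):
+    //
+    //     override fun process(record: Record<String, Array<TopicKeys>>?) {
+    //         val rec = record ?: return
+    //         if (store?.get(rec.key()) == null) { // unseen gateway/topic entry
+    //             store?.put(rec.key(), rec.value())
+    //             context?.forward(rec)
+    //         }
+    //     }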
+
+
+    fun newStreamForTopic(topic: String, streamId: String) {
+        println("newStreamForTopic function beginning")
+
+        val builder = StreamsBuilder()
+        val inputStream: KStream<Long, String> = builder.stream(topic, Consumed.with(Serdes.Long(), Serdes.String()))
+
+        // TODO: position 0 is hardcoded (and throws if no topic matches); support routing a record to more than one topic.
+        inputStream.to { key, value, recordContext -> gatewayKeyFilter(key, value, recordContext)[0] }
+
+        val stream = KafkaStreams(builder.build(), getStreamDefaultProperties())
+        streamsMap[topic] = stream
+        println("newStreamForTopic function ending")
+    }
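+
+    // The lambda given to `to` acts as a TopicNameExtractor, picking the destination
+    // topic per record. Hypothetical usage, assuming the caller starts the stream
+    // itself (nothing here invokes stream.start(), and streamId is currently unused):
+    //
+    //     handler.newStreamForTopic("gateway-input", "gateway-input-stream")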
+
+    private fun gatewayKeyFilter(key: Long, value: String, recordContext: RecordContext): Array<String> {
+        // Collect every topic whose key filter matches. The original Array.plus call
+        // returned a new array that was discarded, so the result was always empty.
+        val outputTopics = mutableListOf<String>()
+        for (entry in testGatewayKeys) {
+            for (topicKeysArray in entry.value) {
+                if (topicKeysArray.keys.contains(key)) {
+                    outputTopics.add(topicKeysArray.topic)
+                }
+            }
+        }
+        return outputTopics.toTypedArray()
+    }
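+
+    // Illustrative contents, assuming TopicKeys pairs a destination topic with the set
+    // of record keys routed to it (names and values below are hypothetical):
+    //
+    //     testGatewayKeys["gateway-1"] = arrayOf(TopicKeys("even", setOf(0L, 2L, 4L)))
+    //
+    // A record keyed 2L would then be routed to topic "even".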
+
+    private fun getStreamDefaultProperties(): Properties {
+        val props = Properties()
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_STREAM_ID)
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1) // Testing value only; production should probably use 20 or more (the Kafka Streams default is 100).
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().javaClass)
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+        return props
     }
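+
+    // NOTE: every stream built with these properties shares DEFAULT_STREAM_ID as its
+    // application.id, so multiple KafkaStreams instances would join the same consumer
+    // group and contend for the same state directory; a per-stream id (for example the
+    // currently unused streamId parameter of newStreamForTopic) would avoid that.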
24136
25137 fun startStreaming (id : String ): String {
@@ -33,6 +145,7 @@ class KStreamsHandler(

         val props = Properties()
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app-id-1-example")
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1)
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().javaClass)
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
@@ -53,7 +166,8 @@ class KStreamsHandler(
         }
     }*/

-        inputStream.to { key, value, recordContext -> evenFilter(key, value, recordContext) }
+        // inputStream.to { key, value, recordContext -> evenFilter(key, value, recordContext) }
+        inputStream.to("even")
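+        // The per-record topic extractor is disabled in favour of a fixed "even" topic,
+        // presumably while the filtering logic is reworked around gatewayKeyFilter.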


         val topology = builder.build()
@@ -76,6 +190,36 @@ class KStreamsHandler(
         }
     }

+    fun loggerConsumer() {
+
+        if (streamsMap.containsKey("logger-stream")) { // was contains(), which checks values, not keys, on a ConcurrentHashMap
+            streamsMap.get("logger-stream")?.close()
+            streamsMap.remove("logger-stream")
+            return
+        }
+
+        val props = Properties()
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fu")
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer::class.java.name)
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
+        val consumer: KafkaConsumer<Long, String> = KafkaConsumer(props)
+
+        consumer.subscribe(listOf("even", "uneven"))
+
+        while (true) {
+            consumer.poll(Duration.ofSeconds(5)).forEach { record ->
+                println("[${System.currentTimeMillis()}] - Consumer key: [${record.key()}] and value [${record.value()}] from topic: [${record.topic()}] and timestamp [${record.timestamp()}]")
+            }
+        }
+    }
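+
+    // The poll loop above blocks its caller indefinitely. KafkaConsumer.wakeup(),
+    // called from another thread, makes poll() throw WakeupException and is the
+    // standard way to break out of such a loop.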
+
+    fun closeConsumerLogger() {
+        streamsMap.remove("logger-stream")
+    }
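+    // NOTE: removing the map entry does not stop the consumer created in
+    // loggerConsumer, since that consumer is never stored anywhere; keeping a
+    // reference to it and calling wakeup() would be needed for a real shutdown.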
+
     fun loggerStream() {
         if (streamsMap.containsKey("logger-stream")) {
             streamsMap.get("logger-stream")?.close()