diff --git a/clients/cloud/kotlin/build.gradle.kts b/clients/cloud/kotlin/build.gradle.kts index 7e7161ca3..ebfb9d2ca 100644 --- a/clients/cloud/kotlin/build.gradle.kts +++ b/clients/cloud/kotlin/build.gradle.kts @@ -4,32 +4,31 @@ group = "io.confluent.confluent" plugins { java - kotlin("jvm") version "1.3.11" + kotlin("jvm") version "1.8.21" } - -java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 +kotlin { + jvmToolchain { + languageVersion.set(JavaLanguageVersion.of(17)) + } } + + dependencies { - compile(kotlin("stdlib")) - compile(kotlin("reflect")) - - compile("org.apache.kafka:kafka-clients:2.1.0") - compile("org.apache.kafka:kafka-streams:2.1.0") - compile("org.apache.kafka:connect-runtime:2.1.0") - compile("io.confluent:kafka-json-serializer:5.0.1") - compile("org.slf4j:slf4j-api:1.7.6") - compile("org.slf4j:slf4j-log4j12:1.7.6") - compile("com.fasterxml.jackson.core:jackson-databind:[2.8.11.1,)") - //compile ("com.fasterxml.jackson.module:jackson-module-kotlin:[2.8.11.1,)") - compile("com.google.code.gson:gson:2.2.4") + implementation("org.apache.kafka:kafka-clients:3.5.0") + implementation("org.apache.kafka:kafka-streams:3.5.0") + implementation("org.apache.kafka:connect-runtime:3.5.0") + implementation("io.confluent:kafka-json-serializer:5.0.1") + implementation("org.slf4j:slf4j-api:2.0.5") + implementation("org.slf4j:slf4j-log4j12:2.0.5") + implementation("com.fasterxml.jackson.core:jackson-databind:2.15.1") + implementation("com.google.code.gson:gson:2.10.1") } repositories { - jcenter() - maven(url = "http://packages.confluent.io/maven/") + mavenCentral() + maven { url = uri("https://packages.confluent.io/maven/")} + maven { url = uri("https://maven.pkg.jetbrains.space/public/p/ktor/eap") } } val configPath: String by project @@ -37,11 +36,11 @@ val topic: String by project val mainClass: String by project tasks.withType { - kotlinOptions.jvmTarget = "1.8" + kotlinOptions.jvmTarget = "17" } task("runApp", JavaExec::class) { classpath = sourceSets["main"].runtimeClasspath - main = mainClass + mainClass.set("io.confluent.examples.clients.cloud.ProducerExample") args = listOf(configPath, topic) } \ No newline at end of file diff --git a/clients/cloud/kotlin/gradle.properties b/clients/cloud/kotlin/gradle.properties index 958fa0e51..3f34d3a56 100644 --- a/clients/cloud/kotlin/gradle.properties +++ b/clients/cloud/kotlin/gradle.properties @@ -1,3 +1,3 @@ configPath=$HOME/.confluent/java.config -topic=topic1 +topic=test1 mainClass=io.confluent.examples.clients.cloud.ConsumerExample diff --git a/clients/cloud/kotlin/gradle/wrapper/gradle-wrapper.properties b/clients/cloud/kotlin/gradle/wrapper/gradle-wrapper.properties index 10646ba60..c9407dfd5 100644 --- a/clients/cloud/kotlin/gradle/wrapper/gradle-wrapper.properties +++ b/clients/cloud/kotlin/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Jan 22 17:48:43 EST 2019 +#Wed Jun 07 10:23:44 SAST 2023 distributionBase=GRADLE_USER_HOME +distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip distributionPath=wrapper/dists -zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.1.1-all.zip +zipStoreBase=GRADLE_USER_HOME diff --git a/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ConsumerExample.kt b/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ConsumerExample.kt index fda9e7c11..8b23bc521 100644 --- a/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ConsumerExample.kt +++ b/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ConsumerExample.kt @@ -56,11 +56,11 @@ fun main(args: Array) { while (true) { totalCount = consumer .poll(ofMillis(100)) - .fold(totalCount, { accumulator, record -> + .fold(totalCount) { accumulator, record -> val newCount = accumulator + 1 println("Consumed record with key ${record.key()} and value ${record.value()}, and updated total count to $newCount") newCount - }) + } } } } diff --git a/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ProducerExample.kt b/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ProducerExample.kt index 75eff1b5a..58d733d5a 100644 --- a/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ProducerExample.kt +++ b/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/ProducerExample.kt @@ -43,6 +43,7 @@ fun createTopic(topic: String, try { with(AdminClient.create(cloudConfig)) { createTopics(listOf(newTopic)).all().get() + } } catch (e: ExecutionException) { if (e.cause !is TopicExistsException) throw e diff --git a/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/StreamsExample.kt b/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/StreamsExample.kt index db5143058..06c1c6e57 100644 --- a/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/StreamsExample.kt +++ b/clients/cloud/kotlin/src/main/kotlin/io/confluent/examples/clients/cloud/StreamsExample.kt @@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsConfig.* import org.apache.kafka.streams.kstream.Consumed import org.apache.kafka.streams.kstream.Grouped import org.apache.kafka.streams.kstream.Printed +import kotlin.system.exitProcess //DataRecord Serde @@ -53,7 +54,7 @@ fun main(args: Array) { if (args.size != 2) { println("Please provide command line arguments: ") - System.exit(1) + exitProcess(1) } val topic = args[1] @@ -62,7 +63,7 @@ fun main(args: Array) { val props = loadConfig(args[0]) props[APPLICATION_ID_CONFIG] = "kotlin_streams_example_group_1" // Disable caching to print the aggregation value after each record - props[CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0 + props[STATESTORE_CACHE_MAX_BYTES_CONFIG] = 0 props[REPLICATION_FACTOR_CONFIG] = 3 props[AUTO_OFFSET_RESET_CONFIG] = "earliest" @@ -83,6 +84,6 @@ fun main(args: Array) { streams.start() // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams - Runtime.getRuntime().addShutdownHook(Thread(Runnable { streams.close() })) + Runtime.getRuntime().addShutdownHook(Thread { streams.close() }) }