Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
/**
* @author Soby Chacko
* @author Ralf Wiedmann
* @author Gihong Park
* @since 3.0.0
*/
public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware {
Expand Down Expand Up @@ -542,13 +543,31 @@ private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String

final Consumed<K, V> consumed = getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset);
return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(destination),
consumed, getMaterialized(storeName, k, v));
consumed, getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled()));
}

private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(
String storeName, Serde<K> k, Serde<V> v) {
return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
String storeName, Serde<K> k, Serde<V> v, Boolean isCachingDisabled, Boolean isLoggingDisabled) {
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(k).withValueSerde(v);

if (isCachingDisabled != null) {
if (isCachingDisabled) {
materialized = materialized.withCachingDisabled();
}
else {
materialized = materialized.withCachingEnabled();
}
}

if (isLoggingDisabled != null) {
if (isLoggingDisabled) {
materialized = materialized.withLoggingDisabled();
}
}

return materialized;
}

private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(
Expand All @@ -558,7 +577,7 @@ private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(
return streamsBuilder.globalTable(
this.bindingServiceProperties.getBindingDestination(destination),
consumed,
getMaterialized(storeName, k, v));
getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled()));
}

private GlobalKTable<?, ?> getGlobalKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*
* @author Marius Bogoevici
* @author Soby Chacko
* @author Gihong Park
*/
public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {

Expand All @@ -44,6 +45,16 @@ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
*/
private String materializedAs;

/**
* Disable caching for materialized KTable.
*/
private boolean cachingDisabled;

/**
* Disable logging for materialized KTable.
*/
private boolean loggingDisabled;

/**
* Per input binding deserialization handler.
*/
Expand Down Expand Up @@ -109,6 +120,22 @@ public void setMaterializedAs(String materializedAs) {
this.materializedAs = materializedAs;
}

public boolean isCachingDisabled() {
return this.cachingDisabled;
}

public void setCachingDisabled(boolean cachingDisabled) {
this.cachingDisabled = cachingDisabled;
}

public boolean isLoggingDisabled() {
return this.loggingDisabled;
}

public void setLoggingDisabled(boolean loggingDisabled) {
this.loggingDisabled = loggingDisabled;
}

public String getTimestampExtractorBeanName() {
return timestampExtractorBeanName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,85 @@ void defaultsRespectedWhenCustomBindingProperties() {
});
}

@Test
void cachingAndLoggingDisabledPropertiesWork() {
this.contextRunner
.withPropertyValues(
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.caching-disabled: true",
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true")
.run((context) -> {
assertThat(context)
.hasNotFailed()
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
.hasFieldOrPropertyWithValue("cachingDisabled", true)
.hasFieldOrPropertyWithValue("loggingDisabled", true);
});
}

@Test
void cachingAndLoggingDefaultValues() {
this.contextRunner.run((context) -> {
assertThat(context)
.hasNotFailed()
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
.hasFieldOrPropertyWithValue("cachingDisabled", false)
.hasFieldOrPropertyWithValue("loggingDisabled", false);
});
}

@Test
void onlyCachingDisabledProperty() {
this.contextRunner
.withPropertyValues(
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.caching-disabled: true")
.run((context) -> {
assertThat(context)
.hasNotFailed()
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
.hasFieldOrPropertyWithValue("cachingDisabled", true)
.hasFieldOrPropertyWithValue("loggingDisabled", false);
});
}

@Test
void onlyLoggingDisabledProperty() {
this.contextRunner
.withPropertyValues(
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true")
.run((context) -> {
assertThat(context)
.hasNotFailed()
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
.hasFieldOrPropertyWithValue("cachingDisabled", false)
.hasFieldOrPropertyWithValue("loggingDisabled", true);
});
}

@Test
void defaultAndBindingSpecificCachingLoggingProperties() {
this.contextRunner
.withPropertyValues(
"spring.cloud.stream.kafka.streams.default.consumer.caching-disabled: true",
"spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true")
.run((context) -> {
assertThat(context)
.hasNotFailed()
.hasSingleBean(KafkaStreamsExtendedBindingProperties.class);
KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0"))
.hasFieldOrPropertyWithValue("cachingDisabled", true)
.hasFieldOrPropertyWithValue("loggingDisabled", true);
});
}

@EnableAutoConfiguration
static class KafkaStreamsTestApp {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,19 @@ state store to materialize when using incoming KTable types
+
Default: `none`.

cachingDisabled::
Disable caching for materialized KTable.
When set to `true`, calls `withCachingDisabled()` on the Materialized object.
When set to `false`, calls `withCachingEnabled()` on the Materialized object.
+
Default: `false`.

loggingDisabled::
Disable logging for materialized KTable.
When set to `true`, calls `withLoggingDisabled()` on the Materialized object.
+
Default: `false`.

useNativeDecoding::
flag to enable/disable native decoding
+
Expand Down