diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 13214c14..d0eb7aae 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ junit = "5.13.3" mockito = "5.18.0" testcontainers = "1.21.3" -kafkaUtils = "1.2.1" +kafkaUtils = "1.2.2-SNAPSHOT" [libraries] kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version.ref = "kafkaUtils" } @@ -17,7 +17,7 @@ kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro- kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.0" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } -errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.0.0" } +errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.0.1-SNAPSHOT" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" } slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" } @@ -33,7 +33,7 @@ mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mo mockito-junit = { group = "org.mockito", name = "mockito-junit-jupiter", version.ref = "mockito" } testcontainers-junit = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" } testcontainers-kafka = { group = "org.testcontainers", name = "kafka", version.ref = "testcontainers" } -fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.4.1" } +fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.4.2-SNAPSHOT" } log4j-slf4j2 = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = "2.25.1" } awaitility = { group = "org.awaitility", name = "awaitility", version = "4.3.0" } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java index 55f3b016..668052f2 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java @@ -850,7 +850,7 @@ KStreamX leftJoin(GlobalKTable globalTable, @Override KStreamX process( - ProcessorSupplier processorSupplier, + ProcessorSupplier processorSupplier, String... stateStoreNames); /** @@ -866,7 +866,7 @@ KStreamX process( * @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier) */ KErrorStreamX processCapturingErrors( - ProcessorSupplier processorSupplier, + ProcessorSupplier processorSupplier, String... stateStoreNames); /** @@ -883,13 +883,13 @@ KErrorStreamX processCapturingErrors( * @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier, java.util.function.Predicate) */ KErrorStreamX processCapturingErrors( - ProcessorSupplier processorSupplier, + ProcessorSupplier processorSupplier, java.util.function.Predicate errorFilter, String... stateStoreNames); @Override KStreamX process( - ProcessorSupplier processorSupplier, + ProcessorSupplier processorSupplier, Named named, String... stateStoreNames); /** @@ -906,7 +906,7 @@ KStreamX process( * @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier) */ KErrorStreamX processCapturingErrors( - ProcessorSupplier processorSupplier, + ProcessorSupplier processorSupplier, Named named, String... stateStoreNames); /** @@ -924,13 +924,13 @@ KErrorStreamX processCapturingErrors( * @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier, java.util.function.Predicate) */ KErrorStreamX processCapturingErrors( - ProcessorSupplier processorSupplier, + ProcessorSupplier processorSupplier, java.util.function.Predicate errorFilter, Named named, String... stateStoreNames); @Override KStreamX processValues( - FixedKeyProcessorSupplier processorSupplier, + FixedKeyProcessorSupplier processorSupplier, String... stateStoreNames); /** @@ -947,7 +947,7 @@ KStreamX processValues( * @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier) */ KErrorStreamX processValuesCapturingErrors( - FixedKeyProcessorSupplier processorSupplier, + FixedKeyProcessorSupplier processorSupplier, String... stateStoreNames); /** @@ -965,13 +965,13 @@ KErrorStreamX processValuesCapturingErrors( * @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier, java.util.function.Predicate) */ KErrorStreamX processValuesCapturingErrors( - FixedKeyProcessorSupplier processorSupplier, + FixedKeyProcessorSupplier processorSupplier, java.util.function.Predicate errorFilter, String... stateStoreNames); @Override KStreamX processValues( - FixedKeyProcessorSupplier processorSupplier, + FixedKeyProcessorSupplier processorSupplier, Named named, String... stateStoreNames); /** @@ -989,7 +989,7 @@ KStreamX processValues( * @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier) */ KErrorStreamX processValuesCapturingErrors( - FixedKeyProcessorSupplier processorSupplier, + FixedKeyProcessorSupplier processorSupplier, Named named, String... stateStoreNames); /** @@ -1008,7 +1008,7 @@ KErrorStreamX processValuesCapturingErrors( * @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier, java.util.function.Predicate) */ KErrorStreamX processValuesCapturingErrors( - FixedKeyProcessorSupplier processorSupplier, + FixedKeyProcessorSupplier processorSupplier, java.util.function.Predicate errorFilter, Named named, String... stateStoreNames); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java index 0bdd0b6a..876c643f 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java @@ -838,14 +838,14 @@ public KStreamX leftJoin(final GlobalKTable globalTa @Override public KStreamX process( - final ProcessorSupplier processorSupplier, + final ProcessorSupplier processorSupplier, final String... stateStoreNames) { return this.context.wrap(this.wrapped.process(processorSupplier, stateStoreNames)); } @Override public KErrorStreamX processCapturingErrors( - final ProcessorSupplier processorSupplier, + final ProcessorSupplier processorSupplier, final String... stateStoreNames) { return this.processCapturingErrorsInternal(ErrorCapturingProcessor.captureErrors(processorSupplier), stateStoreNames); @@ -853,7 +853,7 @@ public KErrorStreamX processCapturingErrors( @Override public KErrorStreamX processCapturingErrors( - final ProcessorSupplier processorSupplier, + final ProcessorSupplier processorSupplier, final java.util.function.Predicate errorFilter, final String... stateStoreNames) { return this.processCapturingErrorsInternal( @@ -863,14 +863,16 @@ public KErrorStreamX processCapturingErrors( @Override public KStreamX process( - final ProcessorSupplier processorSupplier, final Named named, + final ProcessorSupplier processorSupplier, + final Named named, final String... stateStoreNames) { return this.context.wrap(this.wrapped.process(processorSupplier, named, stateStoreNames)); } @Override public KErrorStreamX processCapturingErrors( - final ProcessorSupplier processorSupplier, final Named named, + final ProcessorSupplier processorSupplier, + final Named named, final String... stateStoreNames) { return this.processCapturingErrorsInternal(ErrorCapturingProcessor.captureErrors(processorSupplier), named, stateStoreNames); @@ -878,7 +880,7 @@ public KErrorStreamX processCapturingErrors( @Override public KErrorStreamX processCapturingErrors( - final ProcessorSupplier processorSupplier, + final ProcessorSupplier processorSupplier, final java.util.function.Predicate errorFilter, final Named named, final String... stateStoreNames) { return this.processCapturingErrorsInternal( @@ -888,14 +890,14 @@ public KErrorStreamX processCapturingErrors( @Override public KStreamX processValues( - final FixedKeyProcessorSupplier processorSupplier, + final FixedKeyProcessorSupplier processorSupplier, final String... stateStoreNames) { return this.context.wrap(this.wrapped.processValues(processorSupplier, stateStoreNames)); } @Override public KErrorStreamX processValuesCapturingErrors( - final FixedKeyProcessorSupplier processorSupplier, + final FixedKeyProcessorSupplier processorSupplier, final String... stateStoreNames) { return this.processValuesCapturingErrorsInternal(ErrorCapturingValueProcessor.captureErrors(processorSupplier), stateStoreNames); @@ -903,7 +905,7 @@ public KErrorStreamX processValuesCapturingErrors( @Override public KErrorStreamX processValuesCapturingErrors( - final FixedKeyProcessorSupplier processorSupplier, + final FixedKeyProcessorSupplier processorSupplier, final java.util.function.Predicate errorFilter, final String... stateStoreNames) { return this.processValuesCapturingErrorsInternal( ErrorCapturingValueProcessor.captureErrors(processorSupplier, errorFilter), @@ -913,14 +915,14 @@ public KErrorStreamX processValuesCapturingErrors( @Override public KStreamX processValues( - final FixedKeyProcessorSupplier processorSupplier, final Named named, + final FixedKeyProcessorSupplier processorSupplier, final Named named, final String... stateStoreNames) { return this.context.wrap(this.wrapped.processValues(processorSupplier, named, stateStoreNames)); } @Override public KErrorStreamX processValuesCapturingErrors( - final FixedKeyProcessorSupplier processorSupplier, final Named named, + final FixedKeyProcessorSupplier processorSupplier, final Named named, final String... stateStoreNames) { return this.processValuesCapturingErrorsInternal(ErrorCapturingValueProcessor.captureErrors(processorSupplier), named, @@ -929,87 +931,87 @@ public KErrorStreamX processValuesCapturingErrors( @Override public KErrorStreamX processValuesCapturingErrors( - final FixedKeyProcessorSupplier processorSupplier, + final FixedKeyProcessorSupplier processorSupplier, final java.util.function.Predicate errorFilter, final Named named, final String... stateStoreNames) { return this.processValuesCapturingErrorsInternal( ErrorCapturingValueProcessor.captureErrors(processorSupplier, errorFilter), named, stateStoreNames); } - private KeyValueKErrorStreamX mapCapturingErrorsInternal( + private KErrorStreamX mapCapturingErrorsInternal( final KeyValueMapper>> mapper) { final KStreamX> map = this.map(mapper); return new KeyValueKErrorStreamX<>(map); } - private KeyValueKErrorStreamX mapCapturingErrorsInternal( + private KErrorStreamX mapCapturingErrorsInternal( final KeyValueMapper>> mapper, final Named named) { final KStreamX> map = this.map(mapper, named); return new KeyValueKErrorStreamX<>(map); } - private ValueKErrorStreamX mapValuesCapturingErrorsInternal( + private KErrorStreamX mapValuesCapturingErrorsInternal( final ValueMapper> mapper) { final KStreamX> map = this.mapValues(mapper); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX mapValuesCapturingErrorsInternal( + private KErrorStreamX mapValuesCapturingErrorsInternal( final ValueMapper> mapper, final Named named) { final KStreamX> map = this.mapValues(mapper, named); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX mapValuesCapturingErrorsInternal( + private KErrorStreamX mapValuesCapturingErrorsInternal( final ValueMapperWithKey> mapper) { final KStreamX> map = this.mapValues(mapper); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX mapValuesCapturingErrorsInternal( + private KErrorStreamX mapValuesCapturingErrorsInternal( final ValueMapperWithKey> mapper, final Named named) { final KStreamX> map = this.mapValues(mapper, named); return new ValueKErrorStreamX<>(map); } - private KeyValueKErrorStreamX flatMapCapturingErrorsInternal( + private KErrorStreamX flatMapCapturingErrorsInternal( final KeyValueMapper>>> mapper) { final KStreamX> map = this.flatMap(mapper); return new KeyValueKErrorStreamX<>(map); } - private KeyValueKErrorStreamX flatMapCapturingErrorsInternal( + private KErrorStreamX flatMapCapturingErrorsInternal( final KeyValueMapper>>> mapper, final Named named) { final KStreamX> map = this.flatMap(mapper, named); return new KeyValueKErrorStreamX<>(map); } - private ValueKErrorStreamX flatMapValuesCapturingErrorsInternal( + private KErrorStreamX flatMapValuesCapturingErrorsInternal( final ValueMapper>> mapper) { final KStreamX> map = this.flatMapValues(mapper); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX flatMapValuesCapturingErrorsInternal( + private KErrorStreamX flatMapValuesCapturingErrorsInternal( final ValueMapper>> mapper, final Named named) { final KStreamX> map = this.flatMapValues(mapper, named); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX flatMapValuesCapturingErrorsInternal( + private KErrorStreamX flatMapValuesCapturingErrorsInternal( final ValueMapperWithKey>> mapper) { final KStreamX> map = this.flatMapValues(mapper); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX flatMapValuesCapturingErrorsInternal( + private KErrorStreamX flatMapValuesCapturingErrorsInternal( final ValueMapperWithKey>> mapper, final Named named) { final KStreamX> map = this.flatMapValues(mapper, named); return new ValueKErrorStreamX<>(map); } - private KeyValueKErrorStreamX processCapturingErrorsInternal( + private KErrorStreamX processCapturingErrorsInternal( final ProcessorSupplier> processorSupplier, final String... stateStoreNames) { final KStreamX> map = @@ -1017,7 +1019,7 @@ private KeyValueKErrorStreamX processCapturingErr return new KeyValueKErrorStreamX<>(map); } - private KeyValueKErrorStreamX processCapturingErrorsInternal( + private KErrorStreamX processCapturingErrorsInternal( final ProcessorSupplier> processorSupplier, final Named named, final String... stateStoreNames) { final KStreamX> map = @@ -1025,14 +1027,14 @@ private KeyValueKErrorStreamX processCapturingErr return new KeyValueKErrorStreamX<>(map); } - private ValueKErrorStreamX processValuesCapturingErrorsInternal( + private KErrorStreamX processValuesCapturingErrorsInternal( final FixedKeyProcessorSupplier> processorSupplier, final String... stateStoreNames) { final KStreamX> map = this.processValues(processorSupplier, stateStoreNames); return new ValueKErrorStreamX<>(map); } - private ValueKErrorStreamX processValuesCapturingErrorsInternal( + private KErrorStreamX processValuesCapturingErrorsInternal( final FixedKeyProcessorSupplier> processorSupplier, final Named named, final String... stateStoreNames) { final KStreamX> map = diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java index 3a77d5c8..833f3334 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java @@ -205,16 +205,19 @@ KTableX transformValues( String... stateStoreNames); @Override - KGroupedTableX groupBy(KeyValueMapper> selector); + KGroupedTableX groupBy( + KeyValueMapper> selector); @Override - KGroupedTableX groupBy(KeyValueMapper> selector, + KGroupedTableX groupBy( + KeyValueMapper> selector, Grouped grouped); /** * @see #groupBy(KeyValueMapper, Grouped) */ - KGroupedTableX groupBy(KeyValueMapper> selector, + KGroupedTableX groupBy( + KeyValueMapper> selector, GroupedX grouped); @Override @@ -311,122 +314,142 @@ KTableX outerJoin(KTable other, Named named, MaterializedX> materialized); @Override - KTableX join(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner); + KTableX join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner); @Override - KTableX join(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined); + KTableX join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined); @Override - KTableX join(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, Materialized> materialized); + KTableX join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, + Materialized> materialized); /** * @see #join(KTable, Function, ValueJoiner, Materialized) */ - KTableX join(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, MaterializedX> materialized); + KTableX join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, + MaterializedX> materialized); @Override - KTableX join(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, Materialized> materialized); /** * @see #join(KTable, Function, ValueJoiner, TableJoined, Materialized) */ - KTableX join(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX join(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); @Override - KTableX leftJoin(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner); + KTableX leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner); @Override - KTableX leftJoin(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined); + KTableX leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined); @Override - KTableX leftJoin(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, Materialized> materialized); + KTableX leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, + Materialized> materialized); /** * @see #leftJoin(KTable, Function, ValueJoiner, Materialized) */ - KTableX leftJoin(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, MaterializedX> materialized); + KTableX leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, + MaterializedX> materialized); @Override - KTableX leftJoin(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, Materialized> materialized); /** * @see #leftJoin(KTable, Function, ValueJoiner, TableJoined, Materialized) */ - KTableX leftJoin(KTable other, Function foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX leftJoin(KTable other, Function foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); @Override - KTableX join(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner); + KTableX join(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner); @Override - KTableX join(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined); + KTableX join(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined); @Override - KTableX join(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, Materialized> materialized); + KTableX join(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, + Materialized> materialized); /** * @see #join(KTable, BiFunction, ValueJoiner, Materialized) */ - KTableX join(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, MaterializedX> materialized); + KTableX join(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, + MaterializedX> materialized); @Override - KTableX join(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX join(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, Materialized> materialized); /** * @see #join(KTable, BiFunction, ValueJoiner, TableJoined, Materialized) */ - KTableX join(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX join(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); @Override - KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner); + KTableX leftJoin(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner); @Override - KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined); + KTableX leftJoin(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined); @Override - KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, Materialized> materialized); + KTableX leftJoin(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, + Materialized> materialized); /** * @see #leftJoin(KTable, BiFunction, ValueJoiner, Materialized) */ - KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, MaterializedX> materialized); + KTableX leftJoin(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, + MaterializedX> materialized); @Override - KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX leftJoin(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, Materialized> materialized); /** * @see #leftJoin(KTable, BiFunction, ValueJoiner, TableJoined, Materialized) */ - KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, - ValueJoiner joiner, TableJoined tableJoined, + KTableX leftJoin(KTable other, + BiFunction foreignKeyExtractor, + ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java index 34b7c250..2af2e71b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java @@ -267,19 +267,21 @@ public KTableX transformValues( @Override public KGroupedTableX groupBy( - final KeyValueMapper> selector) { + final KeyValueMapper> selector) { return this.context.wrap(this.wrapped.groupBy(selector)); } @Override public KGroupedTableX groupBy( - final KeyValueMapper> selector, final Grouped grouped) { + final KeyValueMapper> selector, + final Grouped grouped) { return this.context.wrap(this.wrapped.groupBy(selector, grouped)); } @Override public KGroupedTableX groupBy( - final KeyValueMapper> selector, final GroupedX grouped) { + final KeyValueMapper> selector, + final GroupedX grouped) { return this.groupBy(selector, grouped.configure(this.context.getConfigurator())); } @@ -417,40 +419,41 @@ public KTableX outerJoin(final KTable other, @Override public KTableX join(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner) { + final Function foreignKeyExtractor, + final ValueJoiner joiner) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner)); } @Override public KTableX join(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined) { + final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined)); } @Override public KTableX join(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final Materialized> materialized) { + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, materialized)); } @Override public KTableX join(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, + final Function foreignKeyExtractor, + final ValueJoiner joiner, final MaterializedX> materialized) { return this.join(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator())); } @Override public KTableX join(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized)); @@ -458,8 +461,8 @@ public KTableX join(final KTable other, @Override public KTableX join(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final MaterializedX> materialized) { return this.join(other, foreignKeyExtractor, joiner, tableJoined, materialized.configure(this.context.getConfigurator())); @@ -467,32 +470,33 @@ public KTableX join(final KTable other, @Override public KTableX leftJoin(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner) { + final Function foreignKeyExtractor, + final ValueJoiner joiner) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner)); } @Override public KTableX leftJoin(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined) { + final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined)); } @Override public KTableX leftJoin(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final Materialized> materialized) { + final Function foreignKeyExtractor, + final ValueJoiner joiner, + final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, materialized)); } @Override public KTableX leftJoin(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, + final Function foreignKeyExtractor, + final ValueJoiner joiner, final MaterializedX> materialized) { return this.leftJoin(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator())); @@ -500,8 +504,8 @@ public KTableX leftJoin(final KTable other, @Override public KTableX leftJoin(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap( @@ -510,52 +514,59 @@ public KTableX leftJoin(final KTable other, @Override public KTableX leftJoin(final KTable other, - final Function foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + final Function foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final MaterializedX> materialized) { return this.leftJoin(other, foreignKeyExtractor, joiner, tableJoined, materialized.configure(this.context.getConfigurator())); } @Override - public KTableX join(final KTable other, final BiFunction foreignKeyExtractor, - final ValueJoiner joiner) { + public KTableX join(final KTable other, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner)); } @Override - public KTableX join(final KTable other, final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined) { + public KTableX join(final KTable other, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined)); } @Override - public KTableX join(final KTable other, final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final Materialized> materialized) { + public KTableX join(final KTable other, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, + final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, materialized)); } @Override - public KTableX join(final KTable other, final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, + public KTableX join(final KTable other, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final MaterializedX> materialized) { return this.join(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator())); } @Override - public KTableX join(final KTable other, final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + public KTableX join(final KTable other, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized)); } @Override - public KTableX join(final KTable other, final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + public KTableX join(final KTable other, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final MaterializedX> materialized) { return this.join(other, foreignKeyExtractor, joiner, tableJoined, materialized.configure(this.context.getConfigurator())); @@ -563,32 +574,33 @@ public KTableX join(final KTable other, final BiFunc @Override public KTableX leftJoin(final KTable other, - final BiFunction foreignKeyExtractor, - final ValueJoiner joiner) { + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner)); } @Override public KTableX leftJoin(final KTable other, - final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined) { + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined)); } @Override public KTableX leftJoin(final KTable other, - final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final Materialized> materialized) { + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, + final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, materialized)); } @Override public KTableX leftJoin(final KTable other, - final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final MaterializedX> materialized) { return this.leftJoin(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator())); @@ -596,8 +608,8 @@ public KTableX leftJoin(final KTable other, @Override public KTableX leftJoin(final KTable other, - final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final Materialized> materialized) { final KTable otherTable = StreamsContext.maybeUnwrap(other); return this.context.wrap( @@ -606,8 +618,8 @@ public KTableX leftJoin(final KTable other, @Override public KTableX leftJoin(final KTable other, - final BiFunction foreignKeyExtractor, - final ValueJoiner joiner, final TableJoined tableJoined, + final BiFunction foreignKeyExtractor, + final ValueJoiner joiner, final TableJoined tableJoined, final MaterializedX> materialized) { return this.leftJoin(other, foreignKeyExtractor, joiner, tableJoined, materialized.configure(this.context.getConfigurator()));