Skip to content

Commit 2b9d435

Browse files
authored
Select keys from value exclusively (#364)
1 parent 4049141 commit 2b9d435

File tree

3 files changed

+76
-0
lines changed

3 files changed

+76
-0
lines changed

streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,32 @@ public interface KStreamX<K, V> extends KStream<K, V> {
8383
@Override
8484
<KR> KStreamX<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper, Named named);
8585

86+
/**
87+
* Set a new key (with possibly new type) for each input record. The provided {@link ValueMapper} is applied to each
88+
* input record and computes a new key for it. Thus, an input record {@code <K,V>} can be transformed into an output
89+
* record {@code <K':V>}. This is a stateless record-by-record operation.
90+
*
91+
* @param mapper a {@link ValueMapper} that computes a new key for each record
92+
* @param <KR> the new key type of the result stream
93+
* @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
94+
* @see #selectKey(KeyValueMapper)
95+
*/
96+
<KR> KStreamX<KR, V> selectKey(ValueMapper<? super V, ? extends KR> mapper);
97+
98+
/**
99+
* Set a new key (with possibly new type) for each input record.
100+
* The provided {@link ValueMapper} is applied to each input record and computes a new key for it.
101+
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
102+
* This is a stateless record-by-record operation.
103+
*
104+
* @param mapper a {@link ValueMapper} that computes a new key for each record
105+
* @param named a {@link Named} config used to name the processor in the topology
106+
* @param <KR> the new key type of the result stream
107+
* @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
108+
* @see #selectKey(KeyValueMapper, Named)
109+
*/
110+
<KR> KStreamX<KR, V> selectKey(ValueMapper<? super V, ? extends KR> mapper, Named named);
111+
86112
@Override
87113
<KR, VR> KStreamX<KR, VR> map(
88114
KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);

streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ public <KR> KStreamX<KR, V> selectKey(final KeyValueMapper<? super K, ? super V,
102102
return this.context.wrap(this.wrapped.selectKey(mapper, named));
103103
}
104104

105+
@Override
106+
public <KR> KStreamX<KR, V> selectKey(final ValueMapper<? super V, ? extends KR> mapper) {
107+
return this.selectKey((k, v) -> mapper.apply(v));
108+
}
109+
110+
@Override
111+
public <KR> KStreamX<KR, V> selectKey(final ValueMapper<? super V, ? extends KR> mapper, final Named named) {
112+
return this.selectKey((k, v) -> mapper.apply(v), named);
113+
}
114+
105115
@Override
106116
public <KR, VR> KStreamX<KR, VR> map(
107117
final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {

streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -898,6 +898,46 @@ public void buildTopology(final StreamsBuilderX builder) {
898898
}
899899
}
900900

901+
@Test
902+
void shouldSelectKeyFromValue() {
903+
final StringApp app = new StringApp() {
904+
@Override
905+
public void buildTopology(final StreamsBuilderX builder) {
906+
final KStreamX<String, String> input = builder.stream("input");
907+
input.selectKey(v -> v).to("output");
908+
}
909+
};
910+
try (final TestTopology<String, String> topology = app.startApp()) {
911+
topology.input()
912+
.add("foo", "bar");
913+
topology.streamOutput()
914+
.expectNextRecord()
915+
.hasKey("bar")
916+
.hasValue("bar")
917+
.expectNoMoreRecord();
918+
}
919+
}
920+
921+
@Test
922+
void shouldSelectKeyFromValueNamed() {
923+
final StringApp app = new StringApp() {
924+
@Override
925+
public void buildTopology(final StreamsBuilderX builder) {
926+
final KStreamX<String, String> input = builder.stream("input");
927+
input.selectKey(v -> v, Named.as("select")).to("output");
928+
}
929+
};
930+
try (final TestTopology<String, String> topology = app.startApp()) {
931+
topology.input()
932+
.add("foo", "bar");
933+
topology.streamOutput()
934+
.expectNextRecord()
935+
.hasKey("bar")
936+
.hasValue("bar")
937+
.expectNoMoreRecord();
938+
}
939+
}
940+
901941
@Test
902942
void shouldMapCapturingErrors() {
903943
final KeyValueMapper<String, String, KeyValue<String, String>> mapper = mock();

0 commit comments

Comments
 (0)