Skip to content

Commit 5079b00

Browse files
MDzajagermanosinHaarolean
authored
Serde: Enable header serialization (#509)
Co-authored-by: German Osin <[email protected]> Co-authored-by: Roman Zabaluev <[email protected]>
1 parent 8b5494b commit 5079b00

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import javax.annotation.Nullable;
66
import lombok.RequiredArgsConstructor;
77
import org.apache.kafka.clients.producer.ProducerRecord;
8-
import org.apache.kafka.common.header.Header;
8+
import org.apache.kafka.common.header.Headers;
99
import org.apache.kafka.common.header.internals.RecordHeader;
1010
import org.apache.kafka.common.header.internals.RecordHeaders;
1111

@@ -20,18 +20,23 @@ public ProducerRecord<byte[], byte[]> create(String topic,
2020
@Nullable String key,
2121
@Nullable String value,
2222
@Nullable Map<String, String> headers) {
23+
24+
Headers kafkaHeaders = createHeaders(headers);
25+
2326
return new ProducerRecord<>(
2427
topic,
2528
partition,
26-
key == null ? null : keySerializer.serialize(key),
27-
value == null ? null : valuesSerializer.serialize(value),
28-
headers == null ? null : createHeaders(headers)
29+
key == null ? null : keySerializer.serialize(key, kafkaHeaders),
30+
value == null ? null : valuesSerializer.serialize(value, kafkaHeaders),
31+
kafkaHeaders
2932
);
3033
}
3134

32-
private Iterable<Header> createHeaders(Map<String, String> clientHeaders) {
35+
private Headers createHeaders(Map<String, String> clientHeaders) {
3336
RecordHeaders headers = new RecordHeaders();
34-
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes())));
37+
if (clientHeaders != null) {
38+
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes())));
39+
}
3540
return headers;
3641
}
3742

api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import lombok.Getter;
1111
import lombok.RequiredArgsConstructor;
1212
import lombok.extern.slf4j.Slf4j;
13+
import org.apache.kafka.common.header.Headers;
1314

1415
@Slf4j
1516
@RequiredArgsConstructor
@@ -80,7 +81,17 @@ public boolean canDeserialize(String topic, Serde.Target type) {
8081
public Serde.Serializer serializer(String topic, Serde.Target type) {
8182
return wrapWithClassloader(() -> {
8283
var serializer = serde.serializer(topic, type);
83-
return input -> wrapWithClassloader(() -> serializer.serialize(input));
84+
return new Serde.Serializer() {
85+
@Override
86+
public byte[] serialize(String input) {
87+
return wrapWithClassloader(() -> serializer.serialize(input));
88+
}
89+
90+
@Override
91+
public byte[] serialize(String input, Headers headers) {
92+
return wrapWithClassloader(() -> serializer.serialize(input, headers));
93+
}
94+
};
8495
});
8596
}
8697

serde-api/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ tasks.register('javadocJar', Jar) {
1515
from javadoc.destinationDir
1616
}
1717

18+
dependencies {
19+
implementation libs.kafka.clients
20+
}
21+
1822
artifacts {
1923
archives sourceJar, javadocJar
2024
}

serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.Closeable;
44
import java.util.Optional;
5+
import org.apache.kafka.common.header.Headers;
56

67
/**
78
* Main interface of serialization/deserialization logic.
@@ -121,6 +122,10 @@ interface Serializer {
121122
* @return serialized bytes. Can be null if input is null or empty string.
122123
*/
123124
byte[] serialize(String input);
125+
126+
default byte[] serialize(String input, Headers headers) {
127+
return serialize(input);
128+
}
124129
}
125130

126131
/**

0 commit comments

Comments
 (0)