Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ ext {
hamcrestVersion = '3.0'
hibernateValidationVersion = '9.0.1.Final'
jacksonBomVersion = '2.19.2'
jackson3Version = '3.0.0-rc5'
jaywayJsonPathVersion = '2.9.0'
junit4Version = '4.13.2'
junitJupiterVersion = '5.13.4'
Expand Down Expand Up @@ -110,6 +111,7 @@ allprojects {

imports {
mavenBom "com.fasterxml.jackson:jackson-bom:$jacksonBomVersion"
mavenBom "tools.jackson:jackson-bom:$jackson3Version"
mavenBom "org.junit:junit-bom:$junitJupiterVersion"
mavenBom "io.micrometer:micrometer-bom:$micrometerVersion"
mavenBom "io.micrometer:micrometer-tracing-bom:$micrometerTracingVersion"
Expand Down Expand Up @@ -263,6 +265,12 @@ project ('spring-kafka') {
exclude group: 'org.jetbrains.kotlin'
}

optionalApi 'tools.jackson.core:jackson-databind'
optionalApi 'tools.jackson.datatype:jackson-datatype-joda'
optionalApi('tools.jackson.module:jackson-module-kotlin') {
exclude group: 'org.jetbrains.kotlin'
}

// Spring Data projection message binding support
optionalApi ('org.springframework.data:spring-data-commons') {
exclude group: 'org.springframework'
Expand Down
4 changes: 2 additions & 2 deletions samples/sample-01/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This sample demonstrates a simple producer and consumer; the producer sends objects of type `Foo1` and the consumer receives objects of type `Foo2` (the objects have the same field, `foo`).

The producer uses a `JsonSerializer`; the consumer uses the `ByteArrayDeserializer`, together with a `JsonMessageConverter` which converts to the type of the listener method argument.
The producer uses a `JacksonJsonSerializer`; the consumer uses the `ByteArrayDeserializer`, together with a `JacksonJsonMessageConverter` which converts to the type of the listener method argument.

Run the application and use curl to send some data:

Expand Down Expand Up @@ -31,4 +31,4 @@ Console:
...
2018-11-05 10:12:33.537 INFO 41635 --- [ fooGroup-0-C-1] com.example.Application : Received: Foo2 [foo=fail]
2018-11-05 10:12:43.359 INFO 41635 --- [ dltGroup-0-C-1] com.example.Application : Received from DLT: {"foo":"fail"}
----
----
2 changes: 1 addition & 1 deletion samples/sample-02/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This sample demonstrates a simple producer and a multi-method consumer; the producer sends objects of types `Foo1` and `Bar1` and the consumer receives objects of type `Foo2` and `Bar2` (the objects have the same field, `foo`).

The producer uses a `JsonSerializer`; the consumer uses a `ByteArrayDeserializer`, together with a `ByteArrayJsonMessageConverter` which converts to the required type of the listener method argument.
The producer uses a `JacksonJsonSerializer`; the consumer uses a `ByteArrayJacksonDeserializer`, together with a `ByteArrayJacksonJsonMessageConverter` which converts to the required type of the listener method argument.
We can't infer the type in this case (because the type is used to choose the method to call).
We therefore configure type mapping on the producer and consumer side.
See the `application.yml` for the producer side and the `converter` bean on the consumer side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ Previously, they were mapped as JSON and only `MimeType` was decoded.
`MediaType` could not be decoded.
They are now simple strings for interoperability.

Also, the `DefaultKafkaHeaderMapper` has a new `addToStringClasses` method, allowing the specification of types that should be mapped by using `toString()` instead of JSON.
Also, the `JsonKafkaHeaderMapper` has a new `addToStringClasses` method, allowing the specification of types that should be mapped by using `toString()` instead of JSON.
See xref:kafka/headers.adoc[Message Headers] for more information.

[[cb-2-1-and-2-2-embedded-kafka-changes]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ int delivery = ByteBuffer.wrap(record.headers()
.getInt();
----

When using `@KafkaListener` with the `DefaultKafkaHeaderMapper` or `SimpleKafkaHeaderMapper`, it can be obtained by adding `@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery` as a parameter to the listener method.
When using `@KafkaListener` with the `JsonKafkaHeaderMapper` or `SimpleKafkaHeaderMapper`, it can be obtained by adding `@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery` as a parameter to the listener method.

To enable population of this header, set the container property `deliveryAttemptHeader` to `true`.
It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface KafkaHeaderMapper {

The `SimpleKafkaHeaderMapper` maps raw headers as `byte[]`, with configuration options for conversion to `String` values.

The `DefaultKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
The `JsonKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
A +++"+++`special`+++"+++ header (with a key of `spring_json_header_types`) contains a JSON map of `<key>:<type>`.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.

Expand All @@ -48,19 +48,19 @@ The following listing shows a number of example mappings:

[source, java]
----
public DefaultKafkaHeaderMapper() { <1>
public JsonKafkaHeaderMapper() { <1>
...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { <2>
public JsonKafkaHeaderMapper(ObjectMapper objectMapper) { <2>
...
}

public DefaultKafkaHeaderMapper(String... patterns) { <3>
public JsonKafkaHeaderMapper(String... patterns) { <3>
...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { <4>
public JsonKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { <4>
...
}
----
Expand Down Expand Up @@ -95,7 +95,7 @@ The following test case illustrates this mechanism.
----
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
Expand Down Expand Up @@ -126,10 +126,10 @@ To create a mapper for inbound mapping, use one of the static methods on the res

[source, java]
----
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
Expand All @@ -140,20 +140,20 @@ For example:

[source, java]
----
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
JsonKafkaHeaderMapper inboundMapper = JsonKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
----

This will exclude all headers beginning with `abc` and include all others.

By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the classpath.
By default, the `JsonKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the classpath.

With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` as a `List<Map<String, Object>>` where the map in a position of the list corresponds to the data position in the payload.

If there is no converter (either because Jackson is not present or it is explicitly set to `null`), the headers from the consumer record are provided unconverted in the `KafkaHeaders.NATIVE_HEADERS` header.
This header is a `Headers` object (or a `List<Headers>` in the case of the batch converter), where the position in the list corresponds to the data position in the payload.

IMPORTANT: Certain types are not suitable for JSON serialization, and a simple `toString()` serialization might be preferred for these types.
The `DefaultKafkaHeaderMapper` has a method called `addToStringClasses()` that lets you supply the names of classes that should be treated this way for outbound mapping.
The `JsonKafkaHeaderMapper` has a method called `addToStringClasses()` that lets you supply the names of classes that should be treated this way for outbound mapping.
During inbound mapping, they are mapped as `String`.
By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way.

Expand All @@ -170,7 +170,7 @@ When all applications are using 2.3 or higher, you can leave the property at its
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
Expand All @@ -187,16 +187,16 @@ Starting with 4.0, multi-value header mapping is supported, where the same logic
By default, the `HeaderMapper` does **not** create multiple Kafka headers with the same name.
Instead, when it encounters a collection value (e.g., a `List<byte[]>`), it serializes the entire collection into **one** Kafka header whose value is a JSON array.

* **Producer side:** `DefaultKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it.
* **Producer side:** `JsonKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it.
* **Consumer side:** the mapper exposes the header as a single value—the **last occurrence wins**; earlier duplicates are silently discarded.

Preserving each individual header requires explicit registration of patterns that designate the header as multi‑valued.

`DefaultKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names.
`JsonKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names.

[source, java]
----
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();

// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");
Expand All @@ -214,5 +214,5 @@ NOTE: Regular expressions are *not* supported; only the +*+ wildcard is allowed

[IMPORTANT]
====
On the *Producer Side*, When `DefaultKafkaHeaderMapper` serializes a multi-value header, every element in that collection must be of a single Java type—mixing, for example, `String` and `byte[]` values under a single header key will lead to a conversion error.
On the *Producer Side*, When `JsonKafkaHeaderMapper` serializes a multi-value header, every element in that collection must be of a single Java type—mixing, for example, `String` and `byte[]` values under a single header key will lead to a conversion error.
====
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static class MultiListenerBean {
Starting with version 2.1.3, you can designate a `@KafkaHandler` method as the default method that is invoked if there is no match on other methods.
At most, one method can be so designated.
When using `@KafkaHandler` methods, the payload must have already been converted to the domain object (so the match can be performed).
Use a custom deserializer, the `JsonDeserializer`, or the `JsonMessageConverter` with its `TypePrecedence` set to `TYPE_ID`.
Use a custom deserializer, the `JacksonJsonDeserializer`, or the `JacksonJsonMessageConverter` with its `TypePrecedence` set to `TYPE_ID`.
See xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion] for more information.

IMPORTANT: Due to some limitations in the way Spring resolves method arguments, a default `@KafkaHandler` cannot receive discrete headers; it must use the `ConsumerRecordMetadata` as discussed in xref:kafka/receiving-messages/listener-annotation.adoc#consumer-record-metadata[Consumer Record Metadata].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ The `Header` contains a four-byte int (big-endian).
The server must use this header to route the reply to the correct partition (`@KafkaListener` does this).
In this case, though, the reply container must not use Kafka's group management feature and must be configured to listen on a fixed partition (by using a `TopicPartitionOffset` in its `ContainerProperties` constructor).

NOTE: The `DefaultKafkaHeaderMapper` requires Jackson to be on the classpath (for the `@KafkaListener`).
NOTE: The `JsonKafkaHeaderMapper` requires Jackson to be on the classpath (for the `@KafkaListener`).
If it is not available, the message converter has no header mapper, so you must configure a `MessagingMessageConverter` with a `SimpleKafkaHeaderMapper`, as shown earlier.

By default, 3 headers are used:
Expand Down
Loading