Skip to content

Commit 3305f97

Browse files
committed
Configure interceptors on container factory
This changes allows configuring interceptors on AbstractKafkaListenerContainerFactory in same way as on KafkaMessageListenerContainer, and changes the example docs to show this instead. Signed-off-by: Christian Fredriksson <[email protected]>
1 parent d92a586 commit 3305f97

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,24 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
2222

2323
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
2424

25-
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
25+
Starting with version 4.0, `AbstractMessageListenerContainer` and `AbstractKafkaListenerContainerFactory` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
2626
If the returned interceptor is an instance of `CompositeRecordInterceptor` or `CompositeBatchInterceptor`, additional `RecordInterceptor` or `BatchInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` or `BatchInterceptor` has already been configured.
27-
The following example shows how to do so:
27+
The following examples shows how to do so:
2828

2929
[source, java]
3030
----
31-
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
31+
public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
3232
CompositeRecordInterceptor compositeInterceptor;
3333
34-
RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
34+
RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
3535
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
3636
compositeInterceptor = interceptor;
3737
} else {
3838
compositeInterceptor = new CompositeRecordInterceptor<>();
39-
container.setRecordInterceptor(compositeInterceptor);
40-
}
41-
42-
if (previousInterceptor != null) {
43-
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
39+
containerFactory.setRecordInterceptor(compositeInterceptor);
40+
if (previousInterceptor != null) {
41+
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
42+
}
4443
}
4544
4645
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
* @author Stephane Nicoll
6262
* @author Gary Russell
6363
* @author Artem Bilan
64+
* @author Christian Fredriksson
6465
*
6566
* @see AbstractMessageListenerContainer
6667
*/
@@ -275,6 +276,15 @@ public ContainerProperties getContainerProperties() {
275276
return this.containerProperties;
276277
}
277278

279+
/**
280+
* Get the {@link RecordInterceptor} for modification, if configured.
281+
* @return the {@link RecordInterceptor}, or {@code null} if not configured
282+
* @since 4.0
283+
*/
284+
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
285+
return this.recordInterceptor;
286+
}
287+
278288
/**
279289
* Set an interceptor to be called before calling the listener.
280290
* Only used with record listeners.
@@ -286,6 +296,15 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
286296
this.recordInterceptor = recordInterceptor;
287297
}
288298

299+
/**
300+
* Get the {@link BatchInterceptor} for modification, if configured.
301+
* @return the {@link BatchInterceptor}, or {@code null} if not configured
302+
* @since 4.0
303+
*/
304+
public @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
305+
return this.batchInterceptor;
306+
}
307+
289308
/**
290309
* Set a batch interceptor to be called before and after calling the listener.
291310
* Only used with batch listeners.

0 commit comments

Comments
 (0)