Skip to content

Commit 85b3aee

Browse files
Igor Macedo Quintanilhasobychacko
authored andcommitted
GH-3944: support per-record observations in batch listeners
Fixes #3944 Signed-off-by: Igor Macedo Quintanilha <[email protected]> Docs updates and minor cleanup Signed-off-by: Soby Chacko <[email protected]>
1 parent c491502 commit 85b3aee

File tree

6 files changed

+458
-4
lines changed

6 files changed

+458
-4
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,29 @@ The `record` property in both observation contexts contains the `ConsumerRecord`
119119
The sender and receiver contexts `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
120120
If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into ``KafkaTemplate``s and listener containers.
121121
When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.
122+
123+
[[batch-listener-obs]]
124+
=== Batch Listener Observations
125+
126+
When using a batch listener, by default, no observations are created, even if a `ObservationRegistry` is present.
127+
This is because the scope of an observation is tied to the thread, and with a batch listener, there is no one-to-one mapping between an observation and a record.
128+
129+
To enable per-record observations in a batch listener, set the container factory property `recordObservationsInBatch` to `true`.
130+
131+
[source,java]
132+
----
133+
@Bean
134+
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
135+
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
136+
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
137+
138+
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
139+
configurer.configure(factory, kafkaConsumerFactory);
140+
factory.getContainerProperties().setRecordObservationsInBatch(true);
141+
return factory;
142+
}
143+
----
144+
145+
When this property is `true`, an observation will be created for each record in the batch, but the observation is not propagated to the listener method.
146+
The application can then use the observation context to track the processing of each record in the batch.
147+
This allows you to have visibility into the processing of each record, even within a batch context.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,9 @@ More details are available in xref:kafka/headers.adoc#multi-value-header[Support
8282

8383
Listener containers now support interceptor customization via `getRecordInterceptor()`.
8484
See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details.
85+
86+
[[x40-batch-observability]]
87+
=== Per-Record Observation in Batch Listeners
88+
89+
It is now possible to get an observation for each record when using a batch listener.
90+
See xref:kafka/micrometer.adoc#batch-listener-obs[Observability for Batch Listeners] for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ public enum EOSMode {
310310

311311
private boolean restartAfterAuthExceptions;
312312

313+
private boolean recordObservationsInBatch;
314+
313315
/**
314316
* Create properties for a container that will subscribe to the specified topics.
315317
* @param topics the topics.
@@ -1091,6 +1093,27 @@ public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
10911093
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
10921094
}
10931095

1096+
/**
1097+
* When true, and a batch listener is configured with observation enabled, an observation
1098+
* will be started for each record in the batch.
1099+
* @return recordObservationsInBatch.
1100+
* @since 4.0
1101+
*/
1102+
public boolean isRecordObservationsInBatch() {
1103+
return this.recordObservationsInBatch;
1104+
}
1105+
1106+
/**
1107+
* Set whether to enable individual record observations in a batch.
1108+
* When true, and a batch listener is configured with observation enabled, an observation
1109+
* will be started for each record in the batch. Default false.
1110+
* @param recordObservationsInBatch true to enable individual record observations.
1111+
* @since 4.0
1112+
*/
1113+
public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
1114+
this.recordObservationsInBatch = recordObservationsInBatch;
1115+
}
1116+
10941117
@Override
10951118
public String toString() {
10961119
return "ContainerProperties ["
@@ -1141,6 +1164,7 @@ public String toString() {
11411164
? "\n observationRegistry=" + this.observationRegistry
11421165
: "")
11431166
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
1167+
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
11441168
+ "\n]";
11451169
}
11461170

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
901901
this.isBatchListener = true;
902902
this.wantsFullRecords = this.batchListener.wantsPollResult();
903903
this.pollThreadStateProcessor = setUpPollProcessor(true);
904-
this.observationEnabled = false;
904+
this.observationEnabled = this.containerProperties.isObservationEnabled() && this.containerProperties.isRecordObservationsInBatch();
905905
}
906906
else if (listener instanceof MessageListener) {
907907
this.listener = (MessageListener<K, V>) listener;
@@ -2426,6 +2426,21 @@ private void ackBatch(final ConsumerRecords<K, V> records) throws InterruptedExc
24262426
}
24272427
}
24282428

2429+
private void invokeBatchWithIndividualRecordObservation(List<ConsumerRecord<K, V>> recordList) {
2430+
// Create individual observations for each record without scopes
2431+
for (ConsumerRecord<K, V> record : recordList) {
2432+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
2433+
this.containerProperties.getObservationConvention(),
2434+
DefaultKafkaListenerObservationConvention.INSTANCE,
2435+
() -> new KafkaRecordReceiverContext(record, getListenerId(), getClientId(), this.consumerGroupId,
2436+
this::clusterId),
2437+
this.observationRegistry);
2438+
observation.observe(() -> {
2439+
this.logger.debug(() -> "Observing record in batch: " + KafkaUtils.format(record));
2440+
});
2441+
}
2442+
}
2443+
24292444
private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> recordsArg,
24302445
List<ConsumerRecord<K, V>> recordListArg) {
24312446

@@ -2446,7 +2461,13 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24462461
}
24472462
}
24482463
Object sample = startMicrometerSample();
2464+
2465+
24492466
try {
2467+
if (this.observationEnabled) {
2468+
invokeBatchWithIndividualRecordObservation(recordList);
2469+
}
2470+
24502471
if (this.wantsFullRecords) {
24512472
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
24522473
this.isAnyManualAck

0 commit comments

Comments
 (0)