Skip to content

Commit 80990af

Browse files
artembilantzolov
authored andcommitted
Fix JMS Inbound Endpoints for observation
The `JmsMessageDrivenEndpoint` delegates all the hard work to the `ChannelPublishingJmsMessageListener`, but missed to propagate an `ObservationRegistry` and other related options. The `JmsInboundGateway` is worse: it delegated to the `JmsMessageDrivenEndpoint` * Add `IntegrationObservation.HANDLER` observation to the `MessagingGatewaySupport.send()` operation: used by the delegate in the `ChannelPublishingJmsMessageListener` * Expose and propagate observation-related options from `JmsInboundGateway` and `JmsMessageDrivenEndpoint` * Expose `observationConvention()` option on the `MessagingGatewaySpec` and `MessageProducerSpec` * Remove unused imports * Do not start a new `RECEIVER` observation if there is already `SERVER` one * Fix `MessagingGatewaySupport` for `Observation.NOOP` check. The parent process may still use `ObservationRegistry.NOOP` which sets `Observation.NOOP` instance into the current context and thread local. **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent 78b09ed commit 80990af

File tree

8 files changed

+207
-34
lines changed

8 files changed

+207
-34
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ project('spring-integration-jms') {
742742
testImplementation "org.apache.activemq:artemis-jakarta-client:$artemisVersion"
743743
testImplementation 'org.springframework:spring-oxm'
744744
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
745+
testImplementation 'io.micrometer:micrometer-observation-test'
745746
}
746747
}
747748

spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.springframework.integration.endpoint.MessageProducerSupport;
2020
import org.springframework.integration.support.ErrorMessageStrategy;
21+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
2122
import org.springframework.lang.Nullable;
2223
import org.springframework.messaging.MessageChannel;
2324

@@ -152,4 +153,16 @@ public S errorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
152153
return _this();
153154
}
154155

156+
/**
157+
* Provide a custom {@link MessageReceiverObservationConvention}.
158+
* @param observationConvention the observation convention to use.
159+
* @return the spec.
160+
* @since 6.0.8
161+
* @see MessageProducerSupport#setObservationConvention(MessageReceiverObservationConvention)
162+
*/
163+
public S observationConvention(MessageReceiverObservationConvention observationConvention) {
164+
this.target.setObservationConvention(observationConvention);
165+
return _this();
166+
}
167+
155168
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.integration.gateway.MessagingGatewaySupport;
2020
import org.springframework.integration.mapping.InboundMessageMapper;
2121
import org.springframework.integration.mapping.OutboundMessageMapper;
22+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
2223
import org.springframework.lang.Nullable;
2324
import org.springframework.messaging.MessageChannel;
2425

@@ -205,4 +206,16 @@ public S shouldTrack(boolean shouldTrack) {
205206
return _this();
206207
}
207208

209+
/**
210+
* Provide a custom {@link MessageRequestReplyReceiverObservationConvention}.
211+
* @param observationConvention the observation convention to use.
212+
* @return the spec.
213+
* @since 6.0.8
214+
* @see MessagingGatewaySupport#setObservationConvention(MessageRequestReplyReceiverObservationConvention)
215+
*/
216+
public S observationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
217+
this.target.setObservationConvention(observationConvention);
218+
return _this();
219+
}
220+
208221
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Set;
2121
import java.util.concurrent.ConcurrentHashMap;
2222

23+
import io.micrometer.observation.Observation;
2324
import io.micrometer.observation.ObservationRegistry;
2425
import org.reactivestreams.Publisher;
2526
import org.reactivestreams.Subscriber;
@@ -55,12 +56,16 @@
5556
import org.springframework.integration.support.converter.SimpleMessageConverter;
5657
import org.springframework.integration.support.management.IntegrationInboundManagement;
5758
import org.springframework.integration.support.management.IntegrationManagedResource;
59+
import org.springframework.integration.support.management.TrackableComponent;
5860
import org.springframework.integration.support.management.metrics.MeterFacade;
5961
import org.springframework.integration.support.management.metrics.MetricsCaptor;
6062
import org.springframework.integration.support.management.metrics.SampleFacade;
6163
import org.springframework.integration.support.management.metrics.TimerFacade;
64+
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
6265
import org.springframework.integration.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention;
6366
import org.springframework.integration.support.management.observation.IntegrationObservation;
67+
import org.springframework.integration.support.management.observation.MessageReceiverContext;
68+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
6469
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverContext;
6570
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
6671
import org.springframework.lang.Nullable;
@@ -89,7 +94,7 @@
8994
*/
9095
@IntegrationManagedResource
9196
public abstract class MessagingGatewaySupport extends AbstractEndpoint
92-
implements org.springframework.integration.support.management.TrackableComponent,
97+
implements TrackableComponent,
9398
IntegrationInboundManagement, IntegrationPattern {
9499

95100
protected final ConvertingMessagingTemplate messagingTemplate; // NOSONAR
@@ -140,6 +145,8 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint
140145
@Nullable
141146
private MessageRequestReplyReceiverObservationConvention observationConvention;
142147

148+
private MessageReceiverObservationConvention receiverObservationConvention;
149+
143150
private volatile AbstractEndpoint replyMessageCorrelator;
144151

145152
private volatile boolean initialized;
@@ -381,6 +388,10 @@ public void setObservationConvention(
381388
this.observationConvention = observationConvention;
382389
}
383390

391+
public void setReceiverObservationConvention(MessageReceiverObservationConvention receiverObservationConvention) {
392+
this.receiverObservationConvention = receiverObservationConvention;
393+
}
394+
384395
@Override
385396
protected void onInit() {
386397
Assert.state(!(this.requestChannelName != null && this.requestChannel != null),
@@ -458,27 +469,65 @@ protected void send(Object object) {
458469
MessageChannel channel = getRequestChannel();
459470
Assert.state(channel != null,
460471
"send is not supported, because no request channel has been configured");
461-
SampleFacade sample = null;
462-
if (this.metricsCaptor != null) {
463-
sample = this.metricsCaptor.start();
472+
473+
Message<?> requestMessage = this.messagingTemplate.doConvert(object, null, this.historyWritingPostProcessor);
474+
475+
if (!ObservationRegistry.NOOP.equals(this.observationRegistry)
476+
&& (this.observationRegistry.getCurrentObservation() == null
477+
|| Observation.NOOP.equals(this.observationRegistry.getCurrentObservation()))) {
478+
479+
sendWithObservation(channel, requestMessage);
480+
}
481+
else if (this.metricsCaptor != null) {
482+
sendWithMetrics(channel, requestMessage);
464483
}
484+
else {
485+
doSend(channel, requestMessage);
486+
}
487+
}
488+
489+
private void sendWithObservation(MessageChannel channel, Message<?> message) {
465490
try {
466-
this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor);
467-
if (sample != null) {
468-
sample.stop(sendTimer());
469-
}
491+
IntegrationObservation.HANDLER.observation(
492+
this.receiverObservationConvention,
493+
DefaultMessageReceiverObservationConvention.INSTANCE,
494+
() -> new MessageReceiverContext(message, getComponentName()),
495+
this.observationRegistry)
496+
.observe(() -> this.messagingTemplate.send(channel, message));
470497
}
471-
catch (Exception e) {
472-
if (sample != null) {
473-
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
474-
}
475-
MessageChannel errorChan = getErrorChannel();
476-
if (errorChan != null) {
477-
this.messagingTemplate.send(errorChan, new ErrorMessage(e));
478-
}
479-
else {
480-
rethrow(e, "failed to send message");
481-
}
498+
catch (Exception ex) {
499+
sendErrorMessage(ex, message);
500+
}
501+
}
502+
503+
private void sendWithMetrics(MessageChannel channel, Message<?> message) {
504+
SampleFacade sample = this.metricsCaptor.start();
505+
try {
506+
this.messagingTemplate.send(channel, message);
507+
sample.stop(sendTimer());
508+
}
509+
catch (Exception ex) {
510+
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
511+
sendErrorMessage(ex, message);
512+
}
513+
}
514+
515+
private void doSend(MessageChannel channel, Message<?> message) {
516+
try {
517+
this.messagingTemplate.send(channel, message);
518+
}
519+
catch (Exception ex) {
520+
sendErrorMessage(ex, message);
521+
}
522+
}
523+
524+
private void sendErrorMessage(Exception exception, Message<?> failedMessage) {
525+
MessageChannel errorChan = getErrorChannel();
526+
if (errorChan != null) {
527+
this.messagingTemplate.send(errorChan, buildErrorMessage(failedMessage, exception));
528+
}
529+
else {
530+
rethrow(exception, "failed to send message");
482531
}
483532
}
484533

spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Map;
2020

21+
import io.micrometer.observation.ObservationRegistry;
2122
import jakarta.jms.DeliveryMode;
2223
import jakarta.jms.Destination;
2324
import jakarta.jms.InvalidDestinationException;
@@ -38,6 +39,9 @@
3839
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3940
import org.springframework.integration.support.MessageBuilderFactory;
4041
import org.springframework.integration.support.management.TrackableComponent;
42+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
43+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
44+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
4145
import org.springframework.integration.support.utils.IntegrationUtils;
4246
import org.springframework.jms.listener.SessionAwareMessageListener;
4347
import org.springframework.jms.support.JmsUtils;
@@ -323,6 +327,26 @@ public void setExtractReplyPayload(boolean extractReplyPayload) {
323327
this.extractReplyPayload = extractReplyPayload;
324328
}
325329

330+
public void setMetricsCaptor(MetricsCaptor captor) {
331+
this.gatewayDelegate.registerMetricsCaptor(captor);
332+
}
333+
334+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
335+
this.gatewayDelegate.registerObservationRegistry(observationRegistry);
336+
}
337+
338+
public void setRequestReplyObservationConvention(
339+
@Nullable MessageRequestReplyReceiverObservationConvention observationConvention) {
340+
341+
this.gatewayDelegate.setObservationConvention(observationConvention);
342+
}
343+
344+
public void setReceiverObservationConvention(
345+
@Nullable MessageReceiverObservationConvention observationConvention) {
346+
347+
this.gatewayDelegate.setReceiverObservationConvention(observationConvention);
348+
}
349+
326350
@Override
327351
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
328352
this.beanFactory = beanFactory;

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,14 @@
1616

1717
package org.springframework.integration.jms;
1818

19+
import io.micrometer.observation.ObservationRegistry;
20+
1921
import org.springframework.beans.BeansException;
2022
import org.springframework.context.ApplicationContext;
2123
import org.springframework.integration.context.OrderlyShutdownCapable;
2224
import org.springframework.integration.gateway.MessagingGatewaySupport;
25+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
26+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
2327
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2428
import org.springframework.messaging.MessageChannel;
2529

@@ -114,28 +118,38 @@ public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) {
114118
this.endpoint.setShutdownContainerOnStop(shutdownContainerOnStop);
115119
}
116120

121+
@Override
122+
public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
123+
super.registerMetricsCaptor(metricsCaptorToRegister);
124+
this.endpoint.registerMetricsCaptor(metricsCaptorToRegister);
125+
}
117126

118127
@Override
119-
public String getComponentType() {
120-
return this.endpoint.getComponentType();
128+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
129+
super.registerObservationRegistry(observationRegistry);
130+
this.endpoint.registerObservationRegistry(observationRegistry);
121131
}
122132

123133
@Override
124-
public void setComponentName(String componentName) {
125-
super.setComponentName(componentName);
126-
this.endpoint.setComponentName(getComponentName());
134+
public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
135+
super.setObservationConvention(observationConvention);
136+
this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention);
137+
}
138+
139+
@Override
140+
public String getComponentType() {
141+
return this.endpoint.getComponentType();
127142
}
128143

129144
@Override
130145
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
131146
super.setApplicationContext(applicationContext);
132147
this.endpoint.setApplicationContext(applicationContext);
133-
this.endpoint.setBeanFactory(applicationContext);
134-
this.endpoint.getListener().setBeanFactory(applicationContext);
135148
}
136149

137150
@Override
138151
protected void onInit() {
152+
this.endpoint.setComponentName(getComponentName());
139153
this.endpoint.afterPropertiesSet();
140154
}
141155

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,15 @@
1616

1717
package org.springframework.integration.jms;
1818

19+
import io.micrometer.observation.ObservationRegistry;
20+
1921
import org.springframework.beans.BeansException;
2022
import org.springframework.context.ApplicationContext;
2123
import org.springframework.integration.context.OrderlyShutdownCapable;
2224
import org.springframework.integration.endpoint.MessageProducerSupport;
2325
import org.springframework.integration.jms.util.JmsAdapterUtils;
26+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
27+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
2428
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2529
import org.springframework.jms.listener.DefaultMessageListenerContainer;
2630
import org.springframework.messaging.MessageChannel;
@@ -91,7 +95,7 @@ private JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContai
9195
* container setting even if an external container is provided. Defaults to null
9296
* (won't change container) if an external container is provided or `transacted` when
9397
* the framework creates an implicit {@link DefaultMessageListenerContainer}.
94-
* @param sessionAcknowledgeMode the acknowledge mode.
98+
* @param sessionAcknowledgeMode the acknowledgement mode.
9599
*/
96100
public void setSessionAcknowledgeMode(String sessionAcknowledgeMode) {
97101
this.sessionAcknowledgeMode = sessionAcknowledgeMode;
@@ -134,9 +138,9 @@ public void setShouldTrack(boolean shouldTrack) {
134138
}
135139

136140
/**
137-
* Set to false to prevent listener container shutdown when the endpoint is stopped.
141+
* Set to {@code false} to prevent listener container shutdown when the endpoint is stopped.
138142
* Then, if so configured, any cached consumer(s) in the container will remain.
139-
* Otherwise the shared connection and will be closed and the listener invokers shut
143+
* Otherwise, the shared connection and will be closed and the listener invokers shut
140144
* down; this behavior is new starting with version 5.1. Default: true.
141145
* @param shutdownContainerOnStop false to not shutdown.
142146
* @since 5.1
@@ -149,6 +153,24 @@ public ChannelPublishingJmsMessageListener getListener() {
149153
return this.listener;
150154
}
151155

156+
@Override
157+
public void registerMetricsCaptor(MetricsCaptor captor) {
158+
super.registerMetricsCaptor(captor);
159+
this.listener.setMetricsCaptor(captor);
160+
}
161+
162+
@Override
163+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
164+
super.registerObservationRegistry(observationRegistry);
165+
this.listener.setObservationRegistry(observationRegistry);
166+
}
167+
168+
@Override
169+
public void setObservationConvention(MessageReceiverObservationConvention observationConvention) {
170+
super.setObservationConvention(observationConvention);
171+
this.listener.setReceiverObservationConvention(observationConvention);
172+
}
173+
152174
@Override
153175
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
154176
super.setApplicationContext(applicationContext);

0 commit comments

Comments
 (0)