Skip to content

Commit c939b3e

Browse files
committed
add async/sync acknowledgement result callback in sqs auto configuration
1 parent 88417a0 commit c939b3e

File tree

3 files changed

+117
-0
lines changed

3 files changed

+117
-0
lines changed

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
2626
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
2727
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
28+
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
29+
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
2830
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
2931
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
3032
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
@@ -123,6 +125,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
123125
ObjectProvider<ObservationRegistry> observationRegistry,
124126
ObjectProvider<SqsListenerObservation.Convention> observationConventionProvider,
125127
ObjectProvider<MessageInterceptor<Object>> interceptors,
128+
ObjectProvider<AcknowledgementResultCallback<Object>> acknowledgementResultCallback,
129+
ObjectProvider<AsyncAcknowledgementResultCallback<Object>> asyncAcknowledgementResultCallback,
126130
ObjectProvider<JacksonMessageConverterMigration> messageConverterFactory,
127131
MessagingMessageConverter<?> messagingMessageConverter) {
128132

@@ -133,6 +137,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
133137
errorHandler.ifAvailable(factory::setErrorHandler);
134138
interceptors.forEach(factory::addMessageInterceptor);
135139
asyncInterceptors.forEach(factory::addMessageInterceptor);
140+
acknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback);
141+
asyncAcknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback);
136142
messageConverterFactory.ifAvailable(mcf -> mcf.configureLegacyObjectMapper(messagingMessageConverter));
137143
if (this.sqsProperties.isObservationEnabled()) {
138144
observationRegistry

spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.awspring.cloud.sqs.listener.ContainerOptions;
2929
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
3030
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
31+
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
32+
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
3133
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
3234
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
3335
import io.awspring.cloud.sqs.operations.SqsTemplate;
@@ -206,6 +208,8 @@ void configuresFactoryComponentsAndOptionsWithDefaults() {
206208
var factory = context.getBean(SqsMessageListenerContainerFactory.class);
207209
assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList()
208210
.isEmpty();
211+
assertThat(factory).extracting("acknowledgementResultCallback").isNull();
212+
assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull();
209213
assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class))
210214
.extracting(ContainerOptionsBuilder::build)
211215
.isInstanceOfSatisfying(ContainerOptions.class, options -> {
@@ -221,6 +225,36 @@ void configuresFactoryComponentsAndOptionsWithDefaults() {
221225
}
222226
// @formatter:on
223227

228+
@Test
229+
void configuresFactoryWithBlockingAcknowledgementCallback() {
230+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
231+
.withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> {
232+
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
233+
assertThat(context).hasSingleBean(AcknowledgementResultCallback.class);
234+
235+
SqsMessageListenerContainerFactory<?> factory = context
236+
.getBean(SqsMessageListenerContainerFactory.class);
237+
238+
assertThat(factory).extracting("acknowledgementResultCallback")
239+
.isEqualTo(context.getBean(AcknowledgementResultCallback.class));
240+
});
241+
}
242+
243+
@Test
244+
void configuresFactoryWithAsyncAcknowledgementCallback() {
245+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
246+
.withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> {
247+
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
248+
assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class);
249+
250+
SqsMessageListenerContainerFactory<?> factory = context
251+
.getBean(SqsMessageListenerContainerFactory.class);
252+
253+
assertThat(factory).extracting("asyncAcknowledgementResultCallback")
254+
.isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class));
255+
});
256+
}
257+
224258
@Test
225259
void configuresMessageConverter() {
226260
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
@@ -344,4 +378,26 @@ JsonMapper jsonMapper() {
344378

345379
}
346380

381+
@Configuration(proxyBeanMethods = false)
382+
static class BlockingAcknowledgementCallbackConfiguration {
383+
384+
@Bean
385+
AcknowledgementResultCallback<Object> acknowledgementResultCallback() {
386+
return new AcknowledgementResultCallback<>() {
387+
};
388+
}
389+
390+
}
391+
392+
@Configuration(proxyBeanMethods = false)
393+
static class AsyncAcknowledgementCallbackConfiguration {
394+
395+
@Bean
396+
AsyncAcknowledgementResultCallback<Object> asyncAcknowledgementResultCallback() {
397+
return new AsyncAcknowledgementResultCallback<>() {
398+
};
399+
}
400+
401+
}
402+
347403
}

spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import io.awspring.cloud.sqs.listener.ContainerOptions;
3434
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
3535
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
36+
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
37+
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
3638
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
3739
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
3840
import io.awspring.cloud.sqs.operations.SqsTemplate;
@@ -224,6 +226,8 @@ void configuresFactoryComponentsAndOptions() {
224226
.getBean(SqsMessageListenerContainerFactory.class);
225227
assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors")
226228
.asList().isNotEmpty();
229+
assertThat(factory).extracting("acknowledgementResultCallback").isNull();
230+
assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull();
227231
assertThat(factory).extracting("containerOptionsBuilder")
228232
.asInstanceOf(type(ContainerOptionsBuilder.class))
229233
.extracting(ContainerOptionsBuilder::build)
@@ -245,6 +249,36 @@ void configuresFactoryComponentsAndOptions() {
245249
});
246250
}
247251

252+
@Test
253+
void configuresFactoryWithBlockingAcknowledgementCallback() {
254+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
255+
.withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> {
256+
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
257+
assertThat(context).hasSingleBean(AcknowledgementResultCallback.class);
258+
259+
SqsMessageListenerContainerFactory<?> factory = context
260+
.getBean(SqsMessageListenerContainerFactory.class);
261+
262+
assertThat(factory).extracting("acknowledgementResultCallback")
263+
.isEqualTo(context.getBean(AcknowledgementResultCallback.class));
264+
});
265+
}
266+
267+
@Test
268+
void configuresFactoryWithAsyncAcknowledgementCallback() {
269+
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
270+
.withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> {
271+
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
272+
assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class);
273+
274+
SqsMessageListenerContainerFactory<?> factory = context
275+
.getBean(SqsMessageListenerContainerFactory.class);
276+
277+
assertThat(factory).extracting("asyncAcknowledgementResultCallback")
278+
.isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class));
279+
});
280+
}
281+
248282
@Test
249283
void configuresObjectMapper() {
250284
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
@@ -364,4 +398,25 @@ MessagingMessageConverter<Message> messageConverter() {
364398

365399
}
366400

401+
@Configuration(proxyBeanMethods = false)
402+
static class BlockingAcknowledgementCallbackConfiguration {
403+
404+
@Bean
405+
AcknowledgementResultCallback<Object> acknowledgementResultCallback() {
406+
return new AcknowledgementResultCallback<>() {
407+
};
408+
}
409+
410+
}
411+
412+
@Configuration(proxyBeanMethods = false)
413+
static class AsyncAcknowledgementCallbackConfiguration {
414+
415+
@Bean
416+
AsyncAcknowledgementResultCallback<Object> asyncAcknowledgementResultCallback() {
417+
return new AsyncAcknowledgementResultCallback<>() {
418+
};
419+
}
420+
421+
}
367422
}

0 commit comments

Comments
 (0)