Skip to content

Commit 561721f

Browse files
dzmitry-dulkotomazfernandes
authored andcommitted
fix: polling future removed on exception (#1455)
(cherry picked from commit 287ce48)
1 parent 42f7a77 commit 561721f

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2022 the original author or authors.
2+
* Copyright 2013-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -320,7 +320,7 @@ private Collection<S> resetBackOffContext(Collection<S> messages) {
320320

321321
private <F> CompletableFuture<F> managePollingFuture(CompletableFuture<F> pollingFuture) {
322322
this.pollingFutures.add(pollingFuture);
323-
pollingFuture.thenRun(() -> this.pollingFutures.remove(pollingFuture));
323+
pollingFuture.whenComplete((result, throwable) -> this.pollingFutures.remove(pollingFuture));
324324
return pollingFuture;
325325
}
326326

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2023 the original author or authors.
2+
* Copyright 2013-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -353,6 +353,47 @@ else if (currentPoll.compareAndSet(2, 3)) {
353353

354354
}
355355

356+
@Test
357+
void shouldRemovePollingFutureOnException() throws InterruptedException {
358+
String testName = "shouldClearPollingFuturesOnException";
359+
360+
SemaphoreBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
361+
.acquireTimeout(Duration.ofMillis(100)).batchSize(10).totalPermits(10)
362+
.throughputConfiguration(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build();
363+
364+
AbstractPollingMessageSource<Object, Message> source = new AbstractPollingMessageSource<>() {
365+
@Override
366+
protected CompletableFuture<Collection<Message>> doPollForMessages(int messagesToRequest) {
367+
return CompletableFuture.failedFuture(new RuntimeException("Simulating a polling error"));
368+
}
369+
};
370+
371+
BackOffPolicy policy = mock(BackOffPolicy.class);
372+
BackOffContext ctx = mock(BackOffContext.class);
373+
given(policy.start(null)).willReturn(ctx);
374+
375+
source.setBackPressureHandler(backPressureHandler);
376+
source.setMessageSink((msgs, context) -> CompletableFuture.completedFuture(null));
377+
source.setId(testName + " source");
378+
source.setPollingEndpointName("test-queue");
379+
source.configure(SqsContainerOptions.builder().pollBackOffPolicy(policy).build());
380+
source.setTaskExecutor(createTaskExecutor(testName));
381+
source.setAcknowledgementProcessor(getNoOpsAcknowledgementProcessor());
382+
383+
@SuppressWarnings("unchecked")
384+
Collection<CompletableFuture<?>> futures = (Collection<CompletableFuture<?>>) ReflectionTestUtils
385+
.getField(source, "pollingFutures");
386+
// Verify that the pollingFutures collection is initially empty
387+
assertThat(futures).isEmpty();
388+
389+
source.start();
390+
391+
// Verify that the pollingFutures collection is empty after the exceptional completion
392+
assertThat(futures).isEmpty();
393+
394+
source.stop();
395+
}
396+
356397
private static boolean doAwait(CountDownLatch processingLatch) {
357398
try {
358399
return processingLatch.await(4, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)