Skip to content

Commit 00c75ac

Browse files
joshisteartembilan
authored andcommitted
PostgresChannelMessageTableSubscriber: Renew connection only if invalid
Fixes: #9111 An evolution of the #9061: renew the connection only when we need to. (cherry picked from commit da29e2d) # Conflicts: # spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java
1 parent cb5d20e commit 00c75ac

File tree

2 files changed

+84
-56
lines changed

2 files changed

+84
-56
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -169,70 +169,84 @@ public synchronized void start() {
169169
this.latch = new CountDownLatch(1);
170170

171171
CountDownLatch startingLatch = new CountDownLatch(1);
172-
this.future = executorToUse.submit(() -> {
173-
try {
174-
while (isActive()) {
175-
try {
176-
PgConnection conn = this.connectionSupplier.get();
177-
try (Statement stmt = conn.createStatement()) {
178-
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
172+
this.future = executorToUse.submit(() -> doStart(startingLatch));
173+
174+
try {
175+
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
176+
throw new IllegalStateException("Failed to start " + this);
177+
}
178+
}
179+
catch (InterruptedException ex) {
180+
Thread.currentThread().interrupt();
181+
throw new IllegalStateException("Failed to start " + this, ex);
182+
}
183+
}
184+
185+
private void doStart(CountDownLatch startingLatch) {
186+
try {
187+
while (isActive()) {
188+
try {
189+
PgConnection conn = this.connectionSupplier.get();
190+
try (Statement stmt = conn.createStatement()) {
191+
stmt.execute("LISTEN " + this.tablePrefix.toLowerCase() + "channel_message_notify");
192+
}
193+
catch (Exception ex) {
194+
try {
195+
conn.close();
196+
}
197+
catch (Exception suppressed) {
198+
ex.addSuppressed(suppressed);
179199
}
180-
catch (Exception ex) {
181-
try {
182-
conn.close();
200+
throw ex;
201+
}
202+
this.subscriptionsMap.values()
203+
.forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
204+
try {
205+
this.connection = conn;
206+
while (isActive()) {
207+
startingLatch.countDown();
208+
209+
PGNotification[] notifications = conn.getNotifications((int) this.notificationTimeout.toMillis());
210+
// Unfortunately, there is no good way of interrupting a notification
211+
// poll but by closing its connection.
212+
if (!isActive()) {
213+
return;
183214
}
184-
catch (Exception suppressed) {
185-
ex.addSuppressed(suppressed);
215+
if ((notifications == null || notifications.length == 0) && !conn.isValid(1)) {
216+
//We did not receive any notifications within the timeout period.
217+
//If the connection is still valid, we will continue polling
218+
//Otherwise, we will close the connection and re-establish it.
219+
break;
186220
}
187-
throw ex;
188-
}
189-
this.subscriptionsMap.values()
190-
.forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
191-
try {
192-
this.connection = conn;
193-
while (isActive()) {
194-
startingLatch.countDown();
195-
196-
PGNotification[] notifications = conn.getNotifications((int) this.notificationTimeout.toMillis());
197-
// Unfortunately, there is no good way of interrupting a notification
198-
// poll but by closing its connection.
199-
if (!isActive()) {
200-
return;
221+
for (PGNotification notification : notifications) {
222+
String parameter = notification.getParameter();
223+
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
224+
if (subscriptions == null) {
225+
continue;
201226
}
202-
if (notifications == null || notifications.length == 0) {
203-
//We did not receive any notifications within the timeout period.
204-
//We will close the connection and re-establish it.
205-
break;
206-
}
207-
for (PGNotification notification : notifications) {
208-
String parameter = notification.getParameter();
209-
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
210-
if (subscriptions == null) {
211-
continue;
212-
}
213-
for (Subscription subscription : subscriptions) {
214-
subscription.notifyUpdate();
215-
}
227+
for (Subscription subscription : subscriptions) {
228+
subscription.notifyUpdate();
216229
}
217230
}
218-
}
219-
finally {
220-
conn.close();
231+
221232
}
222233
}
223-
catch (Exception e) {
224-
// The getNotifications method does not throw a meaningful message on interruption.
225-
// Therefore, we do not log an error, unless it occurred while active.
226-
if (isActive()) {
227-
LOGGER.error(e, "Failed to poll notifications from Postgres database");
228-
}
234+
finally {
235+
conn.close();
236+
}
237+
}
238+
catch (Exception e) {
239+
// The getNotifications method does not throw a meaningful message on interruption.
240+
// Therefore, we do not log an error, unless it occurred while active.
241+
if (isActive()) {
242+
LOGGER.error(e, "Failed to poll notifications from Postgres database");
229243
}
230244
}
231245
}
232-
finally {
233-
this.latch.countDown();
234-
}
235-
});
246+
}
247+
finally {
248+
this.latch.countDown();
249+
}
236250

237251
try {
238252
if (!startingLatch.await(5, TimeUnit.SECONDS)) {

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.List;
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.function.Consumer;
2830

2931
import javax.sql.DataSource;
3032

@@ -284,7 +286,18 @@ public void testRenewConnection() throws Exception {
284286
CountDownLatch latch = new CountDownLatch(2);
285287
List<Object> payloads = new ArrayList<>();
286288
CountDownLatch connectionLatch = new CountDownLatch(2);
287-
connectionSupplier.onGetConnection = connectionLatch::countDown;
289+
AtomicBoolean connectionCloseState = new AtomicBoolean();
290+
connectionSupplier.onGetConnection = conn -> {
291+
connectionLatch.countDown();
292+
if (connectionCloseState.compareAndSet(false, true)) {
293+
try {
294+
conn.close();
295+
}
296+
catch (Exception e) {
297+
//nop
298+
}
299+
}
300+
};
288301
postgresChannelMessageTableSubscriber.start();
289302
postgresSubscribableChannel.subscribe(message -> {
290303
payloads.add(message.getPayload());
@@ -340,7 +353,7 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {
340353

341354
private static class ConnectionSupplier implements PgConnectionSupplier {
342355

343-
Runnable onGetConnection;
356+
Consumer<PgConnection> onGetConnection;
344357

345358
@Override
346359
public PgConnection get() throws SQLException {
@@ -349,10 +362,11 @@ public PgConnection get() throws SQLException {
349362
POSTGRES_CONTAINER.getPassword())
350363
.unwrap(PgConnection.class);
351364
if (this.onGetConnection != null) {
352-
this.onGetConnection.run();
365+
this.onGetConnection.accept(conn);
353366
}
354367
return conn;
355368
}
356369

357370
}
371+
358372
}

0 commit comments

Comments
 (0)