Skip to content

Commit e66741c

Browse files
artembilanspring-builds
authored andcommitted
GH-9481: Fix JdbcMetadataStore for PostgreSQL & MySQL
Fixes: #9481 Issue link: #9481 MySQL throws `CannotAcquireLockException` in case of duplicate key failure. PostgreSQL just rollbacks a transaction not letting us move on with a `SELECT` * Include `TransientDataAccessException` to the catch block of the `INSERT` to ignore it for the subsequent `SELECT` * Add logic to determine `PostgreSQL` database vendor and include `ON CONFLICT DO NOTHING` hint into the `INSERT` to not fail in case of duplicate key found on `putIfAbsent` operation (cherry picked from commit 98d0266)
1 parent d993e4c commit e66741c

File tree

6 files changed

+213
-6
lines changed

6 files changed

+213
-6
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/metadata/JdbcMetadataStore.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import org.springframework.core.log.LogAccessor;
2626
import org.springframework.dao.DataIntegrityViolationException;
2727
import org.springframework.dao.EmptyResultDataAccessException;
28+
import org.springframework.dao.TransientDataAccessException;
2829
import org.springframework.integration.metadata.ConcurrentMetadataStore;
30+
import org.springframework.jdbc.core.ConnectionCallback;
2931
import org.springframework.jdbc.core.JdbcOperations;
3032
import org.springframework.jdbc.core.JdbcTemplate;
3133
import org.springframework.transaction.annotation.Transactional;
@@ -168,12 +170,18 @@ public void setLockHint(String lockHint) {
168170

169171
@Override
170172
public void afterPropertiesSet() {
173+
String dataBaseVendor =
174+
this.jdbcTemplate.execute((ConnectionCallback<String>) connection ->
175+
connection.getMetaData().getDatabaseProductName());
171176
this.getValueQuery = String.format(this.getValueQuery, this.tablePrefix);
172177
this.getValueForUpdateQuery = String.format(this.getValueForUpdateQuery, this.tablePrefix, this.lockHint);
173178
this.replaceValueQuery = String.format(this.replaceValueQuery, this.tablePrefix);
174179
this.replaceValueByKeyQuery = String.format(this.replaceValueByKeyQuery, this.tablePrefix);
175180
this.removeValueQuery = String.format(this.removeValueQuery, this.tablePrefix);
176181
this.putIfAbsentValueQuery = String.format(this.putIfAbsentValueQuery, this.tablePrefix, this.tablePrefix);
182+
if ("PostgreSQL".equals(dataBaseVendor)) {
183+
this.putIfAbsentValueQuery += " ON CONFLICT DO NOTHING";
184+
}
177185
this.countQuery = String.format(this.countQuery, this.tablePrefix);
178186
}
179187

@@ -247,7 +255,7 @@ private int tryToPutIfAbsent(String key, String value) {
247255
ps.setString(5, this.region); // NOSONAR magic number
248256
});
249257
}
250-
catch (DataIntegrityViolationException ex) {
258+
catch (TransientDataAccessException | DataIntegrityViolationException ex) {
251259
return 0;
252260
}
253261
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.mysql;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import javax.sql.DataSource;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.beans.factory.annotation.Value;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.core.io.Resource;
33+
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
34+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
35+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
36+
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
37+
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
38+
import org.springframework.test.annotation.DirtiesContext;
39+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
40+
import org.springframework.transaction.PlatformTransactionManager;
41+
import org.springframework.transaction.annotation.EnableTransactionManagement;
42+
43+
import static org.assertj.core.api.Assertions.assertThat;
44+
45+
/**
46+
* @author Artem Bilan
47+
*
48+
* @since 6.4
49+
*/
50+
@SpringJUnitConfig
51+
@DirtiesContext
52+
class MySqlMetadataStoreTests implements MySqlContainerTest {
53+
54+
@Autowired
55+
ConcurrentMetadataStore jdbcMetadataStore;
56+
57+
@Test
58+
void verifyJdbcMetadataStoreConcurrency() throws InterruptedException {
59+
ExecutorService executorService = Executors.newFixedThreadPool(100);
60+
CountDownLatch successPutIfAbsents = new CountDownLatch(100);
61+
for (int i = 0; i < 100; i++) {
62+
executorService.execute(() -> {
63+
this.jdbcMetadataStore.putIfAbsent("testKey", "testValue");
64+
successPutIfAbsents.countDown();
65+
});
66+
}
67+
assertThat(successPutIfAbsents.await(10, TimeUnit.SECONDS)).isTrue();
68+
executorService.shutdown();
69+
}
70+
71+
@Configuration(proxyBeanMethods = false)
72+
@EnableTransactionManagement
73+
static class TestConfiguration {
74+
75+
@Value("org/springframework/integration/jdbc/schema-mysql.sql")
76+
Resource createSchemaScript;
77+
78+
@Bean
79+
DataSource dataSource() {
80+
return MySqlContainerTest.dataSource();
81+
}
82+
83+
@Bean
84+
DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
85+
DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
86+
dataSourceInitializer.setDataSource(dataSource);
87+
dataSourceInitializer.setDatabasePopulator(new ResourceDatabasePopulator(this.createSchemaScript));
88+
return dataSourceInitializer;
89+
}
90+
91+
@Bean
92+
PlatformTransactionManager transactionManager(DataSource dataSource) {
93+
return new DataSourceTransactionManager(dataSource);
94+
}
95+
96+
@Bean
97+
JdbcMetadataStore jdbcMetadataStore(DataSource dataSource) {
98+
return new JdbcMetadataStore(dataSource);
99+
}
100+
101+
}
102+
103+
}
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.jdbc.channel;
17+
package org.springframework.integration.jdbc.postgres;
1818

1919
import java.sql.DriverManager;
2020
import java.sql.SQLException;
@@ -44,6 +44,9 @@
4444
import org.springframework.context.annotation.Configuration;
4545
import org.springframework.core.io.ByteArrayResource;
4646
import org.springframework.integration.config.EnableIntegration;
47+
import org.springframework.integration.jdbc.channel.PgConnectionSupplier;
48+
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
49+
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
4750
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
4851
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
4952
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.jdbc.channel;
17+
package org.springframework.integration.jdbc.postgres;
1818

1919
import org.junit.jupiter.api.BeforeAll;
2020
import org.testcontainers.containers.PostgreSQLContainer;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jdbc.postgres;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import javax.sql.DataSource;
25+
26+
import org.apache.commons.dbcp2.BasicDataSource;
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
33+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
34+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
35+
import org.springframework.test.annotation.DirtiesContext;
36+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
37+
import org.springframework.transaction.PlatformTransactionManager;
38+
import org.springframework.transaction.annotation.EnableTransactionManagement;
39+
40+
import static org.assertj.core.api.Assertions.assertThat;
41+
42+
/**
43+
* @author Artem Bilan
44+
*
45+
* @since 6.2.9
46+
*/
47+
@SpringJUnitConfig
48+
@DirtiesContext
49+
class PostgresMetadataStoreTests implements PostgresContainerTest {
50+
51+
@Autowired
52+
ConcurrentMetadataStore jdbcMetadataStore;
53+
54+
@Test
55+
void verifyJdbcMetadataStoreConcurrency() throws InterruptedException {
56+
ExecutorService executorService = Executors.newFixedThreadPool(100);
57+
CountDownLatch successPutIfAbsents = new CountDownLatch(100);
58+
for (int i = 0; i < 100; i++) {
59+
executorService.execute(() -> {
60+
this.jdbcMetadataStore.putIfAbsent("testKey", "testValue");
61+
successPutIfAbsents.countDown();
62+
});
63+
}
64+
assertThat(successPutIfAbsents.await(10, TimeUnit.SECONDS)).isTrue();
65+
executorService.shutdown();
66+
}
67+
68+
@Configuration(proxyBeanMethods = false)
69+
@EnableTransactionManagement
70+
static class TestConfiguration {
71+
72+
@Bean
73+
DataSource dataSource() {
74+
BasicDataSource dataSource = new BasicDataSource();
75+
dataSource.setUrl(PostgresContainerTest.getJdbcUrl());
76+
dataSource.setUsername(PostgresContainerTest.getUsername());
77+
dataSource.setPassword(PostgresContainerTest.getPassword());
78+
return dataSource;
79+
}
80+
81+
@Bean
82+
PlatformTransactionManager transactionManager(DataSource dataSource) {
83+
return new DataSourceTransactionManager(dataSource);
84+
}
85+
86+
@Bean
87+
JdbcMetadataStore jdbcMetadataStore(DataSource dataSource) {
88+
return new JdbcMetadataStore(dataSource);
89+
}
90+
91+
}
92+
93+
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@
2222

2323
import org.springframework.context.annotation.Bean;
2424
import org.springframework.context.annotation.Configuration;
25-
import org.springframework.integration.jdbc.channel.PostgresContainerTest;
25+
import org.springframework.integration.jdbc.postgres.PostgresContainerTest;
2626
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
2727
import org.springframework.test.context.ContextConfiguration;
2828
import org.springframework.transaction.PlatformTransactionManager;

0 commit comments

Comments
 (0)