java/connector-node/risingwave-jdbc-runner/pom.xml (10 additions, 0 deletions)
@@ -43,11 +43,21 @@
<artifactId>redshift-jdbc42</artifactId>
<version>2.1.0.33</version>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-sink-jdbc</artifactId>
<version>0.1.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -16,18 +16,40 @@

package com.risingwave.runner;

- import java.sql.*;
+ import com.risingwave.connector.SnowflakeJDBCSinkConfig;
+ import java.sql.Connection;
+ import java.sql.DriverManager;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCSqlRunner {
private static final Logger LOG = LoggerFactory.getLogger(JDBCSqlRunner.class);

- public static void executeSql(String fullUrl, String[] sqls) throws Exception {
+ public static void executeSqlWithProps(
+ String fullUrl, String[] sqls, String[] propKeys, String[] propValues)
+ throws Exception {
Connection connection = null;
try {
Class.forName("net.snowflake.client.jdbc.SnowflakeDriver");
- connection = DriverManager.getConnection(fullUrl);
+ Properties props = new Properties();
+ if (propKeys != null && propValues != null) {
+ if (propKeys.length != propValues.length) {
+ throw new IllegalArgumentException(
+ "Property keys and values arrays must have the same length");
+ }
+ for (int i = 0; i < propKeys.length; i++) {
+ if (propKeys[i] != null && propValues[i] != null) {
+ props.put(propKeys[i], propValues[i]);
+ }
+ }
+ }
+
+ SnowflakeJDBCSinkConfig.handleSnowflakeAuth(props);
+
+ connection = DriverManager.getConnection(fullUrl, props);
connection.setAutoCommit(false);
LOG.info("[JDBCRunner] Transaction started, auto-commit disabled");
Statement stmt = connection.createStatement();
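For context, here is a sketch of how the new entry point might be invoked. The URL and all property values below are placeholders, and the key-pair properties (user, private_key_file, authenticator=snowflake_jwt) follow the Snowflake JDBC driver's documented connection properties rather than anything shown in this diff:

// Hypothetical invocation; all values below are placeholders, not from this PR.
String url = "jdbc:snowflake://myaccount.snowflakecomputing.com/?db=MYDB&schema=PUBLIC";
String[] sqls = {"CREATE TABLE IF NOT EXISTS t1 (v1 INT)"};
String[] propKeys = {"user", "private_key_file", "authenticator"};
String[] propValues = {"RW_USER", "/path/to/rsa_key.p8", "snowflake_jwt"};
JDBCSqlRunner.executeSqlWithProps(url, sqls, propKeys, propValues);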
@@ -0,0 +1,46 @@
/*
* Copyright 2025 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.runner;

import static org.junit.Assert.assertNotNull;

import com.risingwave.connector.SnowflakeJDBCSinkConfig;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import org.junit.Test;

public class JDBCSqlRunnerTest {

private String loadTestPem() throws IOException {
try (InputStream is =
getClass().getClassLoader().getResourceAsStream("test-private-key.pem")) {
if (is == null) {
throw new IOException("Test PEM file not found in resources");
}
return new String(is.readAllBytes(), StandardCharsets.UTF_8);
}
}

@Test
public void loadPrivateKeyFromPem_unencrypted() throws Exception {
String testPem = loadTestPem();
PrivateKey key = SnowflakeJDBCSinkConfig.loadPrivateKeyFromPem(testPem, null);
assertNotNull(key);
}
}
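The implementation of SnowflakeJDBCSinkConfig.loadPrivateKeyFromPem is not part of this diff. Given the BouncyCastle dependencies added to risingwave-sink-jdbc/pom.xml below, a minimal sketch of parsing an unencrypted PKCS#8 key could look like the following; this is illustrative only, not the PR's actual implementation:

import java.io.StringReader;
import java.security.PrivateKey;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

// Illustrative sketch: a "-----BEGIN PRIVATE KEY-----" block parses to a
// PrivateKeyInfo, which JcaPEMKeyConverter turns into a java.security.PrivateKey.
static PrivateKey parseUnencryptedPkcs8(String pem) throws Exception {
    try (PEMParser parser = new PEMParser(new StringReader(pem))) {
        Object obj = parser.readObject();
        if (obj instanceof PrivateKeyInfo) {
            return new JcaPEMKeyConverter().getPrivateKey((PrivateKeyInfo) obj);
        }
        throw new IllegalArgumentException("expected an unencrypted PKCS#8 private key");
    }
}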
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6PSnttDsv+vi
tUZTP1E3hVBah6PUGDWZhYgNiyW8quTWCmPvBmCR2YzuhUrY5+CtKP8UJOQico+p
oJHSAPsrzSr6YsGs3c9SQOslBmm9Fkh9/f/GZVTVZ6u5AsUmOcVvZ2q7Sz8Vj/aR
aIm0EJqRe9cQ5vvN9sg25rIv4xKwIZJ1VixKWJLmpCmDINqn7xvl+ldlUmSr3aGt
w21uSDuEJhQlzO3yf2FwJMkJ9SkCm9oVDXyl77OnKXj5bOQ/rojbyGeIxDJSUDWE
GKyRPuqKi6rSbwg6h2G/Z9qBJkqM5NNTbGRIFz/9/LdmmwvtaqCxlLtD7RVEryAp
+qTGDk5hAgMBAAECggEBAMYYfNDEYpf4A2SdCLne/9zrrfZ0kphdUkL48MDPj5vN
TzTRj6f9s5ixZ/+QKn3hdwbguCx13QbH5mocP0IjUhyqoFFHYAWxyyaZfpjM8tO4
QoEYxby3BpjLe62UXESUzChQSytJZFwIDXKcdIPNO3zvVzufEJcfG5no2b9cIvsG
Dy6J1FNILWxCtDIqBM+G1B1is9DhZnUDgn0iKzINiZmh1I1l7k/4tMnozVIKAfwo
f1kYjG/d2IzDM02mTeTElz3IKeNriaOIYTZgI26xLJxTkiFnBV4JOWFAZw15X+yR
+DrjGSIkTfhzbLa20Vt3AFM+LFK0ZoXT2dRnjbYPjQECgYEA+9XJFGwLcEX6pl1p
IwXAjXKJdju9DDn4lmHTW0Pbw25h1EXONwm/NPafwsWmPll9kW9IwsxUQVUyBC9a
c3Q7rF1e8ai/qqVFRIZof275MI82ciV2Mw8Hz7FPAUyoju5CvnjAEH4+irt1VE/7
SgdvQ1gDBQFegS69ijdz+cOhFxkCgYEA5aVoseMy/gIlsCvNPyw9+Jz/zBpKItX0
jGzdF7lhERRO2cursujKaoHntRckHcE3P/Z4K565bvVq+VaVG0T/BcBKPmPHrLmY
iuVXidltW7Jh9/RCVwb5+BvqlwlC470PEwhqoUatY/fPJ74srztrqJHvp1L29FT5
sdmlJW8YwokCgYAUa3dMgp5C0knKp5RY1KSSU5E11w4zKZgwiWob4lq1dAPWtHpO
GCo63yyBHImoUJVP75gUw4Cpc4EEudo5tlkIVuHV8nroGVKOhd9/Rb5K47Hke4kk
Brn5a0Use9qPDF65Fw1ryPDFSwHufjXAAO5SpZZJF51UGDgiNvDedbBgMQKBgHSk
t7DjPhtW69234eCckD2fQS5ijBV1p2lMQmCygGM0dXiawvN02puOsCqDPoz+fxm2
DwPY80cw0M0k9UeMnBxHt25JMDrDan/iTbxu++T/jlNrdebOXFlxlI5y3c7fULDS
LZcNVzTXwhjlt7yp6d0NgzTyJw2ju9BiREfnTiRBAoGBAOPHrTOnPyjO+bVcCPTB
WGLsbBd77mVPGIuL0XGrvbVYPE8yIcNbZcthd8VXL/38Ygy8SIZh2ZqsrU1b5WFa
XUMLnGEODSS8x/GmW3i3KeirW5OxBNjfUzEF4XkJP8m41iTdsQEXQf9DdUY7X+CB
VL5h7N0VstYhGgycuPpcIUQa
-----END PRIVATE KEY-----
java/connector-node/risingwave-sink-jdbc/pom.xml (10 additions, 0 deletions)
@@ -53,6 +53,16 @@
<version>3.23.0</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.78</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>1.78</version>
</dependency>

<!-- test dependencies -->
<dependency>
@@ -50,13 +50,7 @@ public BatchAppendOnlyJDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
var factory = JdbcUtils.getDialectFactory(jdbcUrl);
this.config = config;
try {
- conn =
- JdbcUtils.getConnection(
- config.getJdbcUrl(),
- config.getUser(),
- config.getPassword(),
- config.isAutoCommit(),
- config.getBatchInsertRows());
+ conn = config.getConnection();
// column name -> java.sql.Types
Map<String, Integer> columnTypeMapping =
getColumnTypeMapping(conn, config.getTableName(), config.getSchemaName());
@@ -57,13 +57,7 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
this.config = config;

try {
- conn =
- JdbcUtils.getConnection(
- config.getJdbcUrl(),
- config.getUser(),
- config.getPassword(),
- config.isAutoCommit(),
- DUMMY_BATCH_INSERT_ROWS);
+ conn = config.getConnection();
// Table schema has been validated before, so we get the PK from it directly
this.pkColumnNames = tableSchema.getPrimaryKeys();
// column name -> java.sql.Types
@@ -195,13 +189,7 @@ public boolean write(Iterable<SinkRow> rows) {
conn.close();

// create a new connection if the current connection is invalid
- conn =
- JdbcUtils.getConnection(
- config.getJdbcUrl(),
- config.getUser(),
- config.getPassword(),
- config.isAutoCommit(),
- DUMMY_BATCH_INSERT_ROWS);
+ conn = config.getConnection();
// reset the flag since we will retry to prepare the batch again
updateFlag = false;
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
@@ -19,6 +19,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;
import java.sql.Connection;
import java.sql.SQLException;

public class JDBCSinkConfig extends CommonSinkConfig {
private String jdbcUrl;
@@ -103,4 +105,16 @@ public String getDatabaseName() {
public int getBatchInsertRows() {
return batchInsertRows;
}

/**
* Creates a JDBC connection based on this configuration. Subclasses can override this method to
* provide specialized connection logic. The connection returned by this method is *not* in
* auto-commit mode by default.
*
* @return JDBC connection
* @throws SQLException if connection fails
*/
public Connection getConnection() throws SQLException {
return JdbcUtils.getConnectionDefault(this);
}
}
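To illustrate the override hook added to JDBCSinkConfig above, here is a hedged sketch of a specialized subclass. The PR's real SnowflakeJDBCSinkConfig is not included in this diff, so the class name, the body, and the Snowflake property used below are assumptions, and the constructor is elided because the parent's Jackson creator signature is not shown:

package com.risingwave.connector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical subclass, not the PR's actual code; constructor elided (the
// parent's Jackson creator is not visible in this diff).
public class ExampleKeyPairSinkConfig extends JDBCSinkConfig {

    @Override
    public Connection getConnection() throws SQLException {
        // reuse the shared defaults (tcpKeepAlive, timeouts, user)
        Properties props = JdbcUtils.createBaseProperties(getJdbcUrl(), getUser());
        // assumption: key-pair auth via the Snowflake driver's documented property
        props.put("authenticator", "snowflake_jwt");
        Connection conn = DriverManager.getConnection(getJdbcUrl(), props);
        conn.setAutoCommit(isAutoCommit());
        conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        return conn;
    }
}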
@@ -35,10 +35,26 @@ public class JDBCSinkFactory implements SinkFactory {
public static final String JDBC_URL_PROP = "jdbc.url";
public static final String TABLE_NAME_PROP = "table.name";

/**
* Creates the appropriate config instance based on the JDBC URL: SnowflakeJDBCSinkConfig for
* Snowflake URLs, JDBCSinkConfig otherwise.
*
* @param mapper ObjectMapper for deserialization
* @param tableProperties properties to deserialize
* @return appropriate config instance
*/
private JDBCSinkConfig createConfig(ObjectMapper mapper, Map<String, String> tableProperties) {
String jdbcUrl = tableProperties.get(JDBC_URL_PROP);
if (jdbcUrl != null && jdbcUrl.startsWith("jdbc:snowflake")) {
return mapper.convertValue(tableProperties, SnowflakeJDBCSinkConfig.class);
}
return mapper.convertValue(tableProperties, JDBCSinkConfig.class);
}

@Override
public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties) {
ObjectMapper mapper = new ObjectMapper();
- JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
+ JDBCSinkConfig config = createConfig(mapper, tableProperties);
if ((config.getJdbcUrl().startsWith("jdbc:snowflake")
|| config.getJdbcUrl().startsWith("jdbc:redshift"))) {
return new BatchAppendOnlyJDBCSink(config, tableSchema);
@@ -51,7 +67,7 @@ public void validate(
TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
- JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
+ JDBCSinkConfig config = createConfig(mapper, tableProperties);

String jdbcUrl = config.getJdbcUrl();
String tableName = config.getTableName();
@@ -60,9 +76,7 @@
Set<String> jdbcPks = new HashSet<>();
Set<String> jdbcTableNames = new HashSet<>();

- try (Connection conn =
- DriverManager.getConnection(
- jdbcUrl, config.getUser(), config.getPassword());
+ try (Connection conn = config.getConnection();
ResultSet tableNamesResultSet =
conn.getMetaData().getTables(null, schemaName, "%", null);
ResultSet columnResultSet =
@@ -25,8 +25,11 @@
import java.sql.SQLException;
import java.util.Optional;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JdbcUtils {
private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);

static final int CONNECTION_TIMEOUT = 30;
static final int SOCKET_TIMEOUT = 300;
@@ -47,15 +50,18 @@ public static Optional<JdbcDialectFactory> getDialectFactory(String jdbcUrl) {
}
}

- /** The connection returned by this method is *not* autoCommit */
- public static Connection getConnection(
- String jdbcUrl, String user, String password, boolean autoCommit, int batchInsertRows)
- throws SQLException {
+ /**
+ * Creates base JDBC connection properties with common settings. This is a helper method that
+ * can be used by both the default and the specialized connection logic.
+ *
+ * @param jdbcUrl JDBC URL to determine database-specific settings
+ * @param user Username for authentication
+ * @return Properties object with base connection settings
+ */
+ static Properties createBaseProperties(String jdbcUrl, String user) {
var props = new Properties();

// enable TCP keep-alive to avoid the connection being closed by the server
// both MySQL and PG support this property
// https://jdbc.postgresql.org/documentation/use/
// https://dev.mysql.com/doc/connectors/en/connector-j-connp-props-networking.html#cj-conn-prop_tcpKeepAlive
props.setProperty("tcpKeepAlive", "true");

// default timeout in seconds
@@ -66,20 +72,40 @@ public static Connection getConnection(
int socketTimeout = isPg ? SOCKET_TIMEOUT : SOCKET_TIMEOUT * 1000;
props.setProperty("connectTimeout", String.valueOf(connectTimeout));
props.setProperty("socketTimeout", String.valueOf(socketTimeout));

if (user != null) {
props.put("user", user);
}
- if (password != null) {
- props.put("password", password);

+ return props;
}

/**
* Creates a JDBC connection for the default configuration (password authentication). The
* connection returned by this method is *not* in auto-commit mode by default.
*
* @param config JDBC sink configuration
* @return JDBC connection
* @throws SQLException if connection fails
*/
static Connection getConnectionDefault(JDBCSinkConfig config) throws SQLException {
String jdbcUrl = config.getJdbcUrl();
var props = createBaseProperties(jdbcUrl, config.getUser());

// Default password authentication
if (config.getPassword() != null) {
props.put("password", config.getPassword());
}
if (jdbcUrl.startsWith("jdbc:redshift") && batchInsertRows > 0) {

if (jdbcUrl.startsWith("jdbc:redshift") && config.getBatchInsertRows() > 0) {
props.setProperty("reWriteBatchedInserts", "true");
props.setProperty("reWriteBatchedInsertsSize", String.valueOf(batchInsertRows));
props.setProperty(
"reWriteBatchedInsertsSize", String.valueOf(config.getBatchInsertRows()));
}

var conn = DriverManager.getConnection(jdbcUrl, props);
// disabling auto-commit can improve performance
- conn.setAutoCommit(autoCommit);
+ conn.setAutoCommit(config.isAutoCommit());
// explicitly set isolation level to RC
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
return conn;