Skip to content

Commit e292b77

Browse files
[improve][io] support array schema for JDBC postgres connector (#24549)
Co-authored-by: Omri Fried <[email protected]>
1 parent fa28d1c commit e292b77

File tree

14 files changed

+4335
-30
lines changed

14 files changed

+4335
-30
lines changed

pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,51 @@
1818
*/
1919
package org.apache.pulsar.io.jdbc;
2020

21+
import java.sql.PreparedStatement;
2122
import org.apache.pulsar.io.core.annotations.Connector;
2223
import org.apache.pulsar.io.core.annotations.IOType;
2324

2425
@Connector(
25-
name = "jdbc-clickhouse",
26-
type = IOType.SINK,
27-
help = "A simple JDBC sink for ClickHouse that writes pulsar messages to a database table",
28-
configClass = JdbcSinkConfig.class
26+
name = "jdbc-clickhouse",
27+
type = IOType.SINK,
28+
help = "A simple JDBC sink for ClickHouse that writes pulsar messages to a database table",
29+
configClass = JdbcSinkConfig.class
2930
)
3031
public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
3132

33+
/**
34+
* ClickHouse array support is not currently implemented.
35+
* <p>
36+
* While ClickHouse has native support for array types (e.g., Array(Int32), Array(String)),
37+
* the automatic conversion from Avro arrays to ClickHouse arrays is not yet implemented
38+
* in this JDBC sink. ClickHouse arrays have specific syntax and behavior that would
39+
* require dedicated implementation.
40+
* </p>
41+
* <p>
42+
* <strong>Alternatives:</strong>
43+
* <ul>
44+
* <li>Use PostgreSQL JDBC sink for native array support</li>
45+
* <li>Implement custom ClickHouse array conversion logic</li>
46+
* <li>Serialize arrays to JSON strings for storage in String columns</li>
47+
* <li>Use separate tables for one-to-many relationships</li>
48+
* </ul>
49+
* </p>
50+
* <p>
51+
* <strong>Future Enhancement:</strong>
52+
* This method could be enhanced to support ClickHouse-specific array conversion
53+
* using the ClickHouse JDBC driver's array handling capabilities.
54+
* </p>
55+
*
56+
* @param statement the PreparedStatement (not used)
57+
* @param index the parameter index (not used)
58+
* @param arrayValue the array value (not used)
59+
* @param targetSqlType the target SQL type (not used)
60+
* @throws UnsupportedOperationException always thrown as ClickHouse array support is not implemented
61+
*/
62+
@Override
63+
protected void handleArrayValue(PreparedStatement statement, int index, Object arrayValue, String targetSqlType)
64+
throws Exception {
65+
throw new UnsupportedOperationException("Array types are not supported by ClickHouse JDBC sink. "
66+
+ "Consider using PostgreSQL JDBC sink for array support.");
67+
}
3268
}

pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java

Lines changed: 202 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.function.Function;
3131
import lombok.extern.slf4j.Slf4j;
3232
import org.apache.avro.Schema;
33+
import org.apache.avro.generic.GenericData;
3334
import org.apache.pulsar.client.api.schema.GenericObject;
3435
import org.apache.pulsar.client.api.schema.GenericRecord;
3536
import org.apache.pulsar.client.api.schema.KeyValueSchema;
@@ -55,6 +56,61 @@ public List<ColumnId> getColumnsForUpsert() {
5556
throw new IllegalStateException("UPSERT not supported");
5657
}
5758

59+
/**
 * Binds an array value to a statement parameter using the database-specific array type.
 *
 * <p>Implementations should convert the array value to the appropriate database-specific
 * array type and bind it to the statement using the appropriate JDBC method
 * (typically {@code PreparedStatement.setArray()}).
 *
 * <p>The method is invoked by {@link #setColumnValue(PreparedStatement, int, Object, String)}
 * when the value is an {@code org.apache.avro.generic.GenericData.Array} or an {@code Object[]}.
 *
 * <p><strong>Implementation Guidelines:</strong>
 * <ul>
 *   <li>Handle null arrays by calling {@code statement.setNull(index, java.sql.Types.ARRAY)}</li>
 *   <li>Convert array elements to the appropriate database-specific types</li>
 *   <li>Use the targetSqlType parameter to determine the correct array element type</li>
 *   <li>Provide descriptive error messages for type mismatches and unsupported types</li>
 *   <li>Wrap JDBC exceptions with contextual information</li>
 * </ul>
 *
 * <p><strong>Example Usage</strong> (illustrative sketch; the helper names are placeholders):
 * <pre>{@code
 * // For PostgreSQL implementation:
 * if (arrayValue == null) {
 *     statement.setNull(index, java.sql.Types.ARRAY);
 *     return;
 * }
 *
 * Object[] elements = convertToObjectArray(arrayValue);
 * String postgresType = mapToPostgresType(targetSqlType);
 * Array pgArray = connection.createArrayOf(postgresType, elements);
 * statement.setArray(index, pgArray);
 * }</pre>
 *
 * @param statement the PreparedStatement to bind the array value to
 * @param index the parameter index (1-based) in the PreparedStatement
 * @param arrayValue the array value to be bound, typically a {@code GenericData.Array} or {@code Object[]}
 * @param targetSqlType the target SQL type name for the array column (e.g., "integer", "text", "_int4")
 * @throws Exception if array conversion or binding fails, including:
 *     <ul>
 *       <li>{@code IllegalArgumentException} for unsupported array types or type mismatches</li>
 *       <li>{@code SQLException} for JDBC array creation or binding failures</li>
 *       <li>{@code UnsupportedOperationException} for databases that don't support arrays</li>
 *     </ul>
 * @see #setColumnValue(PreparedStatement, int, Object, String)
 * @see java.sql.PreparedStatement#setArray(int, java.sql.Array)
 * @see java.sql.Connection#createArrayOf(String, Object[])
 */
protected abstract void handleArrayValue(PreparedStatement statement, int index, Object arrayValue,
String targetSqlType) throws Exception;
113+
58114
@Override
59115
public void bindValue(PreparedStatement statement, Mutation mutation) throws Exception {
60116
final List<ColumnId> columns = new ArrayList<>();
@@ -78,13 +134,14 @@ public void bindValue(PreparedStatement statement, Mutation mutation) throws Exc
78134
for (ColumnId columnId : columns) {
79135
String colName = columnId.getName();
80136
int colType = columnId.getType();
137+
String typeName = columnId.getTypeName();
81138
if (log.isDebugEnabled()) {
82139
log.debug("getting value for column: {} type: {}", colName, colType);
83140
}
84141
try {
85142
Object obj = mutation.getValues().apply(colName);
86143
if (obj != null) {
87-
setColumnValue(statement, index++, obj);
144+
setColumnValue(statement, index++, obj, typeName);
88145
} else {
89146
if (log.isDebugEnabled()) {
90147
log.debug("Column {} is null", colName);
@@ -174,10 +231,73 @@ private static void setColumnNull(PreparedStatement statement, int index, int ty
174231

175232
}
176233

177-
protected void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
234+
/**
235+
* Sets a column value in a PreparedStatement, handling various data types including arrays.
236+
* <p>
237+
* This method automatically detects the type of the value and calls the appropriate
238+
* PreparedStatement setter method. It supports primitive types, strings, and arrays.
239+
* Array support is implemented through the {@link #handleArrayValue(PreparedStatement, int, Object, String)}
240+
* method, which delegates to database-specific implementations.
241+
* </p>
242+
* <p>
243+
* <strong>Supported Types:</strong>
244+
* <ul>
245+
* <li>Primitive types: Integer, Long, Double, Float, Boolean, Short</li>
246+
* <li>String types: String</li>
247+
* <li>Binary types: ByteString</li>
248+
* <li>JSON types: GenericJsonRecord</li>
249+
* <li>Array types: GenericData.Array or Object[] (requires targetSqlType parameter)</li>
250+
* </ul>
251+
* </p>
252+
* <p>
253+
* <strong>Array Handling:</strong>
254+
* When an array is detected (GenericData.Array or Object[]), this method calls the abstract
255+
* {@link #handleArrayValue(PreparedStatement, int, Object, String)} method, which
256+
* must be implemented by database-specific subclasses. The targetSqlType parameter
257+
* is essential for proper array type conversion.
258+
* </p>
259+
* <p>
260+
* <strong>Example Usage:</strong>
261+
* <pre>{@code
262+
* // Setting a primitive value
263+
* setColumnValue(statement, 1, 42, "integer");
264+
*
265+
* // Setting an array value (requires database-specific implementation)
266+
* GenericData.Array<Integer> intArray = ...;
267+
* setColumnValue(statement, 2, intArray, "integer");
268+
* }</pre>
269+
* </p>
270+
*
271+
* @param statement the PreparedStatement to bind the value to
272+
* @param index the parameter index (1-based) in the PreparedStatement
273+
* @param value the value to be bound (null values are handled automatically)
274+
* @param targetSqlType the target SQL type name for the column, required for array types
275+
* @throws Exception if value binding fails, including:
276+
* <ul>
277+
* <li>Unsupported value types</li>
278+
* <li>Array conversion failures (delegated to handleArrayValue)</li>
279+
* <li>JDBC binding errors</li>
280+
* </ul>
281+
* @see #handleArrayValue(PreparedStatement, int, Object, String)
282+
* @see #setColumnValue(PreparedStatement, int, Object)
283+
*/
284+
protected void setColumnValue(PreparedStatement statement, int index, Object value,
285+
String targetSqlType) throws Exception {
178286

179287
log.debug("Setting column value, statement: {}, index: {}, value: {}", statement, index, value);
180288

289+
// Handle null values first
290+
if (value == null) {
291+
setColumnNull(statement, index, java.sql.Types.NULL);
292+
return;
293+
}
294+
295+
// Check for array types first, before other type checks
296+
if (value instanceof GenericData.Array || value instanceof Object[]) {
297+
handleArrayValue(statement, index, value, targetSqlType);
298+
return;
299+
}
300+
181301
if (value instanceof Integer) {
182302
statement.setInt(index, (Integer) value);
183303
} else if (value instanceof Long) {
@@ -201,6 +321,24 @@ protected void setColumnValue(PreparedStatement statement, int index, Object val
201321
}
202322
}
203323

324+
/**
325+
* Backward compatibility method for setColumnValue without targetSqlType parameter.
326+
* This method is provided for compatibility with existing code that may call setColumnValue
327+
* without the targetSqlType parameter. Arrays will not be supported when using this method.
328+
*
329+
* @param statement the PreparedStatement to bind the value to
330+
* @param index the parameter index (1-based) in the PreparedStatement
331+
* @param value the value to be bound
332+
* @throws Exception if value binding fails or if an array is encountered (arrays require targetSqlType)
333+
*/
334+
protected void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
335+
if (value instanceof GenericData.Array) {
336+
throw new Exception("Array values require targetSqlType parameter. "
337+
+ "Use setColumnValue(statement, index, value, targetSqlType) instead.");
338+
}
339+
setColumnValue(statement, index, value, null);
340+
}
341+
204342
private static Object getValueFromJsonNode(final JsonNode fn) {
205343
if (fn == null || fn.isNull()) {
206344
return null;
@@ -226,8 +364,8 @@ private static Object getValueFromJsonNode(final JsonNode fn) {
226364
}
227365

228366
private static void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
229-
GenericObject record,
230-
Map<String, Object> data) {
367+
GenericObject record,
368+
Map<String, Object> data) {
231369
if (record == null) {
232370
return;
233371
}
@@ -256,6 +394,51 @@ private static void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<G
256394
}
257395
}
258396

397+
/**
398+
* Converts an Avro field value to a Java object suitable for JDBC binding.
399+
* <p>
400+
* This method handles the conversion of various Avro schema types to their corresponding
401+
* Java representations. It supports primitive types, strings, unions, and arrays.
402+
* Array conversion is performed recursively, processing each array element according
403+
* to the array's element schema.
404+
* </p>
405+
* <p>
406+
* <strong>Supported Avro Types:</strong>
407+
* <ul>
408+
* <li>Primitive types: NULL, INT, LONG, DOUBLE, FLOAT, BOOLEAN</li>
409+
* <li>String types: STRING, ENUM</li>
410+
* <li>Union types: Automatically selects the non-null type from the union</li>
411+
* <li>Array types: Recursively converts array elements to Object[]</li>
412+
* </ul>
413+
* </p>
414+
* <p>
415+
* <strong>Array Conversion:</strong>
416+
* Arrays are converted by recursively processing each element according to the
417+
* array's element schema. The result is an Object[] that can be further processed
418+
* by database-specific array handling methods.
419+
* </p>
420+
* <p>
421+
* <strong>Example Usage:</strong>
422+
* <pre>{@code
423+
* // Convert a simple integer field
424+
* Schema intSchema = Schema.create(Schema.Type.INT);
425+
* Object result = convertAvroField(42, intSchema); // Returns Integer(42)
426+
*
427+
* // Convert an array field
428+
* Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING));
429+
* GenericData.Array<String> avroArray = ...;
430+
* Object result = convertAvroField(avroArray, arraySchema); // Returns Object[] of converted elements
431+
* }</pre>
432+
* </p>
433+
*
434+
* @param avroValue the Avro value to convert (may be null)
435+
* @param schema the Avro schema describing the value's type
436+
* @return the converted Java object, or null if avroValue is null
437+
* @throws IllegalArgumentException if the avroValue doesn't match the expected schema type
438+
* @throws UnsupportedOperationException if the schema type is not supported
439+
* @see org.apache.avro.Schema.Type
440+
* @see org.apache.avro.generic.GenericData.Array
441+
*/
259442
@VisibleForTesting
260443
static Object convertAvroField(Object avroValue, Schema schema) {
261444
if (avroValue == null) {
@@ -281,6 +464,21 @@ static Object convertAvroField(Object avroValue, Schema schema) {
281464
}
282465
throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
283466
case ARRAY:
467+
// Handle array conversion by recursively processing array elements
468+
if (avroValue instanceof GenericData.Array) {
469+
GenericData.Array<?> avroArray = (GenericData.Array<?>) avroValue;
470+
Schema elementSchema = schema.getElementType();
471+
Object[] convertedArray = new Object[avroArray.size()];
472+
473+
for (int i = 0; i < avroArray.size(); i++) {
474+
convertedArray[i] = convertAvroField(avroArray.get(i), elementSchema);
475+
}
476+
477+
return convertedArray;
478+
} else {
479+
throw new IllegalArgumentException("Expected GenericData.Array for ARRAY schema type, got: "
480+
+ avroValue.getClass().getName());
481+
}
284482
case BYTES:
285483
case FIXED:
286484
case RECORD:

0 commit comments

Comments
 (0)