Skip to content

Commit 9f8232a

Browse files
committed
Add a config option to force double types when schema is unavailable
1 parent b210b86 commit 9f8232a

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
3737
public static final String SYMBOL_COLUMNS_CONFIG = "symbols";
3838
private static final String SYMBOL_COLUMNS_DOC = "Comma separated list of columns that should be symbol type";
3939

40+
public static final String DOUBLE_COLUMNS_CONFIG = "doubles";
41+
private static final String DOUBLE_COLUMNS_DOC = "Comma separated list of columns that should be double type";
42+
4043
public static final String USERNAME = "username";
4144
private static final String USERNAME_DOC = "Username for QuestDB ILP authentication";
4245

@@ -64,6 +67,7 @@ public static ConfigDef conf() {
6467
.define(DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, Type.STRING, null, Importance.MEDIUM, DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC)
6568
.define(INCLUDE_KEY_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, INCLUDE_KEY_DOC)
6669
.define(SYMBOL_COLUMNS_CONFIG, Type.STRING, null, Importance.MEDIUM, SYMBOL_COLUMNS_DOC)
70+
.define(DOUBLE_COLUMNS_CONFIG, Type.STRING, null, Importance.MEDIUM, DOUBLE_COLUMNS_DOC)
6771
.define(USERNAME, Type.STRING, "admin", Importance.MEDIUM, USERNAME_DOC)
6872
.define(TOKEN, Type.PASSWORD, null, Importance.MEDIUM, TOKEN_DOC)
6973
.define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC)
@@ -102,6 +106,10 @@ public String getSymbolColumns() {
102106
return getString(SYMBOL_COLUMNS_CONFIG);
103107
}
104108

109+
public String getDoubleColumns() {
110+
return getString(DOUBLE_COLUMNS_CONFIG);
111+
}
112+
105113
public String getUsername() {
106114
return getString(USERNAME);
107115
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
import org.slf4j.LoggerFactory;
1919

2020
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.HashSet;
2123
import java.util.List;
2224
import java.util.Map;
25+
import java.util.Set;
2326
import java.util.concurrent.TimeUnit;
2427

2528
public final class QuestDBSinkTask extends SinkTask {
@@ -33,6 +36,7 @@ public final class QuestDBSinkTask extends SinkTask {
3336
private String timestampColumnName;
3437
private long timestampColumnValue = Long.MIN_VALUE;
3538
private TimeUnit timestampUnits;
39+
private Set<CharSequence> doubleColumns;
3640

3741
@Override
3842
public String version() {
@@ -42,6 +46,15 @@ public String version() {
4246
@Override
4347
public void start(Map<String, String> map) {
4448
this.config = new QuestDBSinkConnectorConfig(map);
49+
String doubleColumnsConfig = config.getDoubleColumns();
50+
if (doubleColumnsConfig == null) {
51+
doubleColumns = Collections.emptySet();
52+
} else {
53+
doubleColumns = new HashSet<>();
54+
for (String symbolColumn : doubleColumnsConfig.split(",")) {
55+
doubleColumns.add(symbolColumn.trim());
56+
}
57+
}
4558
this.sender = createSender();
4659
this.timestampColumnName = config.getDesignatedTimestampColumnName();
4760
this.timestampUnits = config.getTimestampUnitsOrNull();
@@ -181,9 +194,19 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
181194
if (value instanceof String) {
182195
sender.stringColumn(actualName, (String) value);
183196
} else if (value instanceof Long) {
184-
sender.longColumn(actualName, (Long) value);
197+
Long longValue = (Long) value;
198+
if (doubleColumns.contains(actualName)) {
199+
sender.doubleColumn(actualName, longValue.doubleValue());
200+
} else {
201+
sender.longColumn(actualName, longValue);
202+
}
185203
} else if (value instanceof Integer) {
186-
sender.longColumn(actualName, (Integer) value);
204+
Integer intValue = (Integer) value;
205+
if (doubleColumns.contains(actualName)) {
206+
sender.doubleColumn(actualName, intValue.doubleValue());
207+
} else {
208+
sender.longColumn(actualName, intValue);
209+
}
187210
} else if (value instanceof Boolean) {
188211
sender.boolColumn(actualName, (Boolean) value);
189212
} else if (value instanceof Double) {

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,24 @@ public void testJsonNoSchema() {
452452
"select firstname,lastname,age from " + topicName);
453453
}
454454

455+
@Test
456+
public void testJsonNoSchema_mixedFlotingAndIntTypes() {
457+
connect.kafka().createTopic(topicName, 1);
458+
Map<String, String> props = baseConnectorProps(topicName);
459+
props.put("value.converter.schemas.enable", "false");
460+
props.put(QuestDBSinkConnectorConfig.DOUBLE_COLUMNS_CONFIG, "age");
461+
connect.configureConnector(CONNECTOR_NAME, props);
462+
assertConnectorTaskRunningEventually();
463+
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
464+
connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":42.5}");
465+
466+
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n"
467+
+ "\"John\",\"Doe\",42.0\r\n"
468+
+ "\"Jane\",\"Doe\",42.5\r\n",
469+
"select firstname,lastname,age from " + topicName);
470+
}
471+
472+
455473
@Test
456474
public void testJsonNoSchema_ArrayNotSupported() {
457475
connect.kafka().createTopic(topicName, 1);

readme.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ The connector supports following Options:
4747
| timestamp.units | STRING | micros | auto | Designated timestamp field units |
4848
| include.key | BOOLEAN | false | true | Include message key in target table |
4949
| symbols | STRING | instrument,stock | N/A | Comma separated list of columns that should be symbol type |
50+
| doubles | STRING | volume,price | N/A | Comma separated list of columns that should be double type |
5051
| username | STRING | user1 | admin | User name for QuestDB. Used only when token is non-empty |
5152
| token | STRING | <sub>QgHCOyq35D5HocCMrUGJinEsjEscJlCp7FZQETH21Bw</sub> | N/A | Token for QuestDB authentication |
5253
| tls | BOOLEAN | true | false | Use TLS for QuestDB connection |

0 commit comments

Comments
 (0)