Feature/support nested fields #130

Status: Open · wants to merge 3 commits into base: master
@@ -17,6 +17,7 @@
 package io.aiven.kafka.connect.debezium.converters;
 
 import java.math.BigDecimal;
+import java.util.Locale;
 import java.util.Properties;
 
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -51,9 +52,9 @@ public void converterFor(final RelationalColumn column,
         }
         if (data instanceof BigDecimal) {
             // Expected type
-            return String.format("%.2f", data);
+            return String.format(Locale.ROOT, "%.2f", data);
         } else if (data instanceof Number) {
-            return String.format("%.2f", ((Number) data).floatValue());
+            return String.format(Locale.ROOT, "%.2f", ((Number) data).floatValue());
         } else {
             throw new IllegalArgumentException("Money type should have BigDecimal type");
         }
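
For context on the `Locale.ROOT` fix: `String.format("%.2f", ...)` uses the JVM's default locale, so the decimal separator can silently become a comma on, say, a German-locale host. A minimal standalone illustration (not part of the PR):

```java
import java.math.BigDecimal;
import java.util.Locale;

public class LocaleFormatDemo {
    public static void main(final String[] args) {
        final BigDecimal amount = new BigDecimal("1234.5");
        // Locale-sensitive: prints "1234,50" under a German locale.
        System.out.println(String.format(Locale.GERMANY, "%.2f", amount));
        // Locale.ROOT is stable regardless of the JVM default: "1234.50".
        System.out.println(String.format(Locale.ROOT, "%.2f", amount));
    }
}
```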
@@ -77,13 +77,10 @@ public R apply(final R record) {
             struct.schema().fields().forEach(field -> {
                 newStruct.put(field.name(), struct.get(field));
             });
-            config.fieldNames().forEach(field -> {
+            config.fields().forEach(field -> {
                 try {
-                    if (struct.get(field) == null) {
-                        outputValue.add(config.fieldReplaceMissing());
-                    } else {
-                        outputValue.add(struct.get(field).toString());
-                    }
+                    final String value = field.readAsString(struct).orElse(config.fieldReplaceMissing());
+                    outputValue.add(value);
                 } catch (final DataException e) {
                     log.debug("{} is missing, concat will use {}", field, config.fieldReplaceMissing());
                     outputValue.add(config.fieldReplaceMissing());
@@ -94,12 +91,9 @@ public R apply(final R record) {
         } else if (schemaAndValue.value() instanceof Map) {
             final Map newValue = new HashMap<>((Map<?, ?>) schemaAndValue.value());
             final StringJoiner outputValue = new StringJoiner(config.delimiter());
-            config.fieldNames().forEach(field -> {
-                if (newValue.get(field) == null) {
-                    outputValue.add(config.fieldReplaceMissing());
-                } else {
-                    outputValue.add(newValue.get(field).toString());
-                }
+            config.fields().forEach(field -> {
+                final String value = field.readAsString(newValue).orElse(config.fieldReplaceMissing());
+                outputValue.add(value);
             });
             newValue.put(config.outputFieldName(), outputValue.toString());
 
@@ -18,10 +18,14 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
+import static java.util.stream.Collectors.toList;
+
 final class ConcatFieldsConfig extends AbstractConfig {
     public static final String FIELD_NAMES_CONFIG = "field.names";
     private static final String FIELD_NAMES_DOC =
@@ -69,8 +73,9 @@ static ConfigDef config() {
                 DELIMITER_DOC);
     }
 
-    final List<String> fieldNames() {
-        return getList(FIELD_NAMES_CONFIG);
+    final List<CursorField> fields() {
+        return getList(FIELD_NAMES_CONFIG).stream().map(CursorField::new)
+            .collect(toList());
     }
 
     final String outputFieldName() {
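
The heart of the PR is the new `io.aiven.kafka.connect.transforms.utils.CursorField` utility, whose source is not included in the hunks shown here. Judging from its call sites (`read(Schema)` returning a nullable `Field`, `read(Struct)`/`read(Map)` returning `Optional<Object>`, the `readAsString` variants, and `getCursor()`), it resolves a dot-separated path into nested structures. A hedged sketch of what such a class could look like; the PR's actual implementation may differ:

```java
package io.aiven.kafka.connect.transforms.utils;

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

// Hypothetical reconstruction: resolves a dot-separated cursor ("a.b.c")
// against nested Structs, Maps, or a Schema tree.
public class CursorField {
    private final String cursor;

    public CursorField(final String cursor) {
        this.cursor = cursor;
    }

    public String getCursor() {
        return cursor;
    }

    // Walks the schema tree; returns null if any path segment is missing.
    public Field read(final Schema schema) {
        Schema current = schema;
        Field field = null;
        for (final String part : cursor.split("\\.")) {
            if (current == null || current.type() != Schema.Type.STRUCT) {
                return null;
            }
            field = current.field(part);
            if (field == null) {
                return null;
            }
            current = field.schema();
        }
        return field;
    }

    public Optional<Object> read(final Struct struct) {
        Object current = struct;
        for (final String part : cursor.split("\\.")) {
            if (!(current instanceof Struct)) {
                return Optional.empty();
            }
            final Struct s = (Struct) current;
            if (s.schema().field(part) == null) {
                return Optional.empty();
            }
            current = s.get(part);
        }
        return Optional.ofNullable(current);
    }

    public Optional<Object> read(final Map<?, ?> map) {
        Object current = map;
        for (final String part : cursor.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty();
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return Optional.ofNullable(current);
    }

    public Optional<String> readAsString(final Struct struct) {
        return read(struct).map(Object::toString);
    }

    public Optional<String> readAsString(final Map<?, ?> map) {
        return read(map).map(Object::toString);
    }
}
```

Returning `Optional` pushes the null-vs-missing handling to the call sites, which is what lets `ConcatFields` collapse its if/else branches into a single `orElse(config.fieldReplaceMissing())` above.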
@@ -18,8 +18,10 @@
 
 import java.util.Date;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -44,31 +46,33 @@ public void configure(final Map<String, ?> configs) {
     @Override
     public R apply(final R record) {
         final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
+
+        final String fieldName = config.field().getCursor();
         if (schemaAndValue.value() == null) {
             throw new DataException(keyOrValue() + " can't be null: " + record);
         }
 
-        final Object fieldValue;
+        final Optional<Object> fieldValueOpt;
         if (schemaAndValue.value() instanceof Struct) {
             final Struct struct = (Struct) schemaAndValue.value();
-            if (struct.schema().field(config.fieldName()) == null) {
-                throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
+            if (config.field().read(struct.schema()) == null) {
+                throw new DataException(fieldName + " field must be present and its value can't be null: "
                     + record);
             }
-            fieldValue = struct.get(config.fieldName());
+            fieldValueOpt = config.field().read(struct);
         } else if (schemaAndValue.value() instanceof Map) {
             final Map<?, ?> map = (Map<?, ?>) schemaAndValue.value();
-            fieldValue = map.get(config.fieldName());
+            fieldValueOpt = config.field().read(map);
         } else {
             throw new DataException(keyOrValue() + " type must be STRUCT or MAP: " + record);
         }
 
-        if (fieldValue == null) {
-            throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
+        if (fieldValueOpt.isEmpty()) {
+            throw new DataException(fieldName + " field must be present and its value can't be null: "
                 + record);
         }
 
+        final Object fieldValue = fieldValueOpt.orElse(null);
+
         final long newTimestamp;
         if (fieldValue instanceof Long) {
             final var longFieldValue = (long) fieldValue;
@@ -81,7 +85,7 @@ public R apply(final R record) {
             final var dateFieldValue = (Date) fieldValue;
             newTimestamp = dateFieldValue.getTime();
         } else {
-            throw new DataException(config.fieldName()
+            throw new DataException(fieldName
                 + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: "
                 + record);
         }
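
A usage sketch of what this change enables: `field.name` can now be a dot-separated cursor such as `metadata.created_at`. All names here are hypothetical, `ExtractTimestamp.Value` assumes the usual Key/Value SMT subclass convention, and the `field.name` key is assumed to match `FIELD_NAME_CONFIG` in the config class:

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;

public class ExtractTimestampNestedDemo {
    public static void main(final String[] args) {
        // Nested value schema: { metadata: { created_at: Timestamp } }
        final Schema metadataSchema = SchemaBuilder.struct()
            .field("created_at", Timestamp.SCHEMA)
            .build();
        final Schema valueSchema = SchemaBuilder.struct()
            .field("metadata", metadataSchema)
            .build();

        final Struct metadata = new Struct(metadataSchema)
            .put("created_at", new Date(1_600_000_000_000L));
        final Struct value = new Struct(valueSchema).put("metadata", metadata);

        final SourceRecord record =
            new SourceRecord(null, null, "some-topic", 0, valueSchema, value);

        final Map<String, String> props = new HashMap<>();
        props.put("field.name", "metadata.created_at"); // nested cursor

        final ExtractTimestamp.Value<SourceRecord> transform = new ExtractTimestamp.Value<>();
        transform.configure(props);
        final SourceRecord result = transform.apply(record);
        System.out.println(result.timestamp()); // 1600000000000
    }
}
```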
@@ -18,8 +18,10 @@
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -100,8 +102,8 @@ public void ensureValid(final String name, final Object value) {
             EPOCH_RESOLUTION_DOC);
     }
 
-    final String fieldName() {
-        return getString(FIELD_NAME_CONFIG);
+    final CursorField field() {
+        return new CursorField(getString(FIELD_NAME_CONFIG));
    }
 
     final TimestampResolution timestampResolution() {
src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java (44 changes: 23 additions & 21 deletions)
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
@@ -73,26 +74,28 @@ public R apply(final R record) {
         final Optional<String> newTopic;
 
         if (schemaAndValue.schema() == null) { // schemaless values (Map)
-            if (config.fieldName().isPresent()) {
+            if (config.field().isPresent()) {
                 newTopic = topicNameFromNamedFieldSchemaless(
-                    record.toString(), schemaAndValue.value(), config.fieldName().get());
+                    record.toString(), schemaAndValue.value(), config.field().get());
             } else {
                 newTopic = topicNameWithoutFieldNameSchemaless(
                     record.toString(), schemaAndValue.value());
             }
         } else { // schema-based values (Struct)
-            if (config.fieldName().isPresent()) {
+            if (config.field().isPresent()) {
                 newTopic = topicNameFromNamedFieldWithSchema(
-                    record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
+                    record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.field().get());
             } else {
                 newTopic = topicNameWithoutFieldNameWithSchema(
                     record.toString(), schemaAndValue.schema(), schemaAndValue.value());
             }
         }
 
         if (newTopic.isPresent()) {
+            final String appended = record.topic() + config.appendDelimiter() + newTopic.get();
+            final String newName = config.appendToExisting() ? appended : newTopic.get();
             return record.newRecord(
-                newTopic.get(),
+                newName,
                 record.kafkaPartition(),
                 record.keySchema(),
                 record.key(),
@@ -112,7 +115,7 @@ public R apply(final R record) {
 
     private Optional<String> topicNameFromNamedFieldSchemaless(final String recordStr,
                                                                final Object value,
-                                                               final String fieldName) {
+                                                               final CursorField field) {
         if (value == null) {
             throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
         }
@@ -123,15 +126,15 @@ private Optional<String> topicNameFromNamedFieldSchemaless(final String recordStr,
 
         @SuppressWarnings("unchecked") final Map<String, Object> valueMap = (Map<String, Object>) value;
 
-        final Optional<String> result = Optional.ofNullable(valueMap.get(fieldName))
-            .map(field -> {
-                if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM.contains(field.getClass())) {
-                    throw new DataException(fieldName + " type in " + dataPlace()
+        final Optional<String> result = field.read(valueMap)
+            .map(fieldValue -> {
+                if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM.contains(fieldValue.getClass())) {
+                    throw new DataException(field.getCursor() + " type in " + dataPlace()
                         + " " + value
                         + " must be " + SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM
                         + ": " + recordStr);
                 }
-                return field;
+                return fieldValue;
             })
             .map(Object::toString);
 
@@ -141,7 +144,7 @@ private Optional<String> topicNameFromNamedFieldSchemaless(final String recordStr,
         if (config.skipMissingOrNull()) {
             return Optional.empty();
         } else {
-            throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
+            throw new DataException(field.getCursor() + " in " + dataPlace() + " can't be null or empty: " + recordStr);
         }
     }
 }
@@ -169,7 +172,7 @@ private Optional<String> topicNameWithoutFieldNameSchemaless(final String recordStr,
     private Optional<String> topicNameFromNamedFieldWithSchema(final String recordStr,
                                                                final Schema schema,
                                                                final Object value,
-                                                               final String fieldName) {
+                                                               final CursorField field) {
         if (Schema.Type.STRUCT != schema.type()) {
             throw new DataException(dataPlace() + " schema type must be STRUCT if field name is specified: "
                 + recordStr);
@@ -179,32 +182,31 @@ private Optional<String> topicNameFromNamedFieldWithSchema(final String recordStr,
             throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
         }
 
-        final Field field = schema.field(fieldName);
-        if (field == null) {
+        final Field fieldSchema = field.read(schema);
+        if (fieldSchema == null) {
             if (config.skipMissingOrNull()) {
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " schema can't be missing: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " schema can't be missing: " + recordStr);
             }
         }
 
-        if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
-            throw new DataException(fieldName + " schema type in " + dataPlace()
+        if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM.contains(fieldSchema.schema().type())) {
+            throw new DataException(field.getCursor() + " schema type in " + dataPlace()
                 + " must be " + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
                 + ": " + recordStr);
         }
 
         final Struct struct = (Struct) value;
 
-        final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
-            .map(Object::toString);
+        final Optional<String> result = field.readAsString(struct);
         if (result.isPresent() && !result.get().equals("")) {
             return result;
         } else {
             if (config.skipMissingOrNull()) {
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " can't be null or empty: " + recordStr);
            }
         }
     }
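
Worked example of the new rename logic, mirroring the two lines added in `apply()` (topic and extracted value are assumed): with `append.to.topic=true` and the default delimiter `-`, a record on topic `orders` whose extracted field value is `eu` is rerouted to `orders-eu` instead of plain `eu`:

```java
// Assumed inputs; the two derivation lines match the diff above.
final String topic = "orders";           // record.topic()
final String extracted = "eu";           // newTopic.get()
final String delimiter = "-";            // config.appendDelimiter(), default "-"
final boolean appendToExisting = true;   // config.appendToExisting(), default false

final String appended = topic + delimiter + extracted;
final String newName = appendToExisting ? appended : extracted;
System.out.println(newName);             // prints "orders-eu"
```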
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
@@ -33,6 +34,14 @@ class ExtractTopicConfig extends AbstractConfig {
         "In case the source of the new topic name is null or missing, "
             + "should a record be silently passed without transformation.";
 
+    public static final String APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG = "append.to.topic";
+    private static final String APPEND_TO_ORIGINAL_TOPIC_NAME_DOC =
+        "Whether to append the extracted value to the existing topic name instead of replacing it.";
+
+    public static final String APPEND_DELIMITER_CONFIG = "append.to.topic.delimiter";
+    private static final String APPEND_DELIMITER_DOC =
+        "The delimiter placed between the existing topic name and the appended value.";
+
     ExtractTopicConfig(final Map<?, ?> originals) {
         super(config(), originals);
     }
@@ -50,18 +59,38 @@ static ConfigDef config() {
                 ConfigDef.Type.BOOLEAN,
                 false,
                 ConfigDef.Importance.LOW,
-                SKIP_MISSING_OR_NULL_DOC);
+                SKIP_MISSING_OR_NULL_DOC)
+            .define(
+                APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG,
+                ConfigDef.Type.BOOLEAN,
+                false,
+                ConfigDef.Importance.LOW,
+                APPEND_TO_ORIGINAL_TOPIC_NAME_DOC)
+            .define(
+                APPEND_DELIMITER_CONFIG,
+                ConfigDef.Type.STRING,
+                "-",
+                ConfigDef.Importance.LOW,
+                APPEND_DELIMITER_DOC);
     }
 
-    Optional<String> fieldName() {
+    Optional<CursorField> field() {
         final String rawFieldName = getString(FIELD_NAME_CONFIG);
         if (null == rawFieldName || "".equals(rawFieldName)) {
             return Optional.empty();
         }
-        return Optional.of(rawFieldName);
+        return Optional.of(new CursorField(rawFieldName));
     }
 
     boolean skipMissingOrNull() {
         return getBoolean(SKIP_MISSING_OR_NULL_CONFIG);
     }
+
+    boolean appendToExisting() {
+        return getBoolean(APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG);
+    }
+
+    String appendDelimiter() {
+        return getString(APPEND_DELIMITER_CONFIG);
+    }
 }
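
Finally, a configuration sketch tying the pieces together. The `append.to.topic` and `append.to.topic.delimiter` keys are defined in this diff; the field path, topic, and the `field.name` key are assumptions, and `ExtractTopic.Value` assumes the usual Key/Value SMT subclass convention:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

public class ExtractTopicAppendDemo {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("field.name", "department.name");  // nested cursor into the value
        props.put("append.to.topic", "true");        // keep the original topic name
        props.put("append.to.topic.delimiter", "."); // override the default "-"

        final ExtractTopic.Value<SourceRecord> transform = new ExtractTopic.Value<>();
        transform.configure(props);
        // A record on topic "events" whose value has department.name == "sales"
        // would be rerouted to "events.sales".
    }
}
```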