Skip to content
Merged
12 changes: 12 additions & 0 deletions pulsar-io/kafka-connect-adaptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@
</exclusions>
</dependency>

<!-- Kafka Connect SMT (Single Message Transform) support, pulled in so the
     adaptor can load org.apache.kafka.connect.transforms.* classes at runtime.
     jose4j is excluded to match the exclusions on the sibling kafka
     dependencies above - presumably a convergence/licensing concern;
     confirm against the parent pom. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
<version>${kafka-client.version}</version>
<exclusions>
<exclusion>
<artifactId>jose4j</artifactId>
<groupId>org.bitbucket.b_c</groupId>
</exclusion>
</exclusions>
</dependency>

<!-- pulsar-client is only needed for MessageId conversion (for seeking), commons-lang3 and Netty buffer manipulation -->
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public synchronized Record<T> read() throws Exception {
}
if (currentBatch.hasNext()) {
AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
if (processRecord.isEmpty()) {
if (processRecord == null || processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
continue;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.confluent.connect.avro.AvroData;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,6 +34,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand All @@ -51,6 +56,15 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
private boolean jsonWithEnvelope = false;
private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";

// Named predicates parsed from "predicates.<name>.*" config; looked up by
// name from each transform's "predicate" property in initTransforms().
private Map<String, Predicate<SourceRecord>> predicates = new HashMap<>();

// One link of the SMT chain: a transform plus its optional gating predicate.
private record PredicatedTransform(
    Predicate<SourceRecord> predicate,      // null => transform always applies
    Transformation<SourceRecord> transform,
    boolean negated                         // true => apply when predicate is false
) {}
// SMT chain in configured order; consumed by applyTransforms().
private List<PredicatedTransform> transformations = new ArrayList<>();

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
Expand All @@ -60,17 +74,120 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
}
log.info("jsonWithEnvelope: {}", jsonWithEnvelope);

initPredicates(config);
initTransforms(config);
super.open(config, sourceContext);
}

/**
 * Parses Kafka Connect style predicate configuration:
 * <pre>
 *   predicates=name1,name2
 *   predicates.&lt;name&gt;.type=&lt;fully qualified Predicate class&gt;
 *   predicates.&lt;name&gt;.&lt;prop&gt;=...   (passed to Predicate#configure)
 * </pre>
 * Populates {@code predicates} keyed by predicate name.
 *
 * @param config the connector config map
 * @throws RuntimeException if a declared predicate class cannot be
 *         instantiated or configured
 */
private void initPredicates(Map<String, Object> config) {
    Object predicatesListObj = config.get("predicates");
    if (predicatesListObj == null) {
        return;
    }
    for (String rawName : predicatesListObj.toString().split(",")) {
        String predicateName = rawName.trim();
        if (predicateName.isEmpty()) {
            // Tolerate stray/trailing commas in the "predicates" list.
            continue;
        }
        String prefix = "predicates." + predicateName + ".";
        Object classNameObj = config.get(prefix + "type");
        if (classNameObj == null) {
            // A declared predicate without a type is almost certainly a config
            // typo; make the skip visible instead of silently ignoring it.
            log.warn("Predicate {} is listed in 'predicates' but {}type is missing; skipping",
                    predicateName, prefix);
            continue;
        }
        String className = classNameObj.toString();
        try {
            @SuppressWarnings("unchecked")
            Class<Predicate<SourceRecord>> clazz =
                    (Class<Predicate<SourceRecord>>) Class.forName(className);
            Predicate<SourceRecord> predicate = clazz.getDeclaredConstructor().newInstance();
            // Strip the "predicates.<name>." prefix so the predicate sees only its own keys.
            java.util.Map<String, Object> predicateConfig = config.entrySet().stream()
                    .filter(e -> e.getKey().startsWith(prefix))
                    .collect(java.util.stream.Collectors.toMap(
                            e -> e.getKey().substring(prefix.length()),
                            java.util.Map.Entry::getValue
                    ));
            log.info("predicate config: {}", predicateConfig);
            predicate.configure(predicateConfig);
            predicates.put(predicateName, predicate);
        } catch (Exception e) {
            throw new RuntimeException("Failed to instantiate predicate: " + className, e);
        }
    }
}

/**
 * Parses Kafka Connect style SMT configuration:
 * <pre>
 *   transforms=name1,name2                        (applied in list order)
 *   transforms.&lt;name&gt;.type=&lt;Transformation class&gt;
 *   transforms.&lt;name&gt;.predicate=&lt;predicate name&gt;  (optional, see initPredicates)
 *   transforms.&lt;name&gt;.negate=true|false           (optional, inverts the predicate)
 * </pre>
 * Rebuilds the {@code transformations} chain from scratch.
 *
 * @param config the connector config map
 * @throws IllegalArgumentException if a transform references an unknown predicate
 * @throws RuntimeException if a transform class cannot be instantiated or configured
 */
private void initTransforms(Map<String, Object> config) {
    transformations.clear();
    Object transformsListObj = config.get("transforms");
    if (transformsListObj == null) {
        return;
    }
    for (String rawName : transformsListObj.toString().split(",")) {
        String transformName = rawName.trim();
        if (transformName.isEmpty()) {
            // Tolerate stray/trailing commas in the "transforms" list.
            continue;
        }
        String prefix = "transforms." + transformName + ".";
        Object classNameObj = config.get(prefix + "type");
        if (classNameObj == null) {
            // A declared transform without a type is almost certainly a config
            // typo; make the skip visible instead of silently ignoring it.
            log.warn("Transform {} is listed in 'transforms' but {}type is missing; skipping",
                    transformName, prefix);
            continue;
        }
        String className = classNameObj.toString();
        Transformation<SourceRecord> transform;
        try {
            @SuppressWarnings("unchecked")
            Class<Transformation<SourceRecord>> clazz =
                    (Class<Transformation<SourceRecord>>) Class.forName(className);
            transform = clazz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Failed to instantiate SMT: " + className, e);
        }
        // Strip the "transforms.<name>." prefix so the transform sees only its own keys.
        java.util.Map<String, Object> transformConfig = config.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix))
                .collect(java.util.stream.Collectors.toMap(
                        e -> e.getKey().substring(prefix.length()),
                        java.util.Map.Entry::getValue
                ));
        log.info("transform config: {}", transformConfig);
        String predicateName = (String) transformConfig.get("predicate");
        boolean negated = Boolean.parseBoolean(
                String.valueOf(transformConfig.getOrDefault("negate", "false")));
        Predicate<SourceRecord> predicate = null;
        if (predicateName != null) {
            predicate = predicates.get(predicateName);
            if (predicate == null) {
                // Fail fast: applying the transform unconditionally when the
                // predicate name is misspelled would silently invert user intent.
                throw new IllegalArgumentException("Transform " + transformName
                        + " references non-existent predicate: " + predicateName);
            }
        }
        try {
            transform.configure(transformConfig);
        } catch (Exception e) {
            throw new RuntimeException("Failed to configure SMT: " + className, e);
        }
        transformations.add(new PredicatedTransform(predicate, transform, negated));
    }
}

private static final AvroData avroData = new AvroData(1000);

/**
 * Runs the configured SMT chain over the raw source record and wraps the result.
 *
 * The offset is recorded from the ORIGINAL record even when the transform chain
 * drops it — presumably so filtered records still advance the committed offset;
 * NOTE(review): confirm this is the intended at-least-once behavior.
 *
 * Fix: the previous text declared the local {@code record} twice (stale
 * {@code new KafkaSourceRecord(srcRecord)} plus the transformed one), which
 * does not compile; the stale line is removed.
 *
 * @param srcRecord the record emitted by the Kafka Connect source task
 * @return the wrapped record, or {@code null} if a transform filtered it out
 */
public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord srcRecord) {
    SourceRecord transformedRecord = applyTransforms(srcRecord);

    offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
    if (transformedRecord == null) {
        return null;
    }
    return new KafkaSourceRecord(transformedRecord);
}

// NOTE(review): the duplicate `private static final AvroData avroData = new AvroData(1000);`
// that appeared here is removed — the field is already declared once above, and a second
// declaration does not compile.

/**
 * Applies the configured SMT chain to a record, honoring per-transform predicates.
 *
 * Kafka Connect semantics: a transform applies when it has no predicate, or when
 * (predicate result XOR negated) is true. A transform may return {@code null} to
 * filter the record out, which short-circuits the rest of the chain.
 *
 * @param record the record to transform (may pass through unchanged)
 * @return the transformed record, or {@code null} if some transform dropped it
 */
public SourceRecord applyTransforms(SourceRecord record) {
    SourceRecord current = record;
    for (PredicatedTransform pt : transformations) {
        if (current == null) {
            // An earlier transform filtered the record out; nothing left to do.
            break;
        }
        // Equivalent to the original !(negated != test) skip, stated positively.
        boolean shouldApply =
                pt.predicate == null || (pt.negated != pt.predicate.test(current));
        if (shouldApply) {
            current = pt.transform.apply(current);
        }
    }
    return current;
}

public class KafkaSourceRecord extends AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
implements KVRecord<byte[], byte[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import java.io.File;
import java.io.OutputStream;
import java.nio.file.Files;
Expand All @@ -31,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.schema.KeyValue;
Expand Down Expand Up @@ -101,6 +104,74 @@ public void testOpenAndReadTaskDirect() throws Exception {
testOpenAndReadTask(config);
}

@Test
void testTransformation() throws Exception {
    // No predicate configured: the Cast SMT must always apply.
    Map<String, Object> config = setupTransformConfig(false, false);
    runTransformTest(config, true);
}

@Test
void testTransformationWithPredicate() throws Exception {
    // Predicate matches the record's topic ("test-topic"), so the Cast SMT applies.
    Map<String, Object> config = setupTransformConfig(true, false);
    runTransformTest(config, true);
}

@Test
void testTransformationWithNegatedPredicate() throws Exception {
    // Predicate matches but is negated, so the Cast SMT must be skipped.
    Map<String, Object> config = setupTransformConfig(true, true);
    runTransformTest(config, false);
}

/**
 * Builds a connector config containing a Cast$Value SMT ("myField" -> int32),
 * optionally gated by a TopicNameMatches predicate on "test-topic".
 *
 * @param withPredicate whether to declare and attach the predicate
 * @param negated whether the attached predicate is negated (only meaningful
 *                when {@code withPredicate} is true)
 * @return the assembled config map
 */
private Map<String, Object> setupTransformConfig(boolean withPredicate, boolean negated) {
    Map<String, Object> cfg = getConfig();
    cfg.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");

    // The SMT under test: casts the String field "myField" to an int32.
    cfg.put("transforms", "Cast");
    cfg.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value");
    cfg.put("transforms.Cast.spec", "myField:int32");

    if (!withPredicate) {
        return cfg;
    }

    // Gate the Cast SMT behind a topic-name predicate.
    cfg.put("predicates", "TopicMatch");
    cfg.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
    cfg.put("predicates.TopicMatch.pattern", "test-topic");
    cfg.put("transforms.Cast.predicate", "TopicMatch");
    if (negated) {
        cfg.put("transforms.Cast.negate", "true");
    }
    return cfg;
}

/**
 * Opens a KafkaConnectSource with the given config, pushes a synthetic record
 * through {@code applyTransforms}, and asserts whether the Cast SMT fired.
 *
 * Fixes: assert the record is non-null BEFORE dereferencing it (previously a
 * filtered record would NPE instead of failing an assertion); check the field's
 * type before using it; and use TestNG's (actual, expected) argument order in
 * assertEquals so failure messages read correctly.
 *
 * @param config connector config, typically from setupTransformConfig
 * @param expectTransformed true if "myField" should have been cast to int32
 */
private void runTransformTest(Map<String, Object> config, boolean expectTransformed) throws Exception {
    kafkaConnectSource = new KafkaConnectSource();
    kafkaConnectSource.open(config, context);

    Map<String, Object> value = new HashMap<>();
    value.put("myField", "42");
    SourceRecord record = new SourceRecord(
            null, null, "test-topic", null,
            null, null, null, value
    );

    SourceRecord transformed = kafkaConnectSource.applyTransforms(record);
    // Guard before dereferencing: null means the chain filtered the record out.
    assertNotNull(transformed);

    @SuppressWarnings("unchecked")
    Map<String, Object> transformedValue = (Map<String, Object>) transformed.value();
    assertNotNull(transformedValue);

    Object fieldValue = transformedValue.get("myField");
    if (expectTransformed) {
        // Cast$Value should have converted the String "42" to an int32.
        assertTrue(fieldValue instanceof Number, "expected numeric myField, got: " + fieldValue);
        assertEquals(((Number) fieldValue).intValue(), 42);
    } else {
        // Transform skipped: the original String value must be untouched.
        assertTrue(fieldValue instanceof String, "expected String myField, got: " + fieldValue);
        assertEquals(fieldValue, "42");
    }
}

private Map<String, Object> getConfig() {
Map<String, Object> config = new HashMap<>();

Expand Down
Loading