Skip to content

Commit 4911d31

Browse files
efcasado authored and lhotari committed
[improve][io] support kafka connect transforms and predicates (#24221)
(cherry picked from commit c0c5044)
1 parent e099f3a commit 4911d31

File tree

4 files changed

+203
-3
lines changed

4 files changed

+203
-3
lines changed

pulsar-io/kafka-connect-adaptor/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@
104104
</exclusions>
105105
</dependency>
106106

107+
<dependency>
108+
<groupId>org.apache.kafka</groupId>
109+
<artifactId>connect-transforms</artifactId>
110+
<version>${kafka-client.version}</version>
111+
<exclusions>
112+
<exclusion>
113+
<artifactId>jose4j</artifactId>
114+
<groupId>org.bitbucket.b_c</groupId>
115+
</exclusion>
116+
</exclusions>
117+
</dependency>
118+
107119
<!-- pulsar-client is only needed for MessageId conversion (for seeking), commons-lang3 and Netty buffer manipulation -->
108120
<dependency>
109121
<groupId>${project.groupId}</groupId>

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public synchronized Record<T> read() throws Exception {
176176
}
177177
if (currentBatch.hasNext()) {
178178
AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
179-
if (processRecord.isEmpty()) {
179+
if (processRecord == null || processRecord.isEmpty()) {
180180
outstandingRecords.decrementAndGet();
181181
continue;
182182
} else {

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import com.google.common.cache.Cache;
2222
import com.google.common.cache.CacheBuilder;
2323
import io.confluent.connect.avro.AvroData;
24+
import java.util.ArrayList;
2425
import java.util.Base64;
26+
import java.util.HashMap;
27+
import java.util.List;
2528
import java.util.Map;
2629
import java.util.Optional;
2730
import java.util.concurrent.TimeUnit;
@@ -31,6 +34,8 @@
3134
import org.apache.kafka.common.TopicPartition;
3235
import org.apache.kafka.connect.json.JsonConverterConfig;
3336
import org.apache.kafka.connect.source.SourceRecord;
37+
import org.apache.kafka.connect.transforms.Transformation;
38+
import org.apache.kafka.connect.transforms.predicates.Predicate;
3439
import org.apache.pulsar.client.api.Schema;
3540
import org.apache.pulsar.common.schema.KeyValue;
3641
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -51,6 +56,15 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
5156
private boolean jsonWithEnvelope = false;
5257
private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
5358

59+
private Map<String, Predicate<SourceRecord>> predicates = new HashMap<>();
60+
61+
private record PredicatedTransform(
62+
Predicate<SourceRecord> predicate,
63+
Transformation<SourceRecord> transform,
64+
boolean negated
65+
) {}
66+
private List<PredicatedTransform> transformations = new ArrayList<>();
67+
5468
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
5569
if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
5670
jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
@@ -60,17 +74,120 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
6074
}
6175
log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
6276

77+
initPredicates(config);
78+
initTransforms(config);
6379
super.open(config, sourceContext);
6480
}
6581

82+
private void initPredicates(Map<String, Object> config) {
83+
Object predicatesListObj = config.get("predicates");
84+
if (predicatesListObj != null) {
85+
String predicatesList = predicatesListObj.toString();
86+
for (String predicateName : predicatesList.split(",")) {
87+
predicateName = predicateName.trim();
88+
String prefix = "predicates." + predicateName + ".";
89+
String typeKey = prefix + "type";
90+
Object classNameObj = config.get(typeKey);
91+
if (classNameObj == null) {
92+
continue;
93+
}
94+
String className = classNameObj.toString();
95+
try {
96+
@SuppressWarnings("unchecked")
97+
Class<Predicate<SourceRecord>> clazz =
98+
(Class<Predicate<SourceRecord>>) Class.forName(className);
99+
Predicate<SourceRecord> predicate = clazz.getDeclaredConstructor().newInstance();
100+
java.util.Map<String, Object> predicateConfig = config.entrySet().stream()
101+
.filter(e -> e.getKey().startsWith(prefix))
102+
.collect(java.util.stream.Collectors.toMap(
103+
e -> e.getKey().substring(prefix.length()),
104+
java.util.Map.Entry::getValue
105+
));
106+
log.info("predicate config: {}", predicateConfig);
107+
predicate.configure(predicateConfig);
108+
predicates.put(predicateName, predicate);
109+
} catch (Exception e) {
110+
throw new RuntimeException("Failed to instantiate predicate: " + className, e);
111+
}
112+
}
113+
}
114+
}
115+
116+
private void initTransforms(Map<String, Object> config) {
117+
transformations.clear();
118+
Object transformsListObj = config.get("transforms");
119+
if (transformsListObj != null) {
120+
String transformsList = transformsListObj.toString();
121+
for (String transformName : transformsList.split(",")) {
122+
transformName = transformName.trim();
123+
String prefix = "transforms." + transformName + ".";
124+
String typeKey = prefix + "type";
125+
Object classNameObj = config.get(typeKey);
126+
if (classNameObj == null) {
127+
continue;
128+
}
129+
String className = classNameObj.toString();
130+
try {
131+
@SuppressWarnings("unchecked")
132+
Class<Transformation<SourceRecord>> clazz =
133+
(Class<Transformation<SourceRecord>>) Class.forName(className);
134+
Transformation<SourceRecord> transform = clazz.getDeclaredConstructor().newInstance();
135+
java.util.Map<String, Object> transformConfig = config.entrySet().stream()
136+
.filter(e -> e.getKey().startsWith(prefix))
137+
.collect(java.util.stream.Collectors.toMap(
138+
e -> e.getKey().substring(prefix.length()),
139+
java.util.Map.Entry::getValue
140+
));
141+
log.info("transform config: {}", transformConfig);
142+
String predicateName = (String) transformConfig.get("predicate");
143+
boolean negated = Boolean.parseBoolean(
144+
String.valueOf(transformConfig.getOrDefault("negate", "false")));
145+
Predicate<SourceRecord> predicate = null;
146+
if (predicateName != null) {
147+
predicate = predicates.get(predicateName);
148+
if (predicate == null) {
149+
log.warn("Transform {} references non-existent predicate: {}",
150+
transformName, predicateName);
151+
}
152+
}
153+
transform.configure(transformConfig);
154+
transformations.add(new PredicatedTransform(predicate, transform, negated));
155+
} catch (Exception e) {
156+
throw new RuntimeException("Failed to instantiate SMT: " + className, e);
157+
}
158+
}
159+
}
160+
}
161+
162+
private static final AvroData avroData = new AvroData(1000);
66163

67164
public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord srcRecord) {
68-
KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
165+
SourceRecord transformedRecord = applyTransforms(srcRecord);
166+
69167
offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
168+
if (transformedRecord == null) {
169+
return null;
170+
}
171+
172+
KafkaSourceRecord record = new KafkaSourceRecord(transformedRecord);
70173
return record;
71174
}
72175

73-
private static final AvroData avroData = new AvroData(1000);
176+
public SourceRecord applyTransforms(SourceRecord record) {
177+
SourceRecord current = record;
178+
for (PredicatedTransform pt : transformations) {
179+
if (current == null) {
180+
break;
181+
}
182+
183+
if (pt.predicate != null && !(pt.negated != pt.predicate.test(current))) {
184+
continue;
185+
}
186+
187+
current = pt.transform.apply(current);
188+
}
189+
return current;
190+
}
74191

75192
public class KafkaSourceRecord extends AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
76193
implements KVRecord<byte[], byte[]> {

pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import static org.mockito.Mockito.mock;
2222
import static org.mockito.Mockito.when;
2323
import static org.testng.Assert.assertFalse;
24+
import static org.testng.Assert.assertNotNull;
2425
import static org.testng.Assert.assertNull;
2526
import static org.testng.Assert.assertTrue;
27+
import static org.testng.Assert.assertEquals;
2628
import java.io.File;
2729
import java.io.OutputStream;
2830
import java.nio.file.Files;
@@ -31,6 +33,7 @@
3133
import lombok.extern.slf4j.Slf4j;
3234
import org.apache.kafka.connect.file.FileStreamSourceConnector;
3335
import org.apache.kafka.connect.runtime.TaskConfig;
36+
import org.apache.kafka.connect.source.SourceRecord;
3437
import org.apache.pulsar.client.api.ProducerConsumerBase;
3538
import org.apache.pulsar.client.api.PulsarClient;
3639
import org.apache.pulsar.common.schema.KeyValue;
@@ -101,6 +104,74 @@ public void testOpenAndReadTaskDirect() throws Exception {
101104
testOpenAndReadTask(config);
102105
}
103106

107+
@Test
108+
void testTransformation() throws Exception {
109+
Map<String, Object> config = setupTransformConfig(false, false);
110+
runTransformTest(config, true);
111+
}
112+
113+
@Test
114+
void testTransformationWithPredicate() throws Exception {
115+
Map<String, Object> config = setupTransformConfig(true, false);
116+
runTransformTest(config, true);
117+
}
118+
119+
@Test
120+
void testTransformationWithNegatedPredicate() throws Exception {
121+
Map<String, Object> config = setupTransformConfig(true, true);
122+
runTransformTest(config, false);
123+
}
124+
125+
private Map<String, Object> setupTransformConfig(boolean withPredicate, boolean negated) {
126+
Map<String, Object> config = getConfig();
127+
config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
128+
129+
if (withPredicate) {
130+
config.put("predicates", "TopicMatch");
131+
config.put("predicates.TopicMatch.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
132+
config.put("predicates.TopicMatch.pattern", "test-topic");
133+
}
134+
135+
config.put("transforms", "Cast");
136+
config.put("transforms.Cast.type", "org.apache.kafka.connect.transforms.Cast$Value");
137+
config.put("transforms.Cast.spec", "myField:int32");
138+
139+
if (withPredicate) {
140+
config.put("transforms.Cast.predicate", "TopicMatch");
141+
if (negated) {
142+
config.put("transforms.Cast.negate", "true");
143+
}
144+
}
145+
146+
return config;
147+
}
148+
149+
private void runTransformTest(Map<String, Object> config, boolean expectTransformed) throws Exception {
150+
kafkaConnectSource = new KafkaConnectSource();
151+
kafkaConnectSource.open(config, context);
152+
153+
Map<String, Object> value = new HashMap<>();
154+
value.put("myField", "42");
155+
SourceRecord record = new SourceRecord(
156+
null, null, "test-topic", null,
157+
null, null, null, value
158+
);
159+
160+
SourceRecord transformed = kafkaConnectSource.applyTransforms(record);
161+
162+
@SuppressWarnings("unchecked")
163+
Map<String, Object> transformedValue = (Map<String, Object>) transformed.value();
164+
assertNotNull(transformedValue);
165+
166+
if (expectTransformed) {
167+
assertEquals(42, ((Number)transformedValue.get("myField")).intValue());
168+
assertTrue(transformedValue.get("myField") instanceof Number);
169+
} else {
170+
assertEquals("42", transformedValue.get("myField"));
171+
assertTrue(transformedValue.get("myField") instanceof String);
172+
}
173+
}
174+
104175
private Map<String, Object> getConfig() {
105176
Map<String, Object> config = new HashMap<>();
106177

0 commit comments

Comments
 (0)