-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][io] support kafka connect transforms and predicates #24221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@efcasado Please add the following content to your PR description and select a checkbox: |
|
Thanks for the contribution, @efcasado! |
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@efcasado Are you also planning to contribute to the Pulsar IO Debezium connectors? Debezium version is outdated in Pulsar: Line 204 in a4a3409
It would be great to get that updated too. Would you be interested in handling that too? Previous upgrading attempt was #23078, but that's stalled. Could we get Debezium upgraded to 3.1.x ? |
|
@efcasado In SMT, there's also predicates (Confluent doc for predicates). Do you have intention to add support for that too? |
I guess it makes sense to include them in this |
Actually, that's how this |
It's fine to do it in this PR. |
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work! Added a comment about supporting negate for predicates. It would be great to have some unit test coverage too.
...fka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
Outdated
Show resolved
Hide resolved
...fka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
Outdated
Show resolved
Hide resolved
...connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
Outdated
Show resolved
Hide resolved
|
@lhotari I believe I addressed all the above points and the change is ready for another round of reviews 😊 |
...fka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
Show resolved
Hide resolved
...fka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #24221 +/- ##
============================================
+ Coverage 73.57% 74.18% +0.61%
+ Complexity 32624 32555 -69
============================================
Files 1877 1866 -11
Lines 139502 144919 +5417
Branches 15299 16557 +1258
============================================
+ Hits 102638 107508 +4870
+ Misses 28908 28897 -11
- Partials 7956 8514 +558
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
Great work on this PR @efcasado! Merging this soon.
Looking forward to future contributions! |
(cherry picked from commit c0c5044)
|
@lhotari Will this change be cherry-picked for |
I don't see barriers in cherry-picking to 3.3.x since this isn't introducing breaking changes. I'll mark it for 3.0.x and 3.3.x too. |
(cherry picked from commit c0c5044)
(cherry picked from commit c0c5044)
Motivation
The current implementation of the Kafka Connect adaptor in Pulsar IO does not support Kafka Connect transforms and predicates. This limits the flexibility and potential of using Kafka Connect-based source connectors, such as the Debezium PostgreSQL source connector, which rely heavily on transformations to modify, filter, or enrich records before they are ingested. By adding support for transforms, we enable users to fully leverage these connectors' capabilities, sparing them the need to deploy additional Pulsar Functions to perform simple message-level transformations.
Modifications
Extended
KafkaConnectSourcewith three additional steps:initPredicates,initTransformsandapplyTransforms.initPredicatesandinitTransformsare called during the initialization of the Kafka Connect adaptor and are responsible for setting up any configured predicates and transforms.applyTransformsis executed duringprocessRecordsand applies all available transforms to each source record before it is processed.kafka-connect-adaptorhas been updated to include theconnect-transformspackage.Verifying this change
This change added tests and can be verified as follows:
KafkaConnectSourceTesttest to verify that transforms can be initialized and appliedDocumentation
docdoc-requireddoc-not-neededdoc-complete