2121import com .google .common .cache .Cache ;
2222import com .google .common .cache .CacheBuilder ;
2323import io .confluent .connect .avro .AvroData ;
24+ import java .util .ArrayList ;
2425import java .util .Base64 ;
26+ import java .util .HashMap ;
27+ import java .util .List ;
2528import java .util .Map ;
2629import java .util .Optional ;
2730import java .util .concurrent .TimeUnit ;
3134import org .apache .kafka .common .TopicPartition ;
3235import org .apache .kafka .connect .json .JsonConverterConfig ;
3336import org .apache .kafka .connect .source .SourceRecord ;
37+ import org .apache .kafka .connect .transforms .Transformation ;
38+ import org .apache .kafka .connect .transforms .predicates .Predicate ;
3439import org .apache .pulsar .client .api .Schema ;
3540import org .apache .pulsar .common .schema .KeyValue ;
3641import org .apache .pulsar .common .schema .KeyValueEncodingType ;
@@ -51,6 +56,15 @@ public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte
5156 private boolean jsonWithEnvelope = false ;
5257 private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope" ;
5358
59+ private Map <String , Predicate <SourceRecord >> predicates = new HashMap <>();
60+
61+ private record PredicatedTransform (
62+ Predicate <SourceRecord > predicate ,
63+ Transformation <SourceRecord > transform ,
64+ boolean negated
65+ ) {}
66+ private List <PredicatedTransform > transformations = new ArrayList <>();
67+
5468 public void open (Map <String , Object > config , SourceContext sourceContext ) throws Exception {
5569 if (config .get (JSON_WITH_ENVELOPE_CONFIG ) != null ) {
5670 jsonWithEnvelope = Boolean .parseBoolean (config .get (JSON_WITH_ENVELOPE_CONFIG ).toString ());
@@ -60,17 +74,120 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
6074 }
6175 log .info ("jsonWithEnvelope: {}" , jsonWithEnvelope );
6276
77+ initPredicates (config );
78+ initTransforms (config );
6379 super .open (config , sourceContext );
6480 }
6581
82+ private void initPredicates (Map <String , Object > config ) {
83+ Object predicatesListObj = config .get ("predicates" );
84+ if (predicatesListObj != null ) {
85+ String predicatesList = predicatesListObj .toString ();
86+ for (String predicateName : predicatesList .split ("," )) {
87+ predicateName = predicateName .trim ();
88+ String prefix = "predicates." + predicateName + "." ;
89+ String typeKey = prefix + "type" ;
90+ Object classNameObj = config .get (typeKey );
91+ if (classNameObj == null ) {
92+ continue ;
93+ }
94+ String className = classNameObj .toString ();
95+ try {
96+ @ SuppressWarnings ("unchecked" )
97+ Class <Predicate <SourceRecord >> clazz =
98+ (Class <Predicate <SourceRecord >>) Class .forName (className );
99+ Predicate <SourceRecord > predicate = clazz .getDeclaredConstructor ().newInstance ();
100+ java .util .Map <String , Object > predicateConfig = config .entrySet ().stream ()
101+ .filter (e -> e .getKey ().startsWith (prefix ))
102+ .collect (java .util .stream .Collectors .toMap (
103+ e -> e .getKey ().substring (prefix .length ()),
104+ java .util .Map .Entry ::getValue
105+ ));
106+ log .info ("predicate config: {}" , predicateConfig );
107+ predicate .configure (predicateConfig );
108+ predicates .put (predicateName , predicate );
109+ } catch (Exception e ) {
110+ throw new RuntimeException ("Failed to instantiate predicate: " + className , e );
111+ }
112+ }
113+ }
114+ }
115+
116+ private void initTransforms (Map <String , Object > config ) {
117+ transformations .clear ();
118+ Object transformsListObj = config .get ("transforms" );
119+ if (transformsListObj != null ) {
120+ String transformsList = transformsListObj .toString ();
121+ for (String transformName : transformsList .split ("," )) {
122+ transformName = transformName .trim ();
123+ String prefix = "transforms." + transformName + "." ;
124+ String typeKey = prefix + "type" ;
125+ Object classNameObj = config .get (typeKey );
126+ if (classNameObj == null ) {
127+ continue ;
128+ }
129+ String className = classNameObj .toString ();
130+ try {
131+ @ SuppressWarnings ("unchecked" )
132+ Class <Transformation <SourceRecord >> clazz =
133+ (Class <Transformation <SourceRecord >>) Class .forName (className );
134+ Transformation <SourceRecord > transform = clazz .getDeclaredConstructor ().newInstance ();
135+ java .util .Map <String , Object > transformConfig = config .entrySet ().stream ()
136+ .filter (e -> e .getKey ().startsWith (prefix ))
137+ .collect (java .util .stream .Collectors .toMap (
138+ e -> e .getKey ().substring (prefix .length ()),
139+ java .util .Map .Entry ::getValue
140+ ));
141+ log .info ("transform config: {}" , transformConfig );
142+ String predicateName = (String ) transformConfig .get ("predicate" );
143+ boolean negated = Boolean .parseBoolean (
144+ String .valueOf (transformConfig .getOrDefault ("negate" , "false" )));
145+ Predicate <SourceRecord > predicate = null ;
146+ if (predicateName != null ) {
147+ predicate = predicates .get (predicateName );
148+ if (predicate == null ) {
149+ log .warn ("Transform {} references non-existent predicate: {}" ,
150+ transformName , predicateName );
151+ }
152+ }
153+ transform .configure (transformConfig );
154+ transformations .add (new PredicatedTransform (predicate , transform , negated ));
155+ } catch (Exception e ) {
156+ throw new RuntimeException ("Failed to instantiate SMT: " + className , e );
157+ }
158+ }
159+ }
160+ }
161+
162+ private static final AvroData avroData = new AvroData (1000 );
66163
67164 public synchronized KafkaSourceRecord processSourceRecord (final SourceRecord srcRecord ) {
68- KafkaSourceRecord record = new KafkaSourceRecord (srcRecord );
165+ SourceRecord transformedRecord = applyTransforms (srcRecord );
166+
69167 offsetWriter .offset (srcRecord .sourcePartition (), srcRecord .sourceOffset ());
168+ if (transformedRecord == null ) {
169+ return null ;
170+ }
171+
172+ KafkaSourceRecord record = new KafkaSourceRecord (transformedRecord );
70173 return record ;
71174 }
72175
73- private static final AvroData avroData = new AvroData (1000 );
176+ public SourceRecord applyTransforms (SourceRecord record ) {
177+ SourceRecord current = record ;
178+ for (PredicatedTransform pt : transformations ) {
179+ if (current == null ) {
180+ break ;
181+ }
182+
183+ if (pt .predicate != null && !(pt .negated != pt .predicate .test (current ))) {
184+ continue ;
185+ }
186+
187+ current = pt .transform .apply (current );
188+ }
189+ return current ;
190+ }
74191
75192 public class KafkaSourceRecord extends AbstractKafkaSourceRecord <KeyValue <byte [], byte []>>
76193 implements KVRecord <byte [], byte []> {
0 commit comments