2828import org .apache .flink .streaming .api .datastream .DataStream ;
2929import org .apache .flink .streaming .api .datastream .DataStreamSource ;
3030import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
31- import org .apache .flink .streaming .connectors .kafka .KafkaDeserializationSchema ;
3231import org .apache .flink .streaming .connectors .kafka .config .BoundedMode ;
3332import org .apache .flink .streaming .connectors .kafka .config .StartupMode ;
34- import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartition ;
3533import org .apache .flink .streaming .connectors .kafka .table .DynamicKafkaDeserializationSchema .MetadataConverter ;
3634import org .apache .flink .table .api .DataTypes ;
3735import org .apache .flink .table .connector .ChangelogMode ;
4846import org .apache .flink .table .data .StringData ;
4947import org .apache .flink .table .data .TimestampData ;
5048import org .apache .flink .table .types .DataType ;
51- import org .apache .flink .table .types .utils .DataTypeUtils ;
49+ import org .apache .flink .table .types .FieldsDataType ;
50+ import org .apache .flink .table .types .logical .LogicalType ;
51+ import org .apache .flink .table .types .logical .RowType ;
52+ import org .apache .flink .table .types .logical .utils .LogicalTypeUtils ;
5253import org .apache .flink .util .Preconditions ;
5354
5455import com .datasqrl .flinkrunner .connector .kafka .DeserFailureHandler ;
6970import java .util .Locale ;
7071import java .util .Map ;
7172import java .util .Objects ;
73+ import java .util .Optional ;
7274import java .util .Properties ;
7375import java .util .regex .Pattern ;
7476import java .util .stream .Collectors ;
7577import java .util .stream .IntStream ;
7678import java .util .stream .Stream ;
7779
80+ import static org .apache .flink .table .types .logical .LogicalTypeRoot .ROW ;
81+
7882/** A version-agnostic Kafka {@link ScanTableSource}. */
7983@ Internal
8084public class SafeKafkaDynamicSource
@@ -141,7 +145,7 @@ public class SafeKafkaDynamicSource
141145 * Specific startup offsets; only relevant when startup mode is {@link
142146 * StartupMode#SPECIFIC_OFFSETS}.
143147 */
144- protected final Map <KafkaTopicPartition , Long > specificStartupOffsets ;
148+ protected final Map <TopicPartition , Long > specificStartupOffsets ;
145149
146150 /**
147151 * The start timestamp to locate partition offsets; only relevant when startup mode is {@link
@@ -156,7 +160,7 @@ public class SafeKafkaDynamicSource
156160 * Specific end offsets; only relevant when bounded mode is {@link
157161 * BoundedMode#SPECIFIC_OFFSETS}.
158162 */
159- protected final Map <KafkaTopicPartition , Long > specificBoundedOffsets ;
163+ protected final Map <TopicPartition , Long > specificBoundedOffsets ;
160164
161165 /**
162166 * The bounded timestamp to locate partition offsets; only relevant when bounded mode is {@link
@@ -169,6 +173,9 @@ public class SafeKafkaDynamicSource
169173
170174 protected final String tableIdentifier ;
171175
176+ /** Parallelism of the physical Kafka consumer. * */
177+ protected final @ Nullable Integer parallelism ;
178+
172179 protected final DeserFailureHandler deserFailureHandler ;
173180
174181 public SafeKafkaDynamicSource (
@@ -182,13 +189,14 @@ public SafeKafkaDynamicSource(
182189 @ Nullable Pattern topicPattern ,
183190 Properties properties ,
184191 StartupMode startupMode ,
185- Map <KafkaTopicPartition , Long > specificStartupOffsets ,
192+ Map <TopicPartition , Long > specificStartupOffsets ,
186193 long startupTimestampMillis ,
187194 BoundedMode boundedMode ,
188- Map <KafkaTopicPartition , Long > specificBoundedOffsets ,
195+ Map <TopicPartition , Long > specificBoundedOffsets ,
189196 long boundedTimestampMillis ,
190197 boolean upsertMode ,
191198 String tableIdentifier ,
199+ @ Nullable Integer parallelism ,
192200 DeserFailureHandler deserFailureHandler ) {
193201 // Format attributes
194202 this .physicalDataType =
@@ -229,6 +237,7 @@ public SafeKafkaDynamicSource(
229237 this .boundedTimestampMillis = boundedTimestampMillis ;
230238 this .upsertMode = upsertMode ;
231239 this .tableIdentifier = tableIdentifier ;
240+ this .parallelism = parallelism ;
232241 this .deserFailureHandler = deserFailureHandler ;
233242 }
234243
@@ -269,6 +278,11 @@ public DataStream<RowData> produceDataStream(
269278 public boolean isBounded () {
270279 return kafkaSource .getBoundedness () == Boundedness .BOUNDED ;
271280 }
281+
282+ @ Override
283+ public Optional <Integer > getParallelism () {
284+ return Optional .ofNullable (parallelism );
285+ }
272286 };
273287 }
274288
@@ -347,6 +361,7 @@ public DynamicTableSource copy() {
347361 boundedTimestampMillis ,
348362 upsertMode ,
349363 tableIdentifier ,
364+ parallelism ,
350365 deserFailureHandler );
351366 copy .producedDataType = producedDataType ;
352367 copy .metadataKeys = metadataKeys ;
@@ -387,7 +402,8 @@ public boolean equals(Object o) {
387402 && boundedTimestampMillis == that .boundedTimestampMillis
388403 && Objects .equals (upsertMode , that .upsertMode )
389404 && Objects .equals (tableIdentifier , that .tableIdentifier )
390- && Objects .equals (watermarkStrategy , that .watermarkStrategy );
405+ && Objects .equals (watermarkStrategy , that .watermarkStrategy )
406+ && Objects .equals (parallelism , that .parallelism );
391407 }
392408
393409 @ Override
@@ -412,7 +428,8 @@ public int hashCode() {
412428 boundedTimestampMillis ,
413429 upsertMode ,
414430 tableIdentifier ,
415- watermarkStrategy );
431+ watermarkStrategy ,
432+ parallelism );
416433 }
417434
418435 // --------------------------------------------------------------------------------------------
@@ -422,7 +439,7 @@ protected KafkaSource<RowData> createKafkaSource(
422439 DeserializationSchema <RowData > valueDeserialization ,
423440 TypeInformation <RowData > producedTypeInfo ) {
424441
425- final KafkaDeserializationSchema <RowData > kafkaDeserializer =
442+ final KafkaRecordDeserializationSchema <RowData > kafkaDeserializer =
426443 createKafkaDeserializationSchema (
427444 keyDeserialization , valueDeserialization , producedTypeInfo );
428445
@@ -455,8 +472,7 @@ protected KafkaSource<RowData> createKafkaSource(
455472 specificStartupOffsets .forEach (
456473 (tp , offset ) ->
457474 offsets .put (
458- new TopicPartition (tp .getTopic (), tp .getPartition ()),
459- offset ));
475+ new TopicPartition (tp .topic (), tp .partition ()), offset ));
460476 kafkaSourceBuilder .setStartingOffsets (OffsetsInitializer .offsets (offsets ));
461477 break ;
462478 case TIMESTAMP :
@@ -480,18 +496,15 @@ protected KafkaSource<RowData> createKafkaSource(
480496 specificBoundedOffsets .forEach (
481497 (tp , offset ) ->
482498 offsets .put (
483- new TopicPartition (tp .getTopic (), tp .getPartition ()),
484- offset ));
499+ new TopicPartition (tp .topic (), tp .partition ()), offset ));
485500 kafkaSourceBuilder .setBounded (OffsetsInitializer .offsets (offsets ));
486501 break ;
487502 case TIMESTAMP :
488503 kafkaSourceBuilder .setBounded (OffsetsInitializer .timestamp (boundedTimestampMillis ));
489504 break ;
490505 }
491506
492- kafkaSourceBuilder
493- .setProperties (properties )
494- .setDeserializer (KafkaRecordDeserializationSchema .of (kafkaDeserializer ));
507+ kafkaSourceBuilder .setProperties (properties ).setDeserializer (kafkaDeserializer );
495508
496509 return kafkaSourceBuilder .build ();
497510 }
@@ -513,7 +526,7 @@ private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
513526 .collect (Collectors .joining ("," )))));
514527 }
515528
516- private KafkaDeserializationSchema <RowData > createKafkaDeserializationSchema (
529+ private KafkaRecordDeserializationSchema <RowData > createKafkaDeserializationSchema (
517530 DeserializationSchema <RowData > keyDeserialization ,
518531 DeserializationSchema <RowData > valueDeserialization ,
519532 TypeInformation <RowData > producedTypeInfo ) {
@@ -567,11 +580,30 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
567580 }
568581 DataType physicalFormatDataType = Projection .of (projection ).project (this .physicalDataType );
569582 if (prefix != null ) {
570- physicalFormatDataType = DataTypeUtils . stripRowPrefix (physicalFormatDataType , prefix );
583+ physicalFormatDataType = stripRowPrefix (physicalFormatDataType , prefix );
571584 }
572585 return format .createRuntimeDecoder (context , physicalFormatDataType );
573586 }
574587
588+ /** Removes a string prefix from the fields of the given row data type. */
589+ private static DataType stripRowPrefix (DataType dataType , String prefix ) {
590+ Preconditions .checkArgument (dataType .getLogicalType ().is (ROW ), "Row data type expected." );
591+ final RowType rowType = (RowType ) dataType .getLogicalType ();
592+ final List <String > newFieldNames =
593+ rowType .getFieldNames ().stream ()
594+ .map (
595+ s -> {
596+ if (s .startsWith (prefix )) {
597+ return s .substring (prefix .length ());
598+ }
599+ return s ;
600+ })
601+ .collect (Collectors .toList ());
602+ final LogicalType newRowType = LogicalTypeUtils .renameRowFields (rowType , newFieldNames );
603+ return new FieldsDataType (
604+ newRowType , dataType .getConversionClass (), dataType .getChildren ());
605+ }
606+
575607 // --------------------------------------------------------------------------------------------
576608 // Metadata handling
577609 // --------------------------------------------------------------------------------------------
0 commit comments