
Commit 0eeab73

Poorvankbhatia authored and fapaul committed
[FLINK-32695] [Tests] Migrated TableFactoryHarness.ScanSourceBase to Source V2
1 parent a516014 commit 0eeab73

File tree: 5 files changed (+126 -67 lines)


flink-table/flink-table-planner/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -170,6 +170,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-connector</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- ArchUnit test dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
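Note: the new test-scoped flink-test-utils-connector dependency presumably supplies the Source V2 test helpers used later in this commit (AbstractTestSource, SingleSplitEnumerator, TestSourceReader, and TestSplit from org.apache.flink.test.util.source, imported by CommonExecSinkITCase below).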

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TableFactoryHarness.java

Lines changed: 25 additions & 18 deletions
@@ -18,10 +18,11 @@
 
 package org.apache.flink.table.planner.factories;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableEnvironment;
@@ -31,10 +32,11 @@
 import org.apache.flink.table.catalog.DefaultCatalogTable;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -347,15 +349,15 @@ public ChangelogMode getChangelogMode() {
 
         @Override
         public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-            return SourceFunctionProvider.of(
-                    new SourceFunction<RowData>() {
-                        @Override
-                        public void run(SourceContext<RowData> ctx) {}
-
-                        @Override
-                        public void cancel() {}
-                    },
-                    bounded);
+            return SourceProvider.of(
+                    new DataGeneratorSource<>(
+                            (GeneratorFunction<Long, RowData>)
+                                    value -> {
+                                        throw new UnsupportedOperationException(
+                                                "TableFactoryHarness no-op source should not generate data");
+                                    },
+                            0L, // Generate 0 elements (no-op)
+                            TypeInformation.of(RowData.class)));
         }
     }
 
@@ -398,11 +400,16 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
 
         @Override
         public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-            return SinkFunctionProvider.of(
-                    new SinkFunction<RowData>() {
-                        @Override
-                        public void invoke(RowData value, Context context1) {}
-                    });
+            return SinkV2Provider.of(
+                    TestSinkV2.<RowData>newBuilder()
+                            .setWriter(
+                                    new TestSinkV2.DefaultSinkWriter<RowData>() {
+                                        @Override
+                                        public void write(RowData element, Context context) {
+                                            // No-op implementation for testing harness
+                                        }
+                                    })
+                            .build());
         }
 
         @Override
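For readers unfamiliar with the replacement API, here is a minimal standalone sketch of the no-op pattern adopted above; the class name and pipeline wiring are illustrative, not part of this commit. A DataGeneratorSource asked for zero elements is bounded and finishes immediately, so its generator function is never invoked:

// Hypothetical sketch, not from the commit: a zero-element DataGeneratorSource
// acts as a bounded no-op source.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NoOpSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataGeneratorSource<String> noOpSource =
                new DataGeneratorSource<>(
                        (GeneratorFunction<Long, String>)
                                ignored -> {
                                    // Unreachable: with a count of 0, map() is never called.
                                    throw new UnsupportedOperationException("should not be called");
                                },
                        0L, // zero elements, so the source ends as soon as it starts
                        TypeInformation.of(String.class));

        env.fromSource(noOpSource, WatermarkStrategy.noWatermarks(), "no-op-source").print();
        env.execute("no-op source sketch");
    }
}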

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java

Lines changed: 18 additions & 10 deletions
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.planner.functions;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateBackendOptions;
-import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
@@ -33,6 +34,7 @@
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.DefaultSqlFactory;
 import org.apache.flink.table.expressions.Expression;
@@ -132,7 +134,11 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
                 final DataStructureConverter converter =
                         context.createDataStructureConverter(producedDataType);
 
-                return SourceFunctionProvider.of(new Source(rows, converter), true);
+                return SourceProvider.of(
+                        new DataGeneratorSource<>(
+                                new SourceGeneratorFunction(rows, converter),
+                                (long) rows.size(), // Generate exactly rows.size() elements
+                                TypeInformation.of(RowData.class)));
             }
         };
     }
@@ -689,22 +695,24 @@ protected Table query(TableEnvironment tEnv, Table sourceTable) {
 
     // ---------------------------------------------------------------------------------------------
 
-    private static class Source implements SourceFunction<RowData> {
+    private static class SourceGeneratorFunction implements GeneratorFunction<Long, RowData> {
 
         private final List<Row> rows;
         private final DynamicTableSource.DataStructureConverter converter;
 
-        public Source(List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
+        public SourceGeneratorFunction(
+                List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
             this.rows = rows;
             this.converter = converter;
         }
 
         @Override
-        public void run(SourceContext<RowData> ctx) throws Exception {
-            rows.stream().map(row -> (RowData) converter.toInternal(row)).forEach(ctx::collect);
+        public RowData map(Long index) throws Exception {
+            int idx = index.intValue();
+            if (idx < rows.size()) {
+                return (RowData) converter.toInternal(rows.get(idx));
+            }
+            throw new IllegalArgumentException("Index out of bounds: " + index);
         }
-
-        @Override
-        public void cancel() {}
    }
 }
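Two notes on the pattern above. DataGeneratorSource invokes its GeneratorFunction once per index in [0, count), so setting count to rows.size() replays the list exactly once; with parallelism greater than one the index range is split across subtasks, so global emission order is not guaranteed. As a minimal sketch of the same idea, assuming a serializable in-memory element list (the helper name is hypothetical):

// Hypothetical sketch: replay a fixed list through DataGeneratorSource by
// mapping the generator index to a list element.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.util.ArrayList;
import java.util.List;

public class ListReplaySketch {
    public static DataGeneratorSource<String> fromList(List<String> elements) {
        // Defensive copy so the captured state is serializable.
        List<String> copy = new ArrayList<>(elements);
        return new DataGeneratorSource<>(
                (GeneratorFunction<Long, String>) index -> copy.get(index.intValue()),
                copy.size(), // one map() invocation per index in [0, size)
                TypeInformation.of(String.class));
    }
}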

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java

Lines changed: 12 additions & 12 deletions
@@ -18,15 +18,17 @@
 
 package org.apache.flink.table.planner.functions;
 
-import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
@@ -158,16 +160,18 @@ private TestSource() {
 
         @Override
         public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-            return SourceFunctionProvider.of(new TestSourceFunction(), false);
+            return SourceProvider.of(
+                    new DataGeneratorSource<>(
+                            new TestGeneratorFunction(),
+                            1L, // Generate exactly 1 element
+                            TypeInformation.of(RowData.class)));
         }
     }
 
-    private static class TestSourceFunction implements SourceFunction<RowData> {
-
-        public TestSourceFunction() {}
+    private static class TestGeneratorFunction implements GeneratorFunction<Long, RowData> {
 
         @Override
-        public void run(SourceContext<RowData> ctx) {
+        public RowData map(Long index) throws Exception {
             final BinaryRowData row = new BinaryRowData(4);
             final BinaryRowWriter writer = new BinaryRowWriter(row);
             writer.writeInt(0, 42);
@@ -196,11 +200,7 @@ public void run(SourceContext<RowData> ctx) {
                             new MapDataSerializer(new DoubleType(), new BigIntType()));
             writer.complete();
 
-            ctx.collect(row);
-            ctx.close();
+            return row;
         }
-
-        @Override
-        public void cancel() {}
     }
 }
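One behavioral nuance: the legacy SourceFunctionProvider.of(..., false) declared this source unbounded even though it emitted a single row and closed its context, while the DataGeneratorSource with a count of 1 is explicitly bounded and ends cleanly after that one element.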

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java

Lines changed: 64 additions & 27 deletions
@@ -18,13 +18,17 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.common;
 
-import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.table.api.DataTypes;
@@ -42,9 +46,14 @@
 import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.factories.TableFactoryHarness;
 import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.util.source.AbstractTestSource;
+import org.apache.flink.test.util.source.SingleSplitEnumerator;
+import org.apache.flink.test.util.source.TestSourceReader;
+import org.apache.flink.test.util.source.TestSplit;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.types.Row;
@@ -55,6 +64,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -574,7 +584,8 @@ private static void assertTimestampResults(
         }
     }
 
-    private static class TestSource extends TableFactoryHarness.ScanSourceBase {
+    private static class TestSource extends TableFactoryHarness.ScanSourceBase
+            implements Serializable {
 
         private final List<Row> rows;
 
@@ -584,34 +595,60 @@ private TestSource(List<Row> rows) {
         }
 
         @Override
-        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
-                ScanTableSource.ScanContext context) {
-            final DynamicTableSource.DataStructureConverter converter =
+        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+            DynamicTableSource.DataStructureConverter converter =
                     context.createDataStructureConverter(
                             getFactoryContext().getPhysicalRowDataType());
 
-            return SourceFunctionProvider.of(new TestSourceFunction(rows, converter), false);
-        }
-    }
-
-    private static class TestSourceFunction implements SourceFunction<RowData> {
-
-        private final List<Row> rows;
-        private final DynamicTableSource.DataStructureConverter converter;
-
-        public TestSourceFunction(
-                List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
-            this.rows = rows;
-            this.converter = converter;
-        }
-
-        @Override
-        public void run(SourceContext<RowData> ctx) throws Exception {
-            rows.stream().map(row -> (RowData) converter.toInternal(row)).forEach(ctx::collect);
+            return SourceProvider.of(
+                    new AbstractTestSource<>() {
+                        @Override
+                        public SplitEnumerator<TestSplit, Void> createEnumerator(
+                                SplitEnumeratorContext<TestSplit> enumCtx) {
+                            return new SingleSplitEnumerator(enumCtx);
+                        }
+
+                        @Override
+                        public SourceReader<RowData, TestSplit> createReader(
+                                SourceReaderContext ctx) {
+                            // Each subtask gets a reader, but only one will actually receive the
+                            // split.
+                            return new TestSourceReader<>(ctx) {
+                                private boolean hasSplit = false;
+                                private boolean noMoreSplits = false;
+                                private boolean emitted = false;
+
+                                @Override
+                                public void addSplits(List<TestSplit> splits) {
+                                    // Mark reader as active if it got a split
+                                    hasSplit = !splits.isEmpty();
+                                }
+
+                                @Override
+                                public void notifyNoMoreSplits() {
+                                    // Called for readers that never get a split → they can finish
+                                    noMoreSplits = true;
+                                }
+
+                                @Override
+                                public InputStatus pollNext(ReaderOutput<RowData> out) {
+                                    if (hasSplit && !emitted) {
+                                        rows.forEach(
+                                                r ->
+                                                        out.collect(
+                                                                (RowData) converter.toInternal(r)));
+                                        emitted = true;
+                                        return InputStatus.END_OF_INPUT;
+                                    }
+                                    if (!hasSplit && noMoreSplits) {
+                                        return InputStatus.END_OF_INPUT;
+                                    }
+                                    return InputStatus.NOTHING_AVAILABLE;
+                                }
+                            };
+                        }
+                    });
        }
-
-        @Override
-        public void cancel() {}
     }
 
     private static class TestTimestampWriter extends TestSinkV2.DefaultSinkWriter<RowData> {
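The single-split choreography above is a common Source V2 way to emulate a non-parallel SourceFunction: the SingleSplitEnumerator hands the lone TestSplit to exactly one reader, which emits every row and returns END_OF_INPUT, while the remaining readers idle on NOTHING_AVAILABLE until notifyNoMoreSplits lets them finish, so the job terminates at any parallelism. The helper classes (AbstractTestSource, TestSourceReader, TestSplit, SingleSplitEnumerator) appear to come from the flink-test-utils-connector dependency added to the pom above.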
