From 032dd0ec1bdea16837b6c18b600ac9925619907c Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 18 Aug 2025 12:53:47 +0200 Subject: [PATCH] [FLINK-38257][SQL] Add module dynamic function loading possibility --- .../table/SavepointMetadataTableFunction.java | 39 +++++++++ .../flink/state/table/module/StateModule.java | 81 ++++++++++--------- ...namicBuiltInFunctionDefinitionFactory.java | 37 +++++++++ .../module/ExampleDynamicTableFunction.java | 56 +++++++++++++ .../state/table/module/StateModuleTest.java | 66 +++++++++++++++ ...ns.DynamicBuiltInFunctionDefinitionFactory | 16 ++++ ...namicBuiltInFunctionDefinitionFactory.java | 41 ++++++++++ 7 files changed, 297 insertions(+), 39 deletions(-) create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicBuiltInFunctionDefinitionFactory.java create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicTableFunction.java create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/StateModuleTest.java create mode 100644 flink-libraries/flink-state-processing-api/src/test/resources/META-INF/services/org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/DynamicBuiltInFunctionDefinitionFactory.java diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java index adf170320bd3d..5d7a6ee673675 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java @@ -24,10 +24,15 @@ import org.apache.flink.state.api.runtime.SavepointLoader; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.SpecializedFunction; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.inference.TypeStrategies; import org.apache.flink.types.Row; +import static org.apache.flink.table.functions.FunctionKind.TABLE; + @Internal @FunctionHint( output = @@ -41,6 +46,40 @@ + "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, " + "operator-total-size-in-bytes BIGINT NOT NULL>")) public class SavepointMetadataTableFunction extends TableFunction { + + public static final BuiltInFunctionDefinition SAVEPOINT_METADATA = + BuiltInFunctionDefinition.newBuilder() + .name("savepoint_metadata") + .kind(TABLE) + .runtimeClass(SavepointMetadataTableFunction.class.getName()) + .outputTypeStrategy( + TypeStrategies.explicit( + DataTypes.ROW( + DataTypes.FIELD( + "checkpoint-id", DataTypes.BIGINT().notNull()), + DataTypes.FIELD("operator-name", DataTypes.STRING()), + DataTypes.FIELD("operator-uid", DataTypes.STRING()), + DataTypes.FIELD( + "operator-uid-hash", + DataTypes.STRING().notNull()), + DataTypes.FIELD( + "operator-parallelism", + DataTypes.INT().notNull()), + DataTypes.FIELD( + "operator-max-parallelism", + DataTypes.INT().notNull()), + DataTypes.FIELD( + "operator-subtask-state-count", + DataTypes.INT().notNull()), + DataTypes.FIELD( + "operator-coordinator-state-size-in-bytes", + DataTypes.BIGINT().notNull()), + DataTypes.FIELD( + "operator-total-size-in-bytes", + DataTypes.BIGINT().notNull())))) + .notDeterministic() + .build(); + public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {} public void eval(String savepointPath) { diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java index c69a407dcf265..b95973fbabb07 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java @@ -19,61 +19,34 @@ package org.apache.flink.state.table.module; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.state.table.SavepointMetadataTableFunction; -import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.functions.BuiltInFunctionDefinition; +import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.module.Module; -import org.apache.flink.table.types.inference.TypeStrategies; -import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.ServiceLoader; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.flink.table.functions.FunctionKind.TABLE; - /** Module of state in Flink. */ @Experimental public class StateModule implements Module { - public static final String IDENTIFIER = "state"; + private static final Logger LOG = LoggerFactory.getLogger(StateModule.class); - public static final BuiltInFunctionDefinition SAVEPOINT_METADATA = - BuiltInFunctionDefinition.newBuilder() - .name("savepoint_metadata") - .kind(TABLE) - .runtimeClass(SavepointMetadataTableFunction.class.getName()) - .outputTypeStrategy( - TypeStrategies.explicit( - DataTypes.ROW( - DataTypes.FIELD( - "checkpoint-id", DataTypes.BIGINT().notNull()), - DataTypes.FIELD("operator-name", DataTypes.STRING()), - DataTypes.FIELD("operator-uid", DataTypes.STRING()), - DataTypes.FIELD( - "operator-uid-hash", - DataTypes.STRING().notNull()), - DataTypes.FIELD( - "operator-parallelism", - DataTypes.INT().notNull()), - DataTypes.FIELD( - "operator-max-parallelism", - DataTypes.INT().notNull()), - DataTypes.FIELD( - "operator-subtask-state-count", - DataTypes.INT().notNull()), - DataTypes.FIELD( - "operator-coordinator-state-size-in-bytes", - DataTypes.BIGINT().notNull()), - DataTypes.FIELD( - "operator-total-size-in-bytes", - DataTypes.BIGINT().notNull())))) - .build(); + public static final String IDENTIFIER = "state"; public static final StateModule INSTANCE = new StateModule(); @@ -82,8 +55,19 @@ public class StateModule implements Module { private final Set functionNamesWithoutInternal; private StateModule() { - final List definitions = - Collections.singletonList(SAVEPOINT_METADATA); + final List definitions = new ArrayList<>(); + + definitions.add(SavepointMetadataTableFunction.SAVEPOINT_METADATA); + ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class) + .iterator() + .forEachRemaining( + f -> { + if (f.factoryIdentifier().startsWith(IDENTIFIER + ".")) { + definitions.addAll(f.getBuiltInFunctionDefinitions()); + } + }); + checkDuplicatedFunctions(definitions); + this.normalizedFunctions = definitions.stream() .collect( @@ -101,6 +85,25 @@ private StateModule() { .collect(Collectors.toSet()); } + @VisibleForTesting + static void checkDuplicatedFunctions(List definitions) { + Set seen = new HashSet<>(); + Set duplicates = new HashSet<>(); + + for (BuiltInFunctionDefinition definition : definitions) { + String name = definition.getName(); + if (!seen.add(name)) { + duplicates.add(name); + } + } + + if (!duplicates.isEmpty()) { + String error = "Duplicate function names found: " + String.join(",", duplicates); + LOG.error(error); + throw new IllegalStateException(error); + } + } + @Override public Set listFunctions() { return listFunctions(false); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicBuiltInFunctionDefinitionFactory.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicBuiltInFunctionDefinitionFactory.java new file mode 100644 index 0000000000000..aa6f61898b372 --- /dev/null +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicBuiltInFunctionDefinitionFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.table.module; + +import org.apache.flink.table.functions.BuiltInFunctionDefinition; +import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory; + +import java.util.List; + +public class ExampleDynamicBuiltInFunctionDefinitionFactory + implements DynamicBuiltInFunctionDefinitionFactory { + @Override + public String factoryIdentifier() { + return StateModule.IDENTIFIER + ".example_function_factory"; + } + + @Override + public List getBuiltInFunctionDefinitions() { + return List.of(ExampleDynamicTableFunction.FUNCTION_DEFINITION); + } +} diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicTableFunction.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicTableFunction.java new file mode 100644 index 0000000000000..fb04aad37fbaa --- /dev/null +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicTableFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.table.module; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.functions.BuiltInFunctionDefinition; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.types.Row; + +import static org.apache.flink.table.functions.FunctionKind.TABLE; + +@Internal +@FunctionHint(output = @DataTypeHint("ROW")) +public class ExampleDynamicTableFunction extends TableFunction { + + public static final BuiltInFunctionDefinition FUNCTION_DEFINITION = + BuiltInFunctionDefinition.newBuilder() + .name("example_dynamic_table_function") + .kind(TABLE) + .runtimeClass(ExampleDynamicTableFunction.class.getName()) + .outputTypeStrategy( + TypeStrategies.explicit( + DataTypes.ROW( + DataTypes.FIELD( + "my-column", DataTypes.STRING().notNull())))) + .build(); + + public ExampleDynamicTableFunction(SpecializedFunction.SpecializedContext context) {} + + public void eval() { + Row row = Row.withNames(); + row.setField("my-column", "my-value"); + collect(row); + } +} diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/StateModuleTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/StateModuleTest.java new file mode 100644 index 0000000000000..33d4c1258dfa2 --- /dev/null +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/StateModuleTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.table.module; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for the savepoint SQL reader. */ +public class StateModuleTest { + @Test + public void testDynamicBuiltinFunctionShouldBeLoaded() throws Exception { + Configuration config = new Configuration(); + config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + tEnv.executeSql("LOAD MODULE state"); + Table table = tEnv.sqlQuery("SELECT * FROM example_dynamic_table_function()"); + List result = tEnv.toDataStream(table).executeAndCollect(100); + + assertThat(result.size()).isEqualTo(1); + Iterator it = result.iterator(); + assertThat(it.next().toString()).isEqualTo("+I[my-value]"); + } + + @Test + public void testMultipleFunctionsWithSameNameShouldThrow() { + assertThatThrownBy( + () -> + StateModule.checkDuplicatedFunctions( + List.of( + ExampleDynamicTableFunction.FUNCTION_DEFINITION, + ExampleDynamicTableFunction.FUNCTION_DEFINITION))) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Duplicate function names found: example_dynamic_table_function"); + } +} diff --git a/flink-libraries/flink-state-processing-api/src/test/resources/META-INF/services/org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory b/flink-libraries/flink-state-processing-api/src/test/resources/META-INF/services/org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory new file mode 100644 index 0000000000000..3456393ac6800 --- /dev/null +++ b/flink-libraries/flink-state-processing-api/src/test/resources/META-INF/services/org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.state.table.module.ExampleDynamicBuiltInFunctionDefinitionFactory diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/DynamicBuiltInFunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/DynamicBuiltInFunctionDefinitionFactory.java new file mode 100644 index 0000000000000..63295cb2d5118 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/DynamicBuiltInFunctionDefinitionFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.Experimental; + +import java.util.List; + +/** + * {@link BuiltInFunctionDefinition} factory for dynamic function registration. This could be useful + * when functions wanted to be automatically loaded from SQL modules. A good example usage is that + * state processor API SQL module is providing extra table functions from Kafka connector. + */ +@Experimental +public interface DynamicBuiltInFunctionDefinitionFactory { + /** + * Returns the unique identifier of the factory. The suggested pattern is the following: + * [module-name].[factory-name]. Such case modules can load all [module-name] prefixed functions + * which belong to them. + */ + String factoryIdentifier(); + + /** Returns list of {@link BuiltInFunctionDefinition} which can be registered dynamically. */ + List getBuiltInFunctionDefinitions(); +}