Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.types.Row;

import static org.apache.flink.table.functions.FunctionKind.TABLE;

@Internal
@FunctionHint(
output =
Expand All @@ -41,6 +46,40 @@
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, "
+ "operator-total-size-in-bytes BIGINT NOT NULL>"))
public class SavepointMetadataTableFunction extends TableFunction<Row> {

public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to keep the definition near the code. do you think we should also mark that the function should not be pre-calculated by the optimizer? i mean to make it clear that flink must always run it at query time, not try to be clever and “pre-compute” it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is not yet clear🤷🏻‍♂️

Copy link
Contributor

@zch93 zch93 Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe i misunderstood what happens here with savepoint_metadata, but flink's planner can constant-fold and cache function results at plan time if it believes a function is deterministic. that is correct for pure math/string functions, but could be problematic for something like this savepoint_metadata, which read external, changing state (fs/checkpoint files). and between planning and exec, or between two exec, the metadata could change. so i just thought whether we should tell the planner this function is non-deterministic, so it will not pre-evaluate or cache it, and will always run it at exec time. idk whether the BuiltInFunctionDefinition.newBuilder() can supports to call .deterministic(false), or use and override the UDF isDeterministic()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair point. I'm not fan of fixing something which is unrelated but this is so tiny that adding notDeterministic() is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it worth the discussion separate whether the planner caches it with input params or not. I would guess yes, otherwise it just doesn't make sense from usage point of view. If the params are stored together with the result then adding notDeterministic() doesn't hurt but also not helping.

BuiltInFunctionDefinition.newBuilder()
.name("savepoint_metadata")
.kind(TABLE)
.runtimeClass(SavepointMetadataTableFunction.class.getName())
.outputTypeStrategy(
TypeStrategies.explicit(
DataTypes.ROW(
DataTypes.FIELD(
"checkpoint-id", DataTypes.BIGINT().notNull()),
DataTypes.FIELD("operator-name", DataTypes.STRING()),
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
DataTypes.FIELD(
"operator-uid-hash",
DataTypes.STRING().notNull()),
DataTypes.FIELD(
"operator-parallelism",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-max-parallelism",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-subtask-state-count",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-coordinator-state-size-in-bytes",
DataTypes.BIGINT().notNull()),
DataTypes.FIELD(
"operator-total-size-in-bytes",
DataTypes.BIGINT().notNull()))))
.notDeterministic()
.build();

public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {}

public void eval(String savepointPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,34 @@
package org.apache.flink.state.table.module;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.state.table.SavepointMetadataTableFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.types.inference.TypeStrategies;

import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.functions.FunctionKind.TABLE;

/** Module of state in Flink. */
@Experimental
public class StateModule implements Module {

public static final String IDENTIFIER = "state";
private static final Logger LOG = LoggerFactory.getLogger(StateModule.class);

public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
BuiltInFunctionDefinition.newBuilder()
.name("savepoint_metadata")
.kind(TABLE)
.runtimeClass(SavepointMetadataTableFunction.class.getName())
.outputTypeStrategy(
TypeStrategies.explicit(
DataTypes.ROW(
DataTypes.FIELD(
"checkpoint-id", DataTypes.BIGINT().notNull()),
DataTypes.FIELD("operator-name", DataTypes.STRING()),
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
DataTypes.FIELD(
"operator-uid-hash",
DataTypes.STRING().notNull()),
DataTypes.FIELD(
"operator-parallelism",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-max-parallelism",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-subtask-state-count",
DataTypes.INT().notNull()),
DataTypes.FIELD(
"operator-coordinator-state-size-in-bytes",
DataTypes.BIGINT().notNull()),
DataTypes.FIELD(
"operator-total-size-in-bytes",
DataTypes.BIGINT().notNull()))))
.build();
public static final String IDENTIFIER = "state";

public static final StateModule INSTANCE = new StateModule();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is not part of this pr, but it seems like the constructor does classpath scanning, and that is fine if the module is loaded once, but worth a note: if someone repeatedly constructs modules in tests, scanning happens each time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think testing the module as it's going to behave in PROD is something which is desired even if it requires some time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, totally fair
my (out-of-scope) nit was only about to maybe add a short javadoc on StateModule noting that the constructor performs ServiceLoader discovery, so repeated instantiation re-scans the classpath, and maybe in some tests, future authors can prefer StateModule.INSTANCE over new StateModule() to avoid unintentional repeated scans


Expand All @@ -82,8 +55,19 @@ public class StateModule implements Module {
private final Set<String> functionNamesWithoutInternal;

private StateModule() {
final List<BuiltInFunctionDefinition> definitions =
Collections.singletonList(SAVEPOINT_METADATA);
final List<BuiltInFunctionDefinition> definitions = new ArrayList<>();

definitions.add(SavepointMetadataTableFunction.SAVEPOINT_METADATA);
ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class)
.iterator()
.forEachRemaining(
f -> {
if (f.factoryIdentifier().startsWith(IDENTIFIER + ".")) {
definitions.addAll(f.getBuiltInFunctionDefinitions());
}
});
checkDuplicatedFunctions(definitions);

this.normalizedFunctions =
definitions.stream()
.collect(
Expand All @@ -101,6 +85,25 @@ private StateModule() {
.collect(Collectors.toSet());
}

@VisibleForTesting
static void checkDuplicatedFunctions(List<BuiltInFunctionDefinition> definitions) {
Set<String> seen = new HashSet<>();
Set<String> duplicates = new HashSet<>();

for (BuiltInFunctionDefinition definition : definitions) {
String name = definition.getName();
if (!seen.add(name)) {
duplicates.add(name);
}
}

if (!duplicates.isEmpty()) {
String error = "Duplicate function names found: " + String.join(",", duplicates);
LOG.error(error);
throw new IllegalStateException(error);
}
}

@Override
public Set<String> listFunctions() {
return listFunctions(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table.module;

import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;

import java.util.List;

public class ExampleDynamicBuiltInFunctionDefinitionFactory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

api looks minimal and clear.
nit: maybe we can standardize to . and document that function names returned must be unique within the module (and what happens otherwise)
nit: maybe we can add an optional ModuleContext param (classloader, config). not needed now, but helps if a provider later needs context without breaking the spi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is now fully experimental and later subject to change. If the time comes and ModuleContext or something else is required then we're going to add it. Finding out later needs is just not working in general 🙂
Related the names there is already a builtin function name suggestion which already given. Of course this is not documented because it's not public API and I would stick to that since it exists from ages.

Copy link
Contributor

@zch93 zch93 Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, thanks for clarifying
on the “built-in function name suggestion”: understood, it is internal and long-standing, so i am fine relying on it. if feasible, a short internal comment pointing to where that suggestion lives would help new contributors

bottom line: +1 to keep the api minimal as proposed

implements DynamicBuiltInFunctionDefinitionFactory {
@Override
public String factoryIdentifier() {
return StateModule.IDENTIFIER + ".example_function_factory";
}

@Override
public List<BuiltInFunctionDefinition> getBuiltInFunctionDefinitions() {
return List.of(ExampleDynamicTableFunction.FUNCTION_DEFINITION);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table.module;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.types.Row;

import static org.apache.flink.table.functions.FunctionKind.TABLE;

@Internal
@FunctionHint(output = @DataTypeHint("ROW<my-column STRING NOT NULL>"))
public class ExampleDynamicTableFunction extends TableFunction<Row> {

public static final BuiltInFunctionDefinition FUNCTION_DEFINITION =
BuiltInFunctionDefinition.newBuilder()
.name("example_dynamic_table_function")
.kind(TABLE)
.runtimeClass(ExampleDynamicTableFunction.class.getName())
.outputTypeStrategy(
TypeStrategies.explicit(
DataTypes.ROW(
DataTypes.FIELD(
"my-column", DataTypes.STRING().notNull()))))
.build();

public ExampleDynamicTableFunction(SpecializedFunction.SpecializedContext context) {}

public void eval() {
Row row = Row.withNames();
row.setField("my-column", "my-value");
collect(row);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.table.module;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import org.junit.jupiter.api.Test;

import java.util.Iterator;
import java.util.List;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Unit tests for the savepoint SQL reader. */
public class StateModuleTest {
@Test
public void testDynamicBuiltinFunctionShouldBeLoaded() throws Exception {
Configuration config = new Configuration();
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql("LOAD MODULE state");
Table table = tEnv.sqlQuery("SELECT * FROM example_dynamic_table_function()");
List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);

assertThat(result.size()).isEqualTo(1);
Iterator<Row> it = result.iterator();
assertThat(it.next().toString()).isEqualTo("+I[my-value]");
}

@Test
public void testMultipleFunctionsWithSameNameShouldThrow() {
assertThatThrownBy(
() ->
StateModule.checkDuplicatedFunctions(
List.of(
ExampleDynamicTableFunction.FUNCTION_DEFINITION,
ExampleDynamicTableFunction.FUNCTION_DEFINITION)))
.isInstanceOf(IllegalStateException.class)
.hasMessage("Duplicate function names found: example_dynamic_table_function");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.state.table.module.ExampleDynamicBuiltInFunctionDefinitionFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.Experimental;

import java.util.List;

/**
* {@link BuiltInFunctionDefinition} factory for dynamic function registration. This could be useful
* when functions wanted to be automatically loaded from SQL modules. A good example usage is that
* state processor API SQL module is providing extra table functions from Kafka connector.
*/
@Experimental
public interface DynamicBuiltInFunctionDefinitionFactory {
/**
* Returns the unique identifier of the factory. The suggested pattern is the following:
* [module-name].[factory-name]. Such case modules can load all [module-name] prefixed functions
* which belong to them.
*/
String factoryIdentifier();

/** Returns list of {@link BuiltInFunctionDefinition} which can be registered dynamically. */
List<BuiltInFunctionDefinition> getBuiltInFunctionDefinitions();
}