Skip to content

Commit 032dd0e

Browse files
committed
[FLINK-38257][SQL] Add module dynamic function loading possibility
1 parent 0eeab73 commit 032dd0e

File tree

7 files changed

+297
-39
lines changed

7 files changed

+297
-39
lines changed

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,15 @@
2424
import org.apache.flink.state.api.runtime.SavepointLoader;
2525
import org.apache.flink.table.annotation.DataTypeHint;
2626
import org.apache.flink.table.annotation.FunctionHint;
27+
import org.apache.flink.table.api.DataTypes;
28+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
2729
import org.apache.flink.table.functions.SpecializedFunction;
2830
import org.apache.flink.table.functions.TableFunction;
31+
import org.apache.flink.table.types.inference.TypeStrategies;
2932
import org.apache.flink.types.Row;
3033

34+
import static org.apache.flink.table.functions.FunctionKind.TABLE;
35+
3136
@Internal
3237
@FunctionHint(
3338
output =
@@ -41,6 +46,40 @@
4146
+ "operator-coordinator-state-size-in-bytes BIGINT NOT NULL, "
4247
+ "operator-total-size-in-bytes BIGINT NOT NULL>"))
4348
public class SavepointMetadataTableFunction extends TableFunction<Row> {
49+
50+
public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
51+
BuiltInFunctionDefinition.newBuilder()
52+
.name("savepoint_metadata")
53+
.kind(TABLE)
54+
.runtimeClass(SavepointMetadataTableFunction.class.getName())
55+
.outputTypeStrategy(
56+
TypeStrategies.explicit(
57+
DataTypes.ROW(
58+
DataTypes.FIELD(
59+
"checkpoint-id", DataTypes.BIGINT().notNull()),
60+
DataTypes.FIELD("operator-name", DataTypes.STRING()),
61+
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
62+
DataTypes.FIELD(
63+
"operator-uid-hash",
64+
DataTypes.STRING().notNull()),
65+
DataTypes.FIELD(
66+
"operator-parallelism",
67+
DataTypes.INT().notNull()),
68+
DataTypes.FIELD(
69+
"operator-max-parallelism",
70+
DataTypes.INT().notNull()),
71+
DataTypes.FIELD(
72+
"operator-subtask-state-count",
73+
DataTypes.INT().notNull()),
74+
DataTypes.FIELD(
75+
"operator-coordinator-state-size-in-bytes",
76+
DataTypes.BIGINT().notNull()),
77+
DataTypes.FIELD(
78+
"operator-total-size-in-bytes",
79+
DataTypes.BIGINT().notNull()))))
80+
.notDeterministic()
81+
.build();
82+
4483
public SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) {}
4584

4685
public void eval(String savepointPath) {

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,61 +19,34 @@
1919
package org.apache.flink.state.table.module;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.state.table.SavepointMetadataTableFunction;
23-
import org.apache.flink.table.api.DataTypes;
2424
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
25+
import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
2526
import org.apache.flink.table.functions.FunctionDefinition;
2627
import org.apache.flink.table.module.Module;
27-
import org.apache.flink.table.types.inference.TypeStrategies;
2828

29-
import java.util.Collections;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.ArrayList;
33+
import java.util.HashSet;
3034
import java.util.List;
3135
import java.util.Locale;
3236
import java.util.Map;
3337
import java.util.Optional;
38+
import java.util.ServiceLoader;
3439
import java.util.Set;
3540
import java.util.function.Function;
3641
import java.util.stream.Collectors;
3742

38-
import static org.apache.flink.table.functions.FunctionKind.TABLE;
39-
4043
/** Module of state in Flink. */
4144
@Experimental
4245
public class StateModule implements Module {
4346

44-
public static final String IDENTIFIER = "state";
47+
private static final Logger LOG = LoggerFactory.getLogger(StateModule.class);
4548

46-
public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
47-
BuiltInFunctionDefinition.newBuilder()
48-
.name("savepoint_metadata")
49-
.kind(TABLE)
50-
.runtimeClass(SavepointMetadataTableFunction.class.getName())
51-
.outputTypeStrategy(
52-
TypeStrategies.explicit(
53-
DataTypes.ROW(
54-
DataTypes.FIELD(
55-
"checkpoint-id", DataTypes.BIGINT().notNull()),
56-
DataTypes.FIELD("operator-name", DataTypes.STRING()),
57-
DataTypes.FIELD("operator-uid", DataTypes.STRING()),
58-
DataTypes.FIELD(
59-
"operator-uid-hash",
60-
DataTypes.STRING().notNull()),
61-
DataTypes.FIELD(
62-
"operator-parallelism",
63-
DataTypes.INT().notNull()),
64-
DataTypes.FIELD(
65-
"operator-max-parallelism",
66-
DataTypes.INT().notNull()),
67-
DataTypes.FIELD(
68-
"operator-subtask-state-count",
69-
DataTypes.INT().notNull()),
70-
DataTypes.FIELD(
71-
"operator-coordinator-state-size-in-bytes",
72-
DataTypes.BIGINT().notNull()),
73-
DataTypes.FIELD(
74-
"operator-total-size-in-bytes",
75-
DataTypes.BIGINT().notNull()))))
76-
.build();
49+
public static final String IDENTIFIER = "state";
7750

7851
public static final StateModule INSTANCE = new StateModule();
7952

@@ -82,8 +55,19 @@ public class StateModule implements Module {
8255
private final Set<String> functionNamesWithoutInternal;
8356

8457
private StateModule() {
85-
final List<BuiltInFunctionDefinition> definitions =
86-
Collections.singletonList(SAVEPOINT_METADATA);
58+
final List<BuiltInFunctionDefinition> definitions = new ArrayList<>();
59+
60+
definitions.add(SavepointMetadataTableFunction.SAVEPOINT_METADATA);
61+
ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class)
62+
.iterator()
63+
.forEachRemaining(
64+
f -> {
65+
if (f.factoryIdentifier().startsWith(IDENTIFIER + ".")) {
66+
definitions.addAll(f.getBuiltInFunctionDefinitions());
67+
}
68+
});
69+
checkDuplicatedFunctions(definitions);
70+
8771
this.normalizedFunctions =
8872
definitions.stream()
8973
.collect(
@@ -101,6 +85,25 @@ private StateModule() {
10185
.collect(Collectors.toSet());
10286
}
10387

88+
@VisibleForTesting
89+
static void checkDuplicatedFunctions(List<BuiltInFunctionDefinition> definitions) {
90+
Set<String> seen = new HashSet<>();
91+
Set<String> duplicates = new HashSet<>();
92+
93+
for (BuiltInFunctionDefinition definition : definitions) {
94+
String name = definition.getName();
95+
if (!seen.add(name)) {
96+
duplicates.add(name);
97+
}
98+
}
99+
100+
if (!duplicates.isEmpty()) {
101+
String error = "Duplicate function names found: " + String.join(",", duplicates);
102+
LOG.error(error);
103+
throw new IllegalStateException(error);
104+
}
105+
}
106+
104107
@Override
105108
public Set<String> listFunctions() {
106109
return listFunctions(false);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
22+
import org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
23+
24+
import java.util.List;
25+
26+
public class ExampleDynamicBuiltInFunctionDefinitionFactory
27+
implements DynamicBuiltInFunctionDefinitionFactory {
28+
@Override
29+
public String factoryIdentifier() {
30+
return StateModule.IDENTIFIER + ".example_function_factory";
31+
}
32+
33+
@Override
34+
public List<BuiltInFunctionDefinition> getBuiltInFunctionDefinitions() {
35+
return List.of(ExampleDynamicTableFunction.FUNCTION_DEFINITION);
36+
}
37+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.annotation.DataTypeHint;
23+
import org.apache.flink.table.annotation.FunctionHint;
24+
import org.apache.flink.table.api.DataTypes;
25+
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
26+
import org.apache.flink.table.functions.SpecializedFunction;
27+
import org.apache.flink.table.functions.TableFunction;
28+
import org.apache.flink.table.types.inference.TypeStrategies;
29+
import org.apache.flink.types.Row;
30+
31+
import static org.apache.flink.table.functions.FunctionKind.TABLE;
32+
33+
@Internal
34+
@FunctionHint(output = @DataTypeHint("ROW<my-column STRING NOT NULL>"))
35+
public class ExampleDynamicTableFunction extends TableFunction<Row> {
36+
37+
public static final BuiltInFunctionDefinition FUNCTION_DEFINITION =
38+
BuiltInFunctionDefinition.newBuilder()
39+
.name("example_dynamic_table_function")
40+
.kind(TABLE)
41+
.runtimeClass(ExampleDynamicTableFunction.class.getName())
42+
.outputTypeStrategy(
43+
TypeStrategies.explicit(
44+
DataTypes.ROW(
45+
DataTypes.FIELD(
46+
"my-column", DataTypes.STRING().notNull()))))
47+
.build();
48+
49+
public ExampleDynamicTableFunction(SpecializedFunction.SpecializedContext context) {}
50+
51+
public void eval() {
52+
Row row = Row.withNames();
53+
row.setField("my-column", "my-value");
54+
collect(row);
55+
}
56+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.state.table.module;
20+
21+
import org.apache.flink.api.common.RuntimeExecutionMode;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
24+
import org.apache.flink.table.api.Table;
25+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
26+
import org.apache.flink.types.Row;
27+
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.Iterator;
31+
import java.util.List;
32+
33+
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
36+
37+
/** Unit tests for the savepoint SQL reader. */
38+
public class StateModuleTest {
39+
@Test
40+
public void testDynamicBuiltinFunctionShouldBeLoaded() throws Exception {
41+
Configuration config = new Configuration();
42+
config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
43+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
44+
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
45+
46+
tEnv.executeSql("LOAD MODULE state");
47+
Table table = tEnv.sqlQuery("SELECT * FROM example_dynamic_table_function()");
48+
List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
49+
50+
assertThat(result.size()).isEqualTo(1);
51+
Iterator<Row> it = result.iterator();
52+
assertThat(it.next().toString()).isEqualTo("+I[my-value]");
53+
}
54+
55+
@Test
56+
public void testMultipleFunctionsWithSameNameShouldThrow() {
57+
assertThatThrownBy(
58+
() ->
59+
StateModule.checkDuplicatedFunctions(
60+
List.of(
61+
ExampleDynamicTableFunction.FUNCTION_DEFINITION,
62+
ExampleDynamicTableFunction.FUNCTION_DEFINITION)))
63+
.isInstanceOf(IllegalStateException.class)
64+
.hasMessage("Duplicate function names found: example_dynamic_table_function");
65+
}
66+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.state.table.module.ExampleDynamicBuiltInFunctionDefinitionFactory
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.functions;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
import java.util.List;
24+
25+
/**
26+
* {@link BuiltInFunctionDefinition} factory for dynamic function registration. This could be useful
27+
* when functions wanted to be automatically loaded from SQL modules. A good example usage is that
28+
* state processor API SQL module is providing extra table functions from Kafka connector.
29+
*/
30+
@Experimental
31+
public interface DynamicBuiltInFunctionDefinitionFactory {
32+
/**
33+
* Returns the unique identifier of the factory. The suggested pattern is the following:
34+
* [module-name].[factory-name]. Such case modules can load all [module-name] prefixed functions
35+
* which belong to them.
36+
*/
37+
String factoryIdentifier();
38+
39+
/** Returns list of {@link BuiltInFunctionDefinition} which can be registered dynamically. */
40+
List<BuiltInFunctionDefinition> getBuiltInFunctionDefinitions();
41+
}

0 commit comments

Comments
 (0)