Skip to content

Commit 38f5518

Browse files
committed
[FLINK-26537][statefun] Allow disabling StatefulFunctionsConfigValidator validation for classloader.parent-first-patterns.additional
1 parent f4dc3e9 commit 38f5518

File tree

5 files changed

+51
-22
lines changed

5 files changed

+51
-22
lines changed

docs/content/docs/deployment/configurations.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,5 +75,14 @@ These may be set through your job's ``flink-conf.yaml``.
7575
<td>Integer</td>
7676
<td>The max number of async operations per task before backpressure is applied.</td>
7777
</tr>
78+
<tr>
79+
<td><h5>statefun.embedded</h5></td>
80+
<td style="word-wrap: break-word;">false</td>
81+
<td>Boolean</td>
82+
<td>Set to 'true' if Flink is running this job from an uber jar, rather than using statefun-specific docker images.
83+
This disables the validation of whether 'classloader.parent-first-patterns.additional'
84+
contains 'org.apache.flink.statefun', 'org.apache.kafka' and 'com.google.protobuf' patterns.
85+
It is then up to the creator of the uber jar to ensure that the three dependencies (statefun, kafka and protobuf) don't have version conflicts.</td>
86+
</tr>
7887
</tbody>
7988
</table>

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ public class StatefulFunctionsConfig implements Serializable {
107107
.withDescription(
108108
"The name of the remote module entity to look for. Also supported, file:///...");
109109

110+
public static final ConfigOption<Boolean> EMBEDDED =
111+
ConfigOptions.key("statefun.embedded")
112+
.booleanType()
113+
.defaultValue(false)
114+
.withDescription(
115+
"True if Flink is running this job from an uber jar, rather than using statefun-specific docker images");
116+
110117
/**
111118
* Creates a new {@link StatefulFunctionsConfig} based on the default configurations in the
112119
* current environment set via the {@code flink-conf.yaml}.
@@ -134,7 +141,9 @@ public static StatefulFunctionsConfig fromFlinkConfiguration(Configuration flink
134141

135142
private String remoteModuleName;
136143

137-
private Map<String, String> globalConfigurations = new HashMap<>();
144+
private boolean embedded;
145+
146+
private final Map<String, String> globalConfigurations = new HashMap<>();
138147

139148
/**
140149
* Create a new configuration object based on the values set in flink-conf.
@@ -149,6 +158,7 @@ private StatefulFunctionsConfig(Configuration configuration) {
149158
this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
150159
this.maxAsyncOperationsPerTask = configuration.get(ASYNC_MAX_OPERATIONS_PER_TASK);
151160
this.remoteModuleName = configuration.get(REMOTE_MODULE_NAME);
161+
this.embedded = configuration.getBoolean(EMBEDDED);
152162

153163
for (String key : configuration.keySet()) {
154164
if (key.startsWith(MODULE_CONFIG_PREFIX)) {
@@ -234,6 +244,19 @@ public void setRemoteModuleName(String remoteModuleName) {
234244
this.remoteModuleName = Objects.requireNonNull(remoteModuleName);
235245
}
236246

247+
/** Returns whether the job was launched in embedded mode (see {@linkplain #EMBEDDED}). */
248+
public boolean isEmbedded() {
249+
return embedded;
250+
}
251+
252+
/**
253+
* Sets the embedded mode. If true, disables certain validation steps. See documentation:
254+
* Configurations.
255+
*/
256+
public void setEmbedded(boolean embedded) {
257+
this.embedded = embedded;
258+
}
259+
237260
/**
238261
* Retrieves the universe provider for loading modules.
239262
*

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,7 @@
1818

1919
package org.apache.flink.statefun.flink.core;
2020

21-
import java.util.Arrays;
22-
import java.util.Collections;
23-
import java.util.HashSet;
24-
import java.util.List;
25-
import java.util.Locale;
26-
import java.util.Set;
21+
import java.util.*;
2722
import org.apache.flink.configuration.ConfigOption;
2823
import org.apache.flink.configuration.ConfigOptions;
2924
import org.apache.flink.configuration.Configuration;
@@ -42,8 +37,10 @@ private StatefulFunctionsConfigValidator() {}
4237

4338
public static final int MAX_CONCURRENT_CHECKPOINTS = 1;
4439

45-
static void validate(Configuration configuration) {
46-
validateParentFirstClassloaderPatterns(configuration);
40+
static void validate(boolean isEmbedded, Configuration configuration) {
41+
if (!isEmbedded) {
42+
validateParentFirstClassloaderPatterns(configuration);
43+
}
4744
validateCustomPayloadSerializerClassName(configuration);
4845
validateNoHeapBackedTimers(configuration);
4946
validateUnalignedCheckpointsDisabled(configuration);
@@ -70,10 +67,9 @@ private static Set<String> parentFirstClassloaderPatterns(Configuration configur
7067
}
7168

7269
private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
73-
74-
MessageFactoryType factoryType =
70+
final MessageFactoryType factoryType =
7571
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
76-
String customPayloadSerializerClassName =
72+
final String customPayloadSerializerClassName =
7773
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
7874

7975
if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.net.URL;
2121
import java.net.URLClassLoader;
22-
import java.util.Map;
2322
import java.util.Objects;
2423
import java.util.concurrent.atomic.AtomicInteger;
2524
import org.apache.flink.api.java.utils.ParameterTool;
@@ -36,19 +35,21 @@ public class StatefulFunctionsJob {
3635
private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();
3736

3837
public static void main(String... args) throws Exception {
39-
ParameterTool parameterTool = ParameterTool.fromArgs(args);
40-
Map<String, String> globalConfigurations = parameterTool.toMap();
41-
38+
ParameterTool argsParameterTool = ParameterTool.fromArgs(args);
4239
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
43-
4440
Configuration flinkConfig = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
45-
StatefulFunctionsConfigValidator.validate(flinkConfig);
4641

4742
StatefulFunctionsConfig stateFunConfig =
48-
StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
49-
stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
43+
StatefulFunctionsConfig.fromFlinkConfiguration(
44+
ParameterTool.fromMap(flinkConfig.toMap())
45+
.mergeWith(argsParameterTool)
46+
.getConfiguration());
47+
48+
stateFunConfig.addAllGlobalConfigurations(argsParameterTool.toMap());
5049
stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());
5150

51+
StatefulFunctionsConfigValidator.validate(stateFunConfig.isEmbedded(), flinkConfig);
52+
5253
main(env, stateFunConfig);
5354
}
5455

statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void invalidCustomSerializerThrows() {
9191
Configuration configuration = baseConfiguration();
9292
configuration.set(
9393
StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_CUSTOM_PAYLOADS);
94-
StatefulFunctionsConfigValidator.validate(configuration);
94+
StatefulFunctionsConfigValidator.validate(false, configuration);
9595
}
9696

9797
@Test(expected = StatefulFunctionsInvalidConfigException.class)
@@ -101,6 +101,6 @@ public void invalidNonCustomSerializerThrows() {
101101
StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
102102
configuration.set(
103103
StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS, serializerClassName);
104-
StatefulFunctionsConfigValidator.validate(configuration);
104+
StatefulFunctionsConfigValidator.validate(false, configuration);
105105
}
106106
}

0 commit comments

Comments
 (0)