Skip to content

Commit fd50425

Browse files
committed
[FLINK-26570][statefun] Remote module configuration interpolation
1 parent 38f5518 commit fd50425

File tree

11 files changed

+290
-29
lines changed

11 files changed

+290
-29
lines changed

docs/content/docs/modules/overview.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,33 @@ spec:
6161
6262
A module YAML file can contain multiple YAML documents, separated by `---`, each representing a component to be included in the application.
6363
Each component is defined by a kind typename string and a spec object containing the component's properties.
64+
65+
# Configuration string interpolation
66+
You can use `${placeholders}` inside `spec` elements. These will be replaced by entries from a configuration map, consisting of:
67+
1. System properties
68+
2. Environment properties
69+
3. Command line arguments
70+
71+
where (3) override (2) which override (1).
72+
73+
Example:
74+
```yaml
75+
kind: io.statefun.endpoints.v2/http
76+
spec:
77+
functions: com.example/*
78+
urlPathTemplate: ${FUNC_PROTOCOL}://${FUNC_DNS}/{function.name}
79+
---
80+
kind: io.statefun.kafka.v1/ingress
81+
spec:
82+
id: com.example/my-ingress
83+
address: ${KAFKA_ADDRESS}:${KAFKA_PORT}
84+
consumerGroupId: my-consumer-group
85+
topics:
86+
- topic: ${KAFKA_INGRESS_TOPIC}
87+
(...)
88+
properties:
89+
- ssl.truststore.location: ${SSL_TRUSTSTORE_LOCATION}
90+
- ssl.truststore.password: ${SSL_TRUSTSTORE_PASSWORD}
91+
(...)
92+
```
93+
Please note that `{function.name}` is not a placeholder to be replaced by entries from the merged configuration. See [url template]({{< ref "docs/modules/http-endpoint/#url-template" >}})

statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void runWith() throws Throwable {
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
5050
.withBuildContextFileFromClasspath("ssl/", "ssl/")
51+
.withModuleGlobalConfiguration("MAX_NUM_BATCH_REQUESTS", "10000")
5152
.dependsOn(remoteFunction);
5253

5354
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ kind: io.statefun.endpoints.v2/http
1717
spec:
1818
functions: statefun.smoke.e2e/command-interpreter-fn
1919
urlPathTemplate: https://remote-function-host
20-
maxNumBatchRequests: 10000
20+
maxNumBatchRequests: ${MAX_NUM_BATCH_REQUESTS}

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public void runWith() throws Throwable {
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
5050
.withBuildContextFileFromClasspath("certs", "/certs/")
51+
.withModuleGlobalConfiguration("TEST_COMMAND_INTERPRETER_FN", "command-interpreter-fn")
52+
.withModuleGlobalConfiguration("TEST_SERVER_PROTOCOL", "https://")
53+
.withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_HOST", "remote-function-host")
54+
.withModuleGlobalConfiguration("TEST_NUM_BATCH_REQUESTS", "10000")
55+
.withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_PORT", "8000")
5156
.dependsOn(remoteFunction);
5257

5358
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
kind: io.statefun.endpoints.v2/http
1717
spec:
18-
functions: statefun.smoke.e2e/command-interpreter-fn
19-
urlPathTemplate: https://remote-function-host:8000
20-
maxNumBatchRequests: 10000
18+
functions: statefun.smoke.e2e/${TEST_COMMAND_INTERPRETER_FN}
19+
urlPathTemplate: ${TEST_SERVER_PROTOCOL}${TEST_REMOTE_FUNCTION_HOST}:${TEST_REMOTE_FUNCTION_PORT}
20+
maxNumBatchRequests: ${TEST_NUM_BATCH_REQUESTS}
2121
transport:
2222
type: io.statefun.transports.v1/async
2323
trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem

statefun-e2e-tests/statefun-smoke-e2e-js/src/test/java/org/apache/flink/statefun/e2e/smoke/js/SmokeVerificationJsE2E.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void runWith() throws Throwable {
4747
StatefulFunctionsAppContainers.Builder builder =
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
50+
.withModuleGlobalConfiguration("REMOTE_FUNCTION_HOST","remote-function-host")
5051
.dependsOn(remoteFunction);
5152

5253
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@
1616
kind: io.statefun.endpoints.v2/http
1717
spec:
1818
functions: statefun.smoke.e2e/command-interpreter-fn
19-
urlPathTemplate: http://remote-function-host:8000
19+
urlPathTemplate: http://${REMOTE_FUNCTION_HOST}:8000
2020
maxNumBatchRequests: 10000

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModule.java

Lines changed: 107 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,27 @@
2020

2121
import static org.apache.flink.statefun.flink.core.spi.ExtensionResolverAccessor.getExtensionResolver;
2222

23-
import java.util.List;
24-
import java.util.Map;
25-
import java.util.Objects;
23+
import java.util.*;
24+
import java.util.function.Function;
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
2627
import java.util.stream.Collectors;
28+
import java.util.stream.Stream;
2729
import java.util.stream.StreamSupport;
30+
import org.apache.commons.lang3.NotImplementedException;
31+
import org.apache.flink.api.java.utils.ParameterTool;
2832
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
2934
import org.apache.flink.statefun.extensions.ComponentBinder;
3035
import org.apache.flink.statefun.extensions.ComponentJsonObject;
3136
import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
3237
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3340

3441
public final class RemoteModule implements StatefulFunctionModule {
35-
42+
private static final Logger LOG = LoggerFactory.getLogger(RemoteModule.class);
43+
private static final Pattern PLACEHOLDER_REGEX = Pattern.compile("\\$\\{(.*?)\\}");
3644
private final List<JsonNode> componentNodes;
3745

3846
RemoteModule(List<JsonNode> componentNodes) {
@@ -41,8 +49,16 @@ public final class RemoteModule implements StatefulFunctionModule {
4149

4250
@Override
4351
public void configure(Map<String, String> globalConfiguration, Binder moduleBinder) {
52+
Map<String, String> systemPropsThenEnvVarsThenGlobalConfig =
53+
ParameterTool.fromSystemProperties()
54+
.mergeWith(
55+
ParameterTool.fromMap(System.getenv())
56+
.mergeWith(ParameterTool.fromMap(globalConfiguration)))
57+
.toMap();
4458
parseComponentNodes(componentNodes)
45-
.forEach(component -> bindComponent(component, moduleBinder));
59+
.forEach(
60+
component ->
61+
bindComponent(component, moduleBinder, systemPropsThenEnvVarsThenGlobalConfig));
4662
}
4763

4864
private static List<ComponentJsonObject> parseComponentNodes(
@@ -53,10 +69,95 @@ private static List<ComponentJsonObject> parseComponentNodes(
5369
.collect(Collectors.toList());
5470
}
5571

56-
private static void bindComponent(ComponentJsonObject component, Binder moduleBinder) {
72+
private static void bindComponent(
73+
ComponentJsonObject component, Binder moduleBinder, Map<String, String> configuration) {
74+
component.setSpecJsonNode(
75+
valueResolutionFunction(configuration).apply(component.specJsonNode()));
76+
5777
final ExtensionResolver extensionResolver = getExtensionResolver(moduleBinder);
5878
final ComponentBinder componentBinder =
5979
extensionResolver.resolveExtension(component.binderTypename(), ComponentBinder.class);
6080
componentBinder.bind(component, moduleBinder);
6181
}
82+
83+
private static Function<JsonNode, JsonNode> valueResolutionFunction(Map<String, String> config) {
84+
return value -> {
85+
if (value.isObject()) {
86+
return resolveObject((ObjectNode) value, config);
87+
} else if (value.isArray()) {
88+
return resolveArray((ArrayNode) value, config);
89+
} else if (value.isValueNode()) {
90+
return resolveValueNode((ValueNode) value, config);
91+
}
92+
93+
LOG.warn(
94+
"Unrecognised type (not in: object, array, value). Skipping ${placeholder} resolution for that node.");
95+
return value;
96+
};
97+
}
98+
99+
private static ValueNode resolveValueNode(ValueNode node, Map<String, String> config) {
100+
StringBuffer stringBuffer = new StringBuffer();
101+
Matcher placeholderMatcher = PLACEHOLDER_REGEX.matcher(node.asText());
102+
boolean placeholderReplaced = false;
103+
104+
while (placeholderMatcher.find()) {
105+
if (config.containsKey(placeholderMatcher.group(1))) {
106+
placeholderMatcher.appendReplacement(stringBuffer, config.get(placeholderMatcher.group(1)));
107+
placeholderReplaced = true;
108+
}
109+
}
110+
111+
if (placeholderReplaced) {
112+
placeholderMatcher.appendTail(stringBuffer);
113+
return new TextNode(stringBuffer.toString());
114+
}
115+
116+
return node;
117+
}
118+
119+
private static ObjectNode resolveObject(ObjectNode node, Map<String, String> config) {
120+
return getFieldStream(node)
121+
.map(keyValueResolutionFunction(config))
122+
.reduce(
123+
new ObjectNode(JsonNodeFactory.instance),
124+
(accumulatedObjectNode, resolvedFieldNameValueTuple) -> {
125+
accumulatedObjectNode.put(
126+
resolvedFieldNameValueTuple.getKey(), resolvedFieldNameValueTuple.getValue());
127+
return accumulatedObjectNode;
128+
},
129+
(objectNode1, objectNode2) -> {
130+
throw new NotImplementedException("This reduce is not used with parallel streams");
131+
});
132+
}
133+
134+
private static ArrayNode resolveArray(ArrayNode node, Map<String, String> config) {
135+
return getElementStream(node)
136+
.map(valueResolutionFunction(config))
137+
.reduce(
138+
new ArrayNode(JsonNodeFactory.instance),
139+
(accumulatedArrayNode, resolvedValue) -> {
140+
accumulatedArrayNode.add(resolvedValue);
141+
return accumulatedArrayNode;
142+
},
143+
(arrayNode1, arrayNode2) -> {
144+
throw new NotImplementedException("This reduce is not used with parallel streams");
145+
});
146+
}
147+
148+
private static Function<Map.Entry<String, JsonNode>, AbstractMap.SimpleEntry<String, JsonNode>>
149+
keyValueResolutionFunction(Map<String, String> config) {
150+
return fieldNameValuePair ->
151+
new AbstractMap.SimpleEntry<>(
152+
fieldNameValuePair.getKey(),
153+
valueResolutionFunction(config).apply(fieldNameValuePair.getValue()));
154+
}
155+
156+
private static Stream<Map.Entry<String, JsonNode>> getFieldStream(ObjectNode node) {
157+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.fields(), 0), false);
158+
}
159+
160+
private static Stream<JsonNode> getElementStream(ArrayNode node) {
161+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false);
162+
}
62163
}

statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModuleTest.java

Lines changed: 91 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,33 @@
1919
package org.apache.flink.statefun.flink.core.jsonmodule;
2020

2121
import static org.hamcrest.MatcherAssert.assertThat;
22-
import static org.hamcrest.Matchers.hasKey;
23-
import static org.hamcrest.Matchers.not;
24-
import static org.hamcrest.Matchers.notNullValue;
25-
import static org.hamcrest.Matchers.nullValue;
22+
import static org.hamcrest.Matchers.*;
2623

2724
import java.net.URL;
2825
import java.util.HashMap;
2926
import java.util.Map;
27+
import java.util.concurrent.atomic.AtomicInteger;
3028
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
29+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
3130
import org.apache.flink.statefun.extensions.ComponentBinder;
3231
import org.apache.flink.statefun.extensions.ComponentJsonObject;
3332
import org.apache.flink.statefun.extensions.ExtensionModule;
3433
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
3534
import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
3635
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
37-
import org.apache.flink.statefun.sdk.EgressType;
38-
import org.apache.flink.statefun.sdk.FunctionType;
39-
import org.apache.flink.statefun.sdk.IngressType;
40-
import org.apache.flink.statefun.sdk.StatefulFunction;
41-
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
42-
import org.apache.flink.statefun.sdk.TypeName;
43-
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
44-
import org.apache.flink.statefun.sdk.io.EgressSpec;
45-
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
46-
import org.apache.flink.statefun.sdk.io.IngressSpec;
36+
import org.apache.flink.statefun.sdk.*;
37+
import org.apache.flink.statefun.sdk.io.*;
4738
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
4839
import org.junit.Test;
4940

5041
public final class RemoteModuleTest {
42+
private static final String TEST_CONFIG_KEY_1 = "key1";
43+
private static final String TEST_CONFIG_KEY_2 = "key2";
44+
private static final String TEST_CONFIG_VALUE_1 = "foo";
45+
private static final String TEST_CONFIG_VALUE_2 = "bar";
5146

5247
private final String modulePath = "remote-module/module.yaml";
48+
private final String moduleWithPlaceholdersPath = "remote-module/moduleWithPlaceholders.yaml";
5349

5450
@Test
5551
public void exampleUsage() {
@@ -63,13 +59,90 @@ public void testComponents() {
6359
StatefulFunctionModule module = fromPath(modulePath);
6460

6561
StatefulFunctionsUniverse universe = emptyUniverse();
66-
setupUniverse(universe, module, new TestComponentBindersModule());
62+
setupUniverse(universe, module, new TestComponentBindersModule(), new HashMap<>());
6763

6864
assertThat(universe.functions(), hasKey(TestComponentBinder1.TEST_FUNCTION_TYPE));
6965
assertThat(universe.ingress(), hasKey(TestComponentBinder2.TEST_INGRESS.id()));
7066
assertThat(universe.egress(), hasKey(TestComponentBinder3.TEST_EGRESS.id()));
7167
}
7268

69+
@Test
70+
public void configuringComponentsShouldResolvePlaceholders() {
71+
final AtomicInteger counter = new AtomicInteger();
72+
final Map<String, String> configuration = new HashMap<>();
73+
configuration.put(TEST_CONFIG_KEY_1, TEST_CONFIG_VALUE_1);
74+
configuration.put(TEST_CONFIG_KEY_2, TEST_CONFIG_VALUE_2);
75+
76+
final StatefulFunctionModule module = fromPath(moduleWithPlaceholdersPath);
77+
78+
setupUniverse(
79+
new StatefulFunctionsUniverse(
80+
MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null)),
81+
module,
82+
(globalConfigurations, binder) -> {
83+
binder.bindExtension(
84+
TypeName.parseFrom("com.foo.bar/test.component.1"),
85+
(ComponentBinder)
86+
(component, remoteModuleBinder) -> {
87+
assertThat(
88+
component.specJsonNode().get("static").textValue(), is("staticValue"));
89+
assertThat(
90+
component.specJsonNode().get("placeholder").textValue(),
91+
is(TEST_CONFIG_VALUE_1));
92+
counter.incrementAndGet();
93+
});
94+
binder.bindExtension(
95+
TypeName.parseFrom("com.foo.bar/test.component.2"),
96+
(ComponentBinder)
97+
(component, remoteModuleBinder) -> {
98+
assertThat(
99+
component.specJsonNode().get("front").textValue(),
100+
is(String.format("%sbar", TEST_CONFIG_VALUE_1)));
101+
assertThat(
102+
component.specJsonNode().get("back").textValue(),
103+
is(String.format("foo%s", TEST_CONFIG_VALUE_2)));
104+
assertThat(
105+
component.specJsonNode().get("two").textValue(),
106+
is(String.format("%s%s", TEST_CONFIG_VALUE_1, TEST_CONFIG_VALUE_2)));
107+
assertThat(
108+
component.specJsonNode().get("mixed").textValue(),
109+
is(String.format("a%sb%sc", TEST_CONFIG_VALUE_1, TEST_CONFIG_VALUE_2)));
110+
111+
ArrayNode arrayNode = (ArrayNode) component.specJsonNode().get("array");
112+
assertThat(arrayNode.get(0).textValue(), is(TEST_CONFIG_VALUE_1));
113+
assertThat(arrayNode.get(1).textValue(), is("bar"));
114+
assertThat(arrayNode.get(2).intValue(), is(1000));
115+
assertThat(arrayNode.get(3).booleanValue(), is(true));
116+
117+
ArrayNode arrayNodeWithObjects =
118+
(ArrayNode) component.specJsonNode().get("arrayWithObjects");
119+
assertThat(
120+
arrayNodeWithObjects.get(0).get("a").textValue(), is(TEST_CONFIG_VALUE_2));
121+
assertThat(arrayNodeWithObjects.get(1).get("a").textValue(), is("fizz"));
122+
123+
ArrayNode arrayWithNestedObjects =
124+
(ArrayNode) component.specJsonNode().get("arrayWithNestedObjects");
125+
assertThat(
126+
arrayWithNestedObjects.get(0).get("a").get("b").textValue(), is("foo"));
127+
assertThat(
128+
arrayWithNestedObjects.get(0).get("a").get("c").textValue(),
129+
is(TEST_CONFIG_VALUE_1));
130+
counter.incrementAndGet();
131+
});
132+
binder.bindExtension(
133+
TypeName.parseFrom("com.foo.bar/test.component.3"),
134+
(ComponentBinder)
135+
(component, remoteModuleBinder) -> {
136+
assertThat(component.specJsonNode().get("anInt").intValue(), is(1));
137+
assertThat(component.specJsonNode().get("aBool").booleanValue(), is(true));
138+
counter.incrementAndGet();
139+
});
140+
},
141+
configuration);
142+
143+
assertThat(counter.get(), is(3)); // ensure all assertions were run
144+
}
145+
73146
private static StatefulFunctionModule fromPath(String path) {
74147
URL moduleUrl = RemoteModuleTest.class.getClassLoader().getResource(path);
75148
assertThat(moduleUrl, not(nullValue()));
@@ -85,8 +158,9 @@ private static StatefulFunctionsUniverse emptyUniverse() {
85158
private static void setupUniverse(
86159
StatefulFunctionsUniverse universe,
87160
StatefulFunctionModule functionModule,
88-
ExtensionModule extensionModule) {
89-
final Map<String, String> globalConfig = new HashMap<>();
161+
ExtensionModule extensionModule,
162+
Map<String, String> globalConfig) {
163+
90164
extensionModule.configure(globalConfig, universe);
91165
functionModule.configure(globalConfig, universe);
92166
}

0 commit comments

Comments
 (0)