Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/content/docs/modules/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,36 @@ spec:

A module YAML file can contain multiple YAML documents, separated by `---`, each representing a component to be included in the application.
Each component is defined by a kind typename string and a spec object containing the component's properties.

# Configuration string interpolation
You can use `${placeholders}` inside `spec` elements. These will be replaced by entries from a configuration map, consisting of:
1. System properties
2. Environment variables
3. flink-conf.yaml entries with prefix 'statefun.module.global-config.'
4. Command line args

where (4) overrides (3), which overrides (2), which overrides (1).

Example:
```yaml
kind: io.statefun.endpoints.v2/http
spec:
functions: com.example/*
urlPathTemplate: ${FUNC_PROTOCOL}://${FUNC_DNS}/{function.name}
---
kind: io.statefun.kafka.v1/ingress
spec:
id: com.example/my-ingress
address: ${KAFKA_ADDRESS}:${KAFKA_PORT}
consumerGroupId: my-consumer-group
topics:
- topic: ${KAFKA_INGRESS_TOPIC}
(...)
properties:
- ssl.truststore.location: ${SSL_TRUSTSTORE_LOCATION}
- ssl.truststore.password: ${SSL_TRUSTSTORE_PASSWORD}
(...)
```
{{< hint info >}}
Please note that `{function.name}` is not a placeholder to be replaced by entries from the merged configuration. See [url template]({{< ref "docs/modules/http-endpoint" >}}).
{{< /hint >}}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void runWith() throws Throwable {
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.withBuildContextFileFromClasspath("ssl/", "ssl/")
.withModuleGlobalConfiguration("MAX_NUM_BATCH_REQUESTS", "10000")
.dependsOn(remoteFunction);

SmokeRunner.run(parameters, builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: https://remote-function-host
maxNumBatchRequests: 10000
maxNumBatchRequests: ${MAX_NUM_BATCH_REQUESTS}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public void runWith() throws Throwable {
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.withBuildContextFileFromClasspath("certs", "/certs/")
.withModuleGlobalConfiguration("TEST_COMMAND_INTERPRETER_FN", "command-interpreter-fn")
.withModuleGlobalConfiguration("TEST_SERVER_PROTOCOL", "https://")
.withModuleGlobalConfiguration("TEST_NUM_BATCH_REQUESTS", "10000")
.withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_PORT", "8000")
// TEST_REMOTE_FUNCTION_HOST placeholder value is taken from docker env variables
.dependsOn(remoteFunction);

SmokeRunner.run(parameters, builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/
COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/
COPY certs/ /opt/statefun/modules/statefun-smoke-e2e/certs/
COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml

ENV TEST_REMOTE_FUNCTION_HOST=remote-function-host
ENV TEST_SERVER_PROTOCOL=WILL-BE-REPLACED-BY-GLOBAL-VARIABLE-FROM-CLI
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: https://remote-function-host:8000
maxNumBatchRequests: 10000
functions: statefun.smoke.e2e/${TEST_COMMAND_INTERPRETER_FN}
urlPathTemplate: ${TEST_SERVER_PROTOCOL}${TEST_REMOTE_FUNCTION_HOST}:${TEST_REMOTE_FUNCTION_PORT}
maxNumBatchRequests: ${TEST_NUM_BATCH_REQUESTS}
transport:
type: io.statefun.transports.v1/async
trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void runWith() throws Throwable {
StatefulFunctionsAppContainers.Builder builder =
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.withModuleGlobalConfiguration("REMOTE_FUNCTION_HOST", "remote-function-host")
.dependsOn(remoteFunction);

SmokeRunner.run(parameters, builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: http://remote-function-host:8000
urlPathTemplate: http://${REMOTE_FUNCTION_HOST}:8000
maxNumBatchRequests: 10000
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,27 @@

import static org.apache.flink.statefun.flink.core.spi.ExtensionResolverAccessor.getExtensionResolver;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
import org.apache.flink.statefun.extensions.ComponentBinder;
import org.apache.flink.statefun.extensions.ComponentJsonObject;
import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RemoteModule implements StatefulFunctionModule {

private static final Logger LOG = LoggerFactory.getLogger(RemoteModule.class);
private static final Pattern PLACEHOLDER_REGEX = Pattern.compile("\\$\\{(.*?)\\}");
private final List<JsonNode> componentNodes;

RemoteModule(List<JsonNode> componentNodes) {
Expand All @@ -41,8 +49,16 @@ public final class RemoteModule implements StatefulFunctionModule {

@Override
public void configure(Map<String, String> globalConfiguration, Binder moduleBinder) {
Map<String, String> systemPropsThenEnvVarsThenGlobalConfig =
ParameterTool.fromSystemProperties()
.mergeWith(
ParameterTool.fromMap(System.getenv())
.mergeWith(ParameterTool.fromMap(globalConfiguration)))
.toMap();
parseComponentNodes(componentNodes)
.forEach(component -> bindComponent(component, moduleBinder));
.forEach(
component ->
bindComponent(component, moduleBinder, systemPropsThenEnvVarsThenGlobalConfig));
}

private static List<ComponentJsonObject> parseComponentNodes(
Expand All @@ -53,10 +69,102 @@ private static List<ComponentJsonObject> parseComponentNodes(
.collect(Collectors.toList());
}

private static void bindComponent(ComponentJsonObject component, Binder moduleBinder) {
private static void bindComponent(
ComponentJsonObject component, Binder moduleBinder, Map<String, String> configuration) {

JsonNode resolvedSpec = valueResolutionFunction(configuration).apply(component.specJsonNode());
ComponentJsonObject resolvedComponent = new ComponentJsonObject(component.get(), resolvedSpec);

final ExtensionResolver extensionResolver = getExtensionResolver(moduleBinder);
final ComponentBinder componentBinder =
extensionResolver.resolveExtension(component.binderTypename(), ComponentBinder.class);
componentBinder.bind(component, moduleBinder);
extensionResolver.resolveExtension(
resolvedComponent.binderTypename(), ComponentBinder.class);
componentBinder.bind(resolvedComponent, moduleBinder);
}

private static Function<JsonNode, JsonNode> valueResolutionFunction(Map<String, String> config) {
return value -> {
if (value.isObject()) {
return resolveObject((ObjectNode) value, config);
} else if (value.isArray()) {
return resolveArray((ArrayNode) value, config);
} else if (value.isValueNode()) {
return resolveValueNode((ValueNode) value, config);
}

LOG.warn(
"Unrecognised type (not in: object, array, value). Skipping ${placeholder} resolution for that node.");
return value;
};
}

private static Function<Map.Entry<String, JsonNode>, AbstractMap.SimpleEntry<String, JsonNode>>
keyValueResolutionFunction(Map<String, String> config) {
return fieldNameValuePair ->
new AbstractMap.SimpleEntry<>(
fieldNameValuePair.getKey(),
valueResolutionFunction(config).apply(fieldNameValuePair.getValue()));
}

private static ValueNode resolveValueNode(ValueNode node, Map<String, String> config) {
StringBuffer stringBuffer = new StringBuffer();
Matcher placeholderMatcher = PLACEHOLDER_REGEX.matcher(node.asText());
boolean placeholderReplaced = false;

while (placeholderMatcher.find()) {
if (config.containsKey(placeholderMatcher.group(1))) {
placeholderMatcher.appendReplacement(stringBuffer, config.get(placeholderMatcher.group(1)));
placeholderReplaced = true;
} else {
throw new IllegalArgumentException(
String.format(
"Could not resolve placeholder '%s'. An entry for this key was not found in the configuration.",
node.asText()));
}
}

if (placeholderReplaced) {
placeholderMatcher.appendTail(stringBuffer);
return new TextNode(stringBuffer.toString());
}

return node;
}

private static ObjectNode resolveObject(ObjectNode node, Map<String, String> config) {
return getFieldStream(node)
.map(keyValueResolutionFunction(config))
.reduce(
new ObjectNode(JsonNodeFactory.instance),
(accumulatedObjectNode, resolvedFieldNameValueTuple) -> {
accumulatedObjectNode.put(
resolvedFieldNameValueTuple.getKey(), resolvedFieldNameValueTuple.getValue());
return accumulatedObjectNode;
},
(objectNode1, objectNode2) -> {
throw new NotImplementedException("This reduce is not used with parallel streams");
});
}

private static ArrayNode resolveArray(ArrayNode node, Map<String, String> config) {
return getElementStream(node)
.map(valueResolutionFunction(config))
.reduce(
new ArrayNode(JsonNodeFactory.instance),
(accumulatedArrayNode, resolvedValue) -> {
accumulatedArrayNode.add(resolvedValue);
return accumulatedArrayNode;
},
(arrayNode1, arrayNode2) -> {
throw new NotImplementedException("This reduce is not used with parallel streams");
});
}

private static Stream<Map.Entry<String, JsonNode>> getFieldStream(ObjectNode node) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.fields(), 0), false);
}

private static Stream<JsonNode> getElementStream(ArrayNode node) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false);
}
}
Loading