Skip to content

[FLINK-35746][Kubernetes-Operator] Add getJobConfiguration and getJobCheckpointConfiguration #1000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
package org.apache.flink.kubernetes.operator.config;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;

Expand All @@ -41,7 +48,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.Value;
import org.apache.commons.lang3.ObjectUtils;
Expand All @@ -50,6 +59,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -89,6 +99,20 @@ public class FlinkConfigManager {
private final Consumer<Set<String>> namespaceListener;
private volatile ConcurrentHashMap<FlinkVersion, List<String>> relevantFlinkVersionPrefixes;

// Job-state-aware cache for runtime configuration overrides. Each entry maps a
// RuntimeConfigCacheKey to the configuration entries (key -> value) that were fetched for that
// job, so later lookups with the same key can reuse them instead of fetching again.
private final Cache<RuntimeConfigCacheKey, Map<String, String>> runtimeConfigCache;

/**
 * Key of the runtime configuration cache.
 *
 * <p>Two keys are equal when all four components are equal, so a cached entry is reused only
 * for the same resource, the same job and the same job status update time.
 */
@Builder
@AllArgsConstructor
@Data
private static class RuntimeConfigCacheKey {
    /** Namespace taken from the resource metadata. */
    private final String namespace;

    /** Name taken from the resource metadata. */
    private final String name;

    /** Job ID as reported in the resource's job status. */
    private final String jobId;

    /** Update time as reported in the resource's job status. */
    private final String updateTime;
}

protected static final Pattern FLINK_VERSION_PATTERN =
Pattern.compile(
VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.")
Expand Down Expand Up @@ -133,6 +157,16 @@ public Configuration load(Key k) {
}
});

// Initialize runtime configuration cache with similar timeout
this.runtimeConfigCache =
CacheBuilder.newBuilder()
.maximumSize(
defaultConfig.get(
KubernetesOperatorConfigOptions
.OPERATOR_CONFIG_CACHE_SIZE))
.expireAfterAccess(cacheTimeout)
.build();

updateDefaultConfig(defaultConfig);
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(
Expand All @@ -141,6 +175,13 @@ public Configuration load(Key k) {
cacheTimeout.toMillis(),
TimeUnit.MILLISECONDS);

// Also clean up runtime config cache
executorService.scheduleWithFixedDelay(
runtimeConfigCache::cleanUp,
cacheTimeout.toMillis(),
cacheTimeout.toMillis(),
TimeUnit.MILLISECONDS);

if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
scheduleConfigWatcher(executorService);
}
Expand Down Expand Up @@ -352,8 +393,193 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return conf;
}

/**
 * Builds the observe configuration for a deployment and then layers the configuration reported
 * by the job itself on top of it.
 *
 * <p>The base configuration comes from {@link #getObserveConfig(FlinkDeployment)}. The runtime
 * values are obtained through the given {@link FlinkService} and written into that same
 * configuration object before it is returned.
 *
 * @param deployment Deployment resource
 * @param flinkService FlinkService to use for fetching runtime configuration
 * @return Observe config enhanced with runtime configuration overrides
 */
public Configuration getObserveConfigWithRuntimeOverrides(
        FlinkDeployment deployment, FlinkService flinkService) {
    final Configuration observeConfig = getObserveConfig(deployment);

    // The overrides are written directly into observeConfig; nothing is copied.
    applyRuntimeConfigurationOverrides(deployment, flinkService, observeConfig);
    return observeConfig;
}

/**
* Enhance a session job configuration with runtime configuration overrides. The overrides are
* obtained by delegating to applyRuntimeConfigurationOverrides, which uses the given
* FlinkService to fetch the runtime configuration of the job and writes the values into the
* supplied configuration.
*
* @param sessionJob Session job resource
* @param flinkService FlinkService to use for fetching runtime configuration
* @param conf Base session job configuration; it is modified in place
* @return The same {@code conf} instance that was passed in, after the overrides were applied
*/
public Configuration getSessionJobConfigWithRuntimeOverrides(
FlinkSessionJob sessionJob, FlinkService flinkService, Configuration conf) {
// Apply runtime configuration overrides if job is running
applyRuntimeConfigurationOverrides(sessionJob, flinkService, conf);

// Return the caller's instance so the call can be used as an expression
return conf;
}

/**
 * Overlays the configuration reported by the running job onto {@code conf}.
 *
 * <p>Nothing is changed unless the resource status carries a job ID, the job state is {@link
 * JobStatus#RUNNING} and, for a {@link FlinkDeployment}, the JobManager deployment status is
 * {@link JobManagerDeploymentStatus#READY}. When those conditions hold, the job configuration and
 * the checkpoint configuration are obtained from {@code flinkService} and every entry is written
 * into {@code conf}. Checkpoint entries are merged after job entries and therefore win on
 * duplicate keys.
 *
 * <p>Fetched values are cached per namespace, name, job ID and job status update time. A result
 * is cached only when both fetches succeeded, so a transient REST failure is retried on the next
 * call instead of pinning an incomplete (possibly empty) result until the cache entry expires.
 *
 * <p>The method is best effort: any failure is logged and {@code conf} keeps whatever values it
 * holds at that point.
 *
 * @param resource Flink resource (FlinkDeployment or FlinkSessionJob)
 * @param flinkService FlinkService to use for fetching runtime configuration
 * @param conf Configuration to apply overrides to; modified in place
 * @param <T> concrete resource type
 */
private <T extends AbstractFlinkResource<?, ?>> void applyRuntimeConfigurationOverrides(
        T resource, FlinkService flinkService, Configuration conf) {
    try {
        var jobStatus = resource.getStatus().getJobStatus();
        if (jobStatus == null || jobStatus.getJobId() == null) {
            LOG.debug("No job ID available for runtime configuration fetch");
            return;
        }

        JobID jobId = JobID.fromHexString(jobStatus.getJobId());
        JobStatus jobState = jobStatus.getState();

        // Enum identity comparison; also covers a null state without a separate check.
        if (jobState != JobStatus.RUNNING) {
            LOG.debug(
                    "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch",
                    jobId,
                    jobState);
            return;
        }

        // Only FlinkDeployment carries a JobManager deployment status.
        if (resource instanceof FlinkDeployment
                && ((FlinkDeployment) resource).getStatus().getJobManagerDeploymentStatus()
                        != JobManagerDeploymentStatus.READY) {
            LOG.debug("JobManager is not ready, skipping runtime config fetch for job {}", jobId);
            return;
        }

        RuntimeConfigCacheKey cacheKey =
                RuntimeConfigCacheKey.builder()
                        .namespace(resource.getMetadata().getNamespace())
                        .name(resource.getMetadata().getName())
                        .jobId(jobStatus.getJobId())
                        .updateTime(jobStatus.getUpdateTime())
                        .build();

        Map<String, String> runtimeConfig = runtimeConfigCache.getIfPresent(cacheKey);
        if (runtimeConfig == null) {
            LOG.debug(
                    "Runtime configuration not cached for job {}, fetching from REST API",
                    jobId);

            runtimeConfig = new HashMap<>();
            // Both fetches are always attempted so a partial result can still be applied.
            boolean jobConfigFetched =
                    collectJobConfiguration(flinkService, conf, jobId, runtimeConfig);
            boolean checkpointConfigFetched =
                    collectCheckpointConfiguration(flinkService, conf, jobId, runtimeConfig);

            if (jobConfigFetched && checkpointConfigFetched) {
                runtimeConfigCache.put(cacheKey, runtimeConfig);
                LOG.debug(
                        "Cached runtime configuration for job {} (cache key: {})",
                        jobId,
                        cacheKey);
            } else {
                // Caching here would suppress every retry for this key until expiry.
                LOG.debug(
                        "Runtime configuration for job {} is incomplete, not caching it",
                        jobId);
            }
        } else {
            LOG.debug("Using cached runtime configuration for job {}", jobId);
        }

        if (runtimeConfig.isEmpty()) {
            LOG.debug("No runtime configuration available for job {}", jobId);
            return;
        }

        // NOTE(review): logged at DEBUG because this path runs on every call for a running
        // job, cached or not - confirm the call frequency before raising the level again.
        LOG.debug(
                "Applying {} runtime configuration overrides for job {}",
                runtimeConfig.size(),
                jobId);

        // Skip null keys/values so one bad entry cannot abort the loop half-way and leave
        // conf partially overridden.
        runtimeConfig.forEach(
                (key, value) -> {
                    if (key != null && value != null) {
                        conf.setString(key, value);
                    }
                });
    } catch (Exception e) {
        // Don't fail observe config creation if runtime config fetch fails
        // Fall back to deployment spec configuration
        LOG.warn(
                "Failed to fetch runtime configuration for {} {}, "
                        + "using deployment spec configuration instead. This may cause "
                        + "operator decisions to be based on incorrect configuration values.",
                resource.getClass().getSimpleName(),
                resource.getMetadata().getName(),
                e);
    }
}

/**
 * Adds the job configuration returned by {@code flinkService} to {@code target}.
 *
 * @return {@code true} if the call completed (even with a null result), {@code false} if it
 *     threw
 */
private boolean collectJobConfiguration(
        FlinkService flinkService,
        Configuration conf,
        JobID jobId,
        Map<String, String> target) {
    try {
        Map<String, String> jobConfig = flinkService.getJobConfiguration(conf, jobId);
        if (jobConfig == null) {
            LOG.debug("Empty job configuration for job {}", jobId);
            return true;
        }
        target.putAll(jobConfig);
        // Only the entry count is logged; the raw map is not masked and may hold credentials.
        LOG.debug("Fetched {} job configuration entries for job {}", jobConfig.size(), jobId);
        return true;
    } catch (Exception e) {
        LOG.warn(
                "Failed to fetch job configuration for job {} via REST API: {}",
                jobId,
                e.getMessage());
        LOG.debug("Job configuration fetch exception details", e);
        return false;
    }
}

/**
 * Adds the checkpoint configuration returned by {@code flinkService} to {@code target}.
 *
 * @return {@code true} if the call completed (even with a null result), {@code false} if it
 *     threw
 */
private boolean collectCheckpointConfiguration(
        FlinkService flinkService,
        Configuration conf,
        JobID jobId,
        Map<String, String> target) {
    try {
        Map<String, String> checkpointConfig =
                flinkService.getJobCheckpointConfiguration(conf, jobId);
        if (checkpointConfig != null) {
            target.putAll(checkpointConfig);
            LOG.debug(
                    "Fetched {} checkpoint configuration entries for job {}",
                    checkpointConfig.size(),
                    jobId);
        }
        return true;
    } catch (Exception e) {
        LOG.warn(
                "Failed to fetch checkpoint configuration for job {} via REST API: {}",
                jobId,
                e.getMessage());
        LOG.debug("Checkpoint configuration fetch exception details", e);
        return false;
    }
}

private void addOperatorConfigsFromSpec(AbstractFlinkSpec spec, Configuration conf) {

// Observe config should include the latest operator related settings

if (spec.getFlinkConfiguration() != null) {
spec.getFlinkConfiguration()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public Configuration getDeployConfig(AbstractFlinkSpec spec) {

@Override
protected Configuration createObserveConfig() {
    // Use the enhanced observe config: it starts from the regular observe config and then
    // overrides it with the runtime configuration fetched from the job. The earlier plain
    // "return configManager.getObserveConfig(...)" is dropped because it made this call
    // unreachable.
    return configManager.getObserveConfigWithRuntimeOverrides(
            getResource(), getFlinkService());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ protected FlinkService createFlinkService() {

@Override
protected Configuration createObserveConfig() {
    // Start from the deploy config of the current spec, then let the config manager apply
    // the runtime overrides to it. The earlier direct "return getDeployConfig(...)" is
    // dropped because it made the statements below unreachable.
    Configuration conf = getDeployConfig(getResource().getSpec());
    return configManager.getSessionJobConfigWithRuntimeOverrides(
            getResource(), getFlinkService(), conf);
}

@Override
Expand Down
Loading