diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 8a2fd2651..7ccf1b69a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -19,17 +19,24 @@ package org.apache.flink.kubernetes.operator.config; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -41,7 +48,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import 
com.fasterxml.jackson.databind.node.ObjectNode; import io.fabric8.kubernetes.api.model.ObjectMeta; +import lombok.AllArgsConstructor; import lombok.Builder; +import lombok.Data; import lombok.SneakyThrows; import lombok.Value; import org.apache.commons.lang3.ObjectUtils; @@ -50,6 +59,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -89,6 +99,20 @@ public class FlinkConfigManager { private final Consumer> namespaceListener; private volatile ConcurrentHashMap> relevantFlinkVersionPrefixes; + // Job-state-aware cache for runtime configuration overrides + private final Cache> runtimeConfigCache; + + /** Cache key for runtime configuration overrides. */ + @Data + @Builder + @AllArgsConstructor + private static class RuntimeConfigCacheKey { + private final String namespace; + private final String name; + private final String jobId; + private final String updateTime; + } + protected static final Pattern FLINK_VERSION_PATTERN = Pattern.compile( VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.") @@ -133,6 +157,16 @@ public Configuration load(Key k) { } }); + // Initialize runtime configuration cache with similar timeout + this.runtimeConfigCache = + CacheBuilder.newBuilder() + .maximumSize( + defaultConfig.get( + KubernetesOperatorConfigOptions + .OPERATOR_CONFIG_CACHE_SIZE)) + .expireAfterAccess(cacheTimeout) + .build(); + updateDefaultConfig(defaultConfig); ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleWithFixedDelay( @@ -141,6 +175,13 @@ public Configuration load(Key k) { cacheTimeout.toMillis(), TimeUnit.MILLISECONDS); + // Also clean up runtime config cache + executorService.scheduleWithFixedDelay( + runtimeConfigCache::cleanUp, + cacheTimeout.toMillis(), + cacheTimeout.toMillis(), + TimeUnit.MILLISECONDS); + if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) { 
scheduleConfigWatcher(executorService); } @@ -352,8 +393,193 @@ public Configuration getObserveConfig(FlinkDeployment deployment) { return conf; } + /** + * Get the observe configuration enhanced with runtime configuration overrides fetched from the + * job. This method fetches the actual runtime configuration from the job via REST API and + * overrides the deployment spec configuration with the actual values. The plain observe config is + * returned unchanged when the job is not reported as RUNNING, the JobManager deployment is not + * READY, or no runtime configuration could be fetched. + * + * @param deployment Deployment resource + * @param flinkService FlinkService to use for fetching runtime configuration + * @return Observe config enhanced with runtime configuration overrides + */ + public Configuration getObserveConfigWithRuntimeOverrides( + FlinkDeployment deployment, FlinkService flinkService) { + // Get the base observe configuration + Configuration conf = getObserveConfig(deployment); + + // Apply runtime configuration overrides if job is running + applyRuntimeConfigurationOverrides(deployment, flinkService, conf); + + return conf; + } + + /** + * Apply runtime configuration overrides to a session job configuration. The overrides are + * fetched from the running job through the given FlinkService and written into {@code conf}, + * overriding entries already present under the same keys. + * + * @param sessionJob Session job resource + * @param flinkService FlinkService to use for fetching runtime configuration + * @param conf Session job configuration to enhance; it is modified in place + * @return The same {@code conf} instance with runtime configuration overrides applied + */ + public Configuration getSessionJobConfigWithRuntimeOverrides( + FlinkSessionJob sessionJob, FlinkService flinkService, Configuration conf) { + // Apply runtime configuration overrides if job is running + applyRuntimeConfigurationOverrides(sessionJob, flinkService, conf); + + return conf; + } + + /** + * Generic method to apply runtime configuration overrides from any Flink resource to the + * provided configuration. 
This method fetches the actual runtime configuration from the running + * job and overrides the resource spec configuration with the actual values to ensure the + * operator works with the correct configuration values. + * + *
<p>Fetched values are cached per resource namespace, name, job ID and job status update time, so + * the fetch is skipped while a matching cache entry exists. Nothing is fetched unless the job + * status reports RUNNING and, for a FlinkDeployment, the JobManager deployment is READY. + * + * @param resource Flink resource (FlinkDeployment or FlinkSessionJob) + * @param flinkService FlinkService to use for fetching runtime configuration + * @param conf Configuration to apply overrides to + */ + private > void applyRuntimeConfigurationOverrides( + T resource, FlinkService flinkService, Configuration conf) { + try { + // Skip when no job status or job ID has been recorded yet + var jobStatus = resource.getStatus().getJobStatus(); + if (jobStatus == null || jobStatus.getJobId() == null) { + LOG.debug("No job ID available for runtime configuration fetch"); + return; + } + + JobID jobId = JobID.fromHexString(jobStatus.getJobId()); + JobStatus jobState = jobStatus.getState(); + + // Only fetch runtime config if job is in a running state + if (jobState == null || !jobState.equals(JobStatus.RUNNING)) { + LOG.debug( + "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch", + jobId, + jobState); + return; + } + + // Check if JobManager is ready (only for FlinkDeployment) + if (resource instanceof FlinkDeployment) { + FlinkDeployment deployment = (FlinkDeployment) resource; + if (deployment.getStatus().getJobManagerDeploymentStatus() + != JobManagerDeploymentStatus.READY) { + LOG.debug( + "JobManager is not ready, skipping runtime config fetch for job {}", + jobId); + return; + } + } + + // Cache key: resource namespace and name, job ID and job status update time, so a + // changed update time leads to a fresh fetch + RuntimeConfigCacheKey cacheKey = + RuntimeConfigCacheKey.builder() + .namespace(resource.getMetadata().getNamespace()) + .name(resource.getMetadata().getName()) + .jobId(jobStatus.getJobId()) + .updateTime(jobStatus.getUpdateTime()) + .build(); + + // Reuse the runtime config cached under this key, if any + Map runtimeConfig = runtimeConfigCache.getIfPresent(cacheKey); + + if (runtimeConfig == null) { + LOG.debug( + "Runtime configuration not cached 
for job {}, fetching from REST API", + jobId); + + // Fetch both job configuration and checkpoint configuration + runtimeConfig = new HashMap<>(); + + // 1. Fetch job configuration + try { + Map jobConfig = flinkService.getJobConfiguration(conf, jobId); + if (jobConfig != null) { + runtimeConfig.putAll(jobConfig); + LOG.debug( + "Fetched {} job configuration entries for job {}", + jobConfig.size(), + jobId); + LOG.debug("Fetched job configuration: {}", jobConfig); + } else { + LOG.debug("Empty job configuration for job {}", jobId); + } + } catch (Exception e) { + LOG.warn( + "Failed to fetch job configuration for job {} via REST API: {}", + jobId, + e.getMessage()); + LOG.debug("Job configuration fetch exception details", e); + } + + // 2. Fetch checkpoint configuration (already returned as a config map); its entries are + // added after the job configuration, so they win on duplicate keys + try { + Map checkpointConfig = + flinkService.getJobCheckpointConfiguration(conf, jobId); + if (checkpointConfig != null) { + runtimeConfig.putAll(checkpointConfig); + LOG.debug( + "Fetched {} checkpoint configuration entries for job {}", + checkpointConfig.size(), + jobId); + } + } catch (Exception e) { + LOG.warn( + "Failed to fetch checkpoint configuration for job {} via REST API: {}", + jobId, + e.getMessage()); + LOG.debug("Checkpoint configuration fetch exception details", e); + } + + // Cache the combined runtime configuration to avoid repeated fetches + // NOTE(review): the map is cached even when both fetches above failed and it is empty; + // later calls with the same key then apply no overrides until the entry expires, + // confirm this is intended + runtimeConfigCache.put(cacheKey, runtimeConfig); + LOG.debug( + "Cached runtime configuration for job {} (cache key: {})", jobId, cacheKey); + } else { + LOG.debug("Using cached runtime configuration for job {}", jobId); + } + + // Apply the fetched runtime configuration to the main configuration object + // This overrides the deployment spec configuration with actual runtime values + // NOTE(review): the two [VALIDATION] messages are logged at INFO on every call that applies + // overrides; they look like leftover debugging output, confirm before merging + if (runtimeConfig != null && !runtimeConfig.isEmpty()) { + LOG.info( + "[VALIDATION] Before override, parallelism.default = {}", + conf.get(CoreOptions.DEFAULT_PARALLELISM)); + LOG.info( + "Applying {} runtime configuration overrides 
for job {}", + runtimeConfig.size(), + jobId); + runtimeConfig.forEach(conf::setString); + LOG.info( + "[VALIDATION] After override, parallelism.default = {}", + conf.get(CoreOptions.DEFAULT_PARALLELISM)); + } else { + LOG.debug("No runtime configuration available for job {}", jobId); + } + + } catch (Exception e) { + // Don't fail observe config creation if runtime config fetch fails + // Fall back to deployment spec configuration + LOG.warn( + "Failed to fetch runtime configuration for {} {}, " + + "using deployment spec configuration instead. This may cause " + + "operator decisions to be based on incorrect configuration values.", + resource.getClass().getSimpleName(), + resource.getMetadata().getName(), + e); + } + } + private void addOperatorConfigsFromSpec(AbstractFlinkSpec spec, Configuration conf) { + // Observe config should include the latest operator related settings + if (spec.getFlinkConfiguration() != null) { spec.getFlinkConfiguration() .forEach( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java index 294421783..2cc1bd569 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java @@ -59,7 +59,9 @@ public Configuration getDeployConfig(AbstractFlinkSpec spec) { @Override protected Configuration createObserveConfig() { - return configManager.getObserveConfig(getResource()); + // Use enhanced observe config that fetches runtime configuration from the job + // and overrides deployment spec configuration with actual runtime values + return configManager.getObserveConfigWithRuntimeOverrides(getResource(), getFlinkService()); } @Override diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java index f0256b31f..506224f49 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java @@ -77,7 +77,9 @@ protected FlinkService createFlinkService() { @Override protected Configuration createObserveConfig() { - return getDeployConfig(getResource().getSpec()); + Configuration conf = getDeployConfig(getResource().getSpec()); + return configManager.getSessionJobConfigWithRuntimeOverrides( + getResource(), getFlinkService(), conf); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 211f2e9dc..d79964ebb 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -66,10 +66,14 @@ import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobConfigHeaders; +import org.apache.flink.runtime.rest.messages.JobConfigInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; import 
org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; @@ -145,6 +149,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -170,6 +175,14 @@ public abstract class AbstractFlinkService implements FlinkService { public static final String FIELD_NAME_TOTAL_CPU = "total-cpu"; public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory"; public static final String FIELD_NAME_STATE_SIZE = "state-size"; + private static final com.fasterxml.jackson.databind.ObjectMapper objectMapper = + new com.fasterxml.jackson.databind.ObjectMapper(); + + static { + objectMapper.setVisibility( + com.fasterxml.jackson.annotation.PropertyAccessor.FIELD, + com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.ANY); + } protected final KubernetesClient kubernetesClient; protected final ExecutorService executorService; @@ -567,7 +580,7 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { .getExternalPointer() .equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) { throw new UpgradeFailureException( - "Latest checkpoint not externally addressable, manual recovery required.", + "Latest checkpoint not externally addressable, Manual restore required.", "CheckpointNotFound"); } return latestCheckpointOpt.map( @@ -870,6 +883,297 @@ public JobExceptionsInfoWithHistory getJobExceptions( } } + @Override + public Map getJobConfiguration(Configuration conf, JobID jobId) + throws 
Exception { + LOG.debug("Fetching job configuration for job {}", jobId); + try (var clusterClient = getClusterClient(conf)) { + Map jobConfig = new HashMap<>(); + + // Use JobConfigHeaders to get job configuration directly + var jobConfigHeaders = JobConfigHeaders.getInstance(); + var parameters = new JobMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + + try { + JobConfigInfo configurationInfo = + clusterClient + .sendRequest( + jobConfigHeaders, + parameters, + EmptyRequestBody.getInstance()) + .get( + operatorConfig.getFlinkClientTimeout().toSeconds(), + TimeUnit.SECONDS); + LOG.debug("Job Config Info: {}", configurationInfo); + + // Extract configuration from the response + if (configurationInfo != null) { + LOG.debug("Job Configuration Info: {}", configurationInfo); + if (configurationInfo.getExecutionConfigInfo() != null) { + jobConfig.put( + "parallelism.default", + String.valueOf( + configurationInfo + .getExecutionConfigInfo() + .getParallelism())); + // TODO: correct this, the restart-strategy mapping below is incorrect and stays disabled + // if (!Objects.equals( + // + // configurationInfo.getExecutionConfigInfo().getRestartStrategy(), + // "Cluster level default restart strategy")) + // { + // jobConfig.put( + // "restart-strategy", + // configurationInfo + // .getExecutionConfigInfo() + // .getRestartStrategy()); + // } + jobConfig.put( + "pipeline.object-reuse", + String.valueOf( + configurationInfo + .getExecutionConfigInfo() + .isObjectReuse())); + // Global job parameters are copied as-is and replace the two keys above on a clash + jobConfig.putAll( + configurationInfo + .getExecutionConfigInfo() + .getGlobalJobParameters()); + } + + LOG.debug("Fetched {} job configuration entries", jobConfig.size()); + } else { + LOG.warn("Job configuration is null for job {}", jobId); + } + + } catch (Exception e) { + // Best effort: log and fall through so the entries collected so far are returned + LOG.warn( + "Failed to fetch job configuration for job {} via REST API: {}", + jobId, + e.getMessage()); + LOG.debug("Job configuration fetch exception details", e); + } + + LOG.debug( + "Fetched {} configuration entries for job {}: {}", + 
jobConfig.size(), + jobId, + jobConfig); + return jobConfig; + } + } + + // Maps top-level field names of the converted checkpoint config response to the Flink + // configuration keys they are written to + private static final Map CHECKPOINT_FIELD_MAPPINGS = + Map.of( + "processingMode", "execution.checkpointing.mode", + "checkpointInterval", "execution.checkpointing.interval", + "checkpointTimeout", "execution.checkpointing.timeout", + "minPauseBetweenCheckpoints", "execution.checkpointing.min-pause", + "maxConcurrentCheckpoints", + "execution.checkpointing.max-concurrent-checkpoints", + "tolerableFailedCheckpoints", + "execution.checkpointing.tolerable-failed-checkpoints", + "unalignedCheckpoints", "execution.checkpointing.unaligned.enabled", + "alignedCheckpointTimeout", + "execution.checkpointing.aligned-checkpoint-timeout", + "checkpointsWithFinishedTasks", + "execution.checkpointing.checkpoints-after-tasks-finish", + "stateChangelog", "state.changelog.enabled"); + + // Response fields whose values get an ms suffix appended. periodicMaterializationInterval has + // no entry in CHECKPOINT_FIELD_MAPPINGS; mapStateChangelog appends its suffix itself + private static final Set DURATION_FIELDS = + Set.of( + "checkpointInterval", + "checkpointTimeout", + "minPauseBetweenCheckpoints", + "alignedCheckpointTimeout", + "periodicMaterializationInterval"); + + @Override + public Map getJobCheckpointConfiguration(Configuration conf, JobID jobId) + throws Exception { + LOG.debug("Fetching checkpoint configuration for job {}", jobId); + try (var clusterClient = getClusterClient(conf)) { + var checkpointConfigHeaders = CheckpointConfigHeaders.getInstance(); + var parameters = new JobMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + + try { + var checkpointConfigInfo = + clusterClient + .sendRequest( + checkpointConfigHeaders, + parameters, + EmptyRequestBody.getInstance()) + .get( + operatorConfig.getFlinkClientTimeout().toSeconds(), + TimeUnit.SECONDS); + + // Convert the response object into a generic map so its fields can be looked up by name + Map rawResponse = + objectMapper.convertValue( + checkpointConfigInfo, + new com.fasterxml.jackson.core.type.TypeReference< + Map>() {}); + + LOG.debug( + "Raw checkpoint configuration response for job {}: {}", 
jobId, rawResponse); + + return mapCheckpointConfiguration(rawResponse, jobId); + + } catch (Exception e) { + LOG.warn( + "Failed to fetch checkpoint configuration for job {} via REST API, falling back to job config logic.", + jobId, + e); + + if (e.getCause() instanceof RestClientException) { + RestClientException restException = (RestClientException) e.getCause(); + if (restException.getHttpResponseStatus() == HttpResponseStatus.NOT_FOUND) { + throw new RuntimeException("Job not found: " + jobId, e); + } + } + + // Fallback for pre-Flink 1.15 versions: keep only checkpointing and changelog keys of the job config + // NOTE(review): the filter below uses the state.backend.changelog prefix while the primary + // path emits state.changelog.* keys; confirm which prefix is intended + return getJobConfigFromRest(clusterClient, jobId).entrySet().stream() + .filter( + entry -> + entry.getKey().startsWith("execution.checkpointing.") + || entry.getKey() + .startsWith("state.backend.changelog")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + } + } + + private Map mapCheckpointConfiguration( + Map rawResponse, JobID jobId) { + Map mappedConfig = new HashMap<>(); + + // Handle simple field mappings + // NOTE(review): String.valueOf turns a null map value into the text null (nullms for + // duration fields); confirm the converted response cannot hold nulls for these keys + CHECKPOINT_FIELD_MAPPINGS.forEach( + (jsonField, configKey) -> { + if (rawResponse.containsKey(jsonField)) { + String value = String.valueOf(rawResponse.get(jsonField)); + + // Special handling for processing mode + if ("processingMode".equals(jsonField)) { + value = mapProcessingMode(value); + } + + // Add duration suffix for time-based fields + if (DURATION_FIELDS.contains(jsonField)) { + value += "ms"; + } + + mappedConfig.put(configKey, value); + } + }); + + // Handle fields that need a value translation or a nested lookup + mapExternalizedCheckpointInfo(rawResponse, mappedConfig); + mapStateBackend(rawResponse, mappedConfig); + mapCheckpointStorage(rawResponse, mappedConfig); + mapStateChangelog(rawResponse, mappedConfig); + + LOG.debug( + "Mapped {} checkpoint configuration entries for job {}: {}", + mappedConfig.size(), + jobId, + mappedConfig); + return mappedConfig; + } + + private String mapProcessingMode(String mode) { + if ("exactly_once".equals(mode)) { + return "EXACTLY_ONCE"; + } else if 
("at_least_once".equals(mode)) { + return "AT_LEAST_ONCE"; + } else { + return mode.toUpperCase(); + } + } + + /** Sets the externalized-checkpoint-retention key from the nested externalizedCheckpointInfo map; does nothing when that entry is not a Map. */ + private void mapExternalizedCheckpointInfo( + Map rawResponse, Map mappedConfig) { + Object externalizationObj = rawResponse.get("externalizedCheckpointInfo"); + if (externalizationObj instanceof Map) { + @SuppressWarnings("unchecked") + Map externalization = (Map) externalizationObj; + Boolean enabled = (Boolean) externalization.get("enabled"); + Boolean deleteOnCancellation = (Boolean) externalization.get("deleteOnCancellation"); + + String retention = "NO_EXTERNALIZED_CHECKPOINTS"; + if (Boolean.TRUE.equals(enabled)) { + retention = + Boolean.TRUE.equals(deleteOnCancellation) + ? "DELETE_ON_CANCELLATION" + : "RETAIN_ON_CANCELLATION"; + } + mappedConfig.put( + "execution.checkpointing.externalized-checkpoint-retention", retention); + } + } + + /** Sets state.backend.type from the stateBackend entry, translating the two known class names and lower-casing any other value. */ + private void mapStateBackend( + Map rawResponse, Map mappedConfig) { + if (rawResponse.containsKey("stateBackend")) { + String stateBackend = String.valueOf(rawResponse.get("stateBackend")); + String backendType; + if ("EmbeddedRocksDBStateBackend".equals(stateBackend)) { + backendType = "rocksdb"; + } else if ("HashMapStateBackend".equals(stateBackend)) { + backendType = "hashmap"; + } else { + backendType = stateBackend.toLowerCase(); + } + mappedConfig.put("state.backend.type", backendType); + } + } + + /** Sets execution.checkpointing.storage from the checkpointStorage entry, translating the two known class names and lower-casing any other value. */ + private void mapCheckpointStorage( + Map rawResponse, Map mappedConfig) { + if (rawResponse.containsKey("checkpointStorage")) { + String checkpointStorage = String.valueOf(rawResponse.get("checkpointStorage")); + String storageType; + if ("FileSystemCheckpointStorage".equals(checkpointStorage)) { + storageType = "filesystem"; + } else if ("JobManagerCheckpointStorage".equals(checkpointStorage)) { + storageType = "jobmanager"; + } else { + storageType = checkpointStorage.toLowerCase(); + } + mappedConfig.put("execution.checkpointing.storage", storageType); + } + } + + /** Copies the changelog materialization interval (with an ms suffix) and the changelog storage entry when those keys are present. */ + private void mapStateChangelog( + Map 
rawResponse, Map mappedConfig) { + if (rawResponse.containsKey("periodicMaterializationInterval")) { + String intervalMs = String.valueOf(rawResponse.get("periodicMaterializationInterval")); + mappedConfig.put("state.changelog.periodic-materialize.interval", intervalMs + "ms"); + } + + if (rawResponse.containsKey("changelogStorage")) { + mappedConfig.put( + "state.changelog.storage", String.valueOf(rawResponse.get("changelogStorage"))); + } + } + + /** Requests the job config through JobConfigHeaders and converts the response object into a map with the shared objectMapper. */ + private Map getJobConfigFromRest( + RestClusterClient clusterClient, JobID jobId) throws Exception { + var jobConfigHeaders = JobConfigHeaders.getInstance(); + var parameters = new JobMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + var jobConfigInfo = + clusterClient + .sendRequest(jobConfigHeaders, parameters, EmptyRequestBody.getInstance()) + .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS); + return objectMapper.convertValue( + jobConfigInfo, + new com.fasterxml.jackson.core.type.TypeReference>() {}); + } + @VisibleForTesting protected void runJar( JobSpec job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index b1a078fbb..d13fc0683 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -133,6 +133,28 @@ JobExceptionsInfoWithHistory getJobExceptions( AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) throws Exception; + /** + * Fetches the actual runtime configuration of a job from Flink REST API. This is used to get + * the real configuration after programmatic overrides. 
+ * + * @param conf Configuration for REST client + * @param jobId Job ID to fetch configuration for + * @return Map of configuration key-value pairs + * @throws Exception if REST call fails + */ + Map getJobConfiguration(Configuration conf, JobID jobId) throws Exception; + + /** + * Fetches the checkpoint configuration of a job from Flink REST API. + * + * @param conf Configuration for REST client + * @param jobId Job ID to fetch checkpoint configuration for + * @return Map of checkpoint-related configuration key-value pairs + * @throws Exception if REST call fails + */ + Map getJobCheckpointConfiguration(Configuration conf, JobID jobId) + throws Exception; + /** Result of a cancel operation. */ @AllArgsConstructor class CancelResult {