diff --git a/docs/development/extensions-core/k8s-jobs.md b/docs/development/extensions-core/k8s-jobs.md index 289934646af5..6dc7bdd70ea9 100644 --- a/docs/development/extensions-core/k8s-jobs.md +++ b/docs/development/extensions-core/k8s-jobs.md @@ -48,9 +48,9 @@ Other configurations required are: Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord service for these changes to take effect. -Druid can dynamically tune [pod template selection](#pod-template-selection), which allows you to configure the pod -template based on the task to be run. To enable dynamic pod template selection, first configure the -[custom template pod adapter](#custom-template-pod-adapter). +Druid can dynamically tune [pod template selection](#pod-template-selection) and [capacity](#properties). Where capacity refers to `druid.indexer.runner.capacity`. + +Pod template selection allows you to configure the pod template based on the task to be run. To enable dynamic pod template selection, first configure the [custom template pod adapter](#custom-template-pod-adapter). Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner. @@ -126,7 +126,8 @@ Host: http://ROUTER_IP:ROUTER_PORT "type": ["index_kafka"] } ] - } + }, + "capacity": 12 } ``` @@ -135,6 +136,8 @@ Host: http://ROUTER_IP:ROUTER_PORT Updates the dynamic configuration for the Kubernetes Task Runner +Note: Both `podTemplateSelectStrategy` and `capacity` are optional fields. A POST request may include either, both, or neither. + ##### URL `POST` `/druid/indexer/v1/k8s/taskrunner/executionconfig` @@ -193,7 +196,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconf "type": ["index_kafka"] } ] - } + }, + "capacity": 6 }' ``` @@ -225,7 +229,8 @@ Content-Type: application/json "type": ["index_kafka"] } ] - } + }, + "capacity": 6 } ``` @@ -309,7 +314,7 @@ Host: http://ROUTER_IP:ROUTER_PORT "comment": "", "ip": "127.0.0.1" }, - "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}", + "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6", "auditTime": "2024-06-13T20:59:51.622Z" } ] @@ -790,7 +795,7 @@ Should you require the needed permissions for interacting across Kubernetes name | `druid.indexer.runner.annotations` | `JsonObject` | Additional annotations you want to add to peon pod. | `{}` | No | | `druid.indexer.runner.peonMonitors` | `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. | `[]` | No | | `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) | No | -| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No | +| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. Value will be overridden if a dynamic config value has been set. | `2147483647` | No | | `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No | | `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO | diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java index 021abef08281..936e86c888ce 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java @@ -95,7 +95,7 @@ public class KubernetesOverlordModule implements DruidModule public void configure(Binder binder) { // druid.indexer.runner.type=k8s - JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class); + JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerStaticConfig.class); JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class); JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null); @@ -150,10 +150,20 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, JDK_HTTPCLIENT_PROPERITES_PREFIX, DruidKubernetesJdkHttpClientConfig.class); } + @Provides + @LazySingleton + public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig( + KubernetesTaskRunnerStaticConfig staticConfig, + Supplier dynamicConfigSupplier + ) + { + return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier); + } + @Provides @LazySingleton public DruidKubernetesClient makeKubernetesClient( - KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, + KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig, DruidKubernetesHttpClientFactory httpClientFactory, Lifecycle lifecycle ) @@ -217,7 +227,7 @@ TaskRunnerFactory provideWorkerTaskRunner( TaskAdapter provideTaskAdapter( DruidKubernetesClient client, Properties properties, - KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, + KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig, TaskConfig taskConfig, StartupLoggingConfig startupLoggingConfig, @Self DruidNode druidNode, @@ -260,7 +270,7 @@ TaskAdapter provideTaskAdapter( druidNode, smileMapper, taskLogs, - new DynamicConfigPodTemplateSelector(properties, dynamicConfigRef) + new DynamicConfigPodTemplateSelector(properties, kubernetesTaskRunnerConfig) ); } else { return new SingleContainerTaskAdapter( diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 51913d13a71e..48d709d1f841 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; @@ -44,6 +45,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -56,6 +58,7 @@ import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException; +import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -76,8 +79,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -100,6 +106,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner { private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class); + private static final String OBSERVER_KEY = "k8s-task-runner-capacity-%s"; private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); // to cleanup old jobs that might not have been deleted. @@ -111,19 +118,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final KubernetesPeonClient client; private final KubernetesTaskRunnerConfig config; private final ListeningExecutorService exec; + private final ThreadPoolExecutor tpe; private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; private final ServiceEmitter emitter; // currently worker categories aren't supported, so it's hardcoded. protected static final String WORKER_CATEGORY = "_k8s_worker_category"; + private final AtomicInteger currentCapacity; + public KubernetesTaskRunner( TaskAdapter adapter, KubernetesTaskRunnerConfig config, KubernetesPeonClient client, HttpClient httpClient, PeonLifecycleFactory peonLifecycleFactory, - ServiceEmitter emitter + ServiceEmitter emitter, + ConfigManager configManager ) { this.adapter = adapter; @@ -132,10 +143,12 @@ public KubernetesTaskRunner( this.httpClient = httpClient; this.peonLifecycleFactory = peonLifecycleFactory; this.cleanupExecutor = Executors.newScheduledThreadPool(1); - this.exec = MoreExecutors.listeningDecorator( - Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d") - ); this.emitter = emitter; + + this.currentCapacity = new AtomicInteger(config.getCapacity()); + this.tpe = new ThreadPoolExecutor(currentCapacity.get(), currentCapacity.get(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), Execs.makeThreadFactory("k8s-task-runner-%d", null)); + this.exec = MoreExecutors.listeningDecorator(this.tpe); + configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig); } @Override @@ -179,6 +192,24 @@ protected KubernetesWorkItem joinAsync(Task task) } } + private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config) + { + int newCapacity = config.getCapacity(); + if (newCapacity == currentCapacity.get()) { + return; + } + log.info("Adjusting k8s task runner capacity from [%d] to [%d]", currentCapacity.get(), newCapacity); + // maximum pool size must always be greater than or equal to the core pool size + if (newCapacity < currentCapacity.get()) { + tpe.setCorePoolSize(newCapacity); + tpe.setMaximumPoolSize(newCapacity); + } else { + tpe.setMaximumPoolSize(newCapacity); + tpe.setCorePoolSize(newCapacity); + } + currentCapacity.set(newCapacity); + } + private TaskStatus runTask(Task task) { return doTask(task, true); @@ -294,7 +325,7 @@ public void shutdown(String taskid, String reason) synchronized (tasks) { tasks.remove(taskid); } - + } @Override @@ -420,7 +451,7 @@ public void stop() @Override public Map getTotalTaskSlotCount() { - return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity()); + return ImmutableMap.of(WORKER_CATEGORY, (long) currentCapacity.get()); } @Override @@ -438,13 +469,13 @@ public Optional getScalingStats() @Override public Map getIdleTaskSlotCount() { - return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, config.getCapacity() - tasks.size())); + return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, currentCapacity.get() - tasks.size())); } @Override public Map getUsedTaskSlotCount() { - return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(config.getCapacity(), tasks.size())); + return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(currentCapacity.get(), tasks.size())); } @Override @@ -535,7 +566,7 @@ public RunnerTaskState getRunnerTaskState(String taskId) @Override public int getTotalCapacity() { - return config.getCapacity(); + return currentCapacity.get(); } @Override diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 663126ba2e65..e7b9eb2b8043 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -19,339 +19,60 @@ package org.apache.druid.k8s.overlord; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.commons.lang3.ObjectUtils; import org.joda.time.Period; -import javax.annotation.Nonnull; import javax.validation.constraints.Max; import javax.validation.constraints.Min; -import javax.validation.constraints.NotNull; + import java.util.List; import java.util.Map; -public class KubernetesTaskRunnerConfig +public interface KubernetesTaskRunnerConfig { - @JsonProperty - @NotNull - private String namespace; - - @JsonProperty - private String k8sTaskPodNamePrefix = ""; - - // This property is the namespace that the Overlord is running in. - // For cases where we want task pods to run on different namespace from the overlord, we need to specify the namespace of the overlord here. - // Else, we can simply leave this field alone. - @JsonProperty - private String overlordNamespace = ""; - - @JsonProperty - private boolean debugJobs = false; - - /** - * Deprecated, please specify adapter type runtime property instead - *

- * I.E `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer` - */ - @Deprecated - @JsonProperty - private boolean sidecarSupport = false; - - @JsonProperty - // if this is not set, then the first container in your pod spec is assumed to be the overlord container. - // usually this is fine, but when you are dynamically adding sidecars like istio, the service mesh could - // in fact place the istio-proxy container as the first container. Thus, you would specify this value to - // the name of your primary container. e.g. druid-overlord - private String primaryContainerName = null; - - @JsonProperty - // for multi-container jobs, we need this image to shut down sidecars after the main container - // has completed - private String kubexitImage = "karlkfi/kubexit:v0.3.2"; - - // how much time to wait for preStop hooks to execute - // lower number speeds up pod termination time to release locks - // faster, defaults to your k8s setup, usually 30 seconds. - private Long graceTerminationPeriodSeconds = null; - - @JsonProperty - // disable using http / https proxy environment variables - private boolean disableClientProxy; - - @JsonProperty - @NotNull - private Period maxTaskDuration = new Period("PT4H"); - - @JsonProperty - @NotNull - // how long to wait for the jobs to be cleaned up. - private Period taskCleanupDelay = new Period("P2D"); - - @JsonProperty - @NotNull - // interval for k8s job cleanup to run - private Period taskCleanupInterval = new Period("PT10m"); - - @JsonProperty - @NotNull - // how long to wait to join peon k8s jobs on startup - private Period taskJoinTimeout = new Period("PT1M"); - - - @JsonProperty - @NotNull - // how long to wait for the peon k8s job to launch - private Period k8sjobLaunchTimeout = new Period("PT1H"); - - @JsonProperty - @NotNull - // how long to wait for log saving operations to complete - private Period logSaveTimeout = new Period("PT300S"); - - @JsonProperty - // ForkingTaskRunner inherits the monitors from the MM, in k8s mode - // the peon inherits the monitors from the overlord, so if someone specifies - // a TaskCountStatsMonitor in the overlord for example, the peon process - // fails because it can not inject this monitor in the peon process. - private List peonMonitors = ImmutableList.of(); - - @JsonProperty - @NotNull - private List javaOptsArray = ImmutableList.of(); - - @JsonProperty - @NotNull - private int cpuCoreInMicro = 0; - - @JsonProperty - @NotNull - private Map labels = ImmutableMap.of(); - - @JsonProperty - @NotNull - private Map annotations = ImmutableMap.of(); - - @JsonProperty - @Min(1) - @Max(Integer.MAX_VALUE) - @NotNull - private Integer capacity = Integer.MAX_VALUE; - - public KubernetesTaskRunnerConfig() - { - } + String getNamespace(); - private KubernetesTaskRunnerConfig( - @Nonnull String namespace, - String overlordNamespace, - String k8sTaskPodNamePrefix, - boolean debugJobs, - boolean sidecarSupport, - String primaryContainerName, - String kubexitImage, - Long graceTerminationPeriodSeconds, - boolean disableClientProxy, - Period maxTaskDuration, - Period taskCleanupDelay, - Period taskCleanupInterval, - Period k8sjobLaunchTimeout, - Period logSaveTimeout, - List peonMonitors, - List javaOptsArray, - int cpuCoreInMicro, - Map labels, - Map annotations, - Integer capacity, - Period taskJoinTimeout - ) - { - this.namespace = namespace; - this.overlordNamespace = ObjectUtils.getIfNull( - overlordNamespace, - this.overlordNamespace - ); - this.k8sTaskPodNamePrefix = k8sTaskPodNamePrefix; - this.debugJobs = ObjectUtils.getIfNull( - debugJobs, - this.debugJobs - ); - this.sidecarSupport = ObjectUtils.getIfNull( - sidecarSupport, - this.sidecarSupport - ); - this.primaryContainerName = ObjectUtils.getIfNull( - primaryContainerName, - this.primaryContainerName - ); - this.kubexitImage = ObjectUtils.getIfNull( - kubexitImage, - this.kubexitImage - ); - this.graceTerminationPeriodSeconds = ObjectUtils.getIfNull( - graceTerminationPeriodSeconds, - this.graceTerminationPeriodSeconds - ); - this.disableClientProxy = disableClientProxy; - this.maxTaskDuration = ObjectUtils.getIfNull( - maxTaskDuration, - this.maxTaskDuration - ); - this.taskCleanupDelay = ObjectUtils.getIfNull( - taskCleanupDelay, - this.taskCleanupDelay - ); - this.taskCleanupInterval = ObjectUtils.getIfNull( - taskCleanupInterval, - this.taskCleanupInterval - ); - this.k8sjobLaunchTimeout = ObjectUtils.getIfNull( - k8sjobLaunchTimeout, - this.k8sjobLaunchTimeout - ); - this.taskJoinTimeout = ObjectUtils.getIfNull( - taskJoinTimeout, - this.taskJoinTimeout - ); - this.logSaveTimeout = ObjectUtils.getIfNull( - logSaveTimeout, - this.logSaveTimeout - ); - this.peonMonitors = ObjectUtils.getIfNull( - peonMonitors, - this.peonMonitors - ); - this.javaOptsArray = ObjectUtils.getIfNull( - javaOptsArray, - this.javaOptsArray - ); - this.cpuCoreInMicro = ObjectUtils.getIfNull( - cpuCoreInMicro, - this.cpuCoreInMicro - ); - this.labels = ObjectUtils.getIfNull( - labels, - this.labels - ); - this.annotations = ObjectUtils.getIfNull( - annotations, - this.annotations - ); - this.capacity = ObjectUtils.getIfNull( - capacity, - this.capacity - ); - } + String getOverlordNamespace(); - public String getNamespace() - { - return namespace; - } - - public String getOverlordNamespace() - { - return overlordNamespace; - } - - public String getK8sTaskPodNamePrefix() - { - return k8sTaskPodNamePrefix; - } + String getK8sTaskPodNamePrefix(); - public boolean isDebugJobs() - { - return debugJobs; - } + boolean isDebugJobs(); @Deprecated - public boolean isSidecarSupport() - { - return sidecarSupport; - } + boolean isSidecarSupport(); - public String getPrimaryContainerName() - { - return primaryContainerName; - } + String getPrimaryContainerName(); - public String getKubexitImage() - { - return kubexitImage; - } + String getKubexitImage(); - public Long getGraceTerminationPeriodSeconds() - { - return graceTerminationPeriodSeconds; - } + Long getGraceTerminationPeriodSeconds(); - public boolean isDisableClientProxy() - { - return disableClientProxy; - } + boolean isDisableClientProxy(); - public Period getTaskTimeout() - { - return maxTaskDuration; - } + Period getTaskTimeout(); - public Period getTaskJoinTimeout() - { - return taskJoinTimeout; - } + Period getTaskJoinTimeout(); + Period getTaskCleanupDelay(); - public Period getTaskCleanupDelay() - { - return taskCleanupDelay; - } + Period getTaskCleanupInterval(); - public Period getTaskCleanupInterval() - { - return taskCleanupInterval; - } + Period getTaskLaunchTimeout(); - public Period getTaskLaunchTimeout() - { - return k8sjobLaunchTimeout; - } + Period getLogSaveTimeout(); - public Period getLogSaveTimeout() - { - return logSaveTimeout; - } + List getPeonMonitors(); - public List getPeonMonitors() - { - return peonMonitors; - } - - public List getJavaOptsArray() - { - return javaOptsArray; - } + List getJavaOptsArray(); - public int getCpuCoreInMicro() - { - return cpuCoreInMicro; - } + int getCpuCoreInMicro(); - public Map getLabels() - { - return labels; - } + Map getLabels(); - public Map getAnnotations() - { - return annotations; - } + Map getAnnotations(); - public Integer getCapacity() - { - return capacity; - } + Integer getCapacity(); - public static Builder builder() + static Builder builder() { return new Builder(); } @@ -511,9 +232,9 @@ public Builder withLogSaveTimeout(Period logSaveTimeout) return this; } - public KubernetesTaskRunnerConfig build() + public KubernetesTaskRunnerStaticConfig build() { - return new KubernetesTaskRunnerConfig( + return new KubernetesTaskRunnerStaticConfig( this.namespace, this.overlordNamespace, this.k8sTaskPodNamePrefix, diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java new file mode 100644 index 000000000000..c1593c3577db --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.base.Supplier; +import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; +import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy; +import org.joda.time.Period; + +import java.util.List; +import java.util.Map; + +/** + * Provides a flexible mechanism to configure Kubernetes task pods, + * by merging the static base settings from {@link KubernetesTaskRunnerConfig} + * with dynamic overrides from {@link KubernetesTaskRunnerDynamicConfig}. + *

+ * Kubernetes will always use this effective config to run new tasks. + */ +public class KubernetesTaskRunnerEffectiveConfig implements KubernetesTaskRunnerConfig +{ + private final KubernetesTaskRunnerStaticConfig staticConfig; + private final Supplier dynamicConfigSupplier; + + public KubernetesTaskRunnerEffectiveConfig( + KubernetesTaskRunnerStaticConfig staticConfig, + Supplier dynamicConfigSupplier + ) + { + this.staticConfig = staticConfig; + this.dynamicConfigSupplier = dynamicConfigSupplier; + } + + @Override + public String getNamespace() + { + return staticConfig.getNamespace(); + } + + @Override + public String getOverlordNamespace() + { + return staticConfig.getOverlordNamespace(); + } + + @Override + public String getK8sTaskPodNamePrefix() + { + return staticConfig.getK8sTaskPodNamePrefix(); + } + + @Override + public boolean isDebugJobs() + { + return staticConfig.isDebugJobs(); + } + + @Override + public boolean isSidecarSupport() + { + return staticConfig.isSidecarSupport(); + } + + @Override + public String getPrimaryContainerName() + { + return staticConfig.getPrimaryContainerName(); + } + + @Override + public String getKubexitImage() + { + return staticConfig.getKubexitImage(); + } + + @Override + public Long getGraceTerminationPeriodSeconds() + { + return staticConfig.getGraceTerminationPeriodSeconds(); + } + + @Override + public boolean isDisableClientProxy() + { + return staticConfig.isDisableClientProxy(); + } + + @Override + public Period getTaskTimeout() + { + return staticConfig.getTaskTimeout(); + } + + @Override + public Period getTaskJoinTimeout() + { + return staticConfig.getTaskJoinTimeout(); + } + + @Override + public Period getTaskCleanupDelay() + { + return staticConfig.getTaskCleanupDelay(); + } + + @Override + public Period getTaskCleanupInterval() + { + return staticConfig.getTaskCleanupInterval(); + } + + @Override + public Period getTaskLaunchTimeout() + { + return staticConfig.getTaskLaunchTimeout(); + } + + @Override + public Period getLogSaveTimeout() + { + return staticConfig.getLogSaveTimeout(); + } + + @Override + public List getPeonMonitors() + { + return staticConfig.getPeonMonitors(); + } + + @Override + public List getJavaOptsArray() + { + return staticConfig.getJavaOptsArray(); + } + + @Override + public int getCpuCoreInMicro() + { + return staticConfig.getCpuCoreInMicro(); + } + + @Override + public Map getLabels() + { + return staticConfig.getLabels(); + } + + @Override + public Map getAnnotations() + { + return staticConfig.getAnnotations(); + } + + @Override + public Integer getCapacity() + { + if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getCapacity() == null) { + return staticConfig.getCapacity(); + } + return dynamicConfigSupplier.get().getCapacity(); + } + + public PodTemplateSelectStrategy getPodTemplateSelectStrategy() + { + if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) { + return KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY; + } + return dynamicConfigSupplier.get().getPodTemplateSelectStrategy(); + } +} + diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index dd8111ed49ed..516d229c8917 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import org.apache.druid.common.config.ConfigManager; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.overlord.TaskRunnerFactory; @@ -39,23 +40,25 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory adapterTypeAllowingTasksInDifferentNamespaces = Set.of(PodTemplateTaskAdapter.TYPE); @Inject public KubernetesTaskRunnerFactory( @Smile ObjectMapper smileMapper, @EscalatedGlobal final HttpClient httpClient, - KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig, + KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig, TaskLogs taskLogs, DruidKubernetesClient druidKubernetesClient, ServiceEmitter emitter, - TaskAdapter taskAdapter + TaskAdapter taskAdapter, + ConfigManager configManager ) { this.smileMapper = smileMapper; @@ -65,6 +68,7 @@ public KubernetesTaskRunnerFactory( this.druidKubernetesClient = druidKubernetesClient; this.emitter = emitter; this.taskAdapter = taskAdapter; + this.configManager = configManager; } @Override @@ -99,7 +103,8 @@ public KubernetesTaskRunner build() smileMapper, kubernetesTaskRunnerConfig.getLogSaveTimeout().toStandardDuration().getMillis() ), - emitter + emitter, + configManager ); return runner; } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java new file mode 100644 index 000000000000..b68e70075dbf --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfig.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.ObjectUtils; +import org.joda.time.Period; + +import javax.annotation.Nonnull; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import java.util.List; +import java.util.Map; + +/** + * This configuration is populated from runtime properties with the prefix + * {@code druid.indexer.runner}. It is the base configuration that + * {@link KubernetesTaskRunnerEffectiveConfig} will use if no dynamic config is provided. + */ +public class KubernetesTaskRunnerStaticConfig implements KubernetesTaskRunnerConfig +{ + @JsonProperty + @NotNull + private String namespace; + + @JsonProperty + private String k8sTaskPodNamePrefix = ""; + + // This property is the namespace that the Overlord is running in. + // For cases where we want task pods to run on different namespace from the overlord, we need to specify the namespace of the overlord here. + // Else, we can simply leave this field alone. + @JsonProperty + private String overlordNamespace = ""; + + @JsonProperty + private boolean debugJobs = false; + + /** + * Deprecated, please specify adapter type runtime property instead + *

+ * I.E `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer` + */ + @Deprecated + @JsonProperty + private boolean sidecarSupport = false; + + @JsonProperty + // if this is not set, then the first container in your pod spec is assumed to be the overlord container. + // usually this is fine, but when you are dynamically adding sidecars like istio, the service mesh could + // in fact place the istio-proxy container as the first container. Thus, you would specify this value to + // the name of your primary container. e.g. druid-overlord + private String primaryContainerName = null; + + @JsonProperty + // for multi-container jobs, we need this image to shut down sidecars after the main container + // has completed + private String kubexitImage = "karlkfi/kubexit:v0.3.2"; + + // how much time to wait for preStop hooks to execute + // lower number speeds up pod termination time to release locks + // faster, defaults to your k8s setup, usually 30 seconds. + private Long graceTerminationPeriodSeconds = null; + + @JsonProperty + // disable using http / https proxy environment variables + private boolean disableClientProxy; + + @JsonProperty + @NotNull + private Period maxTaskDuration = new Period("PT4H"); + + @JsonProperty + @NotNull + // how long to wait for the jobs to be cleaned up. + private Period taskCleanupDelay = new Period("P2D"); + + @JsonProperty + @NotNull + // interval for k8s job cleanup to run + private Period taskCleanupInterval = new Period("PT10m"); + + @JsonProperty + @NotNull + // how long to wait to join peon k8s jobs on startup + private Period taskJoinTimeout = new Period("PT1M"); + + + @JsonProperty + @NotNull + // how long to wait for the peon k8s job to launch + private Period k8sjobLaunchTimeout = new Period("PT1H"); + + @JsonProperty + @NotNull + // how long to wait for log saving operations to complete + private Period logSaveTimeout = new Period("PT300S"); + + @JsonProperty + // ForkingTaskRunner inherits the monitors from the MM, in k8s mode + // the peon inherits the monitors from the overlord, so if someone specifies + // a TaskCountStatsMonitor in the overlord for example, the peon process + // fails because it can not inject this monitor in the peon process. + private List peonMonitors = ImmutableList.of(); + + @JsonProperty + @NotNull + private List javaOptsArray = ImmutableList.of(); + + @JsonProperty + @NotNull + private int cpuCoreInMicro = 0; + + @JsonProperty + @NotNull + private Map labels = ImmutableMap.of(); + + @JsonProperty + @NotNull + private Map annotations = ImmutableMap.of(); + + @JsonProperty + @Min(1) + @Max(Integer.MAX_VALUE) + @NotNull + private Integer capacity = Integer.MAX_VALUE; + + public KubernetesTaskRunnerStaticConfig() + { + } + + public KubernetesTaskRunnerStaticConfig( + @Nonnull String namespace, + String overlordNamespace, + String k8sTaskPodNamePrefix, + boolean debugJobs, + boolean sidecarSupport, + String primaryContainerName, + String kubexitImage, + Long graceTerminationPeriodSeconds, + boolean disableClientProxy, + Period maxTaskDuration, + Period taskCleanupDelay, + Period taskCleanupInterval, + Period k8sjobLaunchTimeout, + Period logSaveTimeout, + List peonMonitors, + List javaOptsArray, + int cpuCoreInMicro, + Map labels, + Map annotations, + Integer capacity, + Period taskJoinTimeout + ) + { + this.namespace = namespace; + this.overlordNamespace = ObjectUtils.getIfNull( + overlordNamespace, + this.overlordNamespace + ); + this.k8sTaskPodNamePrefix = k8sTaskPodNamePrefix; + this.debugJobs = ObjectUtils.getIfNull( + debugJobs, + this.debugJobs + ); + this.sidecarSupport = ObjectUtils.getIfNull( + sidecarSupport, + this.sidecarSupport + ); + this.primaryContainerName = ObjectUtils.getIfNull( + primaryContainerName, + this.primaryContainerName + ); + this.kubexitImage = ObjectUtils.getIfNull( + kubexitImage, + this.kubexitImage + ); + this.graceTerminationPeriodSeconds = ObjectUtils.getIfNull( + graceTerminationPeriodSeconds, + this.graceTerminationPeriodSeconds + ); + this.disableClientProxy = disableClientProxy; + this.maxTaskDuration = ObjectUtils.getIfNull( + maxTaskDuration, + this.maxTaskDuration + ); + this.taskCleanupDelay = ObjectUtils.getIfNull( + taskCleanupDelay, + this.taskCleanupDelay + ); + this.taskCleanupInterval = ObjectUtils.getIfNull( + taskCleanupInterval, + this.taskCleanupInterval + ); + this.k8sjobLaunchTimeout = ObjectUtils.getIfNull( + k8sjobLaunchTimeout, + this.k8sjobLaunchTimeout + ); + this.taskJoinTimeout = ObjectUtils.getIfNull( + taskJoinTimeout, + this.taskJoinTimeout + ); + this.logSaveTimeout = ObjectUtils.getIfNull( + logSaveTimeout, + this.logSaveTimeout + ); + this.peonMonitors = ObjectUtils.getIfNull( + peonMonitors, + this.peonMonitors + ); + this.javaOptsArray = ObjectUtils.getIfNull( + javaOptsArray, + this.javaOptsArray + ); + this.cpuCoreInMicro = ObjectUtils.getIfNull( + cpuCoreInMicro, + this.cpuCoreInMicro + ); + this.labels = ObjectUtils.getIfNull( + labels, + this.labels + ); + this.annotations = ObjectUtils.getIfNull( + annotations, + this.annotations + ); + this.capacity = ObjectUtils.getIfNull( + capacity, + this.capacity + ); + } + + @Override + public String getNamespace() + { + return namespace; + } + + @Override + public String getOverlordNamespace() + { + return overlordNamespace; + } + + @Override + public String getK8sTaskPodNamePrefix() + { + return k8sTaskPodNamePrefix; + } + + @Override + public boolean isDebugJobs() + { + return debugJobs; + } + + @Override + @Deprecated + public boolean isSidecarSupport() + { + return sidecarSupport; + } + + @Override + public String getPrimaryContainerName() + { + return primaryContainerName; + } + + @Override + public String getKubexitImage() + { + return kubexitImage; + } + + @Override + public Long getGraceTerminationPeriodSeconds() + { + return graceTerminationPeriodSeconds; + } + + @Override + public boolean isDisableClientProxy() + { + return disableClientProxy; + } + + @Override + public Period getTaskTimeout() + { + return maxTaskDuration; + } + + @Override + public Period getTaskJoinTimeout() + { + return taskJoinTimeout; + } + + + @Override + public Period getTaskCleanupDelay() + { + return taskCleanupDelay; + } + + @Override + public Period getTaskCleanupInterval() + { + return taskCleanupInterval; + } + + @Override + public Period getTaskLaunchTimeout() + { + return k8sjobLaunchTimeout; + } + + @Override + public Period getLogSaveTimeout() + { + return logSaveTimeout; + } + + @Override + public List getPeonMonitors() + { + return peonMonitors; + } + + @Override + public List getJavaOptsArray() + { + return javaOptsArray; + } + + @Override + public int getCpuCoreInMicro() + { + return cpuCoreInMicro; + } + + @Override + public Map getLabels() + { + return labels; + } + + @Override + public Map getAnnotations() + { + return annotations; + } + + @Override + public Integer getCapacity() + { + return capacity; + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java index eddd5e4a1ee1..fde4dcc7b0bf 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfig.java @@ -21,21 +21,26 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; +import javax.annotation.Nullable; import java.util.Objects; public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig { + @Nullable private final PodTemplateSelectStrategy podTemplateSelectStrategy; + @Nullable + private final Integer capacity; + @JsonCreator public DefaultKubernetesTaskRunnerDynamicConfig( - @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy + @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy, + @JsonProperty("capacity") Integer capacity ) { - Preconditions.checkNotNull(podTemplateSelectStrategy); this.podTemplateSelectStrategy = podTemplateSelectStrategy; + this.capacity = capacity; } @Override @@ -45,6 +50,31 @@ public PodTemplateSelectStrategy getPodTemplateSelectStrategy() return podTemplateSelectStrategy; } + @Override + @JsonProperty + public Integer getCapacity() + { + return capacity; + } + + @Override + public KubernetesTaskRunnerDynamicConfig merge(KubernetesTaskRunnerDynamicConfig other) + { + if (other == null) { + return this; + } + Integer mergeCapacity = getCapacity(); + if (other.getCapacity() != null) { + mergeCapacity = other.getCapacity(); + } + + PodTemplateSelectStrategy mergePodTemplateSelectStrategy = getPodTemplateSelectStrategy(); + if (other.getPodTemplateSelectStrategy() != null) { + mergePodTemplateSelectStrategy = other.getPodTemplateSelectStrategy(); + } + return new DefaultKubernetesTaskRunnerDynamicConfig(mergePodTemplateSelectStrategy, mergeCapacity); + } + @Override public boolean equals(Object o) { @@ -55,13 +85,14 @@ public boolean equals(Object o) return false; } DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o; - return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy); + return Objects.equals(capacity, that.capacity) && + Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy); } @Override public int hashCode() { - return Objects.hashCode(podTemplateSelectStrategy); + return Objects.hash(podTemplateSelectStrategy, capacity); } @Override @@ -69,6 +100,7 @@ public String toString() { return "DefaultKubernetesTaskRunnerDynamicConfig{" + "podTemplateSelectStrategy=" + podTemplateSelectStrategy + + "capacity=" + capacity + '}'; } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java index ec03b045f503..432a41933ede 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java @@ -73,7 +73,7 @@ public KubernetesTaskExecutionConfigResource( * Updates the Kubernetes execution configuration. * * @param dynamicConfig the new execution configuration to set - * @param req the HTTP servlet request providing context for audit information + * @param req the HTTP servlet request providing context for audit information * @return a response indicating the success or failure of the update operation */ @POST @@ -84,13 +84,19 @@ public Response setExecutionConfig( @Context final HttpServletRequest req ) { + KubernetesTaskRunnerDynamicConfig currentConfig = getDynamicConfig(); + KubernetesTaskRunnerDynamicConfig mergedConfig = dynamicConfig; + + if (currentConfig != null) { + mergedConfig = currentConfig.merge(dynamicConfig); + } final ConfigManager.SetResult setResult = configManager.set( KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - dynamicConfig, + mergedConfig, AuthorizationUtils.buildAuditInfo(req) ); if (setResult.isOk()) { - log.info("Updating K8s execution configs: %s", dynamicConfig); + log.info("Updating K8s execution configs: %s", mergedConfig); return Response.ok().build(); } else { @@ -147,11 +153,15 @@ public Response getExecutionConfigHistory( @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(ConfigResourceFilter.class) public Response getExecutionConfig() + { + return Response.ok(getDynamicConfig()).build(); + } + + private KubernetesTaskRunnerDynamicConfig getDynamicConfig() { if (dynamicConfigRef == null) { dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class); } - - return Response.ok(dynamicConfigRef.get()).build(); + return dynamicConfigRef.get(); } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java index 4f6d4b07c41d..fd9c9b465313 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfig.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + /** * Represents the configuration for task execution within a Kubernetes environment. * This interface allows for dynamic configuration of task execution strategies based @@ -38,7 +41,26 @@ public interface KubernetesTaskRunnerDynamicConfig /** * Retrieves the execution behavior strategy associated with this configuration. + * * @return the execution behavior strategy */ PodTemplateSelectStrategy getPodTemplateSelectStrategy(); + + /** + * Retrieves the capacity associated with this configuration. + * + * @return the capacity + */ + @Min(0) + @Max(Integer.MAX_VALUE) + Integer getCapacity(); + + /** + * Merges this configuration with another, preferring values from {@code other} + * and falling back to this configuration when not present. + * + * @param other the configuration to merge with + * @return the merged configuration + */ + KubernetesTaskRunnerDynamicConfig merge(KubernetesTaskRunnerDynamicConfig other); } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java index 92832b2ff660..ff4a4c2cd727 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelector.java @@ -26,8 +26,7 @@ import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; -import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig; import java.io.File; import java.nio.file.Files; @@ -43,17 +42,17 @@ public class DynamicConfigPodTemplateSelector implements PodTemplateSelector + ".k8s.podTemplate."; private final Properties properties; - private final Supplier dynamicConfigRef; + private final KubernetesTaskRunnerEffectiveConfig effectiveConfig; // Supplier allows Overlord to read the most recent pod template file without calling initializeTemplatesFromFileSystem() again. private HashMap> podTemplates; public DynamicConfigPodTemplateSelector( Properties properties, - Supplier dynamicConfigRef + KubernetesTaskRunnerEffectiveConfig effectiveConfig ) { this.properties = properties; - this.dynamicConfigRef = dynamicConfigRef; + this.effectiveConfig = effectiveConfig; initializeTemplatesFromFileSystem(); } @@ -120,14 +119,6 @@ private void validateTemplateSupplier(Supplier templateSupplier) th @Override public Optional getPodTemplateForTask(Task task) { - PodTemplateSelectStrategy podTemplateSelectStrategy; - KubernetesTaskRunnerDynamicConfig dynamicConfig = dynamicConfigRef.get(); - if (dynamicConfig == null || dynamicConfig.getPodTemplateSelectStrategy() == null) { - podTemplateSelectStrategy = KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY; - } else { - podTemplateSelectStrategy = dynamicConfig.getPodTemplateSelectStrategy(); - } - - return Optional.of(podTemplateSelectStrategy.getPodTemplateForTask(task, podTemplates)); + return Optional.of(effectiveConfig.getPodTemplateSelectStrategy().getPodTemplateForTask(task, podTemplates)); } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 2744b82b126d..8c68d9324e83 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -269,7 +269,7 @@ private Map getPodTemplateAnnotations(Task task) throws IOExcept } return podTemplateAnnotationBuilder.build(); } - + private Map getJobLabels(KubernetesTaskRunnerConfig config, Task task) { Preconditions.checkNotNull(config.getNamespace(), "When using Custom Pod Templates, druid.indexer.runner.namespace cannot be null."); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java index 59c9508005f6..dd8960864e0e 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java @@ -27,6 +27,7 @@ import com.google.inject.ProvisionException; import com.google.inject.TypeLiteral; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.ConfigManagerConfig; import org.apache.druid.guice.ConfigModule; import org.apache.druid.guice.DruidGuiceExtensions; @@ -44,6 +45,7 @@ import org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientFactory; import org.apache.druid.k8s.overlord.common.httpclient.okhttp.DruidKubernetesOkHttpHttpClientFactory; import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientFactory; +import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter; @@ -51,14 +53,18 @@ import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.server.DruidNode; +import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import java.net.URL; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; @RunWith(EasyMockRunner.class) public class KubernetesOverlordModuleTest @@ -81,8 +87,26 @@ public class KubernetesOverlordModuleTest private AuditManager auditManager; @Mock private MetadataStorageConnector metadataStorageConnector; + @Mock + private ConfigManager configManager; private Injector injector; + @Before + public void setUpConfigManagerMock() + { + EasyMock.reset(configManager); + EasyMock.expect(configManager.watchConfig( + EasyMock.anyString(), + EasyMock.anyObject() + )).andReturn(new AtomicReference<>(null)).anyTimes(); + EasyMock.expect(configManager.addListener( + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + EasyMock.anyString(), + EasyMock.anyObject(Consumer.class) + )).andReturn(true).anyTimes(); + EasyMock.replay(configManager); + } + @Test public void testDefaultHttpRemoteTaskRunnerFactoryBindSuccessfully() { @@ -325,6 +349,7 @@ private Injector makeInjectorWithProperties( }).toInstance(Suppliers.ofInstance(metadataStorageTablesConfig)); binder.bind(AuditManager.class).toInstance(auditManager); binder.bind(MetadataStorageConnector.class).toInstance(metadataStorageConnector); + binder.bind(ConfigManager.class).toInstance(configManager); }, new ConfigModule(), new IndexingServiceTaskLogsModule(props), diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java new file mode 100644 index 000000000000..c44f8a34b4e6 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfigTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord; + +import com.google.common.base.Supplier; +import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig; +import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; +import org.apache.druid.k8s.overlord.execution.PodTemplateSelectStrategy; +import org.apache.druid.k8s.overlord.execution.TaskTypePodTemplateSelectStrategy; +import org.junit.Assert; +import org.junit.Test; + +public class KubernetesTaskRunnerEffectiveConfigTest +{ + @Test + public void test_getCapacity_usesStaticWhenDynamicNull() + { + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder() + .withCapacity(7) + .build(); + Supplier dynamicSupplier = () -> null; + KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier); + + Assert.assertEquals(7, effective.getCapacity().intValue()); + } + + @Test + public void test_getCapacity_usesDynamicWhenProvided() + { + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder() + .withCapacity(2) + .build(); + Supplier dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, 9); + KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier); + + Assert.assertEquals(9, effective.getCapacity().intValue()); + } + + @Test + public void test_getCapacity_usesStaticWhenDynamicNullCapacity() + { + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder() + .withCapacity(7) + .build(); + Supplier dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, null); + KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier); + + Assert.assertEquals(7, effective.getCapacity().intValue()); + } + + @Test + public void test_getPodTemplateSelectStrategy_usesDefaultWhenDynamicNull() + { + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build(); + Supplier dynamicSupplier = () -> null; + KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier); + + PodTemplateSelectStrategy strategy = effective.getPodTemplateSelectStrategy(); + Assert.assertTrue(strategy instanceof TaskTypePodTemplateSelectStrategy); + Assert.assertEquals(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY, strategy); + } + + @Test + public void test_getPodTemplateSelectStrategy_usesDynamicWhenProvided() + { + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build(); + PodTemplateSelectStrategy custom = new TaskTypePodTemplateSelectStrategy(); + Supplier dynamicSupplier = () -> new DefaultKubernetesTaskRunnerDynamicConfig(custom, null); + KubernetesTaskRunnerEffectiveConfig effective = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicSupplier); + + Assert.assertEquals(custom, effective.getPodTemplateSelectStrategy()); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java index 6849f9a0ecc7..a67ab70a0a8a 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.ConfigBuilder; +import org.apache.druid.common.config.ConfigManager; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -32,6 +33,7 @@ import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.NoopTaskLogs; import org.apache.druid.tasklogs.TaskLogs; +import org.easymock.EasyMock; import org.easymock.Mock; import org.junit.Assert; import org.junit.Before; @@ -42,24 +44,28 @@ public class KubernetesTaskRunnerFactoryTest { private ObjectMapper objectMapper; - private KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig; + private KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig; private TaskLogs taskLogs; private DruidKubernetesClient druidKubernetesClient; @Mock private ServiceEmitter emitter; private TaskAdapter taskAdapter; + @Mock private ConfigManager configManager; @Before public void setup() { objectMapper = new TestUtils().getTestObjectMapper(); - kubernetesTaskRunnerConfig = KubernetesTaskRunnerConfig.builder() + KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerStaticConfig = KubernetesTaskRunnerConfig.builder() .withCapacity(1) .build(); taskLogs = new NoopTaskLogs(); druidKubernetesClient = new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build()); taskAdapter = new TestTaskAdapter(); + kubernetesTaskRunnerConfig = new KubernetesTaskRunnerEffectiveConfig(kubernetesTaskRunnerStaticConfig, () -> null); + configManager = EasyMock.createNiceMock(ConfigManager.class); + EasyMock.replay(configManager); } @Test @@ -72,7 +78,8 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild() taskLogs, druidKubernetesClient, emitter, - taskAdapter + taskAdapter, + configManager ); KubernetesTaskRunner expectedRunner = factory.build(); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java similarity index 94% rename from extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java rename to extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java index 1f4a7281f649..91b5148e2688 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerStaticConfigTest.java @@ -29,15 +29,15 @@ import java.io.IOException; -public class KubernetesTaskRunnerConfigTest +public class KubernetesTaskRunnerStaticConfigTest { @Test public void test_deserializable() throws IOException { ObjectMapper mapper = new DefaultObjectMapper(); - KubernetesTaskRunnerConfig config = mapper.readValue( + KubernetesTaskRunnerStaticConfig config = mapper.readValue( this.getClass().getClassLoader().getResource("kubernetesTaskRunnerConfig.json"), - KubernetesTaskRunnerConfig.class + KubernetesTaskRunnerStaticConfig.class ); Assert.assertEquals("namespace", config.getNamespace()); @@ -60,7 +60,7 @@ public void test_deserializable() throws IOException @Test public void test_builder_preservesDefaults() { - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") .withDisableClientProxy(true) .build(); @@ -85,7 +85,7 @@ public void test_builder_preservesDefaults() @Test public void test_builder() { - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") .withDebugJob(true) .withSidecarSupport(true) diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 083bf2db0e6c..1c72078a52e7 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -27,6 +28,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import org.apache.commons.io.IOUtils; +import org.apache.druid.common.config.ConfigManager; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -39,6 +41,8 @@ import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; +import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig; +import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -53,6 +57,8 @@ import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; @@ -60,6 +66,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -78,27 +85,37 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle; @Mock private ServiceEmitter emitter; @Mock private ListenableFuture statusFuture; + @Mock private ConfigManager configManager; - private KubernetesTaskRunnerConfig config; + private KubernetesTaskRunnerStaticConfig staticConfig; + private KubernetesTaskRunnerEffectiveConfig config; private KubernetesTaskRunner runner; private Task task; @Before public void setup() { - config = KubernetesTaskRunnerConfig.builder() + staticConfig = KubernetesTaskRunnerConfig.builder() .withCapacity(1) .build(); + Supplier dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(null, 1); + + config = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef); + task = K8sTestUtils.createTask(ID, 0); + configManager = EasyMock.createNiceMock(ConfigManager.class); + EasyMock.replay(configManager); + runner = new KubernetesTaskRunner( taskAdapter, config, peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + configManager ); } @@ -113,7 +130,8 @@ public void test_start_withExistingJobs() throws IOException peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + configManager ) { @Override @@ -163,7 +181,8 @@ public void test_start_withExistingJobs_oneJobFails() throws IOException peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + configManager ) { @Override @@ -219,7 +238,8 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOE peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + configManager ) { @Override @@ -771,9 +791,75 @@ public void test_stop() peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + configManager ); kubernetesTaskRunner.stop(); Assert.assertThrows(RejectedExecutionException.class, () -> kubernetesTaskRunner.run(task)); } + + @Test + public void test_syncCapacityWithDynamicConfig_increase_updatesExecutorAndCapacity() throws Exception + { + Method method = KubernetesTaskRunner.class.getDeclaredMethod( + "syncCapacityWithDynamicConfig", + KubernetesTaskRunnerDynamicConfig.class + ); + method.setAccessible(true); + + // increase from 1 -> 3 + method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 3)); + + Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe"); + tpeField.setAccessible(true); + ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner); + + Assert.assertEquals(3, executor.getCorePoolSize()); + Assert.assertEquals(3, executor.getMaximumPoolSize()); + Assert.assertEquals(3, runner.getTotalCapacity()); + } + + @Test + public void test_syncCapacityWithDynamicConfig_decrease_updatesExecutorAndCapacity() throws Exception + { + Method method = KubernetesTaskRunner.class.getDeclaredMethod( + "syncCapacityWithDynamicConfig", + KubernetesTaskRunnerDynamicConfig.class + ); + method.setAccessible(true); + + // first increase to 4 to ensure we can decrease after + method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 4)); + // then decrease 4 -> 2 + method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 2)); + + Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe"); + tpeField.setAccessible(true); + ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner); + + Assert.assertEquals(2, executor.getCorePoolSize()); + Assert.assertEquals(2, executor.getMaximumPoolSize()); + Assert.assertEquals(2, runner.getTotalCapacity()); + } + + @Test + public void test_syncCapacityWithDynamicConfig_sameCapacity_noChangeAndNoError() throws Exception + { + Method method = KubernetesTaskRunner.class.getDeclaredMethod( + "syncCapacityWithDynamicConfig", + KubernetesTaskRunnerDynamicConfig.class + ); + method.setAccessible(true); + + // initial capacity is 1 in setup; calling with 1 should be a no-op + method.invoke(runner, new DefaultKubernetesTaskRunnerDynamicConfig(null, 1)); + + Field tpeField = KubernetesTaskRunner.class.getDeclaredField("tpe"); + tpeField.setAccessible(true); + ThreadPoolExecutor executor = (ThreadPoolExecutor) tpeField.get(runner); + + Assert.assertEquals(1, executor.getCorePoolSize()); + Assert.assertEquals(1, executor.getMaximumPoolSize()); + Assert.assertEquals(1, runner.getTotalCapacity()); + } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java index de8919e329de..31c6d7fdd6de 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/DefaultKubernetesTaskRunnerDynamicConfigTest.java @@ -31,17 +31,26 @@ public class DefaultKubernetesTaskRunnerDynamicConfigTest public void getPodTemplateSelectStrategyTest() { PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy(); - DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy); + DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy, 1); Assert.assertEquals(strategy, config.getPodTemplateSelectStrategy()); } + @Test + public void getCapacityTest() + { + Integer capacity = 4; + DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(null, 4); + + Assert.assertEquals(capacity, config.getCapacity()); + } + @Test public void testSerde() throws Exception { final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); PodTemplateSelectStrategy strategy = new TaskTypePodTemplateSelectStrategy(); - DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy); + DefaultKubernetesTaskRunnerDynamicConfig config = new DefaultKubernetesTaskRunnerDynamicConfig(strategy, 1); DefaultKubernetesTaskRunnerDynamicConfig config2 = objectMapper.readValue( objectMapper.writeValueAsBytes(config), diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java index b76b7eaf0cfe..c06056f01133 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java @@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -52,6 +53,10 @@ public void setUp() @Test public void setExecutionConfigSuccessfulUpdate() { + EasyMock.expect(configManager.watch( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + KubernetesTaskRunnerDynamicConfig.class + )).andReturn(new AtomicReference<>(null)); KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager @@ -75,6 +80,10 @@ public void setExecutionConfigSuccessfulUpdate() @Test public void setExecutionConfigFailedUpdate() { + EasyMock.expect(configManager.watch( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + KubernetesTaskRunnerDynamicConfig.class + )).andReturn(new AtomicReference<>(null)); KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager @@ -94,4 +103,116 @@ public void setExecutionConfigFailedUpdate() Response result = testedResource.setExecutionConfig(dynamicConfig, req); assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus()); } + + @Test + public void setExecutionConfig_MergeUsesCurrentCapacityWhenRequestCapacityNull() + { + KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + configManager, + auditManager + ); + + PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); + KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 5); + EasyMock.expect(configManager.watch( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + KubernetesTaskRunnerDynamicConfig.class + )).andReturn(new AtomicReference<>(currentConfig)); + + PodTemplateSelectStrategy requestStrategy = new TaskTypePodTemplateSelectStrategy(); + KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, null); + + KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, 5); + + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); + EasyMock.replay(req); + + EasyMock.expect(configManager.set( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + expectedMergedConfig, + AuthorizationUtils.buildAuditInfo(req) + )).andReturn(ConfigManager.SetResult.ok()); + + EasyMock.replay(configManager, auditManager); + + Response result = testedResource.setExecutionConfig(requestConfig, req); + assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + } + + @Test + public void setExecutionConfig_MergeUsesCurrentStrategyWhenRequestStrategyNull() + { + KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + configManager, + auditManager + ); + + PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); + KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 2); + EasyMock.expect(configManager.watch( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + KubernetesTaskRunnerDynamicConfig.class + )).andReturn(new AtomicReference<>(currentConfig)); + + KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, 7); + + KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 7); + + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); + EasyMock.replay(req); + + EasyMock.expect(configManager.set( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + expectedMergedConfig, + AuthorizationUtils.buildAuditInfo(req) + )).andReturn(ConfigManager.SetResult.ok()); + + EasyMock.replay(configManager, auditManager); + + Response result = testedResource.setExecutionConfig(requestConfig, req); + assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + } + + @Test + public void setExecutionConfig_MergeUsesCurrentWhenBothRequestFieldsNull() + { + KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + configManager, + auditManager + ); + + PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); + KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9); + EasyMock.expect(configManager.watch( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + KubernetesTaskRunnerDynamicConfig.class + )).andReturn(new AtomicReference<>(currentConfig)); + + KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, null); + + KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9); + + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); + EasyMock.replay(req); + + EasyMock.expect(configManager.set( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + expectedMergedConfig, + AuthorizationUtils.buildAuditInfo(req) + )).andReturn(ConfigManager.SetResult.ok()); + + EasyMock.replay(configManager, auditManager); + + Response result = testedResource.setExecutionConfig(requestConfig, req); + assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java index 77a819dde9c1..b5fac1233c09 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskRunnerDynamicConfigTest.java @@ -36,7 +36,8 @@ public void testSerde() throws JsonProcessingException + " \"type\": \"default\",\n" + " \"podTemplateSelectStrategy\": {\n" + " \"type\": \"default\"\n" - + " }\n" + + " },\n" + + " \"capacity\": 3\n" + "}"; KubernetesTaskRunnerDynamicConfig deserialized = jsonMapper.readValue( @@ -45,6 +46,7 @@ public void testSerde() throws JsonProcessingException ); PodTemplateSelectStrategy selectStrategy = deserialized.getPodTemplateSelectStrategy(); Assert.assertTrue(selectStrategy instanceof TaskTypePodTemplateSelectStrategy); + Assert.assertEquals(Integer.valueOf(3), deserialized.getCapacity()); json = "{\n" + " \"type\": \"default\",\n" @@ -72,5 +74,14 @@ public void testSerde() throws JsonProcessingException selectStrategy = deserialized.getPodTemplateSelectStrategy(); Assert.assertTrue(selectStrategy instanceof SelectorBasedPodTemplateSelectStrategy); Assert.assertEquals(2, ((SelectorBasedPodTemplateSelectStrategy) selectStrategy).getSelectors().size()); + Assert.assertNull(deserialized.getCapacity()); + + json = "{\n" + + " \"type\": \"default\",\n" + + " \"capacity\": 12" + + "}"; + deserialized = jsonMapper.readValue(json, KubernetesTaskRunnerDynamicConfig.class); + Assert.assertEquals(Integer.valueOf(12), deserialized.getCapacity()); + Assert.assertNull(deserialized.getPodTemplateSelectStrategy()); } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 33f8349c6199..016f3280a472 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; @@ -110,9 +111,9 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception PodSpec podSpec = K8sTestUtils.getDummyPodSpec(); Task task = K8sTestUtils.getTask(); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("default") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("default") + .build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( k8sClient, config, diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java index 106a98fa6a8e..1052c97c5377 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DynamicConfigPodTemplateSelectorTest.java @@ -29,6 +29,9 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerEffectiveConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.execution.DefaultKubernetesTaskRunnerDynamicConfig; import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig; @@ -53,14 +56,16 @@ public class DynamicConfigPodTemplateSelectorTest private Path tempDir; private ObjectMapper mapper; private PodTemplate podTemplateSpec; - private Supplier dynamicConfigRef; + private KubernetesTaskRunnerEffectiveConfig effectiveConfig; @BeforeEach public void setup() { mapper = new TestUtils().getTestObjectMapper(); podTemplateSpec = K8sTestUtils.fileToResource("basePodTemplate.yaml", PodTemplate.class); - dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY); + Supplier dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(KubernetesTaskRunnerDynamicConfig.DEFAULT_STRATEGY, 1); + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build(); + effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef); } @Test @@ -71,7 +76,7 @@ public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE() IAE.class, () -> new DynamicConfigPodTemplateSelector( new Properties(), - dynamicConfigRef + effectiveConfig ) ); Assertions.assertEquals( @@ -93,7 +98,7 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_r IAE.class, () -> new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ) ); @@ -111,7 +116,7 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws IOExce DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); @@ -146,7 +151,7 @@ public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() throws DynamicConfigPodTemplateSelector selector = new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ); Task kafkaTask = new NoopTask("id", "id", "datasource", 0, 0, null) @@ -191,9 +196,10 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_r props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", noopTemplatePath.toString()); Assert.assertThrows(IAE.class, () -> new DynamicConfigPodTemplateSelector( - props, - dynamicConfigRef - )); + props, + effectiveConfig + ) + ); } @Test @@ -208,7 +214,7 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce DynamicConfigPodTemplateSelector podTemplateSelector = new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); @@ -242,16 +248,21 @@ public void test_fromTask_matchPodTemplateBasedOnStrategy() throws IOException Properties props = new Properties(); props.setProperty("druid.indexer.runner.k8s.podTemplate.base", baseTemplatePath.toString()); props.setProperty("druid.indexer.runner.k8s.podTemplate.lowThroughput", lowThroughputTemplatePath.toString()); - dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig(new SelectorBasedPodTemplateSelectStrategy( - Collections.singletonList( - new Selector("lowThroughput", null, null, Sets.newSet(dataSource) + Supplier dynamicConfigRef = () -> new DefaultKubernetesTaskRunnerDynamicConfig( + new SelectorBasedPodTemplateSelectStrategy( + Collections.singletonList( + new Selector("lowThroughput", null, null, Sets.newSet(dataSource) + ) ) - ) - )); + ), 1 + ); + + KubernetesTaskRunnerStaticConfig staticConfig = KubernetesTaskRunnerConfig.builder().build(); + effectiveConfig = new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigRef); DynamicConfigPodTemplateSelector podTemplateSelector = new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ); Task taskWithMatchedDatasource = new NoopTask("id", "id", dataSource, 0, 0, null); @@ -276,7 +287,7 @@ public void test_fromTask_LazyLoadInvalidPodTemplateThrowsError() throws IOExcep DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); @@ -311,7 +322,7 @@ public void test_fromTask_LazyLoadPodTemplateChangesPodSpecs() throws IOExceptio DynamicConfigPodTemplateSelector adapter = new DynamicConfigPodTemplateSelector( props, - dynamicConfigRef + effectiveConfig ); Task task = new NoopTask("id", "id", "datasource", 0, 0, null); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 16cccf04e280..552d7201fd05 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -52,6 +52,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; @@ -138,12 +139,12 @@ public PodSpec getSpec() } }; - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .withOverlordNamespace("test_different") - .withAnnotations(ImmutableMap.of("annotation_key", "annotation_value")) - .withLabels(ImmutableMap.of("label_key", "label_value")) - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .withOverlordNamespace("test_different") + .withAnnotations(ImmutableMap.of("annotation_key", "annotation_value")) + .withLabels(ImmutableMap.of("label_key", "label_value")) + .build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -175,9 +176,9 @@ public void serializingAndDeserializingATask() throws IOException { // given a task create a k8s job TestKubernetesClient testClient = new TestKubernetesClient(client); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -229,9 +230,9 @@ public PodSpec getSpec() } }; - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -277,9 +278,9 @@ public PodSpec getSpec() public void toTask_useTaskPayloadManager() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); Task taskInTaskPayloadManager = K8sTestUtils.getTask(); TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class); Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of( @@ -309,7 +310,7 @@ public void toTask_useTaskPayloadManager() throws IOException public void getTaskId() { TestKubernetesClient testClient = new TestKubernetesClient(client); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -331,7 +332,7 @@ public void getTaskId() public void getTaskId_noAnnotations() { TestKubernetesClient testClient = new TestKubernetesClient(client); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -353,7 +354,7 @@ public void getTaskId_noAnnotations() public void getTaskId_missingTaskIdAnnotation() { TestKubernetesClient testClient = new TestKubernetesClient(client); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -458,9 +459,9 @@ void testAddingMonitors() throws IOException new File("/tmp/"), 0 ); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, @@ -482,9 +483,9 @@ void testAddingMonitors() throws IOException // we have an override, but nothing in the overlord config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor")) - .build(); + .withNamespace("test") + .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor")) + .build(); adapter = new SingleContainerTaskAdapter( testClient, config, @@ -532,9 +533,9 @@ void testEphemeralStorageIsRespected() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("namespace") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("namespace") + .build(); SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, @@ -582,9 +583,9 @@ void testProbesRemoved() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); Pod pod = K8sTestUtils.fileToResource("probesPodSpec.yaml", Pod.class); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("test") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .build(); SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, @@ -635,11 +636,11 @@ void testCPUResourceIsRespected() throws IOException List javaOpts = new ArrayList<>(); javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G"); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("namespace") - .withJavaOptsArray(javaOpts) - .withCpuCore(2000) - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("namespace") + .withJavaOptsArray(javaOpts) + .withCpuCore(2000) + .build(); SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java index 48330d625069..477758d9ac4e 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.PeonCommandContext; import org.apache.druid.k8s.overlord.common.TestKubernetesClient; @@ -87,9 +88,9 @@ public void testMultiContainerSupport() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("namespace") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("namespace") + .build(); MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter( testClient, config, @@ -138,10 +139,10 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("namespace") - .withPrimaryContainerName("primary") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("namespace") + .withPrimaryContainerName("primary") + .build(); MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter( testClient, config, @@ -192,11 +193,11 @@ public void testOverridingPeonMonitors() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("namespace") - .withPrimaryContainerName("primary") - .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor")) - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("namespace") + .withPrimaryContainerName("primary") + .withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor")) + .build(); MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter( testClient, diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index 45f2c356cfe0..01d99a896f62 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -33,6 +33,7 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig; import org.apache.druid.k8s.overlord.common.Base64Compression; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.K8sTaskId; @@ -58,7 +59,7 @@ public class PodTemplateTaskAdapterTest { - private KubernetesTaskRunnerConfig taskRunnerConfig; + private KubernetesTaskRunnerStaticConfig taskRunnerConfig; private PodTemplate podTemplateSpec; private TaskConfig taskConfig; private DruidNode node; diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java index d7d74cf1812a..832a7292304f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; +import org.apache.druid.k8s.overlord.KubernetesTaskRunnerStaticConfig; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.PeonCommandContext; import org.apache.druid.k8s.overlord.common.TestKubernetesClient; @@ -86,9 +87,9 @@ public void testSingleContainerSupport() throws IOException { TestKubernetesClient testClient = new TestKubernetesClient(client); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class); - KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() - .withNamespace("namespace") - .build(); + KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("namespace") + .build(); SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, config, diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java index 87cbad379410..12265df31e27 100644 --- a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java +++ b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** */ @@ -219,6 +220,26 @@ public SetResult set(final String key, final ConfigSerde serde, @Nullable } } + public boolean addListener(String configKey, String listenerKey, Consumer listener) + { + ConfigHolder holder = watchedConfigs.get(configKey); + if (holder == null) { + log.warn("ConfigHolder not found for configKey[%s]", configKey); + return false; + } + return holder.addListener(listenerKey, listener); + } + + public boolean removeListener(String configKey, String listenerKey, Consumer listener) + { + ConfigHolder holder = watchedConfigs.get(configKey); + if (holder == null) { + log.warn("ConfigHolder not found for configKey[%s]", configKey); + return false; + } + return holder.removeListener(listenerKey, listener); + } + @Nonnull private MetadataCASUpdate createMetadataCASUpdate( String keyValue, @@ -285,6 +306,7 @@ private static class ConfigHolder private final AtomicReference rawBytes; private final ConfigSerde serde; private final AtomicReference reference; + private final ConcurrentMap> listeners; ConfigHolder( byte[] rawBytes, @@ -294,6 +316,7 @@ private static class ConfigHolder this.rawBytes = new AtomicReference<>(rawBytes); this.serde = serde; this.reference = new AtomicReference<>(serde.deserialize(rawBytes)); + this.listeners = new ConcurrentHashMap<>(); } public AtomicReference getReference() @@ -306,10 +329,38 @@ public boolean swapIfNew(byte[] newBytes) if (!Arrays.equals(newBytes, rawBytes.get())) { reference.set(serde.deserialize(newBytes)); rawBytes.set(newBytes); + listeners.forEach((key, listener) -> { + try { + listener.accept(reference.get()); + } + catch (Exception e) { + log.warn(e, "Exception when calling listener for key[%s]", key); + } + }); return true; } return false; } + + public boolean addListener(String key, Consumer listener) + { + Consumer val = listeners.putIfAbsent(key, listener); + if (val != null) { + log.warn("Listener key[%s] already exists", key); + return false; + } + return true; + } + + public boolean removeListener(String key, Consumer listener) + { + boolean isRemoved = listeners.remove(key, listener); + if (!isRemoved) { + log.warn("Listener key[%s] not found", key); + return false; + } + return true; + } } private class PollingCallable implements Callable