Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ Other configurations required are:
Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord
service for these changes to take effect.

Druid can dynamically tune [pod template selection](#pod-template-selection), which allows you to configure the pod
template based on the task to be run. To enable dynamic pod template selection, first configure the
[custom template pod adapter](#custom-template-pod-adapter).
Druid can dynamically tune [pod template selection](#pod-template-selection) and [capacity](#properties). Where capacity refers to `druid.indexer.runner.capacity`.

Pod template selection allows you to configure the pod template based on the task to be run. To enable dynamic pod template selection, first configure the [custom template pod adapter](#custom-template-pod-adapter).

Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner.

Expand Down Expand Up @@ -126,7 +126,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
"type": ["index_kafka"]
}
]
}
},
"capacity": 12
}
```
</details>
Expand All @@ -135,6 +136,8 @@ Host: http://ROUTER_IP:ROUTER_PORT

Updates the dynamic configuration for the Kubernetes Task Runner

Note: Both `podTemplateSelectStrategy` and `capacity` are optional fields. A POST request may include either, both, or neither.

##### URL

`POST` `/druid/indexer/v1/k8s/taskrunner/executionconfig`
Expand Down Expand Up @@ -193,7 +196,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconf
"type": ["index_kafka"]
}
]
}
},
"capacity": 6
}'
```

Expand Down Expand Up @@ -225,7 +229,8 @@ Content-Type: application/json
"type": ["index_kafka"]
}
]
}
},
"capacity": 6
}
```

Expand Down Expand Up @@ -309,7 +314,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"comment": "",
"ip": "127.0.0.1"
},
"payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}",
"payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6",
"auditTime": "2024-06-13T20:59:51.622Z"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class KubernetesOverlordModule implements DruidModule
public void configure(Binder binder)
{
// druid.indexer.runner.type=k8s
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerStaticConfig.class);
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null);
Expand Down Expand Up @@ -150,10 +150,20 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, JDK_HTTPCLIENT_PROPERITES_PREFIX, DruidKubernetesJdkHttpClientConfig.class);
}

@Provides
@LazySingleton
public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig(
KubernetesTaskRunnerStaticConfig staticConfig,
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier
)
{
return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier);
}

@Provides
@LazySingleton
public DruidKubernetesClient makeKubernetesClient(
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
DruidKubernetesHttpClientFactory httpClientFactory,
Lifecycle lifecycle
)
Expand Down Expand Up @@ -217,7 +227,7 @@ TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner(
TaskAdapter provideTaskAdapter(
DruidKubernetesClient client,
Properties properties,
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig,
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
@Self DruidNode druidNode,
Expand Down Expand Up @@ -260,7 +270,7 @@ TaskAdapter provideTaskAdapter(
druidNode,
smileMapper,
taskLogs,
new DynamicConfigPodTemplateSelector(properties, dynamicConfigRef)
new DynamicConfigPodTemplateSelector(properties, kubernetesTaskRunnerConfig)
);
} else {
return new SingleContainerTaskAdapter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
Expand All @@ -44,6 +45,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
Expand All @@ -56,6 +58,7 @@
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
Expand All @@ -76,7 +79,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -100,6 +105,7 @@
public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
{
private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
private static final String OBSERVER_KEY = "k8s-task-runner-capacity-%s";
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();

// to cleanup old jobs that might not have been deleted.
Expand All @@ -111,19 +117,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final KubernetesPeonClient client;
private final KubernetesTaskRunnerConfig config;
private final ListeningExecutorService exec;
private final ThreadPoolExecutor tpe;
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
// currently worker categories aren't supported, so it's hardcoded.
protected static final String WORKER_CATEGORY = "_k8s_worker_category";

private int currentCapacity;

public KubernetesTaskRunner(
TaskAdapter adapter,
KubernetesTaskRunnerConfig config,
KubernetesPeonClient client,
HttpClient httpClient,
PeonLifecycleFactory peonLifecycleFactory,
ServiceEmitter emitter
ServiceEmitter emitter,
ConfigManager configManager
)
{
this.adapter = adapter;
Expand All @@ -132,10 +142,12 @@ public KubernetesTaskRunner(
this.httpClient = httpClient;
this.peonLifecycleFactory = peonLifecycleFactory;
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
);
this.emitter = emitter;

this.currentCapacity = config.getCapacity();
this.tpe = new ThreadPoolExecutor(currentCapacity, currentCapacity, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d", null));
this.exec = MoreExecutors.listeningDecorator(this.tpe);
configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig);
}

@Override
Expand Down Expand Up @@ -179,6 +191,24 @@ protected KubernetesWorkItem joinAsync(Task task)
}
}

private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config)
{
int newCapacity = config.getCapacity();
if (newCapacity == currentCapacity) {
return;
}
log.info("Adjusting k8s task runner capacity from [%d] to [%d]", currentCapacity, newCapacity);
// maximum pool size must always be greater than or equal to the core pool size
if (newCapacity < currentCapacity) {
tpe.setCorePoolSize(newCapacity);
tpe.setMaximumPoolSize(newCapacity);
} else {
tpe.setMaximumPoolSize(newCapacity);
tpe.setCorePoolSize(newCapacity);
}
currentCapacity = newCapacity;
}

private TaskStatus runTask(Task task)
{
return doTask(task, true);
Expand Down Expand Up @@ -294,7 +324,7 @@ public void shutdown(String taskid, String reason)
synchronized (tasks) {
tasks.remove(taskid);
}

}

@Override
Expand Down Expand Up @@ -420,7 +450,7 @@ public void stop()
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
return ImmutableMap.of(WORKER_CATEGORY, (long) currentCapacity);
}

@Override
Expand All @@ -438,13 +468,13 @@ public Optional<ScalingStats> getScalingStats()
@Override
public Map<String, Long> getIdleTaskSlotCount()
{
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, config.getCapacity() - tasks.size()));
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, currentCapacity - tasks.size()));
}

@Override
public Map<String, Long> getUsedTaskSlotCount()
{
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(config.getCapacity(), tasks.size()));
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(currentCapacity, tasks.size()));
}

@Override
Expand Down Expand Up @@ -535,7 +565,7 @@ public RunnerTaskState getRunnerTaskState(String taskId)
@Override
public int getTotalCapacity()
{
return config.getCapacity();
return currentCapacity;
}

@Override
Expand Down
Loading