diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 167c460536ac9..6ff934bac76dc 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1564,6 +1564,12 @@
     ],
     "sqlState" : "42702"
   },
+  "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT" : {
+    "message" : [
+      "Enabling the executor Kubernetes service requires <blockManagerPortConfigKey> to be set to a positive number, for instance <defaultShuffleServicePort>."
+    ],
+    "sqlState" : "42000"
+  },
   "EXEC_IMMEDIATE_DUPLICATE_ARGUMENT_ALIASES" : {
     "message" : [
       "The USING clause of this EXECUTE IMMEDIATE command contained multiple arguments with same alias (<aliases>), which is invalid; please update the command to specify unique aliases and then try it again."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a56043b4912f2..92c18fcb858b5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -435,6 +435,20 @@ private[spark] object Config extends Logging {
       .toSequence
       .createWithDefault(Nil)
 
+  val KUBERNETES_EXECUTOR_ENABLE_SERVICE =
+    ConfigBuilder("spark.kubernetes.executor.enableService")
+      .doc("Whether a Kubernetes service is created for the executor. " +
+        "The service is created for the executor pod and allows connecting to executor " +
+        "ports via the Kubernetes service instead of the pod host IP. Once the executor is " +
+        "decommissioned, connecting to such ports instantly fails with 'connection refused'. " +
+        "Connecting to the port via the pod host IP instead fails with a 'connection timeout' " +
+        "after NETWORK_TIMEOUT, which defaults to 2 minutes. " +
+        "The executor Kubernetes service provides access to the executor's block manager, so " +
+        "BLOCK_MANAGER_PORT has to be set to a value greater than zero.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
     ConfigBuilder("spark.kubernetes.executor.decommissionLabel")
       .doc("Label to apply to a pod which is being decommissioned." +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorServiceFeatureStep.scala
new file mode 100644
index 0000000000000..00eedfde83a14
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorServiceFeatureStep.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, ServiceBuilder}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
+import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, SHUFFLE_SERVICE_PORT}
+
+class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
+  private val spark_app_selector_label = "spark-app-selector"
+  private val spark_exec_id_label = "spark-exec-id"
+  private val service_selector_labels = Set(spark_app_selector_label, spark_exec_id_label)
+  private lazy val selector = conf.labels
+    .filter { case (key, _) => service_selector_labels.contains(key) }
+
+  private lazy val sparkAppSelector = getLabel(spark_app_selector_label)
+  private lazy val sparkExecId = getLabel(spark_exec_id_label)
+  // name length is 8 + 38 + 6 + 10 = 62
+  // which fits in KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH = 63
+  private lazy val serviceName = s"svc-$sparkAppSelector-exec-$sparkExecId"
+
+  // The executor Kubernetes service requires BLOCK_MANAGER_PORT to be set
+  private val blockManagerPortName = "spark-block-manager"
+  private val blockManagerPort = conf.sparkConf.get(BLOCK_MANAGER_PORT)
+  SparkException.require(blockManagerPort > 0,
+    "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT",
+    Map(
+      "blockManagerPortConfigKey" -> BLOCK_MANAGER_PORT.key,
+      "defaultShuffleServicePort" -> SHUFFLE_SERVICE_PORT.defaultValue.get.toString))
+
+  private def getLabel(label: String): String = {
+    val value = conf.labels.get(label)
+    value.getOrElse(
+      throw new SparkException(s"This feature step requires label $label")
+    )
+  }
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    SparkPod(
+      pod.pod,
+      // tell the executor entry point its Kubernetes service name
+      new ContainerBuilder(pod.container)
+        .addNewEnv()
+          .withName("EXECUTOR_SERVICE_NAME")
+          .withValue(serviceName)
+          .endEnv()
+        .build())
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val service = new ServiceBuilder()
+      .withNewMetadata()
+        .withName(serviceName)
+        .endMetadata()
+      .withNewSpec()
+        .withSelector(selector.asJava)
+        .addNewPort()
+          .withName(blockManagerPortName)
+          .withPort(blockManagerPort)
+          .withNewTargetPort(blockManagerPort)
+          .endPort()
+        .endSpec()
+      .build()
+
+    Seq(service)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index e84a0c97724c2..ec9d23447b88c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -447,6 +447,7 @@ class ExecutorPodsAllocator(
         kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
       try {
         addOwnerReference(createdExecutorPod, resources)
+        kubernetesClient.resourceList(resources: _*).forceConflicts().serverSideApply()
         resources
           .filter(_.getKind == "PersistentVolumeClaim")
           .foreach { resource =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 2253c07df116e..b14451489e0ea 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -65,6 +65,12 @@ private[spark] class KubernetesExecutorBuilder {
        }
      }
 
+    val optionalFeatures = Seq(
+      Some(conf.get(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE))
+        .filter(enabled => enabled)
+        .map(_ => new ExecutorServiceFeatureStep(conf))
+    ).flatten
+
     val allFeatures = Seq(
       new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
       new ExecutorKubernetesCredentialsFeatureStep(conf),
@@ -72,7 +78,7 @@
       new EnvSecretsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
       new HadoopConfExecutorFeatureStep(conf),
-      new LocalDirsFeatureStep(conf)) ++ userFeatures
+      new LocalDirsFeatureStep(conf)) ++ optionalFeatures ++ userFeatures
 
     val features = allFeatures.filterNot(f =>
       conf.get(Config.KUBERNETES_EXECUTOR_POD_EXCLUDED_FEATURE_STEPS).contains(f.getClass.getName))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index d4f7b9f67fd6f..6563777def96c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
-import io.fabric8.kubernetes.client.dsl.PodResource
+import io.fabric8.kubernetes.client.dsl.{NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, ServerSideApplicable}
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, anyString, eq => meq}
 import org.mockito.Mockito.{never, times, verify, when}
@@ -33,6 +33,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
@@ -142,6 +143,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
     when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+    val apl = mock[NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[HasMetadata]]
+    val ssa = mock[ServerSideApplicable[java.util.List[HasMetadata]]]
+    when(apl.forceConflicts()).thenReturn(ssa)
+    when(kubernetesClient.resourceList()).thenReturn(apl)
+    when(kubernetesClient.resourceList(any[HasMetadata]())).thenReturn(apl)
     when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
     when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
     when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index 5f0f04da9196b..92275d1d38fd9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -16,12 +16,16 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
+import scala.jdk.CollectionConverters.IterableHasAsScala
+
+import io.fabric8.kubernetes.api.model.Service
 import io.fabric8.kubernetes.client.KubernetesClient
+import org.mockito.Mockito.mock
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkIllegalArgumentException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features.KubernetesExecutorCustomFeatureConfigStep
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, ConfigEntry}
 import org.apache.spark.resource.ResourceProfile
 
 class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
@@ -65,6 +69,57 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
     val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
     new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, defaultProfile).pod
   }
+
+  test("SPARK-XXXXX: check executor kubernetes spec with service disabled by default") {
+    val sparkConf = baseConf
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
+
+    val containerEnvs = spec.pod.container.getEnv.asScala
+    assert(!containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
+
+    assert(spec.executorKubernetesResources.size === 0)
+  }
+
+  test("SPARK-XXXXX: check executor kubernetes spec with service enabled") {
+    val sparkConf = baseConf.clone
+      .set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
+      .set(BLOCK_MANAGER_PORT, 1234)
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
+
+    val containerEnvs = spec.pod.container.getEnv.asScala
+    assert(containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
+    val containerEnv = containerEnvs.filter(_.getName === "EXECUTOR_SERVICE_NAME").head
+    assert(containerEnv.getValue === "svc-appId-exec-1")
+
+    assert(spec.executorKubernetesResources.size === 1)
+    val resource = spec.executorKubernetesResources.head
+    assert(resource.getKind === "Service")
+    val service = resource.asInstanceOf[Service]
+    assert(service.getMetadata.getName === "svc-appId-exec-1")
+    assert(service.getSpec.getPorts.size() === 1)
+    val port = service.getSpec.getPorts.get(0)
+    assert(port.getName === "spark-block-manager")
+    assert(port.getPort === 1234)
+  }
+
+  test("SPARK-XXXXX: check executor kubernetes service requires block manager port") {
+    val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assertThrows[SparkIllegalArgumentException] {
+      new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
+    }
+  }
 }
 
 /**
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index f9561b9aa4ed5..a782f3bcb7d14 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -102,7 +102,8 @@ case "$1" in
       --executor-id $SPARK_EXECUTOR_ID
       --cores $SPARK_EXECUTOR_CORES
       --app-id $SPARK_APPLICATION_ID
-      --hostname $SPARK_EXECUTOR_POD_IP
+      ${EXECUTOR_SERVICE_NAME:+--bind-address $SPARK_EXECUTOR_POD_IP}
+      --hostname ${EXECUTOR_SERVICE_NAME:-$SPARK_EXECUTOR_POD_IP}
       --resourceProfileId $SPARK_RESOURCE_PROFILE_ID
       --podName $SPARK_EXECUTOR_POD_NAME
     )
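Not part of the patch: a minimal usage sketch for review context. It assumes only the spark.kubernetes.executor.enableService key added above and the existing spark.blockManager.port setting; the master URL and the port value 7337 are illustrative assumptions. Enabling the executor Kubernetes service only works together with a fixed, positive block manager port, otherwise ExecutorServiceFeatureStep fails fast with EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT.

  // Usage sketch (illustrative, not part of the patch).
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("k8s://https://kubernetes.default.svc")              // assumed master URL
    .config("spark.kubernetes.executor.enableService", "true")   // config key added by this patch
    .config("spark.blockManager.port", "7337")                   // any fixed positive port; 7337 is only an example
    .getOrCreate()

With the service enabled, entrypoint.sh passes the service name to the executor as --hostname and keeps binding to the pod IP via --bind-address, so executor ports such as the block manager port are reached through the per-executor service rather than the pod host IP.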