
Commit 4b1bdf5

ForVic authored and chuangxiaopingRD committed
[SPARK-53324][K8S] Introduce pending pod limit per ResourceProfile
### What changes were proposed in this pull request?
Introduce a limit for pending pods (newly created/requested executors included) per resource profile. A config already exists for a global limit across all resource profiles; this change adds a limit per resource profile. apache#33492 already does much of the plumbing for us by counting newly created and pending pods, so we can simply pass through the pending pod count per resource profile and cap the number of pod requests for that resource profile at min(previousRequest, maxPodsPerRP).

### Why are the changes needed?
For multiple resource profile use cases, limits can be applied at the resource profile level instead of only globally.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests added.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51913 from ForVic/vsunderl/max_pending_pods_per_rpid.

Authored-by: ForVic <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
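As a usage sketch, the new per-resource-profile limit can be set alongside the existing global limit. Only the two config keys come from Spark (`spark.kubernetes.allocation.maxPendingPods` exists already; `spark.kubernetes.allocation.maxPendingPodsPerRp` is added by this commit); the app name and numeric values below are made-up examples:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values only: the per-RP cap must not exceed the global cap,
// otherwise the allocator's require(...) added below fails at startup.
val spark = SparkSession.builder()
  .appName("multi-rp-k8s-app") // hypothetical app name
  .config("spark.kubernetes.allocation.maxPendingPods", "300") // existing global limit
  .config("spark.kubernetes.allocation.maxPendingPodsPerRp", "50") // new per-RP limit (4.1.0+)
  .getOrCreate()
```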
1 parent ecd6e33 · commit 4b1bdf5

3 files changed: +96, -4 lines


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 12 additions & 0 deletions
@@ -778,6 +778,18 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
       .createWithDefault(Int.MaxValue)
 
+  val KUBERNETES_MAX_PENDING_PODS_PER_RPID =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPodsPerRp")
+      .doc("Maximum number of pending PODs allowed per resource profile ID during executor " +
+        "allocation. This provides finer-grained control over pending pods by limiting them " +
+        "per resource profile rather than globally. When set, this limit is enforced " +
+        "independently for each resource profile ID.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(value => value > 0,
+        "Maximum number of pending pods per rp id should be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_EXECUTOR_SNAPSHOTS_SUBSCRIBERS_GRACE_PERIOD =
     ConfigBuilder("spark.kubernetes.executorSnapshotsSubscribersShutdownGracePeriod")
       .doc("Time to wait for graceful shutdown kubernetes-executor-snapshots-subscribers " +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 16 additions & 4 deletions
@@ -73,6 +73,15 @@ class ExecutorPodsAllocator(
 
   protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
 
+  protected val maxPendingPodsPerRpid = conf.get(KUBERNETES_MAX_PENDING_PODS_PER_RPID)
+
+  // If maxPendingPodsPerRpid is set, ensure it's not greater than maxPendingPods
+  if (maxPendingPodsPerRpid != Int.MaxValue) {
+    require(maxPendingPodsPerRpid <= maxPendingPods,
+      s"Maximum pending pods per resource profile ID ($maxPendingPodsPerRpid) must be less than " +
+        s"or equal to maximum pending pods ($maxPendingPods).")
+  }
+
   protected val podCreationTimeout = math.max(
     podAllocationDelay * 5,
     conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -350,7 +359,7 @@
         }
       }
       if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
-        Some(rpId, podCountForRpId, targetNum)
+        Some(rpId, podCountForRpId, targetNum, notRunningPodCountForRpId)
       } else {
         // for this resource profile we do not request more PODs
         None
@@ -364,10 +373,13 @@
     if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0 &&
       !(snapshots.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get())) {
       ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
-        .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
+        .foreach { case ((rpId, podCountForRpId, targetNum, pendingPodCountForRpId),
+            sharedSlotFromPendingPods) =>
+          val remainingSlotsForRpId = maxPendingPodsPerRpid - pendingPodCountForRpId
           val numMissingPodsForRpId = targetNum - podCountForRpId
-          val numExecutorsToAllocate =
-            math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
+          val numExecutorsToAllocate = Seq(numMissingPodsForRpId, podAllocationSize,
+            sharedSlotFromPendingPods, remainingSlotsForRpId).min
+
           logInfo(log"Going to request ${MDC(LogKeys.COUNT, numExecutorsToAllocate)} executors from" +
             log" Kubernetes for ResourceProfile Id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " +
             log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " +

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

Lines changed: 68 additions & 0 deletions
@@ -238,6 +238,74 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(labeledPods, times(1)).delete()
   }
 
+  test("pending pod limit per resource profile ID") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any(classOf[Array[String]]): _*))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    // Two resource profiles, default and rp
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    val confWithLowMaxPendingPodsPerRpId = conf.clone
+      .set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2")
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    // Request more than the max per rp for one rp
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
+    // 2 for default, and 2 for rp
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
+    verify(podResource, times(4)).create()
+
+    // Mark executors 2 and 3 as pending, leaving 2 as newly created; this does not free up
+    // any pending pod slot, so no new pod is requested
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
+    verify(podResource, times(4)).create()
+    verify(labeledPods, never()).delete()
+
+    // Downscale the default profile by 1 executor to make one free slot for pending pods;
+    // the non-default profile should still be limited by the max pending pods per rp.
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(labeledPods, times(1)).delete()
+
+    // Make one pod from the non-default rp running so we have one more slot for pending pods.
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+    verify(labeledPods, times(1)).delete()
+  }
+
   test("Initially request executors in batches. Do not request another batch if the" +
       " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
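Tying the allocator change and the test together, here is a minimal standalone restatement of the new sizing rule (the `Seq(...).min` expression from `ExecutorPodsAllocator` above). Parameter names follow the diff; the example values are taken from the test scenario with `maxPendingPodsPerRp = 2`, and the batch size / shared slot values are assumed for illustration:

```scala
// Standalone restatement of the per-resource-profile sizing rule; not the allocator itself.
def executorsToAllocate(
    targetNum: Int,
    podCountForRpId: Int,
    podAllocationSize: Int,
    sharedSlotFromPendingPods: Int,
    pendingPodCountForRpId: Int,
    maxPendingPodsPerRpid: Int): Int = {
  val remainingSlotsForRpId = maxPendingPodsPerRpid - pendingPodCountForRpId
  val numMissingPodsForRpId = targetNum - podCountForRpId
  Seq(numMissingPodsForRpId, podAllocationSize, sharedSlotFromPendingPods,
    remainingSlotsForRpId).min
}

// Non-default profile in the test: target 3, 2 pods already requested and still not running,
// per-RP cap 2 => min(1, 5, 5, 0) == 0, so no further pod is requested until one of the
// pending pods starts running (batch size and shared slots assumed to be 5 here).
assert(executorsToAllocate(3, 2, 5, 5, 2, 2) == 0)
```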
