Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import subprocess
import time
import typing
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import (Any, Callable, Dict, List, Literal, Optional, Set, Tuple,
Union)

import ijson

Expand Down Expand Up @@ -3136,7 +3137,11 @@ def filter_pods(namespace: str,
context: Optional[str],
tag_filters: Dict[str, str],
status_filters: Optional[List[str]] = None) -> Dict[str, Any]:
"""Filters pods by tags and status."""
"""Filters pods by tags and status.

Returned dict is sorted by name, with workers sorted by their numeric suffix.
This ensures consistent ordering for SSH configuration and other operations.
"""
non_included_pod_statuses = POD_STATUSES.copy()

field_selector = ''
Expand All @@ -3154,7 +3159,32 @@ def filter_pods(namespace: str,
pods = [
pod for pod in pod_list.items if pod.metadata.deletion_timestamp is None
]
return {pod.metadata.name: pod for pod in pods}

# Sort pods so the head comes first, followed by workers ordered by their
# numeric suffix (e.g., cluster-head, cluster-worker1, cluster-worker2,
# ..., cluster-worker10), regardless of the order in which the Kubernetes
# API returns them. Pods whose names match neither the head nor the
# worker pattern are placed at the end of the list, sorted by name.
def get_pod_sort_key(
    pod: V1Pod
) -> Union[Tuple[Literal[0], str], Tuple[Literal[1], int, str],
           Tuple[Literal[2], str]]:
    """Builds the sort key that orders head, worker and other pods.

    Args:
        pod: Pod object exposing its name as ``pod.metadata.name``.

    Returns:
        A tuple whose first element is the group rank:
          * ``(0, name)`` when the last dash-separated component of the
            name is ``head``.
          * ``(1, index, name)`` when that component is ``worker`` followed
            only by ASCII digits; ``index`` is the integer value, so
            ``worker2`` sorts before ``worker10``.
          * ``(2, name)`` for every other name, including a bare ``worker``
            component with no index.
        Keys from different groups differ in their first element, so the
        remaining elements are only ever compared within one group.
    """
    name = pod.metadata.name
    # Only the last dash-separated component identifies the pod's role.
    name_suffix = name.rpartition('-')[2]
    if name_suffix == 'head':
        return (0, name)

    worker_prefix = 'worker'
    if name_suffix.startswith(worker_prefix):
        # Take everything after the leading prefix (rather than after the
        # last occurrence of 'worker') so that a suffix such as
        # 'workerworker3' is not mistaken for worker 3.
        index_text = name_suffix[len(worker_prefix):]
        if index_text.isascii() and index_text.isdigit():
            # The name is a tie-breaker: workers sharing an index would
            # otherwise keep whatever order the input happened to have.
            return (1, int(index_text), name)

    return (2, name)

sorted_pods = sorted(pods, key=get_pod_sort_key)

return {pod.metadata.name: pod for pod in sorted_pods}


def _remove_pod_annotation(pod: Any,
Expand Down
69 changes: 69 additions & 0 deletions tests/unit_tests/kubernetes/test_kubernetes_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import collections
import os
import re
import tempfile
from typing import Optional
import unittest
Expand Down Expand Up @@ -1913,3 +1914,71 @@ def track_calls(cloud,
assert len(custom_metadata_calls) >= 1
assert custom_metadata_calls[0]['cloud'] == 'ssh', \
"custom_metadata should use SSH cloud"


@pytest.mark.parametrize('unsorted_pod_names, expected_sorted_pod_names', [
    # Workers out of order, including a two-digit index.
    (
        [
            'test-cluster-worker10',
            'test-cluster-worker2',
            'test-cluster-head',
            'test-cluster-worker1',
            'test-cluster-worker3',
        ],
        [
            'test-cluster-head',
            'test-cluster-worker1',
            'test-cluster-worker2',
            'test-cluster-worker3',
            'test-cluster-worker10',
        ],
    ),
    (
        [
            'test-cluster-worker1',
            'test-cluster-worker20',
            'test-cluster-head',
            'test-cluster-worker3',
            'test-cluster-worker2',
        ],
        [
            'test-cluster-head',
            'test-cluster-worker1',
            'test-cluster-worker2',
            'test-cluster-worker3',
            'test-cluster-worker20',
        ],
    ),
    # Only the head is out of place.
    (
        [
            'test-cluster-worker1',
            'test-cluster-worker2',
            'test-cluster-head',
            'test-cluster-worker3',
            'test-cluster-worker4',
        ],
        [
            'test-cluster-head',
            'test-cluster-worker1',
            'test-cluster-worker2',
            'test-cluster-worker3',
            'test-cluster-worker4',
        ],
    ),
    # Already in the expected order.
    (
        [
            'test-cluster-head',
            'test-cluster-worker1',
            'test-cluster-worker2',
            'test-cluster-worker3',
            'test-cluster-worker4',
        ],
        [
            'test-cluster-head',
            'test-cluster-worker1',
            'test-cluster-worker2',
            'test-cluster-worker3',
            'test-cluster-worker4',
        ],
    ),
    # The cluster name itself contains the word "worker".
    (
        [
            'my-worker-head',
            'my-worker-worker1',
            'my-worker-worker2',
            'my-worker-worker3',
            'my-worker-worker4',
        ],
        [
            'my-worker-head',
            'my-worker-worker1',
            'my-worker-worker2',
            'my-worker-worker3',
            'my-worker-worker4',
        ],
    ),
    # A pod that is neither head nor worker is expected last.
    (
        [
            'my-worker-head',
            'my-worker-worker1',
            'extra-pod',
            'my-worker-worker2',
            'my-worker-worker3',
            'my-worker-worker4',
        ],
        [
            'my-worker-head',
            'my-worker-worker1',
            'my-worker-worker2',
            'my-worker-worker3',
            'my-worker-worker4',
            'extra-pod',
        ],
    ),
])
def test_filter_pods_sorts_by_name(unsorted_pod_names,
                                   expected_sorted_pod_names):
    """Check the order of the pod names in the dict filter_pods returns."""

    def _fake_pod(name):
        # Stand-in pod with no deletion timestamp set.
        fake = mock.MagicMock()
        fake.metadata.name = name
        fake.metadata.deletion_timestamp = None
        return fake

    fake_pod_list = mock.MagicMock()
    fake_pod_list.items = [_fake_pod(name) for name in unsorted_pod_names]

    with patch('sky.provision.kubernetes.utils.kubernetes.core_api'
              ) as core_api_mock:
        # Make the mocked API hand back the pods in the given input order.
        list_pods = core_api_mock.return_value.list_namespaced_pod
        list_pods.return_value = fake_pod_list

        result = utils.filter_pods(namespace='test-namespace',
                                   context='test-context',
                                   tag_filters={'test-label': 'test-value'})

    # Iterating the returned dict yields the names in insertion order.
    assert list(result) == expected_sorted_pod_names
Loading