@VaniHaripriya
Contributor

@VaniHaripriya VaniHaripriya commented Jul 2, 2025

Description of your changes:

This change enables the specification of node affinity through input parameters, in alignment with the enhancement discussed in #9682.

Testing Instructions

SDK

  • Create a Python virtualenv, generate the proto files locally, and install the SDK.
$ python -m venv .venv
$ pushd api
$ make clean-python python
$ popd
$ source .venv/bin/activate
$ pip install --upgrade pip
$ pip install wheel setuptools protobuf grpcio grpcio-tools
$ pip install -r sdk/python/requirements-dev.txt
$ pip install -e api/v2alpha1/python
$ pip install -e sdk/python
$ pip install -e kubernetes_platform/python

Use the example code to compile

$ kfp dsl compile --py nodeaffinity.py --output nodeaffinity.yaml

Compilation should succeed, and the generated nodeaffinity.yaml file should contain the following snippet:

---
platforms:
  kubernetes:
    deploymentSpec:
      executors:
        exec-print-hello-with-explicit-affinity:
          nodeAffinity:
          - matchFields:
            - key: metadata.name
              operator: In
              values:
              - node-1
              - node-2
            weight: 100.0
        exec-print-hello-with-json-affinity:
          nodeAffinity:
          - nodeAffinityJson:
              runtimeValue:
                constant:
                  preferredDuringSchedulingIgnoredDuringExecution:
                  - preference:
                      matchExpressions:
                      - key: disktype
                        operator: In
                        values:
                        - ssd
                    weight: 100.0
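To check the compiled output programmatically rather than by eye, something like the following sketch could be used. It assumes the YAML has already been parsed into a plain dict (for example with a YAML parser's safe-load function); `executors_with_node_affinity` and `sample_spec` are hypothetical names for illustration, and the sample dict is hand-written, not real compiler output.

```python
from typing import Dict, List


def executors_with_node_affinity(platform_spec: dict) -> Dict[str, List[dict]]:
    """Map each executor name to its non-empty nodeAffinity list."""
    executors = (
        platform_spec.get("platforms", {})
        .get("kubernetes", {})
        .get("deploymentSpec", {})
        .get("executors", {})
    )
    return {
        name: spec["nodeAffinity"]
        for name, spec in executors.items()
        if spec.get("nodeAffinity")
    }


# Hand-written stand-in for the parsed platform spec (assumption: same layout
# as the YAML snippet above, trimmed to the fields this check needs).
sample_spec = {
    "platforms": {
        "kubernetes": {
            "deploymentSpec": {
                "executors": {
                    "exec-print-hello-with-explicit-affinity": {
                        "nodeAffinity": [{
                            "matchFields": [{
                                "key": "metadata.name",
                                "operator": "In",
                                "values": ["node-1", "node-2"],
                            }],
                            "weight": 100.0,
                        }]
                    },
                    "exec-without-affinity": {},
                }
            }
        }
    }
}

found = executors_with_node_affinity(sample_spec)
print(sorted(found))
```

An empty result would indicate that no executor in the spec carries a node affinity entry.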

Checklist:

@google-oss-prow google-oss-prow bot requested review from DharmitD and hbelmiro July 2, 2025 23:37
@VaniHaripriya VaniHaripriya force-pushed the add-node-affinity branch 4 times, most recently from 229ed04 to 251291b Compare July 3, 2025 22:19
@nsingla
Contributor

nsingla commented Jul 7, 2025

@VaniHaripriya Could you provide an example pipeline (SDK component) to verify this?

@google-oss-prow google-oss-prow bot added size/XXL and removed size/L labels Jul 10, 2025
@VaniHaripriya VaniHaripriya force-pushed the add-node-affinity branch 8 times, most recently from b09e168 to 1c59ce6 Compare July 15, 2025 05:22
@HumairAK HumairAK requested review from gmfrasca and removed request for DharmitD July 15, 2025 14:12
@gmfrasca
Member

/lgtm

@nsingla
Contributor

nsingla commented Jul 16, 2025

@VaniHaripriya Thanks for adding the test description; it's now very clear how someone can add node affinity. However, I am still not sure how to validate that this actually works 🤔
Did you try creating a multi-node cluster and verifying that node affinity takes effect? Or, within a single-node cluster, checking that the only node is not selected when it does not meet the criteria?

@VaniHaripriya
Contributor Author

@VaniHaripriya Thanks for adding the test description; it's now very clear how someone can add node affinity. However, I am still not sure how to validate that this actually works 🤔 Did you try creating a multi-node cluster and verifying that node affinity takes effect? Or, within a single-node cluster, checking that the only node is not selected when it does not meet the criteria?

@nsingla I confirmed that the node affinity settings are correctly reflected in the generated pipeline YAML and in the pod specs at runtime. I used a multi-node cluster and set node affinity criteria that do not match any node labels. The pods remained unscheduled, which confirms that the affinity rules were enforced.
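As an illustration of why non-matching criteria leave pods unscheduled, here is a minimal sketch of how a single required node selector term is evaluated against node labels. It is a simplified model of the Kubernetes operator semantics, not the scheduler's code; `requirement_matches`, `schedulable_nodes`, and the node names and labels are hypothetical.

```python
from typing import Dict, List


def requirement_matches(labels: Dict[str, str], req: dict) -> bool:
    """Evaluate one matchExpressions entry against a node's labels."""
    key, op, values = req["key"], req["operator"], req.get("values", [])
    if op == "In":
        return key in labels and labels[key] in values
    if op == "NotIn":
        # A node without the label also satisfies NotIn.
        return key not in labels or labels[key] not in values
    if op == "Exists":
        return key in labels
    if op == "DoesNotExist":
        return key not in labels
    if op in ("Gt", "Lt"):
        # Gt/Lt compare the label value as an integer against one bound.
        try:
            actual, bound = int(labels[key]), int(values[0])
        except (KeyError, IndexError, ValueError):
            return False
        return actual > bound if op == "Gt" else actual < bound
    raise ValueError(f"Unknown operator: {op!r}")


def schedulable_nodes(nodes: Dict[str, Dict[str, str]],
                      match_expressions: List[dict]) -> List[str]:
    """Return nodes satisfying every expression of a single term (AND)."""
    return [
        name for name, labels in nodes.items()
        if all(requirement_matches(labels, req) for req in match_expressions)
    ]


# Hypothetical two-node cluster in which no node carries disktype=ssd.
cluster = {"node-1": {"disktype": "hdd"}, "node-2": {"disktype": "hdd"}}
required = [{"key": "disktype", "operator": "In", "values": ["ssd"]}]
candidates = schedulable_nodes(cluster, required)
print(candidates)
```

With a required term that no node satisfies, the candidate list is empty, which corresponds to the pod staying unscheduled.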

Contributor

@hbelmiro hbelmiro left a comment

Looks good in general @VaniHaripriya.
I just left some suggestions to validate the input at compile time (SDK) for a better experience.

Comment on lines 48 to 75
                key=expr.get("key"),
                operator=expr.get("operator"),
                values=expr.get("values", []),
            )
    if match_fields:
        for field in match_fields:
            affinity_term.match_fields.add(
                key=field.get("key"),
                operator=field.get("operator"),
                values=field.get("values", []),
Contributor

We could add some validations here to catch errors early.

Also, add tests for those validations.

Comment on lines 59 to 54
    if weight is not None:
        affinity_term.weight = weight
Contributor

We can validate that weight is 1-100.
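A minimal sketch of such a check, assuming the weight is passed as an optional integer. `validate_weight` is a hypothetical helper name, not the function that was merged.

```python
from typing import Optional


def validate_weight(weight: Optional[int]) -> None:
    """Raise if weight is set but is not an integer in the range 1-100."""
    if weight is None:
        # No weight given: nothing to validate.
        return
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(weight, bool) or not isinstance(weight, int):
        raise TypeError(
            f"weight must be an integer, got {type(weight).__name__}.")
    if not 1 <= weight <= 100:
        raise ValueError(f"weight must be between 1 and 100, got {weight}.")


validate_weight(100)   # accepted
validate_weight(None)  # accepted: weight is optional
```

Raising at compile time means an out-of-range weight is reported by the SDK instead of surfacing later when the pod spec is rejected.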


def add_node_affinity_json(
    task: PipelineTask,
    node_affinity_json: Union[pipeline_channel.PipelineParameterChannel, dict],
Contributor

We could validate node_affinity_json against the schema.
You probably can use something like the following (AI-generated):

from kubernetes import client

def validate_node_affinity(node_affinity_json: dict) -> bool:
    """
    Validates a JSON object against the V1NodeAffinity model.

    Args:
        node_affinity_json: A Python dictionary loaded from your JSON.

    Returns:
        True if the JSON is valid, False otherwise.
    """
    try:
        # Assumption: ApiClient has no public dict-to-model helper, so this
        # calls its private (name-mangled) deserializer. The model setters
        # then run client-side validation and raise on invalid values.
        api_client = client.ApiClient()
        api_client._ApiClient__deserialize(node_affinity_json, "V1NodeAffinity")

        print("✅ JSON is a valid V1NodeAffinity.")
        return True
    except Exception as e:
        print(f"❌ Invalid V1NodeAffinity JSON: {e}")
        return False

And add a test for an invalid JSON.

@google-oss-prow google-oss-prow bot removed the lgtm label Jul 21, 2025
@VaniHaripriya VaniHaripriya force-pushed the add-node-affinity branch 2 times, most recently from 73082f4 to 6d466f0 Compare July 22, 2025 14:32
Comment on lines 47 to 76
    if match_expressions:
        for expr in match_expressions:
            key = expr.get("key")
            operator = expr.get("operator")
            if not key:
                raise ValueError("Each match_expression must have a non-empty 'key'.")
            if not operator:
                raise ValueError(f"Each match_expression for key '{key}' must have a non-empty 'operator'.")
            if operator not in VALID_OPERATORS:
                raise ValueError(f"Invalid operator '{operator}' for key '{key}'. Must be one of {sorted(VALID_OPERATORS)}.")
            affinity_term.match_expressions.add(
                key=key,
                operator=operator,
                values=expr.get("values", []),
            )
    if match_fields:
        for field in match_fields:
            key = field.get("key")
            operator = field.get("operator")
            if not key:
                raise ValueError("Each match_field must have a non-empty 'key'.")
            if not operator:
                raise ValueError(f"Each match_field for key '{key}' must have a non-empty 'operator'.")
            if operator not in VALID_OPERATORS:
                raise ValueError(f"Invalid operator '{operator}' for key '{key}'. Must be one of {sorted(VALID_OPERATORS)}.")
            affinity_term.match_fields.add(
                key=key,
                operator=operator,
                values=field.get("values", []),
            )
Contributor

Sorry for not noticing earlier, but we could create a helper method to eliminate duplicate code here. Both loops do almost the same thing.
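One possible shape for such a helper, as a sketch only. `validate_selector_requirements` is a hypothetical name, and the `VALID_OPERATORS` set below is assumed to be the standard Kubernetes node selector operators; the set actually defined in the PR is not shown in this thread.

```python
from typing import Iterable, List, Optional

# Assumption: the standard Kubernetes node selector operators.
VALID_OPERATORS = frozenset(
    {"In", "NotIn", "Exists", "DoesNotExist", "Gt", "Lt"})


def validate_selector_requirements(requirements: Optional[Iterable[dict]],
                                   kind: str) -> List[dict]:
    """Validate match_expressions or match_fields entries.

    `kind` only feeds the error messages ("match_expression" or
    "match_field"). Returns normalized key/operator/values dicts.
    """
    validated = []
    for req in requirements or []:
        key = req.get("key")
        operator = req.get("operator")
        if not key:
            raise ValueError(f"Each {kind} must have a non-empty 'key'.")
        if not operator:
            raise ValueError(
                f"Each {kind} for key '{key}' must have a non-empty 'operator'.")
        if operator not in VALID_OPERATORS:
            raise ValueError(
                f"Invalid operator '{operator}' for key '{key}'. "
                f"Must be one of {sorted(VALID_OPERATORS)}.")
        validated.append({
            "key": key,
            "operator": operator,
            "values": list(req.get("values", [])),
        })
    return validated


# The two loops could then collapse to something like (not run here):
#   for req in validate_selector_requirements(match_expressions, "match_expression"):
#       affinity_term.match_expressions.add(**req)
#   for req in validate_selector_requirements(match_fields, "match_field"):
#       affinity_term.match_fields.add(**req)
checked = validate_selector_requirements(
    [{"key": "disktype", "operator": "In", "values": ["ssd"]}],
    "match_expression",
)
print(checked)
```

Passing `kind` as a string keeps the error messages identical to the originals while leaving a single code path to test.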

    try:
        k8s_model_dict = common.deserialize_dict_to_k8s_model_keys(node_affinity_json)
        client.V1NodeAffinity(**k8s_model_dict)
        return True
Contributor

nit: It doesn't need to return anything (can be void).
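The raise-only convention might look like the sketch below. To stay self-contained it swaps the Kubernetes model check for a simple top-level key check, so it is a stand-in for the validation in the PR rather than a copy of it; `check_node_affinity_keys` is a hypothetical name.

```python
# The two top-level fields of a Kubernetes NodeAffinity object.
ALLOWED_TOP_LEVEL_KEYS = frozenset({
    "requiredDuringSchedulingIgnoredDuringExecution",
    "preferredDuringSchedulingIgnoredDuringExecution",
})


def check_node_affinity_keys(node_affinity_json: dict) -> None:
    """Raise on invalid input and return nothing on success."""
    if not isinstance(node_affinity_json, dict):
        raise TypeError("node_affinity_json must be a dict.")
    unknown = set(node_affinity_json) - ALLOWED_TOP_LEVEL_KEYS
    if unknown:
        raise ValueError(
            f"Unknown node affinity field(s): {sorted(unknown)}.")


# Callers invoke it purely for its side effect: it either raises or passes.
check_node_affinity_keys({
    "preferredDuringSchedulingIgnoredDuringExecution": [{
        "weight": 100,
        "preference": {
            "matchExpressions": [
                {"key": "disktype", "operator": "In", "values": ["ssd"]},
            ],
        },
    }],
})
```

Because failure is signalled by the exception, a boolean return value would carry no extra information for the caller.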

Comment on lines 499 to 508
    k8snodeAffinity, err := json.Marshal(nodeAffinityJSON)
    if err != nil {
        return fmt.Errorf("failed to marshal node affinity json: %w", err)
    }
    var nodeAffinity k8score.NodeAffinity
    if err := json.Unmarshal(k8snodeAffinity, &nodeAffinity); err != nil {
        return fmt.Errorf("failed to unmarshal node affinity json: %w", err)
    }
Contributor

I may be missing something, but it doesn't seem right to me. Marshal and then unmarshal?

Contributor Author

Updated the code to avoid the marshal-then-unmarshal round trip.

var requiredTerms []k8score.NodeSelectorTerm
var preferredTerms []k8score.PreferredSchedulingTerm

for _, nodeAffinityTerm := range nodeAffinityTerms {
Contributor

It might be worth adding a quick check at the start of the loop to skip empty terms early on:

for i, nodeAffinityTerm := range nodeAffinityTerms {
    if nodeAffinityTerm.GetNodeAffinityJson() == nil && 
       len(nodeAffinityTerm.GetMatchExpressions()) == 0 && 
       len(nodeAffinityTerm.GetMatchFields()) == 0 {
        glog.Warningf("NodeAffinityTerm %d is empty, skipping", i)
        continue
    }
    // rest of processing...
}

Right now, we only catch empty explicit terms later in the function, but JSON terms could also be empty and we'd still try to process them. This would save some unnecessary work and give clearer debugging info.

Comment on lines 364 to 418
    def test_component_pipeline_input_required_scheduling(self):
        """Test JSON-based node affinity with pipeline input for required scheduling."""
        @dsl.pipeline
        def my_pipeline(affinity_input: dict):
            task = comp()
            kubernetes.add_node_affinity_json(
                task,
                node_affinity_json=affinity_input,
            )

        assert json_format.MessageToDict(my_pipeline.platform_spec) == {
            'platforms': {
                'kubernetes': {
                    'deploymentSpec': {
                        'executors': {
                            'exec-comp': {
                                'nodeAffinity': [{
                                    'nodeAffinityJson': {
                                        'componentInputParameter': 'affinity_input'
                                    }
                                }]
                            }
                        }
                    }
                }
            }
        }

    def test_component_pipeline_input_preferred_scheduling(self):
        """Test JSON-based node affinity with pipeline input for preferred scheduling."""
        @dsl.pipeline
        def my_pipeline(affinity_input: dict):
            task = comp()
            kubernetes.add_node_affinity_json(
                task,
                node_affinity_json=affinity_input,
            )

        assert json_format.MessageToDict(my_pipeline.platform_spec) == {
            'platforms': {
                'kubernetes': {
                    'deploymentSpec': {
                        'executors': {
                            'exec-comp': {
                                'nodeAffinity': [{
                                    'nodeAffinityJson': {
                                        'componentInputParameter': 'affinity_input'
                                    }
                                }]
                            }
                        }
                    }
                }
            }
        }
Contributor

These two tests look exactly the same.

                task,
                node_affinity_json=affinity_input_2,
            )
        print(json_format.MessageToDict(my_pipeline.platform_spec))
Contributor

Do we really want to keep this print, or is it a leftover?

Contributor Author

It's a leftover :-)

Contributor

@hbelmiro hbelmiro left a comment

/lgtm

Contributor

@hbelmiro hbelmiro left a comment

/lgtm
/approve

@google-oss-prow google-oss-prow bot added the lgtm label Jul 30, 2025
@google-oss-prow

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: hbelmiro

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot merged commit ecf488b into kubeflow:master Jul 30, 2025
37 checks passed
4 participants