fix(sdk): fix for passing input pipelines parameters to parallelfor group#13079
Conversation
…me to match the pattern expected by subDAG; Assisted-by: Claude Signed-off-by: Mateusz Switala <mswitala@redhat.com>
Signed-off-by: Mateusz Switala <mswitala@redhat.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Hi @Mateusz-Switala. Thanks for your PR. I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
There was a problem hiding this comment.
Pull request overview
Fixes propagation of pipeline input parameters referenced from Kubernetes platform config when compiling tasks inside control-flow sub-DAGs (notably dsl.ParallelFor), addressing #13078.
Changes:
- Ensure
PipelineParameterChannelvalues used in Kubernetes platform config are added to a task’s channel inputs so they can be “punched through” sub-DAGs. - Rewrite
componentInputParameterreferences in per-task platform config to use the parent sub-DAG’s injected/prefixed parameter names when applicable. - Add unit and end-to-end compilation tests covering the rewrite behavior and ParallelFor + secret pipeline param scenario.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
kubernetes_platform/python/kfp/kubernetes/common.py |
Adds pipeline-parameter channels referenced by K8s config into task channel inputs to enable sub-DAG propagation. |
sdk/python/kfp/compiler/pipeline_spec_builder.py |
Rewrites platform-config componentInputParameter names to match prefixed parent DAG inputs for non-root groups. |
sdk/python/kfp/compiler/pipeline_spec_builder_test.py |
Adds unit tests for the rewrite helper and an end-to-end ParallelFor + secret-name pipeline param compilation test. |
kubernetes_platform/python/test/unit/test_secret.py |
Adds tests intended to cover pipeline-parameter handling for secret configuration. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Mateusz Switala <mswitala@redhat.com>
Signed-off-by: Mateusz Switala <mswitala@redhat.com>
… of github.com:Mateusz-Switala/kubeflow-pipelines into fix_allow_passing_input_pipelines_to_parallelfor_group
|
/ok-to-test |
| tasks['initial_count'] = initial_count | ||
| tasks['channel_pattern'] = secret_name.pattern | ||
|
|
||
| assert len(tasks['task']._channel_inputs) == tasks['initial_count'] + 1 |
There was a problem hiding this comment.
This test fails on the current PR head. Running python3.12 -m pytest kubernetes_platform/python/test/unit/test_secret.py -k TestParseK8sParameterInputChannelPropagation hits assert 0 == (0 + 1) here because decorating my_pipeline does not populate tasks["task"]._channel_inputs the way this test assumes. As written, the patch adds a red unit test. Please either compile/assert on the generated spec or instantiate the pipeline in a way that actually exercises parse_k8s_parameter_input before reading tasks.
| if isinstance(obj, dict): | ||
| result = {} | ||
| for key, value in obj.items(): | ||
| if key == 'componentInputParameter' and isinstance( |
There was a problem hiding this comment.
This rewrite only touches componentInputParameter, but parse_k8s_parameter_input() still emits taskOutputParameter for values coming from upstream tasks. That leaves a broken case for platform config inside sub-DAGs: if secret_name comes from a task outside the ParallelFor, the loop component gets the surfaced pipelinechannel--... input, but the executor config here still points at taskOutputParameter.producerTask=<outer task>. The v2 driver resolves task outputs within the current DAG, so that cross-DAG reference still fails. I reproduced it by compiling a pipeline that uses emit_secret_name().output as secret_name inside dsl.ParallelFor; the generated loop component had pipelinechannel--emit-secret-name-Output, while the Kubernetes executor config still referenced taskOutputParameter.
|
@filip-komarzyniec please take a look at above comments while @Mateusz-Switala is on PTO. |
Description of your changes:
These changes fixes the bug: #13078
_rewrite_platform_config_component_input_params()which rewrites componentInputParameter references in the platform config dict. When a task is inside a non-root sub-DAG, it replaces unprefixed param names (e.g. secret_name) with their prefixed equivalents (pipelinechannel--secret_name) if the prefixed version exists in the parent component's inputs. This is called in build_spec_by_group before converting the platform config to a platform spec.Checklist: