Skip to content

Commit 902b586

Browse files
committed
feat(backend): Pipeline specpatch from APIServer envvar
- Allows a KFP admin to apply a custom specpatch to all incoming pipelines - Useful for installment-wide configuration, such as workflow ttl - Take in a JSON string provided via the environment variable `COMPILED_PIPELINE_SPEC_PATCH` Signed-off-by: Giulio Frasca <[email protected]>
1 parent 3b64d73 commit 902b586

File tree

4 files changed

+73
-1
lines changed

4 files changed

+73
-1
lines changed

backend/src/apiserver/common/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const (
3232
KubeflowUserIDPrefix string = "KUBEFLOW_USERID_PREFIX"
3333
UpdatePipelineVersionByDefault string = "AUTO_UPDATE_PIPELINE_DEFAULT_VERSION"
3434
TokenReviewAudience string = "TOKEN_REVIEW_AUDIENCE"
35+
CompiledPipelineSpecPatch string = "COMPILED_PIPELINE_SPEC_PATCH"
3536
)
3637

3738
func IsPipelineVersionUpdatedByDefault() bool {
@@ -127,3 +128,7 @@ func GetKubeflowUserIDPrefix() string {
127128
func GetTokenReviewAudience() string {
128129
return GetStringConfigWithDefault(TokenReviewAudience, DefaultTokenReviewAudience)
129130
}
131+
132+
func GetCompiledPipelineSpecPatch() string {
133+
return GetStringConfigWithDefault(CompiledPipelineSpecPatch, "{}")
134+
}

backend/src/v2/compiler/argocompiler/argo.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,17 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
183183

184184
// compile
185185
err = compiler.Accept(job, kubernetesSpec, c)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
// Apply any workflow spec patches from environment variable
191+
patchJSON := common.GetCompiledPipelineSpecPatch()
192+
if err := c.ApplyWorkflowSpecPatch(patchJSON); err != nil {
193+
return nil, fmt.Errorf("failed to apply workflow spec patch: %w", err)
194+
}
186195

187-
return c.wf, err
196+
return c.wf, nil
188197
}
189198

190199
func retrieveLastValidString(s string) string {

backend/src/v2/compiler/argocompiler/spec_patch.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
package argocompiler
1616

1717
import (
18+
"encoding/json"
19+
"fmt"
20+
21+
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
22+
log "github.com/sirupsen/logrus"
1823
"google.golang.org/protobuf/types/known/structpb"
24+
"k8s.io/apimachinery/pkg/util/strategicpatch"
1925
)
2026

2127
func (c *workflowCompiler) AddKubernetesSpec(name string, kubernetesSpec *structpb.Struct) error {
@@ -25,3 +31,52 @@ func (c *workflowCompiler) AddKubernetesSpec(name string, kubernetesSpec *struct
2531
}
2632
return nil
2733
}
34+
35+
// ApplyWorkflowSpecPatch applies a JSON patch to the compiled workflow specification.
36+
// It validates the JSON and applies it using Kubernetes strategic merge patch.
37+
// Only the workflow's "spec" field can be patched for security reasons.
38+
func (c *workflowCompiler) ApplyWorkflowSpecPatch(patchJSON string) error {
39+
if c.wf == nil {
40+
return fmt.Errorf("workflow is nil")
41+
}
42+
43+
// Check for empty patch string
44+
if patchJSON == "" {
45+
log.Debug("Empty workflow spec patch string provided, skipping patching")
46+
return nil
47+
}
48+
49+
log.Debug("Applying workflow spec patch")
50+
51+
// Validate that the patch is valid JSON by attempting to unmarshal it
52+
var specPatchValidation map[string]interface{}
53+
if err := json.Unmarshal([]byte(patchJSON), &specPatchValidation); err != nil {
54+
return fmt.Errorf("invalid JSON in COMPILED_PIPELINE_SPEC_PATCH: %w", err)
55+
}
56+
57+
// Check if the patch is empty (no fields to patch)
58+
if len(specPatchValidation) == 0 {
59+
log.Debug("Empty workflow spec patch provided, skipping patching")
60+
return nil
61+
}
62+
63+
// Convert the current workflow spec to JSON
64+
originalSpecJSON, err := json.Marshal(c.wf.Spec)
65+
if err != nil {
66+
return fmt.Errorf("failed to marshal workflow spec to JSON: %w", err)
67+
}
68+
69+
// Apply the strategic merge patch to the spec directly
70+
patchedSpecJSON, err := strategicpatch.StrategicMergePatch(originalSpecJSON, []byte(patchJSON), wfapi.WorkflowSpec{})
71+
if err != nil {
72+
return fmt.Errorf("failed to apply strategic merge patch to workflow spec: %w", err)
73+
}
74+
75+
// Unmarshal the patched spec back into the workflow
76+
if err := json.Unmarshal(patchedSpecJSON, &c.wf.Spec); err != nil {
77+
return fmt.Errorf("failed to unmarshal patched workflow spec: %w", err)
78+
}
79+
80+
log.Debug("Successfully applied workflow spec patch")
81+
return nil
82+
}

manifests/kustomize/base/pipeline/ml-pipeline-apiserver-deployment.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ spec:
126126
value: ghcr.io/kubeflow/kfp-driver:2.14.3
127127
- name: V2_LAUNCHER_IMAGE
128128
value: ghcr.io/kubeflow/kfp-launcher:2.14.3
129+
# JSON patch to apply to compiled workflow specifications
130+
- name: COMPILED_PIPELINE_SPEC_PATCH
131+
value: "{}"
129132
image: ghcr.io/kubeflow/kfp-api-server:dummy
130133
imagePullPolicy: IfNotPresent
131134
name: ml-pipeline-api-server

0 commit comments

Comments
 (0)