@@ -15,6 +15,7 @@ import (
1515 "k8s.io/kubectl/pkg/cmd/delete"
1616 "k8s.io/kubectl/pkg/cmd/util"
1717 "k8s.io/kubectl/pkg/util/slice"
18+ applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
1819 "sigs.k8s.io/cli-utils/pkg/apply/event"
1920 "sigs.k8s.io/cli-utils/pkg/apply/info"
2021 "sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
@@ -72,22 +73,16 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
7273 // the resource set.
7374 objs , objsWithCRD , err := a .filterCRsWithCRDInSet (objects )
7475 if err != nil {
75- a .sendTaskResult (taskContext , err )
76+ sendBatchApplyEvents (taskContext , objs , err )
77+ a .sendTaskResult (taskContext )
7678 return
7779 }
7880
7981 // Just send the apply event here. We know it must be a
8082 // Created event since the type didn't already exist in the
8183 // cluster.
8284 for _ , obj := range objsWithCRD {
83- taskContext .EventChannel () <- event.Event {
84- Type : event .ApplyType ,
85- ApplyEvent : event.ApplyEvent {
86- Type : event .ApplyEventResourceUpdate ,
87- Operation : event .Created ,
88- Object : obj ,
89- },
90- }
85+ taskContext .EventChannel () <- createApplyEvent (object .UnstructuredToObjMeta (obj ), event .Created , nil )
9186 }
9287 // Update the resource set to no longer include the CRs.
9388 objects = objs
@@ -97,15 +92,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
9792 // for that here. It could happen if this is dry-run and we removed
9893 // all resources in the previous step.
9994 if len (objects ) == 0 {
100- a .sendTaskResult (taskContext , nil )
101- return
102- }
103-
104- // Set the client and mapping fields on the provided
105- // infos so they can be applied to the cluster.
106- infos , err := a .InfoHelper .BuildInfos (objects )
107- if err != nil {
108- a .sendTaskResult (taskContext , err )
95+ a .sendTaskResult (taskContext )
10996 return
11097 }
11198
@@ -114,15 +101,30 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
114101 ao , err := applyOptionsFactoryFunc (taskContext .EventChannel (),
115102 a .ServerSideOptions , a .DryRunStrategy , a .Factory )
116103 if err != nil {
117- a .sendTaskResult (taskContext , err )
104+ sendBatchApplyEvents (taskContext , objects , err )
105+ a .sendTaskResult (taskContext )
118106 return
119107 }
120- ao .SetObjects (infos )
121- err = ao .Run ()
122- if err != nil {
123- a .sendTaskResult (taskContext , err )
124- return
108+
109+ var infos []* resource.Info
110+ for _ , obj := range objects {
111+ // Set the client and mapping fields on the provided
112+ // info so they can be applied to the cluster.
113+ info , err := a .InfoHelper .BuildInfo (obj )
114+ if err != nil {
115+ taskContext .EventChannel () <- createApplyEvent (
116+ object .UnstructuredToObjMeta (obj ), event .Unchanged , applyerror .NewUnknownTypeError (err ))
117+ continue
118+ }
119+ infos = append (infos , info )
120+ ao .SetObjects ([]* resource.Info {info })
121+ err = ao .Run ()
122+ if err != nil {
123+ taskContext .EventChannel () <- createApplyEvent (
124+ object .UnstructuredToObjMeta (obj ), event .Unchanged , applyerror .NewApplyRunError (err ))
125+ }
125126 }
127+
126128 // Fetch the Generation from all Infos after they have been
127129 // applied.
128130 for _ , inf := range infos {
@@ -140,7 +142,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
140142 taskContext .ResourceApplied (id , uid , gen )
141143 }
142144 }
143- a .sendTaskResult (taskContext , nil )
145+ a .sendTaskResult (taskContext )
144146 }()
145147}
146148
@@ -187,10 +189,8 @@ func newApplyOptions(eventChannel chan event.Event, serverSideOptions common.Ser
187189 }, nil
188190}
189191
190- func (a * ApplyTask ) sendTaskResult (taskContext * taskrunner.TaskContext , err error ) {
191- taskContext .TaskChannel () <- taskrunner.TaskResult {
192- Err : err ,
193- }
192+ func (a * ApplyTask ) sendTaskResult (taskContext * taskrunner.TaskContext ) {
193+ taskContext .TaskChannel () <- taskrunner.TaskResult {}
194194}
195195
196196// filterCRsWithCRDInSet loops through all the resources and filters out the
@@ -277,3 +277,25 @@ func buildCRDsInfo(crds []*unstructured.Unstructured) *crdsInfo {
277277
278278// ClearTimeout is not supported by the ApplyTask.
279279func (a * ApplyTask ) ClearTimeout () {}
280+
281+ // createApplyEvent is a helper function to package an apply event for a single resource.
282+ func createApplyEvent (id object.ObjMetadata , operation event.ApplyEventOperation , err error ) event.Event {
283+ return event.Event {
284+ Type : event .ApplyType ,
285+ ApplyEvent : event.ApplyEvent {
286+ Type : event .ApplyEventResourceUpdate ,
287+ Operation : operation ,
288+ Identifier : id ,
289+ Error : err ,
290+ },
291+ }
292+ }
293+
294+ // sendBatchApplyEvents is a helper function to send out multiple apply events for
295+ // a list of resources when failed to initialize the apply process.
296+ func sendBatchApplyEvents (taskContext * taskrunner.TaskContext , objects []* unstructured.Unstructured , err error ) {
297+ for _ , obj := range objects {
298+ taskContext .EventChannel () <- createApplyEvent (
299+ object .UnstructuredToObjMeta (obj ), event .Unchanged , applyerror .NewInitializeApplyOptionError (err ))
300+ }
301+ }
0 commit comments