@@ -22,8 +22,8 @@ import (
22
22
"knative.dev/operator/pkg/apis/operator/base"
23
23
operatorv1beta1 "knative.dev/operator/pkg/apis/operator/v1beta1"
24
24
knativeeventinginformer "knative.dev/operator/pkg/client/injection/informers/operator/v1beta1/knativeeventing"
25
- kubeclient "knative.dev/pkg/client/injection/kube/client"
26
25
"knative.dev/pkg/controller"
26
+ "knative.dev/pkg/injection"
27
27
"knative.dev/pkg/logging"
28
28
"knative.dev/pkg/ptr"
29
29
)
@@ -39,6 +39,8 @@ type coreScaler struct {
39
39
hasCRDsInstalled atomic.Bool
40
40
cancel context.CancelFunc
41
41
factory externalversions.SharedInformerFactory
42
+
43
+ logger * zap.Logger
42
44
}
43
45
44
46
type CoreScalerWrapper struct {
@@ -98,18 +100,22 @@ func newInternalScaler(ctx context.Context, resync cache.ResourceEventHandler) *
98
100
99
101
logger := logging .FromContext (ctx ).With (zap .String ("component" , "scaler" ))
100
102
103
+ apiExtensionClient , _ := apiextension .NewForConfig (injection .GetConfig (ctx ))
104
+
101
105
s := & coreScaler {
102
106
BrokerLister : f .Eventing ().V1 ().Brokers ().Lister (),
103
107
104
108
InMemoryChannelLister : f .Messaging ().V1 ().InMemoryChannels ().Lister (),
105
109
106
- apiExtensionClient : apiextension . New ( kubeclient . Get ( ctx ). AppsV1 (). RESTClient ()) ,
110
+ apiExtensionClient : apiExtensionClient ,
107
111
108
112
cacheSynced : sync.WaitGroup {},
109
113
hasCRDsInstalled : atomic.Bool {},
110
114
111
115
cancel : cancel ,
112
116
factory : f ,
117
+
118
+ logger : logger .Desugar (),
113
119
}
114
120
_ , _ = f .Eventing ().V1 ().Brokers ().Informer ().AddEventHandler (resync )
115
121
@@ -121,6 +127,7 @@ func newInternalScaler(ctx context.Context, resync cache.ResourceEventHandler) *
121
127
hasCRDsInstalled , err := s .verifyCRDsInstalled (ctx )
122
128
logger .Debugw ("Waiting for CRDs to be installed" , zap .Bool ("hasCRDsInstalled" , hasCRDsInstalled ))
123
129
if err != nil {
130
+ logger .Debugw ("Failed to wait for CRDs to be installed" , zap .Error (err ))
124
131
return false , nil
125
132
}
126
133
return hasCRDsInstalled , nil
@@ -152,6 +159,7 @@ func (s *coreScaler) scale(ke *operatorv1beta1.KnativeEventing) error {
152
159
153
160
hasMTChannelBrokers , err := s .hasMTChannelBrokers ()
154
161
if err != nil {
162
+ s .logger .Warn ("failed to verify if there are MT Channel Based Brokers" , zap .Error (err ))
155
163
return err
156
164
}
157
165
if hasMTChannelBrokers {
@@ -166,6 +174,7 @@ func (s *coreScaler) scale(ke *operatorv1beta1.KnativeEventing) error {
166
174
167
175
hasInMemoryChannels , err := s .hasInMemoryChannels ()
168
176
if err != nil {
177
+ s .logger .Warn ("failed to verify if there are in memory channels" , zap .Error (err ))
169
178
return err
170
179
}
171
180
if hasInMemoryChannels {
@@ -198,11 +207,11 @@ func (s *coreScaler) hasMTChannelBrokers() (bool, error) {
198
207
}
199
208
200
209
func (s * coreScaler ) hasInMemoryChannels () (bool , error ) {
201
- eventTypes , err := s .InMemoryChannelLister .List (labels .Everything ())
210
+ imcs , err := s .InMemoryChannelLister .List (labels .Everything ())
202
211
if err != nil {
203
- return false , fmt .Errorf ("failed to list eventtypes : %w" , err )
212
+ return false , fmt .Errorf ("failed to list inmemorychannels : %w" , err )
204
213
}
205
- return len (eventTypes ) > 0 , nil
214
+ return len (imcs ) > 0 , nil
206
215
}
207
216
208
217
func (s * coreScaler ) ensureAtLeastOneReplica (ke * operatorv1beta1.KnativeEventing , name string ) {
@@ -211,6 +220,8 @@ func (s *coreScaler) ensureAtLeastOneReplica(ke *operatorv1beta1.KnativeEventing
211
220
replicas = ke .Spec .HighAvailability .Replicas
212
221
}
213
222
223
+ s .logger .Info ("Scaling up component" , zap .String ("name" , name ), zap .Int32 ("replicas" , * replicas ))
224
+
214
225
for i , w := range ke .Spec .Workloads {
215
226
if w .Name == name {
216
227
if w .Replicas == nil {
@@ -227,6 +238,8 @@ func (s *coreScaler) ensureAtLeastOneReplica(ke *operatorv1beta1.KnativeEventing
227
238
}
228
239
229
240
func (s * coreScaler ) scaleToZero (ke * operatorv1beta1.KnativeEventing , name string ) {
241
+ s .logger .Info ("Scaling down component" , zap .String ("name" , name ))
242
+
230
243
replicas := pointer .Int32 (0 )
231
244
for i , w := range ke .Spec .Workloads {
232
245
if w .Name == name {
0 commit comments