Skip to content

Commit bbf032c

Browse files
committed
Scale unused Eventing components to 0
Signed-off-by: Pierangelo Di Pilato <[email protected]>
1 parent 8a241eb commit bbf032c

File tree

2 files changed

+276
-1
lines changed

2 files changed

+276
-1
lines changed

openshift-knative-operator/pkg/eventing/extension.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,20 @@ const (
3636
)
3737

3838
// NewExtension creates a new extension for a Knative Eventing controller.
39-
func NewExtension(ctx context.Context, _ *controller.Impl) operator.Extension {
39+
func NewExtension(ctx context.Context, impl *controller.Impl) operator.Extension {
4040
return &extension{
4141
kubeclient: kubeclient.Get(ctx),
4242
dynamicclient: dynamicclient.Get(ctx),
4343
logger: logging.FromContext(ctx),
44+
scaler: NewScaler(ctx, impl),
4445
}
4546
}
4647

4748
type extension struct {
4849
kubeclient kubernetes.Interface
4950
dynamicclient dynamic.Interface
5051
logger *zap.SugaredLogger
52+
scaler *CoreScalerWrapper
5153
}
5254

5355
func (e *extension) Manifests(ke base.KComponent) ([]mf.Manifest, error) {
@@ -128,6 +130,12 @@ func (e *extension) Reconcile(ctx context.Context, comp base.KComponent) error {
128130
}
129131
}
130132

133+
if err := e.scaler.Scale(ke); err != nil {
134+
err = fmt.Errorf("failed to scale components based on resources: %w", err)
135+
ke.Status.MarkInstallFailed(err.Error())
136+
return err
137+
}
138+
131139
if !eventingistio.IsEnabled(ke.GetSpec().GetConfig()) {
132140
eventingistio.ScaleIstioController(requiredNs, ke, 0)
133141
} else {
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package eventing
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"go.uber.org/zap"
11+
apiextension "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/labels"
14+
"k8s.io/apimachinery/pkg/util/wait"
15+
"k8s.io/client-go/tools/cache"
16+
"k8s.io/utils/pointer"
17+
"knative.dev/eventing/pkg/apis/eventing"
18+
"knative.dev/eventing/pkg/client/informers/externalversions"
19+
"knative.dev/eventing/pkg/client/injection/client"
20+
eventingv1 "knative.dev/eventing/pkg/client/listers/eventing/v1"
21+
messagingv1 "knative.dev/eventing/pkg/client/listers/messaging/v1"
22+
"knative.dev/operator/pkg/apis/operator/base"
23+
operatorv1beta1 "knative.dev/operator/pkg/apis/operator/v1beta1"
24+
knativeeventinginformer "knative.dev/operator/pkg/client/injection/informers/operator/v1beta1/knativeeventing"
25+
kubeclient "knative.dev/pkg/client/injection/kube/client"
26+
"knative.dev/pkg/controller"
27+
"knative.dev/pkg/logging"
28+
"knative.dev/pkg/ptr"
29+
)
30+
31+
type coreScaler struct {
32+
eventingv1.BrokerLister
33+
34+
messagingv1.InMemoryChannelLister
35+
36+
apiExtensionClient *apiextension.Clientset
37+
38+
cacheSynced sync.WaitGroup
39+
hasCRDsInstalled atomic.Bool
40+
cancel context.CancelFunc
41+
factory externalversions.SharedInformerFactory
42+
}
43+
44+
type CoreScalerWrapper struct {
45+
scaler func() *coreScaler
46+
scalerMu sync.Mutex
47+
scalerCtx context.Context
48+
knativeEventingInformer cache.SharedIndexInformer
49+
impl *controller.Impl
50+
}
51+
52+
func NewScaler(ctx context.Context, impl *controller.Impl) *CoreScalerWrapper {
53+
s := &CoreScalerWrapper{
54+
scalerCtx: ctx,
55+
scalerMu: sync.Mutex{},
56+
knativeEventingInformer: knativeeventinginformer.Get(ctx).Informer(),
57+
impl: impl,
58+
}
59+
s.resetScaler()
60+
61+
return s
62+
}
63+
64+
func (w *CoreScalerWrapper) Scale(ke *operatorv1beta1.KnativeEventing) error {
65+
w.scalerMu.Lock()
66+
defer w.scalerMu.Unlock()
67+
68+
return w.scaler().scale(ke)
69+
}
70+
71+
func (w *CoreScalerWrapper) Finalize() error {
72+
w.scalerMu.Lock()
73+
defer w.scalerMu.Unlock()
74+
75+
w.scaler().finalize()
76+
w.resetScaler()
77+
78+
return nil
79+
}
80+
81+
func (w *CoreScalerWrapper) resetScaler() {
82+
w.scalerMu.Lock()
83+
defer w.scalerMu.Unlock()
84+
85+
w.scaler = sync.OnceValue(func() *coreScaler {
86+
return newInternalScaler(w.scalerCtx, controller.HandleAll(func(i interface{}) {
87+
w.impl.GlobalResync(w.knativeEventingInformer)
88+
}))
89+
})
90+
}
91+
92+
func newInternalScaler(ctx context.Context, resync cache.ResourceEventHandler) *coreScaler {
93+
94+
c := client.Get(ctx)
95+
f := externalversions.NewSharedInformerFactoryWithOptions(c, controller.GetResyncPeriod(ctx))
96+
97+
ctx, cancel := context.WithCancel(ctx)
98+
99+
logger := logging.FromContext(ctx).With(zap.String("component", "scaler"))
100+
101+
s := &coreScaler{
102+
BrokerLister: f.Eventing().V1().Brokers().Lister(),
103+
104+
InMemoryChannelLister: f.Messaging().V1().InMemoryChannels().Lister(),
105+
106+
apiExtensionClient: apiextension.New(kubeclient.Get(ctx).AppsV1().RESTClient()),
107+
108+
cacheSynced: sync.WaitGroup{},
109+
hasCRDsInstalled: atomic.Bool{},
110+
111+
cancel: cancel,
112+
factory: f,
113+
}
114+
_, _ = f.Eventing().V1().Brokers().Informer().AddEventHandler(resync)
115+
116+
_, _ = f.Messaging().V1().InMemoryChannels().Informer().AddEventHandler(resync)
117+
118+
s.cacheSynced.Add(1)
119+
go func() {
120+
err := wait.PollUntilContextCancel(ctx, time.Second, false, func(ctx context.Context) (done bool, err error) {
121+
hasCRDsInstalled, err := s.verifyCRDsInstalled(ctx)
122+
logger.Debugw("Waiting for CRDs to be installed", zap.Bool("hasCRDsInstalled", hasCRDsInstalled))
123+
if err != nil {
124+
return false, nil
125+
}
126+
return hasCRDsInstalled, nil
127+
})
128+
if err != nil {
129+
return
130+
}
131+
132+
logger.Debugw("Starting scaler informer factory and waiting for cache sync")
133+
134+
f.Start(ctx.Done())
135+
f.WaitForCacheSync(ctx.Done())
136+
s.cacheSynced.Done()
137+
}()
138+
139+
return s
140+
}
141+
142+
func (s *coreScaler) scale(ke *operatorv1beta1.KnativeEventing) error {
143+
// If CRDs are not installed, it means that this is the first time we're reconciling Eventing,
144+
// and so we need to install the resources first and then try to scale down components.
145+
if !s.hasCRDsInstalled.Load() {
146+
return nil
147+
}
148+
149+
if ke.Spec.Workloads == nil {
150+
ke.Spec.Workloads = make([]base.WorkloadOverride, 0)
151+
}
152+
153+
hasMTChannelBrokers, err := s.hasMTChannelBrokers()
154+
if err != nil {
155+
return err
156+
}
157+
if hasMTChannelBrokers {
158+
s.ensureAtLeastOneReplica(ke, "mt-broker-controller")
159+
s.ensureAtLeastOneReplica(ke, "mt-broker-ingress")
160+
s.ensureAtLeastOneReplica(ke, "mt-broker-filter")
161+
} else {
162+
s.scaleToZero(ke, "mt-broker-controller")
163+
s.scaleToZero(ke, "mt-broker-ingress")
164+
s.scaleToZero(ke, "mt-broker-filter")
165+
}
166+
167+
hasInMemoryChannels, err := s.hasInMemoryChannels()
168+
if err != nil {
169+
return err
170+
}
171+
if hasInMemoryChannels {
172+
s.ensureAtLeastOneReplica(ke, "imc-controller")
173+
s.ensureAtLeastOneReplica(ke, "imc-dispatcher")
174+
} else {
175+
s.scaleToZero(ke, "imc-controller")
176+
s.scaleToZero(ke, "imc-dispatcher")
177+
}
178+
179+
return nil
180+
}
181+
182+
func (s *coreScaler) finalize() {
183+
s.cancel()
184+
s.factory.Shutdown()
185+
}
186+
187+
func (s *coreScaler) hasMTChannelBrokers() (bool, error) {
188+
brokers, err := s.BrokerLister.List(labels.Everything())
189+
if err != nil {
190+
return false, fmt.Errorf("failed to list brokers: %w", err)
191+
}
192+
for _, b := range brokers {
193+
if v, ok := b.Annotations[eventing.BrokerClassKey]; ok && v == eventing.MTChannelBrokerClassValue {
194+
return true, nil
195+
}
196+
}
197+
return false, nil
198+
}
199+
200+
func (s *coreScaler) hasInMemoryChannels() (bool, error) {
201+
eventTypes, err := s.InMemoryChannelLister.List(labels.Everything())
202+
if err != nil {
203+
return false, fmt.Errorf("failed to list eventtypes: %w", err)
204+
}
205+
return len(eventTypes) > 0, nil
206+
}
207+
208+
func (s *coreScaler) ensureAtLeastOneReplica(ke *operatorv1beta1.KnativeEventing, name string) {
209+
replicas := ptr.Int32(1)
210+
if ke.Spec.HighAvailability != nil && ke.Spec.HighAvailability.Replicas != nil {
211+
replicas = ke.Spec.HighAvailability.Replicas
212+
}
213+
214+
for i, w := range ke.Spec.Workloads {
215+
if w.Name == name {
216+
if w.Replicas == nil {
217+
ke.Spec.Workloads[i].Replicas = replicas
218+
}
219+
return
220+
}
221+
}
222+
223+
ke.Spec.Workloads = append(ke.Spec.Workloads, base.WorkloadOverride{
224+
Name: name,
225+
Replicas: replicas,
226+
})
227+
}
228+
229+
func (s *coreScaler) scaleToZero(ke *operatorv1beta1.KnativeEventing, name string) {
230+
replicas := pointer.Int32(0)
231+
for i, w := range ke.Spec.Workloads {
232+
if w.Name == name {
233+
// Important: Only set this when replicas is unset
234+
if w.Replicas == nil {
235+
ke.Spec.Workloads[i].Replicas = replicas
236+
}
237+
return
238+
}
239+
}
240+
241+
ke.Spec.Workloads = append(ke.Spec.Workloads, base.WorkloadOverride{
242+
Name: name,
243+
Replicas: replicas,
244+
})
245+
}
246+
247+
func (s *coreScaler) verifyCRDsInstalled(ctx context.Context) (bool, error) {
248+
if s.hasCRDsInstalled.Load() {
249+
return true, nil
250+
}
251+
252+
_, err := s.apiExtensionClient.ApiextensionsV1().
253+
CustomResourceDefinitions().
254+
Get(ctx, "brokers.eventing.knative.dev", metav1.GetOptions{})
255+
if err != nil {
256+
return false, fmt.Errorf("failed to get broker CRD: %w", err)
257+
}
258+
_, err = s.apiExtensionClient.ApiextensionsV1().
259+
CustomResourceDefinitions().
260+
Get(ctx, "inmemorychannels.messaging.knative.dev", metav1.GetOptions{})
261+
if err != nil {
262+
return false, fmt.Errorf("failed to get inmemorychannel CRD: %w", err)
263+
}
264+
265+
s.hasCRDsInstalled.Store(true)
266+
return true, nil
267+
}

0 commit comments

Comments
 (0)