From a460c4f2952c5acdacd4452a85fa4acfadc69e26 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Tue, 22 Jul 2025 22:22:05 +0200 Subject: [PATCH 01/13] Base migration to the new events API Signed-off-by: Borja Clemente --- ...ve-cluster-specific-code-out-of-manager.md | 4 +- pkg/cluster/cluster.go | 32 ++-------- pkg/cluster/cluster_test.go | 5 +- pkg/cluster/internal.go | 6 +- pkg/internal/recorder/recorder.go | 64 +++++++------------ .../recorder/recorder_integration_test.go | 2 +- pkg/internal/recorder/recorder_test.go | 2 +- pkg/leaderelection/leader_election.go | 3 +- pkg/manager/internal.go | 6 +- pkg/manager/manager.go | 40 ++++-------- pkg/manager/manager_test.go | 15 ++--- pkg/recorder/example_test.go | 4 +- pkg/recorder/recorder.go | 4 +- 13 files changed, 64 insertions(+), 123 deletions(-) diff --git a/designs/move-cluster-specific-code-out-of-manager.md b/designs/move-cluster-specific-code-out-of-manager.md index 67b7a419a5..c4af7bdc50 100644 --- a/designs/move-cluster-specific-code-out-of-manager.md +++ b/designs/move-cluster-specific-code-out-of-manager.md @@ -61,8 +61,8 @@ type Cluster interface { // GetCache returns a cache.Cache GetCache() cache.Cache - // GetEventRecorderFor returns a new EventRecorder for the provided name - GetEventRecorderFor(name string) record.EventRecorder + //GetEventRecorder returns a new EventRecorder for the provided name + GetEventRecorder(name string) record.EventRecorder // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 0603f4cde5..57016ebce2 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -26,7 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -58,8 +58,8 @@ type Cluster interface { // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer - // GetEventRecorderFor returns a new EventRecorder for the provided name - GetEventRecorderFor(name string) record.EventRecorder + // GetEventRecorder returns a new EventRecorder for the provided name + GetEventRecorder(name string) events.EventRecorder // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper @@ -126,16 +126,10 @@ type Options struct { // // Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers // is shorter than the lifetime of your process. - EventBroadcaster record.EventBroadcaster - - // makeBroadcaster allows deferring the creation of the broadcaster to - // avoid leaking goroutines if we never call Start on this manager. It also - // returns whether or not this is a "owned" broadcaster, and as such should be - // stopped with the manager. - makeBroadcaster intrec.EventBroadcasterProducer + EventBroadcaster events.EventBroadcaster // Dependency injection for testing - newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) + newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*intrec.Provider, error) } // Option can be used to manipulate Options. @@ -228,7 +222,8 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. - recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) + // Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil). + recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil) if err != nil { return nil, err } @@ -281,19 +276,6 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) { options.newRecorderProvider = intrec.NewProvider } - // This is duplicated with pkg/manager, we need it here to provide - // the user with an EventBroadcaster and there for the Leader election - if options.EventBroadcaster == nil { - // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true - } - } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false - } - } - if options.Logger.GetSink() == nil { options.Logger = logf.RuntimeLog.WithName("cluster") } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index c08a742403..ee994d21ff 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() { c, err := New(nil) Expect(c).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) - }) It("should return an error if it can't create a RestMapper", func() { @@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() { }) Expect(c).To(BeNil()) Expect(err).To(Equal(expected)) - }) It("should return an error it can't create a client.Client", func() { @@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("expected error")) }) - }) Describe("Start", func() { @@ -160,7 +157,7 @@ var _ = Describe("cluster.Cluster", func() { It("should provide a function to get the EventRecorder", func() { c, err := New(cfg) Expect(err).NotTo(HaveOccurred()) - Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(c.GetEventRecorder("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { c, err := New(cfg) diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index 2742764231..cd5cffa288 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -24,7 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -83,8 +83,8 @@ func (c *cluster) GetCache() cache.Cache { return c.cache } -func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder { - return c.recorderProvider.GetEventRecorderFor(name) +func (c *cluster) GetEventRecorder(name string) events.EventRecorder { + return c.recorderProvider.GetEventRecorder(name) } func (c *cluster) GetRESTMapper() meta.RESTMapper { diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 21f0146ba3..a5038a9304 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -23,17 +23,16 @@ import ( "sync" "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" ) // EventBroadcasterProducer makes an event broadcaster, returning // whether or not the broadcaster should be stopped with the Provider, // or not (e.g. if it's shared, it shouldn't be stopped with the Provider). -type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool) +type EventBroadcasterProducer func() (caster events.EventBroadcaster, stopWithProvider bool) // Provider is a recorder.Provider that records events to the k8s API server // and to a logr Logger. @@ -45,11 +44,11 @@ type Provider struct { scheme *runtime.Scheme // logger is the logger to use when logging diagnostic event info logger logr.Logger - evtClient corev1client.EventInterface + evtClient eventsv1client.EventsV1Interface makeBroadcaster EventBroadcasterProducer broadcasterOnce sync.Once - broadcaster record.EventBroadcaster + broadcaster events.EventBroadcaster stopBroadcaster bool } @@ -89,7 +88,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // getBroadcaster ensures that a broadcaster is started for this // provider, and returns it. It's threadsafe. -func (p *Provider) getBroadcaster() record.EventBroadcaster { +func (p *Provider) getBroadcaster() events.EventBroadcaster { // NB(directxman12): this can technically still leak if something calls // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we // create the broadcaster in start, we could race with other things that @@ -97,37 +96,40 @@ func (p *Provider) getBroadcaster() record.EventBroadcaster { // silently swallowing events and more locking, but that seems suboptimal. p.broadcasterOnce.Do(func() { - broadcaster, stop := p.makeBroadcaster() - broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) - broadcaster.StartEventWatcher( - func(e *corev1.Event) { + if p.broadcaster == nil { + p.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: p.evtClient}) + } + // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider. + p.broadcaster.StartRecordingToSink(nil) + + // TODO(clebs): figure out if we still need this and how the change would make sense. + p.broadcaster.StartEventWatcher( + func(e runtime.Object) { p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) }) - p.broadcaster = broadcaster - p.stopBroadcaster = stop }) return p.broadcaster } // NewProvider create a new Provider instance. -func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) { +func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*Provider, error) { if httpClient == nil { panic("httpClient must not be nil") } - corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient) + eventsv1Client, err := eventsv1client.NewForConfigAndClient(config, httpClient) if err != nil { return nil, fmt.Errorf("failed to init client: %w", err) } - p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")} + p := &Provider{scheme: scheme, logger: logger, broadcaster: broadcaster, stopBroadcaster: stopWithProvider, evtClient: eventsv1Client} return p, nil } -// GetEventRecorderFor returns an event recorder that broadcasts to this provider's +// GetEventRecorder returns an event recorder that broadcasts to this provider's // broadcaster. All events will be associated with a component of the given name. -func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder { +func (p *Provider) GetEventRecorder(name string) events.EventRecorder { return &lazyRecorder{ prov: p, name: name, @@ -141,41 +143,23 @@ type lazyRecorder struct { name string recOnce sync.Once - rec record.EventRecorder + rec events.EventRecorder } // ensureRecording ensures that a concrete recorder is populated for this recorder. func (l *lazyRecorder) ensureRecording() { l.recOnce.Do(func() { broadcaster := l.prov.getBroadcaster() - l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) + l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name) }) } -func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) { - l.ensureRecording() - - l.prov.lock.RLock() - if !l.prov.stopped { - l.rec.Event(object, eventtype, reason, message) - } - l.prov.lock.RUnlock() -} -func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { - l.ensureRecording() - - l.prov.lock.RLock() - if !l.prov.stopped { - l.rec.Eventf(object, eventtype, reason, messageFmt, args...) - } - l.prov.lock.RUnlock() -} -func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { +func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { l.ensureRecording() l.prov.lock.RLock() if !l.prov.stopped { - l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...) + l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...) } l.prov.lock.RUnlock() } diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index c278fbde79..e29e2497f0 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -43,7 +43,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - recorder := cm.GetEventRecorderFor("test-recorder") + recorder := cm.GetEventRecorder("test-recorder") instance, err := controller.New("foo-controller", cm, controller.Options{ Reconciler: reconcile.Func( func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/pkg/internal/recorder/recorder_test.go b/pkg/internal/recorder/recorder_test.go index e226e165a3..1aabeffa0f 100644 --- a/pkg/internal/recorder/recorder_test.go +++ b/pkg/internal/recorder/recorder_test.go @@ -48,7 +48,7 @@ var _ = Describe("recorder.Provider", func() { provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) Expect(err).NotTo(HaveOccurred()) - recorder := provider.GetEventRecorderFor("test") + recorder := provider.GetEventRecorder("test") Expect(recorder).NotTo(BeNil()) }) }) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 6c013e7992..25cbc51e02 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -126,9 +126,10 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op options.LeaderElectionID, corev1Client, coordinationClient, + // TODO(clebs): figure out how to solve this. resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: recorderProvider.GetEventRecorderFor(id), + EventRecorder: recorderProvider.GetEventRecorder(id), }, options.LeaderLabels, ) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a9f91cbdd5..f2aa73faf3 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -32,9 +32,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -255,8 +255,8 @@ func (cm *controllerManager) GetCache() cache.Cache { return cm.cluster.GetCache() } -func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { - return cm.cluster.GetEventRecorderFor(name) +func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder { + return cm.cluster.GetEventRecorder(name) } func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e0e94245e7..5be59d2026 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -29,9 +29,10 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -261,7 +262,7 @@ type Options struct { // // Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers // is shorter than the lifetime of your process. - EventBroadcaster record.EventBroadcaster + EventBroadcaster events.EventBroadcaster // GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. // To disable graceful shutdown, set to time.Duration(0) @@ -274,14 +275,8 @@ type Options struct { // +optional Controller config.Controller - // makeBroadcaster allows deferring the creation of the broadcaster to - // avoid leaking goroutines if we never call Start on this manager. It also - // returns whether or not this is a "owned" broadcaster, and as such should be - // stopped with the manager. - makeBroadcaster intrec.EventBroadcasterProducer - // Dependency injection for testing - newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) + newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*intrec.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) newHealthProbeListener func(addr string) (net.Listener, error) @@ -337,7 +332,10 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, errors.New("must specify Config") } // Set default values for options fields - options = setOptionsDefaults(options) + options, err := setOptionsDefaults(config, options) + if err != nil { + return nil, err + } cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme @@ -361,7 +359,7 @@ func New(config *rest.Config, options Options) (Manager, error) { // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. - recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) + recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil) if err != nil { return nil, err } @@ -388,7 +386,7 @@ func New(config *rest.Config, options Options) (Manager, error) { if err != nil { return nil, err } - leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, httpClient, scheme, options.Logger.WithName("events"), options.makeBroadcaster) + leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, httpClient, scheme, options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil) if err != nil { return nil, err } @@ -493,7 +491,7 @@ func defaultBaseContext() context.Context { } // setOptionsDefaults set default values for Options fields. -func setOptionsDefaults(options Options) Options { +func setOptionsDefaults(config *rest.Config, options Options) (Options, error) { // Allow newResourceLock to be mocked if options.newResourceLock == nil { options.newResourceLock = leaderelection.NewResourceLock @@ -504,20 +502,6 @@ func setOptionsDefaults(options Options) Options { options.newRecorderProvider = intrec.NewProvider } - // This is duplicated with pkg/cluster, we need it here - // for the leader election and there to provide the user with - // an EventBroadcaster - if options.EventBroadcaster == nil { - // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true - } - } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false - } - } - if options.newMetricsServer == nil { options.newMetricsServer = metricsserver.NewServer } @@ -571,5 +555,5 @@ func setOptionsDefaults(options Options) Options { options.WebhookServer = webhook.NewServer(webhook.Options{}) } - return options + return options, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 4363d62f59..ad96e28188 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -60,7 +60,6 @@ var _ = Describe("manger.Manager", func() { m, err := New(nil, Options{}) Expect(m).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) - }) It("should return an error if it can't create a RestMapper", func() { @@ -70,7 +69,6 @@ var _ = Describe("manger.Manager", func() { }) Expect(m).To(BeNil()) Expect(err).To(Equal(expected)) - }) It("should return an error it can't create a client.Client", func() { @@ -207,7 +205,6 @@ var _ = Describe("manger.Manager", func() { } // Don't leak routines <-mgrDone - }) It("should disable gracefulShutdown when stopping to lead", func(ctx SpecContext) { m, err := New(cfg, Options{ @@ -443,7 +440,6 @@ var _ = Describe("manger.Manager", func() { Expect(ok).To(BeTrue()) _, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock) Expect(isLeaseLock).To(BeTrue()) - }) It("should use the specified ResourceLock", func() { m, err := New(cfg, Options{ @@ -671,7 +667,7 @@ var _ = Describe("manger.Manager", func() { }) Describe("Start", func() { - var startSuite = func(options Options, callbacks ...func(Manager)) { + startSuite := func(options Options, callbacks ...func(Manager)) { It("should Start each Component", func(ctx SpecContext) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) @@ -1256,7 +1252,6 @@ var _ = Describe("manger.Manager", func() { <-managerStopDone Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond)) }) - } Context("with defaults", func() { @@ -1790,7 +1785,6 @@ var _ = Describe("manger.Manager", func() { err = m.Start(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("manager already started")) - }) }) @@ -1820,7 +1814,7 @@ var _ = Describe("manger.Manager", func() { ns := corev1.Namespace{} ns.Name = "default" - recorder := m.GetEventRecorderFor("rock-and-roll") + recorder := m.GetEventRecorder("rock-and-roll") Expect(m.Add(RunnableFunc(func(_ context.Context) error { recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah") return nil @@ -1941,7 +1935,7 @@ var _ = Describe("manger.Manager", func() { It("should provide a function to get the EventRecorder", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(m.GetEventRecorder("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { m, err := New(cfg, Options{}) @@ -2020,8 +2014,7 @@ var _ = Describe("manger.Manager", func() { }) }) -type runnableError struct { -} +type runnableError struct{} func (runnableError) Error() string { return "not feeling like that" diff --git a/pkg/recorder/example_test.go b/pkg/recorder/example_test.go index 969420d817..6aaf67b6ed 100644 --- a/pkg/recorder/example_test.go +++ b/pkg/recorder/example_test.go @@ -30,7 +30,7 @@ var ( func Example_event() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorderFor("my-controller") + recorder := recorderProvider.GetEventRecorder("my-controller") // emit an event with a fixed message recorder.Event(somePod, corev1.EventTypeWarning, @@ -39,7 +39,7 @@ func Example_event() { func Example_eventf() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorderFor("my-controller") + recorder := recorderProvider.GetEventRecorder("my-controller") // emit an event with a variable message mildCheese := "Wensleydale" diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index f093f0a726..03060e7e32 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -21,11 +21,11 @@ limitations under the License. package recorder import ( - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" ) // Provider knows how to generate new event recorders with given name. type Provider interface { // NewRecorder returns an EventRecorder with given name. - GetEventRecorderFor(name string) record.EventRecorder + GetEventRecorder(name string) events.EventRecorder } From 032b80cd3fc7757960aa04d89562672e4840e5c4 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Wed, 23 Jul 2025 17:40:28 +0200 Subject: [PATCH 02/13] Add support for both the new and old events APIs simultaneously. Signed-off-by: Borja Clemente --- pkg/cluster/cluster.go | 6 +++--- pkg/cluster/internal.go | 5 +++++ pkg/internal/recorder/recorder.go | 31 +++++++++++++++++++++++++++ pkg/leaderelection/leader_election.go | 3 +-- pkg/manager/internal.go | 5 +++++ pkg/manager/manager.go | 1 - pkg/recorder/recorder.go | 4 ++++ 7 files changed, 49 insertions(+), 6 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 57016ebce2..80c457ab0a 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -33,10 +33,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/recorder" ) // Cluster provides various methods to interact with a cluster. type Cluster interface { + recorder.Provider + // GetHTTPClient returns an HTTP client that can be used to talk to the apiserver GetHTTPClient() *http.Client @@ -58,9 +61,6 @@ type Cluster interface { // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer - // GetEventRecorder returns a new EventRecorder for the provided name - GetEventRecorder(name string) events.EventRecorder - // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index cd5cffa288..a8e6a877a9 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -87,6 +88,10 @@ func (c *cluster) GetEventRecorder(name string) events.EventRecorder { return c.recorderProvider.GetEventRecorder(name) } +func (c *cluster) GetOldEventRecorder(name string) record.EventRecorder { + return c.recorderProvider.GetOldEventRecorder(name) +} + func (c *cluster) GetRESTMapper() meta.RESTMapper { return c.mapper } diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index a5038a9304..fde25ea6b8 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -27,6 +27,7 @@ import ( eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/record" ) // EventBroadcasterProducer makes an event broadcaster, returning @@ -136,6 +137,17 @@ func (p *Provider) GetEventRecorder(name string) events.EventRecorder { } } +// GetOldEventRecorder returns an event recorder that broadcasts to this provider's +// broadcaster. All events will be associated with a component of the given name. +func (p *Provider) GetOldEventRecorder(name string) record.EventRecorder { + return &oldRecorder{ + newRecorder: &lazyRecorder{ + prov: p, + name: name, + }, + } +} + // lazyRecorder is a recorder that doesn't actually instantiate any underlying // recorder until the first event is emitted. type lazyRecorder struct { @@ -163,3 +175,22 @@ func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, } l.prov.lock.RUnlock() } + +// oldRecorder is a wrapper around the events.EventRecorder that implements the old record.EventRecorder API. +// This is a temporary solution to support both the old and new events APIs without duplicating everything. +// Internally it calls the new events API from the old API funcs and no longer supported parameters are ignored (e.g. annotations). +type oldRecorder struct { + newRecorder *lazyRecorder +} + +func (l *oldRecorder) Event(object runtime.Object, eventtype, reason, message string) { + l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", message) +} + +func (l *oldRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", messageFmt, args...) +} + +func (l *oldRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", messageFmt, args...) +} diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 25cbc51e02..641762a971 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -126,10 +126,9 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op options.LeaderElectionID, corev1Client, coordinationClient, - // TODO(clebs): figure out how to solve this. resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: recorderProvider.GetEventRecorder(id), + EventRecorder: recorderProvider.GetOldEventRecorder(id), }, options.LeaderLabels, ) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index f2aa73faf3..b082f4c97b 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -259,6 +260,10 @@ func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder return cm.cluster.GetEventRecorder(name) } +func (cm *controllerManager) GetOldEventRecorder(name string) record.EventRecorder { + return cm.cluster.GetOldEventRecorder(name) +} + func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { return cm.cluster.GetRESTMapper() } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5be59d2026..04d0f1a639 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -29,7 +29,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index 03060e7e32..cd0b52922a 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -22,10 +22,14 @@ package recorder import ( "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/record" ) // Provider knows how to generate new event recorders with given name. type Provider interface { // NewRecorder returns an EventRecorder with given name. GetEventRecorder(name string) events.EventRecorder + // GetOldEventRecorder returns an EventRecorder for the old events API. + // The old API is not 100% supported anymore, use the new one whenever possible. + GetOldEventRecorder(name string) record.EventRecorder } From c91e2390a0f72f56465c86251738e5488987d0a5 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Wed, 23 Jul 2025 22:02:36 +0200 Subject: [PATCH 03/13] Adapt StartEventWatcher to the new API Signed-off-by: Borja Clemente --- pkg/internal/recorder/recorder.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index fde25ea6b8..6aa759f160 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/go-logr/logr" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/runtime" eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" @@ -105,8 +106,12 @@ func (p *Provider) getBroadcaster() events.EventBroadcaster { // TODO(clebs): figure out if we still need this and how the change would make sense. p.broadcaster.StartEventWatcher( - func(e runtime.Object) { - p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) + func(obj runtime.Object) { + if e, ok := obj.(*eventsv1.Event); ok { + p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Regarding, "related", e.Related, "reason", e.Reason) + } else { + p.logger.V(1).Info("event watcher received an unsupported object type", "gvk", obj.GetObjectKind().GroupVersionKind().String()) + } }) }) From fa9c08197e180717001fb45f78e19b5bb15dfa6e Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Thu, 24 Jul 2025 10:45:22 +0200 Subject: [PATCH 04/13] Revert renaming of GetEventRecorderFor Signed-off-by: Borja Clemente --- ...ve-cluster-specific-code-out-of-manager.md | 4 +- pkg/cluster/cluster_test.go | 5 ++- pkg/cluster/internal.go | 8 ++-- pkg/internal/recorder/recorder.go | 38 ++++++++----------- .../recorder/recorder_integration_test.go | 2 +- pkg/internal/recorder/recorder_test.go | 12 +++--- pkg/leaderelection/leader_election.go | 2 +- pkg/manager/internal.go | 8 ++-- pkg/manager/manager_test.go | 7 ++-- pkg/recorder/example_test.go | 4 +- pkg/recorder/recorder.go | 10 +++-- 11 files changed, 48 insertions(+), 52 deletions(-) diff --git a/designs/move-cluster-specific-code-out-of-manager.md b/designs/move-cluster-specific-code-out-of-manager.md index c4af7bdc50..67b7a419a5 100644 --- a/designs/move-cluster-specific-code-out-of-manager.md +++ b/designs/move-cluster-specific-code-out-of-manager.md @@ -61,8 +61,8 @@ type Cluster interface { // GetCache returns a cache.Cache GetCache() cache.Cache - //GetEventRecorder returns a new EventRecorder for the provided name - GetEventRecorder(name string) record.EventRecorder + // GetEventRecorderFor returns a new EventRecorder for the provided name + GetEventRecorderFor(name string) record.EventRecorder // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index ee994d21ff..90292feb60 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" @@ -86,7 +87,7 @@ var _ = Describe("cluster.Cluster", func() { It("should return an error it can't create a recorder.Provider", func() { c, err := New(cfg, func(o *Options) { - o.newRecorderProvider = func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { + o.newRecorderProvider = func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ events.EventBroadcaster, _ bool) (*intrec.Provider, error) { return nil, fmt.Errorf("expected error") } }) @@ -157,7 +158,7 @@ var _ = Describe("cluster.Cluster", func() { It("should provide a function to get the EventRecorder", func() { c, err := New(cfg) Expect(err).NotTo(HaveOccurred()) - Expect(c.GetEventRecorder("test")).NotTo(BeNil()) + Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { c, err := New(cfg) diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index a8e6a877a9..755f83b546 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -84,12 +84,12 @@ func (c *cluster) GetCache() cache.Cache { return c.cache } -func (c *cluster) GetEventRecorder(name string) events.EventRecorder { - return c.recorderProvider.GetEventRecorder(name) +func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder { + return c.recorderProvider.GetEventRecorderFor(name) } -func (c *cluster) GetOldEventRecorder(name string) record.EventRecorder { - return c.recorderProvider.GetOldEventRecorder(name) +func (c *cluster) GetEventRecorder(name string) events.EventRecorder { + return c.recorderProvider.GetEventRecorder(name) } func (c *cluster) GetRESTMapper() meta.RESTMapper { diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 6aa759f160..9fc5610b0b 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -31,11 +31,6 @@ import ( "k8s.io/client-go/tools/record" ) -// EventBroadcasterProducer makes an event broadcaster, returning -// whether or not the broadcaster should be stopped with the Provider, -// or not (e.g. if it's shared, it shouldn't be stopped with the Provider). -type EventBroadcasterProducer func() (caster events.EventBroadcaster, stopWithProvider bool) - // Provider is a recorder.Provider that records events to the k8s API server // and to a logr Logger. type Provider struct { @@ -45,9 +40,8 @@ type Provider struct { // scheme to specify when creating a recorder scheme *runtime.Scheme // logger is the logger to use when logging diagnostic event info - logger logr.Logger - evtClient eventsv1client.EventsV1Interface - makeBroadcaster EventBroadcasterProducer + logger logr.Logger + evtClient eventsv1client.EventsV1Interface broadcasterOnce sync.Once broadcaster events.EventBroadcaster @@ -133,18 +127,9 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S return p, nil } -// GetEventRecorder returns an event recorder that broadcasts to this provider's -// broadcaster. All events will be associated with a component of the given name. -func (p *Provider) GetEventRecorder(name string) events.EventRecorder { - return &lazyRecorder{ - prov: p, - name: name, - } -} - -// GetOldEventRecorder returns an event recorder that broadcasts to this provider's +// GetEventRecorderFor returns an event recorder that broadcasts to this provider's // broadcaster. All events will be associated with a component of the given name. -func (p *Provider) GetOldEventRecorder(name string) record.EventRecorder { +func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder { return &oldRecorder{ newRecorder: &lazyRecorder{ prov: p, @@ -153,6 +138,15 @@ func (p *Provider) GetOldEventRecorder(name string) record.EventRecorder { } } +// GetEventRecorder returns an event recorder that broadcasts to this provider's +// broadcaster. All events will be associated with a component of the given name. +func (p *Provider) GetEventRecorder(name string) events.EventRecorder { + return &lazyRecorder{ + prov: p, + name: name, + } +} + // lazyRecorder is a recorder that doesn't actually instantiate any underlying // recorder until the first event is emitted. type lazyRecorder struct { @@ -171,7 +165,7 @@ func (l *lazyRecorder) ensureRecording() { }) } -func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { +func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) { l.ensureRecording() l.prov.lock.RLock() @@ -192,10 +186,10 @@ func (l *oldRecorder) Event(object runtime.Object, eventtype, reason, message st l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", message) } -func (l *oldRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { +func (l *oldRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", messageFmt, args...) } -func (l *oldRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { +func (l *oldRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", messageFmt, args...) } diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index e29e2497f0..c278fbde79 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -43,7 +43,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - recorder := cm.GetEventRecorder("test-recorder") + recorder := cm.GetEventRecorderFor("test-recorder") instance, err := controller.New("foo-controller", cm, controller.Options{ Reconciler: reconcile.Func( func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/pkg/internal/recorder/recorder_test.go b/pkg/internal/recorder/recorder_test.go index 1aabeffa0f..4865664f99 100644 --- a/pkg/internal/recorder/recorder_test.go +++ b/pkg/internal/recorder/recorder_test.go @@ -21,15 +21,13 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/internal/recorder" ) var _ = Describe("recorder.Provider", func() { - makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true } Describe("NewProvider", func() { It("should return a provider instance and a nil error.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), nil, true) Expect(provider).NotTo(BeNil()) Expect(err).NotTo(HaveOccurred()) }) @@ -38,17 +36,17 @@ var _ = Describe("recorder.Provider", func() { // Invalid the config cfg1 := *cfg cfg1.Host = "invalid host" - _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), nil, true) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to init client")) }) }) - Describe("GetEventRecorder", func() { + Describe("GetEventRecorderFor", func() { It("should return a recorder instance.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), nil, true) Expect(err).NotTo(HaveOccurred()) - recorder := provider.GetEventRecorder("test") + recorder := provider.GetEventRecorderFor("test") Expect(recorder).NotTo(BeNil()) }) }) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 641762a971..6c013e7992 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -128,7 +128,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op coordinationClient, resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: recorderProvider.GetOldEventRecorder(id), + EventRecorder: recorderProvider.GetEventRecorderFor(id), }, options.LeaderLabels, ) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index b082f4c97b..6790d53bcd 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -256,12 +256,12 @@ func (cm *controllerManager) GetCache() cache.Cache { return cm.cluster.GetCache() } -func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder { - return cm.cluster.GetEventRecorder(name) +func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { + return cm.cluster.GetEventRecorderFor(name) } -func (cm *controllerManager) GetOldEventRecorder(name string) record.EventRecorder { - return cm.cluster.GetOldEventRecorder(name) +func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder { + return cm.cluster.GetEventRecorder(name) } func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index ad96e28188..07292087bc 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" @@ -106,7 +107,7 @@ var _ = Describe("manger.Manager", func() { It("should return an error it can't create a recorder.Provider", func() { m, err := New(cfg, Options{ - newRecorderProvider: func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { + newRecorderProvider: func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ events.EventBroadcaster, _ bool) (*intrec.Provider, error) { return nil, fmt.Errorf("expected error") }, }) @@ -1814,7 +1815,7 @@ var _ = Describe("manger.Manager", func() { ns := corev1.Namespace{} ns.Name = "default" - recorder := m.GetEventRecorder("rock-and-roll") + recorder := m.GetEventRecorderFor("rock-and-roll") Expect(m.Add(RunnableFunc(func(_ context.Context) error { recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah") return nil @@ -1935,7 +1936,7 @@ var _ = Describe("manger.Manager", func() { It("should provide a function to get the EventRecorder", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - Expect(m.GetEventRecorder("test")).NotTo(BeNil()) + Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { m, err := New(cfg, Options{}) diff --git a/pkg/recorder/example_test.go b/pkg/recorder/example_test.go index 6aaf67b6ed..969420d817 100644 --- a/pkg/recorder/example_test.go +++ b/pkg/recorder/example_test.go @@ -30,7 +30,7 @@ var ( func Example_event() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorder("my-controller") + recorder := recorderProvider.GetEventRecorderFor("my-controller") // emit an event with a fixed message recorder.Event(somePod, corev1.EventTypeWarning, @@ -39,7 +39,7 @@ func Example_event() { func Example_eventf() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorder("my-controller") + recorder := recorderProvider.GetEventRecorderFor("my-controller") // emit an event with a variable message mildCheese := "Wensleydale" diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index cd0b52922a..2dee673a91 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -27,9 +27,11 @@ import ( // Provider knows how to generate new event recorders with given name. type Provider interface { - // NewRecorder returns an EventRecorder with given name. - GetEventRecorder(name string) events.EventRecorder - // GetOldEventRecorder returns an EventRecorder for the old events API. + // GetEventRecorder returns a EventRecorder with given name. + // + // Deprecated: this uses the old events API and will be removed in a future release. Please use GetEventRecorder instead. + GetEventRecorderFor(name string) record.EventRecorder + // GetEventRecorder returns an EventRecorder for the old events API. // The old API is not 100% supported anymore, use the new one whenever possible. - GetOldEventRecorder(name string) record.EventRecorder + GetEventRecorder(name string) events.EventRecorder } From 709d5522a944cca28a19785d9541a009cd39f1aa Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Thu, 24 Jul 2025 15:14:49 +0200 Subject: [PATCH 05/13] Remove StartEventWatcher call The StartEventWatcher call is no longer needed since calling StartRecordingToSink already sets up a watcher and manages it via its context. Signed-off-by: Borja Clemente --- pkg/internal/recorder/recorder.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 9fc5610b0b..143a02f2ca 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/go-logr/logr" - eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/runtime" eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" @@ -96,17 +95,7 @@ func (p *Provider) getBroadcaster() events.EventBroadcaster { p.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: p.evtClient}) } // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider. - p.broadcaster.StartRecordingToSink(nil) - - // TODO(clebs): figure out if we still need this and how the change would make sense. - p.broadcaster.StartEventWatcher( - func(obj runtime.Object) { - if e, ok := obj.(*eventsv1.Event); ok { - p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Regarding, "related", e.Related, "reason", e.Reason) - } else { - p.logger.V(1).Info("event watcher received an unsupported object type", "gvk", obj.GetObjectKind().GroupVersionKind().String()) - } - }) + p.broadcaster.StartRecordingToSinkWithContext(context.TODO()) }) return p.broadcaster From 047d2b287bb8c2baf747f32b5e793f5564e871dc Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Thu, 24 Jul 2025 16:45:32 +0200 Subject: [PATCH 06/13] Fix linting issues Signed-off-by: Borja Clemente --- pkg/cluster/cluster_test.go | 2 +- pkg/internal/recorder/recorder.go | 2 +- pkg/internal/recorder/recorder_integration_test.go | 2 +- pkg/leaderelection/leader_election.go | 2 +- pkg/manager/internal.go | 2 +- pkg/manager/manager.go | 9 +++------ pkg/manager/manager_test.go | 4 ++-- 7 files changed, 10 insertions(+), 13 deletions(-) diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 90292feb60..705c1394eb 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -158,7 +158,7 @@ var _ = Describe("cluster.Cluster", func() { It("should provide a function to get the EventRecorder", func() { c, err := New(cfg) Expect(err).NotTo(HaveOccurred()) - Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck }) It("should provide a function to get the APIReader", func() { c, err := New(cfg) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 143a02f2ca..08db9e05ae 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -95,7 +95,7 @@ func (p *Provider) getBroadcaster() events.EventBroadcaster { p.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: p.evtClient}) } // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider. - p.broadcaster.StartRecordingToSinkWithContext(context.TODO()) + _ = p.broadcaster.StartRecordingToSinkWithContext(context.TODO()) }) return p.broadcaster diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index c278fbde79..f1fd74f49c 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -43,7 +43,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - recorder := cm.GetEventRecorderFor("test-recorder") + recorder := cm.GetEventRecorderFor("test-recorder") //nolint:staticcheck instance, err := controller.New("foo-controller", cm, controller.Options{ Reconciler: reconcile.Func( func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 6c013e7992..09534df3f0 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -128,7 +128,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op coordinationClient, resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: recorderProvider.GetEventRecorderFor(id), + EventRecorder: recorderProvider.GetEventRecorderFor(id), //nolint:staticcheck }, options.LeaderLabels, ) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 6790d53bcd..859a75b947 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -257,7 +257,7 @@ func (cm *controllerManager) GetCache() cache.Cache { } func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { - return cm.cluster.GetEventRecorderFor(name) + return cm.cluster.GetEventRecorderFor(name) //nolint:staticcheck } func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 04d0f1a639..61f820e590 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -331,10 +331,7 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, errors.New("must specify Config") } // Set default values for options fields - options, err := setOptionsDefaults(config, options) - if err != nil { - return nil, err - } + options = setOptionsDefaults(config, options) cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme @@ -490,7 +487,7 @@ func defaultBaseContext() context.Context { } // setOptionsDefaults set default values for Options fields. -func setOptionsDefaults(config *rest.Config, options Options) (Options, error) { +func setOptionsDefaults(_ *rest.Config, options Options) Options { // Allow newResourceLock to be mocked if options.newResourceLock == nil { options.newResourceLock = leaderelection.NewResourceLock @@ -554,5 +551,5 @@ func setOptionsDefaults(config *rest.Config, options Options) (Options, error) { options.WebhookServer = webhook.NewServer(webhook.Options{}) } - return options, nil + return options } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 07292087bc..7f081e5145 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1815,7 +1815,7 @@ var _ = Describe("manger.Manager", func() { ns := corev1.Namespace{} ns.Name = "default" - recorder := m.GetEventRecorderFor("rock-and-roll") + recorder := m.GetEventRecorderFor("rock-and-roll") //nolint:staticcheck Expect(m.Add(RunnableFunc(func(_ context.Context) error { recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah") return nil @@ -1936,7 +1936,7 @@ var _ = Describe("manger.Manager", func() { It("should provide a function to get the EventRecorder", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck }) It("should provide a function to get the APIReader", func() { m, err := New(cfg, Options{}) From be84c84f219fcec11aff835b0b3e124b36fb61d4 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Wed, 30 Jul 2025 16:01:29 +0200 Subject: [PATCH 07/13] Improve message when no action is provided for an event. Signed-off-by: Borja Clemente --- pkg/internal/recorder/recorder.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 08db9e05ae..f66626da4e 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -172,13 +172,13 @@ type oldRecorder struct { } func (l *oldRecorder) Event(object runtime.Object, eventtype, reason, message string) { - l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", message) + l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", message) } func (l *oldRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { - l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", messageFmt, args...) + l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", messageFmt, args...) } func (l *oldRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { - l.newRecorder.Eventf(object, nil, eventtype, reason, "unsupported", messageFmt, args...) + l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", messageFmt, args...) } From 03d8e1a62ddd305c69984fdf606eb922441952a7 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Sun, 3 Aug 2025 18:23:41 +0200 Subject: [PATCH 08/13] Support both events APIs Signed-off-by: Borja Clemente --- pkg/cluster/cluster.go | 35 ++++++++- pkg/cluster/cluster_test.go | 3 +- pkg/internal/recorder/recorder.go | 105 ++++++++++++++++++------- pkg/internal/recorder/recorder_test.go | 16 +++- pkg/manager/manager.go | 51 ++++++++++-- pkg/manager/manager_test.go | 3 +- 6 files changed, 166 insertions(+), 47 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 80c457ab0a..cd50e1e0a0 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -25,8 +25,10 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -126,10 +128,16 @@ type Options struct { // // Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers // is shorter than the lifetime of your process. - EventBroadcaster events.EventBroadcaster + EventBroadcaster record.EventBroadcaster + + // makeBroadcaster allows deferring the creation of the broadcaster to + // avoid leaking goroutines if we never call Start on this manager. It also + // returns whether or not this is a "owned" broadcaster, and as such should be + // stopped with the manager. + makeBroadcaster intrec.EventBroadcasterProducer // Dependency injection for testing - newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*intrec.Provider, error) + newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) } // Option can be used to manipulate Options. @@ -223,7 +231,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. // Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil). - recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil) + recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } @@ -276,6 +284,27 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) { options.newRecorderProvider = intrec.NewProvider } + // This is duplicated with pkg/manager, we need it here to provide + // the user with an EventBroadcaster and there for the Leader election + evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient) + if err != nil { + return options, err + } + + // This is duplicated with pkg/manager, we need it here to provide + // the user with an EventBroadcaster and there for the Leader election + if options.EventBroadcaster == nil { + // defer initialization to avoid leaking by default + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true + } + } else { + // keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one. + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false + } + } + if options.Logger.GetSink() == nil { options.Logger = logf.RuntimeLog.WithName("cluster") } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 705c1394eb..de756802c0 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/events" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" @@ -87,7 +86,7 @@ var _ = Describe("cluster.Cluster", func() { It("should return an error it can't create a recorder.Provider", func() { c, err := New(cfg, func(o *Options) { - o.newRecorderProvider = func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ events.EventBroadcaster, _ bool) (*intrec.Provider, error) { + o.newRecorderProvider = func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { return nil, fmt.Errorf("expected error") } }) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index f66626da4e..7772587da6 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -23,13 +23,20 @@ import ( "sync" "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" ) +// EventBroadcasterProducer makes an event broadcaster, returning +// whether or not the broadcaster should be stopped with the Provider, +// or not (e.g. if it's shared, it shouldn't be stopped with the Provider). +// This producer currently produces both a +type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool) + // Provider is a recorder.Provider that records events to the k8s API server // and to a logr Logger. type Provider struct { @@ -39,12 +46,15 @@ type Provider struct { // scheme to specify when creating a recorder scheme *runtime.Scheme // logger is the logger to use when logging diagnostic event info - logger logr.Logger - evtClient eventsv1client.EventsV1Interface + logger logr.Logger + evtClient corev1client.EventInterface + makeBroadcaster EventBroadcasterProducer broadcasterOnce sync.Once broadcaster events.EventBroadcaster - stopBroadcaster bool + // Deprecated: will be removed in a future release. Use the broadcaster above instead. + deprecatedBroadcaster record.EventBroadcaster + stopBroadcaster bool } // NB(directxman12): this manually implements Stop instead of Being a runnable because we need to @@ -65,10 +75,11 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // almost certainly already been started (e.g. by leader election). We // need to invoke this to ensure that we don't inadvertently race with // an invocation of getBroadcaster. - broadcaster := p.getBroadcaster() + deprecatedBroadcaster, broadcaster := p.getBroadcaster() if p.stopBroadcaster { p.lock.Lock() broadcaster.Shutdown() + deprecatedBroadcaster.Shutdown() p.stopped = true p.lock.Unlock() } @@ -83,7 +94,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // getBroadcaster ensures that a broadcaster is started for this // provider, and returns it. It's threadsafe. -func (p *Provider) getBroadcaster() events.EventBroadcaster { +func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) { // NB(directxman12): this can technically still leak if something calls // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we // create the broadcaster in start, we could race with other things that @@ -91,39 +102,44 @@ func (p *Provider) getBroadcaster() events.EventBroadcaster { // silently swallowing events and more locking, but that seems suboptimal. p.broadcasterOnce.Do(func() { - if p.broadcaster == nil { - p.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: p.evtClient}) - } + p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster() + + // init old broadcaster + p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) + p.deprecatedBroadcaster.StartEventWatcher( + func(e *corev1.Event) { + p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) + }) + + // init new broadcaster // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider. _ = p.broadcaster.StartRecordingToSinkWithContext(context.TODO()) }) - return p.broadcaster + return p.deprecatedBroadcaster, p.broadcaster } // NewProvider create a new Provider instance. -func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*Provider, error) { +func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) { if httpClient == nil { panic("httpClient must not be nil") } - eventsv1Client, err := eventsv1client.NewForConfigAndClient(config, httpClient) + corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient) if err != nil { return nil, fmt.Errorf("failed to init client: %w", err) } - p := &Provider{scheme: scheme, logger: logger, broadcaster: broadcaster, stopBroadcaster: stopWithProvider, evtClient: eventsv1Client} + p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")} return p, nil } // GetEventRecorderFor returns an event recorder that broadcasts to this provider's // broadcaster. All events will be associated with a component of the given name. func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder { - return &oldRecorder{ - newRecorder: &lazyRecorder{ - prov: p, - name: name, - }, + return &deprecatedRecorder{ + prov: p, + name: name, } } @@ -149,7 +165,7 @@ type lazyRecorder struct { // ensureRecording ensures that a concrete recorder is populated for this recorder. func (l *lazyRecorder) ensureRecording() { l.recOnce.Do(func() { - broadcaster := l.prov.getBroadcaster() + _, broadcaster := l.prov.getBroadcaster() l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name) }) } @@ -164,21 +180,50 @@ func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, l.prov.lock.RUnlock() } -// oldRecorder is a wrapper around the events.EventRecorder that implements the old record.EventRecorder API. -// This is a temporary solution to support both the old and new events APIs without duplicating everything. -// Internally it calls the new events API from the old API funcs and no longer supported parameters are ignored (e.g. annotations). -type oldRecorder struct { - newRecorder *lazyRecorder +// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release. +// Deprecated: will be removed in a future release. +type deprecatedRecorder struct { + prov *Provider + name string + + recOnce sync.Once + rec record.EventRecorder } -func (l *oldRecorder) Event(object runtime.Object, eventtype, reason, message string) { - l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", message) +// ensureRecording ensures that a concrete recorder is populated for this recorder. +func (l *deprecatedRecorder) ensureRecording() { + l.recOnce.Do(func() { + deprecatedBroadcaster, _ := l.prov.getBroadcaster() + l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) + }) } -func (l *oldRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { - l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", messageFmt, args...) +func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) { + l.ensureRecording() + + l.prov.lock.RLock() + if !l.prov.stopped { + l.rec.Event(object, eventtype, reason, message) + } + l.prov.lock.RUnlock() +} + +func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { + l.ensureRecording() + + l.prov.lock.RLock() + if !l.prov.stopped { + l.rec.Eventf(object, eventtype, reason, messageFmt, args...) + } + l.prov.lock.RUnlock() } -func (l *oldRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { - l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", messageFmt, args...) +func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { + l.ensureRecording() + + l.prov.lock.RLock() + if !l.prov.stopped { + l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...) + } + l.prov.lock.RUnlock() } diff --git a/pkg/internal/recorder/recorder_test.go b/pkg/internal/recorder/recorder_test.go index 4865664f99..cf51ee8f8d 100644 --- a/pkg/internal/recorder/recorder_test.go +++ b/pkg/internal/recorder/recorder_test.go @@ -21,13 +21,23 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" + "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/internal/recorder" ) var _ = Describe("recorder.Provider", func() { + evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient) + Expect(err).NotTo(HaveOccurred()) + + makeBroadcaster := func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true + } + Describe("NewProvider", func() { It("should return a provider instance and a nil error.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), nil, true) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) Expect(provider).NotTo(BeNil()) Expect(err).NotTo(HaveOccurred()) }) @@ -36,14 +46,14 @@ var _ = Describe("recorder.Provider", func() { // Invalid the config cfg1 := *cfg cfg1.Host = "invalid host" - _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), nil, true) + _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to init client")) }) }) Describe("GetEventRecorderFor", func() { It("should return a recorder instance.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), nil, true) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) Expect(err).NotTo(HaveOccurred()) recorder := provider.GetEventRecorderFor("test") diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 61f820e590..69191ee5a9 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -29,9 +29,11 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -261,7 +263,7 @@ type Options struct { // // Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers // is shorter than the lifetime of your process. - EventBroadcaster events.EventBroadcaster + EventBroadcaster record.EventBroadcaster // GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. // To disable graceful shutdown, set to time.Duration(0) @@ -274,8 +276,14 @@ type Options struct { // +optional Controller config.Controller + // makeBroadcaster allows deferring the creation of the broadcaster to + // avoid leaking goroutines if we never call Start on this manager. It also + // returns whether or not this is a "owned" broadcaster, and as such should be + // stopped with the manager. + makeBroadcaster intrec.EventBroadcasterProducer + // Dependency injection for testing - newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*intrec.Provider, error) + newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) newHealthProbeListener func(addr string) (net.Listener, error) @@ -331,7 +339,11 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, errors.New("must specify Config") } // Set default values for options fields - options = setOptionsDefaults(config, options) + options, err := setOptionsDefaults(config, options) + if err != nil { + options.Logger.Error(err, "Failed to set defaults") + return nil, err + } cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme @@ -355,7 +367,7 @@ func New(config *rest.Config, options Options) (Manager, error) { // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. - recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil) + recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } @@ -382,7 +394,7 @@ func New(config *rest.Config, options Options) (Manager, error) { if err != nil { return nil, err } - leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, httpClient, scheme, options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil) + leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, httpClient, scheme, options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } @@ -487,7 +499,7 @@ func defaultBaseContext() context.Context { } // setOptionsDefaults set default values for Options fields. -func setOptionsDefaults(_ *rest.Config, options Options) Options { +func setOptionsDefaults(config *rest.Config, options Options) (Options, error) { // Allow newResourceLock to be mocked if options.newResourceLock == nil { options.newResourceLock = leaderelection.NewResourceLock @@ -498,6 +510,31 @@ func setOptionsDefaults(_ *rest.Config, options Options) Options { options.newRecorderProvider = intrec.NewProvider } + // This is duplicated with pkg/cluster, we need it here + // for the leader election and there to provide the user with + // an EventBroadcaster + httpClient, err := rest.HTTPClientFor(config) + if err != nil { + return options, err + } + + evtCl, err := eventsv1client.NewForConfigAndClient(config, httpClient) + if err != nil { + return options, err + } + + if options.EventBroadcaster == nil { + // defer initialization to avoid leaking by default + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true + } + } else { + // keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one. + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false + } + } + if options.newMetricsServer == nil { options.newMetricsServer = metricsserver.NewServer } @@ -551,5 +588,5 @@ func setOptionsDefaults(_ *rest.Config, options Options) Options { options.WebhookServer = webhook.NewServer(webhook.Options{}) } - return options + return options, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 7f081e5145..f1b8ef4724 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -40,7 +40,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" @@ -107,7 +106,7 @@ var _ = Describe("manger.Manager", func() { It("should return an error it can't create a recorder.Provider", func() { m, err := New(cfg, Options{ - newRecorderProvider: func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ events.EventBroadcaster, _ bool) (*intrec.Provider, error) { + newRecorderProvider: func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { return nil, fmt.Errorf("expected error") }, }) From adfebedabbfc5c59c1bb7893cfb2e84502d7e10f Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Thu, 7 Aug 2025 11:23:48 +0200 Subject: [PATCH 09/13] Add unit tests for the new events API Signed-off-by: Borja Clemente --- pkg/cluster/cluster_test.go | 6 +++ pkg/internal/recorder/recorder.go | 6 +-- pkg/internal/recorder/recorder_test.go | 33 ++++++++++----- pkg/manager/manager_test.go | 57 +++++++++++++++++++++++++- pkg/recorder/example_test.go | 20 ++++++--- 5 files changed, 98 insertions(+), 24 deletions(-) diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index de756802c0..c275ff0bf5 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -155,6 +155,12 @@ var _ = Describe("cluster.Cluster", func() { }) It("should provide a function to get the EventRecorder", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(c.GetEventRecorder("test")).NotTo(BeNil()) + }) + + It("should provide a function to get the deprecated EventRecorder", func() { c, err := New(cfg) Expect(err).NotTo(HaveOccurred()) Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 7772587da6..7cfd2ae0e8 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -104,16 +104,12 @@ func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadc p.broadcasterOnce.Do(func() { p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster() - // init old broadcaster + // init deprecated broadcaster (new broadcaster does not need to be initialised) p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) p.deprecatedBroadcaster.StartEventWatcher( func(e *corev1.Event) { p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) }) - - // init new broadcaster - // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider. - _ = p.broadcaster.StartRecordingToSinkWithContext(context.TODO()) }) return p.deprecatedBroadcaster, p.broadcaster diff --git a/pkg/internal/recorder/recorder_test.go b/pkg/internal/recorder/recorder_test.go index cf51ee8f8d..e592a1e189 100644 --- a/pkg/internal/recorder/recorder_test.go +++ b/pkg/internal/recorder/recorder_test.go @@ -28,16 +28,9 @@ import ( ) var _ = Describe("recorder.Provider", func() { - evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient) - Expect(err).NotTo(HaveOccurred()) - - makeBroadcaster := func() (record.EventBroadcaster, events.EventBroadcaster, bool) { - return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true - } - Describe("NewProvider", func() { It("should return a provider instance and a nil error.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) Expect(provider).NotTo(BeNil()) Expect(err).NotTo(HaveOccurred()) }) @@ -46,18 +39,36 @@ var _ = Describe("recorder.Provider", func() { // Invalid the config cfg1 := *cfg cfg1.Host = "invalid host" - _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + _, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to init client")) }) }) Describe("GetEventRecorderFor", func() { - It("should return a recorder instance.", func() { - provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) + It("should return a deprecated recorder instance.", func() { + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) Expect(err).NotTo(HaveOccurred()) recorder := provider.GetEventRecorderFor("test") Expect(recorder).NotTo(BeNil()) }) }) + Describe("GetEventRecorder", func() { + It("should return a recorder instance.", func() { + provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster()) + Expect(err).NotTo(HaveOccurred()) + + recorder := provider.GetEventRecorder("test") + Expect(recorder).NotTo(BeNil()) + }) + }) }) + +func makeBroadcaster() func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient) + Expect(err).NotTo(HaveOccurred()) + + return func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true + } +} diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f1b8ef4724..f14fa65a09 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1804,7 +1804,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) - It("should not leak goroutines if the default event broadcaster is used & events are emitted", func(specCtx SpecContext) { + It("should not leak goroutines if the deprecated event broadcaster is used & events are emitted", func(specCtx SpecContext) { currentGRs := goleak.IgnoreCurrent() m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) @@ -1852,6 +1852,54 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) + It("should not leak goroutines if the default event broadcaster is used & events are emitted", func(specCtx SpecContext) { + currentGRs := goleak.IgnoreCurrent() + + m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) + Expect(err).NotTo(HaveOccurred()) + + By("adding a runnable that emits an event") + ns := corev1.Namespace{} + ns.Name = "default" + + recorder := m.GetEventRecorder("rock-and-roll") + Expect(m.Add(RunnableFunc(func(_ context.Context) error { + recorder.Eventf(&ns, nil, "Warning", "BallroomBlitz", "dance action", "yeah, yeah, yeah-yeah-yeah") + return nil + }))).To(Succeed()) + + By("starting the manager & waiting till we've sent our event") + ctx, cancel := context.WithCancel(specCtx) + doneCh := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(doneCh) + Expect(m.Start(ctx)).To(Succeed()) + }() + <-m.Elected() + + Eventually(func() *corev1.Event { + evts, err := clientset.CoreV1().Events("").SearchWithContext(ctx, m.GetScheme(), &ns) + Expect(err).NotTo(HaveOccurred()) + + for i, evt := range evts.Items { + if evt.Reason == "BallroomBlitz" { + return &evts.Items[i] + } + } + return nil + }).ShouldNot(BeNil()) + + By("making sure there's no extra go routines still running after we stop") + cancel() + <-doneCh + + // force-close keep-alive connections. These'll time anyway (after + // like 30s or so) but force it to speed up the tests. + clientTransport.CloseIdleConnections() + Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) + }) + It("should not leak goroutines when a runnable returns error slowly after being signaled to stop", func(specCtx SpecContext) { // This test reproduces the race condition where the manager's Start method // exits due to context cancellation, leaving no one to drain errChan @@ -1932,11 +1980,16 @@ var _ = Describe("manger.Manager", func() { Expect(m.GetFieldIndexer()).To(Equal(mgr.cluster.GetFieldIndexer())) }) - It("should provide a function to get the EventRecorder", func() { + It("should provide a function to get the deprecated EventRecorder", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck }) + It("should provide a function to get the EventRecorder", func() { + m, err := New(cfg, Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(m.GetEventRecorder("test")).NotTo(BeNil()) + }) It("should provide a function to get the APIReader", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/recorder/example_test.go b/pkg/recorder/example_test.go index 969420d817..47f14ff715 100644 --- a/pkg/recorder/example_test.go +++ b/pkg/recorder/example_test.go @@ -18,31 +18,39 @@ package recorder_test import ( corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" _ "github.com/onsi/ginkgo/v2" "sigs.k8s.io/controller-runtime/pkg/recorder" ) var ( - recorderProvider recorder.Provider - somePod *corev1.Pod // the object you're reconciling, for example + recorderProvider recorder.Provider + somePod *corev1.Pod // the object you're reconciling, for example + someRelatedObject runtime.Object // another object related to the reconciled object and the event. ) func Example_event() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorderFor("my-controller") + deprecatedRecorder := recorderProvider.GetEventRecorderFor("my-controller") // emit an event with a fixed message - recorder.Event(somePod, corev1.EventTypeWarning, + deprecatedRecorder.Event(somePod, corev1.EventTypeWarning, "WrongTrousers", "It's the wrong trousers, Gromit!") } func Example_eventf() { // recorderProvider is a recorder.Provider - recorder := recorderProvider.GetEventRecorderFor("my-controller") + deprecatedRecorder := recorderProvider.GetEventRecorderFor("my-controller") // emit an event with a variable message mildCheese := "Wensleydale" - recorder.Eventf(somePod, corev1.EventTypeNormal, + deprecatedRecorder.Eventf(somePod, corev1.EventTypeNormal, "DislikesCheese", "Not even %s?", mildCheese) + + recorder := recorderProvider.GetEventRecorder("my-controller") + + // emit an event with a fixed message + recorder.Eventf(somePod, someRelatedObject, corev1.EventTypeWarning, + "WrongTrousers", "getting dressed", "It's the wrong trousers, Gromit!") } From c0481cb88533f1ce9687f83afda6eae3f3c69f73 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Fri, 8 Aug 2025 22:02:44 +0200 Subject: [PATCH 10/13] Fix new API not broadcasting events and add integration test Signed-off-by: Borja Clemente --- pkg/cluster/cluster.go | 1 - pkg/internal/recorder/recorder.go | 17 +++- .../recorder/recorder_integration_test.go | 93 ++++++++++++++++++- pkg/recorder/recorder.go | 4 +- 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index cd50e1e0a0..846e4f71c7 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -230,7 +230,6 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. - // Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil). recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 7cfd2ae0e8..1affed1181 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -24,6 +24,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/runtime" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" @@ -34,7 +35,7 @@ import ( // EventBroadcasterProducer makes an event broadcaster, returning // whether or not the broadcaster should be stopped with the Provider, // or not (e.g. if it's shared, it shouldn't be stopped with the Provider). -// This producer currently produces both a +// This producer currently produces both an old API and a new API broadcaster. type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool) // Provider is a recorder.Provider that records events to the k8s API server @@ -52,6 +53,7 @@ type Provider struct { broadcasterOnce sync.Once broadcaster events.EventBroadcaster + stopCh chan struct{} // Deprecated: will be removed in a future release. Use the broadcaster above instead. deprecatedBroadcaster record.EventBroadcaster stopBroadcaster bool @@ -79,6 +81,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) { if p.stopBroadcaster { p.lock.Lock() broadcaster.Shutdown() + close(p.stopCh) deprecatedBroadcaster.Shutdown() p.stopped = true p.lock.Unlock() @@ -104,12 +107,22 @@ func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadc p.broadcasterOnce.Do(func() { p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster() - // init deprecated broadcaster (new broadcaster does not need to be initialised) + // init deprecated broadcaster p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) p.deprecatedBroadcaster.StartEventWatcher( func(e *corev1.Event) { p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) }) + + // init new broadcaster + p.stopCh = make(chan struct{}) + p.broadcaster.StartRecordingToSink(p.stopCh) + _, _ = p.broadcaster.StartEventWatcher(func(event runtime.Object) { + e, isEvt := event.(*eventsv1.Event) + if isEvt { + p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason) + } + }) }) return p.deprecatedBroadcaster, p.broadcaster diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index f1fd74f49c..90879df889 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -21,11 +21,14 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" ref "k8s.io/client-go/tools/reference" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -36,14 +39,14 @@ import ( ) var _ = Describe("recorder", func() { - Describe("recorder", func() { + Describe("deprecated recorder", func() { It("should publish events", func(ctx SpecContext) { By("Creating the Manager") cm, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - recorder := cm.GetEventRecorderFor("test-recorder") //nolint:staticcheck + recorder := cm.GetEventRecorderFor("test-deprecated-recorder") //nolint:staticcheck instance, err := controller.New("foo-controller", cm, controller.Options{ Reconciler: reconcile.Func( func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { @@ -66,7 +69,7 @@ var _ = Describe("recorder", func() { }() deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"}, + ObjectMeta: metav1.ObjectMeta{Name: "deprecated-deployment-name"}, Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, @@ -108,4 +111,88 @@ var _ = Describe("recorder", func() { Expect(evt.Message).To(Equal("test-msg")) }) }) + + Describe("recorder", func() { + It("should publish events", func(ctx SpecContext) { + By("Creating the Manager") + // this test needs its own env for now to not interfere with the previous one. + // Once the deprecated API is removed this can be removed. + testenv := &envtest.Environment{} + + cfg, err := testenv.Start() + Expect(err).NotTo(HaveOccurred()) + defer testenv.Stop() //nolint:errcheck + + clientset, err := kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) + + cm, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the Controller") + recorder := cm.GetEventRecorder("test-recorder") + instance, err := controller.New("bar-controller", cm, controller.Options{ + Reconciler: reconcile.Func( + func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + dp, err := clientset.AppsV1().Deployments(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + recorder.Eventf(dp, nil, corev1.EventTypeNormal, "test-reason", "test-action", "test-msg") + return reconcile.Result{}, nil + }), + }) + Expect(err).NotTo(HaveOccurred()) + + By("Watching Resources") + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{})) + Expect(err).NotTo(HaveOccurred()) + + By("Starting the Manager") + go func() { + defer GinkgoRecover() + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) + }() + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"}, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } + + By("Invoking Reconciling") + deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + By("Validate event is published as expected") + evtWatcher, err := clientset.EventsV1().Events("default").Watch(ctx, metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + + resultEvent := <-evtWatcher.ResultChan() + + Expect(resultEvent.Type).To(Equal(watch.Added)) + evt, isEvent := resultEvent.Object.(*eventsv1.Event) + Expect(isEvent).To(BeTrue()) + + dpRef, err := ref.GetReference(scheme.Scheme, deployment) + Expect(err).NotTo(HaveOccurred()) + + Expect(evt.Regarding).To(Equal(*dpRef)) + Expect(evt.Type).To(Equal(corev1.EventTypeNormal)) + Expect(evt.Reason).To(Equal("test-reason")) + Expect(evt.Note).To(Equal("test-msg")) + }) + }) }) diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index 2dee673a91..cfb593591e 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -27,11 +27,11 @@ import ( // Provider knows how to generate new event recorders with given name. type Provider interface { - // GetEventRecorder returns a EventRecorder with given name. + // GetEventRecorderFor returns an EventRecorder for the old events API. // // Deprecated: this uses the old events API and will be removed in a future release. Please use GetEventRecorder instead. GetEventRecorderFor(name string) record.EventRecorder - // GetEventRecorder returns an EventRecorder for the old events API. + // GetEventRecorder returns a EventRecorder with given name. // The old API is not 100% supported anymore, use the new one whenever possible. GetEventRecorder(name string) events.EventRecorder } From 70be4c022359242f7505d9e42118fba7c9dab697 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Tue, 12 Aug 2025 23:13:52 +0200 Subject: [PATCH 11/13] Apply review feedback - Remove outdated comment - Improve error handling when setting default Options - Replace deprecated sink recording function - log errors from StartEventWatcher Signed-off-by: Borja Clemente --- pkg/cluster/cluster.go | 4 ++-- pkg/internal/recorder/recorder.go | 19 ++++++++++++------- pkg/manager/manager.go | 3 +-- pkg/recorder/recorder.go | 1 - 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 846e4f71c7..ee14638c3f 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -19,6 +19,7 @@ package cluster import ( "context" "errors" + "fmt" "net/http" "github.com/go-logr/logr" @@ -162,8 +163,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { } options, err := setOptionsDefaults(options, config) if err != nil { - options.Logger.Error(err, "Failed to set defaults") - return nil, err + return nil, fmt.Errorf("failed setting cluster default options: %w", err) } // Create the mapper provider diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 1affed1181..5c38f59797 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -51,9 +51,9 @@ type Provider struct { evtClient corev1client.EventInterface makeBroadcaster EventBroadcasterProducer - broadcasterOnce sync.Once - broadcaster events.EventBroadcaster - stopCh chan struct{} + broadcasterOnce sync.Once + broadcaster events.EventBroadcaster + cancelSinkRecordingFunc context.CancelFunc // Deprecated: will be removed in a future release. Use the broadcaster above instead. deprecatedBroadcaster record.EventBroadcaster stopBroadcaster bool @@ -81,7 +81,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) { if p.stopBroadcaster { p.lock.Lock() broadcaster.Shutdown() - close(p.stopCh) + p.cancelSinkRecordingFunc() deprecatedBroadcaster.Shutdown() p.stopped = true p.lock.Unlock() @@ -115,14 +115,18 @@ func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadc }) // init new broadcaster - p.stopCh = make(chan struct{}) - p.broadcaster.StartRecordingToSink(p.stopCh) - _, _ = p.broadcaster.StartEventWatcher(func(event runtime.Object) { + ctx, cancel := context.WithCancel(context.Background()) + p.cancelSinkRecordingFunc = cancel + p.broadcaster.StartRecordingToSinkWithContext(ctx) + _, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) { e, isEvt := event.(*eventsv1.Event) if isEvt { p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason) } }) + if err != nil { + p.logger.Error(err, "error starting event watcher for broadcaster") + } }) return p.deprecatedBroadcaster, p.broadcaster @@ -190,6 +194,7 @@ func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, } // deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release. +// // Deprecated: will be removed in a future release. type deprecatedRecorder struct { prov *Provider diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 69191ee5a9..74983ddcea 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -341,8 +341,7 @@ func New(config *rest.Config, options Options) (Manager, error) { // Set default values for options fields options, err := setOptionsDefaults(config, options) if err != nil { - options.Logger.Error(err, "Failed to set defaults") - return nil, err + return nil, fmt.Errorf("failed setting manager default options: %w", err) } cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index cfb593591e..b34fecb525 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -32,6 +32,5 @@ type Provider interface { // Deprecated: this uses the old events API and will be removed in a future release. Please use GetEventRecorder instead. GetEventRecorderFor(name string) record.EventRecorder // GetEventRecorder returns a EventRecorder with given name. - // The old API is not 100% supported anymore, use the new one whenever possible. GetEventRecorder(name string) events.EventRecorder } From f06675ed44cc5db0e3c0f7f9a8ef8e81eecf3095 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Tue, 12 Aug 2025 23:44:43 +0200 Subject: [PATCH 12/13] Log errors from StartRecordingToSinkWithContext Signed-off-by: Borja Clemente --- pkg/internal/recorder/recorder.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 5c38f59797..6b82ad6d36 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -117,7 +117,10 @@ func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadc // init new broadcaster ctx, cancel := context.WithCancel(context.Background()) p.cancelSinkRecordingFunc = cancel - p.broadcaster.StartRecordingToSinkWithContext(ctx) + if err := p.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil { + p.logger.Error(err, "error starting recording for broadcaster") + } + _, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) { e, isEvt := event.(*eventsv1.Event) if isEvt { From ad8a528d877774eca9fda2e2ac2ca8dae4ced694 Mon Sep 17 00:00:00 2001 From: Borja Clemente Date: Thu, 14 Aug 2025 23:18:00 +0200 Subject: [PATCH 13/13] Return early if starting to record fails Signed-off-by: Borja Clemente --- pkg/internal/recorder/recorder.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 6b82ad6d36..7636477f26 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -119,6 +119,7 @@ func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadc p.cancelSinkRecordingFunc = cancel if err := p.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil { p.logger.Error(err, "error starting recording for broadcaster") + return } _, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) {