Skip to content

Commit 4289d12

Browse files
author
Zhiying Lin
committed
feat: support join & leave for member controllers
1 parent eb1c94e commit 4289d12

File tree

23 files changed

+348
-42
lines changed

23 files changed

+348
-42
lines changed

cmd/mcs-controller-manager/main.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package main
99
import (
1010
"context"
1111
"flag"
12+
"go.goms.io/fleet/pkg/utils/controller"
1213
"os"
1314
"os/signal"
1415
"sync"
@@ -241,12 +242,13 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma
241242
hubClient := hubMgr.GetClient()
242243

243244
klog.V(1).InfoS("Create multiclusterservice reconciler")
244-
if err := (&multiclusterservice.Reconciler{
245+
mcs := &multiclusterservice.Reconciler{
245246
Client: memberClient,
246247
Scheme: memberMgr.GetScheme(),
247248
FleetSystemNamespace: *fleetSystemNamespace,
248249
Recorder: memberMgr.GetEventRecorderFor(multiclusterservice.ControllerName),
249-
}).SetupWithManager(memberMgr); err != nil {
250+
}
251+
if err := mcs.SetupWithManager(memberMgr); err != nil {
250252
klog.ErrorS(err, "Unable to create multiclusterservice reconciler")
251253
return err
252254
}
@@ -269,6 +271,7 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma
269271
MemberClient: memberClient,
270272
HubClient: hubClient,
271273
AgentType: clusterv1beta1.MultiClusterServiceAgent,
274+
Controllers: []controller.MemberController{mcs},
272275
}).SetupWithManager(hubMgr); err != nil {
273276
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
274277
return err

cmd/member-net-controller-manager/main.go

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package main
1010
import (
1111
"context"
1212
"flag"
13+
"go.goms.io/fleet/pkg/utils/controller"
1314
"os"
1415
"os/signal"
1516
"sync"
@@ -259,79 +260,94 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.
259260
memberClient := memberMgr.GetClient()
260261
hubClient := hubMgr.GetClient()
261262

263+
var controllers []controller.MemberController
262264
klog.V(1).InfoS("Create endpointslice controller")
263-
if err := (&endpointslice.Reconciler{
265+
endpointSliceController := &endpointslice.Reconciler{
264266
MemberClusterID: mcName,
265267
MemberClient: memberClient,
266268
HubClient: hubClient,
267269
HubNamespace: mcHubNamespace,
268-
}).SetupWithManager(ctx, memberMgr); err != nil {
270+
}
271+
if err := endpointSliceController.SetupWithManager(ctx, memberMgr); err != nil {
269272
klog.ErrorS(err, "Unable to create endpointslice controller")
270273
return err
271274
}
275+
controllers = append(controllers, endpointSliceController)
272276

273277
klog.V(1).InfoS("Create endpointsliceexport controller")
274-
if err := (&endpointsliceexport.Reconciler{
278+
endpointSliceExportController := &endpointsliceexport.Reconciler{
275279
MemberClient: memberClient,
276280
HubClient: hubClient,
277-
}).SetupWithManager(hubMgr); err != nil {
281+
}
282+
if err := endpointSliceExportController.SetupWithManager(hubMgr); err != nil {
278283
klog.ErrorS(err, "Unable to create endpointsliceexport controller")
279284
return err
280285
}
286+
controllers = append(controllers, endpointSliceExportController)
281287

282288
klog.V(1).InfoS("Create endpointsliceimport controller")
283-
if err := (&endpointsliceimport.Reconciler{
289+
endpointSliceImportController := &endpointsliceimport.Reconciler{
284290
MemberClusterID: mcName,
285291
MemberClient: memberClient,
286292
HubClient: hubClient,
287293
FleetSystemNamespace: *fleetSystemNamespace,
288-
}).SetupWithManager(ctx, memberMgr, hubMgr); err != nil {
294+
}
295+
if err := endpointSliceImportController.SetupWithManager(ctx, memberMgr, hubMgr); err != nil {
289296
klog.ErrorS(err, "Unable to create endpointsliceimport controller")
290297
return err
291298
}
299+
controllers = append(controllers, endpointSliceImportController)
292300

293301
klog.V(1).InfoS("Create internalserviceexport controller")
294-
if err := (&internalserviceexport.Reconciler{
302+
internalServiceExportController := &internalserviceexport.Reconciler{
295303
MemberClusterID: mcName,
296304
MemberClient: memberClient,
297305
HubClient: hubClient,
298306
Recorder: memberMgr.GetEventRecorderFor(internalserviceexport.ControllerName),
299-
}).SetupWithManager(hubMgr); err != nil {
307+
}
308+
if err := internalServiceExportController.SetupWithManager(hubMgr); err != nil {
300309
klog.ErrorS(err, "Unable to create internalserviceexport controller")
301310
return err
302311
}
312+
controllers = append(controllers, internalServiceExportController)
303313

304314
klog.V(1).InfoS("Create internalserviceimport controller")
305-
if err := (&internalserviceimport.Reconciler{
315+
internalServiceImportController := &internalserviceimport.Reconciler{
306316
MemberClient: memberClient,
307317
HubClient: hubClient,
308-
}).SetupWithManager(hubMgr); err != nil {
318+
}
319+
if err := internalServiceImportController.SetupWithManager(hubMgr); err != nil {
309320
klog.ErrorS(err, "Unable to create internalserviceimport controller")
310321
return err
311322
}
323+
controllers = append(controllers, internalServiceImportController)
312324

313325
klog.V(1).InfoS("Create serviceexport reconciler")
314-
if err := (&serviceexport.Reconciler{
326+
serviceExportController := &serviceexport.Reconciler{
315327
MemberClient: memberClient,
316328
HubClient: hubClient,
317329
MemberClusterID: mcName,
318330
HubNamespace: mcHubNamespace,
319331
Recorder: memberMgr.GetEventRecorderFor(serviceexport.ControllerName),
320-
}).SetupWithManager(memberMgr); err != nil {
332+
}
333+
if err := serviceExportController.SetupWithManager(memberMgr); err != nil {
321334
klog.ErrorS(err, "Unable to create serviceexport reconciler")
322335
return err
323336
}
337+
controllers = append(controllers, serviceExportController)
324338

325339
klog.V(1).InfoS("Create serviceimport reconciler")
326-
if err := (&serviceimport.Reconciler{
340+
serviceImportController := &serviceimport.Reconciler{
327341
MemberClient: memberClient,
328342
HubClient: hubClient,
329343
MemberClusterID: mcName,
330344
HubNamespace: mcHubNamespace,
331-
}).SetupWithManager(memberMgr); err != nil {
345+
}
346+
if err := serviceImportController.SetupWithManager(memberMgr); err != nil {
332347
klog.ErrorS(err, "Unable to create serviceimport reconciler")
333348
return err
334349
}
350+
controllers = append(controllers, serviceImportController)
335351

336352
if *isV1Alpha1APIEnabled {
337353
klog.V(1).InfoS("Create internalmembercluster (v1alpha1 API) reconciler")
@@ -351,6 +367,7 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.
351367
MemberClient: memberClient,
352368
HubClient: hubClient,
353369
AgentType: clusterv1beta1.ServiceExportImportAgent,
370+
Controllers: controllers,
354371
}).SetupWithManager(hubMgr); err != nil {
355372
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
356373
return err

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ require (
1717

1818
require (
1919
github.com/stretchr/testify v1.9.0
20-
go.goms.io/fleet v0.10.5
20+
go.goms.io/fleet v0.10.8
21+
golang.org/x/sync v0.7.0
2122
)
2223

2324
require (

go.sum

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
9494
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
9595
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
9696
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
97-
go.goms.io/fleet v0.10.5 h1:Zc+pLk77zWv0hAqBbFZEMMd05MVw9P8jp8YHTy7WPdI=
98-
go.goms.io/fleet v0.10.5/go.mod h1:FpVP3YsiewmyGH77Yx6sLngHbZKgepnmJDIibz2pjZo=
97+
go.goms.io/fleet v0.10.7 h1:kVPcH+XPO894chIoHlMK0cNIi7xDAqy771yIAk4bQIQ=
98+
go.goms.io/fleet v0.10.7/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw=
99+
go.goms.io/fleet v0.10.8 h1:AAK4wr4uKB8ATMhC4cpCKYAq9lMr9XLYE5QE+vkBf5M=
100+
go.goms.io/fleet v0.10.8/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw=
99101
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
100102
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
101103
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@@ -120,6 +122,8 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
120122
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
121123
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
122124
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
125+
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
126+
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
123127
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
124128
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
125129
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

pkg/controllers/member/endpointslice/controller.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"context"
1212
"fmt"
1313
"strconv"
14+
"sync/atomic"
1415
"time"
1516

1617
discoveryv1 "k8s.io/api/discovery/v1"
@@ -53,6 +54,8 @@ type Reconciler struct {
5354
HubClient client.Client
5455
// The namespace reserved for the current member cluster in the hub cluster.
5556
HubNamespace string
57+
// whether to start exporting an EndpointSlice
58+
joined atomic.Bool
5659
}
5760

5861
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceexports,verbs=get;list;watch;create;update;patch;delete
@@ -110,6 +113,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
110113
return ctrl.Result{}, nil
111114
}
112115

116+
if !r.joined.Load() {
117+
klog.V(2).InfoS("EndpointSlice controller is not started yet, requeue the request", "endpointSlice", endpointSliceRef)
118+
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
119+
}
120+
113121
// Retrieve the unique name assigned; if none has been assigned, or the one assigned is not valid, possibly due
114122
// to user tampering with the annotation, assign a new unique name.
115123
fleetUniqueName, ok := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName]
@@ -439,3 +447,24 @@ func (r *Reconciler) annotateLastSeenGenerationAndTimestamp(ctx context.Context,
439447
endpointSlice.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat)
440448
return r.MemberClient.Update(ctx, endpointSlice)
441449
}
450+
451+
// Join marks the joined status as true.
452+
func (r *Reconciler) Join(_ context.Context) error {
453+
if r.joined.Load() {
454+
return nil
455+
}
456+
klog.InfoS("Mark the endpointSlice controller joined")
457+
r.joined.Store(true)
458+
return nil
459+
}
460+
461+
// Leave marks the joined status as false.
462+
// When the controller is in the leave state, it will only handle the delete events.
463+
func (r *Reconciler) Leave(_ context.Context) error {
464+
if !r.joined.Load() {
465+
return nil
466+
}
467+
klog.InfoS("Mark the endpointSlice controller left")
468+
r.joined.Store(false)
469+
return nil
470+
}

pkg/controllers/member/endpointslice/suite_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ var (
3131
hubClient client.Client
3232
ctx context.Context
3333
cancel context.CancelFunc
34+
reconciler *Reconciler
3435
)
3536

3637
// setUpResources help set up resources in the test environment.
@@ -99,13 +100,15 @@ var _ = BeforeSuite(func() {
99100
ctrlMgr, err := ctrl.NewManager(memberCfg, ctrl.Options{Scheme: scheme.Scheme})
100101
Expect(err).NotTo(HaveOccurred())
101102

102-
err = (&Reconciler{
103+
reconciler = &Reconciler{
103104
MemberClusterID: memberClusterID,
104105
MemberClient: memberClient,
105106
HubClient: hubClient,
106107
HubNamespace: hubNSForMember,
107-
}).SetupWithManager(ctx, ctrlMgr)
108+
}
109+
err = reconciler.SetupWithManager(ctx, ctrlMgr)
108110
Expect(err).NotTo(HaveOccurred())
111+
Expect(reconciler.Join(ctx)).Should(Succeed())
109112

110113
go func() {
111114
defer GinkgoRecover()
@@ -116,6 +119,7 @@ var _ = BeforeSuite(func() {
116119

117120
var _ = AfterSuite(func() {
118121
defer klog.Flush()
122+
Expect(reconciler.Leave(ctx)).Should(Succeed())
119123
cancel()
120124

121125
By("tearing down the test environment")

pkg/controllers/member/endpointsliceexport/controller.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,17 @@ func isEndpointSliceExportLinkedWithEndpointSlice(endpointSliceExport *fleetnetv
125125
}
126126
return true
127127
}
128+
129+
// Join does nothing.
130+
// There is no need to start or stop the controller as this controller is designed to clean up any invalid
131+
// EndpointSliceExport in the hub cluster.
132+
func (r *Reconciler) Join(_ context.Context) error {
133+
// do nothing
134+
return nil
135+
}
136+
137+
// Leave does nothing.
138+
func (r *Reconciler) Leave(_ context.Context) error {
139+
// do nothing
140+
return nil
141+
}

pkg/controllers/member/endpointsliceimport/controller.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package endpointsliceimport
1010
import (
1111
"context"
1212
"fmt"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/prometheus/client_golang/prometheus"
@@ -82,6 +83,8 @@ type Reconciler struct {
8283
HubClient client.Client
8384
// The namespace reserved for fleet resources in the member cluster.
8485
FleetSystemNamespace string
86+
// whether to start exporting an EndpointSlice
87+
joined atomic.Bool
8588
}
8689

8790
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceimports,verbs=get;list;watch;update;patch
@@ -130,6 +133,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
130133
return ctrl.Result{}, nil
131134
}
132135

136+
if !r.joined.Load() {
137+
klog.V(2).InfoS("EndpointSliceImport controller is not started yet, requeue the request", "endpointSliceImport", endpointSliceImportRef)
138+
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
139+
}
140+
133141
// Import the EndpointSlice, or update an imported EndpointSlice.
134142

135143
// Inquire the corresponding MCS to find out which Service the imported EndpointSlice should associate with.
@@ -428,3 +436,24 @@ func (r *Reconciler) observeMetrics(ctx context.Context, endpointSliceImport *fl
428436
"isFirstImport", isFirstImport)
429437
return nil
430438
}
439+
440+
// Join marks the joined status as true.
441+
func (r *Reconciler) Join(_ context.Context) error {
442+
if r.joined.Load() {
443+
return nil
444+
}
445+
klog.InfoS("Mark the endpointSliceImport controller joined")
446+
r.joined.Store(true)
447+
return nil
448+
}
449+
450+
// Leave marks the joined status as false.
451+
// When the controller is in the leave state, it will only handle the delete events.
452+
func (r *Reconciler) Leave(_ context.Context) error {
453+
if !r.joined.Load() {
454+
return nil
455+
}
456+
klog.InfoS("Mark the endpointSliceImport controller left")
457+
r.joined.Store(false)
458+
return nil
459+
}

pkg/controllers/member/endpointsliceimport/suite_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ var (
3232
hubClient client.Client
3333
ctx context.Context
3434
cancel context.CancelFunc
35+
reconciler *Reconciler
3536
)
3637

3738
// setUpResources help set up resources in the test environment.
@@ -116,12 +117,14 @@ var _ = BeforeSuite(func() {
116117
hubClient = hubCtrlMgr.GetClient()
117118
Expect(hubClient).NotTo(BeNil())
118119

119-
err = (&Reconciler{
120+
reconciler = &Reconciler{
120121
MemberClient: memberClient,
121122
HubClient: hubClient,
122123
FleetSystemNamespace: fleetSystemNS,
123-
}).SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr)
124+
}
125+
err = reconciler.SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr)
124126
Expect(err).NotTo(HaveOccurred())
127+
Expect(reconciler.Join(ctx)).Should(Succeed())
125128

126129
go func() {
127130
defer GinkgoRecover()
@@ -141,6 +144,7 @@ var _ = BeforeSuite(func() {
141144

142145
var _ = AfterSuite(func() {
143146
defer klog.Flush()
147+
Expect(reconciler.Leave(ctx)).Should(Succeed())
144148
cancel()
145149

146150
By("tearing down the test environment")

pkg/controllers/member/internalmembercluster/v1alpha1/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ var _ = BeforeSuite(func() {
8383
filepath.Join("../../../../../", "config", "crd", "bases"),
8484
// need to make sure the version matches the one in the go.mod
8585
// workaround mentioned in https://github.com/kubernetes-sigs/controller-runtime/issues/1191
86-
filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "[email protected].5", "config", "crd", "bases"),
86+
filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "[email protected].8", "config", "crd", "bases"),
8787
},
8888
ErrorIfCRDPathMissing: true,
8989
}

0 commit comments

Comments
 (0)