Skip to content

Commit ef30deb

Browse files
committed
Start and stop upgrade-check using controller-runtime
Blocking functions can be added to a controller-runtime Manager. The Manager starts them after its caches have started and synced, and stops them before its caches are stopped.
1 parent 1baca25 commit ef30deb

File tree

4 files changed

+69
-92
lines changed

4 files changed

+69
-92
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ deploy-dev: build-postgres-operator createnamespaces
101101
hack/create-kubeconfig.sh postgres-operator pgo
102102
env \
103103
CRUNCHY_DEBUG=true \
104-
CHECK_FOR_UPGRADES=false \
104+
CHECK_FOR_UPGRADES='$(if $(CHECK_FOR_UPGRADES),$(CHECK_FOR_UPGRADES),false)' \
105105
KUBECONFIG=hack/.kube/postgres-operator/pgo \
106106
$(shell $(PGO_KUBE_CLIENT) kustomize ./config/dev | \
107107
sed -ne '/^kind: Deployment/,/^---/ { \

cmd/postgres-operator/main.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,39 +82,38 @@ func main() {
8282
mgr, err := runtime.CreateRuntimeManager(os.Getenv("PGO_TARGET_NAMESPACE"), cfg, false)
8383
assertNoError(err)
8484

85+
openshift := isOpenshift(ctx, cfg)
86+
8587
// add all PostgreSQL Operator controllers to the runtime manager
86-
err = addControllersToManager(ctx, mgr)
88+
err = addControllersToManager(mgr, openshift)
8789
assertNoError(err)
8890

89-
log.Info("starting controller runtime manager and will wait for signal to exit")
90-
9191
// Enable upgrade checking
9292
upgradeCheckingDisabled := strings.EqualFold(os.Getenv("CHECK_FOR_UPGRADES"), "false")
9393
if !upgradeCheckingDisabled {
9494
log.Info("upgrade checking enabled")
9595
// get the URL for the check for upgrades endpoint if set in the env
96-
upgradeCheckURL := os.Getenv("CHECK_FOR_UPGRADES_URL")
97-
go upgradecheck.CheckForUpgradesScheduler(ctx, versionString, upgradeCheckURL,
98-
mgr.GetClient(), mgr.GetConfig(), isOpenshift(ctx, mgr.GetConfig()),
99-
mgr.GetCache(),
100-
)
96+
assertNoError(upgradecheck.ManagedScheduler(mgr,
97+
openshift, os.Getenv("CHECK_FOR_UPGRADES_URL"), versionString))
10198
} else {
10299
log.Info("upgrade checking disabled")
103100
}
104101

102+
log.Info("starting controller runtime manager and will wait for signal to exit")
103+
105104
assertNoError(mgr.Start(ctx))
106105
log.Info("signal received, exiting")
107106
}
108107

109108
// addControllersToManager adds all PostgreSQL Operator controllers to the provided controller
110109
// runtime manager.
111-
func addControllersToManager(ctx context.Context, mgr manager.Manager) error {
110+
func addControllersToManager(mgr manager.Manager, openshift bool) error {
112111
r := &postgrescluster.Reconciler{
113112
Client: mgr.GetClient(),
114113
Owner: postgrescluster.ControllerName,
115114
Recorder: mgr.GetEventRecorderFor(postgrescluster.ControllerName),
116115
Tracer: otel.Tracer(postgrescluster.ControllerName),
117-
IsOpenShift: isOpenshift(ctx, mgr.GetConfig()),
116+
IsOpenShift: openshift,
118117
}
119118
return r.SetupWithManager(mgr)
120119
}

internal/upgradecheck/http.go

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/apimachinery/pkg/util/wait"
2626
"k8s.io/client-go/rest"
2727
crclient "sigs.k8s.io/controller-runtime/pkg/client"
28+
"sigs.k8s.io/controller-runtime/pkg/manager"
2829

2930
"github.com/crunchydata/postgres-operator/internal/logging"
3031
)
@@ -51,10 +52,6 @@ const (
5152
upgradeCheckURL = "https://operator-maestro.crunchydata.com/pgo-versions"
5253
)
5354

54-
var (
55-
upgradeCheckPeriod = 24 * time.Hour
56-
)
57-
5855
type HTTPClient interface {
5956
Do(req *http.Request) (*http.Response, error)
6057
}
@@ -135,48 +132,53 @@ func checkForUpgrades(ctx context.Context, url, versionString string, backoff wa
135132
return string(bodyBytes), req.Header.Get(clientHeader), err
136133
}
137134

138-
// CheckForUpgradesScheduler invokes the check func when the operator starts
139-
// and then on the given period schedule. It stops when the context is cancelled.
140-
func CheckForUpgradesScheduler(ctx context.Context,
141-
versionString, url string, crclient crclient.Client,
142-
cfg *rest.Config, isOpenShift bool,
143-
cacheClient CacheWithWait,
144-
) {
145-
log := logging.FromContext(ctx)
135+
type CheckForUpgradesScheduler struct {
136+
Client crclient.Client
137+
Config *rest.Config
138+
139+
OpenShift bool
140+
Refresh time.Duration
141+
URL, Version string
142+
}
146143

144+
// ManagedScheduler creates a [CheckForUpgradesScheduler] and adds it to m.
145+
func ManagedScheduler(m manager.Manager, openshift bool, url, version string) error {
147146
if url == "" {
148147
url = upgradeCheckURL
149148
}
150149

151-
// Since we pass the client to this function before we start the manager
152-
// in cmd/postgres-operator/main.go, we want to make sure cache is synced
153-
// before using the client.
154-
// If the cache fails to sync, that probably indicates a more serious problem
155-
// with the manager starting, so we don't have to worry about restarting or retrying
156-
// this process -- simply log and return
157-
if synced := cacheClient.WaitForCacheSync(ctx); !synced {
158-
log.V(1).Info("unable to sync cache for upgrade check")
159-
return
160-
}
150+
return m.Add(&CheckForUpgradesScheduler{
151+
Client: m.GetClient(),
152+
Config: m.GetConfig(),
153+
OpenShift: openshift,
154+
Refresh: 24 * time.Hour,
155+
URL: url,
156+
Version: version,
157+
})
158+
}
159+
160+
// NeedLeaderElection returns true so that s runs only on the single
161+
// [manager.Manager] that is elected leader in the Kubernetes cluster.
162+
func (s *CheckForUpgradesScheduler) NeedLeaderElection() bool { return true }
163+
164+
// Start checks for upgrades periodically. It blocks until ctx is cancelled.
165+
func (s *CheckForUpgradesScheduler) Start(ctx context.Context) error {
166+
s.check(ctx)
161167

162-
check(ctx, versionString, url, crclient, cfg, isOpenShift)
168+
ticker := time.NewTicker(s.Refresh)
169+
defer ticker.Stop()
163170

164-
ticker := time.NewTicker(upgradeCheckPeriod)
165171
for {
166172
select {
167173
case <-ticker.C:
168-
check(ctx, versionString, url, crclient, cfg, isOpenShift)
174+
s.check(ctx)
169175
case <-ctx.Done():
170-
ticker.Stop()
171-
return
176+
return ctx.Err()
172177
}
173178
}
174179
}
175180

176-
func check(ctx context.Context,
177-
versionString, url string, crclient crclient.Client,
178-
cfg *rest.Config, isOpenShift bool,
179-
) {
181+
func (s *CheckForUpgradesScheduler) check(ctx context.Context) {
180182
log := logging.FromContext(ctx)
181183

182184
defer func() {
@@ -186,7 +188,7 @@ func check(ctx context.Context,
186188
}()
187189

188190
info, header, err := checkForUpgrades(ctx,
189-
url, versionString, backoff, crclient, cfg, isOpenShift)
191+
s.URL, s.Version, backoff, s.Client, s.Config, s.OpenShift)
190192

191193
if err != nil {
192194
log.V(1).Info("could not complete upgrade check", "response", err.Error())

internal/upgradecheck/http_test.go

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"gotest.tools/v3/assert"
3131
"k8s.io/apimachinery/pkg/util/wait"
3232
"k8s.io/client-go/rest"
33+
"sigs.k8s.io/controller-runtime/pkg/manager"
3334

3435
"github.com/crunchydata/postgres-operator/internal/logging"
3536
"github.com/crunchydata/postgres-operator/internal/testing/cmp"
@@ -56,14 +57,6 @@ func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
5657
return funcFoo()
5758
}
5859

59-
type MockCacheClient struct {
60-
works bool
61-
}
62-
63-
func (cc *MockCacheClient) WaitForCacheSync(ctx context.Context) bool {
64-
return cc.works
65-
}
66-
6760
func TestCheckForUpgrades(t *testing.T) {
6861
fakeClient := setupFakeClientWithPGOScheme(t, false)
6962
ctx := logging.NewContext(context.Background(), logging.Discard())
@@ -168,11 +161,9 @@ func TestCheckForUpgradesScheduler(t *testing.T) {
168161
_, server := setupVersionServer(t, true)
169162
defer server.Close()
170163
cfg := &rest.Config{Host: server.URL}
171-
const testUpgradeCheckURL = "http://localhost:8080"
172164

173165
t.Run("panic from checkForUpgrades doesn't bubble up", func(t *testing.T) {
174-
ctx, cancel := context.WithCancel(context.Background())
175-
defer cancel()
166+
ctx := context.Background()
176167

177168
// capture logs
178169
var calls []string
@@ -187,44 +178,18 @@ func TestCheckForUpgradesScheduler(t *testing.T) {
187178
panic(fmt.Errorf("oh no!"))
188179
}
189180

190-
go CheckForUpgradesScheduler(ctx, "4.7.3", testUpgradeCheckURL, fakeClient, cfg, false,
191-
&MockCacheClient{works: true})
192-
time.Sleep(1 * time.Second)
193-
cancel()
181+
s := CheckForUpgradesScheduler{
182+
Client: fakeClient,
183+
Config: cfg,
184+
}
185+
s.check(ctx)
194186

195-
// Sleeping leads to some non-deterministic results, but we expect at least 1 execution
196-
// plus one log for the failure to apply the configmap
197-
assert.Assert(t, len(calls) >= 2)
187+
assert.Equal(t, len(calls), 2)
198188
assert.Assert(t, cmp.Contains(calls[1], `encountered panic in upgrade check`))
199189
})
200190

201-
t.Run("cache sync fail leads to log and exit", func(t *testing.T) {
202-
ctx, cancel := context.WithCancel(context.Background())
203-
defer cancel()
204-
205-
// capture logs
206-
var calls []string
207-
ctx = logging.NewContext(ctx, funcr.NewJSON(func(object string) {
208-
calls = append(calls, object)
209-
}, funcr.Options{
210-
Verbosity: 1,
211-
}))
212-
213-
// Set loop time to 1s and sleep for 2s before sending the done signal -- though the cache sync
214-
// failure will exit the func before the sleep ends
215-
upgradeCheckPeriod = 1 * time.Second
216-
go CheckForUpgradesScheduler(ctx, "4.7.3", testUpgradeCheckURL, fakeClient, cfg, false,
217-
&MockCacheClient{works: false})
218-
time.Sleep(2 * time.Second)
219-
cancel()
220-
221-
assert.Assert(t, len(calls) == 1)
222-
assert.Assert(t, cmp.Contains(calls[0], `unable to sync cache for upgrade check`))
223-
})
224-
225191
t.Run("successful log each loop, ticker works", func(t *testing.T) {
226-
ctx, cancel := context.WithCancel(context.Background())
227-
defer cancel()
192+
ctx := context.Background()
228193

229194
// capture logs
230195
var calls []string
@@ -244,11 +209,14 @@ func TestCheckForUpgradesScheduler(t *testing.T) {
244209
}
245210

246211
// Set loop time to 1s and sleep for 2s before sending the done signal
247-
upgradeCheckPeriod = 1 * time.Second
248-
go CheckForUpgradesScheduler(ctx, "4.7.3", testUpgradeCheckURL, fakeClient, cfg, false,
249-
&MockCacheClient{works: true})
250-
time.Sleep(2 * time.Second)
251-
cancel()
212+
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
213+
defer cancel()
214+
s := CheckForUpgradesScheduler{
215+
Client: fakeClient,
216+
Config: cfg,
217+
Refresh: 1 * time.Second,
218+
}
219+
assert.ErrorIs(t, context.DeadlineExceeded, s.Start(ctx))
252220

253221
// Sleeping leads to some non-deterministic results, but we expect at least 2 executions
254222
// plus one log for the failure to apply the configmap
@@ -258,3 +226,11 @@ func TestCheckForUpgradesScheduler(t *testing.T) {
258226
assert.Assert(t, cmp.Contains(calls[3], `{\"pgo_versions\":[{\"tag\":\"v5.0.4\"},{\"tag\":\"v5.0.3\"},{\"tag\":\"v5.0.2\"},{\"tag\":\"v5.0.1\"},{\"tag\":\"v5.0.0\"}]}`))
259227
})
260228
}
229+
230+
func TestCheckForUpgradesSchedulerLeaderOnly(t *testing.T) {
231+
// CheckForUpgradesScheduler should implement this interface.
232+
var s manager.LeaderElectionRunnable = new(CheckForUpgradesScheduler)
233+
234+
assert.Assert(t, s.NeedLeaderElection(),
235+
"expected to only run on the leader")
236+
}

0 commit comments

Comments
 (0)