Skip to content

Commit 9b8c879

Browse files
authored
Emit error when the rule synchronization fails (#6902)
Signed-off-by: SungJin1212 <[email protected]>
1 parent 37213a1 commit 9b8c879

File tree

3 files changed

+34
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
2121
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
2222
* [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
23+
* [ENHANCEMENT] Ruler: Emit an error message when the rule synchronization fails. #6902
2324
* [ENHANCEMENT] Querier: Support snappy and zstd response compression for `-querier.response-compression` flag. #6848
2425
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
2526
* [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859

pkg/ruler/ruler.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -693,29 +693,40 @@ func (r *Ruler) run(ctx context.Context) error {
693693
ringTickerChan = ringTicker.C
694694
}
695695

696-
r.syncRules(ctx, rulerSyncReasonInitial)
696+
syncRuleErrMsg := func(syncRulesErr error) {
697+
level.Error(r.logger).Log("msg", "failed to sync rules", "err", syncRulesErr)
698+
}
699+
700+
initialSyncErr := r.syncRules(ctx, rulerSyncReasonInitial)
701+
if initialSyncErr != nil {
702+
syncRuleErrMsg(initialSyncErr)
703+
}
697704
for {
705+
var syncRulesErr error
698706
select {
699707
case <-ctx.Done():
700708
return nil
701709
case <-tick.C:
702-
r.syncRules(ctx, rulerSyncReasonPeriodic)
710+
syncRulesErr = r.syncRules(ctx, rulerSyncReasonPeriodic)
703711
case <-ringTickerChan:
704712
// We ignore the error because in case of error it will return an empty
705713
// replication set which we use to compare with the previous state.
706714
currRingState, _ := r.ring.GetAllHealthy(RingOp)
707715

708716
if ring.HasReplicationSetChanged(ringLastState, currRingState) {
709717
ringLastState = currRingState
710-
r.syncRules(ctx, rulerSyncReasonRingChange)
718+
syncRulesErr = r.syncRules(ctx, rulerSyncReasonRingChange)
711719
}
712720
case err := <-r.subservicesWatcher.Chan():
713721
return errors.Wrap(err, "ruler subservice failed")
714722
}
723+
if syncRulesErr != nil {
724+
syncRuleErrMsg(syncRulesErr)
725+
}
715726
}
716727
}
717728

718-
func (r *Ruler) syncRules(ctx context.Context, reason string) {
729+
func (r *Ruler) syncRules(ctx context.Context, reason string) error {
719730
level.Info(r.logger).Log("msg", "syncing rules", "reason", reason)
720731
r.rulerSync.WithLabelValues(reason).Inc()
721732
timer := prometheus.NewTimer(nil)
@@ -727,19 +738,21 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
727738

728739
loadedConfigs, backupConfigs, err := r.loadRuleGroups(ctx)
729740
if err != nil {
730-
return
741+
return err
731742
}
732743

733744
if ctx.Err() != nil {
734745
level.Info(r.logger).Log("msg", "context is canceled. not syncing rules")
735-
return
746+
return err
736747
}
737748
// This will also delete local group files for users that are no longer in 'configs' map.
738749
r.manager.SyncRuleGroups(ctx, loadedConfigs)
739750

740751
if r.cfg.RulesBackupEnabled() {
741752
r.manager.BackUpRuleGroups(ctx, backupConfigs)
742753
}
754+
755+
return nil
743756
}
744757

745758
func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) {

pkg/ruler/ruler_test.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1342,7 +1342,8 @@ func TestGetRules(t *testing.T) {
13421342

13431343
// Sync Rules
13441344
forEachRuler(func(_ string, r *Ruler) {
1345-
r.syncRules(context.Background(), rulerSyncReasonInitial)
1345+
err := r.syncRules(context.Background(), rulerSyncReasonInitial)
1346+
require.NoError(t, err)
13461347
})
13471348

13481349
if tc.sharding {
@@ -1572,7 +1573,8 @@ func TestGetRulesFromBackup(t *testing.T) {
15721573

15731574
// Sync Rules
15741575
forEachRuler(func(_ string, r *Ruler) {
1575-
r.syncRules(context.Background(), rulerSyncReasonInitial)
1576+
err := r.syncRules(context.Background(), rulerSyncReasonInitial)
1577+
require.NoError(t, err)
15761578
})
15771579

15781580
// update the State of the rulers in the ring based on tc.rulerStateMap
@@ -1788,7 +1790,8 @@ func getRulesHATest(replicationFactor int) func(t *testing.T) {
17881790

17891791
// Sync Rules
17901792
forEachRuler(func(_ string, r *Ruler) {
1791-
r.syncRules(context.Background(), rulerSyncReasonInitial)
1793+
err := r.syncRules(context.Background(), rulerSyncReasonInitial)
1794+
require.NoError(t, err)
17921795
})
17931796

17941797
// update the State of the rulers in the ring based on tc.rulerStateMap
@@ -1811,8 +1814,10 @@ func getRulesHATest(replicationFactor int) func(t *testing.T) {
18111814
t.Errorf("ruler %s was not terminated with error %s", "ruler1", err.Error())
18121815
}
18131816

1814-
rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
1815-
rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
1817+
err = rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
1818+
require.NoError(t, err)
1819+
err = rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
1820+
require.NoError(t, err)
18161821

18171822
requireGroupStateEqual := func(a *GroupStateDesc, b *GroupStateDesc) {
18181823
require.Equal(t, a.Group.Interval, b.Group.Interval)
@@ -2800,7 +2805,8 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
28002805
evalFunc := func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {}
28012806

28022807
r, _ := buildRulerWithIterFunc(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil, evalFunc)
2803-
r.syncRules(context.Background(), rulerSyncReasonInitial)
2808+
err := r.syncRules(context.Background(), rulerSyncReasonInitial)
2809+
require.NoError(t, err)
28042810

28052811
// assert initial state of rule group
28062812
ruleGroup := r.manager.GetRules("user1")[0]
@@ -3265,7 +3271,8 @@ func TestGetShardSizeForUser(t *testing.T) {
32653271

32663272
// Sync Rules
32673273
forEachRuler(func(_ string, r *Ruler) {
3268-
r.syncRules(context.Background(), rulerSyncReasonInitial)
3274+
err := r.syncRules(context.Background(), rulerSyncReasonInitial)
3275+
require.NoError(t, err)
32693276
})
32703277

32713278
result := testRuler.getShardSizeForUser(tc.userID)

0 commit comments

Comments (0)