Commit 2b0d122

khaines and pracucci authored
Don't track as error an HA tracker CAS operation intentionally aborted (#3745) (#3746)
Signed-off-by: Marco Pracucci <[email protected]>
Signed-off-by: Ken Haines <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
1 parent af2c64e commit 2b0d122
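
In short: the HA tracker deliberately returns replicasNotMatchError from its CAS callback when a sample comes from a replica that is not the elected one. Before this change, the KV metrics wrapper recorded any CAS error without an HTTP status as a "500" in cortex_kv_request_duration_seconds, so this normal control flow inflated the error rate. The commit adds an IsOperationAborted() marker method to that error and teaches the metrics wrapper to record such intentional aborts as "200". Below is a minimal, self-contained sketch of the detection pattern; the abortedError type, the classify function, and the example in main are illustrative names only, not part of this commit:

package main

import (
    "errors"
    "fmt"
)

// abortedError stands in for an error returned to intentionally abort a CAS
// operation; it exposes the same IsOperationAborted() marker method that
// replicasNotMatchError gains in this commit.
type abortedError struct{}

func (abortedError) Error() string            { return "operation intentionally aborted" }
func (abortedError) IsOperationAborted() bool { return true }

// classify mirrors the logic added to getCasErrorCode: errors that mark
// themselves as intentionally aborted are tracked as successes ("200"),
// anything else keeps being tracked as a server-side error ("500").
func classify(err error) string {
    if err == nil {
        return "200"
    }
    if aborted, ok := err.(interface{ IsOperationAborted() bool }); ok && aborted.IsOperationAborted() {
        return "200"
    }
    return "500"
}

func main() {
    fmt.Println(classify(nil))                      // 200
    fmt.Println(classify(abortedError{}))           // 200: aborted on purpose, not a failure
    fmt.Println(classify(errors.New("kv failure"))) // 500
}

Using an anonymous interface assertion rather than the concrete error type presumably keeps pkg/ring/kv free of a dependency on pkg/distributor, where replicasNotMatchError lives.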

File tree

5 files changed: +95 -7 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@
 * [BUGFIX] Alertmanager: don't serve HTTP requests until Alertmanager has fully started. Serving HTTP requests earlier may result in loss of configuration for the user. #3679
 * [BUGFIX] Do not log "failed to load config" if runtime config file is empty. #3706
 * [BUGFIX] Do not allow to use a runtime config file containing multiple YAML documents. #3706
+* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745

 ## 1.6.0

pkg/distributor/ha_tracker.go

Lines changed: 6 additions & 1 deletion
@@ -276,7 +276,7 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
 	}

 	// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
-	// is less than failOver timeout amount of time since the timestamp in the KV store.
+	// is less than failover timeout amount of time since the timestamp in the KV store.
 	if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
 		return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
 	}
@@ -306,6 +306,11 @@ func (e replicasNotMatchError) Is(err error) bool {
 	return ok1 || ok2
 }

+// IsOperationAborted returns whether the error has been caused by an operation intentionally aborted.
+func (e replicasNotMatchError) IsOperationAborted() bool {
+	return true
+}
+
 type tooManyClustersError struct {
 	limit int
 }

pkg/distributor/ha_tracker_test.go

Lines changed: 40 additions & 3 deletions
@@ -7,6 +7,8 @@ import (
 	"time"

 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -17,6 +19,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
+	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/test"
@@ -196,13 +199,14 @@ func TestCheckReplicaMultiCluster(t *testing.T) {
 	replica1 := "replica1"
 	replica2 := "replica2"

+	reg := prometheus.NewPedanticRegistry()
 	c, err := newClusterTracker(HATrackerConfig{
 		EnableHATracker:        true,
 		KVStore:                kv.Config{Store: "inmemory"},
 		UpdateTimeout:          100 * time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	}, trackerLimits{maxClusters: 100}, nil)
+	}, trackerLimits{maxClusters: 100}, reg)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -224,20 +228,34 @@ func TestCheckReplicaMultiCluster(t *testing.T) {
 	assert.NoError(t, err)
 	err = c.checkReplica(context.Background(), "user", "c2", replica1)
 	assert.NoError(t, err)
+
+	// We expect no CAS operation failures.
+	metrics, err := reg.Gather()
+	require.NoError(t, err)
+
+	assert.Equal(t, uint64(0), util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
+		labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
+		labels.MustNewMatcher(labels.MatchRegexp, "status_code", "5.*"),
+	}))
+	assert.Greater(t, util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
+		labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
+		labels.MustNewMatcher(labels.MatchRegexp, "status_code", "2.*"),
+	}), uint64(0))
 }

 func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
 	start := mtime.Now()
 	replica1 := "replica1"
 	replica2 := "replica2"

+	reg := prometheus.NewPedanticRegistry()
 	c, err := newClusterTracker(HATrackerConfig{
 		EnableHATracker:        true,
 		KVStore:                kv.Config{Store: "inmemory"},
 		UpdateTimeout:          100 * time.Millisecond,
 		UpdateTimeoutJitterMax: 0,
 		FailoverTimeout:        time.Second,
-	}, trackerLimits{maxClusters: 100}, nil)
+	}, trackerLimits{maxClusters: 100}, reg)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -259,7 +277,13 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
 	err = c.checkReplica(context.Background(), "user", "c2", replica1)
 	assert.NoError(t, err)

-	// Wait more than the timeout.
+	// Reject samples from replica 2 in each cluster.
+	err = c.checkReplica(context.Background(), "user", "c1", replica2)
+	assert.Error(t, err)
+	err = c.checkReplica(context.Background(), "user", "c2", replica2)
+	assert.Error(t, err)
+
+	// Wait more than the failover timeout.
 	mtime.NowForce(start.Add(1100 * time.Millisecond))

 	// Accept a sample from c1/replica2.
@@ -271,6 +295,19 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
 	assert.Error(t, err)
 	err = c.checkReplica(context.Background(), "user", "c2", replica1)
 	assert.NoError(t, err)
+
+	// We expect no CAS operation failures.
+	metrics, err := reg.Gather()
+	require.NoError(t, err)
+
+	assert.Equal(t, uint64(0), util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
+		labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
+		labels.MustNewMatcher(labels.MatchRegexp, "status_code", "5.*"),
+	}))
+	assert.Greater(t, util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
+		labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
+		labels.MustNewMatcher(labels.MatchRegexp, "status_code", "2.*"),
+	}), uint64(0))
 }

 // Test that writes only happen every update timeout.

pkg/ring/kv/metrics.go

Lines changed: 11 additions & 3 deletions
@@ -20,14 +20,22 @@ func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Reg
 	return prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": name}, reg)
 }

-// errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument
-func errorCode(err error) string {
+// getCasErrorCode converts the provided CAS error into the code that should be used to track the operation
+// in metrics.
+func getCasErrorCode(err error) string {
 	if err == nil {
 		return "200"
 	}
 	if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
 		return strconv.Itoa(int(resp.GetCode()))
 	}
+
+	// If the error has been returned to abort the CAS operation, then we shouldn't
+	// consider it an error when tracking metrics.
+	if casErr, ok := err.(interface{ IsOperationAborted() bool }); ok && casErr.IsOperationAborted() {
+		return "200"
+	}
+
 	return "500"
 }
@@ -81,7 +89,7 @@ func (m metrics) Delete(ctx context.Context, key string) error {
 }

 func (m metrics) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
-	return instrument.CollectedRequest(ctx, "CAS", m.requestDuration, errorCode, func(ctx context.Context) error {
+	return instrument.CollectedRequest(ctx, "CAS", m.requestDuration, getCasErrorCode, func(ctx context.Context) error {
 		return m.c.CAS(ctx, key, f)
 	})
 }

pkg/util/metrics_helper.go

Lines changed: 37 additions & 0 deletions
@@ -10,6 +10,7 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/prometheus/pkg/labels"
 )

 // Data for single value (counter/gauge) with labels.
@@ -661,3 +662,39 @@ func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser {
 	}
 	return data
 }
+
+// FromLabelPairsToLabels converts dto.LabelPair into labels.Labels.
+func FromLabelPairsToLabels(pairs []*dto.LabelPair) labels.Labels {
+	builder := labels.NewBuilder(nil)
+	for _, pair := range pairs {
+		builder.Set(pair.GetName(), pair.GetValue())
+	}
+	return builder.Labels()
+}
+
+// GetSumOfHistogramSampleCount returns the sum of samples count of histograms matching the provided metric name
+// and optional label matchers. Returns 0 if no metric matches.
+func GetSumOfHistogramSampleCount(families []*dto.MetricFamily, metricName string, matchers labels.Selector) uint64 {
+	sum := uint64(0)
+
+	for _, metric := range families {
+		if metric.GetName() != metricName {
+			continue
+		}
+
+		if metric.GetType() != dto.MetricType_HISTOGRAM {
+			continue
+		}
+
+		for _, series := range metric.GetMetric() {
+			if !matchers.Matches(FromLabelPairsToLabels(series.GetLabel())) {
+				continue
+			}
+
+			histogram := series.GetHistogram()
+			sum += histogram.GetSampleCount()
+		}
+	}
+
+	return sum
+}
