Skip to content

Commit e529c0e

Browse files
committed
Change to expose header at Distributor side
Signed-off-by: SungJin1212 <[email protected]>
1 parent 0d9a807 commit e529c0e

File tree

6 files changed

+136
-135
lines changed

6 files changed

+136
-135
lines changed

integration/e2e/util.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,3 +499,46 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
499499

500500
return
501501
}
502+
503+
func GenerateV2SeriesWithSamples(
504+
name string,
505+
startTime time.Time,
506+
scrapeInterval time.Duration,
507+
startValue int,
508+
numSamples int,
509+
additionalLabels ...prompb.Label,
510+
) (symbols []string, series writev2.TimeSeries) {
511+
tsMillis := TimeToMilliseconds(startTime)
512+
durMillis := scrapeInterval.Milliseconds()
513+
514+
st := writev2.NewSymbolTable()
515+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
516+
517+
for _, label := range additionalLabels {
518+
lbs = append(lbs, labels.Label{
519+
Name: label.Name,
520+
Value: label.Value,
521+
})
522+
}
523+
524+
startTMillis := tsMillis
525+
samples := make([]writev2.Sample, numSamples)
526+
for i := 0; i < numSamples; i++ {
527+
scrapeJitter := rand.Int63n(10) + 1 // add a jitter to simulate real-world scenarios, refer to: https://github.com/prometheus/prometheus/issues/13213
528+
samples[i] = writev2.Sample{
529+
Timestamp: startTMillis + scrapeJitter,
530+
Value: float64(i + startValue),
531+
}
532+
startTMillis += durMillis
533+
}
534+
535+
series = writev2.TimeSeries{
536+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
537+
Samples: samples,
538+
Metadata: writev2.Metadata{
539+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
540+
},
541+
}
542+
543+
return st.Symbols(), series
544+
}

integration/remote_write_v2_test.go

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/model/labels"
1415
"github.com/prometheus/prometheus/prompb"
1516
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1617
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -25,7 +26,7 @@ import (
2526

2627
func TestIngesterRollingUpdate(t *testing.T) {
2728
// Test ingester rolling update situation: when -distributor.remote-write2-enabled is true, and ingester uses the v1.19.0 image.
28-
// Expected: remote write 2.0 push success, but response header values are set to "0".
29+
// Expected: the remote write 2.0 push succeeds.
2930
const blockRangePeriod = 5 * time.Second
3031
ingesterImage := "quay.io/cortexproject/cortex:v1.19.0"
3132

@@ -97,7 +98,7 @@ func TestIngesterRollingUpdate(t *testing.T) {
9798
res, err := c.PushV2(symbols1, series)
9899
require.NoError(t, err)
99100
require.Equal(t, 200, res.StatusCode)
100-
testPushHeader(t, res.Header, "0", "0", "0")
101+
testPushHeader(t, res.Header, "1", "0", "0")
101102

102103
// sample
103104
result, err := c.Query("test_series", now)
@@ -115,13 +116,13 @@ func TestIngesterRollingUpdate(t *testing.T) {
115116
res, err = c.PushV2(symbols2, histogramSeries)
116117
require.NoError(t, err)
117118
require.Equal(t, 200, res.StatusCode)
118-
testPushHeader(t, res.Header, "0", "0", "0")
119+
testPushHeader(t, res.Header, "0", "1", "0")
119120

120-
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
121+
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
121122
res, err = c.PushV2(symbols3, histogramFloatSeries)
122123
require.NoError(t, err)
123124
require.Equal(t, 200, res.StatusCode)
124-
testPushHeader(t, res.Header, "0", "0", "0")
125+
testPushHeader(t, res.Header, "0", "1", "0")
125126

126127
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
127128
expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx))
@@ -138,6 +139,8 @@ func TestIngesterRollingUpdate(t *testing.T) {
138139
}
139140

140141
func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
142+
// Test the case where `-distributor.remote-write2-enabled=false` but the sender pushes PRW2.
143+
// Expected: the status code is 200, but the samples are not written.
141144
const blockRangePeriod = 5 * time.Second
142145

143146
s, err := e2e.NewScenario(networkName)
@@ -198,6 +201,11 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
198201
res, err := c.PushV2(symbols1, series)
199202
require.NoError(t, err)
200203
require.Equal(t, 200, res.StatusCode)
204+
205+
// sample
206+
result, err := c.Query("test_series", now)
207+
require.NoError(t, err)
208+
require.Empty(t, result)
201209
}
202210

203211
func TestIngest(t *testing.T) {
@@ -281,7 +289,8 @@ func TestIngest(t *testing.T) {
281289
require.Equal(t, 200, res.StatusCode)
282290
testPushHeader(t, res.Header, "0", "1", "0")
283291

284-
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
292+
// float histogram
293+
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
285294
res, err = c.PushV2(symbols3, histogramFloatSeries)
286295
require.NoError(t, err)
287296
require.Equal(t, 200, res.StatusCode)
@@ -383,6 +392,71 @@ func TestExemplar(t *testing.T) {
383392
require.Equal(t, 1, len(exemplars))
384393
}
385394

395+
func Test_WriteStatWithReplication(t *testing.T) {
396+
// Test `X-Prometheus-Remote-Write-Samples-Written` header value
397+
// with the replication.
398+
s, err := e2e.NewScenario(networkName)
399+
require.NoError(t, err)
400+
defer s.Close()
401+
402+
// Start dependencies.
403+
consul := e2edb.NewConsulWithName("consul")
404+
require.NoError(t, s.StartAndWaitReady(consul))
405+
406+
flags := mergeFlags(
407+
AlertmanagerLocalFlags(),
408+
map[string]string{
409+
"-store.engine": blocksStorageEngine,
410+
"-blocks-storage.backend": "filesystem",
411+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
412+
"-blocks-storage.bucket-store.sync-interval": "15m",
413+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
414+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
415+
"-querier.query-store-for-labels-enabled": "true",
416+
"-blocks-storage.tsdb.ship-interval": "1s",
417+
"-blocks-storage.tsdb.enable-native-histograms": "true",
418+
// Ingester.
419+
"-ring.store": "consul",
420+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
421+
"-ingester.max-exemplars": "100",
422+
// Distributor.
423+
"-distributor.replication-factor": "3",
424+
"-distributor.remote-write2-enabled": "true",
425+
// Store-gateway.
426+
"-store-gateway.sharding-enabled": "false",
427+
// alert manager
428+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
429+
},
430+
)
431+
432+
// Start Cortex components.
433+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
434+
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
435+
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
436+
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
437+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))
438+
439+
// Wait until distributor have updated the ring.
440+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
441+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
442+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
443+
444+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
445+
require.NoError(t, err)
446+
447+
now := time.Now()
448+
449+
// series push
450+
start := now.Add(-time.Minute * 10)
451+
numSamples := 20
452+
scrapeInterval := 30 * time.Second
453+
symbols, series := e2e.GenerateV2SeriesWithSamples("test_series", start, scrapeInterval, 0, numSamples, prompb.Label{Name: "job", Value: "test"})
454+
res, err := c.PushV2(symbols, []writev2.TimeSeries{series})
455+
require.NoError(t, err)
456+
require.Equal(t, 200, res.StatusCode)
457+
testPushHeader(t, res.Header, "20", "0", "0")
458+
}
459+
386460
func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
387461
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
388462
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))

pkg/distributor/distributor.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -817,18 +817,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
817817
keys := append(seriesKeys, metadataKeys...)
818818
initialMetadataIndex := len(seriesKeys)
819819

820-
ws := WriteStats{}
821-
822-
err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID, &ws)
820+
err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID)
823821
if err != nil {
824822
return nil, err
825823
}
826824

827825
resp := &cortexpb.WriteResponse{}
828826
if d.cfg.RemoteWrite2Enabled {
829-
resp.Samples = ws.LoadSamples()
830-
resp.Histograms = ws.LoadHistogram()
831-
resp.Exemplars = ws.LoadExemplars()
827+
// For now we simply expose the validated sample, histogram, and exemplar
828+
// counts in the response headers. This should be improved to expose the
829+
// values actually written by the ingesters.
830+
resp.Samples = int64(validatedFloatSamples)
831+
resp.Histograms = int64(validatedHistogramSamples)
832+
resp.Exemplars = int64(validatedExemplars)
832833
}
833834

834835
return resp, firstPartialErr
@@ -893,7 +894,7 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
893894
}
894895
}
895896

896-
func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string, ws *WriteStats) error {
897+
func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
897898
span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
898899
defer span.Finish()
899900

@@ -928,7 +929,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
928929
}
929930
}
930931

931-
return d.send(localCtx, ingester, timeseries, metadata, req.Source, ws)
932+
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
932933
}, func() {
933934
cortexpb.ReuseSlice(req.Timeseries)
934935
req.Free()
@@ -1162,7 +1163,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
11621163
})
11631164
}
11641165

1165-
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.WriteRequest_SourceEnum, ws *WriteStats) error {
1166+
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.WriteRequest_SourceEnum) error {
11661167
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
11671168
if err != nil {
11681169
return err
@@ -1178,21 +1179,20 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
11781179
d.inflightClientRequests.Inc()
11791180
defer d.inflightClientRequests.Dec()
11801181

1181-
var resp *cortexpb.WriteResponse
11821182
if d.cfg.UseStreamPush {
11831183
req := &cortexpb.WriteRequest{
11841184
Timeseries: timeseries,
11851185
Metadata: metadata,
11861186
Source: source,
11871187
}
1188-
resp, err = c.PushStreamConnection(ctx, req)
1188+
_, err = c.PushStreamConnection(ctx, req)
11891189
} else {
11901190
req := cortexpb.PreallocWriteRequestFromPool()
11911191
req.Timeseries = timeseries
11921192
req.Metadata = metadata
11931193
req.Source = source
11941194

1195-
resp, err = c.PushPreAlloc(ctx, req)
1195+
_, err = c.PushPreAlloc(ctx, req)
11961196

11971197
// We should not reuse the req in case of errors:
11981198
// See: https://github.com/grpc/grpc-go/issues/6355
@@ -1214,13 +1214,6 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
12141214
}
12151215
}
12161216

1217-
if resp != nil {
1218-
// track write stats
1219-
ws.SetSamples(resp.Samples)
1220-
ws.SetHistograms(resp.Histograms)
1221-
ws.SetExemplars(resp.Exemplars)
1222-
}
1223-
12241217
return err
12251218
}
12261219

pkg/distributor/write_stats.go

Lines changed: 0 additions & 62 deletions
This file was deleted.

pkg/distributor/write_stats_test.go

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)