
Commit f431206

storage/ingest: refactor recordConsumer.Consume to take iter.Seq (#12482)
#### What this PR does

This PR updates the internals of `pkg/storage/ingest`: `recordConsumer.Consume` now accepts an iterator over Kafka records, `iter.Seq[*kgo.Record]`.

Following the discussion in #12443 (comment), we want the block-builder to reuse the `pusherConsumer`, but we want to avoid exposing the existing `record` data structs. Instead of expanding the existing interface, I propose we refactor the `recordConsumer` and get rid of the intermediate `record` data type completely.

---

I checked the benchmarks of the `PusherConsumer` and, although this wasn't planned, got some positive results from the refactoring (_I didn't investigate whether the results represent a real use case or are just a side effect of the benchmarks themselves_):

```
goos: darwin
goarch: arm64
pkg: github.com/grafana/mimir/pkg/storage/ingest
cpu: Apple M2
                                     │   1.bench   │            2.bench             │
                                     │   sec/op    │   sec/op     vs base           │
PusherConsumer/sequential_pusher-8     332.6µ ± 0%   333.7µ ± 0%  ~ (p=0.089 n=10)
PusherConsumer/parallel_pusher-8       332.1µ ± 0%   334.0µ ± 0%  +0.59% (p=0.003 n=10)
geomean                                332.4µ        333.9µ       +0.46%

                                     │   1.bench    │            2.bench             │
                                     │     B/op     │     B/op      vs base          │
PusherConsumer/sequential_pusher-8     41.05Ki ± 0%   40.41Ki ± 0%  -1.57% (p=0.000 n=10)
PusherConsumer/parallel_pusher-8       41.64Ki ± 0%   40.97Ki ± 0%  -1.61% (p=0.000 n=10)
geomean                                41.35Ki        40.69Ki       -1.59%

                                     │  1.bench   │            2.bench             │
                                     │ allocs/op  │  allocs/op   vs base           │
PusherConsumer/sequential_pusher-8     564.0 ± 0%   519.0 ± 0%  -7.98% (p=0.000 n=10)
PusherConsumer/parallel_pusher-8       570.0 ± 0%   525.0 ± 0%  -7.89% (p=0.000 n=10)
geomean                                567.0        522.0       -7.94%
```

#### Checklist

- [x] Tests updated.
- [ ] Documentation added.
- [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If a changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.

Signed-off-by: Vladimir Varankin <[email protected]>
Parent: 0fb3072
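To make the new contract concrete, here is a minimal sketch of what the change looks like from a caller's point of view. It is illustrative only: `recordConsumer` mirrors the updated interface from this commit, while `consumeFetched` is a hypothetical helper; it assumes Go 1.23+, where `slices.Values` adapts a slice into an `iter.Seq` without copying.

```go
package ingestexample

import (
	"context"
	"iter"
	"slices"

	"github.com/twmb/franz-go/pkg/kgo"
)

// recordConsumer mirrors the updated interface: Consume now takes an
// iterator over raw Kafka records instead of the internal record type.
type recordConsumer interface {
	Consume(ctx context.Context, records iter.Seq[*kgo.Record]) error
}

// consumeFetched is a hypothetical helper showing how a caller that holds a
// plain slice of fetched records adapts to the iterator-based signature.
// slices.Values yields elements lazily, so no intermediate copy is made.
func consumeFetched(ctx context.Context, c recordConsumer, recs []*kgo.Record) error {
	return c.Consume(ctx, slices.Values(recs))
}
```

This shape is what makes the block-builder reuse straightforward: any producer of records, whether a slice, a `kgo.Fetches` traversal, or a custom generator, can feed the same consumer without going through the intermediate `record` structs.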

File tree

4 files changed (+178, -169 lines)

pkg/storage/ingest/pusher.go

Lines changed: 16 additions & 14 deletions
```diff
@@ -6,6 +6,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"iter"
 	"math/rand/v2"
 	"sync"
 	"time"
@@ -17,6 +18,7 @@ import (
 	"github.com/grafana/dskit/middleware"
 	"github.com/grafana/dskit/multierror"
 	"github.com/grafana/dskit/user"
+	"github.com/twmb/franz-go/pkg/kgo"
 
 	"github.com/grafana/mimir/pkg/mimirpb"
 	util_log "github.com/grafana/mimir/pkg/util/log"
@@ -60,7 +62,7 @@ func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsu
 
 // Consume implements the recordConsumer interface.
 // It'll use a separate goroutine to unmarshal the next record while we push the current record to storage.
-func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnErr error) {
+func (c pusherConsumer) Consume(ctx context.Context, records iter.Seq[*kgo.Record]) (returnErr error) {
 	defer func(processingStart time.Time) {
 		c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
 	}(time.Now())
@@ -71,7 +73,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 		ctx      context.Context
 		tenantID string
 		err      error
-		index    int
+		offset   int64
 	}
 
 	recordsChannel := make(chan parsedRecord)
@@ -80,10 +82,10 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 	ctx, cancel := context.WithCancelCause(ctx)
 
 	// Now, unmarshal the records into the channel.
-	go func(unmarshalCtx context.Context, records []record, ch chan<- parsedRecord) {
+	go func(unmarshalCtx context.Context, records iter.Seq[*kgo.Record], ch chan<- parsedRecord) {
 		defer close(ch)
 
-		for index, r := range records {
+		for rec := range records {
 			// Before we being unmarshalling the write request check if the context was cancelled.
 			select {
 			case <-unmarshalCtx.Done():
@@ -93,18 +95,18 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 			}
 
 			parsed := parsedRecord{
-				ctx:                  r.ctx,
-				tenantID:             r.tenantID,
 				PreallocWriteRequest: &mimirpb.PreallocWriteRequest{},
-				index:                index,
+				// This context carries the tracing data for this individual record;
+				// kotel populates this data when it fetches the messages.
+				ctx:      rec.Context,
+				tenantID: string(rec.Key),
+				offset:   rec.Offset,
 			}
 
-			if r.version > LatestRecordVersion {
-				parsed.err = fmt.Errorf("received a record with an unsupported version: %d, max supported version: %d", r.version, LatestRecordVersion)
-			}
+			recVersion := ParseRecordVersion(rec)
 
 			// We don't free the WriteRequest slices because they are being freed by a level below.
-			err := DeserializeRecordContent(r.content, parsed.PreallocWriteRequest, r.version)
+			err := DeserializeRecordContent(rec.Value, parsed.PreallocWriteRequest, recVersion)
 			if err != nil {
 				parsed.err = fmt.Errorf("parsing ingest consumer write request: %w", err)
 			}
@@ -121,8 +123,8 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 	// We accumulate the total bytes across all records per tenant to determine the number of timeseries we expected to receive.
 	// Then, we'll use that to determine the number of shards we need to parallelize the writes.
 	var bytesPerTenant = make(map[string]int)
-	for _, r := range records {
-		bytesPerTenant[r.tenantID] += len(r.content)
+	for rec := range records {
+		bytesPerTenant[string(rec.Key)] += len(rec.Value)
 	}
 
 	// Create and start the storage writer.
@@ -149,7 +151,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) (returnEr
 		err := c.pushToStorage(r.ctx, r.tenantID, &r.WriteRequest, writer)
 		if err != nil {
 			cancel(cancellation.NewErrorf("error while pushing to storage")) // Stop the unmarshalling goroutine.
-			return fmt.Errorf("consuming record at index %d for tenant %s: %w", r.index, r.tenantID, err)
+			return fmt.Errorf("consuming record at offset %d for tenant %s: %w", r.offset, r.tenantID, err)
 		}
 	}
 
```