Skip to content

Commit d4b027a

Browse files
authored
chore(dataobj): return Object on Builder.Flush (#18622)
1 parent 0ec787f commit d4b027a

31 files changed

+362
-352
lines changed

pkg/dataobj/builder.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package dataobj
33
import (
44
"bytes"
55
"fmt"
6+
"io"
7+
8+
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
69
)
710

811
// A Builder builds data objects from a set of incoming log data. Log data is
@@ -56,22 +59,48 @@ func (b *Builder) Bytes() int {
5659
return b.encoder.Bytes()
5760
}
5861

59-
// Flush flushes all buffered data to the buffer provided. Calling Flush can
60-
// result in a no-op if there is no buffered data to flush.
62+
// Flush constructs a new Object from the accumulated sections. Allocated
63+
// resources for the Object must be released by calling Close on the returned
64+
// io.Closer. After closing, the returned Object must no longer be read.
6165
//
62-
// [Builder.Reset] is called after a successful Flush to discard any pending
63-
// data and allow new data to be appended.
64-
func (b *Builder) Flush(output *bytes.Buffer) (int64, error) {
65-
sz, err := b.encoder.Flush(output)
66+
// Flush returns an error if the object could not be constructed.
67+
// [Builder.Reset] is called after a successful flush to discard any pending
68+
// data, allowing new data to be appended.
69+
func (b *Builder) Flush() (*Object, io.Closer, error) {
70+
flushSize, err := b.encoder.FlushSize()
71+
if err != nil {
72+
return nil, nil, fmt.Errorf("determining object size: %w", err)
73+
}
74+
75+
buf := bufpool.Get(int(flushSize))
76+
77+
closer := func() error {
78+
bufpool.Put(buf)
79+
return nil
80+
}
81+
82+
sz, err := b.encoder.Flush(buf)
6683
if err != nil {
67-
return sz, fmt.Errorf("building object: %w", err)
84+
return nil, nil, fmt.Errorf("flushing object: %w", err)
85+
}
86+
87+
obj, err := FromReaderAt(bytes.NewReader(buf.Bytes()), sz)
88+
if err != nil {
89+
bufpool.Put(buf)
90+
return nil, nil, fmt.Errorf("error building object: %w", err)
6891
}
6992

7093
b.Reset()
71-
return sz, nil
94+
return obj, funcIOCloser(closer), nil
7295
}
7396

7497
// Reset discards pending data and resets the builder to an empty state.
7598
func (b *Builder) Reset() {
7699
b.encoder.Reset()
77100
}
101+
102+
type funcIOCloser func() error
103+
104+
func (fc funcIOCloser) Close() error {
105+
return fc()
106+
}

pkg/dataobj/consumer/logsobj/builder.go

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
package logsobj
33

44
import (
5-
"bytes"
65
"context"
76
"errors"
87
"flag"
98
"fmt"
9+
"io"
1010
"time"
1111

1212
"github.com/grafana/dskit/flagext"
@@ -293,27 +293,24 @@ func (b *Builder) estimatedSize() int {
293293
return size
294294
}
295295

296-
type FlushStats struct {
297-
MinTimestamp time.Time
298-
MaxTimestamp time.Time
296+
// TimeRange returns the current time range of the builder. Call it before
// Flush: flushing appends the sections to the object, which resets them.
297+
func (b *Builder) TimeRange() (time.Time, time.Time) {
298+
return b.streams.TimeRange()
299299
}
300300

301301
// Flush builds a data object from all buffered data, returning the Object and
302302
// an io.Closer for it. Flush returns ErrBuilderEmpty if no data is buffered.
303303
//
304-
// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended.
305-
func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
304+
// [Builder.Reset] is called after a successful Flush to discard any pending
305+
// data and allow new data to be appended.
306+
func (b *Builder) Flush() (*dataobj.Object, io.Closer, error) {
306307
if b.state == builderStateEmpty {
307-
return FlushStats{}, ErrBuilderEmpty
308+
return nil, nil, ErrBuilderEmpty
308309
}
309310

310311
timer := prometheus.NewTimer(b.metrics.buildTime)
311312
defer timer.ObserveDuration()
312313

313-
// Appending sections resets them, so we need to load the time range before
314-
// appending.
315-
minTime, maxTime := b.streams.TimeRange()
316-
317314
// Flush sections one more time in case they have data.
318315
var flushErrors []error
319316

@@ -322,34 +319,21 @@ func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
322319

323320
if err := errors.Join(flushErrors...); err != nil {
324321
b.metrics.flushFailures.Inc()
325-
return FlushStats{}, fmt.Errorf("building object: %w", err)
322+
return nil, nil, fmt.Errorf("building object: %w", err)
326323
}
327324

328-
sz, err := b.builder.Flush(output)
325+
obj, closer, err := b.builder.Flush()
329326
if err != nil {
330327
b.metrics.flushFailures.Inc()
331-
return FlushStats{}, fmt.Errorf("building object: %w", err)
328+
return nil, nil, fmt.Errorf("building object: %w", err)
332329
}
333330

334-
b.metrics.builtSize.Observe(float64(sz))
335-
336-
var (
337-
// We don't know if output was empty before calling Flush, so we only start
338-
// reading from where we know writing began.
339-
340-
objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):])
341-
objLength = sz
342-
)
343-
obj, err := dataobj.FromReaderAt(objReader, objLength)
344-
if err != nil {
345-
b.metrics.flushFailures.Inc()
346-
return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err)
347-
}
331+
b.metrics.builtSize.Observe(float64(obj.Size()))
348332

349333
err = b.observeObject(context.Background(), obj)
350334

351335
b.Reset()
352-
return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err
336+
return obj, closer, err
353337
}
354338

355339
func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error {

pkg/dataobj/consumer/logsobj/builder_test.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package logsobj
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"strings"
@@ -12,7 +11,6 @@ import (
1211

1312
"github.com/grafana/loki/pkg/push"
1413

15-
"github.com/grafana/loki/v3/pkg/dataobj"
1614
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
1715
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
1816
"github.com/grafana/loki/v3/pkg/logproto"
@@ -29,9 +27,6 @@ var testBuilderConfig = BuilderConfig{
2927
}
3028

3129
func TestBuilder(t *testing.T) {
32-
buf := bytes.NewBuffer(nil)
33-
dirtyBuf := bytes.NewBuffer([]byte("dirty"))
34-
3530
testStreams := []logproto.Stream{
3631
{
3732
Labels: `{cluster="test",app="foo"}`,
@@ -83,34 +78,10 @@ func TestBuilder(t *testing.T) {
8378
for _, entry := range testStreams {
8479
require.NoError(t, builder.Append(entry))
8580
}
86-
_, err = builder.Flush(buf)
87-
require.NoError(t, err)
88-
})
89-
90-
t.Run("Read", func(t *testing.T) {
91-
obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
92-
require.NoError(t, err)
93-
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
94-
require.Equal(t, 1, obj.Sections().Count(logs.CheckSection))
95-
})
96-
97-
t.Run("BuildWithDirtyBuffer", func(t *testing.T) {
98-
builder, err := NewBuilder(testBuilderConfig)
81+
obj, closer, err := builder.Flush()
9982
require.NoError(t, err)
83+
defer closer.Close()
10084

101-
for _, entry := range testStreams {
102-
require.NoError(t, builder.Append(entry))
103-
}
104-
105-
_, err = builder.Flush(dirtyBuf)
106-
require.NoError(t, err)
107-
108-
require.Equal(t, buf.Len(), dirtyBuf.Len()-5)
109-
})
110-
111-
t.Run("ReadFromDirtyBuffer", func(t *testing.T) {
112-
obj, err := dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5))
113-
require.NoError(t, err)
11485
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
11586
require.Equal(t, 1, obj.Sections().Count(logs.CheckSection))
11687
})

pkg/dataobj/consumer/partition_processor.go

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -194,20 +194,23 @@ func (p *partitionProcessor) initBuilder() error {
194194
return initErr
195195
}
196196

197-
func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
198-
stats, err := p.builder.Flush(flushBuffer)
197+
func (p *partitionProcessor) flushStream() error {
198+
minTime, maxTime := p.builder.TimeRange()
199+
200+
obj, closer, err := p.builder.Flush()
199201
if err != nil {
200202
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
201203
return err
202204
}
205+
defer closer.Close()
203206

204-
objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
207+
objectPath, err := p.uploader.Upload(p.ctx, obj)
205208
if err != nil {
206209
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
207210
return err
208211
}
209212

210-
if err := p.metastoreUpdater.Update(p.ctx, objectPath, stats.MinTimestamp, stats.MaxTimestamp); err != nil {
213+
if err := p.metastoreUpdater.Update(p.ctx, objectPath, minTime, maxTime); err != nil {
211214
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
212215
return err
213216
}
@@ -284,17 +287,10 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
284287
return
285288
}
286289

287-
func() {
288-
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
289-
defer p.bufPool.Put(flushBuffer)
290-
291-
flushBuffer.Reset()
292-
293-
if err := p.flushStream(flushBuffer); err != nil {
294-
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
295-
return
296-
}
297-
}()
290+
if err := p.flushStream(); err != nil {
291+
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
292+
return
293+
}
298294

299295
if err := p.commitRecords(record); err != nil {
300296
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
@@ -346,17 +342,10 @@ func (p *partitionProcessor) idleFlush() {
346342
return // Avoid checking too frequently
347343
}
348344

349-
func() {
350-
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
351-
defer p.bufPool.Put(flushBuffer)
352-
353-
flushBuffer.Reset()
354-
355-
if err := p.flushStream(flushBuffer); err != nil {
356-
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
357-
return
358-
}
345+
if err := p.flushStream(); err != nil {
346+
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
347+
return
348+
}
359349

360-
p.lastFlush = time.Now()
361-
}()
350+
p.lastFlush = time.Now()
362351
}

pkg/dataobj/dataobj.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ import (
8585

8686
// An Object is a representation of a data object.
8787
type Object struct {
88-
dec *decoder
88+
rr rangeReader
89+
dec *decoder
90+
size int64
8991

9092
metadata *filemd.Metadata
9193
sections []*Section
@@ -95,8 +97,14 @@ type Object struct {
9597
// FromBucket returns an error if the metadata of the Object cannot be read or
9698
// if the provided ctx times out.
9799
func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string) (*Object, error) {
98-
dec := &decoder{rr: &bucketRangeReader{bucket: bucket, path: path}}
99-
obj := &Object{dec: dec}
100+
rr := &bucketRangeReader{bucket: bucket, path: path}
101+
size, err := rr.Size(ctx)
102+
if err != nil {
103+
return nil, fmt.Errorf("getting size: %w", err)
104+
}
105+
106+
dec := &decoder{rr: rr}
107+
obj := &Object{rr: rr, dec: dec, size: size}
100108
if err := obj.init(ctx); err != nil {
101109
return nil, err
102110
}
@@ -107,8 +115,9 @@ func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string)
107115
// specifies the size of the data object in bytes. FromReaderAt returns an
108116
// error if the metadata of the Object cannot be read.
109117
func FromReaderAt(r io.ReaderAt, size int64) (*Object, error) {
110-
dec := &decoder{rr: &readerAtRangeReader{size: size, r: r}}
111-
obj := &Object{dec: dec}
118+
rr := &readerAtRangeReader{size: size, r: r}
119+
dec := &decoder{rr: rr}
120+
obj := &Object{rr: rr, dec: dec, size: size}
112121
if err := obj.init(context.Background()); err != nil {
113122
return nil, err
114123
}
@@ -139,6 +148,14 @@ func (o *Object) init(ctx context.Context) error {
139148
return nil
140149
}
141150

151+
// Size returns the size of the data object in bytes.
152+
func (o *Object) Size() int64 { return o.size }
153+
142154
// Sections returns the list of sections available in the Object. The slice of
143155
// returned sections must not be mutated.
144156
func (o *Object) Sections() Sections { return o.sections }
157+
158+
// Reader returns a reader for the entire raw data object.
159+
func (o *Object) Reader(ctx context.Context) (io.ReadCloser, error) {
160+
return o.rr.Read(ctx)
161+
}

pkg/dataobj/encoder.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,25 @@ func (enc *encoder) Bytes() int {
196196
return enc.data.Len()
197197
}
198198

199+
// FlushSize returns the exact number of bytes that would be written upon
200+
// calling [encoder.Flush].
201+
func (enc *encoder) FlushSize() (int64, error) {
202+
return enc.flush(streamio.Discard)
203+
}
204+
199205
// Flush writes any buffered data to w. After a successful flush,
200206
// enc is reset; on error, enc is not reset.
201207
func (enc *encoder) Flush(w streamio.Writer) (int64, error) {
208+
size, err := enc.flush(w)
209+
if err != nil {
210+
return size, err
211+
}
212+
213+
enc.Reset()
214+
return size, nil
215+
}
216+
217+
func (enc *encoder) flush(w streamio.Writer) (int64, error) {
202218
cw := countingWriter{w: w}
203219

204220
if enc.data == nil {
@@ -241,7 +257,6 @@ func (enc *encoder) Flush(w streamio.Writer) (int64, error) {
241257
return cw.count, fmt.Errorf("writing magic tailer: %w", err)
242258
}
243259

244-
enc.Reset()
245260
return cw.count, nil
246261
}
247262

0 commit comments

Comments
 (0)