Skip to content

Commit a3564cd

Browse files
committed
chore(dataobj): return Object on Builder.Flush
This commit updates Builder.Flush to return an Object rather than writing bytes to an argument. To support this change, dataobj.Object has two additional methods:

- dataobj.Object.Size, to retrieve the full encoded size of the object
- dataobj.Object.Reader, to read from the encoded object

This API change allows downstream users of dataobj.Builder to stream the encoded object for analysis or uploading, without requiring the caller to retain the entire object in memory at once. In the future, this will be used to reduce the memory overhead of dataobj consumers by buffering flushed sections on an ephemeral disk instead of in memory.
1 parent e1bc2f9 commit a3564cd

31 files changed

+362
-352
lines changed

pkg/dataobj/builder.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package dataobj
33
import (
44
"bytes"
55
"fmt"
6+
"io"
7+
8+
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
69
)
710

811
// A Builder builds data objects from a set of incoming log data. Log data is
@@ -56,22 +59,48 @@ func (b *Builder) Bytes() int {
5659
return b.encoder.Bytes()
5760
}
5861

59-
// Flush flushes all buffered data to the buffer provided. Calling Flush can
60-
// result in a no-op if there is no buffered data to flush.
62+
// Flush constructs a new Object from the accumulated sections. Allocated
63+
// resources for the Object must be released by calling Close on the returned
64+
// io.Closer. After closing, the returned Object must no longer be read.
6165
//
62-
// [Builder.Reset] is called after a successful Flush to discard any pending
63-
// data and allow new data to be appended.
64-
func (b *Builder) Flush(output *bytes.Buffer) (int64, error) {
65-
sz, err := b.encoder.Flush(output)
66+
// Flush returns an error if the object could not be constructed.
67+
// [Builder.Reset] is called after a successful flush to discard any pending
68+
// data, allowing new data to be appended.
69+
func (b *Builder) Flush() (*Object, io.Closer, error) {
70+
flushSize, err := b.encoder.FlushSize()
71+
if err != nil {
72+
return nil, nil, fmt.Errorf("determining object size: %w", err)
73+
}
74+
75+
buf := bufpool.Get(int(flushSize))
76+
77+
closer := func() error {
78+
bufpool.Put(buf)
79+
return nil
80+
}
81+
82+
sz, err := b.encoder.Flush(buf)
6683
if err != nil {
67-
return sz, fmt.Errorf("building object: %w", err)
84+
return nil, nil, fmt.Errorf("flushing object: %w", err)
85+
}
86+
87+
obj, err := FromReaderAt(bytes.NewReader(buf.Bytes()), sz)
88+
if err != nil {
89+
bufpool.Put(buf)
90+
return nil, nil, fmt.Errorf("error building object: %w", err)
6891
}
6992

7093
b.Reset()
71-
return sz, nil
94+
return obj, funcIOCloser(closer), nil
7295
}
7396

7497
// Reset discards pending data and resets the builder to an empty state.
7598
func (b *Builder) Reset() {
7699
b.encoder.Reset()
77100
}
101+
102+
type funcIOCloser func() error
103+
104+
func (fc funcIOCloser) Close() error {
105+
return fc()
106+
}

pkg/dataobj/consumer/logsobj/builder.go

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
package logsobj
33

44
import (
5-
"bytes"
65
"context"
76
"errors"
87
"flag"
98
"fmt"
9+
"io"
1010
"time"
1111

1212
"github.com/grafana/dskit/flagext"
@@ -293,27 +293,24 @@ func (b *Builder) estimatedSize() int {
293293
return size
294294
}
295295

296-
type FlushStats struct {
297-
MinTimestamp time.Time
298-
MaxTimestamp time.Time
296+
// TimeRange returns the current time range of the builder.
297+
func (b *Builder) TimeRange() (time.Time, time.Time) {
298+
return b.streams.TimeRange()
299299
}
300300

301301
// Flush flushes all buffered data to the buffer provided. Calling Flush can result
302302
// in a no-op if there is no buffered data to flush.
303303
//
304-
// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended.
305-
func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
304+
// [Builder.Reset] is called after a successful Flush to discard any pending
305+
// data and allow new data to be appended.
306+
func (b *Builder) Flush() (*dataobj.Object, io.Closer, error) {
306307
if b.state == builderStateEmpty {
307-
return FlushStats{}, ErrBuilderEmpty
308+
return nil, nil, ErrBuilderEmpty
308309
}
309310

310311
timer := prometheus.NewTimer(b.metrics.buildTime)
311312
defer timer.ObserveDuration()
312313

313-
// Appending sections resets them, so we need to load the time range before
314-
// appending.
315-
minTime, maxTime := b.streams.TimeRange()
316-
317314
// Flush sections one more time in case they have data.
318315
var flushErrors []error
319316

@@ -322,34 +319,21 @@ func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
322319

323320
if err := errors.Join(flushErrors...); err != nil {
324321
b.metrics.flushFailures.Inc()
325-
return FlushStats{}, fmt.Errorf("building object: %w", err)
322+
return nil, nil, fmt.Errorf("building object: %w", err)
326323
}
327324

328-
sz, err := b.builder.Flush(output)
325+
obj, closer, err := b.builder.Flush()
329326
if err != nil {
330327
b.metrics.flushFailures.Inc()
331-
return FlushStats{}, fmt.Errorf("building object: %w", err)
328+
return nil, nil, fmt.Errorf("building object: %w", err)
332329
}
333330

334-
b.metrics.builtSize.Observe(float64(sz))
335-
336-
var (
337-
// We don't know if output was empty before calling Flush, so we only start
338-
// reading from where we know writing began.
339-
340-
objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):])
341-
objLength = sz
342-
)
343-
obj, err := dataobj.FromReaderAt(objReader, objLength)
344-
if err != nil {
345-
b.metrics.flushFailures.Inc()
346-
return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err)
347-
}
331+
b.metrics.builtSize.Observe(float64(obj.Size()))
348332

349333
err = b.observeObject(context.Background(), obj)
350334

351335
b.Reset()
352-
return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err
336+
return obj, closer, err
353337
}
354338

355339
func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error {

pkg/dataobj/consumer/logsobj/builder_test.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package logsobj
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"strings"
@@ -12,7 +11,6 @@ import (
1211

1312
"github.com/grafana/loki/pkg/push"
1413

15-
"github.com/grafana/loki/v3/pkg/dataobj"
1614
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
1715
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
1816
"github.com/grafana/loki/v3/pkg/logproto"
@@ -29,9 +27,6 @@ var testBuilderConfig = BuilderConfig{
2927
}
3028

3129
func TestBuilder(t *testing.T) {
32-
buf := bytes.NewBuffer(nil)
33-
dirtyBuf := bytes.NewBuffer([]byte("dirty"))
34-
3530
testStreams := []logproto.Stream{
3631
{
3732
Labels: `{cluster="test",app="foo"}`,
@@ -83,34 +78,10 @@ func TestBuilder(t *testing.T) {
8378
for _, entry := range testStreams {
8479
require.NoError(t, builder.Append(entry))
8580
}
86-
_, err = builder.Flush(buf)
87-
require.NoError(t, err)
88-
})
89-
90-
t.Run("Read", func(t *testing.T) {
91-
obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
92-
require.NoError(t, err)
93-
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
94-
require.Equal(t, 1, obj.Sections().Count(logs.CheckSection))
95-
})
96-
97-
t.Run("BuildWithDirtyBuffer", func(t *testing.T) {
98-
builder, err := NewBuilder(testBuilderConfig)
81+
obj, closer, err := builder.Flush()
9982
require.NoError(t, err)
83+
defer closer.Close()
10084

101-
for _, entry := range testStreams {
102-
require.NoError(t, builder.Append(entry))
103-
}
104-
105-
_, err = builder.Flush(dirtyBuf)
106-
require.NoError(t, err)
107-
108-
require.Equal(t, buf.Len(), dirtyBuf.Len()-5)
109-
})
110-
111-
t.Run("ReadFromDirtyBuffer", func(t *testing.T) {
112-
obj, err := dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5))
113-
require.NoError(t, err)
11485
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
11586
require.Equal(t, 1, obj.Sections().Count(logs.CheckSection))
11687
})

pkg/dataobj/consumer/partition_processor.go

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -194,20 +194,23 @@ func (p *partitionProcessor) initBuilder() error {
194194
return initErr
195195
}
196196

197-
func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
198-
stats, err := p.builder.Flush(flushBuffer)
197+
func (p *partitionProcessor) flushStream() error {
198+
minTime, maxTime := p.builder.TimeRange()
199+
200+
obj, closer, err := p.builder.Flush()
199201
if err != nil {
200202
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
201203
return err
202204
}
205+
defer closer.Close()
203206

204-
objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
207+
objectPath, err := p.uploader.Upload(p.ctx, obj)
205208
if err != nil {
206209
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
207210
return err
208211
}
209212

210-
if err := p.metastoreUpdater.Update(p.ctx, objectPath, stats.MinTimestamp, stats.MaxTimestamp); err != nil {
213+
if err := p.metastoreUpdater.Update(p.ctx, objectPath, minTime, maxTime); err != nil {
211214
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
212215
return err
213216
}
@@ -284,17 +287,10 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
284287
return
285288
}
286289

287-
func() {
288-
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
289-
defer p.bufPool.Put(flushBuffer)
290-
291-
flushBuffer.Reset()
292-
293-
if err := p.flushStream(flushBuffer); err != nil {
294-
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
295-
return
296-
}
297-
}()
290+
if err := p.flushStream(); err != nil {
291+
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
292+
return
293+
}
298294

299295
if err := p.commitRecords(record); err != nil {
300296
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
@@ -346,17 +342,10 @@ func (p *partitionProcessor) idleFlush() {
346342
return // Avoid checking too frequently
347343
}
348344

349-
func() {
350-
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
351-
defer p.bufPool.Put(flushBuffer)
352-
353-
flushBuffer.Reset()
354-
355-
if err := p.flushStream(flushBuffer); err != nil {
356-
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
357-
return
358-
}
345+
if err := p.flushStream(); err != nil {
346+
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
347+
return
348+
}
359349

360-
p.lastFlush = time.Now()
361-
}()
350+
p.lastFlush = time.Now()
362351
}

pkg/dataobj/dataobj.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ import (
8585

8686
// An Object is a representation of a data object.
8787
type Object struct {
88-
dec *decoder
88+
rr rangeReader
89+
dec *decoder
90+
size int64
8991

9092
metadata *filemd.Metadata
9193
sections []*Section
@@ -95,8 +97,14 @@ type Object struct {
9597
// FromBucket returns an error if the metadata of the Object cannot be read or
9698
// if the provided ctx times out.
9799
func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string) (*Object, error) {
98-
dec := &decoder{rr: &bucketRangeReader{bucket: bucket, path: path}}
99-
obj := &Object{dec: dec}
100+
rr := &bucketRangeReader{bucket: bucket, path: path}
101+
size, err := rr.Size(ctx)
102+
if err != nil {
103+
return nil, fmt.Errorf("getting size: %w", err)
104+
}
105+
106+
dec := &decoder{rr: rr}
107+
obj := &Object{rr: rr, dec: dec, size: size}
100108
if err := obj.init(ctx); err != nil {
101109
return nil, err
102110
}
@@ -107,8 +115,9 @@ func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string)
107115
// specifies the size of the data object in bytes. FromReaderAt returns an
108116
// error if the metadata of the Object cannot be read.
109117
func FromReaderAt(r io.ReaderAt, size int64) (*Object, error) {
110-
dec := &decoder{rr: &readerAtRangeReader{size: size, r: r}}
111-
obj := &Object{dec: dec}
118+
rr := &readerAtRangeReader{size: size, r: r}
119+
dec := &decoder{rr: rr}
120+
obj := &Object{rr: rr, dec: dec, size: size}
112121
if err := obj.init(context.Background()); err != nil {
113122
return nil, err
114123
}
@@ -139,6 +148,14 @@ func (o *Object) init(ctx context.Context) error {
139148
return nil
140149
}
141150

151+
// Size returns the size of the data object in bytes.
152+
func (o *Object) Size() int64 { return o.size }
153+
142154
// Sections returns the list of sections available in the Object. The slice of
143155
// returned sections must not be mutated.
144156
func (o *Object) Sections() Sections { return o.sections }
157+
158+
// Reader returns a reader for the entire raw data object.
159+
func (o *Object) Reader(ctx context.Context) (io.ReadCloser, error) {
160+
return o.rr.Read(ctx)
161+
}

pkg/dataobj/encoder.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,25 @@ func (enc *encoder) Bytes() int {
196196
return enc.data.Len()
197197
}
198198

199+
// FlushSize returns the exact number of bytes that would be written upon
200+
// calling [encoder.Flush].
201+
func (enc *encoder) FlushSize() (int64, error) {
202+
return enc.flush(streamio.Discard)
203+
}
204+
199205
// Flush flushes any buffered data to the underlying writer. After flushing,
200206
// enc is reset.
201207
func (enc *encoder) Flush(w streamio.Writer) (int64, error) {
208+
size, err := enc.flush(w)
209+
if err != nil {
210+
return size, err
211+
}
212+
213+
enc.Reset()
214+
return size, nil
215+
}
216+
217+
func (enc *encoder) flush(w streamio.Writer) (int64, error) {
202218
cw := countingWriter{w: w}
203219

204220
if enc.data == nil {
@@ -241,7 +257,6 @@ func (enc *encoder) Flush(w streamio.Writer) (int64, error) {
241257
return cw.count, fmt.Errorf("writing magic tailer: %w", err)
242258
}
243259

244-
enc.Reset()
245260
return cw.count, nil
246261
}
247262

0 commit comments

Comments (0)