45 changes: 37 additions & 8 deletions pkg/dataobj/builder.go
@@ -3,6 +3,9 @@ package dataobj
import (
"bytes"
"fmt"
"io"

"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
)

// A Builder builds data objects from a set of incoming log data. Log data is
@@ -56,22 +59,48 @@ func (b *Builder) Bytes() int {
return b.encoder.Bytes()
}

// Flush flushes all buffered data to the buffer provided. Calling Flush can
// result in a no-op if there is no buffered data to flush.
// Flush constructs a new Object from the accumulated sections. Allocated
// resources for the Object must be released by calling Close on the returned
// io.Closer. After closing, the returned Object must no longer be read.
//
// [Builder.Reset] is called after a successful Flush to discard any pending
// data and allow new data to be appended.
func (b *Builder) Flush(output *bytes.Buffer) (int64, error) {
sz, err := b.encoder.Flush(output)
// Flush returns an error if the object could not be constructed.
// [Builder.Reset] is called after a successful flush to discard any pending
// data, allowing new data to be appended.
func (b *Builder) Flush() (*Object, io.Closer, error) {
flushSize, err := b.encoder.FlushSize()
if err != nil {
return nil, nil, fmt.Errorf("determining object size: %w", err)
}

buf := bufpool.Get(int(flushSize))

closer := func() error {
bufpool.Put(buf)
return nil
}

sz, err := b.encoder.Flush(buf)
if err != nil {
return sz, fmt.Errorf("building object: %w", err)
return nil, nil, fmt.Errorf("flushing object: %w", err)
}

obj, err := FromReaderAt(bytes.NewReader(buf.Bytes()), sz)
if err != nil {
bufpool.Put(buf)
return nil, nil, fmt.Errorf("error building object: %w", err)
}

b.Reset()
return sz, nil
return obj, funcIOCloser(closer), nil
}

// Reset discards pending data and resets the builder to an empty state.
func (b *Builder) Reset() {
b.encoder.Reset()
}

type funcIOCloser func() error

func (fc funcIOCloser) Close() error {
return fc()
}
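
The new Flush contract is worth illustrating. Below is a minimal caller-side sketch (not part of the PR); `uploadFlushed` is a hypothetical helper, and it assumes a `*dataobj.Builder` populated elsewhere. The key point is that the returned io.Closer returns the pooled buffer, so Close must only run once the Object is no longer read:

```go
package example

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// uploadFlushed is a hypothetical helper illustrating the new contract:
// Flush returns a readable *dataobj.Object plus an io.Closer that returns
// the pooled buffer, so Close must only run after all reads are finished.
func uploadFlushed(b *dataobj.Builder) error {
	obj, closer, err := b.Flush()
	if err != nil {
		return fmt.Errorf("flushing builder: %w", err)
	}
	defer closer.Close()

	// obj is backed by the pooled buffer and is valid until Close.
	fmt.Println("flushed object size:", obj.Size())
	return nil
}
```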
42 changes: 13 additions & 29 deletions pkg/dataobj/consumer/logsobj/builder.go
@@ -2,11 +2,11 @@
package logsobj

import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"time"

"github.com/grafana/dskit/flagext"
@@ -293,27 +293,24 @@ func (b *Builder) estimatedSize() int {
return size
}

type FlushStats struct {
MinTimestamp time.Time
MaxTimestamp time.Time
// TimeRange returns the current time range of the builder.
func (b *Builder) TimeRange() (time.Time, time.Time) {
return b.streams.TimeRange()
}

// Flush flushes all buffered data to the buffer provided. Calling Flush can result
// in a no-op if there is no buffered data to flush.
//
// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended.
func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
// [Builder.Reset] is called after a successful Flush to discard any pending
// data and allow new data to be appended.
func (b *Builder) Flush() (*dataobj.Object, io.Closer, error) {
if b.state == builderStateEmpty {
return FlushStats{}, ErrBuilderEmpty
return nil, nil, ErrBuilderEmpty
}

timer := prometheus.NewTimer(b.metrics.buildTime)
defer timer.ObserveDuration()

// Appending sections resets them, so we need to load the time range before
// appending.
minTime, maxTime := b.streams.TimeRange()

// Flush sections one more time in case they have data.
var flushErrors []error

@@ -322,34 +319,21 @@ func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {

if err := errors.Join(flushErrors...); err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("building object: %w", err)
return nil, nil, fmt.Errorf("building object: %w", err)
}

sz, err := b.builder.Flush(output)
obj, closer, err := b.builder.Flush()
if err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("building object: %w", err)
return nil, nil, fmt.Errorf("building object: %w", err)
}

b.metrics.builtSize.Observe(float64(sz))

var (
// We don't know if output was empty before calling Flush, so we only start
// reading from where we know writing began.

objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):])
objLength = sz
)
obj, err := dataobj.FromReaderAt(objReader, objLength)
if err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err)
}
b.metrics.builtSize.Observe(float64(obj.Size()))

err = b.observeObject(context.Background(), obj)

b.Reset()
return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err
return obj, closer, err
}

func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error {
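
The corresponding caller-side pattern for the logsobj builder, sketched as a hypothetical helper (not part of the PR): the time range must be read before Flush, because a successful Flush resets the builder's state.

```go
package example

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
)

// flushOnce is a hypothetical helper: read the time range first (Flush
// resets the builder), then flush and release the buffer via the closer.
func flushOnce(b *logsobj.Builder) error {
	minTime, maxTime := b.TimeRange()

	obj, closer, err := b.Flush()
	if err != nil {
		return fmt.Errorf("flushing: %w", err)
	}
	defer closer.Close()

	fmt.Println("size:", obj.Size(), "range:", minTime, "to", maxTime)
	return nil
}
```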
33 changes: 2 additions & 31 deletions pkg/dataobj/consumer/logsobj/builder_test.go
@@ -1,7 +1,6 @@
package logsobj

import (
"bytes"
"context"
"errors"
"strings"
@@ -12,7 +11,6 @@ import (

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/logproto"
@@ -29,9 +27,6 @@ var testBuilderConfig = BuilderConfig{
}

func TestBuilder(t *testing.T) {
buf := bytes.NewBuffer(nil)
dirtyBuf := bytes.NewBuffer([]byte("dirty"))

testStreams := []logproto.Stream{
{
Labels: `{cluster="test",app="foo"}`,
@@ -83,34 +78,10 @@ func TestBuilder(t *testing.T) {
for _, entry := range testStreams {
require.NoError(t, builder.Append(entry))
}
_, err = builder.Flush(buf)
require.NoError(t, err)
})

t.Run("Read", func(t *testing.T) {
obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
require.NoError(t, err)
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
require.Equal(t, 1, obj.Sections().Count(logs.CheckSection))
})

t.Run("BuildWithDirtyBuffer", func(t *testing.T) {
builder, err := NewBuilder(testBuilderConfig)
obj, closer, err := builder.Flush()
require.NoError(t, err)
defer closer.Close()

for _, entry := range testStreams {
require.NoError(t, builder.Append(entry))
}

_, err = builder.Flush(dirtyBuf)
require.NoError(t, err)

require.Equal(t, buf.Len(), dirtyBuf.Len()-5)
})

t.Run("ReadFromDirtyBuffer", func(t *testing.T) {
obj, err := dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5))
require.NoError(t, err)
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
require.Equal(t, 1, obj.Sections().Count(logs.CheckSection))
})
43 changes: 16 additions & 27 deletions pkg/dataobj/consumer/partition_processor.go
@@ -194,20 +194,23 @@ func (p *partitionProcessor) initBuilder() error {
return initErr
}

func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
stats, err := p.builder.Flush(flushBuffer)
func (p *partitionProcessor) flushStream() error {
minTime, maxTime := p.builder.TimeRange()

obj, closer, err := p.builder.Flush()
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return err
}
defer closer.Close()

objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
objectPath, err := p.uploader.Upload(p.ctx, obj)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
return err
}

if err := p.metastoreUpdater.Update(p.ctx, objectPath, stats.MinTimestamp, stats.MaxTimestamp); err != nil {
if err := p.metastoreUpdater.Update(p.ctx, objectPath, minTime, maxTime); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
return err
}
@@ -284,17 +287,10 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
return
}

func() {
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
defer p.bufPool.Put(flushBuffer)

flushBuffer.Reset()

if err := p.flushStream(flushBuffer); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}
}()
if err := p.flushStream(); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}

if err := p.commitRecords(record); err != nil {
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
@@ -346,17 +342,10 @@ func (p *partitionProcessor) idleFlush() {
return // Avoid checking too frequently
}

func() {
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
defer p.bufPool.Put(flushBuffer)

flushBuffer.Reset()

if err := p.flushStream(flushBuffer); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}
if err := p.flushStream(); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}

p.lastFlush = time.Now()
}()
p.lastFlush = time.Now()
}
27 changes: 22 additions & 5 deletions pkg/dataobj/dataobj.go
@@ -85,7 +85,9 @@ import (

// An Object is a representation of a data object.
type Object struct {
dec *decoder
rr rangeReader
dec *decoder
size int64

metadata *filemd.Metadata
sections []*Section
@@ -95,8 +97,14 @@ type Object struct {
// FromBucket returns an error if the metadata of the Object cannot be read or
// if the provided ctx times out.
func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string) (*Object, error) {
dec := &decoder{rr: &bucketRangeReader{bucket: bucket, path: path}}
obj := &Object{dec: dec}
rr := &bucketRangeReader{bucket: bucket, path: path}
size, err := rr.Size(ctx)
if err != nil {
return nil, fmt.Errorf("getting size: %w", err)
}

dec := &decoder{rr: rr}
obj := &Object{rr: rr, dec: dec, size: size}
if err := obj.init(ctx); err != nil {
return nil, err
}
@@ -107,8 +115,9 @@ func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string)
// specifies the size of the data object in bytes. FromReaderAt returns an
// error if the metadata of the Object cannot be read.
func FromReaderAt(r io.ReaderAt, size int64) (*Object, error) {
dec := &decoder{rr: &readerAtRangeReader{size: size, r: r}}
obj := &Object{dec: dec}
rr := &readerAtRangeReader{size: size, r: r}
dec := &decoder{rr: rr}
obj := &Object{rr: rr, dec: dec, size: size}
if err := obj.init(context.Background()); err != nil {
return nil, err
}
@@ -139,6 +148,14 @@ func (o *Object) init(ctx context.Context) error {
return nil
}

// Size returns the size of the data object in bytes.
func (o *Object) Size() int64 { return o.size }

// Sections returns the list of sections available in the Object. The slice of
// returned sections must not be mutated.
func (o *Object) Sections() Sections { return o.sections }

// Reader returns a reader for the entire raw data object.
func (o *Object) Reader(ctx context.Context) (io.ReadCloser, error) {
return o.rr.Read(ctx)
}
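
For context, a hedged sketch of reading a flushed object back through the extended Object API; `openAndCopy` is a hypothetical helper, and it assumes Sections is a slice type as the accessor suggests:

```go
package example

import (
	"context"
	"fmt"
	"io"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/thanos-io/objstore"
)

// openAndCopy is a hypothetical helper: it opens an object from a bucket,
// reports its size and section count, and streams the raw bytes to w.
func openAndCopy(ctx context.Context, bucket objstore.BucketReader, path string, w io.Writer) error {
	obj, err := dataobj.FromBucket(ctx, bucket, path)
	if err != nil {
		return fmt.Errorf("opening object: %w", err)
	}

	fmt.Println("size:", obj.Size(), "sections:", len(obj.Sections()))

	rc, err := obj.Reader(ctx)
	if err != nil {
		return fmt.Errorf("reading object: %w", err)
	}
	defer rc.Close()

	_, err = io.Copy(w, rc)
	return err
}
```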
17 changes: 16 additions & 1 deletion pkg/dataobj/encoder.go
@@ -196,9 +196,25 @@ func (enc *encoder) Bytes() int {
return enc.data.Len()
}

// FlushSize returns the exact number of bytes that would be written upon
// calling [encoder.Flush].
func (enc *encoder) FlushSize() (int64, error) {
return enc.flush(streamio.Discard)
}

// Flush flushes any buffered data to the underlying writer. After flushing,
// enc is reset.
func (enc *encoder) Flush(w streamio.Writer) (int64, error) {
size, err := enc.flush(w)
if err != nil {
return size, err
}

enc.Reset()
return size, nil
}

func (enc *encoder) flush(w streamio.Writer) (int64, error) {
cw := countingWriter{w: w}

if enc.data == nil {
@@ -241,7 +257,6 @@
return cw.count, fmt.Errorf("writing magic tailer: %w", err)
}

enc.Reset()
return cw.count, nil
}

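
The FlushSize/Flush split relies on a measure-then-write pattern: the same flush path runs once against a discarding writer that only counts bytes, so the reported size exactly matches a later real flush. A generic sketch of the idea, using the standard library's io.Discard rather than the package's streamio internals:

```go
package example

import "io"

// countingWriter counts bytes written through it. Flushing through such a
// wrapper aimed at io.Discard yields the exact serialized size without
// allocating an output buffer, which is how a FlushSize can be exact.
type countingWriter struct {
	w     io.Writer
	count int64
}

func (cw *countingWriter) Write(p []byte) (int, error) {
	n, err := cw.w.Write(p)
	cw.count += int64(n)
	return n, err
}

// sizeOf measures the output of any write function by discarding it.
func sizeOf(write func(io.Writer) error) (int64, error) {
	cw := &countingWriter{w: io.Discard}
	err := write(cw)
	return cw.count, err
}
```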