diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go index 7b0105e580efd..015f88a873013 100644 --- a/pkg/dataobj/builder.go +++ b/pkg/dataobj/builder.go @@ -3,6 +3,9 @@ package dataobj import ( "bytes" "fmt" + "io" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool" ) // A Builder builds data objects from a set of incoming log data. Log data is @@ -56,22 +59,48 @@ func (b *Builder) Bytes() int { return b.encoder.Bytes() } -// Flush flushes all buffered data to the buffer provided. Calling Flush can -// result in a no-op if there is no buffered data to flush. +// Flush constructs a new Object from the accumulated sections. Allocated +// resources for the Object must be released by calling Close on the returned +// io.Closer. After closing, the returned Object must no longer be read. // -// [Builder.Reset] is called after a successful Flush to discard any pending -// data and allow new data to be appended. -func (b *Builder) Flush(output *bytes.Buffer) (int64, error) { - sz, err := b.encoder.Flush(output) +// Flush returns an error if the object could not be constructed. +// [Builder.Reset] is called after a successful flush to discard any pending +// data, allowing new data to be appended. +func (b *Builder) Flush() (*Object, io.Closer, error) { + flushSize, err := b.encoder.FlushSize() + if err != nil { + return nil, nil, fmt.Errorf("determining object size: %w", err) + } + + buf := bufpool.Get(int(flushSize)) + + closer := func() error { + bufpool.Put(buf) + return nil + } + + sz, err := b.encoder.Flush(buf) if err != nil { - return sz, fmt.Errorf("building object: %w", err) + return nil, nil, fmt.Errorf("flushing object: %w", err) + } + + obj, err := FromReaderAt(bytes.NewReader(buf.Bytes()), sz) + if err != nil { + bufpool.Put(buf) + return nil, nil, fmt.Errorf("error building object: %w", err) } b.Reset() - return sz, nil + return obj, funcIOCloser(closer), nil } // Reset discards pending data and resets the builder to an empty state. func (b *Builder) Reset() { b.encoder.Reset() } + +type funcIOCloser func() error + +func (fc funcIOCloser) Close() error { + return fc() +} diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index db0f78b1503b3..08ef3afeed45b 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -2,11 +2,11 @@ package logsobj import ( - "bytes" "context" "errors" "flag" "fmt" + "io" "time" "github.com/grafana/dskit/flagext" @@ -293,27 +293,24 @@ func (b *Builder) estimatedSize() int { return size } -type FlushStats struct { - MinTimestamp time.Time - MaxTimestamp time.Time +// TimeRange returns the current time range of the builder. +func (b *Builder) TimeRange() (time.Time, time.Time) { + return b.streams.TimeRange() } // Flush flushes all buffered data to the buffer provided. Calling Flush can result // in a no-op if there is no buffered data to flush. // -// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended. -func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) { +// [Builder.Reset] is called after a successful Flush to discard any pending +// data and allow new data to be appended. 
+func (b *Builder) Flush() (*dataobj.Object, io.Closer, error) { if b.state == builderStateEmpty { - return FlushStats{}, ErrBuilderEmpty + return nil, nil, ErrBuilderEmpty } timer := prometheus.NewTimer(b.metrics.buildTime) defer timer.ObserveDuration() - // Appending sections resets them, so we need to load the time range before - // appending. - minTime, maxTime := b.streams.TimeRange() - // Flush sections one more time in case they have data. var flushErrors []error @@ -322,34 +319,21 @@ func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) { if err := errors.Join(flushErrors...); err != nil { b.metrics.flushFailures.Inc() - return FlushStats{}, fmt.Errorf("building object: %w", err) + return nil, nil, fmt.Errorf("building object: %w", err) } - sz, err := b.builder.Flush(output) + obj, closer, err := b.builder.Flush() if err != nil { b.metrics.flushFailures.Inc() - return FlushStats{}, fmt.Errorf("building object: %w", err) + return nil, nil, fmt.Errorf("building object: %w", err) } - b.metrics.builtSize.Observe(float64(sz)) - - var ( - // We don't know if output was empty before calling Flush, so we only start - // reading from where we know writing began. - - objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):]) - objLength = sz - ) - obj, err := dataobj.FromReaderAt(objReader, objLength) - if err != nil { - b.metrics.flushFailures.Inc() - return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err) - } + b.metrics.builtSize.Observe(float64(obj.Size())) err = b.observeObject(context.Background(), obj) b.Reset() - return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err + return obj, closer, err } func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error { diff --git a/pkg/dataobj/consumer/logsobj/builder_test.go b/pkg/dataobj/consumer/logsobj/builder_test.go index 5cd6ecff29854..f24c7e9ea1336 100644 --- a/pkg/dataobj/consumer/logsobj/builder_test.go +++ b/pkg/dataobj/consumer/logsobj/builder_test.go @@ -1,7 +1,6 @@ package logsobj import ( - "bytes" "context" "errors" "strings" @@ -12,7 +11,6 @@ import ( "github.com/grafana/loki/pkg/push" - "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" "github.com/grafana/loki/v3/pkg/logproto" @@ -29,9 +27,6 @@ var testBuilderConfig = BuilderConfig{ } func TestBuilder(t *testing.T) { - buf := bytes.NewBuffer(nil) - dirtyBuf := bytes.NewBuffer([]byte("dirty")) - testStreams := []logproto.Stream{ { Labels: `{cluster="test",app="foo"}`, @@ -83,34 +78,10 @@ func TestBuilder(t *testing.T) { for _, entry := range testStreams { require.NoError(t, builder.Append(entry)) } - _, err = builder.Flush(buf) - require.NoError(t, err) - }) - - t.Run("Read", func(t *testing.T) { - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) - require.Equal(t, 1, obj.Sections().Count(streams.CheckSection)) - require.Equal(t, 1, obj.Sections().Count(logs.CheckSection)) - }) - - t.Run("BuildWithDirtyBuffer", func(t *testing.T) { - builder, err := NewBuilder(testBuilderConfig) + obj, closer, err := builder.Flush() require.NoError(t, err) + defer closer.Close() - for _, entry := range testStreams { - require.NoError(t, builder.Append(entry)) - } - - _, err = builder.Flush(dirtyBuf) - require.NoError(t, err) - - require.Equal(t, buf.Len(), dirtyBuf.Len()-5) - }) - - t.Run("ReadFromDirtyBuffer", func(t *testing.T) { - obj, err := 
dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5)) - require.NoError(t, err) require.Equal(t, 1, obj.Sections().Count(streams.CheckSection)) require.Equal(t, 1, obj.Sections().Count(logs.CheckSection)) }) diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 5ada61cb07f51..f0c0025799ccc 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -194,20 +194,23 @@ func (p *partitionProcessor) initBuilder() error { return initErr } -func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error { - stats, err := p.builder.Flush(flushBuffer) +func (p *partitionProcessor) flushStream() error { + minTime, maxTime := p.builder.TimeRange() + + obj, closer, err := p.builder.Flush() if err != nil { level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) return err } + defer closer.Close() - objectPath, err := p.uploader.Upload(p.ctx, flushBuffer) + objectPath, err := p.uploader.Upload(p.ctx, obj) if err != nil { level.Error(p.logger).Log("msg", "failed to upload object", "err", err) return err } - if err := p.metastoreUpdater.Update(p.ctx, objectPath, stats.MinTimestamp, stats.MaxTimestamp); err != nil { + if err := p.metastoreUpdater.Update(p.ctx, objectPath, minTime, maxTime); err != nil { level.Error(p.logger).Log("msg", "failed to update metastore", "err", err) return err } @@ -284,17 +287,10 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) { return } - func() { - flushBuffer := p.bufPool.Get().(*bytes.Buffer) - defer p.bufPool.Put(flushBuffer) - - flushBuffer.Reset() - - if err := p.flushStream(flushBuffer); err != nil { - level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) - return - } - }() + if err := p.flushStream(); err != nil { + level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) + return + } if err := p.commitRecords(record); err != nil { level.Error(p.logger).Log("msg", "failed to commit records", "err", err) @@ -346,17 +342,10 @@ func (p *partitionProcessor) idleFlush() { return // Avoid checking too frequently } - func() { - flushBuffer := p.bufPool.Get().(*bytes.Buffer) - defer p.bufPool.Put(flushBuffer) - - flushBuffer.Reset() - - if err := p.flushStream(flushBuffer); err != nil { - level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) - return - } + if err := p.flushStream(); err != nil { + level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) + return + } - p.lastFlush = time.Now() - }() + p.lastFlush = time.Now() } diff --git a/pkg/dataobj/dataobj.go b/pkg/dataobj/dataobj.go index c5a12769ff26d..b5787ef01193f 100644 --- a/pkg/dataobj/dataobj.go +++ b/pkg/dataobj/dataobj.go @@ -85,7 +85,9 @@ import ( // An Object is a representation of a data object. type Object struct { - dec *decoder + rr rangeReader + dec *decoder + size int64 metadata *filemd.Metadata sections []*Section @@ -95,8 +97,14 @@ type Object struct { // FromBucket returns an error if the metadata of the Object cannot be read or // if the provided ctx times out. 
func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string) (*Object, error) { - dec := &decoder{rr: &bucketRangeReader{bucket: bucket, path: path}} - obj := &Object{dec: dec} + rr := &bucketRangeReader{bucket: bucket, path: path} + size, err := rr.Size(ctx) + if err != nil { + return nil, fmt.Errorf("getting size: %w", err) + } + + dec := &decoder{rr: rr} + obj := &Object{rr: rr, dec: dec, size: size} if err := obj.init(ctx); err != nil { return nil, err } @@ -107,8 +115,9 @@ func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string) // specifies the size of the data object in bytes. FromReaderAt returns an // error if the metadata of the Object cannot be read. func FromReaderAt(r io.ReaderAt, size int64) (*Object, error) { - dec := &decoder{rr: &readerAtRangeReader{size: size, r: r}} - obj := &Object{dec: dec} + rr := &readerAtRangeReader{size: size, r: r} + dec := &decoder{rr: rr} + obj := &Object{rr: rr, dec: dec, size: size} if err := obj.init(context.Background()); err != nil { return nil, err } @@ -139,6 +148,14 @@ func (o *Object) init(ctx context.Context) error { return nil } +// Size returns the size of the data object in bytes. +func (o *Object) Size() int64 { return o.size } + // Sections returns the list of sections available in the Object. The slice of // returned sections must not be mutated. func (o *Object) Sections() Sections { return o.sections } + +// Reader returns a reader for the entire raw data object. +func (o *Object) Reader(ctx context.Context) (io.ReadCloser, error) { + return o.rr.Read(ctx) +} diff --git a/pkg/dataobj/encoder.go b/pkg/dataobj/encoder.go index a79c7175a2605..548b2f5ab5c1a 100644 --- a/pkg/dataobj/encoder.go +++ b/pkg/dataobj/encoder.go @@ -196,9 +196,25 @@ func (enc *encoder) Bytes() int { return enc.data.Len() } +// FlushSize returns the exact number of bytes that would be written upon +// calling [encoder.Flush]. +func (enc *encoder) FlushSize() (int64, error) { + return enc.flush(streamio.Discard) +} + // Flush flushes any buffered data to the underlying writer. After flushing, // enc is reset. func (enc *encoder) Flush(w streamio.Writer) (int64, error) { + size, err := enc.flush(w) + if err != nil { + return size, err + } + + enc.Reset() + return size, nil +} + +func (enc *encoder) flush(w streamio.Writer) (int64, error) { cw := countingWriter{w: w} if enc.data == nil { @@ -241,7 +257,6 @@ func (enc *encoder) Flush(w streamio.Writer) (int64, error) { return cw.count, fmt.Errorf("writing magic tailer: %w", err) } - enc.Reset() return cw.count, nil } diff --git a/pkg/dataobj/index/builder.go b/pkg/dataobj/index/builder.go index 389107f5c69eb..3246bf3060490 100644 --- a/pkg/dataobj/index/builder.go +++ b/pkg/dataobj/index/builder.go @@ -76,9 +76,8 @@ type Builder struct { bufferedEvents map[string][]metastore.ObjectWrittenEvent // Builder initialization - builderCfg indexobj.BuilderConfig - bucket objstore.Bucket - flushBuffer *bytes.Buffer + builderCfg indexobj.BuilderConfig + bucket objstore.Bucket // Metrics metrics *indexBuilderMetrics @@ -137,9 +136,6 @@ func NewIndexBuilder( return nil, fmt.Errorf("failed to register metrics for index builder: %w", err) } - // Allocate a single buffer - flushBuffer := bytes.NewBuffer(make([]byte, int(float64(cfg.BuilderConfig.TargetObjectSize)*1.2))) - // Set up queues to download the next object (I/O bound) while processing the current one (CPU bound) in order to maximize throughput. 
// Setting the channel buffer sizes caps the total memory usage by only keeping up to 3 objects in memory at a time: One being processed, one fully downloaded and one being downloaded from the queue. downloadQueue := make(chan metastore.ObjectWrittenEvent, cfg.EventsPerIndex) @@ -151,7 +147,6 @@ func NewIndexBuilder( client: eventConsumerClient, logger: logger, bucket: bucket, - flushBuffer: flushBuffer, downloadedObjects: downloadedObjects, downloadQueue: downloadQueue, metrics: metrics, @@ -308,37 +303,59 @@ func (p *Builder) buildIndex(events []metastore.ObjectWrittenEvent) error { return processingErrors.Err() } - p.flushBuffer.Reset() - stats, err := p.calculator.Flush(p.flushBuffer) + minTime, maxTime := p.calculator.TimeRange() + obj, closer, err := p.calculator.Flush() if err != nil { return fmt.Errorf("failed to flush builder: %w", err) } + defer closer.Close() + + key, err := ObjectKey(p.ctx, events[0].Tenant, obj) + if err != nil { + return fmt.Errorf("failed to generate object key: %w", err) + } - size := p.flushBuffer.Len() + reader, err := obj.Reader(p.ctx) + if err != nil { + return fmt.Errorf("failed to read object: %w", err) + } + defer reader.Close() - key := ObjectKey(events[0].Tenant, p.flushBuffer) - if err := indexStorageBucket.Upload(p.ctx, key, p.flushBuffer); err != nil { + if err := indexStorageBucket.Upload(p.ctx, key, reader); err != nil { return fmt.Errorf("failed to upload index: %w", err) } metastoreUpdater := metastore.NewUpdater(p.mCfg.Updater, indexStorageBucket, events[0].Tenant, p.logger) - if stats.MinTimestamp.IsZero() || stats.MaxTimestamp.IsZero() { + if minTime.IsZero() || maxTime.IsZero() { return errors.New("failed to get min/max timestamps") } - if err := metastoreUpdater.Update(p.ctx, key, stats.MinTimestamp, stats.MaxTimestamp); err != nil { + if err := metastoreUpdater.Update(p.ctx, key, minTime, maxTime); err != nil { return fmt.Errorf("failed to update metastore: %w", err) } - level.Info(p.logger).Log("msg", "finished building index", "tenant", events[0].Tenant, "events", len(events), "size", size, "duration", time.Since(start)) + level.Info(p.logger).Log("msg", "finished building index", "tenant", events[0].Tenant, "events", len(events), "size", obj.Size(), "duration", time.Since(start)) return nil } // ObjectKey determines the key in object storage to upload the object to, based on our path scheme. 
-func ObjectKey(tenantID string, object *bytes.Buffer) string { - sum := sha256.Sum224(object.Bytes()) +func ObjectKey(ctx context.Context, tenantID string, object *dataobj.Object) (string, error) { + h := sha256.New224() + + reader, err := object.Reader(ctx) + if err != nil { + return "", err + } + defer reader.Close() + + if _, err := io.Copy(h, reader); err != nil { + return "", err + } + + var sumBytes [sha256.Size224]byte + sum := h.Sum(sumBytes[:0]) sumStr := hex.EncodeToString(sum[:]) - return fmt.Sprintf("tenant-%s/indexes/%s/%s", tenantID, sumStr[:2], sumStr[2:]) + return fmt.Sprintf("tenant-%s/indexes/%s/%s", tenantID, sumStr[:2], sumStr[2:]), nil } func (p *Builder) commitRecords(record *kgo.Record) error { diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go index b767e68008437..fba77ddbc19f8 100644 --- a/pkg/dataobj/index/builder_test.go +++ b/pkg/dataobj/index/builder_test.go @@ -54,11 +54,15 @@ func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucke require.NoError(t, err) } - buf := bytes.NewBuffer(nil) - _, err = candidate.Flush(buf) + obj, closer, err := candidate.Flush() require.NoError(t, err) + defer closer.Close() - err = bucket.Upload(context.Background(), path, buf) + reader, err := obj.Reader(t.Context()) + require.NoError(t, err) + defer reader.Close() + + err = bucket.Upload(t.Context(), path, reader) require.NoError(t, err) } diff --git a/pkg/dataobj/index/calculate.go b/pkg/dataobj/index/calculate.go index 507c95d2ee090..b8e26b06ce2ad 100644 --- a/pkg/dataobj/index/calculate.go +++ b/pkg/dataobj/index/calculate.go @@ -1,7 +1,6 @@ package index import ( - "bytes" "context" "errors" "fmt" @@ -41,8 +40,12 @@ func (c *Calculator) Reset() { clear(c.indexStreamIDLookup) } -func (c *Calculator) Flush(buffer *bytes.Buffer) (indexobj.FlushStats, error) { - return c.indexobjBuilder.Flush(buffer) +func (c *Calculator) TimeRange() (minTime, maxTime time.Time) { + return c.indexobjBuilder.TimeRange() +} + +func (c *Calculator) Flush() (*dataobj.Object, io.Closer, error) { + return c.indexobjBuilder.Flush() } // Calculate reads the log data from the input logs object and appends the resulting indexes to calculator's builder.
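The ObjectKey change above derives the key by streaming the object through SHA-224 instead of hashing an in-memory buffer. A minimal, hedged sketch of that pattern in isolation, using only the Object.Reader API introduced in this diff (the hashObject helper name is illustrative, not part of the package):

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"io"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// hashObject streams the raw object through a SHA-224 hash instead of
// materializing its bytes, mirroring the ObjectKey change above. The helper
// name is hypothetical.
func hashObject(ctx context.Context, obj *dataobj.Object) (string, error) {
	h := sha256.New224()

	r, err := obj.Reader(ctx)
	if err != nil {
		return "", err
	}
	defer r.Close()

	if _, err := io.Copy(h, r); err != nil {
		return "", err
	}

	// hash.Hash.Sum appends the digest to the slice it is given, so pass a
	// zero-length slice backed by an array to avoid an extra allocation.
	var buf [sha256.Size224]byte
	return hex.EncodeToString(h.Sum(buf[:0])), nil
}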
diff --git a/pkg/dataobj/index/calculate_test.go b/pkg/dataobj/index/calculate_test.go index b6e80b02478fb..d6123b9e6ed05 100644 --- a/pkg/dataobj/index/calculate_test.go +++ b/pkg/dataobj/index/calculate_test.go @@ -1,7 +1,6 @@ package index import ( - "bytes" "context" "errors" "fmt" @@ -92,12 +91,9 @@ func createTestLogObject(t *testing.T) *dataobj.Object { require.NoError(t, err) } - buf := bytes.NewBuffer(nil) - _, err = builder.Flush(buf) - require.NoError(t, err) - - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) return obj } @@ -117,17 +113,16 @@ func TestCalculator_Calculate(t *testing.T) { } // Verify we can flush the results - buf := bytes.NewBuffer(nil) - stats, err := calculator.Flush(buf) + minTime, maxTime := calculator.TimeRange() + obj, closer, err := calculator.Flush() require.NoError(t, err) - require.Greater(t, buf.Len(), 0) - require.False(t, stats.MinTimestamp.IsZero()) - require.Equal(t, stats.MinTimestamp, time.Unix(10, 0).UTC()) - require.False(t, stats.MaxTimestamp.IsZero()) - require.Equal(t, stats.MaxTimestamp, time.Unix(25, 0).UTC()) + defer closer.Close() - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) + require.Greater(t, obj.Size(), int64(0)) + require.False(t, minTime.IsZero()) + require.Equal(t, minTime, time.Unix(10, 0).UTC()) + require.False(t, maxTime.IsZero()) + require.Equal(t, maxTime, time.Unix(25, 0).UTC()) // Confirm we have multiple pointers sections count := obj.Sections().Count(pointers.CheckSection) diff --git a/pkg/dataobj/index/indexobj/builder.go b/pkg/dataobj/index/indexobj/builder.go index f11d2794c7155..0549005919da6 100644 --- a/pkg/dataobj/index/indexobj/builder.go +++ b/pkg/dataobj/index/indexobj/builder.go @@ -2,11 +2,11 @@ package indexobj import ( - "bytes" "context" "errors" "flag" "fmt" + "io" "time" "github.com/grafana/dskit/flagext" @@ -329,28 +329,25 @@ func (b *Builder) estimatedSize() int { return size } -type FlushStats struct { - MinTimestamp time.Time - MaxTimestamp time.Time +// TimeRange returns the time range of the data in the builder. +func (b *Builder) TimeRange() (minTime, maxTime time.Time) { + return b.streams.TimeRange() } // Flush flushes all buffered data to the buffer provided. Calling Flush can result // in a no-op if there is no buffered data to flush. // -// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended. -func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) { +// [Builder.Reset] is called after a successful Flush to discard any pending +// data and allow new data to be appended. +func (b *Builder) Flush() (*dataobj.Object, io.Closer, error) { if b.state == builderStateEmpty { - return FlushStats{}, ErrBuilderEmpty + return nil, nil, ErrBuilderEmpty } b.metrics.flushTotal.Inc() timer := prometheus.NewTimer(b.metrics.buildTime) defer timer.ObserveDuration() - // Appending sections resets them, so we need to load the time range before - // appending. - minTime, maxTime := b.streams.TimeRange() - // Flush sections one more time in case they have data. 
var flushErrors []error @@ -360,34 +357,20 @@ func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) { if err := errors.Join(flushErrors...); err != nil { b.metrics.flushFailures.Inc() - return FlushStats{}, fmt.Errorf("building object: %w", err) - } - - sz, err := b.builder.Flush(output) - if err != nil { - b.metrics.flushFailures.Inc() - return FlushStats{}, fmt.Errorf("building object: %w", err) + return nil, nil, fmt.Errorf("building object: %w", err) } - b.metrics.builtSize.Observe(float64(sz)) - - var ( - // We don't know if output was empty before calling Flush, so we only start - // reading from where we know writing began. - - objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):]) - objLength = sz - ) - obj, err := dataobj.FromReaderAt(objReader, objLength) + obj, closer, err := b.builder.Flush() if err != nil { b.metrics.flushFailures.Inc() - return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err) + return nil, nil, fmt.Errorf("building object: %w", err) } + b.metrics.builtSize.Observe(float64(obj.Size())) err = b.observeObject(context.Background(), obj) b.Reset() - return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err + return obj, closer, err } func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error { diff --git a/pkg/dataobj/index/indexobj/builder_test.go b/pkg/dataobj/index/indexobj/builder_test.go index a1f61495d51f1..1de1629f45b69 100644 --- a/pkg/dataobj/index/indexobj/builder_test.go +++ b/pkg/dataobj/index/indexobj/builder_test.go @@ -1,7 +1,6 @@ package indexobj import ( - "bytes" "context" "errors" "fmt" @@ -11,7 +10,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" @@ -28,9 +26,6 @@ var testBuilderConfig = BuilderConfig{ } func TestBuilder(t *testing.T) { - buf := bytes.NewBuffer(nil) - dirtyBuf := bytes.NewBuffer([]byte("dirty")) - testStreams := []streams.Stream{ { ID: 1, @@ -70,30 +65,6 @@ func TestBuilder(t *testing.T) { builder, err := NewBuilder(testBuilderConfig) require.NoError(t, err) - for _, stream := range testStreams { - _, err := builder.AppendStream(stream) - require.NoError(t, err) - } - for _, pointer := range testPointers { - err := builder.AppendColumnIndex(pointer.Path, pointer.Section, pointer.ColumnName, pointer.ColumnIndex, pointer.ValuesBloomFilter) - require.NoError(t, err) - } - _, err = builder.Flush(buf) - require.NoError(t, err) - }) - - t.Run("Read", func(t *testing.T) { - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) - require.Equal(t, 1, obj.Sections().Count(streams.CheckSection)) - require.Equal(t, 1, obj.Sections().Count(pointers.CheckSection)) - require.Equal(t, 0, obj.Sections().Count(logs.CheckSection)) - }) - - t.Run("BuildWithDirtyBuffer", func(t *testing.T) { - builder, err := NewBuilder(testBuilderConfig) - require.NoError(t, err) - for _, stream := range testStreams { _, err := builder.AppendStream(stream) require.NoError(t, err) @@ -103,15 +74,10 @@ func TestBuilder(t *testing.T) { require.NoError(t, err) } - _, err = builder.Flush(dirtyBuf) + obj, closer, err := builder.Flush() require.NoError(t, err) + defer closer.Close() - require.Equal(t, buf.Len(), dirtyBuf.Len()-5) - }) - - t.Run("ReadFromDirtyBuffer", func(t 
*testing.T) { - obj, err := dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5)) - require.NoError(t, err) require.Equal(t, 1, obj.Sections().Count(streams.CheckSection)) require.Equal(t, 1, obj.Sections().Count(pointers.CheckSection)) require.Equal(t, 0, obj.Sections().Count(logs.CheckSection)) diff --git a/pkg/dataobj/metastore/metastore_test.go b/pkg/dataobj/metastore/metastore_test.go index 5efb1b8034d2d..5ec389a537277 100644 --- a/pkg/dataobj/metastore/metastore_test.go +++ b/pkg/dataobj/metastore/metastore_test.go @@ -13,8 +13,6 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/user" "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" ) func BenchmarkWriteMetastores(b *testing.B) { @@ -46,9 +44,9 @@ func BenchmarkWriteMetastores(b *testing.B) { // Add test data spanning multiple metastore windows now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC) - flushStats := make([]logsobj.FlushStats, 1000) + stats := make([]flushStats, 1000) for i := 0; i < 1000; i++ { - flushStats[i] = logsobj.FlushStats{ + stats[i] = flushStats{ MinTimestamp: now.Add(-1 * time.Hour).Add(time.Duration(i) * time.Millisecond), MaxTimestamp: now, } @@ -58,7 +56,7 @@ func BenchmarkWriteMetastores(b *testing.B) { t.ReportAllocs() for i := 0; i < t.N; i++ { // Test writing metastores - stats := flushStats[i%len(flushStats)] + stats := stats[i%len(stats)] err := m.Update(ctx, "path", stats.MinTimestamp, stats.MaxTimestamp) require.NoError(t, err) } @@ -96,7 +94,7 @@ func TestWriteMetastores(t *testing.T) { // Add test data spanning multiple metastore windows now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC) - flushStats := logsobj.FlushStats{ + stats := flushStats{ MinTimestamp: now.Add(-1 * time.Hour), MaxTimestamp: now, } @@ -104,7 +102,7 @@ func TestWriteMetastores(t *testing.T) { require.Len(t, bucket.Objects(), 0) // Test writing metastores - err := m.Update(ctx, "test-dataobj-path", flushStats.MinTimestamp, flushStats.MaxTimestamp) + err := m.Update(ctx, "test-dataobj-path", stats.MinTimestamp, stats.MaxTimestamp) require.NoError(t, err) require.Len(t, bucket.Objects(), 1) @@ -113,7 +111,7 @@ func TestWriteMetastores(t *testing.T) { originalSize = len(obj) } - flushResult2 := logsobj.FlushStats{ + flushResult2 := flushStats{ MinTimestamp: now.Add(-15 * time.Minute), MaxTimestamp: now, } @@ -461,3 +459,8 @@ func TestObjectOverlapsRange(t *testing.T) { }) } } + +type flushStats struct { + MinTimestamp time.Time + MaxTimestamp time.Time +} diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index 9e61f1a8c1f7f..5eccc7c9bf0d2 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -1,7 +1,6 @@ package metastore import ( - "bytes" "context" "os" "slices" @@ -74,17 +73,16 @@ func (b *testDataBuilder) addStreamAndFlush(stream logproto.Stream) { err := b.builder.Append(stream) require.NoError(b.t, err) - buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) - stats, err := b.builder.Flush(buf) + minTime, maxTime := b.builder.TimeRange() + obj, closer, err := b.builder.Flush() require.NoError(b.t, err) + defer closer.Close() - path, err := b.uploader.Upload(context.Background(), buf) + path, err := b.uploader.Upload(b.t.Context(), obj) require.NoError(b.t, err) - err = b.meta.Update(context.Background(), path, stats.MinTimestamp, stats.MaxTimestamp) + err = b.meta.Update(context.Background(), path, minTime, maxTime) require.NoError(b.t, err) - - 
b.builder.Reset() } func TestStreamIDs(t *testing.T) { diff --git a/pkg/dataobj/metastore/updater.go b/pkg/dataobj/metastore/updater.go index d1a2691647f04..809a9e1fa354d 100644 --- a/pkg/dataobj/metastore/updater.go +++ b/pkg/dataobj/metastore/updater.go @@ -3,6 +3,7 @@ package metastore import ( "bytes" "context" + stderrors "errors" "io" "strconv" "sync" @@ -171,16 +172,19 @@ func (m *Updater) Update(ctx context.Context, dataobjPath string, minTimestamp, return nil, errors.Wrap(err, "appending to metastore builder") } - m.buf.Reset() + var ( + obj *dataobj.Object + closer io.Closer + ) switch ty { case StorageFormatTypeV1: - _, err = m.metastoreBuilder.Flush(m.buf) + obj, closer, err = m.metastoreBuilder.Flush() if err != nil { return nil, errors.Wrap(err, "flushing metastore builder") } case StorageFormatTypeV2: - _, err = m.builder.Flush(m.buf) + obj, closer, err = m.builder.Flush() if err != nil { return nil, errors.Wrap(err, "flushing metastore builder") } @@ -188,8 +192,24 @@ func (m *Updater) Update(ctx context.Context, dataobjPath string, minTimestamp, return nil, errors.New("unknown metastore top-level object type") } + reader, err := obj.Reader(ctx) + if err != nil { + _ = closer.Close() + return nil, err + } + encodingDuration.ObserveDuration() - return io.NopCloser(m.buf), nil + return &wrappedReadCloser{ + rc: reader, + OnClose: func() error { + // We must close our object reader before closing the object + // itself. + var errs []error + errs = append(errs, reader.Close()) + errs = append(errs, closer.Close()) + return stderrors.Join(errs...) + }, + }, nil }) if err == nil { level.Info(m.logger).Log("msg", "successfully merged & updated metastore", "metastore", metastorePath) @@ -206,6 +226,24 @@ func (m *Updater) Update(ctx context.Context, dataobjPath string, minTimestamp, return err } +// wrappedReadCloser wraps an io.ReadCloser and calls OnClose when Close is +// called. wrappedReadCloser will not close rc on Close if OnClose is defined. +type wrappedReadCloser struct { + rc io.ReadCloser + OnClose func() error +} + +func (w *wrappedReadCloser) Read(p []byte) (int, error) { + return w.rc.Read(p) +} + +func (w *wrappedReadCloser) Close() error { + if w.OnClose != nil { + return w.OnClose() + } + return w.rc.Close() +} + func (m *Updater) append(ty StorageFormatType, dataobjPath string, minTimestamp, maxTimestamp time.Time) error { switch ty { // Backwards compatibility with old metastore top-level objects.
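The wrappedReadCloser added above lets Updater.Update hand back a single io.ReadCloser whose Close releases both the object reader and the pooled buffer behind the flushed Object. A small, hedged sketch of the same idea in isolation (readCloserWithRelease is an illustrative name, not part of the package):

import (
	"errors"
	"io"
)

// readCloserWithRelease reads from rc and, on Close, closes rc first and then
// releases any extra resource tied to it (for example the io.Closer returned
// by Builder.Flush).
type readCloserWithRelease struct {
	rc      io.ReadCloser
	release io.Closer
}

func (r *readCloserWithRelease) Read(p []byte) (int, error) { return r.rc.Read(p) }

func (r *readCloserWithRelease) Close() error {
	// Close the reader before releasing the object it reads from, matching
	// the ordering used in Updater.Update.
	return errors.Join(r.rc.Close(), r.release.Close())
}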
diff --git a/pkg/dataobj/metastore/updater_test.go b/pkg/dataobj/metastore/updater_test.go index e51dd5afa02a8..c7ec5e2832e79 100644 --- a/pkg/dataobj/metastore/updater_test.go +++ b/pkg/dataobj/metastore/updater_test.go @@ -40,11 +40,11 @@ func TestUpdater(t *testing.T) { }) require.NoError(t, err) - var buf bytes.Buffer - _, err = builder.Flush(&buf) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) - bucket := newInMemoryBucket(t, tenantID, unixTime(0), &buf) + bucket := newInMemoryBucket(t, tenantID, unixTime(0), obj) builder.Reset() updater := newUpdater(t, tenantID, bucket, builder, nil) @@ -66,11 +66,11 @@ func TestUpdater(t *testing.T) { err = builder.AppendIndexPointer("testdata/metastore.obj", unixTime(10), unixTime(20)) require.NoError(t, err) - var buf bytes.Buffer - _, err = builder.Flush(&buf) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) - bucket := newInMemoryBucket(t, tenantID, unixTime(0), &buf) + bucket := newInMemoryBucket(t, tenantID, unixTime(0), obj) builder.Reset() updater := newUpdater(t, tenantID, bucket, nil, builder) @@ -134,7 +134,7 @@ func newUpdater(t *testing.T, tenantID string, bucket objstore.Bucket, v1 *logso return updater } -func newInMemoryBucket(t *testing.T, tenantID string, window time.Time, buf *bytes.Buffer) objstore.Bucket { +func newInMemoryBucket(t *testing.T, tenantID string, window time.Time, obj *dataobj.Object) objstore.Bucket { t.Helper() var ( @@ -142,9 +142,12 @@ func newInMemoryBucket(t *testing.T, tenantID string, window time.Time, buf *byt path = metastorePath(tenantID, window) ) - if buf != nil && buf.Len() > 0 { - err := bucket.Upload(context.Background(), path, buf) + if obj != nil && obj.Size() > 0 { + reader, err := obj.Reader(t.Context()) require.NoError(t, err) + defer reader.Close() + + require.NoError(t, bucket.Upload(t.Context(), path, reader)) } return bucket diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go index e71e45afe26f4..6b9bfa437d4ec 100644 --- a/pkg/dataobj/querier/store_test.go +++ b/pkg/dataobj/querier/store_test.go @@ -1,7 +1,6 @@ package querier import ( - "bytes" "cmp" "context" "os" @@ -560,16 +559,17 @@ func (b *testDataBuilder) addStream(labels string, entries ...logproto.Entry) { } func (b *testDataBuilder) flush() { - buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) - stats, err := b.builder.Flush(buf) + minTime, maxTime := b.builder.TimeRange() + obj, closer, err := b.builder.Flush() require.NoError(b.t, err) + defer closer.Close() // Upload the data object using the uploader - path, err := b.uploader.Upload(context.Background(), buf) + path, err := b.uploader.Upload(b.t.Context(), obj) require.NoError(b.t, err) // Update metastore with the new data object - err = b.meta.Update(context.Background(), path, stats.MinTimestamp, stats.MaxTimestamp) + err = b.meta.Update(context.Background(), path, minTime, maxTime) require.NoError(b.t, err) b.builder.Reset() diff --git a/pkg/dataobj/range_reader.go b/pkg/dataobj/range_reader.go index 65452399330e2..f262dddbeef47 100644 --- a/pkg/dataobj/range_reader.go +++ b/pkg/dataobj/range_reader.go @@ -14,8 +14,12 @@ type rangeReader interface { // Size returns the full size of the object. Size(ctx context.Context) (int64, error) + // Read returns a reader over the entire object. Callers may create multiple + // concurrent instances of Read. 
+ Read(ctx context.Context) (io.ReadCloser, error) + // ReadRange returns a reader over a range of bytes. Callers may create - // multiple current instance of ReadRange. + // multiple concurrent instances of ReadRange. ReadRange(ctx context.Context, offset int64, length int64) (io.ReadCloser, error) } @@ -32,6 +36,10 @@ func (rr *bucketRangeReader) Size(ctx context.Context) (int64, error) { return attrs.Size, nil } +func (rr *bucketRangeReader) Read(ctx context.Context) (io.ReadCloser, error) { + return rr.bucket.Get(ctx, rr.path) +} + func (rr *bucketRangeReader) ReadRange(ctx context.Context, offset int64, length int64) (io.ReadCloser, error) { return rr.bucket.GetRange(ctx, rr.path, offset, length) } @@ -45,6 +53,10 @@ func (rr *readerAtRangeReader) Size(_ context.Context) (int64, error) { return rr.size, nil } +func (rr *readerAtRangeReader) Read(_ context.Context) (io.ReadCloser, error) { + return io.NopCloser(io.NewSectionReader(rr.r, 0, rr.size)), nil +} + func (rr *readerAtRangeReader) ReadRange(_ context.Context, offset int64, length int64) (io.ReadCloser, error) { if length > math.MaxInt { return nil, fmt.Errorf("length too large: %d", length) diff --git a/pkg/dataobj/sections/indexpointers/builder_test.go b/pkg/dataobj/sections/indexpointers/builder_test.go index 0a10dfa85e107..7fe05ec9cf0fa 100644 --- a/pkg/dataobj/sections/indexpointers/builder_test.go +++ b/pkg/dataobj/sections/indexpointers/builder_test.go @@ -1,7 +1,6 @@ package indexpointers import ( - "bytes" "context" "testing" "time" @@ -28,13 +27,13 @@ func TestBuilder(t *testing.T) { ib.Append(p.path, p.start, p.end) } - var buf bytes.Buffer b := dataobj.NewBuilder() err := b.Append(ib) require.NoError(t, err) - _, err = b.Flush(&buf) + obj, closer, err := b.Flush() require.NoError(t, err) + defer closer.Close() expect := []IndexPointer{ { @@ -49,10 +48,6 @@ func TestBuilder(t *testing.T) { }, } - bufBytes := buf.Bytes() - obj, err := dataobj.FromReaderAt(bytes.NewReader(bufBytes), int64(len(bufBytes))) - require.NoError(t, err) - var actual []IndexPointer for result := range Iter(context.Background(), obj) { pointer, err := result.Value() diff --git a/pkg/dataobj/sections/indexpointers/row_reader_test.go b/pkg/dataobj/sections/indexpointers/row_reader_test.go index 26af9519bfaf0..2d718d7eef712 100644 --- a/pkg/dataobj/sections/indexpointers/row_reader_test.go +++ b/pkg/dataobj/sections/indexpointers/row_reader_test.go @@ -1,7 +1,6 @@ package indexpointers import ( - "bytes" "context" "errors" "io" @@ -37,16 +36,12 @@ func buildIndexPointersDecoder(t *testing.T, pageSize int) *Section { s.Append(d.Path, d.StartTs, d.EndTs) } - var buf bytes.Buffer - builder := dataobj.NewBuilder() require.NoError(t, builder.Append(s)) - _, err := builder.Flush(&buf) - require.NoError(t, err) - - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) sec, err := Open(t.Context(), obj.Sections()[0]) require.NoError(t, err) diff --git a/pkg/dataobj/sections/logs/builder_test.go b/pkg/dataobj/sections/logs/builder_test.go index 5e01683c7d700..d4bf84a4ee271 100644 --- a/pkg/dataobj/sections/logs/builder_test.go +++ b/pkg/dataobj/sections/logs/builder_test.go @@ -1,8 +1,8 @@ package logs_test import ( - "bytes" "context" + "io" "testing" "time" @@ -46,8 +46,9 @@ func Test(t *testing.T) { tracker.Append(record) } - buf, err := buildObject(tracker) + obj, closer, err := buildObject(tracker) require.NoError(t, err) + 
defer closer.Close() // The order of records should be sorted by timestamp DESC then stream ID, and all // metadata should be sorted by key then value. @@ -72,9 +73,6 @@ func Test(t *testing.T) { }, } - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf))) - require.NoError(t, err) - i := 0 for result := range logs.Iter(context.Background(), obj) { record, err := result.Value() @@ -84,14 +82,10 @@ func Test(t *testing.T) { } } -func buildObject(lt *logs.Builder) ([]byte, error) { - var buf bytes.Buffer - +func buildObject(lt *logs.Builder) (*dataobj.Object, io.Closer, error) { builder := dataobj.NewBuilder() if err := builder.Append(lt); err != nil { - return nil, err - } else if _, err := builder.Flush(&buf); err != nil { - return nil, err + return nil, nil, err } - return buf.Bytes(), nil + return builder.Flush() } diff --git a/pkg/dataobj/sections/logs/reader_test.go b/pkg/dataobj/sections/logs/reader_test.go index e5b621bb6b26b..a07381d411ec3 100644 --- a/pkg/dataobj/sections/logs/reader_test.go +++ b/pkg/dataobj/sections/logs/reader_test.go @@ -102,12 +102,9 @@ func buildSection(t *testing.T, recs []logs.Record) *logs.Section { objectBuilder := dataobj.NewBuilder() require.NoError(t, objectBuilder.Append(sectionBuilder)) - var buf bytes.Buffer - _, err := objectBuilder.Flush(&buf) - require.NoError(t, err) - - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + obj, closer, err := objectBuilder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) sec, err := logs.Open(t.Context(), obj.Sections()[0]) require.NoError(t, err) diff --git a/pkg/dataobj/sections/logs/row_reader_test.go b/pkg/dataobj/sections/logs/row_reader_test.go index 67e78316cd6fc..bd33b11d6c5fa 100644 --- a/pkg/dataobj/sections/logs/row_reader_test.go +++ b/pkg/dataobj/sections/logs/row_reader_test.go @@ -1,7 +1,6 @@ package logs import ( - "bytes" "context" "slices" "testing" @@ -50,15 +49,13 @@ func buildSection(t *testing.T) *Section { Line: []byte("test2"), }) - out := bytes.NewBuffer(nil) b := dataobj.NewBuilder() err := b.Append(logsBuilder) require.NoError(t, err) - _, err = b.Flush(out) - require.NoError(t, err) - obj, err := dataobj.FromReaderAt(bytes.NewReader(out.Bytes()), int64(out.Len())) + obj, closer, err := b.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) var logsSection *Section for _, section := range obj.Sections() { diff --git a/pkg/dataobj/sections/pointers/builder_test.go b/pkg/dataobj/sections/pointers/builder_test.go index 26bed108a6b3b..1c75f32e3206b 100644 --- a/pkg/dataobj/sections/pointers/builder_test.go +++ b/pkg/dataobj/sections/pointers/builder_test.go @@ -1,8 +1,8 @@ package pointers import ( - "bytes" "context" + "io" "testing" "time" @@ -36,8 +36,9 @@ func TestAddingStreams(t *testing.T) { tracker.ObserveStream(tc.path, tc.section, tc.streamIDInObject, tc.streamID, tc.maxTimestamp, 0) } - buf, err := buildObject(tracker) + obj, closer, err := buildObject(tracker) require.NoError(t, err) + defer closer.Close() expect := []SectionPointer{ { @@ -72,9 +73,6 @@ func TestAddingStreams(t *testing.T) { }, } - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf))) - require.NoError(t, err) - var actual []SectionPointer for result := range Iter(context.Background(), obj) { pointer, err := result.Value() @@ -105,8 +103,9 @@ func TestAddingColumnIndexes(t *testing.T) { tracker.RecordColumnIndex(tc.path, tc.section, tc.columnName, tc.columnIndex, tc.valuesBloomFilter) } - buf, err := 
buildObject(tracker) + obj, closer, err := buildObject(tracker) require.NoError(t, err) + defer closer.Close() expect := []SectionPointer{ { @@ -139,9 +138,6 @@ func TestAddingColumnIndexes(t *testing.T) { }, } - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf))) - require.NoError(t, err) - var actual []SectionPointer for result := range Iter(context.Background(), obj) { pointer, err := result.Value() @@ -152,14 +148,10 @@ func TestAddingColumnIndexes(t *testing.T) { require.Equal(t, expect, actual) } -func buildObject(st *Builder) ([]byte, error) { - var buf bytes.Buffer - +func buildObject(st *Builder) (*dataobj.Object, io.Closer, error) { builder := dataobj.NewBuilder() if err := builder.Append(st); err != nil { - return nil, err - } else if _, err := builder.Flush(&buf); err != nil { - return nil, err + return nil, nil, err } - return buf.Bytes(), nil + return builder.Flush() } diff --git a/pkg/dataobj/sections/pointers/row_reader_test.go b/pkg/dataobj/sections/pointers/row_reader_test.go index 4b48493cc43dd..dd1031212e296 100644 --- a/pkg/dataobj/sections/pointers/row_reader_test.go +++ b/pkg/dataobj/sections/pointers/row_reader_test.go @@ -1,7 +1,6 @@ package pointers import ( - "bytes" "context" "errors" "io" @@ -44,16 +43,12 @@ func buildPointersDecoder(t *testing.T, pageSize int) *Section { } } - var buf bytes.Buffer - builder := dataobj.NewBuilder() require.NoError(t, builder.Append(s)) - _, err := builder.Flush(&buf) - require.NoError(t, err) - - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) sec, err := Open(t.Context(), obj.Sections()[0]) require.NoError(t, err) diff --git a/pkg/dataobj/sections/streams/builder_test.go b/pkg/dataobj/sections/streams/builder_test.go index 3f85848feea55..cd7c90b4f2344 100644 --- a/pkg/dataobj/sections/streams/builder_test.go +++ b/pkg/dataobj/sections/streams/builder_test.go @@ -1,8 +1,8 @@ package streams_test import ( - "bytes" "context" + "io" "strings" "testing" "time" @@ -33,8 +33,9 @@ func Test(t *testing.T) { tracker.Record(tc.Labels, tc.Time, tc.Size) } - buf, err := buildObject(tracker) + obj, closer, err := buildObject(tracker) require.NoError(t, err) + defer closer.Close() expect := []streams.Stream{ { @@ -55,9 +56,6 @@ func Test(t *testing.T) { }, } - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf))) - require.NoError(t, err) - var actual []streams.Stream for result := range streams.Iter(context.Background(), obj) { stream, err := result.Value() @@ -80,14 +78,10 @@ func copyLabels(in labels.Labels) labels.Labels { return builder.Labels() } -func buildObject(st *streams.Builder) ([]byte, error) { - var buf bytes.Buffer - +func buildObject(st *streams.Builder) (*dataobj.Object, io.Closer, error) { builder := dataobj.NewBuilder() if err := builder.Append(st); err != nil { - return nil, err - } else if _, err := builder.Flush(&buf); err != nil { - return nil, err + return nil, nil, err } - return buf.Bytes(), nil + return builder.Flush() } diff --git a/pkg/dataobj/sections/streams/row_reader_test.go b/pkg/dataobj/sections/streams/row_reader_test.go index c5ed05398ed23..7660209131feb 100644 --- a/pkg/dataobj/sections/streams/row_reader_test.go +++ b/pkg/dataobj/sections/streams/row_reader_test.go @@ -1,7 +1,6 @@ package streams_test import ( - "bytes" "context" "errors" "io" @@ -89,16 +88,12 @@ func buildStreamsSection(t *testing.T, pageSize int) *streams.Section 
{ s.Record(d.Labels, d.Timestamp, d.UncompressedSize) } - var buf bytes.Buffer - builder := dataobj.NewBuilder() require.NoError(t, builder.Append(s)) - _, err := builder.Flush(&buf) - require.NoError(t, err) - - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) sec, err := streams.Open(t.Context(), obj.Sections()[0]) require.NoError(t, err) diff --git a/pkg/dataobj/uploader/uploader.go b/pkg/dataobj/uploader/uploader.go index a43dfc2d3c0e0..a55c75ea27a6f 100644 --- a/pkg/dataobj/uploader/uploader.go +++ b/pkg/dataobj/uploader/uploader.go @@ -1,12 +1,12 @@ package uploader import ( - "bytes" "context" "crypto/sha256" "encoding/hex" "flag" "fmt" + "io" "time" "github.com/go-kit/log" @@ -14,6 +14,8 @@ import ( "github.com/grafana/dskit/backoff" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/dataobj" ) type Config struct { @@ -62,21 +64,39 @@ func (d *Uploader) UnregisterMetrics(reg prometheus.Registerer) { } // getKey determines the key in object storage to upload the object to, based on our path scheme. -func (d *Uploader) getKey(object *bytes.Buffer) string { - sum := sha256.Sum224(object.Bytes()) - sumStr := hex.EncodeToString(sum[:]) +func (d *Uploader) getKey(ctx context.Context, object *dataobj.Object) (string, error) { + hash := sha256.New224() + + reader, err := object.Reader(ctx) + if err != nil { + return "", err + } + defer reader.Close() + + if _, err := io.Copy(hash, reader); err != nil { + return "", err + } - return fmt.Sprintf("tenant-%s/objects/%s/%s", d.tenantID, sumStr[:d.SHAPrefixSize], sumStr[d.SHAPrefixSize:]) + var sumBytes [sha256.Size224]byte + sum := hash.Sum(sumBytes[:0]) + sumStr := hex.EncodeToString(sum) + + return fmt.Sprintf("tenant-%s/objects/%s/%s", d.tenantID, sumStr[:d.SHAPrefixSize], sumStr[d.SHAPrefixSize:]), nil } // Upload uploads an object to the configured bucket and returns the key. 
-func (d *Uploader) Upload(ctx context.Context, object *bytes.Buffer) (string, error) { +func (d *Uploader) Upload(ctx context.Context, object *dataobj.Object) (key string, err error) { start := time.Now() timer := prometheus.NewTimer(d.metrics.uploadTime) defer timer.ObserveDuration() - objectPath := d.getKey(object) + objectPath, err := d.getKey(ctx, object) + if err != nil { + d.metrics.uploadFailures.Inc() + d.metrics.uploadSize.WithLabelValues(statusFailure).Observe(float64(object.Size())) + return "", fmt.Errorf("generating object key: %w", err) + } backoff := backoff.New(ctx, backoff.Config{ MinBackoff: 100 * time.Millisecond, @@ -84,14 +104,21 @@ func (d *Uploader) Upload(ctx context.Context, object *bytes.Buffer) (string, er MaxRetries: 20, }) - size := len(object.Bytes()) + size := object.Size() logger := log.With(d.logger, "key", objectPath, "size", size) - var err error for backoff.Ongoing() { level.Debug(logger).Log("msg", "attempting to upload dataobj to object storage", "attempt", backoff.NumRetries()) - err = d.bucket.Upload(ctx, objectPath, bytes.NewReader(object.Bytes())) + + err = func() error { + reader, err := object.Reader(ctx) + if err != nil { + return err + } + defer reader.Close() + return d.bucket.Upload(ctx, objectPath, reader) + }() if err == nil { break } diff --git a/pkg/engine/executor/dataobjscan_test.go b/pkg/engine/executor/dataobjscan_test.go index fe27964b25204..87f12ac62be61 100644 --- a/pkg/engine/executor/dataobjscan_test.go +++ b/pkg/engine/executor/dataobjscan_test.go @@ -1,7 +1,6 @@ package executor import ( - "bytes" "math" "testing" "time" @@ -357,13 +356,8 @@ func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object { require.NoError(t, builder.Append(stream)) } - var buf bytes.Buffer - _, err = builder.Flush(&buf) - require.NoError(t, err) - - r := bytes.NewReader(buf.Bytes()) - - obj, err := dataobj.FromReaderAt(r, r.Size()) + obj, closer, err := builder.Flush() require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) return obj } diff --git a/pkg/engine/executor/streams_view_test.go b/pkg/engine/executor/streams_view_test.go index 0ce5c438c91f3..90867f7aa7303 100644 --- a/pkg/engine/executor/streams_view_test.go +++ b/pkg/engine/executor/streams_view_test.go @@ -1,7 +1,6 @@ package executor import ( - "bytes" "iter" "slices" "testing" @@ -148,16 +147,12 @@ func buildStreamsSection(t *testing.T, streamLabels []labels.Labels) *streams.Se objBuilder := dataobj.NewBuilder() require.NoError(t, objBuilder.Append(streamsBuilder), "failed to append streams section") - var buf bytes.Buffer - _, err := objBuilder.Flush(&buf) + obj, closer, err := objBuilder.Flush() require.NoError(t, err, "failed to flush dataobj") - - obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err, "failed to create dataobj from reader") + t.Cleanup(func() { closer.Close() }) sec, err := streams.Open(t.Context(), obj.Sections()[0]) require.NoError(t, err, "failed to open streams section") - return sec } diff --git a/pkg/logql/bench/store_dataobj.go b/pkg/logql/bench/store_dataobj.go index e870ec2955cfa..3c2e96b4fe398 100644 --- a/pkg/logql/bench/store_dataobj.go +++ b/pkg/logql/bench/store_dataobj.go @@ -133,20 +133,21 @@ func (s *DataObjStore) flush() error { // Reset the buffer s.buf.Reset() - // Flush the builder to the buffer - stats, err := s.builder.Flush(s.buf) + minTime, maxTime := s.builder.TimeRange() + obj, closer, err := s.builder.Flush() if err != nil { return fmt.Errorf("failed to flush 
builder: %w", err) } + defer closer.Close() // Upload the data object using the uploader - path, err := s.uploader.Upload(context.Background(), s.buf) + path, err := s.uploader.Upload(context.Background(), obj) if err != nil { return fmt.Errorf("failed to upload data object: %w", err) } // Update metastore with the new data object - err = s.meta.Update(context.Background(), path, stats.MinTimestamp, stats.MaxTimestamp) + err = s.meta.Update(context.Background(), path, minTime, maxTime) if err != nil { return fmt.Errorf("failed to update metastore: %w", err) } @@ -177,18 +178,30 @@ func (s *DataObjStore) Close() error { func (s *DataObjStore) buildIndex() error { flushAndUpload := func(calculator *index.Calculator) error { - s.buf.Reset() - stats, err := calculator.Flush(s.buf) + minTime, maxTime := calculator.TimeRange() + obj, closer, err := calculator.Flush() if err != nil { return fmt.Errorf("failed to flush index: %w", err) } - key := index.ObjectKey(s.tenantID, s.buf) - err = s.indexWriterBucket.Upload(context.Background(), key, s.buf) + defer closer.Close() + + key, err := index.ObjectKey(context.Background(), s.tenantID, obj) + if err != nil { + return fmt.Errorf("failed to create object key: %w", err) + } + + reader, err := obj.Reader(context.Background()) + if err != nil { + return fmt.Errorf("failed to create reader for index object: %w", err) + } + defer reader.Close() + + err = s.indexWriterBucket.Upload(context.Background(), key, reader) if err != nil { return fmt.Errorf("failed to upload index: %w", err) } - err = s.indexMetastore.Update(context.Background(), key, stats.MinTimestamp, stats.MaxTimestamp) + err = s.indexMetastore.Update(context.Background(), key, minTime, maxTime) if err != nil { return fmt.Errorf("failed to update metastore: %w", err) }