From 28d720aca532ffcd9b8f54dcf8528dfdca97cbe1 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Fri, 1 Aug 2025 15:07:22 +0100 Subject: [PATCH 1/4] feat: add max age to builder --- pkg/dataobj/consumer/config.go | 2 +- pkg/dataobj/consumer/logsobj/builder.go | 46 ++++++++++++++++++-- pkg/dataobj/consumer/logsobj/builder_test.go | 27 ++++++++++++ 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go index 2137e1f921ea7..98d2421531a42 100644 --- a/pkg/dataobj/consumer/config.go +++ b/pkg/dataobj/consumer/config.go @@ -29,5 +29,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f) - f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes") + f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*time.Minute, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes") } diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index db0f78b1503b3..04a9760763989 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -9,6 +9,7 @@ import ( "fmt" "time" + "github.com/coder/quartz" "github.com/grafana/dskit/flagext" lru "github.com/hashicorp/golang-lru/v2" "github.com/prometheus/client_golang/prometheus" @@ -61,6 +62,11 @@ type BuilderConfig struct { // values of MergeSize trade off lower memory overhead for higher time spent // merging. SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"` + + // MaxAge configures the maximum age of a builder before it is flushed. + // This ensures that objects are built at least once per MaxAge if + // TargetObjectSize is not reached. 
+ MaxAge time.Duration `yaml:"max_age"` } // RegisterFlagsWithPrefix registers flags with the given prefix. @@ -75,6 +81,8 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. Uncompressed size is used for consistent I/O and planning.") f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.") f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.") + f.DurationVar(&cfg.MaxAge, prefix+"max-age", 60*time.Minute, "The maximum age of a builder before it is flushed. This ensures that objects are built at least once per max-age if target-builder-memory-limit is not reached. 0 means disabled.") + } // Validate validates the BuilderConfig. @@ -124,7 +132,11 @@ type Builder struct { streams *streams.Builder logs *logs.Builder - state builderState + state builderState + firstAppendedAt, lastAppendedAt time.Time + + // Used in tests. + clock quartz.Clock } type builderState int @@ -166,6 +178,7 @@ func NewBuilder(cfg BuilderConfig) (*Builder, error) { BufferSize: int(cfg.BufferSize), StripeMergeLimit: cfg.SectionStripeMergeLimit, }), + clock: quartz.NewReal(), }, nil } @@ -185,13 +198,15 @@ func (b *Builder) Append(stream logproto.Stream) error { return err } + now := b.clock.Now() // Check whether the buffer is full before a stream can be appended; this is // tends to overestimate, but we may still go over our target size. // // Since this check only happens after the first call to Append, // b.currentSizeEstimate will always be updated to reflect the size following // the previous append. 
- if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) { + additionalEstimate := labelsEstimate(ls) + streamSizeEstimate(stream) + if b.isFull(additionalEstimate, now) { return ErrBuilderFull } @@ -223,10 +238,33 @@ func (b *Builder) Append(stream logproto.Stream) error { } b.currentSizeEstimate = b.estimatedSize() - b.state = builderStateDirty + if b.state == builderStateEmpty { + b.state = builderStateDirty + b.firstAppendedAt = now + } + b.lastAppendedAt = now return nil } +// isFull returns true if the builder is full. The builder is full when either +// the estimated size exceeds the target object size, or the max age is +// exceeded. +// +// TODO(grobinson): Should rename this to shouldFlush, but then we should +// also change the signal from ErrBuilderFull to something else. +func (b *Builder) isFull(additionalEstimate int, now time.Time) bool { + if b.state == builderStateDirty { + if b.currentSizeEstimate+additionalEstimate > int(b.cfg.TargetObjectSize) { + return true + } + // This check is disabled when MaxAge is zero. + if b.cfg.MaxAge > 0 && now.Sub(b.firstAppendedAt) > b.cfg.MaxAge { + return true + } + } + return false +} + func (b *Builder) parseLabels(labelString string) (labels.Labels, error) { cached, ok := b.labelCache.Get(labelString) if ok { @@ -389,6 +427,8 @@ func (b *Builder) Reset() { b.metrics.sizeEstimate.Set(0) b.currentSizeEstimate = 0 b.state = builderStateEmpty + b.firstAppendedAt = time.Time{} + b.lastAppendedAt = time.Time{} } // RegisterMetrics registers metrics about builder to report to reg. 
All diff --git a/pkg/dataobj/consumer/logsobj/builder_test.go b/pkg/dataobj/consumer/logsobj/builder_test.go index 5cd6ecff29854..26f7000041c8f 100644 --- a/pkg/dataobj/consumer/logsobj/builder_test.go +++ b/pkg/dataobj/consumer/logsobj/builder_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/coder/quartz" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/push" @@ -141,3 +142,29 @@ func TestBuilder_Append(t *testing.T) { require.NoError(t, err) } } + +// TestBuilder_Append_MaxAge ensures that the buffer eventually reports full +// once max age is reached. +func TestBuilder_Append_MaxAge(t *testing.T) { + clock := quartz.NewMock(t) + cfg := testBuilderConfig + cfg.MaxAge = time.Minute + builder, err := NewBuilder(cfg) + require.NoError(t, err) + builder.clock = clock + stream := logproto.Stream{ + Labels: `{cluster="test",app="foo"}`, + Entries: []push.Entry{{ + Timestamp: time.Now().UTC(), + Line: "a", + }}, + } + require.NoError(t, builder.Append(stream)) + // Advance the clock. This should not fail as we have not exceeded the + // max age. + clock.Advance(time.Minute) + require.NoError(t, builder.Append(stream)) + // Advance the clock once more, we should now have exceeded the max age. + clock.Advance(time.Second) + require.EqualError(t, builder.Append(stream), "builder full") +} From 46258faa53785a74a62a16a5e18ac6bdddd30843 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Fri, 1 Aug 2025 15:10:15 +0100 Subject: [PATCH 2/4] Update docs --- docs/sources/shared/configuration.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 39d5fec791e9e..e80ad499229ef 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1088,6 +1088,12 @@ dataobj: # CLI flag: -dataobj-consumer.section-stripe-merge-limit [section_stripe_merge_limit: <int> | default = 2] + # The maximum age of a builder before it is flushed. 
This ensures that + # objects are built at least once per max-age if + # target-builder-memory-limit is not reached. 0 means disabled. + # CLI flag: -dataobj-consumer.max-age + [max_age: <duration> | default = 1h] + uploader: # The size of the SHA prefix to use for generating object storage keys for # data objects. From 32972d2eaeb0c0ab5f2da8e018f9e381302b3417 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Fri, 1 Aug 2025 15:29:14 +0100 Subject: [PATCH 3/4] Improve tests --- pkg/dataobj/consumer/logsobj/builder_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/dataobj/consumer/logsobj/builder_test.go b/pkg/dataobj/consumer/logsobj/builder_test.go index 26f7000041c8f..c2572d82ac413 100644 --- a/pkg/dataobj/consumer/logsobj/builder_test.go +++ b/pkg/dataobj/consumer/logsobj/builder_test.go @@ -167,4 +167,10 @@ func TestBuilder_Append_MaxAge(t *testing.T) { // Advance the clock once more, we should now have exceeded the max age. clock.Advance(time.Second) require.EqualError(t, builder.Append(stream), "builder full") + // Reset the builder. It should not be full. + builder.Reset() + require.NoError(t, builder.Append(stream)) + // Advance the clock, and it should have exceeded the max age again. 
+ clock.Advance(time.Minute + time.Second) + require.EqualError(t, builder.Append(stream), "builder full") } From 5ad5211ac519f7ae41c233e2e0ea35900af7e3ee Mon Sep 17 00:00:00 2001 From: George Robinson Date: Mon, 4 Aug 2025 14:03:49 +0100 Subject: [PATCH 4/4] Add metric loki_dataobj_consumer_flushes_total --- pkg/dataobj/consumer/metrics.go | 6 ++++++ pkg/dataobj/consumer/partition_processor.go | 2 ++ 2 files changed, 8 insertions(+) diff --git a/pkg/dataobj/consumer/metrics.go b/pkg/dataobj/consumer/metrics.go index 74cb1ff91fd80..5b6cc0040545d 100644 --- a/pkg/dataobj/consumer/metrics.go +++ b/pkg/dataobj/consumer/metrics.go @@ -19,6 +19,7 @@ type partitionOffsetMetrics struct { // Request counters commitsTotal prometheus.Counter appendsTotal prometheus.Counter + flushesTotal prometheus.Counter latestDelay prometheus.Gauge // Latest delta between record timestamp and current time processingDelay prometheus.Histogram // Processing delay histogram @@ -45,6 +46,10 @@ func newPartitionOffsetMetrics() *partitionOffsetMetrics { Name: "loki_dataobj_consumer_appends_total", Help: "Total number of appends", }), + flushesTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_dataobj_consumer_flushes_total", + Help: "Total number of flushes", + }), latestDelay: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "loki_dataobj_consumer_latest_processing_delay_seconds", Help: "Latest time difference bweteen record timestamp and processing time in seconds", @@ -87,6 +92,7 @@ func (p *partitionOffsetMetrics) register(reg prometheus.Registerer) error { p.commitsTotal, p.appendsTotal, + p.flushesTotal, p.latestDelay, p.processingDelay, diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 5ada61cb07f51..8fd07a4cea9f5 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -195,6 +195,8 @@ func (p *partitionProcessor) initBuilder() error { } func (p 
*partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error { + p.metrics.flushesTotal.Inc() + stats, err := p.builder.Flush(flushBuffer) if err != nil { level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)