6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
@@ -1088,6 +1088,12 @@ dataobj:
# CLI flag: -dataobj-consumer.section-stripe-merge-limit
[section_stripe_merge_limit: <int> | default = 2]

# The maximum age of a builder before it is flushed. This ensures that
# objects are built at least once per max-age if
# target-builder-memory-limit is not reached. 0 means disabled.
# CLI flag: -dataobj-consumer.max-age
[max_age: <duration> | default = 1h]

uploader:
# The size of the SHA prefix to use for generating object storage keys for
# data objects.
2 changes: 1 addition & 1 deletion pkg/dataobj/consumer/config.go
@@ -29,5 +29,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)

f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*time.Minute, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
}
46 changes: 43 additions & 3 deletions pkg/dataobj/consumer/logsobj/builder.go
@@ -9,6 +9,7 @@ import (
"fmt"
"time"

"github.com/coder/quartz"
"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
@@ -61,6 +62,11 @@ type BuilderConfig struct {
// values of MergeSize trade off lower memory overhead for higher time spent
// merging.
SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"`

// MaxAge configures the maximum age of a builder before it is flushed.
// This ensures that objects are built at least once per MaxAge if
// TargetObjectSize is not reached.
MaxAge time.Duration `yaml:"max_age"`
}

// RegisterFlagsWithPrefix registers flags with the given prefix.
@@ -75,6 +81,8 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. Uncompressed size is used for consistent I/O and planning.")
f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.")
f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.")
f.DurationVar(&cfg.MaxAge, prefix+"max-age", 60*time.Minute, "The maximum age of a builder before it is flushed. This ensures that objects are built at least once per max-age if target-builder-memory-limit is not reached. 0 means disabled.")

}

// Validate validates the BuilderConfig.
@@ -124,7 +132,11 @@ type Builder struct {
streams *streams.Builder
logs *logs.Builder

state builderState
state builderState
firstAppendedAt, lastAppendedAt time.Time

// Used in tests.
clock quartz.Clock
}

type builderState int
@@ -166,6 +178,7 @@ func NewBuilder(cfg BuilderConfig) (*Builder, error) {
BufferSize: int(cfg.BufferSize),
StripeMergeLimit: cfg.SectionStripeMergeLimit,
}),
clock: quartz.NewReal(),
}, nil
}

@@ -185,13 +198,15 @@ func (b *Builder) Append(stream logproto.Stream) error {
return err
}

now := b.clock.Now()
// Check whether the buffer is full before a stream can be appended; this check
// tends to overestimate, but we may still go over our target size.
//
// Since this check only happens after the first call to Append,
// b.currentSizeEstimate will always be updated to reflect the size following
// the previous append.
if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) {
additionalEstimate := labelsEstimate(ls) + streamSizeEstimate(stream)
if b.isFull(additionalEstimate, now) {
return ErrBuilderFull
}

@@ -223,10 +238,33 @@ func (b *Builder) Append(stream logproto.Stream) error {
}

b.currentSizeEstimate = b.estimatedSize()
b.state = builderStateDirty
if b.state == builderStateEmpty {
b.state = builderStateDirty
b.firstAppendedAt = now

[Review comment — Contributor Author]
I specifically chose now instead of the record timestamp. If we used the record timestamp, when replaying a partition (i.e. following a crash), we would build one data object per record, which is not what we want.

[Review comment — Member]
I think this might be better implemented at the consumer level, since that's also where we handle the idle timeout. WDYT?

[Review comment — Member, @rfratto, Aug 1, 2025]
#18622 exposes the time range for a builder which would make doing this within the consumer possible, but nobody has been able to review it yet.

}
b.lastAppendedAt = now
return nil
}

// isFull returns true if the builder is full. The builder is full when either
// the estimated size exceeds the target object size, or the max age is
// exceeded.
//
// TODO(grobinson): Should rename this to shouldFlush, but then we should
// also change the signal from ErrBuilderFull to something else.
func (b *Builder) isFull(additionalEstimate int, now time.Time) bool {
if b.state == builderStateDirty {
if b.currentSizeEstimate+additionalEstimate > int(b.cfg.TargetObjectSize) {
return true
}
// This check is disabled when MaxAge is zero.
if b.cfg.MaxAge > 0 && now.Sub(b.firstAppendedAt) > b.cfg.MaxAge {
return true
}
}
return false
}
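
For context, a minimal sketch of the caller-side pattern this signal implies: when Append reports ErrBuilderFull, whether because the size target was hit or, with this change, because MaxAge elapsed, the caller flushes the built object and retries. The handleAppend helper below is hypothetical and not part of this diff; the import paths and the exact Flush/Reset interplay are assumptions based on the surrounding code.

```go
package example

import (
	"bytes"
	"errors"

	// Import paths are assumed (Loki v3 module layout); adjust as needed.
	"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// handleAppend appends a stream to the builder. If the builder reports that it
// is full (size target reached or MaxAge exceeded), it flushes the completed
// object into out, resets the builder, and retries the append once.
func handleAppend(b *logsobj.Builder, out *bytes.Buffer, stream logproto.Stream) error {
	err := b.Append(stream)
	if err == nil || !errors.Is(err, logsobj.ErrBuilderFull) {
		return err
	}
	// Flush the completed object; in the consumer this buffer would then be
	// uploaded to object storage.
	if _, err := b.Flush(out); err != nil {
		return err
	}
	// Reset is shown explicitly for clarity; it may be redundant if Flush
	// already resets the builder's internal state.
	b.Reset()
	return b.Append(stream)
}
```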

func (b *Builder) parseLabels(labelString string) (labels.Labels, error) {
cached, ok := b.labelCache.Get(labelString)
if ok {
@@ -389,6 +427,8 @@ func (b *Builder) Reset() {
b.metrics.sizeEstimate.Set(0)
b.currentSizeEstimate = 0
b.state = builderStateEmpty
b.firstAppendedAt = time.Time{}
b.lastAppendedAt = time.Time{}
}

// RegisterMetrics registers metrics about builder to report to reg. All
33 changes: 33 additions & 0 deletions pkg/dataobj/consumer/logsobj/builder_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/coder/quartz"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/push"
@@ -141,3 +142,35 @@ func TestBuilder_Append(t *testing.T) {
require.NoError(t, err)
}
}

// TestBuilder_Append_MaxAge ensures that the builder eventually reports full
// once the max age is reached.
func TestBuilder_Append_MaxAge(t *testing.T) {
clock := quartz.NewMock(t)
cfg := testBuilderConfig
cfg.MaxAge = time.Minute
builder, err := NewBuilder(cfg)
require.NoError(t, err)
builder.clock = clock
stream := logproto.Stream{
Labels: `{cluster="test",app="foo"}`,
Entries: []push.Entry{{
Timestamp: time.Now().UTC(),
Line: "a",
}},
}
require.NoError(t, builder.Append(stream))
// Advance the clock. This should not fail as we have not exceeded the
// max age.
clock.Advance(time.Minute)
require.NoError(t, builder.Append(stream))
// Advance the clock once more, we should now have exceeded the max age.
clock.Advance(time.Second)
require.EqualError(t, builder.Append(stream), "builder full")
// Reset the builder. It should not be full.
builder.Reset()
require.NoError(t, builder.Append(stream))
// Advance the clock, and it should have exceeded the max age again.
clock.Advance(time.Minute + time.Second)
require.EqualError(t, builder.Append(stream), "builder full")
}
6 changes: 6 additions & 0 deletions pkg/dataobj/consumer/metrics.go
@@ -19,6 +19,7 @@ type partitionOffsetMetrics struct {
// Request counters
commitsTotal prometheus.Counter
appendsTotal prometheus.Counter
flushesTotal prometheus.Counter

latestDelay prometheus.Gauge // Latest delta between record timestamp and current time
processingDelay prometheus.Histogram // Processing delay histogram
@@ -45,6 +46,10 @@ func newPartitionOffsetMetrics() *partitionOffsetMetrics {
Name: "loki_dataobj_consumer_appends_total",
Help: "Total number of appends",
}),
flushesTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_flushes_total",
Help: "Total number of flushes",
}),
latestDelay: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "loki_dataobj_consumer_latest_processing_delay_seconds",
Help: "Latest time difference bweteen record timestamp and processing time in seconds",
Expand Down Expand Up @@ -87,6 +92,7 @@ func (p *partitionOffsetMetrics) register(reg prometheus.Registerer) error {

p.commitsTotal,
p.appendsTotal,
p.flushesTotal,

p.latestDelay,
p.processingDelay,
2 changes: 2 additions & 0 deletions pkg/dataobj/consumer/partition_processor.go
@@ -195,6 +195,8 @@ func (p *partitionProcessor) initBuilder() error {
}

func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
p.metrics.flushesTotal.Inc()

stats, err := p.builder.Flush(flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)