1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -42,6 +42,7 @@
* [FEATURE] Distributor: Add experimental flag `-validation.label-value-length-over-limit-strategy` to configure how to handle label values over the length limit. #12627 #12844
* [FEATURE] Ingester: Introduce metric `cortex_ingester_owned_target_info_series` for counting the number of owned `target_info` series by tenant. #12681
* [FEATURE] MQE: Add support for experimental `ts_of_min_over_time`, `ts_of_max_over_time`, `ts_of_first_over_time` and `ts_of_last_over_time` PromQL functions. #12819
* [FEATURE] Ingester: Add experimental flags `-ingest-storage.write-logs-fsync-before-kafka-commit-enabled` and `-ingest-storage.write-logs-fsync-before-kafka-commit-concurrency` to fsync write logs (WAL and WBL) before the offset is committed to Kafka. The fsync is enabled by default. #12816
* [ENHANCEMENT] Query-frontend: CLI flag `-query-frontend.enabled-promql-experimental-functions` and its associated YAML configuration are now stable. #12368
* [ENHANCEMENT] Query-scheduler/query-frontend: Add native histogram definitions to `cortex_query_{scheduler|frontend}_queue_duration_seconds`. #12288
* [ENHANCEMENT] Querier: Add native histogram definition to `cortex_bucket_index_load_duration_seconds`. #12094
22 changes: 22 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -8160,6 +8160,28 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "write_logs_fsync_before_kafka_commit_enabled",
"required": false,
"desc": "Enable fsyncing of WAL and WBL before Kafka offsets are committed.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.write-logs-fsync-before-kafka-commit-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "write_logs_fsync_before_kafka_commit_concurrency",
"required": false,
"desc": "Number of tenants to concurrently fsync WAL and WBL before Kafka offsets are committed. Ignored if -ingest-storage.write-logs-fsync-before-kafka-commit-enabled=false",
"fieldValue": null,
"fieldDefaultValue": 1,
"fieldFlag": "ingest-storage.write-logs-fsync-before-kafka-commit-concurrency",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1501,6 +1501,10 @@ Usage of ./cmd/mimir/mimir:
The maximum time a write request that goes through the ingest storage waits before it times out. Set to `0` to disable the timeout.
-ingest-storage.read-consistency string
[experimental] The default consistency level to enforce for queries when using the ingest storage. Supports values: strong, eventual. (default "eventual")
-ingest-storage.write-logs-fsync-before-kafka-commit-concurrency int
[experimental] Number of tenants to concurrently fsync WAL and WBL before Kafka offsets are committed. Ignored if -ingest-storage.write-logs-fsync-before-kafka-commit-enabled=false. (default 1)
-ingest-storage.write-logs-fsync-before-kafka-commit-enabled
[experimental] Enable fsyncing of WAL and WBL before Kafka offsets are committed. (default true)
-ingester.active-series-custom-trackers value
Additional active series metrics, matching the provided matchers. Matchers should be in form <name>:<matcher>, like 'foobar:{foo="bar"}'. Multiple matchers can be provided either providing the flag multiple times or providing multiple semicolon-separated values to a single flag.
-ingester.active-series-metrics-enabled
11 changes: 11 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -4596,6 +4596,17 @@ migration:
# before it times out. Set to `0` to disable the timeout.
# CLI flag: -ingest-storage.migration.ingest-storage-max-wait-time
[ingest_storage_max_wait_time: <duration> | default = 0s]

# (experimental) Enable fsyncing of WAL and WBL before Kafka offsets are
# committed.
# CLI flag: -ingest-storage.write-logs-fsync-before-kafka-commit-enabled
[write_logs_fsync_before_kafka_commit_enabled: <boolean> | default = true]

# (experimental) Number of tenants to concurrently fsync WAL and WBL before
# Kafka offsets are committed. Ignored if
# -ingest-storage.write-logs-fsync-before-kafka-commit-enabled=false.
# CLI flag: -ingest-storage.write-logs-fsync-before-kafka-commit-concurrency
[write_logs_fsync_before_kafka_commit_concurrency: <int> | default = 1]
```

### blocks_storage
2 changes: 2 additions & 0 deletions operations/mimir/mimir-flags-defaults.json
@@ -558,6 +558,8 @@
"ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard": 80,
"ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample": 500,
"ingest-storage.migration.ingest-storage-max-wait-time": 0,
"ingest-storage.write-logs-fsync-before-kafka-commit-enabled": true,
"ingest-storage.write-logs-fsync-before-kafka-commit-concurrency": 1,
"blocks-storage.backend": "filesystem",
"blocks-storage.s3.endpoint": "",
"blocks-storage.s3.region": "",
4 changes: 4 additions & 0 deletions pkg/blockbuilder/tsdb.go
@@ -347,6 +347,10 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) {
return udb, nil
}

// NotifyPreCommit implements ingest.PreCommitNotifier as a no-op: the block
// builder does not fsync write logs before Kafka offsets are committed.
func (b *TSDBBuilder) NotifyPreCommit(_ context.Context) error {
return nil
}

// Function to upload the blocks.
type blockUploader func(_ context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error

4 changes: 4 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -7192,6 +7192,10 @@ func (c *mockIngesterPusherAdapter) PushToStorageAndReleaseRequest(ctx context.C
return err
}

func (c *mockIngesterPusherAdapter) NotifyPreCommit(_ context.Context) error {
return nil
}

// noopIngester is a mocked ingester which does nothing.
type noopIngester struct {
client.IngesterClient
1 change: 1 addition & 0 deletions pkg/ingester/api.go
@@ -21,4 +21,5 @@ type API interface {
TenantTSDBHandler(http.ResponseWriter, *http.Request)
PrepareInstanceRingDownscaleHandler(http.ResponseWriter, *http.Request)
PushToStorageAndReleaseRequest(context.Context, *mimirpb.WriteRequest) error
NotifyPreCommit(ctx context.Context) error
}
17 changes: 17 additions & 0 deletions pkg/ingester/ingester.go
@@ -499,6 +499,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
// This is injected already higher up for methods invoked via the network.
// Here we use it so that pushes from kafka also get a tenant assigned since the PartitionReader invokes the ingester.
profilingIngester := NewIngesterProfilingWrapper(i)

i.ingestReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, i.ingestPartitionID, cfg.IngesterRing.InstanceID, profilingIngester, log.With(logger, "component", "ingest_reader"), registerer)
if err != nil {
return nil, errors.Wrap(err, "creating ingest storage reader")
@@ -4388,3 +4389,19 @@ func timeUntilCompaction(now time.Time, compactionInterval, zoneOffset time.Dura

return compactionInterval - timeSinceLastCompaction
}

// NotifyPreCommit implements ingest.PreCommitNotifier. When the feature is
// enabled, it fsyncs the WAL and WBL segments of each open per-tenant TSDB so
// that consumed data is on disk before the Kafka offset is committed.
func (i *Ingester) NotifyPreCommit(ctx context.Context) error {
if !i.cfg.IngestStorageConfig.WriteLogsFsyncBeforeKafkaCommit {
return nil
}

level.Debug(i.logger).Log("msg", "fsyncing tsdbs")

return concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.IngestStorageConfig.WriteLogsFsyncBeforeKafkaCommitConcurrency, func(ctx context.Context, userID string) error {
db := i.getTSDB(userID)
if db == nil {
return nil
}
return db.Head().FsyncWLSegments()
})
}
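For orientation, the sketch below condenses how NotifyPreCommit is invoked; it is a simplified, hypothetical rendering of partitionCommitter.commit from pkg/storage/ingest/reader.go later in this diff (commitWithNotify and commitOffset are illustrative names, not from the PR):

```go
package sketch

import "context"

// PreCommitNotifier mirrors the interface added in pkg/storage/ingest/pusher.go.
type PreCommitNotifier interface {
	NotifyPreCommit(ctx context.Context) error
}

// commitWithNotify shows the committer's order of operations: the notifier
// runs first, so write logs are on disk before Kafka considers the records
// consumed. In the real code a notify error is logged and the commit proceeds.
func commitWithNotify(ctx context.Context, n PreCommitNotifier, commitOffset func(context.Context) error) error {
	_ = n.NotifyPreCommit(ctx) // the real committer logs a warning on error
	return commitOffset(ctx)
}
```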
4 changes: 4 additions & 0 deletions pkg/ingester/ingester_activity.go
@@ -40,6 +40,10 @@ func (i *ActivityTrackerWrapper) PushToStorageAndReleaseRequest(ctx context.Cont
return i.ing.PushToStorageAndReleaseRequest(ctx, request)
}

func (i *ActivityTrackerWrapper) NotifyPreCommit(ctx context.Context) error {
return i.ing.NotifyPreCommit(ctx)
}

func (i *ActivityTrackerWrapper) QueryStream(request *client.QueryRequest, server client.Ingester_QueryStreamServer) error {
ix := i.tracker.Insert(func() string {
return requestActivity(server.Context(), "Ingester/QueryStream", request)
4 changes: 4 additions & 0 deletions pkg/ingester/ingester_profiling.go
@@ -90,6 +90,10 @@ func (i *ProfilingWrapper) PushToStorageAndReleaseRequest(ctx context.Context, r
return i.ing.PushToStorageAndReleaseRequest(ctx, request)
}

func (i *ProfilingWrapper) NotifyPreCommit(ctx context.Context) error {
return i.ing.NotifyPreCommit(ctx)
}

func (i *ProfilingWrapper) QueryStream(request *client.QueryRequest, server client.Ingester_QueryStreamServer) error {
ctx := server.Context()
if isTraceSampled(ctx) {
70 changes: 70 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -12179,3 +12179,73 @@ var ingesterSampleTypeScenarios = map[string]struct {
},
},
}

func TestIngester_NotifyPreCommit(t *testing.T) {
// Simple test that checks that NotifyPreCommit doesn't fail and that the fsync count increases when it's called.
cfg := defaultIngesterTestConfig(t)
limits := defaultLimitsTestConfig()
overrides := validation.NewOverrides(limits, nil)

tempDir := t.TempDir()
cfg.BlocksStorageConfig.TSDB.Dir = tempDir
cfg.BlocksStorageConfig.Bucket.Backend = "s3"
cfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost"
cfg.IngestStorageConfig.WriteLogsFsyncBeforeKafkaCommit = true

reg := prometheus.NewRegistry()
ingester, err := New(cfg, overrides, createAndStartRing(t, cfg.IngesterRing.ToRingConfig()), nil, nil, nil, reg, log.NewNopLogger())
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester))
defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck

// Push some data to create some TSDBs
for _, u := range []string{"user1", "user2", "user3"} {
ctx := user.InjectOrgID(context.Background(), u)
req := &mimirpb.WriteRequest{
Source: mimirpb.API,
Timeseries: []mimirpb.PreallocTimeseries{
{
TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{
{Name: labels.MetricName, Value: "test_metric"},
},
Samples: []mimirpb.Sample{
{Value: 1, TimestampMs: time.Now().UnixMilli()},
{Value: 2, TimestampMs: time.Now().Add(time.Second).UnixMilli()},
{Value: 3, TimestampMs: time.Now().Add(2 * time.Second).UnixMilli()},
},
},
},
},
}

_, err = ingester.Push(ctx, req)
require.NoError(t, err)
}

getFsyncCount := func() uint64 {
metrics, err := reg.Gather()
require.NoError(t, err)
for _, mf := range metrics {
if mf.GetName() == "cortex_ingester_tsdb_wal_fsync_duration_seconds" {
for _, m := range mf.GetMetric() {
if m.GetSummary() != nil {
return m.GetSummary().GetSampleCount()
}
}
}
}
return 0
}

fsyncCountBefore := getFsyncCount()

err = ingester.NotifyPreCommit(context.Background())
require.NoError(t, err)

fsyncCountAfter := getFsyncCount()

// As there are three users, fsync should have been called at least three times
assert.GreaterOrEqual(t, fsyncCountAfter-fsyncCountBefore, uint64(3))
}
5 changes: 5 additions & 0 deletions pkg/storage/ingest/config.go
@@ -51,10 +51,15 @@ type Config struct {
Enabled bool `yaml:"enabled"`
KafkaConfig KafkaConfig `yaml:"kafka"`
Migration MigrationConfig `yaml:"migration"`

WriteLogsFsyncBeforeKafkaCommit bool `yaml:"write_logs_fsync_before_kafka_commit_enabled" category:"experimental"`
WriteLogsFsyncBeforeKafkaCommitConcurrency int `yaml:"write_logs_fsync_before_kafka_commit_concurrency" category:"experimental"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "ingest-storage.enabled", false, "True to enable the ingestion via object storage.")
f.BoolVar(&cfg.WriteLogsFsyncBeforeKafkaCommit, "ingest-storage.write-logs-fsync-before-kafka-commit-enabled", true, "Enable fsyncing of WAL and WBL before Kafka offsets are committed.")
f.IntVar(&cfg.WriteLogsFsyncBeforeKafkaCommitConcurrency, "ingest-storage.write-logs-fsync-before-kafka-commit-concurrency", 1, "Number of tenants to concurrently fsync WAL and WBL before Kafka offsets are committed. Ignored if -ingest-storage.write-logs-fsync-before-kafka-commit-enabled=false.")

cfg.KafkaConfig.RegisterFlagsWithPrefix("ingest-storage.kafka.", f)
cfg.Migration.RegisterFlagsWithPrefix("ingest-storage.migration.", f)
16 changes: 16 additions & 0 deletions pkg/storage/ingest/pusher.go
@@ -27,6 +27,22 @@ import (

type Pusher interface {
PushToStorageAndReleaseRequest(context.Context, *mimirpb.WriteRequest) error
PreCommitNotifier
}

type PreCommitNotifier interface {
// NotifyPreCommit is called before committing a Kafka offset to allow for
// synchronization or cleanup operations. The offset to commit is determined before this call.
// The committer waits for this method to complete before proceeding with the actual
// commit to Kafka.
NotifyPreCommit(ctx context.Context) error
}

// NoOpPreCommitNotifier is a PreCommitNotifier that does nothing.
type NoOpPreCommitNotifier struct{}

func (n *NoOpPreCommitNotifier) NotifyPreCommit(_ context.Context) error {
return nil
}

Review thread on PreCommitNotifier:

Contributor: Can you document a little bit how this interface is supposed to be implemented, or perhaps what's a good thing to put behind this interface and how it's going to be invoked?

Author: Done; the doc comment on NotifyPreCommit above now covers this.
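To make the question concrete, below is a minimal sketch of what an implementer can look like; the fileSyncNotifier type is hypothetical and purely illustrative. The real in-tree implementation is (*Ingester).NotifyPreCommit in pkg/ingester/ingester.go, which fsyncs the write logs of every open per-tenant TSDB.

```go
package sketch

import (
	"context"
	"os"
)

// fileSyncNotifier is a hypothetical PreCommitNotifier: it fsyncs a durable
// log file, so anything written to the file survives a crash once
// NotifyPreCommit has returned and the Kafka offset may safely be committed.
type fileSyncNotifier struct {
	f *os.File
}

func (n *fileSyncNotifier) NotifyPreCommit(_ context.Context) error {
	return n.f.Sync()
}
```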

type PusherCloser interface {
9 changes: 9 additions & 0 deletions pkg/storage/ingest/pusher_test.go
@@ -46,6 +46,10 @@ func (p pusherFunc) PushToStorageAndReleaseRequest(ctx context.Context, request
return p(ctx, request)
}

func (p pusherFunc) NotifyPreCommit(_ context.Context) error {
return nil
}

func TestPusherConsumer(t *testing.T) {
const tenantID = "t1"
writeReqs := []*mimirpb.WriteRequest{
@@ -609,6 +613,11 @@ func (m *mockPusher) PushToStorageAndReleaseRequest(ctx context.Context, request
return args.Error(0)
}

func (m *mockPusher) NotifyPreCommit(_ context.Context) error {
args := m.Called()
return args.Error(0)
}

func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
testCases := map[string]struct {
shardCount int
29 changes: 22 additions & 7 deletions pkg/storage/ingest/reader.go
@@ -81,6 +81,7 @@ type PartitionReader struct {
newConsumer consumerFactory
metrics ReaderMetrics

notifier PreCommitNotifier
committer *partitionCommitter

// consumedOffsetWatcher is used to wait until a given offset has been consumed.
@@ -102,10 +102,10 @@ func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instan
factory := consumerFactoryFunc(func() RecordConsumer {
return NewPusherConsumer(pusher, kafkaCfg, metrics, logger)
})
return newPartitionReader(kafkaCfg, partitionID, instanceID, factory, logger, reg)
return newPartitionReader(kafkaCfg, partitionID, instanceID, factory, pusher, logger, reg)
}

func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID string, consumer consumerFactory, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) {
func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID string, consumer consumerFactory, notifier PreCommitNotifier, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) {
r := &PartitionReader{
kafkaCfg: kafkaCfg,
partitionID: partitionID,
@@ -114,8 +115,9 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
consumedOffsetWatcher: NewPartitionOffsetWatcher(),
concurrentFetchersMinBytesMaxWaitTime: kafkaCfg.FetchMaxWait,
highestConsumedTimestampBeforePartitionEnd: atomic.NewTime(time.Time{}),
logger: log.With(logger, "partition", partitionID),
reg: reg,
notifier: notifier,
logger: log.With(logger, "partition", partitionID),
reg: reg,
}

r.metrics = NewReaderMetrics(reg, r, kafkaCfg.Topic, nil)
@@ -205,7 +207,7 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
}
r.client.Store(client)

r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client.Load()), r.partitionID, r.consumerGroup, r.logger, r.reg)
r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client.Load()), r.partitionID, r.consumerGroup, r.notifier, r.logger, r.reg)

offsetsClient := newPartitionOffsetClient(r.client.Load(), r.kafkaCfg.Topic, r.reg, r.logger)

@@ -918,7 +920,9 @@ type partitionCommitter struct {
consumerGroup string

toCommit *atomic.Int64
admClient *kadm.Client
admClient AdmClient

notifier PreCommitNotifier

logger log.Logger

@@ -929,14 +933,19 @@ type partitionCommitter struct {
lastCommittedOffset prometheus.Gauge
}

func newPartitionCommitter(kafkaCfg KafkaConfig, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
// AdmClient is the subset of the kadm.Client API used by the
// partitionCommitter, extracted as an interface so tests can substitute a fake.
type AdmClient interface {
CommitOffsets(ctx context.Context, group string, os kadm.Offsets) (kadm.OffsetResponses, error)
}
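Putting the concrete *kadm.Client behind this narrow interface makes the committer testable in isolation. Below is a sketch of a test double under that assumption; fakeAdmClient is hypothetical and not part of this PR:

```go
package sketch

import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
)

// fakeAdmClient records every offset set the committer asks to commit, so a
// test can assert on committed offsets without a real Kafka admin client.
type fakeAdmClient struct {
	committed []kadm.Offsets
}

func (f *fakeAdmClient) CommitOffsets(_ context.Context, _ string, os kadm.Offsets) (kadm.OffsetResponses, error) {
	f.committed = append(f.committed, os)
	return kadm.OffsetResponses{}, nil
}
```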

func newPartitionCommitter(kafkaCfg KafkaConfig, admClient AdmClient, partitionID int32, consumerGroup string, notifier PreCommitNotifier, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
kafkaCfg: kafkaCfg,
partitionID: partitionID,
consumerGroup: consumerGroup,
toCommit: atomic.NewInt64(-1),
admClient: admClient,
notifier: notifier,

commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_offset_commit_requests_total",
@@ -1001,6 +1010,12 @@ func (r *partitionCommitter) commit(ctx context.Context, offset int64) (returnEr
startTime := time.Now()
r.commitRequestsTotal.Inc()

// Give the notifier a chance to make consumed data durable (e.g. fsync the
// write logs) before the offset is committed. An error is logged but does not
// block the commit.
if notifyErr := r.notifier.NotifyPreCommit(ctx); notifyErr != nil {
level.Warn(r.logger).Log("msg", "pre-commit notification failed, continuing with commit", "err", notifyErr, "offset", offset)
}

defer func() {
r.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
