Skip to content

Commit b786095

Browse files
authored
chore: Read multi-tenant objects in query-engine (#19104)
1 parent 62cec3e commit b786095

File tree

6 files changed

+85
-32
lines changed

6 files changed

+85
-32
lines changed

pkg/dataobj/metastore/object.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/go-kit/log"
1818
"github.com/go-kit/log/level"
1919
"github.com/grafana/dskit/tenant"
20+
"github.com/grafana/dskit/user"
2021
"github.com/prometheus/client_golang/prometheus"
2122
"github.com/prometheus/prometheus/model/labels"
2223
"github.com/thanos-io/objstore"
@@ -222,6 +223,8 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
222223
return nil, err
223224
}
224225

226+
level.Debug(m.logger).Log("msg", "resolved index files", "paths", strings.Join(indexPaths, ","))
227+
225228
// Search the stream sections of the matching objects to find matching streams
226229
streamMatchers := streamPredicateFromMatchers(start, end, matchers...)
227230
pointerPredicate := pointers.TimeRangeRowPredicate{
@@ -703,12 +706,19 @@ func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, e
703706
}
704707

705708
func forEachIndexPointer(ctx context.Context, object *dataobj.Object, predicate indexpointers.RowPredicate, f func(indexpointers.IndexPointer)) error {
709+
targetTenant, err := user.ExtractOrgID(ctx)
710+
if err != nil {
711+
return fmt.Errorf("extracting org ID: %w", err)
712+
}
706713
var reader indexpointers.RowReader
707714
defer reader.Close()
708715

709716
buf := make([]indexpointers.IndexPointer, 1024)
710717

711718
for _, section := range object.Sections().Filter(indexpointers.CheckSection) {
719+
if section.Tenant != targetTenant {
720+
continue
721+
}
712722
sec, err := indexpointers.Open(ctx, section)
713723
if err != nil {
714724
return fmt.Errorf("opening section: %w", err)
@@ -740,13 +750,17 @@ func forEachIndexPointer(ctx context.Context, object *dataobj.Object, predicate
740750
}
741751

742752
func forEachStream(ctx context.Context, object *dataobj.Object, predicate streams.RowPredicate, f func(streams.Stream)) error {
753+
targetTenant, err := user.ExtractOrgID(ctx)
754+
if err != nil {
755+
return fmt.Errorf("extracting org ID: %w", err)
756+
}
743757
var reader streams.RowReader
744758
defer reader.Close()
745759

746760
buf := make([]streams.Stream, 1024)
747761

748-
for _, section := range object.Sections() {
749-
if !streams.CheckSection(section) {
762+
for _, section := range object.Sections().Filter(streams.CheckSection) {
763+
if section.Tenant != targetTenant {
750764
continue
751765
}
752766

@@ -779,12 +793,20 @@ func forEachStream(ctx context.Context, object *dataobj.Object, predicate stream
779793
}
780794

781795
func forEachObjPointer(ctx context.Context, object *dataobj.Object, predicate pointers.RowPredicate, matchIDs []int64, f func(pointers.SectionPointer)) error {
796+
targetTenant, err := user.ExtractOrgID(ctx)
797+
if err != nil {
798+
return fmt.Errorf("extracting org ID: %w", err)
799+
}
782800
var reader pointers.RowReader
783801
defer reader.Close()
784802

785803
buf := make([]pointers.SectionPointer, 1024)
786804

787805
for _, section := range object.Sections().Filter(pointers.CheckSection) {
806+
if section.Tenant != targetTenant {
807+
continue
808+
}
809+
788810
sec, err := pointers.Open(ctx, section)
789811
if err != nil {
790812
return fmt.Errorf("opening section: %w", err)

pkg/dataobj/metastore/object_test.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
1818
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
19-
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
2019
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
2120
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
2221
"github.com/grafana/loki/v3/pkg/logproto"
@@ -268,12 +267,27 @@ func TestSectionsForStreamMatchers(t *testing.T) {
268267
UncompressedSize: 0,
269268
})
270269
require.NoError(t, err)
271-
err = builder.ObserveLogLine(tenantID, "test-path", 0, newIdx, int64(i), ts.Entries[0].Timestamp, int64(len(ts.Entries[0].Line)))
270+
err = builder.ObserveLogLine(tenantID, "test-path", 1, newIdx, int64(i), ts.Entries[0].Timestamp, int64(len(ts.Entries[0].Line)))
272271
require.NoError(t, err)
273272
}
274273

274+
// Add one more stream for a different tenant to ensure it is not resolved.
275+
altTenant := "tenant-alt"
276+
altTenantSection := int64(99) // Emulate a different section from a log object that doesn't collide with the main tenant's section
277+
newIdx, err := builder.AppendStream(altTenant, streams.Stream{
278+
ID: 1,
279+
Labels: labels.New(labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "tenant", Value: altTenant}),
280+
MinTimestamp: now.Add(-3 * time.Hour),
281+
MaxTimestamp: now.Add(-2 * time.Hour),
282+
UncompressedSize: 5,
283+
})
284+
require.NoError(t, err)
285+
err = builder.ObserveLogLine(altTenant, "test-path", altTenantSection, newIdx, 1, now.Add(-2*time.Hour), 5)
286+
require.NoError(t, err)
287+
288+
// Build and store the object
275289
timeRanges := builder.TimeRanges()
276-
require.Len(t, timeRanges, 1)
290+
require.Len(t, timeRanges, 2)
277291

278292
obj, closer, err := builder.Flush()
279293
require.NoError(t, err)
@@ -288,14 +302,7 @@ func TestSectionsForStreamMatchers(t *testing.T) {
288302
require.NoError(t, err)
289303

290304
metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger())
291-
292-
err = metastoreTocWriter.WriteEntry(context.Background(), path, []multitenancy.TimeRange{
293-
{
294-
Tenant: tenantID,
295-
MinTime: timeRanges[0].MinTime,
296-
MaxTime: timeRanges[0].MaxTime,
297-
},
298-
})
305+
err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
299306
require.NoError(t, err)
300307

301308
mstore := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewPedanticRegistry())
@@ -345,6 +352,9 @@ func TestSectionsForStreamMatchers(t *testing.T) {
345352
sections, err := mstore.Sections(ctx, now.Add(-time.Hour), now.Add(time.Hour), tt.matchers, tt.predicates)
346353
require.NoError(t, err)
347354
require.Len(t, sections, tt.wantCount)
355+
for _, section := range sections {
356+
require.NotEqual(t, section.SectionIdx, altTenantSection)
357+
}
348358
})
349359
}
350360
}

pkg/dataobj/querier/metadata_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestStore_SelectSeries(t *testing.T) {
2525
ctx = user.InjectOrgID(ctx, testTenant)
2626

2727
// Setup test data
28-
now := setupTestData(ctx, t, builder)
28+
now := setupTestData(ctx, t, builder, testTenant)
2929
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil)
3030
store := NewStore(builder.bucket, log.NewNopLogger(), meta)
3131

@@ -170,7 +170,7 @@ func TestStore_LabelNamesForMetricName(t *testing.T) {
170170
ctx = user.InjectOrgID(ctx, testTenant)
171171

172172
// Setup test data
173-
now := setupTestData(ctx, t, builder)
173+
now := setupTestData(ctx, t, builder, testTenant)
174174
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil)
175175
store := NewStore(builder.bucket, log.NewNopLogger(), meta)
176176

@@ -240,7 +240,7 @@ func TestStore_LabelValuesForMetricName(t *testing.T) {
240240
ctx = user.InjectOrgID(ctx, testTenant)
241241

242242
// Setup test data
243-
now := setupTestData(ctx, t, builder)
243+
now := setupTestData(ctx, t, builder, testTenant)
244244
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil)
245245
store := NewStore(builder.bucket, log.NewNopLogger(), meta)
246246

pkg/dataobj/querier/store.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/go-kit/log"
1414
"github.com/go-kit/log/level"
15+
"github.com/grafana/dskit/user"
1516
"github.com/prometheus/common/model"
1617
"github.com/prometheus/prometheus/model/labels"
1718
"github.com/thanos-io/objstore"
@@ -462,10 +463,7 @@ func shardObjects(
462463
func findLogsSection(ctx context.Context, obj *dataobj.Object, index int) (*logs.Section, error) {
463464
var count int
464465

465-
for _, section := range obj.Sections() {
466-
if !logs.CheckSection(section) {
467-
continue
468-
}
466+
for _, section := range obj.Sections().Filter(logs.CheckSection) {
469467
if count == index {
470468
return logs.Open(ctx, section)
471469
}
@@ -476,8 +474,13 @@ func findLogsSection(ctx context.Context, obj *dataobj.Object, index int) (*logs
476474
}
477475

478476
func findStreamsSection(ctx context.Context, obj *dataobj.Object) (*streams.Section, error) {
479-
for _, section := range obj.Sections() {
480-
if !streams.CheckSection(section) {
477+
targetTenant, err := user.ExtractOrgID(ctx)
478+
if err != nil {
479+
return nil, fmt.Errorf("extracting org ID: %w", err)
480+
}
481+
482+
for _, section := range obj.Sections().Filter(streams.CheckSection) {
483+
if section.Tenant != targetTenant {
481484
continue
482485
}
483486
return streams.Open(ctx, section)
@@ -636,12 +639,20 @@ func fetchSectionsStats(ctx context.Context, objects []object) ([]sectionsStats,
636639
))
637640
defer sp.AddEvent("fetched metadata")
638641

642+
targetTenant, err := user.ExtractOrgID(ctx)
643+
if err != nil {
644+
return nil, fmt.Errorf("extracting org ID: %w", err)
645+
}
646+
639647
res := make([]sectionsStats, 0, len(objects))
640648

641649
for _, obj := range objects {
642650
var stats sectionsStats
643651

644652
for _, section := range obj.Sections() {
653+
if section.Tenant != targetTenant {
654+
continue
655+
}
645656
switch {
646657
case streams.CheckSection(section):
647658
stats.StreamsSections++

pkg/dataobj/querier/store_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestStore_SelectSamples(t *testing.T) {
4343
ctx = user.InjectOrgID(ctx, testTenant)
4444

4545
// Setup test data
46-
now := setupTestData(ctx, t, builder)
46+
now := setupTestData(ctx, t, builder, testTenant)
4747
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil)
4848
store := NewStore(builder.bucket, log.NewNopLogger(), meta)
4949

@@ -206,7 +206,7 @@ func TestStore_SelectLogs(t *testing.T) {
206206
ctx = user.InjectOrgID(ctx, testTenant)
207207

208208
// Setup test data
209-
now := setupTestData(ctx, t, builder)
209+
now := setupTestData(ctx, t, builder, testTenant)
210210
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil)
211211
store := NewStore(builder.bucket, log.NewLogfmtLogger(os.Stdout), meta)
212212

@@ -370,13 +370,13 @@ func TestStore_SelectLogs(t *testing.T) {
370370
}
371371
}
372372

373-
func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder) time.Time {
373+
func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder, tenant string) time.Time {
374374
t.Helper()
375375
now := time.Unix(0, int64(time.Hour)).UTC()
376376

377377
// Data before the query range (should not be included in results)
378378
builder.addStream(
379-
"tenant",
379+
tenant,
380380
`{app="foo", env="prod"}`,
381381
logproto.Entry{Timestamp: now.Add(-2 * time.Hour), Line: "foo_before1"},
382382
logproto.Entry{Timestamp: now.Add(-2 * time.Hour).Add(30 * time.Second), Line: "foo_before2"},
@@ -386,15 +386,15 @@ func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder)
386386

387387
// Data within query range
388388
builder.addStream(
389-
"tenant",
389+
tenant,
390390
`{app="foo", env="prod"}`,
391391
logproto.Entry{Timestamp: now, Line: "foo1"},
392392
logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"},
393393
logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"},
394394
logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"},
395395
)
396396
builder.addStream(
397-
"tenant",
397+
tenant,
398398
`{app="foo", env="dev"}`,
399399
logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"},
400400
logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"},
@@ -403,15 +403,15 @@ func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder)
403403
builder.flush(ctx)
404404

405405
builder.addStream(
406-
"tenant",
406+
tenant,
407407
`{app="bar", env="prod"}`,
408408
logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"},
409409
logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"},
410410
logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"},
411411
logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"},
412412
)
413413
builder.addStream(
414-
"tenant",
414+
tenant,
415415
`{app="bar", env="dev"}`,
416416
logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"},
417417
logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"},
@@ -420,7 +420,7 @@ func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder)
420420
builder.flush(ctx)
421421

422422
builder.addStream(
423-
"tenant",
423+
tenant,
424424
`{app="baz", env="prod", team="a"}`,
425425
logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"},
426426
logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"},
@@ -431,7 +431,7 @@ func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder)
431431

432432
// Data after the query range (should not be included in results)
433433
builder.addStream(
434-
"tenant",
434+
tenant,
435435
`{app="foo", env="prod"}`,
436436
logproto.Entry{Timestamp: now.Add(2 * time.Hour), Line: "foo_after1"},
437437
logproto.Entry{Timestamp: now.Add(2 * time.Hour).Add(30 * time.Second), Line: "foo_after2"},

pkg/engine/executor/executor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/apache/arrow-go/v18/arrow/memory"
99
"github.com/go-kit/log"
1010
"github.com/go-kit/log/level"
11+
"github.com/grafana/dskit/user"
1112
"github.com/thanos-io/objstore"
1213
"go.opentelemetry.io/otel"
1314
"go.opentelemetry.io/otel/attribute"
@@ -128,7 +129,16 @@ func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObj
128129
logsSection *logs.Section
129130
)
130131

132+
tenant, err := user.ExtractOrgID(ctx)
133+
if err != nil {
134+
return errorPipeline(ctx, fmt.Errorf("missing org ID: %w", err))
135+
}
136+
131137
for _, sec := range obj.Sections().Filter(streams.CheckSection) {
138+
if sec.Tenant != tenant {
139+
continue
140+
}
141+
132142
if streamsSection != nil {
133143
return errorPipeline(ctx, fmt.Errorf("multiple streams sections found in data object %q", node.Location))
134144
}

0 commit comments

Comments
 (0)