Commit 0a9b319

tmp

1 parent 593100c
File tree: 3 files changed, +160 -17 lines changed


pkg/dataobj/metastore/object.go

Lines changed: 17 additions & 15 deletions
@@ -513,6 +513,9 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
 		return nil, nil
 	}
 
+	sStart := scalar.NewTimestampScalar(arrow.Timestamp(timeRangePredicate.Start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
+	sEnd := scalar.NewTimestampScalar(arrow.Timestamp(timeRangePredicate.End.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
+
 	timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration)
 	defer timer.ObserveDuration()
@@ -543,19 +546,8 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
 
 	objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor)
 	sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration)
-	err = forEachObjPointer(ctx, indexObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) {
-		key.ObjectPath = pointer.Path
-		key.SectionIdx = pointer.Section
-
-		sectionDescriptor, ok := objectSectionDescriptors[key]
-		if !ok {
-			objectSectionDescriptors[key] = NewSectionDescriptor(pointer)
-			return
-		}
-		sectionDescriptor.Merge(pointer)
-	})
 
-	//err = forEachStreamSectionPointer(ctx, indexObject, timeRangePredicate.Start, timeRangePredicate.End, matchingStreamIDs, func(pointer pointers.SectionPointer) {
+	//err = forEachObjPointer(ctx, indexObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) {
 	// key.ObjectPath = pointer.Path
 	// key.SectionIdx = pointer.Section
 	//
@@ -567,6 +559,18 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
 	// sectionDescriptor.Merge(pointer)
 	//})
 
+	err = forEachStreamSectionPointer(ctx, indexObject, sStart, sEnd, matchingStreamIDs, func(pointer pointers.SectionPointer) {
+		key.ObjectPath = pointer.Path
+		key.SectionIdx = pointer.Section
+
+		sectionDescriptor, ok := objectSectionDescriptors[key]
+		if !ok {
+			objectSectionDescriptors[key] = NewSectionDescriptor(pointer)
+			return
+		}
+		sectionDescriptor.Merge(pointer)
+	})
+
 	if err != nil {
 		return fmt.Errorf("reading section pointers from index: %w", err)
 	}
@@ -780,16 +784,14 @@ func forEachStream(ctx context.Context, object *dataobj.Object, predicate stream
 	return nil
 }
 
-func forEachStreamSectionPointer(ctx context.Context, indexObj *dataobj.Object, start, end time.Time, streamIDs []int64, f func(pointers.SectionPointer)) error {
+func forEachStreamSectionPointer(ctx context.Context, indexObj *dataobj.Object, sStart, sEnd *scalar.Timestamp, streamIDs []int64, f func(pointers.SectionPointer)) error {
 	targetTenant, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return fmt.Errorf("extracting org ID: %w", err)
 	}
 	var reader pointers.Reader
 	defer reader.Close()
 
-	sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
-	sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
	var sStreamIDs []scalar.Scalar
 	for _, streamID := range streamIDs {
 		sStreamIDs = append(sStreamIDs, scalar.NewInt64Scalar(streamID))
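Note: this change hoists the Arrow timestamp-scalar construction out of forEachStreamSectionPointer and into getSectionsForStreams, so the two scalars are built once per query and reused across index objects instead of being re-created on every call. A minimal standalone sketch of that conversion, assuming the arrow-go v18 import paths (the import path is an assumption, not shown in this diff):

package main

import (
	"fmt"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/scalar"
)

func main() {
	// Build the nanosecond-timestamp bounds once per query, as the new
	// getSectionsForStreams does, rather than on every helper invocation.
	start := time.Now().Add(-1 * time.Hour)
	end := time.Now()

	sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
	sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)

	// The scalars can then be handed to any number of readers
	// (e.g. forEachStreamSectionPointer) without re-allocating them.
	fmt.Println(sStart.String(), sEnd.String())
}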
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
+package metastore
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/user"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
+	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
+)
+
+type readSectionsBenchmarkParams struct {
+	name          string
+	indexFilesNum int
+}
+
+func BenchmarkReadSections(b *testing.B) {
+	benchmarks := []readSectionsBenchmarkParams{
+		{
+			name:          "single index file",
+			indexFilesNum: 1,
+		},
+		{
+			name:          "multiple index files",
+			indexFilesNum: 200,
+		},
+	}
+	for _, bm := range benchmarks {
+		benchmarkReadSections(b, bm)
+	}
+}
+
+func benchmarkReadSections(b *testing.B, bm readSectionsBenchmarkParams) {
+	b.Run(bm.name, func(b *testing.B) {
+		ctx := context.Background()
+		bucket := objstore.NewInMemBucket()
+
+		objUploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger())
+		require.NoError(b, objUploader.RegisterMetrics(prometheus.NewPedanticRegistry()))
+
+		metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger())
+
+		// Calculate how many streams per index file
+		streamsPerIndex := len(testStreams) / bm.indexFilesNum
+		if streamsPerIndex == 0 {
+			streamsPerIndex = 1
+		}
+
+		// Track global stream ID counter across all index files
+		globalStreamID := int64(0)
+
+		// Create multiple index files
+		for fileIdx := 0; fileIdx < bm.indexFilesNum; fileIdx++ {
+			// Create index builder for this file
+			builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{
+				TargetPageSize:          1024 * 1024,
+				TargetObjectSize:        10 * 1024 * 1024,
+				TargetSectionSize:       128,
+				BufferSize:              1024 * 1024,
+				SectionStripeMergeLimit: 2,
+			}, nil)
+			require.NoError(b, err)
+
+			// Determine which streams to add to this index file.
+			// Use modulo to cycle through testStreams if we need more entries than available.
+			startIdx := fileIdx * streamsPerIndex
+			endIdx := startIdx + streamsPerIndex
+			if fileIdx == bm.indexFilesNum-1 {
+				// Last file gets all remaining streams needed to reach the desired count
+				endIdx = startIdx + streamsPerIndex + (len(testStreams)-endIdx%len(testStreams))%len(testStreams)
+			}
+
+			// Add test streams to this index file, cycling through testStreams if necessary
+			for i := startIdx; i < endIdx; i++ {
+				streamIdx := i % len(testStreams)
+				ts := testStreams[streamIdx]
+				lbls, err := syntax.ParseLabels(ts.Labels)
+				require.NoError(b, err)
+
+				newIdx, err := builder.AppendStream(tenantID, streams.Stream{
+					ID:               globalStreamID,
+					Labels:           lbls,
+					MinTimestamp:     ts.Entries[0].Timestamp,
+					MaxTimestamp:     ts.Entries[0].Timestamp,
+					UncompressedSize: 0,
+				})
+				require.NoError(b, err)
+
+				err = builder.ObserveLogLine(tenantID, "test-path", int64(fileIdx+1), newIdx, globalStreamID, ts.Entries[0].Timestamp, int64(len(ts.Entries[0].Line)))
+				require.NoError(b, err)
+
+				globalStreamID++
+			}
+
+			// Build and store the index object
+			timeRanges := builder.TimeRanges()
+			obj, closer, err := builder.Flush()
+			require.NoError(b, err)
+			b.Cleanup(func() { _ = closer.Close() })
+
+			path, err := objUploader.Upload(context.Background(), obj)
+			require.NoError(b, err)
+
+			err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
+			require.NoError(b, err)
+		}
+
+		// Create the metastore instance
+		mstore := NewObjectMetastore(bucket, log.NewNopLogger(), nil)
+
+		// Prepare benchmark parameters
+		benchCtx := user.InjectOrgID(ctx, tenantID)
+		start := now.Add(-5 * time.Hour)
+		end := now.Add(5 * time.Hour)
+		matchers := []*labels.Matcher{
+			labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
+		}
+
+		b.ResetTimer()
+		b.ReportAllocs()
+
+		// Run the benchmark
+		for range b.N {
+			sections, err := mstore.Sections(benchCtx, start, end, matchers, nil)
+			require.NoError(b, err)
+			require.NotEmpty(b, sections)
+		}
+
+		// Stop timer before cleanup
+		b.StopTimer()
+	})
+}
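This new file benchmarks ObjectMetastore.Sections against a single index file and against 200 index files. To run it locally, the standard Go tooling should suffice (these are stock go test flags, nothing project-specific):

go test ./pkg/dataobj/metastore/ -run '^$' -bench BenchmarkReadSections -benchmem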

pkg/dataobj/metastore/object_test.go

Lines changed: 2 additions & 2 deletions
@@ -65,7 +65,7 @@ var (
 
 // Similar to store_test.go -- we need a populated dataobj/builder/metastore to test labels and values
 type testDataBuilder struct {
-	t      *testing.T
+	t      testing.TB
 	bucket objstore.Bucket
 
 	builder *logsobj.Builder
@@ -499,7 +499,7 @@ func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, tim
 	mfunc(ctx, start, end, mstore)
 }
 
-func newTestDataBuilder(t *testing.T) *testDataBuilder {
+func newTestDataBuilder(t testing.TB) *testDataBuilder {
 	bucket := objstore.NewInMemBucket()
 
 	builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{
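Widening testDataBuilder from *testing.T to the testing.TB interface lets the same fixture back the new benchmark as well as the existing tests, since both *testing.T and *testing.B satisfy testing.TB. A hypothetical sketch of the pattern (the function names here are illustrative, not part of this commit):

package metastore

import "testing"

// Both *testing.T and *testing.B implement testing.TB, so one helper
// can build fixtures for tests and benchmarks alike.
func TestSectionsFixture(t *testing.T) {
	d := newTestDataBuilder(t) // accepts a *testing.T
	_ = d
}

func BenchmarkSectionsFixture(b *testing.B) {
	d := newTestDataBuilder(b) // equally accepts a *testing.B
	_ = d
}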
