diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index 6bf3d38c21982..e69128b5d09b6 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -127,10 +127,10 @@ type BuilderConfig struct { // RegisterFlagsWithPrefix registers flags with the given prefix. func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // Set defaults for base builder configuration - _ = cfg.BuilderBaseConfig.TargetPageSize.Set("2MB") - _ = cfg.BuilderBaseConfig.TargetObjectSize.Set("1GB") - _ = cfg.BuilderBaseConfig.BufferSize.Set("16MB") - _ = cfg.BuilderBaseConfig.TargetSectionSize.Set("128MB") + _ = cfg.TargetPageSize.Set("2MB") + _ = cfg.TargetObjectSize.Set("1GB") + _ = cfg.BufferSize.Set("16MB") + _ = cfg.TargetSectionSize.Set("128MB") cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f) f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).") diff --git a/pkg/dataobj/index/config.go b/pkg/dataobj/index/config.go index 592a3959ddf68..ed4c1ff9e060c 100644 --- a/pkg/dataobj/index/config.go +++ b/pkg/dataobj/index/config.go @@ -21,10 +21,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // Set defaults for base builder configuration - _ = cfg.BuilderBaseConfig.TargetPageSize.Set("128KB") - _ = cfg.BuilderBaseConfig.TargetObjectSize.Set("64MB") - _ = cfg.BuilderBaseConfig.BufferSize.Set("2MB") - _ = cfg.BuilderBaseConfig.TargetSectionSize.Set("16MB") + _ = cfg.TargetPageSize.Set("128KB") + _ = cfg.TargetObjectSize.Set("64MB") + _ = cfg.BufferSize.Set("2MB") + _ = cfg.TargetSectionSize.Set("16MB") cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f) f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index") diff --git a/pkg/dataobj/metastore/iter.go b/pkg/dataobj/metastore/iter.go new file mode 100644 index 0000000000000..018c4c467960d --- /dev/null +++ b/pkg/dataobj/metastore/iter.go @@ -0,0 +1,197 @@ +package metastore + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/scalar" + "github.com/grafana/dskit/user" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" +) + +// forEachStreamSectionPointer iterates over all the section pointers that point to one of the +// [streamIDs] in a given [indexObj] that overlap [sStart, sEnd] (inclusive) time range and +// calls [f] on every found SectionPointer. +func forEachStreamSectionPointer( + ctx context.Context, + indexObj *dataobj.Object, + sStart, sEnd *scalar.Timestamp, + streamIDs []int64, + f func(pointers.SectionPointer), +) error { + targetTenant, err := user.ExtractOrgID(ctx) + if err != nil { + return fmt.Errorf("extracting org ID: %w", err) + } + var reader pointers.Reader + defer reader.Close() + + // prepare streamIDs scalars only once, they are used in the predicate later + var sStreamIDs []scalar.Scalar + for _, streamID := range streamIDs { + sStreamIDs = append(sStreamIDs, scalar.NewInt64Scalar(streamID)) + } + + const batchSize = 128 + buf := make([]pointers.SectionPointer, batchSize) + + // iterate over the sections and fill buf column by column + // once the read operation is over invoke client's [f] on every read row (numRows not always the same as len(buf)) + for _, section := range indexObj.Sections().Filter(pointers.CheckSection) { + if section.Tenant != targetTenant { + continue + } + + sec, err := pointers.Open(ctx, section) + if err != nil { + return fmt.Errorf("opening section: %w", err) + } + + pointerCols, err := findPointersColumnsByTypes( + sec.Columns(), + pointers.ColumnTypePath, + pointers.ColumnTypeSection, + pointers.ColumnTypeStreamID, + pointers.ColumnTypeStreamIDRef, + pointers.ColumnTypeMinTimestamp, + pointers.ColumnTypeMaxTimestamp, + pointers.ColumnTypeRowCount, + pointers.ColumnTypeUncompressedSize, + ) + if err != nil { + return fmt.Errorf("finding pointers columns: %w", err) + } + + var ( + colStreamID *pointers.Column + colMinTimestamp *pointers.Column + colMaxTimestamp *pointers.Column + ) + + for _, c := range pointerCols { + if c.Type == pointers.ColumnTypeStreamID { + colStreamID = c + } + if c.Type == pointers.ColumnTypeMinTimestamp { + colMinTimestamp = c + } + if c.Type == pointers.ColumnTypeMaxTimestamp { + colMaxTimestamp = c + } + if colStreamID != nil && colMinTimestamp != nil && colMaxTimestamp != nil { + break + } + } + + if colStreamID == nil || colMinTimestamp == nil || colMaxTimestamp == nil { + return fmt.Errorf( + "one of mandatory columns is missing: (streamID=%t, minTimestamp=%t, maxTimestamp=%t)", + colStreamID == nil, colMinTimestamp == nil, colMaxTimestamp == nil, + ) + } + + reader.Reset(pointers.ReaderOptions{ + Columns: pointerCols, + Predicates: []pointers.Predicate{ + pointers.WhereTimeRangeOverlapsWith(colMinTimestamp, colMaxTimestamp, sStart, sEnd), + pointers.InPredicate{ + Column: colStreamID, + Values: sStreamIDs, + }, + }, + }) + + for { + rec, readErr := reader.Read(ctx, batchSize) + if readErr != nil && !errors.Is(readErr, io.EOF) { + return fmt.Errorf("reading recordBatch: %w", readErr) + } + numRows := int(rec.NumRows()) + if numRows == 0 && errors.Is(readErr, io.EOF) { + break + } + + for colIdx := range int(rec.NumCols()) { + col := rec.Column(colIdx) + pointerCol := pointerCols[colIdx] + + switch pointerCol.Type { + case pointers.ColumnTypePath: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].Path = col.(*array.String).Value(rIdx) + } + case pointers.ColumnTypeSection: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].Section = col.(*array.Int64).Value(rIdx) + } + case pointers.ColumnTypeStreamID: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].StreamID = col.(*array.Int64).Value(rIdx) + } + case pointers.ColumnTypeStreamIDRef: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].StreamIDRef = col.(*array.Int64).Value(rIdx) + } + case pointers.ColumnTypeMinTimestamp: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].StartTs = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx))) + } + case pointers.ColumnTypeMaxTimestamp: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].EndTs = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx))) + } + case pointers.ColumnTypeRowCount: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].LineCount = col.(*array.Int64).Value(rIdx) + } + case pointers.ColumnTypeUncompressedSize: + for rIdx := range numRows { + if col.IsNull(rIdx) { + continue + } + buf[rIdx].UncompressedSize = col.(*array.Int64).Value(rIdx) + } + default: + continue + } + } + + for rowIdx := range numRows { + f(buf[rowIdx]) + } + + if errors.Is(readErr, io.EOF) { + break + } + } + } + + return nil +} diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index 43b2105888cec..3576313d27770 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -14,6 +14,8 @@ import ( "sync" "time" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/scalar" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/tenant" @@ -206,11 +208,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma // Search the stream sections of the matching objects to find matching streams streamMatchers := streamPredicateFromMatchers(start, end, matchers...) - pointerPredicate := pointers.TimeRangeRowPredicate{ - Start: start, - End: end, - } - streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, pointerPredicate) + streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, start, end) if err != nil { return nil, err } @@ -504,12 +502,15 @@ func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objec // getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors. // This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines. -func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, timeRangePredicate pointers.TimeRangeRowPredicate) ([]*DataobjSectionDescriptor, error) { +func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) { if streamPredicate == nil { // At least one stream matcher is required, currently. return nil, nil } + sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns) + sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns) + timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration) defer timer.ObserveDuration() @@ -540,7 +541,8 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor) sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration) - err = forEachObjPointer(ctx, indexObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) { + + err = forEachStreamSectionPointer(ctx, indexObject, sStart, sEnd, matchingStreamIDs, func(pointer pointers.SectionPointer) { key.ObjectPath = pointer.Path key.SectionIdx = pointer.Section @@ -551,6 +553,7 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject } sectionDescriptor.Merge(pointer) }) + if err != nil { return fmt.Errorf("reading section pointers from index: %w", err) } @@ -827,3 +830,19 @@ func dedupeAndSort(objects [][]string) []string { sort.Strings(paths) return paths } + +func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes ...pointers.ColumnType) ([]*pointers.Column, error) { + result := make([]*pointers.Column, 0, len(columnTypes)) + + for _, c := range allColumns { + for _, neededType := range columnTypes { + if neededType != c.Type { + continue + } + + result = append(result, c) + } + } + + return result, nil +} diff --git a/pkg/dataobj/metastore/object_bench_test.go b/pkg/dataobj/metastore/object_bench_test.go new file mode 100644 index 0000000000000..500190faee85f --- /dev/null +++ b/pkg/dataobj/metastore/object_bench_test.go @@ -0,0 +1,142 @@ +package metastore + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" + "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" + "github.com/grafana/loki/v3/pkg/dataobj/uploader" + "github.com/grafana/loki/v3/pkg/logql/syntax" +) + +type readSectionsBenchmarkParams struct { + name string + indexFilesNum int +} + +func BenchmarkReadSections(b *testing.B) { + benchmarks := []readSectionsBenchmarkParams{ + { + name: "single index file", + indexFilesNum: 1, + }, + { + name: "multiple index files", + indexFilesNum: 200, + }, + } + for _, bm := range benchmarks { + benchmarkReadSections(b, bm) + } +} + +func benchmarkReadSections(b *testing.B, bm readSectionsBenchmarkParams) { + b.Run(bm.name, func(b *testing.B) { + ctx := context.Background() + bucket := objstore.NewInMemBucket() + + objUploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger()) + require.NoError(b, objUploader.RegisterMetrics(prometheus.NewPedanticRegistry())) + + metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger()) + + // Calculate how many streams per index file + streamsPerIndex := len(testStreams) / bm.indexFilesNum + if streamsPerIndex == 0 { + streamsPerIndex = 1 + } + + // Track global stream ID counter across all index files + globalStreamID := int64(0) + + // Create multiple index files + for fileIdx := 0; fileIdx < bm.indexFilesNum; fileIdx++ { + // Create index builder for this file + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ + TargetPageSize: 1024 * 1024, + TargetObjectSize: 10 * 1024 * 1024, + TargetSectionSize: 128, + BufferSize: 1024 * 1024, + SectionStripeMergeLimit: 2, + }, nil) + require.NoError(b, err) + + // Determine which streams to add to this index file + // Use modulo to cycle through testStreams if we need more entries than available + startIdx := fileIdx * streamsPerIndex + endIdx := startIdx + streamsPerIndex + if fileIdx == bm.indexFilesNum-1 { + // Last file gets all remaining streams needed to reach the desired count + endIdx = startIdx + streamsPerIndex + (len(testStreams)-endIdx%len(testStreams))%len(testStreams) + } + + // Add test streams to this index file, cycling through testStreams if necessary + for i := startIdx; i < endIdx; i++ { + streamIdx := i % len(testStreams) + ts := testStreams[streamIdx] + lbls, err := syntax.ParseLabels(ts.Labels) + require.NoError(b, err) + + newIdx, err := builder.AppendStream(tenantID, streams.Stream{ + ID: globalStreamID, + Labels: lbls, + MinTimestamp: ts.Entries[0].Timestamp, + MaxTimestamp: ts.Entries[0].Timestamp, + UncompressedSize: 0, + }) + require.NoError(b, err) + + err = builder.ObserveLogLine(tenantID, "test-path", int64(fileIdx+1), newIdx, globalStreamID, ts.Entries[0].Timestamp, int64(len(ts.Entries[0].Line))) + require.NoError(b, err) + + globalStreamID++ + } + + // Build and store the index object + timeRanges := builder.TimeRanges() + obj, closer, err := builder.Flush() + require.NoError(b, err) + b.Cleanup(func() { _ = closer.Close() }) + + path, err := objUploader.Upload(context.Background(), obj) + require.NoError(b, err) + + err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges) + require.NoError(b, err) + } + + // Create the metastore instance + mstore := NewObjectMetastore(bucket, log.NewNopLogger(), nil) + + // Prepare benchmark parameters + benchCtx := user.InjectOrgID(ctx, tenantID) + start := now.Add(-5 * time.Hour) + end := now.Add(5 * time.Hour) + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + } + + b.ResetTimer() + b.ReportAllocs() + + // Run the benchmark + for range b.N { + sections, err := mstore.Sections(benchCtx, start, end, matchers, nil) + require.NoError(b, err) + require.NotEmpty(b, sections) + } + + // Stop timer before cleanup + b.StopTimer() + }) +} diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index fada6796c3f74..00ba99ee7ce82 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -65,7 +65,7 @@ var ( // Similar to store_test.go -- we need a populated dataobj/builder/metastore to test labels and values type testDataBuilder struct { - t *testing.T + t testing.TB bucket objstore.Bucket builder *logsobj.Builder @@ -499,7 +499,7 @@ func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, tim mfunc(ctx, start, end, mstore) } -func newTestDataBuilder(t *testing.T) *testDataBuilder { +func newTestDataBuilder(t testing.TB) *testDataBuilder { bucket := objstore.NewInMemBucket() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ diff --git a/pkg/dataobj/sections/pointers/predicate.go b/pkg/dataobj/sections/pointers/predicate.go new file mode 100644 index 0000000000000..adf4f04963ec5 --- /dev/null +++ b/pkg/dataobj/sections/pointers/predicate.go @@ -0,0 +1,148 @@ +package pointers + +import "github.com/apache/arrow-go/v18/arrow/scalar" + +// Predicate is an expression used to filter column values in a [Reader]. +type Predicate interface{ isPredicate() } + +// Supported predicates. +type ( + // An AndPredicate is a [Predicate] which asserts that a row may only be + // included if both the Left and Right Predicate are true. + AndPredicate struct{ Left, Right Predicate } + + // An OrPredicate is a [Predicate] which asserts that a row may only be + // included if either the Left or Right Predicate are true. + OrPredicate struct{ Left, Right Predicate } + + // A NotePredicate is a [Predicate] which asserts that a row may only be + // included if the inner Predicate is false. + NotPredicate struct{ Inner Predicate } + + // TruePredicate is a [Predicate] which always returns true. + TruePredicate struct{} + + // FalsePredicate is a [Predicate] which always returns false. + FalsePredicate struct{} + + // An EqualPredicate is a [Predicate] which asserts that a row may only be + // included if the Value of the Column is equal to the Value. + EqualPredicate struct { + Column *Column // Column to check. + Value scalar.Scalar // Value to check equality for. + } + + // An InPredicate is a [Predicate] which asserts that a row may only be + // included if the Value of the Column is present in the provided Values. + InPredicate struct { + Column *Column // Column to check. + Values []scalar.Scalar // Values to check for inclusion. + } + + // A GreaterThanPredicate is a [Predicate] which asserts that a row may only + // be included if the Value of the Column is greater than the provided Value. + GreaterThanPredicate struct { + Column *Column // Column to check. + Value scalar.Scalar // Value for which rows in Column must be greater than. + } + + // A LessThanPredicate is a [Predicate] which asserts that a row may only be + // included if the Value of the Column is less than the provided Value. + LessThanPredicate struct { + Column *Column // Column to check. + Value scalar.Scalar // Value for which rows in Column must be less than. + } + + // FuncPredicate is a [Predicate] which asserts that a row may only be + // included if the Value of the Column passes the Keep function. + // + // Instances of FuncPredicate are ineligible for page filtering and should + // only be used when there isn't a more explicit Predicate implementation. + FuncPredicate struct { + Column *Column // Column to check. + + // Keep is invoked with the column and value pair to check. Keep is given + // the Column instance to allow for reusing the same function across + // multiple columns, if necessary. + // + // If Keep returns true, the row is kept. + Keep func(column *Column, value scalar.Scalar) bool + } +) + +func (AndPredicate) isPredicate() {} +func (OrPredicate) isPredicate() {} +func (NotPredicate) isPredicate() {} +func (TruePredicate) isPredicate() {} +func (FalsePredicate) isPredicate() {} +func (EqualPredicate) isPredicate() {} +func (InPredicate) isPredicate() {} +func (GreaterThanPredicate) isPredicate() {} +func (LessThanPredicate) isPredicate() {} +func (FuncPredicate) isPredicate() {} + +// walkPredicate traverses a predicate in depth-first order: it starts by +// calling fn(p). If fn(p) returns true, walkPredicate is invoked recursively +// with fn for each of the non-nil children of p, followed by a call of +// fn(nil). +func walkPredicate(p Predicate, fn func(Predicate) bool) { + if p == nil || !fn(p) { + return + } + + switch p := p.(type) { + case AndPredicate: + walkPredicate(p.Left, fn) + walkPredicate(p.Right, fn) + + case OrPredicate: + walkPredicate(p.Left, fn) + walkPredicate(p.Right, fn) + + case NotPredicate: + walkPredicate(p.Inner, fn) + + case TruePredicate: // No children. + case FalsePredicate: // No children. + case EqualPredicate: // No children. + case InPredicate: // No children. + case GreaterThanPredicate: // No children. + case LessThanPredicate: // No children. + case FuncPredicate: // No children. + + default: + panic("streams.walkPredicate: unsupported predicate type") + } + + fn(nil) +} + +func WhereTimeRangeOverlapsWith( + colMinTimestamp *Column, + colMaxTimestamp *Column, + start scalar.Scalar, + end scalar.Scalar, +) Predicate { + return AndPredicate{ + Left: OrPredicate{ + Left: EqualPredicate{ + Column: colMaxTimestamp, + Value: start, + }, + Right: GreaterThanPredicate{ + Column: colMaxTimestamp, + Value: start, + }, + }, + Right: OrPredicate{ + Left: EqualPredicate{ + Column: colMinTimestamp, + Value: end, + }, + Right: LessThanPredicate{ + Column: colMinTimestamp, + Value: end, + }, + }, + } +} diff --git a/pkg/dataobj/sections/pointers/reader.go b/pkg/dataobj/sections/pointers/reader.go new file mode 100644 index 0000000000000..f5b313bb492dc --- /dev/null +++ b/pkg/dataobj/sections/pointers/reader.go @@ -0,0 +1,513 @@ +package pointers + +import ( + "context" + "errors" + "fmt" + _ "io" // Used for documenting io.EOF. + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/arrow/scalar" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/arrowconv" + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow" + "github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar" +) + +// ReaderOptions customizes the behavior of a [Reader]. +type ReaderOptions struct { + // Columns to read. Each column must belong to the same [Section]. + Columns []*Column + + // Predicates holds a set of predicates to apply when reading the section. + // Columns referenced in Predicates must be in the set of Columns. + Predicates []Predicate + + // Allocator to use for allocating Arrow records. If nil, + // [memory.DefaultAllocator] is used. + Allocator memory.Allocator +} + +// Validate returns an error if the opts is not valid. ReaderOptions are only +// valid when: +// +// - Each [Column] in Columns belongs to the same [Section]. +// - Each [Predicate] in Predicates references a [Column] from Columns. +// - Scalar values used in predicates are of a supported type: an int64, +// uint64, timestamp, or a byte array. +func (opts *ReaderOptions) Validate() error { + columnLookup := make(map[*Column]struct{}, len(opts.Columns)) + + if len(opts.Columns) > 0 { + // Ensure all columns belong to the same section. + var checkSection *Section + + for _, col := range opts.Columns { + if checkSection != nil && col.Section != checkSection { + return fmt.Errorf("all columns must belong to the same section: got=%p want=%p", col.Section, checkSection) + } else if checkSection == nil { + checkSection = col.Section + } + columnLookup[col] = struct{}{} + } + } + + var errs []error + + validateColumn := func(col *Column) { + if col == nil { + errs = append(errs, fmt.Errorf("column is nil")) + } else if _, found := columnLookup[col]; !found { + errs = append(errs, fmt.Errorf("column %p not in Columns", col)) + } + } + + validateScalar := func(s scalar.Scalar) { + _, ok := arrowconv.DatasetType(s.DataType()) + if !ok { + errs = append(errs, fmt.Errorf("unsupported scalar type %s", s.DataType())) + } + } + + for _, p := range opts.Predicates { + walkPredicate(p, func(p Predicate) bool { + // Validate that predicates reference valid columns and use valid + // scalars. + switch p := p.(type) { + case nil: // End of walk; nothing to do. + + case AndPredicate: // Nothing to do. + case OrPredicate: // Nothing to do. + case NotPredicate: // Nothing to do. + case TruePredicate: // Nothing to do. + case FalsePredicate: // Nothing to do. + + case EqualPredicate: + validateColumn(p.Column) + validateScalar(p.Value) + + case InPredicate: + validateColumn(p.Column) + for _, val := range p.Values { + validateScalar(val) + } + + case GreaterThanPredicate: + validateColumn(p.Column) + validateScalar(p.Value) + + case LessThanPredicate: + validateColumn(p.Column) + validateScalar(p.Value) + + case FuncPredicate: + validateColumn(p.Column) + + default: + errs = append(errs, fmt.Errorf("unrecognized predicate type %T", p)) + } + + return true + }) + } + + return errors.Join(errs...) +} + +// A Reader reads batches of rows from a [Section]. +type Reader struct { + opts ReaderOptions + schema *arrow.Schema // Set on [Reader.Reset]. + + ready bool + inner *dataset.Reader + buf []dataset.Row + + builder *array.RecordBuilder +} + +// NewReader creates a new Reader from the provided options. Options are not +// validated until the first call to [Reader.Read]. +func NewReader(opts ReaderOptions) *Reader { + var r Reader + r.Reset(opts) + return &r +} + +// Schema returns the [arrow.Schema] used by the Reader. Fields in the schema +// match the order of columns listed in [ReaderOptions]. +// +// Names of fields in the schema are guaranteed to be unique per column but are +// not guaranteed to be stable. +// +// The returned Schema must not be modified. +func (r *Reader) Schema() *arrow.Schema { return r.schema } + +// Read reads the batch of rows from the section, returning them as an Arrow +// record. +// +// If [ReaderOptions] has predicates, only rows that match the predicates are +// returned. If none of the next batchSize rows matched the predicate, Read +// returns a nil record with a nil error. +// +// Read will return an error if the next batch of rows could not be read due to +// invalid options or I/O errors. At the end of the section, Read returns nil, +// [io.EOF]. +// +// Read may return a non-nil record with a non-nil error, including if the +// error is [io.EOF]. Callers should always process the record before +// processing the error value. +// +// When a record is returned, it will match the schema specified by +// [Reader.Schema]. These records must always be released after use. +func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, error) { + if !r.ready { + err := r.init() + if err != nil { + return nil, fmt.Errorf("initializing Reader: %w", err) + } + } + + r.buf = slicegrow.GrowToCap(r.buf, batchSize) + r.buf = r.buf[:batchSize] + + n, readErr := r.inner.Read(ctx, r.buf) + r.builder.Reserve(n) + for rowIndex := range n { + row := r.buf[rowIndex] + + for columnIndex, val := range row.Values { + columnBuilder := r.builder.Field(columnIndex) + + if val.IsNil() { + columnBuilder.AppendNull() + continue + } + + // Append non-null values. We switch on [ColumnType] here so it's easier + // to follow the mapping of ColumnType to Arrow type. The mappings here + // should align with both [columnToField] (for Arrow type) and + // [Builder.encodeTo] (for dataset type). + // + // Passing our byte slices to [array.StringBuilder.BinaryBuilder.Append] are safe; it + // will copy the contents of the value and we can reuse the buffer on the + // next call to [dataset.Reader.Read]. + columnType := r.opts.Columns[columnIndex].Type + switch columnType { + case ColumnTypeInvalid: + columnBuilder.AppendNull() // Unsupported column + case ColumnTypePath: + columnBuilder.(*array.StringBuilder).BinaryBuilder.Append(val.Binary()) + case ColumnTypeSection: + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + case ColumnTypePointerKind: + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + + case ColumnTypeStreamID: // Appends IDs as int64 + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + case ColumnTypeStreamIDRef: + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + case ColumnTypeMinTimestamp, ColumnTypeMaxTimestamp: // Values are nanosecond timestamps as int64 + columnBuilder.(*array.TimestampBuilder).Append(arrow.Timestamp(val.Int64())) + case ColumnTypeRowCount: + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + case ColumnTypeUncompressedSize: // Appends uncompressed size as int64 + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + + case ColumnTypeColumnName: + columnBuilder.(*array.StringBuilder).BinaryBuilder.Append(val.Binary()) + case ColumnTypeColumnIndex: + columnBuilder.(*array.Int64Builder).Append(val.Int64()) + case ColumnTypeValuesBloomFilter: + columnBuilder.(*array.BinaryBuilder).Append(val.Binary()) + + default: + // We'll only hit this if we added a new column type but forgot to + // support reading it. + return nil, fmt.Errorf("unsupported column type %s for column %d", columnType, columnIndex) + } + } + } + + // We only return readErr after processing n so that we properly handle n>0 + // while also getting an error such as io.EOF. + return r.builder.NewRecordBatch(), readErr +} + +func (r *Reader) init() error { + if err := r.opts.Validate(); err != nil { + return fmt.Errorf("invalid options: %w", err) + } else if r.opts.Allocator == nil { + r.opts.Allocator = memory.DefaultAllocator + } + + var innerSection *columnar.Section + innerColumns := make([]*columnar.Column, len(r.opts.Columns)) + for i, column := range r.opts.Columns { + if innerSection == nil { + innerSection = column.Section.inner + } + innerColumns[i] = column.inner + } + + dset, err := columnar.MakeDataset(innerSection, innerColumns) + if err != nil { + return fmt.Errorf("creating dataset: %w", err) + } else if len(dset.Columns()) != len(r.opts.Columns) { + return fmt.Errorf("dataset has %d columns, expected %d", len(dset.Columns()), len(r.opts.Columns)) + } + + columnLookup := make(map[*Column]dataset.Column, len(r.opts.Columns)) + for i, col := range dset.Columns() { + columnLookup[r.opts.Columns[i]] = col + } + + preds, err := mapPredicates(r.opts.Predicates, columnLookup) + if err != nil { + return fmt.Errorf("mapping predicates: %w", err) + } + + innerOptions := dataset.ReaderOptions{ + Dataset: dset, + Columns: dset.Columns(), + Predicates: preds, + Prefetch: true, + } + if r.inner == nil { + r.inner = dataset.NewReader(innerOptions) + } else { + r.inner.Reset(innerOptions) + } + + if r.builder == nil { + r.builder = array.NewRecordBuilder(r.opts.Allocator, r.schema) + } + + r.ready = true + return nil +} + +func mapPredicates(ps []Predicate, columnLookup map[*Column]dataset.Column) (predicates []dataset.Predicate, err error) { + // For simplicity, [mapPredicate] and the functions it calls panic if they + // encounter an unsupported conversion. + // + // These should normally be handled by [ReaderOptions.Validate], but we catch + // any panics here to gracefully return an error to the caller instead of + // potentially crashing the goroutine. + defer func() { + if r := recover(); r == nil { + return + } else if recoveredErr, ok := r.(error); ok { + err = recoveredErr + } else { + err = fmt.Errorf("error while mapping: %v", r) + } + }() + + for _, p := range ps { + predicates = append(predicates, mapPredicate(p, columnLookup)) + } + return +} + +func mapPredicate(p Predicate, columnLookup map[*Column]dataset.Column) dataset.Predicate { + switch p := p.(type) { + case AndPredicate: + return dataset.AndPredicate{ + Left: mapPredicate(p.Left, columnLookup), + Right: mapPredicate(p.Right, columnLookup), + } + + case OrPredicate: + return dataset.OrPredicate{ + Left: mapPredicate(p.Left, columnLookup), + Right: mapPredicate(p.Right, columnLookup), + } + + case NotPredicate: + return dataset.NotPredicate{ + Inner: mapPredicate(p.Inner, columnLookup), + } + + case TruePredicate: + return dataset.TruePredicate{} + + case FalsePredicate: + return dataset.FalsePredicate{} + + case EqualPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + return dataset.EqualPredicate{ + Column: col, + Value: arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())), + } + + case InPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + + vals := make([]dataset.Value, len(p.Values)) + for i := range p.Values { + vals[i] = arrowconv.FromScalar(p.Values[i], mustConvertType(p.Values[i].DataType())) + } + + var valueSet dataset.ValueSet + switch col.ColumnDesc().Type.Physical { + case datasetmd.PHYSICAL_TYPE_INT64: + valueSet = dataset.NewInt64ValueSet(vals) + case datasetmd.PHYSICAL_TYPE_UINT64: + valueSet = dataset.NewUint64ValueSet(vals) + case datasetmd.PHYSICAL_TYPE_BINARY: + valueSet = dataset.NewBinaryValueSet(vals) + default: + panic("InPredicate not implemented for datatype") + } + + return dataset.InPredicate{ + Column: col, + Values: valueSet, + } + + case GreaterThanPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + return dataset.GreaterThanPredicate{ + Column: col, + Value: arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())), + } + + case LessThanPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + return dataset.LessThanPredicate{ + Column: col, + Value: arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())), + } + + case FuncPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + + fieldType := columnToField(p.Column).Type + + return dataset.FuncPredicate{ + Column: col, + Keep: func(_ dataset.Column, value dataset.Value) bool { + return p.Keep(p.Column, arrowconv.ToScalar(value, fieldType)) + }, + } + + default: + panic(fmt.Sprintf("unsupported predicate type %T", p)) + } +} + +func mustConvertType(dtype arrow.DataType) datasetmd.PhysicalType { + toType, ok := arrowconv.DatasetType(dtype) + if !ok { + panic(fmt.Sprintf("unsupported dataset type %s", dtype)) + } + return toType +} + +// Reset discards any state and resets r with a new set of optiosn. This +// permits reusing a Reader rather than allocating a new one. +func (r *Reader) Reset(opts ReaderOptions) { + r.opts = opts + r.schema = columnsSchema(opts.Columns) + + r.ready = false + + if r.inner != nil { + // Close our inner reader so it releases resources immediately. It'll be + // fully reset on the next call to [Reader.init]. + _ = r.inner.Close() + } + if r.builder != nil { + r.builder = nil + } +} + +// Close closes the Reader and releases any resources it holds. Closed Readers +// can be reused by calling [Reader.Reset]. +func (r *Reader) Close() error { + if r.inner != nil { + return r.inner.Close() + } + if r.builder != nil { + r.builder = nil + } + return nil +} + +func columnsSchema(cols []*Column) *arrow.Schema { + fields := make([]arrow.Field, 0, len(cols)) + for _, col := range cols { + fields = append(fields, columnToField(col)) + } + return arrow.NewSchema(fields, nil) +} + +var columnDatatypes = map[ColumnType]arrow.DataType{ + ColumnTypeInvalid: arrow.Null, + ColumnTypePath: arrow.BinaryTypes.String, + ColumnTypeSection: arrow.PrimitiveTypes.Int64, + ColumnTypePointerKind: arrow.PrimitiveTypes.Int64, + + ColumnTypeStreamID: arrow.PrimitiveTypes.Int64, + ColumnTypeStreamIDRef: arrow.PrimitiveTypes.Int64, + ColumnTypeMinTimestamp: arrow.FixedWidthTypes.Timestamp_ns, + ColumnTypeMaxTimestamp: arrow.FixedWidthTypes.Timestamp_ns, + ColumnTypeRowCount: arrow.PrimitiveTypes.Int64, + ColumnTypeUncompressedSize: arrow.PrimitiveTypes.Int64, + + ColumnTypeColumnIndex: arrow.PrimitiveTypes.Int64, + ColumnTypeColumnName: arrow.BinaryTypes.String, + ColumnTypeValuesBloomFilter: arrow.BinaryTypes.Binary, +} + +func columnToField(col *Column) arrow.Field { + dtype, ok := columnDatatypes[col.Type] + if !ok { + dtype = arrow.Null + } + + return arrow.Field{ + Name: makeColumnName(col.Name, col.Type.String(), dtype), + Type: dtype, + Nullable: true, // All columns are nullable. + } +} + +// makeColumnName returns a unique name for a [Column] and its expected data +// type. +// +// Unique names are used by unit tests to be able to produce expected rows. +func makeColumnName(label string, name string, dty arrow.DataType) string { + switch { + case label == "" && name == "": + return dty.Name() + case label == "" && name != "": + return name + "." + dty.Name() + default: + if name == "" { + name = "" + } + return label + "." + name + "." + dty.Name() + } +} diff --git a/pkg/dataobj/sections/pointers/reader_bench_test.go b/pkg/dataobj/sections/pointers/reader_bench_test.go new file mode 100644 index 0000000000000..809c9fb2db54b --- /dev/null +++ b/pkg/dataobj/sections/pointers/reader_bench_test.go @@ -0,0 +1,239 @@ +package pointers_test + +import ( + "context" + "errors" + "io" + "slices" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/scalar" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" +) + +const maxStreamID = 200 + +// buildBenchSection creates a section with many pointers for benchmarking +func buildBenchSection(b *testing.B, numPointers int) *pointers.Section { + b.Helper() + + sectionBuilder := pointers.NewBuilder(nil, 0, 2) + + // Create diverse set of pointers with different stream IDs + for i := 0; i < numPointers; i++ { + streamID := int64(i % maxStreamID) // Cycle through different stream IDs + path := "path/to/object" + section := int64(i % 10) + startTs := unixTime(int64(i * 100)) + endTs := unixTime(int64(i*100 + 50)) + + sectionBuilder.ObserveStream(path, section, streamID, streamID, startTs, 1024) + sectionBuilder.ObserveStream(path, section, streamID, streamID, endTs, 0) + } + + objectBuilder := dataobj.NewBuilder(nil) + require.NoError(b, objectBuilder.Append(sectionBuilder)) + + obj, closer, err := objectBuilder.Flush() + require.NoError(b, err) + b.Cleanup(func() { closer.Close() }) + + sec, err := pointers.Open(context.Background(), obj.Sections()[0]) + require.NoError(b, err) + return sec +} + +type readerBenchParams struct { + name string + numPointers int + numStreamIDs int +} + +func BenchmarkReaders(b *testing.B) { + benchmarks := []readerBenchParams{ + { + name: "1k pointers, 10 stream IDs", + numPointers: 1000, + numStreamIDs: 10, + }, + { + name: "1k pointers, 200 stream IDs", + numPointers: 1000, + numStreamIDs: 200, + }, + { + name: "10k pointers, 200 stream IDs", + numPointers: 10000, + numStreamIDs: 200, + }, + { + name: "100k pointers, 200 stream IDs", + numPointers: 100000, + numStreamIDs: 200, + }, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.Run("RowReader", func(b *testing.B) { + benchmarkRowReader(b, bm) + }) + b.Run("Reader", func(b *testing.B) { + benchmarkReader(b, bm) + }) + }) + } +} + +func benchmarkRowReader(b *testing.B, params readerBenchParams) { + ctx := context.Background() + sec := buildBenchSection(b, params.numPointers) + + // Prepare stream IDs to match + streamIDs := make([]int64, params.numStreamIDs) + for i := 0; i < params.numStreamIDs; i++ { + streamIDs[i] = int64(i) + } + + // Prepare predicate if needed + predicate := pointers.TimeRangeRowPredicate{ + Start: unixTime(0), + End: unixTime(100000), + } + + b.ResetTimer() + b.ReportAllocs() + + for range b.N { + reader := pointers.NewRowReader(sec) + err := reader.MatchStreams(slices.Values(streamIDs)) + require.NoError(b, err) + + err = reader.SetPredicate(predicate) + require.NoError(b, err) + + buf := make([]pointers.SectionPointer, 128) + totalRead := 0 + + for { + n, err := reader.Read(ctx, buf) + totalRead += n + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + require.NoError(b, err) + } + } + + require.NoError(b, reader.Close()) + + // Ensure we actually read something + if totalRead == 0 { + b.Fatal("read 0 pointers") + } + } + + b.StopTimer() +} + +func benchmarkReader(b *testing.B, params readerBenchParams) { + ctx := context.Background() + sec := buildBenchSection(b, params.numPointers) + + // Get all columns for reading + columns := sec.Columns() + + // Prepare stream IDs to match + streamIDs := make([]scalar.Scalar, params.numStreamIDs) + for i := 0; i < params.numStreamIDs; i++ { + streamIDs[i] = scalar.NewInt64Scalar(int64(i)) + } + + var streamIDCol *pointers.Column + for _, col := range columns { + if col.Type == pointers.ColumnTypeStreamID { + streamIDCol = col + break + } + } + require.NotNil(b, streamIDCol) + + // Build predicates + var predicates []pointers.Predicate + + predicates = append(predicates, pointers.InPredicate{ + Column: streamIDCol, + Values: streamIDs, + }) + + var minTsCol, maxTsCol *pointers.Column + for _, col := range columns { + switch col.Type { + case pointers.ColumnTypeMinTimestamp: + minTsCol = col + case pointers.ColumnTypeMaxTimestamp: + maxTsCol = col + } + } + require.NotNil(b, minTsCol) + require.NotNil(b, maxTsCol) + + startScalar := scalar.NewTimestampScalar(arrow.Timestamp(0), arrow.FixedWidthTypes.Timestamp_ns) + endScalar := scalar.NewTimestampScalar(arrow.Timestamp(100000000000000), arrow.FixedWidthTypes.Timestamp_ns) + + predicates = append(predicates, pointers.AndPredicate{ + Left: pointers.GreaterThanPredicate{ + Column: maxTsCol, + Value: startScalar, + }, + Right: pointers.LessThanPredicate{ + Column: minTsCol, + Value: endScalar, + }, + }) + + b.ResetTimer() + b.ReportAllocs() + + opts := pointers.ReaderOptions{ + Columns: columns, + Predicates: predicates, + } + + reader := pointers.NewReader(opts) + + for range b.N { + reader.Reset(opts) + totalRows := int64(0) + + for { + rec, err := reader.Read(ctx, 128) + if rec != nil { + totalRows += rec.NumRows() + rec.Release() + } + + if err != nil { + if errors.Is(err, io.EOF) { + break + } + require.NoError(b, err) + } + } + + require.NoError(b, reader.Close()) + + // Ensure we actually read something + if totalRows == 0 { + b.Fatal("read 0 rows") + } + } + + b.StopTimer() +} diff --git a/pkg/dataobj/sections/pointers/reader_test.go b/pkg/dataobj/sections/pointers/reader_test.go new file mode 100644 index 0000000000000..ae6a9bafadcdf --- /dev/null +++ b/pkg/dataobj/sections/pointers/reader_test.go @@ -0,0 +1,692 @@ +package pointers_test + +import ( + "bytes" + "context" + "errors" + "io" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/arrow/scalar" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" + "github.com/grafana/loki/v3/pkg/util/arrowtest" +) + +// TestReader does a basic end-to-end test over a reader with a predicate applied. +func TestReader(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col1", ColumnIndex: 0, ValuesBloomFilter: []byte{1, 2, 3}}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col2", ColumnIndex: 1, ValuesBloomFilter: []byte{4, 5, 6}}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + pointerKindCol = sec.Columns()[2] + streamIDCol = sec.Columns()[3] + streamIDRefCol = sec.Columns()[4] + minTimestampCol = sec.Columns()[5] + maxTimestampCol = sec.Columns()[6] + rowCountCol = sec.Columns()[7] + uncompressedCol = sec.Columns()[8] + columnNameCol = sec.Columns()[9] + columnIndexCol = sec.Columns()[10] + valuesBloomCol = sec.Columns()[11] + ) + + require.Equal(t, "path", pathCol.Name) + require.Equal(t, pointers.ColumnTypePath, pathCol.Type) + require.Equal(t, "", sectionCol.Name) + require.Equal(t, pointers.ColumnTypeSection, sectionCol.Type) + require.Equal(t, "", pointerKindCol.Name) + require.Equal(t, pointers.ColumnTypePointerKind, pointerKindCol.Type) + require.Equal(t, "", streamIDCol.Name) + require.Equal(t, pointers.ColumnTypeStreamID, streamIDCol.Type) + require.Equal(t, "", streamIDRefCol.Name) + require.Equal(t, pointers.ColumnTypeStreamIDRef, streamIDRefCol.Type) + require.Equal(t, "", minTimestampCol.Name) + require.Equal(t, pointers.ColumnTypeMinTimestamp, minTimestampCol.Type) + require.Equal(t, "", maxTimestampCol.Name) + require.Equal(t, pointers.ColumnTypeMaxTimestamp, maxTimestampCol.Type) + require.Equal(t, "", rowCountCol.Name) + require.Equal(t, pointers.ColumnTypeRowCount, rowCountCol.Type) + require.Equal(t, "", uncompressedCol.Name) + require.Equal(t, pointers.ColumnTypeUncompressedSize, uncompressedCol.Type) + require.Equal(t, "column_name", columnNameCol.Name) + require.Equal(t, pointers.ColumnTypeColumnName, columnNameCol.Type) + require.Equal(t, "", columnIndexCol.Name) + require.Equal(t, pointers.ColumnTypeColumnIndex, columnIndexCol.Type) + require.Equal(t, "values_bloom_filter", valuesBloomCol.Name) + require.Equal(t, pointers.ColumnTypeValuesBloomFilter, valuesBloomCol.Type) + + for _, tt := range []struct { + name string + columns []*pointers.Column + expected arrowtest.Rows + }{ + { + name: "basic reads with predicate", + columns: []*pointers.Column{pathCol, sectionCol, pointerKindCol, streamIDCol, streamIDRefCol}, + expected: arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(10), "stream_id_ref.int64": int64(100)}, + {"path.path.utf8": "path2", "section.int64": int64(2), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(20), "stream_id_ref.int64": int64(200)}, + }, + }, + // tests that the reader evaluates predicates correctly even when only some columns are selected for output + { + name: "reads with subset of columns", + columns: []*pointers.Column{pathCol, sectionCol, pointerKindCol, streamIDCol}, + expected: arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(10)}, + {"path.path.utf8": "path2", "section.int64": int64(2), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), "stream_id.int64": int64(20)}, + }, + }, + // tests reading all columns + { + name: "read all columns for stream pointers", + columns: []*pointers.Column{ + pathCol, sectionCol, pointerKindCol, streamIDCol, streamIDRefCol, + minTimestampCol, maxTimestampCol, rowCountCol, uncompressedCol, + }, + expected: arrowtest.Rows{ + { + "path.path.utf8": "path1", + "section.int64": int64(1), + "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), + "stream_id.int64": int64(10), + "stream_id_ref.int64": int64(100), + "min_timestamp.timestamp": unixTime(10).UTC(), + "max_timestamp.timestamp": unixTime(20).UTC(), + "row_count.int64": int64(2), + "uncompressed_size.int64": int64(1024), + }, + { + "path.path.utf8": "path2", + "section.int64": int64(2), + "pointer_kind.int64": int64(pointers.PointerKindStreamIndex), + "stream_id.int64": int64(20), + "stream_id_ref.int64": int64(200), + "min_timestamp.timestamp": unixTime(30).UTC(), + "max_timestamp.timestamp": unixTime(40).UTC(), + "row_count.int64": int64(2), + "uncompressed_size.int64": int64(2048), + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: tt.columns, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.InPredicate{ + Column: streamIDCol, + Values: []scalar.Scalar{ + scalar.NewInt64Scalar(10), + scalar.NewInt64Scalar(20), + }, + }, + pointers.EqualPredicate{ + Column: pointerKindCol, + Value: scalar.NewInt64Scalar(int64(pointers.PointerKindStreamIndex)), + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err, "failed to get rows from table") + require.Equal(t, tt.expected, actual) + }) + } +} + +// TestReaderWithEqualPredicate tests reading with an EqualPredicate. +func TestReaderWithEqualPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.EqualPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(20), + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithInPredicate tests reading with an InPredicate. +func TestReaderWithInPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + {Path: "path4", Section: 4, PointerKind: pointers.PointerKindStreamIndex, StreamID: 40, StreamIDRef: 400, StartTs: unixTime(70), EndTs: unixTime(80), LineCount: 20, UncompressedSize: 4096}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.InPredicate{ + Column: streamIDCol, + Values: []scalar.Scalar{ + scalar.NewInt64Scalar(10), + scalar.NewInt64Scalar(30), + }, + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)}, + {"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithGreaterThanPredicate tests reading with a GreaterThanPredicate. +func TestReaderWithGreaterThanPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.GreaterThanPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(15), + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)}, + {"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithLessThanPredicate tests reading with a LessThanPredicate. +func TestReaderWithLessThanPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.LessThanPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(25), + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)}, + {"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithTimestampPredicates tests reading with timestamp predicates. +func TestReaderWithTimestampPredicates(t *testing.T) { + var ( + t10 = unixTime(10) + t20 = unixTime(20) + t25 = unixTime(25) + t25s = scalar.NewTimestampScalar(arrow.Timestamp(t25.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond}) + t30 = unixTime(30) + t40 = unixTime(40) + t50 = unixTime(50) + t55 = unixTime(55) + t55s = scalar.NewTimestampScalar(arrow.Timestamp(t55.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond}) + t60 = unixTime(60) + ) + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: t10, EndTs: t20, LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: t30, EndTs: t40, LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: t50, EndTs: t60, LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + minTimestampCol = sec.Columns()[5] + maxTimestampCol = sec.Columns()[6] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, minTimestampCol, maxTimestampCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.WhereTimeRangeOverlapsWith(minTimestampCol, maxTimestampCol, t25s, t55s), + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + { + "path.path.utf8": "path2", + "section.int64": int64(2), + "min_timestamp.timestamp": t30.UTC(), + "max_timestamp.timestamp": t40.UTC(), + }, + { + "path.path.utf8": "path3", + "section.int64": int64(3), + "min_timestamp.timestamp": t50.UTC(), + "max_timestamp.timestamp": t60.UTC(), + }, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithFuncPredicate tests reading with a FuncPredicate. +func TestReaderWithFuncPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.FuncPredicate{ + Column: pathCol, + Keep: func(_ *pointers.Column, value scalar.Scalar) bool { + if !value.IsValid() { + return false + } + + bb := value.(*scalar.String).Value.Bytes() + return bytes.Equal(bb, []byte("path1")) || bytes.Equal(bb, []byte("path3")) + }, + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1)}, + {"path.path.utf8": "path3", "section.int64": int64(3)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithAndPredicate tests reading with an AndPredicate. +func TestReaderWithAndPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.AndPredicate{ + Left: pointers.GreaterThanPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(5), + }, + Right: pointers.LessThanPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(25), + }, + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)}, + {"path.path.utf8": "path2", "section.int64": int64(2), "stream_id.int64": int64(20)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithOrPredicate tests reading with an OrPredicate. +func TestReaderWithOrPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.OrPredicate{ + Left: pointers.EqualPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(10), + }, + Right: pointers.EqualPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(30), + }, + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)}, + {"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithNotPredicate tests reading with a NotPredicate. +func TestReaderWithNotPredicate(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 30, StreamIDRef: 300, StartTs: unixTime(50), EndTs: unixTime(60), LineCount: 15, UncompressedSize: 3072}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + streamIDCol = sec.Columns()[3] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, streamIDCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.NotPredicate{ + Inner: pointers.EqualPredicate{ + Column: streamIDCol, + Value: scalar.NewInt64Scalar(20), + }, + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "stream_id.int64": int64(10)}, + {"path.path.utf8": "path3", "section.int64": int64(3), "stream_id.int64": int64(30)}, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithColumnIndexPointers tests reading column index pointers. +func TestReaderWithColumnIndexPointers(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col1", ColumnIndex: 0, ValuesBloomFilter: []byte{1, 2, 3}}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col2", ColumnIndex: 1, ValuesBloomFilter: []byte{4, 5, 6}}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col3", ColumnIndex: 2, ValuesBloomFilter: []byte{7, 8, 9}}, + }) + + cols := sec.Columns() + require.Len(t, cols, 6, "expected 6 columns for column index pointers") + + var ( + pathCol = cols[0] + sectionCol = cols[1] + pointerKindCol = cols[2] + columnNameCol = cols[3] + columnIndexCol = cols[4] + valuesBloomCol = cols[5] + ) + + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{ + pathCol, sectionCol, pointerKindCol, columnNameCol, columnIndexCol, valuesBloomCol, + }, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{ + pointers.EqualPredicate{ + Column: pointerKindCol, + Value: scalar.NewInt64Scalar(int64(pointers.PointerKindColumnIndex)), + }, + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + { + "path.path.utf8": "path1", + "section.int64": int64(1), + "pointer_kind.int64": int64(pointers.PointerKindColumnIndex), + "column_name.column_name.utf8": "col1", + "column_index.int64": int64(0), + "values_bloom_filter.values_bloom_filter.binary": []byte{1, 2, 3}, + }, + { + "path.path.utf8": "path2", + "section.int64": int64(2), + "pointer_kind.int64": int64(pointers.PointerKindColumnIndex), + "column_name.column_name.utf8": "col2", + "column_index.int64": int64(1), + "values_bloom_filter.values_bloom_filter.binary": []byte{4, 5, 6}, + }, + { + "path.path.utf8": "path3", + "section.int64": int64(3), + "pointer_kind.int64": int64(pointers.PointerKindColumnIndex), + "column_name.column_name.utf8": "col3", + "column_index.int64": int64(2), + "values_bloom_filter.values_bloom_filter.binary": []byte{7, 8, 9}, + }, + } + require.Equal(t, expected, actual) +} + +// TestReaderWithMixedPointers tests reading both stream and column index pointers. +func TestReaderWithMixedPointers(t *testing.T) { + sec := buildSection(t, []pointers.SectionPointer{ + {Path: "path1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 10, StreamIDRef: 100, StartTs: unixTime(10), EndTs: unixTime(20), LineCount: 5, UncompressedSize: 1024}, + {Path: "path2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "col1", ColumnIndex: 0, ValuesBloomFilter: []byte{1, 2, 3}}, + {Path: "path3", Section: 3, PointerKind: pointers.PointerKindStreamIndex, StreamID: 20, StreamIDRef: 200, StartTs: unixTime(30), EndTs: unixTime(40), LineCount: 10, UncompressedSize: 2048}, + }) + + var ( + pathCol = sec.Columns()[0] + sectionCol = sec.Columns()[1] + pointerKindCol = sec.Columns()[2] + ) + + // Read all pointer kinds without filtering + r := pointers.NewReader(pointers.ReaderOptions{ + Columns: []*pointers.Column{pathCol, sectionCol, pointerKindCol}, + Allocator: memory.DefaultAllocator, + Predicates: []pointers.Predicate{}, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + {"path.path.utf8": "path1", "section.int64": int64(1), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex)}, + {"path.path.utf8": "path3", "section.int64": int64(3), "pointer_kind.int64": int64(pointers.PointerKindStreamIndex)}, + {"path.path.utf8": "path2", "section.int64": int64(2), "pointer_kind.int64": int64(pointers.PointerKindColumnIndex)}, + } + require.Equal(t, expected, actual) +} + +func buildSection(t *testing.T, ptrData []pointers.SectionPointer) *pointers.Section { + t.Helper() + + sectionBuilder := pointers.NewBuilder(nil, 0, 2) + + for _, ptr := range ptrData { + if ptr.PointerKind == pointers.PointerKindStreamIndex { + sectionBuilder.ObserveStream(ptr.Path, ptr.Section, ptr.StreamIDRef, ptr.StreamID, ptr.StartTs, ptr.UncompressedSize) + sectionBuilder.ObserveStream(ptr.Path, ptr.Section, ptr.StreamIDRef, ptr.StreamID, ptr.EndTs, 0) + } else { + sectionBuilder.RecordColumnIndex(ptr.Path, ptr.Section, ptr.ColumnName, ptr.ColumnIndex, ptr.ValuesBloomFilter) + } + } + + objectBuilder := dataobj.NewBuilder(nil) + require.NoError(t, objectBuilder.Append(sectionBuilder)) + + obj, closer, err := objectBuilder.Flush() + require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) + + sec, err := pointers.Open(t.Context(), obj.Sections()[0]) + require.NoError(t, err) + return sec +} + +func readTable(ctx context.Context, r *pointers.Reader) (arrow.Table, error) { + var recs []arrow.RecordBatch + + for { + rec, err := r.Read(ctx, 128) + if rec != nil { + if rec.NumRows() > 0 { + recs = append(recs, rec) + } + } + + if err != nil && errors.Is(err, io.EOF) { + break + } else if err != nil { + return nil, err + } + } + + if len(recs) == 0 { + return nil, io.EOF + } + + return array.NewTableFromRecords(recs[0].Schema(), recs), nil +} diff --git a/pkg/dataobj/sections/pointers/row_reader_test.go b/pkg/dataobj/sections/pointers/row_reader_test.go index 93f2961a0d172..a0afabc09daf0 100644 --- a/pkg/dataobj/sections/pointers/row_reader_test.go +++ b/pkg/dataobj/sections/pointers/row_reader_test.go @@ -1,4 +1,4 @@ -package pointers +package pointers_test import ( "context" @@ -9,67 +9,69 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" + "github.com/grafana/loki/v3/pkg/dataobj" ) -var pointerTestData = []SectionPointer{ - {Path: "testPath1", Section: 0, PointerKind: PointerKindStreamIndex, StreamID: 1, StreamIDRef: 3, StartTs: unixTime(10), EndTs: unixTime(15), LineCount: 2, UncompressedSize: 2}, - {Path: "testPath2", Section: 0, PointerKind: PointerKindStreamIndex, StreamID: 1, StreamIDRef: 5, StartTs: unixTime(13), EndTs: unixTime(18), LineCount: 2, UncompressedSize: 3}, - {Path: "testPath1", Section: 1, PointerKind: PointerKindStreamIndex, StreamID: 2, StreamIDRef: 4, StartTs: unixTime(12), EndTs: unixTime(17), LineCount: 2, UncompressedSize: 4}, - {Path: "testPath2", Section: 1, PointerKind: PointerKindColumnIndex, ColumnName: "testColumn", ColumnIndex: 1, ValuesBloomFilter: []byte{1, 2, 3}}, - {Path: "testPath2", Section: 2, PointerKind: PointerKindColumnIndex, ColumnName: "testColumn2", ColumnIndex: 2, ValuesBloomFilter: []byte{1, 2, 3, 4}}, +var pointerTestData = []pointers.SectionPointer{ + {Path: "testPath1", Section: 0, PointerKind: pointers.PointerKindStreamIndex, StreamID: 1, StreamIDRef: 3, StartTs: unixTime(10), EndTs: unixTime(15), LineCount: 2, UncompressedSize: 2}, + {Path: "testPath2", Section: 0, PointerKind: pointers.PointerKindStreamIndex, StreamID: 1, StreamIDRef: 5, StartTs: unixTime(13), EndTs: unixTime(18), LineCount: 2, UncompressedSize: 3}, + {Path: "testPath1", Section: 1, PointerKind: pointers.PointerKindStreamIndex, StreamID: 2, StreamIDRef: 4, StartTs: unixTime(12), EndTs: unixTime(17), LineCount: 2, UncompressedSize: 4}, + {Path: "testPath2", Section: 1, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "testColumn", ColumnIndex: 1, ValuesBloomFilter: []byte{1, 2, 3}}, + {Path: "testPath2", Section: 2, PointerKind: pointers.PointerKindColumnIndex, ColumnName: "testColumn2", ColumnIndex: 2, ValuesBloomFilter: []byte{1, 2, 3, 4}}, } func TestRowReader(t *testing.T) { dec := buildPointersDecoder(t, 0, 2) // 3 pages - r := NewRowReader(dec) + r := pointers.NewRowReader(dec) actual, err := readAllPointers(context.Background(), r) require.NoError(t, err) require.Equal(t, pointerTestData, actual) } func TestRowReaderTimeRange(t *testing.T) { - var streamTestData []SectionPointer + var streamTestData []pointers.SectionPointer for _, d := range pointerTestData { - if d.PointerKind == PointerKindStreamIndex { + if d.PointerKind == pointers.PointerKindStreamIndex { streamTestData = append(streamTestData, d) } } tests := []struct { name string - predicate TimeRangeRowPredicate - want []SectionPointer + predicate pointers.TimeRangeRowPredicate + want []pointers.SectionPointer }{ { name: "no match", - predicate: TimeRangeRowPredicate{Start: unixTime(100), End: unixTime(200)}, + predicate: pointers.TimeRangeRowPredicate{Start: unixTime(100), End: unixTime(200)}, want: nil, }, { name: "all match", - predicate: TimeRangeRowPredicate{Start: unixTime(0), End: unixTime(20)}, + predicate: pointers.TimeRangeRowPredicate{Start: unixTime(0), End: unixTime(20)}, want: streamTestData, }, { name: "partial match", - predicate: TimeRangeRowPredicate{Start: unixTime(16), End: unixTime(18)}, - want: []SectionPointer{ + predicate: pointers.TimeRangeRowPredicate{Start: unixTime(16), End: unixTime(18)}, + want: []pointers.SectionPointer{ streamTestData[1], streamTestData[2], }, }, { name: "end predicate equal start of stream", - predicate: TimeRangeRowPredicate{Start: unixTime(0), End: unixTime(10)}, - want: []SectionPointer{ + predicate: pointers.TimeRangeRowPredicate{Start: unixTime(0), End: unixTime(10)}, + want: []pointers.SectionPointer{ streamTestData[0], }, }, { name: "start predicate equal end of stream", - predicate: TimeRangeRowPredicate{Start: unixTime(18), End: unixTime(100)}, - want: []SectionPointer{ + predicate: pointers.TimeRangeRowPredicate{Start: unixTime(18), End: unixTime(100)}, + want: []pointers.SectionPointer{ streamTestData[1], }, }, @@ -78,7 +80,7 @@ func TestRowReaderTimeRange(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { dec := buildPointersDecoder(t, 0, 2) - r := NewRowReader(dec) + r := pointers.NewRowReader(dec) err := r.SetPredicate(tt.predicate) require.NoError(t, err) actual, err := readAllPointers(context.Background(), r) @@ -90,12 +92,12 @@ func TestRowReaderTimeRange(t *testing.T) { func unixTime(sec int64) time.Time { return time.Unix(sec, 0) } -func buildPointersDecoder(t *testing.T, pageSize, pageRows int) *Section { +func buildPointersDecoder(t *testing.T, pageSize, pageRows int) *pointers.Section { t.Helper() - s := NewBuilder(nil, pageSize, pageRows) + s := pointers.NewBuilder(nil, pageSize, pageRows) for _, d := range pointerTestData { - if d.PointerKind == PointerKindStreamIndex { + if d.PointerKind == pointers.PointerKindStreamIndex { s.ObserveStream(d.Path, d.Section, d.StreamIDRef, d.StreamID, d.StartTs, d.UncompressedSize) s.ObserveStream(d.Path, d.Section, d.StreamIDRef, d.StreamID, d.EndTs, 0) } else { @@ -110,15 +112,15 @@ func buildPointersDecoder(t *testing.T, pageSize, pageRows int) *Section { require.NoError(t, err) t.Cleanup(func() { closer.Close() }) - sec, err := Open(t.Context(), obj.Sections()[0]) + sec, err := pointers.Open(t.Context(), obj.Sections()[0]) require.NoError(t, err) return sec } -func readAllPointers(ctx context.Context, r *RowReader) ([]SectionPointer, error) { +func readAllPointers(ctx context.Context, r *pointers.RowReader) ([]pointers.SectionPointer, error) { var ( - res []SectionPointer - buf = make([]SectionPointer, 128) + res []pointers.SectionPointer + buf = make([]pointers.SectionPointer, 128) ) for {