256 changes: 249 additions & 7 deletions pkg/dataobj/metastore/object.go
@@ -14,6 +14,9 @@ import (
"sync"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
@@ -206,11 +209,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma

// Search the stream sections of the matching objects to find matching streams
streamMatchers := streamPredicateFromMatchers(start, end, matchers...)
-	pointerPredicate := pointers.TimeRangeRowPredicate{
-		Start: start,
-		End:   end,
-	}
-	streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, pointerPredicate)
+	streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, start, end)
if err != nil {
return nil, err
}
@@ -504,12 +503,15 @@ func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objec

// getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors.
// This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines.
-func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, timeRangePredicate pointers.TimeRangeRowPredicate) ([]*DataobjSectionDescriptor, error) {
+func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) {
if streamPredicate == nil {
		// Currently, at least one stream matcher is required.
return nil, nil
}

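	// Convert the query time range to Arrow nanosecond-timestamp scalars so it
	// can be pushed down to the pointers reader as column predicates.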
sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)

timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration)
defer timer.ObserveDuration()

@@ -540,7 +542,8 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject

objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor)
sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration)
-	err = forEachObjPointer(ctx, indexObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) {
+
+	err = forEachStreamSectionPointer(ctx, indexObject, sStart, sEnd, matchingStreamIDs, func(pointer pointers.SectionPointer) {
key.ObjectPath = pointer.Path
key.SectionIdx = pointer.Section

@@ -551,6 +554,7 @@
}
sectionDescriptor.Merge(pointer)
})

if err != nil {
return fmt.Errorf("reading section pointers from index: %w", err)
}
@@ -764,6 +768,228 @@ func forEachStream(ctx context.Context, object *dataobj.Object, predicate stream
return nil
}

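// forEachStreamSectionPointer scans the pointers sections of indexObj that
// belong to the tenant in ctx, keeps only rows matching the given stream IDs
// and the [sStart, sEnd] time range, and calls f for every matching pointer.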
func forEachStreamSectionPointer(ctx context.Context, indexObj *dataobj.Object, sStart, sEnd *scalar.Timestamp, streamIDs []int64, f func(pointers.SectionPointer)) error {
targetTenant, err := user.ExtractOrgID(ctx)
if err != nil {
return fmt.Errorf("extracting org ID: %w", err)
}
var reader pointers.Reader
defer reader.Close()

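	// Wrap the candidate stream IDs as Arrow scalars for the In predicate below.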
var sStreamIDs []scalar.Scalar
for _, streamID := range streamIDs {
sStreamIDs = append(sStreamIDs, scalar.NewInt64Scalar(streamID))
}

const batchSize = 128

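	// buf accumulates one batch of rows: each column of a record batch fills in
	// its field across all rows before the rows are emitted as SectionPointers.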
buf := make([]sectionPointerBuilder, batchSize)

for _, section := range indexObj.Sections().Filter(pointers.CheckSection) {
if section.Tenant != targetTenant {
continue
}

sec, err := pointers.Open(ctx, section)
if err != nil {
return fmt.Errorf("opening section: %w", err)
}

pointerCols, err := findPointersColumnsByTypes(
sec.Columns(),
pointers.ColumnTypePath,
pointers.ColumnTypeSection,
pointers.ColumnTypeStreamID,
pointers.ColumnTypeStreamIDRef,
pointers.ColumnTypeMinTimestamp,
pointers.ColumnTypeMaxTimestamp,
pointers.ColumnTypeRowCount,
pointers.ColumnTypeUncompressedSize,
)
if err != nil {
return fmt.Errorf("finding pointers columns: %w", err)
}

var (
colStreamID *pointers.Column
colMinTimestamp *pointers.Column
colMaxTimestamp *pointers.Column
)

for _, c := range pointerCols {
if c.Type == pointers.ColumnTypeStreamID {
colStreamID = c
}
if c.Type == pointers.ColumnTypeMinTimestamp {
colMinTimestamp = c
}
if c.Type == pointers.ColumnTypeMaxTimestamp {
colMaxTimestamp = c
}
if colStreamID != nil && colMinTimestamp != nil && colMaxTimestamp != nil {
break
}
}

if colStreamID == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
			return fmt.Errorf(
				"missing mandatory column(s): (streamIDMissing=%t, minTimestampMissing=%t, maxTimestampMissing=%t)",
				colStreamID == nil, colMinTimestamp == nil, colMaxTimestamp == nil,
			)
}

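		// Keep only pointers whose stream ID is in sStreamIDs and whose time range
		// overlaps [sStart, sEnd], i.e. maxTimestamp >= sStart AND minTimestamp <= sEnd;
		// the >= and <= comparisons are built from Or(Equal, GreaterThan/LessThan) pairs.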
reader.Reset(pointers.ReaderOptions{
Columns: pointerCols,
Predicates: []pointers.Predicate{
pointers.AndPredicate{
Left: pointers.OrPredicate{
Left: pointers.EqualPredicate{
Column: colMaxTimestamp,
Value: sStart,
},
Right: pointers.GreaterThanPredicate{
Column: colMaxTimestamp,
Value: sStart,
},
},
Right: pointers.OrPredicate{
Left: pointers.EqualPredicate{
Column: colMinTimestamp,
Value: sEnd,
},
Right: pointers.LessThanPredicate{
Column: colMinTimestamp,
Value: sEnd,
},
},
},
pointers.InPredicate{
Column: colStreamID,
Values: sStreamIDs,
},
},
})

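		// Read the matching rows in batches, transposing each columnar record
		// batch into row-oriented SectionPointers.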
		for {
			// Zero the row buffer so values from a previous batch cannot leak into
			// rows whose cells are null in this batch.
			clear(buf)

			rec, err := reader.Read(ctx, batchSize)
if err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("reading recordBatch: %w", err)
}
			if rec == nil {
				break
			}
			numRows := int(rec.NumRows())
			if numRows == 0 && errors.Is(err, io.EOF) {
				break
			}

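			// Scatter each column's values into the row buffer; null cells keep the
			// zero value.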
for colIdx := range int(rec.NumCols()) {
col := rec.Column(colIdx)
pointerCol := pointerCols[colIdx]

switch pointerCol.Type {
case pointers.ColumnTypePath:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].path = col.(*array.String).Value(rIdx)
}
case pointers.ColumnTypeSection:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].section = col.(*array.Int64).Value(rIdx)
}
case pointers.ColumnTypeStreamID:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].streamID = col.(*array.Int64).Value(rIdx)
}
case pointers.ColumnTypeStreamIDRef:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].streamIDRef = col.(*array.Int64).Value(rIdx)
}
case pointers.ColumnTypeMinTimestamp:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].start = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
}
case pointers.ColumnTypeMaxTimestamp:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].end = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
}
case pointers.ColumnTypeRowCount:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].lineCount = col.(*array.Int64).Value(rIdx)
}
case pointers.ColumnTypeUncompressedSize:
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].uncompressedSize = col.(*array.Int64).Value(rIdx)
}
default:
continue
}
}

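			// Every column has been applied; emit one SectionPointer per row.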
for rowIdx := range numRows {
b := buf[rowIdx]
f(pointers.SectionPointer{
Path: b.path,
Section: b.section,
PointerKind: pointers.PointerKindStreamIndex,
StreamID: b.streamID,
StreamIDRef: b.streamIDRef,
StartTs: b.start,
EndTs: b.end,
LineCount: b.lineCount,
UncompressedSize: b.uncompressedSize,
})
}
}
}

return nil
}

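// sectionPointerBuilder collects the per-column values of a single row until a
// complete pointers.SectionPointer can be built from them.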
type sectionPointerBuilder struct {
path string
section int64
streamID int64
streamIDRef int64
start time.Time
end time.Time
lineCount int64
uncompressedSize int64
}

func forEachObjPointer(ctx context.Context, object *dataobj.Object, predicate pointers.RowPredicate, matchIDs []int64, f func(pointers.SectionPointer)) error {
targetTenant, err := user.ExtractOrgID(ctx)
if err != nil {
@@ -827,3 +1053,19 @@ func dedupeAndSort(objects [][]string) []string {
sort.Strings(paths)
return paths
}

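// findPointersColumnsByTypes returns the columns from allColumns whose type is
// one of columnTypes. Results keep the order of allColumns, and requested types
// with no matching column are silently omitted, so callers must verify that
// every required column is present; the error result is currently always nil.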
func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes ...pointers.ColumnType) ([]*pointers.Column, error) {
result := make([]*pointers.Column, 0, len(columnTypes))

for _, c := range allColumns {
for _, neededType := range columnTypes {
if neededType != c.Type {
continue
}

result = append(result, c)
}
}

return result, nil
}