Skip to content

Commit 931338c

Browse files
committed
chore(metastore): stream pointers columnar reader
Use columnar reader to read the stream pointers
1 parent 36093ec commit 931338c

File tree

5 files changed

+646
-13
lines changed

5 files changed

+646
-13
lines changed

pkg/dataobj/metastore/object.go

Lines changed: 249 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
"sync"
1515
"time"
1616

17+
"github.com/apache/arrow-go/v18/arrow"
18+
"github.com/apache/arrow-go/v18/arrow/array"
19+
"github.com/apache/arrow-go/v18/arrow/scalar"
1720
"github.com/go-kit/log"
1821
"github.com/go-kit/log/level"
1922
"github.com/grafana/dskit/tenant"
@@ -206,11 +209,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
206209

207210
// Search the stream sections of the matching objects to find matching streams
208211
streamMatchers := streamPredicateFromMatchers(start, end, matchers...)
209-
pointerPredicate := pointers.TimeRangeRowPredicate{
210-
Start: start,
211-
End: end,
212-
}
213-
streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, pointerPredicate)
212+
streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, start, end)
214213
if err != nil {
215214
return nil, err
216215
}
@@ -504,12 +503,15 @@ func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objec
504503

505504
// getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors.
506505
// This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines.
507-
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, timeRangePredicate pointers.TimeRangeRowPredicate) ([]*DataobjSectionDescriptor, error) {
506+
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) {
508507
if streamPredicate == nil {
509508
// At least one stream matcher is required, currently.
510509
return nil, nil
511510
}
512511

512+
sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
513+
sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
514+
513515
timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration)
514516
defer timer.ObserveDuration()
515517

@@ -540,7 +542,8 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
540542

541543
objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor)
542544
sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration)
543-
err = forEachObjPointer(ctx, indexObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) {
545+
546+
err = forEachStreamSectionPointer(ctx, indexObject, sStart, sEnd, matchingStreamIDs, func(pointer pointers.SectionPointer) {
544547
key.ObjectPath = pointer.Path
545548
key.SectionIdx = pointer.Section
546549

@@ -551,6 +554,7 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObject
551554
}
552555
sectionDescriptor.Merge(pointer)
553556
})
557+
554558
if err != nil {
555559
return fmt.Errorf("reading section pointers from index: %w", err)
556560
}
@@ -764,6 +768,228 @@ func forEachStream(ctx context.Context, object *dataobj.Object, predicate stream
764768
return nil
765769
}
766770

771+
func forEachStreamSectionPointer(ctx context.Context, indexObj *dataobj.Object, sStart, sEnd *scalar.Timestamp, streamIDs []int64, f func(pointers.SectionPointer)) error {
772+
targetTenant, err := user.ExtractOrgID(ctx)
773+
if err != nil {
774+
return fmt.Errorf("extracting org ID: %w", err)
775+
}
776+
var reader pointers.Reader
777+
defer reader.Close()
778+
779+
var sStreamIDs []scalar.Scalar
780+
for _, streamID := range streamIDs {
781+
sStreamIDs = append(sStreamIDs, scalar.NewInt64Scalar(streamID))
782+
}
783+
784+
const batchSize = 128
785+
786+
buf := make([]sectionPointerBuilder, batchSize)
787+
788+
for _, section := range indexObj.Sections().Filter(pointers.CheckSection) {
789+
if section.Tenant != targetTenant {
790+
continue
791+
}
792+
793+
sec, err := pointers.Open(ctx, section)
794+
if err != nil {
795+
return fmt.Errorf("opening section: %w", err)
796+
}
797+
798+
pointerCols, err := findPointersColumnsByTypes(
799+
sec.Columns(),
800+
pointers.ColumnTypePath,
801+
pointers.ColumnTypeSection,
802+
pointers.ColumnTypeStreamID,
803+
pointers.ColumnTypeStreamIDRef,
804+
pointers.ColumnTypeMinTimestamp,
805+
pointers.ColumnTypeMaxTimestamp,
806+
pointers.ColumnTypeRowCount,
807+
pointers.ColumnTypeUncompressedSize,
808+
)
809+
if err != nil {
810+
return fmt.Errorf("finding pointers columns: %w", err)
811+
}
812+
813+
var (
814+
colStreamID *pointers.Column
815+
colMinTimestamp *pointers.Column
816+
colMaxTimestamp *pointers.Column
817+
)
818+
819+
for _, c := range pointerCols {
820+
if c.Type == pointers.ColumnTypeStreamID {
821+
colStreamID = c
822+
}
823+
if c.Type == pointers.ColumnTypeMinTimestamp {
824+
colMinTimestamp = c
825+
}
826+
if c.Type == pointers.ColumnTypeMaxTimestamp {
827+
colMaxTimestamp = c
828+
}
829+
if colStreamID != nil && colMinTimestamp != nil && colMaxTimestamp != nil {
830+
break
831+
}
832+
}
833+
834+
if colStreamID == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
835+
return fmt.Errorf(
836+
"one of mandatory columns is missing: (streamID=%t, minTimestamp=%t, maxTimestamp=%t)",
837+
colStreamID == nil, colMinTimestamp == nil, colMaxTimestamp == nil,
838+
)
839+
}
840+
841+
reader.Reset(pointers.ReaderOptions{
842+
Columns: pointerCols,
843+
Predicates: []pointers.Predicate{
844+
pointers.AndPredicate{
845+
Left: pointers.OrPredicate{
846+
Left: pointers.EqualPredicate{
847+
Column: colMaxTimestamp,
848+
Value: sStart,
849+
},
850+
Right: pointers.GreaterThanPredicate{
851+
Column: colMaxTimestamp,
852+
Value: sStart,
853+
},
854+
},
855+
Right: pointers.OrPredicate{
856+
Left: pointers.EqualPredicate{
857+
Column: colMinTimestamp,
858+
Value: sEnd,
859+
},
860+
Right: pointers.LessThanPredicate{
861+
Column: colMinTimestamp,
862+
Value: sEnd,
863+
},
864+
},
865+
},
866+
pointers.InPredicate{
867+
Column: colStreamID,
868+
Values: sStreamIDs,
869+
},
870+
},
871+
})
872+
873+
for {
874+
rec, err := reader.Read(ctx, batchSize)
875+
if err != nil && !errors.Is(err, io.EOF) {
876+
return fmt.Errorf("reading recordBatch: %w", err)
877+
}
878+
numRows := int(rec.NumRows())
879+
if numRows == 0 && errors.Is(err, io.EOF) {
880+
break
881+
}
882+
883+
for colIdx := range int(rec.NumCols()) {
884+
col := rec.Column(colIdx)
885+
pointerCol := pointerCols[colIdx]
886+
887+
switch pointerCol.Type {
888+
case pointers.ColumnTypePath:
889+
for rIdx := range numRows {
890+
if col.IsNull(rIdx) {
891+
continue
892+
}
893+
buf[rIdx].path = col.(*array.String).Value(rIdx)
894+
}
895+
case pointers.ColumnTypeSection:
896+
for rIdx := range numRows {
897+
if col.IsNull(rIdx) {
898+
continue
899+
}
900+
buf[rIdx].section = col.(*array.Int64).Value(rIdx)
901+
}
902+
case pointers.ColumnTypeStreamID:
903+
for rIdx := range numRows {
904+
if col.IsNull(rIdx) {
905+
continue
906+
}
907+
buf[rIdx].streamID = col.(*array.Int64).Value(rIdx)
908+
}
909+
case pointers.ColumnTypeStreamIDRef:
910+
for rIdx := range numRows {
911+
if col.IsNull(rIdx) {
912+
continue
913+
}
914+
buf[rIdx].streamIDRef = col.(*array.Int64).Value(rIdx)
915+
}
916+
case pointers.ColumnTypeMinTimestamp:
917+
for rIdx := range numRows {
918+
if col.IsNull(rIdx) {
919+
continue
920+
}
921+
buf[rIdx].start = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
922+
}
923+
case pointers.ColumnTypeMaxTimestamp:
924+
for rIdx := range numRows {
925+
if col.IsNull(rIdx) {
926+
continue
927+
}
928+
buf[rIdx].end = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
929+
}
930+
case pointers.ColumnTypeRowCount:
931+
for rIdx := range numRows {
932+
if col.IsNull(rIdx) {
933+
continue
934+
}
935+
buf[rIdx].lineCount = col.(*array.Int64).Value(rIdx)
936+
}
937+
case pointers.ColumnTypeUncompressedSize:
938+
for rIdx := range numRows {
939+
if col.IsNull(rIdx) {
940+
continue
941+
}
942+
buf[rIdx].uncompressedSize = col.(*array.Int64).Value(rIdx)
943+
}
944+
default:
945+
continue
946+
}
947+
}
948+
949+
for rowIdx := range numRows {
950+
b := buf[rowIdx]
951+
f(pointers.SectionPointer{
952+
Path: b.path,
953+
Section: b.section,
954+
PointerKind: pointers.PointerKindStreamIndex,
955+
StreamID: b.streamID,
956+
StreamIDRef: b.streamIDRef,
957+
StartTs: b.start,
958+
EndTs: b.end,
959+
LineCount: b.lineCount,
960+
UncompressedSize: b.uncompressedSize,
961+
})
962+
}
963+
}
964+
}
965+
966+
return nil
967+
}
968+
969+
// sectionPointerBuilder accumulates the per-column values of a single
// stream-section-pointer row while a record batch is decoded column by
// column; once complete it is turned into a pointers.SectionPointer.
type sectionPointerBuilder struct {
	path             string    // value of the path column
	section          int64     // value of the section column
	streamID         int64     // value of the stream-ID column
	streamIDRef      int64     // value of the stream-ID-ref column
	start            time.Time // min-timestamp column, as a wall-clock time
	end              time.Time // max-timestamp column, as a wall-clock time
	lineCount        int64     // value of the row-count column
	uncompressedSize int64     // value of the uncompressed-size column
}
979+
980+
func (b sectionPointerBuilder) build() sectionPointerBuilder {
981+
return sectionPointerBuilder{
982+
path: b.path,
983+
section: b.section,
984+
streamID: b.streamID,
985+
streamIDRef: b.streamIDRef,
986+
start: b.start,
987+
end: b.end,
988+
lineCount: b.lineCount,
989+
uncompressedSize: b.uncompressedSize,
990+
}
991+
}
992+
767993
func forEachObjPointer(ctx context.Context, object *dataobj.Object, predicate pointers.RowPredicate, matchIDs []int64, f func(pointers.SectionPointer)) error {
768994
targetTenant, err := user.ExtractOrgID(ctx)
769995
if err != nil {
@@ -827,3 +1053,19 @@ func dedupeAndSort(objects [][]string) []string {
8271053
sort.Strings(paths)
8281054
return paths
8291055
}
1056+
1057+
func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes ...pointers.ColumnType) ([]*pointers.Column, error) {
1058+
result := make([]*pointers.Column, 0, len(columnTypes))
1059+
1060+
for _, c := range allColumns {
1061+
for _, neededType := range columnTypes {
1062+
if neededType != c.Type {
1063+
continue
1064+
}
1065+
1066+
result = append(result, c)
1067+
}
1068+
}
1069+
1070+
return result, nil
1071+
}

0 commit comments

Comments
 (0)