From 4d5ca6e68a541b0e824ff49d00d8b980aba1854d Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 11:10:25 +0200 Subject: [PATCH 01/25] Accept byte slice isntead of string --- pkg/logql/log/labels.go | 3 ++- pkg/logql/log/parser.go | 44 +++++++++++++++++++++-------------------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index f4cec1111b88c..7ae094f1937ff 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -336,7 +336,7 @@ func (b *LabelsBuilder) deleteWithCategory(category LabelCategory, n string) { // Set the name/value pair as a label. // The value `v` may not be set if a category with higher preference already contains `n`. // Category preference goes as Parsed > Structured Metadata > Stream. -func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder { +func (b *LabelsBuilder) Set(category LabelCategory, n, v []byte) *LabelsBuilder { // Parsed takes precedence over Structured Metadata and Stream labels. // If category is Parsed, we delete `n` from the structured metadata and stream labels. if category == ParsedLabel { @@ -583,6 +583,7 @@ func (b *LabelsBuilder) Map() (map[string]string, bool) { // LabelsResult returns the LabelsResult from the builder. // No grouping is applied and the cache is used when possible. +// TODO: benchmark for high cardinality labels func (b *LabelsBuilder) LabelsResult() LabelsResult { // unchanged path. if !b.hasDel() && !b.hasAdd() && !b.HasErr() { diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index f6ebc1de29f62..116d4bc172297 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -36,6 +36,7 @@ var ( _ Stage = &LogfmtParser{} trueBytes = []byte("true") + falseBytes = []byte("false") errUnexpectedJSONObject = fmt.Errorf("expecting json object(%d), but it is not", jsoniter.ObjectValue) errMissingCapture = errors.New("at least one named capture must be supplied") @@ -242,37 +243,38 @@ func (j *JSONParser) buildJSONPathFromPrefixBuffer() []string { func (j *JSONParser) RequiredLabelNames() []string { return []string{} } -func readValue(v []byte, dataType jsonparser.ValueType) string { +func readValue(v []byte, dataType jsonparser.ValueType) []byte { switch dataType { case jsonparser.String: return unescapeJSONString(v) case jsonparser.Null: - return "" + return nil case jsonparser.Number: - return string(v) + return v case jsonparser.Boolean: if bytes.Equal(v, trueBytes) { - return trueString + return trueBytes } - return falseString + return falseBytes default: - return "" + return nil } } -func unescapeJSONString(b []byte) string { +func unescapeJSONString(b []byte) []byte { var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings bU, err := jsonparser.Unescape(b, stackbuf[:]) if err != nil { - return "" + return nil } - res := string(bU) + res := unsafeGetString(bU) if strings.ContainsRune(res, utf8.RuneError) { - res = strings.Map(removeInvalidUtf, res) + // TODO: verify that this works + return []byte(strings.Map(removeInvalidUtf, res)) } - return res + return bU } type RegexpParser struct { @@ -338,7 +340,7 @@ func (r *RegexpParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte continue } - lbs.Set(ParsedLabel, key, string(value)) + lbs.Set(ParsedLabel, key, value) if !parserHints.ShouldContinueParsingLine(key, lbs) { return line, false } @@ -414,7 +416,7 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte continue } - lbs.Set(ParsedLabel, key, string(val)) + lbs.Set(ParsedLabel, key, val) if !parserHints.ShouldContinueParsingLine(key, lbs) { return line, false } @@ -476,7 +478,7 @@ func (l *PatternParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byt continue } - lbs.Set(ParsedLabel, name, string(m)) + lbs.Set(ParsedLabel, name, m) if !parserHints.ShouldContinueParsingLine(name, lbs) { return line, false } @@ -535,7 +537,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde for id, paths := range l.expressions { keys[id] = fmt.Sprintf("%v", paths...) if !lbs.BaseHas(id) { - lbs.Set(ParsedLabel, id, "") + lbs.Set(ParsedLabel, id, nil) } } @@ -597,7 +599,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde } } - lbs.Set(ParsedLabel, key, string(val)) + lbs.Set(ParsedLabel, key, val) if lbs.ParserLabelHints().AllRequiredExtracted() { break @@ -688,9 +690,9 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) switch typ { case jsonparser.Null: - lbs.Set(ParsedLabel, key, "") + lbs.Set(ParsedLabel, key, nil) case jsonparser.Object: - lbs.Set(ParsedLabel, key, string(data)) + lbs.Set(ParsedLabel, key, data) default: lbs.Set(ParsedLabel, key, unescapeJSONString(data)) } @@ -702,7 +704,7 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) if matches < len(j.ids) { for _, id := range j.ids { if _, ok := lbs.Get(id); !ok { - lbs.Set(ParsedLabel, id, "") + lbs.Set(ParsedLabel, id, nil) } } } @@ -772,7 +774,7 @@ func addErrLabel(msg string, err error, lbs *LabelsBuilder) { } if lbs.ParserLabelHints().PreserveError() { - lbs.Set(ParsedLabel, logqlmodel.PreserveErrorLabel, "true") + lbs.Set(ParsedLabel, logqlmodel.PreserveErrorLabel, trueBytes) } } @@ -808,7 +810,7 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) } // append to the buffer of labels - u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), unescapeJSONString(value)) + u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), unescapeJSONString(value) default: return nil } From dd044a41112f44bac4914f9f3b7293f32cc917e5 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 15:34:36 +0200 Subject: [PATCH 02/25] Make labels compile --- pkg/logql/log/fmt.go | 4 +- pkg/logql/log/keep_labels.go | 8 +- pkg/logql/log/labels.go | 291 ++++++++++++++++++++++++----------- pkg/logql/log/parser.go | 29 ++-- 4 files changed, 223 insertions(+), 109 deletions(-) diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go index 5a2946644034e..52919d3f1956a 100644 --- a/pkg/logql/log/fmt.go +++ b/pkg/logql/log/fmt.go @@ -414,7 +414,7 @@ func (lf *LabelsFormatter) Process(ts int64, l []byte, lbs *LabelsBuilder) ([]by if f.Rename { v, _, ok := lbs.GetWithCategory(f.Value) if ok { - lbs.Set(ParsedLabel, f.Name, v) + lbs.Set(ParsedLabel, unsafeGetBytes(f.Name), v) lbs.Del(f.Value) } continue @@ -428,7 +428,7 @@ func (lf *LabelsFormatter) Process(ts int64, l []byte, lbs *LabelsBuilder) ([]by lbs.SetErrorDetails(err.Error()) continue } - lbs.Set(ParsedLabel, f.Name, lf.buf.String()) + lbs.Set(ParsedLabel, unsafeGetBytes(f.Name), lf.buf.Bytes()) } return l, true } diff --git a/pkg/logql/log/keep_labels.go b/pkg/logql/log/keep_labels.go index 6f44efeae1550..da77a0c181092 100644 --- a/pkg/logql/log/keep_labels.go +++ b/pkg/logql/log/keep_labels.go @@ -2,6 +2,7 @@ package log import ( "github.com/grafana/loki/v3/pkg/logqlmodel" + "github.com/prometheus/prometheus/model/labels" ) type KeepLabels struct { @@ -17,10 +18,9 @@ func (kl *KeepLabels) Process(_ int64, line []byte, lbls *LabelsBuilder) ([]byte return line, true } - // TODO: Reuse buf? - for _, lb := range lbls.UnsortedLabels(nil) { + lbls.Range(func(lb labels.Label) { if isSpecialLabel(lb.Name) { - continue + return } var keep bool @@ -39,7 +39,7 @@ func (kl *KeepLabels) Process(_ int64, line []byte, lbls *LabelsBuilder) ([]byte if !keep { lbls.Del(lb.Name) } - } + }) return line, true } diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 7ae094f1937ff..5df4c1420b3d7 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -1,6 +1,7 @@ package log import ( + "bytes" "fmt" "sync" @@ -109,11 +110,112 @@ func categoriesContain(categories []LabelCategory, category LabelCategory) bool return false } +type stringColumn struct { + data []byte + offsets []int + + // indices is a selection vector. + indices []int +} + +func (s *stringColumn) add(value []byte) { + // The old values length is the offset of the new value + s.offsets = append(s.offsets, len(s.data)) + + s.data = append(s.data, value...) + + // Point to the last offset added + s.indices = append(s.indices, len(s.offsets)-1) +} + +// del remove the index from the selection vector. It does not remove the value. +// Use compact to also remove it from the data. +// TODO: implement compact +func (s *stringColumn) del(i int) { + s.indices = append(s.indices[:i], s.indices[i+1:]...) +} + +func (s *stringColumn) reset() { + s.data = s.data[:0] + s.offsets = s.offsets[:0] + s.indices = s.indices[:0] +} + +func (s *stringColumn) get(i int) []byte { + // TODO: test this. It's tricky + index := s.indices[i] + start := s.offsets[index] + end := s.offsets[index+1] + return s.data[start:end] +} + +type columnarLabels struct { + names stringColumn + values stringColumn +} + +func (c *columnarLabels) add(name, value []byte) { + c.names.add(name) + c.values.add(value) +} + +// override overrides the value of a label if it exists and returns true. +// If the label does not exist, it returns false and does nothing. +func (c *columnarLabels) override(name, value []byte) bool { + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), name) { + c.values.del(i) + c.names.del(i) + c.add(name, value) + return true + } + } + return false +} + +func (c *columnarLabels) reset() { + c.names.reset() + c.values.reset() +} + +func (s *columnarLabels) len() int { + return len(s.names.data) +} + +func (s *columnarLabels) get(key []byte) ([]byte, bool) { + for i := 0; i < len(s.names.indices); i++ { + if bytes.Equal(s.names.get(i), key) { + return s.values.get(i), true + } + } + return nil, false +} + +func (s *columnarLabels) getAt(i int) (name, value []byte) { + return s.names.get(i), s.values.get(i) +} + +func (s *columnarLabels) del(name []byte) { + for i := 0; i < len(s.names.indices); i++ { + if bytes.Equal(s.names.get(i), name) { + s.names.del(i) + s.values.del(i) + } + } +} + +func newColumnarLabels(capacity int) *columnarLabels { + return &columnarLabels{ + names: stringColumn{data: make([]byte, 0, capacity*16), offsets: make([]int, 0, capacity), indices: make([]int, 0, capacity)}, + values: stringColumn{data: make([]byte, 0, capacity*16), offsets: make([]int, 0, capacity), indices: make([]int, 0, capacity)}, + } +} + // BaseLabelsBuilder is a label builder used by pipeline and stages. // Only one base builder is used and it contains cache for each LabelsBuilders. type BaseLabelsBuilder struct { del []string - add [numValidCategories][]labels.Label + add [numValidCategories]*columnarLabels // nolint:structcheck // https://github.com/golangci/golangci-lint/issues/826 err string @@ -150,10 +252,10 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint const labelsCapacity = 16 return &BaseLabelsBuilder{ del: make([]string, 0, 5), - add: [numValidCategories][]labels.Label{ - StreamLabel: make([]labels.Label, 0, labelsCapacity), - StructuredMetadataLabel: make([]labels.Label, 0, labelsCapacity), - ParsedLabel: make([]labels.Label, 0, labelsCapacity), + add: [numValidCategories]*columnarLabels{ + StreamLabel: newColumnarLabels(labelsCapacity), + StructuredMetadataLabel: newColumnarLabels(labelsCapacity), + ParsedLabel: newColumnarLabels(labelsCapacity), }, resultCache: make(map[uint64]LabelsResult), hasher: newHasher(), @@ -195,7 +297,7 @@ func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBui func (b *BaseLabelsBuilder) Reset() { b.del = b.del[:0] for k := range b.add { - b.add[k] = b.add[k][:0] + b.add[k].reset() } b.err = "" b.errDetails = "" @@ -215,7 +317,7 @@ func (b *BaseLabelsBuilder) hasDel() bool { func (b *BaseLabelsBuilder) hasAdd() bool { for _, lbls := range b.add { - if len(lbls) > 0 { + if lbls.len() > 0 { return true } } @@ -225,7 +327,7 @@ func (b *BaseLabelsBuilder) hasAdd() bool { func (b *BaseLabelsBuilder) sizeAdd() int { var length int for _, lbls := range b.add { - length += len(lbls) + length += lbls.len() } return length } @@ -275,7 +377,7 @@ func (b *LabelsBuilder) BaseHas(key string) bool { } // GetWithCategory returns the value and the category of a labels key if it exists. -func (b *LabelsBuilder) GetWithCategory(key string) (string, LabelCategory, bool) { +func (b *LabelsBuilder) GetWithCategory(key string) ([]byte, LabelCategory, bool) { v, category, ok := b.getWithCategory(key) if category == StructuredMetadataLabel { b.referencedStructuredMetadata = true @@ -285,39 +387,37 @@ func (b *LabelsBuilder) GetWithCategory(key string) (string, LabelCategory, bool } // GetWithCategory returns the value and the category of a labels key if it exists. -func (b *LabelsBuilder) getWithCategory(key string) (string, LabelCategory, bool) { +func (b *LabelsBuilder) getWithCategory(key string) ([]byte, LabelCategory, bool) { for category, lbls := range b.add { - for _, l := range lbls { - if l.Name == key { - return l.Value, LabelCategory(category), true - } + if v, ok := lbls.get(unsafeGetBytes(key)); ok { + return v, LabelCategory(category), true } } for _, d := range b.del { if d == key { - return "", InvalidCategory, false + return nil, InvalidCategory, false } } value := b.base.Get(key) if value != "" { - return value, StreamLabel, true + return []byte(value), StreamLabel, true } - return "", InvalidCategory, false + return nil, InvalidCategory, false } func (b *LabelsBuilder) Get(key string) (string, bool) { v, _, ok := b.GetWithCategory(key) - return v, ok + return string(v), ok } // Del deletes the label of the given name. func (b *LabelsBuilder) Del(ns ...string) *LabelsBuilder { for _, n := range ns { for category := range b.add { - b.deleteWithCategory(LabelCategory(category), n) + b.deleteWithCategory(LabelCategory(category), unsafeGetBytes(n)) } b.del = append(b.del, n) } @@ -325,12 +425,8 @@ func (b *LabelsBuilder) Del(ns ...string) *LabelsBuilder { } // deleteWithCategory removes the label from the specified category -func (b *LabelsBuilder) deleteWithCategory(category LabelCategory, n string) { - for i, l := range b.add[category] { - if l.Name == n { - b.add[category] = append(b.add[category][:i], b.add[category][i+1:]...) - } - } +func (b *LabelsBuilder) deleteWithCategory(category LabelCategory, n []byte) { + b.add[category].del(n) } // Set the name/value pair as a label. @@ -349,7 +445,7 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v []byte) *LabelsBuilder // If `n` exists in the parsed labels, we won't overwrite it's value and we just return what we have. if category == StructuredMetadataLabel { b.deleteWithCategory(StreamLabel, n) - if labelsContain(b.add[ParsedLabel], n) { + if _, ok := b.add[ParsedLabel].get(n); ok { return b } } @@ -362,13 +458,11 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v []byte) *LabelsBuilder } } - for i, a := range b.add[category] { - if a.Name == n { - b.add[category][i].Value = v - return b - } + if ok := b.add[category].override(n, v); ok { + return b } - b.add[category] = append(b.add[category], labels.Label{Name: n, Value: v}) + + b.add[category].add(n, v) if category == ParsedLabel { // We record parsed labels as extracted so that future parse stages can @@ -377,7 +471,7 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v []byte) *LabelsBuilder // Note that because this is used for bypassing extracted fields, and // because parsed labels always take precedence over structured metadata // and stream labels, we must only call RecordExtracted for parsed labels. - b.parserKeyHints.RecordExtracted(n) + b.parserKeyHints.RecordExtracted(string(n)) } return b } @@ -401,7 +495,7 @@ func (b *LabelsBuilder) Add(category LabelCategory, lbs labels.Labels) *LabelsBu return } - b.Set(category, name, l.Value) + b.Set(category, unsafeGetBytes(name), unsafeGetBytes(l.Value)) }) return b } @@ -437,37 +531,31 @@ func (b *LabelsBuilder) appendErrors(buf []labels.Label) []labels.Label { } return buf } - -func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label { +func (b *LabelsBuilder) Range(f func(l labels.Label), categories ...LabelCategory) { if categories == nil { categories = allCategories } if !b.hasDel() && !b.hasAdd() && categoriesContain(categories, StreamLabel) { - if buf == nil { - buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label. - } else { - buf = buf[:0] - } - b.base.Range(func(l labels.Label) { - buf = append(buf, l) - }) + b.base.Range(f) if categoriesContain(categories, ParsedLabel) { - buf = b.appendErrors(buf) + if b.err != "" { + f(labels.Label{ + Name: logqlmodel.ErrorLabel, + Value: b.err, + }) + } + if b.errDetails != "" { + f(labels.Label{ + Name: logqlmodel.ErrorDetailsLabel, + Value: b.errDetails, + }) + } } - return buf - } - - // In the general case, labels are removed, modified or moved - // rather than added. - if buf == nil { - size := b.base.Len() + b.sizeAdd() + 1 - buf = make([]labels.Label, 0, size) - } else { - buf = buf[:0] + return } if categoriesContain(categories, StreamLabel) { @@ -480,41 +568,71 @@ func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCa } // Skip stream labels which value will be replaced by structured metadata - if labelsContain(b.add[StructuredMetadataLabel], l.Name) { + if labelsContain(b.add[StructuredMetadataLabel], unsafeGetBytes(l.Name)) { return } // Skip stream labels which value will be replaced by parsed labels - if labelsContain(b.add[ParsedLabel], l.Name) { + if labelsContain(b.add[ParsedLabel], unsafeGetBytes(l.Name)) { return } // Take value from stream label if present - if value, found := findLabelValue(b.add[StreamLabel], l.Name); found { - buf = append(buf, labels.Label{Name: l.Name, Value: value}) + if value, found := b.add[StreamLabel].get(unsafeGetBytes(l.Name)); found { + f(labels.Label{Name: l.Name, Value: unsafeGetString(value)}) // TODO: make clear that string must be copied } else { - buf = append(buf, l) + f(l) } }) } if categoriesContain(categories, StructuredMetadataLabel) { - for _, l := range b.add[StructuredMetadataLabel] { - if labelsContain(b.add[ParsedLabel], l.Name) { + for i := 0; i < b.add[StructuredMetadataLabel].len(); i++ { + name, value := b.add[StructuredMetadataLabel].getAt(i) + if labelsContain(b.add[ParsedLabel], name) { continue } - buf = append(buf, l) + f(labels.Label{Name: unsafeGetString(name), Value: unsafeGetString(value)}) } } if categoriesContain(categories, ParsedLabel) { - buf = append(buf, b.add[ParsedLabel]...) + for i := 0; i < b.add[ParsedLabel].len(); i++ { + name, value := b.add[ParsedLabel].getAt(i) + f(labels.Label{Name: unsafeGetString(name), Value: unsafeGetString(value)}) + } } + if (b.HasErr() || b.HasErrorDetails()) && categoriesContain(categories, ParsedLabel) { - buf = b.appendErrors(buf) + if b.err != "" { + f(labels.Label{ + Name: logqlmodel.ErrorLabel, + Value: b.err, + }) + } + if b.errDetails != "" { + f(labels.Label{ + Name: logqlmodel.ErrorDetailsLabel, + Value: b.errDetails, + }) + } } + return +} + +func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label { + if buf == nil { + buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label. + } else { + buf = buf[:0] + } + + b.Range(func(l labels.Label) { + buf = append(buf, l) + }, categories...) + return buf } @@ -600,7 +718,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Now segregate the sorted labels into their categories - var stream, meta, parsed []labels.Label + var stream, meta, parsed []labels.Label // TODO: use ScratchBuilder instead for _, l := range b.buf { // Skip error labels for stream and meta categories @@ -610,9 +728,9 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Check which category this label belongs to - if labelsContain(b.add[ParsedLabel], l.Name) { + if labelsContain(b.add[ParsedLabel], unsafeGetBytes(l.Name)) { parsed = append(parsed, l) - } else if labelsContain(b.add[StructuredMetadataLabel], l.Name) { + } else if labelsContain(b.add[StructuredMetadataLabel], unsafeGetBytes(l.Name)) { meta = append(meta, l) } else { stream = append(stream, l) @@ -625,13 +743,9 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { return result } -func labelsContain(labels []labels.Label, name string) bool { - for _, l := range labels { - if l.Name == name { - return true - } - } - return false +func labelsContain(labels *columnarLabels, name []byte) bool { + _, ok := labels.get(name) + return ok } func findLabelValue(labels []labels.Label, name string) (string, bool) { @@ -643,6 +757,7 @@ func findLabelValue(labels []labels.Label, name string) (string, bool) { return "", false } +// TODO: use scratch builder instead func (b *BaseLabelsBuilder) toUncategorizedResult(buf []labels.Label) LabelsResult { lbls := labels.New(buf...) hash := b.hasher.Hash(lbls) @@ -680,6 +795,7 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult { if b.without { return b.withoutResult() } + // TODO: use scratch builder instead return b.withResult() } @@ -697,14 +813,12 @@ Outer: } } for category, la := range b.add { - for _, l := range la { - if g == l.Name { - if LabelCategory(category) == StructuredMetadataLabel { - b.referencedStructuredMetadata = true - } - b.buf = append(b.buf, l) - continue Outer + if value, ok := la.get(unsafeGetBytes(g)); ok { + if LabelCategory(category) == StructuredMetadataLabel { + b.referencedStructuredMetadata = true } + b.buf = append(b.buf, labels.Label{Name: g, Value: string(value)}) + continue Outer } } @@ -734,10 +848,8 @@ func (b *LabelsBuilder) withoutResult() LabelsResult { } } for _, lbls := range b.add { - for _, la := range lbls { - if l.Name == la.Name { - return - } + if _, ok := lbls.get(unsafeGetBytes(l.Name)); ok { + return } } for _, lg := range b.groups { @@ -750,16 +862,17 @@ func (b *LabelsBuilder) withoutResult() LabelsResult { for category, lbls := range b.add { OuterAdd: - for _, la := range lbls { + for i := 0; i < lbls.len(); i++ { + name, value := lbls.getAt(i) for _, lg := range b.groups { - if la.Name == lg { + if unsafeGetString(name) == lg { if LabelCategory(category) == StructuredMetadataLabel { b.referencedStructuredMetadata = true } continue OuterAdd } } - b.buf = append(b.buf, la) + b.buf = append(b.buf, labels.Label{Name: string(name), Value: string(value)}) } } diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index 116d4bc172297..fa39cdc55fcbf 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -151,7 +151,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu if !ok || j.lbs.ParserLabelHints().Extracted(sanitizedKey) { return nil } - j.lbs.Set(ParsedLabel, sanitizedKey, readValue(value, dataType)) + j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), readValue(value, dataType)) if j.captureJSONPath { j.lbs.SetJSONPath(sanitizedKey, []string{string(key)}) } @@ -196,7 +196,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu return nil } - j.lbs.Set(ParsedLabel, keyString, readValue(value, dataType)) + j.lbs.Set(ParsedLabel, unsafeGetBytes(keyString), readValue(value, dataType)) if !j.parserHints.ShouldContinueParsingLine(keyString, j.lbs) { return errLabelDoesNotMatch @@ -340,7 +340,7 @@ func (r *RegexpParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte continue } - lbs.Set(ParsedLabel, key, value) + lbs.Set(ParsedLabel, unsafeGetBytes(key), value) if !parserHints.ShouldContinueParsingLine(key, lbs) { return line, false } @@ -416,7 +416,7 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte continue } - lbs.Set(ParsedLabel, key, val) + lbs.Set(ParsedLabel, unsafeGetBytes(key), val) if !parserHints.ShouldContinueParsingLine(key, lbs) { return line, false } @@ -478,7 +478,7 @@ func (l *PatternParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byt continue } - lbs.Set(ParsedLabel, name, m) + lbs.Set(ParsedLabel, unsafeGetBytes(name), m) if !parserHints.ShouldContinueParsingLine(name, lbs) { return line, false } @@ -537,7 +537,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde for id, paths := range l.expressions { keys[id] = fmt.Sprintf("%v", paths...) if !lbs.BaseHas(id) { - lbs.Set(ParsedLabel, id, nil) + lbs.Set(ParsedLabel, unsafeGetBytes(id), nil) } } @@ -599,7 +599,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde } } - lbs.Set(ParsedLabel, key, val) + lbs.Set(ParsedLabel, unsafeGetBytes(key), val) if lbs.ParserLabelHints().AllRequiredExtracted() { break @@ -690,11 +690,11 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) switch typ { case jsonparser.Null: - lbs.Set(ParsedLabel, key, nil) + lbs.Set(ParsedLabel, unsafeGetBytes(key), nil) case jsonparser.Object: - lbs.Set(ParsedLabel, key, data) + lbs.Set(ParsedLabel, unsafeGetBytes(key), data) default: - lbs.Set(ParsedLabel, key, unescapeJSONString(data)) + lbs.Set(ParsedLabel, unsafeGetBytes(key), unescapeJSONString(data)) } matches++ @@ -704,7 +704,7 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) if matches < len(j.ids) { for _, id := range j.ids { if _, ok := lbs.Get(id); !ok { - lbs.Set(ParsedLabel, id, nil) + lbs.Set(ParsedLabel, unsafeGetBytes(id), nil) } } } @@ -774,7 +774,7 @@ func addErrLabel(msg string, err error, lbs *LabelsBuilder) { } if lbs.ParserLabelHints().PreserveError() { - lbs.Set(ParsedLabel, logqlmodel.PreserveErrorLabel, trueBytes) + lbs.Set(ParsedLabel, unsafeGetBytes(logqlmodel.PreserveErrorLabel), trueBytes) } } @@ -810,7 +810,8 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) } // append to the buffer of labels - u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), unescapeJSONString(value) + // TODO: use byte slice + u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), string(unescapeJSONString(value))) default: return nil } @@ -825,7 +826,7 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) // flush the buffer if we found a packed entry. if isPacked { for i := 0; i < len(u.lbsBuffer); i = i + 2 { - lbs.Set(ParsedLabel, u.lbsBuffer[i], u.lbsBuffer[i+1]) + lbs.Set(ParsedLabel, unsafeGetBytes(u.lbsBuffer[i]), unsafeGetBytes(u.lbsBuffer[i+1])) if !lbs.ParserLabelHints().ShouldContinueParsingLine(u.lbsBuffer[i], lbs) { return entry, errLabelDoesNotMatch } From c1a1004e4267042c938100f2bdb756b1bbb2cc5f Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 15:37:00 +0200 Subject: [PATCH 03/25] todo for string search --- pkg/logql/log/labels.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 5df4c1420b3d7..43a0fa9a42f8d 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -183,6 +183,7 @@ func (s *columnarLabels) len() int { } func (s *columnarLabels) get(key []byte) ([]byte, bool) { + // TODO: to a string search on s.names.data for i := 0; i < len(s.names.indices); i++ { if bytes.Equal(s.names.get(i), key) { return s.values.get(i), true @@ -196,6 +197,7 @@ func (s *columnarLabels) getAt(i int) (name, value []byte) { } func (s *columnarLabels) del(name []byte) { + // TODO: to a string search on s.names.data for i := 0; i < len(s.names.indices); i++ { if bytes.Equal(s.names.get(i), name) { s.names.del(i) From 1249c7ddd0e42e43e693820ce4fdc53a11e58990 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 15:53:29 +0200 Subject: [PATCH 04/25] Make most tests compile --- pkg/logql/log/labels.go | 9 ++++ pkg/logql/log/labels_test.go | 76 +++++++++++++++++----------------- pkg/logql/log/parser_test.go | 4 +- pkg/logql/log/pipeline_test.go | 12 ++++-- 4 files changed, 57 insertions(+), 44 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 43a0fa9a42f8d..18e9013400e23 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -213,6 +213,15 @@ func newColumnarLabels(capacity int) *columnarLabels { } } +// TODO: accept string... like labels.FromStrings +func newColumnarLabelsFrom(labels []labels.Label) *columnarLabels { + c := newColumnarLabels(len(labels)) + for _, l := range labels { + c.add(unsafeGetBytes(l.Name), unsafeGetBytes(l.Value)) + } + return c +} + // BaseLabelsBuilder is a label builder used by pipeline and stages. // Only one base builder is used and it contains cache for each LabelsBuilders. type BaseLabelsBuilder struct { diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 11d134799009d..2f32e0204bc30 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -18,8 +18,8 @@ func TestLabelsBuilder_Get(t *testing.T) { lbs := labels.FromStrings("already", "in") b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) b.Reset() - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(ParsedLabel, "bar", "buzz") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(ParsedLabel, []byte("bar"), []byte("buzz")) _, category, ok := b.GetWithCategory("bar") require.Equal(t, ParsedLabel, category) @@ -170,10 +170,10 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) { withErr := labels.FromStrings(append(strs, logqlmodel.ErrorLabel, "err")...) assertLabelResult(t, withErr, b.LabelsResult()) - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "namespace", "tempo") - b.Set(ParsedLabel, "buzz", "fuzz") - b.Set(ParsedLabel, "ToReplace", "other") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) + b.Set(ParsedLabel, []byte("buzz"), []byte("fuzz")) + b.Set(ParsedLabel, []byte("ToReplace"), []byte("other")) b.Del("job") expectedStreamLbls := labels.FromStrings( @@ -201,10 +201,10 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) { assert.Equal(t, expectedParsedLbls, actual.Parsed()) b.Reset() - b.Set(StreamLabel, "namespace", "tempo") - b.Set(StreamLabel, "bazz", "tazz") - b.Set(StructuredMetadataLabel, "bazz", "sazz") - b.Set(ParsedLabel, "ToReplace", "other") + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) + b.Set(StreamLabel, []byte("bazz"), []byte("tazz")) + b.Set(StructuredMetadataLabel, []byte("bazz"), []byte("sazz")) + b.Set(ParsedLabel, []byte("ToReplace"), []byte("other")) expectedStreamLbls = labels.FromStrings( "namespace", "tempo", @@ -238,8 +238,8 @@ func TestLabelsBuilder_Set(t *testing.T) { b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) // test duplicating stream label with parsed label - b.Set(StructuredMetadataLabel, "stzz", "stvzz") - b.Set(ParsedLabel, "toreplace", "buzz") + b.Set(StructuredMetadataLabel, []byte("stzz"), []byte("stvzz")) + b.Set(ParsedLabel, []byte("toreplace"), []byte("buzz")) expectedStreamLbls := labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls := labels.FromStrings("stzz", "stvzz") expectedParsedLbls := labels.FromStrings("toreplace", "buzz") @@ -255,9 +255,9 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating structured metadata label with parsed label - b.Set(StructuredMetadataLabel, "stzz", "stvzz") - b.Set(StructuredMetadataLabel, "toreplace", "muzz") - b.Set(ParsedLabel, "toreplace", "buzz") + b.Set(StructuredMetadataLabel, []byte("stzz"), []byte("stvzz")) + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) + b.Set(ParsedLabel, []byte("toreplace"), []byte("buzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzz") expectedParsedLbls = labels.FromStrings("toreplace", "buzz") @@ -273,8 +273,8 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating stream label with structured meta data label - b.Set(StructuredMetadataLabel, "toreplace", "muzz") - b.Set(ParsedLabel, "stzz", "stvzz") + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) + b.Set(ParsedLabel, []byte("stzz"), []byte("stvzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz") expectedParsedLbls = labels.FromStrings("stzz", "stvzz") @@ -290,9 +290,9 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating parsed label with structured meta data label - b.Set(ParsedLabel, "toreplace", "puzz") - b.Set(StructuredMetadataLabel, "stzz", "stvzzz") - b.Set(StructuredMetadataLabel, "toreplace", "muzz") + b.Set(ParsedLabel, []byte("toreplace"), []byte("puzz")) + b.Set(StructuredMetadataLabel, []byte("stzz"), []byte("stvzzz")) + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzzz") expectedParsedLbls = labels.FromStrings("toreplace", "puzz") @@ -308,8 +308,8 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating structured meta data label with stream label - b.Set(ParsedLabel, "stzz", "stvzzz") - b.Set(StructuredMetadataLabel, "toreplace", "muzz") + b.Set(ParsedLabel, []byte("stzz"), []byte("stvzzz")) + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz") expectedParsedLbls = labels.FromStrings("stzz", "stvzzz") @@ -331,15 +331,15 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { } lbs := labels.FromStrings(strs...) b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) - b.add[StructuredMetadataLabel] = []labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "fzz", Value: "bzz"}} - b.add[ParsedLabel] = []labels.Label{{Name: "pzz", Value: "pvzz"}} + b.add[StructuredMetadataLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "fzz", Value: "bzz"}}) + b.add[ParsedLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "pzz", Value: "pvzz"}}) expected := []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} actual := b.UnsortedLabels(nil) require.ElementsMatch(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = []labels.Label{{Name: "fzz", Value: "bzz"}} - b.add[ParsedLabel] = []labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} + b.add[StructuredMetadataLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "fzz", Value: "bzz"}}) + b.add[ParsedLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}}) expected = []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} actual = b.UnsortedLabels(nil) sortLabelSlice(expected) @@ -347,8 +347,8 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { assert.Equal(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = []labels.Label{{Name: "fzz", Value: "bzz"}, {Name: "toreplacezz", Value: "test"}} - b.add[ParsedLabel] = []labels.Label{{Name: "toreplacezz", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} + b.add[StructuredMetadataLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "fzz", Value: "bzz"}, {Name: "toreplacezz", Value: "test"}}) + b.add[ParsedLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "toreplacezz", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}}) expected = []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "fuzz"}, {Name: "pzz", Value: "pvzz"}, {Name: "toreplacezz", Value: "buzz"}} actual = b.UnsortedLabels(nil) sortLabelSlice(expected) @@ -375,9 +375,9 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { assertLabelResult(t, withErr, b.GroupedLabels()) b.Reset() - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "namespace", "tempo") - b.Set(ParsedLabel, "buzz", "fuzz") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) + b.Set(ParsedLabel, []byte("buzz"), []byte("fuzz")) b.Del("job") expected := labels.FromStrings("namespace", "tempo") assertLabelResult(t, expected, b.GroupedLabels()) @@ -390,19 +390,19 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { b.Del("job") assertLabelResult(t, labels.EmptyLabels(), b.GroupedLabels()) b.Reset() - b.Set(StreamLabel, "namespace", "tempo") + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) assertLabelResult(t, labels.FromStrings("job", "us-central1/loki"), b.GroupedLabels()) require.False(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping([]string{"foo"}, nil, false, false).ForLabels(lbs, labels.StableHash(lbs)) - b.Set(StructuredMetadataLabel, "foo", "bar") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) assertLabelResult(t, labels.FromStrings("foo", "bar"), b.GroupedLabels()) require.True(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, true, false).ForLabels(lbs, labels.StableHash(lbs)) b.Del("job") - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "job", "something") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("job"), []byte("something")) expected = labels.FromStrings("namespace", "loki", "cluster", "us-central1", "foo", "bar", @@ -411,7 +411,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { require.False(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping([]string{"foo"}, nil, true, false).ForLabels(lbs, labels.StableHash(lbs)) - b.Set(StructuredMetadataLabel, "foo", "bar") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) expected = labels.FromStrings("namespace", "loki", "job", "us-central1/loki", "cluster", "us-central1", @@ -420,8 +420,8 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { require.True(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping(nil, nil, false, false).ForLabels(lbs, labels.StableHash(lbs)) - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "job", "something") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("job"), []byte("something")) expected = labels.FromStrings("namespace", "loki", "job", "something", "cluster", "us-central1", diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index a39e0777d4abc..50f19ddf0735b 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -1375,7 +1375,7 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { var ( metadataStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "bar"), 0). - Set(StructuredMetadataLabel, "app", "loki") + Set(StructuredMetadataLabel, []byte("app"), []byte("loki")) basicStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "baz"), 0) @@ -1404,7 +1404,7 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { var ( metadataStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "bar"), 0). - Set(StructuredMetadataLabel, "app", "loki") + Set(StructuredMetadataLabel, []byte("app"), []byte("loki")) basicStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "baz"), 0) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 2205ffffd66e8..fd005c7d2c475 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -91,8 +91,10 @@ func TestPipeline(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} - p.baseBuilder.add = [numValidCategories][]labels.Label{ - ParsedLabel: {{Name: "baz", Value: "blip"}}, + p.baseBuilder.add = [numValidCategories]*columnarLabels{ + ParsedLabel: newColumnarLabelsFrom([]labels.Label{ + {Name: "baz", Value: "blip"}, + }), } p.Reset() @@ -167,8 +169,10 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} - p.baseBuilder.add = [numValidCategories][]labels.Label{ - ParsedLabel: {{Name: "baz", Value: "blip"}}, + p.baseBuilder.add = [numValidCategories]*columnarLabels{ + ParsedLabel: newColumnarLabelsFrom([]labels.Label{ + {Name: "baz", Value: "blip"}, + }), } p.Reset() From cd07a0e8539eb5ad7a646d770c523b723e41011f Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 15:55:47 +0200 Subject: [PATCH 05/25] Simplify columnar creation --- pkg/logql/log/labels.go | 38 ++++++++++++++++++---------------- pkg/logql/log/labels_test.go | 12 +++++------ pkg/logql/log/pipeline_test.go | 8 ++----- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 18e9013400e23..7a5713fae0029 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -111,7 +111,7 @@ func categoriesContain(categories []LabelCategory, category LabelCategory) bool } type stringColumn struct { - data []byte + data []byte offsets []int // indices is a selection vector. @@ -213,11 +213,13 @@ func newColumnarLabels(capacity int) *columnarLabels { } } -// TODO: accept string... like labels.FromStrings -func newColumnarLabelsFrom(labels []labels.Label) *columnarLabels { - c := newColumnarLabels(len(labels)) - for _, l := range labels { - c.add(unsafeGetBytes(l.Name), unsafeGetBytes(l.Value)) +func newColumnarLabelsFromStrings(ss ...string) *columnarLabels { + if len(ss)%2 != 0 { + panic("invalid number of strings") + } + c := newColumnarLabels(len(ss) / 2) + for i := 0; i < len(ss); i += 2 { + c.add(unsafeGetBytes(ss[i]), unsafeGetBytes(ss[i+1])) } return c } @@ -616,18 +618,18 @@ func (b *LabelsBuilder) Range(f func(l labels.Label), categories ...LabelCategor } if (b.HasErr() || b.HasErrorDetails()) && categoriesContain(categories, ParsedLabel) { - if b.err != "" { - f(labels.Label{ - Name: logqlmodel.ErrorLabel, - Value: b.err, - }) - } - if b.errDetails != "" { - f(labels.Label{ - Name: logqlmodel.ErrorDetailsLabel, - Value: b.errDetails, - }) - } + if b.err != "" { + f(labels.Label{ + Name: logqlmodel.ErrorLabel, + Value: b.err, + }) + } + if b.errDetails != "" { + f(labels.Label{ + Name: logqlmodel.ErrorDetailsLabel, + Value: b.errDetails, + }) + } } return diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 2f32e0204bc30..b47a2292b52bd 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -331,15 +331,15 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { } lbs := labels.FromStrings(strs...) b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) - b.add[StructuredMetadataLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "fzz", Value: "bzz"}}) - b.add[ParsedLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "pzz", Value: "pvzz"}}) + b.add[StructuredMetadataLabel] = newColumnarLabelsFromStrings("toreplace", "buzz", "fzz", "bzz") + b.add[ParsedLabel] = newColumnarLabelsFromStrings("pzz", "pvzz") expected := []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} actual := b.UnsortedLabels(nil) require.ElementsMatch(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "fzz", Value: "bzz"}}) - b.add[ParsedLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}}) + b.add[StructuredMetadataLabel] = newColumnarLabelsFromStrings("fzz", "bzz") + b.add[ParsedLabel] = newColumnarLabelsFromStrings("toreplace", "buzz", "pzz", "pvzz") expected = []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} actual = b.UnsortedLabels(nil) sortLabelSlice(expected) @@ -347,8 +347,8 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { assert.Equal(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "fzz", Value: "bzz"}, {Name: "toreplacezz", Value: "test"}}) - b.add[ParsedLabel] = newColumnarLabelsFrom([]labels.Label{{Name: "toreplacezz", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}}) + b.add[StructuredMetadataLabel] = newColumnarLabelsFromStrings("fzz", "bzz", "toreplacezz", "test") + b.add[ParsedLabel] = newColumnarLabelsFromStrings("toreplacezz", "buzz", "pzz", "pvzz") expected = []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "fuzz"}, {Name: "pzz", Value: "pvzz"}, {Name: "toreplacezz", Value: "buzz"}} actual = b.UnsortedLabels(nil) sortLabelSlice(expected) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index fd005c7d2c475..ea887f7a301cd 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -92,9 +92,7 @@ func TestPipeline(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} p.baseBuilder.add = [numValidCategories]*columnarLabels{ - ParsedLabel: newColumnarLabelsFrom([]labels.Label{ - {Name: "baz", Value: "blip"}, - }), + ParsedLabel: newColumnarLabelsFromStrings("baz", "blip"), } p.Reset() @@ -170,9 +168,7 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} p.baseBuilder.add = [numValidCategories]*columnarLabels{ - ParsedLabel: newColumnarLabelsFrom([]labels.Label{ - {Name: "baz", Value: "blip"}, - }), + ParsedLabel: newColumnarLabelsFromStrings("baz", "blip"), } p.Reset() From aa52859cec4e8dc3de38e2903af7d0c96cb14e5a Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 16:03:04 +0200 Subject: [PATCH 06/25] Test columnar strings --- pkg/logql/log/labels.go | 115 ---------------------- pkg/logql/log/labels_columnar.go | 133 ++++++++++++++++++++++++++ pkg/logql/log/labels_columnar_test.go | 46 +++++++++ 3 files changed, 179 insertions(+), 115 deletions(-) create mode 100644 pkg/logql/log/labels_columnar.go create mode 100644 pkg/logql/log/labels_columnar_test.go diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 7a5713fae0029..dc5fa8deee65f 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -1,7 +1,6 @@ package log import ( - "bytes" "fmt" "sync" @@ -110,120 +109,6 @@ func categoriesContain(categories []LabelCategory, category LabelCategory) bool return false } -type stringColumn struct { - data []byte - offsets []int - - // indices is a selection vector. - indices []int -} - -func (s *stringColumn) add(value []byte) { - // The old values length is the offset of the new value - s.offsets = append(s.offsets, len(s.data)) - - s.data = append(s.data, value...) - - // Point to the last offset added - s.indices = append(s.indices, len(s.offsets)-1) -} - -// del remove the index from the selection vector. It does not remove the value. -// Use compact to also remove it from the data. -// TODO: implement compact -func (s *stringColumn) del(i int) { - s.indices = append(s.indices[:i], s.indices[i+1:]...) -} - -func (s *stringColumn) reset() { - s.data = s.data[:0] - s.offsets = s.offsets[:0] - s.indices = s.indices[:0] -} - -func (s *stringColumn) get(i int) []byte { - // TODO: test this. It's tricky - index := s.indices[i] - start := s.offsets[index] - end := s.offsets[index+1] - return s.data[start:end] -} - -type columnarLabels struct { - names stringColumn - values stringColumn -} - -func (c *columnarLabels) add(name, value []byte) { - c.names.add(name) - c.values.add(value) -} - -// override overrides the value of a label if it exists and returns true. -// If the label does not exist, it returns false and does nothing. -func (c *columnarLabels) override(name, value []byte) bool { - for i := 0; i < len(c.names.indices); i++ { - if bytes.Equal(c.names.get(i), name) { - c.values.del(i) - c.names.del(i) - c.add(name, value) - return true - } - } - return false -} - -func (c *columnarLabels) reset() { - c.names.reset() - c.values.reset() -} - -func (s *columnarLabels) len() int { - return len(s.names.data) -} - -func (s *columnarLabels) get(key []byte) ([]byte, bool) { - // TODO: to a string search on s.names.data - for i := 0; i < len(s.names.indices); i++ { - if bytes.Equal(s.names.get(i), key) { - return s.values.get(i), true - } - } - return nil, false -} - -func (s *columnarLabels) getAt(i int) (name, value []byte) { - return s.names.get(i), s.values.get(i) -} - -func (s *columnarLabels) del(name []byte) { - // TODO: to a string search on s.names.data - for i := 0; i < len(s.names.indices); i++ { - if bytes.Equal(s.names.get(i), name) { - s.names.del(i) - s.values.del(i) - } - } -} - -func newColumnarLabels(capacity int) *columnarLabels { - return &columnarLabels{ - names: stringColumn{data: make([]byte, 0, capacity*16), offsets: make([]int, 0, capacity), indices: make([]int, 0, capacity)}, - values: stringColumn{data: make([]byte, 0, capacity*16), offsets: make([]int, 0, capacity), indices: make([]int, 0, capacity)}, - } -} - -func newColumnarLabelsFromStrings(ss ...string) *columnarLabels { - if len(ss)%2 != 0 { - panic("invalid number of strings") - } - c := newColumnarLabels(len(ss) / 2) - for i := 0; i < len(ss); i += 2 { - c.add(unsafeGetBytes(ss[i]), unsafeGetBytes(ss[i+1])) - } - return c -} - // BaseLabelsBuilder is a label builder used by pipeline and stages. // Only one base builder is used and it contains cache for each LabelsBuilders. type BaseLabelsBuilder struct { diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go new file mode 100644 index 0000000000000..7bd17975b2796 --- /dev/null +++ b/pkg/logql/log/labels_columnar.go @@ -0,0 +1,133 @@ +package log + +import "bytes" + +type stringColumn struct { + data []byte + offsets []int + + // indices is a selection vector. + indices []int +} + +func newStringColumn(capacity int) *stringColumn { + return &stringColumn{ + data: make([]byte, 0, capacity*16), + offsets: make([]int, 0, capacity), + indices: make([]int, 0, capacity), + } +} + +func (s *stringColumn) add(value []byte) { + // The old values length is the offset of the new value + s.offsets = append(s.offsets, len(s.data)) + + s.data = append(s.data, value...) + + // Point to the last offset added + s.indices = append(s.indices, len(s.offsets)-1) +} + +// del remove the index from the selection vector. It does not remove the value. +// Use compact to also remove it from the data. +// TODO: implement compact +func (s *stringColumn) del(i int) { + s.indices = append(s.indices[:i], s.indices[i+1:]...) +} + +func (s *stringColumn) reset() { + s.data = s.data[:0] + s.offsets = s.offsets[:0] + s.indices = s.indices[:0] +} + +func (s *stringColumn) get(i int) []byte { + // TODO: test this. It's tricky + index := s.indices[i] + start := s.offsets[index] + if index+1 >= len(s.offsets) { + return s.data[start:] + } + + end := s.offsets[index+1] + return s.data[start:end] +} + +func (s *stringColumn) len() int { + return len(s.indices) +} + +type columnarLabels struct { + names *stringColumn + values *stringColumn +} + +func (c *columnarLabels) add(name, value []byte) { + c.names.add(name) + c.values.add(value) +} + +// override overrides the value of a label if it exists and returns true. +// If the label does not exist, it returns false and does nothing. +func (c *columnarLabels) override(name, value []byte) bool { + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), name) { + c.values.del(i) + c.names.del(i) + c.add(name, value) + return true + } + } + return false +} + +func (c *columnarLabels) reset() { + c.names.reset() + c.values.reset() +} + +func (s *columnarLabels) len() int { + return len(s.names.data) +} + +func (s *columnarLabels) get(key []byte) ([]byte, bool) { + // TODO: to a string search on s.names.data + for i := 0; i < len(s.names.indices); i++ { + if bytes.Equal(s.names.get(i), key) { + return s.values.get(i), true + } + } + return nil, false +} + +func (s *columnarLabels) getAt(i int) (name, value []byte) { + return s.names.get(i), s.values.get(i) +} + +func (s *columnarLabels) del(name []byte) { + // TODO: to a string search on s.names.data + for i := 0; i < len(s.names.indices); i++ { + if bytes.Equal(s.names.get(i), name) { + s.names.del(i) + s.values.del(i) + } + } +} + +func newColumnarLabels(capacity int) *columnarLabels { + return &columnarLabels{ + names: newStringColumn(capacity), + values: newStringColumn(capacity), + } +} + +func newColumnarLabelsFromStrings(ss ...string) *columnarLabels { + if len(ss)%2 != 0 { + panic("invalid number of strings") + } + c := newColumnarLabels(len(ss) / 2) + for i := 0; i < len(ss); i += 2 { + c.add(unsafeGetBytes(ss[i]), unsafeGetBytes(ss[i+1])) + } + return c +} diff --git a/pkg/logql/log/labels_columnar_test.go b/pkg/logql/log/labels_columnar_test.go new file mode 100644 index 0000000000000..e9886d20a0bf7 --- /dev/null +++ b/pkg/logql/log/labels_columnar_test.go @@ -0,0 +1,46 @@ +package log + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStringColumn_Add(t *testing.T) { + sc := newStringColumn(10) + sc.add([]byte("foo")) + sc.add([]byte("bar")) + sc.add([]byte("baz")) + sc.add([]byte("qux")) + + require.Equal(t, 4, sc.len()) + + require.Equal(t, []byte("foo"), sc.get(0)) + require.Equal(t, []byte("bar"), sc.get(1)) + require.Equal(t, []byte("baz"), sc.get(2)) + require.Equal(t, []byte("qux"), sc.get(3)) +} + +func TestStringColumn_Del(t *testing.T) { + sc := newStringColumn(10) + sc.add([]byte("foo")) + sc.add([]byte("bar")) + sc.add([]byte("baz")) + sc.add([]byte("qux")) + + all := "" + for i := 0; i < sc.len(); i++ { + all += string(sc.get(i)) + } + require.Equal(t, "foobarbazqux", all) + + require.Equal(t, 4, sc.len()) + sc.del(2) + require.Equal(t, 3, sc.len()) + + all = "" + for i := 0; i < sc.len(); i++ { + all += string(sc.get(i)) + } + require.Equal(t, "foobarqux", all) +} From 392b9e679ef58ca877cd21c936e4b7662fcb6508 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 16:08:26 +0200 Subject: [PATCH 07/25] Test columnar labels --- pkg/logql/log/labels_columnar.go | 2 +- pkg/logql/log/labels_columnar_test.go | 73 +++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go index 7bd17975b2796..e9dfb2761f3a5 100644 --- a/pkg/logql/log/labels_columnar.go +++ b/pkg/logql/log/labels_columnar.go @@ -87,7 +87,7 @@ func (c *columnarLabels) reset() { } func (s *columnarLabels) len() int { - return len(s.names.data) + return s.names.len() } func (s *columnarLabels) get(key []byte) ([]byte, bool) { diff --git a/pkg/logql/log/labels_columnar_test.go b/pkg/logql/log/labels_columnar_test.go index e9886d20a0bf7..c3f1004a366c9 100644 --- a/pkg/logql/log/labels_columnar_test.go +++ b/pkg/logql/log/labels_columnar_test.go @@ -44,3 +44,76 @@ func TestStringColumn_Del(t *testing.T) { } require.Equal(t, "foobarqux", all) } + +func TestColumnarLabels_Get(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + v, ok := cl.get([]byte("foo")) + require.True(t, ok) + require.Equal(t, []byte("bar"), v) + + v, ok = cl.get([]byte("baz")) + require.True(t, ok) + require.Equal(t, []byte("qux"), v) +} + +func TestColumnarLabels_GetAt(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + n, v := cl.getAt(0) + require.Equal(t, []byte("foo"), n) + require.Equal(t, []byte("bar"), v) + + n, v = cl.getAt(1) + require.Equal(t, []byte("baz"), n) + require.Equal(t, []byte("qux"), v) +} + +func TestColumnarLabels_Del(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + cl.del([]byte("foo")) + require.Equal(t, 1, cl.len()) + + _, ok := cl.get([]byte("foo")) + require.False(t, ok) + + n, v := cl.getAt(0) + require.Equal(t, []byte("baz"), n) + require.Equal(t, []byte("qux"), v) +} + +func TestColumnarLabels_Override(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + ok := cl.override([]byte("foo"), []byte("new")) + require.True(t, ok) + require.Equal(t, 2, cl.len()) + + v, ok := cl.get([]byte("foo")) + require.True(t, ok) + require.Equal(t, []byte("new"), v) + + v, ok = cl.get([]byte("baz")) + require.True(t, ok) + require.Equal(t, []byte("qux"), v) + + ok = cl.override([]byte("nonexistent"), []byte("new")) + require.False(t, ok) + require.Equal(t, 2, cl.len()) + + _, ok = cl.get([]byte("nonexistent")) + require.False(t, ok) +} From cf2c4929aa693da91bbf57bc340f70845e5902fa Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 16:15:44 +0200 Subject: [PATCH 08/25] Pass labels test --- pkg/logql/log/labels_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index b47a2292b52bd..ff67630181a6b 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -36,11 +36,11 @@ func TestLabelsBuilder_Get(t *testing.T) { require.False(t, ok) v, category, ok := b.GetWithCategory("bar") require.True(t, ok) - require.Equal(t, "buzz", v) + require.Equal(t, "buzz", string(v)) require.Equal(t, ParsedLabel, category) v, category, ok = b.GetWithCategory("already") require.True(t, ok) - require.Equal(t, "in", v) + require.Equal(t, "in", string(v)) require.Equal(t, StreamLabel, category) b.Del("bar") _, _, ok = b.GetWithCategory("bar") From 2d24502b11a1080e50479c1221d8deb95821cf09 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 16:22:20 +0200 Subject: [PATCH 09/25] Pass parser tests --- pkg/logql/log/parser_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 50f19ddf0735b..b22cdd8174cd1 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -306,7 +306,7 @@ func TestLabelShortCircuit(t *testing.T) { name, category, ok := lbs.GetWithCategory("name") require.True(t, ok) require.Equal(t, ParsedLabel, category) - require.Contains(t, name, "text1") + require.Contains(t, string(name), "text1") }) } } @@ -1390,12 +1390,12 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { require.True(t, ok) res, cat, ok := metadataStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) res, cat, ok = basicStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) }) @@ -1419,12 +1419,12 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { require.True(t, ok) res, cat, ok := metadataStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) res, cat, ok = basicStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) }) From 9d54b289ddaeade3249373922e56a0292f0b4792 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 16:26:38 +0200 Subject: [PATCH 10/25] Pass all but one pipeline test --- pkg/logql/log/labels.go | 6 +++++- pkg/logql/log/labels_columnar.go | 6 ++++++ pkg/logql/log/pipeline_test.go | 6 +++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index dc5fa8deee65f..24a33277931d3 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -195,7 +195,11 @@ func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBui func (b *BaseLabelsBuilder) Reset() { b.del = b.del[:0] for k := range b.add { - b.add[k].reset() + if b.add[k] != nil { + b.add[k].reset() + } else { + b.add[k] = newColumnarLabels(0) + } } b.err = "" b.errDetails = "" diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go index e9dfb2761f3a5..2fa12088fda9c 100644 --- a/pkg/logql/log/labels_columnar.go +++ b/pkg/logql/log/labels_columnar.go @@ -82,6 +82,12 @@ func (c *columnarLabels) override(name, value []byte) bool { } func (c *columnarLabels) reset() { + if c.names == nil { + c.names = newStringColumn(0) + } + if c.values == nil { + c.values = newStringColumn(0) + } c.names.reset() c.values.reset() } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index ea887f7a301cd..e2b7aa427b08a 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -99,7 +99,7 @@ func TestPipeline(t *testing.T) { require.Len(t, p.streamPipelines, 0) require.Len(t, p.baseBuilder.del, 0) for _, v := range p.baseBuilder.add { - require.Len(t, v, 0) + require.Equal(t, 0, v.len()) } } @@ -175,7 +175,7 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { require.Len(t, p.streamPipelines, 0) require.Len(t, p.baseBuilder.del, 0) for _, v := range p.baseBuilder.add { - require.Len(t, v, 0) + require.Equal(t, 0, v.len()) } } @@ -511,7 +511,7 @@ func TestKeepLabelsPipeline(t *testing.T) { for i, line := range tt.lines { finalLine, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels()) require.Equal(t, tt.wantLine[i], finalLine) - require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) + require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) // TODO: fix this require.Equal(t, labels.EmptyLabels(), finalLbs.Stream()) require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata()) require.Equal(t, tt.wantLabels[i], finalLbs.Parsed()) From e596dd581d28c889b8c1218f4a4aa6619a2885dc Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 16:57:00 +0200 Subject: [PATCH 11/25] Fix keep labels --- pkg/logql/log/keep_labels.go | 6 +++++- pkg/logql/log/pipeline_test.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/logql/log/keep_labels.go b/pkg/logql/log/keep_labels.go index da77a0c181092..a078fe15a2fa3 100644 --- a/pkg/logql/log/keep_labels.go +++ b/pkg/logql/log/keep_labels.go @@ -18,6 +18,7 @@ func (kl *KeepLabels) Process(_ int64, line []byte, lbls *LabelsBuilder) ([]byte return line, true } + del := make([]string, 0, 10) lbls.Range(func(lb labels.Label) { if isSpecialLabel(lb.Name) { return @@ -37,9 +38,12 @@ func (kl *KeepLabels) Process(_ int64, line []byte, lbls *LabelsBuilder) ([]byte } if !keep { - lbls.Del(lb.Name) + del = append(del, lb.Name) } }) + for _, name := range del { + lbls.Del(name) + } return line, true } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index e2b7aa427b08a..1da9347368c16 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -511,7 +511,7 @@ func TestKeepLabelsPipeline(t *testing.T) { for i, line := range tt.lines { finalLine, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels()) require.Equal(t, tt.wantLine[i], finalLine) - require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) // TODO: fix this + require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) require.Equal(t, labels.EmptyLabels(), finalLbs.Stream()) require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata()) require.Equal(t, tt.wantLabels[i], finalLbs.Parsed()) From b80a02802f7276a1ae4f2a1173ebad823f2157b0 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 22 Jul 2025 17:01:42 +0200 Subject: [PATCH 12/25] Reduce UnsortedLabels --- pkg/logql/log/labels.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 24a33277931d3..114d254ba01ca 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -524,6 +524,7 @@ func (b *LabelsBuilder) Range(f func(l labels.Label), categories ...LabelCategor return } +// TODO: ideally we remove this func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label { if buf == nil { buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label. @@ -576,12 +577,12 @@ func (b *LabelsBuilder) IntoMap(m map[string]string) { } return } - b.buf = b.UnsortedLabels(b.buf) + // todo should we also cache maps since limited by the result ? // Maps also don't create a copy of the labels. - for _, l := range b.buf { + b.Range(func(l labels.Label) { m[l.Name] = l.Value - } + }) } func (b *LabelsBuilder) Map() (map[string]string, bool) { @@ -591,13 +592,13 @@ func (b *LabelsBuilder) Map() (map[string]string, bool) { } return b.baseMap, false } - b.buf = b.UnsortedLabels(b.buf) + // todo should we also cache maps since limited by the result ? // Maps also don't create a copy of the labels. res := smp.Get() - for _, l := range b.buf { + b.Range(func(l labels.Label) { res[l.Name] = l.Value - } + }) return res, true } From 71930c66fa4414eaf0a9d875c78273bfe6179b7a Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 23 Jul 2025 10:57:58 +0200 Subject: [PATCH 13/25] Add benchmark --- pkg/logql/log/pipeline_test.go | 40 ++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 1da9347368c16..f6edcd056a5c1 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -1,6 +1,7 @@ package log import ( + "fmt" "testing" "time" @@ -684,14 +685,24 @@ func mustFilter(f Filterer, err error) Filterer { return f } -func jsonBenchmark(b *testing.B, parser Stage) { - b.ReportAllocs() +func jsonBenchmark(b *testing.B, parser Stage, lines, cardinality int) { p := NewPipeline([]Stage{ mustFilter(NewFilter("metrics.go", LineMatchEqual)).ToStage(), parser, }) - line := []byte(`{"ts":"2020-12-27T09:15:54.333026285Z","error":"action could not be completed", "context":{"file": "metrics.go"}}`) + streams := make([][]byte, lines) + for i := 0; i < lines; i++ { + streams[i] = []byte(fmt.Sprintf( + `{ + "ts":"2020-12-27T09:15:54.333026285Z", + "error":"action could not be completed", + "context":{"file": "metrics.go"}, + "cardinality": %d + }`, + i%cardinality, + )) + } lbs := labels.FromStrings("cluster", "ops-tool1", "name", "querier", "pod", "querier-5896759c79-q7q9h", @@ -701,17 +712,20 @@ func jsonBenchmark(b *testing.B, parser Stage) { "job", "loki-dev/querier", "pod_template_hash", "5896759c79", ) + b.ReportAllocs() b.ResetTimer() sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) + for _, line := range streams { + resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) - if !resMatches { - b.Fatalf("resulting line not ok: %s\n", line) - } + if !resMatches { + b.Fatalf("resulting line not ok: %s\n", line) + } - if resLbs.Labels().Get("context_file") != "metrics.go" { - b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + if resLbs.Labels().Get("context_file") != "metrics.go" { + b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + } } } } @@ -740,7 +754,11 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) { } func BenchmarkJSONParser(b *testing.B) { - jsonBenchmark(b, NewJSONParser(false)) + jsonBenchmark(b, NewJSONParser(false), 1, 1) +} + +func BenchmarkJSONParserHighCardinality(b *testing.B) { + jsonBenchmark(b, NewJSONParser(true), 100_000, 10_000) } func BenchmarkJSONParserInvalidLine(b *testing.B) { @@ -755,7 +773,7 @@ func BenchmarkJSONExpressionParser(b *testing.B) { b.Fatal("cannot create new JSON expression parser") } - jsonBenchmark(b, parser) + jsonBenchmark(b, parser, 1, 1) } func BenchmarkJSONExpressionParserInvalidLine(b *testing.B) { From 23f92d4a078f6606d55a3caa4425e740ffb7660b Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 23 Jul 2025 11:40:36 +0200 Subject: [PATCH 14/25] Disable caching --- pkg/logql/log/keep_labels.go | 11 ++-- pkg/logql/log/labels.go | 83 ++++++++++++++---------------- pkg/logql/log/pipeline_test.go | 5 +- pkg/logql/syntax/extractor_test.go | 3 +- pkg/logql/syntax/parser_test.go | 4 +- 5 files changed, 52 insertions(+), 54 deletions(-) diff --git a/pkg/logql/log/keep_labels.go b/pkg/logql/log/keep_labels.go index a078fe15a2fa3..087ce438ac59d 100644 --- a/pkg/logql/log/keep_labels.go +++ b/pkg/logql/log/keep_labels.go @@ -2,7 +2,6 @@ package log import ( "github.com/grafana/loki/v3/pkg/logqlmodel" - "github.com/prometheus/prometheus/model/labels" ) type KeepLabels struct { @@ -19,26 +18,26 @@ func (kl *KeepLabels) Process(_ int64, line []byte, lbls *LabelsBuilder) ([]byte } del := make([]string, 0, 10) - lbls.Range(func(lb labels.Label) { - if isSpecialLabel(lb.Name) { + lbls.Range(func(name, value []byte) { + if isSpecialLabel(unsafeGetString(name)) { return } var keep bool for _, keepLabel := range kl.labels { - if keepLabel.Matcher != nil && keepLabel.Matcher.Name == lb.Name && keepLabel.Matcher.Matches(lb.Value) { + if keepLabel.Matcher != nil && keepLabel.Matcher.Name == unsafeGetString(name) && keepLabel.Matcher.Matches(unsafeGetString(value)) { keep = true break } - if keepLabel.Name == lb.Name { + if keepLabel.Name == unsafeGetString(name) { keep = true break } } if !keep { - del = append(del, lb.Name) + del = append(del, unsafeGetString(name)) } }) for _, name := range del { diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 114d254ba01ca..1de6c59146f60 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -49,6 +49,7 @@ func (l labelsResult) String() string { return l.s } +// TODO: this is slow. func (l labelsResult) Labels() labels.Labels { size := l.stream.Len() + l.structuredMetadata.Len() + l.parsed.Len() b := labels.NewScratchBuilder(size) @@ -134,10 +135,12 @@ type BaseLabelsBuilder struct { // LabelsBuilder is the same as labels.Builder but tailored for this package. type LabelsBuilder struct { base labels.Labels - buf []labels.Label + buf []labels.Label // TODO: try to avoid this currentResult LabelsResult groupedResult LabelsResult + scratchBuilder labels.ScratchBuilder + *BaseLabelsBuilder } @@ -433,27 +436,23 @@ func (b *LabelsBuilder) appendErrors(buf []labels.Label) []labels.Label { } return buf } -func (b *LabelsBuilder) Range(f func(l labels.Label), categories ...LabelCategory) { +func (b *LabelsBuilder) Range(f func(name, value []byte), categories ...LabelCategory) { if categories == nil { categories = allCategories } if !b.hasDel() && !b.hasAdd() && categoriesContain(categories, StreamLabel) { - b.base.Range(f) + b.base.Range(func(l labels.Label) { + f(unsafeGetBytes(l.Name), unsafeGetBytes(l.Value)) + }) if categoriesContain(categories, ParsedLabel) { if b.err != "" { - f(labels.Label{ - Name: logqlmodel.ErrorLabel, - Value: b.err, - }) + f(unsafeGetBytes(logqlmodel.ErrorLabel), unsafeGetBytes(b.err)) } if b.errDetails != "" { - f(labels.Label{ - Name: logqlmodel.ErrorDetailsLabel, - Value: b.errDetails, - }) + f(unsafeGetBytes(logqlmodel.ErrorDetailsLabel), unsafeGetBytes(b.errDetails)) } } @@ -481,9 +480,9 @@ func (b *LabelsBuilder) Range(f func(l labels.Label), categories ...LabelCategor // Take value from stream label if present if value, found := b.add[StreamLabel].get(unsafeGetBytes(l.Name)); found { - f(labels.Label{Name: l.Name, Value: unsafeGetString(value)}) // TODO: make clear that string must be copied + f(unsafeGetBytes(l.Name), value) } else { - f(l) + f(unsafeGetBytes(l.Name), unsafeGetBytes(l.Value)) } }) } @@ -495,29 +494,23 @@ func (b *LabelsBuilder) Range(f func(l labels.Label), categories ...LabelCategor continue } - f(labels.Label{Name: unsafeGetString(name), Value: unsafeGetString(value)}) + f(name, value) } } if categoriesContain(categories, ParsedLabel) { for i := 0; i < b.add[ParsedLabel].len(); i++ { name, value := b.add[ParsedLabel].getAt(i) - f(labels.Label{Name: unsafeGetString(name), Value: unsafeGetString(value)}) + f(name, value) } } if (b.HasErr() || b.HasErrorDetails()) && categoriesContain(categories, ParsedLabel) { if b.err != "" { - f(labels.Label{ - Name: logqlmodel.ErrorLabel, - Value: b.err, - }) + f(unsafeGetBytes(logqlmodel.ErrorLabel), unsafeGetBytes(b.err)) } if b.errDetails != "" { - f(labels.Label{ - Name: logqlmodel.ErrorDetailsLabel, - Value: b.errDetails, - }) + f(unsafeGetBytes(logqlmodel.ErrorDetailsLabel), unsafeGetBytes(b.errDetails)) } } @@ -532,8 +525,8 @@ func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCa buf = buf[:0] } - b.Range(func(l labels.Label) { - buf = append(buf, l) + b.Range(func(name, value []byte) { + buf = append(buf, labels.Label{Name: string(name), Value: string(value)}) }, categories...) return buf @@ -580,8 +573,8 @@ func (b *LabelsBuilder) IntoMap(m map[string]string) { // todo should we also cache maps since limited by the result ? // Maps also don't create a copy of the labels. - b.Range(func(l labels.Label) { - m[l.Name] = l.Value + b.Range(func(name, value []byte) { + m[string(name)] = string(value) }) } @@ -596,8 +589,8 @@ func (b *LabelsBuilder) Map() (map[string]string, bool) { // todo should we also cache maps since limited by the result ? // Maps also don't create a copy of the labels. res := smp.Get() - b.Range(func(l labels.Label) { - res[l.Name] = l.Value + b.Range(func(name, value []byte) { + res[string(name)] = string(value) }) return res, true } @@ -612,8 +605,12 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Get all labels at once and sort them - b.buf = b.UnsortedLabels(b.buf) - lbls := labels.New(b.buf...) + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + b.scratchBuilder.UnsafeAddBytes(name, value) + }) + + lbls := b.scratchBuilder.Labels() hash := b.hasher.Hash(lbls) if cached, ok := b.resultCache[hash]; ok { @@ -621,26 +618,26 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Now segregate the sorted labels into their categories - var stream, meta, parsed []labels.Label // TODO: use ScratchBuilder instead + var stream, meta, parsed labels.ScratchBuilder - for _, l := range b.buf { + b.Range(func(name, value []byte) { // Skip error labels for stream and meta categories - if l.Name == logqlmodel.ErrorLabel || l.Name == logqlmodel.ErrorDetailsLabel { - parsed = append(parsed, l) - continue + if unsafeGetString(name) == logqlmodel.ErrorLabel || unsafeGetString(name) == logqlmodel.ErrorDetailsLabel { + parsed.UnsafeAddBytes(name, value) + return } // Check which category this label belongs to - if labelsContain(b.add[ParsedLabel], unsafeGetBytes(l.Name)) { - parsed = append(parsed, l) - } else if labelsContain(b.add[StructuredMetadataLabel], unsafeGetBytes(l.Name)) { - meta = append(meta, l) + if labelsContain(b.add[ParsedLabel], name) { + parsed.UnsafeAddBytes(name, value) + } else if labelsContain(b.add[StructuredMetadataLabel], name) { + meta.UnsafeAddBytes(name, value) } else { - stream = append(stream, l) + stream.UnsafeAddBytes(name, value) } - } + }) - result := NewLabelsResult(lbls.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...)) + result := NewLabelsResult(lbls.String(), hash, stream.Labels(), meta.Labels(), parsed.Labels()) b.resultCache[hash] = result return result diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index f6edcd056a5c1..f8b87e27d9910 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -698,8 +698,10 @@ func jsonBenchmark(b *testing.B, parser Stage, lines, cardinality int) { "ts":"2020-12-27T09:15:54.333026285Z", "error":"action could not be completed", "context":{"file": "metrics.go"}, + "line": "line %d", "cardinality": %d }`, + i, // disables caching i%cardinality, )) } @@ -714,8 +716,9 @@ func jsonBenchmark(b *testing.B, parser Stage, lines, cardinality int) { ) b.ReportAllocs() b.ResetTimer() - sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { + // ForStream creates the LabelsBuilder. Since it caches the results it must be created for each iteration. + sp := p.ForStream(lbs) for _, line := range streams { resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) diff --git a/pkg/logql/syntax/extractor_test.go b/pkg/logql/syntax/extractor_test.go index d64c7003ff75a..7252a0f67c702 100644 --- a/pkg/logql/syntax/extractor_test.go +++ b/pkg/logql/syntax/extractor_test.go @@ -160,8 +160,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) { seen := make(map[string]float64, len(samples)) for _, s := range samples { - lbls := s.Labels.Labels() - seen[lbls.String()] = s.Value + seen[s.Labels.String()] = s.Value } mvExpr, err := ParseSampleExpr(tc.variantQuery) diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index a55c237477b31..89f660d1ba22c 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3467,12 +3467,12 @@ func Test_PipelineCombined(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) sp := p.ForStream(labels.EmptyLabels()) - line, lbs, matches := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.EmptyLabels()) + line, lbsResult, matches := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.EmptyLabels()) require.True(t, matches) require.Equal( t, labels.FromStrings("caller", "logging.go:66", "duration", "1.5s", "level", "debug", "method", "POST", "msg", "POST /api/prom/api/v1/query_range (200) 1.5s", "path", "/api/prom/api/v1/query_range", "status", "200", "traceID", "a9d4d8a928d8db1", "ts", "2020-10-02T10:10:42.092268913Z"), - lbs.Labels(), + lbsResult.Labels(), ) require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line)) } From 562a507895a91759b7e45379ad915e513284bad6 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 23 Jul 2025 12:11:22 +0200 Subject: [PATCH 15/25] Reuse buffers --- pkg/logql/log/labels.go | 36 +++++++++++++++++++++++++++--------- pkg/logql/log/parser.go | 32 +++++++++++++++++++++----------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 1de6c59146f60..70d80f538b9f8 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -376,7 +376,8 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v []byte) *LabelsBuilder // Note that because this is used for bypassing extracted fields, and // because parsed labels always take precedence over structured metadata // and stream labels, we must only call RecordExtracted for parsed labels. - b.parserKeyHints.RecordExtracted(string(n)) + //b.parserKeyHints.RecordExtracted(string(n)) + b.parserKeyHints.RecordExtracted(unsafeGetString(n)) } return b } @@ -618,26 +619,43 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Now segregate the sorted labels into their categories - var stream, meta, parsed labels.ScratchBuilder + var stream, meta, parsed labels.Labels + // Parsed + b.scratchBuilder.Reset() b.Range(func(name, value []byte) { // Skip error labels for stream and meta categories if unsafeGetString(name) == logqlmodel.ErrorLabel || unsafeGetString(name) == logqlmodel.ErrorDetailsLabel { - parsed.UnsafeAddBytes(name, value) + b.scratchBuilder.UnsafeAddBytes(name, value) return } // Check which category this label belongs to if labelsContain(b.add[ParsedLabel], name) { - parsed.UnsafeAddBytes(name, value) - } else if labelsContain(b.add[StructuredMetadataLabel], name) { - meta.UnsafeAddBytes(name, value) - } else { - stream.UnsafeAddBytes(name, value) + b.scratchBuilder.UnsafeAddBytes(name, value) + } + }) + parsed = b.scratchBuilder.Labels() + + // Structured Metadata + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + if labelsContain(b.add[StructuredMetadataLabel], name) { + b.scratchBuilder.UnsafeAddBytes(name, value) + } + }) + meta = b.scratchBuilder.Labels() + + // Stream + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + if labelsContain(b.add[StreamLabel], name) { + b.scratchBuilder.UnsafeAddBytes(name, value) } }) + stream = b.scratchBuilder.Labels() - result := NewLabelsResult(lbls.String(), hash, stream.Labels(), meta.Labels(), parsed.Labels()) + result := NewLabelsResult(lbls.String(), hash, stream, meta, parsed) b.resultCache[hash] = result return result diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index fa39cdc55fcbf..f21a54d4b8f5e 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -60,6 +60,7 @@ type JSONParser struct { keys internedStringSet parserHints ParserHint sanitizedPrefixBuffer []byte + valueBuffer []byte } // NewJSONParser creates a log stage that can parse a json log line and add properties as labels. @@ -69,6 +70,7 @@ func NewJSONParser(captureJSONPath bool) *JSONParser { keys: internedStringSet{}, captureJSONPath: captureJSONPath, sanitizedPrefixBuffer: make([]byte, 0, 64), + valueBuffer: make([]byte, 0, 64), } } @@ -151,7 +153,10 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu if !ok || j.lbs.ParserLabelHints().Extracted(sanitizedKey) { return nil } - j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), readValue(value, dataType)) + + j.valueBuffer = readValue(value, dataType, j.valueBuffer) + j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), j.valueBuffer) + if j.captureJSONPath { j.lbs.SetJSONPath(sanitizedKey, []string{string(key)}) } @@ -196,7 +201,8 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu return nil } - j.lbs.Set(ParsedLabel, unsafeGetBytes(keyString), readValue(value, dataType)) + j.valueBuffer = readValue(value, dataType, j.valueBuffer) + j.lbs.Set(ParsedLabel, unsafeGetBytes(keyString), j.valueBuffer) if !j.parserHints.ShouldContinueParsingLine(keyString, j.lbs) { return errLabelDoesNotMatch @@ -243,10 +249,10 @@ func (j *JSONParser) buildJSONPathFromPrefixBuffer() []string { func (j *JSONParser) RequiredLabelNames() []string { return []string{} } -func readValue(v []byte, dataType jsonparser.ValueType) []byte { +func readValue(v []byte, dataType jsonparser.ValueType, buf []byte) []byte { switch dataType { case jsonparser.String: - return unescapeJSONString(v) + return unescapeJSONString(v, buf) case jsonparser.Null: return nil case jsonparser.Number: @@ -261,9 +267,9 @@ func readValue(v []byte, dataType jsonparser.ValueType) []byte { } } -func unescapeJSONString(b []byte) []byte { - var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings - bU, err := jsonparser.Unescape(b, stackbuf[:]) +func unescapeJSONString(b, buf []byte) []byte { + //var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings + bU, err := jsonparser.Unescape(b, buf[:]) if err != nil { return nil } @@ -621,6 +627,7 @@ type JSONExpressionParser struct { ids []string paths [][]string keys internedStringSet + valueBuffer []byte } func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpressionParser, error) { @@ -644,6 +651,7 @@ func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpression ids: ids, paths: paths, keys: internedStringSet{}, + valueBuffer: make([]byte, 0, 64), }, nil } @@ -694,7 +702,8 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) case jsonparser.Object: lbs.Set(ParsedLabel, unsafeGetBytes(key), data) default: - lbs.Set(ParsedLabel, unsafeGetBytes(key), unescapeJSONString(data)) + j.valueBuffer = unescapeJSONString(data, j.valueBuffer) + lbs.Set(ParsedLabel, unsafeGetBytes(key), j.valueBuffer) } matches++ @@ -725,7 +734,7 @@ func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} type UnpackParser struct { lbsBuffer []string - + valueBuffer []byte keys internedStringSet } @@ -737,6 +746,7 @@ func NewUnpackParser() *UnpackParser { return &UnpackParser{ lbsBuffer: make([]string, 0, 16), keys: internedStringSet{}, + valueBuffer: make([]byte, 0, 64), } } @@ -810,8 +820,8 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) } // append to the buffer of labels - // TODO: use byte slice - u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), string(unescapeJSONString(value))) + u.valueBuffer = unescapeJSONString(value, u.valueBuffer) + u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), string(u.valueBuffer)) default: return nil } From aa5d0426798c62a330603800d3bba19fe54be3c2 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Fri, 25 Jul 2025 16:19:49 +0200 Subject: [PATCH 16/25] Benchmark highcardinality logfmt --- pkg/logql/log/pipeline_test.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index f8b87e27d9910..d18a97e8a49b4 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -685,7 +685,7 @@ func mustFilter(f Filterer, err error) Filterer { return f } -func jsonBenchmark(b *testing.B, parser Stage, lines, cardinality int) { +func jsonBenchmark(b *testing.B, parser Stage, lines int) { p := NewPipeline([]Stage{ mustFilter(NewFilter("metrics.go", LineMatchEqual)).ToStage(), @@ -698,11 +698,9 @@ func jsonBenchmark(b *testing.B, parser Stage, lines, cardinality int) { "ts":"2020-12-27T09:15:54.333026285Z", "error":"action could not be completed", "context":{"file": "metrics.go"}, - "line": "line %d", - "cardinality": %d + "line": "line %d" }`, i, // disables caching - i%cardinality, )) } lbs := labels.FromStrings("cluster", "ops-tool1", @@ -757,11 +755,11 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) { } func BenchmarkJSONParser(b *testing.B) { - jsonBenchmark(b, NewJSONParser(false), 1, 1) + jsonBenchmark(b, NewJSONParser(false), 1) } func BenchmarkJSONParserHighCardinality(b *testing.B) { - jsonBenchmark(b, NewJSONParser(true), 100_000, 10_000) + jsonBenchmark(b, NewJSONParser(true), 100_000) } func BenchmarkJSONParserInvalidLine(b *testing.B) { @@ -776,7 +774,7 @@ func BenchmarkJSONExpressionParser(b *testing.B) { b.Fatal("cannot create new JSON expression parser") } - jsonBenchmark(b, parser, 1, 1) + jsonBenchmark(b, parser, 1) } func BenchmarkJSONExpressionParserInvalidLine(b *testing.B) { @@ -790,7 +788,7 @@ func BenchmarkJSONExpressionParserInvalidLine(b *testing.B) { invalidJSONBenchmark(b, parser) } -func logfmtBenchmark(b *testing.B, parser Stage) { +func logfmtBenchmark(b *testing.B, parser Stage, lines int) { b.ReportAllocs() p := NewPipeline([]Stage{ @@ -798,7 +796,10 @@ func logfmtBenchmark(b *testing.B, parser Stage) { parser, }) - line := []byte(`level=info ts=2020-10-18T18:04:22.147378997Z caller=metrics.go:81 org_id=29 traceID=29a0f088b047eb8c latency=fast query="{stream=\"stdout\",pod=\"loki-canary-xmjzp\"}" query_type=limited range_type=range length=20s step=1s duration=58.126671ms status=200 throughput_mb=2.496547 total_bytes_mb=0.145116`) + streams := make([][]byte, lines) + for i := 0; i < lines; i++ { + streams[i] = []byte(fmt.Sprintf(`level=info ts=2020-10-18T18:04:22.147378997Z caller=metrics.go:81 org_id=29 traceID=29a0f088b047eb8c latency=fast query="{stream=\"stdout\",pod=\"loki-canary-xmjzp\"}" query_type=limited range_type=range length=20s step=1s duration=58.126671ms status=200 throughput_mb=2.496547 total_bytes_mb=0.145116, line=%d`, i)) + } lbs := labels.FromStrings("cluster", "ops-tool1", "name", "querier", "ts", "2020-10-18T18:04:22.147378997Z", @@ -806,20 +807,26 @@ func logfmtBenchmark(b *testing.B, parser Stage) { b.ResetTimer() sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) + for _, line := range streams { + resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) if !resMatches { b.Fatalf("resulting line not ok: %s\n", line) } if resLbs.Labels().Get("ts") != "2020-10-18T18:04:22.147378997Z" { - b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + } } } } func BenchmarkLogfmtParser(b *testing.B) { - logfmtBenchmark(b, NewLogfmtParser(false, false)) + logfmtBenchmark(b, NewLogfmtParser(false, false), 1) +} + +func BenchmarkLogfmtParserHighCardinality(b *testing.B) { + logfmtBenchmark(b, NewLogfmtParser(false, false), 100_000) } func BenchmarkLogfmtExpressionParser(b *testing.B) { @@ -830,5 +837,5 @@ func BenchmarkLogfmtExpressionParser(b *testing.B) { b.Fatal("cannot create new logfmt expression parser:", err.Error()) } - logfmtBenchmark(b, parser) + logfmtBenchmark(b, parser, 1) } From 5da55cc4ae17b8a5f16305afba6c9d569113b027 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Mon, 28 Jul 2025 17:25:07 +0200 Subject: [PATCH 17/25] Fix logfmt labels --- pkg/logql/log/labels.go | 2 +- pkg/logql/log/pipeline_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 70d80f538b9f8..462d01eb2f05e 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -649,7 +649,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Stream b.scratchBuilder.Reset() b.Range(func(name, value []byte) { - if labelsContain(b.add[StreamLabel], name) { + if !labelsContain(b.add[ParsedLabel], name) && !labelsContain(b.add[StructuredMetadataLabel], name) { b.scratchBuilder.UnsafeAddBytes(name, value) } }) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index d18a97e8a49b4..60659d77e3ea1 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -815,7 +815,7 @@ func logfmtBenchmark(b *testing.B, parser Stage, lines int) { } if resLbs.Labels().Get("ts") != "2020-10-18T18:04:22.147378997Z" { - b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + b.Fatalf("label was not extracted correctly! ts was %v, expected 2020-10-18T18:04:22.147378997Z, %+v\n", resLbs.Labels().Get("ts"), resLbs) } } } From b1e5ce2106b239a0db0fb457d4ae3a0c3e64e262 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 29 Jul 2025 10:56:07 +0200 Subject: [PATCH 18/25] Use unsafe --- pkg/logql/log/parser.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index f21a54d4b8f5e..ecc813480181a 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -158,7 +158,8 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), j.valueBuffer) if j.captureJSONPath { - j.lbs.SetJSONPath(sanitizedKey, []string{string(key)}) + // TODO: very this is ok. buildJSONPathFromPrefixBuffer uses and unsafe string + j.lbs.SetJSONPath(sanitizedKey, []string{unsafeGetString(key)}) } if !j.parserHints.ShouldContinueParsingLine(sanitizedKey, j.lbs) { From 8c163a0d67ca88eab633ac860aaa28755157a816 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 29 Jul 2025 12:09:39 +0200 Subject: [PATCH 19/25] Faster labels get --- pkg/logql/log/labels_columnar.go | 41 ++++++++++++++++++-- pkg/logql/log/labels_columnar_test.go | 54 +++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go index 2fa12088fda9c..e2bdcece6ee2e 100644 --- a/pkg/logql/log/labels_columnar.go +++ b/pkg/logql/log/labels_columnar.go @@ -1,6 +1,9 @@ package log -import "bytes" +import ( + "bytes" + "slices" +) type stringColumn struct { data []byte @@ -42,7 +45,6 @@ func (s *stringColumn) reset() { } func (s *stringColumn) get(i int) []byte { - // TODO: test this. It's tricky index := s.indices[i] start := s.offsets[index] if index+1 >= len(s.offsets) { @@ -53,6 +55,33 @@ func (s *stringColumn) get(i int) []byte { return s.data[start:end] } +func (s *stringColumn) index(value []byte) int { + idx := bytes.Index(s.data, value) + if idx == -1 { + return -1 + } + + // Find index in offsets + idx = slices.Index(s.offsets, idx) + if idx == -1 { + return -1 + } + + // Verify length + var length int + if idx+1 >= len(s.offsets) { + length = len(s.data) - s.offsets[idx] + } else { + length = s.offsets[idx+1] - s.offsets[idx] + } + if length != len(value) { + return -1 + } + + // Find index in indices + return slices.Index(s.indices, idx) +} + func (s *stringColumn) len() int { return len(s.indices) } @@ -97,13 +126,19 @@ func (s *columnarLabels) len() int { } func (s *columnarLabels) get(key []byte) ([]byte, bool) { - // TODO: to a string search on s.names.data + /* for i := 0; i < len(s.names.indices); i++ { if bytes.Equal(s.names.get(i), key) { return s.values.get(i), true } } return nil, false + */ + idx := s.names.index(key) + if idx == -1 { + return nil, false + } + return s.values.get(idx), true } func (s *columnarLabels) getAt(i int) (name, value []byte) { diff --git a/pkg/logql/log/labels_columnar_test.go b/pkg/logql/log/labels_columnar_test.go index c3f1004a366c9..bad563c8db180 100644 --- a/pkg/logql/log/labels_columnar_test.go +++ b/pkg/logql/log/labels_columnar_test.go @@ -1,6 +1,7 @@ package log import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -60,6 +61,28 @@ func TestColumnarLabels_Get(t *testing.T) { require.Equal(t, []byte("qux"), v) } +func TestStringColumn_Index(t *testing.T) { + sc := newStringColumn(10) + sc.add([]byte("foo")) + sc.add([]byte("bar")) + sc.add([]byte("baz")) + sc.add([]byte("qux")) + + require.Equal(t, 0, sc.index([]byte("foo"))) + require.Equal(t, 1, sc.index([]byte("bar"))) + require.Equal(t, 2, sc.index([]byte("baz"))) + require.Equal(t, 3, sc.index([]byte("qux"))) + + require.Equal(t, -1, sc.index([]byte("ba"))) + require.Equal(t, -1, sc.index([]byte("bazq"))) + require.Equal(t, -1, sc.index([]byte("qu"))) + require.Equal(t, -1, sc.index([]byte("nonexistent"))) + + sc.del(2) + require.Equal(t, 2, sc.index([]byte("qux"))) + require.Equal(t, -1, sc.index([]byte("baz"))) +} + func TestColumnarLabels_GetAt(t *testing.T) { cl := newColumnarLabels(10) cl.add([]byte("foo"), []byte("bar")) @@ -117,3 +140,34 @@ func TestColumnarLabels_Override(t *testing.T) { _, ok = cl.get([]byte("nonexistent")) require.False(t, ok) } + +func BenchmarkColumnarLabels_Get(b *testing.B) { + for _, l := range []int{10, 30, 100, 300, 1000} { + cl := newColumnarLabels(l) + for i := 0; i < l; i++ { + cl.add([]byte(fmt.Sprintf("foo%d", i)), []byte(fmt.Sprintf("bar%d", i))) + } + + b.ResetTimer() + b.Run(fmt.Sprintf("get-middle-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte(fmt.Sprintf("foo%d", l/2))) + } + }) + b.Run(fmt.Sprintf("get-first-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte("foo0")) + } + }) + b.Run(fmt.Sprintf("get-last-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte(fmt.Sprintf("foo%d", l-1))) + } + }) + b.Run(fmt.Sprintf("get-nonexistent-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte("nonexistent")) + } + }) + } +} From 6e94d7510660444c4c528ee8e860fab244243a5a Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 29 Jul 2025 12:53:22 +0200 Subject: [PATCH 20/25] Switch indexing --- pkg/logql/log/labels_columnar.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go index e2bdcece6ee2e..cdd3c7e809372 100644 --- a/pkg/logql/log/labels_columnar.go +++ b/pkg/logql/log/labels_columnar.go @@ -126,14 +126,16 @@ func (s *columnarLabels) len() int { } func (s *columnarLabels) get(key []byte) ([]byte, bool) { - /* - for i := 0; i < len(s.names.indices); i++ { - if bytes.Equal(s.names.get(i), key) { - return s.values.get(i), true + // Benchmarking showed that linear search is faster for small number of labels. + if s.names.len() <= 50 { + for i := 0; i < len(s.names.indices); i++ { + if bytes.Equal(s.names.get(i), key) { + return s.values.get(i), true + } } + return nil, false } - return nil, false - */ + idx := s.names.index(key) if idx == -1 { return nil, false From 06ccf48c7721508a7ee51c56db4975ca36f4abc0 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 29 Jul 2025 13:40:59 +0200 Subject: [PATCH 21/25] Disable validation --- pkg/logql/log/labels.go | 1 - pkg/logql/log/pipeline_test.go | 12 ++++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 462d01eb2f05e..61893cbd9aed3 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -49,7 +49,6 @@ func (l labelsResult) String() string { return l.s } -// TODO: this is slow. func (l labelsResult) Labels() labels.Labels { size := l.stream.Len() + l.structuredMetadata.Len() + l.parsed.Len() b := labels.NewScratchBuilder(size) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 60659d77e3ea1..c158c801a3d79 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -724,9 +724,9 @@ func jsonBenchmark(b *testing.B, parser Stage, lines int) { b.Fatalf("resulting line not ok: %s\n", line) } - if resLbs.Labels().Get("context_file") != "metrics.go" { - b.Fatalf("label was not extracted correctly! %+v\n", resLbs) - } + //if resLbs.Parsed().Get("context_file") != "metrics.go" { + // b.Fatalf("label was not extracted correctly! context_file was %v, expected metrics.go, %+v\n", resLbs.Parsed().Get("context_file"), resLbs.Parsed()) + //} } } } @@ -814,9 +814,9 @@ func logfmtBenchmark(b *testing.B, parser Stage, lines int) { b.Fatalf("resulting line not ok: %s\n", line) } - if resLbs.Labels().Get("ts") != "2020-10-18T18:04:22.147378997Z" { - b.Fatalf("label was not extracted correctly! ts was %v, expected 2020-10-18T18:04:22.147378997Z, %+v\n", resLbs.Labels().Get("ts"), resLbs) - } + if resLbs.Stream().Get("ts") != "2020-10-18T18:04:22.147378997Z" { + b.Fatalf("label was not extracted correctly! ts was %v, expected 2020-10-18T18:04:22.147378997Z, %+v\n", resLbs.Stream().Get("ts"), resLbs) + } } } } From c0f5df56d92dd581a5f4a2bbb4e842623d8148f1 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 6 Aug 2025 12:16:20 +0200 Subject: [PATCH 22/25] Format code --- pkg/logql/log/parser.go | 24 ++++++++++++------------ pkg/logql/log/pipeline_test.go | 12 ++++++------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index ecc813480181a..5f74d5761d543 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -35,7 +35,7 @@ var ( _ Stage = &RegexpParser{} _ Stage = &LogfmtParser{} - trueBytes = []byte("true") + trueBytes = []byte("true") falseBytes = []byte("false") errUnexpectedJSONObject = fmt.Errorf("expecting json object(%d), but it is not", jsoniter.ObjectValue) @@ -158,7 +158,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), j.valueBuffer) if j.captureJSONPath { - // TODO: very this is ok. buildJSONPathFromPrefixBuffer uses and unsafe string + // TODO: very this is ok. buildJSONPathFromPrefixBuffer uses and unsafe string j.lbs.SetJSONPath(sanitizedKey, []string{unsafeGetString(key)}) } @@ -625,9 +625,9 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde func (l *LogfmtExpressionParser) RequiredLabelNames() []string { return []string{} } type JSONExpressionParser struct { - ids []string - paths [][]string - keys internedStringSet + ids []string + paths [][]string + keys internedStringSet valueBuffer []byte } @@ -649,9 +649,9 @@ func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpression } return &JSONExpressionParser{ - ids: ids, - paths: paths, - keys: internedStringSet{}, + ids: ids, + paths: paths, + keys: internedStringSet{}, valueBuffer: make([]byte, 0, 64), }, nil } @@ -734,9 +734,9 @@ func isValidJSONStart(data []byte) bool { func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} } type UnpackParser struct { - lbsBuffer []string + lbsBuffer []string valueBuffer []byte - keys internedStringSet + keys internedStringSet } // NewUnpackParser creates a new unpack stage. @@ -745,8 +745,8 @@ type UnpackParser struct { // see https://grafana.com/docs/loki/latest/clients/promtail/stages/pack/ func NewUnpackParser() *UnpackParser { return &UnpackParser{ - lbsBuffer: make([]string, 0, 16), - keys: internedStringSet{}, + lbsBuffer: make([]string, 0, 16), + keys: internedStringSet{}, valueBuffer: make([]byte, 0, 64), } } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index c158c801a3d79..f7b9ed8eb3448 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -512,7 +512,7 @@ func TestKeepLabelsPipeline(t *testing.T) { for i, line := range tt.lines { finalLine, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels()) require.Equal(t, tt.wantLine[i], finalLine) - require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) + require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) require.Equal(t, labels.EmptyLabels(), finalLbs.Stream()) require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata()) require.Equal(t, tt.wantLabels[i], finalLbs.Parsed()) @@ -810,13 +810,13 @@ func logfmtBenchmark(b *testing.B, parser Stage, lines int) { for _, line := range streams { resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) - if !resMatches { - b.Fatalf("resulting line not ok: %s\n", line) - } + if !resMatches { + b.Fatalf("resulting line not ok: %s\n", line) + } - if resLbs.Stream().Get("ts") != "2020-10-18T18:04:22.147378997Z" { + if resLbs.Stream().Get("ts") != "2020-10-18T18:04:22.147378997Z" { b.Fatalf("label was not extracted correctly! ts was %v, expected 2020-10-18T18:04:22.147378997Z, %+v\n", resLbs.Stream().Get("ts"), resLbs) - } + } } } } From 8b8d435751946b248493175668fb7d404628db5d Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 19 Aug 2025 15:16:17 +0200 Subject: [PATCH 23/25] Sort labels --- pkg/logql/log/labels.go | 4 ++++ pkg/logql/log/labels_columnar.go | 32 ++++++++++++++++---------------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 61893cbd9aed3..c4b3e02a2e84d 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -609,6 +609,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { b.Range(func(name, value []byte) { b.scratchBuilder.UnsafeAddBytes(name, value) }) + b.scratchBuilder.Sort() lbls := b.scratchBuilder.Labels() hash := b.hasher.Hash(lbls) @@ -634,6 +635,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { b.scratchBuilder.UnsafeAddBytes(name, value) } }) + b.scratchBuilder.Sort() parsed = b.scratchBuilder.Labels() // Structured Metadata @@ -643,6 +645,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { b.scratchBuilder.UnsafeAddBytes(name, value) } }) + b.scratchBuilder.Sort() meta = b.scratchBuilder.Labels() // Stream @@ -652,6 +655,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { b.scratchBuilder.UnsafeAddBytes(name, value) } }) + b.scratchBuilder.Sort() stream = b.scratchBuilder.Labels() result := NewLabelsResult(lbls.String(), hash, stream, meta, parsed) diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go index cdd3c7e809372..05f7f2f261edb 100644 --- a/pkg/logql/log/labels_columnar.go +++ b/pkg/logql/log/labels_columnar.go @@ -121,38 +121,38 @@ func (c *columnarLabels) reset() { c.values.reset() } -func (s *columnarLabels) len() int { - return s.names.len() +func (c *columnarLabels) len() int { + return c.names.len() } -func (s *columnarLabels) get(key []byte) ([]byte, bool) { +func (c *columnarLabels) get(key []byte) ([]byte, bool) { // Benchmarking showed that linear search is faster for small number of labels. - if s.names.len() <= 50 { - for i := 0; i < len(s.names.indices); i++ { - if bytes.Equal(s.names.get(i), key) { - return s.values.get(i), true + if c.names.len() <= 50 { + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), key) { + return c.values.get(i), true } } return nil, false } - idx := s.names.index(key) + idx := c.names.index(key) if idx == -1 { return nil, false } - return s.values.get(idx), true + return c.values.get(idx), true } -func (s *columnarLabels) getAt(i int) (name, value []byte) { - return s.names.get(i), s.values.get(i) +func (c *columnarLabels) getAt(i int) (name, value []byte) { + return c.names.get(i), c.values.get(i) } -func (s *columnarLabels) del(name []byte) { +func (c *columnarLabels) del(name []byte) { // TODO: to a string search on s.names.data - for i := 0; i < len(s.names.indices); i++ { - if bytes.Equal(s.names.get(i), name) { - s.names.del(i) - s.values.del(i) + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), name) { + c.names.del(i) + c.values.del(i) } } } From 5fdbdbcad3511cdf2cee83b81ce62e0b0343eade Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 19 Aug 2025 15:40:52 +0200 Subject: [PATCH 24/25] Fix error label issue --- pkg/logql/log/label_filter_test.go | 1 + pkg/logql/log/labels.go | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/logql/log/label_filter_test.go b/pkg/logql/log/label_filter_test.go index f5258e66754ef..8a451b724add5 100644 --- a/pkg/logql/log/label_filter_test.go +++ b/pkg/logql/log/label_filter_test.go @@ -4,6 +4,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/prometheus/prometheus/model/labels" diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index c4b3e02a2e84d..76fdeec58df81 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -513,8 +513,6 @@ func (b *LabelsBuilder) Range(f func(name, value []byte), categories ...LabelCat f(unsafeGetBytes(logqlmodel.ErrorDetailsLabel), unsafeGetBytes(b.errDetails)) } } - - return } // TODO: ideally we remove this @@ -624,7 +622,7 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Parsed b.scratchBuilder.Reset() b.Range(func(name, value []byte) { - // Skip error labels for stream and meta categories + // Add error labels to parsed labels, ie skip them for stream and meta categories if unsafeGetString(name) == logqlmodel.ErrorLabel || unsafeGetString(name) == logqlmodel.ErrorDetailsLabel { b.scratchBuilder.UnsafeAddBytes(name, value) return @@ -651,7 +649,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Stream b.scratchBuilder.Reset() b.Range(func(name, value []byte) { - if !labelsContain(b.add[ParsedLabel], name) && !labelsContain(b.add[StructuredMetadataLabel], name) { + if !labelsContain(b.add[ParsedLabel], name) && !labelsContain(b.add[StructuredMetadataLabel], name) && + unsafeGetString(name) != logqlmodel.ErrorLabel && unsafeGetString(name) != logqlmodel.ErrorDetailsLabel { b.scratchBuilder.UnsafeAddBytes(name, value) } }) From 9dc3ce62253e3f92b2a8df2cd3c6a78b911a3adf Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Mon, 1 Sep 2025 11:12:09 +0200 Subject: [PATCH 25/25] Address feedback --- pkg/logql/log/labels.go | 11 ++++++----- pkg/logql/log/labels_columnar.go | 12 ++++++++++++ pkg/logql/log/parser.go | 5 +---- pkg/logql/log/pipeline_test.go | 6 +++--- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 76fdeec58df81..0ca2f7d9d2f8c 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -143,19 +143,20 @@ type LabelsBuilder struct { *BaseLabelsBuilder } +const initialLabelsCapacity = 16 + // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder { if parserKeyHints == nil { parserKeyHints = NoParserHints() } - const labelsCapacity = 16 return &BaseLabelsBuilder{ del: make([]string, 0, 5), add: [numValidCategories]*columnarLabels{ - StreamLabel: newColumnarLabels(labelsCapacity), - StructuredMetadataLabel: newColumnarLabels(labelsCapacity), - ParsedLabel: newColumnarLabels(labelsCapacity), + StreamLabel: newColumnarLabels(initialLabelsCapacity), + StructuredMetadataLabel: newColumnarLabels(initialLabelsCapacity), + ParsedLabel: newColumnarLabels(initialLabelsCapacity), }, resultCache: make(map[uint64]LabelsResult), hasher: newHasher(), @@ -200,7 +201,7 @@ func (b *BaseLabelsBuilder) Reset() { if b.add[k] != nil { b.add[k].reset() } else { - b.add[k] = newColumnarLabels(0) + b.add[k] = newColumnarLabels(initialLabelsCapacity) } } b.err = "" diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go index 05f7f2f261edb..3a8e27c9aee6f 100644 --- a/pkg/logql/log/labels_columnar.go +++ b/pkg/logql/log/labels_columnar.go @@ -5,6 +5,15 @@ import ( "slices" ) +// stringColumn is a columnar representation of a string slice. The bytes of all +// strings are stored in a single bytes slice `data`. The `offsets` slice points +// to the start of each string in `data`. The first entry will always be 0. All +// following entries are the accumulated length of all previous strings. +// +// Strings are always accessed via the `indices` slices. It points to the index +// of the string in the `offsets` slice. This technique allows deleting and +// sorting without copying the data. E.g. sorting just sorts the indices but not +// `data` not `offsets`. type stringColumn struct { data []byte offsets []int @@ -21,6 +30,7 @@ func newStringColumn(capacity int) *stringColumn { } } +// add appends a new string func (s *stringColumn) add(value []byte) { // The old values length is the offset of the new value s.offsets = append(s.offsets, len(s.data)) @@ -44,6 +54,7 @@ func (s *stringColumn) reset() { s.indices = s.indices[:0] } +// get returns the string at the given index. func (s *stringColumn) get(i int) []byte { index := s.indices[i] start := s.offsets[index] @@ -55,6 +66,7 @@ func (s *stringColumn) get(i int) []byte { return s.data[start:end] } +// index returns the index of the string in the column. func (s *stringColumn) index(value []byte) int { idx := bytes.Index(s.data, value) if idx == -1 { diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index 2cdb8e9c3e7ba..035241182b234 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -158,8 +158,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), j.valueBuffer) if j.captureJSONPath { - // TODO: very this is ok. buildJSONPathFromPrefixBuffer uses and unsafe string - j.lbs.SetJSONPath(sanitizedKey, []string{unsafeGetString(key)}) + j.lbs.SetJSONPath(sanitizedKey, []string{string(key)}) } if !j.parserHints.ShouldContinueParsingLine(sanitizedKey, j.lbs) { @@ -269,7 +268,6 @@ func readValue(v []byte, dataType jsonparser.ValueType, buf []byte) []byte { } func unescapeJSONString(b, buf []byte) []byte { - //var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings bU, err := jsonparser.Unescape(b, buf[:]) if err != nil { return nil @@ -277,7 +275,6 @@ func unescapeJSONString(b, buf []byte) []byte { res := unsafeGetString(bU) if strings.ContainsRune(res, utf8.RuneError) { - // TODO: verify that this works return []byte(strings.Map(removeInvalidUtf, res)) } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index f7b9ed8eb3448..e43db3b2a41e7 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -724,9 +724,9 @@ func jsonBenchmark(b *testing.B, parser Stage, lines int) { b.Fatalf("resulting line not ok: %s\n", line) } - //if resLbs.Parsed().Get("context_file") != "metrics.go" { - // b.Fatalf("label was not extracted correctly! context_file was %v, expected metrics.go, %+v\n", resLbs.Parsed().Get("context_file"), resLbs.Parsed()) - //} + if resLbs.Parsed().Get("context_file") != "metrics.go" { + b.Fatalf("label was not extracted correctly! context_file was %v, expected metrics.go, %+v\n", resLbs.Parsed().Get("context_file"), resLbs.Parsed()) + } } } }