diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go index 5a2946644034e..52919d3f1956a 100644 --- a/pkg/logql/log/fmt.go +++ b/pkg/logql/log/fmt.go @@ -414,7 +414,7 @@ func (lf *LabelsFormatter) Process(ts int64, l []byte, lbs *LabelsBuilder) ([]by if f.Rename { v, _, ok := lbs.GetWithCategory(f.Value) if ok { - lbs.Set(ParsedLabel, f.Name, v) + lbs.Set(ParsedLabel, unsafeGetBytes(f.Name), v) lbs.Del(f.Value) } continue @@ -428,7 +428,7 @@ func (lf *LabelsFormatter) Process(ts int64, l []byte, lbs *LabelsBuilder) ([]by lbs.SetErrorDetails(err.Error()) continue } - lbs.Set(ParsedLabel, f.Name, lf.buf.String()) + lbs.Set(ParsedLabel, unsafeGetBytes(f.Name), lf.buf.Bytes()) } return l, true } diff --git a/pkg/logql/log/keep_labels.go b/pkg/logql/log/keep_labels.go index 6f44efeae1550..087ce438ac59d 100644 --- a/pkg/logql/log/keep_labels.go +++ b/pkg/logql/log/keep_labels.go @@ -17,28 +17,31 @@ func (kl *KeepLabels) Process(_ int64, line []byte, lbls *LabelsBuilder) ([]byte return line, true } - // TODO: Reuse buf? - for _, lb := range lbls.UnsortedLabels(nil) { - if isSpecialLabel(lb.Name) { - continue + del := make([]string, 0, 10) + lbls.Range(func(name, value []byte) { + if isSpecialLabel(unsafeGetString(name)) { + return } var keep bool for _, keepLabel := range kl.labels { - if keepLabel.Matcher != nil && keepLabel.Matcher.Name == lb.Name && keepLabel.Matcher.Matches(lb.Value) { + if keepLabel.Matcher != nil && keepLabel.Matcher.Name == unsafeGetString(name) && keepLabel.Matcher.Matches(unsafeGetString(value)) { keep = true break } - if keepLabel.Name == lb.Name { + if keepLabel.Name == unsafeGetString(name) { keep = true break } } if !keep { - lbls.Del(lb.Name) + del = append(del, unsafeGetString(name)) } + }) + for _, name := range del { + lbls.Del(name) } return line, true diff --git a/pkg/logql/log/label_filter_test.go b/pkg/logql/log/label_filter_test.go index f5258e66754ef..8a451b724add5 100644 --- a/pkg/logql/log/label_filter_test.go +++ b/pkg/logql/log/label_filter_test.go @@ -4,6 +4,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/prometheus/prometheus/model/labels" diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index f4cec1111b88c..0ca2f7d9d2f8c 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -113,7 +113,7 @@ func categoriesContain(categories []LabelCategory, category LabelCategory) bool // Only one base builder is used and it contains cache for each LabelsBuilders. type BaseLabelsBuilder struct { del []string - add [numValidCategories][]labels.Label + add [numValidCategories]*columnarLabels // nolint:structcheck // https://github.com/golangci/golangci-lint/issues/826 err string @@ -134,26 +134,29 @@ type BaseLabelsBuilder struct { // LabelsBuilder is the same as labels.Builder but tailored for this package. type LabelsBuilder struct { base labels.Labels - buf []labels.Label + buf []labels.Label // TODO: try to avoid this currentResult LabelsResult groupedResult LabelsResult + scratchBuilder labels.ScratchBuilder + *BaseLabelsBuilder } +const initialLabelsCapacity = 16 + // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder { if parserKeyHints == nil { parserKeyHints = NoParserHints() } - const labelsCapacity = 16 return &BaseLabelsBuilder{ del: make([]string, 0, 5), - add: [numValidCategories][]labels.Label{ - StreamLabel: make([]labels.Label, 0, labelsCapacity), - StructuredMetadataLabel: make([]labels.Label, 0, labelsCapacity), - ParsedLabel: make([]labels.Label, 0, labelsCapacity), + add: [numValidCategories]*columnarLabels{ + StreamLabel: newColumnarLabels(initialLabelsCapacity), + StructuredMetadataLabel: newColumnarLabels(initialLabelsCapacity), + ParsedLabel: newColumnarLabels(initialLabelsCapacity), }, resultCache: make(map[uint64]LabelsResult), hasher: newHasher(), @@ -195,7 +198,11 @@ func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBui func (b *BaseLabelsBuilder) Reset() { b.del = b.del[:0] for k := range b.add { - b.add[k] = b.add[k][:0] + if b.add[k] != nil { + b.add[k].reset() + } else { + b.add[k] = newColumnarLabels(initialLabelsCapacity) + } } b.err = "" b.errDetails = "" @@ -215,7 +222,7 @@ func (b *BaseLabelsBuilder) hasDel() bool { func (b *BaseLabelsBuilder) hasAdd() bool { for _, lbls := range b.add { - if len(lbls) > 0 { + if lbls.len() > 0 { return true } } @@ -225,7 +232,7 @@ func (b *BaseLabelsBuilder) hasAdd() bool { func (b *BaseLabelsBuilder) sizeAdd() int { var length int for _, lbls := range b.add { - length += len(lbls) + length += lbls.len() } return length } @@ -275,7 +282,7 @@ func (b *LabelsBuilder) BaseHas(key string) bool { } // GetWithCategory returns the value and the category of a labels key if it exists. -func (b *LabelsBuilder) GetWithCategory(key string) (string, LabelCategory, bool) { +func (b *LabelsBuilder) GetWithCategory(key string) ([]byte, LabelCategory, bool) { v, category, ok := b.getWithCategory(key) if category == StructuredMetadataLabel { b.referencedStructuredMetadata = true @@ -285,39 +292,37 @@ func (b *LabelsBuilder) GetWithCategory(key string) (string, LabelCategory, bool } // GetWithCategory returns the value and the category of a labels key if it exists. -func (b *LabelsBuilder) getWithCategory(key string) (string, LabelCategory, bool) { +func (b *LabelsBuilder) getWithCategory(key string) ([]byte, LabelCategory, bool) { for category, lbls := range b.add { - for _, l := range lbls { - if l.Name == key { - return l.Value, LabelCategory(category), true - } + if v, ok := lbls.get(unsafeGetBytes(key)); ok { + return v, LabelCategory(category), true } } for _, d := range b.del { if d == key { - return "", InvalidCategory, false + return nil, InvalidCategory, false } } value := b.base.Get(key) if value != "" { - return value, StreamLabel, true + return []byte(value), StreamLabel, true } - return "", InvalidCategory, false + return nil, InvalidCategory, false } func (b *LabelsBuilder) Get(key string) (string, bool) { v, _, ok := b.GetWithCategory(key) - return v, ok + return string(v), ok } // Del deletes the label of the given name. func (b *LabelsBuilder) Del(ns ...string) *LabelsBuilder { for _, n := range ns { for category := range b.add { - b.deleteWithCategory(LabelCategory(category), n) + b.deleteWithCategory(LabelCategory(category), unsafeGetBytes(n)) } b.del = append(b.del, n) } @@ -325,18 +330,14 @@ func (b *LabelsBuilder) Del(ns ...string) *LabelsBuilder { } // deleteWithCategory removes the label from the specified category -func (b *LabelsBuilder) deleteWithCategory(category LabelCategory, n string) { - for i, l := range b.add[category] { - if l.Name == n { - b.add[category] = append(b.add[category][:i], b.add[category][i+1:]...) - } - } +func (b *LabelsBuilder) deleteWithCategory(category LabelCategory, n []byte) { + b.add[category].del(n) } // Set the name/value pair as a label. // The value `v` may not be set if a category with higher preference already contains `n`. // Category preference goes as Parsed > Structured Metadata > Stream. -func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder { +func (b *LabelsBuilder) Set(category LabelCategory, n, v []byte) *LabelsBuilder { // Parsed takes precedence over Structured Metadata and Stream labels. // If category is Parsed, we delete `n` from the structured metadata and stream labels. if category == ParsedLabel { @@ -349,7 +350,7 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder // If `n` exists in the parsed labels, we won't overwrite it's value and we just return what we have. if category == StructuredMetadataLabel { b.deleteWithCategory(StreamLabel, n) - if labelsContain(b.add[ParsedLabel], n) { + if _, ok := b.add[ParsedLabel].get(n); ok { return b } } @@ -362,13 +363,11 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder } } - for i, a := range b.add[category] { - if a.Name == n { - b.add[category][i].Value = v - return b - } + if ok := b.add[category].override(n, v); ok { + return b } - b.add[category] = append(b.add[category], labels.Label{Name: n, Value: v}) + + b.add[category].add(n, v) if category == ParsedLabel { // We record parsed labels as extracted so that future parse stages can @@ -377,7 +376,8 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder // Note that because this is used for bypassing extracted fields, and // because parsed labels always take precedence over structured metadata // and stream labels, we must only call RecordExtracted for parsed labels. - b.parserKeyHints.RecordExtracted(n) + //b.parserKeyHints.RecordExtracted(string(n)) + b.parserKeyHints.RecordExtracted(unsafeGetString(n)) } return b } @@ -401,7 +401,7 @@ func (b *LabelsBuilder) Add(category LabelCategory, lbs labels.Labels) *LabelsBu return } - b.Set(category, name, l.Value) + b.Set(category, unsafeGetBytes(name), unsafeGetBytes(l.Value)) }) return b } @@ -437,37 +437,27 @@ func (b *LabelsBuilder) appendErrors(buf []labels.Label) []labels.Label { } return buf } - -func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label { +func (b *LabelsBuilder) Range(f func(name, value []byte), categories ...LabelCategory) { if categories == nil { categories = allCategories } if !b.hasDel() && !b.hasAdd() && categoriesContain(categories, StreamLabel) { - if buf == nil { - buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label. - } else { - buf = buf[:0] - } b.base.Range(func(l labels.Label) { - buf = append(buf, l) + f(unsafeGetBytes(l.Name), unsafeGetBytes(l.Value)) }) if categoriesContain(categories, ParsedLabel) { - buf = b.appendErrors(buf) + if b.err != "" { + f(unsafeGetBytes(logqlmodel.ErrorLabel), unsafeGetBytes(b.err)) + } + if b.errDetails != "" { + f(unsafeGetBytes(logqlmodel.ErrorDetailsLabel), unsafeGetBytes(b.errDetails)) + } } - return buf - } - - // In the general case, labels are removed, modified or moved - // rather than added. - if buf == nil { - size := b.base.Len() + b.sizeAdd() + 1 - buf = make([]labels.Label, 0, size) - } else { - buf = buf[:0] + return } if categoriesContain(categories, StreamLabel) { @@ -480,41 +470,64 @@ func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCa } // Skip stream labels which value will be replaced by structured metadata - if labelsContain(b.add[StructuredMetadataLabel], l.Name) { + if labelsContain(b.add[StructuredMetadataLabel], unsafeGetBytes(l.Name)) { return } // Skip stream labels which value will be replaced by parsed labels - if labelsContain(b.add[ParsedLabel], l.Name) { + if labelsContain(b.add[ParsedLabel], unsafeGetBytes(l.Name)) { return } // Take value from stream label if present - if value, found := findLabelValue(b.add[StreamLabel], l.Name); found { - buf = append(buf, labels.Label{Name: l.Name, Value: value}) + if value, found := b.add[StreamLabel].get(unsafeGetBytes(l.Name)); found { + f(unsafeGetBytes(l.Name), value) } else { - buf = append(buf, l) + f(unsafeGetBytes(l.Name), unsafeGetBytes(l.Value)) } }) } if categoriesContain(categories, StructuredMetadataLabel) { - for _, l := range b.add[StructuredMetadataLabel] { - if labelsContain(b.add[ParsedLabel], l.Name) { + for i := 0; i < b.add[StructuredMetadataLabel].len(); i++ { + name, value := b.add[StructuredMetadataLabel].getAt(i) + if labelsContain(b.add[ParsedLabel], name) { continue } - buf = append(buf, l) + f(name, value) } } if categoriesContain(categories, ParsedLabel) { - buf = append(buf, b.add[ParsedLabel]...) + for i := 0; i < b.add[ParsedLabel].len(); i++ { + name, value := b.add[ParsedLabel].getAt(i) + f(name, value) + } } + if (b.HasErr() || b.HasErrorDetails()) && categoriesContain(categories, ParsedLabel) { - buf = b.appendErrors(buf) + if b.err != "" { + f(unsafeGetBytes(logqlmodel.ErrorLabel), unsafeGetBytes(b.err)) + } + if b.errDetails != "" { + f(unsafeGetBytes(logqlmodel.ErrorDetailsLabel), unsafeGetBytes(b.errDetails)) + } + } +} + +// TODO: ideally we remove this +func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label { + if buf == nil { + buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label. + } else { + buf = buf[:0] } + b.Range(func(name, value []byte) { + buf = append(buf, labels.Label{Name: string(name), Value: string(value)}) + }, categories...) + return buf } @@ -556,12 +569,12 @@ func (b *LabelsBuilder) IntoMap(m map[string]string) { } return } - b.buf = b.UnsortedLabels(b.buf) + // todo should we also cache maps since limited by the result ? // Maps also don't create a copy of the labels. - for _, l := range b.buf { - m[l.Name] = l.Value - } + b.Range(func(name, value []byte) { + m[string(name)] = string(value) + }) } func (b *LabelsBuilder) Map() (map[string]string, bool) { @@ -571,18 +584,19 @@ func (b *LabelsBuilder) Map() (map[string]string, bool) { } return b.baseMap, false } - b.buf = b.UnsortedLabels(b.buf) + // todo should we also cache maps since limited by the result ? // Maps also don't create a copy of the labels. res := smp.Get() - for _, l := range b.buf { - res[l.Name] = l.Value - } + b.Range(func(name, value []byte) { + res[string(name)] = string(value) + }) return res, true } // LabelsResult returns the LabelsResult from the builder. // No grouping is applied and the cache is used when possible. +// TODO: benchmark for high cardinality labels func (b *LabelsBuilder) LabelsResult() LabelsResult { // unchanged path. if !b.hasDel() && !b.hasAdd() && !b.HasErr() { @@ -590,8 +604,13 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Get all labels at once and sort them - b.buf = b.UnsortedLabels(b.buf) - lbls := labels.New(b.buf...) + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + b.scratchBuilder.UnsafeAddBytes(name, value) + }) + b.scratchBuilder.Sort() + + lbls := b.scratchBuilder.Labels() hash := b.hasher.Hash(lbls) if cached, ok := b.resultCache[hash]; ok { @@ -599,38 +618,55 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } // Now segregate the sorted labels into their categories - var stream, meta, parsed []labels.Label - - for _, l := range b.buf { - // Skip error labels for stream and meta categories - if l.Name == logqlmodel.ErrorLabel || l.Name == logqlmodel.ErrorDetailsLabel { - parsed = append(parsed, l) - continue + var stream, meta, parsed labels.Labels + + // Parsed + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + // Add error labels to parsed labels, ie skip them for stream and meta categories + if unsafeGetString(name) == logqlmodel.ErrorLabel || unsafeGetString(name) == logqlmodel.ErrorDetailsLabel { + b.scratchBuilder.UnsafeAddBytes(name, value) + return } // Check which category this label belongs to - if labelsContain(b.add[ParsedLabel], l.Name) { - parsed = append(parsed, l) - } else if labelsContain(b.add[StructuredMetadataLabel], l.Name) { - meta = append(meta, l) - } else { - stream = append(stream, l) + if labelsContain(b.add[ParsedLabel], name) { + b.scratchBuilder.UnsafeAddBytes(name, value) } - } + }) + b.scratchBuilder.Sort() + parsed = b.scratchBuilder.Labels() + + // Structured Metadata + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + if labelsContain(b.add[StructuredMetadataLabel], name) { + b.scratchBuilder.UnsafeAddBytes(name, value) + } + }) + b.scratchBuilder.Sort() + meta = b.scratchBuilder.Labels() - result := NewLabelsResult(lbls.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...)) + // Stream + b.scratchBuilder.Reset() + b.Range(func(name, value []byte) { + if !labelsContain(b.add[ParsedLabel], name) && !labelsContain(b.add[StructuredMetadataLabel], name) && + unsafeGetString(name) != logqlmodel.ErrorLabel && unsafeGetString(name) != logqlmodel.ErrorDetailsLabel { + b.scratchBuilder.UnsafeAddBytes(name, value) + } + }) + b.scratchBuilder.Sort() + stream = b.scratchBuilder.Labels() + + result := NewLabelsResult(lbls.String(), hash, stream, meta, parsed) b.resultCache[hash] = result return result } -func labelsContain(labels []labels.Label, name string) bool { - for _, l := range labels { - if l.Name == name { - return true - } - } - return false +func labelsContain(labels *columnarLabels, name []byte) bool { + _, ok := labels.get(name) + return ok } func findLabelValue(labels []labels.Label, name string) (string, bool) { @@ -642,6 +678,7 @@ func findLabelValue(labels []labels.Label, name string) (string, bool) { return "", false } +// TODO: use scratch builder instead func (b *BaseLabelsBuilder) toUncategorizedResult(buf []labels.Label) LabelsResult { lbls := labels.New(buf...) hash := b.hasher.Hash(lbls) @@ -679,6 +716,7 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult { if b.without { return b.withoutResult() } + // TODO: use scratch builder instead return b.withResult() } @@ -696,14 +734,12 @@ Outer: } } for category, la := range b.add { - for _, l := range la { - if g == l.Name { - if LabelCategory(category) == StructuredMetadataLabel { - b.referencedStructuredMetadata = true - } - b.buf = append(b.buf, l) - continue Outer + if value, ok := la.get(unsafeGetBytes(g)); ok { + if LabelCategory(category) == StructuredMetadataLabel { + b.referencedStructuredMetadata = true } + b.buf = append(b.buf, labels.Label{Name: g, Value: string(value)}) + continue Outer } } @@ -733,10 +769,8 @@ func (b *LabelsBuilder) withoutResult() LabelsResult { } } for _, lbls := range b.add { - for _, la := range lbls { - if l.Name == la.Name { - return - } + if _, ok := lbls.get(unsafeGetBytes(l.Name)); ok { + return } } for _, lg := range b.groups { @@ -749,16 +783,17 @@ func (b *LabelsBuilder) withoutResult() LabelsResult { for category, lbls := range b.add { OuterAdd: - for _, la := range lbls { + for i := 0; i < lbls.len(); i++ { + name, value := lbls.getAt(i) for _, lg := range b.groups { - if la.Name == lg { + if unsafeGetString(name) == lg { if LabelCategory(category) == StructuredMetadataLabel { b.referencedStructuredMetadata = true } continue OuterAdd } } - b.buf = append(b.buf, la) + b.buf = append(b.buf, labels.Label{Name: string(name), Value: string(value)}) } } diff --git a/pkg/logql/log/labels_columnar.go b/pkg/logql/log/labels_columnar.go new file mode 100644 index 0000000000000..3a8e27c9aee6f --- /dev/null +++ b/pkg/logql/log/labels_columnar.go @@ -0,0 +1,188 @@ +package log + +import ( + "bytes" + "slices" +) + +// stringColumn is a columnar representation of a string slice. The bytes of all +// strings are stored in a single bytes slice `data`. The `offsets` slice points +// to the start of each string in `data`. The first entry will always be 0. All +// following entries are the accumulated length of all previous strings. +// +// Strings are always accessed via the `indices` slices. It points to the index +// of the string in the `offsets` slice. This technique allows deleting and +// sorting without copying the data. E.g. sorting just sorts the indices but not +// `data` not `offsets`. +type stringColumn struct { + data []byte + offsets []int + + // indices is a selection vector. + indices []int +} + +func newStringColumn(capacity int) *stringColumn { + return &stringColumn{ + data: make([]byte, 0, capacity*16), + offsets: make([]int, 0, capacity), + indices: make([]int, 0, capacity), + } +} + +// add appends a new string +func (s *stringColumn) add(value []byte) { + // The old values length is the offset of the new value + s.offsets = append(s.offsets, len(s.data)) + + s.data = append(s.data, value...) + + // Point to the last offset added + s.indices = append(s.indices, len(s.offsets)-1) +} + +// del remove the index from the selection vector. It does not remove the value. +// Use compact to also remove it from the data. +// TODO: implement compact +func (s *stringColumn) del(i int) { + s.indices = append(s.indices[:i], s.indices[i+1:]...) +} + +func (s *stringColumn) reset() { + s.data = s.data[:0] + s.offsets = s.offsets[:0] + s.indices = s.indices[:0] +} + +// get returns the string at the given index. +func (s *stringColumn) get(i int) []byte { + index := s.indices[i] + start := s.offsets[index] + if index+1 >= len(s.offsets) { + return s.data[start:] + } + + end := s.offsets[index+1] + return s.data[start:end] +} + +// index returns the index of the string in the column. +func (s *stringColumn) index(value []byte) int { + idx := bytes.Index(s.data, value) + if idx == -1 { + return -1 + } + + // Find index in offsets + idx = slices.Index(s.offsets, idx) + if idx == -1 { + return -1 + } + + // Verify length + var length int + if idx+1 >= len(s.offsets) { + length = len(s.data) - s.offsets[idx] + } else { + length = s.offsets[idx+1] - s.offsets[idx] + } + if length != len(value) { + return -1 + } + + // Find index in indices + return slices.Index(s.indices, idx) +} + +func (s *stringColumn) len() int { + return len(s.indices) +} + +type columnarLabels struct { + names *stringColumn + values *stringColumn +} + +func (c *columnarLabels) add(name, value []byte) { + c.names.add(name) + c.values.add(value) +} + +// override overrides the value of a label if it exists and returns true. +// If the label does not exist, it returns false and does nothing. +func (c *columnarLabels) override(name, value []byte) bool { + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), name) { + c.values.del(i) + c.names.del(i) + c.add(name, value) + return true + } + } + return false +} + +func (c *columnarLabels) reset() { + if c.names == nil { + c.names = newStringColumn(0) + } + if c.values == nil { + c.values = newStringColumn(0) + } + c.names.reset() + c.values.reset() +} + +func (c *columnarLabels) len() int { + return c.names.len() +} + +func (c *columnarLabels) get(key []byte) ([]byte, bool) { + // Benchmarking showed that linear search is faster for small number of labels. + if c.names.len() <= 50 { + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), key) { + return c.values.get(i), true + } + } + return nil, false + } + + idx := c.names.index(key) + if idx == -1 { + return nil, false + } + return c.values.get(idx), true +} + +func (c *columnarLabels) getAt(i int) (name, value []byte) { + return c.names.get(i), c.values.get(i) +} + +func (c *columnarLabels) del(name []byte) { + // TODO: to a string search on s.names.data + for i := 0; i < len(c.names.indices); i++ { + if bytes.Equal(c.names.get(i), name) { + c.names.del(i) + c.values.del(i) + } + } +} + +func newColumnarLabels(capacity int) *columnarLabels { + return &columnarLabels{ + names: newStringColumn(capacity), + values: newStringColumn(capacity), + } +} + +func newColumnarLabelsFromStrings(ss ...string) *columnarLabels { + if len(ss)%2 != 0 { + panic("invalid number of strings") + } + c := newColumnarLabels(len(ss) / 2) + for i := 0; i < len(ss); i += 2 { + c.add(unsafeGetBytes(ss[i]), unsafeGetBytes(ss[i+1])) + } + return c +} diff --git a/pkg/logql/log/labels_columnar_test.go b/pkg/logql/log/labels_columnar_test.go new file mode 100644 index 0000000000000..bad563c8db180 --- /dev/null +++ b/pkg/logql/log/labels_columnar_test.go @@ -0,0 +1,173 @@ +package log + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStringColumn_Add(t *testing.T) { + sc := newStringColumn(10) + sc.add([]byte("foo")) + sc.add([]byte("bar")) + sc.add([]byte("baz")) + sc.add([]byte("qux")) + + require.Equal(t, 4, sc.len()) + + require.Equal(t, []byte("foo"), sc.get(0)) + require.Equal(t, []byte("bar"), sc.get(1)) + require.Equal(t, []byte("baz"), sc.get(2)) + require.Equal(t, []byte("qux"), sc.get(3)) +} + +func TestStringColumn_Del(t *testing.T) { + sc := newStringColumn(10) + sc.add([]byte("foo")) + sc.add([]byte("bar")) + sc.add([]byte("baz")) + sc.add([]byte("qux")) + + all := "" + for i := 0; i < sc.len(); i++ { + all += string(sc.get(i)) + } + require.Equal(t, "foobarbazqux", all) + + require.Equal(t, 4, sc.len()) + sc.del(2) + require.Equal(t, 3, sc.len()) + + all = "" + for i := 0; i < sc.len(); i++ { + all += string(sc.get(i)) + } + require.Equal(t, "foobarqux", all) +} + +func TestColumnarLabels_Get(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + v, ok := cl.get([]byte("foo")) + require.True(t, ok) + require.Equal(t, []byte("bar"), v) + + v, ok = cl.get([]byte("baz")) + require.True(t, ok) + require.Equal(t, []byte("qux"), v) +} + +func TestStringColumn_Index(t *testing.T) { + sc := newStringColumn(10) + sc.add([]byte("foo")) + sc.add([]byte("bar")) + sc.add([]byte("baz")) + sc.add([]byte("qux")) + + require.Equal(t, 0, sc.index([]byte("foo"))) + require.Equal(t, 1, sc.index([]byte("bar"))) + require.Equal(t, 2, sc.index([]byte("baz"))) + require.Equal(t, 3, sc.index([]byte("qux"))) + + require.Equal(t, -1, sc.index([]byte("ba"))) + require.Equal(t, -1, sc.index([]byte("bazq"))) + require.Equal(t, -1, sc.index([]byte("qu"))) + require.Equal(t, -1, sc.index([]byte("nonexistent"))) + + sc.del(2) + require.Equal(t, 2, sc.index([]byte("qux"))) + require.Equal(t, -1, sc.index([]byte("baz"))) +} + +func TestColumnarLabels_GetAt(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + n, v := cl.getAt(0) + require.Equal(t, []byte("foo"), n) + require.Equal(t, []byte("bar"), v) + + n, v = cl.getAt(1) + require.Equal(t, []byte("baz"), n) + require.Equal(t, []byte("qux"), v) +} + +func TestColumnarLabels_Del(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + cl.del([]byte("foo")) + require.Equal(t, 1, cl.len()) + + _, ok := cl.get([]byte("foo")) + require.False(t, ok) + + n, v := cl.getAt(0) + require.Equal(t, []byte("baz"), n) + require.Equal(t, []byte("qux"), v) +} + +func TestColumnarLabels_Override(t *testing.T) { + cl := newColumnarLabels(10) + cl.add([]byte("foo"), []byte("bar")) + cl.add([]byte("baz"), []byte("qux")) + + require.Equal(t, 2, cl.len()) + ok := cl.override([]byte("foo"), []byte("new")) + require.True(t, ok) + require.Equal(t, 2, cl.len()) + + v, ok := cl.get([]byte("foo")) + require.True(t, ok) + require.Equal(t, []byte("new"), v) + + v, ok = cl.get([]byte("baz")) + require.True(t, ok) + require.Equal(t, []byte("qux"), v) + + ok = cl.override([]byte("nonexistent"), []byte("new")) + require.False(t, ok) + require.Equal(t, 2, cl.len()) + + _, ok = cl.get([]byte("nonexistent")) + require.False(t, ok) +} + +func BenchmarkColumnarLabels_Get(b *testing.B) { + for _, l := range []int{10, 30, 100, 300, 1000} { + cl := newColumnarLabels(l) + for i := 0; i < l; i++ { + cl.add([]byte(fmt.Sprintf("foo%d", i)), []byte(fmt.Sprintf("bar%d", i))) + } + + b.ResetTimer() + b.Run(fmt.Sprintf("get-middle-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte(fmt.Sprintf("foo%d", l/2))) + } + }) + b.Run(fmt.Sprintf("get-first-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte("foo0")) + } + }) + b.Run(fmt.Sprintf("get-last-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte(fmt.Sprintf("foo%d", l-1))) + } + }) + b.Run(fmt.Sprintf("get-nonexistent-%d", l), func(b *testing.B) { + for i := 0; i < b.N; i++ { + cl.get([]byte("nonexistent")) + } + }) + } +} diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 11d134799009d..ff67630181a6b 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -18,8 +18,8 @@ func TestLabelsBuilder_Get(t *testing.T) { lbs := labels.FromStrings("already", "in") b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) b.Reset() - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(ParsedLabel, "bar", "buzz") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(ParsedLabel, []byte("bar"), []byte("buzz")) _, category, ok := b.GetWithCategory("bar") require.Equal(t, ParsedLabel, category) @@ -36,11 +36,11 @@ func TestLabelsBuilder_Get(t *testing.T) { require.False(t, ok) v, category, ok := b.GetWithCategory("bar") require.True(t, ok) - require.Equal(t, "buzz", v) + require.Equal(t, "buzz", string(v)) require.Equal(t, ParsedLabel, category) v, category, ok = b.GetWithCategory("already") require.True(t, ok) - require.Equal(t, "in", v) + require.Equal(t, "in", string(v)) require.Equal(t, StreamLabel, category) b.Del("bar") _, _, ok = b.GetWithCategory("bar") @@ -170,10 +170,10 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) { withErr := labels.FromStrings(append(strs, logqlmodel.ErrorLabel, "err")...) assertLabelResult(t, withErr, b.LabelsResult()) - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "namespace", "tempo") - b.Set(ParsedLabel, "buzz", "fuzz") - b.Set(ParsedLabel, "ToReplace", "other") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) + b.Set(ParsedLabel, []byte("buzz"), []byte("fuzz")) + b.Set(ParsedLabel, []byte("ToReplace"), []byte("other")) b.Del("job") expectedStreamLbls := labels.FromStrings( @@ -201,10 +201,10 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) { assert.Equal(t, expectedParsedLbls, actual.Parsed()) b.Reset() - b.Set(StreamLabel, "namespace", "tempo") - b.Set(StreamLabel, "bazz", "tazz") - b.Set(StructuredMetadataLabel, "bazz", "sazz") - b.Set(ParsedLabel, "ToReplace", "other") + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) + b.Set(StreamLabel, []byte("bazz"), []byte("tazz")) + b.Set(StructuredMetadataLabel, []byte("bazz"), []byte("sazz")) + b.Set(ParsedLabel, []byte("ToReplace"), []byte("other")) expectedStreamLbls = labels.FromStrings( "namespace", "tempo", @@ -238,8 +238,8 @@ func TestLabelsBuilder_Set(t *testing.T) { b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) // test duplicating stream label with parsed label - b.Set(StructuredMetadataLabel, "stzz", "stvzz") - b.Set(ParsedLabel, "toreplace", "buzz") + b.Set(StructuredMetadataLabel, []byte("stzz"), []byte("stvzz")) + b.Set(ParsedLabel, []byte("toreplace"), []byte("buzz")) expectedStreamLbls := labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls := labels.FromStrings("stzz", "stvzz") expectedParsedLbls := labels.FromStrings("toreplace", "buzz") @@ -255,9 +255,9 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating structured metadata label with parsed label - b.Set(StructuredMetadataLabel, "stzz", "stvzz") - b.Set(StructuredMetadataLabel, "toreplace", "muzz") - b.Set(ParsedLabel, "toreplace", "buzz") + b.Set(StructuredMetadataLabel, []byte("stzz"), []byte("stvzz")) + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) + b.Set(ParsedLabel, []byte("toreplace"), []byte("buzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzz") expectedParsedLbls = labels.FromStrings("toreplace", "buzz") @@ -273,8 +273,8 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating stream label with structured meta data label - b.Set(StructuredMetadataLabel, "toreplace", "muzz") - b.Set(ParsedLabel, "stzz", "stvzz") + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) + b.Set(ParsedLabel, []byte("stzz"), []byte("stvzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz") expectedParsedLbls = labels.FromStrings("stzz", "stvzz") @@ -290,9 +290,9 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating parsed label with structured meta data label - b.Set(ParsedLabel, "toreplace", "puzz") - b.Set(StructuredMetadataLabel, "stzz", "stvzzz") - b.Set(StructuredMetadataLabel, "toreplace", "muzz") + b.Set(ParsedLabel, []byte("toreplace"), []byte("puzz")) + b.Set(StructuredMetadataLabel, []byte("stzz"), []byte("stvzzz")) + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzzz") expectedParsedLbls = labels.FromStrings("toreplace", "puzz") @@ -308,8 +308,8 @@ func TestLabelsBuilder_Set(t *testing.T) { b.Reset() // test duplicating structured meta data label with stream label - b.Set(ParsedLabel, "stzz", "stvzzz") - b.Set(StructuredMetadataLabel, "toreplace", "muzz") + b.Set(ParsedLabel, []byte("stzz"), []byte("stvzzz")) + b.Set(StructuredMetadataLabel, []byte("toreplace"), []byte("muzz")) expectedStreamLbls = labels.FromStrings("namespace", "loki", "cluster", "us-central1") expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz") expectedParsedLbls = labels.FromStrings("stzz", "stvzzz") @@ -331,15 +331,15 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { } lbs := labels.FromStrings(strs...) b := NewBaseLabelsBuilder().ForLabels(lbs, labels.StableHash(lbs)) - b.add[StructuredMetadataLabel] = []labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "fzz", Value: "bzz"}} - b.add[ParsedLabel] = []labels.Label{{Name: "pzz", Value: "pvzz"}} + b.add[StructuredMetadataLabel] = newColumnarLabelsFromStrings("toreplace", "buzz", "fzz", "bzz") + b.add[ParsedLabel] = newColumnarLabelsFromStrings("pzz", "pvzz") expected := []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} actual := b.UnsortedLabels(nil) require.ElementsMatch(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = []labels.Label{{Name: "fzz", Value: "bzz"}} - b.add[ParsedLabel] = []labels.Label{{Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} + b.add[StructuredMetadataLabel] = newColumnarLabelsFromStrings("fzz", "bzz") + b.add[ParsedLabel] = newColumnarLabelsFromStrings("toreplace", "buzz", "pzz", "pvzz") expected = []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} actual = b.UnsortedLabels(nil) sortLabelSlice(expected) @@ -347,8 +347,8 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { assert.Equal(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = []labels.Label{{Name: "fzz", Value: "bzz"}, {Name: "toreplacezz", Value: "test"}} - b.add[ParsedLabel] = []labels.Label{{Name: "toreplacezz", Value: "buzz"}, {Name: "pzz", Value: "pvzz"}} + b.add[StructuredMetadataLabel] = newColumnarLabelsFromStrings("fzz", "bzz", "toreplacezz", "test") + b.add[ParsedLabel] = newColumnarLabelsFromStrings("toreplacezz", "buzz", "pzz", "pvzz") expected = []labels.Label{{Name: "cluster", Value: "us-central1"}, {Name: "namespace", Value: "loki"}, {Name: "fzz", Value: "bzz"}, {Name: "toreplace", Value: "fuzz"}, {Name: "pzz", Value: "pvzz"}, {Name: "toreplacezz", Value: "buzz"}} actual = b.UnsortedLabels(nil) sortLabelSlice(expected) @@ -375,9 +375,9 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { assertLabelResult(t, withErr, b.GroupedLabels()) b.Reset() - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "namespace", "tempo") - b.Set(ParsedLabel, "buzz", "fuzz") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) + b.Set(ParsedLabel, []byte("buzz"), []byte("fuzz")) b.Del("job") expected := labels.FromStrings("namespace", "tempo") assertLabelResult(t, expected, b.GroupedLabels()) @@ -390,19 +390,19 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { b.Del("job") assertLabelResult(t, labels.EmptyLabels(), b.GroupedLabels()) b.Reset() - b.Set(StreamLabel, "namespace", "tempo") + b.Set(StreamLabel, []byte("namespace"), []byte("tempo")) assertLabelResult(t, labels.FromStrings("job", "us-central1/loki"), b.GroupedLabels()) require.False(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping([]string{"foo"}, nil, false, false).ForLabels(lbs, labels.StableHash(lbs)) - b.Set(StructuredMetadataLabel, "foo", "bar") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) assertLabelResult(t, labels.FromStrings("foo", "bar"), b.GroupedLabels()) require.True(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, true, false).ForLabels(lbs, labels.StableHash(lbs)) b.Del("job") - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "job", "something") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("job"), []byte("something")) expected = labels.FromStrings("namespace", "loki", "cluster", "us-central1", "foo", "bar", @@ -411,7 +411,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { require.False(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping([]string{"foo"}, nil, true, false).ForLabels(lbs, labels.StableHash(lbs)) - b.Set(StructuredMetadataLabel, "foo", "bar") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) expected = labels.FromStrings("namespace", "loki", "job", "us-central1/loki", "cluster", "us-central1", @@ -420,8 +420,8 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { require.True(t, b.referencedStructuredMetadata) b = NewBaseLabelsBuilderWithGrouping(nil, nil, false, false).ForLabels(lbs, labels.StableHash(lbs)) - b.Set(StructuredMetadataLabel, "foo", "bar") - b.Set(StreamLabel, "job", "something") + b.Set(StructuredMetadataLabel, []byte("foo"), []byte("bar")) + b.Set(StreamLabel, []byte("job"), []byte("something")) expected = labels.FromStrings("namespace", "loki", "job", "something", "cluster", "us-central1", diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index f6ebc1de29f62..c62422aa457a3 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -35,7 +35,8 @@ var ( _ Stage = &RegexpParser{} _ Stage = &LogfmtParser{} - trueBytes = []byte("true") + trueBytes = []byte("true") + falseBytes = []byte("false") errUnexpectedJSONObject = fmt.Errorf("expecting json object(%d), but it is not", jsoniter.ObjectValue) errMissingCapture = errors.New("at least one named capture must be supplied") @@ -59,6 +60,7 @@ type JSONParser struct { keys internedStringSet parserHints ParserHint sanitizedPrefixBuffer []byte + valueBuffer []byte } // NewJSONParser creates a log stage that can parse a json log line and add properties as labels. @@ -68,6 +70,7 @@ func NewJSONParser(captureJSONPath bool) *JSONParser { keys: internedStringSet{}, captureJSONPath: captureJSONPath, sanitizedPrefixBuffer: make([]byte, 0, 64), + valueBuffer: make([]byte, 0, 64), } } @@ -150,7 +153,10 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu if !ok || j.lbs.ParserLabelHints().Extracted(sanitizedKey) { return nil } - j.lbs.Set(ParsedLabel, sanitizedKey, readValue(value, dataType)) + + j.valueBuffer = readValue(value, dataType, j.valueBuffer) + j.lbs.Set(ParsedLabel, unsafeGetBytes(sanitizedKey), j.valueBuffer) + if j.captureJSONPath { j.lbs.SetJSONPath(sanitizedKey, []string{string(key)}) } @@ -195,7 +201,8 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu return nil } - j.lbs.Set(ParsedLabel, keyString, readValue(value, dataType)) + j.valueBuffer = readValue(value, dataType, j.valueBuffer) + j.lbs.Set(ParsedLabel, unsafeGetBytes(keyString), j.valueBuffer) if !j.parserHints.ShouldContinueParsingLine(keyString, j.lbs) { return errLabelDoesNotMatch @@ -242,37 +249,36 @@ func (j *JSONParser) buildJSONPathFromPrefixBuffer() []string { func (j *JSONParser) RequiredLabelNames() []string { return []string{} } -func readValue(v []byte, dataType jsonparser.ValueType) string { +func readValue(v []byte, dataType jsonparser.ValueType, buf []byte) []byte { switch dataType { case jsonparser.String: - return unescapeJSONString(v) + return unescapeJSONString(v, buf) case jsonparser.Null: - return "" + return nil case jsonparser.Number: - return string(v) + return v case jsonparser.Boolean: if bytes.Equal(v, trueBytes) { - return trueString + return trueBytes } - return falseString + return falseBytes default: - return "" + return nil } } -func unescapeJSONString(b []byte) string { - var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings - bU, err := jsonparser.Unescape(b, stackbuf[:]) +func unescapeJSONString(b, buf []byte) []byte { + bU, err := jsonparser.Unescape(b, buf[:]) if err != nil { - return "" + return nil } - res := string(bU) + res := unsafeGetString(bU) if strings.ContainsRune(res, utf8.RuneError) { - res = strings.Map(removeInvalidUtf, res) + return []byte(strings.Map(removeInvalidUtf, res)) } - return res + return bU } type RegexpParser struct { @@ -338,7 +344,7 @@ func (r *RegexpParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte continue } - lbs.Set(ParsedLabel, key, string(value)) + lbs.Set(ParsedLabel, unsafeGetBytes(key), value) if !parserHints.ShouldContinueParsingLine(key, lbs) { return line, false } @@ -414,7 +420,7 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte continue } - lbs.Set(ParsedLabel, key, string(val)) + lbs.Set(ParsedLabel, unsafeGetBytes(key), val) if !parserHints.ShouldContinueParsingLine(key, lbs) { return line, false } @@ -476,7 +482,7 @@ func (l *PatternParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byt continue } - lbs.Set(ParsedLabel, name, string(m)) + lbs.Set(ParsedLabel, unsafeGetBytes(name), m) if !parserHints.ShouldContinueParsingLine(name, lbs) { return line, false } @@ -535,7 +541,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde for id, paths := range l.expressions { keys[id] = fmt.Sprintf("%v", paths...) if !lbs.BaseHas(id) { - lbs.Set(ParsedLabel, id, "") + lbs.Set(ParsedLabel, unsafeGetBytes(id), nil) } } @@ -597,7 +603,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde } } - lbs.Set(ParsedLabel, key, string(val)) + lbs.Set(ParsedLabel, unsafeGetBytes(key), val) if lbs.ParserLabelHints().AllRequiredExtracted() { break @@ -616,9 +622,10 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde func (l *LogfmtExpressionParser) RequiredLabelNames() []string { return []string{} } type JSONExpressionParser struct { - ids []string - paths [][]string - keys internedStringSet + ids []string + paths [][]string + keys internedStringSet + valueBuffer []byte } func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpressionParser, error) { @@ -639,9 +646,10 @@ func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpression } return &JSONExpressionParser{ - ids: ids, - paths: paths, - keys: internedStringSet{}, + ids: ids, + paths: paths, + keys: internedStringSet{}, + valueBuffer: make([]byte, 0, 64), }, nil } @@ -688,11 +696,12 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) switch typ { case jsonparser.Null: - lbs.Set(ParsedLabel, key, "") + lbs.Set(ParsedLabel, unsafeGetBytes(key), nil) case jsonparser.Object: - lbs.Set(ParsedLabel, key, string(data)) + lbs.Set(ParsedLabel, unsafeGetBytes(key), data) default: - lbs.Set(ParsedLabel, key, unescapeJSONString(data)) + j.valueBuffer = unescapeJSONString(data, j.valueBuffer) + lbs.Set(ParsedLabel, unsafeGetBytes(key), j.valueBuffer) } matches++ @@ -702,7 +711,7 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) if matches < len(j.ids) { for _, id := range j.ids { if _, ok := lbs.Get(id); !ok { - lbs.Set(ParsedLabel, id, "") + lbs.Set(ParsedLabel, unsafeGetBytes(id), nil) } } } @@ -722,9 +731,9 @@ func isValidJSONStart(data []byte) bool { func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} } type UnpackParser struct { - lbsBuffer []string - - keys internedStringSet + lbsBuffer []string + valueBuffer []byte + keys internedStringSet } // NewUnpackParser creates a new unpack stage. @@ -733,8 +742,9 @@ type UnpackParser struct { // see https://grafana.com/docs/loki/latest/clients/promtail/stages/pack/ func NewUnpackParser() *UnpackParser { return &UnpackParser{ - lbsBuffer: make([]string, 0, 16), - keys: internedStringSet{}, + lbsBuffer: make([]string, 0, 16), + keys: internedStringSet{}, + valueBuffer: make([]byte, 0, 64), } } @@ -772,7 +782,7 @@ func addErrLabel(msg string, err error, lbs *LabelsBuilder) { } if lbs.ParserLabelHints().PreserveError() { - lbs.Set(ParsedLabel, logqlmodel.PreserveErrorLabel, "true") + lbs.Set(ParsedLabel, unsafeGetBytes(logqlmodel.PreserveErrorLabel), trueBytes) } } @@ -808,7 +818,8 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) } // append to the buffer of labels - u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), unescapeJSONString(value)) + u.valueBuffer = unescapeJSONString(value, u.valueBuffer) + u.lbsBuffer = append(u.lbsBuffer, sanitizeLabelKey(key, true), string(u.valueBuffer)) default: return nil } @@ -823,7 +834,7 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) // flush the buffer if we found a packed entry. if isPacked { for i := 0; i < len(u.lbsBuffer); i = i + 2 { - lbs.Set(ParsedLabel, u.lbsBuffer[i], u.lbsBuffer[i+1]) + lbs.Set(ParsedLabel, unsafeGetBytes(u.lbsBuffer[i]), unsafeGetBytes(u.lbsBuffer[i+1])) if !lbs.ParserLabelHints().ShouldContinueParsingLine(u.lbsBuffer[i], lbs) { return entry, errLabelDoesNotMatch } diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index a39e0777d4abc..b22cdd8174cd1 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -306,7 +306,7 @@ func TestLabelShortCircuit(t *testing.T) { name, category, ok := lbs.GetWithCategory("name") require.True(t, ok) require.Equal(t, ParsedLabel, category) - require.Contains(t, name, "text1") + require.Contains(t, string(name), "text1") }) } } @@ -1375,7 +1375,7 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { var ( metadataStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "bar"), 0). - Set(StructuredMetadataLabel, "app", "loki") + Set(StructuredMetadataLabel, []byte("app"), []byte("loki")) basicStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "baz"), 0) @@ -1390,12 +1390,12 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { require.True(t, ok) res, cat, ok := metadataStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) res, cat, ok = basicStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) }) @@ -1404,7 +1404,7 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { var ( metadataStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "bar"), 0). - Set(StructuredMetadataLabel, "app", "loki") + Set(StructuredMetadataLabel, []byte("app"), []byte("loki")) basicStream = NewBaseLabelsBuilder(). ForLabels(labels.FromStrings("foo", "baz"), 0) @@ -1419,12 +1419,12 @@ func TestLogfmtConsistentPrecedence(t *testing.T) { require.True(t, ok) res, cat, ok := metadataStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) res, cat, ok = basicStream.GetWithCategory("app") - require.Equal(t, "lowkey", res) + require.Equal(t, "lowkey", string(res)) require.Equal(t, ParsedLabel, cat) require.True(t, ok) }) diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 2205ffffd66e8..e43db3b2a41e7 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -1,6 +1,7 @@ package log import ( + "fmt" "testing" "time" @@ -91,15 +92,15 @@ func TestPipeline(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} - p.baseBuilder.add = [numValidCategories][]labels.Label{ - ParsedLabel: {{Name: "baz", Value: "blip"}}, + p.baseBuilder.add = [numValidCategories]*columnarLabels{ + ParsedLabel: newColumnarLabelsFromStrings("baz", "blip"), } p.Reset() require.Len(t, p.streamPipelines, 0) require.Len(t, p.baseBuilder.del, 0) for _, v := range p.baseBuilder.add { - require.Len(t, v, 0) + require.Equal(t, 0, v.len()) } } @@ -167,15 +168,15 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} - p.baseBuilder.add = [numValidCategories][]labels.Label{ - ParsedLabel: {{Name: "baz", Value: "blip"}}, + p.baseBuilder.add = [numValidCategories]*columnarLabels{ + ParsedLabel: newColumnarLabelsFromStrings("baz", "blip"), } p.Reset() require.Len(t, p.streamPipelines, 0) require.Len(t, p.baseBuilder.del, 0) for _, v := range p.baseBuilder.add { - require.Len(t, v, 0) + require.Equal(t, 0, v.len()) } } @@ -684,14 +685,24 @@ func mustFilter(f Filterer, err error) Filterer { return f } -func jsonBenchmark(b *testing.B, parser Stage) { - b.ReportAllocs() +func jsonBenchmark(b *testing.B, parser Stage, lines int) { p := NewPipeline([]Stage{ mustFilter(NewFilter("metrics.go", LineMatchEqual)).ToStage(), parser, }) - line := []byte(`{"ts":"2020-12-27T09:15:54.333026285Z","error":"action could not be completed", "context":{"file": "metrics.go"}}`) + streams := make([][]byte, lines) + for i := 0; i < lines; i++ { + streams[i] = []byte(fmt.Sprintf( + `{ + "ts":"2020-12-27T09:15:54.333026285Z", + "error":"action could not be completed", + "context":{"file": "metrics.go"}, + "line": "line %d" + }`, + i, // disables caching + )) + } lbs := labels.FromStrings("cluster", "ops-tool1", "name", "querier", "pod", "querier-5896759c79-q7q9h", @@ -701,17 +712,21 @@ func jsonBenchmark(b *testing.B, parser Stage) { "job", "loki-dev/querier", "pod_template_hash", "5896759c79", ) + b.ReportAllocs() b.ResetTimer() - sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) + // ForStream creates the LabelsBuilder. Since it caches the results it must be created for each iteration. + sp := p.ForStream(lbs) + for _, line := range streams { + resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) - if !resMatches { - b.Fatalf("resulting line not ok: %s\n", line) - } + if !resMatches { + b.Fatalf("resulting line not ok: %s\n", line) + } - if resLbs.Labels().Get("context_file") != "metrics.go" { - b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + if resLbs.Parsed().Get("context_file") != "metrics.go" { + b.Fatalf("label was not extracted correctly! context_file was %v, expected metrics.go, %+v\n", resLbs.Parsed().Get("context_file"), resLbs.Parsed()) + } } } } @@ -740,7 +755,11 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) { } func BenchmarkJSONParser(b *testing.B) { - jsonBenchmark(b, NewJSONParser(false)) + jsonBenchmark(b, NewJSONParser(false), 1) +} + +func BenchmarkJSONParserHighCardinality(b *testing.B) { + jsonBenchmark(b, NewJSONParser(true), 100_000) } func BenchmarkJSONParserInvalidLine(b *testing.B) { @@ -755,7 +774,7 @@ func BenchmarkJSONExpressionParser(b *testing.B) { b.Fatal("cannot create new JSON expression parser") } - jsonBenchmark(b, parser) + jsonBenchmark(b, parser, 1) } func BenchmarkJSONExpressionParserInvalidLine(b *testing.B) { @@ -769,7 +788,7 @@ func BenchmarkJSONExpressionParserInvalidLine(b *testing.B) { invalidJSONBenchmark(b, parser) } -func logfmtBenchmark(b *testing.B, parser Stage) { +func logfmtBenchmark(b *testing.B, parser Stage, lines int) { b.ReportAllocs() p := NewPipeline([]Stage{ @@ -777,7 +796,10 @@ func logfmtBenchmark(b *testing.B, parser Stage) { parser, }) - line := []byte(`level=info ts=2020-10-18T18:04:22.147378997Z caller=metrics.go:81 org_id=29 traceID=29a0f088b047eb8c latency=fast query="{stream=\"stdout\",pod=\"loki-canary-xmjzp\"}" query_type=limited range_type=range length=20s step=1s duration=58.126671ms status=200 throughput_mb=2.496547 total_bytes_mb=0.145116`) + streams := make([][]byte, lines) + for i := 0; i < lines; i++ { + streams[i] = []byte(fmt.Sprintf(`level=info ts=2020-10-18T18:04:22.147378997Z caller=metrics.go:81 org_id=29 traceID=29a0f088b047eb8c latency=fast query="{stream=\"stdout\",pod=\"loki-canary-xmjzp\"}" query_type=limited range_type=range length=20s step=1s duration=58.126671ms status=200 throughput_mb=2.496547 total_bytes_mb=0.145116, line=%d`, i)) + } lbs := labels.FromStrings("cluster", "ops-tool1", "name", "querier", "ts", "2020-10-18T18:04:22.147378997Z", @@ -785,20 +807,26 @@ func logfmtBenchmark(b *testing.B, parser Stage) { b.ResetTimer() sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) + for _, line := range streams { + resLine, resLbs, resMatches = sp.Process(0, line, labels.EmptyLabels()) - if !resMatches { - b.Fatalf("resulting line not ok: %s\n", line) - } + if !resMatches { + b.Fatalf("resulting line not ok: %s\n", line) + } - if resLbs.Labels().Get("ts") != "2020-10-18T18:04:22.147378997Z" { - b.Fatalf("label was not extracted correctly! %+v\n", resLbs) + if resLbs.Stream().Get("ts") != "2020-10-18T18:04:22.147378997Z" { + b.Fatalf("label was not extracted correctly! ts was %v, expected 2020-10-18T18:04:22.147378997Z, %+v\n", resLbs.Stream().Get("ts"), resLbs) + } } } } func BenchmarkLogfmtParser(b *testing.B) { - logfmtBenchmark(b, NewLogfmtParser(false, false)) + logfmtBenchmark(b, NewLogfmtParser(false, false), 1) +} + +func BenchmarkLogfmtParserHighCardinality(b *testing.B) { + logfmtBenchmark(b, NewLogfmtParser(false, false), 100_000) } func BenchmarkLogfmtExpressionParser(b *testing.B) { @@ -809,5 +837,5 @@ func BenchmarkLogfmtExpressionParser(b *testing.B) { b.Fatal("cannot create new logfmt expression parser:", err.Error()) } - logfmtBenchmark(b, parser) + logfmtBenchmark(b, parser, 1) } diff --git a/pkg/logql/syntax/extractor_test.go b/pkg/logql/syntax/extractor_test.go index d64c7003ff75a..7252a0f67c702 100644 --- a/pkg/logql/syntax/extractor_test.go +++ b/pkg/logql/syntax/extractor_test.go @@ -160,8 +160,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) { seen := make(map[string]float64, len(samples)) for _, s := range samples { - lbls := s.Labels.Labels() - seen[lbls.String()] = s.Value + seen[s.Labels.String()] = s.Value } mvExpr, err := ParseSampleExpr(tc.variantQuery) diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index a55c237477b31..89f660d1ba22c 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3467,12 +3467,12 @@ func Test_PipelineCombined(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) sp := p.ForStream(labels.EmptyLabels()) - line, lbs, matches := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.EmptyLabels()) + line, lbsResult, matches := sp.Process(0, []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.EmptyLabels()) require.True(t, matches) require.Equal( t, labels.FromStrings("caller", "logging.go:66", "duration", "1.5s", "level", "debug", "method", "POST", "msg", "POST /api/prom/api/v1/query_range (200) 1.5s", "path", "/api/prom/api/v1/query_range", "status", "200", "traceID", "a9d4d8a928d8db1", "ts", "2020-10-02T10:10:42.092268913Z"), - lbs.Labels(), + lbsResult.Labels(), ) require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line)) }