Skip to content

Commit a27d37a

Browse files
committed
Remove labelstore interactions from the interceptor
1 parent 4f6ec89 commit a27d37a

File tree

11 files changed

+40
-118
lines changed

11 files changed

+40
-118
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ Main (unreleased)
2424

2525
- Add `meta_cache_address` to `beyla.ebpf` component. (@skl)
2626

27+
- Remove labelstore interactions from the prometheus interceptor simplifying prometheus pipelines. (@kgeckhart)
28+
2729
### Bugfixes
2830

2931
- `loki.source.api` no longer drops request when relabel rules drops a specific stream. (@kalleep)
@@ -98,7 +100,6 @@ v1.12.0-rc.0
98100
- The `otelcol.processor.servicegraph` component now supports defining the maximum number of buckets for generated exponential histograms.
99101
- See the upstream [core][https://github.com/open-telemetry/opentelemetry-collector/blob/v0.139.0/CHANGELOG.md] and [contrib][https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.139.0/CHANGELOG.md] changelogs for more details.
100102

101-
102103
### Enhancements
103104

104105
- Add per-application rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout)

internal/component/prometheus/enrich/enrich.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ func New(opts component.Options, args Arguments) (*Component, error) {
100100
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.Registerer, ls)
101101
c.receiver = prometheus.NewInterceptor(
102102
c.fanout,
103-
ls,
104103
prometheus.WithComponentID(c.opts.ID),
105104
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
106105
if c.exited.Load() {

internal/component/prometheus/enrich/enrich_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ import (
55
"testing"
66
"time"
77

8+
prom "github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/prometheus/model/labels"
10+
"github.com/prometheus/prometheus/storage"
11+
"github.com/stretchr/testify/require"
12+
813
"github.com/grafana/alloy/internal/component"
914
"github.com/grafana/alloy/internal/component/discovery"
1015
"github.com/grafana/alloy/internal/component/prometheus"
1116
"github.com/grafana/alloy/internal/service/labelstore"
1217
"github.com/grafana/alloy/internal/util"
13-
prom "github.com/prometheus/client_golang/prometheus"
14-
"github.com/prometheus/prometheus/model/labels"
15-
"github.com/prometheus/prometheus/storage"
16-
"github.com/stretchr/testify/require"
1718
)
1819

1920
func TestEnricher(t *testing.T) {
@@ -108,9 +109,8 @@ func TestEnricher(t *testing.T) {
108109
}
109110
for _, tt := range tests {
110111
t.Run(tt.name, func(t *testing.T) {
111-
ls := labelstore.New(nil, prom.DefaultRegisterer)
112112
fanout := prometheus.NewInterceptor(
113-
nil, ls,
113+
nil,
114114
prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
115115
for name, value := range tt.expectedLabels {
116116
require.Equal(t, l.Get(name), value)

internal/component/prometheus/interceptor.go

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import (
88
"github.com/prometheus/prometheus/model/labels"
99
"github.com/prometheus/prometheus/model/metadata"
1010
"github.com/prometheus/prometheus/storage"
11-
"go.uber.org/atomic"
12-
13-
"github.com/grafana/alloy/internal/service/labelstore"
1411
)
1512

1613
// Interceptor is a storage.Appendable which invokes callback functions upon
@@ -26,23 +23,16 @@ type Interceptor struct {
2623
// next is the next appendable to pass in the chain.
2724
next storage.Appendable
2825

29-
ls labelstore.LabelStore
30-
31-
// lastSeriesCount stores the number of series that were sent through the last interceptappender. It helps to estimate how
32-
// much memory to allocate for the staleness trackers.
33-
lastSeriesCount atomic.Int64
34-
3526
componentID string
3627
}
3728

3829
var _ storage.Appendable = (*Interceptor)(nil)
3930

4031
// NewInterceptor creates a new Interceptor storage.Appendable. Options can be
4132
// provided to NewInterceptor to install custom hooks for different methods.
42-
func NewInterceptor(next storage.Appendable, ls labelstore.LabelStore, opts ...InterceptorOption) *Interceptor {
33+
func NewInterceptor(next storage.Appendable, opts ...InterceptorOption) *Interceptor {
4334
i := &Interceptor{
4435
next: next,
45-
ls: ls,
4636
}
4737
for _, opt := range opts {
4838
opt(i)
@@ -102,27 +92,23 @@ func WithComponentID(id string) InterceptorOption {
10292
}
10393

10494
// Appender satisfies the Appendable interface.
105-
func (f *Interceptor) Appender(ctx context.Context) storage.Appender {
95+
func (i *Interceptor) Appender(ctx context.Context) storage.Appender {
10696
app := &interceptappender{
107-
interceptor: f,
108-
ls: f.ls,
109-
stalenessTrackers: make([]labelstore.StalenessTracker, 0, f.lastSeriesCount.Load()),
97+
interceptor: i,
11098
}
111-
if f.next != nil {
112-
app.child = f.next.Appender(ctx)
99+
if i.next != nil {
100+
app.child = i.next.Appender(ctx)
113101
}
114102
return app
115103
}
116104

117-
func (f *Interceptor) String() string {
118-
return f.componentID + ".receiver"
105+
func (i *Interceptor) String() string {
106+
return i.componentID + ".receiver"
119107
}
120108

121109
type interceptappender struct {
122-
interceptor *Interceptor
123-
child storage.Appender
124-
ls labelstore.LabelStore
125-
stalenessTrackers []labelstore.StalenessTracker
110+
interceptor *Interceptor
111+
child storage.Appender
126112
}
127113

128114
func (a *interceptappender) SetOptions(opts *storage.AppendOptions) {
@@ -135,15 +121,6 @@ var _ storage.Appender = (*interceptappender)(nil)
135121

136122
// Append satisfies the Appender interface.
137123
func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
138-
if ref == 0 {
139-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
140-
}
141-
a.stalenessTrackers = append(a.stalenessTrackers, labelstore.StalenessTracker{
142-
GlobalRefID: uint64(ref),
143-
Labels: l,
144-
Value: v,
145-
})
146-
147124
if a.interceptor.onAppend != nil {
148125
return a.interceptor.onAppend(ref, l, t, v, a.child)
149126
}
@@ -155,8 +132,6 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int
155132

156133
// Commit satisfies the Appender interface.
157134
func (a *interceptappender) Commit() error {
158-
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
159-
a.ls.TrackStaleness(a.stalenessTrackers)
160135
if a.child == nil {
161136
return nil
162137
}
@@ -165,8 +140,6 @@ func (a *interceptappender) Commit() error {
165140

166141
// Rollback satisfies the Appender interface.
167142
func (a *interceptappender) Rollback() error {
168-
a.interceptor.lastSeriesCount.Store(int64(len(a.stalenessTrackers)))
169-
a.ls.TrackStaleness(a.stalenessTrackers)
170143
if a.child == nil {
171144
return nil
172145
}
@@ -180,10 +153,6 @@ func (a *interceptappender) AppendExemplar(
180153
e exemplar.Exemplar,
181154
) (storage.SeriesRef, error) {
182155

183-
if ref == 0 {
184-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
185-
}
186-
187156
if a.interceptor.onAppendExemplar != nil {
188157
return a.interceptor.onAppendExemplar(ref, l, e, a.child)
189158
}
@@ -200,10 +169,6 @@ func (a *interceptappender) UpdateMetadata(
200169
m metadata.Metadata,
201170
) (storage.SeriesRef, error) {
202171

203-
if ref == 0 {
204-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
205-
}
206-
207172
if a.interceptor.onUpdateMetadata != nil {
208173
return a.interceptor.onUpdateMetadata(ref, l, m, a.child)
209174
}
@@ -221,10 +186,6 @@ func (a *interceptappender) AppendHistogram(
221186
fh *histogram.FloatHistogram,
222187
) (storage.SeriesRef, error) {
223188

224-
if ref == 0 {
225-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
226-
}
227-
// TODO histograms are not currently tracked for staleness causing them to be held forever
228189
if a.interceptor.onAppendHistogram != nil {
229190
return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child)
230191
}
@@ -240,10 +201,6 @@ func (a *interceptappender) AppendCTZeroSample(
240201
t, ct int64,
241202
) (storage.SeriesRef, error) {
242203

243-
if ref == 0 {
244-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
245-
}
246-
247204
if a.interceptor.onAppendCTZeroSample != nil {
248205
return a.interceptor.onAppendCTZeroSample(ref, l, t, ct, a.child)
249206
}
@@ -261,10 +218,6 @@ func (a *interceptappender) AppendHistogramCTZeroSample(
261218
fh *histogram.FloatHistogram,
262219
) (storage.SeriesRef, error) {
263220

264-
if ref == 0 {
265-
ref = storage.SeriesRef(a.ls.GetOrAddGlobalRefID(l))
266-
}
267-
268221
if a.child == nil {
269222
return 0, nil
270223
}

internal/component/prometheus/pipeline_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func newDefaultPipeline(t testing.TB, logger log.Logger, handlerFunc http.Handle
221221
ls := labelstore.New(logger, promclient.DefaultRegisterer)
222222
rwAppendable := newRemoteWriteComponent(t, logger, srv.URL, ls)
223223
pipelineAppendable := prometheus.NewFanout([]storage.Appendable{rwAppendable}, promclient.DefaultRegisterer, ls)
224-
scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", ls, livedebugging.NewLiveDebugging(), pipelineAppendable)
224+
scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), pipelineAppendable)
225225

226226
return scrapeInterceptor, srv, ls
227227
}
@@ -233,7 +233,7 @@ func newRelabelPipeline(t testing.TB, logger log.Logger, handlerFunc http.Handle
233233
rwAppendable := newRemoteWriteComponent(t, logger, srv.URL, ls)
234234
relabelAppendable := newRelabelComponent(t, logger, []storage.Appendable{rwAppendable}, ls)
235235
pipelineAppendable := prometheus.NewFanout([]storage.Appendable{relabelAppendable}, promclient.DefaultRegisterer, ls)
236-
scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", ls, livedebugging.NewLiveDebugging(), pipelineAppendable)
236+
scrapeInterceptor := scrape.NewInterceptor("prometheus.scrape.test", livedebugging.NewLiveDebugging(), pipelineAppendable)
237237

238238
return scrapeInterceptor, srv, ls
239239
}

internal/component/prometheus/receive_http/receive_http_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@ import (
1717
"time"
1818

1919
"github.com/golang/snappy"
20-
"github.com/grafana/alloy/internal/component"
21-
fnet "github.com/grafana/alloy/internal/component/common/net"
22-
alloyprom "github.com/grafana/alloy/internal/component/prometheus"
23-
"github.com/grafana/alloy/internal/service/labelstore"
24-
"github.com/grafana/alloy/internal/util"
25-
"github.com/grafana/alloy/syntax/alloytypes"
2620
"github.com/phayes/freeport"
2721
"github.com/prometheus/client_golang/prometheus"
2822
"github.com/prometheus/common/config"
@@ -35,6 +29,13 @@ import (
3529
"github.com/stretchr/testify/require"
3630
"google.golang.org/protobuf/proto"
3731
"google.golang.org/protobuf/protoadapt"
32+
33+
"github.com/grafana/alloy/internal/component"
34+
fnet "github.com/grafana/alloy/internal/component/common/net"
35+
alloyprom "github.com/grafana/alloy/internal/component/prometheus"
36+
"github.com/grafana/alloy/internal/service/labelstore"
37+
"github.com/grafana/alloy/internal/util"
38+
"github.com/grafana/alloy/syntax/alloytypes"
3839
)
3940

4041
// generateTestCertAndKey generates a self-signed certificate and private key for testing
@@ -569,10 +570,8 @@ func testAppendable(actualSamples chan testSample) []storage.Appendable {
569570
return ref, nil
570571
}
571572

572-
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
573573
return []storage.Appendable{alloyprom.NewInterceptor(
574574
nil,
575-
ls,
576575
alloyprom.WithAppendHook(
577576
hookFn))}
578577
}

internal/component/prometheus/relabel/relabel.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ func New(o component.Options, args Arguments) (*Component, error) {
156156
c.fanout = prometheus.NewFanout(args.ForwardTo, o.Registerer, ls)
157157
c.receiver = prometheus.NewInterceptor(
158158
c.fanout,
159-
ls,
160159
prometheus.WithComponentID(c.opts.ID),
161160
prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) {
162161
if c.exited.Load() {

internal/component/prometheus/relabel/relabel_test.go

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,6 @@ import (
2525
"github.com/grafana/alloy/syntax"
2626
)
2727

28-
func TestRelabelThroughAppend(t *testing.T) {
29-
appendable, relabeller := generateRelabel(t)
30-
lbls := labels.FromStrings("__address__", "localhost")
31-
32-
app := appendable.Appender(t.Context())
33-
relabedRef, err := app.Append(storage.SeriesRef(0), lbls, time.Now().UnixMilli(), 0)
34-
require.NoError(t, err)
35-
require.NoError(t, app.Commit())
36-
37-
require.True(t, relabeller.cache.Len() == 1)
38-
// Get the first entry since we only have one we can get oldest
39-
ref, cachedLbls, _ := relabeller.cache.GetOldest()
40-
41-
// We shouldn't have allowed a zero ref to be cached
42-
require.NotEqual(t, storage.SeriesRef(0), ref)
43-
require.NotEqual(t, lbls, cachedLbls)
44-
45-
// We should have added a new ref after relabeling
46-
require.NotEqual(t, storage.SeriesRef(0), relabedRef)
47-
48-
// That ref should not be the cached ref
49-
require.NotEqual(t, relabedRef, ref)
50-
}
51-
5228
func TestUpdateReset(t *testing.T) {
5329
_, relabeller := generateRelabel(t)
5430
lbls := labels.FromStrings("__address__", "localhost")
@@ -72,8 +48,7 @@ func TestValidator(t *testing.T) {
7248
}
7349

7450
func TestNil(t *testing.T) {
75-
ls := labelstore.New(nil, prom.DefaultRegisterer)
76-
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
51+
fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, _ labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
7752
require.True(t, false)
7853
return ref, nil
7954
}))
@@ -138,8 +113,7 @@ func TestMetrics(t *testing.T) {
138113
}
139114

140115
func BenchmarkCache(b *testing.B) {
141-
ls := labelstore.New(nil, prom.DefaultRegisterer)
142-
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
116+
fanout := prometheus.NewInterceptor(nil, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
143117
require.True(b, l.Has("new_label"))
144118
return ref, nil
145119
}))
@@ -174,8 +148,7 @@ func BenchmarkCache(b *testing.B) {
174148
}
175149

176150
func generateRelabel(t *testing.T) (storage.Appendable, *Component) {
177-
ls := labelstore.New(nil, prom.DefaultRegisterer)
178-
fanout := prometheus.NewInterceptor(nil, ls)
151+
fanout := prometheus.NewInterceptor(nil)
179152
var appendable storage.Appendable
180153
relabeller, err := New(component.Options{
181154
ID: "1",

internal/component/prometheus/remotewrite/remote_write.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ func New(o component.Options, args Arguments) (*Component, error) {
153153

154154
res.receiver = prometheus.NewInterceptor(
155155
res.storage,
156-
ls,
157156
prometheus.WithComponentID(res.opts.ID),
158157
// In the methods below, conversion is needed because remote_writes assume
159158
// they are responsible for generating ref IDs. This means two

0 commit comments

Comments
 (0)