Skip to content

Commit a10fb83

Browse files
committed
Add some high level pipeline tests for prometheus
1 parent 22de0cf commit a10fb83

File tree

10 files changed

+328
-17
lines changed

10 files changed

+328
-17
lines changed

internal/component/otelcol/exporter/prometheus/prometheus.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ import (
88
"time"
99

1010
"github.com/go-kit/log"
11+
"github.com/prometheus/prometheus/storage"
12+
1113
"github.com/grafana/alloy/internal/component"
1214
"github.com/grafana/alloy/internal/component/otelcol"
1315
"github.com/grafana/alloy/internal/component/otelcol/exporter/prometheus/internal/convert"
1416
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
1517
"github.com/grafana/alloy/internal/component/prometheus"
1618
"github.com/grafana/alloy/internal/featuregate"
1719
"github.com/grafana/alloy/internal/service/labelstore"
18-
"github.com/prometheus/prometheus/storage"
1920
)
2021

2122
func init() {
@@ -87,7 +88,7 @@ func New(o component.Options, c Arguments) (*Component, error) {
8788
return nil, err
8889
}
8990
ls := service.(labelstore.LabelStore)
90-
fanout := prometheus.NewFanout(nil, o.ID, o.Registerer, ls)
91+
fanout := prometheus.NewFanout(nil, o.Registerer, ls)
9192

9293
converter := convert.New(o.Logger, fanout, convertArgumentsToConvertOptions(c))
9394

internal/component/prometheus/enrich/enrich.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func New(opts component.Options, args Arguments) (*Component, error) {
9797
}
9898
}
9999

100-
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.ID, opts.Registerer, ls)
100+
c.fanout = prometheus.NewFanout(args.ForwardTo, opts.Registerer, ls)
101101
c.receiver = prometheus.NewInterceptor(
102102
c.fanout,
103103
ls,

internal/component/prometheus/fanout.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ var _ storage.Appendable = (*Fanout)(nil)
2626
type Fanout struct {
2727
mut sync.RWMutex
2828
// children is where to fan out.
29-
children []storage.Appendable
30-
// ComponentID is what component this belongs to.
31-
componentID string
29+
children []storage.Appendable
3230
writeLatency prometheus.Histogram
3331
samplesCounter prometheus.Counter
3432
ls labelstore.LabelStore
@@ -39,7 +37,7 @@ type Fanout struct {
3937
}
4038

4139
// NewFanout creates a fanout appendable.
42-
func NewFanout(children []storage.Appendable, componentID string, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
40+
func NewFanout(children []storage.Appendable, register prometheus.Registerer, ls labelstore.LabelStore) *Fanout {
4341
wl := prometheus.NewHistogram(prometheus.HistogramOpts{
4442
Name: "prometheus_fanout_latency",
4543
Help: "Write latency for sending to direct and indirect components",
@@ -55,7 +53,6 @@ func NewFanout(children []storage.Appendable, componentID string, register prome
5553

5654
return &Fanout{
5755
children: children,
58-
componentID: componentID,
5956
writeLatency: wl,
6057
samplesCounter: s,
6158
ls: ls,

internal/component/prometheus/fanout_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313

1414
func TestRollback(t *testing.T) {
1515
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
16-
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
16+
fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
1717
app := fanout.Appender(t.Context())
1818
err := app.Rollback()
1919
require.NoError(t, err)
2020
}
2121

2222
func TestCommit(t *testing.T) {
2323
ls := labelstore.New(nil, prometheus.DefaultRegisterer)
24-
fanout := NewFanout([]storage.Appendable{NewFanout(nil, "1", prometheus.DefaultRegisterer, ls)}, "", prometheus.DefaultRegisterer, ls)
24+
fanout := NewFanout([]storage.Appendable{NewFanout(nil, prometheus.DefaultRegisterer, ls)}, prometheus.DefaultRegisterer, ls)
2525
app := fanout.Appender(t.Context())
2626
err := app.Commit()
2727
require.NoError(t, err)

internal/component/prometheus/operator/common/crdmanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (c *crdManager) Run(ctx context.Context) error {
145145
}()
146146

147147
// Start prometheus scrape manager.
148-
alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.ID, c.opts.Registerer, c.ls)
148+
alloyAppendable := prometheus.NewFanout(c.args.ForwardTo, c.opts.Registerer, c.ls)
149149
opts := &scrape.Options{}
150150
c.scrapeManager, err = scrape.NewManager(opts, slog.New(logging.NewSlogGoKitHandler(c.logger)), nil, alloyAppendable, unregisterer)
151151
if err != nil {
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package prometheus_test
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
"time"
9+
10+
"github.com/go-kit/log"
11+
promclient "github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/prometheus/model/labels"
13+
"github.com/prometheus/prometheus/prompb"
14+
"github.com/prometheus/prometheus/storage"
15+
"github.com/prometheus/prometheus/storage/remote"
16+
"github.com/stretchr/testify/require"
17+
18+
"github.com/grafana/alloy/internal/component"
19+
"github.com/grafana/alloy/internal/component/prometheus"
20+
"github.com/grafana/alloy/internal/component/prometheus/relabel"
21+
"github.com/grafana/alloy/internal/component/prometheus/remotewrite"
22+
"github.com/grafana/alloy/internal/runtime/componenttest"
23+
"github.com/grafana/alloy/internal/service/labelstore"
24+
"github.com/grafana/alloy/internal/util"
25+
"github.com/grafana/alloy/syntax"
26+
)
27+
28+
// This test simulates a scrape -> remote_write pipeline, without actually scraping
29+
func TestPipeline(t *testing.T) {
30+
handler, writeResult := channelServerHandler(t)
31+
pipeline, srv, ls := newDefaultPipeline(t, util.TestLogger(t), handler)
32+
defer srv.Close()
33+
34+
// We need to use a future timestamp since remote_write will ignore any
35+
// sample which is earlier than the time when it started. Adding a minute
36+
// ensures that our samples will never get ignored.
37+
sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
38+
39+
// Send metrics to our component. These will be written to the WAL and
40+
// subsequently written to our HTTP server.
41+
lset1 := labels.FromStrings("foo", "bar")
42+
sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12)
43+
lset2 := labels.FromStrings("fizz", "buzz")
44+
sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34)
45+
46+
expect := []prompb.TimeSeries{{
47+
Labels: []prompb.Label{
48+
{Name: "cluster", Value: "local"},
49+
{Name: "foo", Value: "bar"},
50+
},
51+
Samples: []prompb.Sample{
52+
{Timestamp: sampleTimestamp, Value: 12},
53+
},
54+
}, {
55+
Labels: []prompb.Label{
56+
{Name: "cluster", Value: "local"},
57+
{Name: "fizz", Value: "buzz"},
58+
},
59+
Samples: []prompb.Sample{
60+
{Timestamp: sampleTimestamp, Value: 34},
61+
},
62+
}}
63+
64+
select {
65+
case <-time.After(time.Minute):
66+
require.FailNow(t, "timed out waiting for metrics")
67+
case res := <-writeResult:
68+
require.Equal(t, expect, res.Timeseries)
69+
}
70+
71+
ref := ls.GetOrAddGlobalRefID(lset1)
72+
require.NotZero(t, ref)
73+
localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref)
74+
require.NotZero(t, localRef)
75+
76+
ref = ls.GetOrAddGlobalRefID(lset2)
77+
require.NotZero(t, ref)
78+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
79+
require.NotZero(t, localRef)
80+
}
81+
82+
// This test simulates a scrape -> relabel -> remote_write pipeline, without actually scraping
83+
func TestRelabelPipeline(t *testing.T) {
84+
handler, writeResult := channelServerHandler(t)
85+
pipeline, srv, ls := newRelabelPipeline(t, util.TestLogger(t), handler)
86+
defer srv.Close()
87+
88+
// We need to use a future timestamp since remote_write will ignore any
89+
// sample which is earlier than the time when it started. Adding a minute
90+
// ensures that our samples will never get ignored.
91+
sampleTimestamp := time.Now().Add(time.Minute).UnixMilli()
92+
93+
// Send metrics to our component. These will be written to the WAL and
94+
// subsequently written to our HTTP server.
95+
lset1 := labels.FromStrings("foo", "bar")
96+
sendMetric(t, pipeline.Appender(t.Context()), lset1, sampleTimestamp, 12)
97+
lset2 := labels.FromStrings("fizz", "buzz")
98+
sendMetric(t, pipeline.Appender(t.Context()), lset2, sampleTimestamp, 34)
99+
100+
expect := []prompb.TimeSeries{{
101+
Labels: []prompb.Label{
102+
{Name: "cluster", Value: "local"},
103+
{Name: "foo", Value: "bar"},
104+
{Name: "lbl", Value: "foo"},
105+
},
106+
Samples: []prompb.Sample{
107+
{Timestamp: sampleTimestamp, Value: 12},
108+
},
109+
}, {
110+
Labels: []prompb.Label{
111+
{Name: "cluster", Value: "local"},
112+
{Name: "fizz", Value: "buzz"},
113+
{Name: "lbl", Value: "foo"},
114+
},
115+
Samples: []prompb.Sample{
116+
{Timestamp: sampleTimestamp, Value: 34},
117+
},
118+
}}
119+
120+
select {
121+
case <-time.After(time.Minute):
122+
require.FailNow(t, "timed out waiting for metrics")
123+
case res := <-writeResult:
124+
require.Equal(t, expect, res.Timeseries)
125+
}
126+
127+
ref := ls.GetOrAddGlobalRefID(lset1)
128+
require.NotZero(t, ref)
129+
// This was relabeled, so we shouldn't have a local ref
130+
localRef := ls.GetLocalRefID("prometheus.remote_write.test", ref)
131+
require.Zero(t, localRef)
132+
133+
ref = ls.GetOrAddGlobalRefID(lset2)
134+
require.NotZero(t, ref)
135+
// This was relabeled, so we shouldn't have a local ref
136+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
137+
require.Zero(t, localRef)
138+
139+
lset1Relabeled := labels.NewBuilder(lset1).Set("lbl", "foo").Labels()
140+
ref = ls.GetOrAddGlobalRefID(lset1Relabeled)
141+
require.NotZero(t, ref)
142+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
143+
require.NotZero(t, localRef)
144+
145+
lset2Relabeled := labels.NewBuilder(lset2).Set("lbl", "foo").Labels()
146+
ref = ls.GetOrAddGlobalRefID(lset2Relabeled)
147+
require.NotZero(t, ref)
148+
localRef = ls.GetLocalRefID("prometheus.remote_write.test", ref)
149+
require.NotZero(t, localRef)
150+
}
151+
152+
func channelServerHandler(t *testing.T) (http.HandlerFunc, chan *prompb.WriteRequest) {
153+
writeResult := make(chan *prompb.WriteRequest)
154+
serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
155+
req, err := remote.DecodeWriteRequest(r.Body)
156+
if err != nil {
157+
http.Error(w, err.Error(), http.StatusBadRequest)
158+
return
159+
}
160+
161+
select {
162+
case writeResult <- req:
163+
default:
164+
require.Fail(t, "failed to send remote_write result over channel")
165+
}
166+
})
167+
168+
return serverHandler, writeResult
169+
}
170+
171+
func BenchmarkPipelines(b *testing.B) {
172+
tests := []struct {
173+
name string
174+
pipelineBuilder func(t testing.TB, logger log.Logger, handlerFunc http.HandlerFunc) (storage.Appendable, *httptest.Server, labelstore.LabelStore)
175+
}{
176+
{"default", newDefaultPipeline},
177+
{"relabel", newRelabelPipeline},
178+
}
179+
180+
numberOfMetrics := []int{2, 10, 100, 1000}
181+
182+
for _, n := range numberOfMetrics {
183+
for _, tt := range tests {
184+
// Don't need the server to do anything here
185+
pipeline, srv, _ := tt.pipelineBuilder(b, log.NewNopLogger(), func(w http.ResponseWriter, r *http.Request) {})
186+
b.Run(fmt.Sprintf("%s/%d-metrics", tt.name, n), func(b *testing.B) {
187+
b.ReportAllocs()
188+
b.ResetTimer()
189+
190+
for b.Loop() {
191+
for i := 0; i < n; i++ {
192+
sendMetric(
193+
b,
194+
pipeline.Appender(b.Context()),
195+
labels.FromStrings(fmt.Sprintf("metric-%d", i), fmt.Sprintf("metric-%d", i)),
196+
time.Now().Add(time.Minute).UnixMilli(),
197+
float64(i),
198+
)
199+
}
200+
}
201+
})
202+
srv.Close()
203+
}
204+
}
205+
}
206+
207+
func newDefaultPipeline(t testing.TB, logger log.Logger, handlerFunc http.HandlerFunc) (storage.Appendable, *httptest.Server, labelstore.LabelStore) {
208+
srv := httptest.NewServer(handlerFunc)
209+
210+
ls := labelstore.New(logger, promclient.DefaultRegisterer)
211+
rwAppendable := newRemoteWriteComponent(t, logger, srv.URL, ls)
212+
pipelineAppendable := prometheus.NewFanout([]storage.Appendable{rwAppendable}, promclient.DefaultRegisterer, ls)
213+
214+
return pipelineAppendable, srv, ls
215+
}
216+
217+
func newRelabelPipeline(t testing.TB, logger log.Logger, handlerFunc http.HandlerFunc) (storage.Appendable, *httptest.Server, labelstore.LabelStore) {
218+
srv := httptest.NewServer(handlerFunc)
219+
220+
ls := labelstore.New(logger, promclient.DefaultRegisterer)
221+
rwAppendable := newRemoteWriteComponent(t, logger, srv.URL, ls)
222+
relabelAppendable := newRelabelComponent(t, logger, []storage.Appendable{rwAppendable}, ls)
223+
pipelineAppendable := prometheus.NewFanout([]storage.Appendable{relabelAppendable}, promclient.DefaultRegisterer, ls)
224+
225+
return pipelineAppendable, srv, ls
226+
}
227+
228+
func newRemoteWriteComponent(t testing.TB, logger log.Logger, url string, ls *labelstore.Service) storage.Appendable {
229+
// Create our component and wait for it to start running, so we can write
230+
// metrics to the WAL.
231+
cfg := fmt.Sprintf(`
232+
external_labels = {
233+
cluster = "local",
234+
}
235+
endpoint {
236+
name = "test-url"
237+
url = "%s/api/v1/write"
238+
remote_timeout = "100ms"
239+
240+
queue_config {
241+
batch_send_deadline = "100ms"
242+
}
243+
}
244+
`, url)
245+
var args remotewrite.Arguments
246+
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
247+
248+
tc, err := componenttest.NewControllerFromID(logger, "prometheus.remote_write")
249+
require.NoError(t, err)
250+
go func() {
251+
err = tc.Run(componenttest.TestContext(t), args, func(opts component.Options) component.Options {
252+
inner := opts.GetServiceData
253+
opts.GetServiceData = func(name string) (interface{}, error) {
254+
if name == labelstore.ServiceName {
255+
return ls, nil
256+
}
257+
return inner(name)
258+
}
259+
return opts
260+
})
261+
require.NoError(t, err)
262+
}()
263+
require.NoError(t, tc.WaitRunning(5*time.Second))
264+
265+
return tc.Exports().(remotewrite.Exports).Receiver
266+
}
267+
268+
func newRelabelComponent(t testing.TB, logger log.Logger, forwardTo []storage.Appendable, ls *labelstore.Service) storage.Appendable {
269+
cfg := `forward_to = []
270+
rule {
271+
action = "replace"
272+
target_label = "lbl"
273+
replacement = "foo"
274+
}`
275+
var args relabel.Arguments
276+
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
277+
args.ForwardTo = forwardTo
278+
279+
tc, err := componenttest.NewControllerFromID(logger, "prometheus.relabel")
280+
require.NoError(t, err)
281+
go func() {
282+
err = tc.Run(componenttest.TestContext(t), args, func(opts component.Options) component.Options {
283+
inner := opts.GetServiceData
284+
opts.GetServiceData = func(name string) (interface{}, error) {
285+
if name == labelstore.ServiceName {
286+
return ls, nil
287+
}
288+
return inner(name)
289+
}
290+
return opts
291+
})
292+
require.NoError(t, err)
293+
}()
294+
require.NoError(t, tc.WaitRunning(5*time.Second))
295+
296+
return tc.Exports().(relabel.Exports).Receiver
297+
}
298+
299+
func sendMetric(
300+
t testing.TB,
301+
appender storage.Appender,
302+
labels labels.Labels,
303+
time int64,
304+
value float64,
305+
) {
306+
_, err := appender.Append(0, labels, time, value)
307+
require.NoError(t, err)
308+
require.NoError(t, appender.Commit())
309+
}

0 commit comments

Comments
 (0)