Skip to content

Commit 214733c

Browse files
aleks-pkorniltsev
andauthored
feat: annotate sampled profiles (#4375)
* feat: annotate sampled profiles * Update pkg/distributor/annotation/throttling.go Co-authored-by: Tolya Korniltsev <[email protected]> --------- Co-authored-by: Tolya Korniltsev <[email protected]>
1 parent d156d74 commit 214733c

File tree

8 files changed

+115
-58
lines changed

8 files changed

+115
-58
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package annotation
2+
3+
const (
4+
ProfileAnnotationKeyThrottled = "pyroscope.ingest.throttled"
5+
ProfileAnnotationKeySampled = "pyroscope.ingest.sampled"
6+
)
7+
8+
// ProfileAnnotation is the JSON envelope for a profile annotation value.
// The annotation-specific payload is stored in Body and serialized under
// the "body" key.
type ProfileAnnotation struct {
9+
Body interface{} `json:"body"`
10+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package annotation
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/grafana/pyroscope/pkg/distributor/sampling"
7+
)
8+
9+
// SampledAnnotation is the annotation body for a sampled profile. It carries
// the sampling.Source, serialized under the "source" key.
type SampledAnnotation struct {
10+
Source *sampling.Source `json:"source"`
11+
}
12+
13+
// CreateProfileAnnotation wraps source in a SampledAnnotation, places it in
// the Body of a ProfileAnnotation, and returns the JSON encoding of that
// envelope. The returned error is whatever json.Marshal reports.
func CreateProfileAnnotation(source *sampling.Source) ([]byte, error) {
14+
a := &ProfileAnnotation{
15+
Body: SampledAnnotation{
16+
Source: source,
17+
},
18+
}
19+
return json.Marshal(a)
20+
}
Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1-
package ingestlimits
1+
package annotation
22

33
import (
44
"encoding/json"
55
"fmt"
6+
7+
"github.com/grafana/pyroscope/pkg/distributor/ingestlimits"
68
)
79

8-
func CreateTenantAnnotation(c *Config) ([]byte, error) {
9-
annotation := &ProfileAnnotation{
10+
// ThrottledAnnotation is the annotation body for a throttled profile.
// CreateTenantAnnotation and CreateUsageGroupAnnotation populate it from an
// ingestlimits.Config; UsageGroup is set only by the usage-group variant.
type ThrottledAnnotation struct {
11+
PeriodType string `json:"periodType"`
12+
PeriodLimitMb int `json:"periodLimitMb"`
13+
LimitResetTime int64 `json:"limitResetTime"`
14+
SamplingPeriodSec int `json:"samplingPeriodSec"`
15+
SamplingRequests int `json:"samplingRequests"`
16+
UsageGroup string `json:"usageGroup"`
17+
}
18+
19+
func CreateTenantAnnotation(c *ingestlimits.Config) ([]byte, error) {
20+
a := &ProfileAnnotation{
1021
Body: ThrottledAnnotation{
1122
PeriodType: c.PeriodType,
1223
PeriodLimitMb: c.PeriodLimitMb,
@@ -15,15 +26,15 @@ func CreateTenantAnnotation(c *Config) ([]byte, error) {
1526
SamplingRequests: c.Sampling.NumRequests,
1627
},
1728
}
18-
return json.Marshal(annotation)
29+
return json.Marshal(a)
1930
}
2031

21-
func CreateUsageGroupAnnotation(c *Config, usageGroup string) ([]byte, error) {
32+
func CreateUsageGroupAnnotation(c *ingestlimits.Config, usageGroup string) ([]byte, error) {
2233
l, ok := c.UsageGroups[usageGroup]
2334
if !ok {
2435
return nil, fmt.Errorf("usageGroup %s not found", usageGroup)
2536
}
26-
annotation := &ProfileAnnotation{
37+
a := &ProfileAnnotation{
2738
Body: ThrottledAnnotation{
2839
PeriodType: c.PeriodType,
2940
PeriodLimitMb: l.PeriodLimitMb,
@@ -33,22 +44,5 @@ func CreateUsageGroupAnnotation(c *Config, usageGroup string) ([]byte, error) {
3344
UsageGroup: usageGroup,
3445
},
3546
}
36-
return json.Marshal(annotation)
37-
}
38-
39-
type ProfileAnnotation struct {
40-
Body interface{} `json:"body"`
41-
}
42-
43-
const (
44-
ProfileAnnotationKeyThrottled = "pyroscope.ingest.throttled"
45-
)
46-
47-
type ThrottledAnnotation struct {
48-
PeriodType string `json:"periodType"`
49-
PeriodLimitMb int `json:"periodLimitMb"`
50-
LimitResetTime int64 `json:"limitResetTime"`
51-
SamplingPeriodSec int `json:"samplingPeriodSec"`
52-
SamplingRequests int `yaml:"samplingRequests"`
53-
UsageGroup string `json:"usageGroup"`
47+
return json.Marshal(a)
5448
}

pkg/distributor/distributor.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -382,17 +382,24 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
382382
return err
383383
}
384384

385-
if sample, usageGroup := d.shouldSample(tenantID, groups.Names()); !sample {
385+
willSample, samplingSource := d.shouldSample(tenantID, groups.Names())
386+
if !willSample {
386387
level.Debug(logger).Log(
387388
"msg", "skipping push request due to sampling",
388389
"tenant", tenantID,
389-
"usage_group", usageGroup,
390+
"usage_group", samplingSource.UsageGroup,
391+
"probability", samplingSource.Probability,
390392
)
391393
validation.DiscardedProfiles.WithLabelValues(string(validation.SkippedBySamplingRules), tenantID).Add(float64(req.TotalProfiles))
392394
validation.DiscardedBytes.WithLabelValues(string(validation.SkippedBySamplingRules), tenantID).Add(float64(req.TotalBytesUncompressed))
393395
groups.CountDiscardedBytes(string(validation.SkippedBySamplingRules), req.TotalBytesUncompressed)
394396
return nil
395397
}
398+
if samplingSource != nil {
399+
if err := req.MarkSampledRequest(samplingSource); err != nil {
400+
return err
401+
}
402+
}
396403

397404
profLanguage := d.GetProfileLanguage(req)
398405
defer func() { // defer to allow re-calculate the size of the profile after normalization
@@ -919,7 +926,7 @@ func (d *Distributor) checkUsageGroupsIngestLimit(req *distributormodel.ProfileS
919926
}
920927

921928
// shouldSample returns true if the profile should be ingested and optionally the sampling source (usage group and probability) that was responsible for the decision.
922-
func (d *Distributor) shouldSample(tenantID string, groupsInRequest []validation.UsageGroupMatchName) (bool, *validation.UsageGroupMatchName) {
929+
func (d *Distributor) shouldSample(tenantID string, groupsInRequest []validation.UsageGroupMatchName) (bool, *sampling.Source) {
923930
l := d.limits.DistributorSampling(tenantID)
924931
if l == nil {
925932
return true, nil
@@ -950,7 +957,12 @@ func (d *Distributor) shouldSample(tenantID string, groupsInRequest []validation
950957
return true, nil
951958
}
952959

953-
return rand.Float64() <= samplingProbability, match
960+
source := &sampling.Source{
961+
UsageGroup: match.ResolvedName,
962+
Probability: samplingProbability,
963+
}
964+
965+
return rand.Float64() <= samplingProbability, source
954966
}
955967

956968
type profileTracker struct {

pkg/distributor/distributor_test.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
3939
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
4040
"github.com/grafana/pyroscope/pkg/clientpool"
41+
"github.com/grafana/pyroscope/pkg/distributor/annotation"
4142
"github.com/grafana/pyroscope/pkg/distributor/ingestlimits"
4243
distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
4344
"github.com/grafana/pyroscope/pkg/distributor/sampling"
@@ -2062,7 +2063,7 @@ func TestPush_Aggregation(t *testing.T) {
20622063
for i, req := range ingesterClient.requests {
20632064
for _, series := range req.Series {
20642065
require.Lenf(t, series.Annotations, 1, "failed request %d", i)
2065-
assert.Equal(t, ingestlimits.ProfileAnnotationKeyThrottled, series.Annotations[0].Key)
2066+
assert.Equal(t, annotation.ProfileAnnotationKeyThrottled, series.Annotations[0].Key)
20662067
assert.Contains(t, series.Annotations[0].Value, "\"periodLimitMb\":128")
20672068
}
20682069
}
@@ -2344,7 +2345,7 @@ func TestDistributor_shouldSample(t *testing.T) {
23442345
groups []validation.UsageGroupMatchName
23452346
samplingConfig *sampling.Config
23462347
expected bool
2347-
expectedMatch *validation.UsageGroupMatchName
2348+
expectedMatch *sampling.Source
23482349
}{
23492350
{
23502351
name: "no sampling config - should accept",
@@ -2373,9 +2374,9 @@ func TestDistributor_shouldSample(t *testing.T) {
23732374
},
23742375
},
23752376
expected: true,
2376-
expectedMatch: &validation.UsageGroupMatchName{
2377-
ConfiguredName: "group1",
2378-
ResolvedName: "group1",
2377+
expectedMatch: &sampling.Source{
2378+
UsageGroup: "group1",
2379+
Probability: 1.0,
23792380
},
23802381
},
23812382
{
@@ -2388,9 +2389,9 @@ func TestDistributor_shouldSample(t *testing.T) {
23882389
},
23892390
},
23902391
expected: true,
2391-
expectedMatch: &validation.UsageGroupMatchName{
2392-
ConfiguredName: "configured-name",
2393-
ResolvedName: "resolved-name",
2392+
expectedMatch: &sampling.Source{
2393+
UsageGroup: "resolved-name",
2394+
Probability: 1.0,
23942395
},
23952396
},
23962397
{
@@ -2403,9 +2404,9 @@ func TestDistributor_shouldSample(t *testing.T) {
24032404
},
24042405
},
24052406
expected: false,
2406-
expectedMatch: &validation.UsageGroupMatchName{
2407-
ConfiguredName: "group1",
2408-
ResolvedName: "group1",
2407+
expectedMatch: &sampling.Source{
2408+
UsageGroup: "group1",
2409+
Probability: 0.0,
24092410
},
24102411
},
24112412
{
@@ -2422,9 +2423,9 @@ func TestDistributor_shouldSample(t *testing.T) {
24222423
},
24232424
},
24242425
expected: false,
2425-
expectedMatch: &validation.UsageGroupMatchName{
2426-
ConfiguredName: "group2",
2427-
ResolvedName: "group2",
2426+
expectedMatch: &sampling.Source{
2427+
UsageGroup: "group2",
2428+
Probability: 0.0,
24282429
},
24292430
},
24302431
{
@@ -2441,9 +2442,9 @@ func TestDistributor_shouldSample(t *testing.T) {
24412442
},
24422443
},
24432444
expected: true,
2444-
expectedMatch: &validation.UsageGroupMatchName{
2445-
ConfiguredName: "test_service",
2446-
ResolvedName: "test_service",
2445+
expectedMatch: &sampling.Source{
2446+
UsageGroup: "test_service",
2447+
Probability: 1.0,
24472448
},
24482449
},
24492450
{
@@ -2460,9 +2461,9 @@ func TestDistributor_shouldSample(t *testing.T) {
24602461
},
24612462
},
24622463
expected: true,
2463-
expectedMatch: &validation.UsageGroupMatchName{
2464-
ConfiguredName: "test_service",
2465-
ResolvedName: "test_service",
2464+
expectedMatch: &sampling.Source{
2465+
UsageGroup: "test_service",
2466+
Probability: 1.0,
24662467
},
24672468
},
24682469
}

pkg/distributor/model/push.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package model
22

33
import (
44
v1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
5+
"github.com/grafana/pyroscope/pkg/distributor/annotation"
56
"github.com/grafana/pyroscope/pkg/distributor/ingestlimits"
7+
"github.com/grafana/pyroscope/pkg/distributor/sampling"
68
phlaremodel "github.com/grafana/pyroscope/pkg/model"
79
"github.com/grafana/pyroscope/pkg/pprof"
810
)
@@ -45,8 +47,8 @@ type ProfileSeries struct {
4547
DiscardedBytesRelabeling int64
4648
}
4749

48-
func (p *ProfileSeries) GetLanguage() string {
49-
spyName := phlaremodel.Labels(p.Labels).Get(phlaremodel.LabelNamePyroscopeSpy)
50+
func (req *ProfileSeries) GetLanguage() string {
51+
spyName := phlaremodel.Labels(req.Labels).Get(phlaremodel.LabelNamePyroscopeSpy)
5052
if spyName != "" {
5153
lang := getProfileLanguageFromSpy(spyName)
5254
if lang != "" {
@@ -94,25 +96,37 @@ func (req *ProfileSeries) Clone() *ProfileSeries {
9496
}
9597

9698
func (req *ProfileSeries) MarkThrottledTenant(l *ingestlimits.Config) error {
97-
annotation, err := ingestlimits.CreateTenantAnnotation(l)
99+
a, err := annotation.CreateTenantAnnotation(l)
98100
if err != nil {
99101
return err
100102
}
101103
req.Annotations = append(req.Annotations, &v1.ProfileAnnotation{
102-
Key: ingestlimits.ProfileAnnotationKeyThrottled,
103-
Value: string(annotation),
104+
Key: annotation.ProfileAnnotationKeyThrottled,
105+
Value: string(a),
104106
})
105107
return nil
106108
}
107109

108110
func (req *ProfileSeries) MarkThrottledUsageGroup(l *ingestlimits.Config, usageGroup string) error {
109-
annotation, err := ingestlimits.CreateUsageGroupAnnotation(l, usageGroup)
111+
a, err := annotation.CreateUsageGroupAnnotation(l, usageGroup)
110112
if err != nil {
111113
return err
112114
}
113115
req.Annotations = append(req.Annotations, &v1.ProfileAnnotation{
114-
Key: ingestlimits.ProfileAnnotationKeyThrottled,
115-
Value: string(annotation),
116+
Key: annotation.ProfileAnnotationKeyThrottled,
117+
Value: string(a),
118+
})
119+
return nil
120+
}
121+
122+
// MarkSampledRequest appends an annotation to req.Annotations recording that
// the request passed sampling. The annotation key is
// annotation.ProfileAnnotationKeySampled and the value is the JSON produced by
// annotation.CreateProfileAnnotation(source). If encoding fails, the error is
// returned and req.Annotations is left unchanged.
func (req *ProfileSeries) MarkSampledRequest(source *sampling.Source) error {
123+
a, err := annotation.CreateProfileAnnotation(source)
124+
if err != nil {
125+
return err
126+
}
127+
req.Annotations = append(req.Annotations, &v1.ProfileAnnotation{
128+
Key: annotation.ProfileAnnotationKeySampled,
129+
Value: string(a),
116130
})
117131
return nil
118132
}

pkg/distributor/model/push_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/stretchr/testify/require"
99

1010
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
11+
"github.com/grafana/pyroscope/pkg/distributor/annotation"
1112
"github.com/grafana/pyroscope/pkg/distributor/ingestlimits"
1213
)
1314

@@ -57,7 +58,7 @@ func TestMarkThrottledTenant(t *testing.T) {
5758
},
5859
verify: func(t *testing.T, req *ProfileSeries) {
5960
require.Len(t, req.Annotations, 1)
60-
assert.Equal(t, ingestlimits.ProfileAnnotationKeyThrottled, req.Annotations[0].Key)
61+
assert.Equal(t, annotation.ProfileAnnotationKeyThrottled, req.Annotations[0].Key)
6162
assert.Contains(t, req.Annotations[0].Value, "\"periodLimitMb\":128")
6263
},
6364
},
@@ -109,7 +110,7 @@ func TestMarkThrottledUsageGroup(t *testing.T) {
109110
usageGroup: "group-1",
110111
verify: func(t *testing.T, req *ProfileSeries) {
111112
require.Len(t, req.Annotations, 1)
112-
assert.Equal(t, ingestlimits.ProfileAnnotationKeyThrottled, req.Annotations[0].Key)
113+
assert.Equal(t, annotation.ProfileAnnotationKeyThrottled, req.Annotations[0].Key)
113114
assert.Contains(t, req.Annotations[0].Value, "\"periodLimitMb\":64")
114115
assert.Contains(t, req.Annotations[0].Value, "\"usageGroup\":\"group-1\"")
115116
},

pkg/distributor/sampling/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,8 @@ type Config struct {
88
type UsageGroupSampling struct {
99
Probability float64 `yaml:"probability" json:"probability"`
1010
}
11+
12+
// Source describes what drove a sampling decision: the name of the usage
// group whose rule was applied and the probability that rule specified.
type Source struct {
13+
UsageGroup string `json:"usageGroup"`
14+
Probability float64 `json:"probability"`
15+
}

0 commit comments

Comments
 (0)