diff --git a/pkg/distributor/annotation/annotation.go b/pkg/distributor/annotation/annotation.go new file mode 100644 index 0000000000..b90848a145 --- /dev/null +++ b/pkg/distributor/annotation/annotation.go @@ -0,0 +1,10 @@ +package annotation + +const ( + ProfileAnnotationKeyThrottled = "pyroscope.ingest.throttled" + ProfileAnnotationKeySampled = "pyroscope.ingest.sampled" +) + +type ProfileAnnotation struct { + Body interface{} `json:"body"` +} diff --git a/pkg/distributor/annotation/sampling.go b/pkg/distributor/annotation/sampling.go new file mode 100644 index 0000000000..d6d8f0efb2 --- /dev/null +++ b/pkg/distributor/annotation/sampling.go @@ -0,0 +1,20 @@ +package annotation + +import ( + "encoding/json" + + "github.com/grafana/pyroscope/pkg/distributor/sampling" +) + +type SampledAnnotation struct { + Source *sampling.Source `json:"source"` +} + +func CreateProfileAnnotation(source *sampling.Source) ([]byte, error) { + a := &ProfileAnnotation{ + Body: SampledAnnotation{ + Source: source, + }, + } + return json.Marshal(a) +} diff --git a/pkg/distributor/ingestlimits/annotation.go b/pkg/distributor/annotation/throttling.go similarity index 66% rename from pkg/distributor/ingestlimits/annotation.go rename to pkg/distributor/annotation/throttling.go index d12f8fc887..7f0d7f1601 100644 --- a/pkg/distributor/ingestlimits/annotation.go +++ b/pkg/distributor/annotation/throttling.go @@ -1,12 +1,23 @@ -package ingestlimits +package annotation import ( "encoding/json" "fmt" + + "github.com/grafana/pyroscope/pkg/distributor/ingestlimits" ) -func CreateTenantAnnotation(c *Config) ([]byte, error) { - annotation := &ProfileAnnotation{ +type ThrottledAnnotation struct { + PeriodType string `json:"periodType"` + PeriodLimitMb int `json:"periodLimitMb"` + LimitResetTime int64 `json:"limitResetTime"` + SamplingPeriodSec int `json:"samplingPeriodSec"` + SamplingRequests int `json:"samplingRequests"` + UsageGroup string `json:"usageGroup"` +} + +func CreateTenantAnnotation(c *ingestlimits.Config) ([]byte, error) { + a := &ProfileAnnotation{ Body: ThrottledAnnotation{ PeriodType: c.PeriodType, PeriodLimitMb: c.PeriodLimitMb, @@ -15,15 +26,15 @@ func CreateTenantAnnotation(c *Config) ([]byte, error) { SamplingRequests: c.Sampling.NumRequests, }, } - return json.Marshal(annotation) + return json.Marshal(a) } -func CreateUsageGroupAnnotation(c *Config, usageGroup string) ([]byte, error) { +func CreateUsageGroupAnnotation(c *ingestlimits.Config, usageGroup string) ([]byte, error) { l, ok := c.UsageGroups[usageGroup] if !ok { return nil, fmt.Errorf("usageGroup %s not found", usageGroup) } - annotation := &ProfileAnnotation{ + a := &ProfileAnnotation{ Body: ThrottledAnnotation{ PeriodType: c.PeriodType, PeriodLimitMb: l.PeriodLimitMb, @@ -33,22 +44,5 @@ func CreateUsageGroupAnnotation(c *Config, usageGroup string) ([]byte, error) { UsageGroup: usageGroup, }, } - return json.Marshal(annotation) -} - -type ProfileAnnotation struct { - Body interface{} `json:"body"` -} - -const ( - ProfileAnnotationKeyThrottled = "pyroscope.ingest.throttled" -) - -type ThrottledAnnotation struct { - PeriodType string `json:"periodType"` - PeriodLimitMb int `json:"periodLimitMb"` - LimitResetTime int64 `json:"limitResetTime"` - SamplingPeriodSec int `json:"samplingPeriodSec"` - SamplingRequests int `yaml:"samplingRequests"` - UsageGroup string `json:"usageGroup"` + return json.Marshal(a) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 27a55b707c..5fa782020d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -382,17 +382,24 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof return err } - if sample, usageGroup := d.shouldSample(tenantID, groups.Names()); !sample { + willSample, samplingSource := d.shouldSample(tenantID, groups.Names()) + if !willSample { level.Debug(logger).Log( "msg", "skipping push request due to sampling", "tenant", tenantID, - "usage_group", usageGroup, + "usage_group", samplingSource.UsageGroup, + "probability", samplingSource.Probability, ) validation.DiscardedProfiles.WithLabelValues(string(validation.SkippedBySamplingRules), tenantID).Add(float64(req.TotalProfiles)) validation.DiscardedBytes.WithLabelValues(string(validation.SkippedBySamplingRules), tenantID).Add(float64(req.TotalBytesUncompressed)) groups.CountDiscardedBytes(string(validation.SkippedBySamplingRules), req.TotalBytesUncompressed) return nil } + if samplingSource != nil { + if err := req.MarkSampledRequest(samplingSource); err != nil { + return err + } + } profLanguage := d.GetProfileLanguage(req) defer func() { // defer to allow re-calculate the size of the profile after normalization @@ -919,7 +926,7 @@ func (d *Distributor) checkUsageGroupsIngestLimit(req *distributormodel.ProfileS } // shouldSample returns true if the profile should be injected and optionally the usage group that was responsible for the decision. -func (d *Distributor) shouldSample(tenantID string, groupsInRequest []validation.UsageGroupMatchName) (bool, *validation.UsageGroupMatchName) { +func (d *Distributor) shouldSample(tenantID string, groupsInRequest []validation.UsageGroupMatchName) (bool, *sampling.Source) { l := d.limits.DistributorSampling(tenantID) if l == nil { return true, nil @@ -950,7 +957,12 @@ func (d *Distributor) shouldSample(tenantID string, groupsInRequest []validation return true, nil } - return rand.Float64() <= samplingProbability, match + source := &sampling.Source{ + UsageGroup: match.ResolvedName, + Probability: samplingProbability, + } + + return rand.Float64() <= samplingProbability, source } type profileTracker struct { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 92f1d167be..9f3be65454 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -38,6 +38,7 @@ import ( typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" connectapi "github.com/grafana/pyroscope/pkg/api/connect" "github.com/grafana/pyroscope/pkg/clientpool" + "github.com/grafana/pyroscope/pkg/distributor/annotation" "github.com/grafana/pyroscope/pkg/distributor/ingestlimits" distributormodel "github.com/grafana/pyroscope/pkg/distributor/model" "github.com/grafana/pyroscope/pkg/distributor/sampling" @@ -2062,7 +2063,7 @@ func TestPush_Aggregation(t *testing.T) { for i, req := range ingesterClient.requests { for _, series := range req.Series { require.Lenf(t, series.Annotations, 1, "failed request %d", i) - assert.Equal(t, ingestlimits.ProfileAnnotationKeyThrottled, series.Annotations[0].Key) + assert.Equal(t, annotation.ProfileAnnotationKeyThrottled, series.Annotations[0].Key) assert.Contains(t, series.Annotations[0].Value, "\"periodLimitMb\":128") } } @@ -2344,7 +2345,7 @@ func TestDistributor_shouldSample(t *testing.T) { groups []validation.UsageGroupMatchName samplingConfig *sampling.Config expected bool - expectedMatch *validation.UsageGroupMatchName + expectedMatch *sampling.Source }{ { name: "no sampling config - should accept", @@ -2373,9 +2374,9 @@ func TestDistributor_shouldSample(t *testing.T) { }, }, expected: true, - expectedMatch: &validation.UsageGroupMatchName{ - ConfiguredName: "group1", - ResolvedName: "group1", + expectedMatch: &sampling.Source{ + UsageGroup: "group1", + Probability: 1.0, }, }, { @@ -2388,9 +2389,9 @@ func TestDistributor_shouldSample(t *testing.T) { }, }, expected: true, - expectedMatch: &validation.UsageGroupMatchName{ - ConfiguredName: "configured-name", - ResolvedName: "resolved-name", + expectedMatch: &sampling.Source{ + UsageGroup: "resolved-name", + Probability: 1.0, }, }, { @@ -2403,9 +2404,9 @@ func TestDistributor_shouldSample(t *testing.T) { }, }, expected: false, - expectedMatch: &validation.UsageGroupMatchName{ - ConfiguredName: "group1", - ResolvedName: "group1", + expectedMatch: &sampling.Source{ + UsageGroup: "group1", + Probability: 0.0, }, }, { @@ -2422,9 +2423,9 @@ func TestDistributor_shouldSample(t *testing.T) { }, }, expected: false, - expectedMatch: &validation.UsageGroupMatchName{ - ConfiguredName: "group2", - ResolvedName: "group2", + expectedMatch: &sampling.Source{ + UsageGroup: "group2", + Probability: 0.0, }, }, { @@ -2441,9 +2442,9 @@ func TestDistributor_shouldSample(t *testing.T) { }, }, expected: true, - expectedMatch: &validation.UsageGroupMatchName{ - ConfiguredName: "test_service", - ResolvedName: "test_service", + expectedMatch: &sampling.Source{ + UsageGroup: "test_service", + Probability: 1.0, }, }, { @@ -2460,9 +2461,9 @@ func TestDistributor_shouldSample(t *testing.T) { }, }, expected: true, - expectedMatch: &validation.UsageGroupMatchName{ - ConfiguredName: "test_service", - ResolvedName: "test_service", + expectedMatch: &sampling.Source{ + UsageGroup: "test_service", + Probability: 1.0, }, }, } diff --git a/pkg/distributor/model/push.go b/pkg/distributor/model/push.go index 1b1a63efae..4511ef086d 100644 --- a/pkg/distributor/model/push.go +++ b/pkg/distributor/model/push.go @@ -2,7 +2,9 @@ package model import ( v1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/pkg/distributor/annotation" "github.com/grafana/pyroscope/pkg/distributor/ingestlimits" + "github.com/grafana/pyroscope/pkg/distributor/sampling" phlaremodel "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/pprof" ) @@ -45,8 +47,8 @@ type ProfileSeries struct { DiscardedBytesRelabeling int64 } -func (p *ProfileSeries) GetLanguage() string { - spyName := phlaremodel.Labels(p.Labels).Get(phlaremodel.LabelNamePyroscopeSpy) +func (req *ProfileSeries) GetLanguage() string { + spyName := phlaremodel.Labels(req.Labels).Get(phlaremodel.LabelNamePyroscopeSpy) if spyName != "" { lang := getProfileLanguageFromSpy(spyName) if lang != "" { @@ -94,25 +96,37 @@ func (req *ProfileSeries) Clone() *ProfileSeries { } func (req *ProfileSeries) MarkThrottledTenant(l *ingestlimits.Config) error { - annotation, err := ingestlimits.CreateTenantAnnotation(l) + a, err := annotation.CreateTenantAnnotation(l) if err != nil { return err } req.Annotations = append(req.Annotations, &v1.ProfileAnnotation{ - Key: ingestlimits.ProfileAnnotationKeyThrottled, - Value: string(annotation), + Key: annotation.ProfileAnnotationKeyThrottled, + Value: string(a), }) return nil } func (req *ProfileSeries) MarkThrottledUsageGroup(l *ingestlimits.Config, usageGroup string) error { - annotation, err := ingestlimits.CreateUsageGroupAnnotation(l, usageGroup) + a, err := annotation.CreateUsageGroupAnnotation(l, usageGroup) if err != nil { return err } req.Annotations = append(req.Annotations, &v1.ProfileAnnotation{ - Key: ingestlimits.ProfileAnnotationKeyThrottled, - Value: string(annotation), + Key: annotation.ProfileAnnotationKeyThrottled, + Value: string(a), + }) + return nil +} + +func (req *ProfileSeries) MarkSampledRequest(source *sampling.Source) error { + a, err := annotation.CreateProfileAnnotation(source) + if err != nil { + return err + } + req.Annotations = append(req.Annotations, &v1.ProfileAnnotation{ + Key: annotation.ProfileAnnotationKeySampled, + Value: string(a), }) return nil } diff --git a/pkg/distributor/model/push_test.go b/pkg/distributor/model/push_test.go index 96e7ea2fc3..b8bffb7b3e 100644 --- a/pkg/distributor/model/push_test.go +++ b/pkg/distributor/model/push_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/pkg/distributor/annotation" "github.com/grafana/pyroscope/pkg/distributor/ingestlimits" ) @@ -57,7 +58,7 @@ func TestMarkThrottledTenant(t *testing.T) { }, verify: func(t *testing.T, req *ProfileSeries) { require.Len(t, req.Annotations, 1) - assert.Equal(t, ingestlimits.ProfileAnnotationKeyThrottled, req.Annotations[0].Key) + assert.Equal(t, annotation.ProfileAnnotationKeyThrottled, req.Annotations[0].Key) assert.Contains(t, req.Annotations[0].Value, "\"periodLimitMb\":128") }, }, @@ -109,7 +110,7 @@ func TestMarkThrottledUsageGroup(t *testing.T) { usageGroup: "group-1", verify: func(t *testing.T, req *ProfileSeries) { require.Len(t, req.Annotations, 1) - assert.Equal(t, ingestlimits.ProfileAnnotationKeyThrottled, req.Annotations[0].Key) + assert.Equal(t, annotation.ProfileAnnotationKeyThrottled, req.Annotations[0].Key) assert.Contains(t, req.Annotations[0].Value, "\"periodLimitMb\":64") assert.Contains(t, req.Annotations[0].Value, "\"usageGroup\":\"group-1\"") }, diff --git a/pkg/distributor/sampling/config.go b/pkg/distributor/sampling/config.go index 61dab030c0..e9822c5017 100644 --- a/pkg/distributor/sampling/config.go +++ b/pkg/distributor/sampling/config.go @@ -8,3 +8,8 @@ type Config struct { type UsageGroupSampling struct { Probability float64 `yaml:"probability" json:"probability"` } + +type Source struct { + UsageGroup string `json:"usageGroup"` + Probability float64 `json:"probability"` +}