Skip to content

Commit 24a049d

Browse files
authored
chore: parallelize mapping symbolization to improve query performance (#4594)
* chore: parallelize mapping symbolization to improve query performance
1 parent dee59f4 commit 24a049d

File tree

2 files changed

+146
-22
lines changed

2 files changed

+146
-22
lines changed

pkg/symbolizer/symbolizer.go

Lines changed: 105 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/go-kit/log"
1616
"github.com/go-kit/log/level"
1717
"github.com/prometheus/client_golang/prometheus"
18+
"golang.org/x/sync/errgroup"
1819

1920
googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
2021
"github.com/grafana/pyroscope/lidia"
@@ -26,21 +27,35 @@ type DebuginfodClient interface {
2627
}
2728

2829
type Config struct {
29-
DebuginfodURL string `yaml:"debuginfod_url"`
30+
DebuginfodURL string `yaml:"debuginfod_url"`
31+
MaxDebuginfodConcurrency int `yaml:"max_debuginfod_concurrency" category:"advanced"`
3032
}
3133

3234
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
3335
f.StringVar(&cfg.DebuginfodURL, "symbolizer.debuginfod-url", "https://debuginfod.elfutils.org", "URL of the debuginfod server")
36+
f.IntVar(&cfg.MaxDebuginfodConcurrency, "symbolizer.max-debuginfod-concurrency", 10, "Maximum number of concurrent symbolization requests to debuginfod server.")
37+
}
38+
39+
func (cfg *Config) Validate() error {
40+
if cfg.MaxDebuginfodConcurrency < 1 {
41+
return fmt.Errorf("invalid max-debuginfod-concurrency value, must be positive")
42+
}
43+
return nil
3444
}
3545

3646
type Symbolizer struct {
3747
logger log.Logger
3848
client DebuginfodClient
3949
bucket objstore.Bucket
4050
metrics *metrics
51+
cfg Config
4152
}
4253

4354
func New(logger log.Logger, cfg Config, reg prometheus.Registerer, bucket objstore.Bucket) (*Symbolizer, error) {
55+
if err := cfg.Validate(); err != nil {
56+
return nil, err
57+
}
58+
4459
metrics := newMetrics(reg)
4560

4661
client, err := NewDebuginfodClient(logger, cfg.DebuginfodURL, metrics)
@@ -53,6 +68,7 @@ func New(logger log.Logger, cfg Config, reg prometheus.Registerer, bucket objsto
5368
client: client,
5469
bucket: bucket,
5570
metrics: metrics,
71+
cfg: cfg,
5672
}, nil
5773
}
5874

@@ -84,36 +100,103 @@ func (s *Symbolizer) SymbolizePprof(ctx context.Context, profile *googlev1.Profi
84100
stringMap[str] = int64(i)
85101
}
86102

87-
var allSymbolizedLocs []symbolizedLocation
103+
allSymbolizedLocs, err := s.symbolizeMappingsConcurrently(ctx, profile, locationsByMapping)
104+
if err != nil {
105+
return fmt.Errorf("symbolizing mappings: %w", err)
106+
}
107+
108+
s.updateAllSymbolsInProfile(profile, allSymbolizedLocs, stringMap)
109+
110+
return nil
111+
}
112+
113+
// symbolizeMappingsConcurrently symbolizes multiple mappings concurrently with a concurrency limit.
114+
func (s *Symbolizer) symbolizeMappingsConcurrently(
115+
ctx context.Context,
116+
profile *googlev1.Profile,
117+
locationsByMapping map[uint64][]*googlev1.Location,
118+
) ([]symbolizedLocation, error) {
119+
maxConcurrency := s.cfg.MaxDebuginfodConcurrency
120+
if maxConcurrency <= 0 {
121+
maxConcurrency = 10
122+
}
123+
124+
type mappingJob struct {
125+
mappingID uint64
126+
locations []*googlev1.Location
127+
}
88128

129+
type mappingResult struct {
130+
mappingID uint64
131+
locations []symbolizedLocation
132+
}
133+
134+
totalLocs := 0
135+
jobs := make(chan mappingJob, len(locationsByMapping))
89136
for mappingID, locations := range locationsByMapping {
90-
mapping := profile.Mapping[mappingID-1]
137+
totalLocs += len(locations)
138+
jobs <- mappingJob{mappingID: mappingID, locations: locations}
139+
}
140+
close(jobs)
91141

92-
binaryName, err := s.extractBinaryName(profile, mapping)
93-
if err != nil {
94-
return fmt.Errorf("extract binary name: %w", err)
95-
}
142+
// Process jobs concurrently with errgroup for proper error handling
143+
g, ctx := errgroup.WithContext(ctx)
144+
g.SetLimit(maxConcurrency)
96145

97-
buildID, err := s.extractBuildID(profile, mapping)
98-
if err != nil {
99-
return fmt.Errorf("extract build ID: %w", err)
100-
}
146+
// Results channel with buffer to avoid blocking jobs
147+
results := make(chan mappingResult, len(locationsByMapping))
101148

102-
req := s.createSymbolizationRequest(binaryName, buildID, locations)
103-
s.symbolize(ctx, &req)
149+
for job := range jobs {
150+
job := job
151+
g.Go(func() error {
152+
mapping := profile.Mapping[job.mappingID-1]
104153

105-
for i, loc := range locations {
106-
allSymbolizedLocs = append(allSymbolizedLocs, symbolizedLocation{
107-
loc: loc,
108-
symLoc: req.locations[i],
109-
mapping: mapping,
110-
})
111-
}
154+
binaryName, err := s.extractBinaryName(profile, mapping)
155+
if err != nil {
156+
return fmt.Errorf("extract binary name for mapping %d: %w", job.mappingID, err)
157+
}
158+
159+
buildID, err := s.extractBuildID(profile, mapping)
160+
if err != nil {
161+
return fmt.Errorf("extract build ID for mapping %d: %w", job.mappingID, err)
162+
}
163+
164+
req := s.createSymbolizationRequest(binaryName, buildID, job.locations)
165+
s.symbolize(ctx, &req)
166+
167+
// Collect symbolized locations for this mapping
168+
symbolizedLocs := make([]symbolizedLocation, len(job.locations))
169+
for i, loc := range job.locations {
170+
symbolizedLocs[i] = symbolizedLocation{
171+
loc: loc,
172+
symLoc: req.locations[i],
173+
mapping: mapping,
174+
}
175+
}
176+
177+
select {
178+
case results <- mappingResult{mappingID: job.mappingID, locations: symbolizedLocs}:
179+
case <-ctx.Done():
180+
return ctx.Err()
181+
}
182+
183+
return nil
184+
})
112185
}
113186

114-
s.updateAllSymbolsInProfile(profile, allSymbolizedLocs, stringMap)
187+
err := g.Wait()
188+
close(results)
115189

116-
return nil
190+
if err != nil {
191+
return nil, err
192+
}
193+
194+
allSymbolizedLocs := make([]symbolizedLocation, 0, totalLocs)
195+
for result := range results {
196+
allSymbolizedLocs = append(allSymbolizedLocs, result.locations...)
197+
}
198+
199+
return allSymbolizedLocs, nil
117200
}
118201

119202
// groupLocationsByMapping groups locations by their mapping ID

pkg/symbolizer/symbolizer_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ func TestSymbolizePprof(t *testing.T) {
245245
client: mockClient,
246246
bucket: mockBucket,
247247
metrics: newMetrics(nil),
248+
cfg: Config{MaxDebuginfodConcurrency: 1},
248249
}
249250

250251
err := s.SymbolizePprof(context.Background(), tt.profile)
@@ -284,6 +285,7 @@ func TestSymbolizationKeepsSequentialFunctionIDs(t *testing.T) {
284285
client: mockClient,
285286
bucket: mockBucket,
286287
metrics: newMetrics(nil),
288+
cfg: Config{MaxDebuginfodConcurrency: 1},
287289
}
288290

289291
err := s.SymbolizePprof(context.Background(), profile)
@@ -320,6 +322,7 @@ func TestSymbolizationWithLidiaData(t *testing.T) {
320322
client: nil,
321323
bucket: mockBucket,
322324
metrics: newMetrics(prometheus.NewRegistry()),
325+
cfg: Config{MaxDebuginfodConcurrency: 1},
323326
}
324327

325328
req := &request{
@@ -364,6 +367,7 @@ func TestSymbolizeWithObjectStore(t *testing.T) {
364367
client: mockClient,
365368
bucket: mockBucket,
366369
metrics: newMetrics(prometheus.NewRegistry()),
370+
cfg: Config{MaxDebuginfodConcurrency: 1},
367371
}
368372

369373
elfTestFile := openTestFile(t)
@@ -621,3 +625,40 @@ func createRequest(t *testing.T, buildID string, address uint64) *request {
621625
},
622626
}
623627
}
628+
629+
func TestConfigValidate(t *testing.T) {
630+
tests := []struct {
631+
name string
632+
setup func(cfg *Config)
633+
wantErr bool
634+
}{
635+
{
636+
name: "valid config with positive concurrency",
637+
setup: func(cfg *Config) { cfg.MaxDebuginfodConcurrency = 10 },
638+
wantErr: false,
639+
},
640+
{
641+
name: "invalid config with zero concurrency",
642+
setup: func(cfg *Config) { cfg.MaxDebuginfodConcurrency = 0 },
643+
wantErr: true,
644+
},
645+
{
646+
name: "invalid config with negative concurrency",
647+
setup: func(cfg *Config) { cfg.MaxDebuginfodConcurrency = -1 },
648+
wantErr: true,
649+
},
650+
}
651+
652+
for _, tt := range tests {
653+
t.Run(tt.name, func(t *testing.T) {
654+
cfg := Config{}
655+
tt.setup(&cfg)
656+
err := cfg.Validate()
657+
if tt.wantErr {
658+
require.Error(t, err)
659+
} else {
660+
require.NoError(t, err)
661+
}
662+
})
663+
}
664+
}

0 commit comments

Comments
 (0)