Skip to content

Commit 512b67e

Browse files
committed
optimize histogram reservoir performance
1 parent 6990fa0 commit 512b67e

File tree

4 files changed

+45
-26
lines changed

4 files changed

+45
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4040
- `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266)
4141
- `Distinct` in `go.opentelemetry.io/otel/attribute` is no longer guaranteed to uniquely identify an attribute set. Collisions between `Distinct` values for different Sets are possible with extremely high cardinality (billions of series per instrument), but are highly unlikely. (#7175)
4242
- The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421)
43+
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 10x. (#7443)
4344

4445
<!-- Released section -->
4546
<!-- Don't change this section unless doing release -->

sdk/metric/exemplar/fixed_size_reservoir_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,15 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
4545
}
4646

4747
var sum float64
48-
for _, m := range r.measurements {
49-
sum += m.Value.Float64()
48+
for _, val := range r.measurements {
49+
loaded := val.Load()
50+
if loaded == nil {
51+
continue
52+
}
53+
m := loaded.(*measurement)
54+
if m != nil {
55+
sum += m.Value.Float64()
56+
}
5057
}
5158
mean := sum / float64(sampleSize)
5259

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,5 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
6969
panic("unknown value type")
7070
}
7171

72-
idx := sort.SearchFloat64s(r.bounds, n)
73-
m := newMeasurement(ctx, t, v, a)
74-
75-
r.mu.Lock()
76-
defer r.mu.Unlock()
77-
r.store(idx, m)
72+
r.store(sort.SearchFloat64s(r.bounds, n), newMeasurement(ctx, t, v, a))
7873
}

sdk/metric/exemplar/storage.go

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
66
import (
77
"context"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"go.opentelemetry.io/otel/attribute"
@@ -19,15 +20,18 @@ type storage struct {
1920
//
2021
// This does not use []metricdata.Exemplar because it potentially would
2122
// require an allocation for trace and span IDs in the hot path of Offer.
22-
measurements []measurement
23+
measurements []atomic.Value
2324
}
2425

2526
func newStorage(n int) *storage {
26-
return &storage{measurements: make([]measurement, n)}
27+
return &storage{measurements: make([]atomic.Value, n)}
2728
}
2829

29-
func (r *storage) store(idx int, m measurement) {
30-
r.measurements[idx] = m
30+
func (r *storage) store(idx int, m *measurement) {
31+
old := r.measurements[idx].Swap(m)
32+
if old != nil {
33+
mPool.Put(old)
34+
}
3135
}
3236

3337
// Collect returns all the held exemplars.
@@ -38,7 +42,12 @@ func (r *storage) Collect(dest *[]Exemplar) {
3842
defer r.mu.Unlock()
3943
*dest = reset(*dest, len(r.measurements), len(r.measurements))
4044
var n int
41-
for _, m := range r.measurements {
45+
for _, val := range r.measurements {
46+
loaded := val.Load()
47+
if loaded == nil {
48+
continue
49+
}
50+
m := loaded.(*measurement)
4251
if !m.valid {
4352
continue
4453
}
@@ -58,20 +67,26 @@ type measurement struct {
5867
// Value is the value of the measurement.
5968
Value Value
6069
// SpanContext is the SpanContext active when a measurement was made.
61-
SpanContext trace.SpanContext
70+
Ctx context.Context
6271

6372
valid bool
6473
}
6574

75+
var mPool = sync.Pool{
76+
New: func() any {
77+
return &measurement{}
78+
},
79+
}
80+
6681
// newMeasurement returns a new non-empty Measurement.
67-
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
68-
return measurement{
69-
FilteredAttributes: droppedAttr,
70-
Time: ts,
71-
Value: v,
72-
SpanContext: trace.SpanContextFromContext(ctx),
73-
valid: true,
74-
}
82+
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) *measurement {
83+
m := mPool.Get().(*measurement)
84+
m.FilteredAttributes = droppedAttr
85+
m.Time = ts
86+
m.Value = v
87+
m.Ctx = ctx
88+
m.valid = true
89+
return m
7590
}
7691

7792
// exemplar returns m as an [Exemplar].
@@ -80,15 +95,16 @@ func (m measurement) exemplar(dest *Exemplar) {
8095
dest.Time = m.Time
8196
dest.Value = m.Value
8297

83-
if m.SpanContext.HasTraceID() {
84-
traceID := m.SpanContext.TraceID()
98+
sc := trace.SpanContextFromContext(m.Ctx)
99+
if sc.HasTraceID() {
100+
traceID := sc.TraceID()
85101
dest.TraceID = traceID[:]
86102
} else {
87103
dest.TraceID = dest.TraceID[:0]
88104
}
89105

90-
if m.SpanContext.HasSpanID() {
91-
spanID := m.SpanContext.SpanID()
106+
if sc.HasSpanID() {
107+
spanID := sc.SpanID()
92108
dest.SpanID = spanID[:]
93109
} else {
94110
dest.SpanID = dest.SpanID[:0]

0 commit comments

Comments
 (0)