Skip to content

Commit aad9961

Browse files
committed
fix reservoir locking
1 parent 97aab7d commit aad9961

File tree

4 files changed

+15
-5
lines changed

4 files changed

+15
-5
lines changed

sdk/metric/exemplar/fixed_size_reservoir.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"math"
99
"math/rand/v2"
10+
"sync"
1011
"time"
1112

1213
"go.opentelemetry.io/otel/attribute"
@@ -37,6 +38,7 @@ var _ Reservoir = &FixedSizeReservoir{}
3738
type FixedSizeReservoir struct {
3839
reservoir.ConcurrentSafe
3940
*storage
41+
mu sync.Mutex
4042

4143
// count is the number of measurement seen.
4244
count int64
@@ -192,6 +194,8 @@ func (r *FixedSizeReservoir) advance() {
192194
//
193195
// The Reservoir state is preserved after this call.
194196
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
197+
r.mu.Lock()
198+
defer r.mu.Unlock()
195199
r.storage.Collect(dest)
196200
// Call reset here even though it will reset r.count and restart the random
197201
// number series. This will persist any old exemplars as long as no new

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"slices"
99
"sort"
10+
"sync"
1011
"time"
1112

1213
"go.opentelemetry.io/otel/attribute"
@@ -42,6 +43,7 @@ var _ Reservoir = &HistogramReservoir{}
4243
type HistogramReservoir struct {
4344
reservoir.ConcurrentSafe
4445
*storage
46+
mu sync.Mutex
4547

4648
// bounds are bucket bounds in ascending order.
4749
bounds []float64
@@ -76,3 +78,12 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
7678
defer r.mu.Unlock()
7779
r.store(idx, m)
7880
}
81+
82+
// Collect returns all the held exemplars.
83+
//
84+
// The Reservoir state is preserved after this call.
85+
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
86+
r.mu.Lock()
87+
defer r.mu.Unlock()
88+
r.storage.Collect(dest)
89+
}

sdk/metric/exemplar/storage.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
55

66
import (
77
"context"
8-
"sync"
98
"time"
109

1110
"go.opentelemetry.io/otel/attribute"
@@ -14,7 +13,6 @@ import (
1413

1514
// storage is an exemplar storage for [Reservoir] implementations.
1615
type storage struct {
17-
mu sync.Mutex
1816
// measurements are the measurements sampled.
1917
//
2018
// This does not use []metricdata.Exemplar because it potentially would
@@ -34,8 +32,6 @@ func (r *storage) store(idx int, m measurement) {
3432
//
3533
// The Reservoir state is preserved after this call.
3634
func (r *storage) Collect(dest *[]Exemplar) {
37-
r.mu.Lock()
38-
defer r.mu.Unlock()
3935
*dest = reset(*dest, len(r.measurements), len(r.measurements))
4036
var n int
4137
for _, m := range r.measurements {

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ type FilteredExemplarReservoir[N int64 | float64] interface {
2929

3030
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
3131
type filteredExemplarReservoir[N int64 | float64] struct {
32-
mu sync.Mutex
3332
filter exemplar.Filter
3433
reservoir exemplar.Reservoir
3534
// The exemplar.Reservoir is not required to be concurrent safe, but

0 commit comments

Comments
 (0)