Skip to content

Commit 6a5cf7a

Browse files
committed
Log rate limits
1 parent 4ac9594 commit 6a5cf7a

File tree

3 files changed

+77
-34
lines changed

3 files changed

+77
-34
lines changed

pkg/storage/bucket/object_client_adapter.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,7 @@ func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (
130130

131131
// Wrap with rate-limited reader if query has a bandwidth limit
132132
if limiter := getQueryRateLimiter(ctx); limiter != nil {
133-
reader = &rateLimitedReader{
134-
ReadCloser: reader,
135-
limiter: limiter,
136-
ctx: ctx,
137-
}
133+
reader = newRateLimitedReader(ctx, reader, o.logger)
138134
}
139135

140136
return reader, size, err
@@ -148,11 +144,7 @@ func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey stri
148144

149145
// Wrap with rate-limited reader if query has a bandwidth limit
150146
if limiter := getQueryRateLimiter(ctx); limiter != nil {
151-
reader = &rateLimitedReader{
152-
ReadCloser: reader,
153-
limiter: limiter,
154-
ctx: ctx,
155-
}
147+
reader = newRateLimitedReader(ctx, reader, o.logger)
156148
}
157149

158150
return reader, nil

pkg/storage/bucket/rate_limited_reader.go

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@ package bucket
22

33
import (
44
"context"
5+
"fmt"
56
"io"
7+
"time"
68

9+
"github.com/dustin/go-humanize"
10+
"github.com/go-kit/log"
11+
"github.com/go-kit/log/level"
712
"golang.org/x/time/rate"
13+
14+
util_log "github.com/grafana/loki/v3/pkg/util/log"
815
)
916

1017
// minReadSize is the minimum chunk size for reading data.
@@ -57,6 +64,16 @@ type rateLimitedReader struct {
5764
io.ReadCloser
5865
limiter *rate.Limiter
5966
ctx context.Context
67+
logger log.Logger
68+
}
69+
70+
func newRateLimitedReader(ctx context.Context, readCloser io.ReadCloser, logger log.Logger) *rateLimitedReader {
71+
return &rateLimitedReader{
72+
ReadCloser: readCloser,
73+
limiter: getQueryRateLimiter(ctx),
74+
ctx: ctx,
75+
logger: logger,
76+
}
6077
}
6178

6279
// Read reads data from the underlying reader while respecting the rate limit.
@@ -74,26 +91,68 @@ func (r *rateLimitedReader) Read(p []byte) (n int, err error) {
7491
return r.ReadCloser.Read(p)
7592
}
7693

77-
// Read in batches, waiting for rate limiter approval before each batch
94+
// Cap the read size to the minimum read size and the burst
95+
minReadSize := min(minReadSize, burst)
7896
totalRead := 0
79-
for totalRead < len(p) {
80-
remaining := len(p) - totalRead
81-
// Use minReadSize as the read size, but don't exceed burst or remaining buffer
82-
readSize := minReadSize
83-
if readSize > remaining {
84-
readSize = remaining
85-
}
86-
if readSize > burst {
87-
readSize = burst
97+
98+
// Other logging stats
99+
var (
100+
rateLimitedCount int
101+
totalWaitTime time.Duration
102+
maxWaitTime time.Duration
103+
)
104+
105+
// Defer logging to ensure it happens on all exit paths
106+
defer func() {
107+
if rateLimitedCount > 0 && r.logger != nil {
108+
logger := util_log.WithContext(r.ctx, r.logger)
109+
level.Debug(logger).Log(
110+
"msg", "query rate limited during bucket read operation",
111+
"rateLimitedCount", rateLimitedCount,
112+
"totalWaitTime", totalWaitTime.String(),
113+
"maxWaitTime", maxWaitTime.String(),
114+
"readBufferSize", humanize.Bytes(uint64(len(p))),
115+
"readBytes", humanize.Bytes(uint64(totalRead)),
116+
"remainingBytes", humanize.Bytes(uint64(len(p)-totalRead)),
117+
"err", err,
118+
)
88119
}
120+
}()
89121

90-
// Wait for rate limiter approval before reading this batch
91-
if err := r.limiter.WaitN(r.ctx, readSize); err != nil {
122+
for totalRead < len(p) {
123+
remaining := len(p) - totalRead
124+
// Use minReadSize but cap to the remaining
125+
readSize := min(minReadSize, remaining)
126+
127+
// Reserve rate limiter tokens for this batch read
128+
reservation := r.limiter.ReserveN(time.Now(), readSize)
129+
if !reservation.OK() {
130+
// Reservation failed (e.g., readSize > burst), return error
131+
// This should not happen in practice since we cap readSize to burst
92132
if totalRead > 0 {
93-
// Return what we've read so far
94133
return totalRead, nil
95134
}
96-
return 0, err
135+
return 0, fmt.Errorf("rate limited reader: reservation failed. readSize (%d) > burst: (%d)?", readSize, burst)
136+
}
137+
138+
// If we need to wait, record the logging stats and wait for the delay
139+
if delay := reservation.Delay(); delay > 0 {
140+
rateLimitedCount++
141+
totalWaitTime += delay
142+
maxWaitTime = max(maxWaitTime, delay)
143+
144+
timer := time.NewTimer(delay)
145+
select {
146+
case <-timer.C:
147+
// Delay completed, proceed
148+
case <-r.ctx.Done():
149+
timer.Stop()
150+
reservation.Cancel()
151+
if totalRead > 0 {
152+
return totalRead, nil
153+
}
154+
return 0, r.ctx.Err()
155+
}
97156
}
98157

99158
// Read from underlying reader (up to the approved read size)

pkg/storage/bucket/rate_limited_reader_test.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,7 @@ func TestRateLimitedReader(t *testing.T) {
7373
// Create rate-limited reader if limiter exists
7474
var rlReader io.ReadCloser = reader
7575
if limiter := getQueryRateLimiter(ctx); limiter != nil {
76-
rlReader = &rateLimitedReader{
77-
ReadCloser: reader,
78-
limiter: limiter,
79-
ctx: ctx,
80-
}
76+
rlReader = newRateLimitedReader(ctx, reader, nil)
8177
}
8278

8379
// Read data and measure time
@@ -120,11 +116,7 @@ func TestRateLimitedReader_MultipleReadersShareLimiter(t *testing.T) {
120116
readers := make([]io.ReadCloser, numReaders)
121117
for i := 0; i < numReaders; i++ {
122118
reader := io.NopCloser(bytes.NewReader(data))
123-
readers[i] = &rateLimitedReader{
124-
ReadCloser: reader,
125-
limiter: limiter,
126-
ctx: ctx,
127-
}
119+
readers[i] = newRateLimitedReader(ctx, reader, nil)
128120
}
129121

130122
// Read from all readers concurrently

0 commit comments

Comments
 (0)