Skip to content

Commit 8a04cb3

Browse files
committed
Add QueryTracker
Signed-off-by: Justin Jung <[email protected]>
1 parent 58f469d commit 8a04cb3

File tree

2 files changed

+325
-0
lines changed

2 files changed

+325
-0
lines changed

pkg/util/tracker/query.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package tracker
2+
3+
import (
4+
"container/heap"
5+
"sync"
6+
"time"
7+
)
8+
9+
const (
10+
ttl = 2 * time.Second
11+
slidingWindowSize = 3 * time.Second
12+
maxTrackedQueries = 100
13+
)
14+
15+
type QueryTracker struct {
16+
heap *queryHeap
17+
lookup map[string]*queryItem
18+
mu sync.Mutex
19+
}
20+
21+
type queryItem struct {
22+
requestID string
23+
bytesRate *slidingWindow
24+
lastUpdate time.Time
25+
index int
26+
}
27+
28+
type queryHeap []*queryItem
29+
30+
func (h queryHeap) Len() int { return len(h) }
31+
func (h queryHeap) Less(i, j int) bool { return h[i].bytesRate.rate() < h[j].bytesRate.rate() }
32+
func (h queryHeap) Swap(i, j int) {
33+
h[i], h[j] = h[j], h[i]
34+
h[i].index = i
35+
h[j].index = j
36+
}
37+
38+
func (h *queryHeap) Push(x interface{}) {
39+
item := x.(*queryItem)
40+
item.index = len(*h)
41+
*h = append(*h, item)
42+
}
43+
44+
func (h *queryHeap) Pop() interface{} {
45+
old := *h
46+
n := len(old)
47+
item := old[n-1]
48+
item.index = -1
49+
*h = old[0 : n-1]
50+
return item
51+
}
52+
53+
func NewQueryTracker() *QueryTracker {
54+
h := &queryHeap{}
55+
heap.Init(h)
56+
tracker := &QueryTracker{
57+
heap: h,
58+
lookup: make(map[string]*queryItem),
59+
}
60+
61+
go tracker.loop()
62+
return tracker
63+
}
64+
65+
func (q *QueryTracker) loop() {
66+
ticker := time.NewTicker(time.Second)
67+
defer ticker.Stop()
68+
69+
for range ticker.C {
70+
q.cleanup()
71+
}
72+
}
73+
74+
func (q *QueryTracker) cleanup() {
75+
now := time.Now()
76+
stale := now.Add(-ttl)
77+
78+
q.mu.Lock()
79+
defer q.mu.Unlock()
80+
81+
var toRemove []*queryItem
82+
for _, item := range *q.heap {
83+
if item.lastUpdate.Before(stale) {
84+
toRemove = append(toRemove, item)
85+
}
86+
}
87+
88+
for _, item := range toRemove {
89+
heap.Remove(q.heap, item.index)
90+
delete(q.lookup, item.requestID)
91+
}
92+
}
93+
94+
func (q *QueryTracker) Add(requestID string, bytes uint64) {
95+
q.mu.Lock()
96+
defer q.mu.Unlock()
97+
98+
now := time.Now()
99+
item, exists := q.lookup[requestID]
100+
101+
if !exists {
102+
item = &queryItem{
103+
requestID: requestID,
104+
bytesRate: newSlidingWindow(slidingWindowSize),
105+
}
106+
item.bytesRate.add(bytes)
107+
item.lastUpdate = now
108+
109+
if q.heap.Len() < maxTrackedQueries {
110+
heap.Push(q.heap, item)
111+
q.lookup[requestID] = item
112+
} else {
113+
minItem := (*q.heap)[0]
114+
if item.bytesRate.rate() > minItem.bytesRate.rate() {
115+
delete(q.lookup, minItem.requestID)
116+
heap.Pop(q.heap)
117+
heap.Push(q.heap, item)
118+
q.lookup[requestID] = item
119+
}
120+
}
121+
} else {
122+
item.bytesRate.add(bytes)
123+
item.lastUpdate = now
124+
heap.Fix(q.heap, item.index)
125+
}
126+
}
127+
128+
func (q *QueryTracker) GetWorstQuery() (string, float64) {
129+
q.mu.Lock()
130+
defer q.mu.Unlock()
131+
132+
if q.heap.Len() == 0 {
133+
return "", 0
134+
}
135+
136+
var worstQueryID string
137+
var worstRate float64
138+
139+
for _, item := range *q.heap {
140+
rate := item.bytesRate.rate()
141+
if rate > worstRate {
142+
worstRate = rate
143+
worstQueryID = item.requestID
144+
}
145+
}
146+
147+
return worstQueryID, worstRate
148+
}
149+
150+
type slidingWindow struct {
151+
buckets []uint64
152+
windowSize time.Duration
153+
lastUpdate time.Time
154+
currentIdx int
155+
mu sync.Mutex
156+
}
157+
158+
func newSlidingWindow(windowSize time.Duration) *slidingWindow {
159+
seconds := int(windowSize.Seconds())
160+
return &slidingWindow{
161+
buckets: make([]uint64, seconds),
162+
windowSize: windowSize,
163+
lastUpdate: time.Now().Truncate(time.Second),
164+
}
165+
}
166+
167+
func (swr *slidingWindow) add(bytes uint64) {
168+
swr.mu.Lock()
169+
defer swr.mu.Unlock()
170+
171+
now := time.Now().Truncate(time.Second)
172+
173+
// Calculate how many seconds have passed since last update
174+
secondsDrift := int(now.Sub(swr.lastUpdate).Seconds())
175+
if secondsDrift > 0 {
176+
// Clear old buckets
177+
for i := 0; i < min(secondsDrift, len(swr.buckets)); i++ {
178+
nextIdx := (swr.currentIdx + i) % len(swr.buckets)
179+
swr.buckets[nextIdx] = 0
180+
}
181+
// Update current index
182+
swr.currentIdx = (swr.currentIdx + secondsDrift) % len(swr.buckets)
183+
swr.lastUpdate = now
184+
}
185+
186+
// Add bytes to current bucket
187+
swr.buckets[swr.currentIdx] += bytes
188+
}
189+
190+
func (swr *slidingWindow) rate() float64 {
191+
swr.mu.Lock()
192+
defer swr.mu.Unlock()
193+
194+
var totalBytes uint64
195+
for _, bytes := range swr.buckets {
196+
totalBytes += bytes
197+
}
198+
199+
return float64(totalBytes) / swr.windowSize.Seconds()
200+
}

pkg/util/tracker/query_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package tracker
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestQueryTracker(t *testing.T) {
12+
queryTracker := NewQueryTracker()
13+
queryTracker.Add("r-1", 3000)
14+
queryTracker.Add("r-2", 300)
15+
requestID, rate := queryTracker.GetWorstQuery()
16+
17+
assert.Equal(t, "r-1", requestID)
18+
assert.Equal(t, float64(1000), rate)
19+
}
20+
21+
func TestQueryTracker_MaxQueries(t *testing.T) {
22+
queryTracker := NewQueryTracker()
23+
24+
// Add more than maxTrackedQueries with low rates
25+
for i := 0; i < 200; i++ {
26+
queryTracker.Add(fmt.Sprintf("low-%d", i), 100)
27+
}
28+
29+
// Add high-rate query
30+
queryTracker.Add("high-rate", 5000)
31+
32+
// Should track the high-rate query and evict low-rate ones
33+
requestID, rate := queryTracker.GetWorstQuery()
34+
assert.Equal(t, "high-rate", requestID)
35+
assert.True(t, rate > 1000)
36+
37+
// Verify heap size is bounded
38+
queryTracker.mu.Lock()
39+
heapSize := queryTracker.heap.Len()
40+
queryTracker.mu.Unlock()
41+
assert.LessOrEqual(t, heapSize, maxTrackedQueries)
42+
}
43+
44+
func TestQueryTracker_TTL(t *testing.T) {
45+
queryTracker := NewQueryTracker()
46+
for i := 0; i < 200; i++ {
47+
queryTracker.Add(fmt.Sprintf("low-%d", i), 300)
48+
}
49+
requestID, rate := queryTracker.GetWorstQuery()
50+
assert.Equal(t, float64(100), rate)
51+
52+
time.Sleep(4 * time.Second) // expire all items
53+
requestID, rate = queryTracker.GetWorstQuery()
54+
55+
assert.Equal(t, "", requestID)
56+
assert.Equal(t, float64(0), rate)
57+
}
58+
59+
func TestQueryTracker_EmptyHeap(t *testing.T) {
60+
queryTracker := NewQueryTracker()
61+
requestID, rate := queryTracker.GetWorstQuery()
62+
assert.Equal(t, "", requestID)
63+
assert.Equal(t, float64(0), rate)
64+
}
65+
66+
func TestQueryTracker_UpdateExistingQuery(t *testing.T) {
67+
queryTracker := NewQueryTracker()
68+
queryTracker.Add("r-1", 1000)
69+
initialRate := queryTracker.lookup["r-1"].bytesRate.rate()
70+
71+
// Add more bytes to same query
72+
queryTracker.Add("r-1", 2000)
73+
updatedRate := queryTracker.lookup["r-1"].bytesRate.rate()
74+
75+
assert.True(t, updatedRate > initialRate)
76+
requestID, rate := queryTracker.GetWorstQuery()
77+
assert.Equal(t, "r-1", requestID)
78+
assert.Equal(t, float64(1000), rate) // 3000 bytes / 3 seconds
79+
}
80+
81+
func TestQueryTracker_HeapEviction(t *testing.T) {
82+
queryTracker := NewQueryTracker()
83+
84+
// Fill heap to capacity with low-rate queries
85+
for i := 0; i < maxTrackedQueries; i++ {
86+
queryTracker.Add(fmt.Sprintf("low-%d", i), 100)
87+
}
88+
89+
// Verify heap is at capacity
90+
assert.Equal(t, maxTrackedQueries, queryTracker.heap.Len())
91+
92+
// Add a high-rate query that should evict the lowest
93+
queryTracker.Add("high-rate", 10000)
94+
95+
// Heap should still be at capacity
96+
assert.Equal(t, maxTrackedQueries, queryTracker.heap.Len())
97+
98+
// High-rate query should be tracked
99+
_, exists := queryTracker.lookup["high-rate"]
100+
assert.True(t, exists)
101+
102+
// Should be the worst query
103+
requestID, rate := queryTracker.GetWorstQuery()
104+
assert.Equal(t, "high-rate", requestID)
105+
assert.True(t, rate > 3000)
106+
}
107+
108+
func TestQueryTracker_NoEvictionForLowRate(t *testing.T) {
109+
queryTracker := NewQueryTracker()
110+
111+
// Fill heap with medium-rate queries
112+
for i := 0; i < maxTrackedQueries; i++ {
113+
queryTracker.Add(fmt.Sprintf("med-%d", i), 1000)
114+
}
115+
116+
// Try to add a lower-rate query
117+
queryTracker.Add("low-rate", 50)
118+
119+
// Low-rate query should not be tracked
120+
_, exists := queryTracker.lookup["low-rate"]
121+
assert.False(t, exists)
122+
123+
// Heap should still be at capacity
124+
assert.Equal(t, maxTrackedQueries, queryTracker.heap.Len())
125+
}

0 commit comments

Comments
 (0)