1 change: 1 addition & 0 deletions CHANGES.md
@@ -34,6 +34,7 @@ Release Notes.

### Bug Fixes

- Use `topic` instead of `session_id` as the Prometheus label on liaison `queue_sub` chunk-ordering counters to avoid unbounded metric cardinality.
Contributor


Issue 3 — docs and Grafana panel were ripped out by the doc revert commit (1a7d987), but the PR description still claims they ship.

After upgrade, operators with dashboards or alerts grouped by session_id will silently see empty results, and the orphaned high-cardinality *_session_id series will linger in Prometheus storage until retention expires.

Either restore the observability.md and grafana-cluster.json updates, or extend this changelog entry with an explicit upgrade note (e.g. "operators must update dashboards keyed on session_id; existing per-session series will age out with retention").

Contributor Author

@OmCheeLin Apr 27, 2026


I checked docs/operation/grafana-cluster.json and searched for session_id and those metric names (like out_of_order_chunks_received, chunks_buffered, buffer_timeouts, etc.), but couldn’t find anything.

In docs/operation/observability.md, there’s only one mention of banyandb_queue_sub_total_msg_sent_err (line 69). There’s no session_id, nor any of the chunk-ordering metrics introduced in this change.

That's why I reverted the docs for now; in the final PR I plan to document both the original metrics and the additional metrics introduced by this change.

Contributor


Issue 4 — no regression test added.

The PR template's "Add a unit test to verify that the fix works" checkbox is still unchecked. The existing fixtures in chunked_sync_test.go (e.g. test-session-1 at line 307, test-session-2 at line 364) already construct sessions with metadata.Topic set, so a small test that wires a mock counter into s.metrics, drives an out-of-order chunk, and asserts that the emitted label equals the topic (and that no per-session series is created) would be straightforward.

Without it, a future revert to req.SessionId would silently restore the cardinality blow-up this PR is meant to prevent.
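Concretely, something along these lines would lock the behavior in. This is only a sketch: it reuses the imports and mock types already present in chunked_sync_test.go, and the counter interface (Inc(float64, ...string) / Delete(...string) bool) plus a few field names are assumptions inferred from the existing fixtures, not a drop-in test.

```go
// fakeCounter captures every label set passed to Inc (hypothetical helper).
type fakeCounter struct{ labels [][]string }

func (f *fakeCounter) Inc(_ float64, labelValues ...string) {
	f.labels = append(f.labels, append([]string(nil), labelValues...))
}
func (f *fakeCounter) Delete(_ ...string) bool { return true }

func TestChunkOrderMetricsUseTopicLabel(t *testing.T) {
	outOfOrder := &fakeCounter{}
	s := &server{
		log:                   logger.GetLogger("test-metrics-label"),
		chunkedSyncHandlers:   make(map[bus.Topic]queue.ChunkedSyncHandler),
		enableChunkReordering: true,
		maxChunkBufferSize:    10,
		maxChunkGapSize:       5,
		metrics: &metrics{
			outOfOrderChunksReceived: outOfOrder,
			chunksBuffered:           &fakeCounter{}, // also hit on the buffering path
		},
	}
	topic := data.TopicStreamPartSync.String()
	session := &syncSession{
		sessionID: "test-session-1",
		metadata:  &clusterv1.SyncMetadata{Group: "test-group", Topic: topic},
		chunkBuffer: &chunkBuffer{
			chunks:        make(map[uint32]*clusterv1.SyncPartRequest),
			expectedIndex: 1,
			maxBufferSize: 10,
			bufferTimeout: time.Hour,
			lastActivity:  time.Now(),
		},
	}
	// Chunk 3 while chunk 1 is expected: out of order, so the chunk is
	// buffered and the chunk-ordering counters fire.
	req := &clusterv1.SyncPartRequest{
		SessionId:     "test-session-1",
		ChunkIndex:    3,
		ChunkData:     []byte("x"),
		ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE([]byte("x"))),
	}
	require.NoError(t, s.processChunkWithReordering(&MockSyncPartStream{}, session, req))

	require.NotEmpty(t, outOfOrder.labels)
	for _, lv := range outOfOrder.labels {
		// The emitted label must be the topic, never the session ID.
		assert.Equal(t, []string{topic}, lv)
	}
}
```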

Contributor Author


done

- Fix flaky trace query filtering caused by non-deterministic sidx tag ordering and add consistency checks for integration query cases.
- Fix index-mode measure queries returning documents outside the requested time range when a widened segment overlaps the query window.
- MCP: Add validation for properties and harden the MCP server.
16 changes: 10 additions & 6 deletions banyand/queue/sub/chunked_sync.go
@@ -159,7 +159,7 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er
if currentSession.partCtx != nil {
if currentSession.partCtx.Handler != nil {
if finishErr := currentSession.partCtx.Handler.FinishSync(); finishErr != nil {
s.updateChunkOrderMetrics("finish_sync_err", currentSession.sessionID)
s.updateChunkOrderMetrics("finish_sync_err", currentSession.metadata.Topic)
s.log.Error().Err(finishErr).Str("session_id", currentSession.sessionID).Msg("failed to finish sync for previous session")
}
if closeErr := currentSession.partCtx.Close(); closeErr != nil {
@@ -253,6 +253,10 @@ func (s *server) processChunkSequential(stream clusterv1.ChunkedSyncService_Sync

func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req *clusterv1.SyncPartRequest) error {
buffer := session.chunkBuffer
// Must check the buffer timeout before refreshing lastActivity; otherwise the buffer would never time out.
if err := s.checkBufferTimeout(session); err != nil {
return err
}
buffer.lastActivity = time.Now()

if req.ChunkIndex == buffer.expectedIndex {
@@ -266,7 +270,7 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_

if req.ChunkIndex > buffer.expectedIndex {
gap := req.ChunkIndex - buffer.expectedIndex
s.updateChunkOrderMetrics("out_of_order_received", req.SessionId)
s.updateChunkOrderMetrics("out_of_order_received", session.metadata.Topic)

if gap > s.maxChunkGapSize {
errMsg := fmt.Sprintf("chunk gap too large: expected %d, got %d (gap: %d > max: %d)",
@@ -277,7 +281,7 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
Uint32("gap", gap).
Uint32("max_gap", s.maxChunkGapSize).
Msg("chunk gap too large, rejecting")
s.updateChunkOrderMetrics("gap_too_large", req.SessionId)
s.updateChunkOrderMetrics("gap_too_large", session.metadata.Topic)
return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_OUT_OF_ORDER, errMsg, nil)
}

@@ -288,7 +292,7 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
Uint32("buffer_size", uint32(len(buffer.chunks))).
Uint32("max_buffer_size", buffer.maxBufferSize).
Msg("chunk buffer full, rejecting chunk")
s.updateChunkOrderMetrics("buffer_full", req.SessionId)
s.updateChunkOrderMetrics("buffer_full", session.metadata.Topic)
return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_OUT_OF_ORDER, errMsg, nil)
}

@@ -301,7 +305,7 @@ func (s *server) processChunkWithReordering(stream clusterv1.ChunkedSyncService_
Uint32("buffered_chunks", uint32(len(buffer.chunks))).
Msg("buffered out-of-order chunk")
}
s.updateChunkOrderMetrics("chunk_buffered", req.SessionId)
s.updateChunkOrderMetrics("chunk_buffered", session.metadata.Topic)

return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
fmt.Sprintf("chunk %d buffered (waiting for %d)", req.ChunkIndex, buffer.expectedIndex), nil)
@@ -444,7 +448,7 @@ func (s *server) checkBufferTimeout(session *syncSession) error {
missing = append(missing, i)
}
}

s.updateChunkOrderMetrics("buffer_timeout", session.metadata.Topic)
return fmt.Errorf("buffer timeout: missing chunks %v after %v",
missing, session.chunkBuffer.bufferTimeout)
}
129 changes: 129 additions & 0 deletions banyand/queue/sub/chunked_sync_test.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"hash/crc32"
"sync"
"testing"
"time"

@@ -120,6 +121,9 @@ func TestChunkedSyncOutOfOrderHandling(t *testing.T) {
enableChunkReordering: tt.enableChunkReordering,
maxChunkBufferSize: tt.maxChunkBufferSize,
maxChunkGapSize: tt.maxChunkGapSize,
// Avoid accidental immediate buffer-timeout when bufferTimeout is zero.
// Some tests buffer out-of-order chunks and expect no error.
chunkBufferTimeout: time.Hour,
}

// Register a mock handler
@@ -189,6 +193,127 @@ }
}
}

type capturingCounter struct {
labelValues [][]string
mu sync.Mutex
}

func (c *capturingCounter) Inc(_ float64, labelValues ...string) {
c.mu.Lock()
defer c.mu.Unlock()

cp := make([]string, len(labelValues))
copy(cp, labelValues)
c.labelValues = append(c.labelValues, cp)
}

func (c *capturingCounter) Delete(_ ...string) bool {
return true
}

func (c *capturingCounter) uniqueFirstLabelValues() map[string]struct{} {
c.mu.Lock()
defer c.mu.Unlock()

m := make(map[string]struct{})
for _, lv := range c.labelValues {
if len(lv) > 0 {
m[lv[0]] = struct{}{}
}
}
return m
}

func TestChunkOrderingMetricsAreLabeledByTopic_NotSessionID(t *testing.T) {
// enable reordering, otherwise the chunk-ordering metrics will not be triggered.
s := &server{
log: logger.GetLogger("test-server-metrics-label"),
chunkedSyncHandlers: make(map[bus.Topic]queue.ChunkedSyncHandler),
enableChunkReordering: true,
maxChunkBufferSize: 10,
maxChunkGapSize: 5,
}

// handler: avoid "no handler registered" in processExpectedChunk.
mockHandler := &MockChunkedSyncHandler{}
s.chunkedSyncHandlers[data.TopicStreamPartSync] = mockHandler

// metrics: this test will trigger at least two events:
// - out_of_order_received
// - chunk_buffered
// so both counters must be set; calling Inc on a nil counter would panic.
outOfOrder := &capturingCounter{}
buffered := &capturingCounter{}
s.metrics = &metrics{
outOfOrderChunksReceived: outOfOrder,
chunksBuffered: buffered,
// other counters will not be triggered in this test, leave them nil
}

topic := data.TopicStreamPartSync.String()

drive := func(sessionID string) {
mockStream := &MockSyncPartStream{}
session := &syncSession{
sessionID: sessionID,
startTime: time.Now(),
chunksReceived: 0,
partsProgress: make(map[int]*partProgress),
metadata: &clusterv1.SyncMetadata{
Group: "test-group",
Topic: topic,
},
}

// send chunk 0 (establish buffer.expectedIndex=1)
req0 := &clusterv1.SyncPartRequest{
SessionId: sessionID,
ChunkIndex: 0,
ChunkData: []byte("chunk-0"),
ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE([]byte("chunk-0"))),
PartsInfo: []*clusterv1.PartInfo{
{Id: 1, Files: []*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}},
},
}
require.NoError(t, s.processChunk(mockStream, session, req0))

// send chunk 2 (out of order: expected 1, got 2); this triggers
// out_of_order_received + chunk_buffered.
req2 := &clusterv1.SyncPartRequest{
SessionId: sessionID,
ChunkIndex: 2,
ChunkData: []byte("chunk-2"),
ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE([]byte("chunk-2"))),
PartsInfo: []*clusterv1.PartInfo{
{Id: 2, Files: []*clusterv1.FileInfo{{Name: "f", Offset: 0, Size: 7}}},
},
}
require.NoError(t, s.processChunk(mockStream, session, req2))
}

drive("test-session-A")
drive("test-session-B")

// assert: the first label value is always the topic, and the topic is the only unique label value
uniqOut := outOfOrder.uniqueFirstLabelValues()
uniqBuf := buffered.uniqueFirstLabelValues()

assert.Equal(t, 1, len(uniqOut))
assert.Equal(t, 1, len(uniqBuf))

_, okOut := uniqOut[topic]
_, okBuf := uniqBuf[topic]
assert.True(t, okOut, "out_of_order_received label must be topic")
assert.True(t, okBuf, "chunk_buffered label must be topic")

// assert: session IDs must never appear as label values
_, bad1 := uniqOut["test-session-A"]
_, bad2 := uniqOut["test-session-B"]
_, bad3 := uniqBuf["test-session-A"]
_, bad4 := uniqBuf["test-session-B"]
assert.False(t, bad1 || bad2 || bad3 || bad4, "metrics must not be labeled by session_id")
}

// MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing.
type MockChunkedSyncHandler struct{}

@@ -264,6 +389,10 @@ func TestChunkedSyncBufferTimeout(t *testing.T) {
session := &syncSession{
sessionID: "test-session-timeout",
startTime: time.Now(),
metadata: &clusterv1.SyncMetadata{
Topic: data.TopicStreamPartSync.String(),
Group: "test-group",
},
chunkBuffer: &chunkBuffer{
chunks: make(map[uint32]*clusterv1.SyncPartRequest),
expectedIndex: 1, // Waiting for chunk 1
26 changes: 13 additions & 13 deletions banyand/queue/sub/server.go
@@ -435,32 +435,32 @@ func newMetrics(factory observability.Factory) *metrics {
totalMsgSentErr: factory.NewCounter("total_msg_sent_err", "topic"),

// Chunk ordering metrics
outOfOrderChunksReceived: factory.NewCounter("out_of_order_chunks_received", "session_id"),
chunksBuffered: factory.NewCounter("chunks_buffered", "session_id"),
bufferTimeouts: factory.NewCounter("buffer_timeouts", "session_id"),
largeGapsRejected: factory.NewCounter("large_gaps_rejected", "session_id"),
bufferCapacityExceeded: factory.NewCounter("buffer_capacity_exceeded", "session_id"),
finishSyncErr: factory.NewCounter("finish_sync_err", "session_id"),
outOfOrderChunksReceived: factory.NewCounter("out_of_order_chunks_received", "topic"),
chunksBuffered: factory.NewCounter("chunks_buffered", "topic"),
bufferTimeouts: factory.NewCounter("buffer_timeouts", "topic"),
largeGapsRejected: factory.NewCounter("large_gaps_rejected", "topic"),
bufferCapacityExceeded: factory.NewCounter("buffer_capacity_exceeded", "topic"),
finishSyncErr: factory.NewCounter("finish_sync_err", "topic"),
Comment on lines 437 to +443

Copilot AI Apr 27, 2026


No unit test was added to verify that chunk-ordering counters are labeled by topic (and that session_id is not used), which is the core regression this PR is addressing. Please add a unit test that asserts the emitted metric series contain the topic label only, ideally by recording a chunk-ordering event and inspecting the metrics registry output.

}
}

// updateChunkOrderMetrics updates chunk ordering metrics.
func (s *server) updateChunkOrderMetrics(event, sessionID string) {
func (s *server) updateChunkOrderMetrics(event, topic string) {
if s.metrics == nil {
return // Skip metrics if not initialized (e.g., during tests)
}
switch event {
case "out_of_order_received":
s.metrics.outOfOrderChunksReceived.Inc(1, sessionID)
s.metrics.outOfOrderChunksReceived.Inc(1, topic)
case "chunk_buffered":
s.metrics.chunksBuffered.Inc(1, sessionID)
s.metrics.chunksBuffered.Inc(1, topic)
case "buffer_timeout":
Contributor


Issue 2 — "buffer_timeout" event is wired here but never fired anywhere.

$ grep -rn '"buffer_timeout"' --include='*.go' .
banyand/queue/sub/server.go:457   ← only this case statement

checkBufferTimeout (chunked_sync.go:434) is the natural producer but returns the error without recording the metric. Worse, checkBufferTimeout itself only has a test caller (chunked_sync_test.go:280); no production code path appears to invoke it.

Since this PR is exactly about cleaning up queue_sub chunk-ordering observability, please pick one:

  • Wire it up: add s.updateChunkOrderMetrics("buffer_timeout", session.metadata.Topic) inside checkBufferTimeout, and invoke checkBufferTimeout from a real code path (e.g. processChunkWithReordering or a periodic janitor).
  • Delete the dead code: remove the bufferTimeouts field, the factory.NewCounter("buffer_timeouts", …) registration, and this case branch.

Leaving it half-wired ships a buffer_timeouts metric to operators that is silently always zero.
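For concreteness, the first option amounts to two small touch points; the sketch below elides the surrounding bodies of both functions and only shows the lines that would change:

```go
// (1) In processChunkWithReordering: check the timeout before refreshing
// lastActivity, otherwise a stalled buffer can never expire.
if err := s.checkBufferTimeout(session); err != nil {
	return err
}
buffer.lastActivity = time.Now()

// (2) In checkBufferTimeout: record the event just before returning the
// timeout error, labeled by topic (not session ID) to keep cardinality low.
s.updateChunkOrderMetrics("buffer_timeout", session.metadata.Topic)
return fmt.Errorf("buffer timeout: missing chunks %v after %v",
	missing, session.chunkBuffer.bufferTimeout)
```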

Contributor Author


done

s.metrics.bufferTimeouts.Inc(1, sessionID)
s.metrics.bufferTimeouts.Inc(1, topic)
case "gap_too_large":
Comment on lines 455 to 459

Copilot AI Apr 27, 2026


The buffer_timeouts counter can be incremented via the buffer_timeout event, but there are no call sites that emit this event (and checkBufferTimeout is never invoked in production code). As a result, banyandb_queue_sub_buffer_timeouts will never be emitted/incremented; either wire buffer-timeout detection to call updateChunkOrderMetrics("buffer_timeout", topic) or remove the unused metric/event to avoid misleading operators.

Contributor Author


This issue already existed before and was not introduced by this PR. I will address and improve it in a follow-up PR.

s.metrics.largeGapsRejected.Inc(1, sessionID)
s.metrics.largeGapsRejected.Inc(1, topic)
case "buffer_full":
s.metrics.bufferCapacityExceeded.Inc(1, sessionID)
s.metrics.bufferCapacityExceeded.Inc(1, topic)
case "finish_sync_err":
s.metrics.finishSyncErr.Inc(1, sessionID)
s.metrics.finishSyncErr.Inc(1, topic)
}
}