feat: add queue latency instrumentation to principal #852

drewbailey wants to merge 7 commits into argoproj-labs:main from
Conversation
📝 Walkthrough

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Producer as Producer\n(Client)
    participant Queue as Queue\n(internal/queue)
    participant Writer as EventWriter\n(internal/event)
    participant Principal as Principal\n(eventstream)
    participant Metrics as Prometheus\n(internal/metrics)

    Producer->>Queue: Enqueue Event
    Queue->>Queue: SetEnqueuedAt(event)
    Queue->>Writer: Add(event)
    Writer->>Writer: writerAddedAt = now()
    Writer->>Principal: send event (Send/Retry)
    Principal->>Metrics: ObserveEventWriterDwell(time since writerAddedAt)
    Principal->>Producer: Deliver Event
    Producer->>Principal: ACK
    Principal->>Metrics: ObserveAckRoundtrip(time since SentAt)
    Principal->>Writer: Remove ACK / update writer state
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

✅ Passed checks (2 passed)

⚠️ Warning: Review ran into problems. 🔥 Problems: Git: Failed to clone repository.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
principal/apis/eventstream/eventstream.go (1)
385-392: ⚠️ Potential issue | 🟠 Major

`SetMetrics` is only called for new EventWriters, leaving reconnected agents without `EventWriterDwell` observations.

When an agent reconnects, the existing EventWriter is reused via `UpdateTarget()` (line 387), but `SetMetrics()` is not called on this path. Since `principalMetrics` is initialized to `nil` in `NewEventWriter()`, reconnected agents will silently skip `EventWriterDwell` observations.

🐛 Proposed fix

```diff
 eventWriter := s.eventWriters.Get(c.agentName)
 if eventWriter != nil {
 	eventWriter.UpdateTarget(subs)
+	eventWriter.SetMetrics(s.metrics)
 } else {
 	eventWriter = event.NewEventWriter(subs)
 	eventWriter.SetMetrics(s.metrics)
 	s.eventWriters.Add(c.agentName, eventWriter)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@principal/apis/eventstream/eventstream.go` around lines 385 - 392, The existing EventWriter retrieved by s.eventWriters.Get(c.agentName) isn't having SetMetrics called, so reconnected agents miss EventWriterDwell observations; after retrieving eventWriter (before calling UpdateTarget) call eventWriter.SetMetrics(s.metrics) so both new (created via event.NewEventWriter) and reused writers get the metrics set (NewEventWriter currently initializes principal metrics to nil, so ensure SetMetrics is invoked on the reuse path as well).
🧹 Nitpick comments (1)
internal/event/event_writer.go (1)
501-521: Coalesced events retain the original `writerAddedAt`, which may not be the intended behavior.

When events of the same type are coalesced in `eventQueue.add()`, the tail event's `event`, `backoff`, and `retryAfter` are updated, but `writerAddedAt` is not. This means the `EventWriterDwell` metric will measure the time from when the first event of that type was added, not the latest.

If the intent is to measure dwell time from when the current event data was added, consider also updating `writerAddedAt`:

♻️ Optional fix to update writerAddedAt on coalesce

```diff
 if ev.event.Type() == tail.event.Type() {
 	tail.event = ev.event
 	tail.backoff = ev.backoff
 	tail.retryAfter = ev.retryAfter
+	tail.writerAddedAt = ev.writerAddedAt
 	tail.mu.Unlock()
 	return
 }
```

However, if the current behavior is intentional (measuring total wait time for a resource regardless of updates), please add a comment clarifying this design choice.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/event_writer.go` around lines 501 - 521, In eventQueue.add() the coalescing branch updates tail.event, tail.backoff and tail.retryAfter but not tail.writerAddedAt, causing EventWriterDwell to measure from the original enqueue time; to fix update the tail's writerAddedAt when coalescing (e.g. set tail.writerAddedAt = ev.writerAddedAt or the current time) inside the if ev.event.Type() == tail.event.Type() block before unlocking, or if the original semantics are intentional add a clarifying comment in eventQueue.add() explaining that dwell should measure from the first enqueue rather than the most recent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@principal/apis/eventstream/eventstream.go`:
- Around line 385-392: The existing EventWriter retrieved by
s.eventWriters.Get(c.agentName) isn't having SetMetrics called, so reconnected
agents miss EventWriterDwell observations; after retrieving eventWriter (before
calling UpdateTarget) call eventWriter.SetMetrics(s.metrics) so both new
(created via event.NewEventWriter) and reused writers get the metrics set
(NewEventWriter currently initializes principal metrics to nil, so ensure
SetMetrics is invoked on the reuse path as well).
---
Nitpick comments:
In `@internal/event/event_writer.go`:
- Around line 501-521: In eventQueue.add() the coalescing branch updates
tail.event, tail.backoff and tail.retryAfter but not tail.writerAddedAt, causing
EventWriterDwell to measure from the original enqueue time; to fix update the
tail's writerAddedAt when coalescing (e.g. set tail.writerAddedAt =
ev.writerAddedAt or the current time) inside the if ev.event.Type() ==
tail.event.Type() block before unlocking, or if the original semantics are
intentional add a clarifying comment in eventQueue.add() explaining that dwell
should measure from the first enqueue rather than the most recent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a49d5687-adbf-4e07-953b-a5ec2b4d2a43
📒 Files selected for processing (5)
- internal/event/event.go
- internal/event/event_writer.go
- internal/metrics/metrics.go
- internal/queue/queue.go
- principal/apis/eventstream/eventstream.go
Would it make sense to have these metrics for the agent too, in case it is in autonomous mode?
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@agent/connection.go`:
- Around line 149-156: The ack round-trip is inflated because event.SentAt()
reflects only the first-send timestamp (stamped in
internal/event/event_writer.go) and retries reuse the same CloudEvent; update
the send path so the SentAt header is refreshed on each actual send attempt
(e.g., modify retrySentEvent or the sending routine in event_writer.go to
set/overwrite SentAt with time.Now() before each transmit) so
event.SentAt(rawEvent) in connection.go yields the most recent wire-send time
and the metrics.ObserveAckRoundtrip measurement is correct.
In `@docs/operations/metrics.md`:
- Around line 16-29: The docs table for principal metrics is missing the
repository and GPG key metric series; update the Markdown table to include the
registered metrics from internal/metrics/metrics.go such as
principal_repositories_created, principal_repositories_updated,
principal_repositories_deleted (counter) and principal_gpg_keys_created,
principal_gpg_keys_updated, principal_gpg_keys_deleted (counter) with
appropriate Type, Labels (if any), and short descriptions mirroring the other
rows so the reference matches the actual metric registrations (e.g., align with
principal_applications_* and principal_app_projects_* entries).
- Around line 35-39: The fenced diagram block containing "principal send queue
→ event writer → wire send → agent → ACK received" and the labels
"SendQueueDwell", "EventWriterDwell", "AckRoundtrip" should include a language
hint to satisfy MD040; update the triple-backtick fence to specify "text" (i.e.,
```text) so the pipeline diagram fence is labeled as plain text.
In `@internal/event/event_writer.go`:
- Around line 439-445: The dwell histogram (using
ew.outboundMetrics.ObserveEventWriterDwell with eventMsg.writerAddedAt) is being
recorded before the send, which under-reports stalls if target.Send(...) fails;
move the ObserveEventWriterDwell call so it runs only after target.Send(...)
returns successfully (i.e., after the send is accepted and after any code that
adds the event to sentEvents), preserving the existing isFireAndForget guard and
the check for eventMsg.writerAddedAt.IsZero() and nil ew.outboundMetrics; ensure
you still hold or snapshot ew.outboundMetrics as done now to avoid races.
- Around line 163-167: When adding a new eventMessage in eventQueue.add (eq.add)
the writerAddedAt timestamp is left as the older value when the queue node is
coalesced; update eventQueue.add so that when it replaces the tail's
event/backoff/retryAfter it also updates the node's writerAddedAt to the current
time (the same now used when creating the new eventMessage). Locate the code
that swaps the existing node's event/backoff/retryAfter (references:
eventQueue.add, eq.add, eventMessage.writerAddedAt) and assign writerAddedAt =
now whenever the node is updated so *_event_writer_dwell_seconds reflects the
coalesced event's actual enqueue time.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dba72327-c261-4098-9714-e7c0f3cfc3c5
📒 Files selected for processing (7)
- agent/connection.go
- agent/connection_test.go
- docs/operations/metrics.md
- internal/event/event.go
- internal/event/event_writer.go
- internal/event/event_writer_test.go
- internal/metrics/metrics.go
✅ Files skipped from review due to trivial changes (2)
- agent/connection_test.go
- internal/event/event.go
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/event/event_writer.go (1)
79-80: Clarify `writerAddedAt` semantics.

This field is refreshed during tail coalescing at Line 532, so “immutable once set” is misleading now. Reword the comment to describe the enqueue time of the currently queued event instead.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/event_writer.go` around lines 79 - 80, The comment for the EventWriter field writerAddedAt is misleading because the value is updated during tail coalescing; update the comment to say it records the enqueue time of the currently queued event (i.e., when the event was added to the writer's queue and may be refreshed during tail coalescing) rather than claiming it is immutable once set. Reference the EventWriter struct and the writerAddedAt field in the comment so readers know this timestamp represents the current queued event's enqueue time.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/event/event_writer.go`:
- Line 352: The histogram misses long stalls when an event's initial Send fails
because dwell is only observed in the retry-completion path; update the path
that marks an event as sent (SetSentAt called for sentMsg.event) to also emit
ObserveEventWriterDwell so first-success-after-retry is recorded. Specifically,
when SetSentAt(sentMsg.event) is executed for an entry still tracked in
sentEvents, compute the dwell as now minus the event's original
enqueue/first-attempt timestamp (use the same timestamp used elsewhere to
measure dwell) and call ObserveEventWriterDwell with the same labels used in the
retry-completion code; ensure you use sentMsg (and sentMsg.event) and the same
label set so metrics remain consistent.
---
Nitpick comments:
In `@internal/event/event_writer.go`:
- Around line 79-80: The comment for the EventWriter field writerAddedAt is
misleading because the value is updated during tail coalescing; update the
comment to say it records the enqueue time of the currently queued event (i.e.,
when the event was added to the writer's queue and may be refreshed during tail
coalescing) rather than claiming it is immutable once set. Reference the
EventWriter struct and the writerAddedAt field in the comment so readers know
this timestamp represents the current queued event's enqueue time.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 55b69cf2-9bd8-4b46-a698-958031ff3e08
📒 Files selected for processing (5)
- docs/operations/metrics.md
- go.mod
- internal/event/event_writer.go
- internal/event/event_writer_test.go
- principal/apis/eventstream/eventstream.go
🚧 Files skipped from review as they are similar to previous changes (2)
- principal/apis/eventstream/eventstream.go
- internal/event/event_writer_test.go
🧹 Nitpick comments (1)
agent/connection_test.go (1)
87-87: Isolate metrics instance per subtest to reduce state coupling.

Line 87 shares one `AgentMetrics` across both subtests. It works now, but it makes cases order-coupled and less future-proof if subtests become parallelized.

♻️ Suggested test-isolation refactor

```diff
 func TestAgentHopMetricsObservations(t *testing.T) {
-	agentMetrics := metrics.NewAgentMetrics()
-
 	t.Run("sender observes send queue dwell", func(t *testing.T) {
+		agentMetrics := metrics.NewAgentMetrics()
 		a, _ := newAgent(t)
 		a.metrics = agentMetrics
 		a.emitter = event.NewEventSource("test")
@@
 		before := histogramSampleCount(t, a.metrics.SendQueueDwell, event.TargetApplication.String())
@@
 	t.Run("receiver observes ack roundtrip by original resource type", func(t *testing.T) {
+		agentMetrics := metrics.NewAgentMetrics()
 		a, _ := newAgent(t)
 		a.metrics = agentMetrics
 		a.emitter = event.NewEventSource("test")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agent/connection_test.go` at line 87, Shared AgentMetrics instance causes state coupling between subtests; instantiate a fresh metrics object inside each subtest to isolate state. Move the call to metrics.NewAgentMetrics() from the outer scope into each t.Run subtest body (or create a local agentMetrics variable at the start of each subtest) so tests don't share mutable state and can be parallelized safely; update any references in those subtests to use the local agentMetrics variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@agent/connection_test.go`:
- Line 87: Shared AgentMetrics instance causes state coupling between subtests;
instantiate a fresh metrics object inside each subtest to isolate state. Move
the call to metrics.NewAgentMetrics() from the outer scope into each t.Run
subtest body (or create a local agentMetrics variable at the start of each
subtest) so tests don't share mutable state and can be parallelized safely;
update any references in those subtests to use the local agentMetrics variable.
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@           Coverage Diff            @@
##             main     #852   +/-   ##
==========================================
+ Coverage   46.32%   46.59%   +0.27%
==========================================
  Files         122      122
  Lines       17397    17536   +139
==========================================
+ Hits         8059     8171   +112
- Misses       8596     8619    +23
- Partials      742      746     +4
```

☔ View full report in Codecov by Sentry.
🧹 Nitpick comments (1)
internal/event/event_writer_test.go (1)
554-555: Remove unnecessary sleeps in metrics-only assertions.

At Line 554, Line 571, Line 587, and Line 615, the sleeps are not needed for the current assertions (`len(...)` and `seconds >= 0`). Removing them will reduce test runtime and flake risk without changing intent.

♻️ Suggested cleanup

```diff
-	time.Sleep(5 * time.Millisecond)
 	evSender.sendEvent(createResourceID(app1.ObjectMeta))
-	time.Sleep(5 * time.Millisecond)
 	evSender.sendEvent(createResourceID(app1.ObjectMeta))
-	time.Sleep(5 * time.Millisecond)
 	evSender.sendEvent(resID)
-	time.Sleep(5 * time.Millisecond)
 	evSender.sendEvent(resID)
```

Also applies to: 571-572, 587-588, 615-616
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/event_writer_test.go` around lines 554 - 555, Remove the unnecessary time.Sleep(5 * time.Millisecond) calls used solely for metrics-only assertions in internal/event/event_writer_test.go: locate the test cases calling time.Sleep before asserting len(...) of metrics slices or checking seconds >= 0 and delete those sleep lines (they are the only use of time.Sleep in those blocks), leaving the assertions as-is; run the tests to ensure no other timing dependency remains and remove any now-unused imports if applicable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/event/event_writer_test.go`:
- Around line 554-555: Remove the unnecessary time.Sleep(5 * time.Millisecond)
calls used solely for metrics-only assertions in
internal/event/event_writer_test.go: locate the test cases calling time.Sleep
before asserting len(...) of metrics slices or checking seconds >= 0 and delete
those sleep lines (they are the only use of time.Sleep in those blocks), leaving
the assertions as-is; run the tests to ensure no other timing dependency remains
and remove any now-unused imports if applicable.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d55e22c6-41c1-439e-af9a-0cb7f38d60f1
📒 Files selected for processing (2)
- internal/event/event_writer.go
- internal/event/event_writer_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/event/event_writer.go
Add three new principal-side histograms to isolate which stage owns the observed ~10s propagation latency plateau:

- principal_send_queue_dwell_seconds: time from SendQ.Add to EventWriter.Add
- principal_event_writer_dwell_seconds: time from EventWriter.Add to wire send
- principal_ack_roundtrip_seconds: time from wire send to ACK received

Implementation:

- enqueuedAt CloudEvents extension stamped in boundedQueue.Add (single site covers all enqueue call sites throughout the codebase)
- writerAddedAt field on eventMessage set in EventWriter.Add
- EventWriter gains optional principalMetrics field via SetMetrics; nil-safe so agent-side and test call sites are unaffected
- Observations wired in sendFunc (SendQueueDwell) and recvFunc (AckRoundtrip) in the eventstream server

Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
Under heavy load or during reconnects, events can dwell longer than the previous 10s max bucket, causing all high-latency observations to accumulate in +Inf and making percentiles uncomputable. Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
… metrics

Add documentation for principal_send_queue_dwell_seconds, principal_event_writer_dwell_seconds, principal_ack_roundtrip_seconds, and the rest_client_* k8s API client metrics. Restructure the table to include a labels column and add a pipeline diagram showing what each hop-by-hop metric measures. Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
Signed-off-by: Drew Bailey <drew.bailey@airbnb.com>
Force-pushed: 9a672ef to f456a7d
Add three new principal-side histograms to isolate which stage owns propagation latency:
Implementation:
What does this PR do / why we need it:
Which issue(s) this PR fixes:
Fixes #?
How to test changes / Special notes to the reviewer:
Checklist
Summary by CodeRabbit
New Features
Documentation
Tests