Conversation

@lukebakken
Contributor

@lukebakken lukebakken commented Nov 21, 2025

The Go client currently uses equal probability distribution for consumer placement, including the leader node in the candidate pool alongside replicas. This results in consumers connecting to the leader ~33% of the time (with 2 replicas), which violates RabbitMQ Streams best practices that recommend consumers connect to replicas rather than leaders.

This change modifies BrokerForConsumerWithResolver to only include the leader in the candidate pool when no replicas are available. When replicas exist, consumers randomly select from replicas only, guaranteeing zero probability of leader placement. This matches the .NET client behavior and follows RabbitMQ Streams best practices for consumer placement.

The leader is used as a fallback when availableReplicas == 0, ensuring consumers can still connect when replicas are unavailable.
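
For illustration, a minimal Go sketch of the replica-first selection described above. The `Broker` and `streamMetadata` types and field names here are hypothetical stand-ins rather than the client's actual internals; only the selection logic mirrors the behavior this PR describes.

```go
package main

import (
	"fmt"
	"math/rand"
)

// Hypothetical stand-ins for the client's metadata types.
type Broker struct {
	Host string
	Port int
}

type streamMetadata struct {
	Leader   *Broker
	Replicas []*Broker
}

// brokerForConsumer picks a replica at random when any replicas are
// available and falls back to the leader only when none are, so the
// leader has zero selection probability while replicas exist.
func brokerForConsumer(md streamMetadata) (*Broker, error) {
	candidates := make([]*Broker, 0, len(md.Replicas))
	for _, r := range md.Replicas {
		if r != nil {
			candidates = append(candidates, r)
		}
	}
	if len(candidates) == 0 {
		if md.Leader == nil {
			return nil, fmt.Errorf("no broker available for consumer")
		}
		// Fallback: no replicas available, so connect to the leader.
		return md.Leader, nil
	}
	return candidates[rand.Intn(len(candidates))], nil
}

func main() {
	md := streamMetadata{
		Leader:   &Broker{Host: "node-0", Port: 5552},
		Replicas: []*Broker{{Host: "node-1", Port: 5552}, {Host: "node-2", Port: 5552}},
	}
	b, _ := brokerForConsumer(md)
	fmt.Printf("consumer will connect to %s:%d\n", b.Host, b.Port)
}
```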

(this PR builds on #445, just FYI)

@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch from bbb7210 to b7f721a on November 21, 2025 23:20
@lukebakken lukebakken marked this pull request as ready for review November 21, 2025 23:28
@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch 2 times, most recently from 45e7c74 to 502140e on November 22, 2025 00:01
@Gsantomaggio
Member

Uh! Good catch @lukebakken! Thank you!

@Gsantomaggio Gsantomaggio added this to the 1.6.2 milestone Nov 22, 2025
@Gsantomaggio Gsantomaggio self-requested a review November 22, 2025 17:16
@lukebakken
Contributor Author

Thank @samuelmasse! He's been extensively comparing the behavior of stream clients and found this particular issue 😺

@Gsantomaggio
Member

Gsantomaggio commented Nov 23, 2025

@lukebakken, can you please execute the tests? There is a problem with the clientOptions interface.

@samuelmasse, if you need info about the stream clients, let me know.

Thank you both! :)

@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch from c24c566 to 90271f5 on November 24, 2025 15:39
@lukebakken
Contributor Author

@Gsantomaggio working on it now. I must have missed the email saying CI failed.

@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch 2 times, most recently from 6403433 to a5f372e on November 24, 2025 17:07
@lukebakken
Contributor Author

lukebakken commented Nov 24, 2025

Turns out CI didn't run because it's not configured to run on a PR. Anyway, everything passes now on my machine. I'll see what's up in CI.

I'll follow up with another PR to do the following:

  • Update GitHub Actions versions
  • Reinstate the Windows build
  • Fix the test suites. In pkg/stream/environment_test.go, the test that failed in this PR is written entirely in the Describe block, which isn't how tests should be structured (hence the change in this PR); see the Ginkgo sketch below. I'll review the test suite to fix this.

UPDATE: I can't re-run the tests, and I don't think the one failure is due to these changes.
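
For reference, a minimal Ginkgo v2 sketch of the structural point in the list above: assertions belong inside `It` nodes, not directly in the `Describe` container body, where they execute while the spec tree is still being built (before any `BeforeEach`). The spec names are hypothetical, and the snippet assumes the usual `RunSpecs` bootstrap elsewhere in the suite; it is not the actual `environment_test.go` content.

```go
package stream_test

import (
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

var _ = Describe("Consumer broker selection", func() {
	var replicas []string

	BeforeEach(func() {
		replicas = []string{"node-1", "node-2"}
	})

	// Wrong: an assertion placed directly in the container body runs
	// during tree construction, before BeforeEach has executed.
	// Expect(replicas).To(HaveLen(2))

	// Right: the assertion lives inside an It node.
	It("prefers replicas over the leader", func() {
		Expect(replicas).To(HaveLen(2))
	})
})
```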

@lukebakken
Contributor Author

Once #447 is merged, I'll rebase this branch.

@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch from a5f372e to bf02ddc on November 24, 2025 19:58
@codecov

codecov bot commented Nov 24, 2025

Codecov Report

❌ Patch coverage is 65.71429% with 24 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.40%. Comparing base (2eed375) to head (9f2cc48).
⚠️ Report is 120 commits behind head on main.

Files with missing lines     Patch %   Lines
pkg/stream/environment.go    62.85%    11 Missing and 2 partials ⚠️
pkg/stream/client.go         70.00%    5 Missing and 1 partial ⚠️
pkg/ha/ha_consumer.go        40.00%    3 Missing ⚠️
pkg/stream/consumer.go       66.66%    1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #446      +/-   ##
==========================================
- Coverage   84.50%   83.40%   -1.11%     
==========================================
  Files          26       28       +2     
  Lines        3756     4206     +450     
==========================================
+ Hits         3174     3508     +334     
- Misses        411      473      +62     
- Partials      171      225      +54     
Flag        Coverage Δ
unittests   83.40% <65.71%> (?)

Flags with carried forward coverage won't be shown.

@lukebakken lukebakken marked this pull request as draft November 24, 2025 20:11
@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch from bf02ddc to 091b19f on November 24, 2025 20:16
@lukebakken lukebakken marked this pull request as ready for review November 24, 2025 20:17
@lukebakken lukebakken marked this pull request as draft November 24, 2025 20:24

Use same retry delay as .NET stream client

Includes `golangci-lint` fixups
@lukebakken lukebakken force-pushed the lukebakken/consumer-replica-preference branch from 091b19f to 042f6b5 on November 24, 2025 20:29
@lukebakken lukebakken marked this pull request as ready for review November 24, 2025 20:35
@lukebakken
Contributor Author

@Gsantomaggio OK I'm sure the latest failures are unrelated to these changes. Thanks for the review!

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
The method does not return the last offset stored on the server, but the last one stored in memory.

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio
Member

@lukebakken Thanks again for the PR.

It was not a flaky test; it was a bug.

I deprecated the function `func (consumer *Consumer) GetLastStoredOffset() int64`, which was there to avoid constantly querying the server to check the last stored offset.

There is an edge case in which multiple clients use the same consumer name, and the last stored offset in memory is not the one the user expects.
So, to avoid confusion, it is better to use QueryOffset, which always gets the value from the server.

QueryOffset() is the right way to get the offset.

The bug was in GetLastStoredOffset(): during a restart, the consumer did not return the correct value.
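
As a hedged usage sketch of the recommendation above, query the stored offset from the server instead of reading the deprecated in-memory value. The `Environment.QueryOffset(consumerName, streamName)` shape follows the client's documented usage, but treat the exact signature, connection options, and names below as assumptions to verify against the package docs.

```go
package main

import (
	"fmt"
	"log"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// Default options assume a local broker; adjust as needed.
	env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
	if err != nil {
		log.Fatal(err)
	}
	defer env.Close()

	// Always ask the server: correct even when several clients share the
	// same consumer name and the local in-memory value is stale.
	offset, err := env.QueryOffset("my-consumer", "my-stream")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("last stored offset on the server: %d\n", offset)
}
```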

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio self-assigned this Nov 25, 2025
@Gsantomaggio Gsantomaggio added the enhancement New feature or request label Nov 25, 2025
@Gsantomaggio Gsantomaggio merged commit 9cde6b9 into rabbitmq:main Nov 26, 2025
16 of 19 checks passed