
Conversation

@jakobht (Member) commented Nov 11, 2025

What changed?
Implemented the WatchNamespaceState streaming RPC endpoint for the shard distributor service, including a pub/sub mechanism for real-time assignment change notifications.

Why?
The WatchNamespaceState endpoint was previously unimplemented. This enables executors and spectators to receive real-time updates about shard assignment changes without polling, improving responsiveness and reducing load on the storage layer.

How did you test it?
Added unit tests for the handler's streaming behavior and the pub/sub mechanism.

Potential risks
Low - this is a new feature in an experimental service. The pub/sub implementation includes non-blocking publish to prevent slow subscribers from blocking the system.

Release notes
N/A - shard distributor is experimental

Documentation Changes
None required
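The pub/sub mechanism described above can be sketched roughly as follows. This is a minimal illustration of the pattern (buffered subscriber channels with a non-blocking publish), not the PR's actual API; the names `pubSub`, `subscribe`, and `publish` and the `string` payload type are placeholders for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// pubSub is a minimal sketch of a publisher with per-subscriber channels.
type pubSub struct {
	mu   sync.Mutex
	subs map[int]chan string
	next int
}

func newPubSub() *pubSub {
	return &pubSub{subs: make(map[int]chan string)}
}

// subscribe registers a buffered channel and returns it together with an
// unsubscribe function that removes the registration.
func (p *pubSub) subscribe() (<-chan string, func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	id := p.next
	p.next++
	ch := make(chan string, 1)
	p.subs[id] = ch
	return ch, func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		delete(p.subs, id)
	}
}

// publish sends to every subscriber without blocking: a subscriber that is
// not reading fast enough simply misses this update.
func (p *pubSub) publish(state string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, ch := range p.subs {
		select {
		case ch <- state:
		default: // slow subscriber; skip
		}
	}
}

func main() {
	ps := newPubSub()
	ch, cancel := ps.subscribe()
	defer cancel()
	ps.publish("assignment-v1")
	fmt.Println(<-ch) // prints "assignment-v1"
}
```

The non-blocking `default` branch is what keeps a slow subscriber from stalling publishers, at the cost of dropped updates (the staleness concern discussed in the review below).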

Signed-off-by: Jakob Haahr Taankvist <[email protected]>
```go
// Stream subsequent updates
for {
	select {
	case <-server.Context().Done():
```
Contributor:

What if we stop the shard distributor? Is that implicitly handled with the server context?

Member Author:

Good question - I assume so - it's the only context available, at least.

Contributor:

I think we can test this by shutting down the shard distributor and checking that the canaries are not hanging but connect to a new stream :)

```go
select {
case sub <- state:
default:
	// Subscriber is not reading fast enough, skip this update
```
Contributor:

Should we retry? We call refresh and then publish only when there are changes. Say no changes happen: some subscribers will have stale info until the next change. Are we fine with that?

Member Author:

I think that is a good point - maybe we can send a reconciliation message every 1s?

Contributor:

Yes, this is a good idea.

Member Author:

Will do a follow-up PR.

```go
// subscribe returns a channel that receives executor state updates.
func (p *executorStatePubSub) subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
	ch := make(chan map[*store.ShardOwner][]string)
	uniqueID := uuid.New().String()
```
Contributor:

Thinking out loud: should we return the subscription ID for debug purposes?

Member Author:

I don't see the value, but maybe if you elaborate a bit?

Contributor:

In case of issues with a subscription, we only have the subscription ID stored on the SD side, so we don't know which instance is not receiving updates. We can work out which namespace is impacted, but maybe that is too wide. I am wondering whether we should prepend the caller instance to this UID, for example.

Member Author:

I'll do a follow-up PR to add a spectator ID so we can make this connection.
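The spectator-ID idea could be as simple as embedding the caller's identity in the subscription ID, so SD-side logs about a subscription can be tied back to a specific subscriber instance. A hypothetical sketch (the `spectatorID + "/" + random` format and both function names are made up for illustration, not the follow-up PR's design):

```go
package main

import (
	"fmt"
	"strings"
)

// subscriptionID combines the caller-supplied spectator ID with a random
// component, so the ID alone identifies which instance subscribed.
func subscriptionID(spectatorID, random string) string {
	return spectatorID + "/" + random
}

// spectatorOf recovers the spectator ID back out of a subscription ID.
func spectatorOf(subID string) string {
	if i := strings.IndexByte(subID, '/'); i >= 0 {
		return subID[:i]
	}
	return ""
}

func main() {
	id := subscriptionID("cadence-canary-host-1", "b1946ac9")
	fmt.Println(id)            // prints "cadence-canary-host-1/b1946ac9"
	fmt.Println(spectatorOf(id)) // prints "cadence-canary-host-1"
}
```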

@jakobht jakobht merged commit 57f0d8d into cadence-workflow:master Nov 12, 2025
42 checks passed