30 changes: 16 additions & 14 deletions common/metrics/defs.go
@@ -1481,6 +1481,7 @@ const (
ShardDistributorStoreGetStateScope
ShardDistributorStoreRecordHeartbeatScope
ShardDistributorStoreSubscribeScope
ShardDistributorStoreSubscribeToAssignmentChangesScope

// The scope for the shard distributor executor
ShardDistributorExecutorScope
@@ -2153,20 +2154,21 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
DiagnosticsWorkflowScope: {operation: "DiagnosticsWorkflow"},
},
ShardDistributor: {
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
ShardDistributorExecutorScope: {operation: "Executor"},
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
ShardDistributorExecutorScope: {operation: "Executor"},
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"},
},
}
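The new scope only matters once the store's metered decorator emits against it. Below is a minimal sketch of that wiring, assuming the usual Cadence metered-wrapper pattern; the meteredStore type, its fields, and the store.Store interface name are illustrative assumptions, while the scope constant and the common metric indices (CadenceRequests, CadenceLatency, CadenceFailures) come from the codebase.

// Hypothetical metered decorator around the shard distributor store.
// Only the scope constant and metric indices are taken from the codebase;
// the wrapper shape itself is an assumption for illustration.
package example

import (
	"context"

	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/service/sharddistributor/store"
)

type meteredStore struct {
	wrapped       store.Store
	metricsClient metrics.Client
}

func (m *meteredStore) SubscribeToAssignmentChanges(ctx context.Context, namespace string) (<-chan map[*store.ShardOwner][]string, func(), error) {
	scope := m.metricsClient.Scope(metrics.ShardDistributorStoreSubscribeToAssignmentChangesScope)
	scope.IncCounter(metrics.CadenceRequests)
	sw := scope.StartTimer(metrics.CadenceLatency)
	defer sw.Stop()

	ch, unsubscribe, err := m.wrapped.SubscribeToAssignmentChanges(ctx, namespace)
	if err != nil {
		scope.IncCounter(metrics.CadenceFailures)
	}
	return ch, unsubscribe, err
}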

75 changes: 74 additions & 1 deletion service/sharddistributor/handler/handler.go
@@ -142,5 +142,78 @@ func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string
}

func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequest, server WatchNamespaceStateServer) error {
return fmt.Errorf("not implemented")
h.startWG.Wait()

// Subscribe to state changes from storage
assignmentChangesChan, unSubscribe, err := h.storage.SubscribeToAssignmentChanges(server.Context(), request.Namespace)
if err != nil {
return fmt.Errorf("subscribe to namespace state: %w", err)
}
// Only defer after the error check: on failure unSubscribe may be nil.
defer unSubscribe()

// Send initial state immediately so client doesn't have to wait for first update
state, err := h.storage.GetState(server.Context(), request.Namespace)
if err != nil {
return fmt.Errorf("get initial state: %w", err)
}
response := toWatchNamespaceStateResponse(state)
if err := server.Send(response); err != nil {
return fmt.Errorf("send initial state: %w", err)
}

// Stream subsequent updates
for {
select {
case <-server.Context().Done():
[Review thread]
Contributor: What if we stop the shard distributor? Is it implicitly handled with the server context?
Member (Author): Good question - I assume so - it's the only context available at least.
Contributor: I think we can test this by shutting down the shard distributor and checking that the canaries are not hanging but connect to a new stream :)
return server.Context().Err()
case assignmentChanges, ok := <-assignmentChangesChan:
if !ok {
return fmt.Errorf("unexpected close of updates channel")
}
response := &types.WatchNamespaceStateResponse{
Executors: make([]*types.ExecutorShardAssignment, 0, len(assignmentChanges)),
}
for executor, shardIDs := range assignmentChanges {
response.Executors = append(response.Executors, &types.ExecutorShardAssignment{
ExecutorID: executor.ExecutorID,
AssignedShards: WrapShards(shardIDs),
Metadata: executor.Metadata,
})
}

err = server.Send(response)
if err != nil {
return fmt.Errorf("send response: %w", err)
}
}
}
}

func toWatchNamespaceStateResponse(state *store.NamespaceState) *types.WatchNamespaceStateResponse {
response := &types.WatchNamespaceStateResponse{
Executors: make([]*types.ExecutorShardAssignment, 0, len(state.ShardAssignments)),
}

for executorID, assignment := range state.ShardAssignments {
// Extract shard IDs from the assigned shards map
shardIDs := make([]string, 0, len(assignment.AssignedShards))
for shardID := range assignment.AssignedShards {
shardIDs = append(shardIDs, shardID)
}

response.Executors = append(response.Executors, &types.ExecutorShardAssignment{
ExecutorID: executorID,
AssignedShards: WrapShards(shardIDs),
Metadata: state.Executors[executorID].Metadata,
})
}
return response
}

func WrapShards(shardIDs []string) []*types.Shard {
shards := make([]*types.Shard, 0, len(shardIDs))
for _, shardID := range shardIDs {
shards = append(shards, &types.Shard{ShardKey: shardID})
}
return shards
}
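For context on the shutdown question in the review thread above, this is roughly how an executor-side consumer would drive the stream: read until the server returns (for example when the shard distributor shuts down and its context is cancelled), then reconnect. This is a sketch, not part of the PR; the small client and stream interfaces stand in for the generated RPC client, whose exact API is an assumption, while the request/response types come from common/types.

// Sketch of a consumer loop (not part of this PR). The interfaces below are
// stand-ins for the generated streaming client.
package watcher

import (
	"context"
	"log"
	"time"

	"github.com/uber/cadence/common/types"
)

type namespaceStateStream interface {
	Recv() (*types.WatchNamespaceStateResponse, error)
}

type shardDistributorClient interface {
	WatchNamespaceState(ctx context.Context, req *types.WatchNamespaceStateRequest) (namespaceStateStream, error)
}

func watchNamespace(ctx context.Context, client shardDistributorClient, namespace string, apply func(*types.WatchNamespaceStateResponse)) {
	for ctx.Err() == nil {
		stream, err := client.WatchNamespaceState(ctx, &types.WatchNamespaceStateRequest{Namespace: namespace})
		if err != nil {
			log.Printf("open watch stream: %v", err)
			time.Sleep(time.Second) // back off before reconnecting
			continue
		}
		for {
			resp, err := stream.Recv()
			if err != nil {
				// The handler returns once server.Context() is done (e.g. on
				// shard distributor shutdown); reconnect to a new instance.
				log.Printf("watch stream closed: %v", err)
				break
			}
			apply(resp)
		}
	}
}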
81 changes: 81 additions & 0 deletions service/sharddistributor/handler/handler_test.go
@@ -25,7 +25,9 @@ package handler
import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
@@ -215,3 +217,82 @@ func TestGetShardOwner(t *testing.T) {
})
}
}

func TestWatchNamespaceState(t *testing.T) {
ctrl := gomock.NewController(t)
logger := testlogger.New(t)
mockStorage := store.NewMockStore(ctrl)
mockServer := NewMockWatchNamespaceStateServer(ctrl)

cfg := config.ShardDistribution{
Namespaces: []config.Namespace{
{Name: "test-ns", Type: config.NamespaceTypeFixed, ShardNum: 2},
},
}

handler := &handlerImpl{
logger: logger,
shardDistributionCfg: cfg,
storage: mockStorage,
startWG: sync.WaitGroup{},
}

t.Run("successful streaming", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

initialState := &store.NamespaceState{
ShardAssignments: map[string]store.AssignedState{
"executor-1": {
AssignedShards: map[string]*types.ShardAssignment{
"shard-1": {},
},
},
},
}

updatesChan := make(chan map[*store.ShardOwner][]string, 1)
unsubscribe := func() { close(updatesChan) }

mockServer.EXPECT().Context().Return(ctx).AnyTimes()
mockStorage.EXPECT().GetState(gomock.Any(), "test-ns").Return(initialState, nil)
mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(updatesChan, unsubscribe, nil)

// Expect initial state send
mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
require.Len(t, resp.Executors, 1)
require.Equal(t, "executor-1", resp.Executors[0].ExecutorID)
return nil
})

// Expect update send
mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
require.Len(t, resp.Executors, 1)
require.Equal(t, "executor-2", resp.Executors[0].ExecutorID)
return nil
})

// Send update, then cancel
go func() {
time.Sleep(10 * time.Millisecond)
updatesChan <- map[*store.ShardOwner][]string{
{ExecutorID: "executor-2", Metadata: map[string]string{}}: {"shard-2"},
}
cancel()
}()

err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
})

t.Run("storage error on initial state", func(t *testing.T) {
ctx := context.Background()
mockServer.EXPECT().Context().Return(ctx).AnyTimes()
mockStorage.EXPECT().GetState(gomock.Any(), "test-ns").Return(nil, errors.New("storage error"))
mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(make(chan map[*store.ShardOwner][]string), func() {}, nil)

err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
require.Error(t, err)
require.Contains(t, err.Error(), "get initial state")
})
}
@@ -288,6 +288,10 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
}, nil
}

func (s *executorStoreImpl) SubscribeToAssignmentChanges(ctx context.Context, namespace string) (<-chan map[*store.ShardOwner][]string, func(), error) {
return s.shardCache.Subscribe(ctx, namespace)
}

func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-chan int64, error) {
revisionChan := make(chan int64, 1)
watchPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace)
@@ -19,13 +19,15 @@ type namespaceShardToExecutor struct {
sync.RWMutex

shardToExecutor map[string]*store.ShardOwner
executorState map[*store.ShardOwner][]string // executor -> shardIDs
executorRevision map[string]int64
namespace string
etcdPrefix string
changeUpdateChannel clientv3.WatchChan
stopCh chan struct{}
logger log.Logger
client *clientv3.Client
pubSub *executorStatePubSub
}

func newNamespaceShardToExecutor(etcdPrefix, namespace string, client *clientv3.Client, stopCh chan struct{}, logger log.Logger) (*namespaceShardToExecutor, error) {
@@ -35,13 +37,15 @@ func newNamespaceShardToExecutor(etcdPrefix, namespace string, client *clientv3.

return &namespaceShardToExecutor{
shardToExecutor: make(map[string]*store.ShardOwner),
executorState: make(map[*store.ShardOwner][]string),
executorRevision: make(map[string]int64),
namespace: namespace,
etcdPrefix: etcdPrefix,
changeUpdateChannel: watchChan,
stopCh: stopCh,
logger: logger,
client: client,
pubSub: newExecutorStatePubSub(logger, namespace),
}, nil
}

@@ -94,6 +98,10 @@ func (n *namespaceShardToExecutor) GetExecutorModRevisionCmp() ([]clientv3.Cmp,
return comparisons, nil
}

func (n *namespaceShardToExecutor) Subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
return n.pubSub.subscribe(ctx)
}

func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
for {
select {
@@ -124,7 +132,24 @@ func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
}

func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
err := n.refreshExecutorState(ctx)
if err != nil {
return fmt.Errorf("refresh executor state: %w", err)
}

n.RLock()
executorState := make(map[*store.ShardOwner][]string)
for executor, shardIDs := range n.executorState {
executorState[executor] = make([]string, len(shardIDs))
copy(executorState[executor], shardIDs)
}
n.RUnlock()

// Publish the snapshot copied above so subscribers never observe the live map.
n.pubSub.publish(executorState)
return nil
}

func (n *namespaceShardToExecutor) refreshExecutorState(ctx context.Context) error {
executorPrefix := etcdkeys.BuildExecutorPrefix(n.etcdPrefix, n.namespace)

resp, err := n.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
@@ -136,6 +161,7 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
defer n.Unlock()
// Clear the cache, so we don't have any stale data
n.shardToExecutor = make(map[string]*store.ShardOwner)
n.executorState = make(map[*store.ShardOwner][]string)
n.executorRevision = make(map[string]int64)

shardOwners := make(map[string]*store.ShardOwner)
@@ -154,10 +180,15 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
if err != nil {
return fmt.Errorf("parse assigned state: %w", err)
}

// Build both shard->executor and executor->shards mappings
shardIDs := make([]string, 0, len(assignedState.AssignedShards))
for shardID := range assignedState.AssignedShards {
n.shardToExecutor[shardID] = shardOwner
shardIDs = append(shardIDs, shardID)
n.executorRevision[executorID] = kv.ModRevision
}
n.executorState[shardOwner] = shardIDs

case etcdkeys.ExecutorMetadataKey:
shardOwner := getOrCreateShardOwner(shardOwners, executorID)
@@ -0,0 +1,65 @@
package shardcache

import (
"context"
"sync"

"github.com/google/uuid"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/service/sharddistributor/store"
)

// executorStatePubSub manages subscriptions to executor state changes
type executorStatePubSub struct {
mu sync.RWMutex
subscribers map[string]chan<- map[*store.ShardOwner][]string
logger log.Logger
namespace string
}

func newExecutorStatePubSub(logger log.Logger, namespace string) *executorStatePubSub {
return &executorStatePubSub{
subscribers: make(map[string]chan<- map[*store.ShardOwner][]string),
logger: logger,
namespace: namespace,
}
}

// Subscribe returns a channel that receives executor state updates.
func (p *executorStatePubSub) subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
ch := make(chan map[*store.ShardOwner][]string)
uniqueID := uuid.New().String()
[Review thread]
Contributor: Thinking out loud, should we return the subscription ID for debug purposes?
Member (Author): I don't see the value, but maybe if you elaborate a bit?
Contributor: In case of an issue with a subscription we only have the subscription ID stored on the SD side, so we don't know which instance is not receiving updates. We can tell which namespace is impacted, but maybe that is too wide. I am wondering whether we should prepend the caller instance to this UID, for example.
Member (Author): I'll do a follow-up PR to add a spectator ID so we can make this connection.

p.mu.Lock()
defer p.mu.Unlock()
p.subscribers[uniqueID] = ch

unSub := func() {
p.unSubscribe(uniqueID)
}

return ch, unSub
}

func (p *executorStatePubSub) unSubscribe(uniqueID string) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.subscribers, uniqueID)
}

// Publish sends the state to all subscribers (non-blocking)
func (p *executorStatePubSub) publish(state map[*store.ShardOwner][]string) {
p.mu.RLock()
defer p.mu.RUnlock()

for _, sub := range p.subscribers {
select {
case sub <- state:
default:
// Subscriber is not reading fast enough, skip this update
[Review thread]
Contributor: Should we retry? We call refresh and then publish only in case of changes. Say no changes happen - some subscribers will have stale info until the next change. Are we fine with that?
Member (Author): I think that is a good point - maybe we can send a reconciliation message every 1s?
Contributor: Yes, this is a good idea.
Member (Author): Will do a follow-up PR.
p.logger.Warn("Subscriber not keeping up with state updates, dropping update", tag.ShardNamespace(p.namespace))
}
}
}
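The reconciliation idea agreed on in the thread above could build directly on the fields this PR adds to namespaceShardToExecutor. A rough sketch of that follow-up is below - not part of this PR, with the method name and interval being assumptions - where a copy of the current executor state is periodically re-published so a subscriber that missed a dropped update still converges.

// Sketch of the follow-up reconciliation loop discussed above (not in this PR).
// It assumes the executorState, stopCh and pubSub fields shown in the diff and
// requires the "time" import.
func (n *namespaceShardToExecutor) reconcileLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-n.stopCh:
			return
		case <-ticker.C:
			// Copy under the read lock so subscribers never see the live map.
			n.RLock()
			snapshot := make(map[*store.ShardOwner][]string, len(n.executorState))
			for owner, shardIDs := range n.executorState {
				cp := make([]string, len(shardIDs))
				copy(cp, shardIDs)
				snapshot[owner] = cp
			}
			n.RUnlock()

			n.pubSub.publish(snapshot)
		}
	}
}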