Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions client/v3/member_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2026 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
"context"
"time"
)

// MemberEventType represents the type of membership change event.
type MemberEventType int

const (
// MemberAdded indicates a new member was added to the cluster.
MemberAdded MemberEventType = iota
// MemberRemoved indicates a member was removed from the cluster.
MemberRemoved
// MemberUpdated indicates a member's configuration was updated.
MemberUpdated
)

// MemberEvent represents a membership change event.
type MemberEvent struct {
Type MemberEventType
Member *Member
}

// MemberWatchConfig configures the member watch behavior.
type MemberWatchConfig struct {
// PollInterval is how often to poll for membership changes.
// Default is 10 seconds.
PollInterval time.Duration
}

// MemberWatcher watches for membership changes using polling.
type MemberWatcher struct {
client *Client
config MemberWatchConfig
}

// NewMemberWatcher creates a new MemberWatcher.
func NewMemberWatcher(client *Client, config MemberWatchConfig) *MemberWatcher {
if config.PollInterval == 0 {
config.PollInterval = 10 * time.Second
}
return &MemberWatcher{
client: client,
config: config,
}
}

// Watch starts watching for membership changes.
// It returns a channel that receives MemberEvent when members are added, removed, or updated.
// The watch stops when the context is canceled.
// If includeInitial is true, the current members are sent as MemberAdded events first.
func (w *MemberWatcher) Watch(ctx context.Context, includeInitial bool) <-chan MemberEvent {
eventCh := make(chan MemberEvent)

go func() {
defer close(eventCh)

// Get initial member list
resp, err := w.client.Cluster.MemberList(ctx)
if err != nil {
return
}

previousMembers := make(map[uint64]*Member)
for _, m := range resp.Members {
member := (*Member)(m)
previousMembers[m.ID] = member

if includeInitial {
select {
case eventCh <- MemberEvent{Type: MemberAdded, Member: member}:
case <-ctx.Done():
return
}
}
}

ticker := time.NewTicker(w.config.PollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Sync endpoints and get current members
w.client.Sync(ctx)

resp, err := w.client.Cluster.MemberList(ctx)
if err != nil {
continue
}

currentMembers := make(map[uint64]*Member)
for _, m := range resp.Members {
currentMembers[m.ID] = (*Member)(m)
}

// Detect added members
for id, member := range currentMembers {
if _, exists := previousMembers[id]; !exists {
select {
case eventCh <- MemberEvent{Type: MemberAdded, Member: member}:
case <-ctx.Done():
return
}
}
}

// Detect removed members
for id, member := range previousMembers {
if _, exists := currentMembers[id]; !exists {
select {
case eventCh <- MemberEvent{Type: MemberRemoved, Member: member}:
case <-ctx.Done():
return
}
}
}

// Detect updated members (peer URLs changed)
for id, current := range currentMembers {
if prev, exists := previousMembers[id]; exists {
if !stringSlicesEqual(current.PeerURLs, prev.PeerURLs) {
select {
case eventCh <- MemberEvent{Type: MemberUpdated, Member: current}:
case <-ctx.Done():
return
}
}
}
}

previousMembers = currentMembers
}
}
}()

return eventCh
}

// stringSlicesEqual compares two string slices for equality.
func stringSlicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}