Skip to content

Commit 9d29e6e

Browse files
authored
fix: append operation can only be canceled by the wal itself, not by the rpc (#45078)
issue: #45077 We need to guarantee that the state of the wal stays consistent with the in-memory state of the streamingnode. Therefore, the append operation must not be cancellable by its caller, to avoid leaving a live wal in an inconsistent state. The wal append operation can only be cancelled when the wal is shutting down. Signed-off-by: chyezh <[email protected]>
1 parent 2631e7f commit 9d29e6e

File tree

2 files changed

+35
-15
lines changed

2 files changed

+35
-15
lines changed

internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package adaptor
22

33
import (
44
"context"
5+
"time"
56

67
"go.uber.org/zap"
78

@@ -12,7 +13,6 @@ import (
1213
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
1314
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
1415
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
15-
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
1616
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
1717
)
1818

@@ -21,7 +21,8 @@ var _ wal.WAL = (*roWALAdaptorImpl)(nil)
2121
type roWALAdaptorImpl struct {
2222
log.Binder
2323
lifetime *typeutil.Lifetime
24-
available lifetime.SafeChan
24+
availableCtx context.Context
25+
availableCancel context.CancelFunc
2526
idAllocator *typeutil.IDAllocator
2627
roWALImpls walimpls.ROWALImpls
2728
scannerRegistry scannerRegistry
@@ -89,20 +90,20 @@ func (w *roWALAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.S
8990

9091
// IsAvailable returns whether the wal is available.
9192
func (w *roWALAdaptorImpl) IsAvailable() bool {
92-
return !w.available.IsClosed()
93+
return w.availableCtx.Err() == nil
9394
}
9495

9596
// Available returns a channel that will be closed when the wal is shut down.
9697
func (w *roWALAdaptorImpl) Available() <-chan struct{} {
97-
return w.available.CloseCh()
98+
return w.availableCtx.Done()
9899
}
99100

100101
// Close overrides Scanner Close function.
101102
func (w *roWALAdaptorImpl) Close() {
102103
// begin to close the wal.
103104
w.Logger().Info("wal begin to close...")
104105
w.lifetime.SetState(typeutil.LifetimeStateStopped)
105-
w.available.Close()
106+
w.forceCancelAfterGracefulTimeout()
106107
w.lifetime.Wait()
107108

108109
w.Logger().Info("wal begin to close scanners...")
@@ -124,3 +125,14 @@ func (w *roWALAdaptorImpl) Close() {
124125
// close all metrics.
125126
w.scanMetrics.Close()
126127
}
128+
129+
// forceCancelAfterGracefulTimeout schedules a forced cancel of the available context once the graceful timeout elapses.
130+
func (w *roWALAdaptorImpl) forceCancelAfterGracefulTimeout() {
131+
if w.availableCtx.Err() != nil {
132+
return
133+
}
134+
time.AfterFunc(3*time.Second, func() {
135+
// perform a force cancel to avoid resource leak.
136+
w.availableCancel()
137+
})
138+
}

internal/streamingnode/server/wal/adaptor/wal_adaptor.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
2323
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
2424
"github.com/milvus-io/milvus/pkg/v2/util/conc"
25-
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
25+
"github.com/milvus-io/milvus/pkg/v2/util/contextutil"
2626
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
2727
)
2828

@@ -39,11 +39,13 @@ func adaptImplsToROWAL(
3939
log.FieldComponent("wal"),
4040
zap.String("channel", basicWAL.Channel().String()),
4141
)
42+
ctx, cancel := context.WithCancel(context.Background())
4243
roWAL := &roWALAdaptorImpl{
43-
roWALImpls: basicWAL,
44-
lifetime: typeutil.NewLifetime(),
45-
available: lifetime.NewSafeChan(),
46-
idAllocator: typeutil.NewIDAllocator(),
44+
roWALImpls: basicWAL,
45+
lifetime: typeutil.NewLifetime(),
46+
availableCtx: ctx,
47+
availableCancel: cancel,
48+
idAllocator: typeutil.NewIDAllocator(),
4749
scannerRegistry: scannerRegistry{
4850
channel: basicWAL.Channel(),
4951
idAllocator: typeutil.NewIDAllocator(),
@@ -147,14 +149,20 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
147149
select {
148150
case <-ctx.Done():
149151
return nil, ctx.Err()
150-
case <-w.available.CloseCh():
152+
case <-w.availableCtx.Done():
151153
return nil, status.NewOnShutdownError("wal is on shutdown")
152154
case <-w.interceptorBuildResult.Interceptor.Ready():
153155
}
154156

155157
// Setup the term of wal.
156158
msg = msg.WithWALTerm(w.Channel().Term)
157159

160+
// We need to guarantee that the state of the wal stays consistent with the in-memory state of the streamingnode.
161+
// So the append operation must not be cancelable by its caller, to avoid leaving a live wal in an inconsistent state;
162+
// the wal append operation can only be canceled when the wal is shutting down.
163+
ctx, cancel := contextutil.MergeContext(context.WithoutCancel(ctx), w.availableCtx)
164+
defer cancel()
165+
158166
appendMetrics := w.writeMetrics.StartAppend(msg)
159167
ctx = utility.WithAppendMetricsContext(ctx, appendMetrics)
160168

@@ -181,7 +189,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
181189
if errors.Is(err, walimpls.ErrFenced) {
182190
// if the append operation of wal is fenced, we should report the error to the client.
183191
if w.isFenced.CompareAndSwap(false, true) {
184-
w.available.Close()
192+
w.forceCancelAfterGracefulTimeout()
185193
w.Logger().Warn("wal is fenced, mark as unavailable, all append opertions will be rejected", zap.Error(err))
186194
}
187195
return nil, status.NewChannelFenced(w.Channel().String())
@@ -231,8 +239,8 @@ func (w *walAdaptorImpl) retryAppendWhenRecoverableError(ctx context.Context, ms
231239

232240
select {
233241
case <-ctx.Done():
234-
return nil, ctx.Err()
235-
case <-w.available.CloseCh():
242+
return nil, context.Cause(ctx)
243+
case <-w.availableCtx.Done():
236244
return nil, status.NewOnShutdownError("wal is on shutdown")
237245
case <-time.After(nextInterval):
238246
}
@@ -265,7 +273,7 @@ func (w *walAdaptorImpl) Close() {
265273

266274
// begin to close the wal.
267275
w.lifetime.SetState(typeutil.LifetimeStateStopped)
268-
w.available.Close()
276+
w.forceCancelAfterGracefulTimeout()
269277
w.lifetime.Wait()
270278

271279
// close the flusher.

0 commit comments

Comments
 (0)