-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Skip unnecessary disconnect and wait for disconnect to finish in shutdown #10031
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2d9ae14
987e6f8
b4efe3f
b2c398b
cfdafb0
c1ce7d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -775,7 +775,9 @@ func NewBrontide(cfg Config) *Brontide { | |
// Start starts all helper goroutines the peer needs for normal operations. In | ||
// the case this peer has already been started, then this function is a noop. | ||
func (p *Brontide) Start() error { | ||
if atomic.AddInt32(&p.started, 1) != 1 { | ||
if !atomic.CompareAndSwapInt32(&p.started, 0, 1) { | ||
p.log.Warn("already started") | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -1579,17 +1581,30 @@ func (p *Brontide) maybeSendChannelUpdates() { | |
// calling Disconnect will signal the quit channel and the method will not | ||
// block, since no goroutines were spawned. | ||
func (p *Brontide) WaitForDisconnect(ready chan struct{}) { | ||
p.log.Trace("waiting for disconnect") | ||
defer p.log.Trace("peer disconnected") | ||
|
||
// Before we try to call the `Wait` goroutine, we'll make sure the main | ||
// set of goroutines are already active. | ||
select { | ||
case <-p.startReady: | ||
p.log.Trace("startReady received, waiting for signal ready") | ||
|
||
case <-p.cg.Done(): | ||
p.log.Trace("peer quit, exit waiting for signal startReady") | ||
|
||
// Wait for goroutines to finish before exit. | ||
p.cg.WgWait() | ||
|
||
return | ||
} | ||
|
||
select { | ||
case <-ready: | ||
p.log.Trace("ready received, waiting goroutines to finish") | ||
|
||
case <-p.cg.Done(): | ||
p.log.Trace("peer quit, exit waiting for signal ready") | ||
} | ||
|
||
p.cg.WgWait() | ||
|
@@ -1604,6 +1619,9 @@ func (p *Brontide) WaitForDisconnect(ready chan struct{}) { | |
// the peer has finished starting up before calling this method. | ||
func (p *Brontide) Disconnect(reason error) { | ||
if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { | ||
p.log.Warnf("got disconnect reason [%v], but peer already "+ | ||
"disconnected", reason) | ||
|
||
return | ||
} | ||
|
||
|
@@ -1614,11 +1632,13 @@ func (p *Brontide) Disconnect(reason error) { | |
// started, otherwise we will skip reading it as this chan won't be | ||
// closed, hence blocks forever. | ||
if atomic.LoadInt32(&p.started) == 1 { | ||
p.log.Debugf("Peer hasn't finished starting up yet, waiting " + | ||
"on startReady signal before closing connection") | ||
p.log.Debug("waiting on startReady signal before closing " + | ||
"connection") | ||
|
||
select { | ||
case <-p.startReady: | ||
p.log.Debug("startReady received") | ||
|
||
case <-p.cg.Done(): | ||
return | ||
} | ||
|
@@ -2047,6 +2067,9 @@ func (p *Brontide) readHandler() { | |
// gossiper? | ||
p.initGossipSync() | ||
|
||
// exitErr is the error to be used when disconnect the peer. | ||
var exitErr error | ||
|
||
discStream := newDiscMsgStream(p) | ||
discStream.Start() | ||
defer discStream.Stop() | ||
|
@@ -2060,7 +2083,8 @@ out: | |
} | ||
} | ||
if err != nil { | ||
p.log.Infof("unable to read message from peer: %v", err) | ||
p.log.Debugf("unable to read message from peer: %v", | ||
err) | ||
|
||
// If we could not read our peer's message due to an | ||
// unknown type or invalid alias, we continue processing | ||
|
@@ -2098,6 +2122,7 @@ out: | |
// didn't recognize, then we'll stop all processing as | ||
// this is a fatal error. | ||
default: | ||
exitErr = err | ||
break out | ||
} | ||
} | ||
|
@@ -2182,8 +2207,7 @@ out: | |
err := p.resendChanSyncMsg(targetChan) | ||
if err != nil { | ||
// TODO(halseth): send error to peer? | ||
p.log.Errorf("resend failed: %v", | ||
err) | ||
p.log.Errorf("resend failed: %v", err) | ||
} | ||
} | ||
|
||
|
@@ -2242,9 +2266,13 @@ out: | |
idleTimer.Reset(idleTimeout) | ||
} | ||
|
||
p.Disconnect(errors.New("read handler closed")) | ||
// Disconnect the peer on exitErr, but only if the peer hasn't been | ||
// disconnected before. | ||
if atomic.LoadInt32(&p.disconnect) == 0 { | ||
p.Disconnect(exitErr) | ||
Roasbeef marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
yyforyongyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
p.log.Trace("readHandler for peer done") | ||
p.log.Debugf("peer quit, exit readHandler") | ||
} | ||
|
||
// handleCustomMessage handles the given custom message if a handler is | ||
|
@@ -2729,7 +2757,7 @@ out: | |
} | ||
|
||
case <-p.cg.Done(): | ||
exitErr = lnpeer.ErrPeerExiting | ||
p.log.Debug("peer quit, exit writeHandler") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why don't we want to record the peerExiting error anymore ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here we already know the peer is quitting, which means it has been disconnected, so we don't use this error and call disconnect again below. |
||
break out | ||
} | ||
} | ||
|
@@ -2738,7 +2766,9 @@ out: | |
// disconnect. | ||
p.cg.WgDone() | ||
|
||
p.Disconnect(exitErr) | ||
if exitErr != nil { | ||
p.Disconnect(exitErr) | ||
} | ||
|
||
p.log.Trace("writeHandler for peer done") | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5209,7 +5209,12 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { | |
// | ||
// NOTE: We call it in a goroutine to avoid blocking the main server | ||
// goroutine because we might hold the server's mutex. | ||
go peer.Disconnect(fmt.Errorf("server: DisconnectPeer called")) | ||
s.wg.Add(1) | ||
go func() { | ||
defer s.wg.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this might be the only significant change in this PR. |
||
|
||
peer.Disconnect(fmt.Errorf("server: DisconnectPeer called")) | ||
}() | ||
|
||
return nil | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.