Skip to content

Commit 44f63ae

Browse files
committed
handle logon error
1 parent e4fe18c commit 44f63ae

File tree

6 files changed

+54
-28
lines changed

6 files changed

+54
-28
lines changed

acceptor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package quickfix
1818
import (
1919
"bufio"
2020
"bytes"
21+
"context"
2122
"crypto/tls"
2223
"io"
2324
"net"
@@ -361,6 +362,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
361362
a.sessionAddr.Store(sessID, netConn.RemoteAddr())
362363
msgIn := make(chan fixIn)
363364
msgOut := make(chan []byte)
365+
ctx := context.Background()
364366

365367
if err := session.connect(msgIn, msgOut); err != nil {
366368
a.globalLog.OnEventf("Unable to accept session %v connection: %v", sessID, err.Error())
@@ -369,10 +371,10 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
369371

370372
go func() {
371373
msgIn <- fixIn{msgBytes, parser.lastRead}
372-
readLoop(parser, msgIn, a.globalLog)
374+
readLoop(ctx, parser, msgIn, a.globalLog)
373375
}()
374376

375-
writeLoop(netConn, msgOut, a.globalLog)
377+
writeLoop(ctx, netConn, msgOut, a.globalLog)
376378
}
377379

378380
func (a *Acceptor) dynamicSessionsLoop() {

connection.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,19 @@
1515

1616
package quickfix
1717

18-
import "io"
18+
import (
19+
"context"
20+
"io"
21+
)
1922

20-
func writeLoop(connection io.Writer, messageOut chan []byte, log Log) {
23+
func writeLoop(ctx context.Context, connection io.Writer, messageOut chan []byte, log Log) {
2124
for {
25+
select {
26+
case <-ctx.Done():
27+
return
28+
default:
29+
}
30+
2231
msg, ok := <-messageOut
2332
if !ok {
2433
return
@@ -30,10 +39,16 @@ func writeLoop(connection io.Writer, messageOut chan []byte, log Log) {
3039
}
3140
}
3241

33-
func readLoop(parser *parser, msgIn chan fixIn, log Log) {
42+
func readLoop(ctx context.Context, parser *parser, msgIn chan fixIn, log Log) {
3443
defer close(msgIn)
3544

3645
for {
46+
select {
47+
case <-ctx.Done():
48+
return
49+
default:
50+
}
51+
3752
msg, err := parser.ReadMessage()
3853
if err != nil {
3954
log.OnEvent(err.Error())

connection_internal_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package quickfix
1717

1818
import (
1919
"bytes"
20+
"context"
2021
"strings"
2122
"testing"
2223
)
2324

2425
func TestWriteLoop(t *testing.T) {
26+
ctx := context.Background()
2527
writer := bytes.NewBufferString("")
2628
msgOut := make(chan []byte)
2729

@@ -31,7 +33,7 @@ func TestWriteLoop(t *testing.T) {
3133
msgOut <- []byte("test msg 3")
3234
close(msgOut)
3335
}()
34-
writeLoop(writer, msgOut, nullLog{})
36+
writeLoop(ctx, writer, msgOut, nullLog{})
3537

3638
expected := "test msg 1 test msg 2 test msg 3"
3739

@@ -41,11 +43,12 @@ func TestWriteLoop(t *testing.T) {
4143
}
4244

4345
func TestReadLoop(t *testing.T) {
46+
ctx := context.Background()
4447
msgIn := make(chan fixIn)
4548
stream := "hello8=FIX.4.09=5blah10=103garbage8=FIX.4.09=4foo10=103"
4649

4750
parser := newParser(strings.NewReader(stream))
48-
go readLoop(parser, msgIn, nullLog{})
51+
go readLoop(ctx, parser, msgIn, nullLog{})
4952

5053
var tests = []struct {
5154
expectedMsg string

initiator.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,17 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
163163
return
164164
}
165165

166-
ctx, cancel := context.WithCancel(context.Background())
166+
ctx := context.Background()
167+
dialCtx, dialCancel := context.WithCancel(ctx)
168+
readWriteCtx, readWriteCancel := context.WithCancel(ctx)
167169

168170
// We start a goroutine in order to be able to cancel the dialer mid-connection
169171
// on receiving a stop signal to stop the initiator.
170172
go func() {
171173
select {
172174
case <-i.stopChan:
173-
cancel()
175+
dialCancel()
176+
readWriteCancel()
174177
case <-ctx.Done():
175178
return
176179
}
@@ -183,7 +186,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
183186
address := session.SocketConnectAddress[connectionAttempt%len(session.SocketConnectAddress)]
184187
session.log.OnEventf("Connecting to: %v", address)
185188

186-
netConn, err := dialer.DialContext(ctx, "tcp", address)
189+
netConn, err := dialer.DialContext(dialCtx, "tcp", address)
187190
if err != nil {
188191
session.log.OnEventf("Failed to connect: %v", err)
189192
goto reconnect
@@ -207,24 +210,26 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
207210

208211
msgIn = make(chan fixIn)
209212
msgOut = make(chan []byte)
210-
if err := session.connect(msgIn, msgOut); err != nil {
211-
session.log.OnEventf("Failed to initiate: %v", err)
212-
goto reconnect
213-
}
213+
214214

215-
go readLoop(newParser(bufio.NewReader(netConn)), msgIn, session.log)
215+
go readLoop(readWriteCtx,newParser(bufio.NewReader(netConn)), msgIn, session.log)
216216
disconnected = make(chan interface{})
217217
go func() {
218-
writeLoop(netConn, msgOut, session.log)
218+
writeLoop(readWriteCtx,netConn, msgOut, session.log)
219219
if err := netConn.Close(); err != nil {
220220
session.log.OnEvent(err.Error())
221221
}
222222
close(disconnected)
223223
}()
224224

225+
if err := session.connect(msgIn, msgOut); err != nil {
226+
session.log.OnEventf("Failed to initiate: %v", err)
227+
goto reconnect
228+
}
229+
225230
// This ensures we properly cleanup the goroutine and context used for
226231
// dial cancelation after successful connection.
227-
cancel()
232+
dialCancel()
228233

229234
select {
230235
case <-disconnected:
@@ -233,7 +238,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
233238
}
234239

235240
reconnect:
236-
cancel()
241+
dialCancel()
237242

238243
connectionAttempt++
239244
session.log.OnEventf("Reconnecting in %v", session.ReconnectInterval)

session.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -849,15 +849,15 @@ func (s *session) onAdmin(msg interface{}) {
849849
return
850850
}
851851

852-
if msg.err != nil {
853-
close(msg.err)
854-
}
855-
856852
s.messageIn = msg.messageIn
857853
s.messageOut = msg.messageOut
858854
s.sentReset = false
859855

860-
s.Connect(s)
856+
err := s.Connect(s)
857+
if msg.err != nil {
858+
msg.err <- err
859+
close(msg.err)
860+
}
861861

862862
case stopReq:
863863
s.Stop(s)

session_state.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,36 +36,37 @@ func (sm *stateMachine) Start(s *session) {
3636
sm.CheckSessionTime(s, time.Now())
3737
}
3838

39-
func (sm *stateMachine) Connect(session *session) {
39+
func (sm *stateMachine) Connect(session *session) error{
4040
// No special logon logic needed for FIX Acceptors.
4141
if !session.InitiateLogon {
4242
sm.setState(session, logonState{})
43-
return
43+
return nil
4444
}
4545

4646
if session.RefreshOnLogon {
4747
if err := session.store.Refresh(); err != nil {
4848
session.logError(err)
49-
return
49+
return err
5050
}
5151
}
5252

5353
if session.ResetOnLogon {
5454
if err := session.store.Reset(); err != nil {
5555
session.logError(err)
56-
return
56+
return err
5757
}
5858
}
5959

6060
session.log.OnEvent("Sending logon request")
6161
if err := session.sendLogon(); err != nil {
6262
session.logError(err)
63-
return
63+
return err
6464
}
6565

6666
sm.setState(session, logonState{})
6767
// Fire logon timeout event after the pre-configured delay period.
6868
time.AfterFunc(session.LogonTimeout, func() { session.sessionEvent <- internal.LogonTimeout })
69+
return nil
6970
}
7071

7172
func (sm *stateMachine) Stop(session *session) {

0 commit comments

Comments
 (0)