Skip to content

Commit 2497d57

Browse files
Merge pull request #736 from quickfixgo/buffered-fix-in-channel
Make both initiator and acceptor fixIn channels buffered
2 parents 4b2c4f2 + 8ce187f commit 2497d57

File tree

6 files changed

+43
-2
lines changed

6 files changed

+43
-2
lines changed

acceptor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
359359
}
360360

361361
a.sessionAddr.Store(sessID, netConn.RemoteAddr())
362-
msgIn := make(chan fixIn)
362+
msgIn := make(chan fixIn, session.InChanCapacity)
363363
msgOut := make(chan []byte)
364364

365365
if err := session.connect(msgIn, msgOut); err != nil {

config/configuration.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,16 @@ const (
489489
// Valid Values:
490490
// - Any positive integer
491491
MaxLatency string = "MaxLatency"
492+
493+
// InChanCapacity sets the maximum number of messages that can be buffered in the channel for incoming FIX messages.
494+
//
495+
// Required: No
496+
//
497+
// Default: 1
498+
//
499+
// Valid Values:
500+
// - A positive integer, or zero for an unbuffered channel
501+
InChanCapacity string = "InChanCapacity"
492502
)
493503

494504
const (

initiator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
205205
netConn = tlsConn
206206
}
207207

208-
msgIn = make(chan fixIn)
208+
msgIn = make(chan fixIn, session.InChanCapacity)
209209
msgOut = make(chan []byte)
210210
if err := session.connect(msgIn, msgOut); err != nil {
211211
session.log.OnEventf("Failed to initiate: %v", err)

internal/session_settings.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type SessionSettings struct {
2121
TimeZone *time.Location
2222
ResetSeqTime time.Time
2323
EnableResetSeqTime bool
24+
InChanCapacity int
2425

2526
// Required on logon for FIX.T.1 messages.
2627
DefaultApplVerID string

session.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,21 @@ func (s *session) checkBeginString(msg *Message) MessageRejectError {
757757
return nil
758758
}
759759

760+
func (s *session) drainMessageIn() {
761+
s.log.OnEventf("Draining %d messages from inbound channel...", len(s.messageIn))
762+
for {
763+
select {
764+
case fixInc, ok := <-s.messageIn:
765+
if !ok {
766+
return
767+
}
768+
s.Incoming(s, fixInc)
769+
default:
770+
return
771+
}
772+
}
773+
}
774+
760775
func (s *session) doReject(msg *Message, rej MessageRejectError) error {
761776
reply := msg.reverseRoute()
762777

@@ -824,6 +839,9 @@ func (s *session) onDisconnect() {
824839
s.messageOut = nil
825840
}
826841

842+
// s.messageIn is buffered so we need to drain it before disconnection
843+
s.drainMessageIn()
844+
827845
s.messageIn = nil
828846
}
829847

session_factory.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,18 @@ func (f sessionFactory) newSession(
431431
s.DisableMessagePersist = !persistMessages
432432
}
433433

434+
if settings.HasSetting(config.InChanCapacity) {
435+
if s.InChanCapacity, err = settings.IntSetting(config.InChanCapacity); err != nil {
436+
return
437+
} else if s.InChanCapacity < 0 {
438+
err = IncorrectFormatForSetting{Setting: config.InChanCapacity, Value: []byte(strconv.Itoa(s.InChanCapacity))}
439+
return
440+
}
441+
} else {
442+
// Default to 1 buffered message per channel
443+
s.InChanCapacity = 1
444+
}
445+
434446
if f.BuildInitiators {
435447
if err = f.buildInitiatorSettings(s, settings); err != nil {
436448
return

0 commit comments

Comments
 (0)