Skip to content

Commit 8ce187f

Browse files
AlexandrosKyriakakismichaelwilner
authored andcommitted
Make Initiator`s inbound channel buffered (#22)
We have observed slow consumption of FIX messages in the Initiator. Since the outbound channel is buffered, the event loop is sending too many messages at once but reads incoming messages one by one. This creates an inconsistency between incoming consumption messages and outgoing production messages. In this PR we resolve this issue by making the inbound channel buffered. So the Initiator can now read and write in a truly asynchronous way. This change is feature flagged. Can be used by setting `InitiatorInChanCapacity` with an integer representing the capacity of the inbound channel to the session settings config.
1 parent 4b2c4f2 commit 8ce187f

File tree

6 files changed

+43
-2
lines changed

6 files changed

+43
-2
lines changed

acceptor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
359359
}
360360

361361
a.sessionAddr.Store(sessID, netConn.RemoteAddr())
362-
msgIn := make(chan fixIn)
362+
msgIn := make(chan fixIn, session.InChanCapacity)
363363
msgOut := make(chan []byte)
364364

365365
if err := session.connect(msgIn, msgOut); err != nil {

config/configuration.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,16 @@ const (
489489
// Valid Values:
490490
// - Any positive integer
491491
MaxLatency string = "MaxLatency"
492+
493+
// InChanCapacity sets the maximum number of messages that can be buffered in the channel for incoming FIX messages.
494+
//
495+
// Required: No
496+
//
497+
// Default: 1
498+
//
499+
// Valid Values:
500+
// - A positive integer, or zero for an unbuffered channel
501+
InChanCapacity string = "InChanCapacity"
492502
)
493503

494504
const (

initiator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
205205
netConn = tlsConn
206206
}
207207

208-
msgIn = make(chan fixIn)
208+
msgIn = make(chan fixIn, session.InChanCapacity)
209209
msgOut = make(chan []byte)
210210
if err := session.connect(msgIn, msgOut); err != nil {
211211
session.log.OnEventf("Failed to initiate: %v", err)

internal/session_settings.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type SessionSettings struct {
2121
TimeZone *time.Location
2222
ResetSeqTime time.Time
2323
EnableResetSeqTime bool
24+
InChanCapacity int
2425

2526
// Required on logon for FIX.T.1 messages.
2627
DefaultApplVerID string

session.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,21 @@ func (s *session) checkBeginString(msg *Message) MessageRejectError {
757757
return nil
758758
}
759759

760+
func (s *session) drainMessageIn() {
761+
s.log.OnEventf("Draining %d messages from inbound channel...", len(s.messageIn))
762+
for {
763+
select {
764+
case fixInc, ok := <-s.messageIn:
765+
if !ok {
766+
return
767+
}
768+
s.Incoming(s, fixInc)
769+
default:
770+
return
771+
}
772+
}
773+
}
774+
760775
func (s *session) doReject(msg *Message, rej MessageRejectError) error {
761776
reply := msg.reverseRoute()
762777

@@ -824,6 +839,9 @@ func (s *session) onDisconnect() {
824839
s.messageOut = nil
825840
}
826841

842+
// s.messageIn is buffered so we need to drain it before disconnection
843+
s.drainMessageIn()
844+
827845
s.messageIn = nil
828846
}
829847

session_factory.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,18 @@ func (f sessionFactory) newSession(
431431
s.DisableMessagePersist = !persistMessages
432432
}
433433

434+
if settings.HasSetting(config.InChanCapacity) {
435+
if s.InChanCapacity, err = settings.IntSetting(config.InChanCapacity); err != nil {
436+
return
437+
} else if s.InChanCapacity < 0 {
438+
err = IncorrectFormatForSetting{Setting: config.InChanCapacity, Value: []byte(strconv.Itoa(s.InChanCapacity))}
439+
return
440+
}
441+
} else {
442+
// Default to 1 buffered message per channel
443+
s.InChanCapacity = 1
444+
}
445+
434446
if f.BuildInitiators {
435447
if err = f.buildInitiatorSettings(s, settings); err != nil {
436448
return

0 commit comments

Comments
 (0)