diff --git a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html index f7cf0195eb..772be4cc09 100644 --- a/quickfixj-core/src/main/doc/usermanual/usage/configuration.html +++ b/quickfixj-core/src/main/doc/usermanual/usage/configuration.html @@ -1309,6 +1309,12 @@

QuickFIX Settings

Y
N N + + AllowPosDup + Whether to allow PossDupFlag and OrigSendingTime when sending messages. This is useful on occasions, primarily when a QFJ application is acting as purely a pass-through/monitoring hop. + Y
N + N + diff --git a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java index 75f6cb8e69..355d7612eb 100644 --- a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java +++ b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java @@ -213,6 +213,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf final boolean enableNextExpectedMsgSeqNum = getSetting(settings, sessionID, Session.SETTING_ENABLE_NEXT_EXPECTED_MSG_SEQ_NUM, false); final boolean enableLastMsgSeqNumProcessed = getSetting(settings, sessionID, Session.SETTING_ENABLE_LAST_MSG_SEQ_NUM_PROCESSED, false); final int resendRequestChunkSize = getSetting(settings, sessionID, Session.SETTING_RESEND_REQUEST_CHUNK_SIZE, Session.DEFAULT_RESEND_RANGE_CHUNK_SIZE); + final boolean allowPossDup = getSetting(settings, sessionID, Session.SETTING_ALLOW_POS_DUP_MESSAGES, false); final int[] logonIntervals = getLogonIntervalsInSeconds(settings, sessionID); final Set allowedRemoteAddresses = getInetAddresses(settings, sessionID); @@ -231,7 +232,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed, - validateChecksum, logonTags, heartBeatTimeoutMultiplier); + validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup); session.setLogonTimeout(logonTimeout); session.setLogoutTimeout(logoutTimeout); diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 41600a1dc5..3369a989aa 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -373,6 +373,11 @@ public class Session implements Closeable { public static final String SETTING_VALIDATE_CHECKSUM = "ValidateChecksum"; + /** + * Option so that the session does not remove PossDupFlag (43) and OrigSendingTime (122) information when sending. + */ + public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup"; + private static final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final Application application; @@ -423,6 +428,7 @@ public class Session implements Closeable { private boolean enableNextExpectedMsgSeqNum = false; private boolean enableLastMsgSeqNumProcessed = false; private boolean validateChecksum = true; + private boolean allowPosDup = false; private int maxScheduledWriteRequests = 0; @@ -464,7 +470,7 @@ public class Session implements Closeable { messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); } Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, @@ -482,7 +488,8 @@ public class Session implements Closeable { boolean forceResendWhenCorruptedStore, Set allowedRemoteAddresses, boolean validateIncomingMessage, int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed, - boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier) { + boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier, + boolean allowPossDup) { this.application = application; this.sessionID = sessionID; this.sessionSchedule = sessionSchedule; @@ -517,6 +524,7 @@ public class Session implements Closeable { this.enableLastMsgSeqNumProcessed = enableLastMsgSeqNumProcessed; this.validateChecksum = validateChecksum; this.logonTags = logonTags; + this.allowPosDup = allowPossDup; final Log engineLog = (logFactory != null) ? logFactory.create(sessionID) : null; if (engineLog instanceof SessionStateListener) { @@ -2676,7 +2684,7 @@ private void resetState() { * information already is present). * * The returned status flag is included for - * compatibility with the JNI API but it's usefulness is questionable. + * compatibility with the JNI API but its usefulness is questionable. * In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean * only indicates the message was successfully queued for transmission. An error could still * occur before the message data is actually sent. @@ -2685,6 +2693,30 @@ private void resetState() { * @return a status flag indicating whether the write to the network layer was successful. */ public boolean send(Message message) { + return send(message, this.allowPosDup); + } + + /** + * Send a message to a counterparty. Sequence numbers and information about the sender + * and target identification will be added automatically (or overwritten if that + * information already is present). + * + * The returned status flag is included for + * compatibility with the JNI API but its usefulness is questionable. + * In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean + * only indicates the message was successfully queued for transmission. An error could still + * occur before the message data is actually sent. + * + * @param message the message to send + * @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message + * @return a status flag indicating whether the write to the network layer was successful. + */ + public boolean send(Message message, boolean allowPosDup) { + // Send message as is if allowPosDup flag is set + if (allowPosDup) { + return sendRaw(message, 0); + } + message.getHeader().removeField(PossDupFlag.FIELD); message.getHeader().removeField(OrigSendingTime.FIELD); return sendRaw(message, 0); @@ -2998,6 +3030,10 @@ public boolean isAllowedForSession(InetAddress remoteInetAddress) { || allowedRemoteAddresses.contains(remoteInetAddress); } + public void setAllowPosDup(boolean allowPosDup) { + this.allowPosDup = allowPosDup; + } + /** * Closes session resources and unregisters session. This is for internal * use and should typically not be called by an user application. diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index 8be5bbad1f..af6ad7a851 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -111,6 +111,7 @@ public static final class Builder { private boolean enableNextExpectedMsgSeqNum = false; private final boolean enableLastMsgSeqNumProcessed = false; private final boolean validateChecksum = true; + private final boolean allowPosDup = false; private List logonTags = new ArrayList<>(); public Session build() { @@ -123,7 +124,7 @@ public Session build() { resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, - enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier); + enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup); } public Builder setBeginString(final String beginString) { diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index d7e3741177..c59c9280c6 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -103,7 +103,7 @@ public void testDisposalOfFileResources() throws Exception { new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { // Simulate socket disconnect session.setResponder(null); } @@ -144,7 +144,7 @@ public void testNondisposableFileResources() throws Exception { new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { // Simulate socket disconnect session.setResponder(null); @@ -2105,7 +2105,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize) UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, false, false, false, true, false, true, false, null, true, - chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) { + chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2167,7 +2167,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); Responder mockResponder = mock(Responder.class); when(mockResponder.send(anyString())).thenReturn(true); @@ -2215,7 +2215,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); Responder mockResponder = mock(Responder.class); when(mockResponder.send(anyString())).thenReturn(true); @@ -2264,7 +2264,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception { UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, disconnectOnError, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2300,7 +2300,7 @@ public void testTimestampPrecision() throws Exception { UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, disconnectOnError, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2352,7 +2352,7 @@ private void testLargeQueue(int N) throws Exception { new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2468,7 +2468,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2982,4 +2982,70 @@ public void disconnect() { } } + @Test + public void testSendWithAllowPosDupAsFalse_ShouldRemovePossDupFlagAndOrigSendingTime() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null); + UnitTestResponder responder = new UnitTestResponder(); + session.setResponder(responder); + logonTo(session); + + session.send(createPossDupAppMessage(1), false); + + final Message sentMessage = new Message(responder.sentMessageData); + + assertFalse(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); + assertFalse(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); + } + + @Test + public void testSendWithAllowPosDupAsFalse_ShouldRemovePossDupFlagAndOrigSendingTime_GivenAllowPosDupConfigurationPropertySetToTrue() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null); + UnitTestResponder responder = new UnitTestResponder(); + session.setResponder(responder); + session.setAllowPosDup(true); + logonTo(session); + session.send(createPossDupAppMessage(1), false); + + final Message sentMessage = new Message(responder.sentMessageData); + + assertFalse(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); + assertFalse(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); + } + + @Test + public void testSendWithAllowPosDupAsTrue_ShouldKeepPossDupFlagAndOrigSendingTime() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null); + UnitTestResponder responder = new UnitTestResponder(); + session.setResponder(responder); + logonTo(session); + session.send(createPossDupAppMessage(1), true); + + final Message sentMessage = new Message(responder.sentMessageData); + + assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); + assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); + } + + @Test + public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupConfigurationPropertySetToTrue() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + final SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + final Session session = SessionFactoryTestSupport.createSession(sessionID, application, false, false, true, true, null); + UnitTestResponder responder = new UnitTestResponder(); + session.setResponder(responder); + session.setAllowPosDup(true); + logonTo(session); + session.send(createPossDupAppMessage(1)); + + final Message sentMessage = new Message(responder.sentMessageData); + + assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); + assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); + } }