本文主要是介绍QuickFix/J:ResendRequest ResetRequest,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
当接收到消息,取出msg tag=34,判断序列号与预期序列号是否一致,如果比预期的大,则会发送ResendRequest请求;如果比预期的小,则认为是发生了重大事故,则会发送logout发起断连。
所以,对于fix而言,接收的序列号过大时,是可自我修复的,而当序列号过小,则是属于重大事故,会断开连接,人工干预处理后再重连。
相关逻辑在Session.next(Message message, boolean isProcessingQueuedMessages)中:
private boolean doTargetTooLow(Message msg) throws FieldNotFound, IOException {//判断是否为重传消息,如果不是重传消息,则logout,并且抛出异常if (!isPossibleDuplicate(msg)) {final int msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);final String text = "MsgSeqNum too low, expecting " + getExpectedTargetNum()+ " but received " + msgSeqNum;generateLogout(text);throw new SessionException(text);}//reset消息判断逻辑return validatePossDup(msg);
}private boolean validatePossDup(Message msg) throws FieldNotFound, IOException {final Message.Header header = msg.getHeader();final String msgType = header.getString(MsgType.FIELD);if (!MsgType.SEQUENCE_RESET.equals(msgType)) {if (header.isSetField(OrigSendingTime.FIELD)) {final LocalDateTime origSendingTime = header.getUtcTimeStamp(OrigSendingTime.FIELD);final LocalDateTime sendingTime = header.getUtcTimeStamp(SendingTime.FIELD);if (origSendingTime.compareTo(sendingTime) > 0) {generateReject(msg, BAD_TIME_REJ_REASON, OrigSendingTime.FIELD);generateLogout(BAD_ORIG_TIME_TEXT);return false;}} else {// QFJ-703if (requiresOrigSendingTime) {generateReject(msg, SessionRejectReason.REQUIRED_TAG_MISSING,OrigSendingTime.FIELD);return false;}}}return true;
}
private void doTargetTooHigh(Message msg) throws FieldNotFound, IOException, InvalidMessage {final Message.Header header = msg.getHeader();final String beginString = header.getString(BeginString.FIELD);final int msgSeqNum = header.getInt(MsgSeqNum.FIELD);getLog().onEvent("MsgSeqNum too high, expecting " + getExpectedTargetNum() + " but received "+ msgSeqNum + ": " + msg);enqueueMessage(msg, msgSeqNum);//判断是否正在重传(ResendRequest)中,避免消息的重复接收并处理if (state.isResendRequested()) {final ResendRange range = state.getResendRange();if (!redundantResentRequestsAllowed && msgSeqNum >= range.getBeginSeqNo()) {int endSeqNo = range.getEndSeqNo();String end = endSeqNo == 0 ? "infinity" : Integer.toString(endSeqNo);getLog().onEvent("Already sent ResendRequest FROM: " + range.getBeginSeqNo() + " TO: " + end+ ". Not sending another.");return;}}//发起ResendRequestgenerateResendRequest(beginString, msgSeqNum);
}private void generateResendRequest(String beginString, int msgSeqNum) {final int beginSeqNo = getExpectedTargetNum();final int endSeqNo = msgSeqNum - 1;sendResendRequest(beginString, msgSeqNum, beginSeqNo, endSeqNo);
}
如果没有持久化消息,则只会发送resetRequest,只有持有化消息,才能重发消息。
如果是管理类消息(msgType=0A12345),一般都会跳过重传消息,除非ForceResendWhenCorruptedStore指定为true,其值默认为false。每次消息重传处理后,都会发送一次resetRequest,将对方的序列号重置为此次需发送的消息序列号+1,以与下次发送消息的序列号值匹配(递增)。
如下核心逻辑:
for (final String message : messages) {appMessageJustSent = false;final Message msg;try {// QFJ-626msg = parseMessage(message);msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);} catch (final Exception e) {getLog().onErrorEvent("Error handling ResendRequest: failed to parse message (" + e.getMessage()+ "): " + message);// Note: a SequenceReset message will be generated to fill the gapcontinue;}if ((current != msgSeqNum) && begin == 0) {begin = current;}final String msgType = msg.getHeader().getString(MsgType.FIELD);//管理类消息,不会重传if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {if (begin == 0) {begin = msgSeqNum;}} else {//业务类消息,设置原发送时间(OrigSendingTime)、是否为重传消息(PossDupFlag)initializeResendFields(msg);if (resendApproved(msg)) {if (begin != 0) {generateSequenceReset(receivedMessage, begin, msgSeqNum);}getLog().onEvent("Resending message: " + msgSeqNum);send(msg.toString());begin = 0;appMessageJustSent = true;} else {if (begin == 0) {begin = msgSeqNum;}}}current = msgSeqNum + 1;
}
这篇关于QuickFix/J:ResendRequest ResetRequest的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!