本文主要是介绍第十八章-消息重推-客户端发起,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
消息重推主要是针对消费端消费失败,需要下次再消费的情况,RocketMq中提供了多个重推的入口,这里不一一介绍,只延续章节16.5
中的重推逻辑继续讲。
ConsumeMessageConcurrentlyService.sendMessageBack
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {/*** 消息消费的重试策略* -1,不重试,直接放到DLQ(死信队列)* 0,由broker控制重试频率,默认是这个* >0,由客户自己控制重试频率*/int delayLevel = context.getDelayLevelWhenNextConsume();// 在消费重推前,将 namespace包裹topicmsg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {// 进一步调用this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}
DefaultMQPushConsumerImpl.sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 根据brokerName查找对应的broker 地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// 进一步调用this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);// 调用失败,那就重新生成一条新的消息,走正常发送消息的口径 send 方法来发送消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 正常消息发送口径this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {// 将topic卸掉namespace的包装msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}
MQClientAPIImpl.consumerSendMessageBack
public void consumerSendMessageBack(final String addr, // broker 地址final MessageExt msg, // 要重推的消息对象final String consumerGroup, // 消费者组名final int delayLevel, // 消息重试策略final long timeoutMillis, // 超时时间,默认5000msfinal int maxConsumeRetryTimes // 最大重试次数,默认16次
) throws RemotingException, MQBrokerException, InterruptedException {// 组装发送到Broker的请求ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();// 消息重推的请求码是 RequestCode.CONSUMER_SEND_MSG_BACK,Broker端也根据这个码来处理的RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);// 设置消费者组名requestHeader.setGroup(consumerGroup);// 保留原始topic,因为消息重推,要修改这个topicrequestHeader.setOriginTopic(msg.getTopic());// 该消息的物理偏移,也就是在commitlog文件中的偏移requestHeader.setOffset(msg.getCommitLogOffset());// 消息重试策略requestHeader.setDelayLevel(delayLevel);// 保留消息的原始msgIdrequestHeader.setOriginMsgId(msg.getMsgId());// 设置最大重试次数requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 通过vip通道发送重试消息,vip通道就是原broker的端口-2,进一步调用RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;// 调用发送成功,直接返回上一层switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}
NettyRemotingClient.invokeSync
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {// 记录开始时间long beginStartTime = System.currentTimeMillis();// netty channelfinal Channel channel = this.getAndCreateChannel(addr);// 通道是激活状态if (channel != null && channel.isActive()) {try {// 执行前钩子方法,可以自定义doBeforeRpcHooks(addr, request);// 计算所花费时间,是否超过超时时间long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {// 超时,则抛出超时异常throw new RemotingTimeoutException("invokeSync call timeout");}// 再进一步调用RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);// 执行后钩子方法,可以自定义doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);// 返回调用结果return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {// channel 非激活状态,则关闭,并抛出连接异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}
NettyRemotingClient.invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque(); // 唯一标识此次请求的idtry {// 将响应处理封装成Future,并存放到 responseTable map 中,等待消息拉取成功后再处理final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();// 客户端channel(这里就是消费端),发送请求到服务端broker,并注册拉取后的监听channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Override// operationComplete 方法由谁调用呢,这个问题在`章节16.4`中讲过,可以看 ResponseFuture.executeInvokeCallback 方法,由channel收到拉取的消息后,再调用public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 拉取成功,设置发送状态成功标志responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}// 请求失败处理responseTable.remove(opaque);responseFuture.setCause(f.cause()); //失败原因responseFuture.putResponse(null); // 返回为nulllog.warn("send a request command to channel <" + addr + "> failed.");}});// 等待发送完成RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// null ,表示超时了if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {// 表示发送失败了,抛出失败原因throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}// 返回结果return responseCommand;} finally {this.responseTable.remove(opaque);}
}
至此,客户端的操作完成了,重点工作在Broker端的处理,将在下一章介绍。
这篇关于第十八章-消息重推-客户端发起的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!