第十八章-消息重推-客户端发起

2024-05-16 01:28

本文主要是介绍第十八章-消息重推-客户端发起,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息重推主要是针对消费端消费失败,需要下次再消费的情况,RocketMq中提供了多个重推的入口,这里不一一介绍,只延续章节16.5中的重推逻辑继续讲。

ConsumeMessageConcurrentlyService.sendMessageBack

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {/*** 消息消费的重试策略* -1,不重试,直接放到DLQ(死信队列)* 0,由broker控制重试频率,默认是这个* >0,由客户自己控制重试频率*/int delayLevel = context.getDelayLevelWhenNextConsume();// 在消费重推前,将 namespace包裹topicmsg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {// 进一步调用this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

DefaultMQPushConsumerImpl.sendMessageBack

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 根据brokerName查找对应的broker 地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// 进一步调用this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);// 调用失败,那就重新生成一条新的消息,走正常发送消息的口径 send 方法来发送消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 正常消息发送口径this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {// 将topic卸掉namespace的包装msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}

MQClientAPIImpl.consumerSendMessageBack

public void consumerSendMessageBack(final String addr, // broker 地址final MessageExt msg, // 要重推的消息对象final String consumerGroup, // 消费者组名final int delayLevel,  // 消息重试策略final long timeoutMillis, // 超时时间,默认5000msfinal int maxConsumeRetryTimes // 最大重试次数,默认16次
) throws RemotingException, MQBrokerException, InterruptedException {// 组装发送到Broker的请求ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();// 消息重推的请求码是 RequestCode.CONSUMER_SEND_MSG_BACK,Broker端也根据这个码来处理的RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);// 设置消费者组名requestHeader.setGroup(consumerGroup);// 保留原始topic,因为消息重推,要修改这个topicrequestHeader.setOriginTopic(msg.getTopic());// 该消息的物理偏移,也就是在commitlog文件中的偏移requestHeader.setOffset(msg.getCommitLogOffset());// 消息重试策略requestHeader.setDelayLevel(delayLevel);// 保留消息的原始msgIdrequestHeader.setOriginMsgId(msg.getMsgId());// 设置最大重试次数requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 通过vip通道发送重试消息,vip通道就是原broker的端口-2,进一步调用RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;// 调用发送成功,直接返回上一层switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}

NettyRemotingClient.invokeSync

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {// 记录开始时间long beginStartTime = System.currentTimeMillis();// netty channelfinal Channel channel = this.getAndCreateChannel(addr);// 通道是激活状态if (channel != null && channel.isActive()) {try {// 执行前钩子方法,可以自定义doBeforeRpcHooks(addr, request);// 计算所花费时间,是否超过超时时间long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {// 超时,则抛出超时异常throw new RemotingTimeoutException("invokeSync call timeout");}// 再进一步调用RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);// 执行后钩子方法,可以自定义doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);// 返回调用结果return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {// channel 非激活状态,则关闭,并抛出连接异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}

NettyRemotingClient.invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque(); // 唯一标识此次请求的idtry {// 将响应处理封装成Future,并存放到 responseTable map 中,等待消息拉取成功后再处理final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();// 客户端channel(这里就是消费端),发送请求到服务端broker,并注册拉取后的监听channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Override// operationComplete 方法由谁调用呢,这个问题在`章节16.4`中讲过,可以看 ResponseFuture.executeInvokeCallback 方法,由channel收到拉取的消息后,再调用public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 拉取成功,设置发送状态成功标志responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}// 请求失败处理responseTable.remove(opaque);responseFuture.setCause(f.cause()); //失败原因responseFuture.putResponse(null); // 返回为nulllog.warn("send a request command to channel <" + addr + "> failed.");}});// 等待发送完成RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// null ,表示超时了if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {// 表示发送失败了,抛出失败原因throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}// 返回结果return responseCommand;} finally {this.responseTable.remove(opaque);}
}

至此,客户端的操作完成了,重点工作在Broker端的处理,将在下一章介绍。

这篇关于第十八章-消息重推-客户端发起的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/993478

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Python手搓邮件发送客户端

《Python手搓邮件发送客户端》这篇文章主要为大家详细介绍了如何使用Python手搓邮件发送客户端,支持发送邮件,附件,定时发送以及个性化邮件正文,感兴趣的可以了解下... 目录1. 简介2.主要功能2.1.邮件发送功能2.2.个性签名功能2.3.定时发送功能2. 4.附件管理2.5.配置加载功能2.6.

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

SpringBoot实现websocket服务端及客户端的详细过程

《SpringBoot实现websocket服务端及客户端的详细过程》文章介绍了WebSocket通信过程、服务端和客户端的实现,以及可能遇到的问题及解决方案,感兴趣的朋友一起看看吧... 目录一、WebSocket通信过程二、服务端实现1.pom文件添加依赖2.启用Springboot对WebSocket

QT实现TCP客户端自动连接

《QT实现TCP客户端自动连接》这篇文章主要为大家详细介绍了QT中一个TCP客户端自动连接的测试模型,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录版本 1:没有取消按钮 测试效果测试代码版本 2:有取消按钮测试效果测试代码版本 1:没有取消按钮 测试效果缺陷:无法手动停

Nacos客户端本地缓存和故障转移方式

《Nacos客户端本地缓存和故障转移方式》Nacos客户端在从Server获得服务时,若出现故障,会通过ServiceInfoHolder和FailoverReactor进行故障转移,ServiceI... 目录1. ServiceInfoHolder本地缓存目录2. FailoverReactorinit

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队