第十八章-消息重推-客户端发起

2024-05-16 01:28

本文主要是介绍第十八章-消息重推-客户端发起,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息重推主要是针对消费端消费失败,需要下次再消费的情况,RocketMq中提供了多个重推的入口,这里不一一介绍,只延续章节16.5中的重推逻辑继续讲。

ConsumeMessageConcurrentlyService.sendMessageBack

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {/*** 消息消费的重试策略* -1,不重试,直接放到DLQ(死信队列)* 0,由broker控制重试频率,默认是这个* >0,由客户自己控制重试频率*/int delayLevel = context.getDelayLevelWhenNextConsume();// 在消费重推前,将 namespace包裹topicmsg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {// 进一步调用this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

DefaultMQPushConsumerImpl.sendMessageBack

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 根据brokerName查找对应的broker 地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// 进一步调用this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);// 调用失败,那就重新生成一条新的消息,走正常发送消息的口径 send 方法来发送消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 正常消息发送口径this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {// 将topic卸掉namespace的包装msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}

MQClientAPIImpl.consumerSendMessageBack

public void consumerSendMessageBack(final String addr, // broker 地址final MessageExt msg, // 要重推的消息对象final String consumerGroup, // 消费者组名final int delayLevel,  // 消息重试策略final long timeoutMillis, // 超时时间,默认5000msfinal int maxConsumeRetryTimes // 最大重试次数,默认16次
) throws RemotingException, MQBrokerException, InterruptedException {// 组装发送到Broker的请求ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();// 消息重推的请求码是 RequestCode.CONSUMER_SEND_MSG_BACK,Broker端也根据这个码来处理的RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);// 设置消费者组名requestHeader.setGroup(consumerGroup);// 保留原始topic,因为消息重推,要修改这个topicrequestHeader.setOriginTopic(msg.getTopic());// 该消息的物理偏移,也就是在commitlog文件中的偏移requestHeader.setOffset(msg.getCommitLogOffset());// 消息重试策略requestHeader.setDelayLevel(delayLevel);// 保留消息的原始msgIdrequestHeader.setOriginMsgId(msg.getMsgId());// 设置最大重试次数requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 通过vip通道发送重试消息,vip通道就是原broker的端口-2,进一步调用RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;// 调用发送成功,直接返回上一层switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}

NettyRemotingClient.invokeSync

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {// 记录开始时间long beginStartTime = System.currentTimeMillis();// netty channelfinal Channel channel = this.getAndCreateChannel(addr);// 通道是激活状态if (channel != null && channel.isActive()) {try {// 执行前钩子方法,可以自定义doBeforeRpcHooks(addr, request);// 计算所花费时间,是否超过超时时间long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {// 超时,则抛出超时异常throw new RemotingTimeoutException("invokeSync call timeout");}// 再进一步调用RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);// 执行后钩子方法,可以自定义doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);// 返回调用结果return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {// channel 非激活状态,则关闭,并抛出连接异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}

NettyRemotingClient.invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque(); // 唯一标识此次请求的idtry {// 将响应处理封装成Future,并存放到 responseTable map 中,等待消息拉取成功后再处理final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();// 客户端channel(这里就是消费端),发送请求到服务端broker,并注册拉取后的监听channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Override// operationComplete 方法由谁调用呢,这个问题在`章节16.4`中讲过,可以看 ResponseFuture.executeInvokeCallback 方法,由channel收到拉取的消息后,再调用public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 拉取成功,设置发送状态成功标志responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}// 请求失败处理responseTable.remove(opaque);responseFuture.setCause(f.cause()); //失败原因responseFuture.putResponse(null); // 返回为nulllog.warn("send a request command to channel <" + addr + "> failed.");}});// 等待发送完成RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// null ,表示超时了if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {// 表示发送失败了,抛出失败原因throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}// 返回结果return responseCommand;} finally {this.responseTable.remove(opaque);}
}

至此,客户端的操作完成了,重点工作在Broker端的处理,将在下一章介绍。

这篇关于第十八章-消息重推-客户端发起的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/993478

相关文章

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Redis客户端工具之RedisInsight的下载方式

《Redis客户端工具之RedisInsight的下载方式》RedisInsight是Redis官方提供的图形化客户端工具,下载步骤包括访问Redis官网、选择RedisInsight、下载链接、注册... 目录Redis客户端工具RedisInsight的下载一、点击进入Redis官网二、点击RedisI

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

使用Java实现获取客户端IP地址

《使用Java实现获取客户端IP地址》这篇文章主要为大家详细介绍了如何使用Java实现获取客户端IP地址,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 首先是获取 IP,直接上代码import org.springframework.web.context.request.Requ

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf