第十八章-消息重推-客户端发起

2024-05-16 01:28

本文主要是介绍第十八章-消息重推-客户端发起,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息重推主要是针对消费端消费失败,需要下次再消费的情况,RocketMq中提供了多个重推的入口,这里不一一介绍,只延续章节16.5中的重推逻辑继续讲。

ConsumeMessageConcurrentlyService.sendMessageBack

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {/*** 消息消费的重试策略* -1,不重试,直接放到DLQ(死信队列)* 0,由broker控制重试频率,默认是这个* >0,由客户自己控制重试频率*/int delayLevel = context.getDelayLevelWhenNextConsume();// 在消费重推前,将 namespace包裹topicmsg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {// 进一步调用this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

DefaultMQPushConsumerImpl.sendMessageBack

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 根据brokerName查找对应的broker 地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// 进一步调用this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);// 调用失败,那就重新生成一条新的消息,走正常发送消息的口径 send 方法来发送消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 正常消息发送口径this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {// 将topic卸掉namespace的包装msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}

MQClientAPIImpl.consumerSendMessageBack

public void consumerSendMessageBack(final String addr, // broker 地址final MessageExt msg, // 要重推的消息对象final String consumerGroup, // 消费者组名final int delayLevel,  // 消息重试策略final long timeoutMillis, // 超时时间,默认5000msfinal int maxConsumeRetryTimes // 最大重试次数,默认16次
) throws RemotingException, MQBrokerException, InterruptedException {// 组装发送到Broker的请求ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();// 消息重推的请求码是 RequestCode.CONSUMER_SEND_MSG_BACK,Broker端也根据这个码来处理的RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);// 设置消费者组名requestHeader.setGroup(consumerGroup);// 保留原始topic,因为消息重推,要修改这个topicrequestHeader.setOriginTopic(msg.getTopic());// 该消息的物理偏移,也就是在commitlog文件中的偏移requestHeader.setOffset(msg.getCommitLogOffset());// 消息重试策略requestHeader.setDelayLevel(delayLevel);// 保留消息的原始msgIdrequestHeader.setOriginMsgId(msg.getMsgId());// 设置最大重试次数requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 通过vip通道发送重试消息,vip通道就是原broker的端口-2,进一步调用RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;// 调用发送成功,直接返回上一层switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}

NettyRemotingClient.invokeSync

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {// 记录开始时间long beginStartTime = System.currentTimeMillis();// netty channelfinal Channel channel = this.getAndCreateChannel(addr);// 通道是激活状态if (channel != null && channel.isActive()) {try {// 执行前钩子方法,可以自定义doBeforeRpcHooks(addr, request);// 计算所花费时间,是否超过超时时间long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {// 超时,则抛出超时异常throw new RemotingTimeoutException("invokeSync call timeout");}// 再进一步调用RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);// 执行后钩子方法,可以自定义doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);// 返回调用结果return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {// channel 非激活状态,则关闭,并抛出连接异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}

NettyRemotingClient.invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque(); // 唯一标识此次请求的idtry {// 将响应处理封装成Future,并存放到 responseTable map 中,等待消息拉取成功后再处理final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();// 客户端channel(这里就是消费端),发送请求到服务端broker,并注册拉取后的监听channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Override// operationComplete 方法由谁调用呢,这个问题在`章节16.4`中讲过,可以看 ResponseFuture.executeInvokeCallback 方法,由channel收到拉取的消息后,再调用public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 拉取成功,设置发送状态成功标志responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}// 请求失败处理responseTable.remove(opaque);responseFuture.setCause(f.cause()); //失败原因responseFuture.putResponse(null); // 返回为nulllog.warn("send a request command to channel <" + addr + "> failed.");}});// 等待发送完成RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// null ,表示超时了if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {// 表示发送失败了,抛出失败原因throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}// 返回结果return responseCommand;} finally {this.responseTable.remove(opaque);}
}

至此,客户端的操作完成了,重点工作在Broker端的处理,将在下一章介绍。

这篇关于第十八章-消息重推-客户端发起的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/993478

相关文章

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Android 友盟消息推送集成遇到的问题

友盟消息推送遇到的问题 集成友盟消息推送,步骤根据提供的技术文档接入便可。可是当你集成到项目中去的时候,可能并不是一帆风顺就搞定,因为你项目里面是可能集成了其他的sdk(比如支付宝,微信,七鱼等等三方的sdk)。那么这个时候,再加上友盟的消息推送sdk集成可能就会出现问题。 问题清单 友盟消息推送sdk和支付宝sdk冲突问题 后台配置了消息推送,也显示发送成功,但是手机没有收到消息通知

Redis 客户端Jedis使用---连接池

Jedis 是Redis 的Java客户端,通过一段时间的使用,jedis基本实现redis的所有功能,并且jedis在客户端实现redis数据分片功能,Redis本身是没有数据分布功能。 一、下载jedis 代码 jedis 代码地址:https://github.com/xetorthio/jedis 再次感受到开源的强大。呵呵,大家有时间可以看看源码。 二、项目中如何使用Jedi

Java Socket服务器端与客户端的编程步骤总结

一,InetAddress类: InetAddress类没有构造方法,所以不能直接new出一个对象; 可以通过InetAddress类的静态方法获得InetAddress的对象; InetAddress.getLocalHost(); InetAddress.getByName(""); 类主要方法: String - address.getHostName(); String - addre

9.7(UDP局域网多客户端聊天室)

服务器端 #include<myhead.h>#define SERIP "192.168.0.132"#define SERPORT 8888#define MAX 50//定义用户结构体typedef struct{struct sockaddr_in addr;int flag;}User;User users[MAX];//用户列表void add_user(struct s

【知识分享】MQTT实战-使用mosquitto客户端连接emqx服务器

一、简介     MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,旨在实现物联网设备之间的低带宽、高延迟的通信。MQTT协议设计简洁,使用TCP/IP协议进行通信,适用于各种网络环境,尤其适合在有限的网络带宽和不稳定的网络连接条件下进行通信。     MQTT的工作原理是基于发布/订阅模式的消息传递,它包括两个主要

消息队列的理解和应用场景

知乎上的一个通俗理解的优秀答案 by 祁达方 小红是小明的姐姐。 小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。 书架就是一个消息队列,小红是生产者,小明是