RocketMQ的长轮询(Long Polling)实现分析

2023-10-16 21:44

本文主要是介绍RocketMQ的长轮询(Long Polling)实现分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

前言

长轮询

1.实现步骤

1.1客户端轮询发送请求

1.2服务端处理数据

1.3客户端接收数据

2.实现实例

RocketMQ长轮询

1.PullMessage服务

2.PullMessageProcessor服务

3.PullCallback回调

总结


前言

消息队列一般在消费端都会提供push和pull两种模式,RocketMQ同样实现了这两种模式,分别提供了两个实现类:DefaultMQPushConsumer和DefaultMQPullConsumer;两种方式各有优势:

push模式:推送模式,即服务端有数据之后立马推送消息给客户端,需要客户端和服务器建立长连接,实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;

pull模式:拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据,然后处理数据,再拉取数据,一直循环下去,具体拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长导致数据接收不及时; RocketMQ使用了长轮询的方式,兼顾了push和pull两种模式的优点,下面首先对长轮询做简单介绍,进而分析RocketMQ内置的长轮询模式。

长轮询

长轮询通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会hold住请求,等待服务端有数据,或者一直没有数据超时处理,然后一直循环下去;下面看一下如何简单实现一个长轮询;

1.实现步骤

1.1客户端轮询发送请求

客户端应该存在一个一直循环的程序,不停的向服务端发送获取消息请求;

1.2服务端处理数据

服务器接收到客户端请求之后,首先查看是否有数据,如果有数据则直接返回,如果没有则保持连接,等待获取数据,服务端获取数据之后,会通知之前的请求连接来获取数据,然后返回给客户端;

1.3客户端接收数据

正常情况下,客户端会马上接收到服务端的数据,或者等待一段时间获取到数据;如果一直获取不到数据,会有超时处理;在获取数据或者超时处理之后会关闭连接,然后再次发起长轮询请求;

2.实现实例

以下使用netty模拟一个http服务器,使用HttpURLConnection模拟客户端发送请求,使用BlockingQueue存放数据;

服务端代码

public class Server {public static void start(final int port) throws Exception {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup woker = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();try {serverBootstrap.channel(NioServerSocketChannel.class).group(boss, woker).childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("http-decoder", new HttpServerCodec());ch.pipeline().addLast(new HttpServerHandler());}});ChannelFuture future = serverBootstrap.bind(port).sync();System.out.println("server start ok port is " + port);DataCenter.start();future.channel().closeFuture().sync();} finally {boss.shutdownGracefully();woker.shutdownGracefully();}}public static void main(String[] args) throws Exception {start(8080);}
}

netty默认支持http协议,直接使用即可,启动端口为8080;同时启动数据中心服务,相关代码如下:

public class DataCenter {private static Random random = new Random();private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();private static AtomicInteger num = new AtomicInteger();public static void start() {while (true) {try {Thread.sleep(random.nextInt(5) * 1000);String data = "hello world" + num.incrementAndGet();queue.put(data);System.out.println("store data:" + data);} catch (InterruptedException e) {e.printStackTrace();}}}public static String getData() throws InterruptedException {return queue.take();}}

为了模拟服务端没有数据,需要等待的情况,这里使用BlockingQueue来模拟,不定期的往队列里面插入数据,同时对外提供获取数据的方法,使用的是take方法,没有数据会阻塞知道有数据为止;getData在类HttpServerHandler中使用,此类也很简单,如下:

public class HttpServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpRequest) {FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);httpResponse.content().writeBytes(DataCenter.getData().getBytes());httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes());ctx.writeAndFlush(httpResponse);}}
}

获取到客户端的请求之后,从数据中心获取一条消息,如果没有数据,会进行等待,直到有数据为止;然后使用FullHttpResponse返回给客户端;客户端使用HttpURLConnection来和服务端建立连接,不停的拉取数据,代码如下:

public class Client {public static void main(String[] args) {while (true) {HttpURLConnection connection = null;try {URL url = new URL("http://localhost:8080");connection = (HttpURLConnection) url.openConnection();connection.setReadTimeout(10000);connection.setConnectTimeout(3000);connection.setRequestMethod("GET");connection.connect();if (200 == connection.getResponseCode()) {BufferedReader reader = null;try {reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));StringBuffer result = new StringBuffer();String line = null;while ((line = reader.readLine()) != null) {result.append(line);}System.out.println("时间:" + new Date().toString() + "result =  " + result);} finally {if (reader != null) {reader.close();}}}} catch (IOException e) {e.printStackTrace();} finally {if (connection != null) {connection.disconnect();}}}}
}

以上只是简单的模拟了长轮询的方式,下面重点来看看RocketMQ是如何实现长轮询的;

RocketMQ长轮询

RocketMQ的消费端提供了两种消费模式分别是:DefaultMQPushConsumer和DefaultMQPullConsumer,其中DefaultMQPushConsumer就是使用的长轮询,所以下面重点分析此类;

1.PullMessage服务

从名字可以看出来就是客户端从服务端拉取数据的服务,看里面的一个核心方法:

@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}

服务启动之后,会一直不停的循环调用拉取数据,PullRequest可以看作是拉取数据需要的参数,部分代码如下:

public class PullRequest {private String consumerGroup;private MessageQueue messageQueue;private ProcessQueue processQueue;private long nextOffset;private boolean lockedFirst = false;...省略...
}

每个MessageQueue 对应了封装成了一个PullRequest,因为拉取数据是以每个Broker下面的Queue为单位,同时里面还一个ProcessQueue,每个MessageQueue也同样对应一个ProcessQueue,保存了这个MessageQueue消息处理状态的快照;还有nextOffset用来标识读取的位置;继续看一段pullMessage中的内容,给服务端发送请求的头内容:

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;

其中有一个参数是SuspendTimeoutMillis,作用是设置Broker的最长阻塞时间,默认为15秒,前提是没有消息的情况下,有消息会立刻返回;

2.PullMessageProcessor服务

从名字可以看出,服务端用来处理pullMessage的服务,下面重点看一下processRequest方法,其中包括对获取不同结果做的处理:

 switch (response.getCode()) {case ResponseCode.SUCCESS:...省略...break;case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:...省略...break;default:assert false;

一共处理了四个类型,我们关心的是在没有获取到数据的情况下是如何处理的,可以重点看一下ResponseCode.PULL_NOT_FOUND,表示没有拉取到数据,此时会调用PullRequestHoldService服务,从名字可以看出此服务用来hold住请求,不会立马返回,response被至为了null,不给客户端响应;下面重点看一下PullRequestHoldService:

@Overridepublic void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}

此方法主要就是通过不停的检查被hold住的请求,检查是否已经有数据了,具体检查哪些就是在ResponseCode.PULL_NOT_FOUND中调用的suspendPullRequest方法:

private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}mpr.addPullRequest(pullRequest);}

将需要hold处理的PullRequest放入到一个ConcurrentHashMap中,等待被检查;具体的检查代码在checkHoldRequest中:

private void checkHoldRequest() {for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);try {this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}

此方法用来获取指定messageQueue下最大的offset,然后用来和当前的offset来比较,来确定是否有新的消息到来;往下看notifyMessageArriving方法:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;if (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);}if (newestOffset > request.getPullFromThisOffset()) {if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}

方法中两个重要的判定就是:比较当前的offset和maxoffset,看是否有新的消息到来,有新的消息返回客户端;另外一个就是比较当前的时间和阻塞的时间,看是否超过了最大的阻塞时间,超过也同样返回; 此方法不光在PullRequestHoldService服务类中循环调用检查,同时在DefaultMessageStore中消息被存储的时候调用;其实就是主动检查和被动通知两种方式。

3.PullCallback回调

服务端处理完之后,给客户端响应,回调其中的PullCallback,其中在处理完消息之后,重要的一步就是再次把pullRequest放到PullMessageService服务中,等待下一次的轮询;

总结

本文首先介绍了两种消费消息的模式,介绍了其中的优缺点,然后引出了长轮询,并且在本地简单模拟了长轮询,最后重点介绍了RocketMQ中是如何实现的长轮询

这篇关于RocketMQ的长轮询(Long Polling)实现分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/220964

相关文章

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja