本文主要是介绍Nacos 入门篇---注册服务:如何做到高并发支持上百万服务注册(四),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、引言
上个章节我们讲了Nacos接受到客户端的注册请求,感觉看到最后也没有进行真正注册,那我们本章就来继续了解下~
二、目录
目录
一、引言
二、目录
三、回顾上节内容
四、异步任务设计思想
五、异步任务和内存队列源码解析
六、异步任务源码分析
总结
三、回顾上节内容
从上个章节Nacos客户端源码启动得知,最后会通过 http 方式 ,请求 /nacos/v1/ns/instance 地址到,服务端 naming 模块中 InstanceController类中的register方法。
在 register 方法中,最后会调用 notifier.addTask 方法,在这个方法中会把key、action包装成Pair对象,丢入到 BlockingQueue 队列当中。到此 register方法就结束了。
是不是会感觉注册服务的源码感觉看了跟没看似的?好奇最后包装成Pair对象,丢入到 BlockingQueue 当中,然后又做了什么操作,到底是怎么注册进去的~
别着急,我们这章就把这个疑惑给解决。
四、异步任务设计思想
在Nacos 1.0.0版本中,Nacos 实例注册压测报告中,可以看出实例注册能够达到上百万台实例。
测试报告地址:Nacos服务发现性能测试报告
Nacos 服务发现性能测试都是针对重点功能,通过对 3 节点规模集群进行压测,可以看到接口性能负载和容量。
压测容量服务数可达 60W,实例注册数达 110W,集群运行持续稳定,达到预期;(注:由于本次注册实例使用的是 HTTP 接口,并没有将心跳上报的 TPS 包括在内,如果要支持百万实例的心跳上报,需要集群水平扩容,并调优 Tomcat 和内核参数。)
注册/查询实例 TPS 达到 13000 以上,接口达到预期。
为什么Nacos能支持这么高的并发,来保证实例完成注册?
在核心Nacos 注册的时候,采用的是 异步 任务。
异步任务流程图:
我们先看图,Nacos客户端首先通过/nacos/v1/ns/instance 地址发起注册,Nacos服务端收到请求后包装成一个个Pair对象,丢入到 BlockingQueue 当中。到此注册任务就结束了,然后给Nacos客户端进行返回。
这时Nacos服务端会新开启一个单线程异步任务,从 BlockingQueue 当中获取任务,把信息写入注册表,完成注册。
这样做的好处是什么?
- 接口响应时效更快:其实Nacos服务端实例注册接口,他并没有做 真正注册的方法,只是把信息包装成对象丢入到 BlockingQueue 当中,接口就结束了。代码逻辑很简单,所以响应时效会很快。
- 保证服务稳定性:哪怕同时有1千个、1万个Nacos客户端注册请求接口,最后也只是把任务放到阻塞队列当中。就跟消息队列中的流量削峰一样,复杂逻辑由消费者慢慢进行处理,这里异步任务就是消费者。
- 解决写时并发冲突:Naocs服务端后台,其实是只有一个线程在处理队列中的任务,写入到注册表当中。是不会有线程并发写的问题的。但是读的操作是由其他线程来操作,可能会存在读写并发冲突。这个我们下一章来讲。
五、异步任务和内存队列源码解析
主线任务:既然把任务放入到了 task当中,那么异步任务是怎么获取的 ?
先回到了上章节代码,我们先把注册信息包装成Pair对象,丢入到 BlockingQueue 阻塞队列当中。
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);/*** 向队列中添加新的通知任务。* @param datumKey data key* @param action action for data*/
public void addTask(String datumKey, DataOperation action) {if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {return;}if (action == DataOperation.CHANGE) {services.put(datumKey, StringUtils.EMPTY);}// 主线任务:taskks 是一个阻塞队列,并且把key、action包装成 Pair 对象,放入队列当中tasks.offer(Pair.with(datumKey, action));
}
那任务都丢入到 tasks 队列当中,肯定会从tasks当中获取任务。那我们看下 tasks 哪里调用了获取任务的代码,这一看就是第三个,tasks.take() 从队列中获取任务。
我们可以看到 Notifier 继承了线程类,重写 run方法。在 run 方法当中,去进行不断获取队列任务进行处理。处理任务的代码我们后面再看,抓住主线任务,那这个
Notifier 线程类在哪启动了 ?我们看下 Notifier 在哪里调用,可以看到就两处,一处是在 DistroConsistencyServiceImpl 类,一处是在 Test 测试类。那一猜逻辑代码
怎么可能写到测试类里面了,那我们直接看第一处调用即可。
public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);// 省略部分代码@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");for (; ; ) {try {Pair<String, DataOperation> pair = tasks.take();handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}// 省略部分代码
}
如下图,在这个类创建了 Notifier 对象。接着往下看,有一个init方法,方法上加了@PostConstruct注解,方法上加上了这个注解,在 Spring 创建这个 Bean 的时候,会来执行这个方法。最后把 Notifier 类提交到线程池当中。
看到这,我们大概就知道了,总结一下:
主线任务:既然把任务放入到了 task当中,那么异步任务是怎么获取的 ?
在创建 DistroConsistencyServiceImpl 类的时候,会创建 Notifier对象,Notifier对象继承了 Runnable 线程类。在重写run方法的时候会不断从 tasks 队列中获取任务进行处理。 在Spring 创建这个 Bean 的时候,会把 Notifie 类提交到线程池当中进行执行。
看完了上面的逻辑,我们把代码流程图也补充下:
六、异步任务源码分析
主线任务:异步任务中做了什么 ?
那我们就要回到 Notifier类当中的run方法了
public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);// 省略部分代码@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");for (; ; ) {try {// 从队列中获取任务Pair<String, DataOperation> pair = tasks.take();// 主线任务:处理任务逻辑在这个方法当中handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}// 省略部分代码
}
对于这里的死循环,有的小伙伴就会有疑问了?
- 这里死循环合理吗?
- 会占用CPU资源吗?
- 如果抛出异常了,是不是循环就结束了 ?
答:死循环合理,因为Nacos服务端要一直处理客户端的实例注册请求。而Nacos服务端也不知道它到底有多少客户端服务实例需要注册,所以这里写一个死循环在这里,不断重复去执行。
既然是死循环,那就要考虑到占用CPU资源的问题?
tasks是一个阻塞队列,而阻塞队列的特点就是不会占用CPU资源,并且take方法是会一直阻塞直接取到元素或者当前线程中断。
抛出异常了,也不会结束。因为for循环里面进行异常捕获了,如果抛出异常了,就去执行下一个任务了。
紧接着,把任务取出来之后,调用了 handle() 方法,在 handle() 方法中,先把之前拼接的常量 key 取出来,还有action ,这里的action传就来的就是 DataOperation.CHANGE,然后调用 listener.onChange() 方法。我们看下 listener.onChange() 方法的参数,首先是常量key,这个不用说,其次就是 dataStore.get(datumKey).value,是不是很眼熟。其实这里的dataStore.get(datumKey).value 获取的就是 dataStore 中的 Instances,
不信的话可以看下面之前的代码,我已经贴出来了。
private void handle(Pair<String, DataOperation> pair) {try {// 把之前创建的 key 和 action 取出来String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {// 这里的 action 默认就是 DataOperation.CHANGEif (action == DataOperation.CHANGE) {// 主线任务:这里的 dataStore.get(datumKey).value 其实获取的就是 Instanceslistener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}
}
之前的代码:
public void onPut(String key, Record value) {// 这里还是判断刚刚那个 key 前缀,这里是为 trueif (KeyBuilder.matchEphemeralInstanceListKey(key)) {// 创建Datum对象,把key、和 Instances都放入Datum对象里面去Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();// 最后添加到dataStore当中,这个 dataStore就是一个Map对象dataStore.put(key, datum);}
}
我们看下流程图巩固一下,这里主要是把key和value包装成 DatmStore 放到了 dataMap当中。
本章节源码分析到这里就结束了,这个DataStore的作用,其实就是在执行异步任务的时候,会通过key从 DataStore 当中获取对应的 Instance,这个Instance存储的就是注册客户端的实例数据。
总结
本章节主要讲了三大点
- 首先讲了Nacos能支持上百万台注册,主要归功于它的异步任务设计。那什么是异步任务?
答:先接受到客户端的注册实例,包装成任务,放到阻塞队列当中。然后再开一条线程死循环去阻塞队列当中获取任务一个个去执行,完成真正的实例注册。 (这里不会存在资源浪费的情况,阻塞队列有任务就执行,没任务线程就会挂起)
这样做有三大好处:
- 接口响应快 :不用等注册完成才去返回。只要接受到客户端注册实例信息,包装成任务,丢入到阻塞队列当中,就进行返回。
- 保证服务稳定性:哪怕有1千、1万个实例同时去请求实例注册接口,最后也只是把任务丢入到阻塞队列当中去。然后由后台异步任务慢慢去执行注册。
- 线程冲突:Naocs服务端后台,其实是只有一个线程在处理队列中的任务,写入到注册表当中。是不会有线程并发写的问题的
- 然后又讲了Nacos任务放入到了 task当中,那么异步任务是怎么获取的 ?
在创建 DistroConsistencyServiceImpl 类的时候,会创建 Notifier对象,Notifier对象继承了 Runnable 线程类。在重写run方法的时候会不断从 tasks 队列中获取任务进行处理。在Spring 创建这个 Bean 的时候,会把 Notifie 类提交到线程池当中进行执行。
- 最后讲了 异步任务中做了什么 ?
我们今天的源码只分析到了 dataStore.get(datumKey).value,至于 listener.onChange 想必你也能猜到,这个方法里面就是真正地把 Instance 信息写到注册表里面去
在讲解这个方法前,还需要跟你把 Nacos 内存注册表的结构讲清楚,以及 Nacos 在写注册表的时候,怎么解决读写并发冲突问题。这两个知识点,我会在下一个章节详细给你讲解。
流程图:
Nacos 内存注册表的结构讲清楚,以及 Nacos 在写注册表的时候,怎么解决读写并发冲突问题。这两个知识点,我会在下一个章节详细给你讲解~
这篇关于Nacos 入门篇---注册服务:如何做到高并发支持上百万服务注册(四)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!