本文主要是介绍全网最细的nacos服务端服务注册:没有之一,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
服务端注册入口
BaseRpcServer
由于有@PostConstrct注解,也就是说在构造方法后会执行,startServer会由它的子类BaseGrpcServer.startServer重写
BaseGrpcServer.startServer
addServices:
其中grpcCommonRequestAcceptor和grpcBiStreamRequestAcceptor就是实现了nacos_grpc_service.proto的两个子处理类
public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
�public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {
�
对应nacos_grpc_service.proto中的两个方法处理
service Request {// Sends a commonRequestrpc request (Payload) returns (Payload) {}
}service BiRequestStream {// Sends a biStreamRequestrpc requestBiStream (stream Payload) returns (stream Payload) {}
}
GrpcRequestAcceptor.request
这里最终会调用到InstanceRequestHandler.handle方法,不用猜基本上就知道要做什么,实例相关的操作
InstanceRequestHandler.handle
最终会调用到EphemeralClientOperationServiceImpl.registerInstance
EphemeralClientOperationServiceImpl.registerInstance
具体做了几件事:
- 从ServiceManger里面通过service来获取service(包含namespace,group,name,ephemeral=true),我们来看一下ServiceManger具体是什么样子的存储结构?
更加形象的是这样:
- 从ClientManger通过clientId获取Client,这里我们来看一下clientId、ClientManger、Client分别是什么?
ClientId:
ClientManger: 专门用来管理客户端连接的组件
Client:客户端信息
上图中connectionBasedClientManger里面的clients属性 ,里面存储的key、value,key为clientId,value为客户端信息,也就是上面的ConnectionBasedClient
- 创建发布信息,并将发布信息放到ConnectionBasedClient中
将Service和publishInfo放到publishers中,实际的数据如图:
- 发布ClientRegisterServiceEvent�事件,具体需要看看这个事件是由谁来处理,其实很好找,看看这个那个类使用了这个事件就行,最终找到了ClientServcieIndexesManger
ClientServiceIndexesManager.handleClientOperation
这里总结一下做了几件事:
- 将service以及clientId放到publisherIndexes中,存储的格式就是<Service,Set>,更加形象的图如图:
- 发布ServiceChangedEvent事件,这个事件最终会被NamingSubscriberServiceV2Impl给处理
NamingSubscriberServiceV2Impl.onEvent
这里会将事件封装成PushDelayTask,然后使用delayTaskEngine,这里的delayTaskEngine是PushDelayTaskExecute
PushDelayTaskExecuteEngine�.addTask
PushDealyTaskExecuteEngine继承了NacosDelayTaskExecuteEngine,最终会调用到父类的addTask方法
其实这里比较简单,就是把task加入到ConcurrentHashMap里面去,key就是Servcie,value就是pushDelayTask
加入到这个ConcurrentHashMap肯定是要被处理的,那我们看看是在哪里被处理?
- NacosDelayTaskExecuteEngine在构造方法里面创建了一个定时调度的线程池,里面有一个被定时调度的任务,每隔100ms被调度执行
- NacosDelayTaskExecuteEngine.processTask
遍历上面的concurrentHashMap,然后通过taskkey然后找到处理类,因为我们没有设置,所以这里的NacosTaskProcessor就是子类PushDelayTaskExecuteEnginge里面设置的PushDelayTaskProcessor
- PushDelayTaskProcessor.process
这里的NacosExecuteTaskExecuteEngine还是需要详细分析一下
到这里用图总结一下从NamingSubscriberServiceV2Impl.onEvent到此处:
NacosExecuteTaskExecute.addTask
NacosExecuteTaskExecute.addTask
逻辑比较简单,先看看它有没有对应的NacosTaskProcessor,实际就是没有的(key没有,默认也没有设置),接下来就直接通过getWorker来处理
从executeWorkers中取一个TaskExecuteWorker给这个task来执行,所以我们要看一下这个executeWorkers到底是什么?
这里的TaskExecuteWorker就是一个单线程执行器
TaskExecuteWorker.process
又是似曾相识的套路,往queue里面塞东西,那具体在哪里处理的?
里面有内部单线程处理:
也就说最终还是会调用到传进来task的run方法,看到这里感觉无比的累啊,应该快结束了
到这里也拿一张图总结一下:
PushExecuteTask.run
总结一下做了几件事:
- 生成推送数据:generatePushData
- 通过serviceIndexesManger + service能找到service对应那些clientId
- 通过clientManger+ clientId能找到client对应的发布信息publishers,再通过publishers+service能找到这个service对应的Instance
我们来看一下实际debug得到的数据:
- 往serviceClusterIndex.put(service, clusters)
- 往serviceDataIndexes.put(service, serviceInfo)
- 推送给目标客户端
- getTargetClients: 通过ClientServiceIndexManger+ service来获取这个订阅这个service所有clientId
- 遍历订阅这个service的所有clientId,通过ClientManger+ clientId找到这个客户端,再通过这个客户端的subsribers+ service就能知道这个客户端订阅这个service的详情了
- 最后调用doPushWithCallback往订阅客户端推送数据
这篇关于全网最细的nacos服务端服务注册:没有之一的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!