13. Nacos Source Code Analysis: The Distro Protocol (Part 1)

2023-11-11 04:20

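Some readers asked me to write about the Nacos Distro protocol, so I went and studied it. Below I share my understanding of it; corrections are welcome.

As before, the analysis is based on Nacos 2.2.0.

First, let's locate the distro code. The simplest way is to use the IDE's search, which turns up two folders related to distro.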

[Figure: the two distro folders found by the IDE search]

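Organizing code into folders by functional module makes it quick for us and for others to find the relevant logic.

Next, look at the distro structure in the core module. It sits under the distributed package, alongside the id and raft folders, which tells us that the distributed package holds the distribution-related code: the Distro protocol, the Raft protocol, and distributed ID generation.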

[Figure: structure of the distributed package in the core module]

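Since we are analyzing the Distro protocol, the focus is naturally the distro package. Where should we start?

There is a DistroConfig class, so let's start there. Whatever the logic is, it will have some configuration items, and following the configuration leads us to the related logic and the core processing classes.

In Spring, we can likewise read the @Configuration classes to see which basic pieces are configured and quickly get a picture of a project's infrastructure.

DistroConfig extends AbstractDynamicConfig and defines a handful of settings.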

public class DistroConfig extends AbstractDynamicConfig {

    private static final String DISTRO = "Distro";

    // Eagerly initialized singleton instance
    private static final DistroConfig INSTANCE = new DistroConfig();

    // Sync delay, 1s by default
    private long syncDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_DELAY_MILLISECONDS;

    // Sync timeout, 3s by default
    private long syncTimeoutMillis = DistroConstants.DEFAULT_DATA_SYNC_TIMEOUT_MILLISECONDS;

    // Sync retry delay, 3s by default
    private long syncRetryDelayMillis = DistroConstants.DEFAULT_DATA_SYNC_RETRY_DELAY_MILLISECONDS;

    // Verify interval, 5s by default
    private long verifyIntervalMillis = DistroConstants.DEFAULT_DATA_VERIFY_INTERVAL_MILLISECONDS;

    // Verify timeout, 3s by default
    private long verifyTimeoutMillis = DistroConstants.DEFAULT_DATA_VERIFY_TIMEOUT_MILLISECONDS;

    // Retry delay for loading data, 30s by default
    private long loadDataRetryDelayMillis = DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS;

    // Timeout for loading data, 30s by default
    private long loadDataTimeoutMillis = DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS;

    private DistroConfig() {
        super(DISTRO);
        resetConfig();
    }

    @Override
    protected void getConfigFromEnv() {
        // Reading the settings from the environment is omitted
    }

    // get, set and printConfig methods omitted
}

Searching for references to this configuration leads to a core class, DistroClientComponentRegistry, in the package com.alibaba.nacos.naming.consistency.ephemeral.distro.v2. It initializes several important Distro-related classes.

@Component
// Bean created by Spring
public class DistroClientComponentRegistry {

    private final ServerMemberManager serverMemberManager;
    private final DistroProtocol distroProtocol;
    private final DistroComponentHolder componentHolder;
    private final DistroTaskEngineHolder taskEngineHolder;
    private final ClientManager clientManager;
    private final ClusterRpcClientProxy clusterRpcClientProxy;

    // Constructor injection
    public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol,
            DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
            ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy) {
        this.serverMemberManager = serverMemberManager;
        this.distroProtocol = distroProtocol;
        this.componentHolder = componentHolder;
        this.taskEngineHolder = taskEngineHolder;
        this.clientManager = clientManager;
        this.clusterRpcClientProxy = clusterRpcClientProxy;
    }

    /**
     * Register the necessary components for the v2 Distro protocol.
     */
    @PostConstruct
    public void doRegister() {
        // Runs after the constructor
        DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol);
        DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
                serverMemberManager);
        DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
        // Register the handlers with componentHolder
        componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
        componentHolder.registerDataProcessor(dataProcessor);
        componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
        componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
    }
}

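With the entry point found, let's go through the pieces one by one.

ServerMemberManager

ServerMemberManager was covered in "11. Nacos Config Service Server-Side Source Code Analysis (Part 2)". Its main job is to discover the addresses of the other server nodes, which can be configured either in the configuration file or in a separate cluster.conf file. It also watches that file and updates the server addresses dynamically when the contents change.

DistroProtocol

As the name suggests, DistroProtocol is the core of the Distro protocol, so we will look at this class closely.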

@Component
// Managed and created by Spring
public class DistroProtocol {

    private final ServerMemberManager memberManager;
    private final DistroComponentHolder distroComponentHolder;
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    private volatile boolean isInitialized = false;

    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        // The constructor starts the Distro protocol tasks
        startDistroTask();
    }

    private void startDistroTask() {
        if (EnvUtil.getStandaloneMode()) {
            // Standalone mode does not need the Distro protocol
            isInitialized = true;
            return;
        }
        // Start verification
        startVerifyTask();
        // Start loading
        startLoadTask();
    }

    private void startLoadTask() {
        // Callback: a successful load marks the protocol as initialized, a failure marks it as not initialized
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }

            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        // Submit the data load task; it is submitted only once here
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
    }

    private void startVerifyTask() {
        // Run verification at a fixed interval, 5s by default
        GlobalExecutor.schedulePartitionDataTimedSync(
                new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                        distroTaskEngineHolder.getExecuteWorkersManager()),
                DistroConfig.getInstance().getVerifyIntervalMillis());
    }

    public boolean isInitialized() {
        return isInitialized;
    }

    /**
     * Start a sync using the configured delay, 1s by default.
     */
    public void sync(DistroKey distroKey, DataOperation action) {
        sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
    }

    /**
     * Sync the data to all remote servers.
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }

    /**
     * Sync to one target server.
     */
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

    /**
     * Query data from the specified server.
     */
    public DistroData queryFromRemote(DistroKey distroKey) {
        if (null == distroKey.getTargetServer()) {
            Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
            return null;
        }
        String resourceType = distroKey.getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
            return null;
        }
        return transportAgent.getData(distroKey, distroKey.getTargetServer());
    }

    /**
     * Receive synced distro data and hand it to the matching processor.
     */
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processData(distroData);
    }

    /**
     * Receive verify data and hand it to the matching processor.
     */
    public boolean onVerify(DistroData distroData, String sourceAddress) {
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                    distroData.getDistroKey());
        }
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
            return false;
        }
        return dataProcessor.processVerifyData(distroData, sourceAddress);
    }

    /**
     * Look up data by the input distro key.
     */
    public DistroData onQuery(DistroKey distroKey) {
        String resourceType = distroKey.getResourceType();
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
            return new DistroData(distroKey, new byte[0]);
        }
        return distroDataStorage.getDistroData(distroKey);
    }

    /**
     * Query the full snapshot data.
     */
    public DistroData onSnapshot(String type) {
        DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
        if (null == distroDataStorage) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
            return new DistroData(new DistroKey("snapshot", type), new byte[0]);
        }
        return distroDataStorage.getDatumSnapshot();
    }
}

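As the class shows, its methods are all about receiving, querying, and synchronizing data. Let's first focus on the two operations started by startDistroTask.

startVerifyTask

startVerifyTask() uses a scheduled thread pool to run the run() method of DistroVerifyTimedTask periodically.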

public void run() {
    try {
        // Get the other server nodes, excluding the local one
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            // Verify per type. The type identifies a registered data storage;
            // in 2.2.0 only the gRPC-based v2 client data type is registered
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}

private void verifyForDataStorage(String type, List<Member> targetServer) {
    // Get the data storage for this type
    DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
    if (!dataStorage.isFinishInitial()) {
        Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                dataStorage.getClass().getSimpleName());
        return;
    }
    // Get the data to verify
    List<DistroData> verifyData = dataStorage.getVerifyData();
    if (null == verifyData || verifyData.isEmpty()) {
        return;
    }
    for (Member member : targetServer) {
        DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
        if (null == agent) {
            continue;
        }
        // Hand the task to the execute engine
        executeTaskExecuteEngine.addTask(member.getAddress() + type,
                new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
    }
}

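The verify handler first gets the other nodes (excluding the local one) and then runs verification against them.

During verification it first collects the local node's data and then submits a verify task for every other node.

Let's look at part of getVerifyData() to see what the verify data is.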

public List<DistroData> getVerifyData() {
    List<DistroData> result = null;
    for (String each : clientManager.allClientId()) {
        // Handle every registered client managed by this node
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            // Skip missing clients and non-ephemeral clients
            continue;
        }
        // Only clients this node is responsible for
        if (clientManager.isResponsibleClient(client)) {
            // The data to verify is each client's clientId and revision
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
                    client.getRevision());
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                    ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
            data.setType(DataOperation.VERIFY);
            if (result == null) {
                result = new LinkedList<>();
            }
            result.add(data);
        }
    }
    return result;
}

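The code shows that what gets verified is the clientId and revision of each client connected to this server. Once the data is collected, it is handled via executeTaskExecuteEngine.addTask(member.getAddress() + type, new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type)).

Next, let's look at the run method of DistroVerifyExecuteTask.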

public void run() {
    for (DistroData each : verifyData) {
        try {
            if (transportAgent.supportCallbackTransport()) {
                // Path for agents that support callbacks
                doSyncVerifyDataWithCallback(each);
            } else {
                // Path for agents that do not support callbacks
                doSyncVerifyData(each);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType,
                    targetServer, e);
        }
    }
}

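To keep things simple, only the non-callback path is analyzed here. It ends in the transport agent's syncVerifyData method.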

public boolean syncVerifyData(DistroData verifyData, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        // The local member list does not contain the target server, return directly
        return true;
    }
    // Replace the target server with the local server so that the receiver can call back
    verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
    // Build the DistroDataRequest
    DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}",
                targetServer, verifyData.getDistroKey());
        return false;
    }
    try {
        // RPC request
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! key: {} ", verifyData.getDistroKey(), e);
    }
    return false;
}

In short, DistroVerifyExecuteTask sends the verify DistroData over gRPC to every other server node.
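The verify round trip can be pictured with a small single-process model. This is only a sketch, not Nacos code: VerifySketch, Node, verify, syncClient and verifyAndRepair are hypothetical names, and the only detail taken from the article is that the verify payload is a (clientId, revision) pair. It also assumes that a rejected verification leads the sender to push that client's data again, which is what the ClientVerifyFailedEvent handling in DistroClientDataProcessor suggests.

```java
import java.util.HashMap;
import java.util.Map;

public class VerifySketch {

    /** Hypothetical peer node that remembers the last revision seen per client. */
    static final class Node {
        private final Map<String, Long> revisions = new HashMap<>();

        /** Receiver side: true only if the client is known with the same revision. */
        boolean verify(String clientId, long revision) {
            Long known = revisions.get(clientId);
            return known != null && known == revision;
        }

        /** Receiver side: accept a full sync for one client. */
        void syncClient(String clientId, long revision) {
            revisions.put(clientId, revision);
        }
    }

    /**
     * Sender side: verify every client this node is responsible for against
     * one peer and resync the ones the peer rejects.
     * Returns the number of clients that had to be resynced.
     */
    static int verifyAndRepair(Map<String, Long> responsibleClients, Node peer) {
        int resynced = 0;
        for (Map.Entry<String, Long> entry : responsibleClients.entrySet()) {
            if (!peer.verify(entry.getKey(), entry.getValue())) {
                peer.syncClient(entry.getKey(), entry.getValue());
                resynced++;
            }
        }
        return resynced;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new HashMap<>();
        local.put("client-1", 3L);
        local.put("client-2", 7L);

        Node peer = new Node();
        peer.syncClient("client-1", 3L); // up to date
        peer.syncClient("client-2", 6L); // stale revision

        System.out.println("resynced in round 1: " + verifyAndRepair(local, peer));
        System.out.println("resynced in round 2: " + verifyAndRepair(local, peer));
    }
}
```

Sending only the revision keeps the periodic traffic small; the full client data travels only when a peer reports a mismatch.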

startLoadTask

startLoadTask() runs the run() method of DistroLoadDataTask.

public void run() {
    try {
        // Run the load
        load();
        if (!checkCompleted()) {
            // Not finished yet, retry after a delay
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            // Finished, invoke the callback
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

private void load() throws Exception {
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            // Handle every registered type; here there is only the v2 gRPC client data type
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    for (Member each : memberManager.allMembersWithoutSelf()) {
        long startTime = System.currentTimeMillis();
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            // Fetch the snapshot, then process it
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
                    System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
                    getDistroDataLength(distroData));
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType,
                    each.getAddress(), result);
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial();
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

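The load step pulls a snapshot of the latest data from the other nodes. It asks them one at a time and stops at the first snapshot that is processed successfully; if none succeeds, the task is resubmitted after a delay.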
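The "first peer that works wins" loop in loadAllDataSnapshotFromRemote can be reduced to a few lines. The sketch below is a simplified model under stated assumptions: SnapshotLoadSketch and loadFromFirstHealthyPeer are hypothetical names, and the two functional parameters stand in for the transport agent and the data processor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class SnapshotLoadSketch {

    /**
     * Asks each peer for a snapshot in order and stops at the first one that
     * is both fetched and processed successfully. Returns the address of that
     * peer, or empty if every peer failed (the caller would retry later).
     */
    static Optional<String> loadFromFirstHealthyPeer(List<String> peers,
            Function<String, byte[]> fetchSnapshot, Predicate<byte[]> processSnapshot) {
        for (String peer : peers) {
            try {
                byte[] snapshot = fetchSnapshot.apply(peer);
                if (processSnapshot.test(snapshot)) {
                    return Optional.of(peer);
                }
            } catch (RuntimeException e) {
                // A failing peer is skipped and the next one is tried.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> peers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        Optional<String> source = loadFromFirstHealthyPeer(peers, peer -> {
            if (peer.startsWith("10.0.0.1")) {
                throw new IllegalStateException("peer down");
            }
            return new byte[] {1};
        }, data -> data.length > 0);
        System.out.println("snapshot loaded from: " + source.orElse("nobody, retry later"));
    }
}
```

One successful snapshot is enough in this model, which is why the loop returns immediately instead of contacting the remaining peers.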

DistroComponentHolder

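DistroComponentHolder is fairly simple. It only stores the components that are needed later: the transport agents, data storages, failed-task handlers, and data processors. That is all it contains.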

@Component
// Created by Spring
public class DistroComponentHolder {

    // Transport agents for the different DistroData types
    private final Map<String, DistroTransportAgent> transportAgentMap = new HashMap<>();

    // Data storages for the different DistroData types
    private final Map<String, DistroDataStorage> dataStorageMap = new HashMap<>();

    // Failed-task handlers for the different Distro types
    private final Map<String, DistroFailedTaskHandler> failedTaskHandlerMap = new HashMap<>();

    // Data processors for the different DistroData types
    private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();

    // Register and lookup methods omitted
}

DistroTaskEngineHolder

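DistroTaskEngineHolder holds two kinds of TaskExecuteEngine: DistroExecuteTaskExecuteEngine, which executes immediately, and DistroDelayTaskExecuteEngine, which can delay execution. For details on both, see "5. Nacos Service Registration Server-Side Source Code Analysis (Part 4)".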

@Component
public class DistroTaskEngineHolder implements DisposableBean {

    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();

    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this,
                distroComponentHolder);
        // Set the default processor
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }

    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }

    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }

    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }

    @Override
    public void destroy() throws Exception {
        this.delayTaskExecuteEngine.shutdown();
        this.executeWorkersManager.shutdown();
    }
}

ClientManagerDelegate

ClientManagerDelegate is a delegate: it hands each call to one of ConnectionBasedClientManager, EphemeralIpPortClientManager, or PersistentIpPortClientManager.

@DependsOn({"clientServiceIndexesManager", "namingMetadataManager"})
@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {

    private final ConnectionBasedClientManager connectionBasedClientManager;
    private final EphemeralIpPortClientManager ephemeralIpPortClientManager;
    private final PersistentIpPortClientManager persistentIpPortClientManager;

    // Constructor injection
    public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager,
            EphemeralIpPortClientManager ephemeralIpPortClientManager,
            PersistentIpPortClientManager persistentIpPortClientManager) {
        this.connectionBasedClientManager = connectionBasedClientManager;
        this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
        this.persistentIpPortClientManager = persistentIpPortClientManager;
    }

    @Override
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        // Client connected
        return getClientManagerById(clientId).clientConnected(clientId, attributes);
    }

    @Override
    public boolean clientConnected(Client client) {
        // Client connected
        return getClientManagerById(client.getClientId()).clientConnected(client);
    }

    @Override
    public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
        // Synced client connected
        return getClientManagerById(clientId).syncClientConnected(clientId, attributes);
    }

    @Override
    public boolean clientDisconnected(String clientId) {
        // Client disconnected
        return getClientManagerById(clientId).clientDisconnected(clientId);
    }

    @Override
    public Client getClient(String clientId) {
        // Get the client
        return getClientManagerById(clientId).getClient(clientId);
    }

    @Override
    public boolean contains(String clientId) {
        return connectionBasedClientManager.contains(clientId) || ephemeralIpPortClientManager.contains(clientId)
                || persistentIpPortClientManager.contains(clientId);
    }

    @Override
    public Collection<String> allClientId() {
        Collection<String> result = new HashSet<>();
        result.addAll(connectionBasedClientManager.allClientId());
        result.addAll(ephemeralIpPortClientManager.allClientId());
        result.addAll(persistentIpPortClientManager.allClientId());
        return result;
    }

    @Override
    public boolean isResponsibleClient(Client client) {
        // Check whether this node is responsible for the client
        return getClientManagerById(client.getClientId()).isResponsibleClient(client);
    }

    @Override
    public boolean verifyClient(DistroClientVerifyInfo verifyData) {
        // Verify the client information
        return getClientManagerById(verifyData.getClientId()).verifyClient(verifyData);
    }

    // Pick the ClientManager that matches the client ID
    private ClientManager getClientManagerById(String clientId) {
        if (isConnectionBasedClient(clientId)) {
            return connectionBasedClientManager;
        }
        return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager
                : ephemeralIpPortClientManager;
    }

    private boolean isConnectionBasedClient(String clientId) {
        return !clientId.contains(IpPortBasedClient.ID_DELIMITER);
    }
}
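The routing rule in getClientManagerById depends only on the shape of the client ID, so it can be tried out in isolation. The sketch below is a stand-alone illustration: ClientRoutingSketch, ManagerKind and route are hypothetical names, and the two constant values ("#" for the delimiter and "false" for the persistent suffix) are assumptions, since the article does not show what IpPortBasedClient.ID_DELIMITER and ClientConstants.PERSISTENT_SUFFIX contain. The sample client IDs are made up as well.

```java
public class ClientRoutingSketch {

    enum ManagerKind { CONNECTION_BASED, EPHEMERAL_IP_PORT, PERSISTENT_IP_PORT }

    // Assumed value of IpPortBasedClient.ID_DELIMITER (not shown in the article)
    static final String ID_DELIMITER = "#";

    // Assumed value of ClientConstants.PERSISTENT_SUFFIX (not shown in the article)
    static final String PERSISTENT_SUFFIX = "false";

    /** Same decision order as getClientManagerById: delimiter check first, then suffix check. */
    static ManagerKind route(String clientId) {
        if (!clientId.contains(ID_DELIMITER)) {
            return ManagerKind.CONNECTION_BASED;
        }
        return clientId.endsWith(PERSISTENT_SUFFIX)
                ? ManagerKind.PERSISTENT_IP_PORT
                : ManagerKind.EPHEMERAL_IP_PORT;
    }

    public static void main(String[] args) {
        // Made-up IDs that follow the assumed formats
        System.out.println(route("1700000000000_127.0.0.1_50000"));
        System.out.println(route("127.0.0.1:8080#true"));
        System.out.println(route("127.0.0.1:8080#false"));
    }
}
```

Encoding the client kind in the ID itself lets the delegate stay stateless: no lookup table is needed to find the right manager.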

ClusterRpcClientProxy

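ClusterRpcClientProxy is an RPC proxy between server nodes. Note that it is used only for calls among server nodes, not for the client-to-server or server-to-client requests discussed earlier. In essence, though, it still takes an IP and port and opens a gRPC channel for requests. It extends MemberChangeListener, which in turn extends Subscriber<MembersChangeEvent> and subscribes to MembersChangeEvent, so it reacts to membership changes among the server nodes. Let's look at the logic in detail.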

@Service
// Created by the Spring container
public class ClusterRpcClientProxy extends MemberChangeListener {

    // Default request timeout, 3s
    private static final long DEFAULT_REQUEST_TIME_OUT = 3000L;

    @Autowired
    ServerMemberManager serverMemberManager;

    @PostConstruct
    // Runs after the constructor
    public void init() {
        try {
            // Register with the notify center as a subscriber of the
            // MembersChangeEvent declared in MemberChangeListener
            NotifyCenter.registerSubscriber(this);
            // Get the other server members, excluding the local one
            List<Member> members = serverMemberManager.allMembersWithoutSelf();
            // Refresh the members
            refresh(members);
            Loggers.CLUSTER.warn(
                    "[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ",
                    members);
        } catch (NacosException e) {
            Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());
        }
    }

    private void refresh(List<Member> members) throws NacosException {
        // Make sure a client exists for every new member
        for (Member member : members) {
            if (MemberUtil.isSupportedLongCon(member)) {
                // Create and start the gRPC client for this member
                createRpcClientAndStart(member, ConnectionType.GRPC);
            }
        }
        // Shut down and remove the clients of old members
        Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
        Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
        List<String> newMemberKeys = members.stream().filter(MemberUtil::isSupportedLongCon)
                .map(this::memberClientKey).collect(Collectors.toList());
        while (iterator.hasNext()) {
            Map.Entry<String, RpcClient> next1 = iterator.next();
            if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
                Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
                RpcClientFactory.getClient(next1.getKey()).shutdown();
                iterator.remove();
            }
        }
    }

    private String memberClientKey(Member member) {
        return "Cluster-" + member.getAddress();
    }

    private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
        Map<String, String> labels = new HashMap<>(2);
        labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
        String memberClientKey = memberClientKey(member);
        // Create the gRPC client
        RpcClient client = buildRpcClient(type, labels, memberClientKey);
        if (!client.getConnectionType().equals(type)) {
            Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);
            RpcClientFactory.destroyClient(memberClientKey);
            client = buildRpcClient(type, labels, memberClientKey);
        }
        if (client.isWaitInitiated()) {
            Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);
            // one fixed server
            client.serverListFactory(new ServerListFactory() {
                @Override
                public String genNextServer() {
                    return member.getAddress();
                }

                @Override
                public String getCurrentServer() {
                    return member.getAddress();
                }

                @Override
                public List<String> getServerList() {
                    return CollectionUtils.list(member.getAddress());
                }
            });
            // Start the client
            client.start();
        }
    }

    /**
     * Build the gRPC client.
     */
    private RpcClient buildRpcClient(ConnectionType type, Map<String, String> labels, String memberClientKey) {
        RpcClient clusterClient = RpcClientFactory.createClusterClient(memberClientKey, type,
                EnvUtil.getAvailableProcessors(2), EnvUtil.getAvailableProcessors(8), labels);
        return clusterClient;
    }

    // Methods that send RPC requests are omitted...

    @Override
    public void onEvent(MembersChangeEvent event) {
        try {
            // The server members changed, refresh the member clients
            List<Member> members = serverMemberManager.allMembersWithoutSelf();
            refresh(members);
        } catch (NacosException e) {
            Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event,
                    e.getMessage());
        }
    }

    /**
     * Check whether the client for a member is running.
     */
    public boolean isRunning(Member member) {
        RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
        if (null == client) {
            return false;
        }
        return client.isRunning();
    }
}

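In ClusterRpcClientProxy, whenever the set of server nodes changes, refresh makes sure a gRPC client exists for every current member and shuts down the clients of members that have left.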
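The refresh logic is a reconciliation between "clients we hold" and "members that currently exist". The sketch below models it with an in-memory map. It is only an illustration: ClientRefreshSketch and FakeClient are hypothetical names, FakeClient stands in for RpcClient, and reusing the existing client of a member that is still present is an assumption of this sketch, because the article does not show how RpcClientFactory treats an existing key.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ClientRefreshSketch {

    static final String KEY_PREFIX = "Cluster-";

    /** Stand-in for an RPC client; it only records whether it was shut down. */
    static final class FakeClient {
        boolean running = true;

        void shutdown() {
            running = false;
        }
    }

    private final Map<String, FakeClient> clients = new LinkedHashMap<>();

    /** Creates clients for new members and shuts down clients of departed members. */
    void refresh(Collection<String> memberAddresses) {
        Set<String> wanted = new HashSet<>();
        for (String address : memberAddresses) {
            String key = KEY_PREFIX + address;
            wanted.add(key);
            // Assumption of this sketch: a client that already exists is reused
            clients.computeIfAbsent(key, k -> new FakeClient());
        }
        Iterator<Map.Entry<String, FakeClient>> it = clients.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, FakeClient> entry = it.next();
            if (!wanted.contains(entry.getKey())) {
                entry.getValue().shutdown();
                it.remove();
            }
        }
    }

    FakeClient client(String address) {
        return clients.get(KEY_PREFIX + address);
    }

    public static void main(String[] args) {
        ClientRefreshSketch proxy = new ClientRefreshSketch();
        proxy.refresh(Arrays.asList("10.0.0.2:8848", "10.0.0.3:8848"));
        FakeClient leaving = proxy.client("10.0.0.3:8848");
        proxy.refresh(Arrays.asList("10.0.0.2:8848", "10.0.0.4:8848"));
        System.out.println("client of departed member still running: " + leaving.running);
    }
}
```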

DistroClientDataProcessor

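With the classes from the constructor covered, let's move on to the important classes created in the @PostConstruct method. The first one created is DistroClientDataProcessor. Let's look at its class diagram first.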

[Figure: class diagram of DistroClientDataProcessor]

Judging from the class diagram, it is clearly an event subscriber that handles a particular class of events.

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {

    public static final String TYPE = "Nacos:Naming:v2:ClientData";

    private final ClientManager clientManager;
    private final DistroProtocol distroProtocol;
    private volatile boolean isFinishInitial;

    public DistroClientDataProcessor(ClientManager clientManager, DistroProtocol distroProtocol) {
        this.clientManager = clientManager;
        this.distroProtocol = distroProtocol;
        // Register itself as a subscriber
        NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
    }

    @Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        // Subscribes to three events: client changed, client disconnected, and client verify failed
        result.add(ClientEvent.ClientChangedEvent.class);
        result.add(ClientEvent.ClientDisconnectEvent.class);
        result.add(ClientEvent.ClientVerifyFailedEvent.class);
        return result;
    }

    @Override
    public void onEvent(Event event) {
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            // Sync to the server node on which verification failed
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            // Sync to all server nodes
            syncToAllServer((ClientEvent) event);
        }
    }

    private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
        Client client = clientManager.getClient(event.getClientId());
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // Data that failed verification is synced directly (no delay); it is still
        // wrapped in a task and then sent to the target node over RPC
        distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
    }

    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        // Sync through the protocol: it runs the related tasks internally and then sends the RPC requests
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }

    // Remaining code omitted...
}
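The dispatch rule in onEvent can be condensed into a small lookup: which operation is sent, to whom, and with what delay. The sketch below restates what the code above does, using hypothetical names (EventDispatchSketch, EventType, Scope, Plan, plan). The default delay is passed in as a parameter because in Nacos it comes from DistroConfig.

```java
public class EventDispatchSketch {

    enum EventType { CLIENT_CHANGED, CLIENT_DISCONNECTED, VERIFY_FAILED }

    enum Operation { ADD, CHANGE, DELETE }

    enum Scope { ALL_PEERS, FAILED_PEER_ONLY }

    /** What the processor asks the protocol to do for one event. */
    static final class Plan {
        final Operation operation;
        final Scope scope;
        final long delayMillis;

        Plan(Operation operation, Scope scope, long delayMillis) {
            this.operation = operation;
            this.scope = scope;
            this.delayMillis = delayMillis;
        }
    }

    /** Mirrors onEvent: verify failures go to one peer at once, everything else to all peers after the delay. */
    static Plan plan(EventType type, long defaultDelayMillis) {
        switch (type) {
            case VERIFY_FAILED:
                return new Plan(Operation.ADD, Scope.FAILED_PEER_ONLY, 0L);
            case CLIENT_DISCONNECTED:
                return new Plan(Operation.DELETE, Scope.ALL_PEERS, defaultDelayMillis);
            default:
                return new Plan(Operation.CHANGE, Scope.ALL_PEERS, defaultDelayMillis);
        }
    }

    public static void main(String[] args) {
        Plan p = plan(EventType.VERIFY_FAILED, 1000L);
        System.out.println(p.operation + " -> " + p.scope + " after " + p.delayMillis + " ms");
    }
}
```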

DistroTransportAgent

DistroTransportAgent is the transport agent for the protocol and is mainly responsible for synchronizing data. It is defined as an interface.

public interface DistroTransportAgent {

    /**
     * Whether transporting data with a callback is supported.
     */
    boolean supportCallbackTransport();

    /**
     * Sync data.
     */
    boolean syncData(DistroData data, String targetServer);

    /**
     * Sync data with a callback.
     */
    void syncData(DistroData data, String targetServer, DistroCallback callback);

    /**
     * Sync verify data.
     */
    boolean syncVerifyData(DistroData verifyData, String targetServer);

    /**
     * Sync verify data with a callback.
     */
    void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);

    /**
     * Get data from the target server.
     */
    DistroData getData(DistroKey key, String targetServer);

    /**
     * Get the full snapshot from the target server.
     */
    DistroData getDatumSnapshot(String targetServer);
}

Its implementation is DistroClientTransportAgent. Let's see how the implementation's data sync method works.

@Override
public boolean syncData(DistroData data, String targetServer) {
    // The target server does not exist
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    // Build the request that carries the distro protocol data
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // Look up the member's attributes such as IP and port
    Member member = memberManager.find(targetServer);
    // Check whether the server node is healthy
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}",
                targetServer, data.getDistroKey());
        return false;
    }
    try {
        // The node is healthy, send the gRPC request
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        // Check whether the response indicates success
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}

Synchronizing data simply means sending the protocol data to the other nodes.

DistroClientTaskFailedHandler

DistroClientTaskFailedHandler is fairly simple: it retries failed tasks after a delay.

public class DistroClientTaskFailedHandler implements DistroFailedTaskHandler {

    private final DistroTaskEngineHolder distroTaskEngineHolder;

    public DistroClientTaskFailedHandler(DistroTaskEngineHolder distroTaskEngineHolder) {
        this.distroTaskEngineHolder = distroTaskEngineHolder;
    }

    @Override
    public void retry(DistroKey distroKey, DataOperation action) {
        // Use the configured retry delay
        DistroDelayTask retryTask = new DistroDelayTask(distroKey, action,
                DistroConfig.getInstance().getSyncRetryDelayMillis());
        // Put the task into the delay queue and wait for it to run
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKey, retryTask);
    }
}
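The retry idea, "do not resend immediately, park the task and run it once the delay has passed", can be modeled without threads by passing the current time in explicitly. The sketch below is a simplified stand-in, not the Nacos delay engine: DelayedRetrySketch, retry and pollDue are hypothetical names, and the behavior that a second failure for the same key replaces the pending entry is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DelayedRetrySketch {

    /** Pending tasks: task key mapped to the earliest time the task may run. */
    private final Map<String, Long> pending = new LinkedHashMap<>();

    private final long retryDelayMillis;

    DelayedRetrySketch(long retryDelayMillis) {
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Called when a sync failed: schedule the same key again after the delay. */
    void retry(String key, long nowMillis) {
        // In this sketch a repeated failure for the same key replaces the pending entry
        pending.put(key, nowMillis + retryDelayMillis);
    }

    /** Removes and returns the keys whose delay has elapsed at the given time. */
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                due.add(entry.getKey());
                it.remove();
            }
        }
        return due;
    }

    public static void main(String[] args) {
        DelayedRetrySketch queue = new DelayedRetrySketch(3000L);
        queue.retry("client-1@10.0.0.2:8848", 1000L);
        System.out.println("due at t=2000: " + queue.pollDue(2000L));
        System.out.println("due at t=4000: " + queue.pollDue(4000L));
    }
}
```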

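Summary

Having come this far, we can summarize as follows:

  1. On startup, a node first loads snapshot data so that it can restore the previous state as quickly as possible.
  2. In the background, verify data is sent continuously to the other nodes to keep the data consistent.
  3. When a notification fails, it is retried after a delay.
  4. The maintained list of cluster server addresses can be updated dynamically, and health checks determine whether servers and service instances are online or offline.
  5. The Distro protocol resembles best-effort notification: it keeps sending sync and verify data to keep the data consistent.

Throughout this analysis we have looked at the code from the point of view of a server node acting as a client. How does the server node on the receiving side handle the data? Only after analyzing both the client side and the server side is the loop complete. Also, we have only analyzed the startVerifyTask and startLoadTask methods that DistroProtocol starts from its constructor. The other methods are important too but have not been covered yet. Those are left for the next article.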
