Netconf Clustering (ODL Beryllium)

2024-02-19 18:58
Tags: cluster netconf odl beryllium

This article walks through Netconf clustering in ODL Beryllium and is meant as a reference for developers digging into the related code.

Installing the features

feature:install oscp-mdsal-all oscp-restconf-all oscp-mdsal-clustering oscp-netconf-clustered-topology oscp-netconf-mdsal oscp-netconf-connector-all


As an aside, here is a quick look at how the cluster configuration (akka.conf) is generated.

./configure_cluster.sh 1 192.168.1.1 192.168.1.2 192.168.1.3

The "1" means the node running the script is member-1, whose IP is 192.168.1.1.

The script automatically generates akka.conf, module-shards.conf and modules.conf.

bin/custom_shard_config.txt can be used to define additional custom shards.

./configure-cluster-ipdetect.sh 192.168.1.1 192.168.1.2 192.168.1.3

This detection script checks whether the current node's IP appears in the seed-node list and sets the member number according to its position in that list.
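For reference, the akka.conf generated for member-1 looks roughly like the following. This is a hedged sketch of the Beryllium layout; the real file also contains an odl-cluster-rpc section, and exact keys and ports may differ in your distribution:

    odl-cluster-data {
      akka {
        remote {
          netty.tcp {
            hostname = "192.168.1.1"
            port = 2550
          }
        }
        cluster {
          seed-nodes = ["akka.tcp://opendaylight-cluster-data@192.168.1.1:2550",
                        "akka.tcp://opendaylight-cluster-data@192.168.1.2:2550",
                        "akka.tcp://opendaylight-cluster-data@192.168.1.3:2550"]
          roles = ["member-1"]
        }
      }
    }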

==================================================================

Once the clustered netconf-topology feature is installed, ClusteredNetconfTopology.java -----> onSessionInitiated runs.

This method is triggered by the config subsystem.


During debugging, the configuration in play here is 02-netconf-topology.xml.


Note that the debug output above is printed from the onSessionInitiated method in NetconfTopologyImpl.java.

After registration succeeds, a CommitStatus is returned (ConfigRegistryImpl.java -------------- commitConfigSafe).


The method creates a TypedActor backed by BaseTopologyManager.


This actor is constructed with eight parameters, among them:

RoleChangeStrategy-------------TopologyRoleChangeStrategy
entityType----------------topology-netconf

entityName---------------topology-manager

public final class BaseTopologyManager implements TopologyManager {
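As a rough illustration (not the verbatim ODL call site), creating such an Akka TypedActor looks like the sketch below; the actorSystem variable, the actor name and the elided argument list are placeholders for the eight real parameters:

    // Hedged sketch: how a TypedActor proxying BaseTopologyManager is typically created.
    final TopologyManager topologyManager = TypedActor.get(actorSystem).typedActorOf(
            new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
                @Override
                public BaseTopologyManager create() throws Exception {
                    // the real code passes the eight constructor arguments listed above
                    return new BaseTopologyManager(/* ... */);
                }
            }),
            "topology-netconf");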

In BaseTopologyManager's preStart method (the actor lifecycle hook), the TopologyRoleChangeStrategy that was passed in (newly constructed) is used


to call registerRoleCandidate.


The Entity object involved:

entityType----------------topology-netconf

entityName---------------topology-manager

In TopologyRoleChangeStrategy's registerRoleCandidate, both the ownership candidate registration for the entity and the listener registration are performed.
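A minimal sketch of what these two registrations look like against the Beryllium EntityOwnershipService API; the entityOwnershipService and ownershipListener variables are illustrative, while the type and name values are the ones listed above:

    // Hedged sketch of candidate + listener registration
    // (org.opendaylight.controller.md.sal.common.api.clustering in Beryllium).
    Entity entity = new Entity("topology-netconf", "topology-manager");

    try {
        // register this node as an ownership candidate for the entity
        EntityOwnershipCandidateRegistration candidateRegistration =
                entityOwnershipService.registerCandidate(entity);
    } catch (CandidateAlreadyRegisteredException e) {
        // this node already registered a candidate for the entity
    }

    // register a listener that is notified whenever ownership of this entity type changes
    EntityOwnershipListenerRegistration listenerRegistration =
            entityOwnershipService.registerListener("topology-netconf", ownershipListener);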

For an introduction to Entity Ownership, see:

https://www.sdnlab.com/20046.html

https://www.sdnlab.com/community/article/103


Note that EntityOwnershipService is implemented by DistributedEntityOwnershipService.

In the operational datastore there is a dedicated entity-ownership shard.


The master/backup ownership of an entity is decided by the leader of this shard.


Two important methods deserve a closer look:

DistributedEntityOwnershipService--registerCandidate---executeLocalEntityOwnershipShardOperation

---EntityOwnershipShard-----onRegisterCandidateLocal

The core action is writing the candidate data into the operational datastore.

DistributedEntityOwnershipService--registerListener---executeLocalEntityOwnershipShardOperation

---EntityOwnershipShard-----onRegisterListenerLocal-------EntityOwnershipListenerSupport-----addEntityOwnershipListener

listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());

private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();

The <entityType, listener> pairs are stored in entityTypeListenerMap.


private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
A ListenerActorRefEntry is created per listener:

    private <T> void addListener(EntityOwnershipListener listener, T mapKey,
            Multimap<T, EntityOwnershipListener> toListenerMap) {
        if (toListenerMap.put(mapKey, listener)) {
            ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
            if (listenerEntry == null) {
                listenerActorMap.put(listener, new ListenerActorRefEntry());
            } else {
                listenerEntry.referenceCount++;
            }
        }
    }

ListenerActorRefEntry is backed by an EntityOwnershipListenerActor.



When this shard's role changes (RoleChanged), the overridden role-change handling runs for the entities registered in the shard, and their ownershipChanged listeners are triggered as well, which gives applications a hook to react to cluster master/backup switchover.
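From the application's point of view, reacting to such a switchover boils down to implementing ownershipChanged. A minimal hedged sketch (the strategy class in this walkthrough does considerably more):

    // Hedged sketch of an EntityOwnershipListener reacting to master/backup switchover.
    public class ExampleOwnershipListener implements EntityOwnershipListener {
        @Override
        public void ownershipChanged(EntityOwnershipChange change) {
            if (change.isOwner()) {
                // this node just became the owner (master) for the entity: take over
            } else if (change.wasOwner()) {
                // this node lost ownership: release master-only resources
            }
        }
    }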

Note that

class EntityOwnershipShard extends Shard {

so the entity-ownership shard can handle all Shard messages, including RoleChanged.

    @Override
    protected void onStateChanged() {
        super.onStateChanged();

        boolean isLeader = isLeader();
        LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());

        // Examine current RAFT state to see if we are in jeopardy, potentially notifying all listeners
        final boolean inJeopardy = inJeopardy(getRaftState());
        final boolean wasInJeopardy = listenerSupport.setInJeopardy(inJeopardy);
        if (inJeopardy != wasInJeopardy) {
            LOG.debug("{}: {} jeopardy state, notifying all listeners", persistenceId(),
                    inJeopardy ? "entered" : "left");
            notifyAllListeners();
        }

        commitCoordinator.onStateChanged(this, isLeader);
    }
Here the inherited Shard onStateChanged is executed first, followed by this shard's own handling; ultimately notifyAllListeners is called to trigger the corresponding application actions.


Note that the final notification ends up in EntityOwnershipListenerSupport:

    private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
            Collection<EntityOwnershipListener> listeners) {
        EntityOwnershipChange changed = new EntityOwnershipChange(entity, wasOwner, isOwner, hasOwner, inJeopardy);
        for (EntityOwnershipListener listener : listeners) {
            ActorRef listenerActor = listenerActorFor(listener);

            LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);

            listenerActor.tell(changed, ActorRef.noSender());
        }
    }

The listeners iterated over here are the ones fetched from the entityTypeListenerMap described above.

===================================================================

TopologyRoleChangeStrategy.java


When Netconf's onRoleChanged is triggered: on promotion to master, a data change listener is registered on the topology-netconf datastore subtree.

(There are also scenarios where the application needs every node in the cluster to receive data change notifications; for this ODL provides ClusteredDataChangeListener and ClusteredDataTreeChangeListener, both of which receive the change notifications emitted by the leader on every node. A sketch follows below.)
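A hedged sketch of registering such a clustered listener; the dataBroker variable is assumed to be the controller binding DataBroker, and the path is illustrative:

    // Hedged sketch: a ClusteredDataTreeChangeListener receives change notifications
    // on every cluster member, not only on the shard leader.
    public class ExampleTopologyListener implements ClusteredDataTreeChangeListener<Node> {
        @Override
        public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
            // runs on all members whenever topology-netconf nodes change
        }
    }

    InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(NetworkTopology.class)
            .child(Topology.class, new TopologyKey(new TopologyId("topology-netconf")))
            .child(Node.class);

    ListenerRegistration<?> registration = dataBroker.registerDataTreeChangeListener(
            new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, nodePath),
            new ExampleTopologyListener());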

On demotion to backup, the topology-netconf data change listener is unregistered.


From this point on, whenever node data is written, the onDataTreeChanged callback in TopologyRoleChangeStrategy fires.



Finally, the NodeListener's onRoleChanged is executed.

The NodeListener is the one handed to registerRoleCandidate; it was registered in BaseTopologyManager's preStart, and it is the manager's own actor.


In the BaseTopologyManager code you can see that the isMaster case gets special handling,

and then onRoleChanged is invoked on delegateTopologyHandler (NetconfTopologyManagerCallback implements TopologyManagerCallback).


Since the master has already registered the data change notification on the designated datastore subtree, node operations then flow into

the corresponding node-create / node-update / node-delete handling,

which still lands in the corresponding BaseTopologyManager method (onNodeCreated in this case):

    @Override
    public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
        LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);

        if (created.containsKey(nodeId)) {
            LOG.warn("Node{} already exists, triggering update..", nodeId);
            return onNodeUpdated(nodeId, node);
        }
        created.put(nodeId, node);
        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();

        if (isMaster) {
            // operations performed on the master
            futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));

            // only master should call connect on peers and aggregate futures
            for (TopologyManager topologyManager : peers.values()) {
                // the master has to iterate over the other (peer) nodes
                // convert binding into NormalizedNode for transfer
                final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry =
                        codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);

                LOG.debug("start update node {} to peers:{} ", nodeId, typedExtension.getActorRefFor(topologyManager).path());

                // add a future into our futures that gets its completion status from the converted scala future
                final SettableFuture<Node> settableFuture = SettableFuture.create();
                futures.add(settableFuture);

                // run the create logic on the peer nodes as well
                final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(
                        new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));

                scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
                    @Override
                    public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
                        if (failure != null) {
                            settableFuture.setException(failure);
                            return;
                        }
                        final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
                                codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
                        final Node value = (Node) fromNormalizedNode.getValue();

                        LOG.debug("update node {} success in peers {}", nodeId, typedExtension.getActorRefFor(topologyManager).path());

                        settableFuture.set(value);
                    }
                }, TypedActor.context().dispatcher());
            }

            // Note: the node's state also has to be updated, and since it includes
            // connection information from the peers, a status aggregation step happens here.
            final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
            Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
                @Override
                public void onSuccess(final Node result) {
                    LOG.debug("Futures aggregated succesfully");
                    naSalNodeWriter.init(nodeId, result);
                }

                @Override
                public void onFailure(final Throwable t) {
                    // If the combined connection attempt failed, set the node to connection failed
                    LOG.debug("Futures aggregation failed");
                    naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
                    // FIXME disconnect those which succeeded
                    // just issue a delete on delegateTopologyHandler that gets handled on lower level
                }
            }, TypedActor.context().dispatcher());

            // combine peer futures
            return aggregatedFuture;
        }

        // trigger create on this slave
        return delegateTopologyHandler.onNodeCreated(nodeId, node);
    }

Next, let's look at what the aggregation actually does (create, update and delete all follow the same principle).

NetconfNodeOperationalDataAggregator -------------> combineCreateAttempts


See the in-line annotations.
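Since the annotated source is not reproduced here, the gist can be sketched as follows. This is a simplified reconstruction, not the verbatim ODL code; mergeStatus is a hypothetical helper standing in for the real status-merging logic:

    // Hedged sketch of combineCreateAttempts: wait for all per-peer futures and
    // reduce them into a single operational Node whose connection status reflects
    // the worst result seen across the cluster.
    public ListenableFuture<Node> combineCreateAttempts(final List<ListenableFuture<Node>> futures) {
        final SettableFuture<Node> aggregated = SettableFuture.create();
        Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Node>>() {
            @Override
            public void onSuccess(final List<Node> nodes) {
                // mergeStatus is hypothetical: it folds the per-peer statuses into one Node
                aggregated.set(mergeStatus(nodes));
            }

            @Override
            public void onFailure(final Throwable t) {
                aggregated.setException(t);
            }
        });
        return aggregated;
    }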


You can see that once the aggregated result completes, the data write begins.


To record this operation, the call first goes through LoggingSalNodeWriter and then into TopologyNodeWriter:

public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
    lock.lock();
    LOG.debug("TopologyNodeWriter start to update {} with {}.", id, operationalDataNode);
    try {
        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
        final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);

        LOG.debug("{}: Update device state transaction {} merging operational data started. Putting data on path {}",
                id, writeTx.getIdentifier(), operationalDataNode);
        writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
        LOG.debug("{}: Update device state transaction {} merging operational data ended.",
                id, writeTx.getIdentifier());

        commitTransaction(writeTx, "update", id);
    } finally {
        lock.unlock();
    }
}

SalNodeWriter

public void update(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
    LOG.debug("SalNodeWriter start to update {} with {}.", id, operationalDataNode);
    // merge
    final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
    wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
    commitTransaction(wTx, id, "update");
}

Take a closer look at LoggingSalNodeWriter.
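The class itself is a small decorator. A hedged reconstruction of its shape, assuming the NodeWriter interface exposes init/update/delete as used elsewhere in this walkthrough (only update is fleshed out here):

    // Hedged sketch: LoggingSalNodeWriter logs every call and then forwards it to
    // each wrapped NodeWriter delegate.
    public class LoggingSalNodeWriter implements NodeWriter {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSalNodeWriter.class);

        private final List<NodeWriter> delegates;

        public LoggingSalNodeWriter(final NodeWriter... delegates) {
            this.delegates = Arrays.asList(delegates);
        }

        @Override
        public void init(final NodeId id, final Node operationalDataNode) {
            LOG.warn("Initializing node: {} with data: {}", id.getValue(), operationalDataNode);
            for (final NodeWriter delegate : delegates) {
                delegate.init(id, operationalDataNode);
            }
        }

        @Override
        public void update(final NodeId id, final Node operationalDataNode) {
            LOG.warn("Updating node: {} with data: {}", id.getValue(), operationalDataNode);
            for (final NodeWriter delegate : delegates) {
                delegate.update(id, operationalDataNode);
            }
        }

        @Override
        public void delete(final NodeId id) {
            LOG.warn("Deleting node: {}", id.getValue());
            for (final NodeWriter delegate : delegates) {
                delegate.delete(id);
            }
        }
    }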


NodeWriter


Look at where the writers are constructed and used.


The writer is then wrapped in a LoggingSalNodeWriter.

========================

NetconfNodeManagerCallback.java------------->onDeviceConnected

BaseTopologyManager.java-------------->notifyNodeStatusChange


BaseTopologyManager.java------>

@Override
public void notifyNodeStatusChange(final NodeId nodeId) {
    LOG.debug("Connection status has changed on node {}, isMaster {}", nodeId.getValue(), isMaster);
    if (isMaster) {
        // grab status from all peers and aggregate
        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
        futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
        ...

NetconfTopologyManagerCallback.java-------->getCurrentStatusForNode   

This class stores

private final Map<NodeId, NodeManager> nodes = new HashMap<>();


keyed by NodeId, with BaseNodeManager as the value.
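So getCurrentStatusForNode presumably just looks the manager up and delegates. A hedged sketch (not the verbatim ODL source):

    // Hedged sketch: delegate the status query to the BaseNodeManager
    // registered for this nodeId in the map above.
    @Override
    public ListenableFuture<Node> getCurrentStatusForNode(final NodeId nodeId) {
        return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
    }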



That concludes this walkthrough of Netconf clustering in ODL Beryllium.



