Netconf Clustering (ODL Beryllium)

2024-02-19 18:58
Tags: cluster, netconf, odl, beryllium

This article introduces Netconf clustering in OpenDaylight (ODL) Beryllium and walks through the relevant source code; hopefully it serves as a useful reference for developers tackling the same problem.

Installing the Features

feature:install odl-mdsal-all odl-restconf-all odl-mdsal-clustering odl-netconf-clustered-topology odl-netconf-mdsal odl-netconf-connector-all


While we are at it, here is a quick look at the cluster configuration files (akka.conf and friends).

./configure_cluster.sh 1 192.168.1.1 192.168.1.2 192.168.1.3

The argument 1 means the node running the script is member-1, whose IP is 192.168.1.1.

The script automatically generates akka.conf, module-shards.conf and modules.conf.

bin/custom_shard_config.txt can be used to define additional custom shards.

./configure-cluster-ipdetect.sh 192.168.1.1 192.168.1.2 192.168.1.3

This detection script checks whether the current node's IP is in the seed-node list and assigns the member number according to its position in that list.
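For orientation, here is an abridged sketch of the kind of akka.conf that configure_cluster.sh produces for member-1 with the IPs above. The real generated file contains more settings; treat this only as an illustration of the hostname, seed-nodes and roles fields:

odl-cluster-data {
  akka {
    remote {
      netty.tcp {
        hostname = "192.168.1.1"
        port = 2550
      }
    }
    cluster {
      seed-nodes = ["akka.tcp://opendaylight-cluster-data@192.168.1.1:2550",
                    "akka.tcp://opendaylight-cluster-data@192.168.1.2:2550",
                    "akka.tcp://opendaylight-cluster-data@192.168.1.3:2550"]
      roles = ["member-1"]
    }
  }
}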

==================================================================

After the netconf-topology feature is installed: ClusteredNetconfTopology.java -----> onSessionInitiated

This method is triggered by the config subsystem.


During debugging, the configuration involved here is 02-netconf-topology.xml.


Note that the debug output above is printed from the onSessionInitiated method in NetconfTopologyImpl.java.

After registration succeeds, a CommitStatus is returned (ConfigRegistryImpl.java -------------- commitConfigSafe).


The method creates a TypedActor of BaseTopologyManager.


This actor is constructed with eight parameters, among them:

RoleChangeStrategy ------------- TopologyRoleChangeStrategy

entityType ---------------- topology-netconf

entityName --------------- topology-manager

public final class BaseTopologyManager implements TopologyManager {

In BaseTopologyManager's actor preStart method, the TopologyRoleChangeStrategy that was passed in (newly created) is used to execute registerRoleCandidate.


The entity object here has:

entityType ---------------- topology-netconf

entityName --------------- topology-manager

In TopologyRoleChangeStrategy's registerRoleCandidate, the entity is registered for ownership and the ownership listener is registered.
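As a rough illustration of what that registration amounts to, here is a minimal sketch against the Beryllium EntityOwnershipService API; the method and variable names are illustrative, not copied from TopologyRoleChangeStrategy:

void registerRoleCandidate(EntityOwnershipService ownershipService, EntityOwnershipListener listener) {
    Entity entity = new Entity("topology-netconf", "topology-manager");
    try {
        // writes this member as a candidate for the entity into the entity-ownership shard;
        // the returned registration should be retained so it can be closed later
        EntityOwnershipCandidateRegistration candidateReg = ownershipService.registerCandidate(entity);
    } catch (CandidateAlreadyRegisteredException e) {
        // a candidate was already registered for this entity on this node
    }
    // get notified whenever ownership of "topology-netconf" entities changes
    EntityOwnershipListenerRegistration listenerReg = ownershipService.registerListener("topology-netconf", listener);
}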

For an introduction to Entity Ownership, see:

https://www.sdnlab.com/20046.html

https://www.sdnlab.com/community/article/103


Note that EntityOwnershipService is implemented by DistributedEntityOwnershipService.

In the operational datastore there is an entity-ownership shard.


Which node is master or slave for an Entity is decided by the leader of that shard.


Two important methods here:

DistributedEntityOwnershipService--registerCandidate---executeLocalEntityOwnershipShardOperation

---EntityOwnershipShard-----onRegisterCandidateLocal

At its core, this writes data into the Operational datastore.
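The candidate data ends up under the entity-owners subtree of the operational datastore. Roughly, and only as a simplified sketch of the layout (not the exact YANG paths):

entity-owners
  entity-type "topology-netconf"
    entity "topology-manager"
      candidate member-1
      candidate member-2
      candidate member-3
      owner = member-1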

DistributedEntityOwnershipService--registerListener---executeLocalEntityOwnershipShardOperation

---EntityOwnershipShard-----onRegisterListenerLocal-------EntityOwnershipListenerSupport-----addEntityOwnershipListener

listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());

private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();

<entityType, listener> pairs are stored in entityTypeListenerMap.


private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
A ListenerActorRefEntry is created for each listener:

private <T> void addListener(EntityOwnershipListener listener, T mapKey,
        Multimap<T, EntityOwnershipListener> toListenerMap) {
    if (toListenerMap.put(mapKey, listener)) {
        ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
        if (listenerEntry == null) {
            listenerActorMap.put(listener, new ListenerActorRefEntry());
        } else {
            listenerEntry.referenceCount++;
        }
    }
}

ListenerActorRefEntry uses an EntityOwnershipListenerActor.



When this shard's role changes (RoleChanged), the role-change handling (overridden) for the entities already registered in the shard is triggered, and in turn their ownershipChanged listener callbacks fire, so that applications can react when the cluster master/slave switches over.

Note that:

class EntityOwnershipShard extends Shard {

so the entity-ownership shard can handle all Shard messages, including RoleChanged.

@Override
protected void onStateChanged() {
    super.onStateChanged();

    boolean isLeader = isLeader();
    LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());

    // Examine current RAFT state to see if we are in jeopardy, potentially notifying all listeners
    final boolean inJeopardy = inJeopardy(getRaftState());
    final boolean wasInJeopardy = listenerSupport.setInJeopardy(inJeopardy);
    if (inJeopardy != wasInJeopardy) {
        LOG.debug("{}: {} jeopardy state, notifying all listeners", persistenceId(),
                inJeopardy ? "entered" : "left");
        notifyAllListeners();
    }

    commitCoordinator.onStateChanged(this, isLeader);
}
Here the onStateChanged inherited from Shard is executed, plus this shard's own handling logic; in the end notifyAllListeners is called to trigger the corresponding application actions.


Note that the final notification ends up in EntityOwnershipListenerSupport:

private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
        Collection<EntityOwnershipListener> listeners) {
    EntityOwnershipChange changed = new EntityOwnershipChange(entity, wasOwner, isOwner, hasOwner, inJeopardy);
    for (EntityOwnershipListener listener : listeners) {
        ActorRef listenerActor = listenerActorFor(listener);
        LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
        listenerActor.tell(changed, ActorRef.noSender());
    }
}

The listeners iterated over here are the ones retrieved from the entityTypeListenerMap described above.
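From the application's point of view all of this surfaces as a single ownershipChanged callback. Here is a hedged, self-contained sketch of such a listener (the class name is hypothetical; TopologyRoleChangeStrategy plays this role in the netconf topology code):

import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleOwnershipListener implements EntityOwnershipListener {
    private static final Logger LOG = LoggerFactory.getLogger(ExampleOwnershipListener.class);

    @Override
    public void ownershipChanged(final EntityOwnershipChange change) {
        LOG.info("Ownership change for {}: wasOwner={}, isOwner={}, hasOwner={}",
                change.getEntity(), change.wasOwner(), change.isOwner(), change.hasOwner());
        if (change.isOwner()) {
            // this member just became the owner -> take over master duties
        } else if (change.wasOwner()) {
            // this member lost ownership -> step down to slave duties
        }
    }
}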

===================================================================

TopologyRoleChangeStrategy.java


When Netconf's onRoleChanged is triggered: on promotion to master, a data-change notification for topology-netconf is registered.

(There are also scenarios where the application needs every node in the cluster to receive data-change notifications. For that, ODL provides ClusteredDataChangeListener and ClusteredDataTreeChangeListener; these two kinds of listeners receive the data-change notifications issued by the master on all nodes.)

On demotion to slave, the topology-netconf data-change notification is unregistered.
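A hedged sketch of what registering that data-change notification can look like with the Beryllium binding APIs; the method name, path and variables here are illustrative, not copied from TopologyRoleChangeStrategy:

void registerTopologyListener(DataTreeChangeService dataTreeChangeService, DataTreeChangeListener<Topology> listener) {
    // listen on the topology-netconf subtree of the CONFIG datastore
    InstanceIdentifier<Topology> topologyPath = InstanceIdentifier
            .create(NetworkTopology.class)
            .child(Topology.class, new TopologyKey(new TopologyId("topology-netconf")));

    ListenerRegistration<DataTreeChangeListener<Topology>> reg = dataTreeChangeService.registerDataTreeChangeListener(
            new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, topologyPath), listener);
}

A listener that implements ClusteredDataTreeChangeListener instead of the plain DataTreeChangeListener would, as noted above, be notified on every cluster member.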


From then on, whenever node data is written, the onDataTreeChanged callback in TopologyRoleChangeStrategy fires.



Finally, the NodeListener's onRoleChanged is executed.

The NodeListener is the one passed to registerRoleCandidate; it is registered in BaseTopologyManager's preStart, and what gets registered is the manager's own actor.
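A hedged sketch of the idea in that preStart registration (the shape of it, not verbatim BaseTopologyManager source):

@Override
public void preStart() {
    // the TypedActor registers itself (as the NodeListener) as the election candidate
    roleChangeStrategy.registerRoleCandidate((NodeListener) TypedActor.self());
}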


In the BaseTopologyManager code we can see that isMaster gets special handling, and then onRoleChanged of delegateTopologyHandler (NetconfTopologyManagerCallback implements TopologyManagerCallback) is executed.


Since the master has registered for change notifications on the specified datastore subtree, node operations now flow into the corresponding node-create / node-update / node-delete handling, again in the corresponding methods of BaseTopologyManager (onNodeCreated shown below):

@Override
public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
    LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);

    if (created.containsKey(nodeId)) {
        LOG.warn("Node{} already exists, triggering update..", nodeId);
        return onNodeUpdated(nodeId, node);
    }
    created.put(nodeId, node);
    final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();

    if (isMaster) {
        // operation on the master
        futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
        // only master should call connect on peers and aggregate futures
        for (TopologyManager topologyManager : peers.values()) {
            // the other (peer) nodes must be iterated over
            // convert binding into NormalizedNode for transfer
            final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry =
                    codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
            LOG.debug("start update node {} to peers:{} ", nodeId, typedExtension.getActorRefFor(topologyManager).path());

            // add a future into our futures that gets its completion status from the converted scala future
            final SettableFuture<Node> settableFuture = SettableFuture.create();
            futures.add(settableFuture);
            // run the create logic on the other nodes as well
            final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(
                    new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
            scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
                @Override
                public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
                    if (failure != null) {
                        settableFuture.setException(failure);
                        return;
                    }
                    final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
                            codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
                    final Node value = (Node) fromNormalizedNode.getValue();
                    LOG.debug("update node {} success in peers {}", nodeId, typedExtension.getActorRefFor(topologyManager).path());
                    settableFuture.set(value);
                }
            }, TypedActor.context().dispatcher());
        }

        // note: the node's state also needs updating, since it carries the slaves' connection info,
        // hence the status aggregation logic here
        final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
        Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
            @Override
            public void onSuccess(final Node result) {
                LOG.debug("Futures aggregated succesfully");
                naSalNodeWriter.init(nodeId, result);
            }

            @Override
            public void onFailure(final Throwable t) {
                // If the combined connection attempt failed, set the node to connection failed
                LOG.debug("Futures aggregation failed");
                naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
                // FIXME disconnect those which succeeded
                // just issue a delete on delegateTopologyHandler that gets handled on lower level
            }
        }, TypedActor.context().dispatcher());

        // combine peer futures
        return aggregatedFuture;
    }

    // trigger create on this slave
    return delegateTopologyHandler.onNodeCreated(nodeId, node);
}

Next, what does the aggregation actually do? (create, update and delete all work on the same principle)

NetconfNodeOperationalDataAggregator -------------> combineCreateAttempts


See the comments in the code.
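In case the commented source is not at hand, here is a simplified, hypothetical sketch of what such an aggregation boils down to: wait for all per-peer futures and reduce their results into one operational Node. The real NetconfNodeOperationalDataAggregator additionally merges the per-peer connection statuses; this is only the skeleton of the idea:

ListenableFuture<Node> combineCreateAttempts(final List<ListenableFuture<Node>> futures) {
    final SettableFuture<Node> aggregated = SettableFuture.create();
    Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Node>>() {
        @Override
        public void onSuccess(final List<Node> results) {
            // reduce the per-peer results into a single aggregated Node
            // (in the real aggregator this is where connection statuses are combined)
            aggregated.set(results.get(0));
        }

        @Override
        public void onFailure(final Throwable t) {
            aggregated.setException(t);
        }
    });
    return aggregated;
}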


As we can see, once the aggregation result completes, the data write begins.


To log this operation, execution first passes through LoggingSalNodeWriter and then enters TopologyNodeWriter.

public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
    lock.lock();
    LOG.debug("TopologyNodeWriter start to update {} with {}.", id, operationalDataNode);
    try {
        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
        final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
        LOG.debug("{}: Update device state transaction {} merging operational data started. Putting data on path {}",
                id, writeTx.getIdentifier(), operationalDataNode);
        writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
        LOG.debug("{}: Update device state transaction {} merging operational data ended.",
                id, writeTx.getIdentifier());
        commitTransaction(writeTx, "update", id);
    } finally {
        lock.unlock();
    }
}

SalNodeWriter

public void update(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
    LOG.debug("SalNodeWriter start to update {} with {}.", id, operationalDataNode);
    // merge
    final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
    wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
    commitTransaction(wTx, id, "update");
}

Now take a closer look at LoggingSalNodeWriter.


NodeWriter


Look at where the writer is used:


The writer is then wrapped inside LoggingSalNodeWriter.
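LoggingSalNodeWriter is essentially a logging decorator around one or more NodeWriter delegates. A hedged sketch of the idea, imports omitted (not the verbatim class):

public class LoggingSalNodeWriter implements NodeWriter {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSalNodeWriter.class);
    private final List<NodeWriter> delegates;

    public LoggingSalNodeWriter(final NodeWriter... delegates) {
        this.delegates = Arrays.asList(delegates);
    }

    @Override
    public void update(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
        // log the operation, then forward it to every wrapped writer
        LOG.warn("Updating node {} with {}", id, operationalDataNode);
        for (final NodeWriter delegate : delegates) {
            delegate.update(id, operationalDataNode);
        }
    }

    // init(...) and delete(...) follow the same pattern
}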

========================

NetconfNodeManagerCallback.java------------->onDeviceConnected

BaseTopologyManager.java-------------->notifyNodeStatusChange


BaseTopologyManager.java------>

@Override
public void notifyNodeStatusChange(final NodeId nodeId) {
    LOG.debug("Connection status has changed on node {}, isMaster {}", nodeId.getValue(), isMaster);
    if (isMaster) {
        // grab status from all peers and aggregate
        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
        futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
        ...

NetconfTopologyManagerCallback.java-------->getCurrentStatusForNode   

This class stores:

private final Map<NodeId, NodeManager> nodes = new HashMap<>();


keyed by NodeId, with BaseNodeManager instances as values.
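With that map in place, getCurrentStatusForNode is little more than a lookup and a delegation; a hedged sketch of the shape of it:

@Override
public ListenableFuture<Node> getCurrentStatusForNode(final NodeId nodeId) {
    // delegate to the per-node BaseNodeManager that owns this NodeId
    return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
}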



That concludes this walkthrough of Netconf clustering in ODL Beryllium; hopefully it is a useful reference.



