This article walks through the Netconf cluster implementation in OpenDaylight Beryllium (ODL-beryllium), following the relevant source code; hopefully it serves as a useful reference.
Install the features
feature:install odl-mdsal-all odl-restconf-all odl-mdsal-clustering odl-netconf-clustered-topology odl-netconf-mdsal odl-netconf-connector-all
This is also a good place to briefly cover the cluster configuration files such as akka.conf.
./configure_cluster.sh 1 192.168.1.1 192.168.1.2 192.168.1.3
The first argument, 1, means the node running the script is member-1, whose IP is 192.168.1.1.
The script automatically generates akka.conf, module-shards.conf and modules.conf.
bin/custom_shard_config.txt can be used to define additional custom shards.
./configure-cluster-ipdetect.sh 192.168.1.1 192.168.1.2 192.168.1.3
This variant is a detection script: it checks whether the current node's IP appears in the seed-node list and sets the member number according to its position in that list.
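For orientation, the cluster part of the generated akka.conf contains this member's role and the seed-node list. A simplified fragment for member-1 (the exact contents depend on the ODL version and on what the script writes) would look roughly like this:

odl-cluster-data {
  akka {
    remote {
      netty.tcp {
        hostname = "192.168.1.1"
        port = 2550
      }
    }
    cluster {
      seed-nodes = ["akka.tcp://opendaylight-cluster-data@192.168.1.1:2550",
                    "akka.tcp://opendaylight-cluster-data@192.168.1.2:2550",
                    "akka.tcp://opendaylight-cluster-data@192.168.1.3:2550"]
      roles = ["member-1"]
    }
  }
}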
==================================================================
After the netconf-topology feature is installed: ClusteredNetconfTopology.java -----> onSessionInitiated
This method is triggered by the config subsystem.
In this debugging session the configuration in question is 02-netconf-topology.xml.
Note that the debug output above is printed from the onSessionInitiated method of NetconfTopologyImpl.java.
After a successful registration a CommitStatus is returned (ConfigRegistryImpl.java -------------- commitConfigSafe).
In this flow a TypedActor for BaseTopologyManager is created.
The actor is constructed with 8 arguments, among them:
RoleChangeStrategy-------------TopologyRoleChangeStrategy
entityType----------------topology-netconf
entityName---------------topology-manager
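For context, such a TypedActor is created through Akka's TypedActor extension. A rough sketch of the creation call (the constructor argument list is elided here; the actual call lives in the clustered netconf topology code):

// Rough sketch of creating BaseTopologyManager as an Akka TypedActor
// (its 8 constructor arguments, including the TopologyRoleChangeStrategy,
// entityType and entityName listed above, are elided).
final TypedActorExtension typedExtension = TypedActor.get(actorSystem);
final TopologyManager topologyManager = typedExtension.typedActorOf(
        new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
            @Override
            public BaseTopologyManager create() throws Exception {
                return new BaseTopologyManager(/* 8 arguments elided */);
            }
        }), "topology-netconf");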
public final class BaseTopologyManager implements TopologyManager {
In BaseTopologyManager's actor preStart method, the TopologyRoleChangeStrategy that was passed in (constructed above)
is used to call registerRoleCandidate.
The entity involved is:
entityType----------------topology-netconf
entityName---------------topology-manager
In TopologyRoleChangeStrategy.registerRoleCandidate, the entity is registered for ownership and an ownership listener is registered.
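In terms of the EntityOwnershipService API (org.opendaylight.controller.md.sal.common.api.clustering), the two registrations amount roughly to the following sketch (field names are illustrative, not the exact TopologyRoleChangeStrategy body):

// Sketch of the two registrations performed in registerRoleCandidate.
final Entity entity = new Entity("topology-netconf", "topology-manager");
try {
    // 1. register this node as an ownership candidate for the entity
    candidateRegistration = entityOwnershipService.registerCandidate(entity);
} catch (CandidateAlreadyRegisteredException e) {
    LOG.error("Candidate already registered for {}", entity, e);
}
// 2. register a listener that is notified whenever ownership of this entity type changes
listenerRegistration = entityOwnershipService.registerListener("topology-netconf", this);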
For an introduction to Entity Ownership, see:
https://www.sdnlab.com/20046.html
https://www.sdnlab.com/community/article/103
Note that EntityOwnershipService is implemented by DistributedEntityOwnershipService.
In the operational datastore there is a dedicated entity-ownership shard,
and an entity's master/slave role is decided by the leader/follower role of that shard.
Two important methods:
DistributedEntityOwnershipService--registerCandidate---executeLocalEntityOwnershipShardOperation
---EntityOwnershipShard-----onRegisterCandidateLocal
The core of this is writing data into the operational datastore.
DistributedEntityOwnershipService--registerListener---executeLocalEntityOwnershipShardOperation
---EntityOwnershipShard-----onRegisterListenerLocal-------EntityOwnershipListenerSupport-----addEntityOwnershipListener
listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
The <entityType, listener> pairs are stored in entityTypeListenerMap.
private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
A ListenerActorRefEntry is created for each listener:
private <T> void addListener(EntityOwnershipListener listener, T mapKey,
        Multimap<T, EntityOwnershipListener> toListenerMap) {
    if (toListenerMap.put(mapKey, listener)) {
        ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
        if (listenerEntry == null) {
            listenerActorMap.put(listener, new ListenerActorRefEntry());
        } else {
            listenerEntry.referenceCount++;
        }
    }
}
Each ListenerActorRefEntry uses an EntityOwnershipListenerActor.
When the shard's role changes (RoleChanged), it triggers the (overridden) role-change handling for the entities registered in the shard and fires their ownershipChanged notifications, so that applications can react when the cluster master/slave role switches over.
Note that
class EntityOwnershipShard extends Shard {
so the entity-ownership shard can handle all Shard messages, including RoleChanged.
@Override
protected void onStateChanged() {
    super.onStateChanged();

    boolean isLeader = isLeader();
    LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());

    // Examine current RAFT state to see if we are in jeopardy, potentially notifying all listeners
    final boolean inJeopardy = inJeopardy(getRaftState());
    final boolean wasInJeopardy = listenerSupport.setInJeopardy(inJeopardy);
    if (inJeopardy != wasInJeopardy) {
        LOG.debug("{}: {} jeopardy state, notifying all listeners", persistenceId(),
                inJeopardy ? "entered" : "left");
        notifyAllListeners();
    }

    commitCoordinator.onStateChanged(this, isLeader);
}
Here the inherited Shard onStateChanged is executed first, followed by this shard's own handling; ultimately notifyAllListeners is called to trigger the corresponding application-side handling.
Note that the notification finally arrives at EntityOwnershipListenerSupport:
private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
        Collection<EntityOwnershipListener> listeners) {
    EntityOwnershipChange changed = new EntityOwnershipChange(entity, wasOwner, isOwner, hasOwner, inJeopardy);
    for (EntityOwnershipListener listener : listeners) {
        ActorRef listenerActor = listenerActorFor(listener);
        LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
        listenerActor.tell(changed, ActorRef.noSender());
    }
}
The listeners iterated over here are the ones obtained from the entityTypeListenerMap mentioned above.
===================================================================
TopologyRoleChangeStrategy.java
When Netconf's onRoleChanged fires and the node is promoted to master, it registers for data change notifications on topology-netconf in the datastore.
(There are also cases where the application needs every node in the cluster to receive data change notifications; for this ODL provides ClusteredDataChangeListener and ClusteredDataTreeChangeListener, which receive the data change notifications on all nodes, not only on the shard leader.)
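As an illustration of such a clustered listener (class and field names here are hypothetical, not part of the netconf topology code):

// Illustrative sketch: a listener that receives topology-netconf changes on every
// cluster node, not only on the shard leader.
public class ExampleTopologyListener implements ClusteredDataTreeChangeListener<Topology> {

    private ListenerRegistration<ExampleTopologyListener> registration;

    public void register(final DataBroker dataBroker) {
        final InstanceIdentifier<Topology> path = InstanceIdentifier.create(NetworkTopology.class)
                .child(Topology.class, new TopologyKey(new TopologyId("topology-netconf")));
        registration = dataBroker.registerDataTreeChangeListener(
                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, path), this);
    }

    @Override
    public void onDataTreeChanged(final Collection<DataTreeModification<Topology>> changes) {
        // every node in the cluster receives these notifications
    }
}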
On demotion to slave, the topology-netconf data change registration is removed.
From then on, whenever node data is written, the onDataTreeChanged callback in TopologyRoleChangeStrategy is invoked.
Finally, the NodeListener's onRoleChanged is executed.
This NodeListener is the one passed to registerRoleCandidate; it was registered in BaseTopologyManager's preStart and is the BaseTopologyManager actor itself.
In the BaseTopologyManager code you can see that the isMaster case gets special handling,
after which onRoleChanged of delegateTopologyHandler (NetconfTopologyManagerCallback implements TopologyManagerCallback) is executed.
Since the master has registered for change notifications on the specified datastore subtree as described above, node operations then end up in
the corresponding node-create / node-update / node-delete handling,
again in the corresponding methods of BaseTopologyManager (node creation shown here):
@Override
public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
    LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
    if (created.containsKey(nodeId)) {
        LOG.warn("Node{} already exists, triggering update..", nodeId);
        return onNodeUpdated(nodeId, node);
    }
    created.put(nodeId, node);
    final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();

    if (isMaster) {
        // operations performed on the master
        futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
        // only master should call connect on peers and aggregate futures
        for (TopologyManager topologyManager : peers.values()) {
            // the master needs to iterate over the other peer nodes
            // convert binding into NormalizedNode for transfer
            final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry =
                    codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
            LOG.debug("start update node {} to peers:{} ", nodeId, typedExtension.getActorRefFor(topologyManager).path());

            // add a future into our futures that gets its completion status from the converted scala future
            final SettableFuture<Node> settableFuture = SettableFuture.create();
            futures.add(settableFuture);
            // the create logic is also executed on the other nodes
            final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(
                    new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
            scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
                @Override
                public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
                    if (failure != null) {
                        settableFuture.setException(failure);
                        return;
                    }
                    final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
                            codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
                    final Node value = (Node) fromNormalizedNode.getValue();
                    LOG.debug("update node {} success in peers {}", nodeId, typedExtension.getActorRefFor(topologyManager).path());
                    settableFuture.set(value);
                }
            }, TypedActor.context().dispatcher());
        }

        // Note: the node state still needs to be updated; since it also carries the
        // connection information from the slaves, there is state-aggregation logic here.
        final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
        Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
            @Override
            public void onSuccess(final Node result) {
                LOG.debug("Futures aggregated succesfully");
                naSalNodeWriter.init(nodeId, result);
            }

            @Override
            public void onFailure(final Throwable t) {
                // If the combined connection attempt failed, set the node to connection failed
                LOG.debug("Futures aggregation failed");
                naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
                // FIXME disconnect those which succeeded
                // just issue a delete on delegateTopologyHandler that gets handled on lower level
            }
        }, TypedActor.context().dispatcher());

        // combine peer futures
        return aggregatedFuture;
    }

    // trigger create on this slave
    return delegateTopologyHandler.onNodeCreated(nodeId, node);
}
Now let's look at what the aggregation actually does (create, update and delete work on the same principle).
NetconfNodeOperationalDataAggregator -------------> combineCreateAttempts
See the comments.
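Conceptually, the aggregator waits for all per-node futures and combines their connection statuses into a single Node result. A simplified sketch of this idea (not the actual NetconfNodeOperationalDataAggregator implementation; mergeStatuses is a hypothetical helper):

// Simplified sketch of aggregating per-node futures into one result.
public ListenableFuture<Node> combineCreateAttempts(final List<ListenableFuture<Node>> futures) {
    final SettableFuture<Node> aggregated = SettableFuture.create();
    Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Node>>() {
        @Override
        public void onSuccess(final List<Node> results) {
            // e.g. report "connected" only if the master and all peers report connected
            aggregated.set(mergeStatuses(results)); // mergeStatuses is a hypothetical helper
        }

        @Override
        public void onFailure(final Throwable t) {
            aggregated.setException(t);
        }
    }, MoreExecutors.directExecutor());
    return aggregated;
}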
As you can see, once the aggregated result completes, the data write begins.
To log this operation, the call goes through LoggingSalNodeWriter first and then into TopologyNodeWriter.
public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
    lock.lock();
    LOG.debug("TopologyNodeWriter start to update {} with {}.", id, operationalDataNode);
    try {
        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
        final InstanceIdentifier<Node> path = createBindingPathForTopology(new NodeKey(id), topologyId);
        LOG.debug("{}: Update device state transaction {} merging operational data started. Putting data on path {}",
                id, writeTx.getIdentifier(), operationalDataNode);
        writeTx.put(LogicalDatastoreType.OPERATIONAL, path, operationalDataNode);
        LOG.debug("{}: Update device state transaction {} merging operational data ended.",
                id, writeTx.getIdentifier());
        commitTransaction(writeTx, "update", id);
    } finally {
        lock.unlock();
    }
}
SalNodeWriter
public void update(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
    LOG.debug("SalNodeWriter start to update {} with {}.", id, operationalDataNode);
    // merge
    final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
    wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
    commitTransaction(wTx, id, "update");
}
Now look at LoggingSalNodeWriter,
which is a NodeWriter.
Looking at where the writer is used,
you can see that the writer is then wrapped in a LoggingSalNodeWriter.
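The wrapping is a plain logging decorator: each call is logged and then forwarded to the wrapped writer. A rough sketch of the idea (the class name and the exact NodeWriter methods are assumed here, based on the init/update calls seen above):

// Rough sketch of a logging decorator around a NodeWriter (names assumed for illustration).
public class LoggingNodeWriterSketch implements NodeWriter {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingNodeWriterSketch.class);

    private final NodeWriter delegate;

    public LoggingNodeWriterSketch(final NodeWriter delegate) {
        this.delegate = delegate;
    }

    @Override
    public void init(final NodeId id, final Node operationalDataNode) {
        LOG.debug("Init node {} with {}", id, operationalDataNode);
        delegate.init(id, operationalDataNode);
    }

    @Override
    public void update(final NodeId id, final Node operationalDataNode) {
        LOG.debug("Update node {} with {}", id, operationalDataNode);
        delegate.update(id, operationalDataNode);
    }

    @Override
    public void delete(final NodeId id) {
        LOG.debug("Delete node {}", id);
        delegate.delete(id);
    }
}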
========================
NetconfNodeManagerCallback.java------------->onDeviceConnected
BaseTopologyManager.java-------------->notifyNodeStatusChange
BaseTopologyManager.java------>
@Override
public void notifyNodeStatusChange(final NodeId nodeId) {
    LOG.debug("Connection status has changed on node {}, isMaster {}", nodeId.getValue(), isMaster);
    if (isMaster) {
        // grab status from all peers and aggregate
        final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
        futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
NetconfTopologyManagerCallback.java-------->getCurrentStatusForNode
This class stores
private final Map<NodeId, NodeManager> nodes = new HashMap<>();
with nodeId as the key and a BaseNodeManager as the value.
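getCurrentStatusForNode then just delegates to the NodeManager responsible for that node; roughly (simplified sketch, assuming the NodeManager exposes the same status query):

// Simplified sketch: look up the node's manager and ask it for the current status.
@Override
public ListenableFuture<Node> getCurrentStatusForNode(final NodeId nodeId) {
    return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
}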
That concludes this walkthrough of the Netconf cluster in ODL Beryllium; hopefully it is helpful.