Akka之actor name [c1] is not unique!

2024-02-19 18:58
文章标签 akka c1 unique actor

本文主要是介绍Akka之actor name [c1] is not unique!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在使用ODL的过程中,突然出现如题所述的故障,字面上看在actor的池中存在了两个同名的actor。在功能开发设计时,当然是不允许actor名称相同,基本可以排除开发人员识定义了重名的actor。下面编写可复现的测试说明该问题。

测试代码,父Actor

package com.zte.sunquan.deom.ofo;import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.Optional;/*** Created by sunquan on 2018/3/29.*/
public class ParentActor extends AbstractActor {private String name;private ActorRef c1;public ParentActor(String name) {this.name = name;}public ParentActor() {c1 = getContext().actorOf(ChildActor.props("c1"), "c1");ActorRef c2 = getContext().actorOf(ChildActor.props("c2"), "c2");}public static Props props() {return Props.create(ParentActor.class);}@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals("killc1",p->{getContext().actorOf(ChildActor.props("c1"),"c1");getContext().stop(c1);}).matchAny(p -> {System.out.println("parent:" + p);}).build();}@Overridepublic void preStart() throws Exception {super.preStart();System.out.println("parent pre start");}@Overridepublic void postStop() throws Exception {super.postStop();System.out.println("parent pre stop");}@Overridepublic void preRestart(Throwable reason, Optional<Object> message) throws Exception {super.preRestart(reason, message);System.out.println("parent pre reStart2");}@Overridepublic void postRestart(Throwable reason) throws Exception {super.postRestart(reason);System.out.println("parent pre reStart1");}
}

子Actor

package com.zte.sunquan.deom.ofo;import akka.actor.AbstractActor;
import akka.actor.Props;
import java.util.Optional;/*** Created by sunquan on 2018/3/29.*/
public class ChildActor extends AbstractActor {private String name;public ChildActor() {}public ChildActor(String name) {this.name = name;}public static Props props(String name) {return Props.create(ChildActor.class,name);}@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(p -> {System.out.println("child:" + p);}).build();}@Overridepublic void preStart() throws Exception {super.preStart();System.out.println("child " + name + " pre start");}@Overridepublic void postStop() throws Exception {super.postStop();System.out.println("child " + name + " pre stop");}@Overridepublic void preRestart(Throwable reason, Optional<Object> message) throws Exception {super.preRestart(reason, message);System.out.println("child " + name + " pre reStart1");}@Overridepublic void postRestart(Throwable reason) throws Exception {super.postRestart(reason);System.out.println("child " + name + " pre reStart2");}
}

测试用例:

package com.zte.sunquan.deom.ofo;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.junit.Test;/*** Created by sunquan on 2018/3/29.*/
public class PCTest {static ActorSystem _system = ActorSystem.create("mysystem");@Testpublic void test() throws InterruptedException {_system.actorOf(ParentActor.props(),"parent");ActorRef parent = _system.actorFor("akka://mysystem/user/parent");Thread.sleep(3000);ActorRef c1 = _system.actorFor("akka://mysystem/user/parent/c1");ActorRef c2 = _system.actorFor("akka://mysystem/user/parent/c2");parent.tell("p1",ActorRef.noSender());c1.tell("c1",ActorRef.noSender());c2.tell("c2",ActorRef.noSender());parent.tell("killc1",ActorRef.noSender());_system.actorOf(ChildActor.props("c1"),"c1");Thread.sleep(5000);_system.terminate();}}

打印结果:


parent pre start
child c2 pre start
child c1 pre start
child:c2
child:c1
parent:p1
child c1 pre start        ===========ParentActor接收killc1,导致创建c1
parent pre stop           ===========此时由于c1存在同名actor,未关闭,所以导致父Actor,关闭重启
parent pre reStart2
child c2 pre stop        
child c1 pre stop
child c1 pre start
child c2 pre start         
parent pre start
parent pre reStart1       ============父Actor重启以及其中的子Actor重启
[ERROR] [03/29/2018 14:08:05.983] [mysystem-akka.actor.default-dispatcher-6] [akka://mysystem/user/parent] actor name [c1] is not unique!
akka.actor.InvalidActorNameException: actor name [c1] is not unique!at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:129)at akka.actor.dungeon.Children$class.reserveChild(Children.scala:134)at akka.actor.ActorCell.reserveChild(ActorCell.scala:370)at akka.actor.dungeon.Children$class.makeChild(Children.scala:272)at akka.actor.dungeon.Children$class.actorOf(Children.scala:44)at akka.actor.ActorCell.actorOf(ActorCell.scala:370)at com.zte.sunquan.deom.ofo.ParentActor.lambda$createReceive$0(ParentActor.java:31)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at akka.actor.Actor$class.aroundReceive(Actor.scala:514)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:132)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)at akka.actor.ActorCell.invoke(ActorCell.scala:496)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)at akka.dispatch.Mailbox.run(Mailbox.scala:224)at akka.dispatch.Mailbox.exec(Mailbox.scala:234)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)[INFO] [03/29/2018 14:08:10.968] [Thread-0] [CoordinatedShutdown(akka://mysystem)] Starting coordinated shutdown from JVM shutdown hook
child c1 pre stop
child c1 pre stop
child c2 pre stop
parent pre stop
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 -Duser.language=en -Duser.country=USProcess finished with exit code 0

在向ParentActor发送killC后的处理逻辑中有再次创建c1这个Actor,与池中已存在的c1这个Actor的创建重名异常,被父actor中的监督策略捕获,触发重启(包括了子actor)

父 Actor 首先调用 preRestart ,然后 被实例化,在实例化过程中,再次创建c1这个actor,再调用 postRestart,最后再重启它的子Actor,子Actor c1重启时,又重名。所以打印异常,终止了所有actor。

 

如何避免上述异常出现:

1、在创建Actor时,要确保这个Actor被正常完全关闭了。为些专门写了个工具类,用于彻底关闭Actor

package com.zte.sunquan.deom.ofo;import java.util.concurrent.TimeUnit;import akka.actor.ActorRef;
import akka.actor.TypedActorExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @Author 10184538* @Date 2019/9/19 19:41**/
public class TypedActorUtils {private static final Logger LOG = LoggerFactory.getLogger(TypedActorUtils.class);private static final int RETRY_TIMES = 25;public static void stop(TypedActorExtension typedActorExtension, Object resolver) {//Bug Fix:stop resolve and avoid (akka.actor.InvalidActorNameException)int times = RETRY_TIMES;boolean isStop = false;if (resolver == null || typedActorExtension == null) {LOG.info("resolver or typeActorExtension cant not be null.");return;}typedActorExtension.stop(resolver);while (times-- > 0) {try {TimeUnit.SECONDS.sleep(1);ActorRef actorRefFor = typedActorExtension.getActorRefFor(resolver);if (actorRefFor == null) {isStop = true;break;}if (actorRefFor.isTerminated()) {isStop = true;break;}} catch (InterruptedException e) {LOG.error("Stop actor {} exception.", resolver.toString(), e);Thread.currentThread().interrupt();} catch (NullPointerException e) {isStop = true;break;}}if (isStop) {LOG.trace("resolver:{} stop success", resolver);} else {LOG.debug("resolver:{} stop failed", resolver);}}
}

TypedActorUtils.stop(TypedActor.get(actorSystem), deviceDataBroker);

2、参考如下:
官方文档和所配图片含义不一致。官方文档中说明当监督者收到Terminate消息后,则可以重新使用actor路径。
而生命周期示意图中将actor关闭后的生命周期分为三步

1:stop actor 

2:notify watchers

 3:allow to reuse path

具体介绍:
    分析akka 源码:当子actor收到Terminate(akka.actor.actorCell)消息后,触发terminate(akka.actor.dungeon.FaultHandling)流程,其会关闭子actor中的所有childActor,然后再关闭自己,此时通过actorRef.isTerminate返回即为true,后在finishTerminate()流程中,通过parent.sendSystemMessage(DeathWatchNotificationparent)通知父actor删除子actor(异步),此时path路径可重复使用,同时通知所有的监督者,子actor已经被关闭
 

故障分析:我们在父actor中关闭子actor并重建,关闭子actor时需要通知父actor删除子actor,而父actor正在处理关闭子actor并重建,此时相当于死锁。
如上原因,则将所有的子actor关闭创建采用异步的模式,让出父actor。参考代码:

Executors.newSingleThreadExecutor().submit(() -> {
+            LOG.info("Create master source provider for node {}", nodeId);
+            //when became master, we should stop ClusteredDeviceSourcesResolver
+            //We will stop masterSourceProvider before create it in TypedActorUtils.createActor
+            TypedActorUtils.stop(TypedActor.get(actorSystem), resolver);
+
+            synchronized (object) {
+                Callable<MasterSourceProvider> callable = new Callable<MasterSourceProvider>() {
+                    @Override
+                    public MasterSourceProvider call() throws Exception {
+                        return TypedActor.get(cachedContext).typedActorOf(
+                                new TypedProps<>(MasterSourceProvider.class,
+                                        new Creator<MasterSourceProviderImpl>() {
+                                            @Override
+                                            public MasterSourceProviderImpl create() throws Exception {
+                                                return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
+                                            }
+                                        }), "masterSourceProvider");
+                    }
+                };
+                masterSourceProvider = TypedActorUtils.createActor(masterSourceProvider, callable, TypedActor.get(actorSystem));
+            }


 

 

 

这篇关于Akka之actor name [c1] is not unique!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/725631

相关文章

C1-2 ABB二次SDK开发——手把手教登录对应的机器人控制器(图片引导操作)登录机器人控制器和刷新机器人列表

1.完成配置后我们开始进行操作 C1-1 ABB二次SDK开发——C#Window窗体-环境配置(带ABB二次开发SDK资源包)-CSDN博客文章浏览阅读95次。3.记住路径,右键C#引用,然后导入ABB.Robotics.Controllers.PC.dll。2.安装资源文件PCABB二次开发的SDK,并打开安装路径。1.新建VSC#的windowfrom项目。4.在框架代码主界面代码中添加。

scala并发编程原生线程Actor、Case Class下的消息传递和偏函数实战

参考代码: import scala.actors._case class Person(name:String,age:Int)class HelloActor extends Actor{def act(){while(true){receive{case Person(name,age)=>{ //偏函数println("Name: "+ name + ":" +"Age:"

LeetCode 63 Unique Paths II

题意: 给出一个带有障碍物的棋盘,每次行动向下或向右移动一格,求从左上角到右下角有几种方案。 思路: 简单dp题,假设dp[i][j]表示第i行第j列的方案数,那么状态转移方程就为 dp[i][j] = dp[i - 1][j] + dp[i][j - 1] 。 注意下边界条件就好了,而且对于障碍物,直接把dp清零即可。 可以发现这个dp只和当前行和上一行有关,进而做空间优化,用一

LeetCode 62 Unique Paths

题意: 一个n*m的棋盘,每次行动只能向下或者向右走1格,求从左上角走到右下角有几种不同的方案数。 思路: 因为行动只能向下向右,所以总步数是一定的,即n - m + 2步。那么问题就变成了这里面的哪几步是向下的,就是组合数了,即从n - m + 2个中选n - 1个的组合数。 题目里说的n和m值太夸张了,因为他的函数返回int……所以肯定很小。 代码: class S

C1-1 ABB二次SDK开发——C#Window窗体-环境配置(带ABB二次开发SDK资源包)

一.使用Visual Stdio创建一个项目 1.新建VSC#的windowfrom项目 2.安装资源文件PCABB二次开发的SDK,并打开安装路径 3.记住路径,右键C#引用,然后导入ABB.Robotics.Controllers.PC.dll 4.在框架代码主界面代码中添加 using ABB.Robotics.Controllers;using ABB.Roboti

Unique Email Address

思路1:面试的时候可以自己写process method class Solution {public int numUniqueEmails(String[] emails) {if(emails == null || emails.length == 0) {return 0;}HashSet<String> set = new HashSet<String>();for(String

C++ boost::upgrade_lock boost::upgrade_to_unique_lock如何使用 例子

upgrade_lock将可将读锁(shared_lock)升级为upgrade_lock,与shared_lock不互斥,与别的upgrade_lock和unique_lock互斥。 也就是说线程A获得mutex的upgrade_lock后,线程B、C等还可以获得mutex的share_mutex,反之亦然。 upgrade_to_unique_lock可将upgrade_lock升级为独占

C++ 有 mutex.lock 为什么要用 lock_guard 、unique_lock

因为直接操作 mutex,即直接调用 mutex 的 lock / unlock 函数。   而使用 lock_guard 可以自动加锁、解锁   C++ Boost库 多线程 线程锁mutex lock_guard 、unique_lock 实例_软件工程小施同学 的专栏-CSDN博客

yolov8代码记录---(tasks.py中的c1、c2和args) / (断点续训)

一、task中的c1、c2和args参数解析 如果你想在yolov8中修改或添加 新的结构块,基本都会修改到task.py中的c1、c2和args参数。 此处以Conv所在的判断分支代码为例: if m in (Classify, Conv, ConvTranspose, ..., C3x, RepC3):c1, c2 = ch[f], args[0]if c2 != nc:c2 = make_

【C++11及其特性】智能指针——unique_ptr

unique_ptr目录 一.排他所有权模式二.auto_ptr的缺点1.可以直接复制和拷贝构造2.STL可以直接赋值3.不支持动态内存分配数组 三.unique_ptr(C++11)1.不支持直接赋值和构造2.STL可以不可以直接赋值3.支持动态内存分配数组 四.unique_ptr的用法1.构造函数2.赋值操作3.主动释放对象4.放弃对象控制权5.重置6.交换 五.排他性智能指针的陷阱六