This article looks at the Akka error "actor name [c1] is not unique!".
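While working with ODL, the error in the title suddenly appeared. Read literally, it says that the pool of actors contains two actors with the same name. Duplicate actor names are of course not allowed by design, so a developer simply defining two actors with the same name can more or less be ruled out. The reproducible test below illustrates the problem.
Test code, parent actor: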
package com.zte.sunquan.deom.ofo;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;

import java.util.Optional;

/**
 * Created by sunquan on 2018/3/29.
 */
public class ParentActor extends AbstractActor {
    private String name;
    private ActorRef c1;

    public ParentActor(String name) {
        this.name = name;
    }

    public ParentActor() {
        // Both children are created whenever the parent is (re)instantiated.
        c1 = getContext().actorOf(ChildActor.props("c1"), "c1");
        ActorRef c2 = getContext().actorOf(ChildActor.props("c2"), "c2");
    }

    public static Props props() {
        return Props.create(ParentActor.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchEquals("killc1", p -> {
                    // Reproduces the failure: the name "c1" is still in use,
                    // so this actorOf call throws InvalidActorNameException.
                    getContext().actorOf(ChildActor.props("c1"), "c1");
                    getContext().stop(c1);
                })
                .matchAny(p -> {
                    System.out.println("parent:" + p);
                })
                .build();
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("parent pre start");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("parent pre stop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        super.preRestart(reason, message);
        System.out.println("parent pre reStart2");
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        System.out.println("parent pre reStart1");
    }
}
Child actor:
package com.zte.sunquan.deom.ofo;

import akka.actor.AbstractActor;
import akka.actor.Props;

import java.util.Optional;

/**
 * Created by sunquan on 2018/3/29.
 */
public class ChildActor extends AbstractActor {
    private String name;

    public ChildActor() {
    }

    public ChildActor(String name) {
        this.name = name;
    }

    public static Props props(String name) {
        return Props.create(ChildActor.class, name);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchAny(p -> {
                    System.out.println("child:" + p);
                })
                .build();
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("child " + name + " pre start");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("child " + name + " pre stop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        super.preRestart(reason, message);
        System.out.println("child " + name + " pre reStart1");
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        System.out.println("child " + name + " pre reStart2");
    }
}
Test case:
package com.zte.sunquan.deom.ofo;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.junit.Test;

/**
 * Created by sunquan on 2018/3/29.
 */
public class PCTest {
    static ActorSystem _system = ActorSystem.create("mysystem");

    @Test
    public void test() throws InterruptedException {
        _system.actorOf(ParentActor.props(), "parent");
        // actorFor is deprecated in the Akka version used here; it is kept as in the original test.
        ActorRef parent = _system.actorFor("akka://mysystem/user/parent");
        Thread.sleep(3000);
        ActorRef c1 = _system.actorFor("akka://mysystem/user/parent/c1");
        ActorRef c2 = _system.actorFor("akka://mysystem/user/parent/c2");
        parent.tell("p1", ActorRef.noSender());
        c1.tell("c1", ActorRef.noSender());
        c2.tell("c2", ActorRef.noSender());
        parent.tell("killc1", ActorRef.noSender());
        // Note: this creates a top-level actor /user/c1, not a child of parent.
        _system.actorOf(ChildActor.props("c1"), "c1");
        Thread.sleep(5000);
        _system.terminate();
    }
}
Output:
parent pre start
child c2 pre start
child c1 pre start
child:c2
child:c1
parent:p1
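child c1 pre start =========== the top-level /user/c1 created by the test starts; the actorOf("c1") call in ParentActor's killc1 handler throws before any new child is started
parent pre stop =========== the name c1 is still taken by the child that has not been stopped, so the call fails and the parent actor is stopped and restarted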
parent pre reStart2
child c2 pre stop
child c1 pre stop
child c1 pre start
child c2 pre start
parent pre start
parent pre reStart1 ============ the parent actor has restarted, and its child actors have been recreated with it
[ERROR] [03/29/2018 14:08:05.983] [mysystem-akka.actor.default-dispatcher-6] [akka://mysystem/user/parent] actor name [c1] is not unique!
akka.actor.InvalidActorNameException: actor name [c1] is not unique!
    at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:129)
    at akka.actor.dungeon.Children$class.reserveChild(Children.scala:134)
    at akka.actor.ActorCell.reserveChild(ActorCell.scala:370)
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:272)
    at akka.actor.dungeon.Children$class.actorOf(Children.scala:44)
    at akka.actor.ActorCell.actorOf(ActorCell.scala:370)
    at com.zte.sunquan.deom.ofo.ParentActor.lambda$createReceive$0(ParentActor.java:31)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:514)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:132)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
    at akka.actor.ActorCell.invoke(ActorCell.scala:496)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [03/29/2018 14:08:10.968] [Thread-0] [CoordinatedShutdown(akka://mysystem)] Starting coordinated shutdown from JVM shutdown hook
child c1 pre stop
child c1 pre stop
child c2 pre stop
parent pre stop
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 -Duser.language=en -Duser.country=US
Process finished with exit code 0
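The handler that runs when ParentActor receives killc1 creates the actor c1 again, which clashes with the c1 that already exists. The resulting InvalidActorNameException is thrown inside the parent's message handler, so it is handled by the supervisor strategy of the parent's own supervisor (here the user guardian), whose default directive for such an exception is to restart the failed actor. The restart affects the parent's children as well.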
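During the restart, preRestart is called on the old parent instance first (by default it stops all children and then calls postStop). A new instance is then constructed, which creates c1 and c2 again in its constructor, and finally postRestart is called on it. The exception is logged, and all actors are stopped when the actor system shuts down at the end of the test.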
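How to avoid this exception:
1. Before creating an actor, make sure the previous actor with the same name has been stopped completely. A small utility class was written for this purpose; it stops an actor and waits until it is really gone.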
package com.zte.sunquan.deom.ofo;

import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.TypedActorExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Author 10184538
 * @Date 2019/9/19 19:41
 **/
public class TypedActorUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TypedActorUtils.class);
    private static final int RETRY_TIMES = 25;

    public static void stop(TypedActorExtension typedActorExtension, Object resolver) {
        // Bug fix: stop resolver and avoid akka.actor.InvalidActorNameException
        int times = RETRY_TIMES;
        boolean isStop = false;
        if (resolver == null || typedActorExtension == null) {
            LOG.info("resolver or typedActorExtension must not be null.");
            return;
        }
        typedActorExtension.stop(resolver);
        // Poll once per second, up to RETRY_TIMES times, until the actor is gone.
        while (times-- > 0) {
            try {
                TimeUnit.SECONDS.sleep(1);
                ActorRef actorRefFor = typedActorExtension.getActorRefFor(resolver);
                if (actorRefFor == null) {
                    isStop = true;
                    break;
                }
                if (actorRefFor.isTerminated()) {
                    isStop = true;
                    break;
                }
            } catch (InterruptedException e) {
                LOG.error("Stop actor {} exception.", resolver.toString(), e);
                Thread.currentThread().interrupt();
                // Stop waiting: with the interrupt flag set, every further sleep would throw immediately.
                break;
            } catch (NullPointerException e) {
                isStop = true;
                break;
            }
        }
        if (isStop) {
            LOG.trace("resolver:{} stop success", resolver);
        } else {
            LOG.debug("resolver:{} stop failed", resolver);
        }
    }
}
Usage:
    TypedActorUtils.stop(TypedActor.get(actorSystem), deviceDataBroker);
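The waiting loop in the utility can also be written as a generic "poll until a deadline" helper, which makes the timeout and the poll interval explicit and checks the condition before the first sleep. The following is only a sketch in plain Java: the class name `AwaitStopped` and its `await` method are hypothetical and not part of Akka or of the original code. In real use the condition passed in would be something like the `getActorRefFor(...)` / `isTerminated()` check shown above.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of Akka: polls a condition until a deadline. */
public class AwaitStopped {

    /**
     * Returns true as soon as isStopped reports true, or false if the timeout
     * elapses or the waiting thread is interrupted first.
     */
    public static boolean await(BooleanSupplier isStopped, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (isStopped.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the condition
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition for this demo: becomes true after roughly 200 ms.
        boolean stopped = await(() -> System.currentTimeMillis() - start >= 200,
                Duration.ofSeconds(2), Duration.ofMillis(20));
        System.out.println("stopped = " + stopped);
    }
}
```

A short poll interval keeps the common case fast, while the overall timeout plays the role of `RETRY_TIMES` in the utility above.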
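2. For reference:
The official documentation and the figure that accompanies it do not agree. The documentation states that once the supervisor has received the Terminated message, the actor path can be reused.
The lifecycle diagram, however, splits what happens after an actor is stopped into three steps: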
1:stop actor
2:notify watchers
3:allow to reuse path
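In detail:
Looking at the Akka source: when the child actor receives the Terminate message (akka.actor.ActorCell), the terminate flow (akka.actor.dungeon.FaultHandling) is triggered. It stops all of the child's own children and then the child itself; from this point actorRef.isTerminated returns true. Afterwards, in the finishTerminate() flow, parent.sendSystemMessage(DeathWatchNotification(...)) notifies the parent so that it removes the child (asynchronously). Only then can the path be reused. At the same time all watchers are notified that the child has been stopped.
Failure analysis: we stop the child and recreate it inside the parent actor. Stopping the child requires notifying the parent so that it removes the child, but the parent is still busy with the very handler that stops and recreates the child, so it cannot process that notification yet. The situation is comparable to a deadlock.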
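The ordering problem described above can be reproduced without Akka. The following is a toy model in plain Java, not Akka's implementation: the class `NameReservationModel`, its methods, and the message strings are all made up for illustration. It assumes only what the analysis states, namely that the parent handles one message at a time and that a child's name is released only when the parent processes the termination notification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Toy model only: a simplified stand-in for a parent actor, not Akka code. */
public class NameReservationModel {
    private final Set<String> reservedNames = new HashSet<>(); // child names the parent still holds
    private final Queue<String> mailbox = new ArrayDeque<>();  // parent messages, handled one at a time
    private final List<String> log = new ArrayList<>();

    /** Plays the role of actorOf(props, name): fails while the name is still reserved. */
    void createChild(String name) {
        if (!reservedNames.add(name)) {
            throw new IllegalStateException("actor name [" + name + "] is not unique!");
        }
        log.add("created " + name);
    }

    /** Plays the role of stop(child): asynchronous, it only queues the later notification. */
    void stopChild(String name) {
        mailbox.add("terminated:" + name);
    }

    /** Handles one message; recreateOnTerminated selects the safe ordering. */
    void handle(String msg, boolean recreateOnTerminated) {
        if (msg.equals("killc1")) {
            stopChild("c1");
            if (!recreateOnTerminated) {
                createChild("c1"); // the name has not been released yet, so this throws
            }
        } else if (msg.startsWith("terminated:")) {
            String name = msg.substring("terminated:".length());
            reservedNames.remove(name); // only now can the name be reused
            log.add("released " + name);
            if (recreateOnTerminated) {
                createChild(name);
            }
        }
    }

    /** Runs the scenario and returns the sequence of events that occurred. */
    public static List<String> run(boolean recreateOnTerminated) {
        NameReservationModel parent = new NameReservationModel();
        parent.createChild("c1");
        parent.mailbox.add("killc1");
        while (!parent.mailbox.isEmpty()) {
            String msg = parent.mailbox.poll();
            try {
                parent.handle(msg, recreateOnTerminated);
            } catch (IllegalStateException e) {
                parent.log.add("error: " + e.getMessage());
            }
        }
        return parent.log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // recreate inside the same handler
        System.out.println(run(true));  // recreate after the termination notification
    }
}
```

With `run(false)` the recreate happens in the same handler as the stop and fails; with `run(true)` the recreate waits for the termination notification and succeeds. In real Akka code the second ordering corresponds to watching the child and creating the replacement when the Terminated message arrives.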
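For this reason, all stopping and recreating of child actors is done asynchronously, so that the parent actor is free to process the notification. Reference code: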
Executors.newSingleThreadExecutor().submit(() -> {
    LOG.info("Create master source provider for node {}", nodeId);
    // when became master, we should stop ClusteredDeviceSourcesResolver
    // We will stop masterSourceProvider before create it in TypedActorUtils.createActor
    TypedActorUtils.stop(TypedActor.get(actorSystem), resolver);

    synchronized (object) {
        Callable<MasterSourceProvider> callable = new Callable<MasterSourceProvider>() {
            @Override
            public MasterSourceProvider call() throws Exception {
                return TypedActor.get(cachedContext).typedActorOf(
                        new TypedProps<>(MasterSourceProvider.class,
                                new Creator<MasterSourceProviderImpl>() {
                                    @Override
                                    public MasterSourceProviderImpl create() throws Exception {
                                        return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
                                    }
                                }), "masterSourceProvider");
            }
        };
        masterSourceProvider = TypedActorUtils.createActor(masterSourceProvider, callable, TypedActor.get(actorSystem));
    }
});