本文主要是介绍Akka-路由模式Group/Pool,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
为了文章好理解,我们先统一一下概念,在Akka中,Actor是一个高度抽象的概念,Akka的路由器也是一个Actor,所以我们把路由器,叫做路由Actor,接收消息(消费消息)的Actor,在本文中我们叫做消息Actor
Akka中有两种路由模式,分别是Group模式和Pool模式,如果我要将消息X通过路由Actor发送到多台机器,那么:
Pool: 在多台机器上,每台机器上的消息Actor都是由路由Actor创建的,也就是说,处理消息的这些Actor,是路由Actor的儿子,每台机器上的Actor可以通过parent()方法来测试,注意,路由Actor和消息Actor可不一定在同一台机器上
Group: 多台机器上,每台机器上的消息Actor,都是我们程序员自己写代码创建好的,是通过actorOf创建的,路由Actor将消息X通过路径发送给这些处理消息的Actor
创建Akka项目
也可以参考本系列文章的环境搭建或者关于Akka的路由策略,可以参考这里,如果你已经搭建了akka环境,那么可跳过本段落,直接浏览下文的示例1
步骤1: Maven依赖
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.4.20</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.4.20</version></dependency>
步骤2: 创建一个Actor
package akka.demo.actor;import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;public class MyActor extends UntypedActor {@Overridepublic void onReceive(Object o) throws Throwable {// getSelf()方法表示了我是谁// getSender()方法表示了谁发的消息System.out.println(getSelf() + "收到了来自:" + getSender().path() + "消息内容:" + o);}
}
步骤3: 在Resource文件夹下创建一个叫做application.conf
的文件,至于文件的内容,下面示例中再给出
示例1:Pool模式
下面的代码演示了名字为"abc"的路由Actor创建了3个子Actor,然后通过广播的策略方式,给3个子Actor发送消息,子Actor接收到消息之后,打印消息内容
步骤1: 首先在配置文件application.conf
中写如下内容
akka{actor{provider="akka.remote.RemoteActorRefProvider"deployment{# 定义一个叫做abc的路由Actor,它包含一个池,池子里有3个子Actor(也就是上文中的消息Actor)/abc{# 这个路由Actor使用的路由策略是broadcast-pool广播router = broadcast-pool# abc中包含3个处理消息的Actornr-of-instances = 3}}}# 自己的IP端口remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2551}}
}
步骤2: 写个main方法
public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");ActorRef routeActorRef = system.actorOf(FromConfig.getInstance().props(Props.create(MyActor.class)), "abc");System.out.println(routeActorRef.path());for (int i = 0; i < 20; i++) {Thread.sleep(3000);routeActorRef.tell("内容" + i, ActorRef.noSender());}}
步骤3: 运行main方法,打印内容如下
// 循环,第1次打印3条数据,说明广播策略生效,3个子actor都接到了消息
Actor[akka://sys/user/abc/$c#-119781784]收到了来自:akka://sys/deadLetters消息内容:内容0
Actor[akka://sys/user/abc/$b#2011987574]收到了来自:akka://sys/deadLetters消息内容:内容0
Actor[akka://sys/user/abc/$a#-755258569]收到了来自:akka://sys/deadLetters消息内容:内容0
// 循环,第2次打印3条数据,说明广播策略生效,3个子actor再次都接到了消息
Actor[akka://sys/user/abc/$a#-755258569]收到了来自:akka://sys/deadLetters消息内容:内容1
Actor[akka://sys/user/abc/$b#2011987574]收到了来自:akka://sys/deadLetters消息内容:内容1
Actor[akka://sys/user/abc/$c#-119781784]收到了来自:akka://sys/deadLetters消息内容:内容1
到这里pool模式演示完毕,需要注意的时候,由于我是一台机器,而且只开了一个服务,所以这3个子Actor都是在同一台机器上的同一个JVM下,实际的集群情况,这3个子Actor会分配到不同的机器上,不过这是后话了,后续文章会有专门的集群配置
示例1:Group模式
group示例需要开3个服务,并且每个服务的conf文件内容都不一样,如果没理解什么意思,你可以参考关于Akka的路由策略这篇文章起3个服务
下面的示例通过服务1,通过广播的路由策略,给服务2和服务3发送消息
步骤1:
服务1的配置文件application.conf
中写如下内容
akka{actor{provider="akka.remote.RemoteActorRefProvider"deployment{/abc{# 使用广播的路由策略,注意,这个地方是-group,上一个例子是-poolrouter = broadcast-group# 下面的代码我们要通过服务1的路由Actor(名字叫abc),给服务2的消息Actor(名字叫actor02)和# 服务3的消息Actor(名字叫actor3)发消息# 后文的main方法中会创建actor02和actor03,而上文的Pool示例中是Akka自动创建的消息Actor# 并且自动创建的消息Actor的名字是随机的,从上文示例中可以看到名字类似"$a#-755258569"# 一个手动创建,一个自动创建,这也是group和pool的明显区别之一routees.paths = ["akka.tcp://sys@127.0.0.1:2552/user/actor02","akka.tcp://sys@127.0.0.1:2553/user/actor03"]}}}# 自己的IP端口remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2551}}
}
服务2的配置文件application.conf
中写如下内容
akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2552}}
}
服务3的配置文件application.conf
中写如下内容
akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2553}}
}
步骤2: 创建main方法
服务1的main方法
public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// FromConfig.getInstance().props()// 通过配置文件,创建一个叫做abc的路由ActorActorRef routeActorRef = system.actorOf(FromConfig.getInstance().props(Props.create(MyActor.class)), "abc");System.out.println(routeActorRef.path());for (int i = 0; i < 20; i++) {Thread.sleep(3000);// 由于配置文件中定义了abc要给名字为actor02和actor03的消息Actor发消息,所以// 下面这行代码会根据IP端口号,然后给它俩发消息,前提是必须通过IP和端口号能找到// 这俩Actor才行routeActorRef.tell("内容" + i, ActorRef.noSender());}}
服务2的main方法
public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 为了能让服务1(abc)找到actor02,所以我们创建一个叫做actor02的消息ActorActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor02");System.out.println(actorReference.path());System.out.println("sys系统创建完毕");
}
服务3的main方法
public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 为了能让服务1(abc)找到actor03,所以我们创建一个叫做actor03的消息ActorActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor03");System.out.println(actorReference.path());System.out.println("sys系统创建完毕");
}
步骤3: 先启动服务2和服务3,最后启动服务1,会发现服务2和服务3控制台打印如下
消息来自:akka.tcp://sys@127.0.0.1:2551/deadLetters消息内容:内容0
本文演示完毕,已经从技术角度说明了group模式和pool模式的区别,主要就是Actor的生命周期归谁管的问题,是程序员硬编码控制,还是通过路由Actor自己控制,关于生命周期的话题,后续文章会有
有一说一,具体的使用场景我还没有想出来,因为根据现有的结果来看,似乎任何情况都可以使用group的方式,而不用pool的方式,因为我也是调研akka,所以没有真正意义上的实战,也是摸着石头过河,目前我能想到的应该是需要动态拓容的场景,或许能需要pool模式????
下一篇文章:Akka的集群搭建(还没开始写 TODO)
这篇关于Akka-路由模式Group/Pool的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!