本文主要是介绍Akka-路由策略,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
本文将演示如下示例:
用IDEA创建3个akka项目(创建3个服务),分别叫做service1,service2,service3,然后使用service1,通过不同的路由策略,给2和3发送消息,代码演示之前,先介绍一下Akka的路由策略,它们分别是:
- 轮询(Round Robin):
- service2和service3轮流接到service1的消息
- 保证消息被均匀分配到各个Actor,适用于需要均匀负载的场景。
- 配置文件中的内容为
round-robin-group
或者round-robin-pool
,先不用管这俩有什么区别
- 随机(Random):
- service2和service3不一定谁收到service1的消息
- 适用于当Actor处理时间差异不大,且需要一定随机性的场景。
- 配置文件中的内容为
random-group
或者random-pool
- 空闲(Smallest Mailbox):
- 选择当前拥有最少消息的Actor实例发送消息。
- 旨在减少等待时间,将消息发送到最有可能立即处理它的Actor。
- 配置文件中的内容为
smallest-mailbox-pool
,不支持group
- 广播(Broadcast):
- service2和service3同时都会收到service1的消息
- 适用于需要将消息同时分发给多个Actor进行处理的情况。
- 配置文件中的内容为
broadcast-group
或者broadcast-pool
- 分散聚集(Scatter/Gather First Completed):
- 将消息分散给各个处理者,只使用第一个回复消息,并忽略其他回复。
- 适用于需要从多个处理者中选择最快响应的场景。
- 配置文件中的内容为
scatter-gather-group
或者scatter-gather-pool
- 一致性哈希(Consistent Hashing):
- 根据消息内容的哈希值来确定消息应该发送到哪个Actor实例。
- 适用于需要将特定数据发送到特定Actor,以实现分布式缓存或数据分区的情况。
- 配置文件中的内容为
consistent-hashing-group
或者consistent-hashing-pool
- 尾切(Tail Chopping):
- 类似于分散聚集,但不是一次性向所有Actor发送消息,而是每向一个Actor发送消息后等待一小段时间。
- 这可以减少网络负载,同时仍能保证较快的响应时间。
- 配置文件中的内容为
tail-chopping-group
或者tail-chopping-pool
- 平衡池(Balancing Pool):
- 多个Actor共享同一个邮箱,一旦有空闲就处理邮箱中的任务。
- 这种策略可以确保所有Actor都处于繁忙状态,适用于本地集群且Actor任务处理时间相近的场景。
- 配置文件中的内容为
balancing-pool
,不支持group
步骤1: 创建3个Maven项目,这3个项目的maven的pom文件都相同,也可以参考这里搭建服务,依赖如下:
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.11</artifactId><version>2.4.20</version>
</dependency>
<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.11</artifactId><version>2.4.20</version>
</dependency>
步骤2: 创建Actor,Akka中的Actor就是一个继承UntypedActor的普通Java类,将这个类复制3份,注意包名一定要完全相同,然后service1,service2,service3这三个项目每个都存放一份这个类,注意本例的包名,保证三个项目包路径都一样
package akka.demo.actor;// 这个Actor01类,必须在三个项目中都有,且全限定名(包.类名称)完全相同import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.routing.FromConfig;
import com.typesafe.config.ConfigFactory;public class MyActor extends UntypedActor {@Overridepublic void onReceive(Object o) throws Throwable {System.out.println("消息来自:" + getSender().path() + "消息内容:" + o);}
}
步骤3: 增加配置文件,在这3个项目中的resources文件夹下,都创建一个叫做application.conf
的配置文件,3个服务的配置文件分别如下
service1的application.conf
文件如下
akka{actor{provider="akka.remote.RemoteActorRefProvider"deployment{# 表示有一个叫abc的Actor,这个Actor其实是个路由器,因为在Akka的概念中,路由器也是Actor# 每次使用abc这个路由器发送消息的时候,这个路由器会将消息发给service2的actor02,或者发送给service3的actor03# 具体发给2还是3,要看我们使用哪种路由策略(也就是本文主要内容)/abc{router = round-robin-group # 注意这里,这里就是路由策略routees.paths = ["akka.tcp://sys@127.0.0.1:2552/user/actor02", # 这个就是service2的IP端口号,下面有service2的配置"akka.tcp://sys@127.0.0.1:2553/user/actor03" # 这个就是service3的IP端口号,下面有service3的配置]}}}# 表示我自己的IP端口remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2551}}
}
service2的application.conf
文件如下
akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2552}}
}
service3的application.conf
文件如下,除了端口号,其他和service2是一样的
akka{actor{provider="akka.remote.RemoteActorRefProvider"}remote{enabled-transports=["akka.remote.netty.tcp"]netty.tcp{hostname="127.0.0.1"port=2553}}
}
步骤4: 创建Main方法,用来启动服务,分别在这三个项目中的任意位置,增加如下main方法:
service1的main方法
public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 读取service1的application.conf文件,根据这个文件里面的abc的信息,创建一个路由器类型的Actor引用ActorRef routeActorRef = system.actorOf(FromConfig.getInstance().props(), "abc");// 通过该引用,给service2和service3发消息,具体2和3谁能接到消息,取决于service1配置文件中的router属性for (int i = 0; i < 20; i++) {Thread.sleep(3000);routeActorRef.tell("内容" + i, ActorRef.noSender());}
}
service2的main方法
public static void main(String[] args) throws Throwable {ActorSystem system = ActorSystem.create("sys");// 创建一个名字叫做actor02的Actor,actorOf方法返回了引用,但是我们除了打印不做别的事情// 这行代码表示在service2的服务中,创建完毕了一个叫做actor02的Actor,这样service1远程调用// 的时候,能调用到这个actor02ActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor02");System.out.println(actorReference.path()); // 这行打印的内容是与service1的配置文件中的routees.paths属性有关系的System.out.println("sys系统创建完毕");
}
service3的main方法
public static void main(String[] args) throws Throwable {// 没什么好说的,和service2基本一样,就是创建的Actor的名字叫做actor03ActorSystem system = ActorSystem.create("sys");ActorRef actorReference = system.actorOf(Props.create(MyActor.class), "actor03");System.out.println(actorReference.path()); // 这行打印的内容是与service1的配置文件中的routees.paths属性有关系的System.out.println("sys系统创建完毕");
}
步骤5: 启动服务,注意,我们先运行2和3的main方法,然后再运行1的main方法
由于在service1的配置文件中,router = round-robin-group,这表示我们使用的路由策略是轮询,所以运行service1之后,在2和3的控制台会轮流打印出0,1,2,3,4,5,6…
本文示例到此结束了,下面说点其他的
1.配置文件中我们可以见到xxx-group和xxx-pool,其中group和pool表示Akka中路由的两种模式
2.文中仅仅演示了多个机器调用的情况,还不属于完整的集群配置,这个后续文章中再补充
3.文中使用的配置文件叫做application.conf
,这是akka默认的配置名,假如我的配置文件叫xzy.conf
怎么办,这就需要我们指定配置文件名称,代码是ActorSystem system = ActorSystem.create("sys",, ConfigFactory.load("xzy.conf"));
4.配置文件中有一个叫做router
,还有一个叫routees
,其中router表示路由,routees表示路由的目标,从示例中你应该是可以看出来的
下一篇文章:Akka的路由模式Group/Pool
这篇关于Akka-路由策略的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!