本文主要是介绍Scala Actor通信,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Scala Actor通信
简介
Actor提供了并发程序中与传统的基于锁的结构不同的另一种选择。通过尽可能避免锁和共享状态,Actor使得我们能够更加容易设计出正确、没有死锁或争用状态的程序。Scala类库提供了一个Actor模型的简单实现,除此之外还有其他更高级的Actor类库,比如Akka(http://akka.io)。
创建和启动Actor
actor是扩展自Actor特质的类。该特质带有一个抽象方法act。可以重写这个方法来指定该actor的行为。
创建Actor
//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)//使用case模式匹配接收到的消息receive {case "Hi" => println("Hello")}}
}
act方法与Java中的Runnable接口的run方法很相似。正如不同的线程的run方法那样,不同的actor的act方法也是并行运行的。不过actor针对响应消息作了优化,而线程可以展开任意的活动。
启动Actor
要启动一个actor,需要构造一个实例,并调用start方法:
val actor1 = new HiActor
actor1.start()
现在,actor1的act方法时并行运行的,你可以开始向它发送消息。调用start的线程会继续执行。
如果你需要临时创建actor而不是定义一个类。Actor伴生对象带有一个actor方法来创建和启动actor:
import scala.actors.Actor._
val actor2 = actor {while (true){receive{case "Hi" => println("Hello")}}
}
有时,一个匿名的actor需要向另一个actor发送指向自己的引用。这个引用可以通过self属性获取。
Actor的执行是异步的:
import scala.actors.Actor
object exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActoractor1.start()actor1 ! "Hi"for (i <- 1 to 10) {println("This is Main...")Thread.sleep(1000)}}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)receive {case "Hi" => {for (i <- 1 to 10) {println("This is Actor...")Thread.sleep(1000)}}}}
}
打印结果:
This is Main...
This is Actor...
This is Main...
//...
发送消息
actor是一个处理异步消息的对象。你向某个actor发送消息,该actor处理消息,或许还会向其他actor发送消息做进一步的处理。消息可以是任意对象。
要发送消息,我们可以用为actor定义的 ! 操作符:
actor1 ! "Hi"
代码示例:
import scala.actors.Actorobject exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActor//启动actor1actor1.start()//向actor1发送 Hiactor1 ! "Hi"}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)//使用case模式匹配接收到的消息receive {case "Hi" => println("Hello")}}
}
消息被送往该actor,当前线程继续执行。这样的做法称作“发完就忘”。也可以等待一个回复。
一个比较好的做法就是使用样例类作为消息,这样,actor就可以使用模式匹配来处理消息了。
代码示例:
import scala.actors.Actorobject exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActor//启动actor1actor1.start()//向actor1发送消息actor1 ! Person("Alice", 20)actor1 ! Person("Bob", 29)actor1 ! Person("Cindy", 16)}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)//使用case模式匹配接收到的消息receive {//可以是具体的对象//例如:case Person("Alice",20) => println("OK")case Person(name, age) => println("this is " + name + " : age is " + age)}}
}
//样例类
case class Person(val name: String, val age: Int)
打印结果:
this is Alice : age is 20
this is Bob : age is 29
this is Cindy : age is 16
这样就可以实现一个检查人信息的actor了。
接收消息
发送到actor的消息被存放在一个“邮箱”中。Receive方法从邮箱获取下一条消息并将它传递给它的参数,该参数是一个偏函数。
例如:
receive{case Deposit(amount) => ...case Withdraw(amount) => ...
}
Receive方法的参数是:
{case Deposit(amount) => ...case Withdraw(amount) => ...
}
Receive方法将从邮箱中收来的消息传递给这个偏函数。
注意:
- 消息投递的过程是异步的,它们会以什么样的顺序到达并不确定,所以在设计的时候要考虑到这一点,让程序不要依赖任何特定的消息投递顺序。
- 如果receive方法被调用时并没有消息,则该调用会阻塞,直到有消息抵达。
- 如果邮箱中没哟任何消息可以被偏函数处理,则对receive方法的调用也会阻塞,直到一个可以匹配的消息抵达。
- 邮箱可能被那些不与任何case语句匹配的消息占满。你可以添加一个case _ 语句来处理任意的消息。
- 邮箱会串行化消息。Actor运行在单个线程中。它会先接收一条消息,然后接收下一条消息。不过在actor的代码中不用担心争用状况。
例如:
receive {case Deposit(amount) => balance += amoutcase Withdraw(amount) => balance -= amoutcase ...
}
Actor中不会有将增值和减值互惨在一起的危险。
向其他Actor发送消息
当运算被分拆到不同actor来并行处理问题的各个部分时,这些处理结果需要被收集到一起。Actor可以将结果存入到一个线程安全的数据结构当中,比如一个并发的哈希映射,但actor模型并不鼓励使用共享数据。因而当actor计算出结果后,应该向另一个actor发送消息。
Actor计算结果发送方向:
- 可以是一些全局的actor。不过,当actor数量很多时,该方案伸缩性并不好。
- Actor可以构造成带有指向另一个或多个actor的引用。
- Actor可以接收带有指向另一个actor的引用消息。
- Actor可以返回消息给发送方。
消息通道
你可以通过共享消息通道来实现actor之间消息通信。
这样做的好处:
- 消息通道是类型安全的——你只能发送或接收特定的类型消息。
- 你不会不小心通过消息通道调用到某个actor方法。
消息通道可以是一个OutputChannel(带 ! 的方法),也可以是一个InputChannel(带receive或react方法)。Channel类同时扩展了OutputChannel和InputChannel特质。
要构造一个消息通道,你可以提供一个actor:
val channel = new Channel[Int](actor1)
如果不提供构造参数,那么消息通道就会绑定到当前执行的这个actor上。
代码示例:
import scala.actors.{Actor, Channel, OutputChannel}
import scala.actors.Actor._object exam1 {def main(args: Array[String]): Unit = {val actor1 = actor {val channel = new Channel[Int]val computeActor: Computer = new Computerval input:Seq[Int] = List(11,22,33)computeActor ! Compute(input,channel)channel.receive{case x:Int => //...case _ => //...}}actor1.start()}
}case class Compute(input: Seq[Int], result: OutputChannel[Int])class Computer extends Actor {override def act(): Unit = {while (true) {receive {case Compute(input, out) => {val answer = 88; out ! answer}}}}
}
同步消息和Future
同步消息
actor可以发送一个消息并等待回复,用 !? 操作符即可。
例如:
import scala.actors.Actorobject exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActor//启动actor1actor1.start()//向actor1发送消息并等待回复val reply = actor1 !? Person("Alice", 20)println(reply)}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)receive {//可以是具体的对象//例如:case Person("Alice",20) => println("OK")case Person(name, age) => {println("this is " + name + " : age is " + age); sender ! "replied message..."}}}
}//样例类
case class Person(val name: String, val age: Int)
打印结果:
this is Alice : age is 20
replied message...
这两种返回效果相同:
sender ! "replied message..."
reply("replied message...")
同步消息很容易引发死锁。通常而言,最好避免在actor的act方法里执行阻塞调用。
future
除了等待对方返回结果之外,你也可以选择接收一个future——这是一个将在结果可用时产出结果的对象。使用 !! 方法:
val replyFuture = acount !! Deposite(1000)
isSet方法会检查结果是否已经可用。要接收该结果,使用函数调用的表示方法:
val reply = replyFuture()
这个调用将会阻塞,直到回复被发送。
共享线程
某些程序包含的actor过多,以至于要为actor创建单独的线程开销会很大。此时需要考虑在同一个线程中运行多个actor。Actor有时大部分时间用户等待消息,这时actor所在的单独线程会堵塞,与其这样不如用一个线程来执行多个actor的消息处理函数。
在Scala中,react方法可以实现这样的功能。react方法接收一个偏函数,并将它添加到邮箱,然后退出。
react工作原理
当你调用一个actor的start时,start方法会以某种方式来确保最终会有某个线程来调用那个actor的act方法。如果act方法调用了react ,则react方法会在actor的邮箱中查找传递给偏函数的能够处理的消息 。(和receive方法一样,传递待处理消息给偏函数的isDefinedAt方法。) 如果找到一个可以处理的消息,react 会安排一个在未来某个时间处理该消息的计划并抛出异常。如果它没有找到这样的消息,它会将actor置于“冷存储” 状态 ,在它通过邮箱收到更多消息时重新激活,并抛出异常。不论是哪种情况,react都会以这个异常的方式完成其执行,act 方法也随之结束 调用act的线程会捕获这个异常,忘掉这个actor , 并继续处理其他事务。这就是为什么你想要react在处理第一个消息之外做更多的事,你将需要在偏函数中再次调用act方法 ,或使用某种其他的手段来让react再次被调用。
较为复杂的代码示例
import java.net.InetAddress
import java.net.UnknownHostException
import actors._, Actor._
import scala.actors.Actor/*** scala Actor构建在java的线程基础之上的,* 为了避免频繁的线程创建、销毁和切换等,scala中提供了react方法* 方法执行完毕后,仍然被保留*/
object NameResolver extends Actor{def act(){react{//模式匹配 case Net(name,actor) =>/*** 想actor发送解析后的IP地址* 该例中,actor为本身,即向本身发送消息,存入邮箱*/actor!getIpAddress(name)//再次调用act方法,从本身邮箱中读取消息 //如果消息为空,则等待 actcase "EXIT" =>println("Name resolver exiting.")//匹配邮箱中的单个信息,本例中会匹配邮箱中的IP地址信息case msg =>println("Unhandled message: " + msg)act}}def getIpAddress(name:String):Option[InetAddress] ={try{println(InetAddress.getByName(name))Some(InetAddress.getByName(name))} catch {case _: UnknownHostException => None}}
}
case class Net(name:String,actor:Actor)object ActorWithReAct extends App {//启动Actor NameResolver.start()//发消息 NameResolver!Net("www.baidu.com",self)//接收消息 println(self.receive{case msg => msg})
}
Actor生命周期
actor的act方法在actor的start方法被调用的时候开始执行。通常,actor接下来做的事情是进入某个循环,例如:
def act(): Unit ={while (...){receive{//}}
}
actor在如下情形之一会终止执行:
- Act方法返回
- Act方法由于异常被终止
- Actor调用exit方法
exit方法是个受保护的方法。它只能被Actor子类中调用。例如:
val actor2 = actor{while(true){receive{case "Hi" => println("Hello")case "Bye" => exit()}}
}
其他方法不能调用exit()来终止一个actor。
当actor因一个异常终止时,退出原因就是UncaughtException样例类的一个实例。该样例类有如下属性:
acotr: 抛出异常的actor。
- message: Some(msg) msg为该actor处理的最后一条消息;或者None,如果actor在没来得及处理任何消息之前就挂掉的话。
- Sender:Some(channel) channel代表最后一条消息的发送方的输出消息通道;或者None,如果actor在没来得及处理任何消息之前就挂掉。
- Thread: actor退出时所在的线程。
- Cause: 相应的异常。
总结
- 每个actor都要扩展Actor类并提供act方法。
- 要往actor发送消息,可以用actor ! Message。
- 消息发送是异步的:“发完就忘”
- 要接收消息,actor可以调用receive或react,通常是在循环中这样做。
- receive/react的参数是由case语句组成的代码块(是一个偏函数)
- 不同actor之间不应该共享状态。总是使用消息来发送数据。
- 不要直接调用actor方法。通过消息进行通信。
- 避免同步消息——也就是说将发送消息和等待响应分开。
- 不同actor可以通过react而不是receive来共享线程,前提是消息处理器的控制流转足够简单。
- 让actor挂掉是可以的,前提是你有其他actor监控着actor生死。用链接来设置监控关系。
这篇关于Scala Actor通信的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!