Scala Actor通信

2024-06-01 11:08
文章标签 通信 scala actor

本文主要是介绍Scala Actor通信,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Scala Actor通信

 

 

 

 

简介

Actor提供了并发程序中与传统的基于锁的结构不同的另一种选择。通过尽可能避免锁和共享状态,Actor使得我们能够更加容易设计出正确、没有死锁或争用状态的程序。Scala类库提供了一个Actor模型的简单实现,除此之外还有其他更高级的Actor类库,比如Akka(http://akka.io)。

 

 

创建和启动Actor

actor是扩展自Actor特质的类。该特质带有一个抽象方法act。可以重写这个方法来指定该actor的行为。

 

创建Actor

//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)//使用case模式匹配接收到的消息receive {case "Hi" => println("Hello")}}
}

act方法与Java中的Runnable接口的run方法很相似。正如不同的线程的run方法那样,不同的actor的act方法也是并行运行的。不过actor针对响应消息作了优化,而线程可以展开任意的活动。

 

 

 

启动Actor

要启动一个actor,需要构造一个实例,并调用start方法:

val actor1 = new HiActor
actor1.start()

现在,actor1的act方法时并行运行的,你可以开始向它发送消息。调用start的线程会继续执行。

 

 

如果你需要临时创建actor而不是定义一个类。Actor伴生对象带有一个actor方法来创建和启动actor:

import scala.actors.Actor._
val actor2 = actor {while (true){receive{case "Hi" => println("Hello")}}
}

有时,一个匿名的actor需要向另一个actor发送指向自己的引用。这个引用可以通过self属性获取。

 

 

Actor的执行是异步的:

import scala.actors.Actor
object exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActoractor1.start()actor1 ! "Hi"for (i <- 1 to 10) {println("This is Main...")Thread.sleep(1000)}}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)receive {case "Hi" => {for (i <- 1 to 10) {println("This is Actor...")Thread.sleep(1000)}}}}
}

打印结果:

This is Main...
This is Actor...
This is Main...
//...


 

 

发送消息

actor是一个处理异步消息的对象。你向某个actor发送消息,该actor处理消息,或许还会向其他actor发送消息做进一步的处理。消息可以是任意对象。

 

要发送消息,我们可以用为actor定义的 ! 操作符:

actor1 ! "Hi"

代码示例:

import scala.actors.Actorobject exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActor//启动actor1actor1.start()//向actor1发送 Hiactor1 ! "Hi"}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)//使用case模式匹配接收到的消息receive {case "Hi" => println("Hello")}}
}

消息被送往该actor,当前线程继续执行。这样的做法称作“发完就忘”。也可以等待一个回复。

 


一个比较好的做法就是使用样例类作为消息,这样,actor就可以使用模式匹配来处理消息了。

代码示例:

import scala.actors.Actorobject exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActor//启动actor1actor1.start()//向actor1发送消息actor1 ! Person("Alice", 20)actor1 ! Person("Bob", 29)actor1 ! Person("Cindy", 16)}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)//使用case模式匹配接收到的消息receive {//可以是具体的对象//例如:case Person("Alice",20) => println("OK")case Person(name, age) => println("this is " + name + " :  age is " + age)}}
}
//样例类
case class Person(val name: String, val age: Int)

打印结果:

this is Alice :  age is 20
this is Bob :  age is 29
this is Cindy :  age is 16

这样就可以实现一个检查人信息的actor了。

 

 

 

接收消息

 

发送到actor的消息被存放在一个“邮箱”中。Receive方法从邮箱获取下一条消息并将它传递给它的参数,该参数是一个偏函数。

例如:

receive{case Deposit(amount) =>   ...case Withdraw(amount) =>   ... 
}


Receive方法的参数是:

{case Deposit(amount) =>   ...case Withdraw(amount) =>   ...
}

Receive方法将从邮箱中收来的消息传递给这个偏函数。

 

 

注意:

 

  • 消息投递的过程是异步的,它们会以什么样的顺序到达并不确定,所以在设计的时候要考虑到这一点,让程序不要依赖任何特定的消息投递顺序。
  •  
  • 如果receive方法被调用时并没有消息,则该调用会阻塞,直到有消息抵达。
  •  
  • 如果邮箱中没哟任何消息可以被偏函数处理,则对receive方法的调用也会阻塞,直到一个可以匹配的消息抵达。
  •  
  • 邮箱可能被那些不与任何case语句匹配的消息占满。你可以添加一个case _ 语句来处理任意的消息。
  •  
  • 邮箱会串行化消息。Actor运行在单个线程中。它会先接收一条消息,然后接收下一条消息。不过在actor的代码中不用担心争用状况。

例如:

receive {case Deposit(amount) => balance += amoutcase Withdraw(amount) => balance -= amoutcase   ... 
}

Actor中不会有将增值和减值互惨在一起的危险。

 

 

 

向其他Actor发送消息

当运算被分拆到不同actor来并行处理问题的各个部分时,这些处理结果需要被收集到一起。Actor可以将结果存入到一个线程安全的数据结构当中,比如一个并发的哈希映射,但actor模型并不鼓励使用共享数据。因而当actor计算出结果后,应该向另一个actor发送消息。

 

Actor计算结果发送方向:

  1. 可以是一些全局的actor。不过,当actor数量很多时,该方案伸缩性并不好。
  2. Actor可以构造成带有指向另一个或多个actor的引用。
  3. Actor可以接收带有指向另一个actor的引用消息。
  4. Actor可以返回消息给发送方。

 

 

 

消息通道

你可以通过共享消息通道来实现actor之间消息通信。

这样做的好处:

  • 消息通道是类型安全的——你只能发送或接收特定的类型消息。
  • 你不会不小心通过消息通道调用到某个actor方法。

 

消息通道可以是一个OutputChannel(带 ! 的方法),也可以是一个InputChannel(带receive或react方法)。Channel类同时扩展了OutputChannel和InputChannel特质。

 

要构造一个消息通道,你可以提供一个actor:

val channel = new Channel[Int](actor1)

如果不提供构造参数,那么消息通道就会绑定到当前执行的这个actor上。

 

代码示例:

import scala.actors.{Actor, Channel, OutputChannel}
import scala.actors.Actor._object exam1 {def main(args: Array[String]): Unit = {val actor1 = actor {val channel = new Channel[Int]val computeActor: Computer = new Computerval input:Seq[Int] = List(11,22,33)computeActor ! Compute(input,channel)channel.receive{case x:Int =>  //...case _ => //...}}actor1.start()}
}case class Compute(input: Seq[Int], result: OutputChannel[Int])class Computer extends Actor {override def act(): Unit = {while (true) {receive {case Compute(input, out) => {val answer = 88; out ! answer}}}}
}


  

同步消息和Future

同步消息

actor可以发送一个消息并等待回复,用 !? 操作符即可。

例如:

import scala.actors.Actorobject exam1 {def main(args: Array[String]): Unit = {//创建actor实例actor1val actor1 = new HiActor//启动actor1actor1.start()//向actor1发送消息并等待回复val reply = actor1 !? Person("Alice", 20)println(reply)}
}//继承Actor特质类
class HiActor extends Actor {//重写Actor特质类的抽象方法act,从而实现自己的业务逻辑override def act(): Unit = {//消息循环while (true)receive {//可以是具体的对象//例如:case Person("Alice",20) => println("OK")case Person(name, age) => {println("this is " + name + " :  age is " + age); sender ! "replied message..."}}}
}//样例类
case class Person(val name: String, val age: Int)

打印结果:

this is Alice :  age is 20
replied message...

这两种返回效果相同:

sender ! "replied message..."
reply("replied message...")

同步消息很容易引发死锁。通常而言,最好避免在actor的act方法里执行阻塞调用。

 

 

future

除了等待对方返回结果之外,你也可以选择接收一个future——这是一个将在结果可用时产出结果的对象。使用 !! 方法:

val replyFuture = acount !! Deposite(1000)

isSet方法会检查结果是否已经可用。要接收该结果,使用函数调用的表示方法:

val reply = replyFuture()

这个调用将会阻塞,直到回复被发送。

 

 

 

共享线程

某些程序包含的actor过多,以至于要为actor创建单独的线程开销会很大。此时需要考虑在同一个线程中运行多个actor。Actor有时大部分时间用户等待消息,这时actor所在的单独线程会堵塞,与其这样不如用一个线程来执行多个actor的消息处理函数。

 

在Scala中,react方法可以实现这样的功能。react方法接收一个偏函数,并将它添加到邮箱,然后退出。

  

react工作原理

当你调用一个actor的start时,start方法会以某种方式来确保最终会有某个线程来调用那个actor的act方法。如果act方法调用了react ,则react方法会在actor的邮箱中查找传递给偏函数的能够处理的消息 。(和receive方法一样,传递待处理消息给偏函数的isDefinedAt方法。) 如果找到一个可以处理的消息,react 会安排一个在未来某个时间处理该消息的计划并抛出异常。如果它没有找到这样的消息,它会将actor置于“冷存储” 状态 ,在它通过邮箱收到更多消息时重新激活,并抛出异常。不论是哪种情况,react都会以这个异常的方式完成其执行,act 方法也随之结束  调用act的线程会捕获这个异常,忘掉这个actor , 并继续处理其他事务。这就是为什么你想要react在处理第一个消息之外做更多的事,你将需要在偏函数中再次调用act方法 ,或使用某种其他的手段来让react再次被调用。

 

较为复杂的代码示例

import java.net.InetAddress
import java.net.UnknownHostException
import actors._, Actor._
import scala.actors.Actor/*** scala Actor构建在java的线程基础之上的,* 为了避免频繁的线程创建、销毁和切换等,scala中提供了react方法* 方法执行完毕后,仍然被保留*/
object NameResolver extends Actor{def act(){react{//模式匹配  case Net(name,actor) =>/*** 想actor发送解析后的IP地址* 该例中,actor为本身,即向本身发送消息,存入邮箱*/actor!getIpAddress(name)//再次调用act方法,从本身邮箱中读取消息  //如果消息为空,则等待  actcase "EXIT" =>println("Name resolver exiting.")//匹配邮箱中的单个信息,本例中会匹配邮箱中的IP地址信息case msg =>println("Unhandled message: " + msg)act}}def getIpAddress(name:String):Option[InetAddress] ={try{println(InetAddress.getByName(name))Some(InetAddress.getByName(name))} catch {case _: UnknownHostException => None}}
}
case class Net(name:String,actor:Actor)object ActorWithReAct extends App {//启动Actor  NameResolver.start()//发消息  NameResolver!Net("www.baidu.com",self)//接收消息  println(self.receive{case msg => msg})
}  


 

 

Actor生命周期

actor的act方法在actor的start方法被调用的时候开始执行。通常,actor接下来做的事情是进入某个循环,例如:

def act(): Unit ={while (...){receive{//}}
}
 

actor在如下情形之一会终止执行:

  • Act方法返回
  • Act方法由于异常被终止
  • Actor调用exit方法

 

exit方法是个受保护的方法。它只能被Actor子类中调用。例如:

val actor2 = actor{while(true){receive{case "Hi" => println("Hello")case "Bye" => exit()}}
}

其他方法不能调用exit()来终止一个actor。

 

当actor因一个异常终止时,退出原因就是UncaughtException样例类的一个实例。该样例类有如下属性:

 

acotr: 抛出异常的actor。

  • message: Some(msg)  msg为该actor处理的最后一条消息;或者None,如果actor在没来得及处理任何消息之前就挂掉的话。
  • Sender:Some(channel)  channel代表最后一条消息的发送方的输出消息通道;或者None,如果actor在没来得及处理任何消息之前就挂掉。
  • Thread:  actor退出时所在的线程。
  • Cause: 相应的异常。

 



总结

  1. 每个actor都要扩展Actor类并提供act方法。
  2. 要往actor发送消息,可以用actor ! Message。
  3. 消息发送是异步的:“发完就忘”
  4. 要接收消息,actor可以调用receive或react,通常是在循环中这样做。
  5. receive/react的参数是由case语句组成的代码块(是一个偏函数)
  6. 不同actor之间不应该共享状态。总是使用消息来发送数据。
  7. 不要直接调用actor方法。通过消息进行通信。
  8. 避免同步消息——也就是说将发送消息和等待响应分开。
  9. 不同actor可以通过react而不是receive来共享线程,前提是消息处理器的控制流转足够简单。
  10. 让actor挂掉是可以的,前提是你有其他actor监控着actor生死。用链接来设置监控关系。

这篇关于Scala Actor通信的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1020834

相关文章

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

【STM32】SPI通信-软件与硬件读写SPI

SPI通信-软件与硬件读写SPI 软件SPI一、SPI通信协议1、SPI通信2、硬件电路3、移位示意图4、SPI时序基本单元(1)开始通信和结束通信(2)模式0---用的最多(3)模式1(4)模式2(5)模式3 5、SPI时序(1)写使能(2)指定地址写(3)指定地址读 二、W25Q64模块介绍1、W25Q64简介2、硬件电路3、W25Q64框图4、Flash操作注意事项软件SPI读写W2

vue2 组件通信

props + emits props:用于接收父组件传递给子组件的数据。可以定义期望从父组件接收的数据结构和类型。‘子组件不可更改该数据’emits:用于定义组件可以向父组件发出的事件。这允许父组件监听子组件的事件并作出响应。(比如数据更新) props检查属性 属性名类型描述默认值typeFunction指定 prop 应该是什么类型,如 String, Number, Boolean,

linux中使用rust语言在不同进程之间通信

第一种:使用mmap映射相同文件 fn main() {let pid = std::process::id();println!(

C++编程:ZeroMQ进程间(订阅-发布)通信配置优化

文章目录 0. 概述1. 发布者同步发送(pub)与订阅者异步接收(sub)示例代码可能的副作用: 2. 适度增加缓存和队列示例代码副作用: 3. 动态的IPC通道管理示例代码副作用: 4. 接收消息的超时设置示例代码副作用: 5. 增加I/O线程数量示例代码副作用: 6. 异步消息发送(使用`dontwait`标志)示例代码副作用: 7. 其他可以考虑的优化项7.1 立即发送(ZMQ_IM

VB和51单片机串口通信讲解(只针对VB部分)

标记:该篇文章全部搬自如下网址:http://www.crystalradio.cn/thread-321839-1-1.html,谢谢啦            里面关于中文接收的部分,大家可以好好学习下,题主也在研究中................... Commport;设置或返回串口号。 SettingS:以字符串的形式设置或返回串口通信参数。 Portopen:设置或返回串口

深入理解TCP通信

这大概是自己博客上面第三次写TCP通信demo了,总是写同样的内容也不太好啊,不过每一次都比前一次进步一点。这次主要使用了VIM编辑工具、gdb调试、wireshirk、netstat查看网络状态。 参考《C++服务器视频教程》、《Unix网络编程》 一、VIM常用命令 vim server.cpp #打开一个文件:w 写入文件:wq 保存并退出:q! 不保存退出显示行号

电子电气架构---私有总线通信和诊断规则

电子电气架构—私有总线通信和诊断规则 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自己,无利益不试图说服别人,是精神上的节能减排。 无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事.而不是让内心的烦躁、

DoIP-ISO 13400-1 道路车辆-基于互联网协议的诊断通信(DoIP)-第 1 部分:一般信息和用例定义 (1/2)

如下内容基于2011版本的 ISO 13400开展,内容较多,拆分为2篇,此篇为 1/2。 前言 ISO(国际标准化组织)是一个全球范围内的国际标准机构联合体(ISO 成员机构)。国际标准的制备工作通常通过 ISO 技术委员会进行。每个相关成员机构都有权在已建立的技术委员会中代表其利益。与 ISO 保持联系的国际组织、政府和非政府组织也参与这项工作。ISO 与国际电工委员会(IEC)在所有电气

微软C#套接字异步通信代码

Asynchronous Server Socket Example.NET Framework 4 其他版本 The following example program creates a server that receives connection requests from clients. The server is built with an asynchronous socket,