Concurrency in Scala

2024-08-27 10:48
文章标签 scala concurrency

本文主要是介绍Concurrency in Scala,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文:https://twitter.github.io/scala_school/zh_cn/concurrency.html

Runnable/Callable

Runnable接口只有一个没有返回值的方法。

trait Runnable {def run(): Unit
}

Callable与之类似,除了它有一个返回值

trait Callable[V] {def call(): V
}

线程

Scala并发是建立在Java并发模型基础上的。

在Sun JVM上,对IO密集的任务,我们可以在一台机器运行成千上万个线程。

一个线程需要一个Runnable。你必须调用线程的 start 方法来运行Runnable。

scala> val hello = new Thread(new Runnable {def run() {println("hello world")}
})
hello: java.lang.Thread = Thread[Thread-3,5,main]scala> hello.start
hello world

当你看到一个类实现了Runnable接口,你就知道它的目的是运行在一个线程中。

单线程代码

这里有一个可以工作但有问题的代码片断。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Dateclass NetworkService(port: Int, poolSize: Int) extends Runnable {val serverSocket = new ServerSocket(port)def run() {while (true) {// This will block until a connection comes in.val socket = serverSocket.accept()(new Handler(socket)).run()}}
}class Handler(socket: Socket) extends Runnable {def message = (Thread.currentThread.getName() + "\n").getBytesdef run() {socket.getOutputStream.write(message)socket.getOutputStream.close()}
}(new NetworkService(2020, 2)).run

每个请求都会回应当前线程的名称,所以结果始终是 main 。

这段代码的主要缺点是在同一时间,只有一个请求可以被相应!

你可以把每个请求放入一个线程中处理。只要简单改变

(new Handler(socket)).run()

(new Thread(new Handler(socket))).start()

但如果你想重用线程或者对线程的行为有其他策略呢?

Executors

随着Java 5的发布,它决定提供一个针对线程的更抽象的接口。

你可以通过 Executors 对象的静态方法得到一个 ExecutorService 对象。这些方法为你提供了可以通过各种政策配置的 ExecutorService ,如线程池。

下面改写我们之前的阻塞式网络服务器来允许并发请求。

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Dateclass NetworkService(port: Int, poolSize: Int) extends Runnable {val serverSocket = new ServerSocket(port)val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)def run() {try {while (true) {// This will block until a connection comes in.val socket = serverSocket.accept()pool.execute(new Handler(socket))}} finally {pool.shutdown()}}
}class Handler(socket: Socket) extends Runnable {def message = (Thread.currentThread.getName() + "\n").getBytesdef run() {socket.getOutputStream.write(message)socket.getOutputStream.close()}
}(new NetworkService(2020, 2)).run

这里有一个连接脚本展示了内部线程是如何重用的。

$ nc localhost 2020
pool-1-thread-1$ nc localhost 2020
pool-1-thread-2$ nc localhost 2020
pool-1-thread-1$ nc localhost 2020
pool-1-thread-2

Futures

Future 代表异步计算。你可以把你的计算包装在Future中,当你需要计算结果的时候,你只需调用一个阻塞的 get() 方法就可以了。一个 Executor 返回一个 Future 。如果使用Finagle RPC系统,你可以使用 Future 实例持有可能尚未到达的结果。

一个 FutureTask 是一个Runnable实现,就是被设计为由 Executor 运行的

val future = new FutureTask[String](new Callable[String]() {def call(): String = {searcher.search(target);
}})
executor.execute(future)

现在我需要结果,所以阻塞直到其完成。

val blockingResult = Await.result(future)

参考 Scala School的Finagle介绍中大量使用了Future,包括一些把它们结合起来的不错的方法。以及 Effective Scala 对Futures的意见。

线程安全问题

class Person(var name: String) {def set(changedName: String) {name = changedName}
}

这个程序在多线程环境中是不安全的。如果有两个线程有引用到同一个Person实例,并调用 set ,你不能预测两个调用结束后 name 的结果。

在Java内存模型中,允许每个处理器把值缓存在L1或L2缓存中,所以在不同处理器上运行的两个线程都可以有自己的数据视图。

让我们来讨论一些工具,来使线程保持一致的数据视图。

三种工具

同步

互斥锁(Mutex)提供所有权语义。当你进入一个互斥体,你拥有它。同步是JVM中使用互斥锁最常见的方式。在这个例子中,我们会同步Person。

在JVM中,你可以同步任何不为null的实例。

class Person(var name: String) {def set(changedName: String) {this.synchronized {name = changedName}}
}
volatile

随着Java 5内存模型的变化,volatile和synchronized基本上是相同的,除了volatile允许空值。

synchronized 允许更细粒度的锁。 而 volatile 则对每次访问同步。

class Person(@volatile var name: String) {def set(changedName: String) {name = changedName}
}
AtomicReference

此外,在Java 5中还添加了一系列低级别的并发原语。 AtomicReference 类是其中之一

import java.util.concurrent.atomic.AtomicReferenceclass Person(val name: AtomicReference[String]) {def set(changedName: String) {name.set(changedName)}
}
这个成本是什么?

AtomicReference 是这两种选择中最昂贵的,因为你必须去通过方法调度(method dispatch)来访问值。

volatile 和 synchronized 是建立在Java的内置监视器基础上的。如果没有资源争用,监视器的成本很小。由于 synchronized 允许你进行更细粒度的控制权,从而会有更少的争夺,所以 synchronized 往往是最好的选择。

当你进入同步点,访问volatile引用,或去掉AtomicReferences引用时, Java会强制处理器刷新其缓存线从而提供了一致的数据视图。

如果我错了,请大家指正。这是一个复杂的课题,我敢肯定要弄清楚这一点需要一个漫长的课堂讨论。

Java5的其他灵巧的工具

正如前面提到的 AtomicReference ,Java5带来了许多很棒的工具。

CountDownLatch

CountDownLatch 是一个简单的多线程互相通信的机制。

val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)doneSignal.await()
println("both workers finished!")

先不说别的,这是一个优秀的单元测试。比方说,你正在做一些异步工作,并要确保功能完成。你的函数只需要 倒数计数(countDown) 并在测试中 等待(await) 就可以了。

AtomicInteger/Long

由于对Int和Long递增是一个经常用到的任务,所以增加了 AtomicInteger 和 AtomicLong 。

AtomicBoolean

我可能不需要解释这是什么。

ReadWriteLocks

读写锁(ReadWriteLock) 使你拥有了读线程和写线程的锁控制。当写线程获取锁的时候读线程只能等待。

让我们构建一个不安全的搜索引擎

下面是一个简单的倒排索引,它不是线程安全的。我们的倒排索引按名字映射到一个给定的用户。

这里的代码天真地假设只有单个线程来访问。

注意使用了 mutable.HashMap 替代了默认的构造函数 this()

import scala.collection.mutablecase class User(name: String, id: Int)class InvertedIndex(val userMap: mutable.Map[String, User]) {def this() = this(new mutable.HashMap[String, User])def tokenizeName(name: String): Seq[String] = {name.split(" ").map(_.toLowerCase)}def add(term: String, user: User) {userMap += term -> user}def add(user: User) {tokenizeName(user.name).foreach { term =>add(term, user)}}
}

这里没有写如何从索引中获取用户。稍后我们会补充。

让我们把它变为线程安全

在上面的倒排索引例子中,userMap不能保证是线程安全的。多个客户端可以同时尝试添加项目,并有可能出现前面 Person 例子中的视图错误。

由于userMap不是线程安全的,那我们怎样保持在同一个时间只有一个线程能改变它呢?

你可能会考虑在做添加操作时锁定userMap。

def add(user: User) {userMap.synchronized {tokenizeName(user.name).foreach { term =>add(term, user)}}
}

不幸的是,这个粒度太粗了。一定要试图在互斥锁以外做尽可能多的耗时的工作。还记得我说过如果不存在资源争夺,锁开销就会很小吗。如果在锁代码块里面做的工作越少,争夺就会越少。

def add(user: User) {// tokenizeName was measured to be the most expensive operation.val tokens = tokenizeName(user.name)tokens.foreach { term =>userMap.synchronized {add(term, user)}}
}

SynchronizedMap

我们可以通过SynchronizedMap特质将同步混入一个可变的HashMap。

我们可以扩展现有的InvertedIndex,提供给用户一个简单的方式来构建同步索引。

import scala.collection.mutable.SynchronizedMapclass SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

如果你看一下其实现,你就会意识到,它只是在每个方法上加同步锁来保证其安全性,所以它很可能没有你希望的性能。

Java ConcurrentHashMap

Java有一个很好的线程安全的ConcurrentHashMap。值得庆幸的是,我们可以通过JavaConverters获得不错的Scala语义。

事实上,我们可以通过扩展老的不安全的代码,来无缝地接入新的线程安全InvertedIndex。

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])extends InvertedIndex(userMap) {def this() = this(new ConcurrentHashMap[String, User] asScala)
}

让我们加载InvertedIndex

原始方式

trait UserMaker {def makeUser(line: String) = line.split(",") match {case Array(name, userid) => User(name, userid.trim().toInt)}
}class FileRecordProducer(path: String) extends UserMaker {def run() {Source.fromFile(path, "utf-8").getLines.foreach { line =>index.add(makeUser(line))}}
}

对于文件中的每一行,我们可以调用 makeUser 然后 add 到 InvertedIndex中。如果我们使用并发InvertedIndex,我们可以并行调用add因为makeUser没有副作用,所以我们的代码已经是线程安全的了。

我们不能并行读取文件,但我们 可以 并行构造用户并且把它添加到索引中。

一个解决方案:生产者/消费者

异步计算的一个常见模式是把消费者和生产者分开,让他们只能通过 队列(Queue) 沟通。让我们看看如何将这个模式应用在我们的搜索引擎索引中。

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {def run() {Source.fromFile(path, "utf-8").getLines.foreach { line =>queue.put(line)}}
}// Abstract consumer
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {def run() {while (true) {val item = queue.take()consume(item)}}def consume(x: T)
}val queue = new LinkedBlockingQueue[String]()// One thread for the producer
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()trait UserMaker {def makeUser(line: String) = line.split(",") match {case Array(name, userid) => User(name, userid.trim().toInt)}
}class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {def consume(t: String) = index.add(makeUser(t))
}// Let's pretend we have 8 cores on this machine.
val cores = 8
val pool = Executors.newFixedThreadPool(cores)// Submit one consumer per core.
for (i <- i to cores) {pool.submit(new IndexerConsumer[String](index, q))
}

这篇关于Concurrency in Scala的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1111452

相关文章

【scala 安装错误】错误: 找不到或无法加载主类 scala.tools.nsc.MainGenericRunner

错误: 找不到或无法加载主类 scala.tools.nsc.MainGenericRunner 原因: Scala安装路径中包含空格。 解决办法:scala 不要安装在E:\Program Files 这种有空格的目录下,简直坑

Scala模式匹配下提取器构造

示例代码: object :> {def unapply[A] (list:List[A]) = {Some( (list.init,list.last) )}}object Extractor_Advanced {def main(args: Array[String]): Unit = {(1 to 9).toList match{ case _ :> 9 => println(

从spark源码的角度思考scala中的模式匹配

1.scala中模式匹配 2.spark源码中的模式匹配思考 spark中master会收到worker发过来的akka的消息, 此消息是case class即(Master.class中): case class RegisterWorker(id:String,host:String,port:Int,cores:Int,memory:Int,webUiPort:int

Scala界面事件处理

示例代码: import scala.swing.SimpleSwingApplicationimport scala.swing.MainFrameimport scala.swing.Buttonimport scala.swing.Labelimport scala.swing.Orientationimport scala.swing.BoxPanelimpo

Scala界面Panel、Layout初探

示例代码: package com.dt.scala.guiimport scala.swing.SimpleSwingApplicationimport scala.swing.MainFrameimport scala.swing.Buttonimport scala.swing.Labelimport scala.swing.Orientationimport scal

scala界面GUI编程实战初步了解

示例代码: import scala.swing._//SimpleSwingApplication继承自SwingApplication类(此类中有main方法,因此可以运行显示界面)object Hello_GUI extends SimpleSwingApplication {def top = new MainFrame{ //顶级容器title = "Hello GUI"co

Scala并发编程react、loop代码实战详解

示例代码及注释: //scala并发编程中的react和loop,共同特点://通过线程存用的方式让性能有所提升。//Actor本身的运行,被actor子系统管理的时候,会有一个或者多个远程的线程让当前的actor使用//一般情况下每个Actor都有自己的线程。只有有自己的线程时,我们的Actor中的actor方法才会执行。//但是,这样线程的开销会非常大,所以为了共用线

scala并发编程原生线程Actor、Case Class下的消息传递和偏函数实战

参考代码: import scala.actors._case class Person(name:String,age:Int)class HelloActor extends Actor{def act(){while(true){receive{case Person(name,age)=>{ //偏函数println("Name: "+ name + ":" +"Age:"

scala基础概念

Scala是面向行的语言,Scala 语句末尾的分号写或者不写都可以。 对象 - 对象有属性和行为。例如:一只哈士奇的属性有:颜色,名字,行为有:叫、跑、吃等。对象是一个类的实例。 类 - 类是对象的抽象;对象是类的具体实例。 方法 - 方法描述的基本的行为,一个类可以包含多个方法。 字段 - 每个对象都有它唯一的实例变量集合,即字段。对象的属性通过给字段赋值来创建。 基本语法

Scala:Scala基础语法【Scala语言是一个完全面向对象编程语言-->万物皆对象;Scala语言是一个完全函数式编程语言-->万物皆函数】

一、变量和数据类型 1、变量 说明:在Scala中声明一个变量时,可以不指定类型,编译器根据值确定 var | val 变量名 [: 变量类型] = 变量值 声明变量时,类型可以省略(编译器自动推导,即类型推导)类型确定后,就不能修改,说明Scala是强数据类型语言。变量声明时,需要初始值object TestVar {def main(args: Array[String]): Uni