Concurrent Programming in Haskell

2024-02-29 06:58
Tags: concurrency, programming, haskell

This article introduces concurrent programming in Haskell, covering lightweight threads and software transactional memory (STM), with worked examples.

Threads


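forkIO :: IO () -> IO ThreadId

forkIO is exported by Control.Concurrent. Its argument is an IO action, and forkIO creates a new concurrent Haskell thread to run that action. Once the thread has been created, the Haskell runtime system schedules it alongside all the other Haskell threads.

Threads created by forkIO are very lightweight: each occupies on the order of a kilobyte of memory or less, so a program that spawns thousands of them is entirely normal. (Note: forkIO creates a lightweight Haskell thread, not an OS thread and not a spark; sparks come from par in Control.Parallel. The runtime scheduler multiplexes Haskell threads onto OS threads. Concurrent execution is always guaranteed, but threads only run in parallel on several cores when the program is linked with -threaded and run with +RTS -N.)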
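To make the "thousands of threads" claim concrete, here is a minimal sketch that forks one thread per task and collects the results through MVars. The name squaresInThreads is made up for this illustration and is not part of any library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Illustrative helper (hypothetical name): fork n lightweight threads.
-- Thread i computes i * i and reports it through its own MVar.
-- The results come back in thread order.
squaresInThreads :: Int -> IO [Int]
squaresInThreads n = do
  boxes <- mapM (const newEmptyMVar) [1 .. n]
  mapM_ (\(i, box) -> forkIO (putMVar box (i * i))) (zip [1 ..] boxes)
  mapM takeMVar boxes   -- blocks until every worker has delivered its result
```

Calling squaresInThreads 1000 from main or GHCi forks a thousand threads; each takeMVar waits only for the thread that owns that MVar.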

Example:

module Main where

import Control.Concurrent
import Control.Concurrent.MVar

-- Run an action with a time limit given in microseconds.
-- Returns Nothing if the limit expires first, Just the result otherwise.
timeout :: Int -> IO a -> IO (Maybe a)
timeout time action = do
    someMVar <- newEmptyMVar -- receives Nothing on timeout, Just result on success
    timeoutThread <- forkIO $ nothingIzer someMVar
    _ <- forkIO $ actionRunner someMVar timeoutThread
    takeMVar someMVar
  where
    nothingIzer mvar = threadDelay time >> putMVar mvar Nothing
    actionRunner mvar timeoutThread = do
        res <- action
        killThread timeoutThread -- the action finished first, so cancel the timer
        putMVar mvar $ Just res

main :: IO ()
main = do
    res <- timeout (5 * 10 ^ 6) (getLine >>= putStrLn)
    case res of
        Nothing -> putStrLn "Timeout"
        Just _ -> putStrLn "Success"
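The hand-written timeout above leaves the action's thread running after the limit expires. The base library ships System.Timeout.timeout with the same signature, and it interrupts the action when the limit is reached. A small sketch follows; runWithLimit and its simulated workload are assumptions made for this illustration.

```haskell
import Control.Concurrent (threadDelay)
import qualified System.Timeout as T

-- Hypothetical helper: run a simulated task that sleeps for `workMicros`
-- microseconds under a limit of `limitMicros` microseconds.
-- Just "done" means the task finished in time; Nothing means it was cut off.
runWithLimit :: Int -> Int -> IO (Maybe String)
runWithLimit limitMicros workMicros =
  T.timeout limitMicros (threadDelay workMicros >> return "done")
```

For example, runWithLimit 1000000 1000 returns Just "done", while runWithLimit 1000 1000000 returns Nothing.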

STM

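The traditional concurrency model controls access to shared resources with facilities such as mutexes, condition variables, and semaphores. This classic model makes it very hard to write concurrent programs that are both correct and efficient:

  .A missing lock leaves a race condition
  .Misused locks lead to deadlock
  .Exceptions that are not handled properly crash the program
  .A forgotten condition-variable notification leaves a waiter that is never woken up
  .Poorly chosen lock granularity degrades performance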

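The basic design rules of STM are as follows:
   .Access to shared resources is controlled so that the operations of different threads are isolated from one another
   .Commit rule:
      If no other thread has modified the shared data the transaction used, all of the current thread's changes become visible to other threads at once
      Otherwise, the current thread's changes are discarded entirely and the transaction is automatically restarted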

This all-or-nothing behavior guarantees that operations on shared data are atomic, much like a transaction in a database.
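As a rough illustration of that atomicity, the sketch below lets several threads increment one shared TVar. The names increment and countConcurrently are invented for this example. Because each read-modify-write runs inside atomically, no update is lost.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (STM, TVar, atomically, newTVarIO, readTVar, readTVarIO, writeTVar)
import Control.Monad (replicateM_)

-- One transaction: read, add one, write back. No other thread can
-- interleave between the read and the write.
increment :: TVar Int -> STM ()
increment tv = do
  n <- readTVar tv
  writeTVar tv (n + 1)

-- Hypothetical driver: `workers` threads each perform `perWorker` increments.
countConcurrently :: Int -> Int -> IO Int
countConcurrently workers perWorker = do
  counter <- newTVarIO 0
  done <- newEmptyMVar
  replicateM_ workers $ forkIO $ do
    replicateM_ perWorker (atomically (increment counter))
    putMVar done ()                    -- signal that this worker has finished
  replicateM_ workers (takeMVar done)  -- wait for all workers
  readTVarIO counter
```

countConcurrently 8 1000 returns 8000, whether or not the program runs on several cores.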


One of the main advantages of STM is composability and modularity: small transactional fragments can be combined into larger components, which is very hard to do in a lock-based system.

----------------------------------------------------------------------------------------------

-- Transfer 'amount' from account 'from' to account 'to'
transfer :: Account -> Account -> Int -> IO ()
transfer from to amount = atomically (
      do { deposit to amount; withdraw from amount } )

atomically :: STM a -> IO a

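atomically takes an action of type STM a. STM actions are like IO actions in that both may have side effects, but the range of side effects allowed in an STM action is much smaller. The main thing you can do in STM is read and write transactional variables (of type TVar a), just as IO actions can read and write IORefs.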

The behavior of atomically can be described in pseudocode as follows:

  atomically action = do

    varState <- getStateOfTVars
    (newState, ret) <- runTransactionWith action varState
    success <- attemptToCommitChangesToTVars newState
    if success
    then return ret
    else atomically action -- try again


readTVar  :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ( )

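Like IO actions, STM actions can be combined with do blocks, so the do notation and return work for both STM and IO actions. Haskell does not overload do and return specifically for IO and STM, though. Both are instances of a more general pattern called a monad, and the overloading of do and return comes from expressing monads through Haskell's very general type class system.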
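A small sketch of what this overloading buys: the function twice below is written once against the Monad class and then used in both STM and IO. All names here (twice, tickSTM, tickIO, demoTwice) are invented for this illustration.

```haskell
import Control.Concurrent.STM (STM, TVar, atomically, newTVarIO, readTVar, writeTVar)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Works in any monad: run an action twice and pair up the results.
twice :: Monad m => m a -> m (a, a)
twice act = do
  x <- act
  y <- act
  return (x, y)

-- A counter step in STM: return the old value and store old + 1 ...
tickSTM :: TVar Int -> STM Int
tickSTM tv = do { n <- readTVar tv; writeTVar tv (n + 1); return n }

-- ... and the same step in IO.
tickIO :: IORef Int -> IO Int
tickIO ref = do { n <- readIORef ref; writeIORef ref (n + 1); return n }

demoTwice :: IO ((Int, Int), (Int, Int))
demoTwice = do
  tv <- newTVarIO 0
  ref <- newIORef 10
  a <- atomically (twice (tickSTM tv))  -- both ticks happen in one transaction
  b <- twice (tickIO ref)
  return (a, b)
```

demoTwice returns ((0, 1), (10, 11)): the same do block and return serve both monads.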

type Account = TVar Int
withdraw :: Account -> Int -> STM ( )
withdraw acc amount = do {
     bal <- readTVar acc; writeTVar acc (bal - amount)
}

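An account is represented by a transactional variable holding an Int (the account balance). withdraw is an STM action that takes amount out of the account's balance.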

To complete the definition of transfer, we can define deposit in terms of withdraw:

deposit :: Account -> Int -> STM ( )
deposit acc amount = withdraw acc (- amount)
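The fragments above (Account, withdraw, deposit, transfer) can be assembled into one file roughly as follows. demoTransfer and its starting balances are assumptions added for illustration.

```haskell
import Control.Concurrent.STM

type Account = TVar Int

withdraw :: Account -> Int -> STM ()
withdraw acc amount = do
  bal <- readTVar acc
  writeTVar acc (bal - amount)

deposit :: Account -> Int -> STM ()
deposit acc amount = withdraw acc (- amount)

-- Both steps commit together or not at all.
transfer :: Account -> Account -> Int -> IO ()
transfer from to amount =
  atomically (do { deposit to amount; withdraw from amount })

-- Hypothetical demo: move 30 from an account holding 100 to an empty one.
demoTransfer :: IO (Int, Int)
demoTransfer = do
  a <- newTVarIO 100
  b <- newTVarIO 0
  transfer a b 30
  balA <- readTVarIO a
  balB <- readTVarIO b
  return (balA, balB)
```

demoTransfer returns (70, 30). No other thread can ever observe a state in which the 30 has been deposited but not yet withdrawn.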

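Haskell's type system elegantly prevents us from reading or writing a TVar outside a transaction.


For example, suppose we write:
bad :: Account -> IO ()
bad acc = do { hPutStr stdout "Withdrawing..."; withdraw acc 10 }

This code does not compile, because hPutStr is an IO action while withdraw is an STM action, and the two cannot be mixed in the same do block. If we wrap withdraw in a call to atomically, however, everything is fine (atomically returns an IO action):

good :: Account -> IO ()

good acc = do { hPutStr stdout "Withdrawing..."; atomically (withdraw acc 10) }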



Operation    Type signature

atomically   STM a -> IO a
retry        STM a
orElse       STM a -> STM a -> STM a
newTVar      a -> STM (TVar a)
readTVar     TVar a -> STM a
writeTVar    TVar a -> a -> STM ()
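retry and orElse are the two operations in this table that the account example has not used yet. The sketch below shows one way they fit together. limitedWithdraw, limitedWithdraw2 and demoOrElse are illustrative names, not functions of the stm package.

```haskell
import Control.Concurrent.STM

type Account = TVar Int

-- Block (via retry) until the balance covers the amount, then withdraw.
-- retry abandons the transaction and reruns it once a TVar it read changes.
limitedWithdraw :: Account -> Int -> STM ()
limitedWithdraw acc amount = do
  bal <- readTVar acc
  if amount > 0 && amount > bal
    then retry
    else writeTVar acc (bal - amount)

-- Try the first account; if that transaction would retry, use the second.
limitedWithdraw2 :: Account -> Account -> Int -> STM ()
limitedWithdraw2 acc1 acc2 amt =
  orElse (limitedWithdraw acc1 amt) (limitedWithdraw acc2 amt)

-- Hypothetical demo: the first account is too small, so the second one pays.
demoOrElse :: IO (Int, Int)
demoOrElse = do
  a <- newTVarIO 5
  b <- newTVarIO 50
  atomically (limitedWithdraw2 a b 20)
  (,) <$> readTVarIO a <*> readTVarIO b
```

demoOrElse returns (5, 30). If both accounts were too small, the whole orElse would retry and the caller would block until one of the balances changed.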


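Example 1: Santa.hs (from Simon Peyton Jones's chapter "Beautiful Concurrency" in the book Beautiful Code, edited by Andy Oram and Greg Wilson)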

module Main where

import Control.Concurrent.STM
import Control.Concurrent
import System.Random

main = do {
    elf_gp <- newGroup 3;
    sequence_ [ elf elf_gp n | n <- [1..10]];

    rein_gp <- newGroup 9;
    sequence_ [ reindeer rein_gp n | n <- [1..9]] ;

    forever (santa elf_gp rein_gp) }
  where
    elf gp id = forkIO (forever (do { elf1 gp id; randomDelay }))
    reindeer gp id = forkIO (forever (do { reindeer1 gp id; randomDelay }))


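-- Santa is the most interesting participant in this problem because he makes a choice: he must wait until either a group of reindeer or a group of elves is waiting for him. Once he has chosen which group to lead, he takes them off to do their task.

-- santa uses awaitGroup to wait for a group to be ready; choose obtains the pair of gates returned by awaitGroup and passes it to run, which operates the two gates in turn. operateGate blocks until all the elves (or reindeer) have passed through the gate.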
santa :: Group -> Group -> IO ()
santa elf_group rein_group = do {
    putStr "----------\n" ;
    choose [(awaitGroup rein_group, run "deliver toys"),
            (awaitGroup elf_group, run "meet in my study")] }
  where
    run :: String -> (Gate,Gate) -> IO ()
    run task (in_gate,out_gate) = do {
        putStr ("Ho! Ho! Ho! let's " ++ task ++ "\n") ;
        operateGate in_gate;
        operateGate out_gate }

helper1 :: Group -> IO () -> IO ()
helper1 group do_task = do {
    (in_gate, out_gate) <- joinGroup group;
    passGate in_gate;
    do_task;
    passGate out_gate }

elf1, reindeer1 :: Group -> Int -> IO ()
elf1 group id = helper1 group (meetInStudy id)
reindeer1 group id = helper1 group (deliverToys id)

meetInStudy id = putStr ("Elf " ++ show id ++ " meeting in the study\n")
deliverToys id = putStr ("Reindeer " ++ show id ++ " delivering toys\n")

---------------
data Group = MkGroup Int (TVar (Int, Gate, Gate))

newGroup :: Int -> IO Group
newGroup n = atomically (
    do {
        g1 <- newGate n;
        g2 <- newGate n;
        tv <- newTVar (n, g1, g2);
        return (MkGroup n tv) })

joinGroup :: Group -> IO (Gate,Gate)
joinGroup (MkGroup n tv) = atomically (
    do {
        (n_left, g1, g2) <- readTVar tv;
        check (n_left > 0);
        writeTVar tv (n_left-1, g1, g2);
        return (g1,g2) })

awaitGroup :: Group -> STM (Gate,Gate)
awaitGroup (MkGroup n tv) = do {
    (n_left, g1, g2) <- readTVar tv;
    check (n_left == 0);
    new_g1 <- newGate n;
    new_g2 <- newGate n;
    writeTVar tv (n,new_g1,new_g2);
    return (g1,g2) }

---------------
data Gate = MkGate Int (TVar Int)

newGate :: Int -> STM Gate
newGate n = do { tv <- newTVar 0; return (MkGate n tv) }

passGate :: Gate -> IO ()
passGate (MkGate n tv) = atomically (
    do { n_left <- readTVar tv;
         check (n_left > 0);
         writeTVar tv (n_left-1) })

operateGate :: Gate -> IO ()
operateGate (MkGate n tv) = do {
    atomically (writeTVar tv n);
    atomically (
        do { n_left <- readTVar tv; check (n_left == 0) }) }

----------------

forever :: IO () -> IO ()
-- Repeatedly perform the action
forever act = do { act; forever act }

randomDelay :: IO ()
-- Delay for a random time between 1 and 1,000,000 microseconds
randomDelay = do {

    waitTime <- getStdRandom (randomR (1, 1000000));
    threadDelay waitTime }


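-- foldr1 orElse [x1, …, xn] evaluates to x1 `orElse` x2 `orElse` x3 … `orElse` xn

-- choose first makes an atomic choice among the alternatives, obtaining the action to run (to_do, of type IO ()), and then performs that action.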

choose :: [(STM a, a -> IO ())] -> IO ()
choose choices = do {
    to_do <- atomically (foldr1 orElse stm_actions);
    to_do }
  where
    stm_actions :: [STM (IO ())]
    stm_actions = [ do { val <- guard; return (rhs val) } | (guard, rhs) <- choices ]

If the stm package is not installed, run `cabal install stm` before building. Santa.hs also imports System.Random, which comes from the random package.

$ ghc Santa.hs -package stm -o  santa


Example 2

module DirectAddressTable 
( DAT
, newDAT
, lookupDAT
, insertDAT
, getAssocsDAT
)
where
import Data.Array.IO
import Data.Array.MArray

newtype DAT = DAT (IOArray Int Char)

-- create a fixed size array; missing keys have value '-'.
newDAT :: Int -> IO DAT
newDAT n = do a <- newArray (0, n - 1) '-'
              return (DAT a)

-- lookup an item.
lookupDAT :: DAT -> Int -> IO (Maybe Char)
lookupDAT (DAT a) i = do
    c <- readArray a i
    return (if c == '-' then Nothing else Just c)

-- insert an item
insertDAT :: DAT -> Int -> Char -> IO ()
insertDAT (DAT a) i v = writeArray a i v

-- get all associations (exclude missing items, i.e. those whose value is '-').
getAssocsDAT :: DAT -> IO [(Int,Char)]
getAssocsDAT (DAT a) = do
    assocs <- getAssocs a
    return [ (k,c) | (k,c) <- assocs, c /= '-' ]

   I then have a main program that initializes a new table and forks some threads, with each thread writing and reading a fixed number of values to the newly initialized table. The overall number of elements to write is fixed. The number of threads is taken from a command-line argument, and the elements to process are divided evenly among the threads.


-- file DirectTableTest.hs
import DirectAddressTable
import Control.Concurrent
import Control.Parallel
import System.Environment

main =  do
   args <- getArgs
   let numThreads = read (args !! 0)
   vs <- sequence (replicate numThreads newEmptyMVar)
   a <- newDAT arraySize
   sequence_ [forkIO (doLotsOfStuff numThreads i a >>= putMVar v) | (i,v) <- zip [1..] vs]
   sequence_ [takeMVar v >>= \a -> getAssocsDAT a >>= \xs -> print (last xs)  | v <- vs]

doLotsOfStuff :: Int -> Int -> DAT -> IO DAT
doLotsOfStuff numThreads i a =  do
   let p j c = (c `seq` insertDAT a j c) >> lookupDAT a j >>= \v -> v `pseq` return ()
   sequence_ [ p j c | (j,c) <- bunchOfKeys i ]
   return a
   where bunchOfKeys i = take numElems $ zip cyclicIndices $ drop i cyclicChars
         numElems = numberOfElems `div` numThreads

cyclicIndices = cycle [0..highestIndex]
cyclicChars = cycle chars
chars = ['a'..'z']

-- Parameters
arraySize :: Int
arraySize = 100
highestIndex = arraySize - 1
numberOfElems = 10 * 1000 * 1000

   Compile & run

  >ghc --make -rtsopts -threaded -fforce-recomp -O2 DirectTableTest.hs

  >time ./DirectTableTest 1 +RTS -N1

  >time ./DirectTableTest 2 +RTS -N2






http://www.chinasem.cn/article/757925
