akka-streams - 从应用角度学习:basic stream parts

2024-04-09 04:32

本文主要是介绍akka-streams - 从应用角度学习:basic stream parts,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   实际上很早就写了一系列关于akka-streams的博客。但那个时候纯粹是为了了解akka而去学习的,主要是从了解akka-streams的原理为出发点。因为akka-streams是akka系列工具的基础,如:akka-http, persistence-query等都是基于akka-streams的,其实没有真正把akka-streams用起来。这段时间所遇到的一些需求也是通过集合来解决的。不过,现在所处的环境还是逼迫着去真正了解akka-streams的应用场景。现状是这样的:跨入大数据时代,已经有大量的现代IT系统从传统关系数据库转到分布式数据库(非关系数据库)了。不难想象,这些应用的数据操作编程不说截然不同吧,肯定也会有巨大改变。特别是在传统SQL编程中依赖数据关系的join已经不复存在了,groupby、disctict等操作方法也不是所有的分布式数据库都能支持的。而这些操作在具体的数据呈现和数据处理中又是不可缺少的。当然,有很多需求可以通过集合来满足,但涉及到大数据处理我想最好还是通过流处理来实现,因为流处理stream-processing的其中一项特点就是能够在有限的内存空间里处理无限量的数据。所以流处理应该是分布式数据处理的理想方式了。这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。

先从基本流部件basic stream parts开始,即source,flow,sink。这几个部件可以组合成一个所谓线性流linear-stream。一个流对数据的处理包括两部分:1、对流中元素进行转变,如:source:Source[Int,NotUsed] = Source(1 to 10).map(i => i.toString),把流里的所有Int转变成String、2、对流内元素进行计算得出运算结果,如:sink: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)。当我们run这个sink后得出Future[Int],如:res: Future[Int] = src.runWith(sink)。这两项对流元素的操作所产生的结果不同:元素转换得到动态流动的一串元素、运算元素得到一个静态值,这个运算值materialized-value只能在Sink里获取。即使有这样的表示方式:Source[Int,Future[Int]],这是个迷惑,这个运算值只能通过自定义的graph才能得到,也就是说基本组件是没这个功能的。举个具体的例子吧:val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] 这个表达式貌似可以在Source方获取运算值,再看看Source.maybe[Int]:

  def maybe[T]: Source[T, Promise[Option[T]]] =Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])

可以看出这个Source.maybe是从graph构建的。

上面这个例子里用一个Source对接一个Sink已经组成了一个完整的流,那么Flow是用来干什么的呢?由于运算值是无法当作流元素传递的,Flow只能是用来对Source传下来的元素进行转换后再传递给Sink,也就是说Flow是由一个或多个处理环节构成的。用Flow来分步实现功能是流处理实现并行运算的基本方式,如:

Source(1 to 10).async.via(Flow[Int].map(i => i + 1)).async.runWith(sink)

用async把这个流分割成3个运算发送给3个actor去同时运算。乍看之下map好像是个Flow,它们的作用也似乎相同,也可以对接Source。如:Source(1 to 10).map(_ + 1)。但map和Flow还是有分别的,从类型款式来看Flow[In,Out,M]比起map[A,B]多出来了M,运算值。所以via(map(_.toString))无法匹配类型。那么对于定义带有预先处理环节的Sink就必须用Flow来实现了:ex_sink = Flow[Int].map(_ + 1).to(sink)。

虽然运算值不能像流元素一样流动,但akka-streams提供了机制让用户选择是否返回某个节点的运算值M。系统默认只选择最最左边节点的M,如:

// A source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]val stream: RunnableGraph[(Cancellable, Future[Int])] =source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)val stream1: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)

运算值M可以通过viaMat,toMat选择,然后stream.run()获取。akka-streams提供了简便一点的运算方式runWith:指定runWith参数流组件的M为最终运算值。如:

// Using runWith will always give the materialized values of the stages added
// by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink)
val r5: Promise[Option[Int]] = flow.to(sink).runWith(source)
val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)

值得注意的是:我们可以分别从Source,Sink,Flow开始针对Source runWith(Sink), Sink runWith(Source)及Flow runWith (Source,Sink)。

用基础流组件Source,Flow,Sink构成的流是直线型的。也就是说从Source流出的元素会一个不漏的经过Flow进入Sink,不能多也不能少。可能Source.filter会产生疑惑,不过看看filter函数定义就明白了:

def filter(p: Out => Boolean): Repr[Out] = via(Filter(p))@InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] {override def initialAttributes: Attributes = DefaultAttributes.filteroverride def toString: String = "Filter"override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =new GraphStageLogic(shape) with OutHandler with InHandler {def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].deciderprivate var buffer: OptionVal[T] = OptionVal.noneoverride def preStart(): Unit = pull(in)override def onPush(): Unit =try {val elem = grab(in)if (p(elem))if (isAvailable(out)) {push(out, elem)pull(in)} elsebuffer = OptionVal.Some(elem)else pull(in)} catch {case NonFatal(ex) =>decider(ex) match {case Supervision.Stop => failStage(ex)case _                => pull(in)}}override def onPull(): Unit =buffer match {case OptionVal.Some(value) =>push(out, value)buffer = OptionVal.noneif (!isClosed(in)) pull(in)else completeStage()case _ => // already pulled}override def onUpstreamFinish(): Unit =if (buffer.isEmpty) super.onUpstreamFinish()// else onPull will completesetHandlers(in, out, this)}
}

怎样?够复杂的了吧。很明显,复杂点的流处理需要根据上游元素内容来维护内部状态从而重新构建向下游发送元素的机制。如果想实现join,groupby,distict这些功能就必然对流动元素除转换之外还需要进行增减操作。这项需求可能还必须留在后面的sream-graph章节中讨论解决方案了。不过临时解决方法可以通过运算值M来实现。因为M可以是一个集合,在构建这个M集合时是可以对集合元素进行增减的,下面这段代码示范了一种cassandra数据表groupby的效果:

  def getVouchers(terminalid: String, susp: Boolean)(implicit classicSystem: ActorSystem) = {implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra")implicit val ec = classicSystem.dispatchervar stmt = "select * from pos_on_cloud.txn_log where terminal = ? and txndate = ?"if (susp) stmt = "select * from pos_on_cloud.txn_hold where terminal = ? and txndate = ?"val source  = session.select(stmt,terminalid,LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")))val sink = Sink.fold[List[TxnItem],TxnItem](List[TxnItem]()){(acc,txn) =>if (acc.isEmpty) txn.copy(price = 1) :: accelseif (acc.head.num == txn.num) {if (txn.salestype == SALESTYPE.itm &&txn.txntype == TXNTYPE.sales) {val nacc = acc.head.copy(price = acc.head.price + 1,qty = acc.head.qty + txn.qty,amount = acc.head.amount + txn.amount,dscamt = acc.head.dscamt + txn.dscamt)nacc :: acc.drop(1)} else acc}else txn :: acc}for {vchs <- source.map(TxnItem.fromCqlRow).toMat(sink)(Keep.right).run()_ <- session.close(ec)} yield vchs}

当然,基本流组件在流模式数据库读写方面还是比较高效的,如:

    def futTxns(items: Seq[TxnItem]): Future[Seq[TxnItem]] = Source(items.toSeq).via(CassandraFlow.create(CassandraWriteSettings.defaults,CQLScripts.insertTxns,statementBinder)).runWith(Sink.seq)

 

这篇关于akka-streams - 从应用角度学习:basic stream parts的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887180

相关文章

51单片机学习记录———定时器

文章目录 前言一、定时器介绍二、STC89C52定时器资源三、定时器框图四、定时器模式五、定时器相关寄存器六、定时器练习 前言 一个学习嵌入式的小白~ 有问题评论区或私信指出~ 提示:以下是本篇文章正文内容,下面案例可供参考 一、定时器介绍 定时器介绍:51单片机的定时器属于单片机的内部资源,其电路的连接和运转均在单片机内部完成。 定时器作用: 1.用于计数系统,可

问题:第一次世界大战的起止时间是 #其他#学习方法#微信

问题:第一次世界大战的起止时间是 A.1913 ~1918 年 B.1913 ~1918 年 C.1914 ~1918 年 D.1914 ~1919 年 参考答案如图所示

[word] word设置上标快捷键 #学习方法#其他#媒体

word设置上标快捷键 办公中,少不了使用word,这个是大家必备的软件,今天给大家分享word设置上标快捷键,希望在办公中能帮到您! 1、添加上标 在录入一些公式,或者是化学产品时,需要添加上标内容,按下快捷键Ctrl+shift++就能将需要的内容设置为上标符号。 word设置上标快捷键的方法就是以上内容了,需要的小伙伴都可以试一试呢!

AssetBundle学习笔记

AssetBundle是unity自定义的资源格式,通过调用引擎的资源打包接口对资源进行打包成.assetbundle格式的资源包。本文介绍了AssetBundle的生成,使用,加载,卸载以及Unity资源更新的一个基本步骤。 目录 1.定义: 2.AssetBundle的生成: 1)设置AssetBundle包的属性——通过编辑器界面 补充:分组策略 2)调用引擎接口API

计算绕原点旋转某角度后的点的坐标

问题: A点(x, y)按顺时针旋转 theta 角度后点的坐标为A1点(x1,y1)  ,求x1 y1坐标用(x,y)和 theta 来表示 方法一: 设 OA 向量和x轴的角度为 alpha , 那么顺时针转过 theta后 ,OA1 向量和x轴的角度为 (alpha - theta) 。 使用圆的参数方程来表示点坐标。A的坐标可以表示为: \[\left\{ {\begin{ar

Javascript高级程序设计(第四版)--学习记录之变量、内存

原始值与引用值 原始值:简单的数据即基础数据类型,按值访问。 引用值:由多个值构成的对象即复杂数据类型,按引用访问。 动态属性 对于引用值而言,可以随时添加、修改和删除其属性和方法。 let person = new Object();person.name = 'Jason';person.age = 42;console.log(person.name,person.age);//'J

大学湖北中医药大学法医学试题及答案,分享几个实用搜题和学习工具 #微信#学习方法#职场发展

今天分享拥有拍照搜题、文字搜题、语音搜题、多重搜题等搜题模式,可以快速查找问题解析,加深对题目答案的理解。 1.快练题 这是一个网站 找题的网站海量题库,在线搜题,快速刷题~为您提供百万优质题库,直接搜索题库名称,支持多种刷题模式:顺序练习、语音听题、本地搜题、顺序阅读、模拟考试、组卷考试、赶快下载吧! 2.彩虹搜题 这是个老公众号了 支持手写输入,截图搜题,详细步骤,解题必备

《offer来了》第二章学习笔记

1.集合 Java四种集合:List、Queue、Set和Map 1.1.List:可重复 有序的Collection ArrayList: 基于数组实现,增删慢,查询快,线程不安全 Vector: 基于数组实现,增删慢,查询快,线程安全 LinkedList: 基于双向链实现,增删快,查询慢,线程不安全 1.2.Queue:队列 ArrayBlockingQueue:

硬件基础知识——自学习梳理

计算机存储分为闪存和永久性存储。 硬盘(永久存储)主要分为机械磁盘和固态硬盘。 机械磁盘主要靠磁颗粒的正负极方向来存储0或1,且机械磁盘没有使用寿命。 固态硬盘就有使用寿命了,大概支持30w次的读写操作。 闪存使用的是电容进行存储,断电数据就没了。 器件之间传输bit数据在总线上是一个一个传输的,因为通过电压传输(电流不稳定),但是电压属于电势能,所以可以叠加互相干扰,这也就是硬盘,U盘

亮相WOT全球技术创新大会,揭秘火山引擎边缘容器技术在泛CDN场景的应用与实践

2024年6月21日-22日,51CTO“WOT全球技术创新大会2024”在北京举办。火山引擎边缘计算架构师李志明受邀参与,以“边缘容器技术在泛CDN场景的应用和实践”为主题,与多位行业资深专家,共同探讨泛CDN行业技术架构以及云原生与边缘计算的发展和展望。 火山引擎边缘计算架构师李志明表示:为更好地解决传统泛CDN类业务运行中的问题,火山引擎边缘容器团队参考行业做法,结合实践经验,打造火山