akka-typed(8) - CQRS读写分离模式

2024-04-09 04:32

本文主要是介绍akka-typed(8) - CQRS读写分离模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   前面介绍了事件源(EventSource)和集群(cluster),现在到了讨论CQRS的时候了。CQRS即读写分离模式,由独立的写方程序和读方程序组成,具体原理在以前的博客里介绍过了。akka-typed应该自然支持CQRS模式,最起码本身提供了对写方编程的支持,这点从EventSourcedBehavior 可以知道。akka-typed提供了新的EventSourcedBehavior-Actor,极大方便了对persistentActor的应用开发,但同时也给编程者造成了一些限制。如手工改变状态会更困难了、EventSourcedBehavior不支持多层式的persist,也就是说通过persist某些特定的event然后在event-handler程序里进行状态处理是不可能的了。我这里有个例子,是个购物车应用:当完成支付后需要取个快照(snapshot),下面是这个snapshot的代码:

       snapshotWhen {(state,evt,seqNr) => CommandHandler.takeSnapshot(state,evt,seqNr)}
...def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) = {if (evt.isInstanceOf[Events.PaymentMade]|| evt.isInstanceOf[Events.VoidVoucher.type]|| evt.isInstanceOf[Events.SuspVoucher.type])if (state.items.isEmpty) {log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...")true} elsefalseelsefalse}

判断event类型是没有问题的,因为正是当前的事件,但另一个条件是购物车必须是清空了的。这个有点为难,因为这个状态要依赖这几个event运算的结果才能确定,也就是下一步,但确定结果又需要对购物车内容进行计算,好像是个死循环。在akka-classic里我们可以在判断了event运算结果后,如果需要改变状态就再persist一个特殊的event,然后在这个event的handler进行状态处理。没办法,EventSourcedBehavior不支持多层persist,只有这样做:

      case PaymentMade(acct, dpt, num, ref,amount) =>...writerInternal.lastVoucher = Voucher(vchs, vItems)endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)Voucher(vchs.nextVoucher, List())...   

我只能先吧当前状态保存下来、进行结单运算、然后清空购物车,这样snapshot就可以顺利进行了。

好了,akka的读方编程是通过PersistentQuery实现的。reader的作用就是把event从数据库读出来后再恢复成具体的数据格式。我们从reader的调用了解一下这个应用里reader的实现细节:

    val readerShard = writerInternal.optSharding.get   val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)

可以看到这个reader是一个集群分片,sharding-entity。想法是每单完成购买后发个消息给一个entity、这个entity再完成reader功能后自动终止,立即释放出占用的资源。reader-actor的定义如下:

object POSReader extends LogSupport {val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("POSReader")def apply(nodeAddress: String, trace: Boolean): Behavior[Command] = {log.stepOn = traceimplicit var pid: PID = PID("","")Behaviors.supervise(Behaviors.setup[Command] { ctx =>Behaviors.withTimers { timer =>implicit val ec = ctx.executionContextBehaviors.receiveMessage {case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) =>pid = PID(shopid, posid)log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid))val futReadSaveNExport = for {txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)_ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend,{ if(txntype == Events.TXNTYPE.voidall)txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))else txnitems },trace)(ctx.system.toClassic, pid)} yield ()ctx.pipeToSelf(futReadSaveNExport) {case Success(_) => {timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)StopReader}case Failure(err) =>log.error(s"POSReader:  Error: ${err.getMessage}")timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)StopReader}Behaviors.samecase StopReader =>Behaviors.samecase ReaderFinish(shopid, posid, vchnum) =>Behaviors.stopped(() => log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid)))}}}).onFailure(SupervisorStrategy.restart)}

reader就是一个普通的actor。值得注意的是读方程序可能是一个庞大复杂的程序,肯定需要分割成多个模块,所以我们可以按照流程顺序进行模块功能切分:这样下面的模块可能会需要上面模块产生的结果才能继续。记住,在actor中绝对避免阻塞线程,所有的模块都返回Future, 然后用for-yield串起来。上面我们用了ctx.pipeToSelf 在Future运算完成后发送ReaderFinish消息给自己,通知自己停止。

在这个例子里我们把reader任务分成:

1、从数据库读取事件

2、事件重演一次产生状态数据(购物车内容)

3、将形成的购物车内容作为交易单据项目存入数据库

4、向用户提供的restapi输出交易数据

event读取是通过cassandra-persistence-plugin实现的:

    val query =PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)// issue query to journalval source: Source[EventEnvelope, NotUsed] =query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)// materialize stream, consuming eventsval readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

这部分比较简单:定义一个PersistenceQuery,用它产生一个Source,然后run这个Source获取Future[List[Any]]。

重演事件产生交易数据:

    def buildVoucher(actions: List[Any]): List[TxnItem] = {log.step(s"POSReader: read actions: $actions")val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])val listOfActions = onlytxns.reverse zip (LazyList from 1)   //zipWithIndexlistOfActions.foreach { case (txn,idx) =>txn.asInstanceOf[Action] match {case Voided(_) =>case ti@_ =>curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)if(voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr=cshr)log.step(s"POSReader: voided txnitem: $curTxnItem")}val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)vchState = vch.headervchItems = vch.txnItemslog.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")}}log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")vchItems.txnitems}

重演List[Event],产生了List[TxnItem]。

向数据库里写List[TxnItem]:

  def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])(implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

    val txnitems: Future[List[Events.TxnItem]] = for {lst1 <- readActions    //read list from SourcelstTxns <- if (lst1.length < (endSeq -startSeq))    //if imcomplete list read againreadActionselse FastFuture.successful(lst1)items <- FastFuture.successful( buildVoucher(lstTxns) )_ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)_ <- session.close(ec)} yield items

注意:这个for返回的Future[List[TxnItem]],是提供给restapi输出功能的。在那里List[TxnItem]会被转换成json作为post的包嵌数据。

现在所有子任务的返回结果类型都是Future了。我们可以再用for来把它们串起来:

             val futReadSaveNExport = for {txnitems <- ActionReader.readActions(ctx, vchnum, opr, bseq, eseq, trace, nodeAddress, shopid, posid, txntype)_ <- ExportTxns.exportTxns(xurl, xacct, xpass, vchnum, txntype == Events.TXNTYPE.suspend,{ if(txntype == Events.TXNTYPE.voidall)txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))else txnitems },trace)(ctx.system.toClassic, pid)} yield ()

说到EventSourcedBehavior,因为用了cassandra-plugin,忽然想起配置文件里新旧有很大区别。现在这个application.conf是这样的:

akka {loglevel = INFOactor {provider = clusterserialization-bindings {"com.datatech.pos.cloud.CborSerializable" = jackson-cbor}}remote {artery {canonical.hostname = "192.168.11.189"canonical.port = 0}}cluster {seed-nodes = ["akka://cloud-pos-server@192.168.11.189:2551"]sharding {passivate-idle-entity-after = 5 m}}# use Cassandra to store both snapshots and the events of the persistent actorspersistence {journal.plugin = "akka.persistence.cassandra.journal"snapshot-store.plugin = "akka.persistence.cassandra.snapshot"}
}
akka.persistence.cassandra {# don't use autocreate in productionjournal.keyspace = "poc2g"journal.keyspace-autocreate = onjournal.tables-autocreate = onsnapshot.keyspace = "poc2g_snapshot"snapshot.keyspace-autocreate = onsnapshot.tables-autocreate = on
}datastax-java-driver {basic.contact-points = ["192.168.11.189:9042"]basic.load-balancing-policy.local-datacenter = "datacenter1"
}

akka.persitence.cassandra段落里可以定义keyspace名称,这样新旧版本应用可以共用一个cassandra,同时在线。

 

 

这篇关于akka-typed(8) - CQRS读写分离模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887176

相关文章

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

10. 文件的读写

10.1 文本文件 操作文件三大类: ofstream:写操作ifstream:读操作fstream:读写操作 打开方式解释ios::in为了读文件而打开文件ios::out为了写文件而打开文件,如果当前文件存在则清空当前文件在写入ios::app追加方式写文件ios::trunc如果文件存在先删除,在创建ios::ate打开文件之后令读写位置移至文件尾端ios::binary二进制方式

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

【STM32】SPI通信-软件与硬件读写SPI

SPI通信-软件与硬件读写SPI 软件SPI一、SPI通信协议1、SPI通信2、硬件电路3、移位示意图4、SPI时序基本单元(1)开始通信和结束通信(2)模式0---用的最多(3)模式1(4)模式2(5)模式3 5、SPI时序(1)写使能(2)指定地址写(3)指定地址读 二、W25Q64模块介绍1、W25Q64简介2、硬件电路3、W25Q64框图4、Flash操作注意事项软件SPI读写W2

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法

【iOS】MVC模式

MVC模式 MVC模式MVC模式demo MVC模式 MVC模式全称为model(模型)view(视图)controller(控制器),他分为三个不同的层分别负责不同的职责。 View:该层用于存放视图,该层中我们可以对页面及控件进行布局。Model:模型一般都拥有很好的可复用性,在该层中,我们可以统一管理一些数据。Controlller:该层充当一个CPU的功能,即该应用程序

迭代器模式iterator

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/iterator 不暴露集合底层表现形式 (列表、 栈和树等) 的情况下遍历集合中所有的元素

《x86汇编语言:从实模式到保护模式》视频来了

《x86汇编语言:从实模式到保护模式》视频来了 很多朋友留言,说我的专栏《x86汇编语言:从实模式到保护模式》写得很详细,还有的朋友希望我能写得更细,最好是覆盖全书的所有章节。 毕竟我不是作者,只有作者的解读才是最权威的。 当初我学习这本书的时候,只能靠自己摸索,网上搜不到什么好资源。 如果你正在学这本书或者汇编语言,那你有福气了。 本书作者李忠老师,以此书为蓝本,录制了全套视频。 试

利用命令模式构建高效的手游后端架构

在现代手游开发中,后端架构的设计对于支持高并发、快速迭代和复杂游戏逻辑至关重要。命令模式作为一种行为设计模式,可以有效地解耦请求的发起者与接收者,提升系统的可维护性和扩展性。本文将深入探讨如何利用命令模式构建一个强大且灵活的手游后端架构。 1. 命令模式的概念与优势 命令模式通过将请求封装为对象,使得请求的发起者和接收者之间的耦合度降低。这种模式的主要优势包括: 解耦请求发起者与处理者