akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?

2024-04-09 04:32

本文主要是介绍akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在讨论lagom之前,先从遇到的需求开始介绍:现代企业的it系统变得越来越多元化、复杂化了。线上、线下各种系统必须用某种方式集成在一起。从各种it系统的基本共性分析:最明显的特征应该是后台数据库的角色了,起码,大家都需要使用数据。另外,每个系统都可能具备大量实时在线用户、海量数据特性,代表着对数据处理能力有极大的要求,预示系统只有通过分布式处理方式才能有效运行。

一个月前开始设计一个企业的it系统,在讨论数据中台时就遇到这样的需求。这个所谓的数据中台的主要作用是为整体系统提供一套统一的数据使用api,前后连接包括web,mobile,desktop的前端系统以及由多种传统及分布式数据库系统,形成一个统一的数据使用接口。实际上,数据库连接不只是简单的读写操作,还需要包括所有实时的数据处理:根据业务要求对数据进行相应的处理然后使用。那么这是一个怎样的系统呢?首先,它必须是分布式的:为了对付大量的前端用户同时调用同一个api,把这个api的功能同时分派到多个服务器上运行是个有效的解决方法。这是个akka-cluster-sharding模式。数据中台api是向所有内部系统以及一些特定的外部第三方系统开放的,用http标准协议支持各系统与数据后台的连接也是合理的。这个akka-http, akka-grpc可以胜任。然后各系统之间的集成可以通过一个流运算工具如kafka实现各聚合根之间的交互连接。

似乎所有需要的工具都齐备了,其中akka占了大部分功能。但有些问题是:基于akka技术栈来编程或多或少有些门槛要求。最起码需要一定程度的akka开发经验。更不用提组织一个开发团队了。如果市面上有个什么能提供相应能力的开发工具,可以轻松快速上手的,那么项目开发就可以立即启动了。

现在来谈谈lagom:lagom是一套scala栈的微服务软件开发工具。从官方文档介绍了解到lagom主要提供了一套服务接口定义及服务功能开发框架。值得一提的是服务功能可以是集群分片模式的。走了一遍lagom的启动示范代码,感觉这是一套集开发、测试、部署为一体的框架(framework)。在这个框架里按照规定开发几个简单的服务api非常顺利,很方便。这让我对使用lagom产生了兴趣,想继续调研一下利用lagoom来开发上面所提及数据中台的可行性。lagom服务接入部分是通过play实现的。play我不太熟悉,想深入了解一下用akka-http替代的可行性,不过看来不太容易。最让我感到失望的是lagom的服务分片(service-sharding)直接就是akka-cluster那一套:cluster、event-sourcing、CQRS什么的都需要自己从头到尾重新编写。用嵌入的kafka进行服务整合与单独用kafka也不会增加太多麻烦。倒是lagom提供的这个集开发、测试、部署为一体的框架在团队开发管理中应该能发挥良好的作用。

在我看来:服务接入方面由于涉及身份验证、使用权限、二进制文件类型数据交换等使用akka-http,akka-grpc会更有控制力。服务功能实现直接就用akka-cluster-sharding,把计算任务分布到各节点上,这个我们前面已经介绍过了。

所以,最后还是决定直接用akka-typed来实现这个数据中台。用了一个多月时间做研发,到现在看来效果不错,能够符合项目要求。下面是一些用akka-typed实现业务集成的过程介绍。首先,系统特点是功能分片:系统按业务条块分成多个片shardregion,每个片里的entity负责处理一项业务的多个功能。多个用户调用一项业务功能代表多个entity分布在不同的集群节点上并行运算。下面是一个业务群的代码示范:

object Shards extends LogSupport {def apply(mgoHosts: List[String],trace: Boolean, keepAlive: FiniteDuration, pocurl: String)(implicit authBase: AuthBase): Behavior[Nothing] = {Behaviors.setup[Nothing] { ctx =>val sharding = ClusterSharding(ctx.system)log.stepOn = truelog.step(s"starting cluster-monitor ...")(MachineId("",""))ctx.spawn(MonitorActor(),"abs-cluster-monitor")log.step(s"initializing sharding for ${Authenticator.EntityKey} ...")(MachineId("",""))val authEntityType = Entity(Authenticator.EntityKey) { entityContext =>Authenticator(entityContext.shard,mgoHosts,trace,keepAlive)}.withStopMessage(Authenticator.StopAuthenticator)sharding.init(authEntityType)log.step(s"initializing sharding for ${CrmWorker.EntityKey} ...")(MachineId("",""))val crmEntityType = Entity(CrmWorker.EntityKey) { entityContext =>CrmWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(CrmWorker.StopWorker)sharding.init(crmEntityType)log.step(s"initializing sharding for ${GateKeeper.EntityKey} ...")(MachineId("",""))val gateEntityType = Entity(GateKeeper.EntityKey) { entityContext =>GateKeeper(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(GateKeeper.StopGateKeeper)sharding.init(gateEntityType)log.step(s"initializing sharding for ${PluWorker.EntityKey} ...")(MachineId("",""))val pluEntityType = Entity(PluWorker.EntityKey) { entityContext =>PluWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(PluWorker.StopWorker)sharding.init(pluEntityType)log.step(s"initializing sharding for ${PocWorker.EntityKey} ...")(MachineId("",""))val pocEntityType = Entity(PocWorker.EntityKey) { entityContext =>PocWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive,pocurl)}.withStopMessage(PocWorker.StopWorker)sharding.init(pocEntityType)Behaviors.empty}}
}

可以看到,不同类型的片以不同的EntityKey来代表。前端接入是基于akka-http的,如下:

object CrmRoute extends LogSupport {def route(entityRef: EntityRef[CrmWorker.Command])(implicit ec: ExecutionContext, jsStreaming: EntityStreamingSupport, timeout: Timeout): akka.http.scaladsl.server.Route = {concat(pathPrefix("ismember") {parameter(Symbol("faceid")) { fid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.IsMemberFace(fid, _)).map {case CrmWorker.ValidMember(memberId) => memberIdcase CrmWorker.InvalidMember(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}},pathPrefix("getmember") {parameter(Symbol("memberid")) { mid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.GetMemberInfo(mid, _)).map {case CrmWorker.MemberInfo(json) => HttpEntity(MediaTypes.`application/json`,json)case CrmWorker.InvalidMemberInfo(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}})}
}

各项业务功能调用通过entityRef.ask发送给了某个用户指定节点上的entity。akka的actor是线程的再细分,即一个actor可能与其它成千上万个actor共享一条线程。所以绝对不容许任何blocking。我是用下面示范的模式来实现non-blocking的:

  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {val (shopId,posId) = entityId.split(':').toList match {case sid::pid::Nil  => (sid,pid) }implicit val loc = Messages.MachineId(shopId,posId)log.stepOn = trace//    Behaviors.supervise(Behaviors.setup[Command] { ctx =>implicit val ec = ctx.executionContextctx.setReceiveTimeout(keepAlive, Idle)Behaviors.withTimers[Command] { timer =>Behaviors.receiveMessage[Command] {case IsMemberFace(fid, replyTo) =>log.step(s"CrmWorker: IsMemberFace($fid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(isMemberFace(fid)) {case Success(mid) => {if (mid._1.isEmpty) {replyTo ! InvalidMember(mid._2)Done(loc.shopid, loc.posid, s"IsMemberFace with Error ${mid._2}")} else {replyTo ! ValidMember(mid._1)Done(loc.shopid, loc.posid, s"IsMemberFace.")}}case Failure(err) =>log.error(s"CrmWorker: IsMemberFace Error: ${err.getMessage}")replyTo ! InvalidMember(err.getMessage)Done(loc.shopid, loc.posid, s"IsMemberFace with error: ${err.getMessage}")}Behaviors.samecase GetMemberInfo(mid, replyTo) =>log.step(s"CrmWorker: GetMemberInfo($mid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(getMemberInfo(mid)) {case Success(json) => {replyTo ! MemberInfo(json)Done(loc.shopid, loc.posid, s"GetMemberInfo with json ${json}")}case Failure(err) =>log.error(s"CrmWorker: GetMemberInfo Error: ${err.getMessage}")replyTo ! InvalidMemberInfo(err.getMessage)Done(loc.shopid, loc.posid, s"GetMemberInfo with error: ${err.getMessage}")}Behaviors.samecase Idle =>// after receive timeoutshard ! ClusterSharding.Passivate(ctx.self)Behaviors.samecase StopWorker =>Behaviors.stopped(() => log.step(s"CrmWorker: {$shopId,$posId} passivated to stop.")(MachineId(shopId, posId)))case Done(shopid, termid, work) =>if (maybeMgoClient.isDefined)maybeMgoClient.get.close()log.step(s"CrmWorker: {$shopid,$termid} finished $work")(MachineId(shopid,termid))Behaviors.samecase _ => Behaviors.same}.receiveSignal {case (_,PostStop) =>log.step(s"CrmWorker: {$shopId,$posId} stopped.")(MachineId(shopId, posId))Behaviors.same}}}//   ).onFailure(SupervisorStrategy.restart)}

主要是使用ctx.pipeToSelf(work)把一个Future转换成内部消息。这里的work的实现最终必须返回Future类型,如下面的示范:

object CrmServices extends JsonConverter with LogSupport {import MgoHelpers._def validMember(docs: Seq[Document], faceid: String): Future[(String,String)] = {val memberId: (String, String) = docs match {case Nil => ("", s"faceid[$faceid]不存在!")case docs =>val member = MemberInfo.fromDocument(docs.head)if (member.expireDt.compareTo(mgoDateTimeNow) < 0)("", s"会员:${member.memberId}-${member.memberName}会籍已过期!")else(member.memberId, "")}FastFuture.successful(memberId)}def isMemberFace(faceid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[(String,String)] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.FACEID,faceid)).toFuture()for {mi <- memberInfo(id,msg) <- validMember(mi,faceid)} yield (id,msg)}def getMemberInfo(memberid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[String] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.MEMBERID,memberid)).toFuture()for {docs <- memberInfojstr <- FastFuture.successful(if(docs.isEmpty) "" else toJson(MemberInfo.fromDocument(docs.head)))} yield jstr}}

另外,由于每个用户第一次调用一项业务功能时akka-cluster-shardregion都会自动在某个节点上构建一个新的entity,如果上万个用户使用过某个功能,那么就会有万个entity及其所占用的资源如mongodb客户端等停留在内存里。所以在完成一项功能运算后应关闭entity,释放占用的资源。这个是通过shard ! ClusterSharding.passivate(ctx.self)实现的。

这篇关于akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887177

相关文章

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

业务中14个需要进行A/B测试的时刻[信息图]

在本指南中,我们将全面了解有关 A/B测试 的所有内容。 我们将介绍不同类型的A/B测试,如何有效地规划和启动测试,如何评估测试是否成功,您应该关注哪些指标,多年来我们发现的常见错误等等。 什么是A/B测试? A/B测试(有时称为“分割测试”)是一种实验类型,其中您创建两种或多种内容变体——如登录页面、电子邮件或广告——并将它们显示给不同的受众群体,以查看哪一种效果最好。 本质上,A/B测

业务协同平台--简介

一、使用场景         1.多个系统统一在业务协同平台定义协同策略,由业务协同平台代替人工完成一系列的单据录入         2.同时业务协同平台将执行任务推送给pda、pad等执行终端,通知各人员、设备进行作业执行         3.作业过程中,可设置完成时间预警、作业节点通知,时刻了解作业进程         4.做完再给你做过程分析,给出优化建议         就问你这一套下

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

springboot整合swagger2之最佳实践

来源:https://blog.lqdev.cn/2018/07/21/springboot/chapter-ten/ Swagger是一款RESTful接口的文档在线自动生成、功能测试功能框架。 一个规范和完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务,加上swagger-ui,可以有很好的呈现。 SpringBoot集成 pom <!--swagge

springboot 整合swagger

没有多余废话,就是干 spring-boot 2.7.8 springfox-boot-starter 3.0.0 结构 POM.xml <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理 秒杀系统是应对高并发、高压力下的典型业务场景,涉及到并发控制、库存管理、事务管理等多个关键技术点。本文将深入剖析秒杀商品业务中常见的几个核心问题,包括 AOP 事务管理、同步锁机制、乐观锁、CAS 操作,以及用户限购策略。通过这些技术的结合,确保秒杀系统在高并发场景下的稳定性和一致性。 1. AOP 代理对象与事务管理 在秒杀商品

Vue2电商项目(二) Home模块的开发;(还需要补充js节流和防抖的回顾链接)

文章目录 一、Home模块拆分1. 三级联动组件TypeNav2. 其余组件 二、发送请求的准备工作1. axios的二次封装2. 统一管理接口API----跨域3. nprogress进度条 三、 vuex模块开发四、TypeNav三级联动组件开发1. 动态展示三级联动数据2. 三级联动 动态背景(1)、方式一:CSS样式(2)、方式二:JS 3. 控制二三级数据隐藏与显示--绑定styl

使用WebP解决网站加载速度问题,这些细节你需要了解

说到网页的图片格式,大家最常想到的可能是JPEG、PNG,毕竟这些老牌格式陪伴我们这么多年。然而,近几年,有一个格式悄悄崭露头角,那就是WebP。很多人可能听说过,但到底它好在哪?你的网站或者项目是不是也应该用WebP呢?别着急,今天咱们就来好好聊聊WebP这个图片格式的前世今生,以及它值不值得你花时间去用。 为什么会有WebP? 你有没有遇到过这样的情况?网页加载特别慢,尤其是那

uniapp,vite整合windicss

官方文档:https://weapp-tw.icebreaker.top/docs/quick-start/frameworks/hbuilderx 安装: npm i -D tailwindcss postcss autoprefixer# 初始化 tailwind.config.js 文件npx tailwindcss initnpm i -D weapp-tailwindcss# 假