akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?

2024-04-09 04:32

本文主要是介绍akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在讨论lagom之前,先从遇到的需求开始介绍:现代企业的it系统变得越来越多元化、复杂化了。线上、线下各种系统必须用某种方式集成在一起。从各种it系统的基本共性分析:最明显的特征应该是后台数据库的角色了,起码,大家都需要使用数据。另外,每个系统都可能具备大量实时在线用户、海量数据特性,代表着对数据处理能力有极大的要求,预示系统只有通过分布式处理方式才能有效运行。

一个月前开始设计一个企业的it系统,在讨论数据中台时就遇到这样的需求。这个所谓的数据中台的主要作用是为整体系统提供一套统一的数据使用api,前后连接包括web,mobile,desktop的前端系统以及由多种传统及分布式数据库系统,形成一个统一的数据使用接口。实际上,数据库连接不只是简单的读写操作,还需要包括所有实时的数据处理:根据业务要求对数据进行相应的处理然后使用。那么这是一个怎样的系统呢?首先,它必须是分布式的:为了对付大量的前端用户同时调用同一个api,把这个api的功能同时分派到多个服务器上运行是个有效的解决方法。这是个akka-cluster-sharding模式。数据中台api是向所有内部系统以及一些特定的外部第三方系统开放的,用http标准协议支持各系统与数据后台的连接也是合理的。这个akka-http, akka-grpc可以胜任。然后各系统之间的集成可以通过一个流运算工具如kafka实现各聚合根之间的交互连接。

似乎所有需要的工具都齐备了,其中akka占了大部分功能。但有些问题是:基于akka技术栈来编程或多或少有些门槛要求。最起码需要一定程度的akka开发经验。更不用提组织一个开发团队了。如果市面上有个什么能提供相应能力的开发工具,可以轻松快速上手的,那么项目开发就可以立即启动了。

现在来谈谈lagom:lagom是一套scala栈的微服务软件开发工具。从官方文档介绍了解到lagom主要提供了一套服务接口定义及服务功能开发框架。值得一提的是服务功能可以是集群分片模式的。走了一遍lagom的启动示范代码,感觉这是一套集开发、测试、部署为一体的框架(framework)。在这个框架里按照规定开发几个简单的服务api非常顺利,很方便。这让我对使用lagom产生了兴趣,想继续调研一下利用lagoom来开发上面所提及数据中台的可行性。lagom服务接入部分是通过play实现的。play我不太熟悉,想深入了解一下用akka-http替代的可行性,不过看来不太容易。最让我感到失望的是lagom的服务分片(service-sharding)直接就是akka-cluster那一套:cluster、event-sourcing、CQRS什么的都需要自己从头到尾重新编写。用嵌入的kafka进行服务整合与单独用kafka也不会增加太多麻烦。倒是lagom提供的这个集开发、测试、部署为一体的框架在团队开发管理中应该能发挥良好的作用。

在我看来:服务接入方面由于涉及身份验证、使用权限、二进制文件类型数据交换等使用akka-http,akka-grpc会更有控制力。服务功能实现直接就用akka-cluster-sharding,把计算任务分布到各节点上,这个我们前面已经介绍过了。

所以,最后还是决定直接用akka-typed来实现这个数据中台。用了一个多月时间做研发,到现在看来效果不错,能够符合项目要求。下面是一些用akka-typed实现业务集成的过程介绍。首先,系统特点是功能分片:系统按业务条块分成多个片shardregion,每个片里的entity负责处理一项业务的多个功能。多个用户调用一项业务功能代表多个entity分布在不同的集群节点上并行运算。下面是一个业务群的代码示范:

object Shards extends LogSupport {def apply(mgoHosts: List[String],trace: Boolean, keepAlive: FiniteDuration, pocurl: String)(implicit authBase: AuthBase): Behavior[Nothing] = {Behaviors.setup[Nothing] { ctx =>val sharding = ClusterSharding(ctx.system)log.stepOn = truelog.step(s"starting cluster-monitor ...")(MachineId("",""))ctx.spawn(MonitorActor(),"abs-cluster-monitor")log.step(s"initializing sharding for ${Authenticator.EntityKey} ...")(MachineId("",""))val authEntityType = Entity(Authenticator.EntityKey) { entityContext =>Authenticator(entityContext.shard,mgoHosts,trace,keepAlive)}.withStopMessage(Authenticator.StopAuthenticator)sharding.init(authEntityType)log.step(s"initializing sharding for ${CrmWorker.EntityKey} ...")(MachineId("",""))val crmEntityType = Entity(CrmWorker.EntityKey) { entityContext =>CrmWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(CrmWorker.StopWorker)sharding.init(crmEntityType)log.step(s"initializing sharding for ${GateKeeper.EntityKey} ...")(MachineId("",""))val gateEntityType = Entity(GateKeeper.EntityKey) { entityContext =>GateKeeper(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(GateKeeper.StopGateKeeper)sharding.init(gateEntityType)log.step(s"initializing sharding for ${PluWorker.EntityKey} ...")(MachineId("",""))val pluEntityType = Entity(PluWorker.EntityKey) { entityContext =>PluWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(PluWorker.StopWorker)sharding.init(pluEntityType)log.step(s"initializing sharding for ${PocWorker.EntityKey} ...")(MachineId("",""))val pocEntityType = Entity(PocWorker.EntityKey) { entityContext =>PocWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive,pocurl)}.withStopMessage(PocWorker.StopWorker)sharding.init(pocEntityType)Behaviors.empty}}
}

可以看到,不同类型的片以不同的EntityKey来代表。前端接入是基于akka-http的,如下:

object CrmRoute extends LogSupport {def route(entityRef: EntityRef[CrmWorker.Command])(implicit ec: ExecutionContext, jsStreaming: EntityStreamingSupport, timeout: Timeout): akka.http.scaladsl.server.Route = {concat(pathPrefix("ismember") {parameter(Symbol("faceid")) { fid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.IsMemberFace(fid, _)).map {case CrmWorker.ValidMember(memberId) => memberIdcase CrmWorker.InvalidMember(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}},pathPrefix("getmember") {parameter(Symbol("memberid")) { mid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.GetMemberInfo(mid, _)).map {case CrmWorker.MemberInfo(json) => HttpEntity(MediaTypes.`application/json`,json)case CrmWorker.InvalidMemberInfo(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}})}
}

各项业务功能调用通过entityRef.ask发送给了某个用户指定节点上的entity。akka的actor是线程的再细分,即一个actor可能与其它成千上万个actor共享一条线程。所以绝对不容许任何blocking。我是用下面示范的模式来实现non-blocking的:

  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {val (shopId,posId) = entityId.split(':').toList match {case sid::pid::Nil  => (sid,pid) }implicit val loc = Messages.MachineId(shopId,posId)log.stepOn = trace//    Behaviors.supervise(Behaviors.setup[Command] { ctx =>implicit val ec = ctx.executionContextctx.setReceiveTimeout(keepAlive, Idle)Behaviors.withTimers[Command] { timer =>Behaviors.receiveMessage[Command] {case IsMemberFace(fid, replyTo) =>log.step(s"CrmWorker: IsMemberFace($fid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(isMemberFace(fid)) {case Success(mid) => {if (mid._1.isEmpty) {replyTo ! InvalidMember(mid._2)Done(loc.shopid, loc.posid, s"IsMemberFace with Error ${mid._2}")} else {replyTo ! ValidMember(mid._1)Done(loc.shopid, loc.posid, s"IsMemberFace.")}}case Failure(err) =>log.error(s"CrmWorker: IsMemberFace Error: ${err.getMessage}")replyTo ! InvalidMember(err.getMessage)Done(loc.shopid, loc.posid, s"IsMemberFace with error: ${err.getMessage}")}Behaviors.samecase GetMemberInfo(mid, replyTo) =>log.step(s"CrmWorker: GetMemberInfo($mid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(getMemberInfo(mid)) {case Success(json) => {replyTo ! MemberInfo(json)Done(loc.shopid, loc.posid, s"GetMemberInfo with json ${json}")}case Failure(err) =>log.error(s"CrmWorker: GetMemberInfo Error: ${err.getMessage}")replyTo ! InvalidMemberInfo(err.getMessage)Done(loc.shopid, loc.posid, s"GetMemberInfo with error: ${err.getMessage}")}Behaviors.samecase Idle =>// after receive timeoutshard ! ClusterSharding.Passivate(ctx.self)Behaviors.samecase StopWorker =>Behaviors.stopped(() => log.step(s"CrmWorker: {$shopId,$posId} passivated to stop.")(MachineId(shopId, posId)))case Done(shopid, termid, work) =>if (maybeMgoClient.isDefined)maybeMgoClient.get.close()log.step(s"CrmWorker: {$shopid,$termid} finished $work")(MachineId(shopid,termid))Behaviors.samecase _ => Behaviors.same}.receiveSignal {case (_,PostStop) =>log.step(s"CrmWorker: {$shopId,$posId} stopped.")(MachineId(shopId, posId))Behaviors.same}}}//   ).onFailure(SupervisorStrategy.restart)}

主要是使用ctx.pipeToSelf(work)把一个Future转换成内部消息。这里的work的实现最终必须返回Future类型,如下面的示范:

object CrmServices extends JsonConverter with LogSupport {import MgoHelpers._def validMember(docs: Seq[Document], faceid: String): Future[(String,String)] = {val memberId: (String, String) = docs match {case Nil => ("", s"faceid[$faceid]不存在!")case docs =>val member = MemberInfo.fromDocument(docs.head)if (member.expireDt.compareTo(mgoDateTimeNow) < 0)("", s"会员:${member.memberId}-${member.memberName}会籍已过期!")else(member.memberId, "")}FastFuture.successful(memberId)}def isMemberFace(faceid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[(String,String)] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.FACEID,faceid)).toFuture()for {mi <- memberInfo(id,msg) <- validMember(mi,faceid)} yield (id,msg)}def getMemberInfo(memberid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[String] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.MEMBERID,memberid)).toFuture()for {docs <- memberInfojstr <- FastFuture.successful(if(docs.isEmpty) "" else toJson(MemberInfo.fromDocument(docs.head)))} yield jstr}}

另外,由于每个用户第一次调用一项业务功能时akka-cluster-shardregion都会自动在某个节点上构建一个新的entity,如果上万个用户使用过某个功能,那么就会有万个entity及其所占用的资源如mongodb客户端等停留在内存里。所以在完成一项功能运算后应关闭entity,释放占用的资源。这个是通过shard ! ClusterSharding.passivate(ctx.self)实现的。

这篇关于akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887177

相关文章

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

SpringBoot整合kaptcha验证码过程(复制粘贴即可用)

《SpringBoot整合kaptcha验证码过程(复制粘贴即可用)》本文介绍了如何在SpringBoot项目中整合Kaptcha验证码实现,通过配置和编写相应的Controller、工具类以及前端页... 目录SpringBoot整合kaptcha验证码程序目录参考有两种方式在springboot中使用k

Spring Boot 中整合 MyBatis-Plus详细步骤(最新推荐)

《SpringBoot中整合MyBatis-Plus详细步骤(最新推荐)》本文详细介绍了如何在SpringBoot项目中整合MyBatis-Plus,包括整合步骤、基本CRUD操作、分页查询、批... 目录一、整合步骤1. 创建 Spring Boot 项目2. 配置项目依赖3. 配置数据源4. 创建实体类

SpringBoot整合InfluxDB的详细过程

《SpringBoot整合InfluxDB的详细过程》InfluxDB是一个开源的时间序列数据库,由Go语言编写,适用于存储和查询按时间顺序产生的数据,它具有高效的数据存储和查询机制,支持高并发写入和... 目录一、简单介绍InfluxDB是什么?1、主要特点2、应用场景二、使用步骤1、集成原生的Influ

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

业务中14个需要进行A/B测试的时刻[信息图]

在本指南中,我们将全面了解有关 A/B测试 的所有内容。 我们将介绍不同类型的A/B测试,如何有效地规划和启动测试,如何评估测试是否成功,您应该关注哪些指标,多年来我们发现的常见错误等等。 什么是A/B测试? A/B测试(有时称为“分割测试”)是一种实验类型,其中您创建两种或多种内容变体——如登录页面、电子邮件或广告——并将它们显示给不同的受众群体,以查看哪一种效果最好。 本质上,A/B测

业务协同平台--简介

一、使用场景         1.多个系统统一在业务协同平台定义协同策略,由业务协同平台代替人工完成一系列的单据录入         2.同时业务协同平台将执行任务推送给pda、pad等执行终端,通知各人员、设备进行作业执行         3.作业过程中,可设置完成时间预警、作业节点通知,时刻了解作业进程         4.做完再给你做过程分析,给出优化建议         就问你这一套下

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

springboot整合swagger2之最佳实践

来源:https://blog.lqdev.cn/2018/07/21/springboot/chapter-ten/ Swagger是一款RESTful接口的文档在线自动生成、功能测试功能框架。 一个规范和完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务,加上swagger-ui,可以有很好的呈现。 SpringBoot集成 pom <!--swagge