akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?

2024-04-09 04:32

本文主要是介绍akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在讨论lagom之前,先从遇到的需求开始介绍:现代企业的it系统变得越来越多元化、复杂化了。线上、线下各种系统必须用某种方式集成在一起。从各种it系统的基本共性分析:最明显的特征应该是后台数据库的角色了,起码,大家都需要使用数据。另外,每个系统都可能具备大量实时在线用户、海量数据特性,代表着对数据处理能力有极大的要求,预示系统只有通过分布式处理方式才能有效运行。

一个月前开始设计一个企业的it系统,在讨论数据中台时就遇到这样的需求。这个所谓的数据中台的主要作用是为整体系统提供一套统一的数据使用api,前后连接包括web,mobile,desktop的前端系统以及由多种传统及分布式数据库系统,形成一个统一的数据使用接口。实际上,数据库连接不只是简单的读写操作,还需要包括所有实时的数据处理:根据业务要求对数据进行相应的处理然后使用。那么这是一个怎样的系统呢?首先,它必须是分布式的:为了对付大量的前端用户同时调用同一个api,把这个api的功能同时分派到多个服务器上运行是个有效的解决方法。这是个akka-cluster-sharding模式。数据中台api是向所有内部系统以及一些特定的外部第三方系统开放的,用http标准协议支持各系统与数据后台的连接也是合理的。这个akka-http, akka-grpc可以胜任。然后各系统之间的集成可以通过一个流运算工具如kafka实现各聚合根之间的交互连接。

似乎所有需要的工具都齐备了,其中akka占了大部分功能。但有些问题是:基于akka技术栈来编程或多或少有些门槛要求。最起码需要一定程度的akka开发经验。更不用提组织一个开发团队了。如果市面上有个什么能提供相应能力的开发工具,可以轻松快速上手的,那么项目开发就可以立即启动了。

现在来谈谈lagom:lagom是一套scala栈的微服务软件开发工具。从官方文档介绍了解到lagom主要提供了一套服务接口定义及服务功能开发框架。值得一提的是服务功能可以是集群分片模式的。走了一遍lagom的启动示范代码,感觉这是一套集开发、测试、部署为一体的框架(framework)。在这个框架里按照规定开发几个简单的服务api非常顺利,很方便。这让我对使用lagom产生了兴趣,想继续调研一下利用lagoom来开发上面所提及数据中台的可行性。lagom服务接入部分是通过play实现的。play我不太熟悉,想深入了解一下用akka-http替代的可行性,不过看来不太容易。最让我感到失望的是lagom的服务分片(service-sharding)直接就是akka-cluster那一套:cluster、event-sourcing、CQRS什么的都需要自己从头到尾重新编写。用嵌入的kafka进行服务整合与单独用kafka也不会增加太多麻烦。倒是lagom提供的这个集开发、测试、部署为一体的框架在团队开发管理中应该能发挥良好的作用。

在我看来:服务接入方面由于涉及身份验证、使用权限、二进制文件类型数据交换等使用akka-http,akka-grpc会更有控制力。服务功能实现直接就用akka-cluster-sharding,把计算任务分布到各节点上,这个我们前面已经介绍过了。

所以,最后还是决定直接用akka-typed来实现这个数据中台。用了一个多月时间做研发,到现在看来效果不错,能够符合项目要求。下面是一些用akka-typed实现业务集成的过程介绍。首先,系统特点是功能分片:系统按业务条块分成多个片shardregion,每个片里的entity负责处理一项业务的多个功能。多个用户调用一项业务功能代表多个entity分布在不同的集群节点上并行运算。下面是一个业务群的代码示范:

object Shards extends LogSupport {def apply(mgoHosts: List[String],trace: Boolean, keepAlive: FiniteDuration, pocurl: String)(implicit authBase: AuthBase): Behavior[Nothing] = {Behaviors.setup[Nothing] { ctx =>val sharding = ClusterSharding(ctx.system)log.stepOn = truelog.step(s"starting cluster-monitor ...")(MachineId("",""))ctx.spawn(MonitorActor(),"abs-cluster-monitor")log.step(s"initializing sharding for ${Authenticator.EntityKey} ...")(MachineId("",""))val authEntityType = Entity(Authenticator.EntityKey) { entityContext =>Authenticator(entityContext.shard,mgoHosts,trace,keepAlive)}.withStopMessage(Authenticator.StopAuthenticator)sharding.init(authEntityType)log.step(s"initializing sharding for ${CrmWorker.EntityKey} ...")(MachineId("",""))val crmEntityType = Entity(CrmWorker.EntityKey) { entityContext =>CrmWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(CrmWorker.StopWorker)sharding.init(crmEntityType)log.step(s"initializing sharding for ${GateKeeper.EntityKey} ...")(MachineId("",""))val gateEntityType = Entity(GateKeeper.EntityKey) { entityContext =>GateKeeper(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(GateKeeper.StopGateKeeper)sharding.init(gateEntityType)log.step(s"initializing sharding for ${PluWorker.EntityKey} ...")(MachineId("",""))val pluEntityType = Entity(PluWorker.EntityKey) { entityContext =>PluWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(PluWorker.StopWorker)sharding.init(pluEntityType)log.step(s"initializing sharding for ${PocWorker.EntityKey} ...")(MachineId("",""))val pocEntityType = Entity(PocWorker.EntityKey) { entityContext =>PocWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive,pocurl)}.withStopMessage(PocWorker.StopWorker)sharding.init(pocEntityType)Behaviors.empty}}
}

可以看到,不同类型的片以不同的EntityKey来代表。前端接入是基于akka-http的,如下:

object CrmRoute extends LogSupport {def route(entityRef: EntityRef[CrmWorker.Command])(implicit ec: ExecutionContext, jsStreaming: EntityStreamingSupport, timeout: Timeout): akka.http.scaladsl.server.Route = {concat(pathPrefix("ismember") {parameter(Symbol("faceid")) { fid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.IsMemberFace(fid, _)).map {case CrmWorker.ValidMember(memberId) => memberIdcase CrmWorker.InvalidMember(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}},pathPrefix("getmember") {parameter(Symbol("memberid")) { mid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.GetMemberInfo(mid, _)).map {case CrmWorker.MemberInfo(json) => HttpEntity(MediaTypes.`application/json`,json)case CrmWorker.InvalidMemberInfo(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}})}
}

各项业务功能调用通过entityRef.ask发送给了某个用户指定节点上的entity。akka的actor是线程的再细分,即一个actor可能与其它成千上万个actor共享一条线程。所以绝对不容许任何blocking。我是用下面示范的模式来实现non-blocking的:

  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {val (shopId,posId) = entityId.split(':').toList match {case sid::pid::Nil  => (sid,pid) }implicit val loc = Messages.MachineId(shopId,posId)log.stepOn = trace//    Behaviors.supervise(Behaviors.setup[Command] { ctx =>implicit val ec = ctx.executionContextctx.setReceiveTimeout(keepAlive, Idle)Behaviors.withTimers[Command] { timer =>Behaviors.receiveMessage[Command] {case IsMemberFace(fid, replyTo) =>log.step(s"CrmWorker: IsMemberFace($fid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(isMemberFace(fid)) {case Success(mid) => {if (mid._1.isEmpty) {replyTo ! InvalidMember(mid._2)Done(loc.shopid, loc.posid, s"IsMemberFace with Error ${mid._2}")} else {replyTo ! ValidMember(mid._1)Done(loc.shopid, loc.posid, s"IsMemberFace.")}}case Failure(err) =>log.error(s"CrmWorker: IsMemberFace Error: ${err.getMessage}")replyTo ! InvalidMember(err.getMessage)Done(loc.shopid, loc.posid, s"IsMemberFace with error: ${err.getMessage}")}Behaviors.samecase GetMemberInfo(mid, replyTo) =>log.step(s"CrmWorker: GetMemberInfo($mid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(getMemberInfo(mid)) {case Success(json) => {replyTo ! MemberInfo(json)Done(loc.shopid, loc.posid, s"GetMemberInfo with json ${json}")}case Failure(err) =>log.error(s"CrmWorker: GetMemberInfo Error: ${err.getMessage}")replyTo ! InvalidMemberInfo(err.getMessage)Done(loc.shopid, loc.posid, s"GetMemberInfo with error: ${err.getMessage}")}Behaviors.samecase Idle =>// after receive timeoutshard ! ClusterSharding.Passivate(ctx.self)Behaviors.samecase StopWorker =>Behaviors.stopped(() => log.step(s"CrmWorker: {$shopId,$posId} passivated to stop.")(MachineId(shopId, posId)))case Done(shopid, termid, work) =>if (maybeMgoClient.isDefined)maybeMgoClient.get.close()log.step(s"CrmWorker: {$shopid,$termid} finished $work")(MachineId(shopid,termid))Behaviors.samecase _ => Behaviors.same}.receiveSignal {case (_,PostStop) =>log.step(s"CrmWorker: {$shopId,$posId} stopped.")(MachineId(shopId, posId))Behaviors.same}}}//   ).onFailure(SupervisorStrategy.restart)}

主要是使用ctx.pipeToSelf(work)把一个Future转换成内部消息。这里的work的实现最终必须返回Future类型,如下面的示范:

object CrmServices extends JsonConverter with LogSupport {import MgoHelpers._def validMember(docs: Seq[Document], faceid: String): Future[(String,String)] = {val memberId: (String, String) = docs match {case Nil => ("", s"faceid[$faceid]不存在!")case docs =>val member = MemberInfo.fromDocument(docs.head)if (member.expireDt.compareTo(mgoDateTimeNow) < 0)("", s"会员:${member.memberId}-${member.memberName}会籍已过期!")else(member.memberId, "")}FastFuture.successful(memberId)}def isMemberFace(faceid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[(String,String)] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.FACEID,faceid)).toFuture()for {mi <- memberInfo(id,msg) <- validMember(mi,faceid)} yield (id,msg)}def getMemberInfo(memberid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[String] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.MEMBERID,memberid)).toFuture()for {docs <- memberInfojstr <- FastFuture.successful(if(docs.isEmpty) "" else toJson(MemberInfo.fromDocument(docs.head)))} yield jstr}}

另外,由于每个用户第一次调用一项业务功能时akka-cluster-shardregion都会自动在某个节点上构建一个新的entity,如果上万个用户使用过某个功能,那么就会有万个entity及其所占用的资源如mongodb客户端等停留在内存里。所以在完成一项功能运算后应关闭entity,释放占用的资源。这个是通过shard ! ClusterSharding.passivate(ctx.self)实现的。

这篇关于akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887177

相关文章

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

SpringBoot整合DeepSeek实现AI对话功能

《SpringBoot整合DeepSeek实现AI对话功能》本文介绍了如何在SpringBoot项目中整合DeepSeekAPI和本地私有化部署DeepSeekR1模型,通过SpringAI框架简化了... 目录Spring AI版本依赖整合DeepSeek API key整合本地化部署的DeepSeek

Ollama整合open-webui的步骤及访问

《Ollama整合open-webui的步骤及访问》:本文主要介绍如何通过源码方式安装OpenWebUI,并详细说明了安装步骤、环境要求以及第一次使用时的账号注册和模型选择过程,需要的朋友可以参考... 目录安装环境要求步骤访问选择PjrIUE模型开始对话总结 安装官方安装地址:https://docs.

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

SpringBoot 整合 Grizzly的过程

《SpringBoot整合Grizzly的过程》Grizzly是一个高性能的、异步的、非阻塞的HTTP服务器框架,它可以与SpringBoot一起提供比传统的Tomcat或Jet... 目录为什么选择 Grizzly?Spring Boot + Grizzly 整合的优势添加依赖自定义 Grizzly 作为

springboot整合gateway的详细过程

《springboot整合gateway的详细过程》本文介绍了如何配置和使用SpringCloudGateway构建一个API网关,通过实例代码介绍了springboot整合gateway的过程,需要... 目录1. 添加依赖2. 配置网关路由3. 启用Eureka客户端(可选)4. 创建主应用类5. 自定

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

SpringBoot整合kaptcha验证码过程(复制粘贴即可用)

《SpringBoot整合kaptcha验证码过程(复制粘贴即可用)》本文介绍了如何在SpringBoot项目中整合Kaptcha验证码实现,通过配置和编写相应的Controller、工具类以及前端页... 目录SpringBoot整合kaptcha验证码程序目录参考有两种方式在springboot中使用k

Spring Boot 中整合 MyBatis-Plus详细步骤(最新推荐)

《SpringBoot中整合MyBatis-Plus详细步骤(最新推荐)》本文详细介绍了如何在SpringBoot项目中整合MyBatis-Plus,包括整合步骤、基本CRUD操作、分页查询、批... 目录一、整合步骤1. 创建 Spring Boot 项目2. 配置项目依赖3. 配置数据源4. 创建实体类

SpringBoot整合InfluxDB的详细过程

《SpringBoot整合InfluxDB的详细过程》InfluxDB是一个开源的时间序列数据库,由Go语言编写,适用于存储和查询按时间顺序产生的数据,它具有高效的数据存储和查询机制,支持高并发写入和... 目录一、简单介绍InfluxDB是什么?1、主要特点2、应用场景二、使用步骤1、集成原生的Influ