akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?

2024-04-09 04:32

本文主要是介绍akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在讨论lagom之前,先从遇到的需求开始介绍:现代企业的it系统变得越来越多元化、复杂化了。线上、线下各种系统必须用某种方式集成在一起。从各种it系统的基本共性分析:最明显的特征应该是后台数据库的角色了,起码,大家都需要使用数据。另外,每个系统都可能具备大量实时在线用户、海量数据特性,代表着对数据处理能力有极大的要求,预示系统只有通过分布式处理方式才能有效运行。

一个月前开始设计一个企业的it系统,在讨论数据中台时就遇到这样的需求。这个所谓的数据中台的主要作用是为整体系统提供一套统一的数据使用api,前后连接包括web,mobile,desktop的前端系统以及由多种传统及分布式数据库系统,形成一个统一的数据使用接口。实际上,数据库连接不只是简单的读写操作,还需要包括所有实时的数据处理:根据业务要求对数据进行相应的处理然后使用。那么这是一个怎样的系统呢?首先,它必须是分布式的:为了对付大量的前端用户同时调用同一个api,把这个api的功能同时分派到多个服务器上运行是个有效的解决方法。这是个akka-cluster-sharding模式。数据中台api是向所有内部系统以及一些特定的外部第三方系统开放的,用http标准协议支持各系统与数据后台的连接也是合理的。这个akka-http, akka-grpc可以胜任。然后各系统之间的集成可以通过一个流运算工具如kafka实现各聚合根之间的交互连接。

似乎所有需要的工具都齐备了,其中akka占了大部分功能。但有些问题是:基于akka技术栈来编程或多或少有些门槛要求。最起码需要一定程度的akka开发经验。更不用提组织一个开发团队了。如果市面上有个什么能提供相应能力的开发工具,可以轻松快速上手的,那么项目开发就可以立即启动了。

现在来谈谈lagom:lagom是一套scala栈的微服务软件开发工具。从官方文档介绍了解到lagom主要提供了一套服务接口定义及服务功能开发框架。值得一提的是服务功能可以是集群分片模式的。走了一遍lagom的启动示范代码,感觉这是一套集开发、测试、部署为一体的框架(framework)。在这个框架里按照规定开发几个简单的服务api非常顺利,很方便。这让我对使用lagom产生了兴趣,想继续调研一下利用lagoom来开发上面所提及数据中台的可行性。lagom服务接入部分是通过play实现的。play我不太熟悉,想深入了解一下用akka-http替代的可行性,不过看来不太容易。最让我感到失望的是lagom的服务分片(service-sharding)直接就是akka-cluster那一套:cluster、event-sourcing、CQRS什么的都需要自己从头到尾重新编写。用嵌入的kafka进行服务整合与单独用kafka也不会增加太多麻烦。倒是lagom提供的这个集开发、测试、部署为一体的框架在团队开发管理中应该能发挥良好的作用。

在我看来:服务接入方面由于涉及身份验证、使用权限、二进制文件类型数据交换等使用akka-http,akka-grpc会更有控制力。服务功能实现直接就用akka-cluster-sharding,把计算任务分布到各节点上,这个我们前面已经介绍过了。

所以,最后还是决定直接用akka-typed来实现这个数据中台。用了一个多月时间做研发,到现在看来效果不错,能够符合项目要求。下面是一些用akka-typed实现业务集成的过程介绍。首先,系统特点是功能分片:系统按业务条块分成多个片shardregion,每个片里的entity负责处理一项业务的多个功能。多个用户调用一项业务功能代表多个entity分布在不同的集群节点上并行运算。下面是一个业务群的代码示范:

object Shards extends LogSupport {def apply(mgoHosts: List[String],trace: Boolean, keepAlive: FiniteDuration, pocurl: String)(implicit authBase: AuthBase): Behavior[Nothing] = {Behaviors.setup[Nothing] { ctx =>val sharding = ClusterSharding(ctx.system)log.stepOn = truelog.step(s"starting cluster-monitor ...")(MachineId("",""))ctx.spawn(MonitorActor(),"abs-cluster-monitor")log.step(s"initializing sharding for ${Authenticator.EntityKey} ...")(MachineId("",""))val authEntityType = Entity(Authenticator.EntityKey) { entityContext =>Authenticator(entityContext.shard,mgoHosts,trace,keepAlive)}.withStopMessage(Authenticator.StopAuthenticator)sharding.init(authEntityType)log.step(s"initializing sharding for ${CrmWorker.EntityKey} ...")(MachineId("",""))val crmEntityType = Entity(CrmWorker.EntityKey) { entityContext =>CrmWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(CrmWorker.StopWorker)sharding.init(crmEntityType)log.step(s"initializing sharding for ${GateKeeper.EntityKey} ...")(MachineId("",""))val gateEntityType = Entity(GateKeeper.EntityKey) { entityContext =>GateKeeper(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(GateKeeper.StopGateKeeper)sharding.init(gateEntityType)log.step(s"initializing sharding for ${PluWorker.EntityKey} ...")(MachineId("",""))val pluEntityType = Entity(PluWorker.EntityKey) { entityContext =>PluWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(PluWorker.StopWorker)sharding.init(pluEntityType)log.step(s"initializing sharding for ${PocWorker.EntityKey} ...")(MachineId("",""))val pocEntityType = Entity(PocWorker.EntityKey) { entityContext =>PocWorker(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive,pocurl)}.withStopMessage(PocWorker.StopWorker)sharding.init(pocEntityType)Behaviors.empty}}
}

可以看到,不同类型的片以不同的EntityKey来代表。前端接入是基于akka-http的,如下:

object CrmRoute extends LogSupport {def route(entityRef: EntityRef[CrmWorker.Command])(implicit ec: ExecutionContext, jsStreaming: EntityStreamingSupport, timeout: Timeout): akka.http.scaladsl.server.Route = {concat(pathPrefix("ismember") {parameter(Symbol("faceid")) { fid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.IsMemberFace(fid, _)).map {case CrmWorker.ValidMember(memberId) => memberIdcase CrmWorker.InvalidMember(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}},pathPrefix("getmember") {parameter(Symbol("memberid")) { mid =>val futResp = entityRef.ask[CrmWorker.Response](CrmWorker.GetMemberInfo(mid, _)).map {case CrmWorker.MemberInfo(json) => HttpEntity(MediaTypes.`application/json`,json)case CrmWorker.InvalidMemberInfo(msg) => throw new Exception(msg)}onSuccess(futResp)(complete(_))}})}
}

各项业务功能调用通过entityRef.ask发送给了某个用户指定节点上的entity。akka的actor是线程的再细分,即一个actor可能与其它成千上万个actor共享一条线程。所以绝对不容许任何blocking。我是用下面示范的模式来实现non-blocking的:

  def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {val (shopId,posId) = entityId.split(':').toList match {case sid::pid::Nil  => (sid,pid) }implicit val loc = Messages.MachineId(shopId,posId)log.stepOn = trace//    Behaviors.supervise(Behaviors.setup[Command] { ctx =>implicit val ec = ctx.executionContextctx.setReceiveTimeout(keepAlive, Idle)Behaviors.withTimers[Command] { timer =>Behaviors.receiveMessage[Command] {case IsMemberFace(fid, replyTo) =>log.step(s"CrmWorker: IsMemberFace($fid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(isMemberFace(fid)) {case Success(mid) => {if (mid._1.isEmpty) {replyTo ! InvalidMember(mid._2)Done(loc.shopid, loc.posid, s"IsMemberFace with Error ${mid._2}")} else {replyTo ! ValidMember(mid._1)Done(loc.shopid, loc.posid, s"IsMemberFace.")}}case Failure(err) =>log.error(s"CrmWorker: IsMemberFace Error: ${err.getMessage}")replyTo ! InvalidMember(err.getMessage)Done(loc.shopid, loc.posid, s"IsMemberFace with error: ${err.getMessage}")}Behaviors.samecase GetMemberInfo(mid, replyTo) =>log.step(s"CrmWorker: GetMemberInfo($mid)")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(getMemberInfo(mid)) {case Success(json) => {replyTo ! MemberInfo(json)Done(loc.shopid, loc.posid, s"GetMemberInfo with json ${json}")}case Failure(err) =>log.error(s"CrmWorker: GetMemberInfo Error: ${err.getMessage}")replyTo ! InvalidMemberInfo(err.getMessage)Done(loc.shopid, loc.posid, s"GetMemberInfo with error: ${err.getMessage}")}Behaviors.samecase Idle =>// after receive timeoutshard ! ClusterSharding.Passivate(ctx.self)Behaviors.samecase StopWorker =>Behaviors.stopped(() => log.step(s"CrmWorker: {$shopId,$posId} passivated to stop.")(MachineId(shopId, posId)))case Done(shopid, termid, work) =>if (maybeMgoClient.isDefined)maybeMgoClient.get.close()log.step(s"CrmWorker: {$shopid,$termid} finished $work")(MachineId(shopid,termid))Behaviors.samecase _ => Behaviors.same}.receiveSignal {case (_,PostStop) =>log.step(s"CrmWorker: {$shopId,$posId} stopped.")(MachineId(shopId, posId))Behaviors.same}}}//   ).onFailure(SupervisorStrategy.restart)}

主要是使用ctx.pipeToSelf(work)把一个Future转换成内部消息。这里的work的实现最终必须返回Future类型,如下面的示范:

object CrmServices extends JsonConverter with LogSupport {import MgoHelpers._def validMember(docs: Seq[Document], faceid: String): Future[(String,String)] = {val memberId: (String, String) = docs match {case Nil => ("", s"faceid[$faceid]不存在!")case docs =>val member = MemberInfo.fromDocument(docs.head)if (member.expireDt.compareTo(mgoDateTimeNow) < 0)("", s"会员:${member.memberId}-${member.memberName}会籍已过期!")else(member.memberId, "")}FastFuture.successful(memberId)}def isMemberFace(faceid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[(String,String)] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.FACEID,faceid)).toFuture()for {mi <- memberInfo(id,msg) <- validMember(mi,faceid)} yield (id,msg)}def getMemberInfo(memberid: String)(implicit mgoClient: MongoClient, ec: ExecutionContext): Future[String] = {implicit val db = mgoClient.getDatabase(CrmModels.SCHEMA.DBNAME)val col = db.getCollection(CrmModels.SCHEMA.MEMBERINFO)val memberInfo: Future[Seq[Document]] = col.find(equal(SCHEMA.MEMBERID,memberid)).toFuture()for {docs <- memberInfojstr <- FastFuture.successful(if(docs.isEmpty) "" else toJson(MemberInfo.fromDocument(docs.head)))} yield jstr}}

另外,由于每个用户第一次调用一项业务功能时akka-cluster-shardregion都会自动在某个节点上构建一个新的entity,如果上万个用户使用过某个功能,那么就会有万个entity及其所占用的资源如mongodb客户端等停留在内存里。所以在完成一项功能运算后应关闭entity,释放占用的资源。这个是通过shard ! ClusterSharding.passivate(ctx.self)实现的。

这篇关于akka-typed(9) - 业务分片、整合,谈谈lagom, 需要吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887177

相关文章

一文详解如何从零构建Spring Boot Starter并实现整合

《一文详解如何从零构建SpringBootStarter并实现整合》SpringBoot是一个开源的Java基础框架,用于创建独立、生产级的基于Spring框架的应用程序,:本文主要介绍如何从... 目录一、Spring Boot Starter的核心价值二、Starter项目创建全流程2.1 项目初始化(

Spring Boot 整合 MyBatis 连接数据库及常见问题

《SpringBoot整合MyBatis连接数据库及常见问题》MyBatis是一个优秀的持久层框架,支持定制化SQL、存储过程以及高级映射,下面详细介绍如何在SpringBoot项目中整合My... 目录一、基本配置1. 添加依赖2. 配置数据库连接二、项目结构三、核心组件实现(示例)1. 实体类2. Ma

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密

SpringBoot整合MybatisPlus的基本应用指南

《SpringBoot整合MybatisPlus的基本应用指南》MyBatis-Plus,简称MP,是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,下面小编就来和大家介绍一下... 目录一、MyBATisPlus简介二、SpringBoot整合MybatisPlus1、创建数据库和

Java8需要知道的4个函数式接口简单教程

《Java8需要知道的4个函数式接口简单教程》:本文主要介绍Java8中引入的函数式接口,包括Consumer、Supplier、Predicate和Function,以及它们的用法和特点,文中... 目录什么是函数是接口?Consumer接口定义核心特点注意事项常见用法1.基本用法2.结合andThen链

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

Spring Boot 3 整合 Spring Cloud Gateway实践过程

《SpringBoot3整合SpringCloudGateway实践过程》本文介绍了如何使用SpringCloudAlibaba2023.0.0.0版本构建一个微服务网关,包括统一路由、限... 目录引子为什么需要微服务网关实践1.统一路由2.限流防刷3.登录鉴权小结引子当前微服务架构已成为中大型系统的标

SpringBoot整合easy-es的详细过程

《SpringBoot整合easy-es的详细过程》本文介绍了EasyES,一个基于Elasticsearch的ORM框架,旨在简化开发流程并提高效率,EasyES支持SpringBoot框架,并提供... 目录一、easy-es简介二、实现基于Spring Boot框架的应用程序代码1.添加相关依赖2.添

SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程

《SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程》本文详细介绍了如何在虚拟机和宝塔面板中安装RabbitMQ,并使用Java代码实现消息的发送和接收,通过异步通讯,可以优化... 目录一、RabbitMQ安装二、启动RabbitMQ三、javascript编写Java代码1、引入

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL