akka-grpc - 应用案例

2024-04-09 04:32
文章标签 应用 案例 grpc akka

本文主要是介绍akka-grpc - 应用案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  上期说道:http/2还属于一种不算普及的技术协议,可能目前只适合用于内部系统集成,现在开始大面积介入可能为时尚早。不过有些项目需求不等人,需要使用这项技术,所以研究了一下akka-grpc,写了一篇介绍。本想到此为止,继续其它项目。想想这样做法有点不负责任,像是草草收场。毕竟用akka-grpc做了些事情,想想还是再写这篇跟大家分享使用kka-grpc的过程。

我说过,了解akka-grpc的主要目的还是在protobuf的应用上。这是一种高效率的序列化协议。刚好,公司有这么个项目,是一个图像处理平台:把很多图片拍摄终端的图像传上平台进行商品识别、OCR等图像处理。由于终端数量多、图像处理又特别消耗内存、CPU等计算资源、又要求快速响应,所以第一考虑就是使用akka-cluster把图像处理任务分割到多个节点上并行处理。这里就需要仔细考虑图片在终端到平台、然后集群节点与点actor间的传输效率了。如何在akka系统里使用protobuf格式的数据正是本篇讨论和示范的目的。

akka-grpc应用一般从IDL文件里消息类型和服务函数的定义开始,如下面这个.proto文件示范:

syntax = "proto3";import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";option (scalapb.options) = {// don't append file name to packageflat_package: true// generate one Scala file for all messages (services still get their own file)single_file: true// add imports to generated file// useful when extending traits or using custom types// import: "io.ontherocks.hellogrpc.RockingMessage"// code to put at the top of generated file// works only with `single_file: true`//preamble: "sealed trait SomeSealedTrait"
};package com.datatech.pos.abs;message UCredential {string userid = 1;string password = 2;
}message JWToken {string jwt = 1;
}message Picture {int32 num = 1;bytes blob = 2;
}
message Capture {string ean = 1;bytes cover1 = 2;bytes cover2 = 3;
}message Book {string ean = 1;string ver = 2;string isbn = 3;string title = 4;string publisher = 5;double price = 6;bytes cover1 = 7;bytes cover2 = 8;
}message QueryResult {int32  sts         = 1;string msg         = 2;Book bookinfo   = 3;
}service Services {rpc GetAuthToken(UCredential) returns (JWToken) {};rpc SavePicture(Picture) returns (QueryResult) {};rpc GetPicture(Picture) returns (Picture) {};
//  rpc SaveCapture(Capture) returns (QueryResult) {};
//  rpc GetCapture(Capture) returns (Capture) {};
//  rpc GetBookInfo(Capture) returns (QueryResult) {};
}

因为这次示范针对的是protobuf的使用,所以就拣了SavePicture,GetPicture这两项服务函数。JWToken只是用户身份凭证,集群分片shard-entityId是以用户凭证为基础的,所以平台需要通过JWT进行跨节点任务指派以实现分布式图像处理运算。

下面就要在编译器插件自动产生的基础服务接口代码基础上进行具体的服务功能实现。这部分主要是对接口函数的实现(oveerride):

class gRPCServices(trace: Boolean, system: ActorSystem, sharding: ClusterSharding)(implicit  waitResponseTimeout: Timeout, authenticator: AuthBase) extends ServicesPowerApi with LogSupport {implicit val ec = system.dispatcherlog.stepOn = traceoverride def getAuthToken(request: UCredential, meta: Metadata): Future[JWToken] = {val entityRef = sharding.entityRefFor(Authenticator.EntityKey, UUID.randomUUID.toString)val jwtResp = for {ui <- entityRef.ask[Authenticator.Response](Authenticator.GetUserInfo(request.userid, _)).map {case Authenticator.UserInfo(info) => infocase _ => Map[String, Any]()}jwt <- entityRef.ask[Authenticator.Response](Authenticator.GetToken(ui, _))} yield jwtjwtResp.map {case Authenticator.JWToken(jwt) =>if (jwt.nonEmpty) JWToken(jwt)else throw new Exception("身份验证失败!无法提供凭证。")case _ => throw new Exception("身份验证失败!无法提供凭证。")}}override def savePicture(in: Picture, metadata: Metadata): Future[QueryResult] = {val jwt = getJwt(metadata).getOrElse("")val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))val (shopId, posId, termId, impurl,devId) = idsval entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.SaveImage(in, _)).map {case ImgProcessor.ValidImgPro(img) => QueryResult(sts = 0, msg = "picture saved.")case ImgProcessor.FailedImgPro(msg) => QueryResult(sts = -1, msg = msg)}futResp}override def getPicture(in: Picture, metadata: Metadata): Future[Picture] = {val jwt = getJwt(metadata).getOrElse("")val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","",""))val (shopId, posId, termId, impurl,devId) = idsval entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId")val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.GetImage(in.num, _)).map {case ImgProcessor.ValidImgPro(img) => imgcase ImgProcessor.FailedImgPro(msg) => Picture(-1, ByteString.EMPTY)}futResp}def getJwt(metadata: Metadata): Option[String] = {metadata.getText("bearer")}
}

由于是通过PowerApi模式产生的接口代码,所以接口函数都带有MetaData参数,代表HttpRequest header集合。可以看到:服务函数实现都是通过entityRef,一个分片调度器分配到集群某个节点ImgProcessor.EntityKey类型的entity-actor上进行的。shopId:posId就是代表为某用户构建的entityId,这个是通过用户在Request中提供的MetaData参数中jwt解析得出的。

可以看到,具体服务提供是通过集群的分片实现的。下面是这个分片的代码示范:

      log.step(s"initializing sharding for ${ImgProcessor.EntityKey} ...")(MachineId("",""))val imgEntityType = Entity(ImgProcessor.EntityKey) { entityContext =>ImgProcessor(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive)}.withStopMessage(ImgProcessor.StopWorker)sharding.init(imgEntityType)

上面imgEntityType就是shard-entity类型,其实就是按用户提供的jwt在任意集群节点上实时构建的一个opencv图像处理器。下面是这个entity-actor的示范代码:

object ImgProcessor extends LogSupport {sealed trait Command extends CborSerializablecase class SaveImage(img: Picture, replyTo: ActorRef[Response]) extends Commandcase class GetImage(imgnum: Int,replyTo: ActorRef[Response]) extends Commandsealed trait Response extends CborSerializablecase class ValidImgPro(img: Picture) extends Responsecase class FailedImgPro(msg: String) extends Responsedef apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = {val (shopId,posId) = entityId.split(':').toList match {case sid::pid::Nil  => (sid,pid) }implicit val loc = Messages.MachineId(shopId,posId)log.stepOn = traceBehaviors.setup[Command] { ctx =>implicit val ec = ctx.executionContextctx.setReceiveTimeout(keepAlive, Idle)Behaviors.withTimers[Command] { timer =>Behaviors.receiveMessage[Command] {case SaveImage(img, replyTo) =>log.step(s"ImgProcessor: SaveImage(${img.num})")implicit val client = mongoClient(mgoHosts)maybeMgoClient = Some(client)ctx.pipeToSelf(savePicture(img)) {case Success(_) => {replyTo ! ValidImgPro(img)Done(loc.shopid, loc.posid, s"saved image #${img.num}.")}case Failure(err) =>log.error(s"ImgProcessor: SaveImage Error: ${err.getMessage}")replyTo ! FailedImgPro(err.getMessage)Done(loc.shopid, loc.posid, s"SaveImage with error: ${err.getMessage}")}Behaviors.samecase GetImage(imgnum, replyTo) =>
...}}

整个图片传输是通过actor的消息实现的。akka消息支持多种序列化格式,包括protobuf, 在配置文件.conf里定义:

akka {loglevel = INFOactor {provider = clusterserializers {jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"proto = "akka.remote.serialization.ProtobufSerializer"}serialization-bindings {"com.datatech.pos.abs.CborSerializable" = jackson-cbor"scalapb.GeneratedMessage" = proto}}
}

grpc server 基本上是个标准模块,不同的只是service参数:

class gRPCServer(host: String, port: Int) extends LogSupport {def runServer(system: ActorSystem[_], service: gRPCServices): Future[Http.ServerBinding] = {implicit val classic = system.toClassicimplicit val ec: ExecutionContext = system.executionContext// Create service handlersval serviceHandler: HttpRequest => Future[HttpResponse] =ServicesPowerApiHandler(service)// Bind service handler servers to localhost:8080/8081val binding = Http().bindAndHandleAsync(serviceHandler,interface = host,port = port,connectionContext = HttpConnectionContext())// report successful bindingbinding.foreach { binding => println(s"******* startup gRPC-server on: port = $port  *******") }binding//#server}
}

下面是客户端测试代码:

object gRPCTestClient {def main(args: Array[String]): Unit = {val config_onenode = ConfigFactory.load("onenode")implicit val sys = ActorSystem("grpc-client", config_onenode)implicit val ec = sys.dispatcherval clientSettings = GrpcClientSettings.fromConfig(Services.name)//   val clientSettings = GrpcClientSettings.connectToServiceAt("192.168.11.189", 50052);implicit val client = ServicesClient(clientSettings)val futJwt = client.getAuthToken(UCredential("9013", "123456"))val jwt = Await.result(futJwt, 5.seconds).jwtprintln(s"got jwt: ${jwt}")scala.io.StdIn.readLine()val bytes = FileStreaming.FileToByteArray("books/59c10d099b26e.jpg")val mat = bytesToMat(bytes)show(mat,"sent picture")scala.io.StdIn.readLine()val picture = Picture(111,marshal(bytes))val futQR = client.savePicture().addHeader("Bearer", jwt).invoke(Picture(111,marshal(bytes)))futQR.onComplete {case Success(qr) => println(s"Saving Success: ${qr.msg}")case Failure(err) => println(s"Saving Error: ${err.getMessage}")}scala.io.StdIn.readLine()val futPic = client.getPicture().addHeader("Bearer", jwt).invoke(Picture(111,ByteString.EMPTY))futPic.onComplete {case Success(pic) =>val image = bytesToMat(unmarshal(pic.blob))show(image, s"picture:${pic.num}")case Failure(err) => println(s"Reading Error: ${err.getMessage}")}scala.io.StdIn.readLine()sys.terminate()}
}

基本流程是:先通过getAuthToken获取jwt;在调用服务时通过addHeader("bearer",jwt)把jwt随着函数调用一起提交给服务端。

客户端设置可以在配置文件中定义:

akka {loglevel = INFOgrpc.client {"com.datatech.pos.abs.Services" {host = 192.168.11.189port = 52051override-authority = foo.test.google.fruse-tls = false}}}

 

这篇关于akka-grpc - 应用案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887179

相关文章

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Android Kotlin 高阶函数详解及其在协程中的应用小结

《AndroidKotlin高阶函数详解及其在协程中的应用小结》高阶函数是Kotlin中的一个重要特性,它能够将函数作为一等公民(First-ClassCitizen),使得代码更加简洁、灵活和可... 目录1. 引言2. 什么是高阶函数?3. 高阶函数的基础用法3.1 传递函数作为参数3.2 Lambda

Java中&和&&以及|和||的区别、应用场景和代码示例

《Java中&和&&以及|和||的区别、应用场景和代码示例》:本文主要介绍Java中的逻辑运算符&、&&、|和||的区别,包括它们在布尔和整数类型上的应用,文中通过代码介绍的非常详细,需要的朋友可... 目录前言1. & 和 &&代码示例2. | 和 ||代码示例3. 为什么要使用 & 和 | 而不是总是使

Python循环缓冲区的应用详解

《Python循环缓冲区的应用详解》循环缓冲区是一个线性缓冲区,逻辑上被视为一个循环的结构,本文主要为大家介绍了Python中循环缓冲区的相关应用,有兴趣的小伙伴可以了解一下... 目录什么是循环缓冲区循环缓冲区的结构python中的循环缓冲区实现运行循环缓冲区循环缓冲区的优势应用案例Python中的实现库

SpringBoot整合MybatisPlus的基本应用指南

《SpringBoot整合MybatisPlus的基本应用指南》MyBatis-Plus,简称MP,是一个MyBatis的增强工具,在MyBatis的基础上只做增强不做改变,下面小编就来和大家介绍一下... 目录一、MyBATisPlus简介二、SpringBoot整合MybatisPlus1、创建数据库和

python中time模块的常用方法及应用详解

《python中time模块的常用方法及应用详解》在Python开发中,时间处理是绕不开的刚需场景,从性能计时到定时任务,从日志记录到数据同步,时间模块始终是开发者最得力的工具之一,本文将通过真实案例... 目录一、时间基石:time.time()典型场景:程序性能分析进阶技巧:结合上下文管理器实现自动计时

MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固 通俗易懂版)

《MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固通俗易懂版)》本文主要讲解了MySQL中的多表查询,包括子查询、笛卡尔积、自连接、多表查询的实现方法以及多列子查询等,通过实际例子和操... 目录复合查询1. 回顾查询基本操作group by 分组having1. 显示部门号为10的部门名,员

Java逻辑运算符之&&、|| 与&、 |的区别及应用

《Java逻辑运算符之&&、||与&、|的区别及应用》:本文主要介绍Java逻辑运算符之&&、||与&、|的区别及应用的相关资料,分别是&&、||与&、|,并探讨了它们在不同应用场景中... 目录前言一、基本概念与运算符介绍二、短路与与非短路与:&& 与 & 的区别1. &&:短路与(AND)2. &:非短

Spring AI集成DeepSeek三步搞定Java智能应用的详细过程

《SpringAI集成DeepSeek三步搞定Java智能应用的详细过程》本文介绍了如何使用SpringAI集成DeepSeek,一个国内顶尖的多模态大模型,SpringAI提供了一套统一的接口,简... 目录DeepSeek 介绍Spring AI 是什么?Spring AI 的主要功能包括1、环境准备2