Akka(37): Http:客户端操作模式

2024-04-09 04:48

本文主要是介绍Akka(37): Http:客户端操作模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   Akka-http的客户端连接模式除Connection-Level和Host-Level之外还有一种非常便利的模式:Request-Level-Api。这种模式免除了连接Connection的概念,任何时候可以直接调用singleRequest来与服务端沟通。下面我们用几个例子来示范singleRequest的用法:

  (for {response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))message <- Unmarshal(response.entity).to[String]} yield message).andThen {case Success(msg) => println(s"Received message: $msg")case Failure(err) => println(s"Error: ${err.getMessage}")}.andThen {case _ => sys.terminate()}

这是一个GET操作:用Http().singleRequest直接把HttpRequest发送给服务端uri并获取返回的HttpResponse。我们看到,整组函数的返回类型都是Future[?],所以用for-comprehension来把所有实际运算包嵌在Future运算模式内(context)。下面这个例子是客户端上传数据示范:

  (for {entity <- Marshal("Wata hell you doing?").to[RequestEntity]response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))message <- Unmarshal(response.entity).to[String]} yield message).andThen {case Success(msg) => println(s"Received message: $msg")case Failure(err) => println(s"Error: ${err.getMessage}")}.andThen {case _ => sys.terminate()}

以上是个PUT操作。我们需要先构建数据载体HttpEntity。格式转换函数Marshal也返回Future[HttpEntity],所以也可以包含在for语句内。关注一下这个andThen,它可以连接一串多个monadic运算,在不影响上游运算结果的情况下实现一些副作用计算。

值得注意的是上面这两个例子虽然表现形式很简洁,但我们无法对数据转换过程中的异常及response的状态码等进行监控。所以我们应该把整个过程拆分成两部分:先获取response,再具体处理response,包括核对状态,处理数据等:

  case class Item(id: Int, name: String, price: Double)def getItem(itemId: Int): Future[HttpResponse] = for {response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))} yield responsedef extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {futResp.andThen {case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>Unmarshal(entity).to[T].onComplete {case Success(t) => println(s"Got response entity: ${t}")case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")}case Success(_) => println("Exception in response!")case Failure(err) => println(s"Response Failed: ${err.getMessage}")}}extractEntity[Item](getItem(13))

现在这个extractEntity[Item](getItem(13))可以实现全过程的监控管理了。用同样的模式实现PUT操作:

  def putItem(item: Item): Future[HttpResponse] =for {reqEntity <- Marshal(item).to[RequestEntity]response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))} yield responseextractEntity[Item](putItem(Item(23,"Item#23", 46.0))).andThen { case _ => sys.terminate()}

当然,我们还是使用了前面几篇讨论里的Marshalling方式来进行数据格式的自动转换:

import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
...
trait JsonCodec extends Json4sSupport {import org.json4s.DefaultFormatsimport org.json4s.ext.JodaTimeSerializersimplicit val serilizer = jackson.Serializationimplicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec
...import JsConverters._implicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)

如果我们需要对数据交换过程进行更细致的管控,用Host-Level-Api会更加适合。下面我们就针对Host-Level-Api构建一个客户端的工具库:

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)(implicit sys: ActorSystem, mat: ActorMaterializer) {import sys.dispatcherprivate val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)
//单一requestdef requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {Source.single(req -> 1).via(cnnPool).runWith(Sink.head).flatMap {case (Success(resp), _) => Future.successful(resp)case (Failure(fail), _) => Future.failed(fail)}}
//组串requestdef orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {Source(reqs.zipWithIndex.toMap).via(cnnPool).runFold(SortedMap[Int, Future[HttpResponse]]()) {case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))}.flatMap { m => Future.sequence(m.values) }}
}

下面是一种比较安全的模式:使用了queue来暂存request从而解决因发送方与接收方速率不同所产生的问题:

class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)(implicit sys: ActorSystem, mat: ActorMaterializer) {import sys.dispatcherprivate val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)val queue =Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy).via(cnnPool).to(Sink.foreach({case ((Success(resp), p)) => p.success(resp)case ((Failure(e), p))    => p.failure(e)})).run()def queueRequest(request: HttpRequest): Future[HttpResponse] = {val responsePromise = Promise[HttpResponse]()queue.offer(request -> responsePromise).flatMap {case QueueOfferResult.Enqueued    => responsePromise.futurecase QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))case QueueOfferResult.Failure(ex) => Future.failed(ex)case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))}}
}

下面是这些工具函数的具体使用示范:

  val settings = ConnectionPoolSettings(sys).withMaxConnections(8).withMaxOpenRequests(8).withMaxRetries(3).withPipeliningLimit(4)val pooledClient = new PooledClient("localhost",8011,settings)def getItemByPool(itemId: Int): Future[HttpResponse] = for {response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))} yield responseextractEntity[Item](getItemByPool(13))def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {val reqs = itemIds.map { id =>HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")}val rets = (for {responses <- pooledClient.orderedResponses(reqs)} yield responses)rets}val futResps = getItemsByPool(List(3,5,7))futResps.andThen {case Success(listOfResps) => {listOfResps.foreach { r =>r match {case HttpResponse(StatusCodes.OK, _, entity, _) =>Unmarshal(entity).to[Item].onComplete {case Success(t) => println(s"Got response entity: ${t}")case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")}case _ => println("Exception in response!")}}}case _ => println("Failed to get list of responses!")}val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()def putItemByQueue(item: Item): Future[HttpResponse] =for {reqEntity <- Marshal(item).to[RequestEntity]response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))} yield responseextractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0))).andThen { case _ => sys.terminate()}

下面是本次讨论的示范源代码:

服务端代码:

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
trait JsonCodec extends Json4sSupport {import org.json4s.DefaultFormatsimport org.json4s.ext.JodaTimeSerializersimplicit val serilizer = jackson.Serializationimplicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodecobject TestServer extends App with JsonCodec {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimport JsConverters._case class Item(id: Int, name: String, price: Double)val messages = path("message") {get {complete("hello, how are you?")} ~put {entity(as[String]) {msg =>complete(msg)}}}val items =(path("item" / IntNumber) & get) { id =>get {complete(Item(id, s"item#$id", id * 2.0))}} ~(path("item") & put) {entity(as[Item]) {item =>complete(item)}}val route = messages ~ itemsval (host, port) = ("localhost", 8011)val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端源代码:

import akka.actor._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._import scala.util._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jacksonimport scala.concurrent._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.marshalling.Marshalimport scala.collection.SortedMap
import akka.http.scaladsl.common._trait JsonCodec extends Json4sSupport {import org.json4s.DefaultFormatsimport org.json4s.ext.JodaTimeSerializersimplicit val serilizer = jackson.Serializationimplicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodecclass PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)(implicit sys: ActorSystem, mat: ActorMaterializer) {import sys.dispatcherprivate val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {Source.single(req -> 1).via(cnnPool).runWith(Sink.head).flatMap {case (Success(resp), _) => Future.successful(resp)case (Failure(fail), _) => Future.failed(fail)}}def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {Source(reqs.zipWithIndex.toMap).via(cnnPool).runFold(SortedMap[Int, Future[HttpResponse]]()) {case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))}.flatMap { m => Future.sequence(m.values) }}
}
class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)(qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)(implicit sys: ActorSystem, mat: ActorMaterializer) {import sys.dispatcherprivate val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)val queue =Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy).via(cnnPool).to(Sink.foreach({case ((Success(resp), p)) => p.success(resp)case ((Failure(e), p))    => p.failure(e)})).run()def queueRequest(request: HttpRequest): Future[HttpResponse] = {val responsePromise = Promise[HttpResponse]()queue.offer(request -> responsePromise).flatMap {case QueueOfferResult.Enqueued    => responsePromise.futurecase QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))case QueueOfferResult.Failure(ex) => Future.failed(ex)case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))}}
}
object ClientRequesting extends App {import JsConverters._implicit val sys = ActorSystem("sysClient")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)case class Item(id: Int, name: String, price: Double)def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {futResp.andThen {case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>Unmarshal(entity).to[T].onComplete {case Success(t) => println(s"Got response entity: ${t}")case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")}case Success(_) => println("Exception in response!")case Failure(err) => println(s"Response Failed: ${err.getMessage}")}}(for {response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))message <- Unmarshal(response.entity).to[String]} yield message).andThen {case Success(msg) => println(s"Received message: $msg")case Failure(err) => println(s"Error: ${err.getMessage}")}  //.andThen {case _ => sys.terminate()}(for {entity <- Marshal("Wata hell you doing?").to[RequestEntity]response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))message <- Unmarshal(response.entity).to[String]} yield message).andThen {case Success(msg) => println(s"Received message: $msg")case Failure(err) => println(s"Error: ${err.getMessage}")} //.andThen {case _ => sys.terminate()}def getItem(itemId: Int): Future[HttpResponse] = for {response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))} yield responseextractEntity[Item](getItem(13))def putItem(item: Item): Future[HttpResponse] =for {reqEntity <- Marshal(item).to[RequestEntity]response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))} yield responseextractEntity[Item](putItem(Item(23,"Item#23", 46.0))).andThen { case _ => sys.terminate()}val settings = ConnectionPoolSettings(sys).withMaxConnections(8).withMaxOpenRequests(8).withMaxRetries(3).withPipeliningLimit(4)val pooledClient = new PooledClient("localhost",8011,settings)def getItemByPool(itemId: Int): Future[HttpResponse] = for {response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))} yield responseextractEntity[Item](getItemByPool(13))def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {val reqs = itemIds.map { id =>HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")}val rets = (for {responses <- pooledClient.orderedResponses(reqs)} yield responses)rets}val futResps = getItemsByPool(List(3,5,7))futResps.andThen {case Success(listOfResps) => {listOfResps.foreach { r =>r match {case HttpResponse(StatusCodes.OK, _, entity, _) =>Unmarshal(entity).to[Item].onComplete {case Success(t) => println(s"Got response entity: ${t}")case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")}case _ => println("Exception in response!")}}}case _ => println("Failed to get list of responses!")}val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()def putItemByQueue(item: Item): Future[HttpResponse] =for {reqEntity <- Marshal(item).to[RequestEntity]response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))} yield responseextractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0))).andThen { case _ => sys.terminate()}}






这篇关于Akka(37): Http:客户端操作模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887205

相关文章

Python调用Orator ORM进行数据库操作

《Python调用OratorORM进行数据库操作》OratorORM是一个功能丰富且灵活的PythonORM库,旨在简化数据库操作,它支持多种数据库并提供了简洁且直观的API,下面我们就... 目录Orator ORM 主要特点安装使用示例总结Orator ORM 是一个功能丰富且灵活的 python O

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Java实现状态模式的示例代码

《Java实现状态模式的示例代码》状态模式是一种行为型设计模式,允许对象根据其内部状态改变行为,本文主要介绍了Java实现状态模式的示例代码,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来... 目录一、简介1、定义2、状态模式的结构二、Java实现案例1、电灯开关状态案例2、番茄工作法状态案例

C++实现封装的顺序表的操作与实践

《C++实现封装的顺序表的操作与实践》在程序设计中,顺序表是一种常见的线性数据结构,通常用于存储具有固定顺序的元素,与链表不同,顺序表中的元素是连续存储的,因此访问速度较快,但插入和删除操作的效率可能... 目录一、顺序表的基本概念二、顺序表类的设计1. 顺序表类的成员变量2. 构造函数和析构函数三、顺序表

使用C++实现单链表的操作与实践

《使用C++实现单链表的操作与实践》在程序设计中,链表是一种常见的数据结构,特别是在动态数据管理、频繁插入和删除元素的场景中,链表相比于数组,具有更高的灵活性和高效性,尤其是在需要频繁修改数据结构的应... 目录一、单链表的基本概念二、单链表类的设计1. 节点的定义2. 链表的类定义三、单链表的操作实现四、

Python利用自带模块实现屏幕像素高效操作

《Python利用自带模块实现屏幕像素高效操作》这篇文章主要为大家详细介绍了Python如何利用自带模块实现屏幕像素高效操作,文中的示例代码讲解详,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、获取屏幕放缩比例2、获取屏幕指定坐标处像素颜色3、一个简单的使用案例4、总结1、获取屏幕放缩比例from

使用Java实现获取客户端IP地址

《使用Java实现获取客户端IP地址》这篇文章主要为大家详细介绍了如何使用Java实现获取客户端IP地址,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 首先是获取 IP,直接上代码import org.springframework.web.context.request.Requ

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装