Akka(36): Http:Client-side-Api,Client-Connections

2024-04-09 04:48

本文主要是介绍Akka(36): Http:Client-side-Api,Client-Connections,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   Akka-http的客户端Api应该是以HttpRequest操作为主轴的网上消息交换模式编程工具。我们知道:Akka-http是搭建在Akka-stream之上的。所以,Akka-http在客户端构建与服务器的连接通道也可以用Akka-stream的Flow来表示。这个Flow可以通过调用Http.outgoingConnection来获取:

  /*** Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint.* Every materialization of the produced flow will attempt to establish a new outgoing connection.** To configure additional settings for requests made using this method,* use the `akka.http.client` config section or pass in a [[akka.http.scaladsl.settings.ClientConnectionSettings]] explicitly.*/def outgoingConnection(host: String, port: Int = 80,localAddress: Option[InetSocketAddress] = None,settings:     ClientConnectionSettings  = ClientConnectionSettings(system),log:          LoggingAdapter            = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =_outgoingConnection(host, port, settings.withLocalAddressOverride(localAddress), ConnectionContext.noEncryption(), ClientTransport.TCP, log)

我们看到:这个函数实现了对Server端地址host+port的设定,返回的结果类型是Flow[HttpRequest,HttpResponse,Future[OutgoingConnection]]。这个Flow代表将输入的HttpRequest转换成输出的HttpResponse。这个转换过程包括了与Server之间的Request,Response消息交换。下面我们试着用这个Flow来向Server端发送request,并获取response:

  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =Http().outgoingConnection("akka.io")def sendHttpRequest(req: HttpRequest) = {Source.single(req).via(connFlow).runWith(Sink.head)}sendHttpRequest(HttpRequest(uri="/")).andThen{case Success(resp) => println(s"got response: ${resp.status.intValue()}")case Failure(err) => println(s"request failed: ${err.getMessage}")}.andThen {case _ => sys.terminate()}

以上用法只能处理零星短小的requests,这是因为虽然connFlow是一次性实例化,但每次调用runWith都会构建新的connection,而实例化和构建新connection会拖慢系统运行速度,不适用于像streaming这样大量消息的相互传递。

上面的这种模式就是所谓Connection-Level-Client-Side-Api。这种模式可以让用户有更大程度的自由度控制connection的构建、使用及在connection上发送request的方式。一般来讲,当返回response的entity被完全消耗后系统会自动close connection,这套api还提供了一些手动方法可以在有需要的情况下手动进行connection close,如下:

 //close connection by cancelling response entityresp.entity.dataBytes.runWith(Sink.cancelled)//close connection by receiving response with close headerHttp().bindAndHandleSync({ req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) },"akka.io",80)(mat)

Akka-http客户端api还有一种实用的Host-Level-Client-Side-Api模式。这套api能自动针对每个端点维护一个连接池(connection-pool),用户只需对连接池进行配置。系统按照连接池配置自动维护池内线程的生、死、动、停。akka-http.host-connection-pool配置中max-connections,max-open-requests,pipelining-limit等控制着connection、在途request的数量,需要特别注意。针对某个端点的连接池是通过Http().cachedHostConnectionPool(endPoint)获取的。同样,获取的也是一个client-flow实例。因为系统自动维护着线程池,所以client-flow实例可以任意引用,无论调用次数与调用时间间隔。cachedHostConnectionPool()函数定义如下:

  /*** Same as [[#cachedHostConnectionPool]] but for encrypted (HTTPS) connections.** If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used* for encryption on the connections.** To configure additional settings for the pool (and requests made using it),* use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.*/def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443,connectionContext: HttpsConnectionContext = defaultClientHttpsContext,settings:          ConnectionPoolSettings = defaultConnectionPoolSettings,log:               LoggingAdapter         = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {val cps = ConnectionPoolSetup(settings, connectionContext, log)val setup = HostConnectionPoolSetup(host, port, cps)cachedHostConnectionPool(setup)}

函数返回结果类型:Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool]。因为线程池内的线程是异步构建request和接收response的,而返回response的顺序未必按照发送request的顺序,所以需要一个tuple2的T类型标示request与返回的response进行匹配。线程池会根据idle-timeout自动终止,也可以手动通过HostConnectionPool.shutDown()实现:

  /*** Represents a connection pool to a specific target host and pool configuration.*/final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)(private[http] val gateway: PoolGateway) { // enable test access/*** Asynchronously triggers the shutdown of the host connection pool.** The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed.*/def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown()private[http] def toJava = new akka.http.javadsl.HostConnectionPool {override def setup = HostConnectionPool.this.setupoverride def shutdown(executor: ExecutionContextExecutor): CompletionStage[Done] = HostConnectionPool.this.shutdown()(executor).toJava}}

也可以通过Http().shutdownAllConnectionPools()一次性终止ActorSystem内所有线程池:

  /*** Triggers an orderly shutdown of all host connections pools currently maintained by the [[akka.actor.ActorSystem]].* The returned future is completed when all pools that were live at the time of this method call* have completed their shutdown process.** If existing pool client flows are re-used or new ones materialized concurrently with or after this* method call the respective connection pools will be restarted and not contribute to the returned future.*/def shutdownAllConnectionPools(): Future[Unit] = {val shutdownCompletedPromise = Promise[Done]()poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise)shutdownCompletedPromise.future.map(_ ⇒ ())(system.dispatcher)}

我们用cachedHostConnectionPool获取一个client-flow实例:

Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool]后就可以进行输入HttpRequest到HttpResponse的转换处理。如下面的例子:

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =Http().cachedHostConnectionPool[Int](host="akka.io",port=80)def sendPoolRequest(req: HttpRequest, marker: Int) = {Source.single(req -> marker).via(pooledFlow).runWith(Sink.head)}sendPoolRequest(HttpRequest(uri="/"), 1).andThen{case Success((tryResp, mk)) =>tryResp match {case Success(resp) => println(s"got response: ${resp.status.intValue()}")case Failure(err) => println(s"request failed: ${err.getMessage}")}case Failure(err) => println(s"request failed: ${err.getMessage}")}.andThen {case _ => sys.terminate()}

在以上这个例子里实际同样会遇到Connection-Level-Api所遇的的问题,这是因为获取的线程池内的线程还是有限的,只能缓解因为request速率超出response速率所造成的request积压。目前最有效的方法还是通过使用一个queue来暂存request后再逐个处理:

    val QueueSize = 10// This idea came initially from this blog post:// http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.htmlval poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")val queue =Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew).via(poolClientFlow).toMat(Sink.foreach({case ((Success(resp), p)) => p.success(resp)case ((Failure(e), p))    => p.failure(e)}))(Keep.left).run()def queueRequest(request: HttpRequest): Future[HttpResponse] = {val responsePromise = Promise[HttpResponse]()queue.offer(request -> responsePromise).flatMap {case QueueOfferResult.Enqueued    => responsePromise.futurecase QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))case QueueOfferResult.Failure(ex) => Future.failed(ex)case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))}}val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))responseFuture.andThen {case Success(resp) => println(s"got response: ${resp.status.intValue()}")case Failure(err) => println(s"request failed: ${err.getMessage}")}.andThen {case _ => sys.terminate()}

下面是本次Akka-http-client-side-connection讨论的示范源代码:

import akka.actor._
import akka.http.javadsl.{HostConnectionPool, OutgoingConnection}
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._import scala.concurrent._
import scala.util._object ClientApiDemo extends App {implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcher
/*val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =Http().outgoingConnection("www.sina.com")def sendHttpRequest(req: HttpRequest) = {Source.single(req).via(connFlow).runWith(Sink.head)}sendHttpRequest(HttpRequest(uri="/")).andThen{case Success(resp) =>//close connection by cancelling response entityresp.entity.dataBytes.runWith(Sink.cancelled)println(s"got response: ${resp.status.intValue()}")case Failure(err) => println(s"request failed: ${err.getMessage}")}//   .andThen {case _ => sys.terminate()}//close connection by receiving response with close headerHttp().bindAndHandleSync({ req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) },"akka.io",80)(mat)val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =Http().cachedHostConnectionPool[Int](host="akka.io",port=80)def sendPoolRequest(req: HttpRequest, marker: Int) = {Source.single(req -> marker).via(pooledFlow).runWith(Sink.head)}sendPoolRequest(HttpRequest(uri="/"), 1).andThen{case Success((tryResp, mk)) =>tryResp match {case Success(resp) => println(s"got response: ${resp.status.intValue()}")case Failure(err) => println(s"request failed: ${err.getMessage}")}case Failure(err) => println(s"request failed: ${err.getMessage}")}.andThen {case _ => sys.terminate()}
*/val QueueSize = 10// This idea came initially from this blog post:// http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.htmlval poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")val queue =Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew).via(poolClientFlow).toMat(Sink.foreach({case ((Success(resp), p)) => p.success(resp)case ((Failure(e), p))    => p.failure(e)}))(Keep.left).run()def queueRequest(request: HttpRequest): Future[HttpResponse] = {val responsePromise = Promise[HttpResponse]()queue.offer(request -> responsePromise).flatMap {case QueueOfferResult.Enqueued    => responsePromise.futurecase QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))case QueueOfferResult.Failure(ex) => Future.failed(ex)case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))}}val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))responseFuture.andThen {case Success(resp) => println(s"got response: ${resp.status.intValue()}")case Failure(err) => println(s"request failed: ${err.getMessage}")}.andThen {case _ => sys.terminate()}}






这篇关于Akka(36): Http:Client-side-Api,Client-Connections的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887204

相关文章

BUUCTF靶场[web][极客大挑战 2019]Http、[HCTF 2018]admin

目录   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 [web][HCTF 2018]admin 考点:弱密码字典爆破 四种方法:   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 访问环境 老规矩,我们先查看源代码

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。

如何确定 Go 语言中 HTTP 连接池的最佳参数?

确定 Go 语言中 HTTP 连接池的最佳参数可以通过以下几种方式: 一、分析应用场景和需求 并发请求量: 确定应用程序在特定时间段内可能同时发起的 HTTP 请求数量。如果并发请求量很高,需要设置较大的连接池参数以满足需求。例如,对于一个高并发的 Web 服务,可能同时有数百个请求在处理,此时需要较大的连接池大小。可以通过压力测试工具模拟高并发场景,观察系统在不同并发请求下的性能表现,从而

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大

Anaconda 中遇到CondaHTTPError: HTTP 404 NOT FOUND for url的问题及解决办法

最近在跑一个开源项目遇到了以下问题,查了很多资料都大(抄)同(来)小(抄)异(去)的,解决不了根本问题,费了很大的劲终于得以解决,记录如下: 1、问题及过程: (myenv) D:\Workspace\python\XXXXX>conda install python=3.6.13 Solving environment: done.....Proceed ([y]/n)? yDownloa

如何更优雅地对接第三方API

如何更优雅地对接第三方API 本文所有示例完整代码地址:https://github.com/yu-linfeng/BlogRepositories/tree/master/repositories/third 我们在日常开发过程中,有不少场景会对接第三方的API,例如第三方账号登录,第三方服务等等。第三方服务会提供API或者SDK,我依稀记得早些年Maven还没那么广泛使用,通常要对接第三方

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

构建高性能WEB之HTTP首部优化

0x00 前言 在讨论浏览器优化之前,首先我们先分析下从客户端发起一个HTTP请求到用户接收到响应之间,都发生了什么?知己知彼,才能百战不殆。这也是作为一个WEB开发者,为什么一定要深入学习TCP/IP等网络知识。 0x01 到底发生什么了? 当用户发起一个HTTP请求时,首先客户端将与服务端之间建立TCP连接,成功建立连接后,服务端将对请求进行处理,并对客户端做出响应,响应内容一般包括响应

Restful API 原理以及实现

先说说API 再说啥是RESRFUL API之前,咱先说说啥是API吧。API大家应该都知道吧,简称接口嘛。随着现在移动互联网的火爆,手机软件,也就是APP几乎快爆棚了。几乎任何一个网站或者应用都会出一款iOS或者Android APP,相比网页版的体验,APP确实各方面性能要好很多。 那么现在问题来了。比如QQ空间网站,如果我想获取一个用户发的说说列表。 QQ空间网站里面需要这个功能。

京东物流查询|开发者调用API接口实现

快递聚合查询的优势 1、高效整合多种快递信息。2、实时动态更新。3、自动化管理流程。 聚合国内外1500家快递公司的物流信息查询服务,使用API接口查询京东物流的便捷步骤,首先选择专业的数据平台的快递API接口:物流快递查询API接口-单号查询API - 探数数据 以下示例是参考的示例代码: import requestsurl = "http://api.tanshuapi.com/a