Akka(35): Http:Server side streaming

2024-04-09 04:48
文章标签 http server 35 akka streaming side

本文主要是介绍Akka(35): Http:Server side streaming,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   在前面几篇讨论里我们都提到过:Akka-http是一项系统集成工具库。它是以数据交换的形式进行系统集成的。所以,Akka-http的核心功能应该是数据交换的实现了:应该能通过某种公开的数据格式和传输标准比较方便的实现包括异类系统之间通过网上进行的数据交换。覆盖包括:数据编码、发送和数据接收、解析全过程。Akka-http提供了许多网上传输标准数据的概括模型以及数据类型转换方法,可以使编程人员很方便的构建网上往来的Request和Response。但是,现实中的数据交换远远不止针对request和response操作能够满足的。系统之间数据交换经常涉及文件或者数据库表类型的数据上传下载。虽然在Http标准中描述了如何通过MultiPart消息类型进行批量数据的传输,但是这个标准涉及的实现细节包括数据内容描述、数据分段方式、消息数据长度计算等等简直可以立即令人却步。Akka-http是基于Akka-stream开发的:不但它的工作流程可以用Akka-stream来表达,它还支持stream化的数据传输。我们知道:Akka-stream提供了功能强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单来说:Akka-http的消息数据内容HttpEntity可以支持理论上无限长度的data-stream。最可贵的是:这个Source是个Reactive-Stream-Source,具备了back-pressure机制,可以有效应付数据交换参与两方Reactive端点不同的数据传输速率。

  Akka-http的stream类型数据内容是以Source[T,_]类型表示的。首先,Akka-stream通过FileIO对象提供了足够多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据构建一个Source类型:

/*** Creates a Source from a files contents.* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,* except the final element, which will be up to `chunkSize` in size.** You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or* set it for a given Source by using [[akka.stream.ActorAttributes]].** It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,* and a possible exception if IO operation was not completed successfully.** @param f         the file path to read from* @param chunkSize the size of each read operation, defaults to 8192*/def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =fromPath(f, chunkSize, startPosition = 0)

这个函数构建了Source[ByteString,Future[IOResult]],我们需要把ByteString转化成MessageEntity。首先需要在implicit-scope内提供Marshaller[ByteString,MessageEntity]类型的隐式实例:

trait JsonCodec extends Json4sSupport {import org.json4s.DefaultFormatsimport org.json4s.ext.JodaTimeSerializersimplicit val serilizer = jackson.Serializationimplicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodecobject ServerStreaming extends App {import JsConverters._
...

我们还需要Json-Streaming支持:

  implicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)

FileIO是blocking操作,我们还可以选用独立的线程供blocking操作使用:

   FileIO.fromPath(file, 256).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))

现在我们可以从在server上用一个文件构建Source然后再转成Response:

  val route =get {path("files"/Remaining) { name =>complete(loadFile(name))} }def loadFile(path: String) = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get("/Users/tiger/"+path)FileIO.fromPath(file, 256).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")).map(_.utf8String)}

同样,我们也可以把数据库表内数据转成Akka-Stream-Source,然后再实现到MessageEntity的转换。转换过程包括用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

object SlickDAO {import slick.jdbc.H2Profile.api._val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")val db = dbConfig.dbcase class CountyModel(id: Int, name: String)case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)def name = column[String]("NAME",O.Length(64))def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)}val CountyQuery = TableQuery[CountyTable]def loadTable(filter: String) = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}val publisher = db.stream(qry.result)Source.fromPublisher(publisher = publisher).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}
}

然后进行到MessageEntity的转换:

  val route =get {path("files"/Remaining) { name =>complete(loadFile(name))} ~path("tables"/Segment) { t =>complete(SlickDAO.loadTable(t))}}

下面是本次示范的完整源代码:

import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jacksonobject SlickDAO {import slick.jdbc.H2Profile.api._val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")val db = dbConfig.dbcase class CountyModel(id: Int, name: String)case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)def name = column[String]("NAME",O.Length(64))def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)}val CountyQuery = TableQuery[CountyTable]def loadTable(filter: String) = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}val publisher = db.stream(qry.result)Source.fromPublisher(publisher = publisher).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}
}trait JsonCodec extends Json4sSupport {import org.json4s.DefaultFormatsimport org.json4s.ext.JodaTimeSerializersimplicit val serilizer = jackson.Serializationimplicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodecobject ServerStreaming extends App {import JsConverters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)val (port, host) = (8011,"localhost")val route =get {path("files"/Remaining) { name =>complete(loadFile(name))} ~path("tables"/Segment) { t =>complete(SlickDAO.loadTable(t))}}def loadFile(path: String) = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get("/Users/tiger/"+path)FileIO.fromPath(file, 256).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")).map(_.utf8String)}val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

resource/application.conf

akka {http {blocking-ops-dispatcher {type = Dispatcherexecutor = "thread-pool-executor"thread-pool-executor {// or in Akka 2.4.2+fixed-pool-size = 16}throughput = 100}}
}
slick {h2 {driver = "slick.driver.H2Driver$"db {url = "jdbc:h2:~/slickdemo;mv_store=false"driver = "org.h2.Driver"connectionPool = HikariCPnumThreads = 48maxConnections = 48minConnections = 12keepAliveConnection = true}}
}






这篇关于Akka(35): Http:Server side streaming的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887203

相关文章

BUUCTF靶场[web][极客大挑战 2019]Http、[HCTF 2018]admin

目录   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 [web][HCTF 2018]admin 考点:弱密码字典爆破 四种方法:   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 访问环境 老规矩,我们先查看源代码

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。

如何确定 Go 语言中 HTTP 连接池的最佳参数?

确定 Go 语言中 HTTP 连接池的最佳参数可以通过以下几种方式: 一、分析应用场景和需求 并发请求量: 确定应用程序在特定时间段内可能同时发起的 HTTP 请求数量。如果并发请求量很高,需要设置较大的连接池参数以满足需求。例如,对于一个高并发的 Web 服务,可能同时有数百个请求在处理,此时需要较大的连接池大小。可以通过压力测试工具模拟高并发场景,观察系统在不同并发请求下的性能表现,从而

Anaconda 中遇到CondaHTTPError: HTTP 404 NOT FOUND for url的问题及解决办法

最近在跑一个开源项目遇到了以下问题,查了很多资料都大(抄)同(来)小(抄)异(去)的,解决不了根本问题,费了很大的劲终于得以解决,记录如下: 1、问题及过程: (myenv) D:\Workspace\python\XXXXX>conda install python=3.6.13 Solving environment: done.....Proceed ([y]/n)? yDownloa

red5-server源码

red5-server源码:https://github.com/Red5/red5-server

构建高性能WEB之HTTP首部优化

0x00 前言 在讨论浏览器优化之前,首先我们先分析下从客户端发起一个HTTP请求到用户接收到响应之间,都发生了什么?知己知彼,才能百战不殆。这也是作为一个WEB开发者,为什么一定要深入学习TCP/IP等网络知识。 0x01 到底发生什么了? 当用户发起一个HTTP请求时,首先客户端将与服务端之间建立TCP连接,成功建立连接后,服务端将对请求进行处理,并对客户端做出响应,响应内容一般包括响应

『功能项目』战士的平A特效【35】

我们打开上一篇34武器的切换实例的项目, 本章要做的事情是在战士的每次按A键时在指定位置生成一个平A特效 首先将之前下载的技能拖拽至场景中 完全解压缩后重命名为AEffect 拖拽至预制体文件夹 进入主角动画的战士动画层级 双击第一次攻击 选择Animation 创建事件 创建的动画事件帧放在攻击动画挥剑指定处 命名为PerpetualAtt

Golang支持平滑升级的HTTP服务

前段时间用Golang在做一个HTTP的接口,因编译型语言的特性,修改了代码需要重新编译可执行文件,关闭正在运行的老程序,并启动新程序。对于访问量较大的面向用户的产品,关闭、重启的过程中势必会出现无法访问的情况,从而影响用户体验。 使用Golang的系统包开发HTTP服务,是无法支持平滑升级(优雅重启)的,本文将探讨如何解决该问题。 一、平滑升级(优雅重启)的一般思路 一般情况下,要实现平滑

Java http请求示例

使用HttpURLConnection public static String httpGet(String host) {HttpURLConnection connection = null;try {URL url = new URL(host);connection = (HttpURLConnection) url.openConnection();connection.setReq

3.比 HTTP 更安全的 HTTPS(工作原理理解、非对称加密理解、证书理解)

所谓的协议 协议只是一种规则,你不按规则来就无法和目标方进行你的工作 协议说白了只是人定的规则,任何人都可以定协议 我们不需要太了解细节,这些制定和完善协议的人去做的,我们只需要知道协议的一个大概 HTTPS 协议 1、概述 HTTPS(Hypertext Transfer Protocol Secure)是一种安全的超文本传输协议,主要用于在客户端和服务器之间安全地传输数据