Akka(38): Http:Entityof ByteString-数据传输基础

2024-04-09 04:48

本文主要是介绍Akka(38): Http:Entityof ByteString-数据传输基础,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  我们说过Akka-http是一个好的系统集成工具,集成是通过数据交换方式实现的。Http是个在网上传输和接收的规范协议。所以,在使用Akka-http之前,可能我们还是需要把Http模式的网上数据交换细节了解清楚。数据交换双方是通过Http消息类型Request和Response来实现的。在Akka-http中对应的是HttpRequest和HttpResponse。这两个类型都具备HttpEntity类型来装载需要交换的数据。首先,无论如何数据在线上的表现形式肯定是一串bytes。所以,数据交换两头Request,Response中的Entity也必须是以bytes来表达的。在Akka-http里我们把需要传输的数据转换成ByteString,通过网络发送給接收端、接收端再把收到消息Entity中的ByteString转换成目标类型的数据。这两个转换过程就是Akka-http的Marshalling和Unmarshalling过程了。我们先从HttpEntity的构建函数来了解它的定义:

object HttpEntity {implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string)implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes)implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data)def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict =if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset)))def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict =if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes))def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict =if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data)def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity =if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data)def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked =HttpEntity.Chunked.fromData(contentType, data)
...

很明显,HttpEntity可以分两大类,一种是Strict类型的,它的data是ByteString。另一种是UniversalEntity类型,它的数据dataBytes是Source[ByteString,Any]。无论如何最终在线上的还是ByteString。HttpEntity的ContentType注明了传输数据格式,有:

object ContentTypes {val `application/json` = ContentType(MediaTypes.`application/json`)val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`// used for explicitly suppressing the rendering of Content-Type headers on requests and responsesval NoContentType = ContentType(MediaTypes.NoMediaType)
}

注意:ContentType只是一种备注,不影响线上数据表达形式,线上的数据永远是ByteString。但是,其中的application/octet-stream类型代表数据必须是Source[ByteString,Any]。我们下面就通过客户端的例子来理解HttpEntity。下面是一个客户端测试函数:

  def runService(request: HttpRequest, rentity: RequestEntity) = {val futResp = for {entity <- Future.successful(rentity)resp <- Http(sys).singleRequest(request.copy(entity = rentity))} yield respfutResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}}

我们只需要对这个函数传入RequestEntity就可以了解返回Response里Entity的许多细节了。首先我们要求服务端发送一个纯字符串Hello World。服务端代码如下:

 } ~ path("text") {get {complete("Hello World!")} ~

虽然complete("Hello World!")有些迷糊,不过应该complete做了些字符串到ByteString的转换。我们可以从上面这个runService函数得到证实。下面是这个例子的调用:

  val reqText = HttpRequest(uri = s"http://localhost:8011/text")runService(reqText,HttpEntity.Empty).andThen{case _ => sys.terminate()}

从显示的结果可以得出runService函数中的entity.dataBytes.map(_.utf8String)已经把ByteString转换成了String,也就是说服务器端发送的Entity里的数据是ByteString。

我们再试着发送一些数据給服务端,然后让服务端把结果通过response entity返回来:

    } ~ path("text") {get {complete("Hello World!")} ~post {withoutSizeLimit {extractDataBytes { bytes =>val data = bytes.runFold(ByteString())(_ ++ _)onComplete(data) { t =>complete(t)}}}}

我们看到服务端对request entity的操作是以ByteString进行的。客户端上传一串字符的request如下:

  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")val uploadText = HttpEntity(ContentTypes.`text/plain(UTF-8)`,// transform each number to a chunk of bytesByteString("hello world again"))runService(postText,uploadText).andThen{case _ => sys.terminate()}

我们可以看到放进entity里的数据是ByteString。

我们知道Akka-http是基于Akka-Stream的,具备Reactive-Stream功能特性。下面我们就示范一下如何进行stream的上传下载。首先定制一个Source:

  val numbers = Source.fromIterator(() =>Iterator.continually(Random.nextInt())).map(n => ByteString(s"$n\n"))//make conform to withoutSizeLimit constrainval source = limitableByteSource(numbers)

服务端也是用HttpEntity来装载这个Source然后通过HttpRequest传给客户端的:

  path("random") {get {complete(HttpEntity(ContentTypes.`application/octet-stream`,// transform each number to a chunk of bytessource.take(10000)))} ~

我们在客户端还是用runService来解析传过来的entity。由于接收一个大型的Source,所以需要修改一下接收方式代码:

   futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)Await.result(futEnt, Duration.Inf) // throws if binding failsprintln("End of stream!!!")case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}

用下面的方式调用:

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")runService(reqRandom,HttpEntity.Empty).andThen{case _ => sys.terminate()}

再示范一下在客户端用Source上传数据。服务端代码:

       post {withoutSizeLimit {extractDataBytes { bytes =>val data = bytes.runFold(ByteString())(_ ++ _)onComplete(data) { t =>complete(t)}}}}

客户端上传数据范例:

 val numbers = Source.fromIterator(() =>Iterator.continually(Random.nextInt())).map(n => ByteString(s"$n\n"))//make conform to withoutSizeLimit constrainval source = limitableByteSource(numbers)val bytes = HttpEntity(ContentTypes.`application/octet-stream`,// transform each number to a chunk of bytessource.take(10000))val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")runService(postRandom,bytes).andThen{case _ => sys.terminate()}

从上面讨论我们了解了在Marshal,Unmarshal下层只是ByteString的操作和转换。下面是本次讨论示范源代码:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.ByteString
import akka.http.scaladsl.model.HttpEntity._import scala.util.Randomobject ServerEntity extends App {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherval numbers = Source.fromIterator(() =>Iterator.continually(Random.nextInt())).map(n => ByteString(s"$n\n"))//make conform to withoutSizeLimit constrainval source = limitableByteSource(numbers)val route =path("random") {get {withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,// transform each number to a chunk of bytessource.take(1000)))}} ~post {withoutSizeLimit {extractDataBytes { bytes =>val data = bytes.runFold(ByteString())(_ ++ _)onComplete(data) { t =>complete(t)}}}}} ~ path("text") {get {complete("Hello World!")} ~post {withoutSizeLimit {extractDataBytes { bytes =>val data = bytes.runFold(ByteString())(_ ++ _)onComplete(data) { t =>complete(t)}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._import scala.concurrent.duration._
import akka.util.ByteStringimport scala.concurrent._
import scala.util._object ClientEntity extends App {implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherdef runService(request: HttpRequest, rentity: RequestEntity) = {val futResp = for {entity <- Future.successful(rentity)resp <- Http(sys).singleRequest(request.copy(entity = rentity))} yield respfutResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)Await.result(futEnt, Duration.Inf) // throws if binding failsprintln("End of stream!!!")case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}}val reqText = HttpRequest(uri = s"http://localhost:8011/text")
//  runService(reqText,HttpEntity.Empty)
//    .andThen{case _ => sys.terminate()}val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")val uploadText = HttpEntity(ContentTypes.`text/plain(UTF-8)`,// transform each number to a chunk of bytesByteString("hello world again"))
//  runService(postText,uploadText)
//    .andThen{case _ => sys.terminate()}val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")//   runService(reqRandom,HttpEntity.Empty)//    .andThen{case _ => sys.terminate()}val numbers = Source.fromIterator(() =>Iterator.continually(Random.nextInt())).map(n => ByteString(s"$n\n"))//make conform to withoutSizeLimit constrainval source = limitableByteSource(numbers)val bytes = HttpEntity(ContentTypes.`application/octet-stream`,// transform each number to a chunk of bytessource.take(10000))val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")runService(postRandom,bytes).andThen{case _ => sys.terminate()}}
















这篇关于Akka(38): Http:Entityof ByteString-数据传输基础的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887206

相关文章

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

MySQL中my.ini文件的基础配置和优化配置方式

《MySQL中my.ini文件的基础配置和优化配置方式》文章讨论了数据库异步同步的优化思路,包括三个主要方面:幂等性、时序和延迟,作者还分享了MySQL配置文件的优化经验,并鼓励读者提供支持... 目录mysql my.ini文件的配置和优化配置优化思路MySQL配置文件优化总结MySQL my.ini文件

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

Python如何实现 HTTP echo 服务器

《Python如何实现HTTPecho服务器》本文介绍了如何使用Python实现一个简单的HTTPecho服务器,该服务器支持GET和POST请求,并返回JSON格式的响应,GET请求返回请求路... 一个用来做测试的简单的 HTTP echo 服务器。from http.server import HT

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

BUUCTF靶场[web][极客大挑战 2019]Http、[HCTF 2018]admin

目录   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 [web][HCTF 2018]admin 考点:弱密码字典爆破 四种方法:   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 访问环境 老规矩,我们先查看源代码

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念

AI基础 L9 Local Search II 局部搜索

Local Beam search 对于当前的所有k个状态,生成它们的所有可能后继状态。 检查生成的后继状态中是否有任何状态是解决方案。 如果所有后继状态都不是解决方案,则从所有后继状态中选择k个最佳状态。 当达到预设的迭代次数或满足某个终止条件时,算法停止。 — Choose k successors randomly, biased towards good ones — Close

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。