Akka(40): Http:Marshalling reviewed - 传输数据序列化重温

2024-04-09 04:48

本文主要是介绍Akka(40): Http:Marshalling reviewed - 传输数据序列化重温,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   上篇我们讨论了Akka-http的文件交换。由于文件内容编码和传输线上数据表达型式皆为bytes,所以可以直接把文件内容存进HttpEntity中进行传递。那么对于在内存里自定义的高级数据类型则应该需要首先进行byte转换后才能放入HttpEntity中了。高级数据类型与byte之间的相互转换就是marshalling和unmarshalling过程了。这个我们在前几篇讨论里提及过,在本篇再重温加强印象。因为我们的主要目的是实现数据库表行的交换,所以应该把焦点放在 T <-> MessageEntity这样的转换上。在Akka-http中T->MessageEntity转换是通过Marshaller[T,MessageEntity]实现的,Marshaller类型定义如下:

sealed abstract class Marshaller[-A, +B] {def apply(value: A)(implicit ec: ExecutionContext): Future[List[Marshalling[B]]]
...
}
object Marshallerextends GenericMarshallerswith PredefinedToEntityMarshallerswith PredefinedToResponseMarshallerswith PredefinedToRequestMarshallers {/*** Creates a [[Marshaller]] from the given function.*/def apply[A, B](f: ExecutionContext ⇒ A ⇒ Future[List[Marshalling[B]]]): Marshaller[A, B] =new Marshaller[A, B] {def apply(value: A)(implicit ec: ExecutionContext) =try f(ec)(value)catch { case NonFatal(e) ⇒ FastFuture.failed(e) }}
...

这个类型包嵌了个类型转换函数:A => Future[List[Marshalling[B]]],最终目的是A=>B的转换。增加了一层Marshalling类型是为了更方便对B类型目标进行筛选、修改操作。我们看看类型Marshal的转换函数to[???]就明白了:

class Marshal[A](val value: A) {/*** Marshals `value` using the first available [[Marshalling]] for `A` and `B` provided by the given [[Marshaller]].* If the marshalling is flexible with regard to the used charset `UTF-8` is chosen.*/def to[B](implicit m: Marshaller[A, B], ec: ExecutionContext): Future[B] =m(value).fast.map {_.head match {case Marshalling.WithFixedContentType(_, marshal) ⇒ marshal()case Marshalling.WithOpenCharset(_, marshal)      ⇒ marshal(HttpCharsets.`UTF-8`)case Marshalling.Opaque(marshal)                  ⇒ marshal()}}

首先,在可视域内需要Marshaller[A,B]隐式实例存在,Marshalling提供筛选,最后Marshaller的包嵌函数marshal进行了具体的类型转换。Akka-http提供了基础数据类型到MessageEntity转换的隐式实例,如下:

trait PredefinedToEntityMarshallers extends MultipartMarshallers {implicit val ByteArrayMarshaller: ToEntityMarshaller[Array[Byte]] = byteArrayMarshaller(`application/octet-stream`)def byteArrayMarshaller(contentType: ContentType): ToEntityMarshaller[Array[Byte]] =Marshaller.withFixedContentType(contentType) { bytes ⇒ HttpEntity(contentType, bytes) }implicit val ByteStringMarshaller: ToEntityMarshaller[ByteString] = byteStringMarshaller(`application/octet-stream`)def byteStringMarshaller(contentType: ContentType): ToEntityMarshaller[ByteString] =Marshaller.withFixedContentType(contentType) { bytes ⇒ HttpEntity(contentType, bytes) }implicit val CharArrayMarshaller: ToEntityMarshaller[Array[Char]] = charArrayMarshaller(`text/plain`)def charArrayMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[Array[Char]] =Marshaller.withOpenCharset(mediaType) { (value, charset) ⇒ marshalCharArray(value, mediaType withCharset charset) }def charArrayMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[Array[Char]] =Marshaller.withFixedContentType(mediaType) { value ⇒ marshalCharArray(value, mediaType) }private def marshalCharArray(value: Array[Char], contentType: ContentType.NonBinary): HttpEntity.Strict =if (value.length > 0) {val charBuffer = CharBuffer.wrap(value)val byteBuffer = contentType.charset.nioCharset.encode(charBuffer)val array = new Array[Byte](byteBuffer.remaining())byteBuffer.get(array)HttpEntity(contentType, array)} else HttpEntity.Emptyimplicit val DoneMarshaller: ToEntityMarshaller[akka.Done] =Marshaller.withFixedContentType(`text/plain(UTF-8)`) { done ⇒HttpEntity(`text/plain(UTF-8)`, "")}implicit val StringMarshaller: ToEntityMarshaller[String] = stringMarshaller(`text/plain`)def stringMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[String] =Marshaller.withOpenCharset(mediaType) { (s, cs) ⇒ HttpEntity(mediaType withCharset cs, s) }def stringMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[String] =Marshaller.withFixedContentType(mediaType) { s ⇒ HttpEntity(mediaType, s) }implicit val FormDataMarshaller: ToEntityMarshaller[FormData] =Marshaller.withOpenCharset(`application/x-www-form-urlencoded`) { _ toEntity _ }implicit val MessageEntityMarshaller: ToEntityMarshaller[MessageEntity] =Marshaller strict { value ⇒ Marshalling.WithFixedContentType(value.contentType, () ⇒ value) }
}object PredefinedToEntityMarshallers extends PredefinedToEntityMarshallers

注意:上面的这些转换函数类型都是ToEntityMarshaller,这是一个类型别称,实际上就是Marshaller[T,MessageEntity]:

  type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]

从源代码上看这些Marshaller的隐式实例都提供了转换函数 T=>HttpEntity。这样就可以在实际类型转换时只要能找到对应Marshaller的隐式实例就可以调用它的转换函数进行转换操作了。

现在,只要通过import把这些隐式实例导入可视域内就可以这样调用Marshal了:

import akka.http.scaladsl.marshalling.Marshal

  val aChars = Array[Char]('h','e','l','l','o')val aBytes = Array[Byte](0,1,2,3)val strHello = Marshal("Hello").to[MessageEntity]val chHello = Marshal(aChars).to[MessageEntity]val bt0123 = Marshal(aBytes).to[MessageEntity]

那么对于结构复杂的自定义类型又如何呢?如下:

  case class Person(id: Int, name: String)val john = Person(12,"John")val futP = Marshal(john).to[MessageEntity]

这个futP无法通过编译,报错如下:

Error:(17, 30) could not find implicit value for parameter m: akka.http.scaladsl.marshalling.Marshaller[MarshalDemo.Person,akka.http.scaladsl.model.MessageEntity]val futP = Marshal(john).to[MessageEntity]

这是因为编译器compiler无法找到Marshaller[Person,MessageEntity]这个类型的隐式实例。现在我只为Person自定义一个Marshaller隐式实例:

  implicit val PersonMarshaller: ToEntityMarshaller[Person] = personMarshaller(`text/plain`)def personMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[Person] =Marshaller.withOpenCharset(mediaType) { (p, ps) ⇒ HttpEntity(mediaType withCharset ps, ByteString(p.toString)) }def personMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[Person] =Marshaller.withFixedContentType(mediaType) { p ⇒ HttpEntity(mediaType, ByteString(p.toString)) }

这个Marshaller代表的转换过程是:Person -> Person.String -> ByteString。中间多了层用String函数来描述Person类型。这只是我个人的转换方式,所以反向转换Unmarshalling也必须按照我的方式把String转回Person。实际上这种转换的开放标准之一就是Json,大家共同按照标准要求的表达形式进行转换操作就能达到同样的目的了。

Akka-http自带的Json解决方案用的是Spray-Json,下面我们就用Spray-Json来实现转换:

import akka.http.scaladsl.marshallers.sprayjson._
import spray.json._trait Formats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends Formats {case class Person(id: Int, name: String)implicit val userFormat = jsonFormat2(Person.apply)
}
...import Converters._val john = Person(12,"John")val futP = Marshal(john).to[MessageEntity]

现在的转换流程变成了:Person -> Json -> ByteString。Akka-http是通过RootJasonFormat[T]来提供转换隐式实例的:

/*** A special JsonFormat signaling that the format produces a legal JSON root object, i.e. either a JSON array* or a JSON object.*/
trait RootJsonFormat[T] extends JsonFormat[T] with RootJsonReader[T] with RootJsonWriter[T]RootJsonFormat[T]代表T类型实例的Json转换。RootJsonFormat[T]的继承父辈包括:
/*** Provides the JSON deserialization and serialization for type T.*/
trait JsonFormat[T] extends JsonReader[T] with JsonWriter[T]/*** A special JsonReader capable of reading a legal JSON root object, i.e. either a JSON array or a JSON object.*/
@implicitNotFound(msg = "Cannot find RootJsonReader or RootJsonFormat type class for ${T}")
trait RootJsonReader[T] extends JsonReader[T]/*** A special JsonWriter capable of writing a legal JSON root object, i.e. either a JSON array or a JSON object.*/
@implicitNotFound(msg = "Cannot find RootJsonWriter or RootJsonFormat type class for ${T}")
trait RootJsonWriter[T] extends JsonWriter[T]

在我们的例子里Person的Marshaller隐式实例是通过jsonFormat2函数获取的:

  def jsonFormat2[P1 :JF, P2 :JF, T <: Product :ClassManifest](construct: (P1, P2) => T): RootJsonFormat[T] = {val Array(p1, p2) = extractFieldNames(classManifest[T])jsonFormat(construct, p1, p2)}def jsonFormat[P1 :JF, P2 :JF, T <: Product](construct: (P1, P2) => T, fieldName1: String, fieldName2: String): RootJsonFormat[T] = new RootJsonFormat[T]{def write(p: T) = {val fields = new collection.mutable.ListBuffer[(String, JsValue)]fields.sizeHint(2 * 3)fields ++= productElement2Field[P1](fieldName1, p, 0)fields ++= productElement2Field[P2](fieldName2, p, 1)JsObject(fields: _*)}def read(value: JsValue) = {val p1V = fromField[P1](value, fieldName1)val p2V = fromField[P2](value, fieldName2)construct(p1V, p2V)}}

就是这个函数返回了RootJsonFormat[T]。可以看到,功能的具体实现在jsonFormat函数里,在这里实现了对json数据结构的读写。jsonFormat2是在ProductFormatsInstances trait里的,也就是ProductFormats:

trait ProductFormats extends ProductFormatsInstances {this: StandardFormats =>

我们上面例子里的Formats trait继承了DefaultJsonProtocal,这里面包括了所有json转换实例构建方法:

/*** Provides all the predefined JsonFormats.*/
trait DefaultJsonProtocolextends BasicFormatswith StandardFormatswith CollectionFormatswith ProductFormatswith AdditionalFormatsobject DefaultJsonProtocol extends DefaultJsonProtocol

再看看RootJasonFormat及相关继承情况:

/*** A special JsonFormat signaling that the format produces a legal JSON root object, i.e. either a JSON array* or a JSON object.*/
trait RootJsonFormat[T] extends JsonFormat[T] with RootJsonReader[T] with RootJsonWriter[T]/*** Provides the JSON deserialization and serialization for type T.*/
trait JsonFormat[T] extends JsonReader[T] with JsonWriter[T]/*** A special JsonReader capable of reading a legal JSON root object, i.e. either a JSON array or a JSON object.*/
@implicitNotFound(msg = "Cannot find RootJsonReader or RootJsonFormat type class for ${T}")
trait RootJsonReader[T] extends JsonReader[T]/*** A special JsonWriter capable of writing a legal JSON root object, i.e. either a JSON array or a JSON object.*/
@implicitNotFound(msg = "Cannot find RootJsonWriter or RootJsonFormat type class for ${T}")
trait RootJsonWriter[T] extends JsonWriter[T]

下面是Spray-Json的具体实现:

package json {case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)class SerializationException(msg: String) extends RuntimeException(msg)private[json] class PimpedAny[T](any: T) {def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)}private[json] class PimpedString(string: String) {@deprecated("deprecated in favor of parseJson", "1.2.6")def asJson: JsValue = parseJsondef parseJson: JsValue = JsonParser(string)}
}

toJson,asJason分别需要JsonWriter,JsonReader的隐式实例。

从上面的讨论中我们对任意结构类型的一个实例进行序列化转换有了一定了解。这个类型的实例可以被是作为数据库的一条记录,通过上面讨论的方式在服务端和客户端进行交换。这是因为SprayJsonSupport可以提供任意类T的Marshaller[T,MessageEntity]隐式实例。

因为我们的主要目的是实现数据库表多行的交换,所以必须要实现以表行为元素数据流的数据交换,也就是说最起码能要在可视域内提供Marshall[Source[T],_],MessageEnity]及Unmarshaller[MessageEntity,Source[T,_]]的隐式实例才行。在服务端我们尝试过用complete(Source[T,NotUsed])来完成HttpResponse的构建。但单独用Marshal(source).to[Source[T,NotUsed]]则编译出错。这是因为Akka-http提供的是ToResponseMarshaller[Source[T,M]]的隐式实例:

implicit def fromEntityStreamingSupportAndByteStringMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToByteStringMarshaller[T]): ToResponseMarshaller[Source[T, M]] = {Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒FastFuture successful {Marshalling.WithFixedContentType(s.contentType, () ⇒ {val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) }// TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?)// TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYerval bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒// pick the Marshalling that matches our EntityStreamingSupport(s.contentType match {case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset | _: ContentType.WithMissingCharset) ⇒marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal }case best @ ContentType.WithCharset(bestMT, bestCS) ⇒marshallings collectFirst {case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshalcase Marshalling.WithOpenCharset(`bestMT`, marshal)    ⇒ () ⇒ marshal(bestCS)}}).toList}val marshalledElements: Source[ByteString, M] =bestMarshallingPerElement.map(_.apply()) // marshal!.via(s.framingRenderer)HttpResponse(entity = HttpEntity(s.contentType, marshalledElements))}) :: Nil}}}

这个complete(m => ToResponseMarshallable)是个magnet-pattern函数,巧妙在ToResponseMarshallable参数类型:

/** Something that can later be marshalled into a response */
trait ToResponseMarshallable {type Tdef value: Timplicit def marshaller: ToResponseMarshaller[T]def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] =Marshal(value).toResponseFor(request)
}object ToResponseMarshallable {implicit def apply[A](_value: A)(implicit _marshaller: ToResponseMarshaller[A]): ToResponseMarshallable =new ToResponseMarshallable {type T = Adef value: T = _valuedef marshaller: ToResponseMarshaller[T] = _marshaller}implicit val marshaller: ToResponseMarshaller[ToResponseMarshallable] =Marshaller { implicit ec ⇒ marshallable ⇒ marshallable.marshaller(marshallable.value) }
}

magnet-pattern我们就不多谈了。但它的伴生对象中包含了对任何类型ToResponseMarshallable的隐式实例,所以complete能够通过编译。SprayJsonSupport中倒是提供了Unmarshaller[MessageEntity,T]的隐式实例:

  // support for as[Source[T, NotUsed]]implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒if (support.supported.matches(e.contentType)) {val frames = e.dataBytes.via(support.framingDecoder)val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)val unmarshallingFlow =if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)val elements = frames.viaMat(unmarshallingFlow)(Keep.right)FastFuture.successful(elements)} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))}

看来如果需要实现stream的双向交换,我们还必须提供Marshaller[Source[T,NotUsed],MessageEntity]以及Unmarshaller[MessageEntity,Source[T,NotUsed]]才行。篇幅所限,具体实现方法移到下篇讨论。

下面是本次讨论的示范源代码:

import akka.actor._
import akka.stream.scaladsl._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.MediaTypes._
import akka.util.ByteString
import akka.http.scaladsl.marshallers.sprayjson._
import spray.json._trait Formats extends SprayJsonSupport with DefaultJsonProtocol
object sprayConverters extends Formats {case class Person(id: Int, name: String)implicit val userFormat = jsonFormat2(Person.apply)
}object MarshalDemo extends App {import sprayConverters._implicit val sys = ActorSystem("marshaller")implicit val ec = sys.dispatcherval aChars = Array[Char]('h','e','l','l','o')val aBytes = Array[Byte](0,1,2,3)val strHello = Marshal("Hello").to[MessageEntity]val chHello = Marshal(aChars).to[MessageEntity]val bt0123 = Marshal(aBytes).to[MessageEntity]implicit val PersonMarshaller: ToEntityMarshaller[Person] = personMarshaller(`text/plain`)def personMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[Person] =Marshaller.withOpenCharset(mediaType) { (p, ps) ⇒ HttpEntity(mediaType withCharset ps, ByteString(p.toString)) }def personMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[Person] =Marshaller.withFixedContentType(mediaType) { p ⇒ HttpEntity(mediaType, ByteString(p.toString)) }val john = Person(12,"John")val futP = Marshal(john).to[MessageEntity]val ps = (1 to 5).map {i => Person(i,s"member#i")}val source = Source(ps)import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }import akka.http.scaladsl.server.Directives._implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()val route =path("data") {//     val fut = Marshal(source).to[Source[Person,NotUsed]]  //compile failed: implicit not foundval fut2 = Marshal(source).toResponseFor(HttpRequest(method = HttpMethods.GET)) // compile ok!complete(source)  //ok due to magnet-patern type ToResponseMarshallable}}



这篇关于Akka(40): Http:Marshalling reviewed - 传输数据序列化重温的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887208

相关文章

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

Python如何实现 HTTP echo 服务器

《Python如何实现HTTPecho服务器》本文介绍了如何使用Python实现一个简单的HTTPecho服务器,该服务器支持GET和POST请求,并返回JSON格式的响应,GET请求返回请求路... 一个用来做测试的简单的 HTTP echo 服务器。from http.server import HT

Java中JSON字符串反序列化(动态泛型)

《Java中JSON字符串反序列化(动态泛型)》文章讨论了在定时任务中使用反射调用目标对象时处理动态参数的问题,通过将方法参数存储为JSON字符串并进行反序列化,可以实现动态调用,然而,这种方式容易导... 需求:定时任务扫描,反射调用目标对象,但是,方法的传参不是固定的。方案一:将方法参数存成jsON字

BUUCTF靶场[web][极客大挑战 2019]Http、[HCTF 2018]admin

目录   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 [web][HCTF 2018]admin 考点:弱密码字典爆破 四种方法:   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 访问环境 老规矩,我们先查看源代码

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。

如何确定 Go 语言中 HTTP 连接池的最佳参数?

确定 Go 语言中 HTTP 连接池的最佳参数可以通过以下几种方式: 一、分析应用场景和需求 并发请求量: 确定应用程序在特定时间段内可能同时发起的 HTTP 请求数量。如果并发请求量很高,需要设置较大的连接池参数以满足需求。例如,对于一个高并发的 Web 服务,可能同时有数百个请求在处理,此时需要较大的连接池大小。可以通过压力测试工具模拟高并发场景,观察系统在不同并发请求下的性能表现,从而

Anaconda 中遇到CondaHTTPError: HTTP 404 NOT FOUND for url的问题及解决办法

最近在跑一个开源项目遇到了以下问题,查了很多资料都大(抄)同(来)小(抄)异(去)的,解决不了根本问题,费了很大的劲终于得以解决,记录如下: 1、问题及过程: (myenv) D:\Workspace\python\XXXXX>conda install python=3.6.13 Solving environment: done.....Proceed ([y]/n)? yDownloa

构建高性能WEB之HTTP首部优化

0x00 前言 在讨论浏览器优化之前,首先我们先分析下从客户端发起一个HTTP请求到用户接收到响应之间,都发生了什么?知己知彼,才能百战不殆。这也是作为一个WEB开发者,为什么一定要深入学习TCP/IP等网络知识。 0x01 到底发生什么了? 当用户发起一个HTTP请求时,首先客户端将与服务端之间建立TCP连接,成功建立连接后,服务端将对请求进行处理,并对客户端做出响应,响应内容一般包括响应

Golang支持平滑升级的HTTP服务

前段时间用Golang在做一个HTTP的接口,因编译型语言的特性,修改了代码需要重新编译可执行文件,关闭正在运行的老程序,并启动新程序。对于访问量较大的面向用户的产品,关闭、重启的过程中势必会出现无法访问的情况,从而影响用户体验。 使用Golang的系统包开发HTTP服务,是无法支持平滑升级(优雅重启)的,本文将探讨如何解决该问题。 一、平滑升级(优雅重启)的一般思路 一般情况下,要实现平滑