Akka(41): Http:DBTable-rows streaming - 数据库表行交换

2024-04-09 04:48

本文主要是介绍Akka(41): Http:DBTable-rows streaming - 数据库表行交换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到:如果需要进行数据库数据交换的话,可以用Source[ROW,_]来表示库表行,但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换,在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式,需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json,着重case class,而且要提供JsonFormat?(case-class),其中?代表case class的参数个数,用起来略显复杂。不过因为是Akka-http的配套库,在将来Akka-http的持续发展中具有一定的优势,所以我们还是用它来进行下面的示范。

下面就让我们开始写些代码吧。首先,我们用一个case class代表数据库表行结构,然后用它作为流元素来构建一个Source,如下:

  case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

我们先设计服务端的数据下载部分:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupporttrait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpDBServer extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)val route =path("rows") {get {complete {source}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

在上面的代码里我们直接把source放进了complete(),然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到证实。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_],我们可以把它Unmarshall成Source[County,_],然后用Akka-stream来操作:

      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}


上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[County]隐式实例:

  // support for as[Source[T, NotUsed]]implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒if (support.supported.matches(e.contentType)) {val frames = e.dataBytes.via(support.framingDecoder)val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)val unmarshallingFlow =if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)val elements = frames.viaMat(unmarshallingFlow)(Keep.right)FastFuture.successful(elements)} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))}

这个隐式实例是由Spray-Jason提供的,在SprayJsonSupport.scala里。
下面是这部分客户端的完整代码:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.Unmarshaltrait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpDBClient extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()def downloadRows(request: HttpRequest) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}case Success(r@HttpResponse(code, _, _, _)) =>println(s"download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"download failed: ${err.getMessage}")}}downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))scala.io.StdIn.readLine()sys.terminate()}

以上我们已经实现了客户端从服务端下载一段数据库表行,然后以Akka-stream的操作方式来处理下载数据。那么反向交换即从客户端上传一段表行的话就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样的强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的是简单提供一个Source[ByteString,_],我们是否可以直接调用Spray-Json的函数来进行ROW->Son->ByteString转换呢?如下:

  import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")val data = HttpEntity(ContentTypes.`application/json`,rowBytes)

我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJason是Spray-Json提供的一个函数:

package json {case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)class SerializationException(msg: String) extends RuntimeException(msg)private[json] class PimpedAny[T](any: T) {def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)}private[json] class PimpedString(string: String) {@deprecated("deprecated in favor of parseJson", "1.2.6")def asJson: JsValue = parseJsondef parseJson: JsValue = JsonParser(string)}
}

假设服务端收到数据后以Akka-stream方式再转换成一个List返回,我们用下面的方法来测试功能:

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}

服务端接收数据处理方法如下:

     post {withoutSizeLimit {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}

考虑到在数据转换的过程中可能会出现异常。需要异常处理方法来释放backpressure:

  def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))}}post {withoutSizeLimit {handleExceptions(postExceptionHandler) {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}}

在客户端试运行返回结果显示:

  uploadRows(request,data)["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

正是我们期待的结果。

下面是本次讨论的示范代码:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpDBServer extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))}}val route =path("rows") {get {complete {source}} ~post {withoutSizeLimit {handleExceptions(postExceptionHandler) {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpDBClient extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()def downloadRows(request: HttpRequest) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}case Success(r@HttpResponse(code, _, _, _)) =>println(s"download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"download failed: ${err.getMessage}")}}downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")val data = HttpEntity(ContentTypes.`application/json`,rowBytes)def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}uploadRows(request,data)scala.io.StdIn.readLine()sys.terminate()}





这篇关于Akka(41): Http:DBTable-rows streaming - 数据库表行交换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887209

相关文章

数据库面试必备之MySQL中的乐观锁与悲观锁

《数据库面试必备之MySQL中的乐观锁与悲观锁》:本文主要介绍数据库面试必备之MySQL中乐观锁与悲观锁的相关资料,乐观锁适用于读多写少的场景,通过版本号检查避免冲突,而悲观锁适用于写多读少且对数... 目录一、引言二、乐观锁(一)原理(二)应用场景(三)示例代码三、悲观锁(一)原理(二)应用场景(三)示例

Node.js 数据库 CRUD 项目示例详解(完美解决方案)

《Node.js数据库CRUD项目示例详解(完美解决方案)》:本文主要介绍Node.js数据库CRUD项目示例详解(完美解决方案),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考... 目录项目结构1. 初始化项目2. 配置数据库连接 (config/db.js)3. 创建模型 (models/

Nginx中配置HTTP/2协议的详细指南

《Nginx中配置HTTP/2协议的详细指南》HTTP/2是HTTP协议的下一代版本,旨在提高性能、减少延迟并优化现代网络环境中的通信效率,本文将为大家介绍Nginx配置HTTP/2协议想详细步骤,需... 目录一、HTTP/2 协议概述1.HTTP/22. HTTP/2 的核心特性3. HTTP/2 的优

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

使用Python自建轻量级的HTTP调试工具

《使用Python自建轻量级的HTTP调试工具》这篇文章主要为大家详细介绍了如何使用Python自建一个轻量级的HTTP调试工具,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录一、为什么需要自建工具二、核心功能设计三、技术选型四、分步实现五、进阶优化技巧六、使用示例七、性能对比八、扩展方向建

Ubuntu中远程连接Mysql数据库的详细图文教程

《Ubuntu中远程连接Mysql数据库的详细图文教程》Ubuntu是一个以桌面应用为主的Linux发行版操作系统,这篇文章主要为大家详细介绍了Ubuntu中远程连接Mysql数据库的详细图文教程,有... 目录1、版本2、检查有没有mysql2.1 查询是否安装了Mysql包2.2 查看Mysql版本2.

Oracle数据库常见字段类型大全以及超详细解析

《Oracle数据库常见字段类型大全以及超详细解析》在Oracle数据库中查询特定表的字段个数通常需要使用SQL语句来完成,:本文主要介绍Oracle数据库常见字段类型大全以及超详细解析,文中通过... 目录前言一、字符类型(Character)1、CHAR:定长字符数据类型2、VARCHAR2:变长字符数

Win11安装PostgreSQL数据库的两种方式详细步骤

《Win11安装PostgreSQL数据库的两种方式详细步骤》PostgreSQL是备受业界青睐的关系型数据库,尤其是在地理空间和移动领域,:本文主要介绍Win11安装PostgreSQL数据库的... 目录一、exe文件安装 (推荐)下载安装包1. 选择操作系统2. 跳转到EDB(PostgreSQL 的

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.