Akka(41): Http:DBTable-rows streaming - 数据库表行交换

2024-04-09 04:48

本文主要是介绍Akka(41): Http:DBTable-rows streaming - 数据库表行交换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到:如果需要进行数据库数据交换的话,可以用Source[ROW,_]来表示库表行,但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换,在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式,需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json,着重case class,而且要提供JsonFormat?(case-class),其中?代表case class的参数个数,用起来略显复杂。不过因为是Akka-http的配套库,在将来Akka-http的持续发展中具有一定的优势,所以我们还是用它来进行下面的示范。

下面就让我们开始写些代码吧。首先,我们用一个case class代表数据库表行结构,然后用它作为流元素来构建一个Source,如下:

  case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

我们先设计服务端的数据下载部分:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupporttrait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpDBServer extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)val route =path("rows") {get {complete {source}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

在上面的代码里我们直接把source放进了complete(),然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到证实。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_],我们可以把它Unmarshall成Source[County,_],然后用Akka-stream来操作:

      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}


上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[County]隐式实例:

  // support for as[Source[T, NotUsed]]implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒if (support.supported.matches(e.contentType)) {val frames = e.dataBytes.via(support.framingDecoder)val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)val unmarshallingFlow =if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)val elements = frames.viaMat(unmarshallingFlow)(Keep.right)FastFuture.successful(elements)} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))}

这个隐式实例是由Spray-Jason提供的,在SprayJsonSupport.scala里。
下面是这部分客户端的完整代码:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.Unmarshaltrait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpDBClient extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()def downloadRows(request: HttpRequest) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}case Success(r@HttpResponse(code, _, _, _)) =>println(s"download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"download failed: ${err.getMessage}")}}downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))scala.io.StdIn.readLine()sys.terminate()}

以上我们已经实现了客户端从服务端下载一段数据库表行,然后以Akka-stream的操作方式来处理下载数据。那么反向交换即从客户端上传一段表行的话就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样的强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的是简单提供一个Source[ByteString,_],我们是否可以直接调用Spray-Json的函数来进行ROW->Son->ByteString转换呢?如下:

  import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")val data = HttpEntity(ContentTypes.`application/json`,rowBytes)

我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJason是Spray-Json提供的一个函数:

package json {case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)class SerializationException(msg: String) extends RuntimeException(msg)private[json] class PimpedAny[T](any: T) {def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)}private[json] class PimpedString(string: String) {@deprecated("deprecated in favor of parseJson", "1.2.6")def asJson: JsValue = parseJsondef parseJson: JsValue = JsonParser(string)}
}

假设服务端收到数据后以Akka-stream方式再转换成一个List返回,我们用下面的方法来测试功能:

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}

服务端接收数据处理方法如下:

     post {withoutSizeLimit {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}

考虑到在数据转换的过程中可能会出现异常。需要异常处理方法来释放backpressure:

  def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))}}post {withoutSizeLimit {handleExceptions(postExceptionHandler) {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}}

在客户端试运行返回结果显示:

  uploadRows(request,data)["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

正是我们期待的结果。

下面是本次讨论的示范代码:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpDBServer extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))}}val route =path("rows") {get {complete {source}} ~post {withoutSizeLimit {handleExceptions(postExceptionHandler) {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpDBClient extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()def downloadRows(request: HttpRequest) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}case Success(r@HttpResponse(code, _, _, _)) =>println(s"download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"download failed: ${err.getMessage}")}}downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")val data = HttpEntity(ContentTypes.`application/json`,rowBytes)def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}uploadRows(request,data)scala.io.StdIn.readLine()sys.terminate()}





这篇关于Akka(41): Http:DBTable-rows streaming - 数据库表行交换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887209

相关文章

关于如何更好管理好数据库的一点思考

本文尝试从数据库设计理论、ER图简介、性能优化、避免过度设计及权限管理方面进行思考阐述。 一、数据库范式 以下通过详细的示例说明数据库范式的概念,将逐步规范化一个例子,逐级说明每个范式的要求和变换过程。 示例:学生课程登记系统 初始表格如下: 学生ID学生姓名课程ID课程名称教师教师办公室1张三101数学王老师101室2李四102英语李老师102室3王五101数学王老师101室4赵六103物理陈

数据库期末复习知识点

A卷 1. 选择题(30') 2. 判断范式(10') 判断到第三范式 3. 程序填空(20') 4. 分析填空(15') 5. 写SQL(25') 5'一题 恶性 B卷 1. 单选(30') 2. 填空 (20') 3. 程序填空(20') 4. 写SQL(30') 知识点 第一章 数据库管理系统(DBMS)  主要功能 数据定义功能 (DDL, 数据定义语

给数据库的表添加字段

周五有一个需求是这样的: 原来数据库有一个表B,现在需要添加一个字段C,我把代码中增删改查部分进行了修改, 比如insert中也添入了字段C。 但没有考虑到一个问题,数据库的兼容性。因为之前的版本已经投入使用了,再升级的话,需要进行兼容处理,当时脑子都蒙了,转不过来,后来同事解决了这个问题。 现在想想,思路就是,把数据库的表结构存入文件中,如xxx.sql 实时更新该文件: CREAT

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

SQL Server中,添加数据库到AlwaysOn高可用性组条件

1、将数据添加到AlwaysOn高可用性组,需要满足以下条件: 2、更多具体AlwaysOn设置,参考:https://msdn.microsoft.com/zh-cn/library/windows/apps/ff878487(v=sql.120).aspx 注:上述资源来自MSDN。

SQL Server中,用Restore DataBase把数据库还原到指定的路径

restore database 数据库名 from disk='备份文件路径' with move '数据库文件名' to '数据库文件放置路径', move '日志文件名' to '日志文件存放置路径' Go 如: restore database EaseWe from disk='H:\EaseWe.bak' with move 'Ease

数据库原理与安全复习笔记(未完待续)

1 概念 产生与发展:人工管理阶段 → \to → 文件系统阶段 → \to → 数据库系统阶段。 数据库系统特点:数据的管理者(DBMS);数据结构化;数据共享性高,冗余度低,易于扩充;数据独立性高。DBMS 对数据的控制功能:数据的安全性保护;数据的完整性检查;并发控制;数据库恢复。 数据库技术研究领域:数据库管理系统软件的研发;数据库设计;数据库理论。数据模型要素 数据结构:描述数据库

MySQL数据库(四):视图和索引

在数据库管理中,视图和索引是两种关键工具,它们各自发挥独特的作用以优化数据查询和管理。视图通过简化复杂查询、提高数据安全性和提供数据抽象,帮助用户轻松访问数据。而索引则通过加速查询、确保数据唯一性以及优化排序和分组操作,显著提升数据库性能。理解和合理运用这两者,对数据库系统的高效运行至关重要。 目录 一、视图概念(面试) 二、视图的作用(面试) 三、视图的创建和使用 3.1

Java中如何优化数据库查询性能?

Java中如何优化数据库查询性能? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将深入探讨在Java中如何优化数据库查询性能,这是提升应用程序响应速度和用户体验的关键技术。 优化数据库查询性能的重要性 在现代应用开发中,数据库查询是最常见的操作之一。随着数据量的增加和业务复杂度的提升,数据库查询的性能优化显得尤为重

BD错误集锦7——在集成Spring MVC + MyBtis时使用c3p0作为数据库时报错Method com/mchange/v2/c3p0/impl/NewProxyPreparedStatem

异常信息如下: Type Exception ReportMessage Handler dispatch failed; nested exception is java.lang.AbstractMethodError: Method com/mchange/v2/c3p0/impl/NewProxyPreparedStatement.isClosed()Z is abstractDescr