restapi(4)- rest-mongo : MongoDB数据库前端的httpserver

2024-04-09 04:38

本文主要是介绍restapi(4)- rest-mongo : MongoDB数据库前端的httpserver,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   完成了一套标准的rest风格数据库CRUD操作httpserver后发现有许多不足。主要是为了追求“通用”两个字,想把所有服务接口做的更“范generic”些,结果反而限制了目标数据库的特点,最终产生了一套功能弱小的玩具。比如说吧:标准rest风格getbyId需要所有的数据表都具备id这个字段,有点傻。然后get返回的结果集又没有什么灵活的控制方法如返回数量、字段、排序等。特别对MongoDB这样的在查询操作方面接近关系式数据库的分布式数据库:上篇提到过,它的query能力强大,条件组合灵活,如果不能在网络服务api中体现出来就太可惜了。所以,这篇博文会讨论一套专门针对MongoDB的rest-server。我想达到的目的是:后台数据库是MongoDB,通过httpserver提供对MongoDB的CRUD操作,客户端通过http调用CRUD服务。后台开发对每一个数据库表单使用统一的标准增添一套新的CRUD服务。希望如此能够提高开发效率,减少代码出错机会。

MongoDB是一种文件类型数据库,数据格式更加多样化。在这次示范里希望能把MongoDB有特点的数据类型以及它们的处理方法都介绍了,包括:日期类型,二进制类型blob(图片)等。顺便提一下:普通大型文本文件也可以用二进制blob方式存入MongoDB,因为文件在http传输过程中必须以byte方式进行,所以后台httpserver接收的文件格式是一串byte,不用任何格式转换就可以直接存入MongoDB blob字段。客户端从后台下载时就需要把bytes转换成UTF8字符就可以恢复文件内容了。

首先,我们先从Model开始,在scala里用case class来表示。Model是MongoDB Document的对应。在scala编程里我们是用case class 当作Document来操作的。我们设计的Model都会继承一个ModelBase trait:

trait ModelBase[E] {def to: E
}case class Person(userid: String = "",name: String = "",age: Option[Int] = None,dob: Option[MGODate] = None,   //生日address: Option[String] = None) extends ModelBase[Document] {import org.mongodb.scala.bson._override def to: Document = {var doc = Document("userid" -> this.userid,"name" -> this.name)if (this.age != None)doc = doc + ("age" -> this.age.get)if (this.dob != None)doc = doc + ("dob" -> this.dob.get)if (this.address != None)doc = doc + ("address" -> this.address.getOrElse(""))doc}}object Person {val fromDocument: Document => Person = doc => {val keyset = doc.keySetPerson(userid = doc.getString("userid"),name = doc.getString("name"),age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],dob =  {if (keyset.contains("dob"))Some(doc.getDate("dob"))else None },address =  mgoGetStringOrNone(doc,"address"))}}

在上面例子里Person对应MongoDB里一个Document。除了注意对应类型属性与表字段类型外,还提供了to,fromDecument两个转换函数。其中to函数是继承ModelBase的,代表所有MongoDB Model都必须具备to这个函数。这点很重要,因为在从json构建成Model时,如果属于ModelBase则肯定可以调用一个to函数:

 class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
...post {entity(as[String]) { json =>val extractedEntity: M = fromJson[M](json)val doc: Document = extractedEntity.toval futmsg = repository.insert(doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(c) => c.toString()case None => "insert may not complete!"}case Left(err) => err.getMessage}}

注意这个extractedEntity:我们现在还不能确定它的具体类型,是Person,Animal,Machine? 但我们确定它是M类型,而M<:ModalBase[Document],所以M是MongoDB Model。可以调用extractedEntity.to获取一个Document。

仔细看,Person里并不包括blob类型字段。因为到现在我还没有想到办法在一个httprequest里把多个字段和图片一次性发出来,必须分两个request才能完成一个Document的上传。httpserver收到两个requests后还要进行requests的匹配对应管理,十分的复杂。所以含blob类型的Document只能把blob分拆到另一个Document里,然后用这个Document唯一一个id字段来链接:

  case class Photo (id: String,photo: Option[MGOBlob]) extends ModelBase[Document] {override def to: Document = {var doc = Document("id" -> this.id)if (photo != None)doc = doc + ("photo" -> this.photo)doc}}object Photo {def fromDocument: Document => Photo = doc => {val keyset = doc.keySetPhoto(id = doc.getString("id"),photo = mgoGetBlobOrNone(doc, "photo"))}}

从另一个角度来讲,把blob和正常字段分开来存储也有一定的优势,最多也就是需要两次query罢了。

第二部分是repository:数据库操作函数:

   class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {var res = Seq[ResultOptions]()next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}val ctxFind = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_QUERY).setCommand(Find(andThen = res))mgoQuery[Seq[R]](ctxFind,converter)}def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {var res = Seq[ResultOptions]()next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}val ctxFind = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_QUERY).setCommand(Find(filter = Some(filtr),andThen = res))mgoQuery[Seq[R]](ctxFind,converter)}def getOneDocument(filtr: Bson): DBOResult[Document] = {val ctxFind = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_QUERY).setCommand(Find(filter = Some(filtr),firstOnly = true))mgoQuery[Document](ctxFind,converter)}def insert(doc: Document): DBOResult[Completed] = {val ctxInsert = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Insert(Seq(doc)))mgoUpdate[Completed](ctxInsert)}def delete(filter: Bson): DBOResult[DeleteResult] = {val ctxDelete = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Delete(filter))mgoUpdate[DeleteResult](ctxDelete)}def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {val ctxUpdate = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Update(filter,update,None,!many))mgoUpdate[UpdateResult](ctxUpdate)}def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {val ctxUpdate = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Replace(filter,row))mgoUpdate[UpdateResult](ctxUpdate)}}

这部分上篇博文讨论过。最后是akka-http的核心部分:Route。MongoDB CRUD服务对外的api:

      (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {(filter,fields,sort,top,next) => {dbor = {filter match {case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)case None => repository.getAll(next,sort,fields,top)}}val futRows = dbor.value.value.runToFuture.map {eolr =>eolr match {case Right(olr) => olr match {case Some(lr) => lrcase None => Seq[M]()}case Left(_) => Seq[M]()}}complete(futureToJson(futRows))}} ~ post {entity(as[String]) { json =>val extractedEntity: M = fromJson[M](json)val doc: Document = extractedEntity.toval futmsg = repository.insert(doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(c) => c.toString()case None => "insert may not complete!"}case Left(err) => err.getMessage}}complete(futmsg)}} ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>val bson = Document(filter)if (set == None) {entity(as[String]) { json =>val extractedEntity: M = fromJson[M](json)val doc: Document = extractedEntity.toval futmsg = repository.replace(bson, doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."case None => "update may not complete!"}case Left(err) => err.getMessage}}complete(futureToJson(futmsg))}} else {set match {case Some(u) =>val ubson = Document(u)dbou = repository.update(bson, ubson, many.getOrElse(true))case None =>dbou = Left(new IllegalArgumentException("missing set statement for update!"))}val futmsg = dbou.value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."case None => "update may not complete!"}case Left(err) => err.getMessage}}complete(futureToJson(futmsg))}} ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>val bson = Document(filter)val futmsg = repository.delete(bson).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(d) => s"${d.getDeletedCount} rows deleted."case None => "delete may not complete!"}case Left(err) => err.getMessage}}complete(futureToJson(futmsg))}}

与上篇最大的区别就是这次的Route支持MongoDB特性的query string,bson类型的参数。如:

http://192.168.0.189:50081/private/crud/person
http://192.168.0.189:50081/private/crud/person?filter={"userid":"c001"}
http://192.168.0.189:50081/private/crud/person?sort={"userid":-1}
http://192.168.0.189:50081/private/crud/person?filter={"userid":{$gt:"c000"}}&sort={"userid":-1}&top=3

可惜的是bson表达式中有些字符是url禁止的,所以必须预先处理一下。可以用公网的UrlEncoder在线转换:

https://www.url-encoder.com   {"userid":"c001"} -> %7B%22userid%22%3A%22c001%22%7D

在程序里可以用软件工具:"com.github.tasubo" % "jurl-tools" % "0.6"  URLEncode.encode(xyz)

   val sort ="""|{userid:-1}""".stripMarginval getAllRequest = HttpRequest(HttpMethods.GET,uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),).addHeader(authentication)

blob服务的api Route:

      pathPrefix("blob") {(get & path(Remaining)) { id =>val filtr = equal("id", id)val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {eodoc =>eodoc match {case Right(odoc) => odoc match {case Some(doc) =>if (doc == null) Noneelse mgoGetBlobOrNone(doc, "photo")case None => None}case Left(_) => None}}onComplete(futOptPic) {case Success(optBlob) => optBlob match {case Some(blob) =>withoutSizeLimit {encodeResponseWith(Gzip) {complete(HttpEntity(ContentTypes.`application/octet-stream`,ByteArrayToSource(blob.getData)))}}case None => complete(StatusCodes.NotFound)}case Failure(err) => complete(err)}} ~(post &  parameter('id)) { id =>withoutSizeLimit {decodeRequest {extractDataBytes { bytes =>val fut = bytes.runFold(ByteString()) { case (hd, bs) =>hd ++ bs}onComplete(fut) {case Success(b) =>val doc = Document("id" -> id, "photo" -> b.toArray)val futmsg = repository.insert(doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(c) => c.toString()case None => "insert may not complete!"}case Left(err) => err.getMessage}}complete(futmsg)case Failure(err) => complete(err)}}}}}} 

注意:MongoRoute[M]是个范类型。我希望对任何Model的Route只需要指定M即可,如:

  implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)...  pathPrefix("public") {(pathPrefix("crud")) {new MongoRoute[Person]("person")(personDao).route ~new MongoRoute[Photo]("photo")(picDao).route}}

是否省力多了?但是,回到原来问题:blob类型在整个移动过程中都不需要进行格式转换。所以id字段名称是指定的,这点在设计表结构时要注意。

如何测试一个httpserver还是比较头痛的。用浏览器只能测试GET,其它POST,PUT,DELETE应该怎么测试?其实可以用curl:


curl -i -X GET http://rest-api.io/items
curl -i -X GET http://rest-api.io/items/5069b47aa892630aae059584
curl -i -X DELETE http://rest-api.io/items/5069b47aa892630aae059584
curl -i -X POST -H 'Content-Type: application/json' -d '{"name": "New item", "year": "2009"}' http://rest-api.io/items
curl -i -X PUT -H 'Content-Type: application/json' -d '{"name": "Updated item", "year": "2010"}' http://rest-api.io/items/5069b47aa892630aae059584

下面写两个客户端分别测试crud和blob:

TestCrudClient.scala

import akka.actor._
import akka.http.scaladsl.model.headers._import scala.concurrent._
import scala.concurrent.duration._
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import com.github.tasubo.jurl.URLEncode
import com.datatech.rest.mongo.MongoModels.Person
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
import com.datatech.sdp.mongo.engine.MGOClasses._trait JsonCodec extends Json4sSupport {import org.json4s.DefaultFormatsimport org.json4s.ext.JodaTimeSerializersimplicit val serilizer = jackson.Serializationimplicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodecobject TestCrudClient {type UserInfo = Map[String,Any]def main(args: Array[String]): Unit = {import JsConverters._implicit val system = ActorSystem()implicit val materializer = ActorMaterializer()// needed for the future flatMap/onComplete in the endimplicit val executionContext = system.dispatcherval authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))val authRequest = HttpRequest(HttpMethods.POST,uri = "http://192.168.0.189:50081/auth",headers = List(authorization))val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)val respToken = for {resp <- futTokenjstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}} yield jstrval jstr =  Await.result[String](respToken,2 seconds)println(jstr)scala.io.StdIn.readLine()val authentication = headers.Authorization(OAuth2BearerToken(jstr))val sort ="""|{userid:-1}""".stripMarginval getAllRequest = HttpRequest(HttpMethods.GET,uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),).addHeader(authentication)val futGetAll: Future[HttpResponse] = Http().singleRequest(getAllRequest)println(Await.result(futGetAll,2 seconds))scala.io.StdIn.readLine()var bf ="""|{"userid":"c888"}""".stripMarginprintln(URLEncode.encode(bf))val delRequest = HttpRequest(HttpMethods.DELETE,uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)).addHeader(authentication)val futDel: Future[HttpResponse] = Http().singleRequest(delRequest)println(Await.result(futDel,2 seconds))scala.io.StdIn.readLine()bf ="""|{"userid":"c001"}""".stripMarginval getRequest = HttpRequest(HttpMethods.GET,uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf),).addHeader(authentication)val futGet: Future[HttpResponse] = Http().singleRequest(getRequest)println(Await.result(futGet,2 seconds))scala.io.StdIn.readLine()val tiger = Person("c001","tiger chan",Some(56))val john = Person("c002", "johnny dep", Some(60))val peter = Person("c003", "pete brad", Some(58))val susan = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )val ns = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )val saveRequest = HttpRequest(HttpMethods.POST,uri = "http://192.168.0.189:50081/public/crud/person").addHeader(authentication)val futPost: Future[HttpResponse] =for {reqEntity <- Marshal(peter).to[RequestEntity]response <- Http().singleRequest(saveRequest.copy(entity=reqEntity))} yield responseprintln(Await.result(futPost,2 seconds))scala.io.StdIn.readLine()var set ="""| {$set:|   {|    name:"tiger the king",|    age:18|   }| }""".stripMarginval updateRequest = HttpRequest(HttpMethods.PUT,uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)+"&set="+URLEncode.encode(set)+"&many=true").addHeader(authentication)val futUpdate: Future[HttpResponse] = Http().singleRequest(updateRequest)println(Await.result(futUpdate,2 seconds))scala.io.StdIn.readLine()val repRequest = HttpRequest(HttpMethods.PUT,uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)).addHeader(authentication)val futReplace: Future[HttpResponse] =for {reqEntity <- Marshal(susan).to[RequestEntity]response <- Http().singleRequest(updateRequest.copy(entity=reqEntity))} yield responseprintln(Await.result(futReplace,2 seconds))scala.io.StdIn.readLine()system.terminate()}}

TestFileClient.scala

import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.rest.mongo.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._case class FileUtil(implicit sys: ActorSystem) {import sys.dispatcherimplicit val mat = ActorMaterializer()def createEntity(file: File): RequestEntity = {require(file.exists())val formData =Multipart.FormData(Source.single(Multipart.FormData.BodyPart("test",HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performanceMap("filename" -> file.getName))))Await.result(Marshal(formData).to[RequestEntity], 3 seconds)}def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {implicit val mat = ActorMaterializer()import sys.dispatcherval futResp = Http(sys).singleRequest(//   Gzip.encodeMessage(request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))//   ))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}def downloadFileTo(request: HttpRequest, destPath: String) = {//  val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))val futResp = Http(sys).singleRequest(request)  //.map(Gzip.decodeMessage(_))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download file!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}}}object TestFileClient  {type UserInfo = Map[String,Any]def main(args: Array[String]): Unit = {implicit val system = ActorSystem()implicit val materializer = ActorMaterializer()// needed for the future flatMap/onComplete in the endimplicit val executionContext = system.dispatcherval helloRequest = HttpRequest(uri = "http://192.168.0.189:50081/")val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))val authRequest = HttpRequest(HttpMethods.POST,uri = "http://192.168.0.189:50081/auth",headers = List(authorization))val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)val respToken = for {resp <- futTokenjstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}} yield jstrval jstr =  Await.result[String](respToken,2 seconds)println(jstr)scala.io.StdIn.readLine()val authentication = headers.Authorization(OAuth2BearerToken(jstr))val entity = HttpEntity(ContentTypes.`application/octet-stream`,fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024))//val chunked = HttpEntity.Chunked.fromData(ContentTypes.`application/octet-stream`,fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024))val uploadRequest = HttpRequest(HttpMethods.POST,
//      uri = "http://192.168.0.189:50081/private/file?filename=tiger.jpg",uri = "http://192.168.0.189:50081/public/crud/photo/blob?id=tiger.jpg",).addHeader(authentication)//upload fileAwait.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)//Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)val dlRequest = HttpRequest(HttpMethods.GET,
//      uri = "http://192.168.0.189:50081/api/file/mypic.jpg",uri = "http://192.168.0.189:50081/public/crud/photo/blob/tiger.jpg",).addHeader(authentication)FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")scala.io.StdIn.readLine()system.terminate()}}

下面是本次示范中的源代码:

build.sbt

name := "rest-mongo"version := "0.1"scalaVersion := "2.12.8"scalacOptions += "-Ypartial-unification"
val akkaVersion = "2.5.23"
val akkaHttpVersion = "10.1.8"libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-http"   % "10.1.8","com.typesafe.akka" %% "akka-stream" % "2.5.23","com.pauldijou" %% "jwt-core" % "3.0.1","de.heikoseeberger" %% "akka-http-json4s" % "1.22.0","org.json4s" %% "json4s-native" % "3.6.1","com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8","com.typesafe.scala-logging" %% "scala-logging" % "3.9.0","org.slf4j" % "slf4j-simple" % "1.7.25","org.json4s" %% "json4s-jackson" % "3.6.7","org.json4s" %% "json4s-ext" % "3.6.7",// for scalikejdbc"org.scalikejdbc" %% "scalikejdbc"       % "3.2.1","org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test","org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1","org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1","org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1","com.h2database"  %  "h2" % "1.4.199","com.zaxxer" % "HikariCP" % "2.7.4","com.jolbox" % "bonecp" % "0.8.0.RELEASE","com.typesafe.slick" %% "slick" % "3.3.2",//for cassandra 3.6.0"com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0","com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0","com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",//for mongodb 4.0"org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0","com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0","ch.qos.logback"  %  "logback-classic"   % "1.2.3","io.monix" %% "monix" % "3.0.0-RC3","org.typelevel" %% "cats-core" % "2.0.0-M4","com.github.tasubo" % "jurl-tools" % "0.6"
)

MongoHttpServer.scala

package com.datatech.rest.mongoimport akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._
import org.mongodb.scala._import scala.collection.JavaConverters._
import MongoModels._
import MongoRepo._
import MongoRoute._object MongoHttpServer extends App {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherval settings: MongoClientSettings = MongoClientSettings.builder().applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava)).build()implicit val client: MongoClient = MongoClient(settings)implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)implicit val authenticator = new AuthBase().withAlgorithm(JwtAlgorithm.HS256).withSecretKey("OpenSesame").withUserFunc(getValidUser)val route =path("auth") {authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>post { complete(authenticator.issueJwt(userinfo))}}} ~pathPrefix("private") {authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken =>FileRoute(validToken).route// ~ ...}} ~pathPrefix("public") {(pathPrefix("crud")) {new MongoRoute[Person]("person")(personDao).route ~new MongoRoute[Photo]("photo")(picDao).route}}val (port, host) = (50081,"192.168.0.189")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

ModalBase.scala

package com.datatech.rest.mongotrait ModelBase[E] {def to: E
}

MongoModel.scala

package com.datatech.rest.mongo
import org.mongodb.scala._
import com.datatech.sdp.mongo.engine._
import MGOClasses._object MongoModels {case class Person(userid: String = "",name: String = "",age: Option[Int] = None,dob: Option[MGODate] = None,address: Option[String] = None) extends ModelBase[Document] {import org.mongodb.scala.bson._override def to: Document = {var doc = Document("userid" -> this.userid,"name" -> this.name)if (this.age != None)doc = doc + ("age" -> this.age.get)if (this.dob != None)doc = doc + ("dob" -> this.dob.get)if (this.address != None)doc = doc + ("address" -> this.address.getOrElse(""))doc}}object Person {val fromDocument: Document => Person = doc => {val keyset = doc.keySetPerson(userid = doc.getString("userid"),name = doc.getString("name"),age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],dob =  {if (keyset.contains("dob"))Some(doc.getDate("dob"))else None },address =  mgoGetStringOrNone(doc,"address"))}}case class Photo (id: String,photo: Option[MGOBlob]) extends ModelBase[Document] {override def to: Document = {var doc = Document("id" -> this.id)if (photo != None)doc = doc + ("photo" -> this.photo)doc}}object Photo {def fromDocument: Document => Photo = doc => {val keyset = doc.keySetPhoto(id = doc.getString("id"),photo = mgoGetBlobOrNone(doc, "photo"))}}}

MongoRepo.scala

package com.datatech.rest.mongo
import org.mongodb.scala._
import org.bson.conversions.Bson
import org.mongodb.scala.result._
import com.datatech.sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import MGOCommands._
import com.datatech.sdp.result.DBOResult.DBOResult
import MongoModels._object MongoRepo {class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {var res = Seq[ResultOptions]()next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}val ctxFind = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_QUERY).setCommand(Find(andThen = res))mgoQuery[Seq[R]](ctxFind,converter)}def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {var res = Seq[ResultOptions]()next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}val ctxFind = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_QUERY).setCommand(Find(filter = Some(filtr),andThen = res))mgoQuery[Seq[R]](ctxFind,converter)}def getOneDocument(filtr: Bson): DBOResult[Document] = {val ctxFind = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_QUERY).setCommand(Find(filter = Some(filtr),firstOnly = true))mgoQuery[Document](ctxFind,converter)}def insert(doc: Document): DBOResult[Completed] = {val ctxInsert = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Insert(Seq(doc)))mgoUpdate[Completed](ctxInsert)}def delete(filter: Bson): DBOResult[DeleteResult] = {val ctxDelete = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Delete(filter))mgoUpdate[DeleteResult](ctxDelete)}def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {val ctxUpdate = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Update(filter,update,None,!many))mgoUpdate[UpdateResult](ctxUpdate)}def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {val ctxUpdate = MGOContext(dbName = db,collName=coll).setActionType(MGO_ACTION_TYPE.MGO_UPDATE).setCommand(Replace(filter,row))mgoUpdate[UpdateResult](ctxUpdate)}}}

MongoRoute.scala

package com.datatech.rest.mongo
import akka.http.scaladsl.server.Directivesimport scala.util._
import org.mongodb.scala._
import com.datatech.sdp.file.Streaming._
import org.mongodb.scala.result._
import MongoRepo._
import akka.stream.ActorMaterializer
import com.datatech.sdp.result.DBOResult._
import org.mongodb.scala.model.Filters._
import com.datatech.sdp.mongo.engine.MGOClasses._
import monix.execution.CancelableFuture
import akka.util._
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
object MongoRoute {class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {import monix.execution.Scheduler.Implicits.globalvar dbor: DBOResult[Seq[M]] = _var dbou: DBOResult[UpdateResult] = _val route = pathPrefix(pathName) {pathPrefix("blob") {(get & path(Remaining)) { id =>val filtr = equal("id", id)val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {eodoc =>eodoc match {case Right(odoc) => odoc match {case Some(doc) =>if (doc == null) Noneelse mgoGetBlobOrNone(doc, "photo")case None => None}case Left(_) => None}}onComplete(futOptPic) {case Success(optBlob) => optBlob match {case Some(blob) =>withoutSizeLimit {encodeResponseWith(Gzip) {complete(HttpEntity(ContentTypes.`application/octet-stream`,ByteArrayToSource(blob.getData)))}}case None => complete(StatusCodes.NotFound)}case Failure(err) => complete(err)}} ~(post &  parameter('id)) { id =>withoutSizeLimit {decodeRequest {extractDataBytes { bytes =>val fut = bytes.runFold(ByteString()) { case (hd, bs) =>hd ++ bs}onComplete(fut) {case Success(b) =>val doc = Document("id" -> id, "photo" -> b.toArray)val futmsg = repository.insert(doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(c) => c.toString()case None => "insert may not complete!"}case Left(err) => err.getMessage}}complete(futmsg)case Failure(err) => complete(err)}}}}}} ~(get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {(filter,fields,sort,top,next) => {dbor = {filter match {case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)case None => repository.getAll(next,sort,fields,top)}}val futRows = dbor.value.value.runToFuture.map {eolr =>eolr match {case Right(olr) => olr match {case Some(lr) => lrcase None => Seq[M]()}case Left(_) => Seq[M]()}}complete(futureToJson(futRows))}} ~ post {entity(as[String]) { json =>val extractedEntity: M = fromJson[M](json)val doc: Document = extractedEntity.toval futmsg = repository.insert(doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(c) => c.toString()case None => "insert may not complete!"}case Left(err) => err.getMessage}}complete(futmsg)}} ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>val bson = Document(filter)if (set == None) {entity(as[String]) { json =>val extractedEntity: M = fromJson[M](json)val doc: Document = extractedEntity.toval futmsg = repository.replace(bson, doc).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."case None => "update may not complete!"}case Left(err) => err.getMessage}}complete(futureToJson(futmsg))}} else {set match {case Some(u) =>val ubson = Document(u)dbou = repository.update(bson, ubson, many.getOrElse(true))case None =>dbou = Left(new IllegalArgumentException("missing set statement for update!"))}val futmsg = dbou.value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."case None => "update may not complete!"}case Left(err) => err.getMessage}}complete(futureToJson(futmsg))}} ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) =>val bson = Document(filter)val futmsg = repository.delete(bson).value.value.runToFuture.map {eoc =>eoc match {case Right(oc) => oc match {case Some(d) => s"${d.getDeletedCount} rows deleted."case None => "delete may not complete!"}case Left(err) => err.getMessage}}complete(futureToJson(futmsg))}}}}

 

这篇关于restapi(4)- rest-mongo : MongoDB数据库前端的httpserver的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887191

相关文章

详解Vue如何使用xlsx库导出Excel文件

《详解Vue如何使用xlsx库导出Excel文件》第三方库xlsx提供了强大的功能来处理Excel文件,它可以简化导出Excel文件这个过程,本文将为大家详细介绍一下它的具体使用,需要的小伙伴可以了解... 目录1. 安装依赖2. 创建vue组件3. 解释代码在Vue.js项目中导出Excel文件,使用第三

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

数据库oracle用户密码过期查询及解决方案

《数据库oracle用户密码过期查询及解决方案》:本文主要介绍如何处理ORACLE数据库用户密码过期和修改密码期限的问题,包括创建用户、赋予权限、修改密码、解锁用户和设置密码期限,文中通过代码介绍... 目录前言一、创建用户、赋予权限、修改密码、解锁用户和设置期限二、查询用户密码期限和过期后的修改1.查询用

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

IDEA如何切换数据库版本mysql5或mysql8

《IDEA如何切换数据库版本mysql5或mysql8》本文介绍了如何将IntelliJIDEA从MySQL5切换到MySQL8的详细步骤,包括下载MySQL8、安装、配置、停止旧服务、启动新服务以及... 目录问题描述解决方案第一步第二步第三步第四步第五步总结问题描述最近想开发一个新应用,想使用mysq

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

vue解决子组件样式覆盖问题scoped deep

《vue解决子组件样式覆盖问题scopeddeep》文章主要介绍了在Vue项目中处理全局样式和局部样式的方法,包括使用scoped属性和深度选择器(/deep/)来覆盖子组件的样式,作者建议所有组件... 目录前言scoped分析deep分析使用总结所有组件必须加scoped父组件覆盖子组件使用deep前言

VUE动态绑定class类的三种常用方式及适用场景详解

《VUE动态绑定class类的三种常用方式及适用场景详解》文章介绍了在实际开发中动态绑定class的三种常见情况及其解决方案,包括根据不同的返回值渲染不同的class样式、给模块添加基础样式以及根据设... 目录前言1.动态选择class样式(对象添加:情景一)2.动态添加一个class样式(字符串添加:情

Java读取InfluxDB数据库的方法详解

《Java读取InfluxDB数据库的方法详解》本文介绍基于Java语言,读取InfluxDB数据库的方法,包括读取InfluxDB的所有数据库,以及指定数据库中的measurement、field、... 首先,创建一个Java项目,用于撰写代码。接下来,配置所需要的依赖;这里我们就选择可用于与Infl