本文主要是介绍Akka(42): Http:身份验证 - authentication, authorization and use of raw headers,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能需要进行一些附加的信息传递如对数据的具体处理方式等。我们可以通过Akka-http的raw-header来实现附加自定义消息的传递,这项功能可以通过Akka-http提供的raw-header筛选功能来实现。在客户端我们把附加消息放在HttpRequest的raw header里,如下:
import akka.http.scaladsl.model.headers._val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows").addHeader(RawHeader("action","insert:county"))
在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:
optionalHeaderValueByName("action") {case Some(action) =>entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete(s"Received rows for $action")}case None => complete ("No action specified!")}
Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用下面的方法提供自己的用户身份信息:
import akka.http.scaladsl.model.headers._val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows").addHeader(RawHeader("action","insert:county")).addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))
服务端对客户端的身份验证处理方法如下:
import akka.http.scaladsl.server.directives.Credentialsdef myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")credentials match {case p @ Credentials.Provided(id) =>Future {// potentiallyif (p.verify("p4ssw0rd")) Some(User(id))else None}case _ => Future.successful(None)}}case class User(name: String)val validUsers = Set("john","peter","tiger","susan")def hasAdminPermissions(user: User): Future[Boolean] = {implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")Future.successful(validUsers.contains(user.name))}
下面是Credential-Directive的使用方法:
authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>authorizeAsync(_ => hasPermissions(user)) {withoutSizeLimit {handleExceptions(postExceptionHandler) {optionalHeaderValueByName("action") {case Some(action) =>entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete(s"Received rows for $action sent from $user")}case None => complete(s"$user did not specify action for uploaded rows!")}}}}}
下面是本次讨论的示范代码:
客户端:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import spray.json._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpClientDemo extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)import akka.http.scaladsl.model.headers._val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows").addHeader(RawHeader("action","insert:county")).addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))val data = HttpEntity(ContentTypes.`application/json`,rowBytes)def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}uploadRows(request,data)scala.io.StdIn.readLine()sys.terminate()}
服务端:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpServerDemo extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))}}import akka.http.scaladsl.server.directives.Credentialsdef userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")credentials match {case p @ Credentials.Provided(id) =>Future {// potentiallyif (p.verify("p4ssw0rd")) Some(User(id))else None}case _ => Future.successful(None)}}case class User(name: String)val validUsers = Set("john","peter","tiger","susan")def hasPermissions(user: User): Future[Boolean] = {implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")Future.successful(validUsers.contains(user.name))}val route =path("rows") {get {complete {source}} ~post {authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>authorizeAsync(_ => hasPermissions(user)) {withoutSizeLimit {handleExceptions(postExceptionHandler) {optionalHeaderValueByName("action") {case Some(action) =>entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete(s"Received rows for $action sent from $user")}case None => complete(s"$user did not specify action for uploaded rows!")}}}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}
这篇关于Akka(42): Http:身份验证 - authentication, authorization and use of raw headers的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!