Akka(43): Http:SSE-Server Sent Event - 服务端主推消息

2024-04-09 04:48

本文主要是介绍Akka(43): Http:SSE-Server Sent Event - 服务端主推消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   因为我了解Akka-http的主要目的不是为了有关Web-Server的编程,而是想实现一套系统集成的api,所以也需要考虑由服务端主动向客户端发送指令的应用场景。比如一个零售店管理平台的服务端在完成了某些数据更新后需要通知各零售门市客户端下载最新数据。虽然Akka-http也提供对websocket协议的支持,但websocket的网络连接是双向恒久的,适合频繁的问答交互式服务端与客户端的交流,消息结构也比较零碎。而我们面临的可能是批次型的大量数据库数据交换,只需要简单的服务端单向消息就行了,所以websocket不太合适,而Akka-http的SSE应该比较适合我们的要求。SSE模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于自己应该执行的指令,然后进行相应的处理。客户端接收SSE是在一个独立的线程里不断进行的,不会影响客户端当前的运算流程。当收到有用的消息后就会调用一个业务功能函数作为后台异步运算任务。

服务端的SSE发布是以Source[ServerSentEvent,NotUsed]来实现的。ServerSentEvent类型定义如下:

/*** Representation of a server-sent event. According to the specification, an empty data field designates an event* which is to be ignored which is useful for heartbeats.** @param data data, may span multiple lines* @param eventType optional type, must not contain \n or \r* @param id optional id, must not contain \n or \r* @param retry optional reconnection delay in milliseconds*/
final case class ServerSentEvent(data:      String,eventType: Option[String] = None,id:        Option[String] = None,retry:     Option[Int]    = None) {...}

这个类型的参数代表事件消息的数据结构。用户可以根据实际需要充分利用这个数据结构来传递消息。服务端是通过complete以SeverSentEvent类为元素 的Source来进行SSE的,如下:

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._complete {Source.tick(2.seconds, 2.seconds, NotUsed).map( _ => processToServerSentEvent).keepAlive(1.second, () => ServerSentEvent.heartbeat)}

以上代码代表服务端定时运算processToServerSentEvent返回ServerSentEvent类型结果后发布给所有订阅的客户端。我们用一个函数processToServerSentEvent模拟重复运算的业务功能:

  private def processToServerSentEvent: ServerSentEvent = {Thread.sleep(3000)   //processing delayServerSentEvent(SyncFiles.fileToSync)}

这个函数模拟发布事件数据是某种业务运算结果,在这里代表客户端需要下载文件名称。我们用客户端request来模拟设定这个文件名称:

  object SyncFiles {var fileToSync: String = ""}private def route = {import Directives._import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._def syncRequests =pathPrefix("sync") {pathSingleSlash {post {parameter("file") { filename =>complete {SyncFiles.fileToSync = filenames"set download file to : $filename"}}}}}

客户端订阅SSE的方式如下:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._import system.dispatcherHttp().singleRequest(Get("http://localhost:8011/events")).flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]).foreach(_.runForeach(se => downloadFiles(se.data)))

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

  def downloadFiles(file: String) = {Thread.sleep(3000)   //process delayif (file != "")println(s"Try to download $file")}

下面是客户端程序的测试运算步骤:

    scala.io.StdIn.readLine()println("do some thing ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")).onSuccess {case msg => println(msg)}scala.io.StdIn.readLine()println("do some other things ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")).onSuccess {case msg => println(msg)}

运算结果:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Ordersdo some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download ItemsTry to download ItemsProcess finished with exit code 0

下面是本次讨论的示范源代码:

服务端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEventobject SSEServer {def main(args: Array[String]): Unit = {implicit val system = ActorSystem()implicit val mat    = ActorMaterializer()Http().bindAndHandle(route, "localhost", 8011)scala.io.StdIn.readLine()system.terminate()}object SyncFiles {var fileToSync: String = ""}private def route = {import Directives._import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._def syncRequests =pathPrefix("sync") {pathSingleSlash {post {parameter("file") { filename =>complete {SyncFiles.fileToSync = filenames"set download file to : $filename"}}}}}def events =path("events") {get {complete {Source.tick(2.seconds, 2.seconds, NotUsed).map( _ => processToServerSentEvent).keepAlive(1.second, () => ServerSentEvent.heartbeat)}}}syncRequests ~ events}private def processToServerSentEvent: ServerSentEvent = {Thread.sleep(3000)   //processing delayServerSentEvent(SyncFiles.fileToSync)}
}

客户端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._object SSEClient {def downloadFiles(file: String) = {Thread.sleep(3000)   //process delayif (file != "")println(s"Try to download $file")}def main(args: Array[String]): Unit = {implicit val system = ActorSystem()implicit val mat    = ActorMaterializer()import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._import system.dispatcherHttp().singleRequest(Get("http://localhost:8011/events")).flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]).foreach(_.runForeach(se => downloadFiles(se.data)))scala.io.StdIn.readLine()println("do some thing ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")).onSuccess {case msg => println(msg)}scala.io.StdIn.readLine()println("do some other things ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")).onSuccess {case msg => println(msg)}scala.io.StdIn.readLine()system.terminate()}
}






这篇关于Akka(43): Http:SSE-Server Sent Event - 服务端主推消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887211

相关文章

BUUCTF靶场[web][极客大挑战 2019]Http、[HCTF 2018]admin

目录   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 [web][HCTF 2018]admin 考点:弱密码字典爆破 四种方法:   [web][极客大挑战 2019]Http 考点:Referer协议、UA协议、X-Forwarded-For协议 访问环境 老规矩,我们先查看源代码

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。

如何确定 Go 语言中 HTTP 连接池的最佳参数?

确定 Go 语言中 HTTP 连接池的最佳参数可以通过以下几种方式: 一、分析应用场景和需求 并发请求量: 确定应用程序在特定时间段内可能同时发起的 HTTP 请求数量。如果并发请求量很高,需要设置较大的连接池参数以满足需求。例如,对于一个高并发的 Web 服务,可能同时有数百个请求在处理,此时需要较大的连接池大小。可以通过压力测试工具模拟高并发场景,观察系统在不同并发请求下的性能表现,从而

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

Anaconda 中遇到CondaHTTPError: HTTP 404 NOT FOUND for url的问题及解决办法

最近在跑一个开源项目遇到了以下问题,查了很多资料都大(抄)同(来)小(抄)异(去)的,解决不了根本问题,费了很大的劲终于得以解决,记录如下: 1、问题及过程: (myenv) D:\Workspace\python\XXXXX>conda install python=3.6.13 Solving environment: done.....Proceed ([y]/n)? yDownloa

JAVA用最简单的方法来构建一个高可用的服务端,提升系统可用性

一、什么是提升系统的高可用性 JAVA服务端,顾名思义就是23体验网为用户提供服务的。停工时间,就是不能向用户提供服务的时间。高可用,就是系统具有高度可用性,尽量减少停工时间。如何用最简单的方法来搭建一个高效率可用的服务端JAVA呢? 停工的原因一般有: 服务器故障。例如服务器宕机,服务器网络出现问题,机房或者机架出现问题等;访问量急剧上升,导致服务器压力过大导致访问量急剧上升的原因;时间和

red5-server源码

red5-server源码:https://github.com/Red5/red5-server

构建高性能WEB之HTTP首部优化

0x00 前言 在讨论浏览器优化之前,首先我们先分析下从客户端发起一个HTTP请求到用户接收到响应之间,都发生了什么?知己知彼,才能百战不殆。这也是作为一个WEB开发者,为什么一定要深入学习TCP/IP等网络知识。 0x01 到底发生什么了? 当用户发起一个HTTP请求时,首先客户端将与服务端之间建立TCP连接,成功建立连接后,服务端将对请求进行处理,并对客户端做出响应,响应内容一般包括响应

Golang支持平滑升级的HTTP服务

前段时间用Golang在做一个HTTP的接口,因编译型语言的特性,修改了代码需要重新编译可执行文件,关闭正在运行的老程序,并启动新程序。对于访问量较大的面向用户的产品,关闭、重启的过程中势必会出现无法访问的情况,从而影响用户体验。 使用Golang的系统包开发HTTP服务,是无法支持平滑升级(优雅重启)的,本文将探讨如何解决该问题。 一、平滑升级(优雅重启)的一般思路 一般情况下,要实现平滑