ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray

本文主要是介绍ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray

原文作者:Adam Warski
原文地址:https://dzone.com/articles/elasticmq-070-long-polling-non
译者微博:@从流域到海域
译者博客:blog.csdn.net/solo95
本文同样刊载于腾讯云+:https://cloud.tencent.com/developer/article/1018703

ElasticMQ 0.7.0:长轮询,使用Akka和Spray的非阻塞实现

ElasticMQ 0.7.0,一个附带基于actor的Scala以及兼容Amazon SQS插件的消息队列系统刚刚发布。

这是一次重大的重写(即版本升级),升级之后将在核心使用Akka actors 并在REST层使用Spray。到目前为止,只有核心和SQS模块被重写, 日志( journaling),SQL后端和副本(replication)模块的重写尚未完成。

主要的客户端改进是:

  • 支持长轮询,这是SQS前一段时间的补充
  • 更简单的独立服务器 - 只需下载一个jar包

使用长时间的轮询的过程中,当收到消息时,可以指定一个额外的的MessageWaitTime属性。如果队列中没有消息,,ElasticMQ将等待MessageWaitTime几秒钟直到消息到达,而不是用空响应完成请求。这有助于减少带宽的使用(不需要非常频繁地进行请求),进而提高系统整体性能(发送后立即收到消息)并降低SQS成本。

独立的服务器现在是一个单一的jar包。要运行本地内存SQS实现(例如,测试使用SQS的应用程序),只需要下载jar文件并运行:

java -jar elasticmq-server-0.7.0.jar

这将在http://localhost:9324启动服务器。当然,接口和端口都是可配置的,详情请参阅自述文件。像以前一样,您也可以使用任何基于JVM的语言来运行嵌入式服务器。

实现说明

出于好奇,下面是对ElasticMQ如何实现的简短描述,包括核心系统,REST层,Akka数据流使用和长轮询实现。所有的代码都可以在GitHub上找到。

如前所述,ElasticMQ现在使用Akka和Spray来实现,并且不包含任何阻塞调用。一切都是异步的。

核心

核心系统是基于角色的。有一个主角色(main actor)(QueueManagerActor),它知道系统中当前创建了哪些队列,并提供了创建和删除队列的可能性。

为了与actor沟通,使用了类型化问答模式。例如,要查找一个队列(一个队列也是一个actor),就会定义一个消息:

case class LookupQueue(queueName:String)extends Replyable [Option [ActorRef]]

用法如下所示:

import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")

如前所述,每个队列都是一个actor,并且已经封装了队列状态。我们可以使用简单的可变数据结构,而不需要任何线程同步,因为角色模型(actor model)为我们处理了这个问题。有一些消息可以发送给queue-actor,例如:

case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
Rest层

SQS查询/ REST层是使用Spray来实现的,这是一个基于Akka的轻量级REST/HTTP工具包。

除了基于角色的非阻塞IO实现外,Spray还提供了强大的路由库spray-routing。它包含一些内置的指令,用于在请求方法(get / post等)上进行匹配,提取表单参数中的查询参数或匹配请求路径。但它也可以让你使用简单的指令组合来定义你自己的指令。一个典型的ElasticMQ route示例如下所示:

val listQueuesDirective = action("ListQueues") {rootPath {anyParam("QueueNamePrefix"?) { prefixOption =>// logic}}}

action"Action"URL的body参数中匹配指定的action名称并接受/拒绝请求的地方,rootPath会匹配出空路径(…)。Spray有一个很好的教程,如果你有兴趣,我建议你看看这篇教程。

如何使用路由中的队列角色(queue actors)来完成HTTP请求?

关于Spray的RequestContext好处是,它所做的只是将一个实例传递给你的路由,不需要任何回复。完全放弃请求或使用某个value完成该请求仅仅取决于它的路由。该请求也可以在另一个线程中完成 - 或者,例如,在未来某个线程运行完成时。这正是ElasticMQ所做的。在这里使用mapflatMapfor-comprehensions(这是一个针对map/ flatMap更好的语法)是非常方便的,例如(省略了一些内容):

// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {queueActor <- queueManagerActor ? LookupQueue(queueName)_ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {requestContext.complete(200, "message deleted")
}

有时,当流程更复杂时,ElasticMQ会使用Akka Dataflow,这需要启用continuations插件。还有一个类似的项目,使用宏,Scala Async,但这个仍处于早期开发阶段。

使用Akka Dataflow,您可以编写使用Future们的代码,就好像编写正常的序列化代码一样。CPS插件会将其转换为在需要时使用回调。这是一个来自CreateQueueDirectives的例子:

(序列化代码sequential code,也有翻译成顺序代码的,即按顺序执行的代码,过程中不存在多线程异步操作,译者注)

flow {val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()queueActorOption match {case None => {val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()createResult match {case Left(e) => throw new SQSException("Queue already created: " + e.message)case Right(_) => newQueueData}}case Some(queueActor) => {(queueActor ? GetQueueData()).apply()}}
}

这里的重要部分是flow代码块,它界定了转换的范围,以及调用Future提取future内容的apply()。这看起来像完全正常的序列化代码,但是在执行时,因为第一次Future是第一次使用将会异步运行。

长轮询

由于所有的代码都是异步和非阻塞的,实现长轮询非常容易。请注意,从一个队列接收消息时,我们得到一个Future[List[MessageData]]。为了发出响应已完成这个future,HTTP请求也将会以适当的响应来完成。然而,这个future几乎可以立即完成(例如正常情况下),比如在10秒之后 - 代码所需的支持没有变化。唯一要做的就是延迟完成future,直到指定的时间过去或新的消息到达。

实现在QueueActorWaitForMessagesOps中。当接收到消息的请求到达时,队列中没有任何内容产生,而是立即回复(即向发送者actor发送空列表),我们将储存原始请求的引用和发送方actor在map中。使用Akka调度程序,我们还计划在指定的时间超过之后发回空列表并删除条目。

当新消息到达时,我们只需从map上等待一个请求,然后尝试去完成它。同样,所有同步和并发问题都由Akka和actor模型来处理。

请测试新版本,如果您有任何反馈,请让我们知晓!

Adam

这篇关于ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/623336

相关文章

-bash: /bin/mv: Argument list too long mv

把labels下的所有文件mv到img文件夹下: mv labels/* img/ 报错: -bash: /bin/mv: Argument list too long  mv # Using find ... -exec + find folder2 -name '*.*' -exec mv --target-directory=folder '{}' +   # Using xar

BD错误集锦1——[Hive]ERROR StatusLogger No log4j2 configuration file found. Using default configuration:

错误描述:在使用IDEA进行jdbc方式连接到hive数据仓库时,出现以下错误:                ERROR StatusLogger No log4j2 configuration file found. 问题原因:缺少log4j2.xml文件   <?xml version="1.0" encoding="UTF-8"?><Configuration><Appender

论文阅读--Efficient Hybrid Zoom using Camera Fusion on Mobile Phones

这是谷歌影像团队 2023 年发表在 Siggraph Asia 上的一篇文章,主要介绍的是利用多摄融合的思路进行变焦。 单反相机因为卓越的硬件性能,可以非常方便的实现光学变焦。不过目前的智能手机,受制于物理空间的限制,还不能做到像单反一样的光学变焦。目前主流的智能手机,都是采用多摄的设计,一般来说一个主摄搭配一个长焦,为了实现主摄与长焦之间的变焦,目前都是采用数字变焦的方式,数字变焦相比于光学

在WinCE的C#编程中,需要静态调用C++的动态库,需要添加using System.Runtime.InteropServices

using System.Runtime.InteropServices;         [DllImport("Win32DLL.dll", EntryPoint = "WriteREG_SZToRegTCHAR")]         private static extern bool WriteREG_SZToRegTCHAR(int iFlag, string regKeyP

长尾分布(Long-tailed Distribution)

长尾分布( L o n g − t a i l e d D i s t r i b u t i o n Long-tailed\ Distribution Long−tailed Distribution)是统计学和概率论中的一个重要概念,用于描述一组数据中尾部(即远离均值的部分)包含了相对较多极端值的情况。以下是对长尾分布的详细解释: 定义 长尾分布是重尾分布的一个子类型,其特点是分布的尾部

1、为什么使用Long时,推荐多使用valueOf方法,少使用parseLong方法

为什么使用Long时,推荐多使用valueOf方法,少使用parseLong方法? 因为Long本身有缓存机制,缓存了-128到127范围内的Long,valueOf方法会从缓存中去拿值,如果命中缓存,会减少资源的开销,parseLong方法没有这个机制。

【Rust项目推荐】Rust search extension 0.7发布!地址栏快速搜索Rust文档、crates的浏览器插件...

大家好, Rust Search Extension是我从2018年开始开发的浏览器插件,方便大家在浏览器地址栏快速搜索官方文档、内置属性、crates和错误码。目前发布了v0.7版! 目前在Reddit上获得了将近200个赞,欢迎大家下载体验。 Reddit帖子:Announcing rust-search-extension v0.7! Search std docs, crates, e

java.sql.SQLException: Access denied for user 'root'@'localhost' (using password: YES)问题

1.问题描述:   在myeclipse中发布web服务器,通过手机客户端访问MySQL数据库(也就是myeclipse中的tomcat连接的数据库),myEclipse 的控制台(Console)报出如下错误 java.sql.SQLException: Access denied for user 'root'@'localhost' (using password: YES) 2.自己

NetSuite Non-Inventory Item 公司内外采购总账影响

上篇文章提到,Non-Inventory Item的科目维护会根据各个企业的实际情况而有所不同,通常情况下都涉及外部交易,即对外采购与销售;另外也涉及到公司内部的相关交易,本篇以采购为例,来看看公司内外采购交易所对应的总账影响。 首先,我们创建一个Non-Inventory Item物料,其Accounting标签下的Account维护如下: 需要注意的是,这里的Intercompany

SyntaxError- Non-ASCII character '-xe8' in file

python编译报错: SyntaxError: Non-ASCII character ‘\xe8’ in file xxx原因是不支持中文注释,如这种中英文混杂注释: # Subtract off the mean and divide by the variance of the pixels.#减去平均值并除以像素的方差 解决办法: 在文件第一行加上 #encoding:utf