NSQ消息队列---总结篇

2024-04-25 17:36
文章标签 总结 队列 消息 nsq

本文主要是介绍NSQ消息队列---总结篇,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

架构

概念

nsqlookup:存储了nsqd的元数据和服务信息(endpoind),向消费者提供服务发现功能, 向nsqadmin提供数据查询功能。

nsqd: 是接收、队列和传送消息到客户端的守护进程。

nsqadmin:简单的管理界面,展示了topic, channel以及channel上的消费者,也可以创建topic,channel。

消息可靠性

(1)生产者不保证消息可靠

(2)消费者保证至少一次消费

发送逻辑

(1)根据配置指定的nsqd的ip, 选择一个机器,通过 HTTP API(也可以TCP)将消息发布到 nsqd的指定 topic

(2)当 producer初次发布的消息的 topic不存在,则会创建。

(3)对topic加锁,将消息发送给 memoryMsgChan中,然后释放锁。如果 memoryMsgChan满了,申请一个buff,把消息写到 Backend,后期被 backendMsgChan接收。

(4)messagePump 不断从 memoryChan/backend队列中读消息,并将消息每个复制一遍,发送给 topic下的所有channel

//nsqd/topic.go:220
func (t *Topic) messagePump() {...for {select {case msg = <-memoryMsgChan:case buf = <-backendChan:msg, err = decodeMessage(buf)...case <-t.channelUpdateChan:...case pause := <-t.pauseChan:...case <-t.exitChan:goto exit}for i, channel := range chans {chanMsg := msgif i > 0 {chanMsg = NewMessage(msg.ID, msg.Body)chanMsg.Timestamp = msg.TimestampchanMsg.deferred = msg.deferred}if chanMsg.deferred != 0 {channel.PutMessageDeferred(chanMsg, chanMsg.deferred)continue}err := channel.PutMessage(chanMsg)...}}
}

(5)channel的PutMessage和 topic类似,首先先写 memoryMsgChan, 满了写入 backend.

(6)protocol实例的messagePump方法从memoryMsgChan或backendMsgChan读取消息并通过p.SendMessage(client, msg)发送到客户端 ,消息写入client.Writer。

消费逻辑

(1)consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic的 nsqd节点。每个consumer向每个 nsqd主机进行订阅操作。

(2)根据获取到的机器信息,通过 TCPsubscribe 自己需要的channel。如果 topic 或者 channel没有创建,则会创建

(3)多个 consumer对应一个 channel, 每个消息将被传递到一个随机的 consumer中。

消费特点:

(1)支持延时消息。

(2)channel在consumer退出后并不会删除。

消息消费失败

这个担保是作为协议和工作流的一部分,工作原理如下(假设客户端成功连接并订阅一个话题):

1)客户表示已经准备好接收消息

2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout,采用大小堆【超时时间排序】和map存储)

3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息 (有个协程定时查看,根据当前时间跟最大堆的顶元素比较)。

消费者限流

就是客户端连接上nsqd之后,会告诉nsqd它的可接受的消息数量是多少,每当nsqd给客户端推送一条消息这个RDY就会减一,而客户端消费完毕并且发送一个FIN之后,这个RDY又会加一(其实这个设计有点类似tcp中的用来控制流量的窗口机制)。

客户端库的被设计成在 RDY 数达到配置 max-in-flight 的 25% 发送一个命令来更新 RDY 计数(并适当考虑连接到多个 nsqd 情况下,适当地分配)。

本地初始化consumer时配置 maxInFlight, 在服务端也配置一个 max_in_flight。每个连接默认:2500

客户端的职责:

  1. 引导并均匀地将max_in_flight配置分配给所有连接。

  2. 绝不允许所有连接的RDY计数总和(total_rdy_count)超过所配置的max_in_flight。

  3. 永远不要超过nsqd配置的每个连接的max_rdy_count。

  4. 公开API方法以可靠地指示消息流的不足(message flow starvation)。

客户端库应始终尝试在所有连接之间平均分配RDY计数。通常,此实现为max_in_flight / num_conns。

特性:

1.采用push机制,保证至少一次推送

性能:

单个节点性能

声明:请牢记 NSQ 设计的初衷是分布式。单个节点的性能非常重要,但这并不是我们所追求的。

  • 2012 MacBook Air i7 2ghz

  • go1.2

  • NSQ v0.2.24

  • 200 byte messages

GOMAXPROCS=1 (单个生产者,单个消费者)

$ ./bench.sh results... PUB: 2014/01/12 22:09:08 duration: 2.311925588s - 82.500mb/s - 432539.873ops/s - 2.312us/op SUB: 2014/01/12 22:09:19 duration: 6.009749983s - 31.738mb/s - 166396.273ops/s - 6.010us/op

GOMAXPROCS=4 (4 publishers, 4 consumers)

$ ./bench.sh results... PUB: 2014/01/13 16:58:05 duration: 1.411492441s - 135.130mb/s - 708469.965ops/s - 1.411us/op SUB: 2014/01/13 16:58:16 duration: 5.251380583s - 36.321mb/s - 190426.114ops/s - 5.251us/op

引用:

分布式实时消息平台NSQ - 知乎

https://juejin.cn/post/6932865148784902158

http://nsqio.cn/design.html

这篇关于NSQ消息队列---总结篇的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/935346

相关文章

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

java常见报错及解决方案总结

《java常见报错及解决方案总结》:本文主要介绍Java编程中常见错误类型及示例,包括语法错误、空指针异常、数组下标越界、类型转换异常、文件未找到异常、除以零异常、非法线程操作异常、方法未定义异常... 目录1. 语法错误 (Syntax Errors)示例 1:解决方案:2. 空指针异常 (NullPoi

Java反转字符串的五种方法总结

《Java反转字符串的五种方法总结》:本文主要介绍五种在Java中反转字符串的方法,包括使用StringBuilder的reverse()方法、字符数组、自定义StringBuilder方法、直接... 目录前言方法一:使用StringBuilder的reverse()方法方法二:使用字符数组方法三:使用自

Python依赖库的几种离线安装方法总结

《Python依赖库的几种离线安装方法总结》:本文主要介绍如何在Python中使用pip工具进行依赖库的安装和管理,包括如何导出和导入依赖包列表、如何下载和安装单个或多个库包及其依赖,以及如何指定... 目录前言一、如何copy一个python环境二、如何下载一个包及其依赖并安装三、如何导出requirem

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Rust格式化输出方式总结

《Rust格式化输出方式总结》Rust提供了强大的格式化输出功能,通过std::fmt模块和相关的宏来实现,主要的输出宏包括println!和format!,它们支持多种格式化占位符,如{}、{:?}... 目录Rust格式化输出方式基本的格式化输出格式化占位符Format 特性总结Rust格式化输出方式