【Redis】Redis 的消息队列 List、Streams—(六)

2024-08-29 14:52
文章标签 redis 队列 list 消息 streams

本文主要是介绍【Redis】Redis 的消息队列 List、Streams—(六),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 一、消息队列
    • 二、List 方案
    • 三、Streams 方案

在这里插入图片描述
在这里插入图片描述

一、消息队列

我们一般把消息队列中发送消息的组件称为生产者,把接收消息的组件称为消费者,下图是一个通用的消息队列的架构模型:
消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。

  • (1)消息保序

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。

  • (2)重复消息处理

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。如果多次处理重复消息的话,就可能造成一个业务逻辑被多次执行,从而出现数据问题。

  • (3)消息可靠性保证

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

二、List 方案

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,已经能满足消息 保序 的需求了。具体来说,生产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
在这里插入图片描述

List 并不会主动地通知消费者有新消息写入,如果消费者循环调用 RPOP 命令又会带来 CPU 开销问题。Redis 提供了 BRPOP 命令,称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

在解决 重复消息处理 的问题上,一方面,消息队列要能给每一个消息提供全局唯一的 ID,另一方面,消费者程序要把已经处理过的消息的 ID 记录下来,如果已经处理过,消费者程序就不再进行处理了。这种处理特性也称为幂等性,指对于同一条消息,消费者收到一次或多次的处理结果是一致的。

不过,List 本身是不会为每个消息生成 ID,所以,消息的全局唯一 ID 需要生产者程序在发送消息前自行生成,并包含在消息中以供消费者处理。

为了 保证消息可靠性 ,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存,这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
在这里插入图片描述

三、Streams 方案

如果生产者消息发送很快,而消费者处理消息的速度比较慢,会导致 List 中的消息越积越多,给 Redis 的内存带来很大压力,而 List 并不支持多个消费者同时处理。这时候就要用到 Redis 从 5.0 版本开始提供的 Streams 数据类型了。Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID
  • XREAD:用于读取消息,可以按 ID 读取数据
  • XREADGROUP:按消费组形式读取消息
  • XPENDING:命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息
  • XACK:命令用于向消息队列确认消息处理已完成
  • XADD 命令插入新消息的格式是键 - 值对形式,例如往名称为 mqstream 的消息队

列中插入一条消息:

XADD mqstream * repo 5
"1599203861727-0"

其中,* 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,例如“1599203861727-0”。也可以不用 *,直接在消息队列名称后自行设定一个 ID,只要保证全局唯一就行。

自动生成的 ID 由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,从 0 开始。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。

XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。设定 block 配置项,可实现类似于 BRPOP 的阻塞读取操作,单位是毫秒。例如,从 ID 为 1599203861727-0 的消息开始,读取后续的所有消息(共 3 条)

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"2) 1) 1) "1599274912765-0"2) 1) "repo"2) "3"2) 1) "1599274925823-0"2) 1) "repo"2) "2"3) 1) "1599274927910-0"2) 1) "repo"2) "1"

再看一个例子,命令以 $ 结尾表示读取最新的消息,同时设置了 block 10000 的配置项,表明 XREAD 在读取最新消息时,如果没有消息到来将阻塞 10000 毫秒(即 10 秒),然后再返回。当消息队列 mqstream 中一直没有消息时,XREAD 在 10 秒后返回空值(nil)

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

XGROUP 创建消费组,是区别于 List 的功能,创建后 Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

例如,我们执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream

XGROUP create mqstream group1 0
OK

执行命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。

在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息共4条。

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599203861727-0"2) 1) "repo"2) "5"2) 1) "1599274912765-0"2) 1) "repo"2) "3"3) 1) "1599274925823-0"2) 1) "repo"2) "2"4) 1) "1599274927910-0"2) 1) "repo"2) "1"

如果队列中的消息已经被其他消费者读取,则其他消费者无法读取,例如,再让 group1 内的 consumer2 读取消息时,返回空值。

XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"2) (empty list or set)

消费组的目的是让组内的多个消费者共同分担读取,从而实现负载均衡,例如,让 group2 中的 consumer1、2、3 各自读取一条消息

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599203861727-0"2) 1) "repo"2) "5"XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599274912765-0"2) 1) "repo"2) "3"XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"2) 1) 1) "1599274925823-0"2) 1) "repo"2) "2"

为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

例如,查看一下 group2 中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"2) "1"2) 1) "consumer2"2) "1"3) 1) "consumer3"2) "1"

如果需要进一步查看某个消费者具体读取了哪些数据,可以执行以下命令,consumer2 已读取的消息的 ID 是 1599274912765-0

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"2) "consumer2"3) (integer) 5133364) (integer) 1

当 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

 XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

一张表格,汇总了用 List 和 Streams 实现消息队列的特点和区别
在这里插入图片描述

Redis 是一个非常轻量级的键值数据库,Kafka、RabbitMQ 是专门面向消息队列场景的重量级软件,例如 Kafka 的运行就需要再部署 ZooKeeper。

如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。

这篇关于【Redis】Redis 的消息队列 List、Streams—(六)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1118179

相关文章

Redis分片集群的实现

《Redis分片集群的实现》Redis分片集群是一种将Redis数据库分散到多个节点上的方式,以提供更高的性能和可伸缩性,本文主要介绍了Redis分片集群的实现,具有一定的参考价值,感兴趣的可以了解一... 目录1. Redis Cluster的核心概念哈希槽(Hash Slots)主从复制与故障转移2.

Java中List的contains()方法的使用小结

《Java中List的contains()方法的使用小结》List的contains()方法用于检查列表中是否包含指定的元素,借助equals()方法进行判断,下面就来介绍Java中List的c... 目录详细展开1. 方法签名2. 工作原理3. 使用示例4. 注意事项总结结论:List 的 contain

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

java streamfilter list 过滤的实现

《javastreamfilterlist过滤的实现》JavaStreamAPI中的filter方法是过滤List集合中元素的一个强大工具,可以轻松地根据自定义条件筛选出符合要求的元素,本文就来... 目录1. 创建一个示例List2. 使用Stream的filter方法进行过滤3. 自定义过滤条件1. 定

redis+lua实现分布式限流的示例

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可... 目录为什么使用Redis+Lua实现分布式限流使用ZSET也可以实现限流,为什么选择lua的方式实现

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

Redis中的常用的五种数据类型详解

《Redis中的常用的五种数据类型详解》:本文主要介绍Redis中的常用的五种数据类型详解,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Redis常用的五种数据类型一、字符串(String)简介常用命令应用场景二、哈希(Hash)简介常用命令应用场景三、列表(L