Redis的Keyspace notifications功能初探

2023-12-13 09:18

本文主要是介绍Redis的Keyspace notifications功能初探,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近在做一套系统,其中要求若干个Worker服务器将心跳信息都上报给中央服务器。当一定时间中央服务器没有得到心跳信息时则认为该Worker失效了,发出告警。


满足这种需求的解决方法多种多样,我开始想到了memcache,上报一次心跳信息就刷新一次缓存,当缓存内心跳信息对象超时被删除,即认为对应的Worker失效。然而由于memcache的工作原理,删除都是被动的,我们无法及时判断数据是否过期,即便知道了数据过期,也没有一种机制来回调方法来执行自定义的处理动作。难道缓存架构就真的不行了吗?


答案是否定的。在Redis 2.8.0版本起,加入了“Keyspace notifications”(即“键空间通知”)的功能。如何理解该功能呢?我们来看下官方是怎么说的:

键空间通知,允许Redis客户端从“发布/订阅”通道中建立订阅关系,以便客户端能够在Redis中的数据因某种方式受到影响时收到相应事件。

可能接收到的事件举例如下:

影响一个给出的键的所有命令(会告诉你哪个键被执行了一个命令,这个命令是什么);

接收到了一个LPUSH操作的所有键(LPUSH命令:key v1 [v2 v3..]将指定的所有值从左到右进行压栈操作,形成一个栈,并将该栈命名为指定的key);

在数据库0中失效的所有键(不一定非得是数据库0,这里这样表述其实想表达可以知道影响的哪个数据库)。


看到这里我联想到,如果一条缓存数据失效了,通过订阅关系,客户端会收到消息,通过分析消息可以得知何种消息,分析消息内容可以知道是哪个key失效了。这样就可以间接实现开头所描述的功能。


接下来我们来看下实验的步骤:

1.准备redis服务器

作为开源软件,redis下载后得到的是源代码,使用tar -xzvf redis-2.8.9.tar.gz解压后对其进行编译,过程也很简单,make就可以了。编译完成之后可以使用自带的runtest进行测试,看是否编译成功。然后就是安装了,执行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX参数指定的就是安装路径。现在安装的只有可执行文件,还没有配置文件。其实在源码目录中有一个模板redis.conf,我们对它进行改动就可以了。
其他配置我们不关心,但是官方文档中说Keyspace notifications功能默认是关闭的(默认地,Keyspace 时间通知功能是禁用的,因为它或多或少会使用一些CPU的资源),我们需要打开它。打开的方法也很简单,配置属性:notify-keyspace-events

默认配置是这样的:notify-keyspace-events ""
根据文档中的说明:
[plain]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. K     Keyspace events, published with __keyspace@<db>__ prefix.  
  2. E     Keyevent events, published with __keyevent@<db>__ prefix.  
  3. g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...  
  4. $     String commands  
  5. l     List commands  
  6. s     Set commands  
  7. h     Hash commands  
  8. z     Sorted set commands  
  9. x     Expired events (events generated every time a key expires)  
  10. e     Evicted events (events generated when a key is evicted for maxmemory)  
  11. A     Alias for g$lshzxe, so that the "AKE" string means all the events.  
我们配置为: notify-keyspace-events Ex,含义为:发布key事件,使用过期事件(当每一个key失效时,都会生成该事件)。

2.准备客户端和连接配置

本文中使用的客户端是Jedis,版本为2.4.2,为了代码的通用性,我使用spring来管理连接:

[html]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. <bean id="pool" class="redis.clients.jedis.JedisPool">  
  2.         <constructor-arg>  
  3.             <bean id="config" class="redis.clients.jedis.JedisPoolConfig">  
  4.                 <property name="maxIdle" value="0" />  
  5.                 <property name="maxTotal" value="20" />  
  6.                 <property name="maxWaitMillis" value="1000" />  
  7.                 <property name="testOnBorrow" value="true" />  
  8.         </bean>  
  9.         </constructor-arg>  
  10.         <constructor-arg>  
  11.             <value>192.168.1.100</value>  
  12.         </constructor-arg>  
  13.         <constructor-arg>  
  14.         <value>6379</value>  
  15.     </constructor-arg>  
  16. </bean>  
然后使用Spring Test和Junit来测试代码

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. @RunWith(SpringJUnit4ClassRunner.class)  
  2. @ContextConfiguration("/applicationContext*.xml")  
  3. public class RedisSubscribeDemo {  
  4.       
  5.     private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);  
  6.       
  7.     @Resource  
  8.     private JedisPool pool;  
  9.       
  10.     @Test  
  11.     public void doTest() throws InterruptedException {  
  12.         for (int i = 0; i < 1; i++) {  
  13.             TestThread thread= new TestThread(pool);  
  14.             thread.start();  
  15.         }  
  16.         Thread.sleep(50000L);  
  17.         Log.info("Test finish...");  
  18.     }  
  19. }  

由于要使用一定的延迟,我们把主要测试代码放到了TestThread中。当测试线程启动后,主线程停滞50秒,让我们有足够的时间来操作。

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. public class TestThread extends Thread {  
  2.       
  3.     private Log log= LogFactory.getLog(TestThread.class);  
  4.       
  5.     private JedisPool pool;  
  6.       
  7.     public TestThread(JedisPool pool){  
  8.         log.info("loading test thread");  
  9.         this.pool= pool;  
  10.     }  
  11.   
  12.     @Override  
  13.     public void run() {  
  14.         Jedis jedis= pool.getResource();  
  15.         jedis.psubscribe(new MySubscribe(), "*");  
  16.         try {  
  17.             Thread.sleep(10000L);  
  18.         } catch (InterruptedException e) {  
  19.             log.info("延时失败", e);  
  20.         }  
  21.         jedis.close();  
  22.         log.info("Test run finished");  
  23.     }  
  24. }  
在测试线程中,我们将自定义的MySubscribe加入到了Jedis的模板订阅(即psubscribe,因为模板订阅的channel是支持星号'*'通配的,这样可以收集到多个通配通道的消息,而与之相反的还有一个subscribe,此订阅只能指定严格匹配的通道)中,同样为了测试过程能够将结果显示出来,在绑定了订阅后,对该线程进行了延时10秒。

[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. public class MySubscribe extends JedisPubSub {  
  2.       
  3.     private static final Log log= LogFactory.getLog(MySubscribe.class);  
  4.   
  5.     // 初始化按表达式的方式订阅时候的处理    
  6.     public void onPSubscribe(String pattern, int subscribedChannels) {    
  7.             log.info(pattern + "=" + subscribedChannels);  
  8.     }  
  9.       
  10.     // 取得按表达式的方式订阅的消息后的处理    
  11.     public void onPMessage(String pattern, String channel, String message) {    
  12.             log.info(pattern + "=" + channel + "=" + message);  
  13.     }  
  14.   
  15.     ...其他未用到的重写方法忽略  
  16. }  
作为Jedis自定义订阅,必须继承redis.clients.jedis.JedisPubSub类,在 psubscribe模式下,重点重写onPMessage方法,该方法为接收到模板订阅后处理事件的重要代码。pattern为在绑定订阅时使用的通配模板,channel为通配后符合条件的实际通道名称,message就不用多说了,就是事件消息内容。

3.实战

通过Redis自带的redis-cli命令,我们可以在服务端通过命令行的方式直接操作。我们运行上面的示例代码,然后迅速切换到redis-cli命令中,建立一个生命周期很短暂的数据:
[plain]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. 127.0.0.1:6379> set chaijunkun 123 PX 100  
PX参数指定生命周期单位为毫秒,100即声明周期,即100毫秒。key为chaijunkun的数据,其值为123。
当执行语句后,回显:
[plain]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. OK  

这时我们看实例程序的输出:
[plain]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. *=__keyevent@0__:expired=chaijunkun  
从输出可以看出,之前指定的通配符为*,通配任何通道;之后是实际的通道名称:__keyevent@0__:expired,这里我们可以看到订阅收到了一个keyevent位于数据库0,事件类型为:expired,是一个过期事件;最后是chaijunkun,这个是过期数据的key。
在官方文档中,keyevent通道的格式永远是这样的:
__keyevent@<db>__:prefix
对于数据过期事件,我们在绑定订阅时通配模板也可以精确地写成:
__keyevent@*__:expired
通过示例代码,我们可以看到确实印证了之前的构想,实现了数据过期的事件触发(event)或者说回调(callback)。

4.其他应用

之前的代码中,对于事件的发布都是由Redis自己生成的。实际上在命令中主动发布自定义消息也是可以的,在publish命令的帮助中我们看到:
[plain]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. 127.0.0.1:6379> help publish  
  2.   
  3.   PUBLISH channel message  
  4.   summary: Post a message to a channel  
  5.   since: 2.0.0  
  6.   group: pubsub  

通过参数,可以自定义通道名称和通道消息。而在Jedis中,发布API甚至做到了字节数据的级别:
jedis.publish(byte[] channel, byte[] message)
因此我们可以构想,自定一套通讯协议,channel为命令字,message为消息体,我们可以通过redis这种简单的发布/订阅机制实现消息的分发。

这篇关于Redis的Keyspace notifications功能初探的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/487903

相关文章

C语言小项目实战之通讯录功能

《C语言小项目实战之通讯录功能》:本文主要介绍如何设计和实现一个简单的通讯录管理系统,包括联系人信息的存储、增加、删除、查找、修改和排序等功能,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录功能介绍:添加联系人模块显示联系人模块删除联系人模块查找联系人模块修改联系人模块排序联系人模块源代码如下

Java中使用Java Mail实现邮件服务功能示例

《Java中使用JavaMail实现邮件服务功能示例》:本文主要介绍Java中使用JavaMail实现邮件服务功能的相关资料,文章还提供了一个发送邮件的示例代码,包括创建参数类、邮件类和执行结... 目录前言一、历史背景二编程、pom依赖三、API说明(一)Session (会话)(二)Message编程客

Redis的Zset类型及相关命令详细讲解

《Redis的Zset类型及相关命令详细讲解》:本文主要介绍Redis的Zset类型及相关命令的相关资料,有序集合Zset是一种Redis数据结构,它类似于集合Set,但每个元素都有一个关联的分数... 目录Zset简介ZADDZCARDZCOUNTZRANGEZREVRANGEZRANGEBYSCOREZ

Java CompletableFuture如何实现超时功能

《JavaCompletableFuture如何实现超时功能》:本文主要介绍实现超时功能的基本思路以及CompletableFuture(之后简称CF)是如何通过代码实现超时功能的,需要的... 目录基本思路CompletableFuture 的实现1. 基本实现流程2. 静态条件分析3. 内存泄露 bug

C#实现系统信息监控与获取功能

《C#实现系统信息监控与获取功能》在C#开发的众多应用场景中,获取系统信息以及监控用户操作有着广泛的用途,比如在系统性能优化工具中,需要实时读取CPU、GPU资源信息,本文将详细介绍如何使用C#来实现... 目录前言一、C# 监控键盘1. 原理与实现思路2. 代码实现二、读取 CPU、GPU 资源信息1.

Redis多种内存淘汰策略及配置技巧分享

《Redis多种内存淘汰策略及配置技巧分享》本文介绍了Redis内存满时的淘汰机制,包括内存淘汰机制的概念,Redis提供的8种淘汰策略(如noeviction、volatile-lru等)及其适用场... 目录前言一、什么是 Redis 的内存淘汰机制?二、Redis 内存淘汰策略1. pythonnoe

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英

基于WinForm+Halcon实现图像缩放与交互功能

《基于WinForm+Halcon实现图像缩放与交互功能》本文主要讲述在WinForm中结合Halcon实现图像缩放、平移及实时显示灰度值等交互功能,包括初始化窗口的不同方式,以及通过特定事件添加相应... 目录前言初始化窗口添加图像缩放功能添加图像平移功能添加实时显示灰度值功能示例代码总结最后前言本文将

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制