Redis源码分析(三十)--- pubsub发布订阅模式

2024-04-28 00:32

本文主要是介绍Redis源码分析(三十)--- pubsub发布订阅模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Redis源码分析(三十)--- pubsub发布订阅模式


        今天学习了Redis中比较高大上的名词,“发布订阅模式”,发布订阅模式这个词在我最开始接触听说的时候是在JMS(Java Message Service)java消息服务中听说的。这个名次用通俗的一点话说,就是我订阅了这类消息,当只有这类的消息进行广播发送的时候,我才会,其他的消息直接过滤,保证了一个高效的传输效率。下面切入正题,学习一下Redis是如何实现这个发布订阅模式的。先看看里面的简单的API构造;

/*-----------------------------------------------------------------------------* Pubsub low level API*----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) /* 释放发布订阅的模式 */
int listMatchPubsubPattern(void *a, void *b) /* 发布订阅模式是否匹配 */
int clientSubscriptionsCount(redisClient *c) /* 返回客户端的所订阅的数量,包括channels + patterns管道和模式 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) /* Client订阅一个Channel管道 */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) /* 取消订阅Client中的Channel */
int pubsubSubscribePattern(redisClient *c, robj *pattern) /* Client客户端订阅一种模式 */
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) /* Client客户端取消订阅pattern模式 */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) /* 客户端取消自身订阅的所有Channel */
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) /* 客户端取消订阅所有的pattern模式 */
int pubsubPublishMessage(robj *channel, robj *message) /* 为所有订阅了Channel的Client发送消息message *//* ------------PUB/SUB API ---------------- */
void subscribeCommand(redisClient *c) /* 订阅Channel的命令 */
void unsubscribeCommand(redisClient *c) /* 取消订阅Channel的命令 */
void psubscribeCommand(redisClient *c) /* 订阅模式命令 */
void punsubscribeCommand(redisClient *c) /* 取消订阅模式命令 */
void publishCommand(redisClient *c) /* 发布消息命令 */
void pubsubCommand(redisClient *c) /* 发布订阅命令 */
在这里面出现了高频的词Pattern(模式)和Channel(频道,叫管道比较别扭),也就是说,后续所有的关于发布订阅的东东都是基于这2者展开进行的。现在大致讲解一下在Redis中是如何实现此中模式的:

1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道

2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients

3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。

    我们可以简单的看一下Redis订阅一个Channel的方法实现;

/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or* 0 if the client was already subscribed to that channel. */
/* Client订阅一个Channel管道 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {struct dictEntry *de;list *clients = NULL;int retval = 0;/* Add the channel to the client -> channels hash table *///在Client的字典pubsub_channels中添加Channelif (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {retval = 1;incrRefCount(channel);/* Add the client to the channel -> list of clients hash table *///添加Clietn到server中的pubsub_channels,对应的列表中de = dictFind(server.pubsub_channels,channel);if (de == NULL) {//如果此频道的Client列表为空,则创建新列表并添加clients = listCreate();dictAdd(server.pubsub_channels,channel,clients);incrRefCount(channel);} else {//否则,获取这个频道的客户端列表,在尾部添加新的客户端clients = dictGetVal(de);}listAddNodeTail(clients,c);}/* Notify the client *///添加给回复客户端addReply(c,shared.mbulkhdr[3]);addReply(c,shared.subscribebulk);addReplyBulk(c,channel);addReplyLongLong(c,clientSubscriptionsCount(c));return retval;
}
添加操作主要分2部,Client自身的内部维护的pubsub_channels的添加,是一个dict字典对象,然后,是server端维护的pubsub_channels中的client列表的添加。在进行Channel频道的删除的时候,也是执行的这2步骤操作:

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or* 0 if the client was not subscribed to the specified channel. */
/* 取消订阅Client中的Channel */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {struct dictEntry *de;list *clients;listNode *ln;int retval = 0;/* Remove the channel from the client -> channels hash table */incrRefCount(channel); /* channel may be just a pointer to the same objectwe have in the hash tables. Protect it... *///字典删除Client中pubsub_channels中的Channelif (dictDelete(c->pubsub_channels,channel) == DICT_OK) {retval = 1;/* Remove the client from the channel -> clients list hash table *///再移除Channel对应的Client列表de = dictFind(server.pubsub_channels,channel);redisAssertWithInfo(c,NULL,de != NULL);clients = dictGetVal(de);ln = listSearchKey(clients,c);redisAssertWithInfo(c,NULL,ln != NULL);listDelNode(clients,ln);if (listLength(clients) == 0) {/* Free the list and associated hash entry at all if this was* the latest client, so that it will be possible to abuse* Redis PUBSUB creating millions of channels. */dictDelete(server.pubsub_channels,channel);}}/* Notify the client */if (notify) {addReply(c,shared.mbulkhdr[3]);addReply(c,shared.unsubscribebulk);addReplyBulk(c,channel);addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));}decrRefCount(channel); /* it is finally safe to release it */return retval;
}
里面还有对应的模式的订阅和取消订阅的操作,原理和channel完全一致,二者的区别在于,pattern是用来匹配的Channel的,这个是什么意思呢。在后面会做出答案,接着看。最后看一个最最核心的方法,客户端发步消息方法:

/* Publish a message */
/* 为所有订阅了Channel的Client发送消息message */
int pubsubPublishMessage(robj *channel, robj *message) {int receivers = 0;struct dictEntry *de;listNode *ln;listIter li;/* Send to clients listening for that channel *///找到Channel所对应的dictEntryde = dictFind(server.pubsub_channels,channel);if (de) {//获取此Channel对应的客户单列表list *list = dictGetVal(de);listNode *ln;listIter li;listRewind(list,&li);while ((ln = listNext(&li)) != NULL) {//依次取出List中的客户单,添加消息回复redisClient *c = ln->value;addReply(c,shared.mbulkhdr[3]);addReply(c,shared.messagebulk);addReplyBulk(c,channel);//添加消息回复addReplyBulk(c,message);receivers++;}}/* Send to clients listening to matching channels *//* 发送给尝试匹配该Channel的客户端消息 */if (listLength(server.pubsub_patterns)) {listRewind(server.pubsub_patterns,&li);channel = getDecodedObject(channel);while ((ln = listNext(&li)) != NULL) {pubsubPattern *pat = ln->value;//客户端的模式如果匹配了Channel,也会发送消息if (stringmatchlen((char*)pat->pattern->ptr,sdslen(pat->pattern->ptr),(char*)channel->ptr,sdslen(channel->ptr),0)) {addReply(pat->client,shared.mbulkhdr[4]);addReply(pat->client,shared.pmessagebulk);addReplyBulk(pat->client,pat->pattern);addReplyBulk(pat->client,channel);addReplyBulk(pat->client,message);receivers++;}}decrRefCount(channel);}return receivers;
}
pattern的作用就在上面体现了,如果某种pattern匹配了Channel频道,则模式的客户端也会接收消息。在server->pubsub_patterns中, pubsub_patterns是一个list列表,里面的每一个pattern只对应一个Client,就是上面的pat->client,这一点和Channel还是有本质的区别的。讲完发布订阅模式的基本操作后,顺便把与此相关的notify通知类也稍稍讲讲,通知只有3个方法,
/* ----------------- API ------------------- */
int keyspaceEventsStringToFlags(char *classes) /* 键值字符类型转为对应的Class类型 */
sds keyspaceEventsFlagsToString(int flags) /* 通过输入的flag值类,转为字符类型*/
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) /* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */
涉及到string To flag 和flag To String 的转换,也不知道这个会在哪里用到;
/* Turn a string representing notification classes into an integer* representing notification classes flags xored.** The function returns -1 if the input contains characters not mapping to* any class. */
/* 键值字符类型转为对应的Class类型 */
int keyspaceEventsStringToFlags(char *classes) {char *p = classes;int c, flags = 0;while((c = *p++) != '\0') {switch(c) {case 'A': flags |= REDIS_NOTIFY_ALL; break;case 'g': flags |= REDIS_NOTIFY_GENERIC; break;case '$': flags |= REDIS_NOTIFY_STRING; break;case 'l': flags |= REDIS_NOTIFY_LIST; break;case 's': flags |= REDIS_NOTIFY_SET; break;case 'h': flags |= REDIS_NOTIFY_HASH; break;case 'z': flags |= REDIS_NOTIFY_ZSET; break;case 'x': flags |= REDIS_NOTIFY_EXPIRED; break;case 'e': flags |= REDIS_NOTIFY_EVICTED; break;case 'K': flags |= REDIS_NOTIFY_KEYSPACE; break;case 'E': flags |= REDIS_NOTIFY_KEYEVENT; break;default: return -1;}}return flags;
}
应该是响应键盘输入的类型和Redis类型之间的转换。在notify的方法还有一个event事件的通知方法:

/* The API provided to the rest of the Redis core is a simple function:** notifyKeyspaceEvent(char *event, robj *key, int dbid);** 'event' is a C string representing the event name.* 'key' is a Redis object representing the key name.* 'dbid' is the database ID where the key lives.  */
/* 发布通知方法,分为2类,keySpace的通知,keyEvent的通知 */ 
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {sds chan;robj *chanobj, *eventobj;int len = -1;char buf[24];/* If notifications for this class of events are off, return ASAP. */if (!(server.notify_keyspace_events & type)) return;eventobj = createStringObject(event,strlen(event));//2种的通知形式,略有差别/* __keyspace@<db>__:<key> <event> notifications. */if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {chan = sdsnewlen("__keyspace@",11);len = ll2string(buf,sizeof(buf),dbid);chan = sdscatlen(chan, buf, len);chan = sdscatlen(chan, "__:", 3);chan = sdscatsds(chan, key->ptr);chanobj = createObject(REDIS_STRING, chan);//上述几步操作,组件格式字符串,最后发布消息,下面keyEvent的通知同理pubsubPublishMessage(chanobj, eventobj);decrRefCount(chanobj);}/* __keyevente@<db>__:<event> <key> notifications. */if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {chan = sdsnewlen("__keyevent@",11);if (len == -1) len = ll2string(buf,sizeof(buf),dbid);chan = sdscatlen(chan, buf, len);chan = sdscatlen(chan, "__:", 3);chan = sdscatsds(chan, eventobj->ptr);chanobj = createObject(REDIS_STRING, chan);pubsubPublishMessage(chanobj, key);decrRefCount(chanobj);}decrRefCount(eventobj);
}
有keySpace和keyEvent的2种事件通知。具体怎么用,等后面碰到的时候在看看。

这篇关于Redis源码分析(三十)--- pubsub发布订阅模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/941898

相关文章

如何开启和关闭3GB模式

https://jingyan.baidu.com/article/4d58d5414dfc2f9dd4e9c082.html

十四、观察者模式与访问者模式详解

21.观察者模式 21.1.课程目标 1、 掌握观察者模式和访问者模式的应用场景。 2、 掌握观察者模式在具体业务场景中的应用。 3、 了解访问者模式的双分派。 4、 观察者模式和访问者模式的优、缺点。 21.2.内容定位 1、 有 Swing开发经验的人群更容易理解观察者模式。 2、 访问者模式被称为最复杂的设计模式。 21.3.观察者模式 观 察 者 模 式 ( Obser

[职场] 公务员的利弊分析 #知识分享#经验分享#其他

公务员的利弊分析     公务员作为一种稳定的职业选择,一直备受人们的关注。然而,就像任何其他职业一样,公务员职位也有其利与弊。本文将对公务员的利弊进行分析,帮助读者更好地了解这一职业的特点。 利: 1. 稳定的职业:公务员职位通常具有较高的稳定性,一旦进入公务员队伍,往往可以享受到稳定的工作环境和薪资待遇。这对于那些追求稳定的人来说,是一个很大的优势。 2. 薪资福利优厚:公务员的薪资和

Windows/macOS/Linux 安装 Redis 和 Redis Desktop Manager 可视化工具

本文所有安装都在macOS High Sierra 10.13.4进行,Windows安装相对容易些,Linux安装与macOS类似,文中会做区分讲解 1. Redis安装 1.下载Redis https://redis.io/download 把下载的源码更名为redis-4.0.9-source,我喜欢跟maven、Tomcat放在一起,就放到/Users/zhan/Documents

springboot家政服务管理平台 LW +PPT+源码+讲解

3系统的可行性研究及需求分析 3.1可行性研究 3.1.1技术可行性分析 经过大学四年的学习,已经掌握了JAVA、Mysql数据库等方面的编程技巧和方法,对于这些技术该有的软硬件配置也是齐全的,能够满足开发的需要。 本家政服务管理平台采用的是Mysql作为数据库,可以绝对地保证用户数据的安全;可以与Mysql数据库进行无缝连接。 所以,家政服务管理平台在技术上是可以实施的。 3.1

高仿精仿愤怒的小鸟android版游戏源码

这是一款很完美的高仿精仿愤怒的小鸟android版游戏源码,大家可以研究一下吧、 为了报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器,仿佛炮弹一样去攻击肥猪们的堡垒。游戏是十分卡通的2D画面,看着愤怒的红色小鸟,奋不顾身的往绿色的肥猪的堡垒砸去,那种奇妙的感觉还真是令人感到很欢乐。而游戏的配乐同样充满了欢乐的感觉,轻松的节奏,欢快的风格。 源码下载

高度内卷下,企业如何通过VOC(客户之声)做好竞争分析?

VOC,即客户之声,是一种通过收集和分析客户反馈、需求和期望,来洞察市场趋势和竞争对手动态的方法。在高度内卷的市场环境下,VOC不仅能够帮助企业了解客户的真实需求,还能为企业提供宝贵的竞争情报,助力企业在竞争中占据有利地位。 那么,企业该如何通过VOC(客户之声)做好竞争分析呢?深圳天行健企业管理咨询公司解析如下: 首先,要建立完善的VOC收集机制。这包括通过线上渠道(如社交媒体、官网留言

为什么要做Redis分区和分片

Redis分区(Partitioning)和分片(Sharding)是将数据分布在多个Redis实例或多个节点上的做法。这种技术用于提高性能、可扩展性和可用性。以下是执行Redis分区和分片的主要原因: 1. **提高吞吐量**:    - 通过将数据分散到多个节点,可以并行处理更多的操作,从而提高整体吞吐量。 2. **内存限制**:    - 单个Redis实例的内存是有限的。分区允许数据

如何理解redis是单线程的

写在文章开头 在面试时我们经常会问到这样一道题 你刚刚说redis是单线程的,那你能不能告诉我它是如何基于单个线程完成指令接收与连接接入的? 这时候我们经常会得到沉默,所以对于这道题,笔者会直接通过3.0.0源码分析的角度来剖析一下redis单线程的设计与实现。 Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源