漫话Redis源码之七十六

2024-02-06 09:38
文章标签 源码 redis 漫话 七十六

本文主要是介绍漫话Redis源码之七十六,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

处理非阻塞的client, 整个实现还是比较简单的:

/* This function is called in the beforeSleep() function of the event loop* in order to process the pending input buffer of clients that were* unblocked after a blocking operation. */
void processUnblockedClients(void) {listNode *ln;client *c;while (listLength(server.unblocked_clients)) {ln = listFirst(server.unblocked_clients);serverAssert(ln != NULL);c = ln->value;listDelNode(server.unblocked_clients,ln);c->flags &= ~CLIENT_UNBLOCKED;/* Process remaining data in the input buffer, unless the client* is blocked again. Actually processInputBuffer() checks that the* client is not blocked before to proceed, but things may change and* the code is conceptually more correct this way. */if (!(c->flags & CLIENT_BLOCKED)) {/* If we have a queued command, execute it now. */if (processPendingCommandsAndResetClient(c) == C_ERR) {continue;}/* Then process client if it has more data in it's buffer. */if (c->querybuf && sdslen(c->querybuf) > 0) {processInputBuffer(c);}}}
}/* This function will schedule the client for reprocessing at a safe time.** This is useful when a client was blocked for some reason (blocking operation,* CLIENT PAUSE, or whatever), because it may end with some accumulated query* buffer that needs to be processed ASAP:** 1. When a client is blocked, its readable handler is still active.* 2. However in this case it only gets data into the query buffer, but the*    query is not parsed or executed once there is enough to proceed as*    usually (because the client is blocked... so we can't execute commands).* 3. When the client is unblocked, without this function, the client would*    have to write some query in order for the readable handler to finally*    call processQueryBuffer*() on it.* 4. With this function instead we can put the client in a queue that will*    process it for queries ready to be executed at a safe time.*/
void queueClientForReprocessing(client *c) {/* The client may already be into the unblocked list because of a previous* blocking operation, don't add back it into the list multiple times. */if (!(c->flags & CLIENT_UNBLOCKED)) {c->flags |= CLIENT_UNBLOCKED;listAddNodeTail(server.unblocked_clients,c);}
}/* Unblock a client calling the right function depending on the kind* of operation the client is blocking for. */
void unblockClient(client *c) {if (c->btype == BLOCKED_LIST ||c->btype == BLOCKED_ZSET ||c->btype == BLOCKED_STREAM) {unblockClientWaitingData(c);} else if (c->btype == BLOCKED_WAIT) {unblockClientWaitingReplicas(c);} else if (c->btype == BLOCKED_MODULE) {if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);unblockClientFromModule(c);} else if (c->btype == BLOCKED_PAUSE) {listDelNode(server.paused_clients,c->paused_list_node);c->paused_list_node = NULL;} else {serverPanic("Unknown btype in unblockClient().");}/* Reset the client for a new query since, for blocking commands* we do not do it immediately after the command returns (when the* client got blocked) in order to be still able to access the argument* vector from module callbacks and updateStatsOnUnblock. */if (c->btype != BLOCKED_PAUSE) {freeClientOriginalArgv(c);resetClient(c);}/* Clear the flags, and put the client in the unblocked list so that* we'll process new commands in its query buffer ASAP. */server.blocked_clients--;server.blocked_clients_by_type[c->btype]--;c->flags &= ~CLIENT_BLOCKED;c->btype = BLOCKED_NONE;removeClientFromTimeoutTable(c);queueClientForReprocessing(c);
}/* This function gets called when a blocked client timed out in order to* send it a reply of some kind. After this function is called,* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {if (c->btype == BLOCKED_LIST ||c->btype == BLOCKED_ZSET ||c->btype == BLOCKED_STREAM) {addReplyNullArray(c);} else if (c->btype == BLOCKED_WAIT) {addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));} else if (c->btype == BLOCKED_MODULE) {moduleBlockedClientTimedOut(c);} else {serverPanic("Unknown btype in replyToBlockedClientTimedOut().");}
}/* Mass-unblock clients because something changed in the instance that makes* blocking no longer safe. For example clients blocked in list operations* in an instance which turns from master to slave is unsafe, so this function* is called when a master turns into a slave.** The semantics is to send an -UNBLOCKED error to the client, disconnecting* it at the same time. */
void disconnectAllBlockedClients(void) {listNode *ln;listIter li;listRewind(server.clients,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (c->flags & CLIENT_BLOCKED) {/* PAUSED clients are an exception, when they'll be unblocked, the* command processing will start from scratch, and the command will* be either executed or rejected. (unlike LIST blocked clients for* which the command is already in progress in a way. */if (c->btype == BLOCKED_PAUSE)continue;addReplyError(c,"-UNBLOCKED force unblock from blocking operation, ""instance state changed (master -> replica?)");unblockClient(c);c->flags |= CLIENT_CLOSE_AFTER_REPLY;}}
}/* Helper function for handleClientsBlockedOnKeys(). This function is called* when there may be clients blocked on a list key, and there may be new* data to fetch (the key is ready). */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {/* We serve clients in the same order they blocked for* this key, from the first blocked to the last. */dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);if (de) {list *clients = dictGetVal(de);int numclients = listLength(clients);while(numclients--) {listNode *clientnode = listFirst(clients);client *receiver = clientnode->value;if (receiver->btype != BLOCKED_LIST) {/* Put at the tail, so that at the next call* we'll not run into it again. */listRotateHeadToTail(clients);continue;}robj *dstkey = receiver->bpop.target;int wherefrom = receiver->bpop.listpos.wherefrom;int whereto = receiver->bpop.listpos.whereto;robj *value = listTypePop(o, wherefrom);if (value) {/* Protect receiver->bpop.target, that will be* freed by the next unblockClient()* call. */if (dstkey) incrRefCount(dstkey);monotime replyTimer;elapsedStart(&replyTimer);if (serveClientBlockedOnList(receiver,rl->key,dstkey,rl->db,value,wherefrom, whereto) == C_ERR){/* If we failed serving the client we need* to also undo the POP operation. */listTypePush(o,value,wherefrom);}updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));unblockClient(receiver);if (dstkey) decrRefCount(dstkey);decrRefCount(value);} else {break;}}}if (listTypeLength(o) == 0) {dbDelete(rl->db,rl->key);notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);}/* We don't call signalModifiedKey() as it was already called* when an element was pushed on the list. */
}/* Helper function for handleClientsBlockedOnKeys(). This function is called* when there may be clients blocked on a sorted set key, and there may be new* data to fetch (the key is ready). */
void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {/* We serve clients in the same order they blocked for* this key, from the first blocked to the last. */dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);if (de) {list *clients = dictGetVal(de);int numclients = listLength(clients);unsigned long zcard = zsetLength(o);while(numclients-- && zcard) {listNode *clientnode = listFirst(clients);client *receiver = clientnode->value;if (receiver->btype != BLOCKED_ZSET) {/* Put at the tail, so that at the next call* we'll not run into it again. */listRotateHeadToTail(clients);continue;}int where = (receiver->lastcmd &&receiver->lastcmd->proc == bzpopminCommand)? ZSET_MIN : ZSET_MAX;monotime replyTimer;elapsedStart(&replyTimer);genericZpopCommand(receiver,&rl->key,1,where,1,NULL);updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));unblockClient(receiver);zcard--;/* Replicate the command. */robj *argv[2];struct redisCommand *cmd = where == ZSET_MIN ?server.zpopminCommand :server.zpopmaxCommand;argv[0] = createStringObject(cmd->name,strlen(cmd->name));argv[1] = rl->key;incrRefCount(rl->key);propagate(cmd,receiver->db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);decrRefCount(argv[0]);decrRefCount(argv[1]);}}
}/* Helper function for handleClientsBlockedOnKeys(). This function is called* when there may be clients blocked on a stream key, and there may be new* data to fetch (the key is ready). */
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);stream *s = o->ptr;/* We need to provide the new data arrived on the stream* to all the clients that are waiting for an offset smaller* than the current top item. */if (de) {list *clients = dictGetVal(de);listNode *ln;listIter li;listRewind(clients,&li);while((ln = listNext(&li))) {client *receiver = listNodeValue(ln);if (receiver->btype != BLOCKED_STREAM) continue;bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);streamID *gt = &bki->stream_id;/* If we blocked in the context of a consumer* group, we need to resolve the group and update the* last ID the client is blocked for: this is needed* because serving other clients in the same consumer* group will alter the "last ID" of the consumer* group, and clients blocked in a consumer group are* always blocked for the ">" ID: we need to deliver* only new messages and avoid unblocking the client* otherwise. */streamCG *group = NULL;if (receiver->bpop.xread_group) {group = streamLookupCG(s,receiver->bpop.xread_group->ptr);/* If the group was not found, send an error* to the consumer. */if (!group) {addReplyError(receiver,"-NOGROUP the consumer group this client ""was blocked on no longer exists");unblockClient(receiver);continue;} else {*gt = group->last_id;}}if (streamCompareID(&s->last_id, gt) > 0) {streamID start = *gt;streamIncrID(&start);/* Lookup the consumer for the group, if any. */streamConsumer *consumer = NULL;int noack = 0;if (group) {int created = 0;consumer =streamLookupConsumer(group,receiver->bpop.xread_consumer->ptr,SLC_NONE,&created);noack = receiver->bpop.xread_group_noack;if (created && noack) {streamPropagateConsumerCreation(receiver,rl->key,receiver->bpop.xread_group,consumer->name);}}monotime replyTimer;elapsedStart(&replyTimer);/* Emit the two elements sub-array consisting of* the name of the stream and the data we* extracted from it. Wrapped in a single-item* array, since we have just one key. */if (receiver->resp == 2) {addReplyArrayLen(receiver,1);addReplyArrayLen(receiver,2);} else {addReplyMapLen(receiver,1);}addReplyBulk(receiver,rl->key);streamPropInfo pi = {rl->key,receiver->bpop.xread_group};streamReplyWithRange(receiver,s,&start,NULL,receiver->bpop.xread_count,0, group, consumer, noack, &pi);updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));/* Note that after we unblock the client, 'gt'* and other receiver->bpop stuff are no longer* valid, so we must do the setup above before* this call. */unblockClient(receiver);}}}
}/* Helper function for handleClientsBlockedOnKeys(). This function is called* in order to check if we can serve clients blocked by modules using* RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:* our goal here is to call the RedisModuleBlockedClient reply() callback to* see if the key is really able to serve the client, and in that case,* unblock it. */
void serveClientsBlockedOnKeyByModule(readyList *rl) {dictEntry *de;/* Optimization: If no clients are in type BLOCKED_MODULE,* we can skip this loop. */if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;/* We serve clients in the same order they blocked for* this key, from the first blocked to the last. */de = dictFind(rl->db->blocking_keys,rl->key);if (de) {list *clients = dictGetVal(de);int numclients = listLength(clients);while(numclients--) {listNode *clientnode = listFirst(clients);client *receiver = clientnode->value;/* Put at the tail, so that at the next call* we'll not run into it again: clients here may not be* ready to be served, so they'll remain in the list* sometimes. We want also be able to skip clients that are* not blocked for the MODULE type safely. */listRotateHeadToTail(clients);if (receiver->btype != BLOCKED_MODULE) continue;/* Note that if *this* client cannot be served by this key,* it does not mean that another client that is next into the* list cannot be served as well: they may be blocked by* different modules with different triggers to consider if a key* is ready or not. This means we can't exit the loop but need* to continue after the first failure. */monotime replyTimer;elapsedStart(&replyTimer);if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));moduleUnblockClient(receiver);}}
}/* This function should be called by Redis every time a single command,* a MULTI/EXEC block, or a Lua script, terminated its execution after* being called by a client. It handles serving clients blocked in* lists, streams, and sorted sets, via a blocking commands.** All the keys with at least one client blocked that received at least* one new element via some write operation are accumulated into* the server.ready_keys list. This function will run the list and will* serve clients accordingly. Note that the function will iterate again and* again as a result of serving BLMOVE we can have new blocking clients* to serve because of the PUSH side of BLMOVE.** This function is normally "fair", that is, it will server clients* using a FIFO behavior. However this fairness is violated in certain* edge cases, that is, when we have clients blocked at the same time* in a sorted set and in a list, for the same key (a very odd thing to* do client side, indeed!). Because mismatching clients (blocking for* a different type compared to the current key type) are moved in the* other side of the linked list. However as long as the key starts to* be used only for a single type, like virtually any Redis application will* do, the function is already fair. */
void handleClientsBlockedOnKeys(void) {while(listLength(server.ready_keys) != 0) {list *l;/* Point server.ready_keys to a fresh list and save the current one* locally. This way as we run the old list we are free to call* signalKeyAsReady() that may push new elements in server.ready_keys* when handling clients blocked into BLMOVE. */l = server.ready_keys;server.ready_keys = listCreate();while(listLength(l) != 0) {listNode *ln = listFirst(l);readyList *rl = ln->value;/* First of all remove this key from db->ready_keys so that* we can safely call signalKeyAsReady() against this key. */dictDelete(rl->db->ready_keys,rl->key);/* Even if we are not inside call(), increment the call depth* in order to make sure that keys are expired against a fixed* reference time, and not against the wallclock time. This* way we can lookup an object multiple times (BLMOVE does* that) without the risk of it being freed in the second* lookup, invalidating the first one.* See https://github.com/redis/redis/pull/6554. */server.fixed_time_expire++;updateCachedTime(0);/* Serve clients blocked on the key. */robj *o = lookupKeyWrite(rl->db,rl->key);if (o != NULL) {if (o->type == OBJ_LIST)serveClientsBlockedOnListKey(o,rl);else if (o->type == OBJ_ZSET)serveClientsBlockedOnSortedSetKey(o,rl);else if (o->type == OBJ_STREAM)serveClientsBlockedOnStreamKey(o,rl);/* We want to serve clients blocked on module keys* regardless of the object type: we don't know what the* module is trying to accomplish right now. */serveClientsBlockedOnKeyByModule(rl);}server.fixed_time_expire--;/* Free this item. */decrRefCount(rl->key);zfree(rl);listDelNode(l,ln);}listRelease(l); /* We have the new list on place at this point. */}
}/* This is how the current blocking lists/sorted sets/streams work, we use* BLPOP as example, but the concept is the same for other list ops, sorted* sets and XREAD.* - If the user calls BLPOP and the key exists and contains a non empty list*   then LPOP is called instead. So BLPOP is semantically the same as LPOP*   if blocking is not required.* - If instead BLPOP is called and the key does not exists or the list is*   empty we need to block. In order to do so we remove the notification for*   new data to read in the client socket (so that we'll not serve new*   requests if the blocking request is not served). Also we put the client*   in a dictionary (db->blocking_keys) mapping keys to a list of clients*   blocking for this keys.* - If a PUSH operation against a key with blocked clients waiting is*   performed, we mark this key as "ready", and after the current command,*   MULTI/EXEC block, or script, is executed, we serve all the clients waiting*   for this list, from the one that blocked first, to the last, accordingly*   to the number of elements we have in the ready list.*//* Set a client in blocking mode for the specified key (list, zset or stream),* with the specified timeout. The 'type' argument is BLOCKED_LIST,* BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are* waiting for an empty key in order to awake the client. The client is blocked* for all the 'numkeys' keys as in the 'keys' argument. When we block for* stream keys, we also provide an array of streamID structures: clients will* be unblocked only when items with an ID greater or equal to the specified* one is appended to the stream. */
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {dictEntry *de;list *l;int j;c->bpop.timeout = timeout;c->bpop.target = target;if (listpos != NULL) c->bpop.listpos = *listpos;if (target != NULL) incrRefCount(target);for (j = 0; j < numkeys; j++) {/* Allocate our bkinfo structure, associated to each key the client* is blocked for. */bkinfo *bki = zmalloc(sizeof(*bki));if (btype == BLOCKED_STREAM)bki->stream_id = ids[j];/* If the key already exists in the dictionary ignore it. */if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {zfree(bki);continue;}incrRefCount(keys[j]);/* And in the other "side", to map keys -> clients */de = dictFind(c->db->blocking_keys,keys[j]);if (de == NULL) {int retval;/* For every key we take a list of clients blocked for it */l = listCreate();retval = dictAdd(c->db->blocking_keys,keys[j],l);incrRefCount(keys[j]);serverAssertWithInfo(c,keys[j],retval == DICT_OK);} else {l = dictGetVal(de);}listAddNodeTail(l,c);bki->listnode = listLast(l);}blockClient(c,btype);
}/* Unblock a client that's waiting in a blocking operation such as BLPOP.* You should never call this function directly, but unblockClient() instead. */
void unblockClientWaitingData(client *c) {dictEntry *de;dictIterator *di;list *l;serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);di = dictGetIterator(c->bpop.keys);/* The client may wait for multiple keys, so unblock it for every key. */while((de = dictNext(di)) != NULL) {robj *key = dictGetKey(de);bkinfo *bki = dictGetVal(de);/* Remove this client from the list of clients waiting for this key. */l = dictFetchValue(c->db->blocking_keys,key);serverAssertWithInfo(c,key,l != NULL);listDelNode(l,bki->listnode);/* If the list is empty we need to remove it to avoid wasting memory */if (listLength(l) == 0)dictDelete(c->db->blocking_keys,key);}dictReleaseIterator(di);/* Cleanup the client structure */dictEmpty(c->bpop.keys,NULL);if (c->bpop.target) {decrRefCount(c->bpop.target);c->bpop.target = NULL;}if (c->bpop.xread_group) {decrRefCount(c->bpop.xread_group);decrRefCount(c->bpop.xread_consumer);c->bpop.xread_group = NULL;c->bpop.xread_consumer = NULL;}
}

这篇关于漫话Redis源码之七十六的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/683876

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

Redis中使用布隆过滤器解决缓存穿透问题

一、缓存穿透(失效)问题 缓存穿透是指查询一个一定不存在的数据,由于缓存中没有命中,会去数据库中查询,而数据库中也没有该数据,并且每次查询都不会命中缓存,从而每次请求都直接打到了数据库上,这会给数据库带来巨大压力。 二、布隆过滤器原理 布隆过滤器(Bloom Filter)是一种空间效率很高的随机数据结构,它利用多个不同的哈希函数将一个元素映射到一个位数组中的多个位置,并将这些位置的值置

red5-server源码

red5-server源码:https://github.com/Red5/red5-server