漫话Redis源码之七十三

2024-02-06 09:38
文章标签 源码 redis 漫话 七十三

本文主要是介绍漫话Redis源码之七十三,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

加载配置文件中的信息,非重点:

/* Load the cluster config from 'filename'.** If the file does not exist or is zero-length (this may happen because* when we lock the nodes.conf file, we create a zero-length one for the* sake of locking if it does not already exist), C_ERR is returned.* If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {FILE *fp = fopen(filename,"r");struct stat sb;char *line;int maxline, j;if (fp == NULL) {if (errno == ENOENT) {return C_ERR;} else {serverLog(LL_WARNING,"Loading the cluster node config from %s: %s",filename, strerror(errno));exit(1);}}/* Check if the file is zero-length: if so return C_ERR to signal* we have to write the config. */if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {fclose(fp);return C_ERR;}/* Parse the file. Note that single lines of the cluster config file can* be really long as they include all the hash slots of the node.* This means in the worst possible case, half of the Redis slots will be* present in a single line, possibly in importing or migrating state, so* together with the node ID of the sender/receiver.** To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */maxline = 1024+CLUSTER_SLOTS*128;line = zmalloc(maxline);while(fgets(line,maxline,fp) != NULL) {int argc;sds *argv;clusterNode *n, *master;char *p, *s;/* Skip blank lines, they can be created either by users manually* editing nodes.conf or by the config writing process if stopped* before the truncate() call. */if (line[0] == '\n' || line[0] == '\0') continue;/* Split the line into arguments for processing. */argv = sdssplitargs(line,&argc);if (argv == NULL) goto fmterr;/* Handle the special "vars" line. Don't pretend it is the last* line even if it actually is when generated by Redis. */if (strcasecmp(argv[0],"vars") == 0) {if (!(argc % 2)) goto fmterr;for (j = 1; j < argc; j += 2) {if (strcasecmp(argv[j],"currentEpoch") == 0) {server.cluster->currentEpoch =strtoull(argv[j+1],NULL,10);} else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {server.cluster->lastVoteEpoch =strtoull(argv[j+1],NULL,10);} else {serverLog(LL_WARNING,"Skipping unknown cluster config variable '%s'",argv[j]);}}sdsfreesplitres(argv,argc);continue;}/* Regular config lines have at least eight fields */if (argc < 8) {sdsfreesplitres(argv,argc);goto fmterr;}/* Create this node if it does not exist */n = clusterLookupNode(argv[0]);if (!n) {n = createClusterNode(argv[0],0);clusterAddNode(n);}/* Address and port */if ((p = strrchr(argv[1],':')) == NULL) {sdsfreesplitres(argv,argc);goto fmterr;}*p = '\0';memcpy(n->ip,argv[1],strlen(argv[1])+1);char *port = p+1;char *busp = strchr(port,'@');if (busp) {*busp = '\0';busp++;}n->port = atoi(port);/* In older versions of nodes.conf the "@busport" part is missing.* In this case we set it to the default offset of 10000 from the* base port. */n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;/* The plaintext port for client in a TLS cluster (n->pport) is not* stored in nodes.conf. It is received later over the bus protocol. *//* Parse flags */p = s = argv[2];while(p) {p = strchr(s,',');if (p) *p = '\0';if (!strcasecmp(s,"myself")) {serverAssert(server.cluster->myself == NULL);myself = server.cluster->myself = n;n->flags |= CLUSTER_NODE_MYSELF;} else if (!strcasecmp(s,"master")) {n->flags |= CLUSTER_NODE_MASTER;} else if (!strcasecmp(s,"slave")) {n->flags |= CLUSTER_NODE_SLAVE;} else if (!strcasecmp(s,"fail?")) {n->flags |= CLUSTER_NODE_PFAIL;} else if (!strcasecmp(s,"fail")) {n->flags |= CLUSTER_NODE_FAIL;n->fail_time = mstime();} else if (!strcasecmp(s,"handshake")) {n->flags |= CLUSTER_NODE_HANDSHAKE;} else if (!strcasecmp(s,"noaddr")) {n->flags |= CLUSTER_NODE_NOADDR;} else if (!strcasecmp(s,"nofailover")) {n->flags |= CLUSTER_NODE_NOFAILOVER;} else if (!strcasecmp(s,"noflags")) {/* nothing to do */} else {serverPanic("Unknown flag in redis cluster config file");}if (p) s = p+1;}/* Get master if any. Set the master and populate master's* slave list. */if (argv[3][0] != '-') {master = clusterLookupNode(argv[3]);if (!master) {master = createClusterNode(argv[3],0);clusterAddNode(master);}n->slaveof = master;clusterNodeAddSlave(master,n);}/* Set ping sent / pong received timestamps */if (atoi(argv[4])) n->ping_sent = mstime();if (atoi(argv[5])) n->pong_received = mstime();/* Set configEpoch for this node. */n->configEpoch = strtoull(argv[6],NULL,10);/* Populate hash slots served by this instance. */for (j = 8; j < argc; j++) {int start, stop;if (argv[j][0] == '[') {/* Here we handle migrating / importing slots */int slot;char direction;clusterNode *cn;p = strchr(argv[j],'-');serverAssert(p != NULL);*p = '\0';direction = p[1]; /* Either '>' or '<' */slot = atoi(argv[j]+1);if (slot < 0 || slot >= CLUSTER_SLOTS) {sdsfreesplitres(argv,argc);goto fmterr;}p += 3;cn = clusterLookupNode(p);if (!cn) {cn = createClusterNode(p,0);clusterAddNode(cn);}if (direction == '>') {server.cluster->migrating_slots_to[slot] = cn;} else {server.cluster->importing_slots_from[slot] = cn;}continue;} else if ((p = strchr(argv[j],'-')) != NULL) {*p = '\0';start = atoi(argv[j]);stop = atoi(p+1);} else {start = stop = atoi(argv[j]);}if (start < 0 || start >= CLUSTER_SLOTS ||stop < 0 || stop >= CLUSTER_SLOTS){sdsfreesplitres(argv,argc);goto fmterr;}while(start <= stop) clusterAddSlot(n, start++);}sdsfreesplitres(argv,argc);}/* Config sanity check */if (server.cluster->myself == NULL) goto fmterr;zfree(line);fclose(fp);serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);/* Something that should never happen: currentEpoch smaller than* the max epoch found in the nodes configuration. However we handle this* as some form of protection against manual editing of critical files. */if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {server.cluster->currentEpoch = clusterGetMaxEpoch();}return C_OK;fmterr:serverLog(LL_WARNING,"Unrecoverable error: corrupted cluster config file.");zfree(line);if (fp) fclose(fp);exit(1);
}/* Cluster node configuration is exactly the same as CLUSTER NODES output.** This function writes the node config and returns 0, on error -1* is returned.** Note: we need to write the file in an atomic way from the point of view* of the POSIX filesystem semantics, so that if the server is stopped* or crashes during the write, we'll end with either the old file or the* new one. Since we have the full payload to write available we can use* a single write to write the whole file. If the pre-existing file was* bigger we pad our payload with newlines that are anyway ignored and truncate* the file afterward. */
int clusterSaveConfig(int do_fsync) {sds ci;size_t content_size;struct stat sb;int fd;server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;/* Get the nodes description and concatenate our "vars" directive to* save currentEpoch and lastVoteEpoch. */ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",(unsigned long long) server.cluster->currentEpoch,(unsigned long long) server.cluster->lastVoteEpoch);content_size = sdslen(ci);if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))== -1) goto err;/* Pad the new payload if the existing file length is greater. */if (fstat(fd,&sb) != -1) {if (sb.st_size > (off_t)content_size) {ci = sdsgrowzero(ci,sb.st_size);memset(ci+content_size,'\n',sb.st_size-content_size);}}if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;if (do_fsync) {server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;if (fsync(fd) == -1) goto err;}/* Truncate the file if needed to remove the final \n padding that* is just garbage. */if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {/* ftruncate() failing is not a critical error. */}close(fd);sdsfree(ci);return 0;err:if (fd != -1) close(fd);sdsfree(ci);return -1;
}void clusterSaveConfigOrDie(int do_fsync) {if (clusterSaveConfig(do_fsync) == -1) {serverLog(LL_WARNING,"Fatal: can't update cluster config file.");exit(1);}
}/* Lock the cluster config using flock(), and leaks the file descriptor used to* acquire the lock so that the file will be locked forever.** This works because we always update nodes.conf with a new version* in-place, reopening the file, and writing to it in place (later adjusting* the length with ftruncate()).** On success C_OK is returned, otherwise an error is logged and* the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris* and a fcntl-based solution won't help, as we constantly re-open that file,* which will release _all_ locks anyway*/
#if !defined(__sun)/* To lock it, we need to open the file in a way it is created if* it does not exist, otherwise there is a race condition with other* processes. */int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);if (fd == -1) {serverLog(LL_WARNING,"Can't open %s in order to acquire a lock: %s",filename, strerror(errno));return C_ERR;}if (flock(fd,LOCK_EX|LOCK_NB) == -1) {if (errno == EWOULDBLOCK) {serverLog(LL_WARNING,"Sorry, the cluster configuration file %s is already used ""by a different Redis Cluster node. Please make sure that ""different nodes use different cluster configuration ""files.", filename);} else {serverLog(LL_WARNING,"Impossible to lock %s: %s", filename, strerror(errno));}close(fd);return C_ERR;}/* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the* lock to the file as long as the process exists.** After fork, the child process will get the fd opened by the parent process,* we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),* it will be closed in the child process.* If it is not closed, when the main process is killed -9, but the child process* (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the* child process, and the main process will fail to get lock, means fail to start. */server.cluster_config_file_lock_fd = fd;
#elseUNUSED(filename);
#endif /* __sun */return C_OK;
}/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,int *announced_cport) {int port = server.tls_cluster ? server.tls_port : server.port;/* Default announced ports. */*announced_port = port;*announced_pport = server.tls_cluster ? server.port : 0;*announced_cport = port + CLUSTER_PORT_INCR;/* Config overriding announced ports. */if (server.tls_cluster && server.cluster_announce_tls_port) {*announced_port = server.cluster_announce_tls_port;*announced_pport = server.cluster_announce_port;} else if (server.cluster_announce_port) {*announced_port = server.cluster_announce_port;}if (server.cluster_announce_bus_port) {*announced_cport = server.cluster_announce_bus_port;}
}/* Some flags (currently just the NOFAILOVER flag) may need to be updated* in the "myself" node based on the current configuration of the node,* that may change at runtime via CONFIG SET. This function changes the* set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {int oldflags = myself->flags;int nofailover = server.cluster_slave_no_failover ?CLUSTER_NODE_NOFAILOVER : 0;myself->flags &= ~CLUSTER_NODE_NOFAILOVER;myself->flags |= nofailover;if (myself->flags != oldflags) {clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);}
}void clusterInit(void) {int saveconf = 0;server.cluster = zmalloc(sizeof(clusterState));server.cluster->myself = NULL;server.cluster->currentEpoch = 0;server.cluster->state = CLUSTER_FAIL;server.cluster->size = 1;server.cluster->todo_before_sleep = 0;server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);server.cluster->nodes_black_list =dictCreate(&clusterNodesBlackListDictType,NULL);server.cluster->failover_auth_time = 0;server.cluster->failover_auth_count = 0;server.cluster->failover_auth_rank = 0;server.cluster->failover_auth_epoch = 0;server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;server.cluster->lastVoteEpoch = 0;for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {server.cluster->stats_bus_messages_sent[i] = 0;server.cluster->stats_bus_messages_received[i] = 0;}server.cluster->stats_pfail_nodes = 0;memset(server.cluster->slots,0, sizeof(server.cluster->slots));clusterCloseAllSlots();/* Lock the cluster config file to make sure every node uses* its own nodes.conf. */server.cluster_config_file_lock_fd = -1;if (clusterLockConfig(server.cluster_configfile) == C_ERR)exit(1);/* Load or create a new nodes configuration. */if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {/* No configuration found. We will just use the random name provided* by the createClusterNode() function. */myself = server.cluster->myself =createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",myself->name);clusterAddNode(myself);saveconf = 1;}if (saveconf) clusterSaveConfigOrDie(1);/* We need a listening TCP port for our cluster messaging needs. */server.cfd.count = 0;/* Port sanity check II* The other handshake port check is triggered too late to stop* us from trying to use a too-high cluster port number. */int port = server.tls_cluster ? server.tls_port : server.port;if (port > (65535-CLUSTER_PORT_INCR)) {serverLog(LL_WARNING, "Redis port number too high. ""Cluster communication port is 10,000 port ""numbers higher than your Redis port. ""Your Redis port number must be 55535 or less.");exit(1);}if (listenToPort(port+CLUSTER_PORT_INCR, &server.cfd) == C_ERR) {exit(1);}if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");}/* The slots -> keys map is a radix tree. Initialize it here. */server.cluster->slots_to_keys = raxNew();memset(server.cluster->slots_keys_count,0,sizeof(server.cluster->slots_keys_count));/* Set myself->port/cport/pport to my listening ports, we'll just need to* discover the IP address via MEET messages. */deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);server.cluster->mf_end = 0;resetManualFailover();clusterUpdateMyselfFlags();
}

这篇关于漫话Redis源码之七十三的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/683874

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

Redis中使用布隆过滤器解决缓存穿透问题

一、缓存穿透(失效)问题 缓存穿透是指查询一个一定不存在的数据,由于缓存中没有命中,会去数据库中查询,而数据库中也没有该数据,并且每次查询都不会命中缓存,从而每次请求都直接打到了数据库上,这会给数据库带来巨大压力。 二、布隆过滤器原理 布隆过滤器(Bloom Filter)是一种空间效率很高的随机数据结构,它利用多个不同的哈希函数将一个元素映射到一个位数组中的多个位置,并将这些位置的值置

red5-server源码

red5-server源码:https://github.com/Red5/red5-server