本文主要是介绍漫话Redis源码之七十三,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
加载配置文件中的信息,非重点:
/* Load the cluster config from 'filename'.** If the file does not exist or is zero-length (this may happen because* when we lock the nodes.conf file, we create a zero-length one for the* sake of locking if it does not already exist), C_ERR is returned.* If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {FILE *fp = fopen(filename,"r");struct stat sb;char *line;int maxline, j;if (fp == NULL) {if (errno == ENOENT) {return C_ERR;} else {serverLog(LL_WARNING,"Loading the cluster node config from %s: %s",filename, strerror(errno));exit(1);}}/* Check if the file is zero-length: if so return C_ERR to signal* we have to write the config. */if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {fclose(fp);return C_ERR;}/* Parse the file. Note that single lines of the cluster config file can* be really long as they include all the hash slots of the node.* This means in the worst possible case, half of the Redis slots will be* present in a single line, possibly in importing or migrating state, so* together with the node ID of the sender/receiver.** To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */maxline = 1024+CLUSTER_SLOTS*128;line = zmalloc(maxline);while(fgets(line,maxline,fp) != NULL) {int argc;sds *argv;clusterNode *n, *master;char *p, *s;/* Skip blank lines, they can be created either by users manually* editing nodes.conf or by the config writing process if stopped* before the truncate() call. */if (line[0] == '\n' || line[0] == '\0') continue;/* Split the line into arguments for processing. */argv = sdssplitargs(line,&argc);if (argv == NULL) goto fmterr;/* Handle the special "vars" line. Don't pretend it is the last* line even if it actually is when generated by Redis. */if (strcasecmp(argv[0],"vars") == 0) {if (!(argc % 2)) goto fmterr;for (j = 1; j < argc; j += 2) {if (strcasecmp(argv[j],"currentEpoch") == 0) {server.cluster->currentEpoch =strtoull(argv[j+1],NULL,10);} else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {server.cluster->lastVoteEpoch =strtoull(argv[j+1],NULL,10);} else {serverLog(LL_WARNING,"Skipping unknown cluster config variable '%s'",argv[j]);}}sdsfreesplitres(argv,argc);continue;}/* Regular config lines have at least eight fields */if (argc < 8) {sdsfreesplitres(argv,argc);goto fmterr;}/* Create this node if it does not exist */n = clusterLookupNode(argv[0]);if (!n) {n = createClusterNode(argv[0],0);clusterAddNode(n);}/* Address and port */if ((p = strrchr(argv[1],':')) == NULL) {sdsfreesplitres(argv,argc);goto fmterr;}*p = '\0';memcpy(n->ip,argv[1],strlen(argv[1])+1);char *port = p+1;char *busp = strchr(port,'@');if (busp) {*busp = '\0';busp++;}n->port = atoi(port);/* In older versions of nodes.conf the "@busport" part is missing.* In this case we set it to the default offset of 10000 from the* base port. */n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;/* The plaintext port for client in a TLS cluster (n->pport) is not* stored in nodes.conf. It is received later over the bus protocol. *//* Parse flags */p = s = argv[2];while(p) {p = strchr(s,',');if (p) *p = '\0';if (!strcasecmp(s,"myself")) {serverAssert(server.cluster->myself == NULL);myself = server.cluster->myself = n;n->flags |= CLUSTER_NODE_MYSELF;} else if (!strcasecmp(s,"master")) {n->flags |= CLUSTER_NODE_MASTER;} else if (!strcasecmp(s,"slave")) {n->flags |= CLUSTER_NODE_SLAVE;} else if (!strcasecmp(s,"fail?")) {n->flags |= CLUSTER_NODE_PFAIL;} else if (!strcasecmp(s,"fail")) {n->flags |= CLUSTER_NODE_FAIL;n->fail_time = mstime();} else if (!strcasecmp(s,"handshake")) {n->flags |= CLUSTER_NODE_HANDSHAKE;} else if (!strcasecmp(s,"noaddr")) {n->flags |= CLUSTER_NODE_NOADDR;} else if (!strcasecmp(s,"nofailover")) {n->flags |= CLUSTER_NODE_NOFAILOVER;} else if (!strcasecmp(s,"noflags")) {/* nothing to do */} else {serverPanic("Unknown flag in redis cluster config file");}if (p) s = p+1;}/* Get master if any. Set the master and populate master's* slave list. */if (argv[3][0] != '-') {master = clusterLookupNode(argv[3]);if (!master) {master = createClusterNode(argv[3],0);clusterAddNode(master);}n->slaveof = master;clusterNodeAddSlave(master,n);}/* Set ping sent / pong received timestamps */if (atoi(argv[4])) n->ping_sent = mstime();if (atoi(argv[5])) n->pong_received = mstime();/* Set configEpoch for this node. */n->configEpoch = strtoull(argv[6],NULL,10);/* Populate hash slots served by this instance. */for (j = 8; j < argc; j++) {int start, stop;if (argv[j][0] == '[') {/* Here we handle migrating / importing slots */int slot;char direction;clusterNode *cn;p = strchr(argv[j],'-');serverAssert(p != NULL);*p = '\0';direction = p[1]; /* Either '>' or '<' */slot = atoi(argv[j]+1);if (slot < 0 || slot >= CLUSTER_SLOTS) {sdsfreesplitres(argv,argc);goto fmterr;}p += 3;cn = clusterLookupNode(p);if (!cn) {cn = createClusterNode(p,0);clusterAddNode(cn);}if (direction == '>') {server.cluster->migrating_slots_to[slot] = cn;} else {server.cluster->importing_slots_from[slot] = cn;}continue;} else if ((p = strchr(argv[j],'-')) != NULL) {*p = '\0';start = atoi(argv[j]);stop = atoi(p+1);} else {start = stop = atoi(argv[j]);}if (start < 0 || start >= CLUSTER_SLOTS ||stop < 0 || stop >= CLUSTER_SLOTS){sdsfreesplitres(argv,argc);goto fmterr;}while(start <= stop) clusterAddSlot(n, start++);}sdsfreesplitres(argv,argc);}/* Config sanity check */if (server.cluster->myself == NULL) goto fmterr;zfree(line);fclose(fp);serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);/* Something that should never happen: currentEpoch smaller than* the max epoch found in the nodes configuration. However we handle this* as some form of protection against manual editing of critical files. */if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {server.cluster->currentEpoch = clusterGetMaxEpoch();}return C_OK;fmterr:serverLog(LL_WARNING,"Unrecoverable error: corrupted cluster config file.");zfree(line);if (fp) fclose(fp);exit(1);
}/* Cluster node configuration is exactly the same as CLUSTER NODES output.** This function writes the node config and returns 0, on error -1* is returned.** Note: we need to write the file in an atomic way from the point of view* of the POSIX filesystem semantics, so that if the server is stopped* or crashes during the write, we'll end with either the old file or the* new one. Since we have the full payload to write available we can use* a single write to write the whole file. If the pre-existing file was* bigger we pad our payload with newlines that are anyway ignored and truncate* the file afterward. */
int clusterSaveConfig(int do_fsync) {sds ci;size_t content_size;struct stat sb;int fd;server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;/* Get the nodes description and concatenate our "vars" directive to* save currentEpoch and lastVoteEpoch. */ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE, 0);ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",(unsigned long long) server.cluster->currentEpoch,(unsigned long long) server.cluster->lastVoteEpoch);content_size = sdslen(ci);if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))== -1) goto err;/* Pad the new payload if the existing file length is greater. */if (fstat(fd,&sb) != -1) {if (sb.st_size > (off_t)content_size) {ci = sdsgrowzero(ci,sb.st_size);memset(ci+content_size,'\n',sb.st_size-content_size);}}if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;if (do_fsync) {server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;if (fsync(fd) == -1) goto err;}/* Truncate the file if needed to remove the final \n padding that* is just garbage. */if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {/* ftruncate() failing is not a critical error. */}close(fd);sdsfree(ci);return 0;err:if (fd != -1) close(fd);sdsfree(ci);return -1;
}void clusterSaveConfigOrDie(int do_fsync) {if (clusterSaveConfig(do_fsync) == -1) {serverLog(LL_WARNING,"Fatal: can't update cluster config file.");exit(1);}
}/* Lock the cluster config using flock(), and leaks the file descriptor used to* acquire the lock so that the file will be locked forever.** This works because we always update nodes.conf with a new version* in-place, reopening the file, and writing to it in place (later adjusting* the length with ftruncate()).** On success C_OK is returned, otherwise an error is logged and* the function returns C_ERR to signal a lock was not acquired. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris* and a fcntl-based solution won't help, as we constantly re-open that file,* which will release _all_ locks anyway*/
#if !defined(__sun)/* To lock it, we need to open the file in a way it is created if* it does not exist, otherwise there is a race condition with other* processes. */int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);if (fd == -1) {serverLog(LL_WARNING,"Can't open %s in order to acquire a lock: %s",filename, strerror(errno));return C_ERR;}if (flock(fd,LOCK_EX|LOCK_NB) == -1) {if (errno == EWOULDBLOCK) {serverLog(LL_WARNING,"Sorry, the cluster configuration file %s is already used ""by a different Redis Cluster node. Please make sure that ""different nodes use different cluster configuration ""files.", filename);} else {serverLog(LL_WARNING,"Impossible to lock %s: %s", filename, strerror(errno));}close(fd);return C_ERR;}/* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the* lock to the file as long as the process exists.** After fork, the child process will get the fd opened by the parent process,* we need save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),* it will be closed in the child process.* If it is not closed, when the main process is killed -9, but the child process* (redis-aof-rewrite) is still alive, the fd(lock) will still be held by the* child process, and the main process will fail to get lock, means fail to start. */server.cluster_config_file_lock_fd = fd;
#elseUNUSED(filename);
#endif /* __sun */return C_OK;
}/* Derives our ports to be announced in the cluster bus. */
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,int *announced_cport) {int port = server.tls_cluster ? server.tls_port : server.port;/* Default announced ports. */*announced_port = port;*announced_pport = server.tls_cluster ? server.port : 0;*announced_cport = port + CLUSTER_PORT_INCR;/* Config overriding announced ports. */if (server.tls_cluster && server.cluster_announce_tls_port) {*announced_port = server.cluster_announce_tls_port;*announced_pport = server.cluster_announce_port;} else if (server.cluster_announce_port) {*announced_port = server.cluster_announce_port;}if (server.cluster_announce_bus_port) {*announced_cport = server.cluster_announce_bus_port;}
}/* Some flags (currently just the NOFAILOVER flag) may need to be updated* in the "myself" node based on the current configuration of the node,* that may change at runtime via CONFIG SET. This function changes the* set of flags in myself->flags accordingly. */
void clusterUpdateMyselfFlags(void) {int oldflags = myself->flags;int nofailover = server.cluster_slave_no_failover ?CLUSTER_NODE_NOFAILOVER : 0;myself->flags &= ~CLUSTER_NODE_NOFAILOVER;myself->flags |= nofailover;if (myself->flags != oldflags) {clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);}
}void clusterInit(void) {int saveconf = 0;server.cluster = zmalloc(sizeof(clusterState));server.cluster->myself = NULL;server.cluster->currentEpoch = 0;server.cluster->state = CLUSTER_FAIL;server.cluster->size = 1;server.cluster->todo_before_sleep = 0;server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);server.cluster->nodes_black_list =dictCreate(&clusterNodesBlackListDictType,NULL);server.cluster->failover_auth_time = 0;server.cluster->failover_auth_count = 0;server.cluster->failover_auth_rank = 0;server.cluster->failover_auth_epoch = 0;server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;server.cluster->lastVoteEpoch = 0;for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {server.cluster->stats_bus_messages_sent[i] = 0;server.cluster->stats_bus_messages_received[i] = 0;}server.cluster->stats_pfail_nodes = 0;memset(server.cluster->slots,0, sizeof(server.cluster->slots));clusterCloseAllSlots();/* Lock the cluster config file to make sure every node uses* its own nodes.conf. */server.cluster_config_file_lock_fd = -1;if (clusterLockConfig(server.cluster_configfile) == C_ERR)exit(1);/* Load or create a new nodes configuration. */if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {/* No configuration found. We will just use the random name provided* by the createClusterNode() function. */myself = server.cluster->myself =createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",myself->name);clusterAddNode(myself);saveconf = 1;}if (saveconf) clusterSaveConfigOrDie(1);/* We need a listening TCP port for our cluster messaging needs. */server.cfd.count = 0;/* Port sanity check II* The other handshake port check is triggered too late to stop* us from trying to use a too-high cluster port number. */int port = server.tls_cluster ? server.tls_port : server.port;if (port > (65535-CLUSTER_PORT_INCR)) {serverLog(LL_WARNING, "Redis port number too high. ""Cluster communication port is 10,000 port ""numbers higher than your Redis port. ""Your Redis port number must be 55535 or less.");exit(1);}if (listenToPort(port+CLUSTER_PORT_INCR, &server.cfd) == C_ERR) {exit(1);}if (createSocketAcceptHandler(&server.cfd, clusterAcceptHandler) != C_OK) {serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");}/* The slots -> keys map is a radix tree. Initialize it here. */server.cluster->slots_to_keys = raxNew();memset(server.cluster->slots_keys_count,0,sizeof(server.cluster->slots_keys_count));/* Set myself->port/cport/pport to my listening ports, we'll just need to* discover the IP address via MEET messages. */deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);server.cluster->mf_end = 0;resetManualFailover();clusterUpdateMyselfFlags();
}
这篇关于漫话Redis源码之七十三的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!