本文主要是介绍压缩列表ziplist,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
1压缩列表的结构
2.压缩列表节点的构成
previous_entry_length
encodeing
conent
3.压缩列表API
1.创建列表 ziplistNew
2.正向遍历 ziplistNext
3.反向遍历 ziplistPrev
4.插入元素 ziplistInsert
5.级联更新 __ziplistCascadeUpdate
6.删除节点
7.查找entry ziplistFind
4.连锁更新问题
5.引出结构 listpack
listpack的结构和节点结构
listpack是如何实现从尾部到头部遍历?
ziplist和普通的
双向链表不同 ,ziplist
不存储指向上一个链表节点和指向下一个链表节点的指针,而是存储上一个节点长度和当前节点长度,通过牺牲部分读写性能,来换取高效的内存空间利用率,节约内存,是一种时间换空间的思想。ziplist使用连续内存地址+偏移量的方式实现链表。
其只用在字段个数少,字段值小的场景里面;适合存储一些int类型的数据或者长度比较短的字符串。
1压缩列表的结构
压缩列表是 Redis 为节约空间而实现的一系列特殊编码的连续内存块组成的顺序型数据结构, 本质上是字节数组
。在模型上将这些连续的数组分为3大部分,分别是header+entry集合+end。
这个结构,redis没有弄成是一个结构体,在看源代码的时候是找不到这个结构的。是可以通过创建压缩列表函数ziplistNew看出压缩列表的结构组成。
- zlbytes:整个压缩列表所占用的字节数,该字段是4个字节大小,因此压缩列表最多有2^32-1个字节。
- zltail:ziplist头部到末尾元素的长度,通过zltail字段可以很方便获取末尾元素的地址。该字段是4个字节大小。
- zllen:压缩列表的元素个数,该字段是2字节大小。zllen无法存储元素个数超过65535(2^16-1)的压缩列表,必须遍历整个压缩列表才能获取到元素个数。
- entry:压缩列表存储的元素,可以是字节数组或者整数,长度不限。entry的编码结构将在后面详细介绍。
- zlend:压缩列表的结尾,占1个字节,恒为0xFF。
Redis并没有像之前的字符串SDS,字典等结构那样封装一个结构体来保存压缩列表的信息。那是要怎样去对该数据进行操作呢。
Redis是通过定义一系列宏定义来对数据进行操作。其通过字节之间的定位和计算来获取数据的。
//返回整个压缩列表的总字节
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
//返回压缩列表的tail_offset变量,方便获取最后一个节点的位置
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
//返回压缩列表的节点数量
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
//返回压缩列表的表头的字节数
//(内存字节数zlbytes,最后一个节点地址ztail_offset,节点总数量zllength)
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
//返回压缩列表最后结尾的字节数
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
//返回压缩列表首节点地址
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
//返回压缩列表尾节点地址
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
//返回压缩列表最后结尾的地址
#define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
2.压缩列表节点的构成
很多人看代码找到的应该是zlentry。但是其代码的注释Note that this is not how the data is actually encoded。
/* We use this function to receive information about a ziplist entry.* Note that this is not how the data is actually encoded, is just what we* get filled by a function in order to operate more easily. */
//翻译:我们使用此函数来接收有关压缩列表条目的信息。
//请注意,这并不是数据的实际编码方式,只是我们通过函数填充的内容,以便更容易地操作
typedef struct zlentry {unsigned int prevrawlensize; //记录prevrawlen需要的字节数unsigned int prevrawlen; //记录上个节点的长度unsigned int lensize; //编码格式需要的字节数unsigned int len; //记录节点的元素的长度,是节点元素的长度,不是节点长度unsigned int headersize; //prevrawlensize+lensize unsigned char encoding; //编码格式,在该结构体中就是用来简单区分是字符数组还是整数类型的unsigned char *p; //具体的数据指针
} zlentry;
对于该结构体,我们需要注意三个字段,prevrawlensize,lensize,headersize。源码中会经常使用到这三个字段。
而这个结构存储实在是太浪费空间了,这不符合压缩列表的设计目的。Redis对上述结构进行了改进了,抽象合并了三个参数。这三个参数整体被称为entry.
压缩列表的结点可以保存一个字符数组或者整数值。这说明每个节点之间的保存的数据类型是可以不同的。
结构体zlentry中的字段与entry中的对应关系:
- 记录previous_entry_length所需用的字节数就是prevrawlensize,
- 记录encoding所需用的字节数就是lensize。
- headersize=prevrawlensize+lensize。所以在entry地址首部+headersize,就是可以访问到content。
- 字段len就是content部分的长度。entry地址首部+headersize+len,就可以访问到下一节点的首地址了。
previous_entry_length
该属性记录了压缩列表前一个节点的长度(不是该元素的长度),占1个字节或者5个字节。
- 如果前一个节点的长度小于254个字节,那么使用1个字节来保存这个长度值,即前一个节点的长度可以使用1个字节表示。
- 如果前一个节点的长度大于等于254个字节,那么使用5个字节来保存这个长度值,第一个字节会被设置为0xFE(十进制的254),之后的四个字节则用于保存前一个节点的长度。
例子:大于等于254字节时的表示
注意: ZipList中所有存储长度的数值均是采用小端字节序,即是低位字节在低地址,高位字节在高地址。如上图所示。这样也方便Redis获取长度,因为访问到字段previous_entry_length时候,内存是从低地址开始向高地址的。
previous_entry_length中关联的源码
#define ZIP_BIG_PREVLEN 254 int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {if (p != NULL) {p[0] = ZIP_BIG_PREVLEN; //第一个字节固定存放254memcpy(p+1,&len,sizeof(len)); //上一个节点的长度存放在后四个字节memrev32ifbe(p+1);}return 1+sizeof(len); //sizeof(len)的值为4
}//对prevrawlen进行编码 或者直接返回编码len长度所需要的字节数
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {if (p == NULL) {return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(len)+1;} else {if (len < ZIP_BIG_PREVLEN) { //上一个节点的长度小于254p[0] = len; //preventrylength域只占一个字节return 1;} else {return zipStorePrevEntryLengthLarge(p,len);}}
}/* Return the number of bytes used to encode the length of the previous* entry. The length is returned by setting the var 'prevlensize'. */
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do { \if ((ptr)[0] < ZIP_BIG_PREVLEN) { \(prevlensize) = 1; \} else { \(prevlensize) = 5; \} \
} while(0);
由于压缩列表中的数据以一种不规则的方式进行紧邻,无法通过后退指针来找到上一个元素,而通过保存上一个节点的长度,用当前的地址减去这个长度,就可以很容易的获取到了上一个节点的位置,通过一个一个节点向前回溯,来达到从表尾往表头遍历的操作。
举个例子:有一个指向当前结点起始地址的指针c,那只要用指针c减去当前结点previous_entry_length属性的值,就可以得到一个指向前一个节点起始地址的指针p。
encodeing
节点的encoding记录了节点的content属性所保存数据的类型和长度。
- 1字节、2字节或者5字节长, 值的最高位为 00 、 01 或者 10 的是字符数组编码: 数组的长度由编码除去最高两位之后的其他位记录;
- 1字节长, 值的最高位以 11 开头的是整数编码: 整数值的类型和长度由编码除去最高两位之后的其他位记录;
如果 content 存放的数据是字符数组类型,关联encoding的源码如下:
/* Different encoding/length possibilities */
#define ZIP_STR_MASK 0xc0 /* 1100 0000 */
#define ZIP_INT_MASK 0x30 /* 0011 0000 */
#define ZIP_STR_06B (0 << 6) /* 00 开头 */
#define ZIP_STR_14B (1 << 6) /* 01 开头 */
#define ZIP_STR_32B (2 << 6) /* 10 开头 *//* Macro to determine if the entry is a string. String entries never start* with "11" as most significant bits of the first byte. */
#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK) /* 不以 11 开头就是字符数组 */unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {//len字段表示 encoding 域所占字节,len先初始化1个字节unsigned char len = 1, buf[5];if (ZIP_IS_STR(encoding)) {/* Although encoding is given it may not be set for strings,* so we determine it here using the raw length. */if (rawlen <= 0x3f) { //0x3f的二进制是0b0011 1111if (!p) return len;buf[0] = ZIP_STR_06B | rawlen;} else if (rawlen <= 0x3fff) {len += 1; //占2个字节if (!p) return len;buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f); //第一个字节的头两位固定为01buf[1] = rawlen & 0xff;} else {len += 4; //占5个字节if (!p) return len;buf[0] = ZIP_STR_32B; //第一个字节的值固定为 1000 0000buf[1] = (rawlen >> 24) & 0xff;buf[2] = (rawlen >> 16) & 0xff;buf[3] = (rawlen >> 8) & 0xff;buf[4] = rawlen & 0xff;}} else {/* Implies integer encoding, so length is always 1. */if (!p) return len;buf[0] = encoding; //如果是整型,只占用1个字节}/* Store this length at p. */memcpy(p,buf,len);return len;
}
如果 content 存放的数据是整数,关联encoding的源码如下:
/* Different encoding/length possibilities */
#define ZIP_INT_16B (0xc0 | 0<<4) /* 1100 0000 */
#define ZIP_INT_32B (0xc0 | 1<<4) /* 1101 0000 */
#define ZIP_INT_64B (0xc0 | 2<<4) /* 1110 0000 */
#define ZIP_INT_24B (0xc0 | 3<<4) /* 1111 0000 */
#define ZIP_INT_8B 0xfe /* 1111 1110 *//* 4 bit integer immediate encoding |1111xxxx| with xxxx between* 0001 and 1101. */
#define ZIP_INT_IMM_MASK 0x0f /* Mask to extract the 4 bits value. To addone is needed to reconstruct the value. */
#define ZIP_INT_IMM_MIN 0xf1 /* 11110001 */
#define ZIP_INT_IMM_MAX 0xfd /* 11111101 *//* Store integer 'value' at 'p', encoded as 'encoding' */
//根据encoding把value保存在p内,(传递进来的p是entry地址首部+headersize,即是content部分)
void zipSaveInteger(unsigned char *p, int64_t value, unsigned char encoding) {int16_t i16;int32_t i32;int64_t i64;if (encoding == ZIP_INT_8B) {((int8_t*)p)[0] = (int8_t)value;} else if (encoding == ZIP_INT_16B) {i16 = value;memcpy(p,&i16,sizeof(i16));memrev16ifbe(p);} else if (encoding == ZIP_INT_24B) {i32 = value<<8;memrev32ifbe(&i32);memcpy(p,((uint8_t*)&i32)+1,sizeof(i32)-sizeof(uint8_t));} else if (encoding == ZIP_INT_32B) {i32 = value;memcpy(p,&i32,sizeof(i32));memrev32ifbe(p);} else if (encoding == ZIP_INT_64B) {i64 = value;memcpy(p,&i64,sizeof(i64));memrev64ifbe(p);} else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {/* Nothing to do, the value is stored in the encoding itself. */} else {assert(NULL);}
}/* Read integer encoded as 'encoding' from 'p' */
//根据encoding从p内读取数值,(传递进来的p是entry地址首部+headersize,即是content部分)
int64_t zipLoadInteger(unsigned char *p, unsigned char encoding) {int16_t i16;int32_t i32;int64_t i64, ret = 0;if (encoding == ZIP_INT_8B) {ret = ((int8_t*)p)[0];} else if (encoding == ZIP_INT_16B) {memcpy(&i16,p,sizeof(i16));memrev16ifbe(&i16);ret = i16;} else if (encoding == ZIP_INT_32B) {memcpy(&i32,p,sizeof(i32));memrev32ifbe(&i32);ret = i32;} else if (encoding == ZIP_INT_24B) {i32 = 0;memcpy(((uint8_t*)&i32)+1,p,sizeof(i32)-sizeof(uint8_t));memrev32ifbe(&i32);ret = i32>>8;} else if (encoding == ZIP_INT_64B) {memcpy(&i64,p,sizeof(i64));memrev64ifbe(&i64);ret = i64;} else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) {ret = (encoding & ZIP_INT_IMM_MASK)-1; //这里减了1,所以4bit整数的值是从0到12} else {assert(NULL);}return ret;
}
获取记录encoding所需的字节数和节点的元素长度对应的源码如下:
//使用例子
static inline void zipEntry(unsigned char *p, zlentry *e) {//................................//p中的字段对应赋值给e中的字段ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
}#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do { \if ((encoding) < ZIP_STR_MASK) { \if ((encoding) == ZIP_STR_06B) { \(lensize) = 1; \(len) = (ptr)[0] & 0x3f; \} else if ((encoding) == ZIP_STR_14B) { \(lensize) = 2; \(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1]; \} else if ((encoding) == ZIP_STR_32B) { \(lensize) = 5; \(len) = ((ptr)[1] << 24) | \((ptr)[2] << 16) | \((ptr)[3] << 8) | \((ptr)[4]); \} else { \(lensize) = 0; /* bad encoding, should be covered by a previous */ \(len) = 0; /* ZIP_ASSERT_ENCODING / zipEncodingLenSize, or */ \/* match the lensize after this macro with 0. */ \} \} else { \(lensize) = 1; \if ((encoding) == ZIP_INT_8B) (len) = 1; \else if ((encoding) == ZIP_INT_16B) (len) = 2; \else if ((encoding) == ZIP_INT_24B) (len) = 3; \else if ((encoding) == ZIP_INT_32B) (len) = 4; \else if ((encoding) == ZIP_INT_64B) (len) = 8; \else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX) \(len) = 0; /* 4 bit immediate */ \else \(lensize) = (len) = 0; /* bad encoding */ \} \
} while(0)
conent
该属性负责保存节点的值。而节点的值是字符数组或者整数,值的类型和长度是由节点的encoding属性来决定的。
例如存储字符数组,编码的最高两位00表示类型为字节数组,后六位001011表示字节数组的长度为11。content保存着结点的值"hello world"。
存储整数值,编码最高两位是11,表示存储的为整数,类型为int16_t,content保存着结点的值10086。
3.压缩列表API
1.创建列表 ziplistNew
unsigned char *ziplistNew(void) {//没有节点时,压缩列表的长度bytes = 头部和尾部的长度之和unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;unsigned char *zl = zmalloc(bytes);ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); //intrev32ifbe函数就是看是否需要进行大小端序转换的ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);ZIPLIST_LENGTH(zl) = 0;//因为是空的压缩列表,节点数量初始化为0zl[bytes-1] = ZIP_END; //设置表末端return zl;
}
2.正向遍历 ziplistNext
该函数返回p指向节点的下一个节点,如果p指向最后一个节点或者结尾标志,则返回NULL。
其计算逻辑是下一个节点的指针等于p加p指向节点的长度。
unsigned char *ziplistNext(unsigned char *zl, unsigned char *p) {((void) zl);//获取整个压缩列表的总字节size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));if (p[0] == ZIP_END) { /* 如果p指向结尾标志,则返回NULL */return NULL;}/* zipRawEntryLength函数返回p指向节点的长度, * p加p指向节点的长度即下个节点的地址。 */p += zipRawEntryLength(p);if (p[0] == ZIP_END) {return NULL;}//进行安全判断的,是否p超过了链表的内存范围zipAssertValidEntry(zl, zlbytes, p);return p;
}/* 返回p指向节点的长度,是节点长度,不是节点的元素长度 */
static inline unsigned int zipRawEntryLength(unsigned char *p) {zlentry e;/* 解析p指向元素至e结构体变量中。 */zipEntry(p, &e);/* e.headersize + e.len表示节点的长度。*/return e.headersize + e.len;
}//验证p是否未到达ziplist分配之外
static inline void zipAssertValidEntry(unsigned char* zl, size_t zlbytes, unsigned char *p) {zlentry e;assert(zipEntrySafe(zl, zlbytes, p, &e, 1));
}
其中,函数zipEntry和zipEntrySafe是比较相似的。关于这两个函数的,需要回头在熟悉下结构体zlentry。其主要是把p指向的元素的一些字段解析赋值给zlentry中的字段。
而函数zipEntrySafe与zipEntry区别是增加了数据校验。
//解析p指向的元素的信息到e指向的zlentry结构体变量中。
static inline void zipEntry(unsigned char *p, zlentry *e) {/* ZIP_DECODE_PREVLEN是宏函数,//将p对应的节点的记录previous_entry_length所需的字节数和previous_entry_length值赋值给e->prevrawlensize和 e->prevrawlen*/ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);/* ZIP_ENTRY_ENCODING是宏函数,通过元素编码部分的指针将不包含content长度的编码解析至e->encoding字段。 */ZIP_ENTRY_ENCODING(p + e->prevrawlensize, e->encoding);/* ZIP_DECODE_LENGTH是宏函数,通过传递元素编码部分的指针以及e->encoding,* 将记录编码部分的长度以及content的长度赋值给e->lensize、e->len两字段。 */ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);e->headersize = e->prevrawlensize + e->lensize;e->p = p;
}//可以把该函数分成两部分来看,一部分是if (p >= zlfirst && p + 10 < zllast)这个判断,要是符合这个判断就走流程,之后退出。
static inline int zipEntrySafe(unsigned char* zl, size_t zlbytes, unsigned char *p, zlentry *e, int validate_prevlen) {unsigned char *zlfirst = zl + ZIPLIST_HEADER_SIZE;unsigned char *zllast = zl + zlbytes - ZIPLIST_END_SIZE;
#define OUT_OF_RANGE(p) (unlikely((p) < zlfirst || (p) > zllast))//第一部分,走捷径的/* If threre's no possibility for the header to reach outside the ziplist,* take the fast path. (max lensize and prevrawlensize are both 5 bytes) */if (p >= zlfirst && p + 10 < zllast) {ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);ZIP_ENTRY_ENCODING(p + e->prevrawlensize, e->encoding);ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);e->headersize = e->prevrawlensize + e->lensize;e->p = p;/* We didn't call ZIP_ASSERT_ENCODING, so we check lensize was set to 0. */if (unlikely(e->lensize == 0))return 0;/* Make sure the entry doesn't rech outside the edge of the ziplist */if (OUT_OF_RANGE(p + e->headersize + e->len))return 0;/* Make sure prevlen doesn't rech outside the edge of the ziplist */if (validate_prevlen && OUT_OF_RANGE(p - e->prevrawlen))return 0;return 1;}//第二部分,不然不走捷径/* Make sure the pointer doesn't rech outside the edge of the ziplist */if (OUT_OF_RANGE(p))return 0;/* Make sure the encoded prevlen header doesn't reach outside the allocation */ZIP_DECODE_PREVLENSIZE(p, e->prevrawlensize);if (OUT_OF_RANGE(p + e->prevrawlensize))return 0;/* Make sure encoded entry header is valid. */ZIP_ENTRY_ENCODING(p + e->prevrawlensize, e->encoding);e->lensize = zipEncodingLenSize(e->encoding);if (unlikely(e->lensize == ZIP_ENCODING_SIZE_INVALID))return 0;/* Make sure the encoded entry header doesn't reach outside the allocation */if (OUT_OF_RANGE(p + e->prevrawlensize + e->lensize))return 0;/* Decode the prevlen and entry len headers. */ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);e->headersize = e->prevrawlensize + e->lensize;/* Make sure the entry doesn't rech outside the edge of the ziplist */if (OUT_OF_RANGE(p + e->headersize + e->len))return 0;/* Make sure prevlen doesn't rech outside the edge of the ziplist */if (validate_prevlen && OUT_OF_RANGE(p - e->prevrawlen))return 0;e->p = p;return 1;
#undef OUT_OF_RANGE
}
3.反向遍历 ziplistPrev
该函数返回p指向元素的上一个元素,如果p指向第一个元素,则返回NULL。
其逻辑是上一个元素的指针等于p减去上一个元素的的长度。
unsigned char *ziplistPrev(unsigned char *zl, unsigned char *p) {unsigned int prevlensize, prevlen = 0;if (p[0] == ZIP_END) { //p指向结尾标志p = ZIPLIST_ENTRY_TAIL(zl); //返回压缩列表尾节点地址return (p[0] == ZIP_END) ? NULL : p; //如果末尾元素是结束标志,表示是个空的ziplist} else if (p == ZIPLIST_ENTRY_HEAD(zl)) { //头节点的地址return NULL;} else { //剩余的, p指向中间元素ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); //将p对应的节点的记录previous_entry_length所需的字节数和previous_entry_length值赋值给prevlensize和prevlenassert(prevlen > 0);p-=prevlen;size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));zipAssertValidEntry(zl, zlbytes, p);return p;}
}
4.插入元素 ziplistInsert
/* 在ziplist指定位置插入新的元素。* * zl:ziplist的指针。* p:插入位置对应的指针。* s:待插入字符串的指针,整数用整数字符串表示。* slen:待插入字符串的长度。 */
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {return __ziplistInsert(zl,p,s,slen);
}unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, newlen;unsigned int prevlensize, prevlen = 0;size_t offset;int nextdiff = 0;unsigned char encoding = 0;long long value = 123456789;zlentry tail;//1if (p[0] != ZIP_END) { //表示不是尾部添加ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);} else {unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);if (ptail[0] != ZIP_END) {prevlen = zipRawEntryLengthSafe(zl, curlen, ptail);}}//2if (zipTryEncoding(s,slen,&value,&encoding)) {/* 'encoding' is set to the appropriate integer encoding */reqlen = zipIntSize(encoding);} else {reqlen = slen;}//3reqlen += zipStorePrevEntryLength(NULL,prevlen);//计算新增节点 previous_entry_length 域的长度reqlen += zipStoreEntryEncoding(NULL,encoding,slen);//计算新增节点 记录encoding所需的长度//4int forcelarge = 0;nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;if (nextdiff == -4 && reqlen < 4) {nextdiff = 0;forcelarge = 1;}//5//因为 realloc 函数可能会改变 zl 指针的地址,要先存一下插入位置相对于压缩列表初始位置的内存偏移offset = p-zl;newlen = curlen+reqlen+nextdiff;zl = ziplistResize(zl,newlen);//内部调用了realloc p = zl+offset; //重新计算插入位置//.....太长了,未完待续,分成两部分
}//返回值= 新增节点的previous_entry_length属性的长度 - 当前节点的previous_entry_length属性的长度
int zipPrevLenByteDiff(unsigned char *p, unsigned int len) {unsigned int prevlensize;ZIP_DECODE_PREVLENSIZE(p, prevlensize);return zipStorePrevEntryLength(NULL, len) - prevlensize;
}
1.若不是尾部添加,获取前一个节点的长度并存放在变量prevlen中。如果是尾部添加,则指针移到最后一个节点的起始处,记为ptail。紧接着判断ptail的起始位置是不是尾部ZIP_END,若不是则获取最后一个节点的长度存放在prevlen中。
2.对待插入的内容进行编码,并将内容的长度存放在reqlen变量中。如果是字符数组,则传入的参数slen就是新节点数据的长度。如果是整数,要根据整数的实际类型来计算新节点数据的长度(不是节点长度)。
3.zipStorePrevEntryLength是计算新增节点的previous_entry_length属性的长度。zipStoreEntryEncoding计算新增节点记录encoding所需的长度。reqlen之前就是content的长度值,添加这两个函数的返回值后就是作为新增节点的长度。
4. 如果不是在尾部添加节点,要确保后一个节点的 previous_entry_length 域能够容纳新增节点的长度。如果不能容纳,则需要计算差值,用于后面的节点扩展。nextdiff的值可以有三种情况:0-空间相等、4-需要更多空间、-4-空间富余。这个if判断条件有点难理解,后面再讲解。
5.因为 realloc 函数可能会改变 zl 指针的地址,先获取p到压缩链表起始处的偏移量offset。重新申请内存空间,重新计算插入位置。
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {//.....................//6/* Apply memory move when necessary and update tail offset. */if (p[0] != ZIP_END) {//将插入位置之后的内容整个移动到新增节点之后//reqlen是新节点的长度,p+reqlen就是为新节点预留位置//nextdiff为后一个节点扩展或收缩的内存大小//curlen-offset-1+nextdiff为总共需要迁移的内存长度memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);//将新增节点的长度存在下一个节点的 previous_entry_length域if (forcelarge)zipStorePrevEntryLengthLarge(p+reqlen,reqlen);elsezipStorePrevEntryLength(p+reqlen,reqlen);//更行尾部对于压缩链表起始处的偏移量ZIPLIST_TAIL_OFFSET(zl) =intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);//p+reqlen处节点的信息记录到tail中//类似 zipEntry(p+reqlen, &tail);zipEntrySafe是做了些安全校验的,前面有讲过assert(zipEntrySafe(zl, newlen, p+reqlen, &tail, 1)); //判断p+reqlen处的节点是否是最后一个节点if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {//zltail 需要加上nextdiffZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);}} else {/* This element will be the new tail. *///7. 是尾部,更新zltail的偏移量ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);}/* When nextdiff != 0, the raw length of the next entry has changed, so* we need to cascade the update throughout the ziplist *///8 nextdiff!=0就进行连锁更新if (nextdiff != 0) {offset = p-zl;//进行连锁更新zl = __ziplistCascadeUpdate(zl,p+reqlen);p = zl+offset;}/* Write the entry *///9p += zipStorePrevEntryLength(p,prevlen);//保存新增节点的 previous_entry_length 域的值p += zipStoreEntryEncoding(p,encoding,slen);//保存新增节点的 encoding 域值//保存新增节点的 content 域的值if (ZIP_IS_STR(encoding)) {memcpy(p,s,slen);} else {zipSaveInteger(p,value,encoding);}//10.增加节点,压缩列表的节点数量加1ZIPLIST_INCR_LENGTH(zl,1);return zl;
}
6.使用memmove将插入位置之后的内容整个移动到新增节点之后。将新增节点的长度存在下一个节点的 previous_entry_length位置。更行尾部对于压缩链表起始处的偏移量。将p+reqlen处节点的信息记录到tail中。判断p+reqlen处的节点是否是最后一个节点,若不是尾部偏移量zltail 需要加上nextdiff。
7.是尾部,更新zltail的偏移量。
8 nextdiff != 0就进行连锁更新。(这个是重点,后面会讲解)
9.p对于的地址位置已经改变了,需要重新获取新增节点的 previous_entry_length 域的值和新增节点的 encoding 域值,并添加这两个值更行p,之后保存新增节点的conent的内容。
10.增加节点,压缩列表的节点数量加1。返回压缩链表
5.级联更新 __ziplistCascadeUpdate
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {zlentry cur;size_t prevlen, prevlensize, prevoffset; /* Informat of the last changed entry. */size_t firstentrylen; /* Used to handle insert at head. */size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl));size_t extra = 0, cnt = 0, offset;size_t delta = 4; /* Extra bytes needed to update a entry's prevlen (5-1). */unsigned char *tail = zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl));/* Empty ziplist */if (p[0] == ZIP_END) return zl;zipEntry(p, &cur); /* no need for "safe" variant since the input pointer was validated by the function that returned it. */firstentrylen = prevlen = cur.headersize + cur.len;prevlensize = zipStorePrevEntryLength(NULL, prevlen);prevoffset = p - zl;p += prevlen;/* Iterate ziplist to find out how many extra bytes do we need to update it. */while (p[0] != ZIP_END) {assert(zipEntrySafe(zl, curlen, p, &cur, 0));/* Abort when "prevlen" has not changed. */if (cur.prevrawlen == prevlen) break;/* Abort when entry's "prevlensize" is big enough. */if (cur.prevrawlensize >= prevlensize) {if (cur.prevrawlensize == prevlensize) {zipStorePrevEntryLength(p, prevlen);} else {/* This would result in shrinking, which we want to avoid.* So, set "prevlen" in the available bytes. */zipStorePrevEntryLengthLarge(p, prevlen);}break;}/* cur.prevrawlen means cur is the former head entry. */assert(cur.prevrawlen == 0 || cur.prevrawlen + delta == prevlen);/* Update prev entry's info and advance the cursor. */rawlen = cur.headersize + cur.len;prevlen = rawlen + delta; prevlensize = zipStorePrevEntryLength(NULL, prevlen);prevoffset = p - zl;p += rawlen;extra += delta;cnt++;}/* Extra bytes is zero all update has been done(or no need to update). */if (extra == 0) return zl;/* Update tail offset after loop. */if (tail == zl + prevoffset) {/* When the the last entry we need to update is also the tail, update tail offset* unless this is the only entry that was updated (so the tail offset didn't change). */if (extra - delta != 0) {ZIPLIST_TAIL_OFFSET(zl) =intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra-delta);}} else {/* Update the tail offset in cases where the last entry we updated is not the tail. */ZIPLIST_TAIL_OFFSET(zl) =intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);}/* Now "p" points at the first unchanged byte in original ziplist,* move data after that to new ziplist. */offset = p - zl;zl = ziplistResize(zl, curlen + extra);p = zl + offset;memmove(p + extra, p, curlen - offset - 1);p += extra;/* Iterate all entries that need to be updated tail to head. */while (cnt) {zipEntry(zl + prevoffset, &cur); /* no need for "safe" variant since we already iterated on all these entries above. */rawlen = cur.headersize + cur.len;/* Move entry to tail and reset prevlen. */memmove(p - (rawlen - cur.prevrawlensize), zl + prevoffset + cur.prevrawlensize, rawlen - cur.prevrawlensize);p -= (rawlen + delta);if (cur.prevrawlen == 0) {/* "cur" is the previous head entry, update its prevlen with firstentrylen. */zipStorePrevEntryLength(p, firstentrylen);} else {/* An entry's prevlen can only increment 4 bytes. */zipStorePrevEntryLength(p, cur.prevrawlen+delta);}/* Foward to previous entry. */prevoffset -= cur.prevrawlen;cnt--;}return zl;
}
由于连锁更新比较耗费资源,所以开发时采取的策略是尽量避免连锁更新,从代码中也可以看到,在级联更新时候,如果prevlen有富余,previous_entry_length属性的字节大小也不进行压缩。
接着看回插入元素时的源代码
/* Insert item at "p". */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {int forcelarge = 0;nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;if (nextdiff == -4 && reqlen < 4) {nextdiff = 0;forcelarge = 1;}/* Store offset because a realloc may change the address of zl. */newlen = curlen+reqlen+nextdiff;zl = ziplistResize(zl,newlen);
}int zipPrevLenByteDiff(unsigned char *p, unsigned int len) {unsigned int prevlensize;//宏,展开之后根据p[0]处的值计算出prevlensize,如果p[0]<254,prevlensize为1,否则为5ZIP_DECODE_PREVLENSIZE(p, prevlensize);//zipStorePrevEntryLength函数如果第一个参数为NULL,则根据len字段计算需要的字节数,同理,len<254为1个字节,否则为5个字节return zipStorePrevEntryLength(NULL, len) - prevlensize;
}
nextdiff= 即将插入的entry长度所需要的存储字节数 - 插入位置p当前保存prev_entry_len字段的字节数.
其值有三种类型
- 0: 空间相等
- 4:需要更多空间
- -4:空间富余
当 nextdiff == -4
时,reqlen
怎么可能会小于 4,并且当我们理解了 reqlen < 4
后,也会很疑惑 if
控制块里面代码的作用,通过下图来说明。
该图片来自 redis-6.06 底层数据结构——压缩列表 - 知乎
红色字的讲明白了为什么需要这个if判断的。此时reqlen=3,nextdiff=-4,curlen+reqlen+nextdiff是小于curlen的,那realloc后,总长度变小,会导致最后一个节点数据丢失。
6.删除节点
删除节点的主要逻辑(配合源代码看,需要知道源代码中的变量才好了解该逻辑)。
源代码如下:
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {size_t offset = *p-zl;zl = __ziplistDelete(zl,*p,1);*p = zl+offset;return zl;
}
unsigned char *ziplistDeleteRange(unsigned char *zl, int index, unsigned int num) {unsigned char *p = ziplistIndex(zl,index);return (p == NULL) ? zl : __ziplistDelete(zl,p,num);
}//从压缩列表的 P 位置删除 num 个节点
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {unsigned int i, totlen, deleted = 0;size_t offset;int nextdiff = 0;zlentry first, tail;size_t zlbytes = intrev32ifbe(ZIPLIST_BYTES(zl));//1//获取P位置的节点信息zipEntry(p, &first);for (i = 0; p[0] != ZIP_END && i < num; i++) {//移动到下一个节点的起始位置p += zipRawEntryLengthSafe(zl, zlbytes, p);//deleted为实际要删除的节点数量deleted++;}//2//要删除的节点长度之和totlen = p-first.p; /* Bytes taken by the element(s) to delete. */if (totlen > 0) {uint32_t set_tail;//当前位置是否是压缩列表尾部if (p[0] != ZIP_END) {//计算要存下first节点的前一个节点的长度,p位置节点的previous_entry_length域需要扩展的字节数nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);//根据nextdiff更新p位置节点的 previous_entry_length 域位置p -= nextdiff;//再修改P位置节点的 previous_entry_length域的值zipStorePrevEntryLength(p,first.prevrawlen);//更新最后一个节点到压缩列表开头的内存偏移量set_tail = intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen; //获取p位置的节点信息赋值给tail,并进行安全校验assert(zipEntrySafe(zl, zlbytes, p, &tail, 1));//判断tail节点是否是最后一个节点if (p[tail.headersize+tail.len] != ZIP_END) {//最后一个节点到压缩列表开头的内存偏移量还有计算上扩展或收缩的大小set_tail = set_tail + nextdiff;}//内存迁移size_t bytes_to_move = zlbytes-(p-zl)-1;memmove(first.p,p,bytes_to_move);} else {/* The entire tail was deleted. No need to move memory. */set_tail = (first.p-zl)-first.prevrawlen;}/* Resize and update length */offset = first.p-zl;zlbytes -= totlen - nextdiff;zl = ziplistResize(zl, zlbytes);p = zl+offset;/* Update record count */ZIPLIST_INCR_LENGTH(zl,-deleted);/* Set the tail offset computed above */assert(set_tail <= zlbytes - ZIPLIST_END_SIZE);ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(set_tail);if (nextdiff != 0)//连锁更新zl = __ziplistCascadeUpdate(zl,p);}return zl;
}
7.查找entry ziplistFind
在zl指向的ziplist上,查找与字符串vstr匹配的元素。依次匹配每个元素,如果匹配成功就返回,否则继续匹配下一个元素。
时间复杂度为O(N^2),因为节点值可能是字符串,而字符串比较复杂度为O(N)
/** p 是开始查找的压缩列表节点地址* vstr 是要查找的元素内容* vlen 是要查找的元素长度* skip 是每查找一次跳过的元素个数
*/
unsigned char *ziplistFind(unsigned char *zl, unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {int skipcnt = 0;unsigned char vencoding = 0;long long vll = 0;size_t zlbytes = ziplistBlobLen(zl); //获得压缩链表的总长度while (p[0] != ZIP_END) {struct zlentry e;unsigned char *q;assert(zipEntrySafe(zl, zlbytes, p, &e, 1)); //把p对应的节点信息赋值给eq = p + e.prevrawlensize + e.lensize; //指针移到 content 的位置if (skipcnt == 0) {/* Compare current entry with specified entry */if (ZIP_IS_STR(e.encoding)) { // 如果编码是字符串的if (e.len == vlen && memcmp(q, vstr, vlen) == 0) {return p;}} else {//将要查找的元素编码为整数if (vencoding == 0) {if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) { //数字解码成功的话返回1/* If the entry can't be encoded we set it to* UCHAR_MAX so that we don't retry again the next* time. */vencoding = UCHAR_MAX;}/* Must be non-zero by now */assert(vencoding);}/* Compare current entry with specified entry, do it only* if vencoding != UCHAR_MAX because if there is no encoding* possible for the field it can't be a valid integer. */if (vencoding != UCHAR_MAX) {long long ll = zipLoadInteger(q, e.encoding); //返回q指针位置对应的节点的元素if (ll == vll) {return p; //匹配成功,返回结果}}}/* Reset skip count */skipcnt = skip;} else {/* Skip entry */skipcnt--;}p = q + e.len; //到达下一节点的起始位置}return NULL;
}
需要注意的是,find函数需传一个skip的值。skip的引入主要是上层容器的使用。假如说上层容器是一个hash结构,使用了ziplist,那么会按照顺序先存储key,再存储value,那么find的时候只需要查key即可,通过skip控制多跳一下,提升查找效率。
4.连锁更新问题
ziplist中每个节点的previous_entry_length记录了前一个节点的长度,考虑在表头插入节点的场景:
如果列表所有节点长度都小于254字节(previous_entry_length 域的都是 1 字节长),且插入节点大于等于254字节,就会导致所有节点的previous_entry_length属性都必须从1字节扩展为5字节。Redis 将这种在特殊情况下产生的连续多次空间扩展操作称之为 连锁更新(cascade update)。
看下面例子:给定一个节点个数为3,每个节点大小均为253字节的ziplist,往表头插入一个500字节大小的节点。
不仅仅是插入节点会引起连锁更新,删除节点也可能会引发连锁更新。连锁更新在最坏的情况下需对压缩列表执行 N 次空间重新分配操作,而每次空间重分配的最坏复杂度为 O(N),所以连锁更新的最坏复杂度为 O(N^2)。
既然连锁更新的最坏复杂度为O(N^2),为什么Redis还是放心使用ziplist?
- 连锁更新触发条件苛刻,只有满足存在多个连续长度为250-253之间的节点才能触发。
- ziplist只应用于节点数少且数据小的场景,即使出现了连续更新,需要更新的节点数量也很少,不会出现性能问题。
5.引出结构 listpack
这些连锁更新的问题理解的是有点复杂的, Redis提出了一个优化后的替代结构listpack。
Redis7.0 才将 listpack 完整替代 ziplist。
listpack的结构和节点结构
该图片来自 https://segmentfault.com/a/1190000041670843
listpack主要做了如下两点改进:
- 头部省去了4字节的zltail字段
- entry中不再保存prev_entry_length字段,而是改为保存本entry自己的长度,并且是存储在每个entry的尾部。
listpack是如何实现从尾部到头部遍历?
ziplist被设计为适合从尾部到头部逐个遍历,那么listpack也需要有这个功能。
首先通过tot-bytes偏移到结尾,然后从右到左读取element-tot-len(注意该字段设计为从右往左读取),这样既实现了尾部到头部的遍历,又没有连锁更新的情况。
这篇关于压缩列表ziplist的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!