深入理解Redis之有序集合

Posted by 令德湖周杰伦 on 05-29,2020

前言

redis有序集合,内部有2种实现方式,一种为压缩链表,另一种为跳跃表。两者都是redis的基本数据结构,各有千秋。今天我们通过zet的来深入的
认识下他们。

一、压缩列表ziplist

由一系列连续的内存块组成的顺序型数据结构,通过连续分配内存的方式,可以节约内存。 通过复杂的设计,来最大化节省内存。

结构

zlbytes + zltail + zllen+ entry1 + entry2 + ... + zlend
其中:
zlbytes:压缩列表总字节数,占4个字节
zltail:尾结点的偏移量,占4个字节
zllen:列表中结点的个数,占2个字节
entry1:结点(下文会详细说明)
zlend:固定为0xFF
可知:头部位置为从zl偏移10个字节就拿到第一个entry的地址。即p = zl + 10 ,尾部位置就是 p = zl + zltail代表的offset

entry的实际构成:prevlen + encoding + entry-data 或者 prevlen + encoding
(注意:这里的构成和ziplist.c中struct zlentry不同)

prevlen:前一个结点的字节数,prevlen字段可能占1个字节,也可能占5个字节,当占一个字节数时,代表前一个entry的总字节数不超过254。 当占5个字节数时,前一个entry的所占的总字节数为5个中的后四个字节所表示的数,比如 0xfe 00 00 00 ff 代表前一个entry总字节数为255个字节。

encoding:本结点的编码,这里的编码中也包含了本节点的entry-data字节长度。

字符类型的编码:

  • ZIP_STR_06B:(0 << 6) 即00xxxxxx 前两位00表示最大长度为63的字符串,后面6位表示实际字符串长度,encoding占1个字节
  • ZIP_STR_14B (1 << 6) 即01xxxxxx xxxxxxxx 前两位01表示中等长度的字符串(63<len<=16383),后面14位表示实际长度,encoding占用两个字节
  • ZIP_STR_32B (2 << 6) //10000000 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx 特大字符,第一个字节固定128(0X80),后面四个字节存储实际长度,encoding占用5个字节

int类型的编码:统一占一个字节

  • ZIP_INT_16B (0xc0 | 0<<4) //11000000 content内容是int16,长度是2个字节
  • ZIP_INT_32B (0xc0 | 1<<4) //11010000 content内容是int32,长度是4个字节
  • ZIP_INT_64B (0xc0 | 2<<4) //11100000 表示content内容是int64,长度是8个字节
  • ZIP_INT_24B (0xc0 | 3<<4) //11110000 表示content内容是int24,长度是3个字节
  • ZIP_INT_8B 0xfe // 11111110 表示content内容是int8,长度是1个字节
  • ZIP_INT_IMM 1111xxxx //代表小整数,没有content内容,encoding的后四位正好就可以表示出,范围在11110001~11111101之间,代表0~12的数,也就是xxxx-1,对应上面 prevlen + encoding 这种情况

entry-data:存储的内容
实际的例子:[02] [0b] [48 65 6c 6c 6f 20 57 6f 72 6c 64],02占一个字节,表示上一个entry的长度为2个字节,0b占一个字节,0x0000 1011 代表ZIP_STR_06B小字符串,且长度为11个字节。48 65 6c 6c 6f 20 57 6f 72 6c 64 这11个字节为存储的内容,ASCII为"Hello World"

为什么要254?

因为255是固定0Xff,代表zl的结尾,都知道一个字节只能代表0~255数字,而255已经使用,所以prevlen只能取最大的254,代表上一个元素的所占的总字节数。但是如果上一个字节数超过254怎么办。就需要扩展prevlen所占的字节数(原来为1个字节)为5个字节 size(len) + 1, 扩展后的第一个字节固定为254,后四个字节为真正的字节数。

问题

假如一个结点这样构成:prevlen = f0 1个字节,encodeing = c0 ff ff ff ff 5个字节,entry-data 内容有 32^2 - 1 个字节,那么下一个entry的prevlen = 32^2 - 1 + 5 + 1 ,那么5个字节不够表示了呀?!等等,32^2 - 1 B ≈ 1G 实际场景中单个value有这么存的么?!哈哈

级联更新

为什么为级联更新?why?

前面了解到prevlen所占的字节为1个或者5个,当prevlen所占的字节字数变化时,可能会导致entry的总长度发生变化,而entry的总长度变化会导致后一个entry的prevlen字段发生变化,这样就会引起连锁反应。

什么什么发生?when?

执行插入或者删除操作时都可能会触发。具体的触发条件,下文会举例说明。

如何发生的?how?

级联更新分为2种:级联扩展和级联收缩
看一个级联扩展的例子:e1 e2 e3 e4 e5 属性为 bytes(e1) = 100 bytes(e2) = 250 bytes(e3) = 251 bytes(e4) = 252 bytes(e5) = 253,其中:
e1.prevlen = 0
e2.prevlen = 100
e3.prevlen = 250
e4.prevlen = 251
e5.prevlen = 252
当一个新结点e要新插入到e1,e2之间,其中bytes(e) = 255,插入后bytes(e2.prevlen) = 1 变为 5,导致bytes(e2) = 250+4 ->bytes(e3.prevlen) = 5 -> bytes(e3) = 251+4 -> ...
就这样连锁的更新下去,直到结点的长度不发生变化时停止。

看一个级联收缩的例子:e1 e2 e3 e4 e5 属性为 bytes(e1) = 100 bytes(e2) = 254 bytes(e3) = 255 bytes(e4) = 256 bytes(e4) = 257,其中
e1.prevlen = 0
e2.prevlen = 100
e3.prevlen = 254
e4.prevlen = 255
e5.prevlen = 256
当删除e2时,那么导致bytes(e3.prevlen) = 5 变为 1 -> bytes(e3) = 255 - 4 = 251 -> bytes(e4.prevlen) = 5变为1
-> bytes(e4) = 256 - 4 = 252 ...
但是在级联收缩时,redis在实际中没有做收缩,因为5个字节也是可以存储1个字节的内容的。
虽然有点浪费,但是级联更新实在是太可怕了,所以浪费就浪费吧。所以 prevlen中存储的可能是 0xfe 00 00 00 fb

zset通过ziplist实现

使用2个entry来表示element和sorce,成组成对的插入和删除。(以下简称为e-s对)

基本操作

zadd:在插入时,首先遍历比较每一个element的sorce,然后确定插入位置。
zdel:删除时,首先遍历比较element,查找到e-s对,然后删除。
zupdate:更新时,首先遍历比较element,查找到e-s对,比较sorce,如果分数不同,就先删除,后新增。

zrang实现

  1. 取第一个e-s开始记数,通过传入的start确定开始查找的位置。其中reverse可以通过head和tail来确定。
  2. 根据区间长度循环取值,将ziplist中的entry转换为zset的数据

zrangByscore实现

  1. 根据reverse决定从头还是尾开始扫描,根据score确定第一个符合的元素。
  2. 继续循环,判断是否超过max,超过break

二、跳表skiplist

跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。跳表不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。

结构

/**
 * 跳跃表结点结构体
 */
typedef struct zskiplistNode {
    sds ele; // key
    double score; // score
    struct zskiplistNode *backward; // 后退指针,它指向位于当前节点的前一个节点
    //层级结构体
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 前进指针,前进指针用于访问位于表尾方向的其他节点,
        unsigned long span; // 跨度,跨度则记录了前进指针所指向节点和当前节点的距离
    } level[]; // 多层连接指针,多级索引
} zskiplistNode;

/**
 * 跳跃表结构体
 */
typedef struct zskiplist {
    struct zskiplistNode *header, // 头指针
            *tail; // 尾指针
    unsigned long length; // 记录跳跃表的长度,也即是,跳跃表目前包含节点的数量(表头节点不计算在内)
    int level; // 记录目前跳跃表内,层数最大的那个节点的层数(表头节点的层数不计算在内)
} zskiplist;

多级索引

为什么新增时要随机一个层级?why?

随机一个层级,是为了使索引的分布均匀,不然在某2个索引之间可能会聚集过多的元素,导致时间复杂退化,比如有一级索引:1和100,如果不更新索引的话,在后续执行插入操作后,2者之间,可能会插入很多的数据,最后退化为普通链表。

如果随机?how?

随机获取一个层级,随机函数生成的值要合理,不能过多的生成高层,也不能不生成低层。以下是redis的随机函数

#define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

解释下:(ZSKIPLIST_P * 0xFFFF) 就是 1111 1111 1111 1111 右移2位 -> 0011 1111 1111 1111,而random() & 0xFFFF 就是取确保在
0000 0000 0000 0000 ~ 1111 1111 1111 1111 随机到一个数。 而这些数中,当前2位是00时,会满足 00xx xxxx xxxx xxxx <
0011 1111 1111 1111, level++,而前2位是00的概率也是1/4,就是说level=2的概率为1/4,level如果想成为3,下次前2位还要随机到00,1/4 * 1/4 = 1/16,以此类推。

空间复杂度为:假如有n个元素要插入skiplist中,那么在第一层的索引个数为n/4,即n(level1) = n/4,n(level2)=n/16,n(level3) = n/64 ....
全部的索引个数 n/4 + n/16 + ... + 1 求等比数列的前n项和可知, S = (n-1)/3 , 所以skiplist额外的空间为n/3,也可以理解为平均每个节点
有 1.33 个指针。

zset通过skiplist来实现

通过 dict + skiplist来实现

typedef struct zset {
    dict *dict; // key => score 的字典
    zskiplist *zsl; // 跳跃表
} zset;

基本操作

zadd:新增时,插入一个元素,随机获取一个层级,更新索引和数据结构
zdel:删除时,将元素删除,更新索引和各层的span
zupdate:如果元素存在,且socre不同,如果newscore在前后2个节点score之间,代表不会影响排序,直接修改就行,如果分值影响了排序,则需要删除这个元素,并插入“新”的这个元素。

zrange

  1. 通过传入的start,找到符合范围的第一个节点ln,时间复杂度O(logN),最坏 O(N) ,N为跳跃表长度。通过span来实现,来记录level层上各个级点的跨度。
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}
  1. 根据区间长度循环取值
while(rangelen--) {        
    ln = reverse ? ln->backward : ln->level[0].forward;
}

zrangeByscore

  1. 根据传入的range,查找符合分数范围的第一个结点, 时间复杂度O(logN)
x = zsl->header;
//从最高层开始下沉查起
for (i = zsl->level-1; i >= 0; i--) {
    /* Go forward while *IN* range. */
    while (x->level[i].forward && zslValueLteMax(x->level[i].forward->score,range))
            x = x->level[i].forward;
}
  1. 根据区间长度循环取值,判断是否超过max,超过break
while (ln && limit--) {
    /* Abort when the node is no longer in range. 循环判断是否超过max,超过break*/
    if (reverse) {
        if (!zslValueGteMin(ln->score,&range)) break;
    } else {
        if (!zslValueLteMax(ln->score,&range)) break;
    }

    rangelen++;
    
    /* Move to next node */
    if (reverse) {
        ln = ln->backward;
    } else {
        ln = ln->level[0].forward;
    }
}

转换convert

redis中规定:当zset中的元素如果有满足如下条件时,zet的实现方式要变为skiplist。
zset-max-ziplist-entries 128 当元素个数超过128个
zset-max-ziplist-value 64 当单个元素的字节数超过64B
这个2个配置在redis.config 中可以配置

为什么要转换?why?

当元素个数超过128个,ziplist花费的时间复杂度较高了

什么时候转换?when?

  • ziplist -> skiplist,每次新增时,判断是否触发Convert
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries || sdslen(ele) > server.zset_max_ziplist_value){
	zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
}
  • skiplist -> ziplist,在加载rdb文件时,如果zset的数据量小于等于zset-max-ziplist-entries,并且最大的value不超过zset-max-ziplist-value,那么会将skiplist转化为ziplist。
if (zsetLength(o) <= server.zset_max_ziplist_entries && maxelelen <= server.zset_max_ziplist_value){
	zsetConvert(o,OBJ_ENCODING_ZIPLIST);
}

如何转换?

ziplist -> skiplist,循环遍历出ziplist中的e-s对,然后依次插入skiplist中。释放ziplist的内存。

zs = zmalloc(sizeof(*zs));
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();

eptr = ziplistIndex(zl,0);
sptr = ziplistNext(zl,eptr);

//ziplist遍历,并且插入zsl中
while (eptr != NULL) {
    score = zzlGetScore(sptr);
    serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL)
        ele = sdsfromlonglong(vlong);
    else
        ele = sdsnewlen((char*)vstr,vlen);

    node = zslInsert(zs->zsl,score,ele);
    serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
    zzlNext(zl,&eptr,&sptr);
}
//释放内存
zfree(zobj->ptr);
zobj->ptr = zs;
zobj->encoding = OBJ_ENCODING_SKIPLIST;

skiplist -> ziplist,从skiplist的头结点,找到level[0]层第一个元素,根据forward依次遍历出其他元素,并插入ziplist中,最后释skiplist的内存。

unsigned char *zl = ziplistNew();
zs = zobj->ptr;
dictRelease(zs->dict);
node = zs->zsl->header->level[0].forward;
zfree(zs->zsl->header);
zfree(zs->zsl);

while (node) {
    zl = zzlInsertAt(zl,NULL,node->ele,node->score);
    next = node->level[0].forward;
    zslFreeNode(node);
    node = next;
}

zfree(zs);
zobj->ptr = zl;
zobj->encoding = OBJ_ENCODING_ZIPLIST;

总结

ziplist的优点是节省内存,缺点是时间复杂度比较大,skiplist的优点是时间复杂度低,但是索引也会占内存,根据数据结构的知识,时间复杂度和空间复杂度二者是不能兼得,要根据具体的场景来选择一种合适的数据结构,redis通过在config中定义zset的元素数量的阀值来动态的调整底层的实现。