国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Redis3.2源碼分析-跳躍表zskiplist

luoyibu / 1996人閱讀

摘要:函數考慮了一些與客戶端交互的內容,學的時候沒必要細看,它主要分為以下兩步調用找到排名的節點從這個節點開始遍歷個節點下面是的代碼從高層向底層累加直到累加的值等于范圍查找功能給定一個的范圍,從中找出滿足該范圍的所有節點。

跳躍表是Redis zset的底層實現之一,zset在member較多時會采用跳躍表作為底層實現,它在添加、刪除、查找節點上都擁有與紅黑樹相當的性能,它其實說白了就是一種特殊的鏈表,鏈表的每個節點存了不同的“層”信息,用這種分層存節點的方式在查找節點時能跳過些節點,從而使添加、刪除、查找操作都擁有了O(logn)的平均時間復雜度。
下面簡單介紹一下跳躍表:
跳躍表最低層(第一層)是一個擁有跳躍表所有節點的普通鏈表,每次在往跳躍表插入鏈表節點時一定會插入到這個最低層,至于是否插入到上層 就由拋硬幣決定(這么說不是很準確,redis里這個概率是1/4而非1/2,為了表述方便先這么說),什么意思呢?假設已經有一個跳躍表,其高度只有一層:

往表中插入節點“7”時,假設插入7時拋硬幣的結果是正,則在第二層中插入“7”節點,繼續拋一次看看還能不能上到第三層,為反則停止插入,上層不再插入“7”節點了:

同理插入“4”節點假設連續拋兩次都拋了正面,第三次拋了反面,則“4”節點會插入到2、3層:

這就是跳躍表最基本的樣子。

查找一個節點時,我們只需從高層到低層,一個個鏈表查找,每次找到該層鏈表中小于等于目標節點的最大節點,直到找到為止。由于高層的鏈表迭代時會“跳過”低層的部分節點,所以跳躍表會比正常的鏈表查找少查部分節點,這也是skiplist名字的由來。

假如我們需要查找節點“5”:
先遍歷最高層,發現第三層頭結點的下一個節點是“4”,4<5,所以游標定位到“4”節點,但是“4”節點的下一個節點是空,得繼續往低層走;第二層也從“4”節點開始,“4”節點在第二層的下一個節點是“7”,7>5,公交車做過頭了,回來依舊定位在“4”節點;繼續往低層走,第一層“4”節點的下一個節點是“5”,這就找到了。

事實上插入前也需要進行跳躍表查找操作,上文演示的插入流程只是直接用了鏈表而省略了這一步。
跳躍表有了個大概的了解,接下來我們細說redis里的skiplist

zskiplist相關的結構體聲明

redis的skiplist有兩個主要的數據結構,

zskiplistNode:skiplist的節點

zskiplist:用來記錄一個skiplist的長度、高度等信息

zskiplistNode的定義
typedef struct zskiplistNode {
    robj *obj;//zset的member
    double score;//zset的score
    struct zskiplistNode *backward;//指向上一個節點,用于zrevrange命令
    struct zskiplistLevel {
        struct zskiplistNode *forward;//指向下一個節點
        unsigned int span;//到達后一個節點的跨度(兩個相鄰節點span為1)
    } level[];//該節點在各層的信息,柔性數組成員
} zskiplistNode;

backward變量是特意為zrevrange*系列命令準備的,目的是為了使跳躍表實現反向遍歷,普通跳躍表的實現里是非必要的。

level變量記錄了此節點各層的信息:

level[x]->forward表示該節點在第x層的下一個節點。跳躍表節點都會出現在最低層的鏈表里,所以都會有level[0],通過level[0]->forward實現跳躍表正向遍歷,zrange*系列命令就是以此實現的。

level[x]->span表示該節點到第x層的下一個節點跳躍了多少節點。span主要是為了zrank、zrevrank服務,普通跳躍表的實現里是非必要的。

zskiplist的定義
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;//跳躍表頭尾節點
    unsigned long length;//節點個數
    int level;//除頭結點外最大的層數
} zskiplist;

zskiplist的頭結點不是一個有效的節點,它有ZSKIPLIST_MAXLEVEL層(32層),每層的forward指向該層跳躍表的第一個節點,若沒有則為null。
btw:跳躍表節點的層數限制在了32,若想超過32層得連續32次拋硬幣都得到正面,這得有足夠多的節點,redis限定了拋硬幣正面的概率為1/4,所以到達32層的概率為(1/2)^64,一般一臺64位的計算機能擁有的最大內存也無法存儲這么多zskiplistNode,所以對于基本使用 32層的上限已經足夠高了,再高也沒必要 浪費頭節點的內存。

最終zskiplist內存布局如下:

也可以去掉不必要的部分(backward、obj),作出如下抽象后的圖:

下文我們都通過簡化圖來分析跳躍表的基本操作流程。

zskiplist相關接口 創建一個zskiplist 時間復雜度O(1)

創建一個zskiplist就是創建一個高度為ZSKIPLIST_MAXLEVEL(32)的頭結點,score為0,member為null。
代碼如下:

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//畫圖,32level的頭結點
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        //頭結點每個level的下一個節點都初始化為null,跨度為0
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

//為指定高度的節點分配空間并賦值,insert操作也要用到
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));//柔性數組成員
    zn->score = score;
    zn->obj = obj;
    return zn;
}

執行完zslCreate后會得到如下布局:

zskiplist插入一個節點 時間復雜度O(logn)

插入一個節點時需要做以下工作

要找到新節點的插入位置,只需像上文介紹的那樣,從高層向低層找即可。在找的過程中用update[]數組記錄每一層插入位置的上一個節點,用rank[]數組記錄每一層插入位置的上一個節點在跳躍表中的排名。根據update[]插入新節點,插入完成后再根據rank[]更新跨度信息即可。
ps:redis允許節點有重復的score,當score相同時根據member(代碼里的obj指向的字符串)的字典序來排名。

/* Returns a random level for the new skiplist node we are going to create.
 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
 * (both inclusive), with a powerlaw-alike distribution where higher
 * levels are less likely to be returned. */
int zslRandomLevel(void) {//返回一個隨機的層數,不是level的索引是層數
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//有1/4的概率加入到上一層中
        level += 1;
    return (levelheader;//x用于迭代zskiplistNode
    for (i = zsl->level-1; i >= 0; i--) {//從最高層向最底層查詢,找到合適的插入位置
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];//記錄每一層插入節點的上一個節點的排名
        while (x->level[i].forward &&//當前層的下一個節點存在
            (x->level[i].forward->score < score ||//下一個節點的分數小于需要插入的分數
                (x->level[i].forward->score == score &&//score相同的情況下,根據member字符串的大小來比較(二進制安全的memcmp)
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;//每層的跨度
            x = x->level[i].forward;//下一個節點
        }
        update[i] = x;//當前層的最后一個小于score的節點
    }
    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {//大于之前跳躍表的高度所以沒有記錄update[i],因為插入的節點有這么高所以要修改這些頭結點的信息
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;//高出部分的頭結點在還沒插入當前節點時跨度應該是整張表,插入之后會重新更新這個值
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        //rank[0]是x在第0層的上一個節點的實際排名,rank[i]是x在第i層的上一個節點的實際排名,它們倆的差值為x在第i層的上一個節點與x之間的距離
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;//尾節點
    zsl->length++;
    return x;
}

假如往圖2插入一個score為7的節點,則會按照下圖方式所示進行:
找到插入位置(藍色的線表示查找路徑):

假設層數為3,將節點7插入進skiplist并修改附近節點的相關屬性:

zskiplist里的刪除操作
釋放一個節點的內存 時間復雜度O(1):
void zslFreeNode(zskiplistNode *node) {
    decrRefCount(node->obj);//member的引用計數-1,防止內存泄漏
    zfree(node);
}
釋放整個skiplist的內存 時間復雜度O(n):
void zslFree(zskiplist *zsl) {
    //任何一個節點一定有level[0],所以迭代level[0]來刪除所有節點
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}
從skiplist中刪除并釋放掉一個節點 時間復雜度O(logn):

主要分為以下3個步驟:

根據member(obj)和score找到節點的位置(代碼里變量x即為該節點,update記錄每層x的上一個節點)

調動zslDeleteNode把x節點從skiplist邏輯上刪除

釋放x節點內存。

/* Delete an element with matching score/object from the skiplist. */
//從skiplist邏輯上刪除一個節點并釋放該節點的內存
int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;//要刪除的節點
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);//obj的引用計數-1并釋放節點內存
        return 1;
    }
    return 0; /* not found */
}
/* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank and zslDeleteRangeByLex*/
//從skiplist邏輯上刪除一個節點(不釋放內存,僅改變節點位置關系)
//x為要刪除的節點
//update為每一層x的上一個節點(為了更新x上一個節點的forward和span屬性)
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {//當前層有x節點
            update[i]->level[i].span += x->level[i].span - 1;//...=x->level[i].span;
            update[i]->level[i].forward = x->level[i].forward;//跨過x節點
        } else {//當前層沒有x節點
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {//是否是tail節點
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)//刪除了最高層數的節點
        zsl->level--;
    zsl->length--;
}
根據范圍刪除節點 時間復雜度O(log(n)+m), m是范圍內元素的個數

zslDeleteRangeByScore //刪除 滿足score范圍 的節點

zslDeleteRangeByRank //刪除 滿足排名范圍 的節點

zslDeleteRangeByLex //在所有節點的score相同的skiplist中,刪除 滿足member字典序范圍 的節點

代碼懶得貼了

范圍查找操作 時間復雜度O(log(n)+m), m是范圍內元素的個數

根據查找范圍的類型 zskiplist查找可以分為三類:

rank范圍查找

score范圍查找

member范圍查找

rank范圍查找 zrangeGenericCommand

功能:給定一個zero-based排名范圍(start,end),從zskiplist中找出滿足該范圍的所有節點。
zrangeGenericCommand函數考慮了一些與客戶端交互的內容,學zskiplist的時候沒必要細看,它主要分為以下兩步:

1.調用zslGetElementByRank找到排名start+1的節點·······O(logn)

2.從這個節點開始遍歷(end-start+1)個節點·······O(m)

下面是zslGetElementByRank的代碼:

/* Finds an element by its rank. The rank argument needs to be 1-based. */
//O(logn)
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) { //從高層向底層累加span直到累加的值等于rank
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
        {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        if (traversed == rank) {
            return x;
        }
    }
    return NULL;
}
score范圍查找 genericZrangebyscoreCommand

功能:給定一個score的范圍,從zskiplist中找出滿足該score范圍的所有節點。
為了方便的表示score范圍的開閉區間,redis在server.h里聲明了一個表示zset分數區間的類型zrangespec,關于score范圍查找的相關函數都會用到它:

/* Struct to hold a inclusive/exclusive range spec by score comparison. */
typedef struct {
    double min, max;
    //是否是開閉區間,1為開,0位閉
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;

判斷一個score與zrangespec區間內最小值、最大值的關系:

//給定value是否>(>=)此范圍的下界
//gte(greater than or equal to)
static int zslValueGteMin(double value, zrangespec *spec) {
    return spec->minex ? (value > spec->min) : (value >= spec->min);
}
//給定value是否<(<=)此范圍的上界
//lte(less than or equal to)
int zslValueLteMax(double value, zrangespec *spec) {
    return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

根據上述兩個函數,就可以用O(1)時間復雜度判斷一個zskiplist的score是否與zrangespec分數區間有交集:

/* Returns if there is a part of the zset is in range. */
//用O(1)的時間復雜度判斷zset(zsl)的分數范圍是否與給定分數范圍(range)有交集。
//
//range(6,9]  zsl{1,2,3,4,5} 或zsl{10,12,13} 都是不在范圍內
//
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;

    /* Test for ranges that will always be empty. */
    if (range->min > range->max ||
            (range->min == range->max && (range->minex || range->maxex)))
        return 0;
    x = zsl->tail;
    if (x == NULL || !zslValueGteMin(x->score,range))//尾節點小于范圍下界
        return 0;
    x = zsl->header->level[0].forward;
    if (x == NULL || !zslValueLteMax(x->score,range))//頭節點大于范圍上界
        return 0;
    return 1;//在
}

genericZrangebyscoreCommand函數也考慮了很多與客戶端交互的內容,就學習底層跳躍表實現時沒必要細看,我們只需要知道底層是如何做到的即可,主要執行如下步驟:

1.調用zslParseRange把客戶端傳過來的范圍min、max轉換成zrangespec區間類型 保存在range變量里。

2.調用zslFirstInRange找到zskiplist中滿足range條件的最小節點。(假設是正序的范圍查找)·······O(logn)

3.從這個節點開始遍歷,直到調用zslValueLteMax()找到最后一個小于range條件上界的節點。·······O(m)

放一下核心的代碼zslFirstInRange:

/* Find the first node that is contained in the specified range.
 * Returns NULL when no element is contained in the range. */
//滿足range條件最小的那個 O(logn)
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
    int i;

    /* If everything is out of range, return early. */
    if (!zslIsInRange(zsl,range)) return NULL;//保證下面的邏輯一定能找到范圍內的節點

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* Go forward while *OUT* of range. */
        while (x->level[i].forward &&
            !zslValueGteMin(x->level[i].forward->score,range))
                x = x->level[i].forward;
    }

    /* This is an inner range, so the next node cannot be NULL. */
    x = x->level[0].forward;
    serverAssert(x != NULL);

    /* Check if score <= max. */
    if (!zslValueLteMax(x->score,range)) return NULL;
    return x;
}
member范圍查找 genericZrangebylexCommand

功能:在一個所有節點的score都相同的zskiplist中,找到滿足member字符串字典序范圍的所有節點。
底層用memcmp比較兩個字符串的大小。實現的流程與genericZrangebyscoreCommand很像,有興趣再看。

其他

zslgetrank 根據member和score獲得節點在該skiplist中的rank
zslParseRange 把客戶端傳過來的范圍min、max轉換成zrangespec區間類型 返回給range參數。
...

為什么不用紅黑樹作為zset底層實現?

其實作者Antirez已經給出了答復:
https://news.ycombinator.com/item?id=1171423
劃重點:They are not very memory intensive. It"s up to you basically.
既然取決于自己,skiplist實現簡單就選它了。至于可能的好處和壞處大概整理了一下有這些:
缺點:

比紅黑樹占用更多的內存,每個節點的大小取決于該節點的層數

空間局部性較差導致緩存命中率低,感覺上會比紅黑樹更慢

優點:

實現比紅黑樹簡單

比紅黑樹更容易擴展,作者之后實現zrank指令時沒怎么改動代碼。

紅黑樹插入刪除時為了平衡高度需要旋轉附近節點,高并發時需要鎖。skiplist不需要考慮。

一般用zset的操作都是執行zrange之類的操作,取出一片連續的節點。這些操作的緩存命中率不會比紅黑樹低。

參考資料

Skip Lists: A Probabilistic Alternative to Balanced Trees
Skip Lists: Done Right

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/28308.html

相關文章

  • Redis源碼分析-壓縮列ziplist

    摘要:用不用作為分界是因為是的值,它用于判斷是否到達尾部。這種類型的節點的大小介于與之間,但是為了表示整數,取出低四位之后會將其作為實際的值。當小于字節時,節點存為上圖的第二種類型,高位為,后續位表示的長度。 // 文中引用的代碼來源于Redis3.2 前言 Redis是基于內存的nosql,有些場景下為了節省內存redis會用時間換空間。ziplist就是很典型的例子。 介紹 ziplis...

    RyanQ 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<