• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            loop_in_codes

            低調做技術__歡迎移步我的獨立博客 codemaro.com 微博 kevinlynx

            無鎖有序鏈表的實現

            無鎖有序鏈表可以保證元素的唯一性,使其可用于哈希表的桶,甚至直接作為一個效率不那么高的map。普通鏈表的無鎖實現相對簡單點,因為插入元素可以在表頭插,而有序鏈表的插入則是任意位置。

            本文主要基于論文High Performance Dynamic Lock-Free Hash Tables實現。

            主要問題

            鏈表的主要操作包含insertremove,先簡單實現一個版本,就會看到問題所在,以下代碼只用作示例:

            struct node_t {
                    key_t key;
                    value_t val;
                    node_t *next;
                };
            
                int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
                    node_t *pred = head;
                    node_t *item = head->next;
                    while (item) {
                        int d = KEY_CMP(item->key, key);
                        if (d >= 0) {
                            *pred_ptr = pred;
                            *item_ptr = item;
                            return d == 0 ? TRUE : FALSE;
                        }
                        pred = item;
                        item = item->next;
                    } 
                    *pred_ptr = pred;
                    *item_ptr = NULL;
                    return FALSE;
                }
            
                int l_insert(node_t *head, key_t key, value_t val) {
                    node_t *pred, *item, *new_item;
                    while (TRUE) {
                        if (l_find(&pred, &item, head, key)) {
                            return FALSE;
                        }
                        new_item = (node_t*) malloc(sizeof(node_t));
                        new_item->key = key;
                        new_item->val = val;
                        new_item->next = item;
                        // A. 如果pred本身被移除了
                        if (CAS(&pred->next, item, new_item)) {
                            return TRUE;
                        }
                        free(new_item);
                    }
                }
            
                int l_remove(node_t *head, key_t key) {
                    node_t *pred, *item;
                    while (TRUE) {
                        if (!l_find(&pred, &item, head, key)) {
                            return TRUE;
                        }
                        // B. 如果pred被移除;如果item也被移除
                        if (CAS(&pred->next, item, item->next)) {
                            haz_free(item);
                            return TRUE;
                        }
                    }
                }

            l_find函數返回查找到的前序元素和元素本身,代碼A和B雖然拿到了preditem,但在CAS的時候,其可能被其他線程移除。甚至,在l_find過程中,其每一個元素都可能被移除。問題在于,任何時候拿到一個元素時,都不確定其是否還有效。元素的有效性包括其是否還在鏈表中,其指向的內存是否還有效。

            解決方案

            通過為元素指針增加一個有效性標志位,配合CAS操作的互斥性,就可以解決元素有效性判定問題。

            因為node_t放在內存中是會對齊的,所以指向node_t的指針值低幾位是不會用到的,從而可以在低幾位里設置標志,這樣在做CAS的時候,就實現了DCAS的效果,相當于將兩個邏輯上的操作變成了一個原子操作。想象下引用計數對象的線程安全性,其內包裝的指針是線程安全的,但對象本身不是。

            CAS的互斥性,在若干個線程CAS相同的對象時,只有一個線程會成功,失敗的線程就可以以此判定目標對象發生了變更。改進后的代碼(代碼僅做示例用,不保證正確):

            typedef size_t markable_t;
                // 最低位置1,表示元素被刪除
                #define HAS_MARK(p) ((markable_t)p & 0x01)
                #define MARK(p) ((markable_t)p | 0x01)
                #define STRIP_MARK(p) ((markable_t)p & ~0x01)
            
                int l_insert(node_t *head, key_t key, value_t val) {
                    node_t *pred, *item, *new_item;
                    while (TRUE) {
                        if (l_find(&pred, &item, head, key)) { 
                            return FALSE;
                        }
                        new_item = (node_t*) malloc(sizeof(node_t));
                        new_item->key = key;
                        new_item->val = val;
                        new_item->next = item;
                        // A. 雖然find拿到了合法的pred,但是在以下代碼之前pred可能被刪除,此時pred->next被標記
                        //    pred->next != item,該CAS會失敗,失敗后重試
                        if (CAS(&pred->next, item, new_item)) {
                            return TRUE;
                        }
                        free(new_item);
                    }
                    return FALSE;
                }
            
                int l_remove(node_t *head, key_t key) {
                    node_t *pred, *item;
                    while (TRUE) {
                        if (!l_find(&pred, &item, head, key)) {
                            return FALSE;
                        }
                        node_t *inext = item->next;
                        // B. 刪除item前先標記item->next,如果CAS失敗,那么情況同insert一樣,有其他線程在find之后
                        //    刪除了item,失敗后重試
                        if (!CAS(&item->next, inext, MARK(inext))) {
                            continue;
                        }
                        // C. 對同一個元素item刪除時,只會有一個線程成功走到這里
                        if (CAS(&pred->next, item, STRIP_MARK(item->next))) {
                            haz_defer_free(item);
                            return TRUE;
                        }
                    }
                    return FALSE;
                }
            
                int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
                    node_t *pred = head;
                    node_t *item = head->next;
                    hazard_t *hp1 = haz_get(0);
                    hazard_t *hp2 = haz_get(1);
                    while (item) {
                        haz_set_ptr(hp1, pred);
                        haz_set_ptr(hp2, item);
                        /* 
                         如果已被標記,那么緊接著item可能被移除鏈表甚至釋放,所以需要重頭查找
                        */
                        if (HAS_MARK(item->next)) { 
                            return l_find(pred_ptr, item_ptr, head, key);
                        }
                        int d = KEY_CMP(item->key, key);
                        if (d >= 0) {
                            *pred_ptr = pred;
                            *item_ptr = item;
                            return d == 0 ? TRUE : FALSE;
                        }
                        pred = item;
                        item = item->next;
                    } 
                    *pred_ptr = pred;
                    *item_ptr = NULL;
                    return FALSE;
                }

            haz_get、haz_set_ptr之類的函數是一個hazard pointer實現,用于支持多線程下內存的GC。上面的代碼中,要刪除一個元素item時,會標記item->next,從而使得insert時中那個CAS不需要做任何調整??偨Y下這里的線程競爭情況:

            • insertfind到正常的preditempred->next == item,然后在CAS前有線程刪除了pred,此時pred->next == MARK(item),CAS失敗,重試;刪除分為2種情況:a) 從鏈表移除,得到標記,pred可繼續訪問;b) pred可能被釋放內存,此時再使用pred會錯誤。為了處理情況b,所以引入了類似hazard pointer的機制,可以有效保障任意一個指針p只要還有線程在使用它,它的內存就不會被真正釋放
            • insert中有多個線程在pred后插入元素,此時同樣由insert中的CAS保證,這個不多說
            • remove中情況同insert,find拿到了有效的prednext,但在CAS的時候pred被其他線程刪除,此時情況同insertCAS失敗,重試
            • 任何時候改變鏈表結構時,無論是remove還是insert,都需要重試該操作
            • find中遍歷時,可能會遇到被標記刪除的item,此時item根據remove的實現很可能被刪除,所以需要重頭開始遍歷

            ABA問題

            ABA問題還是存在的,insert中:

            if (CAS(&pred->next, item, new_item)) {
                    return TRUE;
                }

            如果CAS之前,pred后的item被移除,又以相同的地址值加進來,但其value變了,此時CAS會成功,但鏈表可能就不是有序的了。pred->val < new_item->val > item->val

            為了解決這個問題,可以利用指針值地址對齊的其他位來存儲一個計數,用于表示pred->next的改變次數。當insert拿到pred時,pred->next中存儲的計數假設是0,CAS之前其他線程移除了pred->next又新增回了item,此時pred->next中的計數增加,從而導致insertCAS失敗。

            // 最低位留作刪除標志
                #define MASK ((sizeof(node_t) - 1) & ~0x01)
            
                #define GET_TAG(p) ((markable_t)p & MASK)
                #define TAG(p, tag) ((markable_t)p | (tag))
                #define MARK(p) ((markable_t)p | 0x01)
                #define HAS_MARK(p) ((markable_t)p & 0x01)
                #define STRIP_MARK(p) ((node_t*)((markable_t)p & ~(MASK | 0x01)))

            remove的實現:

            /* 先標記再刪除 */
                if (!CAS(&sitem->next, inext, MARK(inext))) {
                    continue;
                }
                int tag = GET_TAG(pred->next) + 1;
                if (CAS(&pred->next, item, TAG(STRIP_MARK(sitem->next), tag))) {
                    haz_defer_free(sitem);
                    return TRUE;
                }

            insert中也可以更新pred->next的計數。

            總結

            無鎖的實現,本質上都會依賴于CAS的互斥性。從頭實現一個lock free的數據結構,可以深刻感受到lock free實現的tricky。最終代碼可以從這里github獲取。代碼中為了簡單,實現了一個不是很強大的hazard pointer,可以參考之前的博文

            posted on 2015-05-05 19:47 Kevin Lynx 閱讀(22648) 評論(0)  編輯 收藏 引用 所屬分類: c/c++

            欧美丰满熟妇BBB久久久| AAA级久久久精品无码片| 久久综合九色欧美综合狠狠| 亚洲国产另类久久久精品黑人| 亚洲伊人久久大香线蕉苏妲己| 精品久久久久久中文字幕大豆网| 久久99精品国产99久久| 成人久久免费网站| 色综合久久夜色精品国产| 色偷偷888欧美精品久久久| 久久久久久久精品成人热色戒| 99久久精品免费看国产一区二区三区| 国内精品久久久久久99| 久久精品无码av| 国产激情久久久久影院小草| 777米奇久久最新地址| 亚洲日韩中文无码久久| 久久亚洲国产中v天仙www| 亚洲人成无码久久电影网站| 亚洲午夜精品久久久久久浪潮| 精品国产99久久久久久麻豆| 久久91这里精品国产2020| 国产精品激情综合久久 | 伊人久久大香线焦AV综合影院| 伊人久久大香线蕉影院95| 久久久久久精品免费看SSS| 国产亚洲美女精品久久久| 国产Av激情久久无码天堂| 亚洲中文久久精品无码ww16 | 亚洲国产精品一区二区三区久久| 国内精品久久久久久久97牛牛| 亚洲人成网站999久久久综合| 久久久WWW成人| 99久久99久久精品国产片果冻| 国内精品久久久久久99蜜桃 | 久久久噜噜噜久久| 国产精品久久久久久一区二区三区 | 97久久超碰国产精品2021| 久久精品无码午夜福利理论片| 久久久精品国产sm调教网站| 免费久久人人爽人人爽av|