• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            loop_in_codes

            低調做技術__歡迎移步我的獨立博客 codemaro.com 微博 kevinlynx

            使用RCU技術實現讀寫線程無鎖

            在一個系統中有一個寫線程和若干個讀線程,讀寫線程通過一個指針共用了一個數據結構,寫線程改寫這個結構,讀線程讀取該結構。在寫線程改寫這個數據結構的過程中,加鎖情況下讀線程由于等待鎖耗時會增加。

            可以利用RCU (Read Copy Update What is rcu)的思想來去除這個鎖。本文提到的主要實現代碼:gist

            RCU

            RCU可以說是一種替代讀寫鎖的方法。其基于一個事實:當寫線程在改變一個指針時,讀線程獲取這個指針,要么獲取到老的值,要么獲取到新的值。RCU的基本思想其實很簡單,參考What is RCU中Toy implementation可以很容易理解。一種簡單的RCU流程可以描述為:

            寫線程:

            old_ptr = _ptr
            tmp_ptr = copy(_ptr)     // copy
            change(tmp_ptr)          // change 
            _ptr = tmp_ptr           // update
            synchroize(tmp_ptr)
            

            寫線程要更新_ptr指向的內容時,先復制一份新的,基于新的進行改變,更新_ptr指針,最后同步釋放老的內存。

            讀線程:

            tmp_ptr = _ptr
            use(tmp_ptr)
            dereference(tmp_ptr)
            

            讀線程直接使用_ptr,使用完后需要告訴寫線程自己不再使用_ptr。讀線程獲取_ptr時,可能會獲取到老的也可能獲取到新的,無論哪種RCU都需要保證這塊內存是有效的。重點在synchroizedereferencesynchroize會等待所有使用老的_ptr的線程dereference,對于新的_ptr使用者其不需要等待。這個問題說白了就是寫線程如何知道old_ptr沒有任何讀線程在使用,可以安全地釋放。

            這個問題實際上在wait-free的各種實現中有好些解法,how-when-to-release-memory-in-wait-free-algorithms這里有人總結了幾種方法,例如Hazard pointersQuiescence period based reclamation

            簡單地使用引用計數智能指針是無法解決這個問題的,因為智能指針自己不是線程安全的,例如:

            tmp_ptr = _ptr      // 1
            tmp_ptr->addRef()   // 2
            use
            tmp_ptr->release()
            

            代碼1/2行不是原子的,所以當取得tmp_ptr準備addRef時,tmp_ptr可能剛好被釋放了。

            Quiescence period based reclamation方法指的是讀線程需要聲明自己處于Quiescence period,也就是不使用_ptr的時候,當其使用_ptr的時候實際是進入了一個邏輯上的臨界區,當所有讀線程都不再使用_ptr的時候,寫線程就可以對內存進行安全地釋放。

            本文正是描述了一種Quiescence period based reclamation實現。這個實現可以用于有一個寫線程和多個讀線程共用若干個數據的場景。

            實現

            該方法本質上把數據同步分解為基本的內存單元讀寫。使用方式上可描述為:

            讀線程:

            tmp_ptr = _ptr
            use
            update() // 標識自己不再使用任何共享數據
            

            寫線程:

            old_ptr = _ptr
            tmp_ptr = copy(_ptr)
            change(tmp_ptr)
            _ptr = tmp_ptr
            gc()
            defer_free(old_ptr)
            

            以下具體描述讀寫線程的實現。

            寫線程

            寫線程負責標識內存需要被釋放,以及檢查何時可以真正釋放內存。其維護了一個釋放內存隊列:

            void *_pending[8]
                uint64_t _head, _tail
            
                void defer_free(void *p) {
                    _head ++
                    _pending[PENDING_POS(_head)] = p
                }
            
                gc() {
                    for (_tail -> find_free_pos())
                        free(_pending[_tail])
                }

            find_free_pos找到一個可釋放內存位置,在[_tail, find_free_pos())這個區間內所有內存是可以安全被釋放的。

            隊列位置_head/_tail一直增大,PENDING_POS就是對這個位置取模,限定在隊列大小范圍內也是可行的,無論哪種方式,_head從邏輯上說一直>=_tail,但在實際中可能小于_tail,所以實現時不使用大小判定,而是:

            gc() {
                    pos = find_free_pos()
                    while (_tail != pos) {
                        free(_pending[PENDING_POS(_tail)])
                        _tail ++
                    }
                }

            讀線程

            讀線程不再使用共享內存時,就標識自己:

            update() {
                    static __thread int tid
                    _tmark[tid] = _head
                }

            讀線程的狀態會影響寫線程的回收邏輯,其狀態分為:

            • 初始
            • 活躍,會調用到update
            • 暫停,其他地方同步,或被掛起
            • 退出

            讀線程處于活躍狀態時,它會不斷地更新自己可釋放內存位置(_tmark[tid])。寫線程檢查所有讀線程的_tmark[tid][_tail, min(_tmark[]))是所有讀線程都不再使用的內存區間,可以被安全釋放。

            find_free_pos() {
                    min = MAX_INTEGER
                    pos = 0
                    for (tid = 0; tid < max_threads; ++tid) {
                        tpos = _tmark[tid]
                        offset = tpos - tail
                        if (offset < min) {
                            min = offset
                            pos = tpos
                        }
                    }
                    return pos
                }

            當讀線程暫停時,其_tmark[tid]可能會在很長一段時間里得不到更新,此時會阻礙寫線程釋放內存。所以需要方法來標識讀線程是否進入暫停狀態。通過設置一個上次釋放內存位置_tfreeds[tid],標識每個線程當前內存釋放到的位置。如果一個線程處于暫停狀態了,那么在一定時間后,_tfreeds[tid] == _tmark[tid]。在查找可釋放位置時,就需要忽略暫停狀態的讀線程:

            find_free_pos() {
                    min = MAX_INTEGER
                    pos = _head
                    for (tid = 0; tid < max_threads; ++tid) {
                        tpos = _tmark[tid]
                        if (tpos == _tfreeds[tid]) continue
                        offset = tpos - tail
                        if (offset < min) {
                            min = offset
                            pos = tpos
                        }
                    }
                    for (tid = 0; tid < max_threads; ++tid) {
                        if (_tfreeds[tid] != _tmark[tid]) 
                            _tfreeds[tid] = pos
                    }
                    return pos
                }

            但是當所有線程都處于暫停狀態時,寫線程可能還在工作,上面的實現就會返回_head,此時寫線程依然可以正常釋放內存。

            小結,該方法原理可用下圖表示:

            線程動態增加/減少

            如果讀線程可能中途退出,中途動態增加,那么_tmark[]就需要被復用,此時線程tid的分配調整為動態的即可:

            class ThreadIdPool {
                public:
                    // 動態獲取一個線程tid,某線程每次調用該接口返回相同的值
                    int get()
                    // 線程退出時回收該tid
                    void put(int id)
                }

            ThreadIdPool的實現無非就是利用TLS,以及在線程退出時得到通知以回收tid。那么對于讀線程的update實現變為:

            update() {
                    tid = _idPool->get()
                    _tmark[tid] = _head
                }

            當某個線程退出時,_tmark[tid]_tfreeds[tid]不需要做任何處理,當新創建的線程復用了該tid時,可以立即復用_tmark[tid]_tfreeds[tid],此時這2個值必然是相等的。

            以上,就是整個方法的實現。

            線程可讀可寫

            以上方法適用場景還是不夠通用。在nbds項目(實現了一些無鎖數據結構的toy project)中有一份雖然簡單但也有啟發的實現(rcu.c)。該實現支持任意線程defer_free,所有線程updateupdate除了聲明不再使用任何共享內存外,還可能回收內存。任意線程都可能維護一些待釋放的內存,任意一塊內存可能被任意其他線程使用。那么它是如何內存回收的?

            本文描述的方法是所有讀線程自己聲明自己,然后由寫線程主動來檢查。不同于此方法, nbds的實現,基于一種通知擴散的方式。該方式以這樣一種方式工作:

            當某個線程嘗試內存回收時,它需要知道所有其他線程的空閑位置(相當于_tmark[tid]),它通知下一個線程我需要釋放的范圍。當下一個線程update時(離開臨界區),它會將上個線程的通知繼續告訴下一個線程,直到最后這個通知回到發起線程。那么對于發起線程而言,這個釋放請求在所有線程中走了一遍,得到了大家的認可,可以安全釋放。每個線程都以這樣的方式工作。

            void rcu_defer_free (void *x) {
                    ...
                    rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
                    ...
                }
            
                void rcu_update (void) {
                    ...
                    for (i = 0; i < num_threads_; ++i) {
                        ...     
                        uint64_t x = rcu_[tid_][i]; // 其它線程發給自己的通知
                        rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; // 擴散出去
                        ...
                    }
                    ...
                    while (q->tail != rcu_[tid_][tid_]) {
                        free
                    }     
                    ...
                }

            這個實現相對簡單,不支持線程暫停,以及線程動態增加和減少。

            posted on 2015-04-19 19:10 Kevin Lynx 閱讀(11451) 評論(3)  編輯 收藏 引用 所屬分類: c/c++

            評論

            # re: 使用RCU技術實現讀寫線程無鎖[未登錄] 2015-04-22 09:22 春秋十二月

            很喜歡看你的文章,這個實現在商業應用中表現如何?  回復  更多評論   

            # re: 使用RCU技術實現讀寫線程無鎖 2015-04-22 18:29 Kevin Lynx

            @春秋十二月
            我們的一個服務有1W QPS,數據變更時latency會幾十倍波動,用了這個后latency再也不波動了  回復  更多評論   

            # re: 使用RCU技術實現讀寫線程無鎖 2015-10-07 17:43 Abael

            這個原子操作 ?
            atomic_CAS  回復  更多評論   

            亚洲精品无码久久久| 久久精品综合一区二区三区| 久久精品国产亚洲av麻豆图片| 久久久久久无码Av成人影院| 久久青青草原精品国产软件| 久久综合丁香激情久久| 久久精品www| 91久久精品国产91性色也| 思思久久99热只有频精品66| 中文字幕热久久久久久久| 精品人妻伦九区久久AAA片69| 久久精品成人欧美大片| 日韩精品久久无码人妻中文字幕| 亚洲AV日韩精品久久久久久久 | 久久亚洲精品国产精品婷婷| 成人国内精品久久久久影院| 国内精品久久久久久野外| 国产精品一区二区久久精品无码 | 精品久久久久久久中文字幕| 国产精品欧美亚洲韩国日本久久 | 国产精品无码久久久久久| 久久国内免费视频| 久久精品中文字幕一区| 伊人久久亚洲综合影院| 波多野结衣中文字幕久久| 亚洲欧洲久久久精品| 亚洲av日韩精品久久久久久a | 国产精品狼人久久久久影院| 久久精品国产免费观看三人同眠| 久久99国产综合精品| 国产综合免费精品久久久| 亚洲国产成人乱码精品女人久久久不卡| 国产免费福利体检区久久| 久久久WWW免费人成精品| 久久99热狠狠色精品一区| 嫩草伊人久久精品少妇AV| 国内精品久久久久久久久电影网 | 久久久久人妻一区精品色| www.久久99| 伊人色综合久久| 久久99久久成人免费播放|