• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            牽著老婆滿街逛

            嚴以律己,寬以待人. 三思而后行.
            GMail/GTalk: yanglinbo#google.com;
            MSN/Email: tx7do#yahoo.com.cn;
            QQ: 3 0 3 3 9 6 9 2 0 .

            無鎖(lock-free)數據結構

            轉載自:http://www.limodev.cn/blog/archives/494


            系統程序員成長計劃-并發(五)

            轉載時請注明出處和作者聯系方式
            文章出處:http://www.limodev.cn/blog
            作者聯系方式:李先靜 <xianjimli@gmail.com>

            無鎖(lock-free)數據結構

                  提到并行計算通常都會想到加鎖,事實卻并非如此,大多數并發是不需要加鎖的。比如在不同電腦上運行的代碼編輯器,兩者并發運行不需要加鎖。在一臺電腦上同時運行的媒體播放放器和代碼編輯器,兩者并發運行不需要加鎖(當然系統調用和進程調度是要加鎖的)。在同一個進程中運行多個線程,如果各自處理獨立的事情也不需要加鎖(當然系統調用、進程調度和內存分配是要加鎖的)。在以上這些情況里,各個并發實體之間沒有共享數據,所以雖然并發運行但不需要加鎖。

            多線程并發運行時,雖然有共享數據,如果所有線程只是讀取共享數據而不修改它,也是不用加鎖的,比如代碼段就是共享的“數據”,每個線程都會讀取,但是不用加鎖。排除所有這些情況,多線程之間有共享數據,有的線程要修改這些共享數據,有的線程要讀取這些共享數據,這才是程序員需要關注的情況,也是本節我們討論的范圍。

            在并發的環境里,加鎖可以保護共享的數據,但是加鎖也會存在一些問題:

            • 由于臨界區無法并發運行,進入臨界區就需要等待,加鎖帶來效率的降低。
            • 在復雜的情況下,很容易造成死鎖,并發實體之間無止境的互相等待。
            • 在中斷/信號處理函數中不能加鎖,給并發處理帶來困難。
            • 優先級倒置造成實時系統不能正常工作。低級優先進程拿到高優先級進程需要的鎖,結果是高/低優先級的進程都無法運行,中等優先級的進程可能在狂跑。

            由于并發與加鎖(互斥)的矛盾關系,無鎖數據結構自然成為程序員關注的焦點,這也是本節要介紹的:



                  CPU提供的原子操作

            大約在七八年前,我們用apache的xerces來解析XML文件,奇怪的是多線程反而比單線程慢。他們找了很久也沒有找出原因,只是證實使用多進程代替多線程會快一個數量級,在Windows上他們就使用了多進程的方式。后來移植到linux時候,我發現xerces每創建一個結點都會去更新一些全局的統計信息,比如把結點的總數加一,它使用的pthread_mutex實現互斥。這就是問題所在:一個XML文檔有數以萬計的結點,以50個線程并發為例,每個線程解析一個XML文檔,總共要進行上百萬次的加鎖/解鎖,幾乎所有線程都在等待,你說能快得了嗎?

            當時我知道Windows下有InterlockedIncrement之類的函數,它們利用CPU一些特殊指令,保證對整數的基本操作是原子的。查找了一些資源發現Linux下也有類似的函數,后來我把所有加鎖去掉,換成這些原子操作,速度比多進程運行還快了幾倍。下面我們看++和—的原子操作在IA架構上的實現:

            #define ATOMIC_SMP_LOCK "lock ; "
            typedef 
            struct volatile int counter; } atomic_t;

            static __inline__ void atomic_inc(atomic_t *v)
            {
                __asm__ __volatile__(
                    ATOMIC_SMP_LOCK 
            "incl %0"
                    :
            "=m" (v->counter)
                    :
            "m" (v->counter));
            }


            static __inline__ void atomic_dec(atomic_t *v)
            {
                __asm__ __volatile__(
                    ATOMIC_SMP_LOCK 
            "decl %0"
                    :
            "=m" (v->counter)
                    :
            "m" (v->counter));
            }


            單入單出的循環隊列。單入單出的循環隊列是一種特殊情況,雖然特殊但是很實用,重要的是它不需要加鎖。這里的單入是指只有一個線程向隊列里追加數據(push),單出只是指只有一個線程從隊列里取數據(pop),循環隊列與普通隊列相比,不同之處在于它的最大數據儲存量是事先固定好的,不能動態增長。盡管有這些限制它的應用還是相當廣泛的。這我們介紹一下它的實現:

            數據下定義如下:

            typedef struct _FifoRing
            {
                
            int r_cursor;
                
            int w_cursor;
                size_t length;
                
            void* data[0];

            }
            FifoRing;

            r_cursor指向隊列頭,用于取數據(pop)。w_cursor指向隊列尾,用于追加數據(push)。length表示隊列的最大數據儲存量,data表示存放的數據,[0]在這里表示變長的緩沖區(前面我們已經講過)。


            創建函數
            FifoRing* fifo_ring_create(size_t length)
            {
                FifoRing
            * thiz = NULL;

                return_val_if_fail(length 
            > 1, NULL);

                thiz 
            = (FifoRing*)malloc(sizeof(FifoRing) + length * sizeof(void*));

                
            if(thiz != NULL)
                
            {
                    thiz
            ->r_cursor = 0;
                    thiz
            ->w_cursor = 0;
                    thiz
            ->length   = length;
                }


                
            return thiz;
            }


            這里我們要求隊列的長度大于1而不是大于0,為什么呢?排除長度為1的隊列沒有什么意義的原因外,更重要的原因是隊列頭與隊列尾重疊 (r_cursor= =w_cursor) 時,到底表示是滿隊列還是空隊列?這個要搞清楚才行,上次一個同事犯了這個錯誤,讓我們查了很久。這里我們認為隊列頭與隊列尾重疊時表示隊列為空,這與隊列初始狀態一致,后面在寫的時候始終保留一個空位,避免隊列頭與隊列尾重疊,這樣可以消除歧義了。

            追加數據(push)

            Ret fifo_ring_push(FifoRing* thiz, void* data)
            {
                
            int w_cursor = 0;
                Ret ret 
            = RET_FAIL;
                return_val_if_fail(thiz 
            != NULL, RET_FAIL);

                w_cursor 
            = (thiz->w_cursor + 1% thiz->length;

                
            if(w_cursor != thiz->r_cursor)
                
            {
                    thiz
            ->data[thiz->w_cursor] = data;
                    thiz
            ->w_cursor = w_cursor;

                    ret 
            = RET_OK;
                }


                
            return ret;
            }

            隊列頭和隊列尾之間還有一個以上的空位時就追加數據,否則返回失敗。

            取數據(pop)

            Ret fifo_ring_pop(FifoRing* thiz, void** data)
            {
                Ret ret 
            = RET_FAIL;
                return_val_if_fail(thiz 
            != NULL && data != NULL, RET_FAIL);

                
            if(thiz->r_cursor != thiz->w_cursor)
                
            {
                    
            *data = thiz->data[thiz->r_cursor];
                    thiz
            ->r_cursor = (thiz->r_cursor + 1)%thiz->length;

                    ret 
            = RET_OK;
                }


                
            return ret;
            }

            隊列頭和隊列尾不重疊表示隊列不為空,取數據并移動隊列頭。



                  單寫多讀的無鎖數據結構
                  
            單寫表示只有一個線程去修改共享數據結構,多讀表示有多個線程去讀取共享數據結構。前面介紹的讀寫鎖可以有效的解決這個問題,但更高效的辦法是使用無鎖數據結構。思路如下:

            就像為了避免顯示閃爍而使用的雙緩沖一樣,我們使用兩份數據結構,一份數據結構用于讀取,所有線程都可以在不加鎖的情況下讀取這個數據結構。另外一份數據結構用于修改,由于只有一個線程會修改它,所以也不用加鎖。

            在修改之后,我們再交換讀/寫的兩個函數結構,把另外一份也修改過來,這樣兩個數據結構就一致了。在交換時要保證沒有線程在讀取,所以我們還需要一個讀線程的引用計數。現在我們看看怎么把前面寫的雙向鏈表改為單寫多讀的無鎖數據結構。

            為了保證交換是原子的,我們需要一個新的原子操作CAS(compare and swap)。

            #define CAS(_a, _o, _n)                                    \
            (
            { __typeof__(_o) __o = _o;                                \
               __asm__ __volatile__(                                   \
                   
            "lock cmpxchg %3,%1"                                \
                   : 
            "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \
                   :  
            "0" (__o), "r" (_n) );                           \
               __o;                                                    \
            }
            )

            數據結構

            typedef struct _SwmrDList
            {
                atomic_t rd_index_and_ref;
                DList
            * dlists[2];
            }
            SwmrDList;

            兩個鏈表,一個用于讀一個用于寫。rd_index_and_ref的最高字節記錄用于讀取的雙向鏈表的索引,低24位用于記錄讀取線程的引用記數,最大支持16777216個線程同時讀取,應該是足夠了,所以后面不考慮它的溢出。

            讀取操作

            int      swmr_dlist_find(SwmrDList* thiz, DListDataCompareFunc cmp, void* ctx)
            {
                
            int ret = 0;
                return_val_if_fail(thiz 
            != NULL && thiz->dlists != NULL, -1);

                atomic_inc(
            &(thiz->rd_index_and_ref));
                size_t rd_index 
            = (thiz->rd_index_and_ref.counter>>24& 0x1;
                ret 
            = dlist_find(thiz->dlists[rd_index], cmp, ctx);
                atomic_dec(
            &(thiz->rd_index_and_ref));

                
            return ret;
            }

            修改操作

            Ret swmr_dlist_insert(SwmrDList* thiz, size_t index, void* data)
            {
                Ret ret 
            = RET_FAIL;
                DList
            * wr_dlist = NULL;
                return_val_if_fail(thiz 
            != NULL && thiz->dlists != NULL, ret);

                size_t wr_index 
            = !((thiz->rd_index_and_ref.counter>>24& 0x1);
                
            if((ret = dlist_insert(thiz->dlists[wr_index], index, data)) == RET_OK)
                
            {
                    
            int rd_index_old = thiz->rd_index_and_ref.counter & 0xFF000000;
                    
            int rd_index_new = wr_index << 24;

                    
            do
                    
            {
                        usleep(
            100);
                    }
            while(CAS(&(thiz->rd_index_and_ref), rd_index_old, rd_index_new));

                    wr_index 
            = rd_index_old>>24;
                    ret 
            = dlist_insert(thiz->dlists[wr_index], index, data);
                }


                
            return ret;
            }

            先修改用于修改的雙向鏈表,修改完成之后等到沒有線程讀取時,交換讀/寫兩個鏈表,再修改另一個鏈表,此時兩個鏈表狀態保持一致。

            稍做改進,對修改的操作進行加鎖,就可以支持多讀多寫的數據結構,讀是無鎖的,寫是加鎖的。


                  真正的無鎖數據結構
                  Andrei Alexandrescu的《Lock-FreeDataStructures》估計是這方面最經典的論文了,對他的方法我開始感到驚奇后來感到失望,驚奇的是算法的巧妙,失望的是無鎖的限制和代價。作者最后說這種數據結構只適用于WRRMBNTM(Write-Rarely-Read-Many -But-Not-Too-Many)的情況。而且每次修改都要拷貝整個數據結構(甚至多次),所以不要指望這種方法能帶來多少性能上的提高,唯一的好處是能避免加鎖帶來的部分副作用。有興趣的朋友可以看下這篇論文,這里我就不重復了。

            本節示例請到這里下載。

            posted on 2010-10-31 16:16 楊粼波 閱讀(1311) 評論(0)  編輯 收藏 引用

            亚洲午夜久久久久妓女影院| 大伊人青草狠狠久久| 亚洲乱码日产精品a级毛片久久 | 久久婷婷久久一区二区三区 | 亚洲午夜久久影院| yy6080久久| 精品久久久久久中文字幕| 精品久久久久久久久久中文字幕| 久久91精品国产91| 99久久国产热无码精品免费 | 色欲综合久久中文字幕网| 国产一久久香蕉国产线看观看| 久久精品夜色噜噜亚洲A∨| 婷婷综合久久中文字幕蜜桃三电影| 91性高湖久久久久| 嫩草伊人久久精品少妇AV| 久久久久国产亚洲AV麻豆| 久久精品aⅴ无码中文字字幕不卡| 久久无码av三级| 91精品国产综合久久婷婷| 久久久久久久综合狠狠综合| 丁香狠狠色婷婷久久综合| 麻豆av久久av盛宴av| 久久久国产精华液| 久久久国产精品福利免费| 久久国产色AV免费看| 国内精品久久人妻互换| 亚洲国产精品一区二区三区久久| 99久久精品免费| 91精品国产高清久久久久久国产嫩草| 无码精品久久久久久人妻中字| 国产免费久久精品99re丫y| 久久亚洲精品无码播放| 久久久久久久国产免费看| 精品国产婷婷久久久| 久久精品国产精品亚洲下载 | 91精品国产综合久久久久久| 久久久黄色大片| 欧美精品一区二区久久| 国产一区二区久久久| 亚洲乱码精品久久久久..|