• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            牽著老婆滿街逛

            嚴以律己,寬以待人. 三思而后行.
            GMail/GTalk: yanglinbo#google.com;
            MSN/Email: tx7do#yahoo.com.cn;
            QQ: 3 0 3 3 9 6 9 2 0 .

            透過 Linux 內核看無鎖編程

            轉載自:http://www.ibm.com/developerworks/cn/linux/l-cn-lockfree/index.html

            非阻塞型同步 (Non-blocking Synchronization) 簡介

            如何正確有效的保護共享數據是編寫并行程序必須面臨的一個難題,通常的手段就是同步。同步可分為阻塞型同步(Blocking Synchronization)和非阻塞型同步( Non-blocking Synchronization)。

            阻塞型同步是指當一個線程到達臨界區時,因另外一個線程已經持有訪問該共享數據的鎖,從而不能獲取鎖資源而阻塞,直到另外一個線程釋放鎖。常見的同步原語有 mutex、semaphore 等。如果同步方案采用不當,就會造成死鎖(deadlock),活鎖(livelock)和優先級反轉(priority inversion),以及效率低下等現象。

            為了降低風險程度和提高程序運行效率,業界提出了不采用鎖的同步方案,依照這種設計思路設計的算法稱為非阻塞型算法,其本質特征就是停止一個線程的執行不會阻礙系統中其他執行實體的運行。

            當今比較流行的 Non-blocking Synchronization 實現方案有三種:

            1. Wait-free

              Wait-free 是指任意線程的任何操作都可以在有限步之內結束,而不用關心其它線程的執行速度。 Wait-free 是基于 per-thread 的,可以認為是 starvation-free 的。非常遺憾的是實際情況并非如此,采用 Wait-free 的程序并不能保證 starvation-free,同時內存消耗也隨線程數量而線性增長。目前只有極少數的非阻塞算法實現了這一點。

            2. Lock-free

              Lock-Free 是指能夠確保執行它的所有線程中至少有一個能夠繼續往下執行。由于每個線程不是 starvation-free 的,即有些線程可能會被任意地延遲,然而在每一步都至少有一個線程能夠往下執行,因此系統作為一個整體是在持續執行的,可以認為是 system-wide 的。所有 Wait-free 的算法都是 Lock-Free 的。

            3. Obstruction-free

              Obstruction-free 是指在任何時間點,一個孤立運行線程的每一個操作可以在有限步之內結束。只要沒有競爭,線程就可以持續運行。一旦共享數據被修改,Obstruction-free 要求中止已經完成的部分操作,并進行回滾。 所有 Lock-Free 的算法都是 Obstruction-free 的。

            綜上所述,不難得出 Obstruction-free 是 Non-blocking synchronization 中性能最差的,而 Wait-free 性能是最好的,但實現難度也是最大的,因此 Lock-free 算法開始被重視,并廣泛運用于當今正在運行的程序中,比如 linux 內核。

            一般采用原子級的 read-modify-write 原語來實現 Lock-Free 算法,其中 LL 和 SC 是 Lock-Free 理論研究領域的理想原語,但實現這些原語需要 CPU 指令的支持,非常遺憾的是目前沒有任何 CPU 直接實現了 SC 原語。根據此理論,業界在原子操作的基礎上提出了著名的 CAS(Compare - And - Swap)操作來實現 Lock-Free 算法,Intel 實現了一條類似該操作的指令:cmpxchg8。

            CAS 原語負責將某處內存地址的值(1 個字節)與一個期望值進行比較,如果相等,則將該內存地址處的值替換為新值,CAS 操作偽碼描述如下:


            清單 1. CAS 偽碼
                        Bool CAS(T* addr, T expected, T newValue)
                        {
                        if( *addr == expected )
                        {
                        *addr =  newValue;
                        return true;
                        }
                        else
                        return false;
                        }
                        

            在實際開發過程中,利用 CAS 進行同步,代碼如下所示:


            清單 2. CAS 實際操作
                        do{
                        備份舊數據;
                        基于舊數據構造新數據;
                        }while(!CAS( 內存地址,備份的舊數據,新數據 ))
                        

            就是指當兩者進行比較時,如果相等,則證明共享數據沒有被修改,替換成新值,然后繼續往下運行;如果不相等,說明共享數據已經被修改,放棄已經所做的操作,然后重新執行剛才的操作。容易看出 CAS 操作是基于共享數據不會被修改的假設,采用了類似于數據庫的 commit-retry 的模式。當同步沖突出現的機會很少時,這種假設能帶來較大的性能提升。


            加鎖的層級

            根據復雜程度、加鎖粒度及運行速度,可以得出如下圖所示的鎖層級:


            圖 1. 加鎖層級
            圖 1. 加鎖層級 

            其中標注為紅色字體的方案為 Blocking synchronization,黑色字體為 Non-blocking synchronization。Lock-based 和 Lockless-based 兩者之間的區別僅僅是加鎖粒度的不同。圖中最底層的方案就是大家經常使用的 mutex 和 semaphore 等方案,代碼復雜度低,但運行效率也最低。


            Linux 內核中的無鎖分析

            Linux 內核可能是當今最大最復雜的并行程序之一,它的并行主要來至于中斷、內核搶占及 SMP 等。內核設計者們為了不斷提高 Linux 內核的效率,從全局著眼,逐步廢棄了大內核鎖來降低鎖的粒度;從細處下手,不斷對局部代碼進行優化,用無鎖編程替代基于鎖的方案,如 seqlock 及 RCU 等;不斷減少鎖沖突程度、降低等待時間,如 Double-checked locking 和原子鎖等。

            內核無鎖第一層級 — 少鎖

            無論什么時候當臨界區中的代碼僅僅需要加鎖一次,同時當其獲取鎖的時候必須是線程安全的,此時就可以利用 Double-checked Locking 模式來減少鎖競爭和加鎖載荷。目前 Double-checked Locking 已經廣泛應用于單例 (Singleton) 模式中。內核設計者基于此思想,巧妙的將 Double-checked Locking 方法運用于內核代碼中。

            當一個進程已經僵死,即進程處于 TASK_ZOMBIE 狀態,如果父進程調用 waitpid() 系統調用時,父進程需要為子進程做一些清理性的工作,代碼如下所示:


            清單 3. 少鎖操作
                        984   static int wait_task_zombie(task_t *p, int noreap,
                        985           struct siginfo __user *infop,
                        986           int __user *stat_addr, struct rusage __user *ru)
                        987   {
                        ……
                        1103       if (p->real_parent != p->parent) {
                        1104           write_lock_irq(&tasklist_lock);
                        1105           /* Double-check with lock held.  */
                        1106           if (p->real_parent != p->parent) {
                        1107               __ptrace_unlink(p);
                        1108               // TODO: is this safe?
                        1109               p->exit_state = EXIT_ZOMBIE;
                        ……
                        1120           }
                        1121           write_unlock_irq(&tasklist_lock);
                        1122      }
                        ……
                        1127  }
                        

            如果將 write_lock_irq 放置于 1103 行之前,鎖的范圍過大,鎖的負載也會加重,影響效率;如果將加鎖的代碼放到判斷里面,且沒有 1106 行的代碼,程序會正確嗎?在單核情況下是正確的,但在雙核情況下問題就出現了。一個非主進程在一個 CPU 上運行,正準備調用 exit 退出,此時主進程在另外一個 CPU 上運行,在子進程調用 release_task 函數之前調用上述代碼。子進程在 exit_notify 函數中,先持有讀寫鎖 tasklist_lock,調用 forget_original_parent。主進程運行到 1104 處,由于此時子進程先持有該鎖,所以父進程只好等待。在 forget_original_parent 函數中,如果該子進程還有子進程,則會調用 reparent_thread(),將執行 p->parent = p->real_parent; 語句,導致兩者相等,等非主進程釋放讀寫鎖 tasklist_lock 時,另外一個 CPU 上的主進程被喚醒,一旦開始執行,繼續運行將會導致 bug。

            嚴格的說,Double-checked locking 不屬于無鎖編程的范疇,但由原來的每次加鎖訪問到大多數情況下無須加鎖,就是一個巨大的進步。同時從這里也可以看出一點端倪,內核開發者為了降低鎖沖突率,減少等待時間,提高運行效率,一直在持續不斷的進行改進。

            內核無鎖第二層級 — 原子鎖

            原子操作可以保證指令以原子的方式執行——執行過程不被打斷。內核提供了兩組原子操作接口:一組針對于整數進行操作,另外一組針對于單獨的位進行操作。內核中的原子操作通常是內聯函數,一般是通過內嵌匯編指令來完成。對于一些簡單的需求,例如全局統計、引用計數等等,可以歸結為是對整數的原子計算。

            內核無鎖第三層級 — Lock-free

            1. Lock-free 應用場景一 —— Spin Lock

            Spin Lock 是一種輕量級的同步方法,一種非阻塞鎖。當 lock 操作被阻塞時,并不是把自己掛到一個等待隊列,而是死循環 CPU 空轉等待其他線程釋放鎖。 Spin lock 鎖實現代碼如下:


            清單 4. spin lock 實現代碼
                        static inline void __preempt_spin_lock(spinlock_t *lock)
                        {
                        ……
                        do {
                        preempt_enable();
                        while (spin_is_locked(lock))
                        cpu_relax();
                        preempt_disable();
                        } while (!_raw_spin_trylock(lock));
                        }
                        static inline int _raw_spin_trylock(spinlock_t *lock)
                        {
                        char oldval;
                        __asm__ __volatile__(
                        "xchgb %b0,%1"
                        :"=q" (oldval), "=m" (lock->lock)
                        :"0" (0) : "memory");
                        return oldval > 0;
                        }
                        

            匯編語言指令 xchgb 原子性的交換 8 位 oldval( 存 0) 和 lock->lock 的值,如果 oldval 為 1(lock 初始值為 1),則獲取鎖成功,反之,則繼續循環,接著 relax 休息一會兒,然后繼續周而復始,直到成功。

            對于應用程序來說,希望任何時候都能獲取到鎖,也就是期望 lock->lock 為 1,那么用 CAS 原語來描述 _raw_spin_trylock(lock) 就是 CAS(lock->lock,1,0);

            如果同步操作總是能在數條指令內完成,那么使用 Spin Lock 會比傳統的 mutex lock 快一個數量級。Spin Lock 多用于多核系統中,適合于鎖持有時間小于將一個線程阻塞和喚醒所需時間的場合。

            pthread 庫已經提供了對 spin lock 的支持,所以用戶態程序也能很方便的使用 spin lock 了,需要包含 pthread.h 。在某些場景下,pthread_spin_lock 效率是 pthread_mutex_lock 效率的一倍多。美中不足的是,內核實現了讀寫 spin lock 鎖,但 pthread 未能實現。

            2. Lock -free 應用場景二 —— Seqlock

            手表最主要最常用的功能是讀時間,而不是校正時間,一旦后者成了最常用的功能,消費者肯定不會買賬。計算機的時鐘也是這個功能,修改時間是小概率事件,而讀時間是經常發生的行為。以下代碼摘自 2.4.34 內核:


            清單 5. 2.4.34 seqlock 實現代碼
                        443 void do_gettimeofday(struct timeval *tv)
                        444 {
                        ……
                        448         read_lock_irqsave(&xtime_lock, flags);
                        ……
                        455         sec = xtime.tv_sec;
                        456         usec += xtime.tv_usec;
                        457         read_unlock_irqrestore(&xtime_lock, flags);
                        ……
                        466 }
                        468 void do_settimeofday(struct timeval *tv)
                        469 {
                        470         write_lock_irq(&xtime_lock);
                        ……
                        490         write_unlock_irq(&xtime_lock);
                        491 }
                        

            不難發現獲取時間和修改時間采用的是 spin lock 讀寫鎖,讀鎖和寫鎖具有相同的優先級,只要讀持有鎖,寫鎖就必須等待,反之亦然。

            Linux 2.6 內核中引入一種新型鎖——順序鎖 (seqlock),它與 spin lock 讀寫鎖非常相似,只是它為寫者賦予了較高的優先級。也就是說,即使讀者正在讀的時候也允許寫者繼續運行。當存在多個讀者和少數寫者共享一把鎖時,seqlock 便有了用武之地,因為 seqlock 對寫者更有利,只要沒有其他寫者,寫鎖總能獲取成功。根據 lock-free 和時鐘功能的思想,內核開發者在 2.6 內核中,將上述讀寫鎖修改成了順序鎖 seqlock,代碼如下:


            清單 6. 2.6.10 seqlock 實現代碼
                        static inline unsigned read_seqbegin(const seqlock_t *sl)
                        {
                        unsigned ret = sl->sequence;
                        smp_rmb();
                        return ret;
                        }
                        static inline int read_seqretry(const seqlock_t *sl, unsigned iv)
                        {
                        smp_rmb();
                        return (iv & 1) | (sl->sequence ^ iv);
                        }
                        static inline void write_seqlock(seqlock_t *sl)
                        {
                        spin_lock(&sl->lock);
                        ++sl->sequence;
                        smp_wmb();
                        }
                        void do_gettimeofday(struct timeval *tv)
                        {
                        unsigned long seq;
                        unsigned long usec, sec;
                        unsigned long max_ntp_tick;
                        ……
                        do {
                        unsigned long lost;
                        seq = read_seqbegin(&xtime_lock);
                        ……
                        sec = xtime.tv_sec;
                        usec += (xtime.tv_nsec / 1000);
                        } while (read_seqretry(&xtime_lock, seq));
                        ……
                        tv->tv_sec = sec;
                        tv->tv_usec = usec;
                        }
                        int do_settimeofday(struct timespec *tv)
                        {
                        ……
                        write_seqlock_irq(&xtime_lock);
                        ……
                        write_sequnlock_irq(&xtime_lock);
                        clock_was_set();
                        return 0;
                        }
                        

            Seqlock 實現原理是依賴一個序列計數器,當寫者寫入數據時,會得到一把鎖,并且將序列值加 1。當讀者讀取數據之前和之后,該序列號都會被讀取,如果讀取的序列號值都相同,則表明寫沒有發生。反之,表明發生過寫事件,則放棄已進行的操作,重新循環一次,直至成功。不難看出,do_gettimeofday 函數里面的 while 循環和接下來的兩行賦值操作就是 CAS 操作。

            采用順序鎖 seqlock 好處就是寫者永遠不會等待,缺點就是有些時候讀者不得不反復多次讀相同的數據直到它獲得有效的副本。當要保護的臨界區很小,很簡單,頻繁讀取而寫入很少發生(WRRM--- Write Rarely Read Mostly)且必須快速時,就可以使用 seqlock。但 seqlock 不能保護包含有指針的數據結構,因為當寫者修改數據結構時,讀者可能會訪問一個無效的指針。

            3. Lock -free 應用場景三 —— RCU

            在 2.6 內核中,開發者還引入了一種新的無鎖機制 -RCU(Read-Copy-Update),允許多個讀者和寫者并發執行。RCU 技術的核心是寫操作分為寫和更新兩步,允許讀操作在任何時候無阻礙的運行,換句話說,就是通過延遲寫來提高同步性能。RCU 主要應用于 WRRM 場景,但它對可保護的數據結構做了一些限定:RCU 只保護被動態分配并通過指針引用的數據結構,同時讀寫控制路徑不能有睡眠。以下數組動態增長代碼摘自 2.4.34 內核:

            清單 7. 2.4.34 RCU 實現代碼

            其中 ipc_lock 是讀者,grow_ary 是寫者,不論是讀或者寫,都需要加 spin lock 對被保護的數據結構進行訪問。改變數組大小是小概率事件,而讀取是大概率事件,同時被保護的數據結構是指針,滿足 RCU 運用場景。以下代碼摘自 2.6.10 內核:


            清單 8. 2.6.10 RCU 實現代碼
                        #define rcu_read_lock()    preempt_disable()
                        #define rcu_read_unlock()  preempt_enable()
                        #define rcu_assign_pointer(p, v)  ({ \
                        smp_wmb(); \
                        (p) = (v); \
                        })
                        struct kern_ipc_perm* ipc_lock(struct ipc_ids* ids, int id)
                        {
                        ……
                        rcu_read_lock();
                        entries = rcu_dereference(ids->entries);
                        if(lid >= entries->size) {
                        rcu_read_unlock();
                        return NULL;
                        }
                        out = entries->p[lid];
                        if(out == NULL) {
                        rcu_read_unlock();
                        return NULL;
                        }
                        ……
                        return out;
                        }
                        static int grow_ary(struct ipc_ids* ids, int newsize)
                        {
                        struct ipc_id_ary* new;
                        struct ipc_id_ary* old;
                        ……
                        new = ipc_rcu_alloc(sizeof(struct kern_ipc_perm *)*newsize +
                        sizeof(struct ipc_id_ary));
                        if(new == NULL)
                        return size;
                        new->size = newsize;
                        memcpy(new->p, ids->entries->p, sizeof(struct kern_ipc_perm *)*size
                        +sizeof(struct ipc_id_ary));
                        for(i=size;i<newsize;i++) {
                        new->p[i] = NULL;
                        }
                        old = ids->entries;
                        /*
                        * Use rcu_assign_pointer() to make sure the memcpyed contents
                        * of the new array are visible before the new array becomes visible.
                        */
                        rcu_assign_pointer(ids->entries, new);
                        ipc_rcu_putref(old);
                        return newsize;
                        }
                        

            縱觀整個流程,寫者除內核屏障外,幾乎沒有一把鎖。當寫者需要更新數據結構時,首先復制該數據結構,申請 new 內存,然后對副本進行修改,調用 memcpy 將原數組的內容拷貝到 new 中,同時對擴大的那部分賦新值,修改完畢后,寫者調用 rcu_assign_pointer 修改相關數據結構的指針,使之指向被修改后的新副本,整個寫操作一氣呵成,其中修改指針值的操作屬于原子操作。在數據結構被寫者修改后,需要調用內存屏障 smp_wmb,讓其他 CPU 知曉已更新的指針值,否則會導致 SMP 環境下的 bug。當所有潛在的讀者都執行完成后,調用 call_rcu 釋放舊副本。同 Spin lock 一樣,RCU 同步技術主要適用于 SMP 環境。

            內核無鎖第四層級 — 免鎖

            環形緩沖區是生產者和消費者模型中常用的數據結構。生產者將數據放入數組的尾端,而消費者從數組的另一端移走數據,當達到數組的尾部時,生產者繞回到數組的頭部。

            如果只有一個生產者和一個消費者,那么就可以做到免鎖訪問環形緩沖區(Ring Buffer)。寫入索引只允許生產者訪問并修改,只要寫入者在更新索引之前將新的值保存到緩沖區中,則讀者將始終看到一致的數據結構。同理,讀取索引也只允許消費者訪問并修改。


            圖 2. 環形緩沖區實現原理圖
            圖 2. 環形緩沖區實現原理圖 

            如圖所示,當讀者和寫者指針相等時,表明緩沖區是空的,而只要寫入指針在讀取指針后面時,表明緩沖區已滿。


            清單 9. 2.6.10 環形緩沖區實現代碼
                        /*
                        * __kfifo_put - puts some data into the FIFO, no locking version
                        * Note that with only one concurrent reader and one concurrent
                        * writer, you don't need extra locking to use these functions.
                        */
                        unsigned int __kfifo_put(struct kfifo *fifo,
                        unsigned char *buffer, unsigned int len)
                        {
                        unsigned int l;
                        len = min(len, fifo->size - fifo->in + fifo->out);
                        /* first put the data starting from fifo->in to buffer end */
                        l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
                        memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
                        /* then put the rest (if any) at the beginning of the buffer */
                        memcpy(fifo->buffer, buffer + l, len - l);
                        fifo->in += len;
                        return len;
                        }
                        /*
                        * __kfifo_get - gets some data from the FIFO, no locking version
                        * Note that with only one concurrent reader and one concurrent
                        * writer, you don't need extra locking to use these functions.
                        */
                        unsigned int __kfifo_get(struct kfifo *fifo,
                        unsigned char *buffer, unsigned int len)
                        {
                        unsigned int l;
                        len = min(len, fifo->in - fifo->out);
                        /* first get the data from fifo->out until the end of the buffer */
                        l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
                        memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
                        /* then get the rest (if any) from the beginning of the buffer */
                        memcpy(buffer + l, fifo->buffer, len - l);
                        fifo->out += len;
                        return len;
                        }
                        

            以上代碼摘自 2.6.10 內核,通過代碼的注釋(斜體部分)可以看出,當只有一個消費者和一個生產者時,可以不用添加任何額外的鎖,就能達到對共享數據的訪問。


            總結

            通過對比 2.4 和 2.6 內核代碼,不得不佩服內核開發者的智慧,為了提高內核性能,一直不斷的進行各種優化,并將業界最新的 lock-free 理念運用到內核中。

            在實際開發過程中,進行無鎖設計時,首先進行場景分析,因為每種無鎖方案都有特定的應用場景,接著根據場景分析進行數據結構的初步設計,然后根據先前的分析結果進行并發模型建模,最后在調整數據結構的設計,以便達到最優。


             

            參考資料

            關于作者

            楊小華,目前從事 Linux 內核方面的研究,喜歡搗鼓 Linux 系統,對 Linux 中斷系統比較了解。可以通過 normalnotebook@126.com 與他取得聯系。


             

            posted on 2010-10-31 16:31 楊粼波 閱讀(1176) 評論(0)  編輯 收藏 引用

            日韩人妻无码精品久久久不卡| 青青青伊人色综合久久| 亚洲国产精品久久久久婷婷软件| 一本久久综合亚洲鲁鲁五月天亚洲欧美一区二区 | segui久久国产精品| 国产欧美久久久精品| 久久人人爽爽爽人久久久| 久久人人爽人人爽人人AV东京热 | 国产成人精品白浆久久69| 精品多毛少妇人妻AV免费久久| 国内精品伊人久久久影院| 合区精品久久久中文字幕一区| 久久久亚洲精品蜜桃臀| 一本色综合久久| 久久久久久久波多野结衣高潮| 无码超乳爆乳中文字幕久久| 新狼窝色AV性久久久久久| 久久精品国产亚洲av麻豆色欲 | 国产99久久久久久免费看| 精品久久国产一区二区三区香蕉 | 久久久久久久亚洲Av无码| 久久精品九九亚洲精品| 伊人久久免费视频| 久久精品国产第一区二区| 亚洲精品成人久久久| 久久永久免费人妻精品下载| 久久精品国产亚洲欧美| 久久综合久久伊人| 久久精品国产亚洲AV无码麻豆| 国产叼嘿久久精品久久| 97精品依人久久久大香线蕉97| 国产精品久久久久久久久免费| 久久福利片| 久久亚洲中文字幕精品有坂深雪| 久久99热国产这有精品| 亚洲国产天堂久久久久久| 一本一本久久A久久综合精品| 无码日韩人妻精品久久蜜桃| 亚洲一区二区三区日本久久九| 久久香综合精品久久伊人| 久久本道伊人久久|