• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            風雨兼程

            ring my bells
            posts - 49, comments - 14, trackbacks - 0, articles - 0

            內核同步機制(Linux 2.4.x)

            Posted on 2008-04-10 23:07 silentneil 閱讀(182) 評論(0)  編輯 收藏 引用
            本文將Linux內核中用于同步的幾種機制集中起來分析,強調了它們之間在實現和使用上的不同。

            同步通常是為了達到多線程協同的目的而設計的一種機制,通常包含異步信號機制和互斥機制作為其實現的底層。在Linux 2.4內核中也有相應的技術實現,包括信號量、自旋鎖、原子操作和等待隊列,其中原子操作和等待隊列又是實現信號量的底層。

            等待隊列和異步信號

            wait queue很早就作為一個基本的功能單位出現在Linux內核里了,它以隊列為基礎數據結構,與進程調度機制緊密結合,能夠用于實現核心的異步事件通知機制。我們從它的使用范例著手,看看等待隊列是如何實現異步信號功能的。

            在核心運行過程中,經常會因為某些條件不滿足而需要掛起當前線程,直至條件滿足了才繼續執行。在2.4內核中提供了一組新接口來實現這樣的功能,下面的代碼節選自kernel/printk.c:

                unsigned long log_size;
            1:  DECLARE_WAIT_QUEUE_HEAD(log_wait);...
            4:  spinlock_t console_lock = SPIN_LOCK_UNLOCKED;...
                int do_syslog(int type,char *buf,int len){
                    ...
            2:      error=wait_event_interruptible(log_wait,log_size);
                    if(error)
                         goto out;
                    ...
            5:      spin_lock_irq(&console_lock);
                ...
                    log_size--;
                    ...
            6:    spin_unlock_irq(&console_lock);
                    ...
                }
                asmlinkage int printk(const char *fmt,...){
                ...
            7:    spin_lock_irqsave(&console_lock,flags);
                    ...
                    log_size++;...
            8:    spin_unlock_irqrestore(&console_lock,flags);
            3:      wake_up_interruptible(&log_wait);
                    ...
                }
               


            這段代碼實現了printk調用和syslog之間的同步,syslog需要等待printk送數據到緩沖區,因此,在2:處等待log_size非0;而 printk一邊傳送數據,一邊增加log_size的值,完成后喚醒在log_wait上等待的所有線程(這個線程不是用戶空間的線程概念,而是核內的一個執行序列)。執行了3:的wake_up_interruptible()后,2:處的wait_event_interruptible()返回 0,從而進入syslog的實際動作。

            1:是定義log_wait全局變量的宏調用。

            在實際操作log_size全局變量的時候,還使用了spin_lock自旋鎖來實現互斥,關于自旋鎖,這里暫不作解釋,但從這段代碼中已經可以清楚的知道它的使用方法了。

            所有wait queue使用上的技巧體現在wait_event_interruptible()的實現上,代碼位于include/linux/sched.h中,前置數字表示行號:

            779 #define __wait_event_interruptible(wq, condition, ret)                  \
            780 do {                                                                    \
            781         wait_queue_t __wait;                                            \
            782         init_waitqueue_entry(&__wait, current);                         \
            783                                                                         \
            784         add_wait_queue(&wq, &__wait);                                   \
            785         for (;;) {                                                      \
            786                 set_current_state(TASK_INTERRUPTIBLE);                  \
            787                 if (condition)                                          \
            788                         break;                                          \
            789                 if (!signal_pending(current)) {                         \
            790                         schedule();                                     \
            791                         continue;                                       \
            792                 }                                                       \
            793                 ret = -ERESTARTSYS;                                     \
            794                 break;                                                  \
            795         }                                                               \
            796         current->state = TASK_RUNNING;                                  \
            797         remove_wait_queue(&wq, &__wait);                                \
            798 } while (0)
            799        
            800 #define wait_event_interruptible(wq, condition)                         \
            801 ({                                                                      \
            802         int __ret = 0;                                                  \
            803         if (!(condition))                                               \
            804                 __wait_event_interruptible(wq, condition, __ret);       \
            805         __ret;                                                          \
            806 })
             


            在wait_event_interruptible ()中首先判斷condition是不是已經滿足,如果是則直接返回0,否則調用__wait_event_interruptible(),并用 __ret來存放返回值。__wait_event_interruptible()首先定義并初始化一個wait_queue_t變量__wait,其中數據為當前進程結構current(struct task_struct),并把__wait入隊。在無限循環中,__wait_event_interruptible()將本進程置為可中斷的掛起狀態,反復檢查condition是否成立,如果成立則退出,如果不成立則繼續休眠;條件滿足后,即把本進程運行狀態置為運行態,并將__wait從等待隊列中清除掉,從而進程能夠調度運行。如果進程當前有異步信號(POSIX的),則返回-ERESTARTSYS。

            掛起的進程并不會自動轉入運行的,因此,還需要一個喚醒動作,這個動作由wake_up_interruptible()完成,它將遍歷作為參數傳入的 log_wait等待隊列,將其中所有的元素(通常都是task_struct)置為運行態,從而可被調度到,執行 __wait_event_interruptible()中的代碼。

            DECLARE_WAIT_QUEUE_HEAD(log_wait)經過宏展開后就是定義了一個log_wait等待隊列頭變量:

            struct __wait_queue_head log_wait = {
                lock:    SPIN_LOCK_UNLOCKED,
                task_list:      { &log_wait.task_list, &log_wait.task_list }
            }
             


            其中task_list是struct list_head變量,包括兩個list_head指針,一個next、一個prev,這里把它們初始化為自身,屬于隊列實現上的技巧,其細節可以參閱關于內核list數據結構的討論,add_wait_queue()和remove_wait_queue()就等同于list_add()和 list_del()。

            wait_queue_t結構在include/linux/wait.h中定義,關鍵元素即為一個struct task_struct變量表征當前進程。

            除了wait_event_interruptible()/wake_up_interruptible()以外,與此相對應的還有wait_event ()和wake_up()接口,interruptible是更安全、更常用的選擇,因為可中斷的等待可以接收信號,從而掛起的進程允許被外界kill。

            wait_event* ()接口是2.4內核引入并推薦使用的,在此之前,最常用的等待操作是interruptible_sleep_on (wait_queue_head_t *wq),當然,與此配套的還有不可中斷版本sleep_on(),另外,還有帶有超時控制的*sleep_on_timeout()。sleep_on 系列函數的語義比wait_event簡單,沒有條件判斷功能,其余動作與wait_event完全相同,也就是說,我們可以用 interruptible_sleep_on()來實現wait_event_interruptible()(僅作示意〉:

            do{
                interruptible_sleep_on(&log_wait);
                    if(condition)
                    break;
            }while(1);
             


            相對而言,這種操作序列有反復的入隊、出隊動作,更加耗時,而很大一部分等待操作的確是需要判斷一個條件是否滿足的,因此2.4才推薦使用wait_event接口。

            在wake_up系列接口中,還有一類wake_up_sync()和wake_up_interruptible_sync()接口,保證調度在wake_up返回之后進行。




                回頁首


            原子操作和信號量

            POSIX 有信號量,SysV IPC有信號量,核內也有信號量,接口很簡單,一個down(),一個up(),分別對應P操作和V操作,down()調用可能引起線程掛起,因此和 sleep_on類似,也有interruptible系列接口。down意味著信號量減1,up意味著信號量加1,這兩個操作顯然需要互斥。在 Linux 2.4中,并沒有如想象中的用鎖實現,而是使用了原子操作。

            在include/asm/atomic.h中定義了一系列原子操作,包括原子讀、原子寫、原子加等等,大多是直接用匯編語句來實現的,這里就不詳細解釋。

            我們從信號量數據結構開始,它定義在include/asm/semaphore.h中:

            struct semaphore {
                    atomic_t count;
                    int sleepers;
                    wait_queue_head_t wait;
            }
             


            down ()操作可以理解為申請資源,up()操作可以理解為釋放資源,因此,信號量實際表示的是資源的數量以及是否有進程正在等待。在semaphore結構中,count相當于資源計數,為正數或0時表示可用資源數,-1則表示沒有空閑資源且有等待進程。而等待進程的數量并不關心。這種設計主要是考慮與信號量的原語相一致,當某個進程執行up()函數釋放資源,點亮信號燈時,如果count恢復到0,則表示尚有進程在等待該資源,因此執行喚醒操作。一個典型的down()-up()流程是這樣的:

            down()-->count做原子減1操作,如果結果不小于0則表示成功申請,從down()中返回;
            -->如果結果為負(實際上只可能是-1),則表示需要等待,則調用__down_fail();
            __down_fail()調用__down(),__down()用C代碼實現,要求已不如down()和__down_fail()嚴格,在此作實際的等待(arch/i386/kernel/semaphore.c):

            void __down(struct semaphore * sem)
            {
                    struct task_struct *tsk = current;
                    DECLARE_WAITQUEUE(wait, tsk);
                    tsk->state = TASK_UNINTERRUPTIBLE;
                    add_wait_queue_exclusive(&sem->wait, &wait);
                    spin_lock_irq(&semaphore_lock);
                    sem->sleepers++;
                    for (;;) {
                            int sleepers = sem->sleepers;
                            /*
                             * Add "everybody else" into it. They aren't
                             * playing, because we own the spinlock.
                             */
                            if (!atomic_add_negative(sleepers - 1, &sem->count)) {
                                    sem->sleepers = 0;
                                    break;
                            }
                            sem->sleepers = 1;      /* us - see -1 above */
                            spin_unlock_irq(&semaphore_lock);
                            schedule();
                            tsk->state = TASK_UNINTERRUPTIBLE;
                            spin_lock_irq(&semaphore_lock);
                    }
                    spin_unlock_irq(&semaphore_lock);
                    remove_wait_queue(&sem->wait, &wait);
                    tsk->state = TASK_RUNNING;
                    wake_up(&sem->wait);
            }
             


            __down()-->當前進程進入wait等待隊列,狀態為不可中斷的掛起,sleepers++,如果這是第一次申請失敗,則sleepers值為1,否則為2--這個設置純粹是為了下面這句原子加而安排的。

            在真正進入休眠以前,__down()還是需要判斷一下是不是確實沒有資源可用,因為在spin_lock之前什么都可能發生。 atomic_add_negative()將sleepers-1(只可能是0或者1,分別表示僅有一個等待進程或是多個)加到count(如果有多個進程申請資源失敗進入__down(),count可能為-2、-3等)之上,這個加法完成后,結果為0只可能是在sleepers等于1的時候發生(因為如果sleepers等于2,表示有多個進程執行了down(),則count必然小于-1,因此sleepers-1+count必然小于0),表示 count在此之前已經變為0了,也就是說已經有進程釋放了資源,因此本進程不用休眠而是獲得資源退出__down(),從而也從down()中返回;如果沒有進程釋放資源,那么在所有等待進程的這一加法完成后,count將等于-1。因此,從down()調用外面看(無論是在down()中休眠還是獲得資源離開down()),count為負時只可能為-1(此時sleepers等于1),這么設計使得up()操作只需要對count加1,判斷是否為0 就可以知道是否有必要執行喚醒操作__up_wakeup()了。

            獲得了資源的進程將把sleepers設為 0,并喚醒所有其他等待進程,這個操作實際上只是起到恢復count為-1,并使它們再次進入休眠的作用,因為第一個被喚醒的等待進程執行 atomic_add_negative()操作后會將count恢復為-1,然后將sleepers置為1;以后的等待進程則會像往常一樣重新休眠。

            將down()操作設計得如此復雜的原因和結果就是up操作相當簡單。up()利用匯編原子地將count加1,如果小于等于0表示有等待進程,則調用__up_wakeup()-->__up()喚醒wait;否則直接返回。

            在down()中競爭獲得資源的進程并不是按照優先級排序的,只是在up()操作完成后第一個被喚醒或者正在__down()中運行而暫未進入休眠的進程成功的可能性稍高一些。

            盡管可以將信號量的count初始化為1從而實現一種互斥鎖(mutex),但Linux并不保證這個count不會超過1,因為up操作并不考慮 count的初值,所以只能依靠程序員自己來控制不要無謂的執行up()從而破壞mutex的語義。相關的初始化接口定義在 include/asm/semaphore.h中,但一般程序員可以通過sema_init()接口來初始化信號量:

            #define DECLARE_MUTEX(name) __DECLARE_SEMAPHORE_GENERIC(name,1)
            #define DECLARE_MUTEX_LOCKED(name) __DECLARE_SEMAPHORE_GENERIC(name,0)
            static inline void sema_init (struct semaphore *sem, int val)
            static inline void init_MUTEX (struct semaphore *sem)
            static inline void init_MUTEX_LOCKED (struct semaphore *sem)
             


            除了down()以外,Linux還提供了一個down_interruptible(),操作序列與down()基本相同,僅在休眠狀態為可中斷和信號處理上有所不同。在標準的信號量以外,還有一套讀寫信號量,用于將資源的讀寫區分開來進行同步以提高效率,采用讀寫鎖來實現,有興趣的可以參閱文后列出的參考資料。




                回頁首


            自旋鎖

            鎖是一個概念,正如上面提到的mutex互斥鎖僅僅是其中的一種。互斥鎖被鎖定時進入休眠,而系統還能正常運轉,但有很多時候,鎖應該不僅僅互斥訪問,甚至應該讓系統掛起,直至鎖成功,也就是說在鎖操作中"自旋",這就是Linux中的spinlock機制。

            從實現上來說,自旋鎖比較簡單,主要分為兩部分,一部分是中斷處理,一部分是自旋處理,最基礎的部分在spin_lock_string和spin_unlock_string這兩段匯編代碼中:

            #define spin_lock_string \
                    "\n1:\t" \
                    "lock ; decb %0\n\t" \
                    "js 2f\n" \
                    ".section .text.lock,\"ax\"\n" \
                    "2:\t" \
                    "cmpb $0,%0\n\t" \
                    "rep;nop\n\t" \
                    "jle 2b\n\t" \
                    "jmp 1b\n" \
                    ".previous"
            #define spin_unlock_string \
                    "movb $1,%0"
             


            不詳細解釋這段匯編代碼的語義,spin_lock_string對鎖原子減1,循環檢查鎖值,直到鎖值大于0;而spin_unlock_string則是對鎖賦值1。spin_lock_string用于構成spin_lock()函數,spin_unlock_string用于構成 spin_unlock()函數。

            spin_lock()/spin_unlock()構成了自旋鎖機制的基礎,它們和關中斷local_irq_disable()/開中斷local_irq_enable()、關bh local_bh_disable()/開bh local_bh_enable()、關中斷并保存狀態字local_irq_save()/開中斷并恢復狀態字local_irq_restore() 結合就形成了整套自旋鎖機制,接口定義在include/linux/spinlock.h中,這里就不列舉了。

            實際上,以上提到的spin_lock()都是在CONFIG_SMP的前提下才會生成的,也就是說,在單處理機系統中,spin_lock()是一條空語句,因為在處理機執行它的語句時,不可能受到打擾,語句肯定是串行的。在這種簡單情況下,spin_lock_irq()就只需要鎖中斷就可以完成任務了。在include/linux/spinlock.h中就用#ifdef CONFIG_SMP來區分兩種不同的情況。

            自旋鎖有很多種,信號量也可以用來構成互斥鎖,原子操作也有鎖功能,而且還有與標準鎖機制類似的讀寫鎖變種,在不同的應用場合應該選擇不同的鎖,下一節就是介紹如何選擇。




                回頁首


            鎖的使用

            1.用戶上下文之間

            如果所訪問的共享資源僅在用戶上下文中使用,最高效的辦法就是使用信號量。在net/core/netfilter.c中有一處使用信號量的例子:

            static DECLARE_MUTEX(nf_sockopt_mutex);
            int nf_register_sockopt(struct nf_sockopt_ops *reg)
            {
            ...
                    if (down_interruptible(&nf_sockopt_mutex) != 0)
                            return -EINTR;
            ...
            out:
                    up(&nf_sockopt_mutex);
                    return ret;
            }
             


            2.用戶上下文與bottom half之間

            此時有兩種情況需要使用鎖,一是用戶上下文被bottom half中斷,二是多個處理機同時進入一個臨界段。一般情況下,使用spin_lock_bh()/spin_unlock_bh()可以滿足要求,它將關閉當前CPU的bottom half,然后再獲取鎖,直至離開臨界段再釋放并對bottom half重新使能。

            3.用戶上下文與軟中斷(Tasklet)之間

            tasklet與bottom half的實現機制是一樣的,實際上spin_lock_bh()也同時關閉了tasklet的執行,因此,在這種情況下,用戶上下文與tasklet之間的同步也使用spin_lock_bh()/spin_unlock_bh()。

            4.bottom half之間

            bottom half本身的機制就保證了不會有多于1個的bottom half同時處于運行態,即使對于SMP系統也是如此,因此,在設計共享數據的bottom half時無需考慮互斥。

            5.tasklet之間

            tasklet 和bottom half類似,也是受到local_bh_disable()保護的,因此,同一個tasklet不會同時在兩個CPU上運行;但不同的tasklet卻有可能,因此,如果需要同步不同的tasklet訪問共享數據的話,就應該使用spin_lock()/spin_unlock()。正如上面提到的,這種保護僅對SMP系統有意義,UP系統中tasklet的運行不會受到另一個tasklet(不論它是否與之相同)的打擾,因此也就沒有必要上鎖。

            6.softirq之間

            softirq 是實現tasklet和bottom half的基礎,限制較后二者都少,允許兩個softirq同時運行于不同的CPU之上,而不論它們是否來自同一個softirq代碼,因此,在這種情況下,都需要用spin_lock()/spin_unlock()來同步。

            7.硬中斷和軟中斷之間

            硬中斷是指硬件中斷的處理程序上下文,軟中斷包括softirq和在它基礎上實現的tasklet和bottom half等,此時,為了防止硬件中斷軟中斷的運行,同步措施必須包括關閉硬件中斷,spin_lock_irq()/spin_unlock_irq() 就包括這個動作。還有一對API,spin_lock_irqsave()/spin_unlock_irqrestore(),不僅關閉中斷,還保存機器狀態字,并在打開中斷時恢復。

            8.其他注意事項

            首先需要提醒的是"死鎖",這在操作系統原理的課本上都做過介紹,無論是使用信號量還是使用自旋鎖,都有可能產生死鎖,特別是自旋鎖,如果死鎖在spin_lock上,整個系統就會掛起。如何避免死鎖是理論課的問題,這里就不多說了。

            另外一點就是盡可能短時間的鎖定,因此,"對數據上鎖,而不是對代碼上鎖"就成了一個簡單的原則;在有可能的情況下,使用讀寫鎖,而不要總是使用互斥鎖;對讀寫排序,使用原子操作,從而完全避免使用鎖,也是一個不錯的設計思想。

            不要在鎖定狀態下調用可能引起休眠的操作,以下這些操作就是目前可能因此休眠的函數:

               1. 對用戶內存的訪問:copy_from_user()、copy_to_user()、get_user()、put_user()
               2. kmalloc(GFP_KERNEL)
               3. down_interruptible()和down(),如果需要在spinlock中使用信號量,可以選擇down_trylock(),它不會引起掛起 printk()的靈巧設計使得它不會掛起,因此可以在任何上下文中使用。


            摘自: http://baoguanghua.blog.sohu.com/79808640.html
            99久久精品免费看国产一区二区三区| 色综合久久天天综合| 久久综合亚洲色HEZYO社区| 少妇熟女久久综合网色欲| 久久久一本精品99久久精品66| 日本精品久久久久中文字幕8| 久久精品无码一区二区日韩AV| 久久国产亚洲精品| 久久99热国产这有精品| 四虎国产精品成人免费久久| 99久久国产综合精品麻豆| 亚洲精品高清一二区久久| 久久久久99精品成人片欧美| 久久国产免费直播| 99精品久久精品| 亚洲va久久久噜噜噜久久狠狠| 国产AⅤ精品一区二区三区久久| 久久亚洲日韩精品一区二区三区| 国产精品免费看久久久香蕉| 久久国产精品无码一区二区三区 | 久久精品二区| 久久精品国产亚洲av麻豆小说| 精品久久久久久久久久中文字幕| 久久ZYZ资源站无码中文动漫 | 久久久九九有精品国产| 亚洲国产成人久久笫一页| 久久青青草原精品影院| 国产美女久久精品香蕉69| 7777久久久国产精品消防器材 | 国产亚洲精品久久久久秋霞| 国产精品成人99久久久久 | 久久人人爽爽爽人久久久| 久久久国产99久久国产一| 日本精品一区二区久久久| 久久www免费人成看国产片| 久久精品无码一区二区三区日韩| 婷婷久久综合九色综合98| 亚洲国产精品久久久久| 久久久久久综合一区中文字幕 | 国产99久久久久久免费看| 99久久精品国产毛片|