• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            那誰(shuí)的技術(shù)博客

            感興趣領(lǐng)域:高性能服務(wù)器編程,存儲(chǔ),算法,Linux內(nèi)核
            隨筆 - 210, 文章 - 0, 評(píng)論 - 1183, 引用 - 0
            數(shù)據(jù)加載中……

            Linux下面的線程鎖,條件變量以及信號(hào)量的使用

            一) 線程鎖
            1) 只能用于"鎖"住臨界代碼區(qū)域
            2) 一個(gè)線程加的鎖必須由該線程解鎖.

            鎖幾乎是我們學(xué)習(xí)同步時(shí)最開(kāi)始接觸到的一個(gè)策略,也是最簡(jiǎn)單, 最直白的策略.

            二) 條件變量,與鎖不同, 條件變量用于等待某個(gè)條件被觸發(fā)
            1) 大體使用的偽碼:

            // 線程一代碼
            pthread_mutex_lock(&mutex);
            // 設(shè)置條件為true
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&mutex);

            // 線程二代碼
            pthread_mutex_lock(&mutex);
            while (條件為false)
                pthread_cond_wait(&cond, &mutex);
            修改該條件
            pthread_mutex_unlock(&mutex);

            需要注意幾點(diǎn):
            1) 第二段代碼之所以在pthread_cond_wait外面包含一個(gè)while循環(huán)不停測(cè)試條件是否成立的原因是, 在pthread_cond_wait被喚醒的時(shí)候可能該條件已經(jīng)不成立.UNPV2對(duì)這個(gè)的描述是:"Notice that when pthread_cond_wait returns, we always test the condition again, because spurious wakeups can occur: a wakeup when the desired condition is still not true.".

            2) pthread_cond_wait調(diào)用必須和某一個(gè)mutex一起調(diào)用, 這個(gè)mutex是在外部進(jìn)行加鎖的mutex, 在調(diào)用pthread_cond_wait時(shí), 內(nèi)部的實(shí)現(xiàn)將首先將這個(gè)mutex解鎖, 然后等待條件變量被喚醒, 如果沒(méi)有被喚醒, 該線程將一直休眠, 也就是說(shuō), 該線程將一直阻塞在這個(gè)pthread_cond_wait調(diào)用中, 而當(dāng)此線程被喚醒時(shí), 將自動(dòng)將這個(gè)mutex加鎖.
            man文檔中對(duì)這部分的說(shuō)明是:
            pthread_cond_wait atomically unlocks the mutex (as per pthread_unlock_mutex) and waits for the condition variable cond to  be  signaled.  The thread execution is suspended and does not consume any CPU time until the condition variable is
            signaled. The mutex must be locked by the calling thread on entrance to pthread_cond_wait.  Before  returning  to  the calling thread, pthread_cond_wait re-acquires mutex (as per pthread_lock_mutex).
            也就是說(shuō)pthread_cond_wait實(shí)際上可以看作是以下幾個(gè)動(dòng)作的合體:
            解鎖線程鎖
            等待條件為true
            加鎖線程鎖.

            這里是使用條件變量的經(jīng)典例子:
            http://www.shnenglu.com/CppExplore/archive/2008/03/20/44949.html
            之所以使用兩個(gè)條件變量, 是因?yàn)橛袃煞N情況需要進(jìn)行保護(hù),使用數(shù)組實(shí)現(xiàn)循環(huán)隊(duì)列,因此一個(gè)條件是在getq函數(shù)中判斷讀寫(xiě)指針相同且可讀數(shù)據(jù)計(jì)數(shù)為0,此時(shí)隊(duì)列為空沒(méi)有數(shù)據(jù)可讀,因此獲取新數(shù)據(jù)的條件變量就一直等待,另一個(gè)條件是讀寫(xiě)指針相同且可讀數(shù)據(jù)計(jì)數(shù)大于0,此時(shí)隊(duì)列滿了不能再添加數(shù)據(jù), 因此添加新數(shù)據(jù)的條件變量就一直等待,而nEmptyThreadNum和nFullThreadNum則是計(jì)數(shù), 只有這個(gè)計(jì)數(shù)大于0時(shí)才會(huì)喚醒相應(yīng)的條件變量,這樣可以減少調(diào)用pthread_cond_signal的次數(shù).
            為了在下面的敘述方便, 我將這段代碼整理在下面, 是一個(gè)可以編譯運(yùn)行的代碼,但是注意需要在編譯時(shí)加上-pthread鏈接線程庫(kù):
            #include <pthread.h>
            #include 
            <stdio.h>
            #include 
            <unistd.h>
            #include 
            <stdlib.h>

            class CThreadQueue
            {
            public:
                CThreadQueue(
            int queueSize=1024):
                    sizeQueue(queueSize),lput(
            0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
                {
                    pthread_mutex_init(
            &mux,0);
                    pthread_cond_init(
            &condGet,0);
                    pthread_cond_init(
            &condPut,0);
                    buffer
            =new void *[sizeQueue];
                }
                
            virtual ~CThreadQueue()
                {
                    delete[] buffer;
                }
                
            void * getq()
                {
                    
            void *data;
                    pthread_mutex_lock(
            &mux);
                    /*
                     此 處循環(huán)判斷的原因如下:假設(shè)2個(gè)線程在getq阻塞,然后兩者都被激活,而其中一個(gè)線程運(yùn)行比較塊,快速消耗了2個(gè)數(shù)據(jù),另一個(gè)線程醒來(lái)的時(shí)候已經(jīng)沒(méi)有新 數(shù)據(jù)可以消耗了。另一點(diǎn),man pthread_cond_wait可以看到,該函數(shù)可以被信號(hào)中斷返回,此時(shí)返回EINTR。為避免以上任何一點(diǎn),都必須醒來(lái)后再次判斷睡眠條件。更 正:pthread_cond_wait是信號(hào)安全的系統(tǒng)調(diào)用,不會(huì)被信號(hào)中斷。
                    */
                    while(lget==lput&&nData==0)
                    {
                        nEmptyThread
            ++;
                        pthread_cond_wait(
            &condGet,&mux);
                        nEmptyThread
            --;     
                    }

                    data
            =buffer[lget++];
                    nData
            --;
                    
            if(lget==sizeQueue)
                    {
                        lget
            =0;
                    }
                    
            if(nFullThread) //必要時(shí)才進(jìn)行signal操作,勿總是signal
                    {
                        pthread_cond_signal(
            &condPut);    
                    }
                    pthread_mutex_unlock(
            &mux);
                    
            return data;
                }
                
            void putq(void *data)
                {
                    pthread_mutex_lock(
            &mux);
                    
            while(lput==lget&&nData)
                    { 
                        nFullThread
            ++;
                        pthread_cond_wait(
            &condPut,&mux);
                        nFullThread
            --;
                    }
                    buffer[lput
            ++]=data;
                    nData
            ++;
                    
            if(lput==sizeQueue)
                    {
                        lput
            =0;
                    }
                    
            if(nEmptyThread) //必要時(shí)才進(jìn)行signal操作,勿總是signal
                    {
                        pthread_cond_signal(
            &condGet);
                    }
                    pthread_mutex_unlock(
            &mux);
                }
            private:
                pthread_mutex_t mux;
                pthread_cond_t condGet;
                pthread_cond_t condPut;

                
            void * * buffer;    //循環(huán)消息隊(duì)列
                int sizeQueue;        //隊(duì)列大小
                int lput;        //location put  放數(shù)據(jù)的指針偏移
                int lget;        //location get  取數(shù)據(jù)的指針偏移
                int nFullThread;    //隊(duì)列滿,阻塞在putq處的線程數(shù)
                int nEmptyThread;    //隊(duì)列空,阻塞在getq處的線程數(shù)
                int nData;        //隊(duì)列中的消息個(gè)數(shù),主要用來(lái)判斷隊(duì)列空還是滿
            };

            CThreadQueue queue;
            //使用的時(shí)候給出稍大的CThreadQueue初始化參數(shù),可以減少進(jìn)入內(nèi)核態(tài)的操作。

            void * produce(void * arg)
            {
                
            int i=0;
                pthread_detach(pthread_self());
                
            while(i++<100)
                {
                    queue.putq((
            void *)i);
                }
            }

            void *consume(void *arg)
            {
                
            int data;
                
            while(1)
                {
                    data
            =(int)(queue.getq());
                    printf(
            "data=%d\n",data);
                }
            }

            int main()
            {    
                pthread_t pid;
                
            int i=0;

                
            while(i++<3)
                    pthread_create(
            &pid,0,produce,0);
                i
            =0;
                
            while(i++<3)
                    pthread_create(
            &pid,0,consume,0);
                sleep(
            5);

                
            return 0;
            }


            三) 信號(hào)量
            信號(hào)量既可以作為二值計(jì)數(shù)器(即0,1),也可以作為資源計(jì)數(shù)器.
            主要是兩個(gè)函數(shù):
            sem_wait()  decrements  (locks)  the semaphore pointed to by sem.  If the semaphore's value is greater than zero, then
            the decrement proceeds, and the function returns, immediately.  If the semaphore currently has the  value  zero,  then
            the  call  blocks  until  either  it  becomes possible to perform the decrement (i.e., the semaphore value rises above
            zero), or a signal handler interrupts the call.

            sem_post()  increments  (unlocks)  the  semaphore  pointed  to  by sem.  If the semaphore's value consequently becomes
            greater than zero, then another process or thread blocked in a sem_wait(3) call will be woken up and proceed  to  lock
            the semaphore.

            而函數(shù)int sem_getvalue(sem_t *sem, int *sval);則用于獲取信號(hào)量當(dāng)前的計(jì)數(shù).

            可以用信號(hào)量模擬鎖和條件變量:
            1) 鎖,在同一個(gè)線程內(nèi)同時(shí)對(duì)某個(gè)信號(hào)量先調(diào)用sem_wait再調(diào)用sem_post, 兩個(gè)函數(shù)調(diào)用其中的區(qū)域就是所要保護(hù)的臨界區(qū)代碼了,這個(gè)時(shí)候其實(shí)信號(hào)量是作為二值計(jì)數(shù)器來(lái)使用的.不過(guò)在此之前要初始化該信號(hào)量計(jì)數(shù)為1,見(jiàn)下面例子中的代碼.
            2) 條件變量,在某個(gè)線程中調(diào)用sem_wait, 而在另一個(gè)線程中調(diào)用sem_post.

            我們將上面例子中的線程鎖和條件變量都改成用信號(hào)量實(shí)現(xiàn)以說(shuō)明信號(hào)量如何模擬兩者:
            #include <pthread.h>
            #include 
            <stdio.h>
            #include 
            <unistd.h>
            #include 
            <stdlib.h>
            #include 
            <fcntl.h>
            #include 
            <sys/stat.h>
            #include 
            <semaphore.h>
            #include 
            <errno.h>
            #include 
            <string.h>

            class CThreadQueue
            {
            public:
                CThreadQueue(
            int queueSize=1024):
                    sizeQueue(queueSize),lput(
            0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)
                {
                    
            //pthread_mutex_init(&mux,0);
                    mux = sem_open("mutex", O_RDWR | O_CREAT);
                    
            get = sem_open("get", O_RDWR | O_CREAT);
                    put 
            = sem_open("put", O_RDWR | O_CREAT);
                
                    sem_init(mux, 
            01);

                    buffer
            =new void *[sizeQueue];
                }
                
            virtual ~CThreadQueue()
                {
                    delete[] buffer;
                    sem_unlink(
            "mutex");
                    sem_unlink(
            "get");
                    sem_unlink(
            "put");
                }
                
            void * getq()
                {
                    
            void *data;

                    
            //pthread_mutex_lock(&mux);
                    sem_wait(mux);

                    
            while(lget==lput&&nData==0)
                    {
                        nEmptyThread
            ++;
                        
            //pthread_cond_wait(&condGet,&mux);
                        sem_wait(get);
                        nEmptyThread
            --;     
                    }

                    data
            =buffer[lget++];
                    nData
            --;
                    
            if(lget==sizeQueue)
                    {
                        lget
            =0;
                    }
                    
            if(nFullThread) //必要時(shí)才進(jìn)行signal操作,勿總是signal
                    {
                        
            //pthread_cond_signal(&condPut);    
                        sem_post(put);
                    }

                    
            //pthread_mutex_unlock(&mux);
                    sem_post(mux);

                    
            return data;
                }
                
            void putq(void *data)
                {
                    
            //pthread_mutex_lock(&mux);
                    sem_wait(mux);

                    
            while(lput==lget&&nData)
                    { 
                        nFullThread
            ++;
                        
            //pthread_cond_wait(&condPut,&mux);
                        sem_wait(put);
                        nFullThread
            --;
                    }
                    buffer[lput
            ++]=data;
                    nData
            ++;
                    
            if(lput==sizeQueue)
                    {
                        lput
            =0;
                    }
                    
            if(nEmptyThread)
                    {
                        
            //pthread_cond_signal(&condGet);
                        sem_post(get);
                    }

                    
            //pthread_mutex_unlock(&mux);
                    sem_post(mux);
                }
            private:
                
            //pthread_mutex_t mux;
                sem_t* mux;
                
            //pthread_cond_t condGet;
                
            //pthread_cond_t condPut;
                sem_t* get;
                sem_t
            * put;

                
            void * * buffer;    //循環(huán)消息隊(duì)列
                int sizeQueue;        //隊(duì)列大小
                int lput;        //location put  放數(shù)據(jù)的指針偏移
                int lget;        //location get  取數(shù)據(jù)的指針偏移
                int nFullThread;    //隊(duì)列滿,阻塞在putq處的線程數(shù)
                int nEmptyThread;    //隊(duì)列空,阻塞在getq處的線程數(shù)
                int nData;        //隊(duì)列中的消息個(gè)數(shù),主要用來(lái)判斷隊(duì)列空還是滿
            };

            CThreadQueue queue;
            //使用的時(shí)候給出稍大的CThreadQueue初始化參數(shù),可以減少進(jìn)入內(nèi)核態(tài)的操作。

            void * produce(void * arg)
            {
                
            int i=0;
                pthread_detach(pthread_self());
                
            while(i++<100)
                {
                    queue.putq((
            void *)i);
                }
            }

            void *consume(void *arg)
            {
                
            int data;
                
            while(1)
                {
                    data
            =(int)(queue.getq());
                    printf(
            "data=%d\n",data);
                }
            }

            int main()
            {    
                pthread_t pid;
                
            int i=0;

                
            while(i++<3)
                    pthread_create(
            &pid,0,produce,0);
                i
            =0;
                
            while(i++<3)
                    pthread_create(
            &pid,0,consume,0);
                sleep(
            5);

                
            return 0;
            }


            不過(guò), 信號(hào)量除了可以作為二值計(jì)數(shù)器用于模擬線程鎖和條件變量之外, 還有比它們更加強(qiáng)大的功能, 信號(hào)量可以用做資源計(jì)數(shù)器, 也就是說(shuō)初始化信號(hào)量的值為某個(gè)資源當(dāng)前可用的數(shù)量, 使用了一個(gè)之后遞減, 歸還了一個(gè)之后遞增, 將前面的例子用資源計(jì)數(shù)器的形式再次改寫(xiě)如下,注意在初始化的時(shí)候要將資源計(jì)數(shù)進(jìn)行初始化, 在下面代碼中的構(gòu)造函數(shù)中將put初始化為隊(duì)列的最大數(shù)量, 而get為0:
            #include <pthread.h>
            #include 
            <stdio.h>
            #include 
            <unistd.h>
            #include 
            <stdlib.h>
            #include 
            <fcntl.h>
            #include 
            <sys/stat.h>
            #include 
            <semaphore.h>

            class CThreadQueue
            {
            public:
                CThreadQueue(
            int queueSize=1024):
                    sizeQueue(queueSize),lput(
            0),lget(0)
                {
                    pthread_mutex_init(
            &mux,0);
                    
            get = sem_open("get", O_RDWR | O_CREAT);
                    put 
            = sem_open("put", O_RDWR | O_CREAT);

                    sem_init(
            get00);
                    sem_init(put, 
            0, sizeQueue);

                    buffer
            =new void *[sizeQueue];
                }
                
            virtual ~CThreadQueue()
                {
                    sem_unlink(
            "get");
                    sem_unlink(
            "put");
                    delete[] buffer;
                }
                
            void * getq()
                {
                    sem_wait(
            get);

                    
            void *data;

                    pthread_mutex_lock(
            &mux);

                    
            /*
                    while(lget==lput&&nData==0)
                    {
                        nEmptyThread++;
                        //pthread_cond_wait(&condGet,&mux);
                        nEmptyThread--;     
                    }
                    
            */

                    data
            =buffer[lget++];
                    
            //nData--;
                    if(lget==sizeQueue)
                    {
                        lget
            =0;
                    }
                    
            /*
                    if(nFullThread) //必要時(shí)才進(jìn)行signal操作,勿總是signal
                    {
                        //pthread_cond_signal(&condPut);    
                        sem_post(put);
                    }
                    
            */
                    pthread_mutex_unlock(
            &mux);

                    sem_post(put);

                    
            return data;
                }
                
            void putq(void *data)
                {
                    sem_wait(put);

                    pthread_mutex_lock(
            &mux);

                    
            /*
                    while(lput==lget&&nData)
                    { 
                        nFullThread++;
                        //pthread_cond_wait(&condPut,&mux);
                        sem_wait(put);
                        nFullThread--;
                    }
                    
            */

                    buffer[lput
            ++]=data;
                    
            //nData++;
                    if(lput==sizeQueue)
                    {
                        lput
            =0;
                    }
                    
            /*
                    if(nEmptyThread)
                    {
                        //pthread_cond_signal(&condGet);
                        sem_post(get);
                    }
                    
            */

                    pthread_mutex_unlock(
            &mux);

                    sem_post(
            get);
                }
            private:
                pthread_mutex_t mux;
                
            //pthread_cond_t condGet;
                
            //pthread_cond_t condPut;
                sem_t* get;
                sem_t
            * put;

                
            void * * buffer;    //循環(huán)消息隊(duì)列
                int sizeQueue;        //隊(duì)列大小
                int lput;        //location put  放數(shù)據(jù)的指針偏移
                int lget;        //location get  取數(shù)據(jù)的指針偏移
            };

            CThreadQueue queue;
            //使用的時(shí)候給出稍大的CThreadQueue初始化參數(shù),可以減少進(jìn)入內(nèi)核態(tài)的操作。

            void * produce(void * arg)
            {
                
            int i=0;
                pthread_detach(pthread_self());
                
            while(i++<100)
                {
                    queue.putq((
            void *)i);
                }
            }

            void *consume(void *arg)
            {
                
            int data;
                
            while(1)
                {
                    data
            =(int)(queue.getq());
                    printf(
            "data=%d\n",data);
                }
            }

            int main()
            {    
                pthread_t pid;
                
            int i=0;

                
            while(i++<3)
                    pthread_create(
            &pid,0,produce,0);
                i
            =0;
                
            while(i++<3)
                    pthread_create(
            &pid,0,consume,0);
                sleep(
            5);

                
            return 0;
            }

            可以看見(jiàn),采用信號(hào)量作為資源計(jì)數(shù)之后, 代碼變得"很直白",原來(lái)的一些保存隊(duì)列狀態(tài)的變量都不再需要了.

            信號(hào)量與線程鎖,條件變量相比還有以下幾點(diǎn)不同:
            1)鎖必須是同一個(gè)線程獲取以及釋放, 否則會(huì)死鎖.而條件變量和信號(hào)量則不必.
            2)信號(hào)的遞增與減少會(huì)被系統(tǒng)自動(dòng)記住, 系統(tǒng)內(nèi)部有一個(gè)計(jì)數(shù)器實(shí)現(xiàn)信號(hào)量,不必?fù)?dān)心會(huì)丟失, 而喚醒一個(gè)條件變量時(shí),如果沒(méi)有相應(yīng)的線程在等待該條件變量, 這次喚醒將被丟失.

            posted on 2009-01-15 10:23 那誰(shuí) 閱讀(15699) 評(píng)論(3)  編輯 收藏 引用 所屬分類: Linux/Unix

            評(píng)論

            # re: Linux下面的線程鎖,條件變量以及信號(hào)量的使用  回復(fù)  更多評(píng)論   

            第二段程序模擬線程鎖中有bug,sem_wait(get)跟sem_wait(put)前后需要加上解鎖加鎖動(dòng)作。

            produce 和 consume 的設(shè)計(jì),隊(duì)列大小為1024,測(cè)試數(shù)據(jù)3x100太小。while循環(huán)中最好加上usleep讓線程休息一會(huì)兒,否則consume極可能需要在produce線程跑完了之后才獲得鎖,對(duì)測(cè)試程序不利。
            2009-01-27 08:57 | Mensch88

            # re: Linux下面的線程鎖,條件變量以及信號(hào)量的使用  回復(fù)  更多評(píng)論   

            關(guān)于最后一段對(duì)用信號(hào)量實(shí)現(xiàn)通知/等待機(jī)制和用條件變量來(lái)實(shí)現(xiàn)相同機(jī)制的比較,所說(shuō)的兩點(diǎn)都對(duì)信號(hào)量有利,那條件變量多余了嗎?
            2009-02-11 11:39 | yqiang
            久久久久国产视频电影| 香蕉99久久国产综合精品宅男自 | 99精品久久久久久久婷婷| 91麻豆国产精品91久久久| 久久婷婷色综合一区二区| 久久精品一区二区| 精品久久国产一区二区三区香蕉| 久久精品国产亚洲AV不卡| 国产AV影片久久久久久| 无码专区久久综合久中文字幕| 久久99久久无码毛片一区二区| 看久久久久久a级毛片| 中文字幕亚洲综合久久菠萝蜜| 狠狠色丁香婷婷综合久久来| 久久婷婷国产剧情内射白浆| 久久久精品波多野结衣| 日本三级久久网| 久久精品无码一区二区无码| 日韩电影久久久被窝网| 久久天天躁狠狠躁夜夜av浪潮| 久久精品国产福利国产秒| 热re99久久精品国99热| 久久精品国产久精国产果冻传媒 | 久久久久亚洲AV无码观看| 久久久不卡国产精品一区二区| 久久精品免费一区二区三区| 久久久免费精品re6| 精品久久久久久成人AV| 欧美熟妇另类久久久久久不卡 | 精品久久777| 精品综合久久久久久97超人| 99久久精品国产麻豆| 91久久精一区二区三区大全| 五月丁香综合激情六月久久| 久久99国产乱子伦精品免费| 无码人妻久久久一区二区三区| 亚洲精品白浆高清久久久久久| 亚洲AV日韩精品久久久久久| 国产成人精品白浆久久69| 欧美精品一区二区精品久久| 精品国产青草久久久久福利|