作者:CppExplore 網(wǎng)址:http://www.shnenglu.com/CppExplore/
為了后面寫的《網(wǎng)絡(luò)模型(二)》,多寫一篇關(guān)于線程的。線程使用涉及的主要數(shù)據(jù)結(jié)構(gòu)以及應(yīng)用框架可以參考http://www.shnenglu.com/CppExplore/archive/2008/01/15/41175.html。本文的主要目的是給出linux下實(shí)用的線程消息隊(duì)列實(shí)現(xiàn)。
一、linux上線程相關(guān)的操作有下面幾種:
(1)pthread_t類型的創(chuàng)建、屬性創(chuàng)建設(shè)置等。
這類具體可以:man pthread_creat; man pthread_attr_init;man pthread_detach;man pthread_join;等查看
(2)pthread_mutex_t類型的操作。
這類具體可以: man pthread_mutex_init可以看到所有相關(guān)的操作。
(3)pthread_cond_t類型的操作。同樣:man pthread_cond_init。pthread_cond_t的wait和signal操作一定要和pthread_mutex_t的lock、unlock配合使用。類似于此:
pthread_mutex_t mux=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

//wait操作:
pthread_mutex_lock(&mux);
pthread_cond_wait(&cond,&mux);//睡眠前將內(nèi)部會(huì)執(zhí)行pthread_mutex_unlock,醒來時(shí)內(nèi)部會(huì)執(zhí)行pthread_mutex_lock
pthread_mutex_unlock(&mux);

//signal操作
pthread_mutex_lock(&mux);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mux);

(4)sem_t類型的操作。同樣:man sem_init 這個(gè)系列一般是用不到的,太重量級(jí)了,也是最強(qiáng)大的一種。
二、linux2.6內(nèi)核的線程庫。2.6內(nèi)核的默認(rèn)安裝的是redhat公司的NPTL(原生posix線程庫),以前內(nèi)核安裝的是LinuxThreads庫,兩者的簡(jiǎn)單介紹可以看http://www.ibm.com/developerworks/cn/linux/l-threading.html。不過對(duì)于應(yīng)用者,分析兩者的區(qū)別和優(yōu)劣也沒什么大意義。這里特別提下NPTL的futex機(jī)制。借助該機(jī)制,pthread_mutex的性能大大提高,只要不進(jìn)入競(jìng)爭(zhēng)態(tài),進(jìn)程就不會(huì)陷入內(nèi)核態(tài)。這點(diǎn)可以自己寫示例程序,通過strace -c 跟蹤進(jìn)程的系統(tǒng)調(diào)用得以證實(shí),另外還可以證實(shí)總是進(jìn)入內(nèi)核態(tài)的操作有pthread_cond_signal和sem_post。
三、實(shí)用的線程消息隊(duì)列實(shí)現(xiàn)。下面設(shè)計(jì)一個(gè)具有線程消息隊(duì)列的線程封裝類。通過上面的分析,我們可以有如下結(jié)論:
(1)減少pthread_cond_signal和sem_post的調(diào)用,只在有必要的時(shí)候調(diào)用;
(2)盡量避免pthread_mutex進(jìn)入競(jìng)爭(zhēng)態(tài)。增大消息隊(duì)列的大小,可以有效減少競(jìng)態(tài)條件的出現(xiàn)。
下面給出一個(gè)實(shí)用的線程消息隊(duì)列的實(shí)現(xiàn)類,這個(gè)類也將是以后《網(wǎng)絡(luò)模型》文章中用到的線程消息隊(duì)列類,代碼注釋請(qǐng)看對(duì)私有屬性的注釋:
class CThreadQueue


{

public:
CThreadQueue(int queueSize=1024):
sizeQueue(queueSize),lput(0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)

{
pthread_mutex_init(&mux,0);
pthread_cond_init(&condGet,0);
pthread_cond_init(&condPut,0);
buffer=new (void *)[sizeQueue];
}
virtual ~CThreadQueue()

{
delete[] buffer;
}
void * getq()

{
void *data;
pthread_mutex_lock(&mux);
while(lget==lput&&nData==0)//此處循環(huán)判斷的原因如下:假設(shè)2個(gè)線程在getq阻塞,然后兩者都被激活,而其中一個(gè)線程運(yùn)行比較塊,快速消耗了2個(gè)數(shù)據(jù),另一個(gè)線程醒來的時(shí)候已經(jīng)沒有新數(shù)據(jù)可以消耗了。另一點(diǎn),man pthread_cond_wait可以看到,該函數(shù)可以被信號(hào)中斷返回,此時(shí)返回EINTR。為避免以上任何一點(diǎn),都必須醒來后再次判斷睡眠條件。更正:pthread_cond_wait是信號(hào)安全的系統(tǒng)調(diào)用,不會(huì)被信號(hào)中斷。

{
nEmptyThread++;
pthread_cond_wait(&condGet,&mux);
nEmptyThread--;
}
data=buffer[lget++];
nData--;
if(lget==sizeQueue)

{
lget=0;
}
if(nFullThread) //必要時(shí)才進(jìn)行signal操作,勿總是signal

{
pthread_cond_signal(&condPut);
}
pthread_mutex_unlock(&mux);
return data;
}
void putq(void *data)

{
pthread_mutex_lock(&mux);
while(lput==lget&&nData)

{
nFullThread++;
pthread_cond_wait(&condPut,&mux);
nFullThread--;
}
buffer[lput++]=data;
nData++;
if(lput==sizeQueue)

{
lput=0;
}
if(nEmptyThread)

{
pthread_cond_signal(&condGet);
}
pthread_mutex_unlock(&mux);
}
private:
pthread_mutex_t mux;
pthread_cond_t condGet;
pthread_cond_t condPut;

void * * buffer; //循環(huán)消息隊(duì)列
int sizeQueue; //隊(duì)列大小
int lput; //location put 放數(shù)據(jù)的指針偏移
int lget; //location get 取數(shù)據(jù)的指針偏移
int nFullThread; //隊(duì)列滿,阻塞在putq處的線程數(shù)
int nEmptyThread; //隊(duì)列空,阻塞在getq處的線程數(shù)
int nData; //隊(duì)列中的消息個(gè)數(shù),主要用來判斷隊(duì)列空還是滿
};
下面給出這個(gè)線程消息隊(duì)列的一個(gè)使用舉例:
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
CThreadQueue queue;//使用的時(shí)候給出稍大的CThreadQueue初始化參數(shù),可以減少進(jìn)入內(nèi)核態(tài)的操作。
void * produce(void * arg)


{
int i=0;
pthread_detach(pthread_self());
while(i++<100)

{
queue.putq((void *)i);
}
}
void *consume(void *arg)


{
int data;
while(1)

{
data=(int)(queue.getq());
printf("data=%d\n",data)
}
}
int main()


{ pthread_t pid;
int i=0;

while(i++<3)
pthread_create(&pid,0,produce,0);
i=0;
while(i++<3)
pthread_create(&pid,0,consume,0);
sleep(300);
}
