作者:CppExplore 網址:http://www.shnenglu.com/CppExplore/
為了后面寫的《網絡模型(二)》,多寫一篇關于線程的。線程使用涉及的主要數據結構以及應用框架可以參考http://www.shnenglu.com/CppExplore/archive/2008/01/15/41175.html。本文的主要目的是給出linux下實用的線程消息隊列實現。
一、linux上線程相關的操作有下面幾種:
(1)pthread_t類型的創建、屬性創建設置等。
這類具體可以:man pthread_creat; man pthread_attr_init;man pthread_detach;man pthread_join;等查看
(2)pthread_mutex_t類型的操作。
這類具體可以: man pthread_mutex_init可以看到所有相關的操作。
(3)pthread_cond_t類型的操作。同樣:man pthread_cond_init。pthread_cond_t的wait和signal操作一定要和pthread_mutex_t的lock、unlock配合使用。類似于此:
pthread_mutex_t mux=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

//wait操作:
pthread_mutex_lock(&mux);
pthread_cond_wait(&cond,&mux);//睡眠前將內部會執行pthread_mutex_unlock,醒來時內部會執行pthread_mutex_lock
pthread_mutex_unlock(&mux);

//signal操作
pthread_mutex_lock(&mux);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mux);

(4)sem_t類型的操作。同樣:man sem_init 這個系列一般是用不到的,太重量級了,也是最強大的一種。
二、linux2.6內核的線程庫。2.6內核的默認安裝的是redhat公司的NPTL(原生posix線程庫),以前內核安裝的是LinuxThreads庫,兩者的簡單介紹可以看http://www.ibm.com/developerworks/cn/linux/l-threading.html。不過對于應用者,分析兩者的區別和優劣也沒什么大意義。這里特別提下NPTL的futex機制。借助該機制,pthread_mutex的性能大大提高,只要不進入競爭態,進程就不會陷入內核態。這點可以自己寫示例程序,通過strace -c 跟蹤進程的系統調用得以證實,另外還可以證實總是進入內核態的操作有pthread_cond_signal和sem_post。
三、實用的線程消息隊列實現。下面設計一個具有線程消息隊列的線程封裝類。通過上面的分析,我們可以有如下結論:
(1)減少pthread_cond_signal和sem_post的調用,只在有必要的時候調用;
(2)盡量避免pthread_mutex進入競爭態。增大消息隊列的大小,可以有效減少競態條件的出現。
下面給出一個實用的線程消息隊列的實現類,這個類也將是以后《網絡模型》文章中用到的線程消息隊列類,代碼注釋請看對私有屬性的注釋:
class CThreadQueue


{

public:
CThreadQueue(int queueSize=1024):
sizeQueue(queueSize),lput(0),lget(0),nFullThread(0),nEmptyThread(0),nData(0)

{
pthread_mutex_init(&mux,0);
pthread_cond_init(&condGet,0);
pthread_cond_init(&condPut,0);
buffer=new (void *)[sizeQueue];
}
virtual ~CThreadQueue()

{
delete[] buffer;
}
void * getq()

{
void *data;
pthread_mutex_lock(&mux);
while(lget==lput&&nData==0)//此處循環判斷的原因如下:假設2個線程在getq阻塞,然后兩者都被激活,而其中一個線程運行比較塊,快速消耗了2個數據,另一個線程醒來的時候已經沒有新數據可以消耗了。另一點,man pthread_cond_wait可以看到,該函數可以被信號中斷返回,此時返回EINTR。為避免以上任何一點,都必須醒來后再次判斷睡眠條件。更正:pthread_cond_wait是信號安全的系統調用,不會被信號中斷。

{
nEmptyThread++;
pthread_cond_wait(&condGet,&mux);
nEmptyThread--;
}
data=buffer[lget++];
nData--;
if(lget==sizeQueue)

{
lget=0;
}
if(nFullThread) //必要時才進行signal操作,勿總是signal

{
pthread_cond_signal(&condPut);
}
pthread_mutex_unlock(&mux);
return data;
}
void putq(void *data)

{
pthread_mutex_lock(&mux);
while(lput==lget&&nData)

{
nFullThread++;
pthread_cond_wait(&condPut,&mux);
nFullThread--;
}
buffer[lput++]=data;
nData++;
if(lput==sizeQueue)

{
lput=0;
}
if(nEmptyThread)

{
pthread_cond_signal(&condGet);
}
pthread_mutex_unlock(&mux);
}
private:
pthread_mutex_t mux;
pthread_cond_t condGet;
pthread_cond_t condPut;

void * * buffer; //循環消息隊列
int sizeQueue; //隊列大小
int lput; //location put 放數據的指針偏移
int lget; //location get 取數據的指針偏移
int nFullThread; //隊列滿,阻塞在putq處的線程數
int nEmptyThread; //隊列空,阻塞在getq處的線程數
int nData; //隊列中的消息個數,主要用來判斷隊列空還是滿
};
下面給出這個線程消息隊列的一個使用舉例:
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
CThreadQueue queue;//使用的時候給出稍大的CThreadQueue初始化參數,可以減少進入內核態的操作。
void * produce(void * arg)


{
int i=0;
pthread_detach(pthread_self());
while(i++<100)

{
queue.putq((void *)i);
}
}
void *consume(void *arg)


{
int data;
while(1)

{
data=(int)(queue.getq());
printf("data=%d\n",data)
}
}
int main()


{ pthread_t pid;
int i=0;

while(i++<3)
pthread_create(&pid,0,produce,0);
i=0;
while(i++<3)
pthread_create(&pid,0,consume,0);
sleep(300);
}
