進行多線程編程,最頭疼的就是那些共享的數據。因為你無法知道哪個線程會在哪個時候對它進行操作,你也無法得知那個線程會先運行,哪個線程會后運行。下面介紹一些技術,通過他們,你會合理安排你的線程之間對資源的競爭。
l???????? 互斥體Mutex
l???????? 信號燈Semophore
l???????? 條件變量Conditions
先說一下互斥量。
什么時候會用上互斥量了?比如你現在有一全局鏈表,你有幾個工作線程。每一個線程從該鏈表中取出頭節點,然后對該頭節點進行處理。比如現在線程1正在取出頭節點,他的操作如下:
Item * p =queue_list;
Queue_list=queue_list->next;
Process_job(p);
Free(p);
當線程1處理完第一步,也就是Item *p=queue_list后,這時候系統停止線程1的運行,改而運行線程2。線程2照樣取出頭節點,然后進行處理,最后釋放了該節點。過了段時間,線程1重新得到運行。而這個時候,其實p所指向的節點已經被線程2釋放掉,而線程1對此毫無知曉。他會接著運行process_job(p)。而這將導致無法預料的后果!
對于這種情況,系統給我們提供了互斥量。你在取出頭節點前必須要等待互斥量,如果此時有其他線程已經獲得該互斥量,那么線程將會阻塞在這個地方。只有等到其他線程釋放掉該互斥量后,你的線程才有可能得到該互斥量。為什么是可能了?因為可能此時有不止你一個線程在等候該互斥量,而系統無法保證你的線程將會優先運行。
互斥量的類型為pthread_mutex_t。你可以聲明多個互斥量。在聲明該變量后,你需要調用pthread_mutex_init()來創建該變量。pthread_mutex_init的格式如下:
int? pthread_mutex_init(pthread_mutex_t? *mutex,? const? pthread_mutex_attr_t *mutexattr);
第一個參數,mutext,也就是你之前聲明的那個互斥量,第二個參數為該互斥量的屬性。這個將在后面詳細討論。
在創建該互斥量之后,你便可以使用它了。要得到互斥量,你需要調用下面的函數:
int pthread_mutex_lock(pthread_mutex_t *mutex);
該函數用來給互斥量上鎖,也就是我們前面所說的等待操作。互斥量一旦被上鎖后,其他線程如果想給該互斥量上鎖,那么就會阻塞在這個操作上。如果在此之前該互斥量已經被其他線程上鎖,那么該操作將會一直阻塞在這個地方,直到獲得該鎖為止。
在得到互斥量后,你就可以進入關鍵代碼區了。
同樣,在操作完成后,你必須調用下面的函數來給互斥量解鎖,也就是前面所說的釋放。這樣其他等待該鎖的線程才有機會獲得該鎖,否則其他線程將會永遠阻塞。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
下面給出一個簡單的例子:
#include <malloc.h>
#include <pthread.h>
struct job {
/* Link field for linked list. */
struct job* next;
/* Other fields describing work to be done... */
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* A mutex protecting job_queue. */
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
while (1) {
struct job* next_job;
/* Lock the mutex on the job queue. */
pthread_mutex_lock (&job_queue_mutex);
/* Now it’s safe to check if the queue is empty. */
if (job_queue == NULL)
next_job = NULL;
else {
/* Get the next available job. */
next_job = job_queue;
/* Remove this job from the list. */
job_queue = job_queue->next;
}
/* Unlock the mutex on the job queue because we’re done with the
queue for now. */
pthread_mutex_unlock (&job_queue_mutex);
/* Was the queue empty? If so, end the thread. */
if (next_job == NULL)
break;
/* Carry out the work. */
process_job (next_job);
/* Clean up. */
free (next_job);
}
return NULL;
}
?
在這個例子中我們使用了下面一條語句:
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
?????? 他的作用和調用pthread_mutex_init()函數一樣。
?
如果一個線程已經給一個互斥量上鎖了,后來在操作的過程中又再次調用了該上鎖的操作,那么該線程將會無限阻塞在這個地方,從而導致死鎖。怎么變了?這就需要我們之前所提到的互斥量的屬性。
互斥量分為下面三種:
l???????? 快速型。這種類型也是默認的類型。該線程的行為正如上面所說的。
l???????? 遞歸型。如果遇到我們上面所提到的死鎖情況,同一線程循環給互斥量上鎖,那么系統將會知道該上鎖行為來自同一線程,那么就會同意線程給該互斥量上鎖。
l???????? 錯誤檢測型。如果該互斥量已經被上鎖,那么后續的上鎖將會失敗而不會阻塞,pthread_mutex_lock()操作將會返回EDEADLK。
互斥量的屬性類型為pthread_mutexattr_t。聲明后調用pthread_mutexattr_init()來創建該互斥量。然后調用int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);來設置屬性。int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);格式如下:
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);
第一個參數,attr,就是前面聲明的屬性變量,第二個參數,kind,就是我們要設置的屬性類型。他有下面幾個選項:
l???????? PTHREAD_MUTEX_FAST_NP
l???????? PTHREAD_MUTEX_RECURSIVE_NP
l???????? PTHREAD_MUTEX_ERRORCHECK_NP
下面給出一個使用屬性的簡單過程:
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&mutex,&attr);
pthread_mutex_destroy(&attr);
?
前面我們提到在調用pthread_mutex_lock()的時候,如果此時mutex已經被其他線程上鎖,那么該操作將會一直阻塞在這個地方。如果我們此時不想一直阻塞在這個地方,那么可以調用下面函數:
pthread_mutex_trylock()
如果此時互斥量沒有被上鎖,那么pthread_mutex_trylock()將會返回0,并會對該互斥量上鎖。如果互斥量已經被上鎖,那么會立刻返回EBUSY。
?
上面談到的是使用互斥量。如果碰到下面這種情況,該怎么辦了?
還是上面程序中提到的工作鏈表。此時必然有一個生產者線程,用于往鏈表里添加節點。如果這一段時間沒有工作,那么工作線程將會不停的調用lock,unlock操作。而這樣的操作毫無疑義。
在這里系統給我們提供了另外一種同步機制,信號燈,Semaphore。
信號燈其實就是一個計數器,也是一個整數。每一次調用wait操作將會使semaphore值減一,而如果semaphore值已經為0,則wait操作將會阻塞。每一次調用post操作將會使semaphore值加一。將這些操作用到上面的問題中。工作線程每一次調用wait操作,如果此時鏈表中沒有節點,則工作線程將會阻塞,直到鏈表中有節點。生產者線程在每次往鏈表中添加節點后調用post操作,信號燈值會加一。這樣阻塞的工作線程就會停止阻塞,繼續往下執行。
信號燈的類型為sem_t。在聲明后必須調用sem_init()。需要傳遞兩個參數,第一個參數就是你之前聲明的sem_t變量,第二個必須為0。當你不再需要信號燈時,你必須調用sem_destroy()來釋放資源。
等待信號燈的操作為sem_wait()。投遞一個信號的操作為sem_wait()。和互斥量一樣,等待信號燈也有一個非阻塞的操作,sem_trywait()。該操作在沒有信號燈的時候返回EAGAIN。
下面是一個結合了互斥量和信號燈的例子:
#include <malloc.h>
#include <pthread.h>
#include <semaphore.h>
struct job {
/* Link field for linked list. */
struct job* next;
/* Other fields describing work to be done... */
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* A mutex protecting job_queue. */
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
/* A semaphore counting the number of jobs in the queue. */
sem_t job_queue_count;
/* Perform one-time initialization of the job queue. */
void initialize_job_queue ()
{
/* The queue is initially empty. */
job_queue = NULL;
/* Initialize the semaphore which counts jobs in the queue. Its
initial value should be zero. */
sem_init (&job_queue_count, 0, 0);
}
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
while (1) {
struct job* next_job;
/* Wait on the job queue semaphore. If its value is positive,
indicating that the queue is not empty, decrement the count by
1. If the queue is empty, block until a new job is enqueued. */
sem_wait (&job_queue_count);
/* Lock the mutex on the job queue. */
pthread_mutex_lock (&job_queue_mutex);
/* Because of the semaphore, we know the queue is not empty. Get
the next available job. */
next_job = job_queue;
/* Remove this job from the list. */
job_queue = job_queue->next;
/* Unlock the mutex on the job queue because we’re done with the
queue for now. */
pthread_mutex_unlock (&job_queue_mutex);
/* Carry out the work. */
process_job (next_job);
/* Clean up. */
free (next_job);
}
return NULL;
}
/* Add a new job to the front of the job queue. */
void enqueue_job (/* Pass job-specific data here... */)
{
struct job* new_job;
/* Allocate a new job object. */
new_job = (struct job*) malloc (sizeof (struct job));
/* Set the other fields of the job struct here... */
/* Lock the mutex on the job queue before accessing it. */
pthread_mutex_lock (&job_queue_mutex);
/* Place the new job at the head of the queue. */
new_job->next = job_queue;
job_queue = new_job;
/* Post to the semaphore to indicate that another job is available. If
threads are blocked, waiting on the semaphore, one will become
unblocked so it can process the job. */
sem_post (&job_queue_count);
/* Unlock the job queue mutex. */
pthread_mutex_unlock (&job_queue_mutex);
}
?
?
?
下面說一下第三種同步機制―條件變量。
如果現在在等待一個信號。如果該信號被設置,則繼續運行。如果沒有條件變量,我們將會不停的去查詢該信號是否被設置,這樣就會浪費大量的cpu。而通過使用條件變量,我們就可以將等待信號的線程阻塞,直到有信號的時候再去喚醒它。
條件變量的類型是pthread_cond_t。
下面簡單說一下如何使用條件變量。
l???????? 聲明pthread_cond_t變量后,調用pthread_cond_init()函數,第一個參數為之前聲明的變量。第二個參數在Linux中不起作用。
l???????? 聲明一個pthread_mutex_t變量,并調用pthread_mutex_init()初始化。
l???????? 調用pthread_cond_signal(),發出信號。如果此時有線程在等待該信號,那么該線程將會喚醒。如果沒有,該信號就會別忽略。
l???????? 如果想喚醒所有等待該信號的線程,調用pthread_cond_broadcast()。
l???????? 調用pthread_cond_wait()等待信號。如果沒有信號,線程將會阻塞,直到有信號。該函數的第一個參數是條件變量,第二個參數是一個mutex。在調用該函數之前必須先獲得互斥量。如果線程阻塞,互斥量將立刻會被釋放。
下面給出一個簡單的使用例子。
#include <pthread.h>
#include <stdio.h>
?
pthread_mutex_t mutex;
pthread_cond_t cond;
int flag;
void init()
{
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond,NULL);
flag=0;
}
?
void * Thread_Function(void * arg)
{
//loop infinitely
while(1)
{
????? pthread_mutex_lock(&mutex);
????? while(!flag)
?????????? pthread_cond_wait(&cond,&mutex);
????? pthread_mutex_unlock(&mutex);
?
????? do_some_work();
}
}
?
void SetFlag()
{
????? pthread_mutex_lock(&mutex);
????? flag=1;
????? pthread_cond_signal(&cond);
????? pthread_mutex_unlock(&mutex);
}
?
關于線程同步的技術先說到這個地方。
操作系統實驗用的程序,保留一個。沒什么技術含量,請見諒^_^
這個程序就是解決那個著名的“生產者―消費者”的問題,貌似線程的同步都講這個問題,Java里面也有,不過操作起來貌似要更簡單一點。
在這個程序里subp1()用來生產一個 int 數據,subp2()用來獲取這個整數。首先是subp1()生產一個數據,subp2()再去獲取這個數據。subp2()獲取數據的首要條件是 subp1()已經生產了一個新的數據,subp1()生產一個新數據的前提是subp2()已經獲得了 subp1()生產的前一個數據。
引用內容:
/*thread synchronization*/
/*thread.c*/
/*Afdream.com*/
/*2005-12-20*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <linux/sem.h>
#include <semaphore.h>
/*function of p opration*/
void P(int semid,int index)
{
??? struct sembuf sem;
??? sem.sem_num = index;
??? sem.sem_op = -1;
??? //mark of option:0 or IPC_NOWAIT and so on
??? sem.sem_flg = 0;
??? //the '1' in this sentence means the number of commands
??? semop(semid,&sem,1);???????
??? return;
}
/*function of v opration*/
void V(int semid,int index)
{
??? struct sembuf sem;
??? sem.sem_num = index;
??? sem.sem_op =? 1;
??? sem.sem_flg = 0;
??? semop(semid,&sem,1);
??? return;
}
int semid;
pthread_t p1,p2;
int sharedInt=0;
void *subp1();
void *subp2();
/*in Linux,the return of the function main must be int,cann't be void*/
int main()
{
??? union semun? semopts;
??? int res;
??? /*請求兩個信號量*/
??? semid = semget(300,2,IPC_CREAT|0666);
??? if (semid<0) {
??????? printf("error");
??????? return;
??? }
??? /*初始化第一個信號量的值為1*/
??? semopts.val = 1;
??? res=semctl(semid,0,SETVAL,semopts);
??? /*初始化第二個信號量的值為0*/
??? semopts.val = 0;
??? res=semctl(semid,1,SETVAL,semopts);
??? if (res < 0) return;
??? /*創建兩個線程*/
??? pthread_create(&p2,NULL,subp2,NULL);
??????????????? pthread_create(&p1,NULL,subp1,NULL);
??? /*等待兩個線程結束*/
??? pthread_join(p1,NULL);
??? pthread_join(p2,NULL);
??? semctl(semid,0,IPC_RMID,0);
}
/*produce number*/
void *subp1()
{
??? int i,j;
??? for (i=0;? i<10;i++) {
??????? sleep(i+1);
??????? printf("\nready to produce!\n");
??????? P(semid,0);
??????? sharedInt++;
??????? printf("have produced %d!\n",sharedInt);
??????? V(semid,1);
??? }
??? return;
}
/*get number*/
void *subp2()
{
??? int i,j;
??? for (i=0;i<10;i++) {
??????? sleep(10-i);
??????? printf("\nready to get!\n");
??????? P(semid,1);
??????? printf("have got %d!\n",sharedInt);
??????? V(semid,0);
??? }
??? return;
}