?
一種支持內存共享的簡捷工具
Daniel Robbins (drobbins@gentoo.org), 總裁/CEO, Gentoo Technologies, Inc.
2000 年 7 月 01 日
??? POSIX(可移植操作系統接口)線程是提高代碼響應和性能的有力手段。在本系列中,Daniel Robbins 向您精確地展示在編程中如何使用線程。其中還涉及大量幕后細節,讀完本系列文章,您完全可以運用 POSIX 線程創建多線程程序。
線程是有趣的
了解如何正確運用線程是每一個優秀程序員必備的素質。線程類似于進程。如同進程,線程由內核按時間分片進行管理。在單處理器系統中,內核使用時間分片來模擬線程的并發執行,這種方式和進程的相同。而在多處理器系統中,如同多個進程,線程實際上一樣可以并發執行。
那么為什么對于大多數合作性任務,多線程比多個獨立的進程更優越呢?這是因為,線程共享相同的內存空間。不同的線程可以存取內存中的同一個變量。所以,程序中的所有線程都可以讀或寫聲明過的全局變量。如果曾用 fork() 編寫過重要代碼,就會認識到這個工具的重要性。為什么呢?雖然 fork() 允許創建多個進程,但它還會帶來以下通信問題: 如何讓多個進程相互通信,這里每個進程都有各自獨立的內存空間。對這個問題沒有一個簡單的答案。雖然有許多不同種類的本地 IPC (進程間通信),但它們都遇到兩個重要障礙:
??? * 強加了某種形式的額外內核開銷,從而降低性能。
??? * 對于大多數情形,IPC 不是對于代碼的“自然”擴展。通常極大地增加了程序的復雜性。
雙重壞事: 開銷和復雜性都非好事。如果曾經為了支持 IPC 而對程序大動干戈過,那么您就會真正欣賞線程提供的簡單共享內存機制。由于所有的線程都駐留在同一內存空間,POSIX 線程無需進行開銷大而復雜的長距離調用。只要利用簡單的同步機制,程序中所有的線程都可以讀取和修改已有的數據結構。而無需將數據經由文件描述符轉儲或擠入緊窄的共享內存空間。僅此一個原因,就足以讓您考慮應該采用單進程/多線程模式而非多進程/單線程模式。
線程是快捷的
不僅如此。線程同樣還是非常快捷的。與標準 fork() 相比,線程帶來的開銷很小。內核無需單獨復制進程的內存空間或文件描述符等等。這就節省了大量的 CPU 時間,使得線程創建比新進程創建快上十到一百倍。因為這一點,可以大量使用線程而無需太過于擔心帶來的 CPU 或內存不足。使用 fork() 時導致的大量 CPU 占用也不復存在。這表示只要在程序中有意義,通常就可以創建線程。
當然,和進程一樣,線程將利用多 CPU。如果軟件是針對多處理器系統設計的,這就真的是一大特性(如果軟件是開放源碼,則最終可能在不少平臺上運行)。特定類型線程程序(尤其是 CPU 密集型程序)的性能將隨系統中處理器的數目幾乎線性地提高。如果正在編寫 CPU 非常密集型的程序,則絕對想設法在代碼中使用多線程。一旦掌握了線程編碼,無需使用繁瑣的 IPC 和其它復雜的通信機制,就能夠以全新和創造性的方法解決編碼難題。所有這些特性配合在一起使得多線程編程更有趣、快速和靈活。
線程是可移植的
如果熟悉 Linux 編程,就有可能知道 __clone() 系統調用。__clone() 類似于 fork(),同時也有許多線程的特性。例如,使用 __clone(),新的子進程可以有選擇地共享父進程的執行環境(內存空間,文件描述符等)。這是好的一面。但 __clone() 也有不足之處。正如__clone() 在線幫助指出:
??? “__clone 調用是特定于 Linux 平臺的,不適用于實現可移植的程序。欲編寫線程化應用程序(多線程控制同一內存空間),最好使用實現 POSIX 1003.1c 線程 API 的庫,例如 Linux-Threads 庫。參閱 pthread_create(3thr)。”
雖然 __clone() 有線程的許多特性,但它是不可移植的。當然這并不意味著代碼中不能使用它。但在軟件中考慮使用 __clone() 時應當權衡這一事實。值得慶幸的是,正如 __clone() 在線幫助指出,有一種更好的替代方案:POSIX 線程。如果想編寫 可移植的 多線程代碼,代碼可運行于 Solaris、FreeBSD、Linux 和其它平臺,POSIX 線程是一種當然之選。
第一個線程
下面是一個 POSIX 線程的簡單示例程序:
thread1.c
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
?void *thread_function(void *arg) {
? int i;
? for ( i=0; i<20; i++) {
??? printf("Thread says hi!\n");
??? sleep(1);
? }
? return NULL;
}
int main(void) {
? pthread_t mythread;
?
? if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
??? printf("error creating thread.");
??? abort();
? }
? if ( pthread_join ( mythread, NULL ) ) {
??? printf("error joining thread.");
??? abort();
? }
? exit(0);
}
要編譯這個程序,只需先將程序存為 thread1.c,然后輸入:
$ gcc thread1.c -o thread1 -lpthread
運行則輸入:
$ ./thread1
理解 thread1.c
thread1.c 是一個非常簡單的線程程序。雖然它沒有實現什么有用的功能,但可以幫助理解線程的運行機制。下面,我們一步一步地了解這個程序是干什么的。main() 中聲明了變量 mythread,類型是 pthread_t。pthread_t 類型在 pthread.h 中定義,通常稱為“線程 id”(縮寫為 "tid")??梢哉J為它是一種線程句柄。
mythread 聲明后(記住 mythread 只是一個 "tid",或是將要創建的線程的句柄),調用 pthread_create 函數創建一個真實活動的線程。不要因為 pthread_create() 在 "if" 語句內而受其迷惑。由于 pthread_create() 執行成功時返回零而失敗時則返回非零值,將 pthread_create() 函數調用放在 if() 語句中只是為了方便地檢測失敗的調用。讓我們查看一下 pthread_create 參數。第一個參數 &mythread 是指向 mythread 的指針。第二個參數當前為 NULL,可用來定義線程的某些屬性。由于缺省的線程屬性是適用的,只需將該參數設為 NULL。
第三個參數是新線程啟動時調用的函數名。本例中,函數名為 thread_function()。當 thread_function() 返回時,新線程將終止。本例中,線程函數沒有實現大的功能。它僅將 "Thread says hi!" 輸出 20 次然后退出。注意 thread_function() 接受 void * 作為參數,同時返回值的類型也是 void *。這表明可以用 void * 向新線程傳遞任意類型的數據,新線程完成時也可返回任意類型的數據。那如何向線程傳遞一個任意參數?很簡單。只要利用 pthread_create() 中的第四個參數。本例中,因為沒有必要將任何數據傳給微不足道的 thread_function(),所以將第四個參數設為 NULL。
您也許已推測到,在 pthread_create() 成功返回之后,程序將包含兩個線程。等一等, 兩個 線程?我們不是只創建了一個線程嗎?不錯,我們只創建了一個進程。但是主程序同樣也是一個線程??梢赃@樣理解:如果編寫的程序根本沒有使用 POSIX 線程,則該程序是單線程的(這個單線程稱為“主”線程)。創建一個新線程之后程序總共就有兩個線程了。
我想此時您至少有兩個重要問題。第一個問題,新線程創建之后主線程如何運行。答案,主線程按順序繼續執行下一行程序(本例中執行 "if (pthread_join(...))")。第二個問題,新線程結束時如何處理。答案,新線程先停止,然后作為其清理過程的一部分,等待與另一個線程合并或“連接”。
現在,來看一下 pthread_join()。正如 pthread_create() 將一個線程拆分為兩個, pthread_join() 將兩個線程合并為一個線程。pthread_join() 的第一個參數是 tid mythread。第二個參數是指向 void 指針的指針。如果 void 指針不為 NULL,pthread_join 將線程的 void * 返回值放置在指定的位置上。由于我們不必理會 thread_function() 的返回值,所以將其設為 NULL.
您會注意到 thread_function() 花了 20 秒才完成。在 thread_function() 結束很久之前,主線程就已經調用了 pthread_join()。如果發生這種情況,主線程將中斷(轉向睡眠)然后等待 thread_function() 完成。當 thread_function() 完成后, pthread_join() 將返回。這時程序又只有一個主線程。當程序退出時,所有新線程已經使用 pthread_join() 合并了。這就是應該如何處理在程序中創建的每個新線程的過程。如果沒有合并一個新線程,則它仍然對系統的最大線程數限制不利。這意味著如果未對線程做正確的清理,最終會導致 pthread_create() 調用失敗。
無父,無子
如果使用過 fork() 系統調用,可能熟悉父進程和子進程的概念。當用 fork() 創建另一個新進程時,新進程是子進程,原始進程是父進程。這創建了可能非常有用的層次關系,尤其是等待子進程終止時。例如,waitpid() 函數讓當前進程等待所有子進程終止。waitpid() 用來在父進程中實現簡單的清理過程。
而 POSIX 線程就更有意思。您可能已經注意到我一直有意避免使用“父線程”和“子線程”的說法。這是因為 POSIX 線程中不存在這種層次關系。雖然主線程可以創建一個新線程,新線程可以創建另一個新線程,POSIX 線程標準將它們視為等同的層次。所以等待子線程退出的概念在這里沒有意義。POSIX 線程標準不記錄任何“家族”信息。缺少家族信息有一個主要含意:如果要等待一個線程終止,就必須將線程的 tid 傳遞給 pthread_join()。線程庫無法為您斷定 tid。
對大多數開發者來說這不是個好消息,因為這會使有多個線程的程序復雜化。不過不要為此擔憂。POSIX 線程標準提供了有效地管理多個線程所需要的所有工具。實際上,沒有父/子關系這一事實卻為在程序中使用線程開辟了更創造性的方法。例如,如果有一個線程稱為線程 1,線程 1 創建了稱為線程 2 的線程,則線程 1 自己沒有必要調用 pthread_join() 來合并線程 2,程序中其它任一線程都可以做到。當編寫大量使用線程的代碼時,這就可能允許發生有趣的事情。例如,可以創建一個包含所有已停止線程的全局“死線程列表”,然后讓一個專門的清理線程專等停止的線程加到列表中。這個清理線程調用 pthread_join() 將剛停止的線程與自己合并?,F在,僅用一個線程就巧妙和有效地處理了全部清理。
同步漫游
現在我們來看一些代碼,這些代碼做了一些意想不到的事情。thread2.c 的代碼如下:
thread2.c
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
?void *thread_function(void *arg) {
? int i,j;
? for ( i=0; i<20; i++) {
??? j=myglobal;
??? j=j+1;
??? printf(".");
??? fflush(stdout);
??? sleep(1);
??? myglobal=j;
? }
? return NULL;
}
int main(void) {
? pthread_t mythread;
? int i;
? if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
??? printf("error creating thread.");
??? abort();
? }
? for ( i=0; i<20; i++) {
??? myglobal=myglobal+1;
??? printf("o");
??? fflush(stdout);
??? sleep(1);
? }
? if ( pthread_join ( mythread, NULL ) ) {
??? printf("error joining thread.");
??? abort();
? }
? printf("\nmyglobal equals %d\n",myglobal);
? exit(0);
}
理解 thread2.c
如同第一個程序,這個程序創建一個新線程。主線程和新線程都將全局變量 myglobal 加一 20 次。但是程序本身產生了某些意想不到的結果。編譯代碼請輸入:
$ gcc thread2.c -o thread2 -lpthread
運行請輸入:
$ ./thread2
輸出:
$ ./thread2
..o.o.o.o.oo.o.o.o.o.o.o.o.o.o..o.o.o.o.o
myglobal equals 21
非常意外吧!因為 myglobal 從零開始,主線程和新線程各自對其進行了 20 次加一, 程序結束時 myglobal 值應當等于 40。由于 myglobal 輸出結果為 21,這其中肯定有問題。但是究竟是什么呢?
放棄嗎?好,讓我來解釋是怎么一回事。首先查看函數 thread_function()。注意如何將 myglobal 復制到局部變量 "j" 了嗎? 接著將 j 加一, 再睡眠一秒,然后到這時才將新的 j 值復制到 myglobal?這就是關鍵所在。設想一下,如果主線程就在新線程將 myglobal 值復制給 j 后 立即將 myglobal 加一,會發生什么?當 thread_function() 將 j 的值寫回 myglobal 時,就覆蓋了主線程所做的修改。
當編寫線程程序時,應避免產生這種無用的副作用,否則只會浪費時間(當然,除了編寫關于 POSIX 線程的文章時有用)。那么,如何才能排除這種問題呢?
由于是將 myglobal 復制給 j 并且等了一秒之后才寫回時產生問題,可以嘗試避免使用臨時局部變量并直接將 myglobal 加一。雖然這種解決方案對這個特定例子適用,但它還是不正確。如果我們對 myglobal 進行相對復雜的數學運算,而不是簡單的加一,這種方法就會失效。但是為什么呢?
要理解這個問題,必須記住線程是并發運行的。即使在單處理器系統上運行(內核利用時間分片模擬多任務)也是可以的,從程序員的角度,想像兩個線程是同時執行的。thread2.c 出現問題是因為 thread_function() 依賴以下論據:在 myglobal 加一之前的大約一秒鐘期間不會修改 myglobal。需要有些途徑讓一個線程在對 myglobal 做更改時通知其它線程“不要靠近”。我將在下一篇文章中講解如何做到這一點。到時候見。
通用線程:POSIX 線程詳解,第 2部分
稱作互斥對象的小玩意
?? ?
Daniel Robbins (drobbins@gentoo.org), 總裁/CEO, Gentoo Technologies, Inc.
2000 年 8 月 01 日
??? POSIX 線程是提高代碼響應和性能的有力手段。在此三部分系列文章的第二篇中,Daniel Robbins 將說明,如何使用被稱為互斥對象的靈巧小玩意,來保護線程代碼中共享數據結構的完整性。
互斥我吧!
在 前一篇文章中 ,談到了會導致異常結果的線程代碼。兩個線程分別對同一個全局變量進行了二十次加一。變量的值最后應該是 40,但最終值卻是 21。這是怎么回事呢?因為一個線程不停地“取消”了另一個線程執行的加一操作,所以產生這個問題?,F在讓我們來查看改正后的代碼,它使用 互斥對象(mutex)來解決該問題:
thread3.c
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER;
?void *thread_function(void *arg) {
? int i,j;
? for ( i=0; i<20; i++) {
??? pthread_mutex_lock(&mymutex);
??? j=myglobal;
??? j=j+1;
??? printf(".");
??? fflush(stdout);
??? sleep(1);
??? myglobal=j;
??? pthread_mutex_unlock(&mymutex);
? }
? return NULL;
}
int main(void) {
? pthread_t mythread;
? int i;
? if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
??? printf("error creating thread.");
??? abort();
? }
? for ( i=0; i<20; i++) {
??? pthread_mutex_lock(&mymutex);
??? myglobal=myglobal+1;
??? pthread_mutex_unlock(&mymutex);
??? printf("o");
??? fflush(stdout);
??? sleep(1);
? }
? if ( pthread_join ( mythread, NULL ) ) {
??? printf("error joining thread.");
??? abort();
? }
? printf("\nmyglobal equals %d\n",myglobal);
? exit(0);
}
解讀一下
如果將這段代碼與 前一篇文章 中給出的版本作一個比較,就會注意到增加了 pthread_mutex_lock() 和 pthread_mutex_unlock() 函數調用。在線程程序中這些調用執行了不可或缺的功能。他們提供了一種 相互排斥的方法(互斥對象即由此得名)。兩個線程不能同時對同一個互斥對象加鎖。
互斥對象是這樣工作的。如果線程 a 試圖鎖定一個互斥對象,而此時線程 b 已鎖定了同一個互斥對象時,線程 a 就將進入睡眠狀態。一旦線程 b 釋放了互斥對象(通過 pthread_mutex_unlock() 調用),線程 a 就能夠鎖定這個互斥對象(換句話說,線程 a 就將從 pthread_mutex_lock() 函數調用中返回,同時互斥對象被鎖定)。同樣地,當線程 a 正鎖定互斥對象時,如果線程 c 試圖鎖定互斥對象的話,線程 c 也將臨時進入睡眠狀態。對已鎖定的互斥對象上調用 pthread_mutex_lock() 的所有線程都將進入睡眠狀態,這些睡眠的線程將“排隊”訪問這個互斥對象。
通常使用 pthread_mutex_lock() 和 pthread_mutex_unlock() 來保護數據結構。這就是說,通過線程的鎖定和解鎖,對于某一數據結構,確保某一時刻只能有一個線程能夠訪問它??梢酝茰y到,當線程試圖鎖定一個未加鎖的互斥對象時,POSIX 線程庫將同意鎖定,而不會使線程進入睡眠狀態。
請看這幅輕松的漫畫,四個小精靈重現了最近一次 pthread_mutex_lock() 調用的一個場面。
圖中,鎖定了互斥對象的線程能夠存取復雜的數據結構,而不必擔心同時會有其它線程干擾。那個數據結構實際上是“凍結”了,直到互斥對象被解鎖為止。pthread_mutex_lock() 和 pthread_mutex_unlock() 函數調用,如同“在施工中”標志一樣,將正在修改和讀取的某一特定共享數據包圍起來。這兩個函數調用的作用就是警告其它線程,要它們繼續睡眠并等待輪到它們對互斥對象加鎖。當然,除非在 每個 對特定數據結構進行讀寫操作的語句前后,都分別放上 pthread_mutex_lock() 和 pthread_mutext_unlock() 調用,才會出現這種情況。
為什么要用互斥對象?
聽上去很有趣,但究竟為什么要讓線程睡眠呢?要知道,線程的主要優點不就是其具有獨立工作、更多的時候是同時工作的能力嗎?是的,確實是這樣。然而,每個重要的線程程序都需要使用某些互斥對象。讓我們再看一下示例程序以便理解原因所在。
請看 thread_function(),循環中一開始就鎖定了互斥對象,最后才將它解鎖。在這個示例程序中,mymutex 用來保護 myglobal 的值。仔細查看 thread_function(),加一代碼把 myglobal 復制到一個局部變量,對局部變量加一,睡眠一秒鐘,在這之后才把局部變量的值傳回給 myglobal。不使用互斥對象時,即使主線程在 thread_function() 線程睡眠一秒鐘期間內對 myglobal 加一,thread_function() 蘇醒后也會覆蓋主線程所加的值。使用互斥對象能夠保證這種情形不會發生。(您也許會想到,我增加了一秒鐘延遲以觸發不正確的結果。把局部變量的值賦給 myglobal 之前,實際上沒有什么真正理由要求 thread_function() 睡眠一秒鐘。)使用互斥對象的新程序產生了期望的結果:
$ ./thread3
o..o..o.o..o..o.o.o.o.o..o..o..o.ooooooo
myglobal equals 40
為了進一步探索這個極為重要的概念,讓我們看一看程序中進行加一操作的代碼:
thread_function() 加一代碼:
??? j=myglobal;
??? j=j+1;
??? printf(".");
??? fflush(stdout);
??? sleep(1);
??? myglobal=j;
主線程加一代碼:
??? myglobal=myglobal+1;
如果代碼是位于單線程程序中,可以預期 thread_function() 代碼將完整執行。接下來才會執行主線程代碼(或者是以相反的順序執行)。在不使用互斥對象的線程程序中,代碼可能(幾乎是,由于調用了 sleep() 的緣故)以如下的順序執行:
??? thread_function() 線程??????? 主線程
??? j=myglobal;
??? j=j+1;
??? printf(".");
??? fflush(stdout);
??? sleep(1);???????????????????? myglobal=myglobal+1;
??? myglobal=j;
當代碼以此特定順序執行時,將覆蓋主線程對 myglobal 的修改。程序結束后,就將得到不正確的值。如果是在操縱指針的話,就可能產生段錯誤。注意到 thread_function() 線程按順序執行了它的所有指令。看來不象是 thread_function() 有什么次序顛倒。問題是,同一時間內,另一個線程對同一數據結構進行了另一個修改。
線程內幕 1
在解釋如何確定在何處使用互斥對象之前,先來深入了解一下線程的內部工作機制。請看第一個例子:
假設主線程將創建三個新線程:線程 a、線程 b 和線程 c。假定首先創建線程 a,然后是線程 b,最后創建線程 c。
??? pthread_create( &thread_a, NULL, thread_function, NULL);
??? pthread_create( &thread_b, NULL, thread_function, NULL);
??? pthread_create( &thread_c, NULL, thread_function, NULL);
在第一個 pthread_create() 調用完成后,可以假定線程 a 不是已存在就是已結束并停止。第二個 pthread_create() 調用后,主線程和線程 b 都可以假定線程 a 存在(或已停止)。
然而,就在第二個 create() 調用返回后,主線程無法假定是哪一個線程(a 或 b)會首先開始運行。雖然兩個線程都已存在,線程 CPU 時間片的分配取決于內核和線程庫。至于誰將首先運行,并沒有嚴格的規則。盡管線程 a 更有可能在線程 b 之前開始執行,但這并無保證。對于多處理器系統,情況更是如此。如果編寫的代碼假定在線程 b 開始執行之前實際上執行線程 a 的代碼,那么,程序最終正確運行的概率是 99%?;蛘吒愀?,程序在您的機器上 100% 地正確運行,而在您客戶的四處理器服務器上正確運行的概率卻是零。
從這個例子還可以得知,線程庫保留了每個單獨線程的代碼執行順序。換句話說,實際上那三個 pthread_create() 調用將按它們出現的順序執行。從主線程上來看,所有代碼都是依次執行的。有時,可以利用這一點來優化部分線程程序。例如,在上例中,線程 c 就可以假定線程 a 和線程 b 不是正在運行就是已經終止。它不必擔心存在還沒有創建線程 a 和線程 b 的可能性。可以使用這一邏輯來優化線程程序。
線程內幕 2
現在來看另一個假想的例子。假設有許多線程,他們都正在執行下列代碼:
??? myglobal=myglobal+1;
那么,是否需要在加一操作語句前后分別鎖定和解鎖互斥對象呢?也許有人會說“不”。編譯器極有可能把上述賦值語句編譯成一條機器指令。大家都知道,不可能"半途"中斷一條機器指令。即使是硬件中斷也不會破壞機器指令的完整性?;谝陨峡紤],很可能傾向于完全省略 pthread_mutex_lock() 和 pthread_mutex_unlock() 調用。不要這樣做。
我在說廢話嗎?不完全是這樣。首先,不應該假定上述賦值語句一定會被編譯成一條機器指令,除非親自驗證了機器代碼。即使插入某些內嵌匯編語句以確保加一操作的完整執行――甚至,即使是自己動手寫編譯器!-- 仍然可能有問題。
答案在這里。使用單條內嵌匯編操作碼在單處理器系統上可能不會有什么問題。每個加一操作都將完整地進行,并且多半會得到期望的結果。但是多處理器系統則截然不同。在多 CPU 機器上,兩個單獨的處理器可能會在幾乎同一時刻(或者,就在同一時刻)執行上述賦值語句。不要忘了,這時對內存的修改需要先從 L1 寫入 L2 高速緩存、然后才寫入主存。(SMP 機器并不只是增加了處理器而已;它還有用來仲裁對 RAM 存取的特殊硬件。)最終,根本無法搞清在寫入主存的競爭中,哪個 CPU 將會"勝出"。要產生可預測的代碼,應使用互斥對象。互斥對象將插入一道"內存關卡",由它來確保對主存的寫入按照線程鎖定互斥對象的順序進行。
考慮一種以 32 位塊為單位更新主存的 SMP 體系結構。如果未使用互斥對象就對一個 64 位整數進行加一操作,整數的最高 4 位字節可能來自一個 CPU,而其它 4 個字節卻來自另一 CPU。糟糕吧!最糟糕的是,使用差勁的技術,您的程序在重要客戶的系統上有可能不是很長時間才崩潰一次,就是早上三點鐘就崩潰。David R. Butenhof 在他的《POSIX 線程編程》(請參閱本文末尾的 參考資料部分)一書中,討論了由于未使用互斥對象而將產生的種種情況。
許多互斥對象
如果放置了過多的互斥對象,代碼就沒有什么并發性可言,運行起來也比單線程解決方案慢。如果放置了過少的互斥對象,代碼將出現奇怪和令人尷尬的錯誤。幸運的是,有一個中間立場。首先,互斥對象是用于串行化存取*共享數據*。不要對非共享數據使用互斥對象,并且,如果程序邏輯確保任何時候都只有一個線程能存取特定數據結構,那么也不要使用互斥對象。
其次,如果要使用共享數據,那么在讀、寫共享數據時都應使用互斥對象。用 pthread_mutex_lock() 和 pthread_mutex_unlock() 把讀寫部分保護起來,或者在程序中不固定的地方隨機使用它們。學會從一個線程的角度來審視代碼,并確保程序中每一個線程對內存的觀點都是一致和合適的。為了熟悉互斥對象的用法,最初可能要花好幾個小時來編寫代碼,但是很快就會習慣并且*也*不必多想就能夠正確使用它們。
使用調用:初始化
現在該來看看使用互斥對象的各種不同方法了。讓我們從初始化開始。在 thread3.c 示例 中,我們使用了靜態初始化方法。這需要聲明一個 pthread_mutex_t 變量,并賦給它常數 PTHREAD_MUTEX_INITIALIZER:
pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER;
很簡單吧。但是還可以動態地創建互斥對象。當代碼使用 malloc() 分配一個新的互斥對象時,使用這種動態方法。此時,靜態初始化方法是行不通的,并且應當使用例程 pthread_mutex_init():
int pthread_mutex_init( pthread_mutex_t *mymutex, const pthread_mutexattr_t *attr)
正如所示,pthread_mutex_init 接受一個指針作為參數以初始化為互斥對象,該指針指向一塊已分配好的內存區。第二個參數,可以接受一個可選的 pthread_mutexattr_t 指針。這個結構可用來設置各種互斥對象屬性。但是通常并不需要這些屬性,所以正常做法是指定 NULL。
一旦使用 pthread_mutex_init() 初始化了互斥對象,就應使用 pthread_mutex_destroy() 消除它。pthread_mutex_destroy() 接受一個指向 pthread_mutext_t 的指針作為參數,并釋放創建互斥對象時分配給它的任何資源。請注意, pthread_mutex_destroy() 不會 釋放用來存儲 pthread_mutex_t 的內存。釋放自己的內存完全取決于您。還必須注意一點,pthread_mutex_init() 和 pthread_mutex_destroy() 成功時都返回零。
使用調用:鎖定
pthread_mutex_lock(pthread_mutex_t *mutex)
pthread_mutex_lock() 接受一個指向互斥對象的指針作為參數以將其鎖定。如果碰巧已經鎖定了互斥對象,調用者將進入睡眠狀態。函數返回時,將喚醒調用者(顯然)并且調用者還將保留該鎖。函數調用成功時返回零,失敗時返回非零的錯誤代碼。
pthread_mutex_unlock(pthread_mutex_t *mutex)
pthread_mutex_unlock() 與 pthread_mutex_lock() 相配合,它把線程已經加鎖的互斥對象解鎖。始終應該盡快對已加鎖的互斥對象進行解鎖(以提高性能)。并且絕對不要對您未保持鎖的互斥對象進行解鎖操作(否則,pthread_mutex_unlock() 調用將失敗并帶一個非零的 EPERM 返回值)。
pthread_mutex_trylock(pthread_mutex_t *mutex)
當線程正在做其它事情的時候(由于互斥對象當前是鎖定的),如果希望鎖定互斥對象,這個調用就相當方便。調用 pthread_mutex_trylock() 時將嘗試鎖定互斥對象。如果互斥對象當前處于解鎖狀態,那么您將獲得該鎖并且函數將返回零。然而,如果互斥對象已鎖定,這個調用也不會阻塞。當然,它會返回非零的 EBUSY 錯誤值。然后可以繼續做其它事情,稍后再嘗試鎖定。
等待條件發生
互斥對象是線程程序必需的工具,但它們并非萬能的。例如,如果線程正在等待共享數據內某個條件出現,那會發生什么呢?代碼可以反復對互斥對象鎖定和解鎖,以檢查值的任何變化。同時,還要快速將互斥對象解鎖,以便其它線程能夠進行任何必需的更改。這是一種非常可怕的方法,因為線程需要在合理的時間范圍內頻繁地循環檢測變化。
在每次檢查之間,可以讓調用線程短暫地進入睡眠,比如睡眠三秒鐘,但是因此線程代碼就無法最快作出響應。真正需要的是這樣一種方法,當線程在等待滿足某些條件時使線程進入睡眠狀態。一旦條件滿足,還需要一種方法以喚醒因等待滿足特定條件而睡眠的線程。如果能夠做到這一點,線程代碼將是非常高效的,并且不會占用寶貴的互斥對象鎖。這正是 POSIX 條件變量能做的事!
而 POSIX 條件變量將是我下一篇文章的主題,其中將說明如何正確使用條件變量。到那時,您將擁有了創建復雜線程程序所需的全部資源,那些線程程序可以模擬工作人員、裝配線等等。既然您已經越來越熟悉線程,我將在下一篇文章中加快進度。這樣,在下一篇文章的結尾就能放上一個相對復雜的線程程序。說到等到條件產生,下次再見!
通用線程:POSIX 線程詳解,第 3 部分
使用條件變量提高效率
?? ?
Daniel Robbins, 總裁兼 CEO, Gentoo Technologies, Inc.
2000 年 9 月 01 日
??? 本文是 POSIX 線程三部曲系列的最后一部分,Daniel 將詳細討論如何使用條件變量。條件變量是 POSIX 線程結構,可以讓您在遇到某些條件時“喚醒”線程。可以將它們看作是一種線程安全的信號發送。Daniel 使用目前您所學到的知識實現了一個多線程工作組應用程序,本文將圍繞著這一示例而進行討論。
條件變量詳解
在 上一篇文章結束時,我描述了一個比較特殊的難題:如果線程正在等待某個特定條件發生,它應該如何處理這種情況?它可以重復對互斥對象鎖定和解鎖,每次都會檢查共享數據結構,以查找某個值。但這是在浪費時間和資源,而且這種繁忙查詢的效率非常低。解決這個問題的最佳方法是使用 pthread_cond_wait() 調用來等待特殊條件發生。
了解 pthread_cond_wait() 的作用非常重要 -- 它是 POSIX 線程信號發送系統的核心,也是最難以理解的部分。
首先,讓我們考慮以下情況:線程為查看已鏈接列表而鎖定了互斥對象,然而該列表恰巧是空的。這一特定線程什么也干不了 -- 其設計意圖是從列表中除去節點,但是現在卻沒有節點。因此,它只能:
鎖定互斥對象時,線程將調用 pthread_cond_wait(&mycond,&mymutex)。pthread_cond_wait() 調用相當復雜,因此我們每次只執行它的一個操作。
pthread_cond_wait() 所做的第一件事就是同時對互斥對象解鎖(于是其它線程可以修改已鏈接列表),并等待條件 mycond 發生(這樣當 pthread_cond_wait() 接收到另一個線程的“信號”時,它將蘇醒)。現在互斥對象已被解鎖,其它線程可以訪問和修改已鏈接列表,可能還會添加項。
此時,pthread_cond_wait() 調用還未返回。對互斥對象解鎖會立即發生,但等待條件 mycond 通常是一個阻塞操作,這意味著線程將睡眠,在它蘇醒之前不會消耗 CPU 周期。這正是我們期待發生的情況。線程將一直睡眠,直到特定條件發生,在這期間不會發生任何浪費 CPU 時間的繁忙查詢。從線程的角度來看,它只是在等待 pthread_cond_wait() 調用返回。
現在繼續說明,假設另一個線程(稱作“2 號線程”)鎖定了 mymutex 并對已鏈接列表添加了一項。在對互斥對象解鎖之后,2 號線程會立即調用函數 pthread_cond_broadcast(&mycond)。此操作之后,2 號線程將使所有等待 mycond 條件變量的線程立即蘇醒。這意味著第一個線程(仍處于 pthread_cond_wait() 調用中)現在將蘇醒。
現在,看一下第一個線程發生了什么。您可能會認為在 2 號線程調用 pthread_cond_broadcast(&mymutex) 之后,1 號線程的 pthread_cond_wait() 會立即返回。不是那樣!實際上,pthread_cond_wait() 將執行最后一個操作:重新鎖定 mymutex。一旦 pthread_cond_wait() 鎖定了互斥對象,那么它將返回并允許 1 號線程繼續執行。那時,它可以馬上檢查列表,查看它所感興趣的更改。
停止并回顧!
那個過程非常復雜,因此讓我們先來回顧一下。第一個線程首先調用:
??? pthread_mutex_lock(&mymutex);
然后,它檢查了列表。沒有找到感興趣的東西,于是它調用:
??? pthread_cond_wait(&mycond, &mymutex);
然后,pthread_cond_wait() 調用在返回前執行許多操作:
????? ?
??? pthread_mutex_unlock(&mymutex);
它對 mymutex 解鎖,然后進入睡眠狀態,等待 mycond 以接收 POSIX 線程“信號”。一旦接收到“信號”(加引號是因為我們并不是在討論傳統的 UNIX 信號,而是來自 pthread_cond_signal() 或 pthread_cond_broadcast() 調用的信號),它就會蘇醒。但 pthread_cond_wait() 沒有立即返回 -- 它還要做一件事:重新鎖定 mutex:
??? ?
??? pthread_mutex_lock(&mymutex);
pthread_cond_wait() 知道我們在查找 mymutex “背后”的變化,因此它繼續操作,為我們鎖定互斥對象,然后才返回。
pthread_cond_wait() 小測驗
現在已回顧了 pthread_cond_wait() 調用,您應該了解了它的工作方式。應該能夠敘述 pthread_cond_wait() 依次執行的所有操作。嘗試一下。如果理解了 pthread_cond_wait(),其余部分就相當容易,因此請重新閱讀以上部分,直到記住為止。好,讀完之后,能否告訴我在調用 pthread_cond_wait() 之 前,互斥對象必須處于什么狀態?pthread_cond_wait() 調用返回之后,互斥對象處于什么狀態?這兩個問題的答案都是“鎖定”。既然已經完全理解了 pthread_cond_wait() 調用,現在來繼續研究更簡單的東西 -- 初始化和真正的發送信號和廣播進程。到那時,我們將會對包含了多線程工作隊列的 C 代碼了如指掌。
初始化和清除
條件變量是一個需要初始化的真實數據結構。以下就初始化的方法。首先,定義或分配一個條件變量,如下所示:
??? pthread_cond_t mycond;
然后,調用以下函數進行初始化:
??? pthread_cond_init(&mycond,NULL);
瞧,初始化完成了!在釋放或廢棄條件變量之前,需要毀壞它,如下所示:
??? pthread_cond_destroy(&mycond);
很簡單吧。接著討論 pthread_cond_wait() 調用。
等待
一旦初始化了互斥對象和條件變量,就可以等待某個條件,如下所示:
??? pthread_cond_wait(&mycond, &mymutex);
請注意,代碼在邏輯上應該包含 mycond 和 mymutex。一個特定條件只能有一個互斥對象,而且條件變量應該表示互斥數據“內部”的一種特殊的條件更改。一個互斥對象可以用許多條件變量(例如, cond_empty、cond_full、cond_cleanup),但每個條件變量只能有一個互斥對象。
發送信號和廣播
對于發送信號和廣播,需要注意一點。如果線程更改某些共享數據,而且它想要喚醒所有正在等待的線程,則應使用 pthread_cond_broadcast 調用,如下所示:
??? pthread_cond_broadcast(&mycond);
在某些情況下,活動線程只需要喚醒第一個正在睡眠的線程。假設您只對隊列添加了一個工作作業。那么只需要喚醒一個工作程序線程(再喚醒其它線程是不禮貌的!):
??? pthread_cond_signal(&mycond);
此函數只喚醒一個線程。如果 POSIX 線程標準允許指定一個整數,可以讓您喚醒一定數量的正在睡眠的線程,那就更完美了。但是很可惜,我沒有被邀請參加會議。
工作組
我將演示如何創建多線程工作組。在這個方案中,我們創建了許多工作程序線程。每個線程都會檢查 wq(“工作隊列”),查看是否有需要完成的工作。如果有需要完成的工作,那么線程將從隊列中除去一個節點,執行這些特定工作,然后等待新的工作到達。
與此同時,主線程負責創建這些工作程序線程、將工作添加到隊列,然后在它退出時收集所有工作程序線程。您將會遇到許多 C 代碼,好好準備吧!
隊列
需要隊列是出于兩個原因。首先,需要隊列來保存工作作業。還需要可用于跟蹤已終止線程的數據結構。還記得前幾篇文章(請參閱本文結尾處的 參考資料)中,我曾提到過需要使用帶有特定進程標識的 pthread_join 嗎?使用“清除隊列”(稱作 "cq")可以解決無法等待 任何已終止線程的問題(稍后將詳細討論這個問題)。以下是標準隊列代碼。將此代碼保存到文件 queue.h 和 queue.c:
queue.h
/* queue.h
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
*/
typedef struct node {
? struct node *next;
} node;
typedef struct queue {
? node *head, *tail;
} queue;
void queue_init(queue *myroot);
void queue_put(queue *myroot, node *mynode);
node *queue_get(queue *myroot);
queue.c
/* queue.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** This set of queue functions was originally thread-aware.? I
** redesigned the code to make this set of queue routines
** thread-ignorant (just a generic, boring yet very fast set of queue
** routines).? Why the change?? Because it makes more sense to have
** the thread support as an optional add-on.? Consider a situation
** where you want to add 5 nodes to the queue.? With the
** thread-enabled version, each call to queue_put() would
** automatically lock and unlock the queue mutex 5 times -- that's a
** lot of unnecessary overhead.? However, by moving the thread stuff
** out of the queue routines, the caller can lock the mutex once at
** the beginning, then insert 5 items, and then unlock at the end.
** Moving the lock/unlock code out of the queue functions allows for
** optimizations that aren't possible otherwise.? It also makes this
** code useful for non-threaded applications.
**
** We can easily thread-enable this data structure by using the
** data_control type defined in control.c and control.h.? */
#include <stdio.h>
#include "queue.h"
void queue_init(queue *myroot) {
? myroot->head=NULL;
? myroot->tail=NULL;
}
void queue_put(queue *myroot,node *mynode) {
? mynode->next=NULL;
? if (myroot->tail!=NULL)
??? myroot->tail->next=mynode;
? myroot->tail=mynode;
? if (myroot->:head==NULL)
??? myroot->head=mynode;
}
node *queue_get(queue *myroot) {
? //get from root
? node *mynode;
? mynode=myroot->head;
? if (myroot->head!=NULL)
??? myroot->head=myroot->head->next;
? return mynode;
}
data_control 代碼
我編寫的并不是線程安全的隊列例程,事實上我創建了一個“數據包裝”或“控制”結構,它可以是任何線程支持的數據結構??匆幌?control.h:
control.h
#include
typedef struct data_control {
? pthread_mutex_t mutex;
? pthread_cond_t cond;
? int active;
} data_control;
現在您看到了 data_control 結構定義,以下是它的視覺表示:
所使用的 data_control 結構
圖像中的鎖代表互斥對象,它允許對數據結構進行互斥訪問。黃色的星代表條件變量,它可以睡眠,直到所討論的數據結構改變為止。on/off 開關表示整數 "active",它告訴線程此數據是否是活動的。在代碼中,我使用整數 active 作為標志,告訴工作隊列何時應該關閉。以下是 control.c:
control.c
/* control.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** These routines provide an easy way to make any type of
** data-structure thread-aware.? Simply associate a data_control
** structure with the data structure (by creating a new struct, for
** example).? Then, simply lock and unlock the mutex, or
** wait/signal/broadcast on the condition variable in the data_control
** structure as needed.
**
** data_control structs contain an int called "active".? This int is
** intended to be used for a specific kind of multithreaded design,
** where each thread checks the state of "active" every time it locks
** the mutex.? If active is 0, the thread knows that instead of doing
** its normal routine, it should stop itself.? If active is 1, it
** should continue as normal.? So, by setting active to 0, a
** controlling thread can easily inform a thread work crew to shut
** down instead of processing new jobs.? Use the control_activate()
** and control_deactivate() functions, which will also broadcast on
** the data_control struct's condition variable, so that all threads
** stuck in pthread_cond_wait() will wake up, have an opportunity to
** notice the change, and then terminate.
*/
#include "control.h"
int control_init(data_control *mycontrol) {
? int mystatus;
? if (pthread_mutex_init(&(mycontrol->mutex),NULL))
??? return 1;
? if (pthread_cond_init(&(mycontrol->cond),NULL))
??? return 1;
? mycontrol->active=0;
? return 0;
}
int control_destroy(data_control *mycontrol) {
? int mystatus;
? if (pthread_cond_destroy(&(mycontrol->cond)))
??? return 1;
? if (pthread_cond_destroy(&(mycontrol->cond)))
??? return 1;
? mycontrol->active=0;
? return 0;
}
int control_activate(data_control *mycontrol) {
? int mystatus;
? if (pthread_mutex_lock(&(mycontrol->mutex)))
??? return 0;
? mycontrol->active=1;
? pthread_mutex_unlock(&(mycontrol->mutex));
? pthread_cond_broadcast(&(mycontrol->cond));
? return 1;
}
int control_deactivate(data_control *mycontrol) {
? int mystatus;
? if (pthread_mutex_lock(&(mycontrol->mutex)))
??? return 0;
? mycontrol->active=0;
? pthread_mutex_unlock(&(mycontrol->mutex));
? pthread_cond_broadcast(&(mycontrol->cond));
? return 1;
}
調試時間
在開始調試之前,還需要一個文件。以下是 dbug.h:
dbug.h
#define dabort() \
?{? printf("Aborting at line %d in source file %s\n",__LINE__,__FILE__); abort(); }
此代碼用于處理工作組代碼中的不可糾正錯誤。
工作組代碼
說到工作組代碼,以下就是:
workcrew.c
#include <stdio.h>
#include <stdlib.h>
#include "control.h"
#include "queue.h"
#include "dbug.h"
/* the work_queue holds tasks for the various threads to complete. */
struct work_queue {
? data_control control;
? queue work;
} wq;
/* I added a job number to the work node.? Normally, the work node
?? would contain additional data that needed to be processed. */
typedef struct work_node {
? struct node *next;
? int jobnum;
} wnode;
/* the cleanup queue holds stopped threads.? Before a thread
?? terminates, it adds itself to this list.? Since the main thread is
?? waiting for changes in this list, it will then wake up and clean up
?? the newly terminated thread. */
struct cleanup_queue {
? data_control control;
? queue cleanup;
} cq;
/* I added a thread number (for debugging/instructional purposes) and
?? a thread id to the cleanup node.? The cleanup node gets passed to
?? the new thread on startup, and just before the thread stops, it
?? attaches the cleanup node to the cleanup queue.? The main thread
?? monitors the cleanup queue and is the one that performs the
?? necessary cleanup. */
typedef struct cleanup_node {
? struct node *next;
? int threadnum;
? pthread_t tid;
} cnode;
void *threadfunc(void *myarg) {
? wnode *mywork;
? cnode *mynode;
? mynode=(cnode *) myarg;
? pthread_mutex_lock(&wq.control.mutex);
? while (wq.control.active) {
??? while (wq.work.head==NULL && wq.control.active) {
????? pthread_cond_wait(&wq.control.cond, &wq.control.mutex);
??? }
??? if (!wq.control.active)
????? break;
??? //we got something!
??? mywork=(wnode *) queue_get(&wq.work);
??? pthread_mutex_unlock(&wq.control.mutex);
??? //perform processing...
??? printf("Thread number %d processing job %d\n",mynode->threadnum,mywork->jobnum);
??? free(mywork);
??? pthread_mutex_lock(&wq.control.mutex);
? }
? pthread_mutex_unlock(&wq.control.mutex);
? pthread_mutex_lock(&cq.control.mutex);
? queue_put(&cq.cleanup,(node *) mynode);
? pthread_mutex_unlock(&cq.control.mutex);
? pthread_cond_signal(&cq.control.cond);
? printf("thread %d shutting down...\n",mynode->threadnum);
? return NULL;
?
}
#define NUM_WORKERS 4
int numthreads;
void join_threads(void) {
? cnode *curnode;
? printf("joining threads...\n");
? while (numthreads) {
??? pthread_mutex_lock(&cq.control.mutex);
??? /* below, we sleep until there really is a new cleanup node.? This
?????? takes care of any false wakeups... even if we break out of
?????? pthread_cond_wait(), we don't make any assumptions that the
?????? condition we were waiting for is true.? */
??? while (cq.cleanup.head==NULL) {
????? pthread_cond_wait(&cq.control.cond,&cq.control.mutex);
??? }
??? /* at this point, we hold the mutex and there is an item in the
?????? list that we need to process.? First, we remove the node from
?????? the queue.? Then, we call pthread_join() on the tid stored in
?????? the node.? When pthread_join() returns, we have cleaned up
?????? after a thread.? Only then do we free() the node, decrement the
?????? number of additional threads we need to wait for and repeat the
?????? entire process, if necessary */
????? curnode = (cnode *) queue_get(&cq.cleanup);
????? pthread_mutex_unlock(&cq.control.mutex);
????? pthread_join(curnode->tid,NULL);
????? printf("joined with thread %d\n",curnode->threadnum);
????? free(curnode);
????? numthreads--;
? }
}
int create_threads(void) {
? int x;
? cnode *curnode;
? for (x=0; x<NUM_WORKERS; x++) {
??? curnode=malloc(sizeof(cnode));
??? if (!curnode)
????? return 1;
??? curnode->threadnum=x;
??? if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode))
????? return 1;
??? printf("created thread %d\n",x);
??? numthreads++;
? }
? return 0;
}
void initialize_structs(void) {
? numthreads=0;
? if (control_init(&wq.control))
??? dabort();
? queue_init(&wq.work);
? if (control_init(&cq.control)) {
??? control_destroy(&wq.control);
??? dabort();
? }
? queue_init(&wq.work);
? control_activate(&wq.control);
}
void cleanup_structs(void) {
? control_destroy(&cq.control);
? control_destroy(&wq.control);
}
int main(void) {
? int x;
? wnode *mywork;
? initialize_structs();
? /* CREATION */
?
? if (create_threads()) {
??? printf("Error starting threads... cleaning up.\n");
??? join_threads();
??? dabort();
? }
? pthread_mutex_lock(&wq.control.mutex);
? for (x=0; x<16000; x++) {
??? mywork=malloc(sizeof(wnode));
??? if (!mywork) {
????? printf("ouch! can't malloc!\n");
????? break;
??? }
??? mywork->jobnum=x;
??? queue_put(&wq.work,(node *) mywork);
? }
? pthread_mutex_unlock(&wq.control.mutex);
? pthread_cond_broadcast(&wq.control.cond);
? printf("sleeping...\n");
? sleep(2);
? printf("deactivating work queue...\n");
? control_deactivate(&wq.control);
? /* CLEANUP? */
? join_threads();
? cleanup_structs();
}
代碼初排
現在來快速初排代碼。定義的第一個結構稱作 "wq",它包含了 data_control 和隊列頭。data_control 結構用于仲裁對整個隊列的訪問,包括隊列中的節點。下一步工作是定義實際的工作節點。要使代碼符合本文中的示例,此處所包含的都是作業號。
接著,創建清除隊列。注釋說明了它的工作方式。好,現在讓我們跳過 threadfunc()、join_threads()、create_threads() 和 initialize_structs() 調用,直接跳到 main()。所做的第一件事就是初始化結構 -- 這包括初始化 data_controls 和隊列,以及激活工作隊列。
有關清除的注意事項
現在初始化線程。如果看一下 create_threads() 調用,似乎一切正常 -- 除了一件事。請注意,我們正在分配清除節點,以及初始化它的線程號和 TID 組件。我們還將清除節點作為初始自變量傳遞給每一個新的工作程序線程。為什么這樣做?
因為當某個工作程序線程退出時,它會將其清除節點連接到清除隊列,然后終止。那時,主線程會在清除隊列中檢測到這個節點(利用條件變量),并將這個節點移出隊列。因為 TID(線程標識)存儲在清除節點中,所以主線程可以確切知道哪個線程已終止了。然后,主線程將調用 pthread_join(tid),并聯接適當的工作程序線程。如果沒有做記錄,那么主線程就需要按任意順序聯接工作程序線程,可能是按它們的創建順序。由于線程不一定按此順序終止,那么主線程可能會在已經聯接了十個線程時,等待聯接另一個線程。您能理解這種設計決策是如何使關閉代碼加速的嗎(尤其在使用幾百個工作程序線程的情況下)?
創建工作
我們已啟動了工作程序線程(它們已經完成了執行 threadfunc(),稍后將討論此函數),現在主線程開始將工作節點插入工作隊列。首先,它鎖定 wq 的控制互斥對象,然后分配 16000 個工作包,將它們逐個插入隊列。完成之后,將調用 pthread_cond_broadcast(),于是所有正在睡眠的線程會被喚醒,并開始執行工作。此時,主線程將睡眠兩秒鐘,然后釋放工作隊列,并通知工作程序線程終止活動。接著,主線程會調用 join_threads() 函數來清除所有工作程序線程。
threadfunc()
現在來討論 threadfunc(),這是所有工作程序線程都要執行的代碼。當工作程序線程啟動時,它會立即鎖定工作隊列互斥對象,獲取一個工作節點(如果有的話),然后對它進行處理。如果沒有工作,則調用 pthread_cond_wait()。您會注意到這個調用在一個非常緊湊的 while() 循環中,這是非常重要的。當從 pthread_cond_wait() 調用中蘇醒時,決不能認為條件肯定發生了 -- 它 可能發生了,也可能沒有發生。如果發生了這種情況,即錯誤地喚醒了線程,而列表是空的,那么 while 循環將再次調用 pthread_cond_wait()。
如果有一個工作節點,那么我們只打印它的作業號,釋放它并退出。然而,實際代碼會執行一些更實質性的操作。在 while() 循環結尾,我們鎖定了互斥對象,以便檢查 active 變量,以及在循環頂部檢查新的工作節點。如果執行完此代碼,就會發現如果 wq.control.active 是 0,while 循環就會終止,并會執行 threadfunc() 結尾處的清除代碼。
工作程序線程的清除代碼部件非常有趣。首先,由于 pthread_cond_wait() 返回了鎖定的互斥對象,它會對 work_queue 解鎖。然后,它鎖定清除隊列,添加清除代碼(包含了 TID,主線程將使用此 TID 來調用 pthread_join()),然后再對清除隊列解鎖。此后,它發信號給所有 cq 等待者 (pthread_cond_signal(&cq.control.cond)),于是主線程就知道有一個待處理的新節點。我們不使用 pthread_cond_broadcast(),因為沒有這個必要 -- 只有一個線程(主線程)在等待清除隊列中的新節點。當它調用 join_threads() 時,工作程序線程將打印關閉消息,然后終止,等待主線程發出的 pthread_join() 調用。
join_threads()
如果要查看關于如何使用條件變量的簡單示例,請參考 join_threads() 函數。如果還有工作程序線程,join_threads() 會一直執行,等待清除隊列中新的清除節點。如果有新節點,我們會將此節點移出隊列、對清除隊列解鎖(從而使工作程序可以添加清除節點)、聯接新的工作程序線程(使用存儲在清除節點中的 TID)、釋放清除節點、減少“現有”線程的數量,然后繼續。
結束語
現在已經到了“POSIX 線程詳解”系列的尾聲,希望您已經準備好開始將多線程代碼添加到您自己的應用程序中。有關詳細信息,請參閱 參考資料部分,這部分內容還包含了本文中使用的所有源碼的 tar 文件。下一個系列中再見!
參考資料
??? * 您可以參閱本文在 developerWorks 全球站點上的 英文原文.
??? * 參閱 Linux threads中的文檔,Sean Walton, KB7rfa
??? * 在 An Introduction to Pthreads-Tcl 中,查看對 Tcl 的更改以使其能夠使用 POSIX 線程
??? * 使用友好的 Linux pthread 在線幫助 ("man -k pthread")
??? * 參考 POSIX and DCE threads for Linux主頁
??? * 查看 The LinuxThreads Library
??? * Proolix ,一種簡單遵從 POSIX 標準的操作系統,用于 i8086+,一直在開發中
??? * 閱讀 David R. Butenhof 的著作 Programming with POSIX Threads,書中討論了許多問題,其中談到不使用互斥對象是可能出現的種種情況
??? * 查閱 W. Richard Stevens 的著作 UNIX Network Programming: Network APIs: Sockets and XTI, Volume 1
關于作者
?? ??? ?
Daniel Robbins 居住在新墨西哥州的 Albuquerque。他是 Gentoo Technologies, Inc. 的總裁兼 CEO, Gentoo 項目的總設計師,多本 MacMillan 出版書籍的作者,包括: Caldera OpenLinux Unleashed、 SuSE Linux Unleashed和 Samba Unleashed 。Daniel 自小學二年級起就與計算機結下不解之緣,那時他首先接觸的是 Logo 程序語言,并沉溺于 Pac-Man 游戲中。這也許就是他至今仍擔任 SONY Electronic Publishing/Psygnosis 的首席圖形設計師的原因所在。Daniel 喜歡與妻子 Mary 和剛出生的女兒 Hadassah 一起共渡時光。可通過 drobbins@gentoo.org 與 Daniel Robbins 取得聯系。