作者:
dozb
標準C++線程即將到來。CUJ預言它將衍生自Boost線程庫,現在就由Bill帶領我們探索一下Boost線程庫。
就在幾年前,用多線程執行程序還是一件非比尋常的事。然而今天互聯網應用服務程序普遍使用多線程來提高與多客戶鏈接時的效率;為了達到最大的吞吐量,事務服務器在單獨的線程上運行服務程序;GUI應用程序將那些費時,復雜的處理以線程的形式單獨運行,以此來保證用戶界面能夠及時響應用戶的操作。這樣使用多線程的例子還有很多。
但是C++標準并沒有涉及到多線程,這讓程序員們開始懷疑是否可能寫出多線程的C++程序。盡管不可能寫出符合標準的多線程程序,但是程序員們還是會使用支持多線程的操作系統提供的多線程庫來寫出多線程C++程序。但是這樣做至少有兩個問題:這些庫大部分都是用C語言完成的,如果在C++程序中要使用這些庫就必須十分小心;還有,每一個操作系統都有自己的一套支持多線程的類庫。因此,這樣寫出來得代碼是沒有標準可循的,也不是到處都適用的(non-portable)。Boost線程庫就是為了解決所有這些問題而設計的。
Boost是由C++標準委員會類庫工作組成員發起,致力于為C++開發新的類庫的組織?,F在它已經有近2000名成員。許多庫都可以在Boost源碼的發布版本中找到。為了使這些類庫是線程安全的(thread-safe),Boost線程庫被創建了。
許多C++專家都投身于Boost線程庫的開發中。所有接口的設計都是從0開始的,并不是C線程API的簡單封裝。許多C++特性(比如構造函數和析構函數,函數對象(function object)和模板)都被使用在其中以使接口更加靈活。現在的版本可以在POSIX,Win32和Macintosh Carbon平臺下工作。
就像std::fstream類就代表一個文件一樣,boost::thread類就代表一個可執行的線程。缺省構造函數創建一個代表當前執行線程的實例。一個重載的構造函數以一個不需任何參數的函數對象作為參數,并且沒有返回值。這個構造函數創建一個新的可執行線程,它調用了那個函數對象。
起先,大家認為傳統C創建線程的方法似乎比這樣的設計更有用,因為C創建線程的時候會傳入一個void*指針,通過這種方法就可以傳入數據。然而,由于Boost線程庫是使用函數對象來代替函數指針,那么函數對象本身就可以攜帶線程所需的數據。這種方法更具靈活性,也是類型安全(type-safe)的。當和Boost.Bind這樣的功能庫一起使用時,這樣的方法就可以讓你傳遞任意數量的數據給新建的線程。
目前,由Boost線程庫創建的線程對象功能還不是很強大。事實上它只能做兩項操作。線程對象可以方便使用==和!=進行比較來確定它們是否是代表同一個線程;你還可以調用boost::thread::join來等待線程執行完畢。其他一些線程庫可以讓你對線程做一些其他操作(比如設置優先級,甚至是取消線程)。然而,由于要在普遍適用(portable)的接口中加入這些操作不是簡單的事,目前仍在討論如何將這些操組加入到Boost線程庫中。
Listing1展示了boost::thread類的一個最簡單的用法。 新建的線程只是簡單的在std::out上打印“hello,world”,main函數在它執行完畢之后結束。
例1:
#include <boost/thread/thread.hpp>
#include <iostream>
void hello()
{
std::cout <<
"Hello world, I'm a thread!"
<< std::endl;
}
int main(int argc, char* argv[])
{
boost::thread thrd(&hello);
thrd.join();
return 0;
}
任何寫過多線程程序的人都知道避免不同線程同時訪問共享區域的重要性。如果一個線程要改變共享區域中某個數據,而與此同時另一線程正在讀這個數據,那么結果將是未定義的。為了避免這種情況的發生就要使用一些特殊的原始類型和操作。其中最基本的就是互斥體(mutex,mutual exclusion的縮寫)。一個互斥體一次只允許一個線程訪問共享區。當一個線程想要訪問共享區時,首先要做的就是鎖?。╨ock)互斥體。如果其他的線程已經鎖住了互斥體,那么就必須先等那個線程將互斥體解鎖,這樣就保證了同一時刻只有一個線程能訪問共享區域。
互斥體的概念有不少變種。Boost線程庫支持兩大類互斥體,包括簡單互斥體(simple mutex)和遞歸互斥體(recursive mutex)。如果同一個線程對互斥體上了兩次鎖,就會發生死鎖(deadlock),也就是說所有的等待解鎖的線程將一直等下去。有了遞歸互斥體,單個線程就可以對互斥體多次上鎖,當然也必須解鎖同樣次數來保證其他線程可以對這個互斥體上鎖。
在這兩大類互斥體中,對于線程如何上鎖還有多個變種。一個線程可以有三種方法來對一個互斥體加鎖:
- 一直等到沒有其他線程對互斥體加鎖。
- 如果有其他互斥體已經對互斥體加鎖就立即返回。
- 一直等到沒有其他線程互斥體加鎖,直到超時。
似乎最佳的互斥體類型是遞歸互斥體,它可以使用所有三種上鎖形式。然而每一個變種都是有代價的。所以Boost線程庫允許你根據不同的需要使用最有效率的互斥體類型。Boost線程庫提供了6中互斥體類型,下面是按照效率進行排序:
boost::mutex,
boost::try_mutex,
boost::timed_mutex,
boost::recursive_mutex,
boost::recursive_try_mutex,
boost::recursive_timed_mutex
如果互斥體上鎖之后沒有解鎖就會發生死鎖。這是一個很普遍的錯誤,Boost線程庫就是要將其變成不可能(至少時很困難)。直接對互斥體上鎖和解鎖對于Boost線程庫的用戶來說是不可能的。mutex類通過teypdef定義在RAII中實現的類型來實現互斥體的上鎖和解鎖。這也就是大家知道的Scope Lock模式。為了構造這些類型,要傳入一個互斥體的引用。構造函數對互斥體加鎖,析構函數對互斥體解鎖。C++保證了析構函數一定會被調用,所以即使是有異常拋出,互斥體也總是會被正確的解鎖。
這種方法保證正確的使用互斥體。然而,有一點必須注意:盡管Scope Lock模式可以保證互斥體被解鎖,但是它并沒有保證在異常拋出之后貢獻資源仍是可用的。所以就像執行單線程程序一樣,必須保證異常不會導致程序狀態異常。另外,這個已經上鎖的對象不能傳遞給另一個線程,因為它們維護的狀態并沒有禁止這樣做。
List2給出了一個使用boost::mutex的最簡單的例子。例子中共創建了兩個新的線程,每個線程都有10次循環,在std::cout上打印出線程id和當前循環的次數,而main函數等待這兩個線程執行完才結束。std::cout就是共享資源,所以每一個線程都使用一個全局互斥體來保證同時只有一個線程能向它寫入。
許多讀者可能已經注意到List2中傳遞數據給線程還必須的手工寫一個函數。盡管這個例子很簡單,如果每一次都要寫這樣的代碼實在是讓人厭煩的事。別急,有一種簡單的解決辦法。函數庫允許你通過將另一個函數綁定,并傳入調用時需要的數據來創建一個新的函數。 List3向你展示了如何使用Boost.Bind庫來簡化List2中的代碼,這樣就不必手工寫這些函數對象了。
例2:
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>
boost::mutex io_mutex;
struct count
{
count(int id) : id(id) { }
void operator()()
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout << id << ": "
<< i << std::endl;
}
}
int id;
};
int main(int argc, char* argv[])
{
boost::thread thrd1(count(1));
boost::thread thrd2(count(2));
thrd1.join();
thrd2.join();
return 0;
}
例3: // 這個例子和例2一樣,除了使用Boost.Bind來簡化創建線程攜帶數據,避免使用函數對象
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
boost::mutex io_mutex;
void count(int id)
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout << id << ": " <<
i << std::endl;
}
}
int main(int argc, char* argv[])
{
boost::thread thrd1(
boost::bind(&count, 1));
boost::thread thrd2(
boost::bind(&count, 2));
thrd1.join();
thrd2.join();
return 0;
}
有的時候僅僅依靠鎖住共享資源來使用它是不夠的。有時候共享資源只有某些狀態的時候才能夠使用。比方說,某個線程如果要從堆棧中讀取數據,那么如果棧中沒有數據就必須等待數據被壓棧。這種情況下的同步使用互斥體是不夠的。另一種同步的方式--條件變量,就可以使用在這種情況下。
條件變量的使用總是和互斥體及共享資源聯系在一起的。線程首先鎖住互斥體,然后檢驗共享資源的狀態是否處于可使用的狀態。如果不是,那么線程就要等待條件變量。要指向這樣的操作就必須在等待的時候將互斥體解鎖,以便其他線程可以訪問共享資源并改變其狀態。它還得保證從等到得線程返回時互斥體是被上鎖得。當另一個線程改變了共享資源的狀態時,它就要通知正在等待條件變量得線程,并將之返回等待的線程。
List4是一個使用了boost::condition的簡單例子。有一個實現了有界緩存區的類和一個固定大小的先進先出的容器。由于使用了互斥體boost::mutex,這個緩存區是線程安全的。put和get使用條件變量來保證線程等待完成操作所必須的狀態。有兩個線程被創建,一個在buffer中放入100個整數,另一個將它們從buffer中取出。這個有界的緩存一次只能存放10個整數,所以這兩個線程必須周期性的等待另一個線程。為了驗證這一點,put和get在std::cout中輸出診斷語句。最后,當兩個線程結束后,main函數也就執行完畢了。
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <iostream>
const int BUF_SIZE = 10;
const int ITERS = 100;
boost::mutex io_mutex;
class buffer
{
public:
typedef boost::mutex::scoped_lock
scoped_lock;
buffer()
: p(0), c(0), full(0)
{
}
void put(int m)
{
scoped_lock lock(mutex);
if (full == BUF_SIZE)
{
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout <<
"Buffer is full. Waiting..."
<< std::endl;
}
while (full == BUF_SIZE)
cond.wait(lock);
}
buf[p] = m;
p = (p+1) % BUF_SIZE;
++full;
cond.notify_one();
}
int get()
{
scoped_lock lk(mutex);
if (full == 0)
{
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout <<
"Buffer is empty. Waiting..."
<< std::endl;
}
while (full == 0)
cond.wait(lk);
}
int i = buf[c];
c = (c+1) % BUF_SIZE;
--full;
cond.notify_one();
return i;
}
private:
boost::mutex mutex;
boost::condition cond;
unsigned int p, c, full;
int buf[BUF_SIZE];
};
buffer buf;
void writer()
{
for (int n = 0; n < ITERS; ++n)
{
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout << "sending: "
<< n << std::endl;
}
buf.put(n);
}
}
void reader()
{
for (int x = 0; x < ITERS; ++x)
{
int n = buf.get();
{
boost::mutex::scoped_lock
lock(io_mutex);
std::cout << "received: "
<< n << std::endl;
}
}
}
int main(int argc, char* argv[])
{
boost::thread thrd1(&reader);
boost::thread thrd2(&writer);
thrd1.join();
thrd2.join();
return 0;
}
大多數函數都不是可重入的。這也就是說在某一個線程已經調用了一個函數時,如果你再調用同一個函數,那么這樣是不安全的。一個不可重入的函數通過連續的調用來保存靜態變量或者是返回一個指向靜態數據的指針。 舉例來說,std::strtok就是不可重入的,因為它使用靜態變量來保存要被分割成符號的字符串。
有兩種方法可以讓不可重用的函數變成可重用的函數。第一種方法就是改變接口,用指針或引用代替原先使用靜態數據的地方。比方說,POSIX定義了strok_r,std::strtok中的一個可重入的變量,它用一個額外的char**參數來代替靜態數據。這種方法很簡單,而且提供了可能的最佳效果。但是這樣必須改變公共接口,也就意味著必須改代碼。另一種方法不用改變公有接口,而是用本地存儲線程(thread local storage)來代替靜態數據(有時也被成為特殊線程存儲,thread-specific storage)。
Boost線程庫提供了智能指針boost::thread_specific_ptr來訪問本地存儲線程。每一個線程第一次使用這個智能指針的實例時,它的初值是NULL,所以必須要先檢查這個它的只是否為空,并且為它賦值。Boost線程庫保證本地存儲線程中保存的數據會在線程結束后被清除。
List5是一個使用boost::thread_specific_ptr的簡單例子。其中創建了兩個線程來初始化本地存儲線程,并有10次循環,每一次都會增加智能指針指向的值,并將其輸出到std::cout上(由于std::cout是一個共享資源,所以通過互斥體進行同步)。main線程等待這兩個線程結束后就退出。從這個例子輸出可以明白的看出每個線程都處理屬于自己的數據實例,盡管它們都是使用同一個boost::thread_specific_ptr。
例5:
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/tss.hpp>
#include <iostream>
boost::mutex io_mutex;
boost::thread_specific_ptr<int> ptr;
struct count
{
count(int id) : id(id) { }
void operator()()
{
if (ptr.get() == 0)
ptr.reset(new int(0));
for (int i = 0; i < 10; ++i)
{
(*ptr)++;
boost::mutex::scoped_lock
lock(io_mutex);
std::cout << id << ": "
<< *ptr << std::endl;
}
}
int id;
};
int main(int argc, char* argv[])
{
boost::thread thrd1(count(1));
boost::thread thrd2(count(2));
thrd1.join();
thrd2.join();
return 0;
}
還有一個問題沒有解決:如何使得初始化工作(比如說構造函數)也是線程安全的。比方說,如果一個引用程序要產生唯一的全局的對象,由于實例化順序的問題,某個函數會被調用來返回一個靜態的對象,它必須保證第一次被調用時就產生這個靜態的對象。這里的問題就是如果多個線程同時調用了這個函數,那么這個靜態對象的構造函數就會被調用多次,這樣錯誤產生了。
解決這個問題的方法就是所謂的“一次實現”(once routine)。“一次實現”在一個應用程序只能執行一次。如果多個線程想同時執行這個操作,那么真正執行的只有一個,而其他線程必須等這個操作結束。為了保證它只被執行一次,這個routine由另一個函數間接的調用,而這個函數傳給它一個指針以及一個標志著這個routine是否已經被調用的特殊標志。這個標志是以靜態的方式初始化的,這也就保證了它在編譯期間就被初始化而不是運行時。因此也就沒有多個線程同時將它初始化的問題了。Boost線程庫提供了boost::call_once來支持“一次實現”,并且定義了一個標志boost::once_flag及一個初始化這個標志的宏BOOST_ONCE_INIT。
List6是一個使用了boost::call_once的例子。其中定義了一個靜態的全局整數,初始值為0;還有一個由BOOST_ONCE_INIT初始化的靜態boost::once_flag實例。main函數創建了兩個線程,它們都想通過傳入一個函數調用boost::call_once來初始化這個全局的整數,這個函數是將它加1。main函數等待著兩個線程結束,并將最后的結果輸出的到std::cout。由最后的結果可以看出這個操作確實只被執行了一次,因為它的值是1。
#include <boost/thread/thread.hpp>
#include <boost/thread/once.hpp>
#include <iostream>
int i = 0;
boost::once_flag flag =
BOOST_ONCE_INIT;
void init()
{
++i;
}
void thread()
{
boost::call_once(&init, flag);
}
int main(int argc, char* argv[])
{
boost::thread thrd1(&thread);
boost::thread thrd2(&thread);
thrd1.join();
thrd2.join();
std::cout << i << std::endl;
return 0;
}
Boost線程庫正在計劃加入一些新特性。其中包括boost::read_write_mutex,它可以讓多個線程同時從共享區中讀取數據,但是一次只可能有一個線程向共享區寫入數據;boost::thread_barrier,它使得一組線程處于等待狀態,知道所有得線程都都進入了屏障區;boost::thread_pool,他允許執行一些小的routine而不必每一都要創建或是銷毀一個線程。
Boost線程庫已經作為標準中的類庫技術報告中的附件提交給C++標準委員會,它的出現也為下一版C++標準吹響了第一聲號角。委員會成員對Boost線程庫的初稿給予了很高的評價,當然他們還會考慮其他的多線程庫。他們對在C++標準中加入對多線程的支持非常感興趣。從這一點上也可以看出,多線程在C++中的前途一片光明。
- The Boost.Threads Library by Bill Kempf
- Visit the Boost website at <http://www.boost.org>.
更多查看: