threadpool是基于boost庫(kù)實(shí)現(xiàn)的一個(gè)線(xiàn)程池子庫(kù),但線(xiàn)程池實(shí)現(xiàn)起來(lái)不是很復(fù)雜。我們從threadpool中又能學(xué)到什么東西呢?
它是基于boost庫(kù)實(shí)現(xiàn)的,如果大家對(duì)boost庫(kù)有興趣,看看一個(gè)簡(jiǎn)單的實(shí)現(xiàn)還是可以學(xué)到點(diǎn)東西的。
threadpool基本功能
1、任務(wù)封裝,包括普通任務(wù)(task_func)和優(yōu)先級(jí)任務(wù)(prio_task_func)。
2、調(diào)度策略,包括fifo_scheduler(先進(jìn)先出)、lifo_scheduler(后進(jìn)先出)、prio_scheduler(優(yōu)先級(jí))。
3、結(jié)束策略,包括wait_for_all_tasks(全部任務(wù)等待)、wait_for_active_tasks(激活任務(wù)等待)、immediately(立即結(jié)束)。
4、動(dòng)態(tài)修改線(xiàn)程池個(gè)數(shù)功能。
5、基于future封裝的異步返回值獲取功能。
在sorceforge上有一個(gè)用boost編寫(xiě)的線(xiàn)程池。該線(xiàn)程池和boost結(jié)合的比較好,并且提供了多種任務(wù)執(zhí)行策略,使用也非常簡(jiǎn)單。 下載地址: http://threadpool.sourceforge.net/ 這個(gè)線(xiàn)程池不需要編譯,只要在項(xiàng)目中包含其頭文件就可以了。
一、源代碼分析
quickstart分析(/threadpool/libs/threadpool/quickstart)
這個(gè)例子的代碼很簡(jiǎn)單,但已經(jīng)全部展示了線(xiàn)程池的核心內(nèi)容,包括建立、調(diào)度、同步等操作。
view plaincopy to clipboardprint?
// Create fifo thread pool container with two threads.
pool tp(2);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
// Wait until all tasks are finished.
tp.wait();
// Create fifo thread pool container with two threads.
pool tp(2);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
// Wait until all tasks are finished.
tp.wait();
pool的定義具體見(jiàn)pool.hpp,但使用了pimpl模式,核心代碼見(jiàn)pool_core.hpp文件。
下面是pool的定義
typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;
typedef fifo_pool pool;
從上面可以知道,pool實(shí)際就是fifo_pool,從模板參數(shù)可以看到,使用了fifo_scheduler和wait_for_all_tasks。
對(duì)于線(xiàn)程池有點(diǎn)理解的都知道,一般都是那幾樣?xùn)|西,線(xiàn)程的封裝,條件變量,隊(duì)列數(shù)據(jù)結(jié)構(gòu)。
所以簡(jiǎn)單的能做的很簡(jiǎn)單,復(fù)雜的的就看你的策略需求了。
對(duì)基于boost庫(kù)的threadpool子庫(kù)來(lái)說(shuō),上面的三樣?xùn)|西都是現(xiàn)成的,線(xiàn)程封裝和條件變量直接使用thread子庫(kù)就行,隊(duì)列使用stl的標(biāo)準(zhǔn)容器。
task_adaptors.hpp
對(duì)線(xiàn)程任務(wù)的封裝,所謂task,我們可以理解成需要運(yùn)行的函數(shù)。
threadpool最大限度的使用了function和bind功能來(lái)封裝函數(shù),這點(diǎn)和thread子庫(kù)類(lèi)似。
文件中涉及的內(nèi)容主要有三個(gè):task_func、prio_task_func和looped_task_func。
對(duì)普通task的封裝
typedef function0<void> task_func;
如果對(duì)bind和function熟悉的應(yīng)該很好理解。
對(duì)優(yōu)先級(jí)任務(wù)的封裝
class prio_task_func
這個(gè)類(lèi)很簡(jiǎn)單,重載了兩個(gè)方法,
operator()是仿函數(shù)的用法,
operator<是用于優(yōu)先級(jí)比較使用的,用于stl容器的元素比較。
size_policies.hpp
對(duì)size的封裝,包括empty_controller、resize_controller和static_size。
shutdown_policies.hpp
對(duì)線(xiàn)程池結(jié)束的策略封裝,包括wait_for_all_tasks、wait_for_active_tasks和immediately。
這幾個(gè)類(lèi)很簡(jiǎn)單,具體操作封裝在pool中。
線(xiàn)程池運(yùn)行過(guò)程中,包括隊(duì)列中等待的task,線(xiàn)程正在運(yùn)行的task。
所以結(jié)束的時(shí)候,對(duì)這些task的策略操作是有選擇的。
scheduling_policies.hpp
對(duì)任務(wù)調(diào)度測(cè)試的封裝,包括fifo_scheduler、lifo_scheduler和prio_scheduler。
實(shí)際上,這三個(gè)類(lèi)的相似程度很高,大家可能更喜歡用繼承和虛函數(shù)實(shí)現(xiàn)。
前面說(shuō)到保存task的隊(duì)列數(shù)據(jù)結(jié)構(gòu),在這里就看的很清楚了。
fifo和lifo使用的是std::deque,prio使用的是std::priority_queue,其他部分代碼沒(méi)什么好說(shuō)的了。
pool_adaptors.hpp
對(duì)全局schedule函數(shù)的幾種封裝。
future.hpp
好像thread子庫(kù)也有future,但不清楚是否是一樣的內(nèi)容。
threadpool的future是為了封裝異步函數(shù)調(diào)用返回值實(shí)現(xiàn)的。
簡(jiǎn)單點(diǎn)理解,就是schedule任務(wù)的時(shí)候,把一個(gè)指針在兩者間綁定起來(lái),后面就可以通過(guò)future來(lái)獲取返回值了。
當(dāng)然,獲取返回值的過(guò)程應(yīng)該是阻塞的,任務(wù)未完成時(shí)只能wait。
locking_ptr.hpp
LockingPtr的簡(jiǎn)單封裝,具體可google《volatile - Multithreaded Programmer's Best Friend》。
threadpool大量使用了volatile關(guān)鍵字,所以需要LockingPtr保護(hù)。
scope_guard.hpp
對(duì)函數(shù)對(duì)象的封裝,利用C++析構(gòu)函數(shù)時(shí)調(diào)用一個(gè)在構(gòu)造函數(shù)時(shí)綁定的函數(shù)對(duì)象。
worker_thread.hpp
對(duì)工作線(xiàn)程的封裝,這個(gè)封裝不是指底層線(xiàn)程api封裝,因?yàn)檫@部分是由boost的thread子庫(kù)提供的。
封裝針對(duì)的是循環(huán)執(zhí)行task的邏輯函數(shù)(線(xiàn)程跑起來(lái)就loop run某個(gè)函數(shù),從隊(duì)列中獲取task執(zhí)行,空閑時(shí)等待。)
我們重點(diǎn)看的是run和create_and_attach。
這兩個(gè)函數(shù)連起來(lái)看,就很清楚了,create_and_attach通過(guò)bind方式生成一個(gè)thread執(zhí)行run方法。
run方法中的這條語(yǔ)句就是一個(gè)簡(jiǎn)單的loop操作,
while(m_pool->execute_task()) {}
所以,當(dāng)execute_task返回值為false時(shí),run函數(shù)就結(jié)束了,bind該函數(shù)的thread也就結(jié)束了。
ok,來(lái)到這里,有必要簡(jiǎn)單的把整個(gè)調(diào)用過(guò)程說(shuō)明一下。
// Create fifo thread pool container with two threads.
pool tp(2);
該操作會(huì)調(diào)用pool的構(gòu)造函數(shù)
view plaincopy to clipboardprint?
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
由于pimpl模式,所以所有代碼都封裝在m_core內(nèi)實(shí)現(xiàn)的。
pool默認(rèn)的線(xiàn)程個(gè)數(shù)為0,通過(guò)size_policy_type::init來(lái)初始化。
而size_policy_type是一個(gè)模板參數(shù),pool對(duì)應(yīng)的是fifo,所以也就是static_size類(lèi)型了。
//static_size類(lèi)的init函數(shù)
view plaincopy to clipboardprint?
static void init(Pool& pool, size_t const worker_count)
{
pool.resize(worker_count);
}
static void init(Pool& pool, size_t const worker_count)
{
pool.resize(worker_count);
}
//pool_core的resize函數(shù)
這個(gè)函數(shù)有點(diǎn)長(zhǎng),主要是做動(dòng)態(tài)配置線(xiàn)程個(gè)數(shù)的邏輯操作,create_and_attach也是在這里調(diào)用的。
view plaincopy to clipboardprint?
//worker_thread的create_and_attach函數(shù)
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
//run是線(xiàn)程的loop函數(shù)
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
//worker_thread的create_and_attach函數(shù)
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
//run是線(xiàn)程的loop函數(shù)
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
view plaincopy to clipboardprint?
//worker_thread的run函數(shù)
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {} //loop直到返回值為false
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
//worker_thread的run函數(shù)
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {} //loop直到返回值為false
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
//pool_core的execute_task函數(shù)
這個(gè)函數(shù)有點(diǎn)長(zhǎng),簡(jiǎn)單點(diǎn)說(shuō),就是從隊(duì)列中獲取task然后執(zhí)行,如果隊(duì)列為空,則線(xiàn)程需要wait操作。
由于threadpool支持動(dòng)態(tài)resize線(xiàn)程個(gè)數(shù),從該函數(shù)我們也是可以看出來(lái)是如何做到的。
view plaincopy to clipboardprint?
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
pool內(nèi)部使用了多個(gè)整數(shù)來(lái)記錄現(xiàn)在個(gè)數(shù),譬如m_worker_count和m_target_worker_count。
m_worker_count是當(dāng)前激活運(yùn)行中的線(xiàn)程個(gè)數(shù)。
m_target_worker_count是最新動(dòng)態(tài)配置的線(xiàn)程個(gè)數(shù)。
當(dāng)個(gè)數(shù)不匹配時(shí),通過(guò)返回false方式結(jié)束線(xiàn)程。
// Add some tasks to the pool.
tp.schedule(&first_task);
view plaincopy to clipboardprint?
//thread_pool的schedule函數(shù)
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
//pool_core的schedule函數(shù)(和execute_task函數(shù)強(qiáng)相關(guān))
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
//task成功入隊(duì)列后,notify_one一個(gè)線(xiàn)程。
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
//thread_pool的schedule函數(shù)
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
//pool_core的schedule函數(shù)(和execute_task函數(shù)強(qiáng)相關(guān))
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
//task成功入隊(duì)列后,notify_one一個(gè)線(xiàn)程。
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
// Wait until all tasks are finished.
tp.wait();
//pool_core的wait函數(shù)
void wait(size_t const task_threshold = 0) const volatile
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
wait函數(shù)是一個(gè)阻塞操作,內(nèi)部邏輯實(shí)現(xiàn)使用了一個(gè)條件變量,提供超時(shí)等待方式。
二、boost線(xiàn)程池使用實(shí)例
線(xiàn)程池可以減少創(chuàng)建和切換線(xiàn)程的額外開(kāi)銷(xiāo),利用已經(jīng)存在的線(xiàn)程多次循環(huán)執(zhí)行多個(gè)任務(wù)從而提高系統(tǒng)的處理能力,有關(guān)線(xiàn)程池的概念可google搜索,下面將其使用實(shí)例:
#include <iostream>
#include <sstream>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/threadpool.hpp>
using namespace std;
using namespace boost::threadpool;
//
// Helpers
boost::mutex m_io_monitor;
void print(string text)
{
boost::mutex::scoped_lock lock(m_io_monitor);//每個(gè)線(xiàn)程使用全局互斥來(lái)保證每次只有一個(gè)線(xiàn)程執(zhí)行
cout << text;
}
template<typename T>
string to_string(T const & value)
{
ostringstream ost;
ost << value;
ost.flush();
return ost.str();
}
//
// An example task functions
void task_1()
{
print(" task_1()/n");
}
void task_2()
{
print(" task_2()/n");
}
void task_3()
{
print(" task_3()/n");
}
int task_4()
{
print(" task_4()/n");
return 4;
}
void task_with_parameter(int value)
{
print(" task_with_parameter(" + to_string(value) + ")/n");
}
int loops = 0;
bool looped_task()
{
print(" looped_task()/n");
return ++loops < 5;
}
int task_int()
{
print(" task_int()/n");
return 23;
}
void fifo_pool_test()
{
pool tp;
tp.schedule(&task_1);
tp.schedule(boost::bind(task_with_parameter, 4));
if(!tp.empty())
{
tp.clear(); // remove all tasks -> no output in this test
}
size_t active_threads = tp.active();
size_t pending_threads = tp.pending();
size_t total_threads = tp.size();
size_t dummy = active_threads + pending_threads + total_threads;
dummy++;
tp.size_controller().resize(5);
tp.wait();
}
void lifo_pool_test()
{
lifo_pool tp;
tp.size_controller().resize(0);
schedule(tp, &task_1);
tp.size_controller().resize(10);
tp.wait();
}
void prio_pool_test()
{
prio_pool tp(2);
schedule(tp, prio_task_func(1, &task_1));
schedule(tp, prio_task_func(10,&task_2));
}
void future_test()
{
fifo_pool tp(5);
future<int> fut = schedule(tp, &task_4);
int res = fut();
}
int main (int , char * const [])
{
fifo_pool_test();
lifo_pool_test();
prio_pool_test();
future_test();
return 0;
}
任務(wù)返回值的獲取:
一般異步調(diào)用中,返回值的獲取有同步獲取和異步獲取兩種形式。
同步獲取返回值:
int task_int_23()
{
cout<<"task_int_23()/n";
return 23;
}
future<int> res = schedule(tp, &task_int_23);
res.wait();
cout<<"get res value:"<<res.get()<<endl;
異步獲取返回值:
不知道是設(shè)計(jì)者就不打算使用異步回調(diào)獲取返回值還是我看的不夠仔細(xì),異步獲取返回值的方式還真沒(méi)有找著,只好自己簡(jiǎn)單的寫(xiě)了一個(gè)回調(diào)的仿函數(shù)來(lái)實(shí)現(xiàn)異步返回值的獲取。
//R為任務(wù)函數(shù)的返回值類(lèi)型
template<class R>
class callback_task
{
typedef boost::function<void (R)> callback;
typedef boost::function<R ()> function;
private:
callback c_;
function f_;
public:
//F: 任務(wù)執(zhí)行函數(shù) C:結(jié)果回調(diào)函數(shù)
template<class F,class C>
callback_task(F f,C c)
{
f_ = f;
c_ = c;
}
void operator()()
{
c_(f_());
}
};
通過(guò)這個(gè)對(duì)象可以很容易的實(shí)現(xiàn)異步結(jié)果的回調(diào)。
//task_int_23的結(jié)果回調(diào)函數(shù)
void callback(int k)
{
cout<<"get callback value:"<<k<<endl;
}
//通過(guò)回調(diào)的形式獲取任務(wù)的返回值
tp.schedule(callback_task<int>(&task_int_23,&callback));