threadpool是基于boost庫實現的一個線程池子庫,但線程池實現起來不是很復雜。我們從threadpool中又能學到什么東西呢?
它是基于boost庫實現的,如果大家對boost庫有興趣,看看一個簡單的實現還是可以學到點東西的。
threadpool基本功能
1、任務封裝,包括普通任務(task_func)和優先級任務(prio_task_func)。
2、調度策略,包括fifo_scheduler(先進先出)、lifo_scheduler(后進先出)、prio_scheduler(優先級)。
3、結束策略,包括wait_for_all_tasks(全部任務等待)、wait_for_active_tasks(激活任務等待)、immediately(立即結束)。
4、動態修改線程池個數功能。
5、基于future封裝的異步返回值獲取功能。
在sorceforge上有一個用boost編寫的線程池。該線程池和boost結合的比較好,并且提供了多種任務執行策略,使用也非常簡單。 下載地址: http://threadpool.sourceforge.net/ 這個線程池不需要編譯,只要在項目中包含其頭文件就可以了。
一、源代碼分析
quickstart分析(/threadpool/libs/threadpool/quickstart)
這個例子的代碼很簡單,但已經全部展示了線程池的核心內容,包括建立、調度、同步等操作。
view plaincopy to clipboardprint?
// Create fifo thread pool container with two threads.
pool tp(2);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
// Wait until all tasks are finished.
tp.wait();
// Create fifo thread pool container with two threads.
pool tp(2);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
// Wait until all tasks are finished.
tp.wait();
pool的定義具體見pool.hpp,但使用了pimpl模式,核心代碼見pool_core.hpp文件。
下面是pool的定義
typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;
typedef fifo_pool pool;
從上面可以知道,pool實際就是fifo_pool,從模板參數可以看到,使用了fifo_scheduler和wait_for_all_tasks。
對于線程池有點理解的都知道,一般都是那幾樣東西,線程的封裝,條件變量,隊列數據結構。
所以簡單的能做的很簡單,復雜的的就看你的策略需求了。
對基于boost庫的threadpool子庫來說,上面的三樣東西都是現成的,線程封裝和條件變量直接使用thread子庫就行,隊列使用stl的標準容器。
task_adaptors.hpp
對線程任務的封裝,所謂task,我們可以理解成需要運行的函數。
threadpool最大限度的使用了function和bind功能來封裝函數,這點和thread子庫類似。
文件中涉及的內容主要有三個:task_func、prio_task_func和looped_task_func。
對普通task的封裝
typedef function0<void> task_func;
如果對bind和function熟悉的應該很好理解。
對優先級任務的封裝
class prio_task_func
這個類很簡單,重載了兩個方法,
operator()是仿函數的用法,
operator<是用于優先級比較使用的,用于stl容器的元素比較。
size_policies.hpp
對size的封裝,包括empty_controller、resize_controller和static_size。
shutdown_policies.hpp
對線程池結束的策略封裝,包括wait_for_all_tasks、wait_for_active_tasks和immediately。
這幾個類很簡單,具體操作封裝在pool中。
線程池運行過程中,包括隊列中等待的task,線程正在運行的task。
所以結束的時候,對這些task的策略操作是有選擇的。
scheduling_policies.hpp
對任務調度測試的封裝,包括fifo_scheduler、lifo_scheduler和prio_scheduler。
實際上,這三個類的相似程度很高,大家可能更喜歡用繼承和虛函數實現。
前面說到保存task的隊列數據結構,在這里就看的很清楚了。
fifo和lifo使用的是std::deque,prio使用的是std::priority_queue,其他部分代碼沒什么好說的了。
pool_adaptors.hpp
對全局schedule函數的幾種封裝。
future.hpp
好像thread子庫也有future,但不清楚是否是一樣的內容。
threadpool的future是為了封裝異步函數調用返回值實現的。
簡單點理解,就是schedule任務的時候,把一個指針在兩者間綁定起來,后面就可以通過future來獲取返回值了。
當然,獲取返回值的過程應該是阻塞的,任務未完成時只能wait。
locking_ptr.hpp
LockingPtr的簡單封裝,具體可google《volatile - Multithreaded Programmer's Best Friend》。
threadpool大量使用了volatile關鍵字,所以需要LockingPtr保護。
scope_guard.hpp
對函數對象的封裝,利用C++析構函數時調用一個在構造函數時綁定的函數對象。
worker_thread.hpp
對工作線程的封裝,這個封裝不是指底層線程api封裝,因為這部分是由boost的thread子庫提供的。
封裝針對的是循環執行task的邏輯函數(線程跑起來就loop run某個函數,從隊列中獲取task執行,空閑時等待。)
我們重點看的是run和create_and_attach。
這兩個函數連起來看,就很清楚了,create_and_attach通過bind方式生成一個thread執行run方法。
run方法中的這條語句就是一個簡單的loop操作,
while(m_pool->execute_task()) {}
所以,當execute_task返回值為false時,run函數就結束了,bind該函數的thread也就結束了。
ok,來到這里,有必要簡單的把整個調用過程說明一下。
// Create fifo thread pool container with two threads.
pool tp(2);
該操作會調用pool的構造函數
view plaincopy to clipboardprint?
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
thread_pool(size_t initial_threads = 0)
: m_core(new pool_core_type)
, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
{
size_policy_type::init(*m_core, initial_threads);
}
由于pimpl模式,所以所有代碼都封裝在m_core內實現的。
pool默認的線程個數為0,通過size_policy_type::init來初始化。
而size_policy_type是一個模板參數,pool對應的是fifo,所以也就是static_size類型了。
//static_size類的init函數
view plaincopy to clipboardprint?
static void init(Pool& pool, size_t const worker_count)
{
pool.resize(worker_count);
}
static void init(Pool& pool, size_t const worker_count)
{
pool.resize(worker_count);
}
//pool_core的resize函數
這個函數有點長,主要是做動態配置線程個數的邏輯操作,create_and_attach也是在這里調用的。
view plaincopy to clipboardprint?
//worker_thread的create_and_attach函數
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
//run是線程的loop函數
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
//worker_thread的create_and_attach函數
static void create_and_attach(shared_ptr<pool_type> const & pool)
{
shared_ptr<worker_thread> worker(new worker_thread(pool));
if(worker)
{
//run是線程的loop函數
worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));
}
}
view plaincopy to clipboardprint?
//worker_thread的run函數
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {} //loop直到返回值為false
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
//worker_thread的run函數
void run()
{
scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));
while(m_pool->execute_task()) {} //loop直到返回值為false
notify_exception.disable();
m_pool->worker_destructed(this->shared_from_this());
}
//pool_core的execute_task函數
這個函數有點長,簡單點說,就是從隊列中獲取task然后執行,如果隊列為空,則線程需要wait操作。
由于threadpool支持動態resize線程個數,從該函數我們也是可以看出來是如何做到的。
view plaincopy to clipboardprint?
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
pool內部使用了多個整數來記錄現在個數,譬如m_worker_count和m_target_worker_count。
m_worker_count是當前激活運行中的線程個數。
m_target_worker_count是最新動態配置的線程個數。
當個數不匹配時,通過返回false方式結束線程。
// Add some tasks to the pool.
tp.schedule(&first_task);
view plaincopy to clipboardprint?
//thread_pool的schedule函數
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
//pool_core的schedule函數(和execute_task函數強相關)
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
//task成功入隊列后,notify_one一個線程。
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
//thread_pool的schedule函數
bool schedule(task_type const & task)
{
return m_core->schedule(task);
}
//pool_core的schedule函數(和execute_task函數強相關)
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
//task成功入隊列后,notify_one一個線程。
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
// Wait until all tasks are finished.
tp.wait();
//pool_core的wait函數
void wait(size_t const task_threshold = 0) const volatile
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
wait函數是一個阻塞操作,內部邏輯實現使用了一個條件變量,提供超時等待方式。
二、boost線程池使用實例
線程池可以減少創建和切換線程的額外開銷,利用已經存在的線程多次循環執行多個任務從而提高系統的處理能力,有關線程池的概念可google搜索,下面將其使用實例:
#include <iostream>
#include <sstream>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/threadpool.hpp>
using namespace std;
using namespace boost::threadpool;
//
// Helpers
boost::mutex m_io_monitor;
void print(string text)
{
boost::mutex::scoped_lock lock(m_io_monitor);//每個線程使用全局互斥來保證每次只有一個線程執行
cout << text;
}
template<typename T>
string to_string(T const & value)
{
ostringstream ost;
ost << value;
ost.flush();
return ost.str();
}
//
// An example task functions
void task_1()
{
print(" task_1()/n");
}
void task_2()
{
print(" task_2()/n");
}
void task_3()
{
print(" task_3()/n");
}
int task_4()
{
print(" task_4()/n");
return 4;
}
void task_with_parameter(int value)
{
print(" task_with_parameter(" + to_string(value) + ")/n");
}
int loops = 0;
bool looped_task()
{
print(" looped_task()/n");
return ++loops < 5;
}
int task_int()
{
print(" task_int()/n");
return 23;
}
void fifo_pool_test()
{
pool tp;
tp.schedule(&task_1);
tp.schedule(boost::bind(task_with_parameter, 4));
if(!tp.empty())
{
tp.clear(); // remove all tasks -> no output in this test
}
size_t active_threads = tp.active();
size_t pending_threads = tp.pending();
size_t total_threads = tp.size();
size_t dummy = active_threads + pending_threads + total_threads;
dummy++;
tp.size_controller().resize(5);
tp.wait();
}
void lifo_pool_test()
{
lifo_pool tp;
tp.size_controller().resize(0);
schedule(tp, &task_1);
tp.size_controller().resize(10);
tp.wait();
}
void prio_pool_test()
{
prio_pool tp(2);
schedule(tp, prio_task_func(1, &task_1));
schedule(tp, prio_task_func(10,&task_2));
}
void future_test()
{
fifo_pool tp(5);
future<int> fut = schedule(tp, &task_4);
int res = fut();
}
int main (int , char * const [])
{
fifo_pool_test();
lifo_pool_test();
prio_pool_test();
future_test();
return 0;
}
任務返回值的獲取:
一般異步調用中,返回值的獲取有同步獲取和異步獲取兩種形式。
同步獲取返回值:
int task_int_23()
{
cout<<"task_int_23()/n";
return 23;
}
future<int> res = schedule(tp, &task_int_23);
res.wait();
cout<<"get res value:"<<res.get()<<endl;
異步獲取返回值:
不知道是設計者就不打算使用異步回調獲取返回值還是我看的不夠仔細,異步獲取返回值的方式還真沒有找著,只好自己簡單的寫了一個回調的仿函數來實現異步返回值的獲取。
//R為任務函數的返回值類型
template<class R>
class callback_task
{
typedef boost::function<void (R)> callback;
typedef boost::function<R ()> function;
private:
callback c_;
function f_;
public:
//F: 任務執行函數 C:結果回調函數
template<class F,class C>
callback_task(F f,C c)
{
f_ = f;
c_ = c;
}
void operator()()
{
c_(f_());
}
};
通過這個對象可以很容易的實現異步結果的回調。
//task_int_23的結果回調函數
void callback(int k)
{
cout<<"get callback value:"<<k<<endl;
}
//通過回調的形式獲取任務的返回值
tp.schedule(callback_task<int>(&task_int_23,&callback));