青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

posts - 311, comments - 0, trackbacks - 0, articles - 0
  C++博客 :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

(搬運工)boost之ThreadPool

Posted on 2012-07-17 16:24 點點滴滴 閱讀(3599) 評論(0)  編輯 收藏 引用 所屬分類: 02 編程語言

threadpool是基于boost庫實現的一個線程池子庫,但線程池實現起來不是很復雜。我們從threadpool中又能學到什么東西呢?

它是基于boost庫實現的,如果大家對boost庫有興趣,看看一個簡單的實現還是可以學到點東西的。

threadpool基本功能

1、任務封裝,包括普通任務(task_func)和優先級任務(prio_task_func)。

2、調度策略,包括fifo_scheduler(先進先出)、lifo_scheduler(后進先出)、prio_scheduler(優先級)。

3、結束策略,包括wait_for_all_tasks(全部任務等待)、wait_for_active_tasks(激活任務等待)、immediately(立即結束)。

4、動態修改線程池個數功能。

5、基于future封裝的異步返回值獲取功能。

在sorceforge上有一個用boost編寫的線程池。該線程池和boost結合的比較好,并且提供了多種任務執行策略,使用也非常簡單。 下載地址: http://threadpool.sourceforge.net/ 這個線程池不需要編譯,只要在項目中包含其頭文件就可以了。

一、源代碼分析

quickstart分析(/threadpool/libs/threadpool/quickstart

這個例子的代碼很簡單,但已經全部展示了線程池的核心內容,包括建立、調度、同步等操作。

view plaincopy to clipboardprint?

// Create fifo thread pool container with two threads.

pool tp(2);

// Add some tasks to the pool.

tp.schedule(&first_task);

tp.schedule(&second_task);

// Wait until all tasks are finished.

tp.wait();

// Create fifo thread pool container with two threads.

pool tp(2);

// Add some tasks to the pool.

tp.schedule(&first_task);

tp.schedule(&second_task);

// Wait until all tasks are finished.

tp.wait();

pool的定義具體見pool.hpp,但使用了pimpl模式,核心代碼見pool_core.hpp文件。

下面是pool的定義

typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;

typedef fifo_pool pool;

從上面可以知道,pool實際就是fifo_pool,從模板參數可以看到,使用了fifo_schedulerwait_for_all_tasks

對于線程池有點理解的都知道,一般都是那幾樣東西,線程的封裝,條件變量,隊列數據結構。

所以簡單的能做的很簡單,復雜的的就看你的策略需求了。

對基于boost庫的threadpool子庫來說,上面的三樣東西都是現成的,線程封裝和條件變量直接使用thread子庫就行,隊列使用stl的標準容器。

task_adaptors.hpp

對線程任務的封裝,所謂task,我們可以理解成需要運行的函數。

threadpool最大限度的使用了functionbind功能來封裝函數,這點和thread子庫類似。

文件中涉及的內容主要有三個:task_funcprio_task_funclooped_task_func

對普通task的封裝

typedef function0<void> task_func;

如果對bindfunction熟悉的應該很好理解。

對優先級任務的封裝

class prio_task_func

這個類很簡單,重載了兩個方法,

operator()是仿函數的用法,

operator<是用于優先級比較使用的,用于stl容器的元素比較。

size_policies.hpp

size的封裝,包括empty_controllerresize_controllerstatic_size

shutdown_policies.hpp

對線程池結束的策略封裝,包括wait_for_all_taskswait_for_active_tasksimmediately

這幾個類很簡單,具體操作封裝在pool中。

線程池運行過程中,包括隊列中等待的task,線程正在運行的task

所以結束的時候,對這些task的策略操作是有選擇的。

scheduling_policies.hpp

對任務調度測試的封裝,包括fifo_schedulerlifo_schedulerprio_scheduler

實際上,這三個類的相似程度很高,大家可能更喜歡用繼承和虛函數實現。

前面說到保存task的隊列數據結構,在這里就看的很清楚了。

fifolifo使用的是std::dequeprio使用的是std::priority_queue,其他部分代碼沒什么好說的了。

pool_adaptors.hpp

對全局schedule函數的幾種封裝。

future.hpp

好像thread子庫也有future,但不清楚是否是一樣的內容。

threadpoolfuture是為了封裝異步函數調用返回值實現的。

簡單點理解,就是schedule任務的時候,把一個指針在兩者間綁定起來,后面就可以通過future來獲取返回值了。

當然,獲取返回值的過程應該是阻塞的,任務未完成時只能wait

locking_ptr.hpp

LockingPtr的簡單封裝,具體可googlevolatile - Multithreaded Programmer's Best Friend》。

threadpool大量使用了volatile關鍵字,所以需要LockingPtr保護。

scope_guard.hpp

對函數對象的封裝,利用C++析構函數時調用一個在構造函數時綁定的函數對象。

worker_thread.hpp

對工作線程的封裝,這個封裝不是指底層線程api封裝,因為這部分是由boostthread子庫提供的。

封裝針對的是循環執行task的邏輯函數(線程跑起來就loop run某個函數,從隊列中獲取task執行,空閑時等待。)

我們重點看的是runcreate_and_attach

這兩個函數連起來看,就很清楚了,create_and_attach通過bind方式生成一個thread執行run方法。

run方法中的這條語句就是一個簡單的loop操作,

while(m_pool->execute_task()) {}

所以,當execute_task返回值為false時,run函數就結束了,bind該函數的thread也就結束了。

ok,來到這里,有必要簡單的把整個調用過程說明一下。

// Create fifo thread pool container with two threads.

pool tp(2);

該操作會調用pool的構造函數

view plaincopy to clipboardprint?

thread_pool(size_t initial_threads = 0)

: m_core(new pool_core_type)

, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

{

size_policy_type::init(*m_core, initial_threads);

}

thread_pool(size_t initial_threads = 0)

: m_core(new pool_core_type)

, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

{

size_policy_type::init(*m_core, initial_threads);

}

由于pimpl模式,所以所有代碼都封裝在m_core內實現的。

pool默認的線程個數為0,通過size_policy_type::init來初始化。

size_policy_type是一個模板參數,pool對應的是fifo,所以也就是static_size類型了。

//static_size類的init函數

view plaincopy to clipboardprint?

static void init(Pool& pool, size_t const worker_count)

{

pool.resize(worker_count);

}

static void init(Pool& pool, size_t const worker_count)

{

pool.resize(worker_count);

}

//pool_coreresize函數

這個函數有點長,主要是做動態配置線程個數的邏輯操作,create_and_attach也是在這里調用的。

view plaincopy to clipboardprint?

//worker_threadcreate_and_attach函數

static void create_and_attach(shared_ptr<pool_type> const & pool)

{

shared_ptr<worker_thread> worker(new worker_thread(pool));

if(worker)

{

//run是線程的loop函數

worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

}

}

//worker_threadcreate_and_attach函數

static void create_and_attach(shared_ptr<pool_type> const & pool)

{

shared_ptr<worker_thread> worker(new worker_thread(pool));

if(worker)

{

//run是線程的loop函數

worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

}

}

view plaincopy to clipboardprint?

//worker_threadrun函數

void run()

{

scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

while(m_pool->execute_task()) {} //loop直到返回值為false

notify_exception.disable();

m_pool->worker_destructed(this->shared_from_this());

}

//worker_threadrun函數

void run()

{

scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

while(m_pool->execute_task()) {} //loop直到返回值為false

notify_exception.disable();

m_pool->worker_destructed(this->shared_from_this());

}

//pool_coreexecute_task函數

這個函數有點長,簡單點說,就是從隊列中獲取task然后執行,如果隊列為空,則線程需要wait操作。

由于threadpool支持動態resize線程個數,從該函數我們也是可以看出來是如何做到的。

view plaincopy to clipboardprint?

// decrease number of threads if necessary

if(m_worker_count > m_target_worker_count)

{

return false; // terminate worker

}

// decrease number of threads if necessary

if(m_worker_count > m_target_worker_count)

{

return false; // terminate worker

}

pool內部使用了多個整數來記錄現在個數,譬如m_worker_countm_target_worker_count

m_worker_count是當前激活運行中的線程個數。

m_target_worker_count是最新動態配置的線程個數。

當個數不匹配時,通過返回false方式結束線程。

// Add some tasks to the pool.

tp.schedule(&first_task);

view plaincopy to clipboardprint?

//thread_poolschedule函數

bool schedule(task_type const & task)

{

return m_core->schedule(task);

}

//pool_coreschedule函數(和execute_task函數強相關)

bool schedule(task_type const & task) volatile

{

locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

if(lockedThis->m_scheduler.push(task))

{

//task成功入隊列后,notify_one一個線程。

lockedThis->m_task_or_terminate_workers_event.notify_one();

return true;

}

else

{

return false;

}

}

//thread_poolschedule函數

bool schedule(task_type const & task)

{

return m_core->schedule(task);

}

//pool_coreschedule函數(和execute_task函數強相關)

bool schedule(task_type const & task) volatile

{

locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

if(lockedThis->m_scheduler.push(task))

{

//task成功入隊列后,notify_one一個線程。

lockedThis->m_task_or_terminate_workers_event.notify_one();

return true;

}

else

{

return false;

}

}

// Wait until all tasks are finished.

tp.wait();

//pool_corewait函數

void wait(size_t const task_threshold = 0) const volatile

bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile

wait函數是一個阻塞操作,內部邏輯實現使用了一個條件變量,提供超時等待方式。

二、boost線程池使用實例

線程池可以減少創建和切換線程的額外開銷,利用已經存在的線程多次循環執行多個任務從而提高系統的處理能力,有關線程池的概念可google搜索,下面將其使用實例:

#include <iostream>
#include <sstream>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>

#include <boost/threadpool.hpp>

using namespace std;
using namespace boost::threadpool;


//
// Helpers
boost::mutex m_io_monitor;

void print(string text)
{
boost::mutex::scoped_lock lock(m_io_monitor);//每個線程使用全局互斥來保證每次只有一個線程執行
cout << text;
}

template<typename T>
string to_string(T const & value)
{
ostringstream ost;
ost << value;
ost.flush();
return ost.str();
}

//
// An example task functions
void task_1()
{
print(" task_1()/n");
}

void task_2()
{
print(" task_2()/n");
}

void task_3()
{
print(" task_3()/n");
}

int task_4()
{
print(" task_4()/n");
return 4;
}

void task_with_parameter(int value)
{
print(" task_with_parameter(" + to_string(value) + ")/n");
}

int loops = 0;
bool looped_task()
{
print(" looped_task()/n");
return ++loops < 5;
}


int task_int()
{
print(" task_int()/n");
return 23;
}


void fifo_pool_test()
{
pool tp;

tp.schedule(&task_1);
tp.schedule(boost::bind(task_with_parameter, 4));

if(!tp.empty())
{
tp.clear(); // remove all tasks -> no output in this test
}

size_t active_threads = tp.active();
size_t pending_threads = tp.pending();
size_t total_threads = tp.size();

size_t dummy = active_threads + pending_threads + total_threads;
dummy++;

tp.size_controller().resize(5);
tp.wait();
}

void lifo_pool_test()
{
lifo_pool tp;
tp.size_controller().resize(0);
schedule(tp, &task_1);
tp.size_controller().resize(10);
tp.wait();
}

void prio_pool_test()
{
prio_pool tp(2);
schedule(tp, prio_task_func(1, &task_1));
schedule(tp, prio_task_func(10,&task_2));
}


void future_test()
{
fifo_pool tp(5);
future<int> fut = schedule(tp, &task_4);
int res = fut();
}


int main (int , char * const [])
{
fifo_pool_test();
lifo_pool_test();
prio_pool_test();
future_test();
return 0;
}

任務返回值的獲取:

一般異步調用中,返回值的獲取有同步獲取和異步獲取兩種形式。

同步獲取返回值:

int task_int_23()
{
cout<<"task_int_23()/n";
return 23;
}

future<int> res = schedule(tp, &task_int_23);
res.wait();

cout<<"get res value:"<<res.get()<<endl;

異步獲取返回值:

不知道是設計者就不打算使用異步回調獲取返回值還是我看的不夠仔細,異步獲取返回值的方式還真沒有找著,只好自己簡單的寫了一個回調的仿函數來實現異步返回值的獲取。

//R為任務函數的返回值類型
template
<class R>
class callback_task
{
typedef boost::function<void (R)> callback;
typedef boost::function<R ()> function;

private:
callback c_;
function f_;

public:
//F:
任務執行函數
C:結果回調函數
template<class F,class C>
callback_task(F f,C c)
{
f_ = f;
c_ = c;
}

void operator()()
{
c_(f_());
}
};

通過這個對象可以很容易的實現異步結果的回調。

//task_int_23的結果回調函數
void callback(int k)
{
cout<<"get callback value:"<<k<<endl;
}

//通過回調的形式獲取任務的返回值
tp.schedule(callback_task<int>(&task_int_23,&callback));

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            欧美一级片一区| 亚洲综合二区| 国产网站欧美日韩免费精品在线观看| 欧美国产日本韩| 国产一区二区三区四区三区四| 亚洲毛片一区二区| 在线观看欧美一区| 欧美在线视频a| 亚洲综合欧美| 欧美色中文字幕| 亚洲区国产区| 亚洲福利视频免费观看| 久久国产精品网站| 欧美在线播放一区二区| 国产精品九九久久久久久久| 亚洲精品资源美女情侣酒店| 日韩午夜激情电影| 欧美顶级大胆免费视频| 亚洲福利视频网| 亚洲精品男同| 欧美日韩国产精品成人| 亚洲精品久久7777| 亚洲少妇中出一区| 欧美日韩亚洲一区三区 | 亚洲欧美日韩在线播放| 亚洲伊人一本大道中文字幕| 国产精品大片| 亚洲欧美另类中文字幕| 久久国产一区| 伊人久久av导航| 久久综合狠狠综合久久综合88 | 欧美剧在线免费观看网站| 亚洲丁香婷深爱综合| 亚洲精品视频免费| 欧美视频免费看| 亚洲一区二区三区在线| 久久久精品午夜少妇| 精品成人一区| 欧美精品一区二区精品网| 夜夜嗨av一区二区三区四区 | 久久激情综合| 在线观看三级视频欧美| 欧美大片91| 亚洲一区二区三区免费在线观看| 欧美一区二区播放| 1024国产精品| 欧美日韩一区在线| 欧美一区久久| 亚洲电影下载| 午夜电影亚洲| 亚洲国产精品一区二区第一页| 欧美电影在线免费观看网站| 亚洲无玛一区| 欧美大片一区| 亚洲色图自拍| 精品电影在线观看| 欧美日韩免费精品| 久久成人精品一区二区三区| 亚洲七七久久综合桃花剧情介绍| 午夜精品在线看| 亚洲人www| 国产午夜精品一区理论片飘花 | 欧美日韩精品三区| 欧美一区亚洲二区| 亚洲国产一区二区三区青草影视| 亚洲欧美视频| 亚洲国产毛片完整版| 国产精品一区免费视频| 欧美国产精品一区| 欧美一区二区三区久久精品茉莉花| 亚洲国产婷婷香蕉久久久久久| 欧美亚洲尤物久久| 99这里只有久久精品视频| 影音先锋国产精品| 国产精品视屏| 欧美精品v日韩精品v韩国精品v | 麻豆成人精品| 欧美亚洲在线播放| 中日韩高清电影网| 亚洲激情电影中文字幕| 久久综合色88| 欧美在线三区| 午夜精品久久久久久久99黑人| 亚洲人成免费| 亚洲国产精品ⅴa在线观看| 国产欧美日韩在线| 国产精品黄色在线观看| 欧美日韩xxxxx| 欧美激情国产日韩| 免费视频最近日韩| 久热精品视频在线观看一区| 午夜在线精品偷拍| 亚洲欧美日韩另类精品一区二区三区| 亚洲伦理在线免费看| 亚洲国产精品成人| 亚洲电影在线| 欧美激情欧美狂野欧美精品| 欧美~级网站不卡| 另类春色校园亚洲| 老色批av在线精品| 久久夜精品va视频免费观看| 久久久xxx| 久久久久综合网| 久久久噜噜噜久久| 另类国产ts人妖高潮视频| 久久久国产精品一区二区三区| 欧美在线视频网站| 久久精品一二三区| 久久青青草综合| 麻豆9191精品国产| 欧美成人日本| 亚洲国产欧美一区| 亚洲人午夜精品| avtt综合网| 午夜欧美不卡精品aaaaa| 午夜精品久久久久久| 久久成人人人人精品欧| 久久久久久久综合日本| 蜜臀va亚洲va欧美va天堂| 蜜桃久久av一区| 欧美欧美在线| 国产精品网站视频| 国产一区二区三区av电影| 伊人蜜桃色噜噜激情综合| 亚洲欧洲日产国码二区| 在线视频一区观看| 欧美一区二区三区久久精品茉莉花 | 久久成人亚洲| 裸体女人亚洲精品一区| 欧美精品 国产精品| 国产精品老牛| 精品福利av| 一本色道久久88精品综合| 欧美亚洲免费| 欧美超级免费视 在线| 亚洲精选国产| 久久精品国产99国产精品| 欧美成人精品在线视频| 国产精品久久久久久久电影 | 欧美在线视频网站| 欧美高潮视频| 亚洲一区在线播放| 另类天堂视频在线观看| 欧美日韩在线播放三区| 国内精品美女av在线播放| 日韩一本二本av| 久久久久在线观看| 亚洲美女毛片| 久久影音先锋| 国产精品系列在线播放| 亚洲区中文字幕| 久久激情网站| 一本色道久久综合一区| 久久亚洲精品一区二区| 国产精品卡一卡二| 亚洲精选在线观看| 久久久久国产一区二区| a91a精品视频在线观看| 久久综合精品国产一区二区三区| 国产精品久久久久久久一区探花| 亚洲国产精品专区久久| 欧美在线视频播放| av不卡在线观看| 老司机久久99久久精品播放免费| 国产精品一区在线播放| 一区二区高清在线| 欧美xx69| 久久久91精品国产| 国产欧美一区二区三区国产幕精品 | 亚洲精品乱码久久久久久蜜桃91| 久久久久久久久久久久久女国产乱| 亚洲精品久久| 男人天堂欧美日韩| 国内精品伊人久久久久av影院| 亚洲在线成人| 亚洲精品免费电影| 欧美mv日韩mv亚洲| 亚洲国产成人精品久久久国产成人一区 | 黑人巨大精品欧美黑白配亚洲 | 亚洲国产欧美一区二区三区丁香婷| 久久大综合网| 国产欧美日韩精品在线| 亚洲欧美日韩一区二区三区在线| 亚洲人成7777| 欧美精品色一区二区三区| 亚洲日韩欧美视频| 亚洲电影在线看| 欧美黄色影院| 日韩午夜在线电影| 亚洲精品国产精品国自产在线| 美女国产一区| 亚洲毛片网站| 91久久精品日日躁夜夜躁国产| 欧美大胆成人| 99精品免费网| 9l视频自拍蝌蚪9l视频成人| 国产精品二区三区四区| 性欧美8khd高清极品| 性感少妇一区| 在线成人中文字幕|