• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            posts - 311, comments - 0, trackbacks - 0, articles - 0
              C++博客 :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

            threadpool是基于boost庫(kù)實(shí)現(xiàn)的一個(gè)線(xiàn)程池子庫(kù),但線(xiàn)程池實(shí)現(xiàn)起來(lái)不是很復(fù)雜。我們從threadpool中又能學(xué)到什么東西呢?

            它是基于boost庫(kù)實(shí)現(xiàn)的,如果大家對(duì)boost庫(kù)有興趣,看看一個(gè)簡(jiǎn)單的實(shí)現(xiàn)還是可以學(xué)到點(diǎn)東西的。

            threadpool基本功能

            1、任務(wù)封裝,包括普通任務(wù)(task_func)和優(yōu)先級(jí)任務(wù)(prio_task_func)。

            2、調(diào)度策略,包括fifo_scheduler(先進(jìn)先出)、lifo_scheduler(后進(jìn)先出)、prio_scheduler(優(yōu)先級(jí))。

            3、結(jié)束策略,包括wait_for_all_tasks(全部任務(wù)等待)、wait_for_active_tasks(激活任務(wù)等待)、immediately(立即結(jié)束)。

            4、動(dòng)態(tài)修改線(xiàn)程池個(gè)數(shù)功能。

            5、基于future封裝的異步返回值獲取功能。

            在sorceforge上有一個(gè)用boost編寫(xiě)的線(xiàn)程池。該線(xiàn)程池和boost結(jié)合的比較好,并且提供了多種任務(wù)執(zhí)行策略,使用也非常簡(jiǎn)單。 下載地址: http://threadpool.sourceforge.net/ 這個(gè)線(xiàn)程池不需要編譯,只要在項(xiàng)目中包含其頭文件就可以了。

            一、源代碼分析

            quickstart分析(/threadpool/libs/threadpool/quickstart

            這個(gè)例子的代碼很簡(jiǎn)單,但已經(jīng)全部展示了線(xiàn)程池的核心內(nèi)容,包括建立、調(diào)度、同步等操作。

            view plaincopy to clipboardprint?

            // Create fifo thread pool container with two threads.

            pool tp(2);

            // Add some tasks to the pool.

            tp.schedule(&first_task);

            tp.schedule(&second_task);

            // Wait until all tasks are finished.

            tp.wait();

            // Create fifo thread pool container with two threads.

            pool tp(2);

            // Add some tasks to the pool.

            tp.schedule(&first_task);

            tp.schedule(&second_task);

            // Wait until all tasks are finished.

            tp.wait();

            pool的定義具體見(jiàn)pool.hpp,但使用了pimpl模式,核心代碼見(jiàn)pool_core.hpp文件。

            下面是pool的定義

            typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;

            typedef fifo_pool pool;

            從上面可以知道,pool實(shí)際就是fifo_pool,從模板參數(shù)可以看到,使用了fifo_schedulerwait_for_all_tasks

            對(duì)于線(xiàn)程池有點(diǎn)理解的都知道,一般都是那幾樣?xùn)|西,線(xiàn)程的封裝,條件變量,隊(duì)列數(shù)據(jù)結(jié)構(gòu)。

            所以簡(jiǎn)單的能做的很簡(jiǎn)單,復(fù)雜的的就看你的策略需求了。

            對(duì)基于boost庫(kù)的threadpool子庫(kù)來(lái)說(shuō),上面的三樣?xùn)|西都是現(xiàn)成的,線(xiàn)程封裝和條件變量直接使用thread子庫(kù)就行,隊(duì)列使用stl的標(biāo)準(zhǔn)容器。

            task_adaptors.hpp

            對(duì)線(xiàn)程任務(wù)的封裝,所謂task,我們可以理解成需要運(yùn)行的函數(shù)。

            threadpool最大限度的使用了functionbind功能來(lái)封裝函數(shù),這點(diǎn)和thread子庫(kù)類(lèi)似。

            文件中涉及的內(nèi)容主要有三個(gè):task_funcprio_task_funclooped_task_func

            對(duì)普通task的封裝

            typedef function0<void> task_func;

            如果對(duì)bindfunction熟悉的應(yīng)該很好理解。

            對(duì)優(yōu)先級(jí)任務(wù)的封裝

            class prio_task_func

            這個(gè)類(lèi)很簡(jiǎn)單,重載了兩個(gè)方法,

            operator()是仿函數(shù)的用法,

            operator<是用于優(yōu)先級(jí)比較使用的,用于stl容器的元素比較。

            size_policies.hpp

            對(duì)size的封裝,包括empty_controllerresize_controllerstatic_size

            shutdown_policies.hpp

            對(duì)線(xiàn)程池結(jié)束的策略封裝,包括wait_for_all_taskswait_for_active_tasksimmediately

            這幾個(gè)類(lèi)很簡(jiǎn)單,具體操作封裝在pool中。

            線(xiàn)程池運(yùn)行過(guò)程中,包括隊(duì)列中等待的task,線(xiàn)程正在運(yùn)行的task

            所以結(jié)束的時(shí)候,對(duì)這些task的策略操作是有選擇的。

            scheduling_policies.hpp

            對(duì)任務(wù)調(diào)度測(cè)試的封裝,包括fifo_schedulerlifo_schedulerprio_scheduler

            實(shí)際上,這三個(gè)類(lèi)的相似程度很高,大家可能更喜歡用繼承和虛函數(shù)實(shí)現(xiàn)。

            前面說(shuō)到保存task的隊(duì)列數(shù)據(jù)結(jié)構(gòu),在這里就看的很清楚了。

            fifolifo使用的是std::dequeprio使用的是std::priority_queue,其他部分代碼沒(méi)什么好說(shuō)的了。

            pool_adaptors.hpp

            對(duì)全局schedule函數(shù)的幾種封裝。

            future.hpp

            好像thread子庫(kù)也有future,但不清楚是否是一樣的內(nèi)容。

            threadpoolfuture是為了封裝異步函數(shù)調(diào)用返回值實(shí)現(xiàn)的。

            簡(jiǎn)單點(diǎn)理解,就是schedule任務(wù)的時(shí)候,把一個(gè)指針在兩者間綁定起來(lái),后面就可以通過(guò)future來(lái)獲取返回值了。

            當(dāng)然,獲取返回值的過(guò)程應(yīng)該是阻塞的,任務(wù)未完成時(shí)只能wait

            locking_ptr.hpp

            LockingPtr的簡(jiǎn)單封裝,具體可googlevolatile - Multithreaded Programmer's Best Friend》。

            threadpool大量使用了volatile關(guān)鍵字,所以需要LockingPtr保護(hù)。

            scope_guard.hpp

            對(duì)函數(shù)對(duì)象的封裝,利用C++析構(gòu)函數(shù)時(shí)調(diào)用一個(gè)在構(gòu)造函數(shù)時(shí)綁定的函數(shù)對(duì)象。

            worker_thread.hpp

            對(duì)工作線(xiàn)程的封裝,這個(gè)封裝不是指底層線(xiàn)程api封裝,因?yàn)檫@部分是由boostthread子庫(kù)提供的。

            封裝針對(duì)的是循環(huán)執(zhí)行task的邏輯函數(shù)(線(xiàn)程跑起來(lái)就loop run某個(gè)函數(shù),從隊(duì)列中獲取task執(zhí)行,空閑時(shí)等待。)

            我們重點(diǎn)看的是runcreate_and_attach

            這兩個(gè)函數(shù)連起來(lái)看,就很清楚了,create_and_attach通過(guò)bind方式生成一個(gè)thread執(zhí)行run方法。

            run方法中的這條語(yǔ)句就是一個(gè)簡(jiǎn)單的loop操作,

            while(m_pool->execute_task()) {}

            所以,當(dāng)execute_task返回值為false時(shí),run函數(shù)就結(jié)束了,bind該函數(shù)的thread也就結(jié)束了。

            ok,來(lái)到這里,有必要簡(jiǎn)單的把整個(gè)調(diào)用過(guò)程說(shuō)明一下。

            // Create fifo thread pool container with two threads.

            pool tp(2);

            該操作會(huì)調(diào)用pool的構(gòu)造函數(shù)

            view plaincopy to clipboardprint?

            thread_pool(size_t initial_threads = 0)

            : m_core(new pool_core_type)

            , m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

            {

            size_policy_type::init(*m_core, initial_threads);

            }

            thread_pool(size_t initial_threads = 0)

            : m_core(new pool_core_type)

            , m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

            {

            size_policy_type::init(*m_core, initial_threads);

            }

            由于pimpl模式,所以所有代碼都封裝在m_core內(nèi)實(shí)現(xiàn)的。

            pool默認(rèn)的線(xiàn)程個(gè)數(shù)為0,通過(guò)size_policy_type::init來(lái)初始化。

            size_policy_type是一個(gè)模板參數(shù),pool對(duì)應(yīng)的是fifo,所以也就是static_size類(lèi)型了。

            //static_size類(lèi)的init函數(shù)

            view plaincopy to clipboardprint?

            static void init(Pool& pool, size_t const worker_count)

            {

            pool.resize(worker_count);

            }

            static void init(Pool& pool, size_t const worker_count)

            {

            pool.resize(worker_count);

            }

            //pool_coreresize函數(shù)

            這個(gè)函數(shù)有點(diǎn)長(zhǎng),主要是做動(dòng)態(tài)配置線(xiàn)程個(gè)數(shù)的邏輯操作,create_and_attach也是在這里調(diào)用的。

            view plaincopy to clipboardprint?

            //worker_threadcreate_and_attach函數(shù)

            static void create_and_attach(shared_ptr<pool_type> const & pool)

            {

            shared_ptr<worker_thread> worker(new worker_thread(pool));

            if(worker)

            {

            //run是線(xiàn)程的loop函數(shù)

            worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

            }

            }

            //worker_threadcreate_and_attach函數(shù)

            static void create_and_attach(shared_ptr<pool_type> const & pool)

            {

            shared_ptr<worker_thread> worker(new worker_thread(pool));

            if(worker)

            {

            //run是線(xiàn)程的loop函數(shù)

            worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

            }

            }

            view plaincopy to clipboardprint?

            //worker_threadrun函數(shù)

            void run()

            {

            scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

            while(m_pool->execute_task()) {} //loop直到返回值為false

            notify_exception.disable();

            m_pool->worker_destructed(this->shared_from_this());

            }

            //worker_threadrun函數(shù)

            void run()

            {

            scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

            while(m_pool->execute_task()) {} //loop直到返回值為false

            notify_exception.disable();

            m_pool->worker_destructed(this->shared_from_this());

            }

            //pool_coreexecute_task函數(shù)

            這個(gè)函數(shù)有點(diǎn)長(zhǎng),簡(jiǎn)單點(diǎn)說(shuō),就是從隊(duì)列中獲取task然后執(zhí)行,如果隊(duì)列為空,則線(xiàn)程需要wait操作。

            由于threadpool支持動(dòng)態(tài)resize線(xiàn)程個(gè)數(shù),從該函數(shù)我們也是可以看出來(lái)是如何做到的。

            view plaincopy to clipboardprint?

            // decrease number of threads if necessary

            if(m_worker_count > m_target_worker_count)

            {

            return false; // terminate worker

            }

            // decrease number of threads if necessary

            if(m_worker_count > m_target_worker_count)

            {

            return false; // terminate worker

            }

            pool內(nèi)部使用了多個(gè)整數(shù)來(lái)記錄現(xiàn)在個(gè)數(shù),譬如m_worker_countm_target_worker_count

            m_worker_count是當(dāng)前激活運(yùn)行中的線(xiàn)程個(gè)數(shù)。

            m_target_worker_count是最新動(dòng)態(tài)配置的線(xiàn)程個(gè)數(shù)。

            當(dāng)個(gè)數(shù)不匹配時(shí),通過(guò)返回false方式結(jié)束線(xiàn)程。

            // Add some tasks to the pool.

            tp.schedule(&first_task);

            view plaincopy to clipboardprint?

            //thread_poolschedule函數(shù)

            bool schedule(task_type const & task)

            {

            return m_core->schedule(task);

            }

            //pool_coreschedule函數(shù)(和execute_task函數(shù)強(qiáng)相關(guān))

            bool schedule(task_type const & task) volatile

            {

            locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

            if(lockedThis->m_scheduler.push(task))

            {

            //task成功入隊(duì)列后,notify_one一個(gè)線(xiàn)程。

            lockedThis->m_task_or_terminate_workers_event.notify_one();

            return true;

            }

            else

            {

            return false;

            }

            }

            //thread_poolschedule函數(shù)

            bool schedule(task_type const & task)

            {

            return m_core->schedule(task);

            }

            //pool_coreschedule函數(shù)(和execute_task函數(shù)強(qiáng)相關(guān))

            bool schedule(task_type const & task) volatile

            {

            locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

            if(lockedThis->m_scheduler.push(task))

            {

            //task成功入隊(duì)列后,notify_one一個(gè)線(xiàn)程。

            lockedThis->m_task_or_terminate_workers_event.notify_one();

            return true;

            }

            else

            {

            return false;

            }

            }

            // Wait until all tasks are finished.

            tp.wait();

            //pool_corewait函數(shù)

            void wait(size_t const task_threshold = 0) const volatile

            bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile

            wait函數(shù)是一個(gè)阻塞操作,內(nèi)部邏輯實(shí)現(xiàn)使用了一個(gè)條件變量,提供超時(shí)等待方式。

            二、boost線(xiàn)程池使用實(shí)例

            線(xiàn)程池可以減少創(chuàng)建和切換線(xiàn)程的額外開(kāi)銷(xiāo),利用已經(jīng)存在的線(xiàn)程多次循環(huán)執(zhí)行多個(gè)任務(wù)從而提高系統(tǒng)的處理能力,有關(guān)線(xiàn)程池的概念可google搜索,下面將其使用實(shí)例:

            #include <iostream>
            #include <sstream>
            #include <boost/thread/mutex.hpp>
            #include <boost/bind.hpp>

            #include <boost/threadpool.hpp>

            using namespace std;
            using namespace boost::threadpool;


            //
            // Helpers
            boost::mutex m_io_monitor;

            void print(string text)
            {
            boost::mutex::scoped_lock lock(m_io_monitor);//每個(gè)線(xiàn)程使用全局互斥來(lái)保證每次只有一個(gè)線(xiàn)程執(zhí)行
            cout << text;
            }

            template<typename T>
            string to_string(T const & value)
            {
            ostringstream ost;
            ost << value;
            ost.flush();
            return ost.str();
            }

            //
            // An example task functions
            void task_1()
            {
            print(" task_1()/n");
            }

            void task_2()
            {
            print(" task_2()/n");
            }

            void task_3()
            {
            print(" task_3()/n");
            }

            int task_4()
            {
            print(" task_4()/n");
            return 4;
            }

            void task_with_parameter(int value)
            {
            print(" task_with_parameter(" + to_string(value) + ")/n");
            }

            int loops = 0;
            bool looped_task()
            {
            print(" looped_task()/n");
            return ++loops < 5;
            }


            int task_int()
            {
            print(" task_int()/n");
            return 23;
            }


            void fifo_pool_test()
            {
            pool tp;

            tp.schedule(&task_1);
            tp.schedule(boost::bind(task_with_parameter, 4));

            if(!tp.empty())
            {
            tp.clear(); // remove all tasks -> no output in this test
            }

            size_t active_threads = tp.active();
            size_t pending_threads = tp.pending();
            size_t total_threads = tp.size();

            size_t dummy = active_threads + pending_threads + total_threads;
            dummy++;

            tp.size_controller().resize(5);
            tp.wait();
            }

            void lifo_pool_test()
            {
            lifo_pool tp;
            tp.size_controller().resize(0);
            schedule(tp, &task_1);
            tp.size_controller().resize(10);
            tp.wait();
            }

            void prio_pool_test()
            {
            prio_pool tp(2);
            schedule(tp, prio_task_func(1, &task_1));
            schedule(tp, prio_task_func(10,&task_2));
            }


            void future_test()
            {
            fifo_pool tp(5);
            future<int> fut = schedule(tp, &task_4);
            int res = fut();
            }


            int main (int , char * const [])
            {
            fifo_pool_test();
            lifo_pool_test();
            prio_pool_test();
            future_test();
            return 0;
            }

            任務(wù)返回值的獲取:

            一般異步調(diào)用中,返回值的獲取有同步獲取和異步獲取兩種形式。

            同步獲取返回值:

            int task_int_23()
            {
            cout<<"task_int_23()/n";
            return 23;
            }

            future<int> res = schedule(tp, &task_int_23);
            res.wait();

            cout<<"get res value:"<<res.get()<<endl;

            異步獲取返回值:

            不知道是設(shè)計(jì)者就不打算使用異步回調(diào)獲取返回值還是我看的不夠仔細(xì),異步獲取返回值的方式還真沒(méi)有找著,只好自己簡(jiǎn)單的寫(xiě)了一個(gè)回調(diào)的仿函數(shù)來(lái)實(shí)現(xiàn)異步返回值的獲取。

            //R為任務(wù)函數(shù)的返回值類(lèi)型
            template
            <class R>
            class callback_task
            {
            typedef boost::function<void (R)> callback;
            typedef boost::function<R ()> function;

            private:
            callback c_;
            function f_;

            public:
            //F:
            任務(wù)執(zhí)行函數(shù)
            C:結(jié)果回調(diào)函數(shù)
            template<class F,class C>
            callback_task(F f,C c)
            {
            f_ = f;
            c_ = c;
            }

            void operator()()
            {
            c_(f_());
            }
            };

            通過(guò)這個(gè)對(duì)象可以很容易的實(shí)現(xiàn)異步結(jié)果的回調(diào)。

            //task_int_23的結(jié)果回調(diào)函數(shù)
            void callback(int k)
            {
            cout<<"get callback value:"<<k<<endl;
            }

            //通過(guò)回調(diào)的形式獲取任務(wù)的返回值
            tp.schedule(callback_task<int>(&task_int_23,&callback));

            日本精品久久久久中文字幕8| 久久综合香蕉国产蜜臀AV| 国产午夜免费高清久久影院| 国产亚洲美女精品久久久久狼| 国产精品美女久久久免费| 久久丫忘忧草产品| 精品久久久久久亚洲| 亚洲精品无码久久不卡| 97r久久精品国产99国产精| 欧美久久久久久午夜精品| 伊人久久大香线蕉av一区| 国产成人精品久久综合| 无码国产69精品久久久久网站| 精品国产综合区久久久久久| 亚洲人成精品久久久久| 久久精品国产亚洲精品| 久久96国产精品久久久| 久久天天躁狠狠躁夜夜不卡| 爱做久久久久久| 国产精品久久久福利| 国色天香久久久久久久小说| 久久久久久国产a免费观看不卡| 国内精品久久久久影院一蜜桃| 国产精品久久久久久久久久影院| 九九久久精品无码专区| 久久精品嫩草影院| 97久久超碰国产精品2021| 无码精品久久久久久人妻中字| 久久久久久久久久久精品尤物| 亚洲美日韩Av中文字幕无码久久久妻妇| 中文字幕亚洲综合久久2| 99久久www免费人成精品| 嫩草影院久久国产精品| 国产一区二区三区久久精品| 国产成人久久AV免费| 久久精品国产亚洲av高清漫画| 亚洲欧美伊人久久综合一区二区| 久久久一本精品99久久精品66| 久久久久免费看成人影片| 99久久精品国产高清一区二区 | 久久久久久久久久久|