• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            posts - 311, comments - 0, trackbacks - 0, articles - 0
              C++博客 :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

            (搬運工)boost之ThreadPool

            Posted on 2012-07-17 16:24 點點滴滴 閱讀(3569) 評論(0)  編輯 收藏 引用 所屬分類: 02 編程語言

            threadpool是基于boost庫實現的一個線程池子庫,但線程池實現起來不是很復雜。我們從threadpool中又能學到什么東西呢?

            它是基于boost庫實現的,如果大家對boost庫有興趣,看看一個簡單的實現還是可以學到點東西的。

            threadpool基本功能

            1、任務封裝,包括普通任務(task_func)和優先級任務(prio_task_func)。

            2、調度策略,包括fifo_scheduler(先進先出)、lifo_scheduler(后進先出)、prio_scheduler(優先級)。

            3、結束策略,包括wait_for_all_tasks(全部任務等待)、wait_for_active_tasks(激活任務等待)、immediately(立即結束)。

            4、動態修改線程池個數功能。

            5、基于future封裝的異步返回值獲取功能。

            在sorceforge上有一個用boost編寫的線程池。該線程池和boost結合的比較好,并且提供了多種任務執行策略,使用也非常簡單。 下載地址: http://threadpool.sourceforge.net/ 這個線程池不需要編譯,只要在項目中包含其頭文件就可以了。

            一、源代碼分析

            quickstart分析(/threadpool/libs/threadpool/quickstart

            這個例子的代碼很簡單,但已經全部展示了線程池的核心內容,包括建立、調度、同步等操作。

            view plaincopy to clipboardprint?

            // Create fifo thread pool container with two threads.

            pool tp(2);

            // Add some tasks to the pool.

            tp.schedule(&first_task);

            tp.schedule(&second_task);

            // Wait until all tasks are finished.

            tp.wait();

            // Create fifo thread pool container with two threads.

            pool tp(2);

            // Add some tasks to the pool.

            tp.schedule(&first_task);

            tp.schedule(&second_task);

            // Wait until all tasks are finished.

            tp.wait();

            pool的定義具體見pool.hpp,但使用了pimpl模式,核心代碼見pool_core.hpp文件。

            下面是pool的定義

            typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;

            typedef fifo_pool pool;

            從上面可以知道,pool實際就是fifo_pool,從模板參數可以看到,使用了fifo_schedulerwait_for_all_tasks

            對于線程池有點理解的都知道,一般都是那幾樣東西,線程的封裝,條件變量,隊列數據結構。

            所以簡單的能做的很簡單,復雜的的就看你的策略需求了。

            對基于boost庫的threadpool子庫來說,上面的三樣東西都是現成的,線程封裝和條件變量直接使用thread子庫就行,隊列使用stl的標準容器。

            task_adaptors.hpp

            對線程任務的封裝,所謂task,我們可以理解成需要運行的函數。

            threadpool最大限度的使用了functionbind功能來封裝函數,這點和thread子庫類似。

            文件中涉及的內容主要有三個:task_funcprio_task_funclooped_task_func

            對普通task的封裝

            typedef function0<void> task_func;

            如果對bindfunction熟悉的應該很好理解。

            對優先級任務的封裝

            class prio_task_func

            這個類很簡單,重載了兩個方法,

            operator()是仿函數的用法,

            operator<是用于優先級比較使用的,用于stl容器的元素比較。

            size_policies.hpp

            size的封裝,包括empty_controllerresize_controllerstatic_size

            shutdown_policies.hpp

            對線程池結束的策略封裝,包括wait_for_all_taskswait_for_active_tasksimmediately

            這幾個類很簡單,具體操作封裝在pool中。

            線程池運行過程中,包括隊列中等待的task,線程正在運行的task

            所以結束的時候,對這些task的策略操作是有選擇的。

            scheduling_policies.hpp

            對任務調度測試的封裝,包括fifo_schedulerlifo_schedulerprio_scheduler

            實際上,這三個類的相似程度很高,大家可能更喜歡用繼承和虛函數實現。

            前面說到保存task的隊列數據結構,在這里就看的很清楚了。

            fifolifo使用的是std::dequeprio使用的是std::priority_queue,其他部分代碼沒什么好說的了。

            pool_adaptors.hpp

            對全局schedule函數的幾種封裝。

            future.hpp

            好像thread子庫也有future,但不清楚是否是一樣的內容。

            threadpoolfuture是為了封裝異步函數調用返回值實現的。

            簡單點理解,就是schedule任務的時候,把一個指針在兩者間綁定起來,后面就可以通過future來獲取返回值了。

            當然,獲取返回值的過程應該是阻塞的,任務未完成時只能wait

            locking_ptr.hpp

            LockingPtr的簡單封裝,具體可googlevolatile - Multithreaded Programmer's Best Friend》。

            threadpool大量使用了volatile關鍵字,所以需要LockingPtr保護。

            scope_guard.hpp

            對函數對象的封裝,利用C++析構函數時調用一個在構造函數時綁定的函數對象。

            worker_thread.hpp

            對工作線程的封裝,這個封裝不是指底層線程api封裝,因為這部分是由boostthread子庫提供的。

            封裝針對的是循環執行task的邏輯函數(線程跑起來就loop run某個函數,從隊列中獲取task執行,空閑時等待。)

            我們重點看的是runcreate_and_attach

            這兩個函數連起來看,就很清楚了,create_and_attach通過bind方式生成一個thread執行run方法。

            run方法中的這條語句就是一個簡單的loop操作,

            while(m_pool->execute_task()) {}

            所以,當execute_task返回值為false時,run函數就結束了,bind該函數的thread也就結束了。

            ok,來到這里,有必要簡單的把整個調用過程說明一下。

            // Create fifo thread pool container with two threads.

            pool tp(2);

            該操作會調用pool的構造函數

            view plaincopy to clipboardprint?

            thread_pool(size_t initial_threads = 0)

            : m_core(new pool_core_type)

            , m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

            {

            size_policy_type::init(*m_core, initial_threads);

            }

            thread_pool(size_t initial_threads = 0)

            : m_core(new pool_core_type)

            , m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

            {

            size_policy_type::init(*m_core, initial_threads);

            }

            由于pimpl模式,所以所有代碼都封裝在m_core內實現的。

            pool默認的線程個數為0,通過size_policy_type::init來初始化。

            size_policy_type是一個模板參數,pool對應的是fifo,所以也就是static_size類型了。

            //static_size類的init函數

            view plaincopy to clipboardprint?

            static void init(Pool& pool, size_t const worker_count)

            {

            pool.resize(worker_count);

            }

            static void init(Pool& pool, size_t const worker_count)

            {

            pool.resize(worker_count);

            }

            //pool_coreresize函數

            這個函數有點長,主要是做動態配置線程個數的邏輯操作,create_and_attach也是在這里調用的。

            view plaincopy to clipboardprint?

            //worker_threadcreate_and_attach函數

            static void create_and_attach(shared_ptr<pool_type> const & pool)

            {

            shared_ptr<worker_thread> worker(new worker_thread(pool));

            if(worker)

            {

            //run是線程的loop函數

            worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

            }

            }

            //worker_threadcreate_and_attach函數

            static void create_and_attach(shared_ptr<pool_type> const & pool)

            {

            shared_ptr<worker_thread> worker(new worker_thread(pool));

            if(worker)

            {

            //run是線程的loop函數

            worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

            }

            }

            view plaincopy to clipboardprint?

            //worker_threadrun函數

            void run()

            {

            scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

            while(m_pool->execute_task()) {} //loop直到返回值為false

            notify_exception.disable();

            m_pool->worker_destructed(this->shared_from_this());

            }

            //worker_threadrun函數

            void run()

            {

            scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

            while(m_pool->execute_task()) {} //loop直到返回值為false

            notify_exception.disable();

            m_pool->worker_destructed(this->shared_from_this());

            }

            //pool_coreexecute_task函數

            這個函數有點長,簡單點說,就是從隊列中獲取task然后執行,如果隊列為空,則線程需要wait操作。

            由于threadpool支持動態resize線程個數,從該函數我們也是可以看出來是如何做到的。

            view plaincopy to clipboardprint?

            // decrease number of threads if necessary

            if(m_worker_count > m_target_worker_count)

            {

            return false; // terminate worker

            }

            // decrease number of threads if necessary

            if(m_worker_count > m_target_worker_count)

            {

            return false; // terminate worker

            }

            pool內部使用了多個整數來記錄現在個數,譬如m_worker_countm_target_worker_count

            m_worker_count是當前激活運行中的線程個數。

            m_target_worker_count是最新動態配置的線程個數。

            當個數不匹配時,通過返回false方式結束線程。

            // Add some tasks to the pool.

            tp.schedule(&first_task);

            view plaincopy to clipboardprint?

            //thread_poolschedule函數

            bool schedule(task_type const & task)

            {

            return m_core->schedule(task);

            }

            //pool_coreschedule函數(和execute_task函數強相關)

            bool schedule(task_type const & task) volatile

            {

            locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

            if(lockedThis->m_scheduler.push(task))

            {

            //task成功入隊列后,notify_one一個線程。

            lockedThis->m_task_or_terminate_workers_event.notify_one();

            return true;

            }

            else

            {

            return false;

            }

            }

            //thread_poolschedule函數

            bool schedule(task_type const & task)

            {

            return m_core->schedule(task);

            }

            //pool_coreschedule函數(和execute_task函數強相關)

            bool schedule(task_type const & task) volatile

            {

            locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

            if(lockedThis->m_scheduler.push(task))

            {

            //task成功入隊列后,notify_one一個線程。

            lockedThis->m_task_or_terminate_workers_event.notify_one();

            return true;

            }

            else

            {

            return false;

            }

            }

            // Wait until all tasks are finished.

            tp.wait();

            //pool_corewait函數

            void wait(size_t const task_threshold = 0) const volatile

            bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile

            wait函數是一個阻塞操作,內部邏輯實現使用了一個條件變量,提供超時等待方式。

            二、boost線程池使用實例

            線程池可以減少創建和切換線程的額外開銷,利用已經存在的線程多次循環執行多個任務從而提高系統的處理能力,有關線程池的概念可google搜索,下面將其使用實例:

            #include <iostream>
            #include <sstream>
            #include <boost/thread/mutex.hpp>
            #include <boost/bind.hpp>

            #include <boost/threadpool.hpp>

            using namespace std;
            using namespace boost::threadpool;


            //
            // Helpers
            boost::mutex m_io_monitor;

            void print(string text)
            {
            boost::mutex::scoped_lock lock(m_io_monitor);//每個線程使用全局互斥來保證每次只有一個線程執行
            cout << text;
            }

            template<typename T>
            string to_string(T const & value)
            {
            ostringstream ost;
            ost << value;
            ost.flush();
            return ost.str();
            }

            //
            // An example task functions
            void task_1()
            {
            print(" task_1()/n");
            }

            void task_2()
            {
            print(" task_2()/n");
            }

            void task_3()
            {
            print(" task_3()/n");
            }

            int task_4()
            {
            print(" task_4()/n");
            return 4;
            }

            void task_with_parameter(int value)
            {
            print(" task_with_parameter(" + to_string(value) + ")/n");
            }

            int loops = 0;
            bool looped_task()
            {
            print(" looped_task()/n");
            return ++loops < 5;
            }


            int task_int()
            {
            print(" task_int()/n");
            return 23;
            }


            void fifo_pool_test()
            {
            pool tp;

            tp.schedule(&task_1);
            tp.schedule(boost::bind(task_with_parameter, 4));

            if(!tp.empty())
            {
            tp.clear(); // remove all tasks -> no output in this test
            }

            size_t active_threads = tp.active();
            size_t pending_threads = tp.pending();
            size_t total_threads = tp.size();

            size_t dummy = active_threads + pending_threads + total_threads;
            dummy++;

            tp.size_controller().resize(5);
            tp.wait();
            }

            void lifo_pool_test()
            {
            lifo_pool tp;
            tp.size_controller().resize(0);
            schedule(tp, &task_1);
            tp.size_controller().resize(10);
            tp.wait();
            }

            void prio_pool_test()
            {
            prio_pool tp(2);
            schedule(tp, prio_task_func(1, &task_1));
            schedule(tp, prio_task_func(10,&task_2));
            }


            void future_test()
            {
            fifo_pool tp(5);
            future<int> fut = schedule(tp, &task_4);
            int res = fut();
            }


            int main (int , char * const [])
            {
            fifo_pool_test();
            lifo_pool_test();
            prio_pool_test();
            future_test();
            return 0;
            }

            任務返回值的獲取:

            一般異步調用中,返回值的獲取有同步獲取和異步獲取兩種形式。

            同步獲取返回值:

            int task_int_23()
            {
            cout<<"task_int_23()/n";
            return 23;
            }

            future<int> res = schedule(tp, &task_int_23);
            res.wait();

            cout<<"get res value:"<<res.get()<<endl;

            異步獲取返回值:

            不知道是設計者就不打算使用異步回調獲取返回值還是我看的不夠仔細,異步獲取返回值的方式還真沒有找著,只好自己簡單的寫了一個回調的仿函數來實現異步返回值的獲取。

            //R為任務函數的返回值類型
            template
            <class R>
            class callback_task
            {
            typedef boost::function<void (R)> callback;
            typedef boost::function<R ()> function;

            private:
            callback c_;
            function f_;

            public:
            //F:
            任務執行函數
            C:結果回調函數
            template<class F,class C>
            callback_task(F f,C c)
            {
            f_ = f;
            c_ = c;
            }

            void operator()()
            {
            c_(f_());
            }
            };

            通過這個對象可以很容易的實現異步結果的回調。

            //task_int_23的結果回調函數
            void callback(int k)
            {
            cout<<"get callback value:"<<k<<endl;
            }

            //通過回調的形式獲取任務的返回值
            tp.schedule(callback_task<int>(&task_int_23,&callback));

            少妇久久久久久被弄高潮| 日本亚洲色大成网站WWW久久| 四虎国产精品成人免费久久| 精品欧美一区二区三区久久久| 人人狠狠综合久久亚洲88| AV无码久久久久不卡网站下载| 人妻无码中文久久久久专区| 人妻少妇久久中文字幕| 97精品国产97久久久久久免费 | 一本久久a久久精品综合香蕉| 国产免费久久精品99久久| 亚洲国产成人久久精品动漫| 日韩精品久久久久久| 久久亚洲精品视频| 99久久精品免费| 国产精品VIDEOSSEX久久发布| 久久国产精品无码网站| 久久久久久一区国产精品| 内射无码专区久久亚洲| 伊人久久大香线蕉成人| 伊人久久大香线蕉综合5g| 2019久久久高清456| 日韩久久久久久中文人妻| 精品无码久久久久久午夜| 国产欧美一区二区久久| 国产真实乱对白精彩久久| 久久国产视频网| 久久成人小视频| 久久人人爽人人爽人人AV东京热| 久久成人国产精品| 88久久精品无码一区二区毛片| 久久久久久久综合综合狠狠| 久久天天躁狠狠躁夜夜2020一| 人妻无码αv中文字幕久久琪琪布| 99国产欧美久久久精品蜜芽| 青青青伊人色综合久久| 人妻丰满?V无码久久不卡| 国产成人精品综合久久久久| 国产∨亚洲V天堂无码久久久| 99久久夜色精品国产网站| 久久国产欧美日韩精品免费|