• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            隨筆 - 6  文章 - 11  trackbacks - 0
            <2011年2月>
            303112345
            6789101112
            13141516171819
            20212223242526
            272812345
            6789101112

            常用鏈接

            留言簿(1)

            隨筆檔案

            搜索

            •  

            最新評論

            閱讀排行榜

            評論排行榜

                  在C++中要進(jìn)行并發(fā)處理,不可避免要使用多線程,在傳統(tǒng)的教科書中,大家都是采用最原始的多線程技術(shù),應(yīng)用邏輯和線程并發(fā)策略緊密綁定。
                  在一個典型的服務(wù)器程序中,客戶端的請求往往包含了很多不同的邏輯命令,如在一個線程處理函數(shù)中,需要根據(jù)客戶端的命令代碼處理不同的業(yè)務(wù)邏輯:

            int thrad_main(int cmd_id,char *data){
               switch(cmd_id)
               {
               case 1:
                  ...
                  break;
               case 2:
                  ...
                  break;
               }
            }

               如此這般,業(yè)務(wù)處理邏輯和線程邏輯緊密耦合,這是一種很“丑陋”的代碼。
               如何通過一種優(yōu)雅的方法,分離并發(fā)邏輯和業(yè)務(wù)邏輯,通過通用的并發(fā)框架,業(yè)務(wù)邏輯設(shè)計(jì)者只需要關(guān)心自己的邏輯代碼,交給“線程池”去處理即可,而不需要去關(guān)心如何創(chuàng)建線程,等待線程結(jié)果這些瑣碎的“小事”?

               很簡單,高手出招,必談模式,下面是一種常用的并發(fā)模式,領(lǐng)導(dǎo)者/追隨者線程池模型:

               在一組預(yù)先分配的線程中通過“互斥”鎖來同步線程之間的行為,“線程”們通過“民主選舉”選出一位代表“領(lǐng)導(dǎo)者”站在最前端接收請求,拿到“任務(wù)”后,就從身后的候選“繼任者”中選出一個線程代替自己作為“領(lǐng)導(dǎo)者”,自己則變成“工作者”就跑到后面默默去執(zhí)行處理命令,這個“任務(wù)”是一個包含待處理數(shù)據(jù)和處理邏輯的自說明性任務(wù),也就是說所有的線程不必事先知道怎么處理接收到的任務(wù),因?yàn)樗玫降?#8220;任務(wù)包”中就包含了如何處理任務(wù)的說明。就像一個“代工工廠”的工人一樣,無需任何文化基礎(chǔ),會干活就行。
               那如何實(shí)現(xiàn)自說明任務(wù)呢?我們定義了一種稱為“Method_Request”的對象,它包含一個接口“virtual int call (void) = 0;”,線程池接受的任務(wù)就是這種Method_Request對象的實(shí)例,比如一個通知線程池結(jié)束工作的Method_Request可以定義為如下的類:
            1     class ExitRequest : public ACE_Method_Request
            2     {
            3     public:
            4         virtual int call (void){
            5             return -1;  // Cause exit.
            6         }
            7     };
            8 
                  我們重載call接口,添加處理這個請求的邏輯代碼,由于僅僅實(shí)現(xiàn)通知線程池結(jié)束工作的操作,我們返回一個特殊值“-1”,即可只是線城池:“工作完成了,你趕快洗洗睡吧!”,線程池會檢查Method_Request對象的返回值,如果是“0”就是處理正常完成,繼續(xù)等待下一個任務(wù),如果是“-1”,就關(guān)閉所有線程。

                  再來一個復(fù)雜點(diǎn)的例子,派生的Method_Request不僅有處理邏輯,還包括了需要處理的數(shù)據(jù):
             1 
             2 class M2M_EventRequest : public ACE_Method_Request
             3 {
             4     // Lua解釋器,每個事件使用自己單獨(dú)的腳本上下文
             5     LuaVM::ALEE_LuaService & m_svcs;
             6     ALEE_ScriptList_t & m_cmds;
             7 
             8     // 事件內(nèi)容
             9     std::string m_type_name;
            10     xml_event_t m_xml_event;
            11 
            12     // 調(diào)試信息
            13     DebugInfo_ptr m_debug;
            14 
            15 public:
            16     M2M_EventRequest(
            17         LuaVM::ALEE_LuaService & svcs,
            18         ALEE_ScriptList_t &cmds,
            19         string const & type_name,
            20         xml_event_t event);
            21 
            22     M2M_EventRequest(
            23         LuaVM::ALEE_LuaService & svcs,
            24         ALEE_ScriptList_t &cmds,
            25         string const & type_name,
            26         xml_event_t event,
            27         DebugInfo_ptr debug);
            28 
            29     virtual ~M2M_EventRequest (void);
            30 
            31     virtual int call (void);
            32 };
            33 
                  這個Method_Request的功能是,命令線程池調(diào)用Lua解析器處理一段腳本代碼,詳細(xì)邏輯就不解釋了,僅僅是一個示例,我們的重點(diǎn)在于線程池的實(shí)現(xiàn)。
                  下面就公布這個“萬能線程池的”實(shí)現(xiàn),其實(shí)這是一個基于ACE的線程庫實(shí)現(xiàn)的“領(lǐng)導(dǎo)者/追隨者”模式,我在其基礎(chǔ)上進(jìn)行了改進(jìn),增加了自適應(yīng)功能,可以根據(jù)請求隊(duì)列的負(fù)載,自動調(diào)整線程池中的線程數(shù)目。
                  閑話少說,上代碼,看得懂的童鞋恭喜你內(nèi)力深厚,還望多提寶貴意見,看不懂得小盆友也可以努力學(xué)習(xí),提高自己:
            // LeaderFollower.h
             1 #pragma once
             2 
             3 #include "dllmain.h"
             4 #include <map>
             5 #include <ace/Synch.h>    // ACE_Thread_Mutex
             6 #include <ace/Task.h>    // ACE_Task
             7 
             8 // 線程狀態(tài)
             9 enum LF_Status_t
            10 {
            11     TH_LEADER_ACTIVE,
            12     TH_FOLLOWER,
            13     TH_WORKER,
            14     TH_READY,
            15     TH_STOP,
            16 };
            17 
            18 struct LF_StatusTime_t
            19 {
            20     LF_Status_t    status;
            21     ACE_Time_Value working_tv;
            22     ACE_Time_Value start_time;
            23     ACE_Time_Value stop_time;
            24     ACE_Time_Value work_start;
            25     ACE_Time_Value work_time;
            26 };
            27 
            28 typedef std::map<ACE_thread_t,LF_StatusTime_t>  LF_StatusTimeList_t;
            29 
            30 class LF_Follower;
            31 
            32 // 領(lǐng)導(dǎo)者-追隨者線程池 模式實(shí)現(xiàn)
            33 class CPPXCORBA_API LeaderFollower
            34 {
            35 public:
            36     LeaderFollower(void);
            37     ~LeaderFollower(void);
            38 
            39 protected:
            40     LF_Follower * make_follower(void);
            41     int     become_leader(void);
            42     int     elect_new_leader(void);
            43     bool leader_active(void);
            44     void set_active_leader(ACE_thread_t leader);
            45 
            46 private:
            47     ACE_Unbounded_Queue<LF_Follower*>   m_followers;
            48     ACE_Thread_Mutex                    m_followers_lock;
            49     ACE_Thread_Mutex                    m_leader_lock;
            50     ACE_thread_t                        m_current_leader;
            51 
            52     //////////////////////////////////////////////////////////////////////////
            53     /// 線程池狀態(tài)監(jiān)控
            54 public:
            55     const LF_StatusTimeList_t & get_status(voidconst;
            56     const float get_load_rate(voidconst;
            57 
            58 protected:
            59     void set_status(LF_Status_t status);
            60     void set_worktime(ACE_Time_Value work_time);
            61 
            62 private:
            63     LF_StatusTimeList_t m_status_time_list;
            64     ACE_Thread_Mutex    m_status_lock;
            65 };
            66 

            // LeaderFollower.cpp
              1 #include "stdafx.h"
              2 #include "LeaderFollower.h"
              3 #include "../cppx.core/dllmain.h"
              4 
              5 // 追隨者標(biāo)記
              6 class LF_Follower
              7 {
              8     ACE_Condition<ACE_Thread_Mutex> m_cond;
              9     ACE_thread_t                    m_owner;
             10 
             11 public:
             12     LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) {
             13         m_owner = ACE_Thread::self();
             14     }
             15     int wait(void){
             16         return m_cond.wait();
             17     }
             18     int signal(void){
             19         return m_cond.signal();
             20     }
             21     ACE_thread_t owner(void){
             22         return m_owner;
             23     }
             24 
             25 };
             26 
             27 //////////////////////////////////////////////////////////////////////////
             28 LeaderFollower::LeaderFollower(void) :
             29 m_current_leader(0)
             30 {
             31 }
             32 
             33 LeaderFollower::~LeaderFollower(void)
             34 {
             35 }
             36 
             37 LF_Follower * 
             38 LeaderFollower::make_follower( void )
             39 {
             40     ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0);
             41     
             42     LF_Follower *fw;
             43     ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0);
             44     m_followers.enqueue_tail(fw);
             45     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size()));
             46     return fw;
             47 }
             48 
             49 int 
             50 LeaderFollower::become_leader( void )
             51 {
             52     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
             53 
             54     if( leader_active() && m_current_leader != ACE_Thread::self() ){
             55         while(leader_active()){
             56             set_status(TH_FOLLOWER);
             57             auto_ptr<LF_Follower> fw(make_follower());
             58             fw->wait();         // Wait until told to do so.
             59         }
             60     }
             61 
             62     // Mark yourself as the active leader.
             63     set_active_leader(ACE_Thread::self());
             64     set_status(TH_LEADER_ACTIVE);
             65     //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n")));
             66     return 0;
             67 }
             68 
             69 int 
             70 LeaderFollower::elect_new_leader( void )
             71 {
             72     ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
             73 
             74     set_active_leader(0);
             75 
             76     // Wake up a follower
             77     if!m_followers.is_empty() ){
             78         ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1);
             79 
             80         // Get the old follower.
             81         LF_Follower *fw;
             82         if( m_followers.dequeue_head(fw) != 0 )
             83             return -1;
             84 
             85         //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner()));
             86         return (fw->signal() == 0? 0 : -1;
             87     }
             88 
             89     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n")));
             90     return -1;
             91 }
             92 
             93 bool 
             94 LeaderFollower::leader_active( void )
             95 {
             96     return (m_current_leader != 0);
             97 }
             98 
             99 void 
            100 LeaderFollower::set_active_leader( ACE_thread_t leader )
            101 {
            102     m_current_leader = leader;
            103 }
            104 
            105 void LeaderFollower::set_worktime( ACE_Time_Value work_time )
            106 {
            107     ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
            108     LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
            109     info.working_tv = work_time;
            110 }
            111 
            112 void LeaderFollower::set_status( LF_Status_t status )
            113 {
            114     ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
            115     LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
            116     switch(status)
            117     {
            118     case TH_READY:
            119         info.start_time = ACE_OS::gettimeofday();
            120         break;
            121     case TH_STOP:
            122         info.stop_time = ACE_OS::gettimeofday();
            123         break;
            124     case TH_WORKER:
            125         info.work_start = ACE_OS::gettimeofday();
            126         break;
            127     case TH_LEADER_ACTIVE:
            128     case TH_FOLLOWER:
            129         if( info.status == TH_WORKER )
            130             info.work_time += ACE_OS::gettimeofday() - info.work_start;
            131         break;
            132     }
            133     info.status = status;
            134 }
            135 
            136 const LF_StatusTimeList_t & 
            137 LeaderFollower::get_status( void ) const
            138 {
            139     return m_status_time_list;
            140 }
            141 
            142 const float 
            143 LeaderFollower::get_load_rate( void ) const
            144 {
            145     ACE_Time_Value work_time,run_time;
            146     foreach(const LF_StatusTimeList_t::value_type & info,get_status()){
            147         if( info.second.status != TH_STOP ){
            148             work_time += info.second.work_time;
            149             run_time += ACE_OS::gettimeofday() - info.second.start_time;
            150         }
            151     }
            152     return (float)work_time.usec()/run_time.usec()*100;
            153 }
            154 

            // LF_ThreadPool.h
             1 #pragma once
             2 
             3 #include "LeaderFollower.h"
             4 
             5 #include <ace/Task.h>
             6 #include <ace/Activation_Queue.h>
             7 #include <ace/Method_Request.h>
             8 
             9 class CPPXCORBA_API LF_ThreadPool :
            10     public ACE_Task_Base,
            11     public LeaderFollower
            12 {
            13     class ExitRequest : public ACE_Method_Request
            14     {
            15     public:
            16         virtual int call (void){
            17             return -1;  // Cause exit.
            18         }
            19     };
            20 
            21     bool m_bShutdown;
            22     bool m_bRunning;
            23     ACE_Activation_Queue m_activation_queue_;
            24 
            25     static const size_t ScheduleTime = 10;
            26     static const size_t MinThreadNum = 10;
            27     static const size_t MaxThreadNum = 20;
            28 
            29 public:
            30     LF_ThreadPool(void);
            31     ~LF_ThreadPool(void);
            32 
            33     virtual int svc(void);
            34 
            35     int start_stread_pool( void );
            36     int stop_thread_pool( void );
            37     int post_request( ACE_Method_Request *request );
            38 
            39     int get_queue_load(void){ return m_activation_queue_.method_count(); }
            40     int get_max_thread(void){ return MaxThreadNum; }
            41     int get_min_thread(void){ return MinThreadNum; }
            42 
            43 private:
            44     int _fork_new_thread( void );
            45     int _post_exit_request(void);
            46 };
            47 

            // LF_ThreadPool.cpp
              1 #include "stdafx.h"
              2 #include "LF_ThreadPool.h"
              3 
              4 LF_ThreadPool::LF_ThreadPool(void) :
              5 m_bShutdown(false),
              6 m_bRunning(false)
              7 {
              8 }
              9 
             10 LF_ThreadPool::~LF_ThreadPool(void)
             11 {
             12 }
             13 
             14 int LF_ThreadPool::svc( void )
             15 {
             16     //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count()));
             17 
             18     // 線程開始運(yùn)行
             19     m_bRunning = true;
             20 
             21     set_status(TH_READY);
             22 
             23     while(true){
             24         // Block until this thread is the leader.
             25         become_leader();
             26 
             27         // 設(shè)置線程空閑時(shí)間,空閑線程將會自動退出
             28         ACE_Time_Value tv(ScheduleTime);
             29         tv += ACE_OS::gettimeofday();
             30 
             31         // 從隊(duì)列獲取下一個請求,并獲得所有權(quán)
             32         auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv));
             33         if( request.get() == 0 ){                                               // 長時(shí)間沒有請求,dequeue超時(shí)返回
             34             if( elect_new_leader() == 0 && thr_count() > MinThreadNum )         // 成功選擇新的領(lǐng)導(dǎo)者,且工作線程數(shù)大于最少線程數(shù)
             35                 break;                                                          // 結(jié)束當(dāng)前線程
             36             if( thr_count() < MinThreadNum && thr_count() < MaxThreadNum )      // 工作線程數(shù)小于最少線程數(shù),創(chuàng)建新的線程
             37                 _fork_new_thread();
             38             continue;                                                           // 繼續(xù)擔(dān)當(dāng)領(lǐng)導(dǎo)者(優(yōu)先成為領(lǐng)導(dǎo)者),或返回線程池等待
             39         }
             40 
             41         // Elect a new leader then process the request
             42         if( elect_new_leader() != 0 || thr_count() < MinThreadNum )             // 沒有空余線程可成為領(lǐng)導(dǎo)者,或者線程池容量調(diào)整
             43             if!m_bShutdown )                                                  // 且沒有調(diào)度關(guān)閉
             44                 if( thr_count() < MaxThreadNum )                                // 未達(dá)到線程數(shù)上線
             45                     _fork_new_thread();                                         // 創(chuàng)建新的線程
             46 
             47         // Invoke the method request.
             48         set_status(TH_WORKER);
             49 
             50         ACE_Time_Value tv_start,tv_finish,tv_working;
             51         tv_start = ACE_OS::gettimeofday();
             52 
             53         int result = request->call();
             54 
             55         tv_finish = ACE_OS::gettimeofday();
             56         tv_working = tv_finish - tv_start;
             57         set_worktime(tv_working);
             58 
             59         if( result == -1 ){
             60             if( thr_count() > 1 )                                                // If received a ExitMethod, Notify the next Thread(if exists) to exit too.
             61                 _post_exit_request();
             62             break;
             63         }
             64     }
             65 
             66     // 剩下最后一個線程,線程池停止
             67     if( thr_count() == 1 )
             68         m_bRunning = false;
             69 
             70     set_status(TH_STOP);
             71     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.\t: %d working threads left.\n"),thr_count()-1));
             72     return 0;
             73 }
             74 
             75 int LF_ThreadPool::start_stread_pool( void )
             76 {
             77     m_bShutdown = false;
             78     return activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum);
             79 }
             80 
             81 int LF_ThreadPool::stop_thread_pool( void )
             82 {
             83     // 線程池已停止
             84     if!m_bRunning )
             85         return 0;
             86 
             87     m_bShutdown = true;
             88     _post_exit_request();
             89     return wait();
             90 }
             91 
             92 int LF_ThreadPool::post_request( ACE_Method_Request *request )
             93 {
             94     ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue"));
             95     return m_activation_queue_.enqueue (request);
             96 }
             97 
             98 int LF_ThreadPool::_fork_new_thread( void )
             99 {
            100     return activate(THR_NEW_LWP| THR_JOINABLE,1,1);
            101 }
            102 
            103 int LF_ThreadPool::_post_exit_request( void )
            104 {
            105     return post_request(new ExitRequest);
            106 }
            107 

                  怎么樣?很簡單吧?什么?怎么用?Oh My Lady GaGa!還是告訴你吧:
            1 m_pool.post_request(new M2M_EventRequest(m_lua_svc,m_lua_scripts,type_name,xml_event,*iter));
                  需要線程池出來干活的時(shí)候,創(chuàng)建一個請求對象,扔給他就行了!

                  好了,代碼就是最好的文檔,C++開源社區(qū)給了我成長的土壤,希望能對后來者有所幫助。
                  把這些東西貼出來,是為了整理自己的大腦,免得這些曾經(jīng)頂著熊貓眼熬出來的東西,塵封在茫茫的代碼海洋中,取之于前輩,還之于后人。也希望有更多的高手能夠慷慨布道,壯大我們的C++社區(qū)。
            posted on 2011-02-28 15:46 風(fēng)雷九州 閱讀(4267) 評論(3)  編輯 收藏 引用

            FeedBack:
            # re: 一個基于ACE的負(fù)載自適應(yīng)萬能線程池實(shí)現(xiàn) 2011-02-28 17:25 true
            常見的一種需求:把一個客戶端的處理始終綁定到某一個線程,就是說各個請求之間是有時(shí)序要求的。線程池的自適應(yīng)要考慮一下這個問題  回復(fù)  更多評論
              
            # re: 一個基于ACE的負(fù)載自適應(yīng)萬能線程池實(shí)現(xiàn) 2011-02-28 17:49 風(fēng)雷九州
            @true

            嗯,這個當(dāng)初也有考慮到,是為了線程的上下文緩存盡量少切換,在數(shù)據(jù)處理中為了充分發(fā)揮CPU的緩存性能,還需要考慮線程的優(yōu)先執(zhí)行CPU等。

            這些措施在性能要求十分苛刻的情況下時(shí)必須要考慮的,我的方案目前僅是實(shí)現(xiàn)了客戶端請求與線程的分離,使程序的架構(gòu)更靈活,能夠滿足一般的服務(wù)器并發(fā)性能要求即可。

            @true

            我理解錯了,你說的可能是,需求要求客戶端的請求是有固定的時(shí)序的,但是我的方案并不是用來處理客戶端并發(fā)的,線程池處理的是大量的設(shè)備消息,這些消息通常大量并發(fā)到達(dá),而且相互之間沒有什么關(guān)系,故不需要某個特定的線程來處理。

            設(shè)備的狀態(tài)是保存在單獨(dú)的狀態(tài)服務(wù)中的的,任何一個線程接到處理任務(wù)都能夠處理設(shè)備的狀態(tài)邏輯,多線程之間是通過“讀寫鎖”共享狀態(tài)服務(wù)的。

            除非是考慮到線程執(zhí)行上下文切換的代價(jià)帶來的性能損失,否則邏輯上是不關(guān)心某個請求是被哪個線程處理的。  回復(fù)  更多評論
              
            # re: 一個基于ACE的負(fù)載自適應(yīng)萬能線程池實(shí)現(xiàn) 2011-02-28 22:19 liquanhai
            非常感謝,學(xué)習(xí)了  回復(fù)  更多評論
              

            只有注冊用戶登錄后才能發(fā)表評論。
            網(wǎng)站導(dǎo)航: 博客園   IT新聞   BlogJava   博問   Chat2DB   管理


            99久久亚洲综合精品网站| 久久综合九色综合网站| 日韩精品国产自在久久现线拍 | 国产精品美女久久久久av爽| 久久99精品久久久久久水蜜桃| 久久久艹| 国产成人久久精品激情| 久久九九久精品国产| 亚洲精品乱码久久久久久蜜桃不卡 | 亚洲精品乱码久久久久久蜜桃不卡| 久久青青草原精品国产| 久久国产视频99电影| 精品综合久久久久久97| 国产成人精品久久亚洲| 久久久久久国产精品美女| 国产激情久久久久影院| 东方aⅴ免费观看久久av| 国产精品青草久久久久福利99| 免费久久人人爽人人爽av| 久久久久亚洲av综合波多野结衣| 亚洲人成网站999久久久综合 | 久久久青草青青国产亚洲免观| 久久人人爽人人爽人人片AV不| 久久天堂AV综合合色蜜桃网| 人人狠狠综合久久亚洲高清| 久久精品一区二区国产| 久久国产精品无| 亚洲欧美久久久久9999| 欧美亚洲国产精品久久蜜芽| 久久综合给合久久国产免费| 久久综合色老色| 狠狠人妻久久久久久综合| 久久国产乱子伦精品免费强| 久久AV高清无码| 久久久久人妻一区精品色| 久久婷婷色综合一区二区| 久久精品国产福利国产琪琪 | 久久伊人精品青青草原日本| 99精品伊人久久久大香线蕉| 色综合合久久天天综合绕视看 | 久久国产成人精品麻豆 |