在一個典型的服務(wù)器程序中,客戶端的請求往往包含了很多不同的邏輯命令,如在一個線程處理函數(shù)中,需要根據(jù)客戶端的命令代碼處理不同的業(yè)務(wù)邏輯:
int thrad_main(int cmd_id,char *data){
switch(cmd_id)
{
case 1:
...
break;
case 2:
...
break;
}
}
如此這般,業(yè)務(wù)處理邏輯和線程邏輯緊密耦合,這是一種很“丑陋”的代碼。
如何通過一種優(yōu)雅的方法,分離并發(fā)邏輯和業(yè)務(wù)邏輯,通過通用的并發(fā)框架,業(yè)務(wù)邏輯設(shè)計(jì)者只需要關(guān)心自己的邏輯代碼,交給“線程池”去處理即可,而不需要去關(guān)心如何創(chuàng)建線程,等待線程結(jié)果這些瑣碎的“小事”?
很簡單,高手出招,必談模式,下面是一種常用的并發(fā)模式,領(lǐng)導(dǎo)者/追隨者線程池模型:
在一組預(yù)先分配的線程中通過“互斥”鎖來同步線程之間的行為,“線程”們通過“民主選舉”選出一位代表“領(lǐng)導(dǎo)者”站在最前端接收請求,拿到“任務(wù)”后,就從身后的候選“繼任者”中選出一個線程代替自己作為“領(lǐng)導(dǎo)者”,自己則變成“工作者”就跑到后面默默去執(zhí)行處理命令,這個“任務(wù)”是一個包含待處理數(shù)據(jù)和處理邏輯的自說明性任務(wù),也就是說所有的線程不必事先知道怎么處理接收到的任務(wù),因?yàn)樗玫降?#8220;任務(wù)包”中就包含了如何處理任務(wù)的說明。就像一個“代工工廠”的工人一樣,無需任何文化基礎(chǔ),會干活就行。
那如何實(shí)現(xiàn)自說明任務(wù)呢?我們定義了一種稱為“Method_Request”的對象,它包含一個接口“virtual int call (void) = 0;”,線程池接受的任務(wù)就是這種Method_Request對象的實(shí)例,比如一個通知線程池結(jié)束工作的Method_Request可以定義為如下的類:
1 class ExitRequest : public ACE_Method_Request
2 {
3 public:
4 virtual int call (void){
5 return -1; // Cause exit.
6 }
7 };
8
我們重載call接口,添加處理這個請求的邏輯代碼,由于僅僅實(shí)現(xiàn)通知線程池結(jié)束工作的操作,我們返回一個特殊值“-1”,即可只是線城池:“工作完成了,你趕快洗洗睡吧!”,線程池會檢查Method_Request對象的返回值,如果是“0”就是處理正常完成,繼續(xù)等待下一個任務(wù),如果是“-1”,就關(guān)閉所有線程。2 {
3 public:
4 virtual int call (void){
5 return -1; // Cause exit.
6 }
7 };
8
再來一個復(fù)雜點(diǎn)的例子,派生的Method_Request不僅有處理邏輯,還包括了需要處理的數(shù)據(jù):
1
2 class M2M_EventRequest : public ACE_Method_Request
3 {
4 // Lua解釋器,每個事件使用自己單獨(dú)的腳本上下文
5 LuaVM::ALEE_LuaService & m_svcs;
6 ALEE_ScriptList_t & m_cmds;
7
8 // 事件內(nèi)容
9 std::string m_type_name;
10 xml_event_t m_xml_event;
11
12 // 調(diào)試信息
13 DebugInfo_ptr m_debug;
14
15 public:
16 M2M_EventRequest(
17 LuaVM::ALEE_LuaService & svcs,
18 ALEE_ScriptList_t &cmds,
19 string const & type_name,
20 xml_event_t event);
21
22 M2M_EventRequest(
23 LuaVM::ALEE_LuaService & svcs,
24 ALEE_ScriptList_t &cmds,
25 string const & type_name,
26 xml_event_t event,
27 DebugInfo_ptr debug);
28
29 virtual ~M2M_EventRequest (void);
30
31 virtual int call (void);
32 };
33
這個Method_Request的功能是,命令線程池調(diào)用Lua解析器處理一段腳本代碼,詳細(xì)邏輯就不解釋了,僅僅是一個示例,我們的重點(diǎn)在于線程池的實(shí)現(xiàn)。2 class M2M_EventRequest : public ACE_Method_Request
3 {
4 // Lua解釋器,每個事件使用自己單獨(dú)的腳本上下文
5 LuaVM::ALEE_LuaService & m_svcs;
6 ALEE_ScriptList_t & m_cmds;
7
8 // 事件內(nèi)容
9 std::string m_type_name;
10 xml_event_t m_xml_event;
11
12 // 調(diào)試信息
13 DebugInfo_ptr m_debug;
14
15 public:
16 M2M_EventRequest(
17 LuaVM::ALEE_LuaService & svcs,
18 ALEE_ScriptList_t &cmds,
19 string const & type_name,
20 xml_event_t event);
21
22 M2M_EventRequest(
23 LuaVM::ALEE_LuaService & svcs,
24 ALEE_ScriptList_t &cmds,
25 string const & type_name,
26 xml_event_t event,
27 DebugInfo_ptr debug);
28
29 virtual ~M2M_EventRequest (void);
30
31 virtual int call (void);
32 };
33
下面就公布這個“萬能線程池的”實(shí)現(xiàn),其實(shí)這是一個基于ACE的線程庫實(shí)現(xiàn)的“領(lǐng)導(dǎo)者/追隨者”模式,我在其基礎(chǔ)上進(jìn)行了改進(jìn),增加了自適應(yīng)功能,可以根據(jù)請求隊(duì)列的負(fù)載,自動調(diào)整線程池中的線程數(shù)目。
閑話少說,上代碼,看得懂的童鞋恭喜你內(nèi)力深厚,還望多提寶貴意見,看不懂得小盆友也可以努力學(xué)習(xí),提高自己:
// LeaderFollower.h
1 #pragma once
2
3 #include "dllmain.h"
4 #include <map>
5 #include <ace/Synch.h> // ACE_Thread_Mutex
6 #include <ace/Task.h> // ACE_Task
7
8 // 線程狀態(tài)
9 enum LF_Status_t
10 {
11 TH_LEADER_ACTIVE,
12 TH_FOLLOWER,
13 TH_WORKER,
14 TH_READY,
15 TH_STOP,
16 };
17
18 struct LF_StatusTime_t
19 {
20 LF_Status_t status;
21 ACE_Time_Value working_tv;
22 ACE_Time_Value start_time;
23 ACE_Time_Value stop_time;
24 ACE_Time_Value work_start;
25 ACE_Time_Value work_time;
26 };
27
28 typedef std::map<ACE_thread_t,LF_StatusTime_t> LF_StatusTimeList_t;
29
30 class LF_Follower;
31
32 // 領(lǐng)導(dǎo)者-追隨者線程池 模式實(shí)現(xiàn)
33 class CPPXCORBA_API LeaderFollower
34 {
35 public:
36 LeaderFollower(void);
37 ~LeaderFollower(void);
38
39 protected:
40 LF_Follower * make_follower(void);
41 int become_leader(void);
42 int elect_new_leader(void);
43 bool leader_active(void);
44 void set_active_leader(ACE_thread_t leader);
45
46 private:
47 ACE_Unbounded_Queue<LF_Follower*> m_followers;
48 ACE_Thread_Mutex m_followers_lock;
49 ACE_Thread_Mutex m_leader_lock;
50 ACE_thread_t m_current_leader;
51
52 //////////////////////////////////////////////////////////////////////////
53 /// 線程池狀態(tài)監(jiān)控
54 public:
55 const LF_StatusTimeList_t & get_status(void) const;
56 const float get_load_rate(void) const;
57
58 protected:
59 void set_status(LF_Status_t status);
60 void set_worktime(ACE_Time_Value work_time);
61
62 private:
63 LF_StatusTimeList_t m_status_time_list;
64 ACE_Thread_Mutex m_status_lock;
65 };
66
2
3 #include "dllmain.h"
4 #include <map>
5 #include <ace/Synch.h> // ACE_Thread_Mutex
6 #include <ace/Task.h> // ACE_Task
7
8 // 線程狀態(tài)
9 enum LF_Status_t
10 {
11 TH_LEADER_ACTIVE,
12 TH_FOLLOWER,
13 TH_WORKER,
14 TH_READY,
15 TH_STOP,
16 };
17
18 struct LF_StatusTime_t
19 {
20 LF_Status_t status;
21 ACE_Time_Value working_tv;
22 ACE_Time_Value start_time;
23 ACE_Time_Value stop_time;
24 ACE_Time_Value work_start;
25 ACE_Time_Value work_time;
26 };
27
28 typedef std::map<ACE_thread_t,LF_StatusTime_t> LF_StatusTimeList_t;
29
30 class LF_Follower;
31
32 // 領(lǐng)導(dǎo)者-追隨者線程池 模式實(shí)現(xiàn)
33 class CPPXCORBA_API LeaderFollower
34 {
35 public:
36 LeaderFollower(void);
37 ~LeaderFollower(void);
38
39 protected:
40 LF_Follower * make_follower(void);
41 int become_leader(void);
42 int elect_new_leader(void);
43 bool leader_active(void);
44 void set_active_leader(ACE_thread_t leader);
45
46 private:
47 ACE_Unbounded_Queue<LF_Follower*> m_followers;
48 ACE_Thread_Mutex m_followers_lock;
49 ACE_Thread_Mutex m_leader_lock;
50 ACE_thread_t m_current_leader;
51
52 //////////////////////////////////////////////////////////////////////////
53 /// 線程池狀態(tài)監(jiān)控
54 public:
55 const LF_StatusTimeList_t & get_status(void) const;
56 const float get_load_rate(void) const;
57
58 protected:
59 void set_status(LF_Status_t status);
60 void set_worktime(ACE_Time_Value work_time);
61
62 private:
63 LF_StatusTimeList_t m_status_time_list;
64 ACE_Thread_Mutex m_status_lock;
65 };
66
// LeaderFollower.cpp
1 #include "stdafx.h"
2 #include "LeaderFollower.h"
3 #include "../cppx.core/dllmain.h"
4
5 // 追隨者標(biāo)記
6 class LF_Follower
7 {
8 ACE_Condition<ACE_Thread_Mutex> m_cond;
9 ACE_thread_t m_owner;
10
11 public:
12 LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) {
13 m_owner = ACE_Thread::self();
14 }
15 int wait(void){
16 return m_cond.wait();
17 }
18 int signal(void){
19 return m_cond.signal();
20 }
21 ACE_thread_t owner(void){
22 return m_owner;
23 }
24
25 };
26
27 //////////////////////////////////////////////////////////////////////////
28 LeaderFollower::LeaderFollower(void) :
29 m_current_leader(0)
30 {
31 }
32
33 LeaderFollower::~LeaderFollower(void)
34 {
35 }
36
37 LF_Follower *
38 LeaderFollower::make_follower( void )
39 {
40 ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0);
41
42 LF_Follower *fw;
43 ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0);
44 m_followers.enqueue_tail(fw);
45 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size()));
46 return fw;
47 }
48
49 int
50 LeaderFollower::become_leader( void )
51 {
52 ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
53
54 if( leader_active() && m_current_leader != ACE_Thread::self() ){
55 while(leader_active()){
56 set_status(TH_FOLLOWER);
57 auto_ptr<LF_Follower> fw(make_follower());
58 fw->wait(); // Wait until told to do so.
59 }
60 }
61
62 // Mark yourself as the active leader.
63 set_active_leader(ACE_Thread::self());
64 set_status(TH_LEADER_ACTIVE);
65 //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n")));
66 return 0;
67 }
68
69 int
70 LeaderFollower::elect_new_leader( void )
71 {
72 ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
73
74 set_active_leader(0);
75
76 // Wake up a follower
77 if( !m_followers.is_empty() ){
78 ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1);
79
80 // Get the old follower.
81 LF_Follower *fw;
82 if( m_followers.dequeue_head(fw) != 0 )
83 return -1;
84
85 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner()));
86 return (fw->signal() == 0) ? 0 : -1;
87 }
88
89 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n")));
90 return -1;
91 }
92
93 bool
94 LeaderFollower::leader_active( void )
95 {
96 return (m_current_leader != 0);
97 }
98
99 void
100 LeaderFollower::set_active_leader( ACE_thread_t leader )
101 {
102 m_current_leader = leader;
103 }
104
105 void LeaderFollower::set_worktime( ACE_Time_Value work_time )
106 {
107 ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
108 LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
109 info.working_tv = work_time;
110 }
111
112 void LeaderFollower::set_status( LF_Status_t status )
113 {
114 ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
115 LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
116 switch(status)
117 {
118 case TH_READY:
119 info.start_time = ACE_OS::gettimeofday();
120 break;
121 case TH_STOP:
122 info.stop_time = ACE_OS::gettimeofday();
123 break;
124 case TH_WORKER:
125 info.work_start = ACE_OS::gettimeofday();
126 break;
127 case TH_LEADER_ACTIVE:
128 case TH_FOLLOWER:
129 if( info.status == TH_WORKER )
130 info.work_time += ACE_OS::gettimeofday() - info.work_start;
131 break;
132 }
133 info.status = status;
134 }
135
136 const LF_StatusTimeList_t &
137 LeaderFollower::get_status( void ) const
138 {
139 return m_status_time_list;
140 }
141
142 const float
143 LeaderFollower::get_load_rate( void ) const
144 {
145 ACE_Time_Value work_time,run_time;
146 foreach(const LF_StatusTimeList_t::value_type & info,get_status()){
147 if( info.second.status != TH_STOP ){
148 work_time += info.second.work_time;
149 run_time += ACE_OS::gettimeofday() - info.second.start_time;
150 }
151 }
152 return (float)work_time.usec()/run_time.usec()*100;
153 }
154
2 #include "LeaderFollower.h"
3 #include "../cppx.core/dllmain.h"
4
5 // 追隨者標(biāo)記
6 class LF_Follower
7 {
8 ACE_Condition<ACE_Thread_Mutex> m_cond;
9 ACE_thread_t m_owner;
10
11 public:
12 LF_Follower(ACE_Thread_Mutex &leader_lock) : m_cond(leader_lock) {
13 m_owner = ACE_Thread::self();
14 }
15 int wait(void){
16 return m_cond.wait();
17 }
18 int signal(void){
19 return m_cond.signal();
20 }
21 ACE_thread_t owner(void){
22 return m_owner;
23 }
24
25 };
26
27 //////////////////////////////////////////////////////////////////////////
28 LeaderFollower::LeaderFollower(void) :
29 m_current_leader(0)
30 {
31 }
32
33 LeaderFollower::~LeaderFollower(void)
34 {
35 }
36
37 LF_Follower *
38 LeaderFollower::make_follower( void )
39 {
40 ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, 0);
41
42 LF_Follower *fw;
43 ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock), 0);
44 m_followers.enqueue_tail(fw);
45 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size()));
46 return fw;
47 }
48
49 int
50 LeaderFollower::become_leader( void )
51 {
52 ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
53
54 if( leader_active() && m_current_leader != ACE_Thread::self() ){
55 while(leader_active()){
56 set_status(TH_FOLLOWER);
57 auto_ptr<LF_Follower> fw(make_follower());
58 fw->wait(); // Wait until told to do so.
59 }
60 }
61
62 // Mark yourself as the active leader.
63 set_active_leader(ACE_Thread::self());
64 set_status(TH_LEADER_ACTIVE);
65 //ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n")));
66 return 0;
67 }
68
69 int
70 LeaderFollower::elect_new_leader( void )
71 {
72 ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock, -1);
73
74 set_active_leader(0);
75
76 // Wake up a follower
77 if( !m_followers.is_empty() ){
78 ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock, -1);
79
80 // Get the old follower.
81 LF_Follower *fw;
82 if( m_followers.dequeue_head(fw) != 0 )
83 return -1;
84
85 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner()));
86 return (fw->signal() == 0) ? 0 : -1;
87 }
88
89 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n")));
90 return -1;
91 }
92
93 bool
94 LeaderFollower::leader_active( void )
95 {
96 return (m_current_leader != 0);
97 }
98
99 void
100 LeaderFollower::set_active_leader( ACE_thread_t leader )
101 {
102 m_current_leader = leader;
103 }
104
105 void LeaderFollower::set_worktime( ACE_Time_Value work_time )
106 {
107 ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
108 LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
109 info.working_tv = work_time;
110 }
111
112 void LeaderFollower::set_status( LF_Status_t status )
113 {
114 ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
115 LF_StatusTime_t & info = m_status_time_list[ACE_Thread::self()];
116 switch(status)
117 {
118 case TH_READY:
119 info.start_time = ACE_OS::gettimeofday();
120 break;
121 case TH_STOP:
122 info.stop_time = ACE_OS::gettimeofday();
123 break;
124 case TH_WORKER:
125 info.work_start = ACE_OS::gettimeofday();
126 break;
127 case TH_LEADER_ACTIVE:
128 case TH_FOLLOWER:
129 if( info.status == TH_WORKER )
130 info.work_time += ACE_OS::gettimeofday() - info.work_start;
131 break;
132 }
133 info.status = status;
134 }
135
136 const LF_StatusTimeList_t &
137 LeaderFollower::get_status( void ) const
138 {
139 return m_status_time_list;
140 }
141
142 const float
143 LeaderFollower::get_load_rate( void ) const
144 {
145 ACE_Time_Value work_time,run_time;
146 foreach(const LF_StatusTimeList_t::value_type & info,get_status()){
147 if( info.second.status != TH_STOP ){
148 work_time += info.second.work_time;
149 run_time += ACE_OS::gettimeofday() - info.second.start_time;
150 }
151 }
152 return (float)work_time.usec()/run_time.usec()*100;
153 }
154
// LF_ThreadPool.h
1 #pragma once
2
3 #include "LeaderFollower.h"
4
5 #include <ace/Task.h>
6 #include <ace/Activation_Queue.h>
7 #include <ace/Method_Request.h>
8
9 class CPPXCORBA_API LF_ThreadPool :
10 public ACE_Task_Base,
11 public LeaderFollower
12 {
13 class ExitRequest : public ACE_Method_Request
14 {
15 public:
16 virtual int call (void){
17 return -1; // Cause exit.
18 }
19 };
20
21 bool m_bShutdown;
22 bool m_bRunning;
23 ACE_Activation_Queue m_activation_queue_;
24
25 static const size_t ScheduleTime = 10;
26 static const size_t MinThreadNum = 10;
27 static const size_t MaxThreadNum = 20;
28
29 public:
30 LF_ThreadPool(void);
31 ~LF_ThreadPool(void);
32
33 virtual int svc(void);
34
35 int start_stread_pool( void );
36 int stop_thread_pool( void );
37 int post_request( ACE_Method_Request *request );
38
39 int get_queue_load(void){ return m_activation_queue_.method_count(); }
40 int get_max_thread(void){ return MaxThreadNum; }
41 int get_min_thread(void){ return MinThreadNum; }
42
43 private:
44 int _fork_new_thread( void );
45 int _post_exit_request(void);
46 };
47
2
3 #include "LeaderFollower.h"
4
5 #include <ace/Task.h>
6 #include <ace/Activation_Queue.h>
7 #include <ace/Method_Request.h>
8
9 class CPPXCORBA_API LF_ThreadPool :
10 public ACE_Task_Base,
11 public LeaderFollower
12 {
13 class ExitRequest : public ACE_Method_Request
14 {
15 public:
16 virtual int call (void){
17 return -1; // Cause exit.
18 }
19 };
20
21 bool m_bShutdown;
22 bool m_bRunning;
23 ACE_Activation_Queue m_activation_queue_;
24
25 static const size_t ScheduleTime = 10;
26 static const size_t MinThreadNum = 10;
27 static const size_t MaxThreadNum = 20;
28
29 public:
30 LF_ThreadPool(void);
31 ~LF_ThreadPool(void);
32
33 virtual int svc(void);
34
35 int start_stread_pool( void );
36 int stop_thread_pool( void );
37 int post_request( ACE_Method_Request *request );
38
39 int get_queue_load(void){ return m_activation_queue_.method_count(); }
40 int get_max_thread(void){ return MaxThreadNum; }
41 int get_min_thread(void){ return MinThreadNum; }
42
43 private:
44 int _fork_new_thread( void );
45 int _post_exit_request(void);
46 };
47
// LF_ThreadPool.cpp
1 #include "stdafx.h"
2 #include "LF_ThreadPool.h"
3
4 LF_ThreadPool::LF_ThreadPool(void) :
5 m_bShutdown(false),
6 m_bRunning(false)
7 {
8 }
9
10 LF_ThreadPool::~LF_ThreadPool(void)
11 {
12 }
13
14 int LF_ThreadPool::svc( void )
15 {
16 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count()));
17
18 // 線程開始運(yùn)行
19 m_bRunning = true;
20
21 set_status(TH_READY);
22
23 while(true){
24 // Block until this thread is the leader.
25 become_leader();
26
27 // 設(shè)置線程空閑時(shí)間,空閑線程將會自動退出
28 ACE_Time_Value tv(ScheduleTime);
29 tv += ACE_OS::gettimeofday();
30
31 // 從隊(duì)列獲取下一個請求,并獲得所有權(quán)
32 auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv));
33 if( request.get() == 0 ){ // 長時(shí)間沒有請求,dequeue超時(shí)返回
34 if( elect_new_leader() == 0 && thr_count() > MinThreadNum ) // 成功選擇新的領(lǐng)導(dǎo)者,且工作線程數(shù)大于最少線程數(shù)
35 break; // 結(jié)束當(dāng)前線程
36 if( thr_count() < MinThreadNum && thr_count() < MaxThreadNum ) // 工作線程數(shù)小于最少線程數(shù),創(chuàng)建新的線程
37 _fork_new_thread();
38 continue; // 繼續(xù)擔(dān)當(dāng)領(lǐng)導(dǎo)者(優(yōu)先成為領(lǐng)導(dǎo)者),或返回線程池等待
39 }
40
41 // Elect a new leader then process the request
42 if( elect_new_leader() != 0 || thr_count() < MinThreadNum ) // 沒有空余線程可成為領(lǐng)導(dǎo)者,或者線程池容量調(diào)整
43 if( !m_bShutdown ) // 且沒有調(diào)度關(guān)閉
44 if( thr_count() < MaxThreadNum ) // 未達(dá)到線程數(shù)上線
45 _fork_new_thread(); // 創(chuàng)建新的線程
46
47 // Invoke the method request.
48 set_status(TH_WORKER);
49
50 ACE_Time_Value tv_start,tv_finish,tv_working;
51 tv_start = ACE_OS::gettimeofday();
52
53 int result = request->call();
54
55 tv_finish = ACE_OS::gettimeofday();
56 tv_working = tv_finish - tv_start;
57 set_worktime(tv_working);
58
59 if( result == -1 ){
60 if( thr_count() > 1 ) // If received a ExitMethod, Notify the next Thread(if exists) to exit too.
61 _post_exit_request();
62 break;
63 }
64 }
65
66 // 剩下最后一個線程,線程池停止
67 if( thr_count() == 1 )
68 m_bRunning = false;
69
70 set_status(TH_STOP);
71 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.\t: %d working threads left.\n"),thr_count()-1));
72 return 0;
73 }
74
75 int LF_ThreadPool::start_stread_pool( void )
76 {
77 m_bShutdown = false;
78 return activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum);
79 }
80
81 int LF_ThreadPool::stop_thread_pool( void )
82 {
83 // 線程池已停止
84 if( !m_bRunning )
85 return 0;
86
87 m_bShutdown = true;
88 _post_exit_request();
89 return wait();
90 }
91
92 int LF_ThreadPool::post_request( ACE_Method_Request *request )
93 {
94 ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue"));
95 return m_activation_queue_.enqueue (request);
96 }
97
98 int LF_ThreadPool::_fork_new_thread( void )
99 {
100 return activate(THR_NEW_LWP| THR_JOINABLE,1,1);
101 }
102
103 int LF_ThreadPool::_post_exit_request( void )
104 {
105 return post_request(new ExitRequest);
106 }
107
2 #include "LF_ThreadPool.h"
3
4 LF_ThreadPool::LF_ThreadPool(void) :
5 m_bShutdown(false),
6 m_bRunning(false)
7 {
8 }
9
10 LF_ThreadPool::~LF_ThreadPool(void)
11 {
12 }
13
14 int LF_ThreadPool::svc( void )
15 {
16 //ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count()));
17
18 // 線程開始運(yùn)行
19 m_bRunning = true;
20
21 set_status(TH_READY);
22
23 while(true){
24 // Block until this thread is the leader.
25 become_leader();
26
27 // 設(shè)置線程空閑時(shí)間,空閑線程將會自動退出
28 ACE_Time_Value tv(ScheduleTime);
29 tv += ACE_OS::gettimeofday();
30
31 // 從隊(duì)列獲取下一個請求,并獲得所有權(quán)
32 auto_ptr<ACE_Method_Request> request(m_activation_queue_.dequeue(&tv));
33 if( request.get() == 0 ){ // 長時(shí)間沒有請求,dequeue超時(shí)返回
34 if( elect_new_leader() == 0 && thr_count() > MinThreadNum ) // 成功選擇新的領(lǐng)導(dǎo)者,且工作線程數(shù)大于最少線程數(shù)
35 break; // 結(jié)束當(dāng)前線程
36 if( thr_count() < MinThreadNum && thr_count() < MaxThreadNum ) // 工作線程數(shù)小于最少線程數(shù),創(chuàng)建新的線程
37 _fork_new_thread();
38 continue; // 繼續(xù)擔(dān)當(dāng)領(lǐng)導(dǎo)者(優(yōu)先成為領(lǐng)導(dǎo)者),或返回線程池等待
39 }
40
41 // Elect a new leader then process the request
42 if( elect_new_leader() != 0 || thr_count() < MinThreadNum ) // 沒有空余線程可成為領(lǐng)導(dǎo)者,或者線程池容量調(diào)整
43 if( !m_bShutdown ) // 且沒有調(diào)度關(guān)閉
44 if( thr_count() < MaxThreadNum ) // 未達(dá)到線程數(shù)上線
45 _fork_new_thread(); // 創(chuàng)建新的線程
46
47 // Invoke the method request.
48 set_status(TH_WORKER);
49
50 ACE_Time_Value tv_start,tv_finish,tv_working;
51 tv_start = ACE_OS::gettimeofday();
52
53 int result = request->call();
54
55 tv_finish = ACE_OS::gettimeofday();
56 tv_working = tv_finish - tv_start;
57 set_worktime(tv_working);
58
59 if( result == -1 ){
60 if( thr_count() > 1 ) // If received a ExitMethod, Notify the next Thread(if exists) to exit too.
61 _post_exit_request();
62 break;
63 }
64 }
65
66 // 剩下最后一個線程,線程池停止
67 if( thr_count() == 1 )
68 m_bRunning = false;
69
70 set_status(TH_STOP);
71 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread stoped.\t: %d working threads left.\n"),thr_count()-1));
72 return 0;
73 }
74
75 int LF_ThreadPool::start_stread_pool( void )
76 {
77 m_bShutdown = false;
78 return activate(THR_NEW_LWP| THR_JOINABLE,MinThreadNum);
79 }
80
81 int LF_ThreadPool::stop_thread_pool( void )
82 {
83 // 線程池已停止
84 if( !m_bRunning )
85 return 0;
86
87 m_bShutdown = true;
88 _post_exit_request();
89 return wait();
90 }
91
92 int LF_ThreadPool::post_request( ACE_Method_Request *request )
93 {
94 ACE_TRACE (ACE_TEXT ("SvcThreadPool::enqueue"));
95 return m_activation_queue_.enqueue (request);
96 }
97
98 int LF_ThreadPool::_fork_new_thread( void )
99 {
100 return activate(THR_NEW_LWP| THR_JOINABLE,1,1);
101 }
102
103 int LF_ThreadPool::_post_exit_request( void )
104 {
105 return post_request(new ExitRequest);
106 }
107
怎么樣?很簡單吧?什么?怎么用?Oh My Lady GaGa!還是告訴你吧:
1 m_pool.post_request(new M2M_EventRequest(m_lua_svc,m_lua_scripts,type_name,xml_event,*iter));
需要線程池出來干活的時(shí)候,創(chuàng)建一個請求對象,扔給他就行了!好了,代碼就是最好的文檔,C++開源社區(qū)給了我成長的土壤,希望能對后來者有所幫助。
把這些東西貼出來,是為了整理自己的大腦,免得這些曾經(jīng)頂著熊貓眼熬出來的東西,塵封在茫茫的代碼海洋中,取之于前輩,還之于后人。也希望有更多的高手能夠慷慨布道,壯大我們的C++社區(qū)。