這是一個基于ACE的線程庫實現,屬于半同步半異步類型的線程池,感覺實現得非常優雅,代碼是由網上下的好幾份代碼拼湊而成的(ACE的源碼包中的tests目錄下有大量的實例,研究這些例子是學習ACE的好辦法,只是由于注釋都是一堆堆的英文,有時候感覺頭疼,就懶得去看它了)。這個線程池由一個線程池管理器管理著五個線程來處理消息,當五個處理線程都在處理消息時,接收新的消息將導致
線程管理器被阻塞。消息處理線程處理完發給自己的消息后將被阻塞,其將重新被管理器管理器放入隊列中。越發感覺到ACE的強大,只可惜我們的程序用不上。一個原因是我們程序本身處理的數據量并不會太大;另外我們的程序只要求跑在Solaris上面,不會出現異構的平臺;最后ACE庫本身太繁雜了,很多地方比如網絡相關的函數我們是不會用的,不過如果現在我們正在使用的網絡庫也使用ACE的話,那么使用ACE簡直再好不過了。
#include "ace/OS.h"
#include "ace/Task.h"
#include "ace/Thread.h"
#include "ace/Synch.h"
class Worker;
class IManager
{
public:
virtual int return_to_work (Worker *worker) = 0;
};
class Worker : public ACE_Task<ACE_MT_SYNCH>
{
public:
Worker (IManager *manager) : manager_(manager) { }
//線程啟動之后進入本函數
virtual int svc (void)
{
thread_id_ = ACE_Thread::self();
//工作線程啟動之后只有收到MB_HANGUP類型的消息它才會退出
while (1)
{
ACE_Message_Block *mb = 0;
//如果隊列中沒有數據,本線程將被阻塞
if (this->getq(mb) == -1)
ACE_ERROR_BREAK((LM_ERROR, ACE_TEXT ("%p "), ACE_TEXT ("getq")));
// 如果是MB_HANGUP消息,就結束線程
if (mb->msg_type() == ACE_Message_Block::MB_HANGUP)
{
ACE_DEBUG ((LM_INFO,
ACE_TEXT ("(%t) Shutting down ")));
mb->release();
break;
}
// Process the message.
process_message (mb);
// Return to work.
// 這里會將自己放到線程池中,并通過workers_cond_來通知manager
this->manager_->return_to_work (this);
}
return 0;
}
ACE_thread_t thread_id(void)
{
return thread_id_;
}
private:
//處理消息
void process_message (ACE_Message_Block *mb)
{
ACE_TRACE (ACE_TEXT ("Worker::process_message"));
int msgId;
ACE_OS::memcpy (&msgId, mb->rd_ptr(), sizeof(int));
mb->release();
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Started processing message %d "),
msgId));
ACE_OS::sleep(3);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Finished processing message %d "),
msgId));
}
//指向線程池管理器
IManager *manager_;
//保存本線程id號
ACE_thread_t thread_id_;
};
class Manager : public ACE_Task<ACE_MT_SYNCH>, public IManager
{
public:
enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};
Manager ()
: shutdown_(0), workers_lock_(), workers_cond_(workers_lock_)
{
ACE_TRACE (ACE_TEXT ("Manager::Manager"));
}
/* 線程處理函數 */
int svc (void)
{
ACE_TRACE (ACE_TEXT ("Manager::svc"));
ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started ")));
// Create pool.
create_worker_pool();
while (true)
{
ACE_Message_Block *mb = 0;
ACE_Time_Value tv ((long)MAX_TIMEOUT);
tv += ACE_OS::time (0);
// Get a message request.
if (this->getq (mb, &tv) < 0)
{
shut_down ();
break;
}
// Choose a worker.
Worker *worker = 0;
/*
這對大括號中的代碼從worker線程池中獲取一個工作線程,線程池由
this->workers_lock_互斥體加以保護,如果沒有worker可用,manager
會阻塞在workers_lock_條件變量上,等待某個線程回來工作
*/
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon, this->workers_lock_, -1);
/*
阻塞在workers_lock_.wait()上直到有worker可用,當某個worker回來后
會把自己放到線程池隊列上,同時通過觸發workers_cond_來通知manager
*/
while (this->workers_.is_empty ())
workers_cond_.wait ();
/* 將獲取的worker從線程池隊列中刪除 */
this->workers_.dequeue_head (worker);
}
// Ask the worker to do the job.
// 將請求消息放入到worker的消息隊列中
worker->putq (mb);
}
return 0;
}
int shut_down (void)
{
ACE_TRACE (ACE_TEXT ("ACE_ThreadPool::DestroyPool"));
ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin();
Worker** worker_ptr = NULL;
do
{
iter.next (worker_ptr);
Worker *worker = (*worker_ptr);
// Send the hangup message.
ACE_Message_Block *mb;
ACE_NEW_RETURN(
mb,
ACE_Message_Block(0,
ACE_Message_Block::MB_HANGUP),
-1);
worker->putq(mb);
// Wait for the exit.
worker->wait();
ACE_ASSERT (worker->msg_queue()->is_empty ());
delete worker;
}while (iter.advance());
return 0;
};
ACE_thread_t thread_id (Worker *worker);
/* 提供給worker的接口,用于在worker完成處理后,將自己放入到線程池隊列,并通知manager */
virtual int return_to_work (Worker *worker)
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon, this->workers_lock_, -1);
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Worker %u returning to work. "),
worker->thr_mgr()->thr_self()));
// 將worker放入到線程池隊列
this->workers_.enqueue_tail (worker);
// 觸發條件變量,通知manager
this->workers_cond_.signal ();
return 0;
}
private:
// 創建worker線程池
int create_worker_pool (void)
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon,
this->workers_lock_,
-1);
for (int i = 0; i < POOL_SIZE; i++)
{
Worker *worker;
// 創建worker
ACE_NEW_RETURN (worker, Worker (this), -1);
// 放入線程池隊列
this->workers_.enqueue_tail (worker);
// 激活線程,調用該函數后,worker線程被創建,由于worker
// 是ACE_Task的子類,線程激活后,從svc函數開始執行
worker->activate ();
}
return 0;
}
private:
int shutdown_;
/* workers_lock_ 線程池隊列的互斥體,在對線程池進行操作時,需要通過互斥鎖來保護
所以在所有的線程池隊列隊列操作前都有這樣的語句:
ACE_GUARD_RETURN (ACE_Thread_Mutex,
worker_mon, this->workers_lock_, -1);
*/
ACE_Thread_Mutex workers_lock_;
ACE_Condition<ACE_Thread_Mutex> workers_cond_;
/* 線程池隊列 */
ACE_Unbounded_Queue<Worker* > workers_;
};
int ACE_TMAIN (int, ACE_TCHAR *[])
{
Manager tp;
tp.activate ();
// Wait for a moment every time you send a message.
ACE_Time_Value tv;
tv.msec (100);
ACE_Message_Block *mb;
for (int i = 0; i < 10; i++)
{
ACE_NEW_RETURN(mb, ACE_Message_Block(sizeof(int)), -1);
ACE_OS::memcpy (mb->wr_ptr(), &i, sizeof(int));
ACE_OS::sleep(tv);
// Add a new work item.
// 這里將請求消息首先發到了manager線程,由manager線程負責分發
tp.putq (mb);
}
// 主線程等待子線程結束
ACE_Thread_Manager::instance()->wait();
return 0;
}
posted on 2009-11-02 18:13
老馬驛站 閱讀(2208)
評論(0) 編輯 收藏 引用 所屬分類:
ACE