花了足足3天時間,外加1天心情休整,終于在第5天編寫出了一個能運行的基于asio和thread_group的框架,差點沒氣暈過去,把源碼都看懂了才感覺會用了。
測試了一下,debug下一萬次回應耗時800+毫秒,release下是200+毫秒,機器配置雙核2.5G英特爾,4個線程并行工作,無錯的感覺真好,再也不用擔心iocp出一些奇怪的問題啦,因為是巨人們寫的實現,呵呵。
進入正題,簡要說一下asio的實現原理吧。在win32平臺上,asio是基于IOCP技術實現的,我以前也用過IOCP,卻沒想到居然能擴展成這樣,真是神奇!在其他平臺下還會有別的方法去實現,具體見io_service類下面這部分的源碼:
// The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
typedef detail::win_iocp_io_service impl_type;
friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif

這部分代碼其實就在boost::asio::io_service類聲明中的最前面幾行,可以看見在不同平臺下,io_service類的實現將會不同。很顯然,windows平臺下當然是win_iocp_io_service類為實現了(不過我一開始還以為win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎么有移植性呢?官方文檔也對該類只字不提,其實我卡殼就是卡在這里了,差點就直接用這個類了^_^!)。
那么就分析一下win_iocp_io_service的代碼吧,這里完全是用IOCP來路由各種任務,大家使用post來委托任務,內部調用的其實是IOCP的PostQueuedCompletionStatus函數,然后線程們用run來接受任務,內部其實是阻塞在IOCP的GetQueuedCompletionStatus函數上,一旦有了任務就立即返回,執行完后再一個循環,繼續阻塞在這里等待下一個任務的到來,這種設計思想堪稱神奇,對線程、服務以及任務完全解耦,靈活度達到了如此高度,不愧為boost庫的東西!我只能有拜的份了...
說一下總體的設計思想,其實io_service就像是勞工中介所,而一個線程就是一個勞工,而調用post的模塊相當于富人們,他們去中介所委托任務,而勞工們就聽候中介所的調遣去執行這些任務,任務的內容就寫在富人們給你的handler上,也就是函數指針,指針指向具體實現就是任務的實質內容。其實在整個過程中,富人們都不知道是哪個勞工幫他們做的工作,只知道是中介所負責完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個不恰當的地方,如果硬要這樣比喻的話,我只能說:其實勞工里面也有很多富人的^o^! 。很多勞工在完成任務的過程中自己也托給中介所一些任務,然后這些任務很可能還是自己去完成。這也難怪,運行代碼的總是這些線程,那么調用post的肯定也會有這些線程了,不過不管怎么說,如此循環往復可以解決問題就行,比喻不見得就得恰當,任何事物之間都不可能完全相同,只要能闡述思想就行。
最后還要說明的一點就是:委托的任務其實可以設定執行的時間的,很不錯的設定,內部實現則是通過定時器原理,GetQueuedCompletionStatus有一個等待時間的參數似乎被用在這方面,還有源碼中的定時器線程我并沒有過多的去理解,總之大體原理已基本掌握,剩下的就是使勁的用它了?。。?br>
另外為了方便人交流,在這里插入一些代碼可能更容易讓人理解吧,
下面這個是啟動服務時的代碼:
void ServerFramework::run()


{
boost::thread_group workers;
for (uint32 i = 0; i < mWorkerCount; ++i)
workers.create_thread(
boost::bind(&boost::asio::io_service::run, &mIoService));
workers.join_all();
}
在打開前就得分配好任務,否則線程們運行起來就退出了,阻塞不住,任務的分配就交給open函數了,它是分配了監聽端口的任務,一旦有了連接就會拋出一個任務,其中一個線程就會開始行動啦。

void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /**//*= DEFAULT_WORKER_COUNT*/)


{
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(mIoService);
boost::asio::ip::tcp::resolver::query query(address, port);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

mAcceptor.open(endpoint.protocol());
mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
mAcceptor.bind(endpoint);
mAcceptor.listen();

mNextConnection = new Connection(this);
mAcceptor.async_accept(mNextConnection->getSocket(),
boost::bind(&ServerFramework::__onConnect, this,
boost::asio::placeholders::error));

mWorkerCount = nWorkers;
if (mWorkerCount == DEFAULT_WORKER_COUNT)

{
mWorkerCount = 4;
}
}
open函數中給io_service的一個任務就是在有鏈接訪問服務器端口的情況下執行ServerFramework::__onConnect函數,有一點需要格外注意的,io_service必須時刻都有任務存在,否則線程io_service::run函數將返回,于是線程都會結束并銷毀,程序將退出,所以,你必須保證無論何時都有任務存在,這樣線程們即使空閑了也還是會繼續等待,不會銷毀。所以,我在ServerFramework::__onConnect函數中又一次給了io_service相同的任務,即:繼續監聽端口,有鏈接了還是調用ServerFramework::__onConnect函數。如果你在ServerFramework::__onConnect執行完了還沒有給io_service任務的話,那么一切都晚了...... 代碼如下:
void ServerFramework::__onConnect(const BoostSysErr& e)


{
if (e)

{
MOELOG_DETAIL_WARN(e.message().c_str());
}

Connection* p = mNextConnection;
mNextConnection = new Connection(this);

// 再次進入監聽狀態
mAcceptor.async_accept(mNextConnection->getSocket(),
boost::bind(&ServerFramework::__onConnect, this,
boost::asio::placeholders::error));

// 處理當前鏈接
__addConnection(p);
p->start();
}
最后,展示一下這個類的所有成員變量吧:
// 用于線程池異步處理的核心對象
boost::asio::io_service mIoService;

// 網絡鏈接的接收器,用于接收請求進入的鏈接
boost::asio::ip::tcp::acceptor mAcceptor;

// 指向下一個將要被使用的鏈接對象
Connection* mNextConnection;

// 存儲服務器鏈接對象的容器
ConnectionSet mConnections;


/**///// 為鏈接對象容器準備的strand,防止并行調用mConnections
//boost::asio::io_service::strand mStrand_mConnections;

// 為鏈接對象容器準備的同步鎖,防止并行調用mConnections
boost::mutex mMutex4ConnSet;

// 為控制臺輸出流準備的strand,防止并行調用std::cout
AsioService::strand mStrand_ConsoleIostream;

// 工作線程的數量
uint32 mWorkerCount;
但愿這篇隨筆也能對正在研究asio的朋友們有所幫助吧。