花了足足3天時間,外加1天心情休整,終于在第5天編寫出了一個能運行的基于asio和thread_group的框架,差點沒氣暈過去,把源碼都看懂了才感覺會用了。
測試了一下,debug下一萬次回應(yīng)耗時800+毫秒,release下是200+毫秒,機器配置雙核2.5G英特爾,4個線程并行工作,無錯的感覺真好,再也不用擔(dān)心iocp出一些奇怪的問題啦,因為是巨人們寫的實現(xiàn),呵呵。
進入正題,簡要說一下asio的實現(xiàn)原理吧。在win32平臺上,asio是基于IOCP技術(shù)實現(xiàn)的,我以前也用過IOCP,卻沒想到居然能擴展成這樣,真是神奇!在其他平臺下還會有別的方法去實現(xiàn),具體見io_service類下面這部分的源碼:
// The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
typedef detail::win_iocp_io_service impl_type;
friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif

這部分代碼其實就在boost::asio::io_service類聲明中的最前面幾行,可以看見在不同平臺下,io_service類的實現(xiàn)將會不同。很顯然,windows平臺下當(dāng)然是win_iocp_io_service類為實現(xiàn)了(不過我一開始還以為win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎么有移植性呢?官方文檔也對該類只字不提,其實我卡殼就是卡在這里了,差點就直接用這個類了^_^!)。
那么就分析一下win_iocp_io_service的代碼吧,這里完全是用IOCP來路由各種任務(wù),大家使用post來委托任務(wù),內(nèi)部調(diào)用的其實是IOCP的PostQueuedCompletionStatus函數(shù),然后線程們用run來接受任務(wù),內(nèi)部其實是阻塞在IOCP的GetQueuedCompletionStatus函數(shù)上,一旦有了任務(wù)就立即返回,執(zhí)行完后再一個循環(huán),繼續(xù)阻塞在這里等待下一個任務(wù)的到來,這種設(shè)計思想堪稱神奇,對線程、服務(wù)以及任務(wù)完全解耦,靈活度達到了如此高度,不愧為boost庫的東西!我只能有拜的份了...
說一下總體的設(shè)計思想,其實io_service就像是勞工中介所,而一個線程就是一個勞工,而調(diào)用post的模塊相當(dāng)于富人們,他們?nèi)ブ薪樗腥蝿?wù),而勞工們就聽候中介所的調(diào)遣去執(zhí)行這些任務(wù),任務(wù)的內(nèi)容就寫在富人們給你的handler上,也就是函數(shù)指針,指針指向具體實現(xiàn)就是任務(wù)的實質(zhì)內(nèi)容。其實在整個過程中,富人們都不知道是哪個勞工幫他們做的工作,只知道是中介所負責(zé)完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個不恰當(dāng)?shù)牡胤剑绻惨@樣比喻的話,我只能說:其實勞工里面也有很多富人的^o^! 。很多勞工在完成任務(wù)的過程中自己也托給中介所一些任務(wù),然后這些任務(wù)很可能還是自己去完成。這也難怪,運行代碼的總是這些線程,那么調(diào)用post的肯定也會有這些線程了,不過不管怎么說,如此循環(huán)往復(fù)可以解決問題就行,比喻不見得就得恰當(dāng),任何事物之間都不可能完全相同,只要能闡述思想就行。
最后還要說明的一點就是:委托的任務(wù)其實可以設(shè)定執(zhí)行的時間的,很不錯的設(shè)定,內(nèi)部實現(xiàn)則是通過定時器原理,GetQueuedCompletionStatus有一個等待時間的參數(shù)似乎被用在這方面,還有源碼中的定時器線程我并沒有過多的去理解,總之大體原理已基本掌握,剩下的就是使勁的用它了!!!
另外為了方便人交流,在這里插入一些代碼可能更容易讓人理解吧,
下面這個是啟動服務(wù)時的代碼:
void ServerFramework::run()


{
boost::thread_group workers;
for (uint32 i = 0; i < mWorkerCount; ++i)
workers.create_thread(
boost::bind(&boost::asio::io_service::run, &mIoService));
workers.join_all();
}
在打開前就得分配好任務(wù),否則線程們運行起來就退出了,阻塞不住,任務(wù)的分配就交給open函數(shù)了,它是分配了監(jiān)聽端口的任務(wù),一旦有了連接就會拋出一個任務(wù),其中一個線程就會開始行動啦。

void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /**//*= DEFAULT_WORKER_COUNT*/)


{
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(mIoService);
boost::asio::ip::tcp::resolver::query query(address, port);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

mAcceptor.open(endpoint.protocol());
mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
mAcceptor.bind(endpoint);
mAcceptor.listen();

mNextConnection = new Connection(this);
mAcceptor.async_accept(mNextConnection->getSocket(),
boost::bind(&ServerFramework::__onConnect, this,
boost::asio::placeholders::error));

mWorkerCount = nWorkers;
if (mWorkerCount == DEFAULT_WORKER_COUNT)

{
mWorkerCount = 4;
}
}
open函數(shù)中給io_service的一個任務(wù)就是在有鏈接訪問服務(wù)器端口的情況下執(zhí)行ServerFramework::__onConnect函數(shù),有一點需要格外注意的,io_service必須時刻都有任務(wù)存在,否則線程io_service::run函數(shù)將返回,于是線程都會結(jié)束并銷毀,程序?qū)⑼顺觯裕惚仨毐WC無論何時都有任務(wù)存在,這樣線程們即使空閑了也還是會繼續(xù)等待,不會銷毀。所以,我在ServerFramework::__onConnect函數(shù)中又一次給了io_service相同的任務(wù),即:繼續(xù)監(jiān)聽端口,有鏈接了還是調(diào)用ServerFramework::__onConnect函數(shù)。如果你在ServerFramework::__onConnect執(zhí)行完了還沒有給io_service任務(wù)的話,那么一切都晚了...... 代碼如下:
void ServerFramework::__onConnect(const BoostSysErr& e)


{
if (e)

{
MOELOG_DETAIL_WARN(e.message().c_str());
}

Connection* p = mNextConnection;
mNextConnection = new Connection(this);

// 再次進入監(jiān)聽狀態(tài)
mAcceptor.async_accept(mNextConnection->getSocket(),
boost::bind(&ServerFramework::__onConnect, this,
boost::asio::placeholders::error));

// 處理當(dāng)前鏈接
__addConnection(p);
p->start();
}
最后,展示一下這個類的所有成員變量吧:
// 用于線程池異步處理的核心對象
boost::asio::io_service mIoService;

// 網(wǎng)絡(luò)鏈接的接收器,用于接收請求進入的鏈接
boost::asio::ip::tcp::acceptor mAcceptor;

// 指向下一個將要被使用的鏈接對象
Connection* mNextConnection;

// 存儲服務(wù)器鏈接對象的容器
ConnectionSet mConnections;


/**///// 為鏈接對象容器準(zhǔn)備的strand,防止并行調(diào)用mConnections
//boost::asio::io_service::strand mStrand_mConnections;

// 為鏈接對象容器準(zhǔn)備的同步鎖,防止并行調(diào)用mConnections
boost::mutex mMutex4ConnSet;

// 為控制臺輸出流準(zhǔn)備的strand,防止并行調(diào)用std::cout
AsioService::strand mStrand_ConsoleIostream;

// 工作線程的數(shù)量
uint32 mWorkerCount;
但愿這篇隨筆也能對正在研究asio的朋友們有所幫助吧。