花了足足3天時(shí)間,外加1天心情休整,終于在第5天編寫出了一個(gè)能運(yùn)行的基于asio和thread_group的框架,差點(diǎn)沒氣暈過去,把源碼都看懂了才感覺會(huì)用了。
測試了一下,debug下一萬次回應(yīng)耗時(shí)800+毫秒,release下是200+毫秒,機(jī)器配置雙核2.5G英特爾,4個(gè)線程并行工作,無錯(cuò)的感覺真好,再也不用擔(dān)心iocp出一些奇怪的問題啦,因?yàn)槭蔷奕藗儗懙膶?shí)現(xiàn),呵呵。
進(jìn)入正題,簡要說一下asio的實(shí)現(xiàn)原理吧。在win32平臺(tái)上,asio是基于IOCP技術(shù)實(shí)現(xiàn)的,我以前也用過IOCP,卻沒想到居然能擴(kuò)展成這樣,真是神奇!在其他平臺(tái)下還會(huì)有別的方法去實(shí)現(xiàn),具體見io_service類下面這部分的源碼:
// The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
typedef detail::win_iocp_io_service impl_type;
friend class detail::win_iocp_overlapped_ptr;
#elif defined(BOOST_ASIO_HAS_EPOLL)
typedef detail::task_io_service<detail::epoll_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_KQUEUE)
typedef detail::task_io_service<detail::kqueue_reactor<false> > impl_type;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
typedef detail::task_io_service<detail::dev_poll_reactor<false> > impl_type;
#else
typedef detail::task_io_service<detail::select_reactor<false> > impl_type;
#endif

這部分代碼其實(shí)就在boost::asio::io_service類聲明中的最前面幾行,可以看見在不同平臺(tái)下,io_service類的實(shí)現(xiàn)將會(huì)不同。很顯然,windows平臺(tái)下當(dāng)然是win_iocp_io_service類為實(shí)現(xiàn)了(不過我一開始還以為win_iocp_io_service是直接拿出來用的呢,還在疑惑這樣怎么有移植性呢?官方文檔也對(duì)該類只字不提,其實(shí)我卡殼就是卡在這里了,差點(diǎn)就直接用這個(gè)類了^_^!)。
那么就分析一下win_iocp_io_service的代碼吧,這里完全是用IOCP來路由各種任務(wù),大家使用post來委托任務(wù),內(nèi)部調(diào)用的其實(shí)是IOCP的PostQueuedCompletionStatus函數(shù),然后線程們用run來接受任務(wù),內(nèi)部其實(shí)是阻塞在IOCP的GetQueuedCompletionStatus函數(shù)上,一旦有了任務(wù)就立即返回,執(zhí)行完后再一個(gè)循環(huán),繼續(xù)阻塞在這里等待下一個(gè)任務(wù)的到來,這種設(shè)計(jì)思想堪稱神奇,對(duì)線程、服務(wù)以及任務(wù)完全解耦,靈活度達(dá)到了如此高度,不愧為boost庫的東西!我只能有拜的份了...
說一下總體的設(shè)計(jì)思想,其實(shí)io_service就像是勞工中介所,而一個(gè)線程就是一個(gè)勞工,而調(diào)用post的模塊相當(dāng)于富人們,他們?nèi)ブ薪樗腥蝿?wù),而勞工們就聽候中介所的調(diào)遣去執(zhí)行這些任務(wù),任務(wù)的內(nèi)容就寫在富人們給你的handler上,也就是函數(shù)指針,指針指向具體實(shí)現(xiàn)就是任務(wù)的實(shí)質(zhì)內(nèi)容。其實(shí)在整個(gè)過程中,富人們都不知道是哪個(gè)勞工幫他們做的工作,只知道是中介所負(fù)責(zé)完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個(gè)不恰當(dāng)?shù)牡胤剑绻惨@樣比喻的話,我只能說:其實(shí)勞工里面也有很多富人的^o^! 。很多勞工在完成任務(wù)的過程中自己也托給中介所一些任務(wù),然后這些任務(wù)很可能還是自己去完成。這也難怪,運(yùn)行代碼的總是這些線程,那么調(diào)用post的肯定也會(huì)有這些線程了,不過不管怎么說,如此循環(huán)往復(fù)可以解決問題就行,比喻不見得就得恰當(dāng),任何事物之間都不可能完全相同,只要能闡述思想就行。
最后還要說明的一點(diǎn)就是:委托的任務(wù)其實(shí)可以設(shè)定執(zhí)行的時(shí)間的,很不錯(cuò)的設(shè)定,內(nèi)部實(shí)現(xiàn)則是通過定時(shí)器原理,GetQueuedCompletionStatus有一個(gè)等待時(shí)間的參數(shù)似乎被用在這方面,還有源碼中的定時(shí)器線程我并沒有過多的去理解,總之大體原理已基本掌握,剩下的就是使勁的用它了!!!
另外為了方便人交流,在這里插入一些代碼可能更容易讓人理解吧,
下面這個(gè)是啟動(dòng)服務(wù)時(shí)的代碼:
void ServerFramework::run()


{
boost::thread_group workers;
for (uint32 i = 0; i < mWorkerCount; ++i)
workers.create_thread(
boost::bind(&boost::asio::io_service::run, &mIoService));
workers.join_all();
}
在打開前就得分配好任務(wù),否則線程們運(yùn)行起來就退出了,阻塞不住,任務(wù)的分配就交給open函數(shù)了,它是分配了監(jiān)聽端口的任務(wù),一旦有了連接就會(huì)拋出一個(gè)任務(wù),其中一個(gè)線程就會(huì)開始行動(dòng)啦。

void ServerFramework::open(const String& address, const String& port, uint32 nWorkers /**//*= DEFAULT_WORKER_COUNT*/)


{
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(mIoService);
boost::asio::ip::tcp::resolver::query query(address, port);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

mAcceptor.open(endpoint.protocol());
mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
mAcceptor.bind(endpoint);
mAcceptor.listen();

mNextConnection = new Connection(this);
mAcceptor.async_accept(mNextConnection->getSocket(),
boost::bind(&ServerFramework::__onConnect, this,
boost::asio::placeholders::error));

mWorkerCount = nWorkers;
if (mWorkerCount == DEFAULT_WORKER_COUNT)

{
mWorkerCount = 4;
}
}
open函數(shù)中給io_service的一個(gè)任務(wù)就是在有鏈接訪問服務(wù)器端口的情況下執(zhí)行ServerFramework::__onConnect函數(shù),有一點(diǎn)需要格外注意的,io_service必須時(shí)刻都有任務(wù)存在,否則線程io_service::run函數(shù)將返回,于是線程都會(huì)結(jié)束并銷毀,程序?qū)⑼顺觯裕惚仨毐WC無論何時(shí)都有任務(wù)存在,這樣線程們即使空閑了也還是會(huì)繼續(xù)等待,不會(huì)銷毀。所以,我在ServerFramework::__onConnect函數(shù)中又一次給了io_service相同的任務(wù),即:繼續(xù)監(jiān)聽端口,有鏈接了還是調(diào)用ServerFramework::__onConnect函數(shù)。如果你在ServerFramework::__onConnect執(zhí)行完了還沒有給io_service任務(wù)的話,那么一切都晚了...... 代碼如下:
void ServerFramework::__onConnect(const BoostSysErr& e)


{
if (e)

{
MOELOG_DETAIL_WARN(e.message().c_str());
}

Connection* p = mNextConnection;
mNextConnection = new Connection(this);

// 再次進(jìn)入監(jiān)聽狀態(tài)
mAcceptor.async_accept(mNextConnection->getSocket(),
boost::bind(&ServerFramework::__onConnect, this,
boost::asio::placeholders::error));

// 處理當(dāng)前鏈接
__addConnection(p);
p->start();
}
最后,展示一下這個(gè)類的所有成員變量吧:
// 用于線程池異步處理的核心對(duì)象
boost::asio::io_service mIoService;

// 網(wǎng)絡(luò)鏈接的接收器,用于接收請(qǐng)求進(jìn)入的鏈接
boost::asio::ip::tcp::acceptor mAcceptor;

// 指向下一個(gè)將要被使用的鏈接對(duì)象
Connection* mNextConnection;

// 存儲(chǔ)服務(wù)器鏈接對(duì)象的容器
ConnectionSet mConnections;


/**///// 為鏈接對(duì)象容器準(zhǔn)備的strand,防止并行調(diào)用mConnections
//boost::asio::io_service::strand mStrand_mConnections;

// 為鏈接對(duì)象容器準(zhǔn)備的同步鎖,防止并行調(diào)用mConnections
boost::mutex mMutex4ConnSet;

// 為控制臺(tái)輸出流準(zhǔn)備的strand,防止并行調(diào)用std::cout
AsioService::strand mStrand_ConsoleIostream;

// 工作線程的數(shù)量
uint32 mWorkerCount;
但愿這篇隨筆也能對(duì)正在研究asio的朋友們有所幫助吧。