• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            posts - 311, comments - 0, trackbacks - 0, articles - 0
              C++博客 :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理
            本篇將介紹客戶端與游戲邏輯服務(wù)器連接建立以后,mangosd如何接收、解析和處理客戶端發(fā)過來的協(xié)議。本篇不再討論mangosd與客戶端的認(rèn)證及建立最終RC4流加密的過程,想了解這部分內(nèi)容請(qǐng)看該系列的第一篇

             

             

            一、acceptor socket的監(jiān)聽啟動(dòng)及注冊(cè)

            mangosd的main ()函數(shù)調(diào)用單例對(duì)象sMaster的Run ()函數(shù),啟動(dòng)監(jiān)聽socket的代碼如下:

             

               1: int Master::Run()
               2: {
               3:     ........
               4:  
               5:     ///- Launch the world listener socket
               6:     uint16 wsport = sWorld.getConfig (CONFIG_UINT32_PORT_WORLD);
               7:     std::string bind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");
               8:  
               9:     if (sWorldSocketMgr->StartNetwork (wsport, bind_ip) == -1)
              10:     {
              11:         sLog.outError ("Failed to start network");
              12:         Log::WaitBeforeContinueIfNeed();
              13:         World::StopNow(ERROR_EXIT_CODE);
              14:         // go down and shutdown the server
              15:     }
              16:  
              17:     sWorldSocketMgr->Wait ();
              18:  
              19:     ........
              20: }

             

            mangosd用WorldSocketMgr類來管理socket。StartNetwork ()會(huì)調(diào)用StartReactiveIO ()來啟動(dòng)監(jiān)聽socket,處理代碼如下:

               1: int WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
               2: {
               3:     ........
               4:  
               5:     //(1)
               6:     m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
               7:     m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
               8:  
               9:     // -1 means use default
              10:     m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
              11:     m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
              12:     if ( m_SockOutUBuff <= 0 )
              13:     {
              14:         sLog.outError ("Network.OutUBuff is wrong in your config file");
              15:         return -1;
              16:     }
              17:  
              18:     //(2)
              19:     WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
              20:     m_Acceptor = acc;
              21:  
              22:     ACE_INET_Addr listen_addr (port, address);
              23:     if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
              24:     {
              25:         sLog.outError ("Failed to open acceptor ,check if the port is free");
              26:         return -1;
              27:     }
              28:  
              29:     //(3)
              30:     for (size_t i = 0; i < m_NetThreadsCount; ++i)
              31:         m_NetThreads[i].Start ();
              32:  
              33:     return 0;
              34: }

            (1)ReactorRunnable類繼承了ACE_Task_Base,ACE_Task_Base 是ACE 中的任務(wù)或主動(dòng)對(duì)象“處理結(jié)構(gòu)”的基類。在ACE 中使用了此類來實(shí)現(xiàn)主動(dòng)對(duì)象模式。所有希望成為“主動(dòng)對(duì)象”的對(duì)象都必須從此類派生。可以把ACE_TASK 看作是更高級(jí)的、更為面向?qū)ο蟮木€程類[1]。ACE_Task_Base調(diào)用時(shí)繼承類必須重寫svc方法,并且在使用時(shí)保證調(diào)用了activate ()方法。

            (2)指定監(jiān)聽地址,端口并把Acceptor綁定到第一個(gè)線程的Reactor上。啟動(dòng)Acceptor開始監(jiān)聽網(wǎng)絡(luò)IO。

            (3)啟動(dòng)所有線程,每個(gè)線程上有一個(gè)單獨(dú)的ACE_Reactor* m_Reactor;,這里的Reactor使用的是多線程的ACE_TP_Reactor。可以各自單獨(dú)完成事件的多路復(fù)用。

             

             

            二、線程體函數(shù)

            線程體函數(shù)ReactorRunnable::svc () 如下:

               1: virtual int svc ()
               2: {
               3:     //(1)
               4:     WorldDatabase.ThreadStart ();
               5:     
               6:     SocketSet::iterator i, t;
               7:     while (!m_Reactor->reactor_event_loop_done ())
               8:     {
               9:         // dont be too smart to move this outside the loop
              10:         // the run_reactor_event_loop will modify interval
              11:         ACE_Time_Value interval (0, 10000);
              12:  
              13:         //(2)
              14:         if (m_Reactor->run_reactor_event_loop (interval) == -1)
              15:             break;
              16:  
              17:         //(3)
              18:         AddNewSockets ();
              19:  
              20:         for (i = m_Sockets.begin (); i != m_Sockets.end ();)
              21:         {
              22:             //(4)
              23:             if ((*i)->Update () == -1)
              24:             {
              25:                 t = i;
              26:                 ++i;
              27:                 (*t)->CloseSocket ();
              28:                 (*t)->RemoveReference ();
              29:                 --m_Connections;
              30:                 m_Sockets.erase (t);
              31:             }
              32:             else
              33:                 ++i;
              34:         }
              35:     }
              36:  
              37:     WorldDatabase.ThreadEnd ();
              38:     DEBUG_LOG ("Network Thread Exitting");
              39:  
              40:     return 0;
              41: }

            (1)會(huì)調(diào)用mysql_thread_init ()函數(shù),初始化與該線程相關(guān)的變量。

            (2)run_reactor_event_loop ()函數(shù)為多路復(fù)用的等待函數(shù),當(dāng)注冊(cè)的事件發(fā)生、運(yùn)行超時(shí)或者出現(xiàn)錯(cuò)誤時(shí)返回。

            (3)AddNewSockets ()函數(shù)會(huì)將緩存在m_NewSockets里的新到達(dá)的socket添加到SocketSet m_Sockets;里,同時(shí)檢查并處理m_Sockets;里已經(jīng)closed的socket。

            (4)循環(huán)每一個(gè)WorldSocket,調(diào)用其Update ()方法,這里只處理每個(gè)socket的handle_output,即每個(gè)在此線程上的寫事件,向客戶端發(fā)送數(shù)據(jù)。下一節(jié)詳細(xì)介紹:

             

             

            三、WorldSocket::Update () 方法

            Update方法用于處理每個(gè)socket的輸出:

               1: int WorldSocket::Update (void)
               2: {
               3:     if (closing_)
               4:         return -1;
               5:  
               6:     //(1)
               7:     if (m_OutActive || m_OutBuffer->length () == 0)
               8:         return 0;
               9:  
              10:     return handle_output (get_handle ());
              11: }

            (1)m_OutBuffer有數(shù)據(jù)時(shí)才會(huì)調(diào)用handle_output,handle_output ()用于處理輸出,如果輸出不能一次性做完,會(huì)調(diào)用schedule_wakeup_output ()再次激活write事件。當(dāng)輸出處理完畢后則調(diào)用cancel_wakeup_output ()取消激活write事件,使reactor恢復(fù)到正常的loop ()循環(huán)中。詳細(xì)過程如下:

             

               1: int WorldSocket::handle_output (ACE_HANDLE)
               2: {
               3:     //(1)
               4:     ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
               5:  
               6:     if (closing_)
               7:         return -1;
               8:  
               9:     const size_t send_len = m_OutBuffer->length ();
              10:     if (send_len == 0)
              11:         return cancel_wakeup_output (Guard);
              12:  
              13: #ifdef MSG_NOSIGNAL
              14:     ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
              15: #else
              16:     ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len);
              17: #endif // MSG_NOSIGNAL
              18:  
              19:     if (n == 0)
              20:         return -1;
              21:     else if (n == -1)
              22:     {
              23:         if (errno == EWOULDBLOCK || errno == EAGAIN) //----------(2)
              24:             return schedule_wakeup_output (Guard);
              25:  
              26:         return -1;
              27:     }
              28:     else if (n < (ssize_t)send_len) //now n > 0      //----------(3)
              29:     {
              30:         m_OutBuffer->rd_ptr (static_cast<size_t> (n));
              31:  
              32:         // move the data to the base of the buffer
              33:         m_OutBuffer->crunch ();
              34:  
              35:         return schedule_wakeup_output (Guard);
              36:     }
              37:     else //now n == send_len                         //----------(4)
              38:     {
              39:         m_OutBuffer->reset ();
              40:  
              41:         if (!iFlushPacketQueue ())
              42:             return cancel_wakeup_output (Guard);
              43:         else
              44:             return schedule_wakeup_output (Guard);
              45:     }
              46:  
              47:     ACE_NOTREACHED (return 0);
              48: }

            (1)對(duì)m_OutBuffer加鎖。

            (2)考慮信號(hào)打斷的情況等,暫時(shí)不能寫。

            (3)只發(fā)送了部分?jǐn)?shù)據(jù)則繼續(xù)wakeup該線程對(duì)應(yīng)的Reactor。

            (4)檢查m_OutBuffer數(shù)據(jù)發(fā)送完畢同時(shí)等待buffer(PacketQueueT m_PacketQueue;)里已經(jīng)沒有數(shù)據(jù)時(shí),cancel wakeup讓Reactor恢復(fù)正常。

             

             

            四、socket到對(duì)應(yīng)線程的指派

            上一節(jié)內(nèi)容可以看到線程內(nèi)如何處理socket,及新到達(dá)的socket。但從第一節(jié)中可知只有第一個(gè)線程注冊(cè)為acceptor線程,那么新連接到達(dá)時(shí),是如何被指派到對(duì)應(yīng)的“接待”線程的呢?

            可以先看一下ACE_Acceptor的處理時(shí)序圖:

             


            圖3.1 連接到達(dá)處理時(shí)序 [2]

             

            上圖可以看出,當(dāng)連接到達(dá)時(shí),acceptor會(huì)調(diào)用對(duì)應(yīng)的SVC_HANDLER的open ()函數(shù),在mangosd里就是acceptor對(duì)應(yīng)的int WorldSocket::open (void *a),如下:

               1: int WorldSocket::open (void *a)
               2: {
               3:     ........
               4:  
               5:     // Hook for the manager.
               6:     if (sWorldSocketMgr->OnSocketOpen (this) == -1)
               7:         return -1;
               8:  
               9:     ........
              10: }

             

            OnSocketOpen方法:

               1: int WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
               2: {
               3:     ........
               4:  
               5:     // we skip the Acceptor Thread
               6:     size_t min = 1;
               7:  
               8:     MANGOS_ASSERT (m_NetThreadsCount >= 1);
               9:  
              10:     //(1)
              11:     for (size_t i = 1; i < m_NetThreadsCount; ++i)
              12:         if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
              13:             min = i;
              14:  
              15:     return m_NetThreads[min].AddSocket (sock);
              16: }

            (1)將WorldSocket均衡的分配給每個(gè)線程。AddSocket ()將socket添加到m_NewSockets中做緩存,待該線程自行調(diào)用AddNewSockets ()添加到處理隊(duì)列里。

             

             

            總結(jié):

            mangosd對(duì)socket的處理因?yàn)槭褂昧薃CE,邏輯處理代碼相對(duì)比較簡(jiǎn)單,寫事件的異常處理主要涉及

            (1)一次不能寫完則不斷的wakeup Reactor。

            (2)信號(hào)中斷等錯(cuò)誤的判斷。似乎這里并沒有考慮全面(見附錄)

            (3)使用另一個(gè)buffer緩存,因?qū)懢彺鎚_OutBuffer滿而帶來的多出的數(shù)據(jù)。

             

            References:

            [1] http://blog.csdn.net/yecao_kinux/article/details/1546914

            [2] http://postfiles12.naver.net/data41/2009/4/11/187/33_kbkim007.jpg?type=w3

            一级a性色生活片久久无| 亚洲一级Av无码毛片久久精品| 亚洲精品乱码久久久久久按摩 | 亚洲美日韩Av中文字幕无码久久久妻妇| 久久亚洲av无码精品浪潮| 无码任你躁久久久久久老妇App| 久久精品亚洲福利| 亚洲v国产v天堂a无码久久| 久久WWW免费人成—看片| 久久亚洲国产成人影院网站| 国产A三级久久精品| 久久精品一区二区三区中文字幕| 久久久久亚洲Av无码专| 久久久久99这里有精品10 | 久久一区二区免费播放| 99久久超碰中文字幕伊人| 亚洲国产精品成人AV无码久久综合影院 | 激情五月综合综合久久69| 欧美黑人激情性久久| 久久久无码精品午夜| 伊人久久综合热线大杳蕉下载| 久久夜色精品国产亚洲| 91亚洲国产成人久久精品网址| 精品久久久久久无码专区不卡| 精品久久久久久久国产潘金莲| 99久久夜色精品国产网站| 99精品久久精品一区二区| 人人狠狠综合久久88成人| 亚洲国产视频久久| 久久久国产精品| 久久久精品日本一区二区三区 | 日日狠狠久久偷偷色综合免费| 久久精品国产亚洲一区二区| 久久精品亚洲日本波多野结衣| 久久精品中文字幕一区 | 99久久精品国产高清一区二区| 亚洲中文久久精品无码| 伊人久久精品无码二区麻豆| 亚洲国产精品无码久久98| 久久免费的精品国产V∧| 一本一本久久A久久综合精品 |