• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            網(wǎng)絡(luò)服務(wù)器軟件開發(fā)/中間件開發(fā),關(guān)注ACE/ICE/boost

            C++博客 首頁 新隨筆 聯(lián)系 聚合 管理
              152 Posts :: 3 Stories :: 172 Comments :: 0 Trackbacks
                  內(nèi)存消息隊列是服務(wù)器端常用的基礎(chǔ)組件,他使得符合生產(chǎn)者-消費者模型的兩個線程或兩組線程之間的通訊看起來更加清晰,即生產(chǎn)者將消息壓入隊列,消費者從隊列里面取走消息并處理,具體到網(wǎng)絡(luò)服務(wù)器結(jié)構(gòu)中,生產(chǎn)者線程是網(wǎng)絡(luò)接收線程,消費者線程是邏輯處理線程,網(wǎng)絡(luò)線程不停的將接收到的數(shù)據(jù)放到全局消息隊列中,邏輯處理線程不停的從全局消息隊列中取走消息并處理。
                  system v消息隊列接口非常簡單,主要是msgsnd,msgrcv,每個消息的結(jié)構(gòu)中都包含一個類型信息,這樣在msgrcv時就可以選擇只讀取某個類型的消息,如果類型傳為0,則不考慮類型,讀取第一個消息。根據(jù)消息類型來獲取消息是非常有用的,考慮下述簡單服務(wù)器結(jié)構(gòu):
                                 client  server  dbproxy  
            其中,server是主要的業(yè)務(wù)處理服務(wù)器,dbproxy是數(shù)據(jù)庫代理。以server為例,他需要處理兩方面的消息:1。來自client的消息;2。來自dbproxy的消息。定義如下枚舉:
                                 enum eQueueType
                                {
                                        QUEUE_TYPE_CLIENT = 1,
                                        QUEUE_TYPE_WORLD  = 2, 
                                 };

                  當(dāng)網(wǎng)絡(luò)線程收到來自client的消息時,將消息放到QUEUE_TYPE_CLIENT 類型的隊列中,當(dāng)收到dbproxy的消息時,放到QUEUE_TYPE_WORLD中,然后設(shè)置兩個或兩組線程分別處理
            QUEUE_TYPE_CLIENT 隊列和QUEUE_TYPE_WORLD隊列,結(jié)構(gòu)非常清晰。在具體實現(xiàn)MessageQueue時,是基于ACE的消息隊列的,在內(nèi)部設(shè)置了一個消息隊列的map,也就是多個消息隊列,但在接口上就是一個隊列,MessageQueue只提供了根據(jù)類型來獲取隊列中的消息,沒有提供獲取整個隊列組中第一個消息的功能。
                  考慮使用全局消息隊列是我思考了一段時間才決定的,之前的做法是類似Active Object的,在ACE里就是ACE_Task類,該類即具備線程的功能,也包含了一個消息隊列,在使用時重載他的svc成員即可。對不熟悉ACE的人來說,ACE_Task在設(shè)計上比較復(fù)雜,還牽扯到隊列的操作,如此以來,提供更友好的接口就顯得很有必要,因為線程的使用大家都已經(jīng)非常熟悉,消息隊列在概念上也很清晰,那么思路就是:我可以自由的創(chuàng)建線程,在線程里從全局消息隊列中讀取消息。看似和ACE_Task沒有根本區(qū)別,其實全局消息隊列的C風(fēng)格的接口,而不是派生的方式,大大降低了復(fù)雜度。

                    代碼貼上,方便以后查閱:
            MessageQueue.h

            #pragma once

            #include 
            <map>
            #include 
            <ace/Synch.h>
            #include 
            <ace/Message_Queue.h>
            #include 
            <ace/Singleton.h>

            using std::map;

            class MessageQueue
            {
            public:
                MessageQueue(
            void);
                
            ~MessageQueue(void);

            public:
                
            static MessageQueue* Instance();
                ACE_Message_Block
            * GetMessage(int nType);
                ACE_Message_Block
            * GetMessage(int nType,int nSeconds );//超時nSeconds秒
                bool PutMessage(int nType,ACE_Message_Block* pMsg);
                
            bool PutMessage(int nType,ACE_Message_Block* pMsg,int nSeconds);//超時nSeconds秒
                
            protected:
                
            //獲取某類型的消息隊列,沒有則創(chuàng)建
                ACE_Message_Queue<ACE_MT_SYNCH>* GetQueue(int nType);
            private:
                map
            <int,ACE_Message_Queue<ACE_MT_SYNCH>* > m_QueueMap; 
                ACE_Thread_Mutex        m_QueueMapMutex;
            }
            ;

            typedef ACE_Singleton
            <MessageQueue,ACE_Thread_Mutex>    MessageQueueSingleton;


            MessageQueue.cpp

            #include 
            <cassert>
            #include 
            <ace/Guard_T.h>

            #include 
            "MessageQueue.h"
            // MessageQueue.cpp : 定義控制臺應(yīng)用程序的入口點。
            //


            MessageQueue::MessageQueue(
            void)
            {
            }


            MessageQueue::
            ~MessageQueue(void)
            {
                ACE_GUARD(ACE_Thread_Mutex,g,m_QueueMapMutex);
                
            for (map<int,ACE_Message_Queue<ACE_MT_SYNCH>* >::iterator iter = m_QueueMap.begin(); iter != m_QueueMap.end();++iter)
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = iter->second;
                    pQueue
            ->close();
                    delete pQueue;
                }

                m_QueueMap.clear();
            }


            MessageQueue
            * MessageQueue::Instance()
            {
                
            return MessageQueueSingleton::instance();
            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);            

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg,&timeout);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg);

                
            return nRet != -1;
            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg,&timeout);

                
            return nRet != -1;
            }


            ACE_Message_Queue
            <ACE_MT_SYNCH>* MessageQueue::GetQueue( int nType )
            {
                assert( nType 
            >= 0 );
                ACE_GUARD_RETURN(ACE_Thread_Mutex,g,m_QueueMapMutex,NULL);

                
            if (m_QueueMap.find(nType) != m_QueueMap.end())
                
            {
                    
            return m_QueueMap[nType];
                }

                
            else
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = new ACE_Message_Queue<ACE_MT_SYNCH>();
                    m_QueueMap[nType] 
            = pQueue;
                    
            return pQueue;
                }

            }

            test.cpp
            #include "MessageQueue.h"
            #include 
            <iostream>
            #include 
            <ace/OS.h>
            using namespace std;

            enum eQueueType
            {
                QUEUE_TYPE_CLIENT 
            = 1,
                QUEUE_TYPE_WORLD  
            = 2,    
            }
            ;

            int main(int argc, char* argv[])
            {

                MessageQueue
            * pQueue = MessageQueue::Instance();

                ACE_Message_Block
            * pClientMsg = new ACE_Message_Block(100);
                pClientMsg
            ->copy("client msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_CLIENT,pClientMsg);


                ACE_Message_Block
            * pWorldMsg = new ACE_Message_Block(100);
                pWorldMsg
            ->copy("world msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_WORLD,pWorldMsg);


                ACE_Message_Block
            * pTemp = NULL;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT);
                cout 
            << pTemp->rd_ptr() << endl;

                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_WORLD);
                cout 
            << pTemp->rd_ptr() << endl;

                cout 
            << "begin time : " << ACE_OS::time(NULL) << endl;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT,10);
                cout 
            << "end time : " << ACE_OS::time(NULL) << endl;
                
            if (pTemp == NULL)
                
            {
                    cout 
            << "time out when get client msg" << endl;
                }


                
            return 0;
            }

            posted on 2010-09-19 01:01 true 閱讀(2249) 評論(1)  編輯 收藏 引用 所屬分類: ACE游戲開發(fā)

            Feedback

            # re: 仿照system v消息隊列的內(nèi)存消息隊列[未登錄] 2010-09-20 16:15 vincent
            贊一個:)ace_task是個好玩意,有許多可以借鑒的東西  回復(fù)  更多評論
              

            久久久久亚洲AV无码专区体验 | av色综合久久天堂av色综合在| 99久久国产亚洲高清观看2024| 91精品国产高清久久久久久国产嫩草| a级毛片无码兔费真人久久| 久久乐国产精品亚洲综合| 欧美午夜精品久久久久免费视| 久久最新精品国产| 亚洲精品乱码久久久久久久久久久久 | 少妇久久久久久久久久| 青青热久久综合网伊人| 国产精品中文久久久久久久| 97久久精品国产精品青草| 天天影视色香欲综合久久| 久久97精品久久久久久久不卡| 久久se这里只有精品| 久久ZYZ资源站无码中文动漫| 久久一区二区三区99| 曰曰摸天天摸人人看久久久| 久久久无码精品亚洲日韩蜜臀浪潮 | 97精品伊人久久久大香线蕉 | yellow中文字幕久久网| 亚洲中文字幕无码久久2020| 久久精品国产精品亜洲毛片| 国产情侣久久久久aⅴ免费| 少妇无套内谢久久久久| 亚洲精品97久久中文字幕无码| 久久精品一区二区| 国产精品久久久久影院嫩草| 亚洲色婷婷综合久久| 中文字幕热久久久久久久| 久久久久亚洲AV无码专区桃色| 51久久夜色精品国产| 99久久国产免费福利| 91性高湖久久久久| 99精品伊人久久久大香线蕉| 国产L精品国产亚洲区久久| 亚洲国产精品久久久久| 精品免费久久久久国产一区| 久久国产精品国语对白| 久久香蕉国产线看观看猫咪?v|