• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            網絡服務器軟件開發/中間件開發,關注ACE/ICE/boost

            C++博客 首頁 新隨筆 聯系 聚合 管理
              152 Posts :: 3 Stories :: 172 Comments :: 0 Trackbacks
                  內存消息隊列是服務器端常用的基礎組件,他使得符合生產者-消費者模型的兩個線程或兩組線程之間的通訊看起來更加清晰,即生產者將消息壓入隊列,消費者從隊列里面取走消息并處理,具體到網絡服務器結構中,生產者線程是網絡接收線程,消費者線程是邏輯處理線程,網絡線程不停的將接收到的數據放到全局消息隊列中,邏輯處理線程不停的從全局消息隊列中取走消息并處理。
                  system v消息隊列接口非常簡單,主要是msgsnd,msgrcv,每個消息的結構中都包含一個類型信息,這樣在msgrcv時就可以選擇只讀取某個類型的消息,如果類型傳為0,則不考慮類型,讀取第一個消息。根據消息類型來獲取消息是非常有用的,考慮下述簡單服務器結構:
                                 client  server  dbproxy  
            其中,server是主要的業務處理服務器,dbproxy是數據庫代理。以server為例,他需要處理兩方面的消息:1。來自client的消息;2。來自dbproxy的消息。定義如下枚舉:
                                 enum eQueueType
                                {
                                        QUEUE_TYPE_CLIENT = 1,
                                        QUEUE_TYPE_WORLD  = 2, 
                                 };

                  當網絡線程收到來自client的消息時,將消息放到QUEUE_TYPE_CLIENT 類型的隊列中,當收到dbproxy的消息時,放到QUEUE_TYPE_WORLD中,然后設置兩個或兩組線程分別處理
            QUEUE_TYPE_CLIENT 隊列和QUEUE_TYPE_WORLD隊列,結構非常清晰。在具體實現MessageQueue時,是基于ACE的消息隊列的,在內部設置了一個消息隊列的map,也就是多個消息隊列,但在接口上就是一個隊列,MessageQueue只提供了根據類型來獲取隊列中的消息,沒有提供獲取整個隊列組中第一個消息的功能。
                  考慮使用全局消息隊列是我思考了一段時間才決定的,之前的做法是類似Active Object的,在ACE里就是ACE_Task類,該類即具備線程的功能,也包含了一個消息隊列,在使用時重載他的svc成員即可。對不熟悉ACE的人來說,ACE_Task在設計上比較復雜,還牽扯到隊列的操作,如此以來,提供更友好的接口就顯得很有必要,因為線程的使用大家都已經非常熟悉,消息隊列在概念上也很清晰,那么思路就是:我可以自由的創建線程,在線程里從全局消息隊列中讀取消息。看似和ACE_Task沒有根本區別,其實全局消息隊列的C風格的接口,而不是派生的方式,大大降低了復雜度。

                    代碼貼上,方便以后查閱:
            MessageQueue.h

            #pragma once

            #include 
            <map>
            #include 
            <ace/Synch.h>
            #include 
            <ace/Message_Queue.h>
            #include 
            <ace/Singleton.h>

            using std::map;

            class MessageQueue
            {
            public:
                MessageQueue(
            void);
                
            ~MessageQueue(void);

            public:
                
            static MessageQueue* Instance();
                ACE_Message_Block
            * GetMessage(int nType);
                ACE_Message_Block
            * GetMessage(int nType,int nSeconds );//超時nSeconds秒
                bool PutMessage(int nType,ACE_Message_Block* pMsg);
                
            bool PutMessage(int nType,ACE_Message_Block* pMsg,int nSeconds);//超時nSeconds秒
                
            protected:
                
            //獲取某類型的消息隊列,沒有則創建
                ACE_Message_Queue<ACE_MT_SYNCH>* GetQueue(int nType);
            private:
                map
            <int,ACE_Message_Queue<ACE_MT_SYNCH>* > m_QueueMap; 
                ACE_Thread_Mutex        m_QueueMapMutex;
            }
            ;

            typedef ACE_Singleton
            <MessageQueue,ACE_Thread_Mutex>    MessageQueueSingleton;


            MessageQueue.cpp

            #include 
            <cassert>
            #include 
            <ace/Guard_T.h>

            #include 
            "MessageQueue.h"
            // MessageQueue.cpp : 定義控制臺應用程序的入口點。
            //


            MessageQueue::MessageQueue(
            void)
            {
            }


            MessageQueue::
            ~MessageQueue(void)
            {
                ACE_GUARD(ACE_Thread_Mutex,g,m_QueueMapMutex);
                
            for (map<int,ACE_Message_Queue<ACE_MT_SYNCH>* >::iterator iter = m_QueueMap.begin(); iter != m_QueueMap.end();++iter)
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = iter->second;
                    pQueue
            ->close();
                    delete pQueue;
                }

                m_QueueMap.clear();
            }


            MessageQueue
            * MessageQueue::Instance()
            {
                
            return MessageQueueSingleton::instance();
            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);            

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg,&timeout);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg);

                
            return nRet != -1;
            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg,&timeout);

                
            return nRet != -1;
            }


            ACE_Message_Queue
            <ACE_MT_SYNCH>* MessageQueue::GetQueue( int nType )
            {
                assert( nType 
            >= 0 );
                ACE_GUARD_RETURN(ACE_Thread_Mutex,g,m_QueueMapMutex,NULL);

                
            if (m_QueueMap.find(nType) != m_QueueMap.end())
                
            {
                    
            return m_QueueMap[nType];
                }

                
            else
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = new ACE_Message_Queue<ACE_MT_SYNCH>();
                    m_QueueMap[nType] 
            = pQueue;
                    
            return pQueue;
                }

            }

            test.cpp
            #include "MessageQueue.h"
            #include 
            <iostream>
            #include 
            <ace/OS.h>
            using namespace std;

            enum eQueueType
            {
                QUEUE_TYPE_CLIENT 
            = 1,
                QUEUE_TYPE_WORLD  
            = 2,    
            }
            ;

            int main(int argc, char* argv[])
            {

                MessageQueue
            * pQueue = MessageQueue::Instance();

                ACE_Message_Block
            * pClientMsg = new ACE_Message_Block(100);
                pClientMsg
            ->copy("client msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_CLIENT,pClientMsg);


                ACE_Message_Block
            * pWorldMsg = new ACE_Message_Block(100);
                pWorldMsg
            ->copy("world msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_WORLD,pWorldMsg);


                ACE_Message_Block
            * pTemp = NULL;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT);
                cout 
            << pTemp->rd_ptr() << endl;

                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_WORLD);
                cout 
            << pTemp->rd_ptr() << endl;

                cout 
            << "begin time : " << ACE_OS::time(NULL) << endl;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT,10);
                cout 
            << "end time : " << ACE_OS::time(NULL) << endl;
                
            if (pTemp == NULL)
                
            {
                    cout 
            << "time out when get client msg" << endl;
                }


                
            return 0;
            }

            posted on 2010-09-19 01:01 true 閱讀(2240) 評論(1)  編輯 收藏 引用 所屬分類: ACE游戲開發

            Feedback

            # re: 仿照system v消息隊列的內存消息隊列[未登錄] 2010-09-20 16:15 vincent
            贊一個:)ace_task是個好玩意,有許多可以借鑒的東西  回復  更多評論
              

            久久久久亚洲精品天堂| 久久综合噜噜激激的五月天| 18岁日韩内射颜射午夜久久成人| 久久精品国产只有精品2020| AAA级久久久精品无码区| 三级片免费观看久久| 日韩精品久久久久久免费| 国内精品久久久久影院网站| 精品国产青草久久久久福利| 精品久久久久久久久中文字幕| 婷婷久久综合九色综合绿巨人 | 久久久久国产日韩精品网站| 久久综合久久综合亚洲| 天天综合久久久网| 无码乱码观看精品久久| 日本精品久久久久中文字幕8| 99精品久久久久久久婷婷| 久久精品国产一区二区| 99久久无色码中文字幕| 久久精品日日躁夜夜躁欧美| 欧洲性大片xxxxx久久久| 亚洲国产精品久久久久婷婷软件| 无码AV波多野结衣久久| 亚洲伊人久久综合影院| 国产国产成人久久精品| 91精品国产91久久久久久| 国产精品久久久久久吹潮| 99久久国产综合精品女同图片| 蜜桃麻豆www久久国产精品| 国内精品免费久久影院| 精品久久久久久无码国产| 美女写真久久影院| 欧美日韩中文字幕久久伊人| 久久精品国产亚洲AV麻豆网站 | 久久中文字幕一区二区| 日韩精品久久久久久久电影蜜臀| 国产亚洲精品久久久久秋霞 | 93精91精品国产综合久久香蕉| 99久久婷婷国产综合亚洲| 国产精品久久久久…| 国内精品伊人久久久久|