• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            網絡服務器軟件開發/中間件開發,關注ACE/ICE/boost

            C++博客 首頁 新隨筆 聯系 聚合 管理
              152 Posts :: 3 Stories :: 172 Comments :: 0 Trackbacks
                  內存消息隊列是服務器端常用的基礎組件,他使得符合生產者-消費者模型的兩個線程或兩組線程之間的通訊看起來更加清晰,即生產者將消息壓入隊列,消費者從隊列里面取走消息并處理,具體到網絡服務器結構中,生產者線程是網絡接收線程,消費者線程是邏輯處理線程,網絡線程不停的將接收到的數據放到全局消息隊列中,邏輯處理線程不停的從全局消息隊列中取走消息并處理。
                  system v消息隊列接口非常簡單,主要是msgsnd,msgrcv,每個消息的結構中都包含一個類型信息,這樣在msgrcv時就可以選擇只讀取某個類型的消息,如果類型傳為0,則不考慮類型,讀取第一個消息。根據消息類型來獲取消息是非常有用的,考慮下述簡單服務器結構:
                                 client  server  dbproxy  
            其中,server是主要的業務處理服務器,dbproxy是數據庫代理。以server為例,他需要處理兩方面的消息:1。來自client的消息;2。來自dbproxy的消息。定義如下枚舉:
                                 enum eQueueType
                                {
                                        QUEUE_TYPE_CLIENT = 1,
                                        QUEUE_TYPE_WORLD  = 2, 
                                 };

                  當網絡線程收到來自client的消息時,將消息放到QUEUE_TYPE_CLIENT 類型的隊列中,當收到dbproxy的消息時,放到QUEUE_TYPE_WORLD中,然后設置兩個或兩組線程分別處理
            QUEUE_TYPE_CLIENT 隊列和QUEUE_TYPE_WORLD隊列,結構非常清晰。在具體實現MessageQueue時,是基于ACE的消息隊列的,在內部設置了一個消息隊列的map,也就是多個消息隊列,但在接口上就是一個隊列,MessageQueue只提供了根據類型來獲取隊列中的消息,沒有提供獲取整個隊列組中第一個消息的功能。
                  考慮使用全局消息隊列是我思考了一段時間才決定的,之前的做法是類似Active Object的,在ACE里就是ACE_Task類,該類即具備線程的功能,也包含了一個消息隊列,在使用時重載他的svc成員即可。對不熟悉ACE的人來說,ACE_Task在設計上比較復雜,還牽扯到隊列的操作,如此以來,提供更友好的接口就顯得很有必要,因為線程的使用大家都已經非常熟悉,消息隊列在概念上也很清晰,那么思路就是:我可以自由的創建線程,在線程里從全局消息隊列中讀取消息。看似和ACE_Task沒有根本區別,其實全局消息隊列的C風格的接口,而不是派生的方式,大大降低了復雜度。

                    代碼貼上,方便以后查閱:
            MessageQueue.h

            #pragma once

            #include 
            <map>
            #include 
            <ace/Synch.h>
            #include 
            <ace/Message_Queue.h>
            #include 
            <ace/Singleton.h>

            using std::map;

            class MessageQueue
            {
            public:
                MessageQueue(
            void);
                
            ~MessageQueue(void);

            public:
                
            static MessageQueue* Instance();
                ACE_Message_Block
            * GetMessage(int nType);
                ACE_Message_Block
            * GetMessage(int nType,int nSeconds );//超時nSeconds秒
                bool PutMessage(int nType,ACE_Message_Block* pMsg);
                
            bool PutMessage(int nType,ACE_Message_Block* pMsg,int nSeconds);//超時nSeconds秒
                
            protected:
                
            //獲取某類型的消息隊列,沒有則創建
                ACE_Message_Queue<ACE_MT_SYNCH>* GetQueue(int nType);
            private:
                map
            <int,ACE_Message_Queue<ACE_MT_SYNCH>* > m_QueueMap; 
                ACE_Thread_Mutex        m_QueueMapMutex;
            }
            ;

            typedef ACE_Singleton
            <MessageQueue,ACE_Thread_Mutex>    MessageQueueSingleton;


            MessageQueue.cpp

            #include 
            <cassert>
            #include 
            <ace/Guard_T.h>

            #include 
            "MessageQueue.h"
            // MessageQueue.cpp : 定義控制臺應用程序的入口點。
            //


            MessageQueue::MessageQueue(
            void)
            {
            }


            MessageQueue::
            ~MessageQueue(void)
            {
                ACE_GUARD(ACE_Thread_Mutex,g,m_QueueMapMutex);
                
            for (map<int,ACE_Message_Queue<ACE_MT_SYNCH>* >::iterator iter = m_QueueMap.begin(); iter != m_QueueMap.end();++iter)
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = iter->second;
                    pQueue
            ->close();
                    delete pQueue;
                }

                m_QueueMap.clear();
            }


            MessageQueue
            * MessageQueue::Instance()
            {
                
            return MessageQueueSingleton::instance();
            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);            

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg,&timeout);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg);

                
            return nRet != -1;
            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg,&timeout);

                
            return nRet != -1;
            }


            ACE_Message_Queue
            <ACE_MT_SYNCH>* MessageQueue::GetQueue( int nType )
            {
                assert( nType 
            >= 0 );
                ACE_GUARD_RETURN(ACE_Thread_Mutex,g,m_QueueMapMutex,NULL);

                
            if (m_QueueMap.find(nType) != m_QueueMap.end())
                
            {
                    
            return m_QueueMap[nType];
                }

                
            else
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = new ACE_Message_Queue<ACE_MT_SYNCH>();
                    m_QueueMap[nType] 
            = pQueue;
                    
            return pQueue;
                }

            }

            test.cpp
            #include "MessageQueue.h"
            #include 
            <iostream>
            #include 
            <ace/OS.h>
            using namespace std;

            enum eQueueType
            {
                QUEUE_TYPE_CLIENT 
            = 1,
                QUEUE_TYPE_WORLD  
            = 2,    
            }
            ;

            int main(int argc, char* argv[])
            {

                MessageQueue
            * pQueue = MessageQueue::Instance();

                ACE_Message_Block
            * pClientMsg = new ACE_Message_Block(100);
                pClientMsg
            ->copy("client msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_CLIENT,pClientMsg);


                ACE_Message_Block
            * pWorldMsg = new ACE_Message_Block(100);
                pWorldMsg
            ->copy("world msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_WORLD,pWorldMsg);


                ACE_Message_Block
            * pTemp = NULL;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT);
                cout 
            << pTemp->rd_ptr() << endl;

                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_WORLD);
                cout 
            << pTemp->rd_ptr() << endl;

                cout 
            << "begin time : " << ACE_OS::time(NULL) << endl;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT,10);
                cout 
            << "end time : " << ACE_OS::time(NULL) << endl;
                
            if (pTemp == NULL)
                
            {
                    cout 
            << "time out when get client msg" << endl;
                }


                
            return 0;
            }

            posted on 2010-09-19 01:01 true 閱讀(2240) 評論(1)  編輯 收藏 引用 所屬分類: ACE游戲開發

            Feedback

            # re: 仿照system v消息隊列的內存消息隊列[未登錄] 2010-09-20 16:15 vincent
            贊一個:)ace_task是個好玩意,有許多可以借鑒的東西  回復  更多評論
              

            色老头网站久久网| 久久国产免费观看精品| 国内精品久久久久久久影视麻豆| 国产韩国精品一区二区三区久久| 51久久夜色精品国产| 久久久久久久91精品免费观看| 色欲久久久天天天综合网精品 | 日本久久久久久中文字幕| 久久精品无码av| 色欲综合久久躁天天躁蜜桃| 久久亚洲电影| 国产成人久久精品一区二区三区| 久久精品国产精品亜洲毛片 | 久久人人爽人人爽人人片AV不| 久久99精品综合国产首页| 漂亮人妻被中出中文字幕久久| 久久99国产精品久久99| 久久天天躁狠狠躁夜夜躁2014| 亚洲国产天堂久久综合网站| 亚洲中文字幕无码久久2020 | 久久久久香蕉视频| 久久国产免费观看精品3| 久久福利资源国产精品999| 国产999精品久久久久久| 久久精品国产精品亚洲毛片| 区久久AAA片69亚洲| 国内精品伊人久久久久妇| 国产—久久香蕉国产线看观看| 精品永久久福利一区二区| 亚洲精品乱码久久久久久按摩 | 欧美一级久久久久久久大| 久久综合九色综合精品| 97久久天天综合色天天综合色hd| 99久久99久久精品国产片果冻| 久久久久青草线蕉综合超碰| 久久毛片一区二区| 精品久久久久成人码免费动漫| 欧美激情一区二区久久久| 精品伊人久久大线蕉色首页| 久久精品国产2020| 无码超乳爆乳中文字幕久久|