• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            網(wǎng)絡(luò)服務(wù)器軟件開發(fā)/中間件開發(fā),關(guān)注ACE/ICE/boost

            C++博客 首頁 新隨筆 聯(lián)系 聚合 管理
              152 Posts :: 3 Stories :: 172 Comments :: 0 Trackbacks
                  內(nèi)存消息隊列是服務(wù)器端常用的基礎(chǔ)組件,他使得符合生產(chǎn)者-消費(fèi)者模型的兩個線程或兩組線程之間的通訊看起來更加清晰,即生產(chǎn)者將消息壓入隊列,消費(fèi)者從隊列里面取走消息并處理,具體到網(wǎng)絡(luò)服務(wù)器結(jié)構(gòu)中,生產(chǎn)者線程是網(wǎng)絡(luò)接收線程,消費(fèi)者線程是邏輯處理線程,網(wǎng)絡(luò)線程不停的將接收到的數(shù)據(jù)放到全局消息隊列中,邏輯處理線程不停的從全局消息隊列中取走消息并處理。
                  system v消息隊列接口非常簡單,主要是msgsnd,msgrcv,每個消息的結(jié)構(gòu)中都包含一個類型信息,這樣在msgrcv時就可以選擇只讀取某個類型的消息,如果類型傳為0,則不考慮類型,讀取第一個消息。根據(jù)消息類型來獲取消息是非常有用的,考慮下述簡單服務(wù)器結(jié)構(gòu):
                                 client  server  dbproxy  
            其中,server是主要的業(yè)務(wù)處理服務(wù)器,dbproxy是數(shù)據(jù)庫代理。以server為例,他需要處理兩方面的消息:1。來自client的消息;2。來自dbproxy的消息。定義如下枚舉:
                                 enum eQueueType
                                {
                                        QUEUE_TYPE_CLIENT = 1,
                                        QUEUE_TYPE_WORLD  = 2, 
                                 };

                  當(dāng)網(wǎng)絡(luò)線程收到來自client的消息時,將消息放到QUEUE_TYPE_CLIENT 類型的隊列中,當(dāng)收到dbproxy的消息時,放到QUEUE_TYPE_WORLD中,然后設(shè)置兩個或兩組線程分別處理
            QUEUE_TYPE_CLIENT 隊列和QUEUE_TYPE_WORLD隊列,結(jié)構(gòu)非常清晰。在具體實(shí)現(xiàn)MessageQueue時,是基于ACE的消息隊列的,在內(nèi)部設(shè)置了一個消息隊列的map,也就是多個消息隊列,但在接口上就是一個隊列,MessageQueue只提供了根據(jù)類型來獲取隊列中的消息,沒有提供獲取整個隊列組中第一個消息的功能。
                  考慮使用全局消息隊列是我思考了一段時間才決定的,之前的做法是類似Active Object的,在ACE里就是ACE_Task類,該類即具備線程的功能,也包含了一個消息隊列,在使用時重載他的svc成員即可。對不熟悉ACE的人來說,ACE_Task在設(shè)計上比較復(fù)雜,還牽扯到隊列的操作,如此以來,提供更友好的接口就顯得很有必要,因為線程的使用大家都已經(jīng)非常熟悉,消息隊列在概念上也很清晰,那么思路就是:我可以自由的創(chuàng)建線程,在線程里從全局消息隊列中讀取消息。看似和ACE_Task沒有根本區(qū)別,其實(shí)全局消息隊列的C風(fēng)格的接口,而不是派生的方式,大大降低了復(fù)雜度。

                    代碼貼上,方便以后查閱:
            MessageQueue.h

            #pragma once

            #include 
            <map>
            #include 
            <ace/Synch.h>
            #include 
            <ace/Message_Queue.h>
            #include 
            <ace/Singleton.h>

            using std::map;

            class MessageQueue
            {
            public:
                MessageQueue(
            void);
                
            ~MessageQueue(void);

            public:
                
            static MessageQueue* Instance();
                ACE_Message_Block
            * GetMessage(int nType);
                ACE_Message_Block
            * GetMessage(int nType,int nSeconds );//超時nSeconds秒
                bool PutMessage(int nType,ACE_Message_Block* pMsg);
                
            bool PutMessage(int nType,ACE_Message_Block* pMsg,int nSeconds);//超時nSeconds秒
                
            protected:
                
            //獲取某類型的消息隊列,沒有則創(chuàng)建
                ACE_Message_Queue<ACE_MT_SYNCH>* GetQueue(int nType);
            private:
                map
            <int,ACE_Message_Queue<ACE_MT_SYNCH>* > m_QueueMap; 
                ACE_Thread_Mutex        m_QueueMapMutex;
            }
            ;

            typedef ACE_Singleton
            <MessageQueue,ACE_Thread_Mutex>    MessageQueueSingleton;


            MessageQueue.cpp

            #include 
            <cassert>
            #include 
            <ace/Guard_T.h>

            #include 
            "MessageQueue.h"
            // MessageQueue.cpp : 定義控制臺應(yīng)用程序的入口點(diǎn)。
            //


            MessageQueue::MessageQueue(
            void)
            {
            }


            MessageQueue::
            ~MessageQueue(void)
            {
                ACE_GUARD(ACE_Thread_Mutex,g,m_QueueMapMutex);
                
            for (map<int,ACE_Message_Queue<ACE_MT_SYNCH>* >::iterator iter = m_QueueMap.begin(); iter != m_QueueMap.end();++iter)
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = iter->second;
                    pQueue
            ->close();
                    delete pQueue;
                }

                m_QueueMap.clear();
            }


            MessageQueue
            * MessageQueue::Instance()
            {
                
            return MessageQueueSingleton::instance();
            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            ACE_Message_Block
            * MessageQueue::GetMessage( int nType,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);            

                ACE_Message_Block
            * pMsg = NULL;
                
            int nRet = pQueue->dequeue(pMsg,&timeout);

                
            if (nRet != -1)
                
            {
                    
            return pMsg;
                }

                
            else
                
            {
                    
            return NULL;
                }

            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg )
            {
                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg);

                
            return nRet != -1;
            }


            bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg,int nSeconds )
            {
                assert( nSeconds 
            > 0);
                ACE_Time_Value timeout 
            = ACE_OS::gettimeofday();
                timeout 
            += nSeconds;

                ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = GetQueue(nType);
                
            int nRet = pQueue->enqueue(pMsg,&timeout);

                
            return nRet != -1;
            }


            ACE_Message_Queue
            <ACE_MT_SYNCH>* MessageQueue::GetQueue( int nType )
            {
                assert( nType 
            >= 0 );
                ACE_GUARD_RETURN(ACE_Thread_Mutex,g,m_QueueMapMutex,NULL);

                
            if (m_QueueMap.find(nType) != m_QueueMap.end())
                
            {
                    
            return m_QueueMap[nType];
                }

                
            else
                
            {
                    ACE_Message_Queue
            <ACE_MT_SYNCH>* pQueue = new ACE_Message_Queue<ACE_MT_SYNCH>();
                    m_QueueMap[nType] 
            = pQueue;
                    
            return pQueue;
                }

            }

            test.cpp
            #include "MessageQueue.h"
            #include 
            <iostream>
            #include 
            <ace/OS.h>
            using namespace std;

            enum eQueueType
            {
                QUEUE_TYPE_CLIENT 
            = 1,
                QUEUE_TYPE_WORLD  
            = 2,    
            }
            ;

            int main(int argc, char* argv[])
            {

                MessageQueue
            * pQueue = MessageQueue::Instance();

                ACE_Message_Block
            * pClientMsg = new ACE_Message_Block(100);
                pClientMsg
            ->copy("client msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_CLIENT,pClientMsg);


                ACE_Message_Block
            * pWorldMsg = new ACE_Message_Block(100);
                pWorldMsg
            ->copy("world msg");
                pQueue
            ->PutMessage(QUEUE_TYPE_WORLD,pWorldMsg);


                ACE_Message_Block
            * pTemp = NULL;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT);
                cout 
            << pTemp->rd_ptr() << endl;

                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_WORLD);
                cout 
            << pTemp->rd_ptr() << endl;

                cout 
            << "begin time : " << ACE_OS::time(NULL) << endl;
                pTemp 
            = pQueue->GetMessage(QUEUE_TYPE_CLIENT,10);
                cout 
            << "end time : " << ACE_OS::time(NULL) << endl;
                
            if (pTemp == NULL)
                
            {
                    cout 
            << "time out when get client msg" << endl;
                }


                
            return 0;
            }

            posted on 2010-09-19 01:01 true 閱讀(2240) 評論(1)  編輯 收藏 引用 所屬分類: ACE游戲開發(fā)

            Feedback

            # re: 仿照system v消息隊列的內(nèi)存消息隊列[未登錄] 2010-09-20 16:15 vincent
            贊一個:)ace_task是個好玩意,有許多可以借鑒的東西  回復(fù)  更多評論
              

            久久久久亚洲av无码专区喷水| 99久久精品久久久久久清纯 | 亚洲精品午夜国产va久久| 久久精品国产福利国产琪琪| 久久午夜福利无码1000合集| a级成人毛片久久| 一级女性全黄久久生活片免费 | 久久精品一本到99热免费| 国内精品久久久久影院免费| 亚洲美日韩Av中文字幕无码久久久妻妇| 伊人久久大香线蕉综合影院首页| 国产69精品久久久久99尤物| 伊人久久久AV老熟妇色| 久久婷婷五月综合成人D啪| 国产精品福利一区二区久久| 亚洲香蕉网久久综合影视 | 中文字幕久久精品无码| 国产综合成人久久大片91| 久久久久久无码Av成人影院| 四虎国产精品成人免费久久| 久久精品国产亚洲5555| 久久综合欧美成人| 久久久精品国产sm调教网站| 久久青青草视频| 久久婷婷午色综合夜啪| 青青草原综合久久大伊人导航| 99热都是精品久久久久久| 麻豆精品久久久一区二区| 国产精品久久久久久久久鸭 | 国产精品午夜久久| 好属妞这里只有精品久久| 九九久久自然熟的香蕉图片| 人妻精品久久无码区| 麻豆成人久久精品二区三区免费| 亚洲AV成人无码久久精品老人| 精品熟女少妇AV免费久久| 欧美精品国产综合久久| 久久久久久久91精品免费观看| 中文字幕无码久久久| 亚洲欧美精品一区久久中文字幕| 久久精品免费大片国产大片|