內存消息隊列是服務器端常用的基礎組件,他使得符合生產者-消費者模型的兩個線程或兩組線程之間的通訊看起來更加清晰,即生產者將消息壓入隊列,消費者從隊列里面取走消息并處理,具體到網絡服務器結構中,生產者線程是網絡接收線程,消費者線程是邏輯處理線程,網絡線程不停的將接收到的數據放到全局消息隊列中,邏輯處理線程不停的從全局消息隊列中取走消息并處理。
system v消息隊列接口非常簡單,主要是msgsnd,msgrcv,每個消息的結構中都包含一個類型信息,這樣在msgrcv時就可以選擇只讀取某個類型的消息,如果類型傳為0,則不考慮類型,讀取第一個消息。根據消息類型來獲取消息是非常有用的,考慮下述簡單服務器結構:
client server dbproxy
其中,server是主要的業務處理服務器,dbproxy是數據庫代理。以server為例,他需要處理兩方面的消息:1。來自client的消息;2。來自dbproxy的消息。定義如下枚舉:
enum eQueueType
{
QUEUE_TYPE_CLIENT = 1,
QUEUE_TYPE_WORLD = 2,
};
當網絡線程收到來自client的消息時,將消息放到QUEUE_TYPE_CLIENT 類型的隊列中,當收到dbproxy的消息時,放到QUEUE_TYPE_WORLD中,然后設置兩個或兩組線程分別處理
QUEUE_TYPE_CLIENT 隊列和QUEUE_TYPE_WORLD隊列,結構非常清晰。在具體實現MessageQueue時,是基于ACE的消息隊列的,在內部設置了一個消息隊列的map,也就是多個消息隊列,但在接口上就是一個隊列,MessageQueue只提供了根據類型來獲取隊列中的消息,沒有提供獲取整個隊列組中第一個消息的功能。
考慮使用全局消息隊列是我思考了一段時間才決定的,之前的做法是類似Active Object的,在ACE里就是ACE_Task類,該類即具備線程的功能,也包含了一個消息隊列,在使用時重載他的svc成員即可。對不熟悉ACE的人來說,ACE_Task在設計上比較復雜,還牽扯到隊列的操作,如此以來,提供更友好的接口就顯得很有必要,因為線程的使用大家都已經非常熟悉,消息隊列在概念上也很清晰,那么思路就是:我可以自由的創建線程,在線程里從全局消息隊列中讀取消息。看似和ACE_Task沒有根本區別,其實全局消息隊列的C風格的接口,而不是派生的方式,大大降低了復雜度。
代碼貼上,方便以后查閱:
MessageQueue.h
#pragma once

#include <map>
#include <ace/Synch.h>
#include <ace/Message_Queue.h>
#include <ace/Singleton.h>

using std::map;

class MessageQueue


{
public:
MessageQueue(void);
~MessageQueue(void);

public:
static MessageQueue* Instance();
ACE_Message_Block* GetMessage(int nType);
ACE_Message_Block* GetMessage(int nType,int nSeconds );//超時nSeconds秒
bool PutMessage(int nType,ACE_Message_Block* pMsg);
bool PutMessage(int nType,ACE_Message_Block* pMsg,int nSeconds);//超時nSeconds秒
protected:
//獲取某類型的消息隊列,沒有則創建
ACE_Message_Queue<ACE_MT_SYNCH>* GetQueue(int nType);
private:
map<int,ACE_Message_Queue<ACE_MT_SYNCH>* > m_QueueMap;
ACE_Thread_Mutex m_QueueMapMutex;
};

typedef ACE_Singleton<MessageQueue,ACE_Thread_Mutex> MessageQueueSingleton;
MessageQueue.cpp

#include <cassert>
#include <ace/Guard_T.h>

#include "MessageQueue.h"
// MessageQueue.cpp : 定義控制臺應用程序的入口點。
//


MessageQueue::MessageQueue(void)


{
}

MessageQueue::~MessageQueue(void)


{
ACE_GUARD(ACE_Thread_Mutex,g,m_QueueMapMutex);
for (map<int,ACE_Message_Queue<ACE_MT_SYNCH>* >::iterator iter = m_QueueMap.begin(); iter != m_QueueMap.end();++iter)

{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = iter->second;
pQueue->close();
delete pQueue;
}
m_QueueMap.clear();
}

MessageQueue* MessageQueue::Instance()


{
return MessageQueueSingleton::instance();
}

ACE_Message_Block* MessageQueue::GetMessage( int nType )


{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);

ACE_Message_Block* pMsg = NULL;
int nRet = pQueue->dequeue(pMsg);

if (nRet != -1)

{
return pMsg;
}
else

{
return NULL;
}
}

ACE_Message_Block* MessageQueue::GetMessage( int nType,int nSeconds )


{
assert( nSeconds > 0);
ACE_Time_Value timeout = ACE_OS::gettimeofday();
timeout += nSeconds;

ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);

ACE_Message_Block* pMsg = NULL;
int nRet = pQueue->dequeue(pMsg,&timeout);

if (nRet != -1)

{
return pMsg;
}
else

{
return NULL;
}
}

bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg )


{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);
int nRet = pQueue->enqueue(pMsg);

return nRet != -1;
}

bool MessageQueue::PutMessage( int nType,ACE_Message_Block* pMsg,int nSeconds )


{
assert( nSeconds > 0);
ACE_Time_Value timeout = ACE_OS::gettimeofday();
timeout += nSeconds;

ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = GetQueue(nType);
int nRet = pQueue->enqueue(pMsg,&timeout);

return nRet != -1;
}

ACE_Message_Queue<ACE_MT_SYNCH>* MessageQueue::GetQueue( int nType )


{
assert( nType >= 0 );
ACE_GUARD_RETURN(ACE_Thread_Mutex,g,m_QueueMapMutex,NULL);

if (m_QueueMap.find(nType) != m_QueueMap.end())

{
return m_QueueMap[nType];
}
else

{
ACE_Message_Queue<ACE_MT_SYNCH>* pQueue = new ACE_Message_Queue<ACE_MT_SYNCH>();
m_QueueMap[nType] = pQueue;
return pQueue;
}
}
test.cpp
#include "MessageQueue.h"
#include <iostream>
#include <ace/OS.h>
using namespace std;

enum eQueueType


{
QUEUE_TYPE_CLIENT = 1,
QUEUE_TYPE_WORLD = 2,
};

int main(int argc, char* argv[])


{

MessageQueue* pQueue = MessageQueue::Instance();

ACE_Message_Block* pClientMsg = new ACE_Message_Block(100);
pClientMsg->copy("client msg");
pQueue->PutMessage(QUEUE_TYPE_CLIENT,pClientMsg);


ACE_Message_Block* pWorldMsg = new ACE_Message_Block(100);
pWorldMsg->copy("world msg");
pQueue->PutMessage(QUEUE_TYPE_WORLD,pWorldMsg);


ACE_Message_Block* pTemp = NULL;
pTemp = pQueue->GetMessage(QUEUE_TYPE_CLIENT);
cout << pTemp->rd_ptr() << endl;

pTemp = pQueue->GetMessage(QUEUE_TYPE_WORLD);
cout << pTemp->rd_ptr() << endl;

cout << "begin time : " << ACE_OS::time(NULL) << endl;
pTemp = pQueue->GetMessage(QUEUE_TYPE_CLIENT,10);
cout << "end time : " << ACE_OS::time(NULL) << endl;
if (pTemp == NULL)

{
cout << "time out when get client msg" << endl;
}

return 0;
}
