關于RMsgQueue類的使用
RMsgQueue類是Symbian OS EKA2才提供的一個類,最近因為項目中要使用,為此對使用進行如下小結。
因為RMsgQueue類只是一個封裝好的內核資源類,類似于RSocket和RTimer類,要想使用它進行異步操作就必須對其用AO類來封裝,從而來實現監聽消息,在有消息過來時得到通知并根據消息內容進行相對應的處理。
那這個消息內容又該如何定義呢?我們可以參看下RMsgQueue類定義
template <typename T>
class RMsgQueue : public RMsgQueueBase
{
public:
TInt CreateLocal(TInt aSize, TOwnerType aType=EOwnerProcess);
TInt CreateGlobal(const TDesC& aName, TInt aSize, TOwnerType aType=EOwnerProcess);
TInt Send(const T& aMsg);
void SendBlocking(const T& aMsg);
TInt Receive(T& aMsg);
void ReceiveBlocking(T& aMsg);
};
顯然該類RMsgQueue也是Symbian的廋模板類,具體消息內容可以根據需求自定義類型,即如果傳遞的消息足夠簡單則可以采用Tint類型來定義,考慮到我們的實際應用對消息內容進行如下定義
typedef enum _NetWork_Msg_Type
{
NetworkSocketCreate,
NetworkSocketConnect,
NetworkSocketSend,
NetworkSocketRecv,
NetworkSocketClose,
}NetWorkMsgType;
typedef struct _NetWork_Msg
{
NetWorkMsgType m_msgType;
TInt32 m_msgFd;
}NetworkMsg;
其中的NetworkMsg就是我們在這里使用的消息內容,其中的成員m_msgType 是NetWorkMsgType定義的枚舉值,表示消息類型,另一個成員m_msgFd是我們跨線程傳遞一個Id值。
有了以上消息內容,我們就可以簡單的來定義消息隊列了
RMsgQueue<NetworkMsg> g_NetWorkMsgQue;
如上已經實例化了一個RMsgQueue對象,但是RMsgQueue對象必須要像RTimer一樣需要調用CreateLocal或者CreateGlobal才能創建起來,那我們在哪里創建它呢?這兩個創建又有什么區別呢?由于RMsgQueue可以是全局的也可以是局部的,假如使用局部的,那么類的封裝性較好,而且可以很好的跨進程使用,但是缺點是需要傳遞對象,此時創建就可以使用CreateGlobal。考慮到我們只在一個進程的不同線程間使用,同一進程的線程間可以共享資源(內存),所以這里我們就采用簡單的全局變量來實現它,為此創建就用了簡單的CreateLocal函數,而且我們將其放在了AO的二階段構造中,詳見示例代碼。
至于AO的其它封裝,主要是利用RMsgQueue如下三個函數
void RMsgQueue::NotifyDataAvailable(TRequestStatus& aStatus);
TInt RMsgQueue::Send(const T& aMsg);
TInt RMsgQueue::Receive(T& aMsg);
先在封裝的AO中調用NotifyDataAvailable開啟消息的監聽,然后就開始等待外部的Send函數調用發消息進來,一旦有消息Send進來就進入AO的Runl中,我們通過調用Receive函數來對傳遞進來的消息內容進行解析和相應處理。邏輯就這么簡單,下面給出源代碼供參考。
頭文件內容:
/*
* MessageQueueAO.h
*
* Created on: 2010-3-30
* Author: frank
*/
#ifndef MESSAGEQUEUEAO_H_
#define MESSAGEQUEUEAO_H_
#include <e32base.h>
#include <e32msgqueue.h>
typedef enum _NetWork_Msg_Type
{
NetworkConnect,
NetworkSocketCreate,
NetworkSocketConnect,
NetworkSocketSend,
NetworkSocketRecv,
NetworkSocketClose,
NetWorkDisConnect
}NetWorkMsgType;
typedef struct _NetWork_Msg
{
NetWorkMsgType m_msgType;
TInt32 m_msgFd;
}NetworkMsg;
const TInt KNumberOfMsgs = 10;
extern RMsgQueue<NetworkMsg> g_NetWorkMsgQue;
class CMessageQueueAO : public CActive
{
public:
// Cancel and destroy
virtual ~CMessageQueueAO();
// Two-phased constructor.
static CMessageQueueAO* NewL();
// Two-phased constructor.
static CMessageQueueAO* NewLC();
public:
// New functions
// Function for making the initial request
TInt StartMessageGet(const TDesC& aText=KNullDesC);
private:
// C++ constructor
CMessageQueueAO();
// Second-phase constructor
void ConstructL();
private:
// From CActive
// Handle completion
void RunL();
// How to cancel me
void DoCancel();
// Override to handle leaves from RunL(). Default implementation causes
// the active scheduler to panic.
TInt RunError(TInt aError);
};
#endif /* MESSAGEQUEUEAO_H_ */
實現文件內容
/*
* MessageQueueAO.cpp
*
* Created on: 2010-3-30
* Author: frank
*/
#include "MessageQueueAO.h"
RMsgQueue<NetworkMsg> g_NetWorkMsgQue;
CMessageQueueAO::CMessageQueueAO()
:CActive(EPriorityHigh)
{
// TODO Auto-generated constructor stub
}
CMessageQueueAO::~CMessageQueueAO()
{
// TODO Auto-generated destructor stub
Cancel();
}
CMessageQueueAO* CMessageQueueAO::NewL()
{
CMessageQueueAO* self = CMessageQueueAO::NewLC();
CleanupStack::Pop(); // self;
return self;
}
CMessageQueueAO* CMessageQueueAO::NewLC()
{
CMessageQueueAO* self = new (ELeave) CMessageQueueAO();
CleanupStack::PushL(self);
self->ConstructL();
return self;
}
void CMessageQueueAO::ConstructL()
{
CActiveScheduler::Add(this); // Add to scheduler
g_NetWorkMsgQue.CreateLocal(KNumberOfMsgs);
}
TInt CMessageQueueAO::StartMessageGet(const TDesC& aText)
{
g_NetWorkMsgQue.NotifyDataAvailable(iStatus);
SetActive(); // Tell scheduler a request is active
}
void SocketCreate(TInt32 aFd)
{
}
void SocketRecv(TInt32 aFd)
{
}
void SocketConnect(TInt32 aFd)
{
}
void CMessageQueueAO::RunL()
{
if (iStatus.Int() == KErrNone)
{
//接收數據
NetworkMsg msgfrmq;
g_NetWorkMsgQue.Receive(msgfrmq);
switch(msgfrmq.m_msgType)
{
case NetworkSocketCreate:
LockMutexRaw(g_NetMgrMutex);
SocketCreate(msgfrmq.m_msgFd);
UnlockMutexRaw(g_NetMgrMutex);
break;
case NetworkSocketConnect:
LockMutexRaw(g_NetMgrMutex);
SocketConnect(msgfrmq.m_msgFd);
UnlockMutexRaw(g_NetMgrMutex);
break;
case NetworkSocketRecv:
LockMutexRaw(g_NetMgrMutex);
SocketRecv(msgfrmq.m_msgFd);
UnlockMutexRaw(g_NetMgrMutex);
break;
case NetworkSocketClose:
LockMutexRaw(g_NetMgrMutex);
SocketClose(msgfrmq.m_msgFd);
UnlockMutexRaw(g_NetMgrMutex);
break;
default:
break;
}
//Using ReceiveBlocking()
// msgqueue.ReceiveBlocking(msgfrmq);// 如果使用blocking 可能那邊要發送兩次消息這邊才會跑哦
}
g_NetWorkMsgQue.NotifyDataAvailable(iStatus);//開啟下一個消息的接收
SetActive();
}
void CMessageQueueAO::DoCancel()
{
if( IsActive() )
{
g_NetWorkMsgQue.CancelDataAvailable();
}
}
TInt CMessageQueueAO::RunError(TInt aError)
{
return aError;
}
封裝完這個AO,大功就可以完成了,將這個AO放在一個獨立的線程中創建起來,并調用StartMessageGet讓這個線程始終監聽消息。外界線程通過調用g_NetWorkMsgQue.Send(msgfrmq)來給這個AO線程發消息并得到相應的處理。在這里我就不過多展開了,因為要涉及兩個線程間的操作,篇幅就太大了。
在結束小結的時候,我想著如何實現IPC,這樣的話,這個小結就完整了,碰巧,在逛博客的時候,發現一篇博文總結得很好,而且言簡意賅,為此轉載與下面,跟大家一起分享,博文原址http://blog.sina.com.cn/s/blog_63b4ee0d0100g3xc.html
如何理解進程間通信?
先用一個通俗的例子來解釋:比如我們需要實現這樣一種模式,應用由進程A與進程B兩部分組成,A有UI,負責用戶交互;B沒有UI,完全后臺運行,A與B之間可以相互通信。就如彩信的模式,彩信到達后后臺下載,下載完給出提示信息(即用戶界面),閱讀彩信再激活“信息”程序。
進程通信即兩個并行進程可以通過互相發送消息進行合作,消息是通過消息緩沖而在進程之間相互傳遞的。
RMsgQueue是3版提供的比較好的與事件機制融合的技術, 2版只能使用其它傳統的, 信號量, 共享內存等技術.
解決方案:
讓我們來看代碼:
Server
void CP2PServer::ConstructL()
{
//創建
iMsgQueue.CreateGlobal(KGLobalName, KNumberOfSlots, KMessageLength, EOwnerProcess);
CActiveScheduler::Add( this); // Add to scheduler
iMsgQueue.NotifyDataAvailable( iStatus );//開始監聽消息
SetActive(); // Tell scheduler a request is active
}
void CP2PServer::RunL()
{
if (iStatus.Int() == KErrNone)
{
//接收數據
TRAPD(error,iMsgQueue.Receive( &str_SendData, KMessageLength));
if(error==KErrNone)
{
iObserver->HandleMessageReceiveL(str_SendData.DataBuf.Left(str_SendData.DataLength));
}
}
}
Client
void CP2PClient::ConstructL()
{
//創建
iMsgQueue.OpenGlobal(KGLobalName, EOwnerProcess);
}
//發送消息
void CP2PClient::SendMessageL(const TDesC8& aSendMessage)
{
STR_SENDDATA temp;
temp.DataLength = aSendMessage.Length();
temp.DataBuf.Copy(aSendMessage);
iMsgQueue.Send(&temp, KMessageLength);
}
就一些疑難問題解決如下:
B線程創建一個“東東”,名字為KGLobalName,并且B線程開啟監控消息, A線程在需要時使用帶KGLobalName參數的OpenGlobal函數打開同名, 然后發送消息出來, 之后直接關閉,B會收到此消息解析后開始進行相應的處理。
上面的這個“東東”就是一個消息隊列, 只要創建者線程B存在, 就可以使用, 一般這個設計都是B始終運行, 并且建立RMsgQueue, 其它進程需要時打開, 發送消息,然后關閉。
如果雙方都可以作為發送方與接受方,則每方都創建RMsgQueue,注意使用不同的名字。
最后除了感謝上面這位牛人給出的總結外,我再補充一下,如果不是為了避免傳遞消息時的內存拷貝操作,單向操作用C/S架構更為可取,至于雙向操作,目前看來的確還是RMsgQueue好些,不知道大牛們覺得我這種猜想是否成立。
posted on 2010-04-09 21:40
frank.sunny 閱讀(2026)
評論(1) 編輯 收藏 引用 所屬分類:
symbian 開發