相關UML:

CAttempterEngine實現了兩個接口:IQueueServiceSink、IAttemperEngine;
通過前面的分析,偶們了解到,IQueueServiceSink這個接口被是用來處理CQueueService中的數據的,根據上面的UML我們可以看到,CAttemperEngine關聯了一個CQueueService(或者直接點說是:持有了一個CQueueService對象,說組合也成)。這樣的話這個CAttemperEngine暴露出來的接口就只剩下IAttemperEnging了。
這個接口有兩處值得單獨討論的:SetSocketEngine、GetQueueService;
SetSocketEngine,后面的分析中還會出現,我覺得這里是一個設計上的失誤導致需要暴露socket引擎接口;
GetQueueService的設計思路可能是說,每個IAttemperEngine接口背后都有一個CQueueService,從以后的分析中可以看到,這個思路是理解整個kernel的關鍵。調度引擎應該是一個消息匯總(從個個引擎產生的消息)然后派發到IAttemperEngineSink。因為代碼中是沒有看到有關ITimerSink ISocketSink之類的東東的,,,
整個消息是個引擎產生,然后投遞到指定的CQueueService,然后匯總到這里被派發到IAttemperEngineSink出去的,,,
看看CAttemperEngine中處理數據的代碼:
這個函數中一個很重要的參數:wIdentifier;
可以來追溯一下他的源頭:
另外一個要注意的點是對socket事件的處理,我之前認為調度引擎組合了一個socket引擎是一個設計缺陷,應為這里的在處理socket read事件的時候如果異常了直接直接使用引擎來關閉socket而不是調用socket sink的指定接口。猜想也許是不希望客戶端直接處理socket句柄吧,,,
還是用一句話描述下調度引擎:
調度引擎的工作可以這樣描述消息匯總、派發。其他引擎通過CQueueServiceEvent將消息post到調度引擎上來(通過共享同一個CQueueService),然后由調度引擎集中派發出去,,,

CAttempterEngine實現了兩個接口:IQueueServiceSink、IAttemperEngine;
通過前面的分析,偶們了解到,IQueueServiceSink這個接口被是用來處理CQueueService中的數據的,根據上面的UML我們可以看到,CAttemperEngine關聯了一個CQueueService(或者直接點說是:持有了一個CQueueService對象,說組合也成)。這樣的話這個CAttemperEngine暴露出來的接口就只剩下IAttemperEnging了。
1
//啟動服務
2
virtual bool __cdecl StartService();
3
//停止服務
4
virtual bool __cdecl StopService();
5
//設置網絡
6
virtual bool __cdecl SetSocketEngine(IUnknownEx * pIUnknownEx);
7
//注冊鉤子
8
virtual bool __cdecl SetAttemperEngineSink(IUnknownEx * pIUnknownEx);
9
//獲取接口
10
virtual void * __cdecl GetQueueService(const IID & Guid, DWORD dwQueryVer);
//啟動服務2
virtual bool __cdecl StartService();3
//停止服務4
virtual bool __cdecl StopService();5
//設置網絡6
virtual bool __cdecl SetSocketEngine(IUnknownEx * pIUnknownEx);7
//注冊鉤子8
virtual bool __cdecl SetAttemperEngineSink(IUnknownEx * pIUnknownEx);9
//獲取接口10
virtual void * __cdecl GetQueueService(const IID & Guid, DWORD dwQueryVer);這個接口有兩處值得單獨討論的:SetSocketEngine、GetQueueService;
SetSocketEngine,后面的分析中還會出現,我覺得這里是一個設計上的失誤導致需要暴露socket引擎接口;
GetQueueService的設計思路可能是說,每個IAttemperEngine接口背后都有一個CQueueService,從以后的分析中可以看到,這個思路是理解整個kernel的關鍵。調度引擎應該是一個消息匯總(從個個引擎產生的消息)然后派發到IAttemperEngineSink。因為代碼中是沒有看到有關ITimerSink ISocketSink之類的東東的,,,
整個消息是個引擎產生,然后投遞到指定的CQueueService,然后匯總到這里被派發到IAttemperEngineSink出去的,,,
看看CAttemperEngine中處理數據的代碼:
1
//隊列接口
2
void __cdecl CAttemperEngine::OnQueueServiceSink(WORD wIdentifier, void * pBuffer, WORD wDataSize, DWORD dwInsertTime)
3
{
4
//內核事件
5
ASSERT(m_pIAttemperEngineSink!=NULL);
6
switch (wIdentifier)
7
{
8
case EVENT_TIMER: //定時器事件
9
{
10
//效驗參數
11
ASSERT(wDataSize==sizeof(NTY_TimerEvent));
12
if (wDataSize!=sizeof(NTY_TimerEvent)) return;
13
14
//處理消息
15
NTY_TimerEvent * pTimerEvent=(NTY_TimerEvent *)pBuffer;
16
m_pIAttemperEngineSink->OnEventTimer(pTimerEvent->wTimerID,pTimerEvent->wBindParam);
17
18
return;
19
}
20
case EVENT_DATABASE: //數據庫事件
21
{
22
//效驗參數
23
ASSERT(wDataSize>=sizeof(NTY_DataBaseEvent));
24
if (wDataSize<sizeof(NTY_DataBaseEvent)) return;
25
26
//處理消息
27
NTY_DataBaseEvent * pDataBaseEvent=(NTY_DataBaseEvent *)pBuffer;
28
m_pIAttemperEngineSink->OnEventDataBase(pDataBaseEvent+1,wDataSize-sizeof(NTY_DataBaseEvent),pDataBaseEvent);
29
30
return;
31
}
32
case EVENT_SOCKET_ACCEPT: //網絡應答事件
33
{
34
//效驗大小
35
ASSERT(wDataSize==sizeof(NTY_SocketAcceptEvent));
36
if (wDataSize!=sizeof(NTY_SocketAcceptEvent)) return;
37
38
//處理消息
39
NTY_SocketAcceptEvent * pSocketAcceptEvent=(NTY_SocketAcceptEvent *)pBuffer;
40
m_pIAttemperEngineSink->OnEventSocketAccept(pSocketAcceptEvent);
41
42
return;
43
}
44
case EVENT_SOCKET_READ: //網絡讀取事件
45
{
46
//效驗大小
47
NTY_SocketReadEvent * pSocketReadEvent=(NTY_SocketReadEvent *)pBuffer;
48
ASSERT(wDataSize>=sizeof(NTY_SocketReadEvent));
49
ASSERT(wDataSize==(sizeof(NTY_SocketReadEvent)+pSocketReadEvent->wDataSize));
50
if (wDataSize<sizeof(NTY_SocketReadEvent)) return;
51
if (wDataSize!=(sizeof(NTY_SocketReadEvent)+pSocketReadEvent->wDataSize)) return;
52
53
//處理消息
54
bool bSuccess=false;
55
try
56
{
57
bSuccess=m_pIAttemperEngineSink->OnEventSocketRead(pSocketReadEvent->Command,pSocketReadEvent+1,pSocketReadEvent->wDataSize,pSocketReadEvent);
58
}
59
catch (
) { }
60
if (bSuccess==false) m_pITCPSocketEngine->CloseSocket(pSocketReadEvent->wIndex,pSocketReadEvent->wRoundID);
61
62
return;
63
}
64
case EVENT_SOCKET_CLOSE: //網絡關閉事件
65
{
66
//效驗大小
67
ASSERT(wDataSize==sizeof(NTY_SocketCloseEvent));
68
if (wDataSize!=sizeof(NTY_SocketCloseEvent)) return;
69
70
//處理消息
71
NTY_SocketCloseEvent * pSocketCloseEvent=(NTY_SocketCloseEvent *)pBuffer;
72
m_pIAttemperEngineSink->OnEventSocketClose(pSocketCloseEvent);
73
74
return;
75
}
76
}
77
78
//其他事件
79
m_pIAttemperEngineSink->OnAttemperEvent(wIdentifier,pBuffer,wDataSize,dwInsertTime);
80
81
return;
82
}
//隊列接口2
void __cdecl CAttemperEngine::OnQueueServiceSink(WORD wIdentifier, void * pBuffer, WORD wDataSize, DWORD dwInsertTime)3
{4
//內核事件5
ASSERT(m_pIAttemperEngineSink!=NULL);6
switch (wIdentifier)7
{8
case EVENT_TIMER: //定時器事件9
{10
//效驗參數11
ASSERT(wDataSize==sizeof(NTY_TimerEvent));12
if (wDataSize!=sizeof(NTY_TimerEvent)) return;13

14
//處理消息15
NTY_TimerEvent * pTimerEvent=(NTY_TimerEvent *)pBuffer;16
m_pIAttemperEngineSink->OnEventTimer(pTimerEvent->wTimerID,pTimerEvent->wBindParam);17

18
return;19
}20
case EVENT_DATABASE: //數據庫事件21
{22
//效驗參數23
ASSERT(wDataSize>=sizeof(NTY_DataBaseEvent));24
if (wDataSize<sizeof(NTY_DataBaseEvent)) return;25

26
//處理消息27
NTY_DataBaseEvent * pDataBaseEvent=(NTY_DataBaseEvent *)pBuffer;28
m_pIAttemperEngineSink->OnEventDataBase(pDataBaseEvent+1,wDataSize-sizeof(NTY_DataBaseEvent),pDataBaseEvent);29

30
return;31
}32
case EVENT_SOCKET_ACCEPT: //網絡應答事件33
{34
//效驗大小35
ASSERT(wDataSize==sizeof(NTY_SocketAcceptEvent));36
if (wDataSize!=sizeof(NTY_SocketAcceptEvent)) return;37

38
//處理消息39
NTY_SocketAcceptEvent * pSocketAcceptEvent=(NTY_SocketAcceptEvent *)pBuffer;40
m_pIAttemperEngineSink->OnEventSocketAccept(pSocketAcceptEvent);41

42
return;43
}44
case EVENT_SOCKET_READ: //網絡讀取事件45
{46
//效驗大小47
NTY_SocketReadEvent * pSocketReadEvent=(NTY_SocketReadEvent *)pBuffer;48
ASSERT(wDataSize>=sizeof(NTY_SocketReadEvent));49
ASSERT(wDataSize==(sizeof(NTY_SocketReadEvent)+pSocketReadEvent->wDataSize));50
if (wDataSize<sizeof(NTY_SocketReadEvent)) return;51
if (wDataSize!=(sizeof(NTY_SocketReadEvent)+pSocketReadEvent->wDataSize)) return;52

53
//處理消息54
bool bSuccess=false;55
try 56
{ 57
bSuccess=m_pIAttemperEngineSink->OnEventSocketRead(pSocketReadEvent->Command,pSocketReadEvent+1,pSocketReadEvent->wDataSize,pSocketReadEvent);58
}59
catch (
) { }60
if (bSuccess==false) m_pITCPSocketEngine->CloseSocket(pSocketReadEvent->wIndex,pSocketReadEvent->wRoundID);61

62
return;63
}64
case EVENT_SOCKET_CLOSE: //網絡關閉事件65
{66
//效驗大小67
ASSERT(wDataSize==sizeof(NTY_SocketCloseEvent));68
if (wDataSize!=sizeof(NTY_SocketCloseEvent)) return;69

70
//處理消息71
NTY_SocketCloseEvent * pSocketCloseEvent=(NTY_SocketCloseEvent *)pBuffer;72
m_pIAttemperEngineSink->OnEventSocketClose(pSocketCloseEvent);73

74
return;75
}76
}77

78
//其他事件79
m_pIAttemperEngineSink->OnAttemperEvent(wIdentifier,pBuffer,wDataSize,dwInsertTime); 80

81
return;82
}這個函數中一個很重要的參數:wIdentifier;
可以來追溯一下他的源頭:
1
//數據消息
2
void CQueueService::OnQueueServiceThread(const tagDataHead & DataHead, void * pBuffer, WORD wDataSize)
3
{
4
ASSERT(m_pIQueueServiceSink!=NULL);
5
try
6
{
7
m_pIQueueServiceSink->OnQueueServiceSink(DataHead.wIdentifier,pBuffer,DataHead.wDataSize,DataHead.dwInsertTime);
8
}
9
catch (
) {}
10
return;
11
}
可以看見他是直接保存在最底層的那個DataStroage里邊的,這個在上一章分析中可以看到。(個引擎利用CQueueServiceEvent Post數據的時候就攜帶了類型信息)
//數據消息2
void CQueueService::OnQueueServiceThread(const tagDataHead & DataHead, void * pBuffer, WORD wDataSize)3
{4
ASSERT(m_pIQueueServiceSink!=NULL);5
try 6
{ 7
m_pIQueueServiceSink->OnQueueServiceSink(DataHead.wIdentifier,pBuffer,DataHead.wDataSize,DataHead.dwInsertTime); 8
}9
catch (
) {}10
return;11
}另外一個要注意的點是對socket事件的處理,我之前認為調度引擎組合了一個socket引擎是一個設計缺陷,應為這里的在處理socket read事件的時候如果異常了直接直接使用引擎來關閉socket而不是調用socket sink的指定接口。猜想也許是不希望客戶端直接處理socket句柄吧,,,
還是用一句話描述下調度引擎:
調度引擎的工作可以這樣描述消息匯總、派發。其他引擎通過CQueueServiceEvent將消息post到調度引擎上來(通過共享同一個CQueueService),然后由調度引擎集中派發出去,,,


