青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

C小加

厚德 博學 求真 至善 The bright moon and breeze
posts - 145, comments - 195, trackbacks - 0, articles - 0
  C++博客 :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

ACE前攝器Proactor最好的講解(轉載)

Posted on 2013-04-12 17:43 C小加 閱讀(6238) 評論(3)  編輯 收藏 引用 所屬分類: 網絡編程

轉自:http://blog.csdn.net/caisini_vc/article/details/4474910
把這兩天做Proactor的一些經驗和心得寫一下,可能會給一些人幫助。
    Proactor是異步模式的網絡處理器,ACE中叫做“前攝器”。
    先講幾個概念:
    前攝器(Proactor)-異步的事件多路分離器、處理器,是核心處理類。啟動后由3個線程組成(你不需要關心這三個線程,我只是讓你知道一下有這回事存在)。
    接受器(Acceptor)-用于服務端,監聽在一個端口上,接受用戶的請求。
    連接器(Connector)-用于客戶端,去連接遠程的監聽。當然,如果遠程是ACE寫的,就是Acceptor。
    異步模式-即非阻塞模式。網絡的傳輸速度一般來講為10Mbps、100Mbps、1000Mbps。拿千兆網來說,實際的傳輸速度為1000Mbps/8大概為128KB左右。我們的CPU一般為P4 3.0GHZ,如果是32位的處理器,一秒鐘大概可以處理6G的字節,那么,128KB的網絡速度是遠遠及不上處理器的速度的。網絡發送數據是一位一位發送出去的,如果CPU等在這里,發送完成函數才結束,那么,處理器浪費了大量時間在網絡傳輸上。
    操作系統提供了異步的模式來傳輸網絡數據,工作模式即:應用程序把要發送的數據交給操作系統,操作系統把數據放在系統緩沖區后就告訴應用程序OK了,我幫你發,應用程序該干嘛干嘛去。操作系統發送完成后,會給應用系統一個回執,告訴應用程序:剛才那個包發送完成了!
   舉個例子:你有幾封郵件和包裹要發,最有效率的辦法是什么?你把郵件和包裹及交給總臺,總臺MM說,好了,你幫你發,你忙去吧!然后你去工作了。過了一會,總臺MM打電話告訴你:“剛才我叫快遞公司的人來了,把你的包裹發出去了。郵局的人也來了,取走了郵件,放心好了”。同樣,如果你知道今天會有包裹來,比如你在淘寶上購物了,你能成天等在總臺?你應該告訴總臺MM:“今天可能有我的一個快遞,你幫我收一下,晚上請你肯德基!”。MM:“看在肯得基的面子上,幫你收了”。某個時間,MM打電話來了:“帥哥,你的包裹到了,我幫你簽收了,快來拿吧。”
   因為操作系統是很有效率的,所有,他在后臺收發是很快的。應用程序也很簡單。Proactor就是這種異步模式的。Proactor就是總臺MM;ACE_Service_Handle就是總臺代為收發郵件的公司流程。

我們看一個實例:


//***********************************************************
class TPTCPAsynchServerImpl : public ACE_Service_Handler
{
public:
 TPTCPAsynchServerImpl(
void);
 
~TPTCPAsynchServerImpl(void);
 
virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); 
 
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
 
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
 
virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);
private:
 
int initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result);
 ACE_Asynch_Read_Stream rs_;
 ACE_Asynch_Write_Stream ws_;
};


這個例子從ACE_Service_Handler繼承過來,ACE_Service_Handle主要就是定義了一些回調函數。
1、 virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block);
  當有客戶端連接上來,連接建立成功后Proactor會調用這個方法。

2、 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
當用戶要讀的數據讀好了后,調用這個方法

3、virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
當用戶要寫的數據在網卡上發送成功后,Proactor會回調這個方法

4、 virtual void  handle_time_out (const ACE_Time_Value &tv, const void *act=0);
當用戶設定的時鐘到期了,這個方法會被調用。

這跟和總臺MM的聯絡方法是不是一樣的?

對還缺點東西,缺少怎么向總臺MM交待任務的方法。下面看看:

首先,創建一個監聽器。

 ACE_Asynch_Acceptor<TPTCPAsynchServerImpl> acceptor_;
看到沒,就是我們剛才寫的類,因為他繼承了回調接口,并實現了自已的代碼,模板中ACE_Asynch_Acceptor會在合適的時候回調這些方法。

//創建一個地址對象
 ACE_INET_Addr addr(port, ip);
acceptor_.open (addr, 8 * 1024, 1);
    Open后,就開始監聽了。其它的,向Proactor注冊一些事件的事模板類中都替你做了,你不需要做很多事。
    那么,已經開始監聽了,我的程序從哪里開始呢?對于一個服務程序來講,程序是被用戶的連接驅動的,一個用戶程序想和通訊,必須先創建連接,就是Socket中的connect操作。這個操作Proactor會替我們做一些工作,當連接創建完成后,上面講的Open方法會被調用,我們看看Open方法中都有些什么代碼:


void TPTCPAsynchServerImpl::open (ACE_HANDLE handle, ACE_Message_Block &message_block)
{
 ACE_DEBUG ((LM_DEBUG, 
"%N:%l:TPTCPAsynchServerImpl::open() "));
 
//構造讀流
 if (rs_.open (*this, handle) == -1)
 {
  ACE_ERROR ((LM_ERROR, 
"%N:%l: ""TPTCPAsynchServerImpl::open() Error"));
  
return;
    }
 
//構造寫流
 if (ws_.open(*this, handle) == -1)
 {
  ACE_ERROR ((LM_ERROR, 
"%N:%l: ""TPTCPAsynchServerImpl::open() Error"));
  
return;
    }
 
//獲取客戶端連接地址和端口
 ACE_INET_Addr addr; 
 ACE_SOCK_SEQPACK_Association ass
=ACE_SOCK_SEQPACK_Association(handle); 
 size_t addr_size
=1
 ass.get_local_addrs(
&addr,addr_size);
 
this->server_->onClientConnect((int)handle, addr.get_ip_address(), addr.get_port_number());

 

 
//如果客戶連接時同時提交了數據,需要偽造一個結果,然后呼叫讀事件
 if (message_block.length () != 0)
 {
 
// ACE_DEBUG((LM_DEBUG, "message_block.length() != 0 "));
  
// 復制消息塊
  ACE_Message_Block &duplicate =  *message_block.duplicate ();
  
// 偽造讀結果,以便進行讀完成回調
  ACE_Asynch_Read_Stream_Result_Impl *fake_result =
        ACE_Proactor::instance ()
->create_asynch_read_stream_result (this->proxy (),
                                                                     
this->handle_,
                                                                     duplicate,
                                                                     
1024,
                                                                     
0,
                                                                     ACE_INVALID_HANDLE,
                                                                     
0,
                                                                     
0);
  size_t bytes_transferred 
= message_block.length ();
  
// Accept事件處理完成,wr_ptr指針會被向前移動,將其移動到開始位置
  duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred);
  
// 這個方法將調用回調函數
  fake_result->complete (message_block.length (), 10);
  
// 銷毀偽造的讀結果
  delete fake_result;
 }

 
// 否則,通知底層,準備讀取用戶數據
 
//創建一個消息塊。這個消息塊將用于從套接字中異步讀 
 ACE_Message_Block *mb = 0;
  ACE_NEW (mb, ACE_Message_Block (_bufSize));
 
if (rs_.read (*mb, mb->size () - 1== -1)
 {
  delete mb;
  ACE_ERROR ((LM_ERROR, 
"%N:%l:open init read failed!"));
  
return;
 }
}

 

我們看到,首先創建了兩個流,就是前面類定義中定義的一個異步寫流,一個異步讀流。以后對網絡的讀和寫就通過這兩個流進行。我還給出了一段讀客戶端地址和端口的代碼。然后是讀取客戶Connect可能附帶的數據,那段代碼不用看懂,以后使用照抄就行。然后就是

 


if (rs_.read (*mb, mb->size () - 1) == -1)
 {
  delete mb;
  ACE_ERROR ((LM_ERROR, "%N:%l:open init read failed!"));
  return;
 }

這段代碼使用讀流讀一段數據。這段代碼就是向總臺MM交待:我要收包裹,收好了叫我!
也就是說,這段代碼99%的可能是讀不出數據的,只是向Proactor注冊讀的事件,具體的等待、讀取操作由Proactor讀,讀到了,就回調Handle_Read_Stream方法。ACE_Message_Block是消息塊,數據就是存放在消息塊中的。
下面看看Handle_Read_Stream方法的代碼:


void TPTCPAsynchServerImpl::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
 result.message_block ().rd_ptr ()[result.bytes_transferred ()] 
= '/0';
  ACE_DEBUG ((LM_DEBUG, 
"********************/n"));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""bytes_to_read", result.bytes_to_read ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d/n""error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, 
"********************/n"));
 result.message_block().release();
 
if (this->initiate_read_stream (result) == -1)
 {
  ACE_ERROR((LM_ERROR, 
"%N:%l:read stream failed!connection closed, remove it:%d/n", result.handle()));
  closeConnection(result.handle());
 } 
}
 

這個函數被調用,就表明有數據已經讀好了,包裹已經在總臺了。Proactor比總臺MM還好,給你送上門了,數據就在Result里,上面演示了Result中的數據。然后把消息塊釋放了,然后調用initiate_read_stream繼續監聽網絡上可能到來的數據。看看initiate_read_stream好了:


int TPTCPAsynchServerImpl::initiate_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
 ACE_DEBUG((LM_TRACE, 
"%N:%l:TPTCPAsynchServerImpl::initiate_read_stream() "));
 
//創建一個消息塊。這個消息塊將用于從套接字中異步讀 
 ACE_Message_Block *mb = new ACE_Message_Block(_bufSize);
 
if (mb == NULL)
 {
  ACE_DEBUG((LM_ERROR, 
"%N:%l:can't allock ACE_Message_Block.  ")); 
  
return -1;
 }
 
 
if (rs_.read (*mb, mb->size () - 1== -1)
 {
  delete mb;
  ACE_ERROR_RETURN ((LM_ERROR, 
"%N:%l:rs->read() failed, clientID=%d ", result.handle()),  -1);
 }
 
return 0;
}

 

代碼很簡單,就是創建一個新的消息塊,然后使用讀流注冊一個讀消息就可以了。

到此為止,Proactor的讀流程很清楚了吧?

下面再說一個寫流程。

寫流程其實更簡單,在任意想向客戶端寫數據的地方,調用相應代碼就行了,比如,我們提供了SendData方法來發送數據,在任意想發送數據的地方調用SendData就行了,SendData的代碼如下:

int TPTCPAsynchServerImpl::sendData(int clientID, const char *data, int dataLen, unsigned int &id)
{
 ACE_DEBUG((LM_DEBUG, 
"TPTCPAsynchServerImpl::sendData(void) "));
 ACE_Message_Block 
*mb; 
 ACE_NEW_RETURN(mb, ACE_Message_Block(dataLen 
+ 1), -1);
 mb
->wr_ptr((char*)data);                  
 ACE_OS::memcpy(mb
->base(),(char*)data, dataLen);
 id 
= GlobleSingleton::instance()->getIndex();
 mb
->msg_type((int)id);
 
//向操作系統發送數據
 if (connection->ws->write (*mb , dataLen ) == -1)
 {
  ACE_ERROR_RETURN((LM_ERROR, 
"%N:%l:sendData failed! clientID=%d ", clientID),-1);
 }
 
return 0;
}

 

簡單說,就是創建了一個消息塊,把用戶數據拷貝進來,然后調用寫流WS向Proactor發送一個Write事件就可以了,發送成功后,Handle_write_handle會被調用,看一下:


void
TPTCPAsynchServerImpl::handle_write_stream (
const ACE_Asynch_Write_Stream::Result &result)
{
  ACE_DEBUG ((LM_DEBUG,
              
"handle_write_stream called "));
  
// Reset pointers.
  result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
  ACE_DEBUG ((LM_DEBUG, 
"******************** "));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""bytes_to_write", result.bytes_to_write ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""handle", result.handle ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""bytes_transfered", result.bytes_transferred ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""act", (u_long) result.act ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""success", result.success ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""completion_key", (u_long) result.completion_key ()));
  ACE_DEBUG ((LM_DEBUG, 
"%s = %d ""error", result.error ()));
  ACE_DEBUG ((LM_DEBUG, 
"******************** "));
#if 0
  ACE_DEBUG ((LM_DEBUG, 
"%s = %s ""message_block", result.message_block ().rd_ptr ()));
#endif
  
// Release message block.
  result.message_block ().release ();
}

 

代碼中使用了result中發數據,然后把消息塊釋放了,就這么簡單。


////////////////////////////////////////////////////////////////////////////////////////////////////

這是簡單的proactor用法,當然,復雜也基本就這樣用。所謂不基本的不是Proactor的內容,而是服務器編程本身的麻煩。比如說,多個連接的管理、重發機制、發送隊列等等,這都不是ACE的內容。這些要大家自己思考了,并添加。

在這里,我要說幾個重要的問題:連接的管理。Acceptor是一個類,但是在每一個連接,Proactor都用了某種辦法創建了一個實例,所以,連接管理的群集類一定不能在Acceptor類中,不然得到的結果就是始終只有一條記錄。因為每個Acceptor都有一個實例,實例對應一個連接,群集類也就每個實例一個了。要采取的方法是一個全局的容器對象就可以了。比如我這個類:


typedef ACE_Map_Manager 
<ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionMap;
typedef ACE_Map_Iterator
<ACE_HANDLE, ConnectionBean *, ACE_Null_Mutex> ConnectionIterator;
typedef ACE_Map_Entry   
<ACE_HANDLE, ConnectionBean *> ConnectionEntry;
class Globle
{
public:
 Globle(
void);
 
~Globle(void);
 ITPServer
* server_;
 ConnectionMap _connections;
 unsigned getIndex(
void); 
 
long getTimerId(void);
private:
 unsigned 
int index_;
 
long timerId_;
};
typedef ACE_Singleton
<Globle, ACE_Null_Mutex> GlobleSingleton;

 

我使用ACE的Singleton模板創建這個類,每一個Acceptor要使用ConnectionMap,都使用這里的_connections,方法如下 :
  GlobleSingleton::instance()->connection.bind()......

這個問題可是我花費了2天時間找出來的,諸位同仁不可不戒啊,給點掌聲:)

Feedback

# re: ACE前攝器Proactor最好的講解(轉載)  回復  更多評論   

2013-04-12 19:59 by coolypf
128KB, 太離譜了

# re: ACE前攝器Proactor最好的講解(轉載)  回復  更多評論   

2013-04-13 11:08 by reborntercel
實際的傳輸速度為1000Mbps/8大概為128KB左右,樓豬是不是搞錯了啊??????????????????????????

# re: ACE前攝器Proactor最好的講解(轉載)  回復  更多評論   

2014-12-30 15:31 by www
人家說的對,是你們沒弄懂。1024/8=128KB
青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            午夜亚洲影视| 亚洲精品美女91| 久久aⅴ乱码一区二区三区| 亚洲人成在线观看一区二区| 欧美激情一区二区三区| 欧美成人网在线| 亚洲视频中文字幕| 午夜日韩在线| 亚洲黄色成人网| 亚洲每日更新| 国产一区二区三区四区在线观看 | 欧美日韩精品欧美日韩精品一 | 国产精品外国| 久久只精品国产| 欧美女同视频| 久久精品国产清高在天天线| 毛片基地黄久久久久久天堂 | 国产一区二区三区四区老人| 欧美福利网址| 国产精品日韩在线| 欧美国产极速在线| 国产精品h在线观看| 久久久久久亚洲精品中文字幕| 另类综合日韩欧美亚洲| 亚洲影视在线| 蜜臀av性久久久久蜜臀aⅴ四虎| 一区二区三区高清在线| 久久gogo国模啪啪人体图| 一本色道久久88精品综合| 欧美一区二区观看视频| 夜夜嗨av一区二区三区免费区| 亚洲欧美日韩一区二区| 日韩亚洲视频| 久久久亚洲综合| 欧美一区二区私人影院日本 | 久久www成人_看片免费不卡| 亚洲精选视频在线| 久久精品视频在线看| 亚洲欧美国产日韩天堂区| 裸体一区二区| 久久久99久久精品女同性| 欧美日韩中文另类| 欧美国产一区视频在线观看| 国产日韩精品久久久| 亚洲另类自拍| 日韩一区二区久久| 久久综合色8888| 久久久久久成人| 国产精品白丝av嫩草影院| 亚洲国产欧美在线| 亚洲黑丝在线| 久久亚洲不卡| 蜜臀久久久99精品久久久久久| 国产精品午夜在线| 9l国产精品久久久久麻豆| 亚洲九九精品| 欧美电影打屁股sp| 亚洲黄色成人| 亚洲免费激情| 欧美精品国产一区| 亚洲精品视频一区| 99精品视频免费在线观看| 欧美波霸影院| 欧美国产日韩一二三区| 欧美大片在线影院| 一本一本久久a久久精品牛牛影视| 久久久久久九九九九| 看片网站欧美日韩| 在线观看中文字幕亚洲| 久久精品一区二区国产| 老司机成人网| 亚洲狠狠丁香婷婷综合久久久| 欧美成人精品在线观看| 亚洲国产综合91精品麻豆| 亚洲伦伦在线| 国产精品jvid在线观看蜜臀 | 亚洲高清视频在线观看| 亚洲国产欧美一区二区三区久久 | 亚洲无人区一区| 久久精品国产精品亚洲| 狠狠色狠狠色综合| 欧美成人午夜激情在线| 日韩网站在线观看| 欧美一级视频一区二区| 国产一区二区三区免费观看| 久久久7777| 亚洲日韩成人| 欧美亚洲尤物久久| 在线成人av| 欧美日韩中文精品| 久久久久在线观看| 亚洲精品视频中文字幕| 欧美一区二区精品| 亚洲国产精品一区二区久 | 久久久久国产精品一区二区| 欧美大片va欧美在线播放| 在线亚洲免费| 狠狠爱综合网| 欧美精品久久久久久久久老牛影院| 亚洲视频一二| 可以免费看不卡的av网站| 9色porny自拍视频一区二区| 国产伦一区二区三区色一情| 开心色5月久久精品| 在线综合+亚洲+欧美中文字幕| 久久天天狠狠| 亚洲一线二线三线久久久| 伊人婷婷久久| 国产精品手机视频| 欧美粗暴jizz性欧美20| 欧美一区激情| 夜夜爽www精品| 亚洲第一黄色网| 欧美一区二区三区四区在线观看地址 | 久久久av水蜜桃| 亚洲精品国久久99热| 久久视频一区二区| 亚洲在线观看视频网站| 亚洲成色精品| 国产一区91| 国产乱肥老妇国产一区二| 蜜臀久久99精品久久久久久9| 亚洲欧洲日韩在线| 久久综合影音| 久久久www成人免费无遮挡大片| 亚洲愉拍自拍另类高清精品| 亚洲国产另类久久久精品极度| 国产欧美日韩视频一区二区三区| 欧美精品日韩一本| 免费的成人av| 亚洲激情偷拍| 中文精品在线| 91久久精品国产91性色tv| 国产日韩综合| 国产老肥熟一区二区三区| 国产精品99一区二区| 欧美视频中文一区二区三区在线观看| 美脚丝袜一区二区三区在线观看| 午夜在线视频观看日韩17c| 亚洲男人第一av网站| 亚洲尤物视频网| 亚洲一区欧美二区| 亚洲免费网址| 小黄鸭视频精品导航| 亚洲一区二区三区四区五区午夜| 中文在线一区| 亚洲综合日韩在线| 亚洲综合国产激情另类一区| 亚洲网站在线观看| 亚洲欧美国产高清| 久久av红桃一区二区小说| 久久不见久久见免费视频1| 欧美一区在线看| 久久天堂精品| 欧美黄污视频| 国产精品久久久久久久久久ktv | 欧美日韩亚洲一区二区| 欧美视频在线一区二区三区| 国产精品久久久久久久久久免费看| 国产精品观看| 狠狠色狠狠色综合日日tαg | 亚洲电影av在线| 亚洲精品中文字幕女同| 亚洲线精品一区二区三区八戒| 亚洲欧美另类在线观看| 久久国产黑丝| 亚洲国产91精品在线观看| 亚洲美洲欧洲综合国产一区| 亚洲午夜精品国产| 久久精品亚洲精品| 欧美黄色aa电影| 国产精品一区二区三区四区 | 国产美女精品视频免费观看| 国产一区二区三区电影在线观看| 亚洲国产精品免费| 亚洲一区二区三区久久 | 中日韩美女免费视频网站在线观看| 亚洲一二区在线| 久久嫩草精品久久久精品一| 亚洲国产视频a| 香蕉久久精品日日躁夜夜躁| 欧美成人免费全部| 国产亚洲精品aa午夜观看| 亚洲电影免费观看高清完整版在线观看 | 亚洲激情精品| 亚洲综合色自拍一区| 麻豆成人小视频| 国产精品女人久久久久久| 亚洲国产一区二区a毛片| 欧美在线观看一区| 亚洲国产高清一区| 欧美在线一二三四区| 欧美日韩一区二区三区免费| 激情一区二区三区| 亚洲欧美国产三级| 午夜精品一区二区三区四区| 亚洲一区二区三区国产| 久久久综合网| 这里只有精品视频| 欧美激情一区二区三区四区|