概述
epoll是linux提供一種多路復(fù)用的技術(shù),類似各個平臺都支持的select,只是epoll在內(nèi)核的實現(xiàn)做了更多地優(yōu)化,可以支持比select更多的文件描述符,當然也支持 socket這種網(wǎng)絡(luò)的文件描述符。linux上的大并發(fā)的接入服務(wù)器,目前的實現(xiàn)方式肯定都通過epoll實現(xiàn)。
epoll和線程
有很多開發(fā)人員用epoll的時候,會開多個線程來進行數(shù)據(jù)通信,比如一個線程專門accept(我個人早些年在FreeBSD用kqueue的時候,由于對內(nèi)部機制沒有基本了解也這樣搞),一個線程收發(fā),或者接收和發(fā)送都用各自獨立的線程。
通常情況下,accept獨立線程是沒必要的,accept對于內(nèi)核而言,就應(yīng)用程序從內(nèi)核的未完成的SYN隊列讀取一點數(shù)據(jù)而已。具體參見 accept部分:
收發(fā)獨立成兩個線程也沒有必要,因為大部分的應(yīng)用服務(wù)器,通常情況下,啟動一個線程收發(fā)數(shù)據(jù),最大數(shù)據(jù)的收發(fā)量瓶頸在于網(wǎng)卡,而不是CPU;像網(wǎng)游接入服務(wù)器配置一個KM的網(wǎng)卡,很少有游戲會申請1G的帶寬,那一臺機器得有多少數(shù)據(jù)輸入和輸出。所以我們通信線程為epoll服務(wù)器就夠了。
epoll的基本原理
為了讓某些朋友能讀得更連慣,我還是說一下epoll基本原理。
epoll外部表現(xiàn)和select是一樣的。他提供READ, WRITE和ERROR等事件。
大致流程像下面這樣:
1. 應(yīng)用注冊感興趣的事件到內(nèi)核;
2. 內(nèi)核在某種條件下,將事件通知應(yīng)用程序;
3. 應(yīng)用程序收到事件后,根據(jù)事件類型做對應(yīng)的邏輯處理。
原理部分我再說一下,不容易理解的地方,包括水平觸發(fā)和邊緣觸發(fā),WRITE事件的事件利用(這個可以結(jié)合參考文獻1的kqueue的WRITE事件,原理一致的)和修改事件的細節(jié)。
水平觸發(fā)
READ事件,socket recv buff有數(shù)據(jù),將一直向應(yīng)用程序通知,直到buff為空。
WRITE事件,socket send buff從滿的狀態(tài)到可發(fā)送數(shù)據(jù),將一直通知應(yīng)用程序,直到buff滿。
邊緣觸發(fā)
READ事件,socket recv buff有數(shù)據(jù)了,只通知應(yīng)用一次,不管應(yīng)用程序有沒有調(diào)用read api,接下來都不通知了。
WRITE事件,socket send buff從滿的狀態(tài)到可以發(fā)送數(shù)據(jù),只通知一次。
上面這個解釋不知道大家能否理解,也只能這樣說了。有疑問的做一下試驗。另外,這些細節(jié)的東西,前幾年固定下來后,這幾年做的項目,是直接用的,也就很少在涉及細節(jié),是憑理解和記憶寫下的文字,萬一有錯請指正^-^。
WRITE事件的利用
這個還一下不好描述。大概描述一下,詳細看參考文獻1。大致這樣:
1. 邏輯層寫數(shù)據(jù)到應(yīng)用層的發(fā)送buff,向epoll注冊一下WRITE事件;
2. 這時epoll會通知應(yīng)用程序一個WRITE事件;
3. 在WRITE事件響應(yīng)函數(shù)里,從應(yīng)用層的發(fā)送buff讀數(shù)據(jù),然后用socket send api發(fā)送。
因為我在很多實際項目中,看到大家沒有利用epoll的WRITE的事件來發(fā)數(shù)據(jù),特意地說一下。大部分的項目,是直接輪詢應(yīng)用程序的發(fā)送隊列,我早期項目也是這么干的。
epoll的修改事件
對于這個我的映像比較深刻。epoll的修改事件比較坑爹,不能單獨修改某個事件!怎么說呢?比如epoll里已經(jīng)注冊了READ&WRITE事件,你如果想單單重注冊一下WRITE事件而且READ事件不變,epoll的epoll_ctl API是做不到的,你必須同時注冊READ&WRITE,這個在下面的代碼中可以看到。FreeBSD的kqueue在這一點完全滿足我們程序員的要求。
抽象epoll API
我把herm socket epoll封裝部分貼出來,讓朋友們參考一下epoll的用法。大部分錯誤拋異常代碼被我去掉了。
- class Multiplexor
- {
- public:
- Multiplexor(int size, int timeout = -1, bool lt = true);
- ~Multiplexor();
- void Run();
- void Register(ISockHandler* eh, MultiplexorMask mask);
- void Remove(ISockHandler* eh);
- void EnableMask(ISockHandler* eh, MultiplexorMask mask);
- void DisableMask(ISockHandler* eh, MultiplexorMask mask);
- private:
- inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
- {
- struct epoll_event evt;
- evt.data.ptr = eh;
- evt.events = mask;
- return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
- }
- private:
- int m_epfd;
- struct epoll_event* m_evts;
- int m_size;
- int m_timeout;
- __uint32_t m_otherMasks;
- };
class Multiplexor
{
public:
Multiplexor(int size, int timeout = -1, bool lt = true);
~Multiplexor();
void Run();
void Register(ISockHandler* eh, MultiplexorMask mask);
void Remove(ISockHandler* eh);
void EnableMask(ISockHandler* eh, MultiplexorMask mask);
void DisableMask(ISockHandler* eh, MultiplexorMask mask);
private:
inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
{
struct epoll_event evt;
evt.data.ptr = eh;
evt.events = mask;
return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
}
private:
int m_epfd;
struct epoll_event* m_evts;
int m_size;
int m_timeout;
__uint32_t m_otherMasks;
};
- Multiplexor::Multiplexor(int size, int timeout, bool lt)
- {
- m_epfd = epoll_create(size);
- if (m_epfd == -1)
- throw HERM_SOCKET_EXCEPTION(ST_OTHER);
- m_size = size;
- m_evts = new struct epoll_event[size];
- m_timeout = timeout;
-
- m_otherMasks = EPOLLERR | EPOLLHUP;
- if (!lt)
- m_otherMasks |= EPOLLET;
- }
- Multiplexor::~Multiplexor()
- {
- close(m_epfd);
- delete[] m_evts;
- }
- void Multiplexor::Run()
- {
- int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout);
- if (fds == -1)
- {
- if (errno == EINTR)
- return;
- }
- for (int i = 0; i < fds; ++i)
- {
- __uint32_t evts = m_evts[i].events;
- ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
- int stateType = ST_SUCCESS;
- if (evts & EPOLLIN)
- stateType = eh->OnReceive();
- if (evts & EPOLLOUT)
- stateType = eh->OnSend();
- if (evts & EPOLLERR || evts & EPOLLHUP)
- stateType = ST_EXCEPT_FAILED;
- if (stateType != ST_SUCCESS)
- eh->OnError(stateType, errno);
- }
- }
- void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
- {
- MultiplexorMask masks = mask | m_otherMasks;
- OperateHandler(EPOLL_CTL_ADD, eh, masks);
- }
- void Multiplexor::Remove(ISockHandler* eh)
- {
-
- OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
- }
- void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
- {
- MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
- OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
- }
- void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
- {
- MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
- if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
- throw HERM_SOCKET_EXCEPTION(ST_OTHER);
- }
Multiplexor::Multiplexor(int size, int timeout, bool lt)
{
m_epfd = epoll_create(size);
if (m_epfd == -1)
throw HERM_SOCKET_EXCEPTION(ST_OTHER);
m_size = size;
m_evts = new struct epoll_event[size];
m_timeout = timeout;
// sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP
m_otherMasks = EPOLLERR | EPOLLHUP;
if (!lt)
m_otherMasks |= EPOLLET;
}
Multiplexor::~Multiplexor()
{
close(m_epfd);
delete[] m_evts;
}
void Multiplexor::Run()
{
int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout);
if (fds == -1)
{
if (errno == EINTR)
return;
}
for (int i = 0; i < fds; ++i)
{
__uint32_t evts = m_evts[i].events;
ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
int stateType = ST_SUCCESS;
if (evts & EPOLLIN)
stateType = eh->OnReceive();
if (evts & EPOLLOUT)
stateType = eh->OnSend();
if (evts & EPOLLERR || evts & EPOLLHUP)
stateType = ST_EXCEPT_FAILED;
if (stateType != ST_SUCCESS)
eh->OnError(stateType, errno);
}
}
void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = mask | m_otherMasks;
OperateHandler(EPOLL_CTL_ADD, eh, masks);
}
void Multiplexor::Remove(ISockHandler* eh)
{
// Delete fd from epoll, don't need masks
OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
}
void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
}
void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
throw HERM_SOCKET_EXCEPTION(ST_OTHER);
}
上面類就用到epoll_create(), epoll_ctl()和epoll_wait(),以及幾種事件。epoll用起來比select清爽一些。
大致用法類似下面這樣:
先定義一個Handler
- class StreamHandler : public Herm::ISockHandler
- {
- public:
- virtual Herm::Handle GetHandle() const;
- virtual int OnReceive(int);
- virtual int OnSend(int);
- };
class StreamHandler : public Herm::ISockHandler
{
public:
virtual Herm::Handle GetHandle() const;
virtual int OnReceive(int);
virtual int OnSend(int);
};
在OnReceive()處理收到數(shù)據(jù)的動作,在OnSend()。。。。
在通信線程中,大概類似這樣的代碼,實際看情況。
- Multiplexor multiplexor;
- StreamHandler sh;
- multiplexor.Register(&sh, READ_EVT);
- multiplexor.Run(...);
Multiplexor multiplexor;
StreamHandler sh;
multiplexor.Register(&sh, READ_EVT);
multiplexor.Run(...);
參考文獻
1.
FreeBSD上kqueue和epoll是類似的,有興趣的朋友請參考。
http://blog.csdn.net/herm_lib/article/details/6047038
2.
這里涉及到epoll的通信層如何和邏輯數(shù)據(jù)交互的問題
http://blog.csdn.net/herm_lib/article/details/5980657