概述
epoll是linux提供一種多路復用的技術,類似各個平臺都支持的select,只是epoll在內核的實現做了更多地優化,可以支持比select更多的文件描述符,當然也支持 socket這種網絡的文件描述符。linux上的大并發的接入服務器,目前的實現方式肯定都通過epoll實現。
epoll和線程
有很多開發人員用epoll的時候,會開多個線程來進行數據通信,比如一個線程專門accept(我個人早些年在FreeBSD用kqueue的時候,由于對內部機制沒有基本了解也這樣搞),一個線程收發,或者接收和發送都用各自獨立的線程。
通常情況下,accept獨立線程是沒必要的,accept對于內核而言,就應用程序從內核的未完成的SYN隊列讀取一點數據而已。具體參見 accept部分:
收發獨立成兩個線程也沒有必要,因為大部分的應用服務器,通常情況下,啟動一個線程收發數據,最大數據的收發量瓶頸在于網卡,而不是CPU;像網游接入服務器配置一個KM的網卡,很少有游戲會申請1G的帶寬,那一臺機器得有多少數據輸入和輸出。所以我們通信線程為epoll服務器就夠了。
epoll的基本原理
為了讓某些朋友能讀得更連慣,我還是說一下epoll基本原理。
epoll外部表現和select是一樣的。他提供READ, WRITE和ERROR等事件。
大致流程像下面這樣:
1. 應用注冊感興趣的事件到內核;
2. 內核在某種條件下,將事件通知應用程序;
3. 應用程序收到事件后,根據事件類型做對應的邏輯處理。
原理部分我再說一下,不容易理解的地方,包括水平觸發和邊緣觸發,WRITE事件的事件利用(這個可以結合參考文獻1的kqueue的WRITE事件,原理一致的)和修改事件的細節。
水平觸發
READ事件,socket recv buff有數據,將一直向應用程序通知,直到buff為空。
WRITE事件,socket send buff從滿的狀態到可發送數據,將一直通知應用程序,直到buff滿。
邊緣觸發
READ事件,socket recv buff有數據了,只通知應用一次,不管應用程序有沒有調用read api,接下來都不通知了。
WRITE事件,socket send buff從滿的狀態到可以發送數據,只通知一次。
上面這個解釋不知道大家能否理解,也只能這樣說了。有疑問的做一下試驗。另外,這些細節的東西,前幾年固定下來后,這幾年做的項目,是直接用的,也就很少在涉及細節,是憑理解和記憶寫下的文字,萬一有錯請指正^-^。
WRITE事件的利用
這個還一下不好描述。大概描述一下,詳細看參考文獻1。大致這樣:
1. 邏輯層寫數據到應用層的發送buff,向epoll注冊一下WRITE事件;
2. 這時epoll會通知應用程序一個WRITE事件;
3. 在WRITE事件響應函數里,從應用層的發送buff讀數據,然后用socket send api發送。
因為我在很多實際項目中,看到大家沒有利用epoll的WRITE的事件來發數據,特意地說一下。大部分的項目,是直接輪詢應用程序的發送隊列,我早期項目也是這么干的。
epoll的修改事件
對于這個我的映像比較深刻。epoll的修改事件比較坑爹,不能單獨修改某個事件!怎么說呢?比如epoll里已經注冊了READ&WRITE事件,你如果想單單重注冊一下WRITE事件而且READ事件不變,epoll的epoll_ctl API是做不到的,你必須同時注冊READ&WRITE,這個在下面的代碼中可以看到。FreeBSD的kqueue在這一點完全滿足我們程序員的要求。
抽象epoll API
我把herm socket epoll封裝部分貼出來,讓朋友們參考一下epoll的用法。大部分錯誤拋異常代碼被我去掉了。
- class Multiplexor
- {
- public:
- Multiplexor(int size, int timeout = -1, bool lt = true);
- ~Multiplexor();
- void Run();
- void Register(ISockHandler* eh, MultiplexorMask mask);
- void Remove(ISockHandler* eh);
- void EnableMask(ISockHandler* eh, MultiplexorMask mask);
- void DisableMask(ISockHandler* eh, MultiplexorMask mask);
- private:
- inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
- {
- struct epoll_event evt;
- evt.data.ptr = eh;
- evt.events = mask;
- return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
- }
- private:
- int m_epfd;
- struct epoll_event* m_evts;
- int m_size;
- int m_timeout;
- __uint32_t m_otherMasks;
- };
class Multiplexor
{
public:
Multiplexor(int size, int timeout = -1, bool lt = true);
~Multiplexor();
void Run();
void Register(ISockHandler* eh, MultiplexorMask mask);
void Remove(ISockHandler* eh);
void EnableMask(ISockHandler* eh, MultiplexorMask mask);
void DisableMask(ISockHandler* eh, MultiplexorMask mask);
private:
inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
{
struct epoll_event evt;
evt.data.ptr = eh;
evt.events = mask;
return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
}
private:
int m_epfd;
struct epoll_event* m_evts;
int m_size;
int m_timeout;
__uint32_t m_otherMasks;
};
- Multiplexor::Multiplexor(int size, int timeout, bool lt)
- {
- m_epfd = epoll_create(size);
- if (m_epfd == -1)
- throw HERM_SOCKET_EXCEPTION(ST_OTHER);
- m_size = size;
- m_evts = new struct epoll_event[size];
- m_timeout = timeout;
-
- m_otherMasks = EPOLLERR | EPOLLHUP;
- if (!lt)
- m_otherMasks |= EPOLLET;
- }
- Multiplexor::~Multiplexor()
- {
- close(m_epfd);
- delete[] m_evts;
- }
- void Multiplexor::Run()
- {
- int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout);
- if (fds == -1)
- {
- if (errno == EINTR)
- return;
- }
- for (int i = 0; i < fds; ++i)
- {
- __uint32_t evts = m_evts[i].events;
- ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
- int stateType = ST_SUCCESS;
- if (evts & EPOLLIN)
- stateType = eh->OnReceive();
- if (evts & EPOLLOUT)
- stateType = eh->OnSend();
- if (evts & EPOLLERR || evts & EPOLLHUP)
- stateType = ST_EXCEPT_FAILED;
- if (stateType != ST_SUCCESS)
- eh->OnError(stateType, errno);
- }
- }
- void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
- {
- MultiplexorMask masks = mask | m_otherMasks;
- OperateHandler(EPOLL_CTL_ADD, eh, masks);
- }
- void Multiplexor::Remove(ISockHandler* eh)
- {
-
- OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
- }
- void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
- {
- MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
- OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
- }
- void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
- {
- MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
- if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
- throw HERM_SOCKET_EXCEPTION(ST_OTHER);
- }
Multiplexor::Multiplexor(int size, int timeout, bool lt)
{
m_epfd = epoll_create(size);
if (m_epfd == -1)
throw HERM_SOCKET_EXCEPTION(ST_OTHER);
m_size = size;
m_evts = new struct epoll_event[size];
m_timeout = timeout;
// sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP
m_otherMasks = EPOLLERR | EPOLLHUP;
if (!lt)
m_otherMasks |= EPOLLET;
}
Multiplexor::~Multiplexor()
{
close(m_epfd);
delete[] m_evts;
}
void Multiplexor::Run()
{
int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout);
if (fds == -1)
{
if (errno == EINTR)
return;
}
for (int i = 0; i < fds; ++i)
{
__uint32_t evts = m_evts[i].events;
ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
int stateType = ST_SUCCESS;
if (evts & EPOLLIN)
stateType = eh->OnReceive();
if (evts & EPOLLOUT)
stateType = eh->OnSend();
if (evts & EPOLLERR || evts & EPOLLHUP)
stateType = ST_EXCEPT_FAILED;
if (stateType != ST_SUCCESS)
eh->OnError(stateType, errno);
}
}
void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = mask | m_otherMasks;
OperateHandler(EPOLL_CTL_ADD, eh, masks);
}
void Multiplexor::Remove(ISockHandler* eh)
{
// Delete fd from epoll, don't need masks
OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
}
void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
}
void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
throw HERM_SOCKET_EXCEPTION(ST_OTHER);
}
上面類就用到epoll_create(), epoll_ctl()和epoll_wait(),以及幾種事件。epoll用起來比select清爽一些。
大致用法類似下面這樣:
先定義一個Handler
- class StreamHandler : public Herm::ISockHandler
- {
- public:
- virtual Herm::Handle GetHandle() const;
- virtual int OnReceive(int);
- virtual int OnSend(int);
- };
class StreamHandler : public Herm::ISockHandler
{
public:
virtual Herm::Handle GetHandle() const;
virtual int OnReceive(int);
virtual int OnSend(int);
};
在OnReceive()處理收到數據的動作,在OnSend()。。。。
在通信線程中,大概類似這樣的代碼,實際看情況。
- Multiplexor multiplexor;
- StreamHandler sh;
- multiplexor.Register(&sh, READ_EVT);
- multiplexor.Run(...);
Multiplexor multiplexor;
StreamHandler sh;
multiplexor.Register(&sh, READ_EVT);
multiplexor.Run(...);
參考文獻
1.
FreeBSD上kqueue和epoll是類似的,有興趣的朋友請參考。
http://blog.csdn.net/herm_lib/article/details/6047038
2.
這里涉及到epoll的通信層如何和邏輯數據交互的問題
http://blog.csdn.net/herm_lib/article/details/5980657