情景分析
在網(wǎng)絡(luò)編程中,通常異步比同步處理更為復(fù)雜,但由于異步的事件通知機(jī)制,避免了同步方式中的忙等待,提高了吞吐量,因此效率較高,在高性能應(yīng)用開(kāi)發(fā)中,經(jīng)常被用到。而在處理異步相關(guān)的問(wèn)題時(shí),狀態(tài)機(jī)模式是一種典型的有效方法,這在libevent、memcached、nginx等開(kāi)源軟件(庫(kù))中多次被使用而得到見(jiàn)證。據(jù)此,為拋磚引玉,本文展示了使用此方法異步接收變長(zhǎng)數(shù)據(jù)包的實(shí)現(xiàn),這里的變長(zhǎng)是指在某一種網(wǎng)絡(luò)協(xié)議中,具有完整意義的數(shù)據(jù)包長(zhǎng)度是不固定的。為了描述方便,我們以一個(gè)TCP C/S模式的簡(jiǎn)單例子為場(chǎng)景分析說(shuō)明,服務(wù)端在某知名端口監(jiān)聽(tīng),使用異步IO復(fù)用機(jī)制epoll ET模式接受連接、接收分析請(qǐng)求、存取數(shù)據(jù)到內(nèi)存緩沖中,這種內(nèi)存緩沖類(lèi)似于數(shù)據(jù)庫(kù),數(shù)據(jù)按鍵值對(duì)存取;客戶端的請(qǐng)求包括增加、修改和刪除三種,每種請(qǐng)求對(duì)應(yīng)的數(shù)據(jù)包長(zhǎng)度不一樣。
協(xié)議封包
如下圖所示,請(qǐng)求封包各字段從左到右依次為:type字段表示請(qǐng)求類(lèi)型,占1個(gè)字節(jié),值域?yàn)閧1,2,3},1為增加,2為修改,3為刪除;key字段表示鍵名,占16個(gè)字節(jié);val字段表示值,占256個(gè)字節(jié);expire表示生存期,占4個(gè)字節(jié),單位為秒。顯而易見(jiàn),對(duì)于每種類(lèi)型的請(qǐng)求,其封包長(zhǎng)度是固定的。
狀態(tài)轉(zhuǎn)換
由于在一條連接上客戶端可以先后發(fā)送多種不同類(lèi)型的請(qǐng)求,因此服務(wù)端需要接收完整某種請(qǐng)求的包后,才能解析處理。當(dāng)處于某種特定類(lèi)型的請(qǐng)求時(shí),接收完它的包,這很容易實(shí)現(xiàn)。但當(dāng)存在多種不同類(lèi)型的請(qǐng)求時(shí),就需要先識(shí)別當(dāng)前的請(qǐng)求類(lèi)型,再在這種類(lèi)型中接收完整的包,然后再識(shí)別新的請(qǐng)求類(lèi)型,繼續(xù)循環(huán)這樣的一個(gè)過(guò)程。因此,這就自然而然地對(duì)應(yīng)到了狀態(tài)機(jī),如下圖所示,有4個(gè)狀態(tài):1個(gè)起始狀態(tài)
prepare,在此狀態(tài)中識(shí)別當(dāng)前請(qǐng)求類(lèi)型,轉(zhuǎn)到下一中間狀態(tài);3個(gè)中間狀態(tài)
add、
set、
del,分別對(duì)應(yīng)
增加、
修改和
刪除請(qǐng)求,在此狀態(tài)中不斷接收數(shù)據(jù),直至接收完整,再轉(zhuǎn)到起始狀態(tài)。由于從中間狀態(tài)能轉(zhuǎn)到起始狀態(tài),因此就沒(méi)必要存在結(jié)束狀態(tài)。e1、e2、e3、e4表示不同狀態(tài)間轉(zhuǎn)化的觸發(fā)事件。
代碼實(shí)現(xiàn)
數(shù)據(jù)結(jié)構(gòu)定義
connection類(lèi)表示一條在客戶端和服務(wù)端間建立的連接,靜態(tài)成員函數(shù)handle_read被epoll模型當(dāng)有數(shù)據(jù)可讀時(shí)回調(diào),普通成員函數(shù)handle_read則做實(shí)際的接收處理。
1 enum read_state { prepare, add, set, del };
2
3
static const char MSG_TYPE_ADD = 1;
4
static const char MSG_TYPE_SET = 2;
5
static const char MSG_TYPE_DEL = 3;
6
7
#pragma pack(1)
8
struct msg_add
9

{
10
char key[16];
11
char val[256];
12
uint32_t expire;
13
};
14
15
struct msg_set
16

{
17
char key[16];
18
char val[256];
19
};
20
21
struct msg_del
22

{
23
char key[16];
24
};
25
#pragma pack()
26
27
static const size_t MSG_MAX_SIZE = sizeof(msg_add);
28
29
class connection
30

{
31
public:
32
connection();
33
34
void recv_add_msg(msg_add* msg);
35
void recv_set_msg(msg_set* msg);
36
void recv_del_msg(msg_get* msg);
37
38
bool send_add_msg(const char* key,const char* val,uint32_t expire);
39
bool send_set_msg(const char* key,const char* val);
40
bool send_del_msg(const char* key);
41
42
private:
43
void reset_state()
44
{ tran_ = 0, size_ = 1; s_ = prepare;}
45
46
void handle_read();
47
static void handle_read(int fd,short ev,void* arg);
48
49
private:
50
int sock_;
51
char buf_[MSG_MAX_SIZE];
52
size_t tran_;
53
size_t size_;
54
read_state s_;
55
};
服務(wù)端異步接收
最初時(shí)處于
起始狀態(tài)
prepare,在這個(gè)狀態(tài)中:先接收1個(gè)字節(jié),分析請(qǐng)求類(lèi)型,更新?tīng)顟B(tài),然后繼續(xù)接收數(shù)據(jù)。當(dāng)收到數(shù)據(jù)read返回時(shí),那么這時(shí)已經(jīng)處于3種中間狀態(tài)
add、
set、
del之一了,在這個(gè)狀態(tài)中:只要繼續(xù)收完這種類(lèi)型的請(qǐng)求包即可解析處理,最后再重設(shè),返回到狀態(tài)
prepare,繼續(xù)接收下一個(gè)請(qǐng)求包。
1
connection::connection()
2
:sock(-1)
3
,tran_(0)
4
,size_(1)
5
,s_(prepare)
6

{
7
}
8
9
void connection::recv_add_msg(msg_add* msg)
10

{
11
}
12
13
void connection::recv_set_msg(msg_set* msg)
14

{
15
}
16
17
void connection::recv_del_msg(msg_del* msg)
18

{
19
}
20
21
void connection::handle_read(int fd,short ev,void* arg)
22

{
23
static_cast<connection*>(arg)->handle_read();
24
}
25
26
void connection::state_machine()
27

{
28
switch(s_)
{
29
case prepare:
30
if(MSG_TYPE_ADD==buf_[0])
{
31
tran_ = 0, size_ = sizeof(msg_add);
32
s_ = add;
33
}else if(MSG_TYPE_SET==buf_[0])
{
34
tran_ = 0, size_ = sizeof(msg_set);
35
s_ = set;
36
}else if(MSG_TYPE_DEL==buf_[0])
{
37
tran_ = 0, size_ = sizeof(msg_del);
38
s_ = del;
39
}else
40
assert(false);
41
break;
42
43
case add:
44
if(tran_ == size_)
{
45
recv_add_msg(reinterpret_cast<msg_add*>(buf_));
46
reset_state();
47
}
48
break;
49
50
case set:
51
if(tran_ == size_)
{
52
recv_set_msg(reinterpret_cast<msg_set*>(buf_));
53
reset_state();
54
}
55
break;
56
57
case del:
58
if(tran_ == size_)
{
59
recv_del_msg(reinterpret_cast<msg_del*>(buf_));
60
reset_state();
61
}
62
break;
63
}
64
}
65
66
void connection::handle_read()
67

{
68
ssize_t ret;
69
for(;;)
{
70
ret = read(sock_,buf_+tran_,size_-tran_);
71
if (ret > 0)
{
72
tran_ += ret;
73
state_machine();
74
}else if(ret < 0 && errno == EAGAIN)
{
75
break;
76
}else
{
77
close(sock_); break;
78
}
79
}
80
}
客戶端同步發(fā)送
由于一般大多數(shù)的客戶端不像服務(wù)端要求高性能高并發(fā),因此使用同步方式來(lái)發(fā)送數(shù)據(jù)。下面代碼忽略了錯(cuò)誤處理,為簡(jiǎn)單方便,發(fā)送請(qǐng)求的實(shí)現(xiàn)也寫(xiě)在了類(lèi)connection內(nèi),依次為send_add_msg、send_set_msg、send_del_msg成員函數(shù)。
1
bool connection::send_iovec(char type,void* msg,size_t len)
2

{
3
struct iovec iov[2];
4
iov[0].iov_base = &type;
5
iov[0].iov_len = 1;
6
iov[1].iov_base = msg;
7
iov[1].iov_len = len;
8
9
return writev(sock_,iov,NUM_ELEMENTS(iov))==1+len;
10
}
11
12
bool connection::send_add_msg(const char* key,const char* val,uint32_t expire)
13

{
14
msg_add msg;
15
strcpy(msg.key,key);
16
strcpy(msg.val,val);
17
msg.expire = expire;
18
return send_iovec(MSG_TYPE_ADD,&msg,sizeof(msg));
19
}
20
21
bool connection::send_set_msg(const char* key,const char* val)
22

{
23
msg_set msg;
24
strcpy(msg.key,key);
25
strcpy(msg.val,val);
26
return send_iovec(MSG_TYPE_SET,&msg,sizeof(msg));
27
}
28
29
bool connection::send_del_msg(const char* key)
30

{
31
msg_del msg;
32
strcpy(msg.key,key);
33
return send_iovec(MSG_TYPE_GET,&msg,sizeof(msg));
34
}
小結(jié)
雖然以上所述的場(chǎng)景是網(wǎng)絡(luò)通信,但對(duì)于進(jìn)程間使用管道和字節(jié)流套接字的通信,也同樣適合。
posted on 2012-09-20 15:48
春秋十二月 閱讀(2817)
評(píng)論(2) 編輯 收藏 引用 所屬分類(lèi):
Network