|
最近有朋友在面試的時候被問了select 和epoll效率差的原因,和一般人一樣,大部分都會回答select是輪詢、epoll是觸發式的,所以效率高。這個答案聽上去很完美,大致也說出了二者的主要區別。今天閑來無事,翻看了下內核代碼,結合內核代碼和大家分享下我的觀點。一、連接數我本人也曾經在項目中用過select和epoll,對于select,感觸最深的是linux下select最大數目限制(windows 下似乎沒有限制),每個進程的select最多能處理FD_SETSIZE個FD(文件句柄),如果要處理超過1024個句柄,只能采用多進程了。常見的使用slect的多進程模型是這樣的: 一個進程專門accept,成功后將fd通過unix socket傳遞給子進程處理,父進程可以根據子進程負載分派。曾經用過1個父進程+4個子進程 承載了超過4000個的負載。這種模型在我們當時的業務運行的非常好。epoll在連接數方面沒有限制,當然可能需要用戶調用API重現設置進程的資源限制。二、IO差別1、select的實現這段可以結合linux內核代碼描述了,我使用的是2.6.28,其他2.6的代碼應該差不多吧。先看看select:select系統調用的代碼在fs/Select.c下,asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp){ struct timespec end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC)) return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to); ret = poll_select_copy_remaining(&end_time, tvp, 1, ret); return ret;} 前面是從用戶控件拷貝各個fd_set到內核空間,接下來的具體工作在core_sys_select中,core_sys_select->do_select,真正的核心內容在do_select里:int do_select(int n, fd_set_bits *fds, struct timespec *end_time){ ktime_t expire, *to = NULL; struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; unsigned long slack = 0; rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); if (retval < 0) return retval; n = retval; poll_initwait(&table); wait = &table.pt; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { wait = NULL; timed_out = 1; } if (end_time && !timed_out) slack = estimate_accuracy(end_time); retval = 0; for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; set_current_state(TASK_INTERRUPTIBLE); inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; const struct file_operations *f_op = NULL; struct file *file = NULL; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { i += __NFDBITS; continue; } for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) { int fput_needed; if (i >= n) break; if (!(bit & all_bits)) continue; file = fget_light(i, &fput_needed); if (file) { f_op = file->f_op; mask = DEFAULT_POLLMASK; if (f_op && f_op->poll) mask = (*f_op->poll)(file, retval ? NULL : wait); fput_light(file, fput_needed); if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; } } } if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait = NULL; if (retval || timed_out || signal_pending(current)) break; if (table.error) { retval = table.error; break; } /* * If this is the first loop and we have a timeout * given, then we convert to ktime_t and set the to * pointer to the expiry value. */ if (end_time && !to) { expire = timespec_to_ktime(*end_time); to = &expire; } if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) timed_out = 1; } __set_current_state(TASK_RUNNING); poll_freewait(&table); return retval;} 上面的代碼很多,其實真正關鍵的代碼是這一句:mask = (*f_op->poll)(file, retval ? NULL : wait); 這個是調用文件系統的 poll函數,不同的文件系統poll函數自然不同,由于我們這里關注的是tcp連接,而socketfs的注冊在 net/Socket.c里。register_filesystem(&sock_fs_type); socket文件系統的函數也是在net/Socket.c里:static const struct file_operations socket_file_ops = { .owner = THIS_MODULE, .llseek = no_llseek, .aio_read = sock_aio_read, .aio_write = sock_aio_write, .poll = sock_poll, .unlocked_ioctl = sock_ioctl,#ifdef CONFIG_COMPAT .compat_ioctl = compat_sock_ioctl,#endif .mmap = sock_mmap, .open = sock_no_open, /* special open code to disallow open via /proc */ .release = sock_close, .fasync = sock_fasync, .sendpage = sock_sendpage, .splice_write = generic_splice_sendpage, .splice_read = sock_splice_read,};從sock_poll跟隨下去,最后可以到 net/ipv4/tcp.c的unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait) 這個是最終的查詢函數,也就是說select 的核心功能是調用tcp文件系統的poll函數,不停的查詢,如果沒有想要的數據,主動執行一次調度(防止一直占用cpu),直到有一個連接有想要的消息為止。從這里可以看出select的執行方式基本就是不同的調用poll,直到有需要的消息為止,如果select 處理的socket很多,這其實對整個機器的性能也是一個消耗。2、epoll的實現epoll的實現代碼在 fs/EventPoll.c下,由于epoll涉及到幾個系統調用,這里不逐個分析了,僅僅分析幾個關鍵點,第一個關鍵點在static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) 這是在我們調用sys_epoll_ctl 添加一個被管理socket的時候調用的函數,關鍵的幾行如下:epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); /* * Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. Note that after * this operation completes, the poll callback can start hitting * the new item. */ revents = tfile->f_op->poll(tfile, &epq.pt); 這里也是調用文件系統的poll函數,不過這次初始化了一個結構,這個結構會帶有一個poll函數的callback函數:ep_ptable_queue_proc,在調用poll函數的時候,會執行這個callback,這個callback的功能就是將當前進程添加到 socket的等待進程上。static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt){ struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; }} 注意到參數 whead 實際上是 sk->sleep,其實就是將當前進程添加到sk的等待隊列里,當該socket收到數據或者其他事件觸發時,會調用sock_def_readable 或者sock_def_write_space 通知函數來喚醒等待進程,這2個函數都是在socket創建的時候填充在sk結構里的。從前面的分析來看,epoll確實是比select聰明的多、輕松的多,不用再苦哈哈的去輪詢了。
|