青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

posts - 2,  comments - 2,  trackbacks - 0
   epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
   LT/ET:ET也會(huì)多次發(fā)送event,當(dāng)然頻率遠(yuǎn)低于LT,但是epoll one shot才是真正的對(duì)"one connection VS one thread in worker thread pool,不依賴于任何connection-data-queue"的基礎(chǔ)支持 .我看到大部分對(duì)epoll_wait的處理模式如下,很教科化,因?yàn)閙an-pages就是這樣舉例子的。
man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;)
{
   nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
   for(1~nfds)
       handle events[i];
   ......
}
epoll_ctl_add的當(dāng)然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個(gè)再去處理接下來(lái)的MAX_EVENTS 個(gè)?慢否? 但是如果是一堆線程的話,你是否考慮過(guò)如何處理request-data in one connection在邏輯上的完整性,也就是一個(gè)request-data-packet可能會(huì)被分割成若干次發(fā)送,在上面的處理模式中你真的要好好設(shè)計(jì)一下了。
而我的epoll_wait處理模式如下:
struct epoll_event activeEvent;
for(;;)
{
   epoll_wait(epollfd, &activeEvent, 1/*很驚訝嗎,但絕不是一條線程在運(yùn)行這段代碼,而是一堆*/, timeout);
   if handle activeEvent success
      epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
  ......
}
處理上面代碼的當(dāng)然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
因?yàn)槲业脑O(shè)計(jì)理念是嚴(yán)格遵守one connection VS one thread in worker thread pool。
所以我下面的server框架的基本模型是:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
在epollWorkerRoutine中有以下的職責(zé):
1.handle request,當(dāng)忙時(shí)增加epollWorkerThread數(shù)量但不超過(guò)maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout時(shí)檢查是否空閑和當(dāng)前epollWorkerThread數(shù)量,當(dāng)空閑時(shí)保持或減少至minThreads數(shù)量.
3.對(duì)所有Accepted-socket管理生命周期,這里利用系統(tǒng)的keepalive probes,若想實(shí)現(xiàn)業(yè)務(wù)層"心跳探測(cè)"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統(tǒng)默認(rèn)的2小時(shí).這里并不維護(hù)所有連接列表,當(dāng)然你可以在/proc/getpid/fd下找到所有的socket fd.
4.linux上的non-blocking socket的操作仍然依賴recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動(dòng)作實(shí)現(xiàn)成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會(huì)形成響應(yīng)數(shù)據(jù)在邏輯上的碎片錯(cuò)序,特別是你如果采用上面那個(gè)教科化的處理模式的化,并且還是多線程的話,那么這簡(jiǎn)直就亂透了。當(dāng)然你可以使用response-data-queue來(lái)達(dá)到異步準(zhǔn)確發(fā)送數(shù)據(jù)的目的。

下面結(jié)合源碼,淺析一下epoll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SOCKET_ERROR -1
#define INVALID_SOCKET -1
typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef unsigned short WORD;
typedef unsigned int DWORD;

#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  20*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32

#define MAX_BUF_SIZE 1024
/* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
#define BLOCKING_SEND_TIMEOUT 20

typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

typedef struct {
  WORD passive;
  WORD port;//uint16_t
  WORD minThreads;
  WORD maxThreads;
  pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
  volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
  int  workerWaitTimeout;//wait timeout
  volatile int workerCounter;
  volatile int currentBusyWorkers;
  volatile int CSocketsCounter;
  CSocketLifecycleCallback cslifecb;
  InternalProtocolHandler protoHandler;
  SOCKET server_s;
  SOCKADDR_IN serv_addr;
  int epollFD;//main epoller.
  int BSendEpollFD;//For blocking send.
}QSocketServer;

typedef struct {
  SOCKET client_s;
  SOCKADDR_IN client_addr;
  uint32_t curEvents;

  char buf[MAX_BUF_SIZE];
  DWORD numberOfBytesTransferred;
  char * data;

  int BSendEpollFDRelated;
  pthread_mutex_t writableLock;
  pthread_cond_t  writableMonitor;
}QSSEPollEvent;//for per connection

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
int startSocketServer(QSocketServer *qss);
int shutdownSocketServer(QSocketServer *qss);
#endif

qsocketserver_model.c  //下面的代碼離生產(chǎn)環(huán)境還差內(nèi)存池logger哦!
 #include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void *  epollWorkerRoutine(void *);
void *  blockingSendEpollerRoutine(void *);
int isDigitStr(const char *str){
    int ret=-1;
    regex_t regex;
    regmatch_t matchs[1];
    if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
        ret=!regexec(&regex,str, 1,matchs,0);
        regfree(&regex);
    }
    return ret;
}

static int setNonBlocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts==-1)
    {
        perror("fcntl(sock,GETFL) failed!\n");
        return opts;
    }
    opts = opts|O_NONBLOCK;
    opts=fcntl(sock,F_SETFL,opts);
    if(opts==-1)
    {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return opts;
    }
    return 1;
}

static void adjustQSSWorkerLimits(QSocketServer *qss){
   //to adjust availabe size.
}
typedef struct{
 QSocketServer * qss;
 pthread_t th;
}QSSWORKER_PARAM;

static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
 WORD res=0;
 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
 {
  QSSWORKER_PARAM * pParam=NULL;
  int i=0;
  pthread_spin_lock(&qss->g_spinlock);
  if(qss->workerCounter+addCounter<=qss->maxThreads)
   for(;i<addCounter;i++)
   {
    pParam=malloc(sizeof(QSSWORKER_PARAM));

    if(pParam){
     pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
     pParam->qss=qss;
     qss->workerCounter++,res++;
    }
   }
  pthread_spin_unlock(&qss->g_spinlock);
 }
 return res;
}

static void SOlogger(const char * msg,SOCKET s){
 perror(msg);
    if(s>0)
    close(s);
}

static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *qssEPEvent=event->data.ptr;
    int ret;
    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
    if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
        //sleep(5);
        ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
    }
     printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
 return ret;
}

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{

 QSocketServer * qss=malloc(sizeof(QSocketServer));
 qss->passive=passive;
 qss->port=port;
 qss->minThreads=minThreads;
 qss->maxThreads=maxThreads;
 qss->workerWaitTimeout=workerWaitTimeout;
 qss->lifecycleStatus=0;
 pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
 qss->workerCounter=0;
 qss->currentBusyWorkers=0;
 qss->CSocketsCounter=0;
 qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
 if(!qss->protoHandler)
  qss->protoHandler=_InternalProtocolHandler;
 adjustQSSWorkerLimits(qss);
 *qss_ptr=qss;
 return 1;
}

int startSocketServer(QSocketServer *qss)
{
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==0){
            qss->lifecycleStatus=1;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

 qss->serv_addr.sin_family=AF_INET;
 qss->serv_addr.sin_port=htons(qss->port);
 inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
 if(setNonBlocking(qss->server_s)==-1)
 {
    SOlogger("setNonBlocking server_s failed.\n",0);
     return 0;
 }

 if(qss->server_s==INVALID_SOCKET)
 {
  SOlogger("socket failed.\n",0);
  return 0;
 }

 if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
 {
  SOlogger("bind failed.\n",qss->server_s);
  return 0;
 }

 if(listen(qss->server_s,SOMAXCONN/*這個(gè)宏windows也有,這里是128,當(dāng)然你可以設(shè)的小些,它影響開(kāi)銷的*/)==SOCKET_ERROR)
{
  SOlogger("listen failed.\n",qss->server_s);
  return 0;
}
    qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設(shè)置size,所以忽略它吧*/
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
        return 0;
    }
    qss->BSendEpollFD=epoll_create1(0);//for blocking send.
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
        return 0;
    }

    {//ADD ACCEPT EVENT
        struct epoll_event _epEvent;
        QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
        qssEPEvent->client_s=qss->server_s;
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
        _epEvent.data.ptr=qssEPEvent;
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
            SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
            free(qssEPEvent);
            return 0;
        }
    }
    {//starup blocking send epoller.
        QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
        pParam->qss=qss;
        pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
    }

 //initialize worker for epoll events.
 addQSSWorker(qss,qss->minThreads);
 qss->lifecycleStatus=2;
 return 1;
}

int shutdownSocketServer(QSocketServer *qss){
    //change qss->lifecycleStatus
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==2){
            qss->lifecycleStatus=3;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    /*shutdown server-listening socket,這里優(yōu)雅的做法是shutdown--notify-->epoll-->close.記得shutdown會(huì)發(fā)送EOF的哦*/
    shutdown(qss->server_s,SHUT_RDWR);

    // /proc/getpid/fd  shutdown all socket cs != serv_s
    {
        char dirBuf[64];
        struct dirent * de;
        DIR *pd=NULL;
        int sockFD;
        sprintf(dirBuf,"/proc/%d/fd/",getpid());
        pd=opendir(dirBuf);
        if(pd!=NULL){
            while((de=readdir(pd))!=NULL){
                if(isDigitStr(de->d_name)){
                    sockFD=atoi(de->d_name);
                    if(isfdtype(sockFD,S_IFSOCK))
                    shutdown(sockFD,SHUT_RDWR);
                }
            }
            closedir(pd);
        }
        /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
    }
 return 1;
}

static int onAcceptRoutine(QSocketServer * qss)
{
    SOCKADDR_IN client_addr;
    unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
    SOCKET cs;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
    if(cs==INVALID_SOCKET)
    {
        printf("onAccept failed:%d,%s\n",errno,strerror(errno));
        epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
        return 0;
    }
    if(setNonBlocking(cs)==-1)
    {
        printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
        return 0;
    }

    {// set keepalive option
        int keepAlive = 1;
        int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
        if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
           setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
           setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
           setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
           {
            printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
            return 0;
           }
    }
    qssEPEvent=malloc(sizeof(QSSEPollEvent));
    qssEPEvent->client_s=cs;
    {
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
        qssEPEvent->BSendEpollFDRelated=0;
        _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
            printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
            free(qssEPEvent);
            return 0;
        }else{
            pthread_spin_lock(&qss->g_spinlock);
            qss->CSocketsCounter++;
            pthread_spin_unlock(&qss->g_spinlock);
            if(qss->cslifecb)
                qss->cslifecb(cs,0);
        }
    }
    printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
    return 1;
}

typedef struct{
    QSocketServer * qss;
    QSSEPollEvent * event;
}InternalSenderBase_t;

static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
    InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
    char * _sbuf=_buf;
    int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

    QSSEPollEvent *qssEPEvent=NULL;
    struct epoll_event _epEvent;

    struct timespec sendTimeo;

    while(1){
        *errno_ptr=0;
        while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
            sum+=ret,_sbuf+=ret;
        if(sum==nbs||ret==0)
            break;
        else if(ret==-1){
            if(errno==EAGAIN&&sum<nbs){
                qssEPEvent=sb->event;
                _epEvent.data.ptr=qssEPEvent;
                _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                if(qssEPEvent->BSendEpollFDRelated==0){
                    pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                    pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                    qssEPEvent->BSendEpollFDRelated=1;
                    curEpoll_ctl_opt=EPOLL_CTL_ADD;
                }else{
                    curEpoll_ctl_opt=EPOLL_CTL_MOD;
                }

                {//wait writable.
                    int flag=0;
                    pthread_mutex_lock(&qssEPEvent->writableLock);
                    if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                        sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                        int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                        if(err)
                            flag=-1;
                    }else
                        flag=-1;
                    pthread_mutex_unlock(&qssEPEvent->writableLock);
                    if(flag==-1)
                        break;
                }

            }else{
                if(errno==EAGAIN&&sum==nbs)
                    ret=nbs;//it is ok;
                break;
            }
        }
    }//end while.
    return ret;
}
void *  blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    //pthread_t * curThread=&pParam->th;
    struct epoll_event epEvents[qss->maxThreads];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,*errno_ptr=&errno;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(1){

        pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
        if(pollRes>=1){
            int i=0;
            for(;i<pollRes;i++)
            if(epEvents[i].events&EPOLLOUT){//這個(gè)epollfd只應(yīng)該做以下的事情,少做為快!
                qssEPEvent=epEvents[i].data.ptr;
                pthread_mutex_lock(&qssEPEvent->writableLock);
                pthread_cond_signal(&qssEPEvent->writableMonitor);
                pthread_mutex_unlock(&qssEPEvent->writableLock);
            }

        }else if(pollRes==-1){//errno 
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            break;
        }

    }

    return NULL;
}

void *  epollWorkerRoutine(void * _param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    pthread_t * curThread=&pParam->th;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    InternalSenderBase_t _senderBase;
    int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
    _senderBase.qss=qss;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(!exitCode){

        *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
        pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
        if(pollRes==1){
            qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

            if(qssEPEvent->client_s==qss->server_s)
            {//Accepted Socket.
               onAcceptRoutine(qss);
               continue;
            }else{
                if(qss->protoHandler){
                    _senderBase.event=_epEvent.data.ptr;
                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers++;
                    pthread_spin_unlock(&qss->g_spinlock);

                    addQSSWorker(qss,1);
                    handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers--;
                    pthread_spin_unlock(&qss->g_spinlock);

                    if(handleCode>0){
                        _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                        if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                            SOErrOccurred=2;
                    }else{
                        SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                    }
                }
            }

        }else if(pollRes==0){//timeout
            printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
            {
                pthread_spin_lock(&qss->g_spinlock);
                if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                    qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                    exitCode=2;
                }
                pthread_spin_unlock(&qss->g_spinlock);
            }else if(qss->lifecycleStatus>=4)
                    exitCode=4;

        }else if(pollRes==-1){//errno
            printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            exitCode=1;
        }

        if(SOErrOccurred){
            if(qss->cslifecb)
                qss->cslifecb(qssEPEvent->client_s,-1);
            /*if(qssEPEvent)*/{
                epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                close(qssEPEvent->client_s);
                if(qssEPEvent->BSendEpollFDRelated){
                    pthread_cond_destroy(&qssEPEvent->writableMonitor);
                    pthread_mutex_destroy(&qssEPEvent->writableLock);
                }
                free(qssEPEvent);
              }
            pthread_spin_lock(&qss->g_spinlock);
            if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
             //for qss workerSize,
             qss->lifecycleStatus=4;
             exitCode=3;
            }
            pthread_spin_unlock(&qss->g_spinlock);
        }//SOErrOccurred handle;

    }//end main while.

    if(exitCode!=2){
        int clearup=0;
        pthread_spin_lock(&qss->g_spinlock);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
          clearup=1;
        }
        pthread_spin_unlock(&qss->g_spinlock);
        if(clearup){
         close(qss->epollFD);
         close(qss->BSendEpollFD);
         pthread_spin_destroy(&qss->g_spinlock);
         free(qss);
        }
    }//exitCode handle;
 return NULL;
}

 

 

 

 

posted on 2010-08-03 14:37 adapterofcoms 閱讀(5502) 評(píng)論(1)  編輯 收藏 引用

FeedBack:
# re: 一個(gè)基于Event Poll(epoll)的TCP Server Framework,淺析epoll
2010-08-04 13:12 | adapterofcoms
這篇文章昨天發(fā)的急,只是粘貼了源碼,沒(méi)有發(fā)表設(shè)計(jì)思路和對(duì)源碼的解析,今天已經(jīng)補(bǔ)上,希望大家多指教!  回復(fù)  更多評(píng)論
  

只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。
網(wǎng)站導(dǎo)航: 博客園   IT新聞   BlogJava   博問(wèn)   Chat2DB   管理



<2025年9月>
31123456
78910111213
14151617181920
21222324252627
2829301234
567891011

常用鏈接

留言簿

隨筆檔案(2)

搜索

  •  

最新評(píng)論

閱讀排行榜

評(píng)論排行榜

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            日韩一区二区精品在线观看| 亚洲国内高清视频| 亚洲图片欧洲图片日韩av| 欧美激情二区三区| 欧美日本不卡| 欧美在线电影| 久久久久久久一区二区三区| 亚洲激情一区二区| av不卡在线| 国产曰批免费观看久久久| 欧美成人精精品一区二区频| 欧美久久久久久久久久| 亚洲一区国产视频| 欧美中文在线观看| 亚洲精品免费在线观看| 亚洲一区二区成人| 在线观看成人av电影| 日韩一级精品| 在线不卡中文字幕| 99精品99久久久久久宅男| 国产一区二区精品久久| 亚洲精品午夜精品| 好看的亚洲午夜视频在线| 亚洲福利国产| 欧美日韩综合久久| 美腿丝袜亚洲色图| 国产精品国产三级国产aⅴ浪潮| 久久人人97超碰国产公开结果| 牛牛国产精品| 欧美一区二区精品在线| 欧美成人国产| 久久精品一本| 国产精品国产精品国产专区不蜜| 欧美大片免费观看| 国产日韩欧美日韩| 亚洲蜜桃精久久久久久久| 亚洲大胆av| 午夜亚洲视频| 亚洲欧美日韩一区二区| 欧美国产免费| 欧美大尺度在线观看| 国产午夜精品麻豆| 一区二区三区日韩精品| 亚洲精品国精品久久99热一| 久久精品一区二区三区不卡牛牛| 性久久久久久久久久久久| 欧美日韩a区| 亚洲国内欧美| 亚洲日本电影| 欧美bbbxxxxx| 亚洲大黄网站| 亚洲国产欧美一区二区三区丁香婷| 欧美亚洲尤物久久| 欧美一级淫片aaaaaaa视频| 欧美日韩第一区| 亚洲三级免费电影| 99re这里只有精品6| 欧美电影免费网站| 亚洲福利久久| 亚洲精品一区二区在线| 麻豆av一区二区三区| 欧美高清在线观看| 亚洲高清视频一区二区| 久久一区二区视频| 欧美成人综合一区| 篠田优中文在线播放第一区| 国产亚洲美州欧州综合国| 亚洲麻豆国产自偷在线| 一区二区三区久久网| 欧美国产视频日韩| 91久久久在线| 亚洲午夜影视影院在线观看| 国产精品第2页| 亚洲伦理自拍| 性色av香蕉一区二区| 国产欧美1区2区3区| 欧美一区二区三区婷婷月色| 美女国内精品自产拍在线播放| **欧美日韩vr在线| 欧美激情视频在线播放 | 久久精品一区四区| 国产中文一区二区三区| 久久久久久久97| 亚洲国产三级| 亚洲综合精品自拍| 国内精品久久久久久久果冻传媒| 久久久久久久性| 日韩亚洲国产精品| 欧美与黑人午夜性猛交久久久| 国内精品免费在线观看| 欧美顶级艳妇交换群宴| av成人免费观看| 久久免费的精品国产v∧| 亚洲精品久久久蜜桃| 国产精品激情av在线播放| 欧美资源在线| 亚洲三级电影在线观看| 欧美一区网站| 亚洲精品影院在线观看| 国产精品影院在线观看| 免费成人av| 亚洲一区二区三区在线播放| 欧美成人一区二区三区在线观看 | 亚洲伦理久久| 国产一区二区三区在线播放免费观看 | 亚洲伊人伊色伊影伊综合网| 国产综合香蕉五月婷在线| 欧美精品 日韩| 欧美在线视屏| 一本一本久久a久久精品牛牛影视| 久久免费视频网站| 亚洲在线视频网站| 亚洲免费av片| 亚洲国产精品一区在线观看不卡| 国产精品伦一区| 欧美精品久久99| 久久精品国产亚洲a| 亚洲午夜精品久久久久久app| 亚洲电影免费观看高清完整版在线观看| 亚洲欧美日韩国产成人精品影院 | 激情一区二区| 国产精品亚洲产品| 国产精品h在线观看| 欧美xart系列在线观看| 久久婷婷国产综合精品青草| 亚洲欧美日韩国产综合精品二区 | 免费成人性网站| 久久视频在线视频| 欧美日韩国产不卡在线看| 欧美在线看片a免费观看| 欧美激情自拍| 欧美国产在线电影| 欧美大片一区二区三区| 久久久最新网址| 久久精品国产清高在天天线| 午夜精品一区二区三区四区| 中文久久精品| 亚洲在线免费| 久久av资源网站| 欧美在线视频全部完| 午夜精品久久| 欧美亚洲综合久久| 欧美在线观看网址综合| 欧美在线影院| 久久综合网络一区二区| 麻豆9191精品国产| 欧美国产在线视频| 亚洲国产日韩欧美在线图片| 亚洲第一在线综合在线| 亚洲激情在线| 日韩亚洲精品电影| 亚洲欧美精品suv| 校园激情久久| 老牛嫩草一区二区三区日本| 麻豆av一区二区三区| 欧美精品在线免费| 国产精品久久毛片a| 国产日韩在线看| 亚洲第一在线综合在线| 亚洲精品字幕| 午夜久久久久| 另类亚洲自拍| 亚洲欧洲日韩女同| 亚洲一二三区精品| 久久精品99国产精品日本| 久久综合狠狠综合久久综青草| 欧美成人高清| 国产欧美一级| 亚洲欧洲精品天堂一级| 亚洲一区二区三区精品在线观看| 久久精品国产成人| 欧美国产日韩在线| 宅男噜噜噜66一区二区| 久久久99国产精品免费| 欧美日韩国产成人在线91| 国产伦精品一区二区三区免费迷| 在线观看精品视频| 亚洲伊人网站| 欧美激情成人在线| 午夜精品久久久久久久久久久| 蜜臀久久99精品久久久画质超高清| 欧美日韩成人在线观看| 国产主播一区二区三区| 日韩视频一区二区| 久久久久女教师免费一区| 亚洲欧洲另类| 久久久夜色精品亚洲| 国产精品电影网站| 亚洲精品乱码| 久久天天狠狠| 亚洲欧美电影院| 欧美三级中文字幕在线观看| 永久免费视频成人| 羞羞漫画18久久大片| 亚洲精品在线二区| 久久综合色影院| 国产亚洲欧洲997久久综合| 亚洲一区二区av电影| 亚洲国产精品毛片| 久久久久久91香蕉国产|