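            epoll (event poll) on Linux kernel 2.6.x, with pthread (NPTL 2.12)
               LT/ET: ET also delivers an event more than once, though far less often than LT. EPOLLONESHOT, however, is what really provides the basis for "one connection VS one thread in worker thread pool" without relying on any connection-data-queue. Most epoll_wait handling I have seen follows the pattern below. It is very textbook, because that is how the man-pages present the example.
            epoll_wait handling as shown in the man-pages: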
            #define MAX_EVENTS 10
            struct epoll_event events[MAX_EVENTS];
            int nfds, i;
            for (;;) 
            {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               for (i = 0; i < nfds; ++i)
                   handle(&events[i]);   /* handle: placeholder for the application's handler */
               ......
            }
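            What gets registered with epoll_ctl ADD is of course EPOLLIN|EPOLLET. Beyond that, I cannot tell whether the code above is run by a single thread or by a group of threads (in a thread pool). Hopefully not a single thread! If it is, does it really finish handling these MAX_EVENTS events before moving on to the next MAX_EVENTS? Isn't that slow? But if it is a group of threads, have you considered how to preserve the logical integrity of the request data on one connection? A single request-data-packet may arrive split across several sends, so with the pattern above you really have to design this carefully.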
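The concern about one request arriving in several pieces can be illustrated with a small per-connection reassembly buffer. This is only a sketch under an assumed wire format (a 2-byte big-endian length prefix followed by the payload); `FrameBuf`, `framebuf_feed`, `framebuf_pop` and `FRAME_CAP` are hypothetical names, not part of the server shown in this post.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define FRAME_CAP 1024   /* hypothetical per-connection buffer size */

typedef struct {
    unsigned char data[FRAME_CAP];
    size_t used;         /* bytes currently buffered */
} FrameBuf;

/* Append freshly received bytes. Returns 0 on success, -1 on overflow. */
static int framebuf_feed(FrameBuf *fb, const void *bytes, size_t n)
{
    assert(fb != NULL);
    if (n > FRAME_CAP - fb->used)
        return -1;
    memcpy(fb->data + fb->used, bytes, n);
    fb->used += n;
    return 0;
}

/* Try to pop one complete frame into out. Returns the payload length,
 * or -1 if no complete frame is buffered yet (or out is too small).
 * A frame longer than FRAME_CAP - 2 can never complete; a real server
 * would treat that as a protocol error. */
static int framebuf_pop(FrameBuf *fb, void *out, size_t out_cap)
{
    size_t len;

    if (fb->used < 2)
        return -1;
    len = ((size_t)fb->data[0] << 8) | fb->data[1];
    if (fb->used < 2 + len || len > out_cap)
        return -1;
    memcpy(out, fb->data + 2, len);
    /* keep whatever belongs to the next frame */
    memmove(fb->data, fb->data + 2 + len, fb->used - 2 - len);
    fb->used -= 2 + len;
    return (int)len;
}
```

Each recv() result is fed into the buffer, and the handler only runs once framebuf_pop returns a full payload. If several threads could touch the same connection at once, this buffer would also need a lock, which is exactly the design burden described above.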
            My own epoll_wait handling pattern, by contrast, is:
            struct epoll_event activeEvent;
            for(;;)
            {
               epoll_wait(epollfd, &activeEvent, 1/*surprised? this code is run by a pool of threads, never by a single one*/, timeout);
               if handle activeEvent success
                  epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
              ......
            }
            The code above is of course run by a group of threads in a thread pool, and what gets registered with epoll_ctl ADD is EPOLLIN|EPOLLET|EPOLLONESHOT,
            because my design strictly follows one connection VS one thread in worker thread pool.
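How EPOLLONESHOT keeps a connection with a single worker can be checked with a small experiment on a pipe. The function below is a sketch written for this purpose (`oneshot_demo` is a hypothetical name): after the first event the descriptor stays disarmed, so a second write wakes nobody, and only EPOLL_CTL_MOD makes it reportable again.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns 0 if the pipe behaved as EPOLLONESHOT promises, -1 otherwise. */
static int oneshot_demo(void)
{
    int p[2], ep, rc = -1;
    struct epoll_event ev, got;

    if (pipe(p) == -1)
        return -1;
    ep = epoll_create1(0);
    if (ep == -1) {
        close(p[0]);
        close(p[1]);
        return -1;
    }

    ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ev.data.fd = p[0];
    if (epoll_ctl(ep, EPOLL_CTL_ADD, p[0], &ev) == -1)
        goto out;

    /* First write: exactly one event is delivered. */
    if (write(p[1], "a", 1) != 1 || epoll_wait(ep, &got, 1, 100) != 1)
        goto out;
    assert(got.data.fd == p[0]);

    /* Second write: the fd is disarmed, so the wait times out. */
    if (write(p[1], "b", 1) != 1 || epoll_wait(ep, &got, 1, 100) != 0)
        goto out;

    /* Re-arm with EPOLL_CTL_MOD: the pending data is reported again. */
    if (epoll_ctl(ep, EPOLL_CTL_MOD, p[0], &ev) == -1 ||
        epoll_wait(ep, &got, 1, 100) != 1)
        goto out;
    rc = 0;
out:
    close(ep);
    close(p[0]);
    close(p[1]);
    return rc;
}
```

The window between the event and the MOD call is the time during which one worker owns the connection, so no second thread can pick up a later fragment of the same request.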
            So the basic model of the server framework below is:
            One connection VS one thread in worker thread pool; each worker thread runs epollWorkerRoutine.
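            epollWorkerRoutine has the following responsibilities:
            1. Handle the request; when busy, increase the number of epollWorkerThreads (never beyond maxThreads), then post/MOD the EPOLLIN|EPOLLONESHOT interested event back to epoll.
            2. On timeout, check whether the pool is idle and how many epollWorkerThreads exist; when idle, keep the count or shrink it down to minThreads.
            3. Manage the lifecycle of every accepted socket. This relies on the system's keepalive probes; if you would rather implement a "heartbeat probe" at the application layer, just change QSS_SIO_KEEPALIVE_VALS_TIMEOUT back to the system default of 2 hours. No list of connections is maintained here; you can of course find all socket fds under /proc/<pid>/fd (pid as returned by getpid()).
            4. On Linux, operations on a non-blocking socket still go through recv and send, unlike WSARecv + overlapped on Windows, which returns immediately even without fcntl fd O_NONBLOCK. I implemented the send action as a blocking one (internalBlockingSender). By the same reasoning as for requests, a non-blocking send can still leave the response data logically fragmented and out of order, especially if you use the textbook pattern above with multiple threads, in which case it becomes a complete mess. Of course, you can use a response-data-queue to send the data asynchronously and in the right order.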
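The idea of a blocking send on top of a non-blocking socket can be sketched more compactly than the server's own internalBlockingSender. Note that this swaps in a different technique: it waits for writability with poll() in the calling thread, whereas the server below uses a second epoll instance plus a condition variable. `send_all` and its `timeout_ms` parameter are hypothetical names for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Send exactly n bytes on a non-blocking socket, waiting up to timeout_ms
 * for writability each time the kernel buffer is full.
 * Returns n on success, -1 on error or timeout. */
static ssize_t send_all(int fd, const void *buf, size_t n, int timeout_ms)
{
    const char *p = buf;
    size_t sent = 0;

    assert(buf != NULL || n == 0);
    while (sent < n) {
        ssize_t r = send(fd, p + sent, n - sent, MSG_NOSIGNAL);
        if (r > 0) {
            sent += (size_t)r;
            continue;
        }
        if (r == -1 && errno == EINTR)
            continue;
        if (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            struct pollfd pfd = { .fd = fd, .events = POLLOUT };
            int pr = poll(&pfd, 1, timeout_ms);
            if (pr > 0 || (pr == -1 && errno == EINTR))
                continue;          /* writable again, or interrupted: retry */
            return -1;             /* timeout or poll error */
        }
        return -1;                 /* hard send error */
    }
    return (ssize_t)sent;
}
```

Because the whole response is written by the one thread that owns the connection, the bytes cannot interleave with another response, which is the ordering property point 4 is after.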

            Now, with the source code at hand, a brief look at epoll programming:
            socketserver.h
            #ifndef __Q_SOCKET_SERVER__
            #define __Q_SOCKET_SERVER__
            #include <errno.h>
            #include <sys/socket.h>
            #include <netinet/in.h>
            #include <netinet/tcp.h>
            #include <arpa/inet.h>
            #include <sys/types.h>
            #include <string.h>
            #include <sys/epoll.h>
            #include <pthread.h>
            #include <unistd.h>
            #include <fcntl.h>
            #include <stdio.h>
            #include <stdlib.h>
            #include <time.h>
            #define SOCKET_ERROR -1
            #define INVALID_SOCKET -1
            typedef int SOCKET;
            typedef struct sockaddr_in SOCKADDR_IN;
            typedef unsigned short WORD;
            typedef unsigned int DWORD;

            #define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60)
            #define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
            #define QSS_SIO_KEEPALIVE_VALS_COUNT 3
            #define MAX_THREADS 100
            #define MAX_THREADS_MIN  10
            #define MIN_WORKER_WAIT_TIMEOUT  (20*1000)
            #define MAX_WORKER_WAIT_TIMEOUT  (60*MIN_WORKER_WAIT_TIMEOUT)
            #define MAX_THREADPOOLS  32

            #define MAX_BUF_SIZE 1024
            /* ulimit -n: open FDs per process. Remember to change it, otherwise you get select-like results rather than epoll-like ones. */
            #define BLOCKING_SEND_TIMEOUT 20

            typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
            typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
            typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

            typedef struct {
              WORD passive;
              WORD port;//uint16_t
              WORD minThreads;
              WORD maxThreads;
              pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
              volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
              int  workerWaitTimeout;//wait timeout
              volatile int workerCounter;
              volatile int currentBusyWorkers;
              volatile int CSocketsCounter;
              CSocketLifecycleCallback cslifecb;
              InternalProtocolHandler protoHandler;
              SOCKET server_s;
              SOCKADDR_IN serv_addr;
              int epollFD;//main epoller.
              int BSendEpollFD;//For blocking send.
            }QSocketServer;

            typedef struct {
              SOCKET client_s;
              SOCKADDR_IN client_addr;
              uint32_t curEvents;

              char buf[MAX_BUF_SIZE];
              DWORD numberOfBytesTransferred;
              char * data;

              int BSendEpollFDRelated;
              pthread_mutex_t writableLock;
              pthread_cond_t  writableMonitor;
            }QSSEPollEvent;//for per connection

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
            int startSocketServer(QSocketServer *qss);
            int shutdownSocketServer(QSocketServer *qss);
            #endif

            qsocketserver_model.c  //the code below still needs a memory pool and a logger before it is production-ready!
             #include "socketserver.h"
            #include <dirent.h>
            #include <regex.h>
            #define DIGIT_PATTERN_STRING "^[0-9]+$"
            void *  epollWorkerRoutine(void *);
            void *  blockingSendEpollerRoutine(void *);
            int isDigitStr(const char *str){
                int ret=0;/*0: not a digit string (or regcomp failed), 1: digits only*/
                regex_t regex;
                regmatch_t matchs[1];
                if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*do not pass 0 here, otherwise nothing matches*/)){
                    ret=!regexec(&regex,str, 1,matchs,0);
                    regfree(&regex);
                }
                return ret;
            }

            static int setNonBlocking(int sock)
            {
                int opts;
                opts=fcntl(sock,F_GETFL);
                if(opts==-1)
                {
                    perror("fcntl(sock,GETFL) failed!\n");
                    return opts;
                }
                opts = opts|O_NONBLOCK;
                opts=fcntl(sock,F_SETFL,opts);
                if(opts==-1)
                {
                    perror("fcntl(sock,SETFL,opts) failed!\n");
                    return opts;
                }
                return 1;
            }

            static void adjustQSSWorkerLimits(QSocketServer *qss){
               //to adjust availabe size.
            }
            typedef struct{
             QSocketServer * qss;
             pthread_t th;
            }QSSWORKER_PARAM;

            static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
             WORD res=0;
             if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
             {
              QSSWORKER_PARAM * pParam=NULL;
              int i=0;
              pthread_spin_lock(&qss->g_spinlock);
              if(qss->workerCounter+addCounter<=qss->maxThreads)
               for(;i<addCounter;i++)
               {
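                pParam=malloc(sizeof(QSSWORKER_PARAM));

                if(pParam){
                 pParam->qss=qss;/*set before the thread starts: the worker reads it right away*/
                 if(pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam)==0)
                  qss->workerCounter++,res++;
                 else
                  free(pParam);/*thread was not created, nobody else will free it*/
                }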
               }
              pthread_spin_unlock(&qss->g_spinlock);
             }
             return res;
            }

            static void SOlogger(const char * msg,SOCKET s){
             perror(msg);
                if(s>0)
                close(s);
            }

            static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
                QSSEPollEvent *qssEPEvent=event->data.ptr;
                int ret;
                printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
                if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
                    //sleep(5);
                    ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
                }
                 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
             return ret;
            }

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
            {

             QSocketServer * qss=malloc(sizeof(QSocketServer));
             if(qss==NULL)
              return 0;
             qss->passive=passive;
             qss->port=port;
             qss->minThreads=minThreads;
             qss->maxThreads=maxThreads;
             qss->workerWaitTimeout=workerWaitTimeout;
             qss->lifecycleStatus=0;
             pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
             qss->workerCounter=0;
             qss->currentBusyWorkers=0;
             qss->CSocketsCounter=0;
             qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
             if(!qss->protoHandler)
              qss->protoHandler=_InternalProtocolHandler;
             adjustQSSWorkerLimits(qss);
             *qss_ptr=qss;
             return 1;
            }

            int startSocketServer(QSocketServer *qss)
            {
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==0){
                        qss->lifecycleStatus=1;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                memset(&qss->serv_addr,0,sizeof(qss->serv_addr));/*malloc does not zero the struct*/

             qss->serv_addr.sin_family=AF_INET;
             qss->serv_addr.sin_port=htons(qss->port);
             inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
             //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

             qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
             if(qss->server_s==INVALID_SOCKET)
             {
              SOlogger("socket failed.\n",0);
              return 0;
             }

             /*check the socket first, then switch it to non-blocking*/
             if(setNonBlocking(qss->server_s)==-1)
             {
                SOlogger("setNonBlocking server_s failed.\n",qss->server_s);
                 return 0;
             }

             if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
             {
              SOlogger("bind failed.\n",qss->server_s);
              return 0;
             }

             if(listen(qss->server_s,SOMAXCONN/*Windows has this macro too; here it is 128. You can set it smaller, since it affects overhead*/)==SOCKET_ERROR)
            {
              SOlogger("listen failed.\n",qss->server_s);
              return 0;
            }
                qss->epollFD=epoll_create1(0);/*not epoll_create(size) here: you may not know what size to pass, so just ignore it*/
                if(qss->epollFD==-1){
                    SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
                    return 0;
                }
                qss->BSendEpollFD=epoll_create1(0);//for blocking send.
                if(qss->BSendEpollFD==-1){
                    SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
                    return 0;
                }

                {//ADD ACCEPT EVENT
                    struct epoll_event _epEvent;
                    QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
                    qssEPEvent->client_s=qss->server_s;
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
                    _epEvent.data.ptr=qssEPEvent;
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
                        SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
                        free(qssEPEvent);
                        return 0;
                    }
                }
                {//start up blocking send epoller.
                    QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
                    pParam->qss=qss;
                    pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
                }

             //initialize worker for epoll events.
             addQSSWorker(qss,qss->minThreads);
             qss->lifecycleStatus=2;
             return 1;
            }

            int shutdownSocketServer(QSocketServer *qss){
                //change qss->lifecycleStatus
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==2){
                        qss->lifecycleStatus=3;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                /*shut down the listening socket; the graceful way is shutdown --notify--> epoll --> close. Remember that shutdown sends EOF*/
                shutdown(qss->server_s,SHUT_RDWR);

                // /proc/getpid/fd  shutdown all socket cs != serv_s
                {
                    char dirBuf[64];
                    struct dirent * de;
                    DIR *pd=NULL;
                    int sockFD;
                    sprintf(dirBuf,"/proc/%d/fd/",getpid());
                    pd=opendir(dirBuf);
                    if(pd!=NULL){
                        while((de=readdir(pd))!=NULL){
                            if(isDigitStr(de->d_name)){
                                sockFD=atoi(de->d_name);
                                if(isfdtype(sockFD,S_IFSOCK))
                                shutdown(sockFD,SHUT_RDWR);
                            }
                        }
                        closedir(pd);
                    }
                    /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
                }
             return 1;
            }

            static int onAcceptRoutine(QSocketServer * qss)
            {
                SOCKADDR_IN client_addr;
                unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
                SOCKET cs;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
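                if(cs==INVALID_SOCKET)
                {
                    if(errno==EAGAIN||errno==EWOULDBLOCK||errno==EINTR)
                        return 0;/*nothing to accept right now; keep the listening socket registered*/
                    printf("onAccept failed:%d,%s\n",errno,strerror(errno));
                    epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
                    return 0;
                }
                if(setNonBlocking(cs)==-1)
                {
                    printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
                    close(cs);/*do not leak the accepted socket*/
                    return 0;
                }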

                {// set keepalive option
                    int keepAlive = 1;
                    int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
                    int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
                    int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
                    if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
                       {
                        printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
                        close(cs);/*do not leak the accepted socket*/
                        return 0;
                       }
                }
                qssEPEvent=malloc(sizeof(QSSEPollEvent));
                qssEPEvent->client_s=cs;
                {
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
                    qssEPEvent->BSendEpollFDRelated=0;
                    _epEvent.data.ptr=qssEPEvent;/*different from the textbook again: real user data goes in ptr, not just a bare fd*/
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
                        printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
                        free(qssEPEvent);
                        close(cs);/*do not leak the accepted socket*/
                        return 0;
                    }else{
                        pthread_spin_lock(&qss->g_spinlock);
                        qss->CSocketsCounter++;
                        pthread_spin_unlock(&qss->g_spinlock);
                        if(qss->cslifecb)
                            qss->cslifecb(cs,0);
                    }
                }
                printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
                return 1;
            }

            typedef struct{
                QSocketServer * qss;
                QSSEPollEvent * event;
            }InternalSenderBase_t;

            static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
                InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
                char * _sbuf=_buf;
                int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

                QSSEPollEvent *qssEPEvent=NULL;
                struct epoll_event _epEvent;

                struct timespec sendTimeo;

                while(1){
                    *errno_ptr=0;
                    while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
                        sum+=ret,_sbuf+=ret;
                    if(sum==nbs||ret==0)
                        break;
                    else if(ret==-1){
                        if(errno==EAGAIN&&sum<nbs){
                            qssEPEvent=sb->event;
                            _epEvent.data.ptr=qssEPEvent;
                            _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                            if(qssEPEvent->BSendEpollFDRelated==0){
                                pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                                pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                                qssEPEvent->BSendEpollFDRelated=1;
                                curEpoll_ctl_opt=EPOLL_CTL_ADD;
                            }else{
                                curEpoll_ctl_opt=EPOLL_CTL_MOD;
                            }

                            {//wait writable.
                                int flag=0;
                                pthread_mutex_lock(&qssEPEvent->writableLock);
                                if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                                    sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                                    int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                                    if(err)
                                        flag=-1;
                                }else
                                    flag=-1;
                                pthread_mutex_unlock(&qssEPEvent->writableLock);
                                if(flag==-1)
                                    break;
                            }

                        }else{
                            if(errno==EAGAIN&&sum==nbs)
                                ret=nbs;//it is ok;
                            break;
                        }
                    }
                }//end while.
                return ret;
            }
            void *  blockingSendEpollerRoutine(void *_param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                //pthread_t * curThread=&pParam->th;
                struct epoll_event epEvents[qss->maxThreads];
                QSSEPollEvent *qssEPEvent=NULL;
                int pollRes,*errno_ptr=&errno;

                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(1){

                    pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
                    if(pollRes>=1){
                        int i=0;
                        for(;i<pollRes;i++)
                        if(epEvents[i].events&EPOLLOUT){//this epollfd should do only the following: the less it does, the faster!
                            qssEPEvent=epEvents[i].data.ptr;
                            pthread_mutex_lock(&qssEPEvent->writableLock);
                            pthread_cond_signal(&qssEPEvent->writableMonitor);
                            pthread_mutex_unlock(&qssEPEvent->writableLock);
                        }

                    }else if(pollRes==-1){//errno 
                        printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        break;
                    }

                }

                return NULL;
            }

            void *  epollWorkerRoutine(void * _param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                pthread_t * curThread=&pParam->th;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                InternalSenderBase_t _senderBase;
                int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
                _senderBase.qss=qss;
                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(!exitCode){

                    *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
                    pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
                    if(pollRes==1){
                        qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

                        if(qssEPEvent->client_s==qss->server_s)
                        {//Accepted Socket.
                           onAcceptRoutine(qss);
                           continue;
                        }else{
                            if(qss->protoHandler){
                                _senderBase.event=_epEvent.data.ptr;
                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers++;
                                pthread_spin_unlock(&qss->g_spinlock);

                                addQSSWorker(qss,1);
                                handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers--;
                                pthread_spin_unlock(&qss->g_spinlock);

                                if(handleCode>0){
                                    _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                                        SOErrOccurred=2;
                                }else{
                                    SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                                }
                            }
                        }

                    }else if(pollRes==0){//timeout
                        printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,(unsigned long)pthread_self());/*pParam was freed above, so curThread must not be dereferenced*/
                        if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
                        {
                            pthread_spin_lock(&qss->g_spinlock);
                            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                                qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                                exitCode=2;
                            }
                            pthread_spin_unlock(&qss->g_spinlock);
                        }else if(qss->lifecycleStatus>=4)
                                exitCode=4;

                    }else if(pollRes==-1){//errno
                        printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        exitCode=1;
                    }

                    if(SOErrOccurred){
                        if(qss->cslifecb)
                            qss->cslifecb(qssEPEvent->client_s,-1);
                        /*if(qssEPEvent)*/{
                            epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            close(qssEPEvent->client_s);
                            if(qssEPEvent->BSendEpollFDRelated){
                                pthread_cond_destroy(&qssEPEvent->writableMonitor);
                                pthread_mutex_destroy(&qssEPEvent->writableLock);
                            }
                            free(qssEPEvent);
                          }
                        pthread_spin_lock(&qss->g_spinlock);
                        if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
                         //for qss workerSize,
                         qss->lifecycleStatus=4;
                         exitCode=3;
                        }
                        pthread_spin_unlock(&qss->g_spinlock);
                    }//SOErrOccurred handle;

                }//end main while.

                if(exitCode!=2){
                    int clearup=0;
                    pthread_spin_lock(&qss->g_spinlock);
                    if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
                      clearup=1;
                    }
                    pthread_spin_unlock(&qss->g_spinlock);
                    if(clearup){
                     close(qss->epollFD);
                     close(qss->BSendEpollFD);
                     pthread_spin_destroy(&qss->g_spinlock);
                     free(qss);
                    }
                }//exitCode handle;
             return NULL;
            }


            from:

            http://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.html

            posted on 2010-08-25 20:41 by chatler, category: NetworkSocketLinux_Coding