• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            posts - 297,  comments - 15,  trackbacks - 0
            epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
               LT/ET:ET也會多次發送event,當然頻率遠低于LT,但是epoll one shot才是真正的對"one connection VS one thread in worker thread pool,不依賴于任何connection-data-queue"的基礎支持 .我看到大部分對epoll_wait的處理模式如下,很教科化,因為man-pages就是這樣舉例子的。
            man-pages epoll_wait handle:
            #define MAX_EVENTS 10
            struct epoll_event events[MAX_EVENTS];
            for (;;) 
            {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               for(1~nfds)
                   handle events[i];
               ......
            }
            epoll_ctl_add的當然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個再去處理接下來的MAX_EVENTS 個?慢否? 但是如果是一堆線程的話,你是否考慮過如何處理request-data in one connection在邏輯上的完整性,也就是一個request-data-packet可能會被分割成若干次發送,在上面的處理模式中你真的要好好設計一下了。
            而我的epoll_wait處理模式如下:
            struct epoll_event activeEvent;
            for(;;)
            {
               epoll_wait(epollfd, &activeEvent, 1/*很驚訝嗎,但絕不是一條線程在運行這段代碼,而是一堆*/, timeout);
               if handle activeEvent success
                  epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
              ......
            }
            處理上面代碼的當然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
            因為我的設計理念是嚴格遵守one connection VS one thread in worker thread pool。
            所以我下面的server框架的基本模型是:
            One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
            在epollWorkerRoutine中有以下的職責:
            1.handle request,當忙時增加epollWorkerThread數量但不超過maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
            2.timeout時檢查是否空閑和當前epollWorkerThread數量,當空閑時保持或減少至minThreads數量.
            3.對所有Accepted-socket管理生命周期,這里利用系統的keepalive probes,若想實現業務層"心跳探測"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統默認的2小時.這里并不維護所有連接列表,當然你可以在/proc/getpid/fd下找到所有的socket fd.
            4.linux上的non-blocking socket的操作仍然依賴recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動作實現成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會形成響應數據在邏輯上的碎片錯序,特別是你如果采用上面那個教科化的處理模式的化,并且還是多線程的話,那么這簡直就亂透了。當然你可以使用response-data-queue來達到異步準確發送數據的目的。

            下面結合源碼,淺析一下epoll programming:
            socketserver.h
            #ifndef __Q_SOCKET_SERVER__
            #define __Q_SOCKET_SERVER__
            #include <errno.h>
            #include <sys/socket.h>
            #include <netinet/in.h>
            #include <netinet/tcp.h>
            #include <arpa/inet.h>
            #include <sys/types.h>
            #include <string.h>
            #include <sys/epoll.h>
            #include <pthread.h>
            #include <unistd.h>
            #include <fcntl.h>
            #include <stdio.h>
            #include <stdlib.h>
            #include <time.h>
            #define SOCKET_ERROR -1
            #define INVALID_SOCKET -1
            typedef int SOCKET;
            typedef struct sockaddr_in SOCKADDR_IN;
            typedef unsigned short WORD;
            typedef unsigned int DWORD;

            #define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
            #define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
            #define QSS_SIO_KEEPALIVE_VALS_COUNT 3
            #define MAX_THREADS 100
            #define MAX_THREADS_MIN  10
            #define MIN_WORKER_WAIT_TIMEOUT  20*1000
            #define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
            #define MAX_THREADPOOLS  32

            #define MAX_BUF_SIZE 1024
            /* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
            #define BLOCKING_SEND_TIMEOUT 20

            typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
            typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
            typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

            typedef struct {
              WORD passive;
              WORD port;//uint16_t
              WORD minThreads;
              WORD maxThreads;
              pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
              volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
              int  workerWaitTimeout;//wait timeout
              volatile int workerCounter;
              volatile int currentBusyWorkers;
              volatile int CSocketsCounter;
              CSocketLifecycleCallback cslifecb;
              InternalProtocolHandler protoHandler;
              SOCKET server_s;
              SOCKADDR_IN serv_addr;
              int epollFD;//main epoller.
              int BSendEpollFD;//For blocking send.
            }QSocketServer;

            typedef struct {
              SOCKET client_s;
              SOCKADDR_IN client_addr;
              uint32_t curEvents;

              char buf[MAX_BUF_SIZE];
              DWORD numberOfBytesTransferred;
              char * data;

              int BSendEpollFDRelated;
              pthread_mutex_t writableLock;
              pthread_cond_t  writableMonitor;
            }QSSEPollEvent;//for per connection

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
            int startSocketServer(QSocketServer *qss);
            int shutdownSocketServer(QSocketServer *qss);
            #endif

            qsocketserver_model.c  //下面的代碼離生產環境還差內存池logger哦!
             #include "socketserver.h"
            #include <dirent.h>
            #include <regex.h>
            #define DIGIT_PATTERN_STRING "^[0-9]+$"
            void *  epollWorkerRoutine(void *);
            void *  blockingSendEpollerRoutine(void *);
            int isDigitStr(const char *str){
                int ret=-1;
                regex_t regex;
                regmatch_t matchs[1];
                if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
                    ret=!regexec(&regex,str, 1,matchs,0);
                    regfree(&regex);
                }
                return ret;
            }

            static int setNonBlocking(int sock)
            {
                int opts;
                opts=fcntl(sock,F_GETFL);
                if(opts==-1)
                {
                    perror("fcntl(sock,GETFL) failed!\n");
                    return opts;
                }
                opts = opts|O_NONBLOCK;
                opts=fcntl(sock,F_SETFL,opts);
                if(opts==-1)
                {
                    perror("fcntl(sock,SETFL,opts) failed!\n");
                    return opts;
                }
                return 1;
            }

            static void adjustQSSWorkerLimits(QSocketServer *qss){
               //to adjust availabe size.
            }
            typedef struct{
             QSocketServer * qss;
             pthread_t th;
            }QSSWORKER_PARAM;

            static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
             WORD res=0;
             if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
             {
              QSSWORKER_PARAM * pParam=NULL;
              int i=0;
              pthread_spin_lock(&qss->g_spinlock);
              if(qss->workerCounter+addCounter<=qss->maxThreads)
               for(;i<addCounter;i++)
               {
                pParam=malloc(sizeof(QSSWORKER_PARAM));

                if(pParam){
                 pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
                 pParam->qss=qss;
                 qss->workerCounter++,res++;
                }
               }
              pthread_spin_unlock(&qss->g_spinlock);
             }
             return res;
            }

            static void SOlogger(const char * msg,SOCKET s){
             perror(msg);
                if(s>0)
                close(s);
            }

            static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
                QSSEPollEvent *qssEPEvent=event->data.ptr;
                int ret;
                printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
                if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
                    //sleep(5);
                    ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
                }
                 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
             return ret;
            }

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
            {

             QSocketServer * qss=malloc(sizeof(QSocketServer));
             qss->passive=passive;
             qss->port=port;
             qss->minThreads=minThreads;
             qss->maxThreads=maxThreads;
             qss->workerWaitTimeout=workerWaitTimeout;
             qss->lifecycleStatus=0;
             pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
             qss->workerCounter=0;
             qss->currentBusyWorkers=0;
             qss->CSocketsCounter=0;
             qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
             if(!qss->protoHandler)
              qss->protoHandler=_InternalProtocolHandler;
             adjustQSSWorkerLimits(qss);
             *qss_ptr=qss;
             return 1;
            }

            int startSocketServer(QSocketServer *qss)
            {
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==0){
                        qss->lifecycleStatus=1;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

             qss->serv_addr.sin_family=AF_INET;
             qss->serv_addr.sin_port=htons(qss->port);
             inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
             //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

             qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
             if(setNonBlocking(qss->server_s)==-1)
             {
                SOlogger("setNonBlocking server_s failed.\n",0);
                 return 0;
             }

             if(qss->server_s==INVALID_SOCKET)
             {
              SOlogger("socket failed.\n",0);
              return 0;
             }

             if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
             {
              SOlogger("bind failed.\n",qss->server_s);
              return 0;
             }

             if(listen(qss->server_s,SOMAXCONN/*這個宏windows也有,這里是128,當然你可以設的小些,它影響開銷的*/)==SOCKET_ERROR)
            {
              SOlogger("listen failed.\n",qss->server_s);
              return 0;
            }
                qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設置size,所以忽略它吧*/
                if(qss->epollFD==-1){
                    SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
                    return 0;
                }
                qss->BSendEpollFD=epoll_create1(0);//for blocking send.
                if(qss->BSendEpollFD==-1){
                    SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
                    return 0;
                }

                {//ADD ACCEPT EVENT
                    struct epoll_event _epEvent;
                    QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
                    qssEPEvent->client_s=qss->server_s;
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
                    _epEvent.data.ptr=qssEPEvent;
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
                        SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
                        free(qssEPEvent);
                        return 0;
                    }
                }
                {//starup blocking send epoller.
                    QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
                    pParam->qss=qss;
                    pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
                }

             //initialize worker for epoll events.
             addQSSWorker(qss,qss->minThreads);
             qss->lifecycleStatus=2;
             return 1;
            }

            int shutdownSocketServer(QSocketServer *qss){
                //change qss->lifecycleStatus
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==2){
                        qss->lifecycleStatus=3;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                /*shutdown server-listening socket,這里優雅的做法是shutdown--notify-->epoll-->close.記得shutdown會發送EOF的哦*/
                shutdown(qss->server_s,SHUT_RDWR);

                // /proc/getpid/fd  shutdown all socket cs != serv_s
                {
                    char dirBuf[64];
                    struct dirent * de;
                    DIR *pd=NULL;
                    int sockFD;
                    sprintf(dirBuf,"/proc/%d/fd/",getpid());
                    pd=opendir(dirBuf);
                    if(pd!=NULL){
                        while((de=readdir(pd))!=NULL){
                            if(isDigitStr(de->d_name)){
                                sockFD=atoi(de->d_name);
                                if(isfdtype(sockFD,S_IFSOCK))
                                shutdown(sockFD,SHUT_RDWR);
                            }
                        }
                        closedir(pd);
                    }
                    /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
                }
             return 1;
            }

            static int onAcceptRoutine(QSocketServer * qss)
            {
                SOCKADDR_IN client_addr;
                unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
                SOCKET cs;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
                if(cs==INVALID_SOCKET)
                {
                    printf("onAccept failed:%d,%s\n",errno,strerror(errno));
                    epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
                    return 0;
                }
                if(setNonBlocking(cs)==-1)
                {
                    printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
                    return 0;
                }

                {// set keepalive option
                    int keepAlive = 1;
                    int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
                    int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
                    int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
                    if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
                       {
                        printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
                        return 0;
                       }
                }
                qssEPEvent=malloc(sizeof(QSSEPollEvent));
                qssEPEvent->client_s=cs;
                {
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
                    qssEPEvent->BSendEpollFDRelated=0;
                    _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
                        printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
                        free(qssEPEvent);
                        return 0;
                    }else{
                        pthread_spin_lock(&qss->g_spinlock);
                        qss->CSocketsCounter++;
                        pthread_spin_unlock(&qss->g_spinlock);
                        if(qss->cslifecb)
                            qss->cslifecb(cs,0);
                    }
                }
                printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
                return 1;
            }

            typedef struct{
                QSocketServer * qss;
                QSSEPollEvent * event;
            }InternalSenderBase_t;

            static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
                InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
                char * _sbuf=_buf;
                int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

                QSSEPollEvent *qssEPEvent=NULL;
                struct epoll_event _epEvent;

                struct timespec sendTimeo;

                while(1){
                    *errno_ptr=0;
                    while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
                        sum+=ret,_sbuf+=ret;
                    if(sum==nbs||ret==0)
                        break;
                    else if(ret==-1){
                        if(errno==EAGAIN&&sum<nbs){
                            qssEPEvent=sb->event;
                            _epEvent.data.ptr=qssEPEvent;
                            _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                            if(qssEPEvent->BSendEpollFDRelated==0){
                                pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                                pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                                qssEPEvent->BSendEpollFDRelated=1;
                                curEpoll_ctl_opt=EPOLL_CTL_ADD;
                            }else{
                                curEpoll_ctl_opt=EPOLL_CTL_MOD;
                            }

                            {//wait writable.
                                int flag=0;
                                pthread_mutex_lock(&qssEPEvent->writableLock);
                                if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                                    sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                                    int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                                    if(err)
                                        flag=-1;
                                }else
                                    flag=-1;
                                pthread_mutex_unlock(&qssEPEvent->writableLock);
                                if(flag==-1)
                                    break;
                            }

                        }else{
                            if(errno==EAGAIN&&sum==nbs)
                                ret=nbs;//it is ok;
                            break;
                        }
                    }
                }//end while.
                return ret;
            }
            void *  blockingSendEpollerRoutine(void *_param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                //pthread_t * curThread=&pParam->th;
                struct epoll_event epEvents[qss->maxThreads];
                QSSEPollEvent *qssEPEvent=NULL;
                int pollRes,*errno_ptr=&errno;

                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(1){

                    pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
                    if(pollRes>=1){
                        int i=0;
                        for(;i<pollRes;i++)
                        if(epEvents[i].events&EPOLLOUT){//這個epollfd只應該做以下的事情,少做為快!
                            qssEPEvent=epEvents[i].data.ptr;
                            pthread_mutex_lock(&qssEPEvent->writableLock);
                            pthread_cond_signal(&qssEPEvent->writableMonitor);
                            pthread_mutex_unlock(&qssEPEvent->writableLock);
                        }

                    }else if(pollRes==-1){//errno 
                        printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        break;
                    }

                }

                return NULL;
            }

            void *  epollWorkerRoutine(void * _param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                pthread_t * curThread=&pParam->th;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                InternalSenderBase_t _senderBase;
                int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
                _senderBase.qss=qss;
                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(!exitCode){

                    *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
                    pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
                    if(pollRes==1){
                        qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

                        if(qssEPEvent->client_s==qss->server_s)
                        {//Accepted Socket.
                           onAcceptRoutine(qss);
                           continue;
                        }else{
                            if(qss->protoHandler){
                                _senderBase.event=_epEvent.data.ptr;
                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers++;
                                pthread_spin_unlock(&qss->g_spinlock);

                                addQSSWorker(qss,1);
                                handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers--;
                                pthread_spin_unlock(&qss->g_spinlock);

                                if(handleCode>0){
                                    _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                                        SOErrOccurred=2;
                                }else{
                                    SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                                }
                            }
                        }

                    }else if(pollRes==0){//timeout
                        printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
                        if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
                        {
                            pthread_spin_lock(&qss->g_spinlock);
                            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                                qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                                exitCode=2;
                            }
                            pthread_spin_unlock(&qss->g_spinlock);
                        }else if(qss->lifecycleStatus>=4)
                                exitCode=4;

                    }else if(pollRes==-1){//errno
                        printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        exitCode=1;
                    }

                    if(SOErrOccurred){
                        if(qss->cslifecb)
                            qss->cslifecb(qssEPEvent->client_s,-1);
                        /*if(qssEPEvent)*/{
                            epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            close(qssEPEvent->client_s);
                            if(qssEPEvent->BSendEpollFDRelated){
                                pthread_cond_destroy(&qssEPEvent->writableMonitor);
                                pthread_mutex_destroy(&qssEPEvent->writableLock);
                            }
                            free(qssEPEvent);
                          }
                        pthread_spin_lock(&qss->g_spinlock);
                        if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
                         //for qss workerSize,
                         qss->lifecycleStatus=4;
                         exitCode=3;
                        }
                        pthread_spin_unlock(&qss->g_spinlock);
                    }//SOErrOccurred handle;

                }//end main while.

                if(exitCode!=2){
                    int clearup=0;
                    pthread_spin_lock(&qss->g_spinlock);
                    if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
                      clearup=1;
                    }
                    pthread_spin_unlock(&qss->g_spinlock);
                    if(clearup){
                     close(qss->epollFD);
                     close(qss->BSendEpollFD);
                     pthread_spin_destroy(&qss->g_spinlock);
                     free(qss);
                    }
                }//exitCode handle;
             return NULL;
            }


            from:

            http://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.html

            posted on 2010-08-25 20:41 chatler 閱讀(2538) 評論(0)  編輯 收藏 引用 所屬分類: NetworkSocket 、Linux_Coding
            <2010年1月>
            272829303112
            3456789
            10111213141516
            17181920212223
            24252627282930
            31123456

            常用鏈接

            留言簿(10)

            隨筆分類(307)

            隨筆檔案(297)

            algorithm

            Books_Free_Online

            C++

            database

            Linux

            Linux shell

            linux socket

            misce

            • cloudward
            • 感覺這個博客還是不錯,雖然做的東西和我不大相關,覺得看看還是有好處的

            network

            OSS

            • Google Android
            • Android is a software stack for mobile devices that includes an operating system, middleware and key applications. This early look at the Android SDK provides the tools and APIs necessary to begin developing applications on the Android platform using the Java programming language.
            • os161 file list

            overall

            搜索

            •  

            最新評論

            閱讀排行榜

            評論排行榜

            色综合久久久久网| 国产精品久久久久久一区二区三区| 欧美午夜A∨大片久久| 久久精品免费一区二区| 国产三级久久久精品麻豆三级| 精品一久久香蕉国产线看播放| 色青青草原桃花久久综合| 久久久久亚洲精品无码蜜桃| 久久国产精品波多野结衣AV| 日本欧美久久久久免费播放网| 国内精品伊人久久久久网站| 亚洲国产精品无码成人片久久| 久久精品无码一区二区三区日韩| 99久久做夜夜爱天天做精品| 国产精品亚洲综合专区片高清久久久| 久久精品国产久精国产果冻传媒| 国产 亚洲 欧美 另类 久久| 久久99精品久久久久婷婷| 一本久久免费视频| 亚洲国产精品热久久| 亚洲va久久久噜噜噜久久天堂| 综合久久久久久中文字幕亚洲国产国产综合一区首| 久久精品国产亚洲AV电影| 久久99这里只有精品国产| 久久久99精品成人片中文字幕| 色综合久久久久| 欧美激情精品久久久久| 久久久国产精品网站| 久久国产精品-久久精品| 蜜臀av性久久久久蜜臀aⅴ | 亚洲AV无码久久| 久久久久免费精品国产| 性做久久久久久久久浪潮| 亚洲国产成人久久综合碰| 久久有码中文字幕| 亚州日韩精品专区久久久| 日韩精品无码久久一区二区三| 久久久黄片| 午夜精品久久久内射近拍高清| 热综合一本伊人久久精品| 欧美久久久久久|