• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

               C++ 技術中心

               :: 首頁 :: 聯系 ::  :: 管理
              160 Posts :: 0 Stories :: 87 Comments :: 0 Trackbacks

            公告

            鄭重聲明:本BLOG所發表的原創文章,作者保留一切權利。必須經過作者本人同意后方可轉載,并注名作者(天空)和出處(CppBlog.com)。作者Email:coder@luckcoder.com

            留言簿(27)

            搜索

            •  

            最新隨筆

            最新評論

            評論排行榜

            epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
               LT/ET:ET也會多次發送event,當然頻率遠低于LT,但是epoll one shot才是真正的對"one connection VS one thread in worker thread pool,不依賴于任何connection-data-queue"的基礎支持 .我看到大部分對epoll_wait的處理模式如下,很教科化,因為man-pages就是這樣舉例子的。
            man-pages epoll_wait handle:
            #define MAX_EVENTS 10
            struct epoll_event events[MAX_EVENTS];
            for (;;) 
            {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               for(1~nfds)
                   handle events[i];
               ......
            }
            epoll_ctl_add的當然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個再去處理接下來的MAX_EVENTS 個?慢否? 但是如果是一堆線程的話,你是否考慮過如何處理request-data in one connection在邏輯上的完整性,也就是一個request-data-packet可能會被分割成若干次發送,在上面的處理模式中你真的要好好設計一下了。
            而我的epoll_wait處理模式如下:
            struct epoll_event activeEvent;
            for(;;)
            {
               epoll_wait(epollfd, &activeEvent1/*很驚訝嗎,但絕不是一條線程在運行這段代碼,而是一堆*/, timeout);
               if handle activeEvent success
                  epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
              ......
            }
            處理上面代碼的當然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
            因為我的設計理念是嚴格遵守one connection VS one thread in worker thread pool。
            所以我下面的server框架的基本模型是:
            One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
            在epollWorkerRoutine中有以下的職責:
            1.handle request,當忙時增加epollWorkerThread數量但不超過maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
            2.timeout時檢查是否空閑和當前epollWorkerThread數量,當空閑時保持或減少至minThreads數量.
            3.對所有Accepted-socket管理生命周期,這里利用系統的keepalive probes,若想實現業務層"心跳探測"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統默認的2小時.這里并不維護所有連接列表,當然你可以在/proc/getpid/fd下找到所有的socket fd.
            4.linux上的non-blocking socket的操作仍然依賴recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動作實現成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會形成響應數據在邏輯上的碎片錯序,特別是你如果采用上面那個教科化的處理模式的化,并且還是多線程的話,那么這簡直就亂透了。當然你可以使用response-data-queue來達到異步準確發送數據的目的。

            下面結合源碼,淺析一下epoll programming:
            socketserver.h
            #ifndef __Q_SOCKET_SERVER__
            #define __Q_SOCKET_SERVER__
            #include <errno.h>
            #include <sys/socket.h>
            #include <netinet/in.h>
            #include <netinet/tcp.h>
            #include <arpa/inet.h>
            #include <sys/types.h>
            #include <string.h>
            #include <sys/epoll.h>
            #include <pthread.h>
            #include <unistd.h>
            #include <fcntl.h>
            #include <stdio.h>
            #include <stdlib.h>
            #include <time.h>
            #define SOCKET_ERROR -1
            #define INVALID_SOCKET -1
            typedef int SOCKET;
            typedef struct sockaddr_in SOCKADDR_IN;
            typedef unsigned short WORD;
            typedef unsigned int DWORD;

            #define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
            #define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
            #define QSS_SIO_KEEPALIVE_VALS_COUNT 3
            #define MAX_THREADS 100
            #define MAX_THREADS_MIN  10
            #define MIN_WORKER_WAIT_TIMEOUT  20*1000
            #define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
            #define MAX_THREADPOOLS  32

            #define MAX_BUF_SIZE 1024
            /* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
            #define BLOCKING_SEND_TIMEOUT 20

            typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
            typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
            typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

            typedef struct {
              WORD passive;
              WORD port;//uint16_t
              WORD minThreads;
              WORD maxThreads;
              pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
              volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
              int  workerWaitTimeout;//wait timeout
              volatile int workerCounter;
              volatile int currentBusyWorkers;
              volatile int CSocketsCounter;
              CSocketLifecycleCallback cslifecb;
              InternalProtocolHandler protoHandler;
              SOCKET server_s;
              SOCKADDR_IN serv_addr;
              int epollFD;//main epoller.
              int BSendEpollFD;//For blocking send.
            }QSocketServer;

            typedef struct {
              SOCKET client_s;
              SOCKADDR_IN client_addr;
              uint32_t curEvents;

              char buf[MAX_BUF_SIZE];
              DWORD numberOfBytesTransferred;
              char * data;

              int BSendEpollFDRelated;
              pthread_mutex_t writableLock;
              pthread_cond_t  writableMonitor;
            }QSSEPollEvent;//for per connection

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
            int startSocketServer(QSocketServer *qss);
            int shutdownSocketServer(QSocketServer *qss);
            #endif

            qsocketserver_model.c  //下面的代碼離生產環境還差內存池logger哦!
             #include "socketserver.h"
            #include <dirent.h>
            #include <regex.h>
            #define DIGIT_PATTERN_STRING "^[0-9]+$"
            void *  epollWorkerRoutine(void *);
            void *  blockingSendEpollerRoutine(void *);
            int isDigitStr(const char *str){
                int ret=-1;
                regex_t regex;
                regmatch_t matchs[1];
                if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
                    ret=!regexec(&regex,str, 1,matchs,0);
                    regfree(&regex);
                }
                return ret;
            }

            static int setNonBlocking(int sock)
            {
                int opts;
                opts=fcntl(sock,F_GETFL);
                if(opts==-1)
                {
                    perror("fcntl(sock,GETFL) failed!\n");
                    return opts;
                }
                opts = opts|O_NONBLOCK;
                opts=fcntl(sock,F_SETFL,opts);
                if(opts==-1)
                {
                    perror("fcntl(sock,SETFL,opts) failed!\n");
                    return opts;
                }
                return 1;
            }

            static void adjustQSSWorkerLimits(QSocketServer *qss){
               //to adjust availabe size.
            }
            typedef struct{
             QSocketServer * qss;
             pthread_t th;
            }QSSWORKER_PARAM;

            static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
             WORD res=0;
             if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
             {
              QSSWORKER_PARAM * pParam=NULL;
              int i=0;
              pthread_spin_lock(&qss->g_spinlock);
              if(qss->workerCounter+addCounter<=qss->maxThreads)
               for(;i<addCounter;i++)
               {
                pParam=malloc(sizeof(QSSWORKER_PARAM));

                if(pParam){
                 pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
                 pParam->qss=qss;
                 qss->workerCounter++,res++;
                }
               }
              pthread_spin_unlock(&qss->g_spinlock);
             }
             return res;
            }

            static void SOlogger(const char * msg,SOCKET s){
             perror(msg);
                if(s>0)
                close(s);
            }

            static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
                QSSEPollEvent *qssEPEvent=event->data.ptr;
                int ret;
                printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
                if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
                    //sleep(5);
                    ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
                }
                 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
             return ret;
            }

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
            {

             QSocketServer * qss=malloc(sizeof(QSocketServer));
             qss->passive=passive;
             qss->port=port;
             qss->minThreads=minThreads;
             qss->maxThreads=maxThreads;
             qss->workerWaitTimeout=workerWaitTimeout;
             qss->lifecycleStatus=0;
             pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
             qss->workerCounter=0;
             qss->currentBusyWorkers=0;
             qss->CSocketsCounter=0;
             qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
             if(!qss->protoHandler)
              qss->protoHandler=_InternalProtocolHandler;
             adjustQSSWorkerLimits(qss);
             *qss_ptr=qss;
             return 1;
            }

            int startSocketServer(QSocketServer *qss)
            {
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==0){
                        qss->lifecycleStatus=1;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

             qss->serv_addr.sin_family=AF_INET;
             qss->serv_addr.sin_port=htons(qss->port);
             inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
             //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

             qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
             if(setNonBlocking(qss->server_s)==-1)
             {
                SOlogger("setNonBlocking server_s failed.\n",0);
                 return 0;
             }

             if(qss->server_s==INVALID_SOCKET)
             {
              SOlogger("socket failed.\n",0);
              return 0;
             }

             if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
             {
              SOlogger("bind failed.\n",qss->server_s);
              return 0;
             }

             if(listen(qss->server_s,SOMAXCONN/*這個宏windows也有,這里是128,當然你可以設的小些,它影響開銷的*/)==SOCKET_ERROR)
            {
              SOlogger("listen failed.\n",qss->server_s);
              return 0;
            }
                qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設置size,所以忽略它吧*/
                if(qss->epollFD==-1){
                    SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
                    return 0;
                }
                qss->BSendEpollFD=epoll_create1(0);//for blocking send.
                if(qss->BSendEpollFD==-1){
                    SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
                    return 0;
                }

                {//ADD ACCEPT EVENT
                    struct epoll_event _epEvent;
                    QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
                    qssEPEvent->client_s=qss->server_s;
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
                    _epEvent.data.ptr=qssEPEvent;
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
                        SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
                        free(qssEPEvent);
                        return 0;
                    }
                }
                {//starup blocking send epoller.
                    QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
                    pParam->qss=qss;
                    pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
                }

             //initialize worker for epoll events.
             addQSSWorker(qss,qss->minThreads);
             qss->lifecycleStatus=2;
             return 1;
            }

            int shutdownSocketServer(QSocketServer *qss){
                //change qss->lifecycleStatus
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==2){
                        qss->lifecycleStatus=3;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                /*shutdown server-listening socket,這里優雅的做法是shutdown--notify-->epoll-->close.記得shutdown會發送EOF的哦*/
                shutdown(qss->server_s,SHUT_RDWR);

                // /proc/getpid/fd  shutdown all socket cs != serv_s
                {
                    char dirBuf[64];
                    struct dirent * de;
                    DIR *pd=NULL;
                    int sockFD;
                    sprintf(dirBuf,"/proc/%d/fd/",getpid());
                    pd=opendir(dirBuf);
                    if(pd!=NULL){
                        while((de=readdir(pd))!=NULL){
                            if(isDigitStr(de->d_name)){
                                sockFD=atoi(de->d_name);
                                if(isfdtype(sockFD,S_IFSOCK))
                                shutdown(sockFD,SHUT_RDWR);
                            }
                        }
                        closedir(pd);
                    }
                    /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
                }
             return 1;
            }

            static int onAcceptRoutine(QSocketServer * qss)
            {
                SOCKADDR_IN client_addr;
                unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
                SOCKET cs;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
                if(cs==INVALID_SOCKET)
                {
                    printf("onAccept failed:%d,%s\n",errno,strerror(errno));
                    epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
                    return 0;
                }
                if(setNonBlocking(cs)==-1)
                {
                    printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
                    return 0;
                }

                {// set keepalive option
                    int keepAlive = 1;
                    int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
                    int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
                    int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
                    if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
                       {
                        printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
                        return 0;
                       }
                }
                qssEPEvent=malloc(sizeof(QSSEPollEvent));
                qssEPEvent->client_s=cs;
                {
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
                    qssEPEvent->BSendEpollFDRelated=0;
                    _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
                        printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
                        free(qssEPEvent);
                        return 0;
                    }else{
                        pthread_spin_lock(&qss->g_spinlock);
                        qss->CSocketsCounter++;
                        pthread_spin_unlock(&qss->g_spinlock);
                        if(qss->cslifecb)
                            qss->cslifecb(cs,0);
                    }
                }
                printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
                return 1;
            }

            typedef struct{
                QSocketServer * qss;
                QSSEPollEvent * event;
            }InternalSenderBase_t;

            static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
                InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
                char * _sbuf=_buf;
                int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

                QSSEPollEvent *qssEPEvent=NULL;
                struct epoll_event _epEvent;

                struct timespec sendTimeo;

                while(1){
                    *errno_ptr=0;
                    while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
                        sum+=ret,_sbuf+=ret;
                    if(sum==nbs||ret==0)
                        break;
                    else if(ret==-1){
                        if(errno==EAGAIN&&sum<nbs){
                            qssEPEvent=sb->event;
                            _epEvent.data.ptr=qssEPEvent;
                            _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                            if(qssEPEvent->BSendEpollFDRelated==0){
                                pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                                pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                                qssEPEvent->BSendEpollFDRelated=1;
                                curEpoll_ctl_opt=EPOLL_CTL_ADD;
                            }else{
                                curEpoll_ctl_opt=EPOLL_CTL_MOD;
                            }

                            {//wait writable.
                                int flag=0;
                                pthread_mutex_lock(&qssEPEvent->writableLock);
                                if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                                    sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                                    int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                                    if(err)
                                        flag=-1;
                                }else
                                    flag=-1;
                                pthread_mutex_unlock(&qssEPEvent->writableLock);
                                if(flag==-1)
                                    break;
                            }

                        }else{
                            if(errno==EAGAIN&&sum==nbs)
                                ret=nbs;//it is ok;
                            break;
                        }
                    }
                }//end while.
                return ret;
            }
            void *  blockingSendEpollerRoutine(void *_param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                //pthread_t * curThread=&pParam->th;
                struct epoll_event epEvents[qss->maxThreads];
                QSSEPollEvent *qssEPEvent=NULL;
                int pollRes,*errno_ptr=&errno;

                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(1){

                    pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
                    if(pollRes>=1){
                        int i=0;
                        for(;i<pollRes;i++)
                        if(epEvents[i].events&EPOLLOUT){//這個epollfd只應該做以下的事情,少做為快!
                            qssEPEvent=epEvents[i].data.ptr;
                            pthread_mutex_lock(&qssEPEvent->writableLock);
                            pthread_cond_signal(&qssEPEvent->writableMonitor);
                            pthread_mutex_unlock(&qssEPEvent->writableLock);
                        }

                    }else if(pollRes==-1){//errno 
                        printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        break;
                    }

                }

                return NULL;
            }

            void *  epollWorkerRoutine(void * _param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                pthread_t * curThread=&pParam->th;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                InternalSenderBase_t _senderBase;
                int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
                _senderBase.qss=qss;
                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(!exitCode){

                    *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
                    pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
                    if(pollRes==1){
                        qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

                        if(qssEPEvent->client_s==qss->server_s)
                        {//Accepted Socket.
                           onAcceptRoutine(qss);
                           continue;
                        }else{
                            if(qss->protoHandler){
                                _senderBase.event=_epEvent.data.ptr;
                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers++;
                                pthread_spin_unlock(&qss->g_spinlock);

                                addQSSWorker(qss,1);
                                handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers--;
                                pthread_spin_unlock(&qss->g_spinlock);

                                if(handleCode>0){
                                    _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                                        SOErrOccurred=2;
                                }else{
                                    SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                                }
                            }
                        }

                    }else if(pollRes==0){//timeout
                        printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
                        if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
                        {
                            pthread_spin_lock(&qss->g_spinlock);
                            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                                qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                                exitCode=2;
                            }
                            pthread_spin_unlock(&qss->g_spinlock);
                        }else if(qss->lifecycleStatus>=4)
                                exitCode=4;

                    }else if(pollRes==-1){//errno
                        printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        exitCode=1;
                    }

                    if(SOErrOccurred){
                        if(qss->cslifecb)
                            qss->cslifecb(qssEPEvent->client_s,-1);
                        /*if(qssEPEvent)*/{
                            epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            close(qssEPEvent->client_s);
                            if(qssEPEvent->BSendEpollFDRelated){
                                pthread_cond_destroy(&qssEPEvent->writableMonitor);
                                pthread_mutex_destroy(&qssEPEvent->writableLock);
                            }
                            free(qssEPEvent);
                          }
                        pthread_spin_lock(&qss->g_spinlock);
                        if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
                         //for qss workerSize,
                         qss->lifecycleStatus=4;
                         exitCode=3;
                        }
                        pthread_spin_unlock(&qss->g_spinlock);
                    }//SOErrOccurred handle;

                }//end main while.

                if(exitCode!=2){
                    int clearup=0;
                    pthread_spin_lock(&qss->g_spinlock);
                    if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
                      clearup=1;
                    }
                    pthread_spin_unlock(&qss->g_spinlock);
                    if(clearup){
                     close(qss->epollFD);
                     close(qss->BSendEpollFD);
                     pthread_spin_destroy(&qss->g_spinlock);
                     free(qss);
                    }
                }//exitCode handle;
             return NULL;
            }

             

            posted on 2013-12-01 21:37 C++技術中心 閱讀(1988) 評論(0)  編輯 收藏 引用 所屬分類: Linux 編程
            久久久久成人精品无码| 伊人久久大香线蕉无码麻豆| 久久天天躁狠狠躁夜夜avapp| 蜜臀久久99精品久久久久久小说| A狠狠久久蜜臀婷色中文网| 欧美久久精品一级c片片| 性做久久久久久免费观看| 久久久久久精品久久久久| 色综合久久综合网观看| 亚洲午夜精品久久久久久app| av无码久久久久久不卡网站| 久久综合九色欧美综合狠狠| 国产精品久久久久久福利69堂| 麻豆久久| 久久精品二区| 9999国产精品欧美久久久久久| 国产成人综合久久精品红| 久久Av无码精品人妻系列| 亚洲人成无码www久久久| 99久久综合狠狠综合久久| 欧美喷潮久久久XXXXx| 久久精品一区二区三区AV| 久久这里有精品视频| 国产香蕉97碰碰久久人人| 久久夜色tv网站| 亚洲国产精品久久| 国产99久久精品一区二区| 色综合久久久久久久久五月| 久久99久久99精品免视看动漫| 欧美午夜精品久久久久久浪潮| 久久精品国产一区二区三区| 国产精品一区二区久久精品无码 | 一本一道久久综合狠狠老| 久久天天躁狠狠躁夜夜av浪潮| 7国产欧美日韩综合天堂中文久久久久 | 久久精品国产亚洲欧美| 久久精品夜夜夜夜夜久久| 日日躁夜夜躁狠狠久久AV| 午夜欧美精品久久久久久久| 亚洲AV无码久久精品色欲| 日韩精品无码久久久久久|