            epoll (event poll) on Linux kernel 2.6.x, pthread, NPTL 2.12
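            LT/ET: ET can also deliver an event more than once, though far less often than LT. It is EPOLLONESHOT that provides the real foundation for "one connection VS one thread in worker thread pool, without depending on any connection-data-queue". Most epoll_wait handling I have seen follows the pattern below. It is very textbook, because this is exactly how the man-pages illustrate it.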
            man-pages epoll_wait handle:
            #define MAX_EVENTS 10
            struct epoll_event events[MAX_EVENTS];
            for (;;) 
            {
               nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
               for (i = 0; i < nfds; i++)
                   handle(&events[i]); /* handle() stands in for your own processing */
               ......
            }
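            What gets registered with epoll_ctl ADD is of course EPOLLIN|EPOLLET. Beyond that, I cannot tell whether the code above is run by a single thread or by a group of threads (in a thread pool). Hopefully not a single thread! If it is, must it finish these MAX_EVENTS events before it can handle the next MAX_EVENTS? Is that not slow? But if it is a group of threads, have you considered how to preserve the logical integrity of the request data on one connection? A single request-data-packet may arrive split across several sends, so with the pattern above you really have to design this carefully.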
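The concern about a request-data-packet arriving in several pieces can be illustrated with a per-connection reassembly buffer. This is only a sketch under an assumption the original post does not make: that each request is framed with a 4-byte big-endian length prefix. The names `frame_buf`, `frame_feed`, `frame_pop` and `FRAME_MAX` are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define FRAME_MAX 1024 /* assumed maximum payload size */

/* One buffer per connection; bytes accumulate here across recv() calls. */
typedef struct {
    unsigned char buf[4 + FRAME_MAX];
    size_t used;
} frame_buf;

/* Append n freshly received bytes. Returns 0 on success, -1 if they do not fit. */
int frame_feed(frame_buf *fb, const void *data, size_t n)
{
    if (n > sizeof(fb->buf) - fb->used)
        return -1;
    memcpy(fb->buf + fb->used, data, n);
    fb->used += n;
    return 0;
}

/* Extract one complete frame into out (at least FRAME_MAX bytes).
 * Returns 1 if a frame was copied, 0 if more data is needed,
 * -1 if the announced length exceeds FRAME_MAX. */
int frame_pop(frame_buf *fb, unsigned char *out, size_t *out_len)
{
    uint32_t len;
    if (fb->used < 4)
        return 0;
    len = ((uint32_t)fb->buf[0] << 24) | ((uint32_t)fb->buf[1] << 16) |
          ((uint32_t)fb->buf[2] << 8)  |  (uint32_t)fb->buf[3];
    if (len > FRAME_MAX)
        return -1;
    if (fb->used < 4 + (size_t)len)
        return 0;
    memcpy(out, fb->buf + 4, len);
    /* Shift any bytes of the next frame to the front of the buffer. */
    memmove(fb->buf, fb->buf + 4 + len, fb->used - 4 - len);
    fb->used -= 4 + (size_t)len;
    *out_len = len;
    return 1;
}
```

With several threads servicing the same connection at once, access to such a buffer would additionally need a lock, which is part of the design burden the paragraph above warns about.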
            My own epoll_wait handling pattern is as follows:
            struct epoll_event activeEvent;
            for(;;)
            {
               epoll_wait(epollfd, &activeEvent, 1/* Surprised? This code is never run by a single thread, but by a whole group of them */, timeout);
               if handle activeEvent success
                  epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
              ......
            }
            The code above is of course run by a group of threads in a thread pool, and what gets registered with epoll_ctl ADD is EPOLLIN|EPOLLET|EPOLLONESHOT,
            because my design principle is to strictly follow one connection VS one thread in worker thread pool.
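The one-shot behavior this design relies on can be observed in isolation. The following is a minimal sketch, not part of the original server: it uses a `socketpair` in place of a real connection, and the function name `oneshot_demo` is made up for illustration. It checks that a descriptor registered with EPOLLONESHOT reports one event, stays silent when more data arrives, and reports again only after being re-armed with EPOLL_CTL_MOD.

```c
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Returns 1 if the fd stayed silent until re-armed, 0 if not, -1 on setup error. */
int oneshot_demo(void)
{
    int sv[2], ep, first = -1, second = -1, third = -1;
    struct epoll_event ev, out;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return -1;
    ep = epoll_create1(0);
    if (ep == -1) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    ev.data.fd = sv[0];
    if (epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev) == 0 &&
        write(sv[1], "a", 1) == 1) {
        /* First wait: the event is delivered and the fd is disarmed. */
        first = epoll_wait(ep, &out, 1, 100);
        /* More data arrives, but no worker is woken while the fd is disarmed. */
        if (write(sv[1], "b", 1) == 1)
            second = epoll_wait(ep, &out, 1, 100);
        /* Re-arm: the data still pending is reported again. */
        if (epoll_ctl(ep, EPOLL_CTL_MOD, sv[0], &ev) == 0)
            third = epoll_wait(ep, &out, 1, 100);
    }
    close(ep);
    close(sv[0]);
    close(sv[1]);
    if (first == -1 || second == -1 || third == -1)
        return -1;
    return first == 1 && second == 0 && third == 1;
}
```

Because the descriptor is disarmed between the first event and the re-arm, no second worker can pick up the same connection while the first one is still handling it.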
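            So the basic model of my server framework below is:
            One connection VS one thread in worker thread pool; each worker thread performs epollWorkerRoutine.
            epollWorkerRoutine has the following responsibilities:
            1. Handle the request. When busy, increase the number of epollWorkerThreads, but never beyond maxThreads. Then post/MOD the EPOLLIN|EPOLLONESHOT interested event back to epoll.
            2. On timeout, check whether the pool is idle and how many epollWorkerThreads currently exist. When idle, keep the count or reduce it down to minThreads.
            3. Manage the lifecycle of every accepted socket. This relies on the system's keepalive probes. If you want to implement an application-level "heartbeat probe" instead, simply change QSS_SIO_KEEPALIVE_VALS_TIMEOUT back to the system default of 2 hours. No list of all connections is maintained here. You can of course find all socket fds under /proc/getpid/fd.
            4. On Linux, operations on a non-blocking socket still rely on recv and send, unlike WSARecv + overlapped on Windows, which returns immediately even without setting O_NONBLOCK on the fd via fcntl. Here I implemented the send action as a blocking one (internalBlockingSender). By the same reasoning as for requests, a non-blocking send can still leave the response data logically fragmented and out of order, especially if you use the textbook handling pattern above, and with multiple threads on top of that it becomes a complete mess. Of course you can use a response-data-queue to send data asynchronously and in the right order.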
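Point 4 can be sketched without the condition-variable machinery used in the source below. The following is a simplified alternative, not the author's internalBlockingSender: it waits for writability with `poll()` in the calling thread instead of a dedicated epoll thread. The name `send_all_blocking` and the `timeout_ms` parameter are assumptions made for illustration.

```c
#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Send all nbs bytes on a (possibly non-blocking) socket.
 * Whenever the socket buffer is full, wait up to timeout_ms for POLLOUT.
 * Returns nbs on success, -1 on error or timeout. */
ssize_t send_all_blocking(int fd, const void *buf, size_t nbs, int timeout_ms)
{
    const char *p = buf;
    size_t sum = 0;

    while (sum < nbs) {
        ssize_t n = send(fd, p + sum, nbs - sum, MSG_NOSIGNAL);
        if (n > 0) {
            sum += (size_t)n;
            continue;
        }
        if (n == -1 && errno == EINTR)
            continue;
        if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            struct pollfd pfd;
            int r;
            pfd.fd = fd;
            pfd.events = POLLOUT;
            pfd.revents = 0;
            r = poll(&pfd, 1, timeout_ms);
            if (r > 0 || (r == -1 && errno == EINTR))
                continue; /* writable again (or interrupted): retry send */
            return -1;    /* timeout or poll failure */
        }
        return -1;        /* hard socket error */
    }
    return (ssize_t)sum;
}
```

Since one worker owns the connection for the whole request under EPOLLONESHOT, the bytes of one response leave in order even though the socket itself is non-blocking.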

            Below, with the source code at hand, is a brief look at epoll programming:
            socketserver.h
            #ifndef __Q_SOCKET_SERVER__
            #define __Q_SOCKET_SERVER__
            #include <errno.h>
            #include <sys/socket.h>
            #include <netinet/in.h>
            #include <netinet/tcp.h>
            #include <arpa/inet.h>
            #include <sys/types.h>
            #include <string.h>
            #include <sys/epoll.h>
            #include <pthread.h>
            #include <unistd.h>
            #include <fcntl.h>
            #include <stdio.h>
            #include <stdlib.h>
            #include <time.h>
            #define SOCKET_ERROR -1
            #define INVALID_SOCKET -1
            typedef int SOCKET;
            typedef struct sockaddr_in SOCKADDR_IN;
            typedef unsigned short WORD;
            typedef unsigned int DWORD;

            #define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60)
            #define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
            #define QSS_SIO_KEEPALIVE_VALS_COUNT 3
            #define MAX_THREADS 100
            #define MAX_THREADS_MIN  10
            #define MIN_WORKER_WAIT_TIMEOUT  (20*1000)
            #define MAX_WORKER_WAIT_TIMEOUT  (60*MIN_WORKER_WAIT_TIMEOUT)
            #define MAX_THREADPOOLS  32

            #define MAX_BUF_SIZE 1024
            /* ulimit -n: opened FDs per process. Remember to raise it, otherwise you still get select-like behavior rather than what epoll can deliver. */
            #define BLOCKING_SEND_TIMEOUT 20

            typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
            typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
            typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

            typedef struct {
              WORD passive;
              WORD port;//uint16_t
              WORD minThreads;
              WORD maxThreads;
              pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
              volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
              int  workerWaitTimeout;//wait timeout
              volatile int workerCounter;
              volatile int currentBusyWorkers;
              volatile int CSocketsCounter;
              CSocketLifecycleCallback cslifecb;
              InternalProtocolHandler protoHandler;
              SOCKET server_s;
              SOCKADDR_IN serv_addr;
              int epollFD;//main epoller.
              int BSendEpollFD;//For blocking send.
            }QSocketServer;

            typedef struct {
              SOCKET client_s;
              SOCKADDR_IN client_addr;
              uint32_t curEvents;

              char buf[MAX_BUF_SIZE];
              DWORD numberOfBytesTransferred;
              char * data;

              int BSendEpollFDRelated;
              pthread_mutex_t writableLock;
              pthread_cond_t  writableMonitor;
            }QSSEPollEvent;//for per connection

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
            int startSocketServer(QSocketServer *qss);
            int shutdownSocketServer(QSocketServer *qss);
            #endif
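The `ulimit -n` reminder in the header comment can also be handled from inside the process. A minimal sketch, assuming it is acceptable to raise the soft limit on open descriptors up to the hard limit at startup; the function name `raise_nofile_limit` is hypothetical and not part of the original source.

```c
#include <sys/resource.h>

/* Raise the soft RLIMIT_NOFILE to the hard limit.
 * Returns 0 on success, -1 if the limit could not be read or changed. */
int raise_nofile_limit(void)
{
    struct rlimit rl;

    if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
        return -1;
    if (rl.rlim_cur < rl.rlim_max) {
        rl.rlim_cur = rl.rlim_max;
        if (setrlimit(RLIMIT_NOFILE, &rl) == -1)
            return -1;
    }
    return 0;
}
```

Going beyond the hard limit still requires privileges or a change to the system configuration, so this only covers the case where the soft limit is the bottleneck.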

            qsocketserver_model.c  //the code below still lacks a memory pool and a logger before it is ready for production!
             #include "socketserver.h"
            #include <dirent.h>
            #include <regex.h>
            #define DIGIT_PATTERN_STRING "^[0-9]+$"
            void *  epollWorkerRoutine(void *);
            void *  blockingSendEpollerRoutine(void *);
            int isDigitStr(const char *str){
                int ret=-1;
                regex_t regex;
                regmatch_t matchs[1];
                if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*do not pass 0 here, otherwise nothing matches*/)){
                    ret=!regexec(&regex,str, 1,matchs,0);
                    regfree(&regex);
                }
                return ret;
            }

            static int setNonBlocking(int sock)
            {
                int opts;
                opts=fcntl(sock,F_GETFL);
                if(opts==-1)
                {
                    perror("fcntl(sock,GETFL) failed!\n");
                    return opts;
                }
                opts = opts|O_NONBLOCK;
                opts=fcntl(sock,F_SETFL,opts);
                if(opts==-1)
                {
                    perror("fcntl(sock,SETFL,opts) failed!\n");
                    return opts;
                }
                return 1;
            }

            static void adjustQSSWorkerLimits(QSocketServer *qss){
               //to adjust availabe size.
            }
            typedef struct{
             QSocketServer * qss;
             pthread_t th;
            }QSSWORKER_PARAM;

            static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
             WORD res=0;
             if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
             {
              QSSWORKER_PARAM * pParam=NULL;
              int i=0;
              pthread_spin_lock(&qss->g_spinlock);
              if(qss->workerCounter+addCounter<=qss->maxThreads)
               for(;i<addCounter;i++)
               {
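                pParam=malloc(sizeof(QSSWORKER_PARAM));

                if(pParam){
                 pthread_t th;//local handle: the worker frees pParam, so pthread_create must not write into it
                 pParam->qss=qss;//must be set before the new thread starts reading it
                 if(pthread_create(&th,NULL,epollWorkerRoutine,pParam)==0){
                  pthread_detach(th);//workers are never joined
                  qss->workerCounter++,res++;
                 }else
                  free(pParam);
                }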
               }
              pthread_spin_unlock(&qss->g_spinlock);
             }
             return res;
            }

            static void SOlogger(const char * msg,SOCKET s){
             perror(msg);
                if(s>0)
                close(s);
            }

            static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
                QSSEPollEvent *qssEPEvent=event->data.ptr;
                int ret;
                printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
                if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
                    //sleep(5);
                    ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
                }
                 printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
             return ret;
            }

            int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
            {

             QSocketServer * qss=calloc(1,sizeof(QSocketServer));//zeroed, so serv_addr starts clean
             if(qss==NULL)
              return 0;
             qss->passive=passive;
             qss->port=port;
             qss->minThreads=minThreads;
             qss->maxThreads=maxThreads;
             qss->workerWaitTimeout=workerWaitTimeout;
             qss->lifecycleStatus=0;
             pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
             qss->workerCounter=0;
             qss->currentBusyWorkers=0;
             qss->CSocketsCounter=0;
             qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
             if(!qss->protoHandler)
              qss->protoHandler=_InternalProtocolHandler;
             adjustQSSWorkerLimits(qss);
             *qss_ptr=qss;
             return 1;
            }

            int startSocketServer(QSocketServer *qss)
            {
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==0){
                        qss->lifecycleStatus=1;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                memset(&qss->serv_addr,0,sizeof(qss->serv_addr));//sin_zero must not hold garbage

             qss->serv_addr.sin_family=AF_INET;
             qss->serv_addr.sin_port=htons(qss->port);
             inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
             //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

             qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
             if(qss->server_s==INVALID_SOCKET)
             {
              SOlogger("socket failed.\n",0);
              return 0;
             }

             //check the socket first, then switch it to non-blocking mode
             if(setNonBlocking(qss->server_s)==-1)
             {
                SOlogger("setNonBlocking server_s failed.\n",qss->server_s);
                 return 0;
             }

             if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
             {
              SOlogger("bind failed.\n",qss->server_s);
              return 0;
             }

             if(listen(qss->server_s,SOMAXCONN/*Windows has this macro too; here it is 128. You can set it smaller; it affects overhead*/)==SOCKET_ERROR)
            {
              SOlogger("listen failed.\n",qss->server_s);
              return 0;
            }
                qss->epollFD=epoll_create1(0);/*note this is not epoll_create(size); you may not know how to choose size, so just ignore it*/
                if(qss->epollFD==-1){
                    SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
                    return 0;
                }
                qss->BSendEpollFD=epoll_create1(0);//for blocking send.
                if(qss->BSendEpollFD==-1){
                    SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
                    return 0;
                }

                {//ADD ACCEPT EVENT
                    struct epoll_event _epEvent;
                    QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
                    qssEPEvent->client_s=qss->server_s;
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
                    _epEvent.data.ptr=qssEPEvent;
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
                        SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
                        free(qssEPEvent);
                        return 0;
                    }
                }
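                {//startup blocking send epoller.
                    QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
                    pthread_t th;//local handle: the routine frees pParam itself
                    if(pParam==NULL)
                        return 0;
                    pParam->qss=qss;
                    if(pthread_create(&th,NULL,blockingSendEpollerRoutine,pParam)!=0){
                        free(pParam);
                        return 0;
                    }
                }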

             //initialize worker for epoll events.
             addQSSWorker(qss,qss->minThreads);
             qss->lifecycleStatus=2;
             return 1;
            }

            int shutdownSocketServer(QSocketServer *qss){
                //change qss->lifecycleStatus
                if(qss==NULL)
                    return 0;
                else{
                    pthread_spin_lock(&qss->g_spinlock);
                    if(qss->lifecycleStatus==2){
                        qss->lifecycleStatus=3;
                        pthread_spin_unlock(&qss->g_spinlock);
                    }else{
                        pthread_spin_unlock(&qss->g_spinlock);
                        return 0;
                    }
                }
                /*shutdown server-listening socket. The graceful approach here is shutdown --notify--> epoll --> close. Remember that shutdown sends EOF.*/
                shutdown(qss->server_s,SHUT_RDWR);

                // /proc/getpid/fd  shutdown all socket cs != serv_s
                {
                    char dirBuf[64];
                    struct dirent * de;
                    DIR *pd=NULL;
                    int sockFD;
                    sprintf(dirBuf,"/proc/%d/fd/",getpid());
                    pd=opendir(dirBuf);
                    if(pd!=NULL){
                        while((de=readdir(pd))!=NULL){
                            if(isDigitStr(de->d_name)){
                                sockFD=atoi(de->d_name);
                                if(isfdtype(sockFD,S_IFSOCK))
                                shutdown(sockFD,SHUT_RDWR);
                            }
                        }
                        closedir(pd);
                    }
                    /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
                }
             return 1;
            }

            static int onAcceptRoutine(QSocketServer * qss)
            {
                SOCKADDR_IN client_addr;
                unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
                SOCKET cs;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
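                if(cs==INVALID_SOCKET)
                {
                    if(errno==EAGAIN||errno==EWOULDBLOCK||errno==EINTR)
                        return 0;//another worker took the connection, or a transient error: keep listening
                    printf("onAccept failed:%d,%s\n",errno,strerror(errno));
                    epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
                    return 0;
                }
                if(setNonBlocking(cs)==-1)
                {
                    printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
                    close(cs);//do not leak the accepted socket
                    return 0;
                }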

                {// set keepalive option
                    int keepAlive = 1;
                    int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
                    int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
                    int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
                    if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
                       setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
                       {
                        printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
                        close(cs);//do not leak the accepted socket
                        return 0;
                       }
                }
                qssEPEvent=malloc(sizeof(QSSEPollEvent));
                qssEPEvent->client_s=cs;
                {
                    _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
                    qssEPEvent->BSendEpollFDRelated=0;
                    _epEvent.data.ptr=qssEPEvent;/*again unlike the textbook: real user data goes in ptr, not just a bare fd*/
                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
                        printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
                        free(qssEPEvent);
                        return 0;
                    }else{
                        pthread_spin_lock(&qss->g_spinlock);
                        qss->CSocketsCounter++;
                        pthread_spin_unlock(&qss->g_spinlock);
                        if(qss->cslifecb)
                            qss->cslifecb(cs,0);
                    }
                }
                printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
                return 1;
            }

            typedef struct{
                QSocketServer * qss;
                QSSEPollEvent * event;
            }InternalSenderBase_t;

            static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
                InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
                char * _sbuf=_buf;
                int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

                QSSEPollEvent *qssEPEvent=NULL;
                struct epoll_event _epEvent;

                struct timespec sendTimeo;

                while(1){
                    *errno_ptr=0;
                    while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
                        sum+=ret,_sbuf+=ret;
                    if(sum==nbs||ret==0)
                        break;
                    else if(ret==-1){
                        if(errno==EAGAIN&&sum<nbs){
                            qssEPEvent=sb->event;
                            _epEvent.data.ptr=qssEPEvent;
                            _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                            if(qssEPEvent->BSendEpollFDRelated==0){
                                pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                                pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                                qssEPEvent->BSendEpollFDRelated=1;
                                curEpoll_ctl_opt=EPOLL_CTL_ADD;
                            }else{
                                curEpoll_ctl_opt=EPOLL_CTL_MOD;
                            }

                            {//wait writable.
                                int flag=0;
                                pthread_mutex_lock(&qssEPEvent->writableLock);
                                if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                                    sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                                    int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                                    if(err)
                                        flag=-1;
                                }else
                                    flag=-1;
                                pthread_mutex_unlock(&qssEPEvent->writableLock);
                                if(flag==-1)
                                    break;
                            }

                        }else{
                            if(errno==EAGAIN&&sum==nbs)
                                ret=nbs;//it is ok;
                            break;
                        }
                    }
                }//end while.
                return ret;
            }
            void *  blockingSendEpollerRoutine(void *_param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                //pthread_t * curThread=&pParam->th;
                struct epoll_event epEvents[qss->maxThreads];
                QSSEPollEvent *qssEPEvent=NULL;
                int pollRes,*errno_ptr=&errno;

                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(1){

                    pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
                    if(pollRes>=1){
                        int i=0;
                        for(;i<pollRes;i++)
                        if(epEvents[i].events&EPOLLOUT){//this epollfd should do only the following; the less it does, the faster it is!
                            qssEPEvent=epEvents[i].data.ptr;
                            pthread_mutex_lock(&qssEPEvent->writableLock);
                            pthread_cond_signal(&qssEPEvent->writableMonitor);
                            pthread_mutex_unlock(&qssEPEvent->writableLock);
                        }

                    }else if(pollRes==-1){//errno 
                        printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        break;
                    }

                }

                return NULL;
            }

            void *  epollWorkerRoutine(void * _param){
                QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
                QSocketServer * qss=pParam->qss;
                pthread_t * curThread=&pParam->th;
                struct epoll_event _epEvent;
                QSSEPollEvent *qssEPEvent=NULL;
                InternalSenderBase_t _senderBase;
                int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
                _senderBase.qss=qss;
                pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

                free(pParam);
                while(!exitCode){

                    *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
                    pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
                    if(pollRes==1){
                        qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

                        if(qssEPEvent->client_s==qss->server_s)
                        {//Accepted Socket.
                           onAcceptRoutine(qss);
                           continue;
                        }else{
                            if(qss->protoHandler){
                                _senderBase.event=_epEvent.data.ptr;
                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers++;
                                pthread_spin_unlock(&qss->g_spinlock);

                                addQSSWorker(qss,1);
                                handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                                pthread_spin_lock(&qss->g_spinlock);
                                qss->currentBusyWorkers--;
                                pthread_spin_unlock(&qss->g_spinlock);

                                if(handleCode>0){
                                    _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                                    if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                                        SOErrOccurred=2;
                                }else{
                                    SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                                }
                            }
                        }

                    }else if(pollRes==0){//timeout
                        printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,(unsigned long)pthread_self());//pParam was freed above, so *curThread must not be read
                        if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
                        {
                            pthread_spin_lock(&qss->g_spinlock);
                            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                                qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                                exitCode=2;
                            }
                            pthread_spin_unlock(&qss->g_spinlock);
                        }else if(qss->lifecycleStatus>=4)
                                exitCode=4;

                    }else if(pollRes==-1){//errno
                        printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
                        exitCode=1;
                    }

                    if(SOErrOccurred){
                        if(qss->cslifecb)
                            qss->cslifecb(qssEPEvent->client_s,-1);
                        /*if(qssEPEvent)*/{
                            epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                            close(qssEPEvent->client_s);
                            if(qssEPEvent->BSendEpollFDRelated){
                                pthread_cond_destroy(&qssEPEvent->writableMonitor);
                                pthread_mutex_destroy(&qssEPEvent->writableLock);
                            }
                            free(qssEPEvent);
                          }
                        pthread_spin_lock(&qss->g_spinlock);
                        if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
                         //for qss workerSize,
                         qss->lifecycleStatus=4;
                         exitCode=3;
                        }
                        pthread_spin_unlock(&qss->g_spinlock);
                    }//SOErrOccurred handle;

                }//end main while.

                if(exitCode!=2){
                    int clearup=0;
                    pthread_spin_lock(&qss->g_spinlock);
                    if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
                      clearup=1;
                    }
                    pthread_spin_unlock(&qss->g_spinlock);
                    if(clearup){
                     close(qss->epollFD);
                     close(qss->BSendEpollFD);
                     pthread_spin_destroy(&qss->g_spinlock);
                     free(qss);
                    }
                }//exitCode handle;
             return NULL;
            }


            from:

            http://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.html

            posted on 2010-08-25 20:41 by chatler. Views (2558), comments (0). Categories: Network, Socket, Linux_Coding