青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品

posts - 297,  comments - 15,  trackbacks - 0
epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
   LT/ET:ET也會多次發送event,當然頻率遠低于LT,但是epoll one shot才是真正的對"one connection VS one thread in worker thread pool,不依賴于任何connection-data-queue"的基礎支持 .我看到大部分對epoll_wait的處理模式如下,很教科化,因為man-pages就是這樣舉例子的。
man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;) 
{
   nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
   for(1~nfds)
       handle events[i];
   ......
}
epoll_ctl_add的當然是EPOLLIN|EPOLLET,此外我就不知道處理上面代碼的是一條線程還是一堆線程(in threadpool),但愿不是一條線程吧!如果是的話,難不成等處理完這MAX_EVENTS 個再去處理接下來的MAX_EVENTS 個?慢否? 但是如果是一堆線程的話,你是否考慮過如何處理request-data in one connection在邏輯上的完整性,也就是一個request-data-packet可能會被分割成若干次發送,在上面的處理模式中你真的要好好設計一下了。
而我的epoll_wait處理模式如下:
struct epoll_event activeEvent;
for(;;)
{
   epoll_wait(epollfd, &activeEvent, 1/*很驚訝嗎,但絕不是一條線程在運行這段代碼,而是一堆*/, timeout);
   if handle activeEvent success
      epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
  ......
}
處理上面代碼的當然是一堆線程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
因為我的設計理念是嚴格遵守one connection VS one thread in worker thread pool。
所以我下面的server框架的基本模型是:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
在epollWorkerRoutine中有以下的職責:
1.handle request,當忙時增加epollWorkerThread數量但不超過maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout時檢查是否空閑和當前epollWorkerThread數量,當空閑時保持或減少至minThreads數量.
3.對所有Accepted-socket管理生命周期,這里利用系統的keepalive probes,若想實現業務層"心跳探測"只需將QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系統默認的2小時.這里并不維護所有連接列表,當然你可以在/proc/getpid/fd下找到所有的socket fd.
4.linux上的non-blocking socket的操作仍然依賴recv,send,不像windows上的wsarecv+overlapped,即便不用fcntl fd o_nonblock也可以立即返回。我這里把send動作實現成了blocking的(internalBlockingSender),同樣的道理,non-blocking send依然會形成響應數據在邏輯上的碎片錯序,特別是你如果采用上面那個教科化的處理模式的化,并且還是多線程的話,那么這簡直就亂透了。當然你可以使用response-data-queue來達到異步準確發送數據的目的。

下面結合源碼,淺析一下epoll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define SOCKET_ERROR -1
#define INVALID_SOCKET -1
typedef int SOCKET;
typedef struct sockaddr_in SOCKADDR_IN;
typedef unsigned short WORD;
typedef unsigned int DWORD;

#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT 30*60
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  20*1000
#define MAX_WORKER_WAIT_TIMEOUT  60*MIN_WORKER_WAIT_TIMEOUT
#define MAX_THREADPOOLS  32

#define MAX_BUF_SIZE 1024
/* ulimit -n opened FDs per process.記得修改哦,否則還是select效果,就不是epoll效果了哦,呵呵*/
#define BLOCKING_SEND_TIMEOUT 20

typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

typedef struct {
  WORD passive;
  WORD port;//uint16_t
  WORD minThreads;
  WORD maxThreads;
  pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
  volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
  int  workerWaitTimeout;//wait timeout
  volatile int workerCounter;
  volatile int currentBusyWorkers;
  volatile int CSocketsCounter;
  CSocketLifecycleCallback cslifecb;
  InternalProtocolHandler protoHandler;
  SOCKET server_s;
  SOCKADDR_IN serv_addr;
  int epollFD;//main epoller.
  int BSendEpollFD;//For blocking send.
}QSocketServer;

typedef struct {
  SOCKET client_s;
  SOCKADDR_IN client_addr;
  uint32_t curEvents;

  char buf[MAX_BUF_SIZE];
  DWORD numberOfBytesTransferred;
  char * data;

  int BSendEpollFDRelated;
  pthread_mutex_t writableLock;
  pthread_cond_t  writableMonitor;
}QSSEPollEvent;//for per connection

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
int startSocketServer(QSocketServer *qss);
int shutdownSocketServer(QSocketServer *qss);
#endif

qsocketserver_model.c  //下面的代碼離生產環境還差內存池logger哦!
 #include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void *  epollWorkerRoutine(void *);
void *  blockingSendEpollerRoutine(void *);
int isDigitStr(const char *str){
    int ret=-1;
    regex_t regex;
    regmatch_t matchs[1];
    if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED/*這里不要傳0哦,否則nomatch*/)){
        ret=!regexec(&regex,str, 1,matchs,0);
        regfree(&regex);
    }
    return ret;
}

static int setNonBlocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts==-1)
    {
        perror("fcntl(sock,GETFL) failed!\n");
        return opts;
    }
    opts = opts|O_NONBLOCK;
    opts=fcntl(sock,F_SETFL,opts);
    if(opts==-1)
    {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return opts;
    }
    return 1;
}

static void adjustQSSWorkerLimits(QSocketServer *qss){
   //to adjust availabe size.
}
typedef struct{
 QSocketServer * qss;
 pthread_t th;
}QSSWORKER_PARAM;

static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
 WORD res=0;
 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
 {
  QSSWORKER_PARAM * pParam=NULL;
  int i=0;
  pthread_spin_lock(&qss->g_spinlock);
  if(qss->workerCounter+addCounter<=qss->maxThreads)
   for(;i<addCounter;i++)
   {
    pParam=malloc(sizeof(QSSWORKER_PARAM));

    if(pParam){
     pthread_create(&pParam->th,NULL,epollWorkerRoutine,pParam);
     pParam->qss=qss;
     qss->workerCounter++,res++;
    }
   }
  pthread_spin_unlock(&qss->g_spinlock);
 }
 return res;
}

static void SOlogger(const char * msg,SOCKET s){
 perror(msg);
    if(s>0)
    close(s);
}

static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *qssEPEvent=event->data.ptr;
    int ret;
    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,qssEPEvent->client_s,pthread_self());
    if((ret=recv(qssEPEvent->client_s,qssEPEvent->buf,MAX_BUF_SIZE,0))>0){
        //sleep(5);
        ret=_blockingSender(senderBase,qssEPEvent->client_s,qssEPEvent->buf,ret);
    }
     printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",ret,errno,strerror(errno),qssEPEvent->client_s,pthread_self());
 return ret;
}

int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{

 QSocketServer * qss=malloc(sizeof(QSocketServer));
 qss->passive=passive;
 qss->port=port;
 qss->minThreads=minThreads;
 qss->maxThreads=maxThreads;
 qss->workerWaitTimeout=workerWaitTimeout;
 qss->lifecycleStatus=0;
 pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE);
 qss->workerCounter=0;
 qss->currentBusyWorkers=0;
 qss->CSocketsCounter=0;
 qss->cslifecb=cslifecb,qss->protoHandler=protoHandler;
 if(!qss->protoHandler)
  qss->protoHandler=_InternalProtocolHandler;
 adjustQSSWorkerLimits(qss);
 *qss_ptr=qss;
 return 1;
}

int startSocketServer(QSocketServer *qss)
{
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==0){
            qss->lifecycleStatus=1;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    //bzero(&qss->serv_addr, sizeof(qss->serv_addr));

 qss->serv_addr.sin_family=AF_INET;
 qss->serv_addr.sin_port=htons(qss->port);
 inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
 //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
 if(setNonBlocking(qss->server_s)==-1)
 {
    SOlogger("setNonBlocking server_s failed.\n",0);
     return 0;
 }

 if(qss->server_s==INVALID_SOCKET)
 {
  SOlogger("socket failed.\n",0);
  return 0;
 }

 if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
 {
  SOlogger("bind failed.\n",qss->server_s);
  return 0;
 }

 if(listen(qss->server_s,SOMAXCONN/*這個宏windows也有,這里是128,當然你可以設的小些,它影響開銷的*/)==SOCKET_ERROR)
{
  SOlogger("listen failed.\n",qss->server_s);
  return 0;
}
    qss->epollFD=epoll_create1(0);/*這里不是epoll_create(size)哦,你可能不知道如何設置size,所以忽略它吧*/
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD  failed.\n",qss->server_s);
        return 0;
    }
    qss->BSendEpollFD=epoll_create1(0);//for blocking send.
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",qss->server_s);
        return 0;
    }

    {//ADD ACCEPT EVENT
        struct epoll_event _epEvent;
        QSSEPollEvent *qssEPEvent=malloc(sizeof(QSSEPollEvent));
        qssEPEvent->client_s=qss->server_s;
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET;
        _epEvent.data.ptr=qssEPEvent;
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&_epEvent)==-1){
            SOlogger("epoll_ctl server_s to accept failed.\n",qss->server_s);
            free(qssEPEvent);
            return 0;
        }
    }
    {//starup blocking send epoller.
        QSSWORKER_PARAM * pParam=malloc(sizeof(QSSWORKER_PARAM));
        pParam->qss=qss;
        pthread_create(&pParam->th,NULL,blockingSendEpollerRoutine,pParam);
    }

 //initialize worker for epoll events.
 addQSSWorker(qss,qss->minThreads);
 qss->lifecycleStatus=2;
 return 1;
}

int shutdownSocketServer(QSocketServer *qss){
    //change qss->lifecycleStatus
    if(qss==NULL)
        return 0;
    else{
        pthread_spin_lock(&qss->g_spinlock);
        if(qss->lifecycleStatus==2){
            qss->lifecycleStatus=3;
            pthread_spin_unlock(&qss->g_spinlock);
        }else{
            pthread_spin_unlock(&qss->g_spinlock);
            return 0;
        }
    }
    /*shutdown server-listening socket,這里優雅的做法是shutdown--notify-->epoll-->close.記得shutdown會發送EOF的哦*/
    shutdown(qss->server_s,SHUT_RDWR);

    // /proc/getpid/fd  shutdown all socket cs != serv_s
    {
        char dirBuf[64];
        struct dirent * de;
        DIR *pd=NULL;
        int sockFD;
        sprintf(dirBuf,"/proc/%d/fd/",getpid());
        pd=opendir(dirBuf);
        if(pd!=NULL){
            while((de=readdir(pd))!=NULL){
                if(isDigitStr(de->d_name)){
                    sockFD=atoi(de->d_name);
                    if(isfdtype(sockFD,S_IFSOCK))
                    shutdown(sockFD,SHUT_RDWR);
                }
            }
            closedir(pd);
        }
        /*fstat(ret,&_stat);S_ISSOCK(_stat.st_mode)======isfdtype(sockFD,S_IFSOCK)*/
    }
 return 1;
}

static int onAcceptRoutine(QSocketServer * qss)
{
    SOCKADDR_IN client_addr;
    unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
    SOCKET cs;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
    if(cs==INVALID_SOCKET)
    {
        printf("onAccept failed:%d,%s\n",errno,strerror(errno));
        epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
        return 0;
    }
    if(setNonBlocking(cs)==-1)
    {
        printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
        return 0;
    }

    {// set keepalive option
        int keepAlive = 1;
        int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
        if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
           setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
           setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
           setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
           {
            printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
            return 0;
           }
    }
    qssEPEvent=malloc(sizeof(QSSEPollEvent));
    qssEPEvent->client_s=cs;
    {
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
        qssEPEvent->BSendEpollFDRelated=0;
        _epEvent.data.ptr=qssEPEvent;/*這里又和教科的不一樣哦,真正的user data用ptr,而不是單一的fd*/
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
            printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
            free(qssEPEvent);
            return 0;
        }else{
            pthread_spin_lock(&qss->g_spinlock);
            qss->CSocketsCounter++;
            pthread_spin_unlock(&qss->g_spinlock);
            if(qss->cslifecb)
                qss->cslifecb(cs,0);
        }
    }
    printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
    return 1;
}

typedef struct{
    QSocketServer * qss;
    QSSEPollEvent * event;
}InternalSenderBase_t;

static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
    InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
    char * _sbuf=_buf;
    int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;

    QSSEPollEvent *qssEPEvent=NULL;
    struct epoll_event _epEvent;

    struct timespec sendTimeo;

    while(1){
        *errno_ptr=0;
        while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
            sum+=ret,_sbuf+=ret;
        if(sum==nbs||ret==0)
            break;
        else if(ret==-1){
            if(errno==EAGAIN&&sum<nbs){
                qssEPEvent=sb->event;
                _epEvent.data.ptr=qssEPEvent;
                _epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
                if(qssEPEvent->BSendEpollFDRelated==0){
                    pthread_mutex_init(&qssEPEvent->writableLock,NULL);
                    pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
                    qssEPEvent->BSendEpollFDRelated=1;
                    curEpoll_ctl_opt=EPOLL_CTL_ADD;
                }else{
                    curEpoll_ctl_opt=EPOLL_CTL_MOD;
                }

                {//wait writable.
                    int flag=0;
                    pthread_mutex_lock(&qssEPEvent->writableLock);
                    if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
                        sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
                        int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
                        if(err)
                            flag=-1;
                    }else
                        flag=-1;
                    pthread_mutex_unlock(&qssEPEvent->writableLock);
                    if(flag==-1)
                        break;
                }

            }else{
                if(errno==EAGAIN&&sum==nbs)
                    ret=nbs;//it is ok;
                break;
            }
        }
    }//end while.
    return ret;
}
void *  blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    //pthread_t * curThread=&pParam->th;
    struct epoll_event epEvents[qss->maxThreads];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,*errno_ptr=&errno;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(1){

        pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
        if(pollRes>=1){
            int i=0;
            for(;i<pollRes;i++)
            if(epEvents[i].events&EPOLLOUT){//這個epollfd只應該做以下的事情,少做為快!
                qssEPEvent=epEvents[i].data.ptr;
                pthread_mutex_lock(&qssEPEvent->writableLock);
                pthread_cond_signal(&qssEPEvent->writableMonitor);
                pthread_mutex_unlock(&qssEPEvent->writableLock);
            }

        }else if(pollRes==-1){//errno 
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            break;
        }

    }

    return NULL;
}

void *  epollWorkerRoutine(void * _param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    pthread_t * curThread=&pParam->th;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    InternalSenderBase_t _senderBase;
    int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
    _senderBase.qss=qss;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(!exitCode){

        *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
        pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
        if(pollRes==1){
            qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

            if(qssEPEvent->client_s==qss->server_s)
            {//Accepted Socket.
               onAcceptRoutine(qss);
               continue;
            }else{
                if(qss->protoHandler){
                    _senderBase.event=_epEvent.data.ptr;
                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers++;
                    pthread_spin_unlock(&qss->g_spinlock);

                    addQSSWorker(qss,1);
                    handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers--;
                    pthread_spin_unlock(&qss->g_spinlock);

                    if(handleCode>0){
                        _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                        if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                            SOErrOccurred=2;
                    }else{
                        SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                    }
                }
            }

        }else if(pollRes==0){//timeout
            printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
            {
                pthread_spin_lock(&qss->g_spinlock);
                if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                    qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                    exitCode=2;
                }
                pthread_spin_unlock(&qss->g_spinlock);
            }else if(qss->lifecycleStatus>=4)
                    exitCode=4;

        }else if(pollRes==-1){//errno
            printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            exitCode=1;
        }

        if(SOErrOccurred){
            if(qss->cslifecb)
                qss->cslifecb(qssEPEvent->client_s,-1);
            /*if(qssEPEvent)*/{
                epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                close(qssEPEvent->client_s);
                if(qssEPEvent->BSendEpollFDRelated){
                    pthread_cond_destroy(&qssEPEvent->writableMonitor);
                    pthread_mutex_destroy(&qssEPEvent->writableLock);
                }
                free(qssEPEvent);
              }
            pthread_spin_lock(&qss->g_spinlock);
            if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
             //for qss workerSize,
             qss->lifecycleStatus=4;
             exitCode=3;
            }
            pthread_spin_unlock(&qss->g_spinlock);
        }//SOErrOccurred handle;

    }//end main while.

    if(exitCode!=2){
        int clearup=0;
        pthread_spin_lock(&qss->g_spinlock);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
          clearup=1;
        }
        pthread_spin_unlock(&qss->g_spinlock);
        if(clearup){
         close(qss->epollFD);
         close(qss->BSendEpollFD);
         pthread_spin_destroy(&qss->g_spinlock);
         free(qss);
        }
    }//exitCode handle;
 return NULL;
}


from:

http://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.html

posted on 2010-08-25 20:41 chatler 閱讀(2593) 評論(0)  編輯 收藏 引用 所屬分類: Network 、SocketLinux_Coding
<2010年11月>
31123456
78910111213
14151617181920
21222324252627
2829301234
567891011

常用鏈接

留言簿(10)

隨筆分類(307)

隨筆檔案(297)

algorithm

Books_Free_Online

C++

database

Linux

Linux shell

linux socket

misce

  • cloudward
  • 感覺這個博客還是不錯,雖然做的東西和我不大相關,覺得看看還是有好處的

network

OSS

  • Google Android
  • Android is a software stack for mobile devices that includes an operating system, middleware and key applications. This early look at the Android SDK provides the tools and APIs necessary to begin developing applications on the Android platform using the Java programming language.
  • os161 file list

overall

搜索

  •  

最新評論

閱讀排行榜

評論排行榜

青青草原综合久久大伊人导航_色综合久久天天综合_日日噜噜夜夜狠狠久久丁香五月_热久久这里只有精品
  • <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            亚洲福利免费| 亚洲国产成人午夜在线一区| 欧美成人亚洲成人| 久久国产精品色婷婷| 欧美日韩精品一区二区| 久久在线精品| 国产日韩欧美在线播放不卡| 99热精品在线| 一本综合精品| 欧美激情第一页xxx| 欧美福利视频在线观看| 国产一区二区福利| 午夜精品国产更新| 亚洲女性喷水在线观看一区| 欧美人与性动交cc0o| 91久久精品www人人做人人爽| 在线成人黄色| 久久久久一区二区三区四区| 久久先锋资源| 狠狠爱www人成狠狠爱综合网| 性色av一区二区三区红粉影视| 西瓜成人精品人成网站| 国产精品热久久久久夜色精品三区| 亚洲激情自拍| 中国女人久久久| 欧美日韩另类视频| 99热这里只有精品8| 亚洲一区二区久久| 国产精品国产a级| 亚洲在线视频观看| 欧美一级大片在线观看| 国产日产欧美a一级在线| 欧美一区二区免费视频| 欧美在线看片| 好吊色欧美一区二区三区视频| 久久久www成人免费毛片麻豆| 老鸭窝亚洲一区二区三区| 亚洲国产成人av| 欧美国产日韩精品免费观看| 亚洲精品美女| 欧美一级电影久久| 国产综合色精品一区二区三区 | 欧美在线一区二区三区| 国产日韩欧美精品在线| 久久精品视频va| 欧美激情一区二区三区四区| 一本久道久久久| 国产精品麻豆va在线播放| 午夜欧美理论片| 欧美插天视频在线播放| 日韩一级精品视频在线观看| 国产精品久在线观看| 性欧美xxxx视频在线观看| 欧美成人亚洲成人| 亚洲一区二区精品在线| 一区二区在线观看av| 欧美96在线丨欧| 亚洲一区免费观看| 欧美成人一品| 亚洲性夜色噜噜噜7777| 国产综合久久| 欧美日韩一区视频| 久久久午夜精品| 一本综合久久| 欧美高清视频www夜色资源网| 亚洲视频在线观看网站| 亚洲精品一区二| 久久av二区| 夜夜夜久久久| 一区在线观看视频| 欧美午夜a级限制福利片| 久久久欧美一区二区| 中文网丁香综合网| 欧美国产先锋| 久久免费99精品久久久久久| 中国成人在线视频| 亚洲国产精品999| 国产日韩欧美视频在线| 欧美日韩伦理在线免费| 久久久噜噜噜久久久| 亚洲一区二区三区精品视频| 亚洲黄色成人| 蜜臀久久久99精品久久久久久| 先锋影音国产一区| 99视频一区| 亚洲二区免费| 精品动漫av| 国产一区再线| 国产伦精品一区二区三区四区免费 | 欧美aⅴ一区二区三区视频| 亚洲欧美日韩人成在线播放| 亚洲美女一区| 亚洲区中文字幕| 影音先锋久久久| 黄网站免费久久| 国产亚洲欧美一区二区| 国产乱肥老妇国产一区二| 欧美日韩在线一区二区三区| 欧美大片一区二区| 麻豆精品精华液| 毛片一区二区三区| 麻豆成人在线播放| 久久久久久久高潮| 久久久夜精品| 欧美1区2区3区| 免费观看30秒视频久久| 久久这里只精品最新地址| 久久精品一二三区| 久久婷婷国产麻豆91天堂| 久久久久在线观看| 久久综合给合久久狠狠色 | 91久久久久久久久| 亚洲成人在线| 亚洲激情视频网站| 亚洲人成在线播放网站岛国| 亚洲精品国产精品国自产在线 | 一区国产精品| 亚洲高清电影| 99精品国产热久久91蜜凸| 亚洲麻豆视频| 中文网丁香综合网| 午夜精品成人在线视频| 久久精品成人欧美大片古装| 久久久久www| 亚洲成色777777在线观看影院| 亚洲第一区在线| 一区二区国产日产| 欧美亚洲视频在线看网址| 久久夜色精品| 欧美另类99xxxxx| 国产精品一区二区你懂得| 韩日成人在线| 一本色道88久久加勒比精品| 性色av一区二区三区在线观看| 久久精品一本久久99精品| 欧美激情精品久久久久久蜜臀| 亚洲黄网站黄| 香蕉久久夜色精品| 麻豆精品精华液| 国产精品毛片高清在线完整版| 国产综合激情| 亚洲视频中文字幕| 久久亚洲精品欧美| 亚洲久久在线| 久久精品视频在线看| 欧美理论视频| 黑人巨大精品欧美一区二区小视频 | 中文欧美日韩| 噜噜噜噜噜久久久久久91| 亚洲精品美女久久7777777| 午夜精品久久久久久久男人的天堂| 久久夜色精品国产噜噜av| 欧美性猛交99久久久久99按摩| 在线观看亚洲精品视频| 亚洲字幕一区二区| 欧美激情亚洲视频| 午夜免费在线观看精品视频| 欧美精品首页| 在线日本高清免费不卡| 午夜久久久久久| 亚洲人精品午夜| 久久视频一区| 国产视频精品xxxx| 亚洲图片欧美日产| 亚洲国产成人av在线| 欧美在线免费播放| 国产精品一区二区三区久久| 99国产精品99久久久久久粉嫩| 另类图片国产| 久久av一区二区三区漫画| 国产精品啊v在线| 一本色道久久88综合日韩精品| 欧美 日韩 国产一区二区在线视频 | 免费影视亚洲| 欧美在线视频不卡| 国产精品日韩精品欧美精品| av成人国产| 亚洲高清资源综合久久精品| 久久久亚洲高清| 国产日韩欧美在线观看| 亚洲免费在线| 一区二区三区国产盗摄| 欧美日韩国产成人精品| 亚洲日本精品国产第一区| 蜜桃av综合| 久久全国免费视频| 欲色影视综合吧| 久久伊人免费视频| 久久久久久尹人网香蕉| 一区二区在线观看视频| 久久视频精品在线| 久久精品1区| 国产真实乱偷精品视频免| 久久国产视频网| 久久国产精彩视频| 国语自产精品视频在线看| 久久久久久亚洲综合影院红桃| 欧美专区亚洲专区| 激情一区二区| 欧美激情第9页|