锘??xml version="1.0" encoding="utf-8" standalone="yes"?>久久午夜综合久久,久久久WWW成人,久久精品国产亚洲AV无码麻豆http://www.shnenglu.com/adapterofcoms/zh-cnFri, 09 May 2025 10:11:53 GMTFri, 09 May 2025 10:11:53 GMT60涓涓熀浜嶦vent Poll(epoll)鐨凾CP Server Framework,嫻呮瀽epollhttp://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.htmladapterofcomsadapterofcomsTue, 03 Aug 2010 06:37:00 GMThttp://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.htmlhttp://www.shnenglu.com/adapterofcoms/comments/122063.htmlhttp://www.shnenglu.com/adapterofcoms/archive/2010/08/03/122063.html#Feedback1http://www.shnenglu.com/adapterofcoms/comments/commentRss/122063.htmlhttp://www.shnenglu.com/adapterofcoms/services/trackbacks/122063.html   epoll,event poll,on linux kernel 2.6.x.pthread,nptl-2.12
   LT/ET:ET涔熶細澶氭鍙戦乪vent,褰撶劧棰戠巼榪滀綆浜嶭T,浣嗘槸epoll one shot鎵嶆槸鐪熸鐨勫"one connection VS one thread in worker thread pool,涓嶄緷璧栦簬浠諱綍connection-data-queue"鐨勫熀紜鏀寔 .鎴戠湅鍒板ぇ閮ㄥ垎瀵筫poll_wait鐨勫鐞嗘ā寮忓涓嬶紝寰堟暀縐戝寲錛屽洜涓簃an-pages灝辨槸榪欐牱涓句緥瀛愮殑銆?br>man-pages epoll_wait handle:
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
for (;;)
{
   nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
   for(1~nfds)
       handle events[i];
   ......
}
epoll_ctl_add鐨勫綋鐒舵槸EPOLLIN|EPOLLET,姝ゅ鎴戝氨涓嶇煡閬撳鐞嗕笂闈唬鐮佺殑鏄竴鏉$嚎紼嬭繕鏄竴鍫嗙嚎紼嬶紙in threadpool錛夛紝浣嗘効涓嶆槸涓鏉$嚎紼嬪惂錛佸鏋滄槸鐨勮瘽錛岄毦涓嶆垚絳夊鐞嗗畬榪?strong>MAX_EVENTS 涓啀鍘誨鐞嗘帴涓嬫潵鐨?strong>MAX_EVENTS 涓紵鎱㈠惁錛?浣嗘槸濡傛灉鏄竴鍫嗙嚎紼嬬殑璇濓紝浣犳槸鍚﹁冭檻榪囧浣曞鐞唕equest-data in one connection鍦ㄩ昏緫涓婄殑瀹屾暣鎬э紝涔熷氨鏄竴涓猺equest-data-packet鍙兘浼氳鍒嗗壊鎴愯嫢騫叉鍙戦侊紝鍦ㄤ笂闈㈢殑澶勭悊妯″紡涓綘鐪熺殑瑕佸ソ濂借璁′竴涓嬩簡銆?br>鑰屾垜鐨別poll_wait澶勭悊妯″紡濡備笅錛?br>struct epoll_event activeEvent;
for(;;)
{
   epoll_wait(epollfd, &activeEvent, 1/*很惊讶吗，但绝不是一条线程在运行这段代码,而是一堆*/, timeout);
   if handle activeEvent success
      epoll_ctl_mod EPOLLIN|EPOLLET|EPOLLONESHOT
  ......
}
处理上面代码的当然是一堆线程in threadpool,而且epoll_ctl_add的是EPOLLIN|EPOLLET|EPOLLONESHOT
鍥犱負鎴戠殑璁捐鐞嗗康鏄弗鏍奸伒瀹?strong>one connection VS one thread in worker thread pool銆?br>鎵浠ユ垜涓嬮潰鐨剆erver妗嗘灦鐨勫熀鏈ā鍨嬫槸:
One connection VS one thread in worker thread pool ,worker thread performs epollWorkerRoutine.
在epollWorkerRoutine中有以下的职责:
1.handle request,当忙时增加epollWorkerThread数量但不超过maxThreads,post/MOD EPOLLIN|EPOLLONESHOT Interested Event to epoll.
2.timeout时检查是否空闲和当前epollWorkerThread数量,当空闲时保持或减少至minThreads数量.
3.瀵規墍鏈堿ccepted-socket綆$悊鐢熷懡鍛ㄦ湡,榪欓噷鍒╃敤緋葷粺鐨刱eepalive probes,鑻ユ兂瀹炵幇涓氬姟灞?蹇冭煩鎺㈡祴"鍙渶灝哘SS_SIO_KEEPALIVE_VALS_TIMEOUT 鏀瑰洖緋葷粺榛樿鐨?灝忔椂.榪欓噷騫朵笉緇存姢鎵鏈夎繛鎺ュ垪琛紝褰撶劧浣犲彲浠ュ湪/proc/getpid/fd涓嬫壘鍒版墍鏈夌殑socket fd.
4.linux涓婄殑non-blocking socket鐨勬搷浣滀粛鐒朵緷璧杛ecv,send錛屼笉鍍弚indows涓婄殑wsarecv+overlapped,鍗充究涓嶇敤fcntl fd o_nonblock涔熷彲浠ョ珛鍗寵繑鍥炪傛垜榪欓噷鎶妔end鍔ㄤ綔瀹炵幇鎴愪簡blocking鐨勶紙internalBlockingSender錛夛紝鍚屾牱鐨勯亾鐞嗭紝non-blocking send渚濈劧浼氬艦鎴愬搷搴旀暟鎹湪閫昏緫涓婄殑紕庣墖閿欏簭錛岀壒鍒槸浣犲鏋滈噰鐢ㄤ笂闈㈤偅涓暀縐戝寲鐨勫鐞嗘ā寮忕殑鍖栵紝騫朵笖榪樻槸澶氱嚎紼嬬殑璇濓紝閭d箞榪欑畝鐩村氨涔遍忎簡銆傚綋鐒朵綘鍙互浣跨敤response-data-queue鏉ヨ揪鍒板紓姝ュ噯紜彂閫佹暟鎹殑鐩殑銆?br>
下面结合源码,浅析一下epoll programming:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <string.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
/*
 * Winsock-flavoured aliases for POSIX socket types and error sentinels
 * (the same names are used by the IOCP variant of this server).
 */
#define SOCKET_ERROR -1   /* failure return of bind()/listen()-style calls */
#define INVALID_SOCKET -1 /* failure return of socket()/accept() */
typedef int SOCKET;                     /* a socket is a plain file descriptor here */
typedef struct sockaddr_in SOCKADDR_IN; /* IPv4 socket address */
typedef unsigned short WORD;
typedef unsigned int DWORD;

/*
 * TCP keepalive tuning applied to every accepted socket
 * (passed to TCP_KEEPIDLE / TCP_KEEPINTVL / TCP_KEEPCNT in onAcceptRoutine).
 * Expression macros are parenthesized so they expand safely inside larger
 * expressions such as `x / QSS_SIO_KEEPALIVE_VALS_TIMEOUT`.
 */
#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60)
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
/* Worker-pool sizing and wait-timeout bounds. */
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  (20*1000)
#define MAX_WORKER_WAIT_TIMEOUT  (60*MIN_WORKER_WAIT_TIMEOUT)
#define MAX_THREADPOOLS  32

/* Size of the per-connection receive buffer (QSSEPollEvent.buf). */
#define MAX_BUF_SIZE 1024
/* `ulimit -n` limits the open FDs per process: remember to raise it,
 * otherwise you get select-like capacity instead of epoll-like capacity. */
/* Seconds internalBlockingSender waits for a socket to become writable. */
#define BLOCKING_SEND_TIMEOUT 20

/* Connection lifecycle notification: called with the client socket `cs` and
 * lifecycle 0 when it has been accepted (OnAccepted) or -1 when it is being
 * closed (OnClose). */
typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
/* Send helper handed to the protocol handler: sends `nbs` bytes of `buf` on
 * socket `cs`; `senderBase` is the opaque context the handler received. */
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
/* Protocol handler invoked by a worker for a readable connection. The worker
 * re-arms the connection when the result is > 0 and closes it otherwise. */
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR

/* State of one server instance, shared by all of its worker threads. */
typedef struct {
  WORD passive;    /* stored by createSocketServer; not read by the code visible here */
  WORD port;//uint16_t; converted with htons() when the listening address is built
  WORD minThreads; /* idle workers are retired only while workerCounter exceeds this */
  WORD maxThreads; /* upper bound enforced by addQSSWorker */
  pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE; guards the status and counters below
  volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
  int  workerWaitTimeout;//wait timeout; passed as the epoll_wait() timeout of each worker
  volatile int workerCounter;      /* number of worker threads currently accounted for */
  volatile int currentBusyWorkers; /* workers currently inside the protocol handler */
  volatile int CSocketsCounter;    /* accepted client sockets not yet closed */
  CSocketLifecycleCallback cslifecb;    /* optional; may be NULL */
  InternalProtocolHandler protoHandler; /* defaults to the built-in handler when NULL is supplied */
  SOCKET server_s;       /* listening socket */
  SOCKADDR_IN serv_addr; /* address the listening socket is bound to */
  int epollFD;//main epoller.
  int BSendEpollFD;//For blocking send.
}QSocketServer;

/* Per-connection context; its address is stored in epoll_event.data.ptr. */
typedef struct {
  SOCKET client_s;         /* the connection's socket (the listening socket for the accept entry) */
  SOCKADDR_IN client_addr; /* NOTE(review): not written by the code visible here */
  uint32_t curEvents;      /* event mask the socket was registered with */

  char buf[MAX_BUF_SIZE];  /* receive buffer used by the default protocol handler */
  DWORD numberOfBytesTransferred; /* NOTE(review): unused by the code visible here */
  char * data;                    /* NOTE(review): unused by the code visible here */

  int BSendEpollFDRelated;        /* non-zero once the lock/condition below have been initialised
                                     for use with the blocking-send epoll instance */
  pthread_mutex_t writableLock;   /* protects the wait on writableMonitor */
  pthread_cond_t  writableMonitor;/* signalled by blockingSendEpollerRoutine on EPOLLOUT */
}QSSEPollEvent;//for per connection

/* Allocates and initialises a server object and stores it in *qss_ptr. */
int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
/* Binds, listens and starts the threads of a created server; returns 1 on success, 0 otherwise. */
int startSocketServer(QSocketServer *qss);
/* Begins shutting down a running server; returns 1 if the shutdown was initiated, 0 otherwise. */
int shutdownSocketServer(QSocketServer *qss);
#endif

qsocketserver_model.c  //涓嬮潰鐨勪唬鐮佺鐢熶駭鐜榪樺樊鍐呭瓨姹?/strong>鍜?strong>logger鍝︼紒
 #include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void *  epollWorkerRoutine(void *);
void *  blockingSendEpollerRoutine(void *);
/*
 * Returns 1 when `str` is a non-empty string consisting only of the ASCII
 * digits '0'..'9', and 0 otherwise (including NULL and the empty string).
 *
 * The result is the same as matching "^[0-9]+$", but no regex is compiled per
 * call and the function can no longer return -1 (which callers testing the
 * result as a boolean would have treated as "is a digit string").
 */
int isDigitStr(const char *str){
    const char *p;

    if(str==NULL||*str=='\0')
        return 0;
    for(p=str;*p!='\0';p++){
        if(*p<'0'||*p>'9')
            return 0;
    }
    return 1;
}

/*
 * Adds O_NONBLOCK to the file status flags of `sock`.
 * Returns 1 on success; on failure reports the fcntl() error through
 * perror() and returns -1.
 */
static int setNonBlocking(int sock)
{
    int currentFlags=fcntl(sock,F_GETFL);

    if(currentFlags==-1)
    {
        perror("fcntl(sock,GETFL) failed!\n");
        return -1;
    }
    if(fcntl(sock,F_SETFL,currentFlags|O_NONBLOCK)==-1)
    {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return -1;
    }
    return 1;
}

/* Placeholder for clamping the worker limits of `qss` to usable values;
 * it currently leaves the server object untouched. */
static void adjustQSSWorkerLimits(QSocketServer *qss){
    (void)qss; /* nothing to adjust yet */
}
/* Start-up argument handed to a newly created thread; the thread routines in
 * this file free it as soon as they have read it. */
typedef struct{
 QSocketServer * qss; /* server the thread works for */
 pthread_t th;        /* id of the created thread, filled in by pthread_create() */
}QSSWORKER_PARAM;

/*
 * Starts up to `addCounter` additional epoll worker threads.
 *
 * Workers are added only when the pool is below minThreads, or when every
 * existing worker is busy and the pool is still below maxThreads; the request
 * is refused as a whole if it would exceed maxThreads.
 *
 * Returns the number of workers actually started.
 *
 * Fixes over the previous version:
 *  - pParam->qss is assigned BEFORE the thread is created; the worker reads
 *    and frees pParam immediately, so assigning afterwards raced with the
 *    worker (and could write into freed memory);
 *  - pthread_create() failure is detected: the parameter block is released and
 *    the worker is not counted;
 *  - workers are created detached, since nobody ever joins them.
 */
static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
    WORD started=0;
    pthread_attr_t detachedAttr;
    int i;

    if(qss==NULL||addCounter==0)
        return 0;

    /* Cheap unlocked pre-check; the limit is re-validated under the lock. */
    if(!(qss->workerCounter<qss->minThreads||
         (qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads)))
        return 0;

    if(pthread_attr_init(&detachedAttr)!=0)
        return 0;
    pthread_attr_setdetachstate(&detachedAttr,PTHREAD_CREATE_DETACHED);

    pthread_spin_lock(&qss->g_spinlock);
    if(qss->workerCounter+addCounter<=qss->maxThreads){
        for(i=0;i<addCounter;i++){
            QSSWORKER_PARAM *pParam=malloc(sizeof *pParam);
            if(pParam==NULL)
                break;
            pParam->qss=qss; /* must be set before the thread can run */
            if(pthread_create(&pParam->th,&detachedAttr,epollWorkerRoutine,pParam)!=0){
                free(pParam);
                break;
            }
            /* From here on pParam belongs to the worker; do not touch it. */
            qss->workerCounter++;
            started++;
        }
    }
    pthread_spin_unlock(&qss->g_spinlock);

    pthread_attr_destroy(&detachedAttr);
    return started;
}

/* Reports `msg` together with the current errno text via perror() and then
 * closes `s` when it is a positive descriptor (callers pass 0 for "nothing
 * to close"). */
static void SOlogger(const char * msg,SOCKET s){
    perror(msg);
    if(s<=0)
        return;
    close(s);
}

/*
 * Default protocol handler: a simple echo.
 * Reads once from the connection referenced by event->data.ptr into its
 * buffer and, if bytes arrived, sends them back through `_blockingSender`.
 * Returns the sender's result when data was received, otherwise the recv()
 * result (0 or -1).
 */
static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *conn=event->data.ptr;
    int outcome;

    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,conn->client_s,pthread_self());

    outcome=recv(conn->client_s,conn->buf,MAX_BUF_SIZE,0);
    if(outcome>0){
        /* echo back exactly what was received */
        outcome=_blockingSender(senderBase,conn->client_s,conn->buf,outcome);
    }

    printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",outcome,errno,strerror(errno),conn->client_s,pthread_self());
    return outcome;
}

/*
 * Allocates a server object, fills in its configuration and stores it in
 * *qss_ptr. The server is left in lifecycle state 0 (created); nothing is
 * opened or started here.
 *
 * protoHandler may be NULL, in which case the built-in echo handler is used.
 * cslifecb may be NULL.
 *
 * Returns 1 on success. Returns 0 (and sets *qss_ptr to NULL when qss_ptr is
 * valid) if qss_ptr is NULL, memory cannot be allocated or the spinlock
 * cannot be initialised.
 */
int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{
    QSocketServer *qss;

    if(qss_ptr==NULL)
        return 0;
    *qss_ptr=NULL;

    /* calloc so that every field (serv_addr, counters, ...) starts zeroed */
    qss=calloc(1,sizeof *qss);
    if(qss==NULL)
        return 0;

    if(pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE)!=0){
        free(qss);
        return 0;
    }

    qss->passive=passive;
    qss->port=port;
    qss->minThreads=minThreads;
    qss->maxThreads=maxThreads;
    qss->workerWaitTimeout=workerWaitTimeout;
    qss->lifecycleStatus=0;
    qss->workerCounter=0;
    qss->currentBusyWorkers=0;
    qss->CSocketsCounter=0;
    qss->cslifecb=cslifecb;
    qss->protoHandler=protoHandler?protoHandler:_InternalProtocolHandler;

    /* No descriptors are open yet; mark them invalid rather than leaving garbage. */
    qss->server_s=INVALID_SOCKET;
    qss->epollFD=-1;
    qss->BSendEpollFD=-1;

    adjustQSSWorkerLimits(qss);
    *qss_ptr=qss;
    return 1;
}

/*
 * Starts a server created by createSocketServer():
 *   1. moves the lifecycle from 0 (created) to 1 (starting);
 *   2. creates a non-blocking listening socket bound to 127.0.0.1:port;
 *   3. creates the main epoll instance and the blocking-send epoll instance;
 *   4. registers the listening socket (edge-triggered) with the main epoll;
 *   5. starts the blocking-send epoller thread and minThreads workers;
 *   6. moves the lifecycle to 2 (running).
 *
 * Returns 1 on success. Returns 0 if qss is NULL, the server is not in the
 * "created" state, or any step fails; on failure every descriptor opened here
 * is closed again and the lifecycle is reset to 0 so the call can be retried.
 */
int startSocketServer(QSocketServer *qss)
{
    struct epoll_event acceptEvent;
    QSSEPollEvent *listenCtx=NULL;
    QSSWORKER_PARAM *senderParam=NULL;
    pthread_attr_t detachedAttr;
    pthread_t senderThread;
    int attrReady=0;
    int rc;

    if(qss==NULL)
        return 0;

    pthread_spin_lock(&qss->g_spinlock);
    if(qss->lifecycleStatus!=0){
        pthread_spin_unlock(&qss->g_spinlock);
        return 0;
    }
    qss->lifecycleStatus=1;
    pthread_spin_unlock(&qss->g_spinlock);

    qss->server_s=INVALID_SOCKET;
    qss->epollFD=-1;
    qss->BSendEpollFD=-1;

    /* Zero the whole address first so sin_zero does not contain garbage. */
    memset(&qss->serv_addr,0,sizeof(qss->serv_addr));
    qss->serv_addr.sin_family=AF_INET;
    qss->serv_addr.sin_port=htons(qss->port);
    /* Loopback only; use INADDR_ANY instead to listen on every interface. */
    inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));

    /* Validate the descriptor BEFORE using it. */
    qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
    if(qss->server_s==INVALID_SOCKET){
        SOlogger("socket failed.\n",0);
        goto fail;
    }
    if(setNonBlocking(qss->server_s)==-1){
        SOlogger("setNonBlocking server_s failed.\n",0);
        goto fail;
    }
    if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR){
        SOlogger("bind failed.\n",0);
        goto fail;
    }
    /* SOMAXCONN also exists on Windows; on this platform it is 128. A smaller
     * backlog may be used; the value affects resource overhead. */
    if(listen(qss->server_s,SOMAXCONN)==SOCKET_ERROR){
        SOlogger("listen failed.\n",0);
        goto fail;
    }

    /* epoll_create1() rather than epoll_create(size): there is no good way to
     * choose `size`, so avoid it altogether. */
    qss->epollFD=epoll_create1(0);
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD  failed.\n",0);
        goto fail;
    }
    qss->BSendEpollFD=epoll_create1(0);/* for blocking send */
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",0);
        goto fail;
    }

    /* Register the listening socket for accept events. */
    listenCtx=calloc(1,sizeof *listenCtx);
    if(listenCtx==NULL){
        SOlogger("malloc accept context failed.\n",0);
        goto fail;
    }
    listenCtx->client_s=qss->server_s;
    acceptEvent.events=listenCtx->curEvents=EPOLLIN|EPOLLET;
    acceptEvent.data.ptr=listenCtx;
    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&acceptEvent)==-1){
        SOlogger("epoll_ctl server_s to accept failed.\n",0);
        goto fail;
    }

    /* Start the blocking-send epoller. It is created detached because nobody
     * joins it, and its id is kept in a local because the thread frees
     * senderParam as soon as it starts. */
    senderParam=calloc(1,sizeof *senderParam);
    if(senderParam==NULL){
        SOlogger("malloc blocking send epoller param failed.\n",0);
        goto fail;
    }
    senderParam->qss=qss;
    rc=pthread_attr_init(&detachedAttr);
    if(rc!=0){
        errno=rc;
        SOlogger("pthread_attr_init failed.\n",0);
        goto fail;
    }
    attrReady=1;
    pthread_attr_setdetachstate(&detachedAttr,PTHREAD_CREATE_DETACHED);
    rc=pthread_create(&senderThread,&detachedAttr,blockingSendEpollerRoutine,senderParam);
    if(rc!=0){
        errno=rc;
        SOlogger("pthread_create blocking send epoller failed.\n",0);
        goto fail;
    }
    senderParam=NULL; /* now owned (and freed) by the epoller thread */
    pthread_attr_destroy(&detachedAttr);
    attrReady=0;

    /* Initial workers for epoll events.
     * NOTE(review): the number actually started is not checked here. */
    addQSSWorker(qss,qss->minThreads);

    pthread_spin_lock(&qss->g_spinlock);
    qss->lifecycleStatus=2;
    pthread_spin_unlock(&qss->g_spinlock);
    return 1;

fail:
    /* No thread has been started on any path that reaches this label, so the
     * resources below are still exclusively ours. */
    if(attrReady)
        pthread_attr_destroy(&detachedAttr);
    free(senderParam);
    if(qss->BSendEpollFD!=-1)
        close(qss->BSendEpollFD);
    if(qss->epollFD!=-1)
        close(qss->epollFD); /* also drops the listening-socket registration */
    if(qss->server_s!=INVALID_SOCKET)
        close(qss->server_s);
    free(listenCtx);
    qss->BSendEpollFD=-1;
    qss->epollFD=-1;
    qss->server_s=INVALID_SOCKET;

    pthread_spin_lock(&qss->g_spinlock);
    qss->lifecycleStatus=0;
    pthread_spin_unlock(&qss->g_spinlock);
    return 0;
}

/*
 * Initiates shutdown of a running server.
 *
 * Moves the lifecycle from 2 (running) to 3 (stopping), shuts down the
 * listening socket and then shuts down every socket descriptor listed under
 * /proc/<pid>/fd/. The descriptors themselves are not closed here.
 *
 * Returns 1 when the shutdown was initiated, 0 if qss is NULL or the server
 * was not running.
 */
int shutdownSocketServer(QSocketServer *qss){
    char fdDirPath[64];
    struct dirent *entry;
    DIR *fdDir;
    int wasRunning;

    if(qss==NULL)
        return 0;

    /* running -> stopping, decided under the lock */
    pthread_spin_lock(&qss->g_spinlock);
    wasRunning=(qss->lifecycleStatus==2);
    if(wasRunning)
        qss->lifecycleStatus=3;
    pthread_spin_unlock(&qss->g_spinlock);
    if(!wasRunning)
        return 0;

    /* Graceful sequence for the listening socket: shutdown --notify--> epoll
     * --> close. Remember that shutdown() delivers EOF. */
    shutdown(qss->server_s,SHUT_RDWR);

    /* Walk /proc/<pid>/fd and shut down every descriptor that is a socket. */
    sprintf(fdDirPath,"/proc/%d/fd/",getpid());
    fdDir=opendir(fdDirPath);
    if(fdDir==NULL)
        return 1;

    while((entry=readdir(fdDir))!=NULL){
        int fd;
        if(!isDigitStr(entry->d_name))
            continue;
        fd=atoi(entry->d_name);
        /* isfdtype(fd,S_IFSOCK) is equivalent to fstat() + S_ISSOCK(st_mode) */
        if(isfdtype(fd,S_IFSOCK))
            shutdown(fd,SHUT_RDWR);
    }
    closedir(fdDir);
    return 1;
}

static int onAcceptRoutine(QSocketServer * qss)
{
    SOCKADDR_IN client_addr;
    unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
    SOCKET cs;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
    if(cs==INVALID_SOCKET)
    {
        printf("onAccept failed:%d,%s\n",errno,strerror(errno));
        epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22  Invalid argument
        return 0;
    }
    if(setNonBlocking(cs)==-1)
    {
        printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
        return 0;
    }

    {// set keepalive option
        int keepAlive = 1;
        int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
        if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
           setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
           setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
           setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
           {
            printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
            return 0;
           }
    }
    qssEPEvent=malloc(sizeof(QSSEPollEvent));
    qssEPEvent->client_s=cs;
    {
        _epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
        qssEPEvent->BSendEpollFDRelated=0;
        _epEvent.data.ptr=qssEPEvent;/*榪欓噷鍙堝拰鏁欑鐨勪笉涓鏍峰摝錛岀湡姝g殑user data鐢╬tr,鑰屼笉鏄崟涓鐨刦d*/
        if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
            printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
            free(qssEPEvent);
            return 0;
        }else{
            pthread_spin_lock(&qss->g_spinlock);
            qss->CSocketsCounter++;
            pthread_spin_unlock(&qss->g_spinlock);
            if(qss->cslifecb)
                qss->cslifecb(cs,0);
        }
    }
    printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
    return 1;
}

/* Context passed (as the opaque `senderBase`) to internalBlockingSender. */
typedef struct{
    QSocketServer * qss;   /* server whose BSendEpollFD is used to wait for writability */
    QSSEPollEvent * event; /* connection currently being handled by the worker */
}InternalSenderBase_t;

/*
 * Blocks the calling worker until the connection in `sb` becomes writable.
 *
 * The socket is (re-)armed for EPOLLOUT on the blocking-send epoll instance
 * while the connection's mutex is held, so the wake-up issued by
 * blockingSendEpollerRoutine cannot be missed; the caller then waits on the
 * condition variable for at most BLOCKING_SEND_TIMEOUT seconds.
 *
 * Returns 0 when woken up, -1 on timeout or on any error.
 */
static int waitUntilWritable(InternalSenderBase_t *sb)
{
    QSSEPollEvent *conn=sb->event;
    struct epoll_event ev;
    struct timespec deadline;
    int ctlOp;
    int rc=-1;

    ev.data.ptr=conn;
    ev.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;

    if(conn->BSendEpollFDRelated==0){
        /* First blocking send on this connection: create its lock/monitor. */
        if(pthread_mutex_init(&conn->writableLock,NULL)!=0)
            return -1;
        if(pthread_cond_init(&conn->writableMonitor,NULL)!=0){
            pthread_mutex_destroy(&conn->writableLock);
            return -1;
        }
        conn->BSendEpollFDRelated=1;
        ctlOp=EPOLL_CTL_ADD;
    }else{
        ctlOp=EPOLL_CTL_MOD; /* re-arm the one-shot registration */
    }

    pthread_mutex_lock(&conn->writableLock);
    if(epoll_ctl(sb->qss->BSendEpollFD,ctlOp,conn->client_s,&ev)==0){
        deadline.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
        deadline.tv_nsec=0;
        if(pthread_cond_timedwait(&conn->writableMonitor,&conn->writableLock,&deadline)==0)
            rc=0;
    }
    pthread_mutex_unlock(&conn->writableLock);
    return rc;
}

/*
 * Sends all `nbs` bytes of `_buf` on the non-blocking socket `cs`, blocking
 * the calling worker (via waitUntilWritable) whenever the socket buffer is
 * full. `senderBase` must point to an InternalSenderBase_t.
 *
 * Returns the total number of bytes sent (== nbs) on success, 0 if nbs is 0
 * or send() reports 0, and -1 on a socket error or when the socket does not
 * become writable within BLOCKING_SEND_TIMEOUT seconds.
 *
 * Changes over the previous version:
 *  - MSG_NOSIGNAL: a peer that has gone away yields EPIPE instead of raising
 *    SIGPIPE, which by default terminates the whole process;
 *  - EINTR is retried instead of being treated as a failure;
 *  - byte counts use size_t/ssize_t (no signed/unsigned comparisons);
 *  - the result on success is the total sent, not the size of the last chunk.
 * NOTE: nbs is assumed to fit in an int, as the return type requires.
 */
static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
    InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
    const char *bytes=_buf;
    size_t sent=0;

    while(sent<nbs){
        ssize_t n=send(cs,bytes+sent,nbs-sent,MSG_NOSIGNAL);

        if(n>0){
            sent+=(size_t)n;
            continue;
        }
        if(n==0)
            return 0;
        if(errno==EINTR)
            continue;
        if(errno!=EAGAIN&&errno!=EWOULDBLOCK)
            return -1;              /* real socket error */
        if(waitUntilWritable(sb)!=0)
            return -1;              /* timed out or could not wait */
    }
    return (int)sent;
}
void *  blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    //pthread_t * curThread=&pParam->th;
    struct epoll_event epEvents[qss->maxThreads];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,*errno_ptr=&errno;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(1){

        pollRes=epoll_wait(qss->BSendEpollFD,epEvents,qss->maxThreads,-1);
        if(pollRes>=1){
            int i=0;
            for(;i<pollRes;i++)
            if(epEvents[i].events&EPOLLOUT){//榪欎釜epollfd鍙簲璇ュ仛浠ヤ笅鐨勪簨鎯咃紝灝戝仛涓哄揩!
                qssEPEvent=epEvents[i].data.ptr;
                pthread_mutex_lock(&qssEPEvent->writableLock);
                pthread_cond_signal(&qssEPEvent->writableMonitor);
                pthread_mutex_unlock(&qssEPEvent->writableLock);
            }

        }else if(pollRes==-1){//errno 
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            break;
        }

    }

    return NULL;
}

void *  epollWorkerRoutine(void * _param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    pthread_t * curThread=&pParam->th;
    struct epoll_event _epEvent;
    QSSEPollEvent *qssEPEvent=NULL;
    InternalSenderBase_t _senderBase;
    int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
    _senderBase.qss=qss;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);

    free(pParam);
    while(!exitCode){

        *errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
        pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
        if(pollRes==1){
            qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;

            if(qssEPEvent->client_s==qss->server_s)
            {//Accepted Socket.
               onAcceptRoutine(qss);
               continue;
            }else{
                if(qss->protoHandler){
                    _senderBase.event=_epEvent.data.ptr;
                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers++;
                    pthread_spin_unlock(&qss->g_spinlock);

                    addQSSWorker(qss,1);
                    handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);

                    pthread_spin_lock(&qss->g_spinlock);
                    qss->currentBusyWorkers--;
                    pthread_spin_unlock(&qss->g_spinlock);

                    if(handleCode>0){
                        _epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
                        if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
                            SOErrOccurred=2;
                    }else{
                        SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
                    }
                }
            }

        }else if(pollRes==0){//timeout
            printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
            if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
            {
                pthread_spin_lock(&qss->g_spinlock);
                if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                    qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                    exitCode=2;
                }
                pthread_spin_unlock(&qss->g_spinlock);
            }else if(qss->lifecycleStatus>=4)
                    exitCode=4;

        }else if(pollRes==-1){//errno
            printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
            exitCode=1;
        }

        if(SOErrOccurred){
            if(qss->cslifecb)
                qss->cslifecb(qssEPEvent->client_s,-1);
            /*if(qssEPEvent)*/{
                epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
                close(qssEPEvent->client_s);
                if(qssEPEvent->BSendEpollFDRelated){
                    pthread_cond_destroy(&qssEPEvent->writableMonitor);
                    pthread_mutex_destroy(&qssEPEvent->writableLock);
                }
                free(qssEPEvent);
              }
            pthread_spin_lock(&qss->g_spinlock);
            if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
             //for qss workerSize,
             qss->lifecycleStatus=4;
             exitCode=3;
            }
            pthread_spin_unlock(&qss->g_spinlock);
        }//SOErrOccurred handle;

    }//end main while.

    if(exitCode!=2){
        int clearup=0;
        pthread_spin_lock(&qss->g_spinlock);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
          clearup=1;
        }
        pthread_spin_unlock(&qss->g_spinlock);
        if(clearup){
         close(qss->epollFD);
         close(qss->BSendEpollFD);
         pthread_spin_destroy(&qss->g_spinlock);
         free(qss);
        }
    }//exitCode handle;
 return NULL;
}

 

 

 

 



]]>
涓涓熀浜庡畬鎴愮鍙g殑TCP Server Framework,嫻呮瀽IOCPhttp://www.shnenglu.com/adapterofcoms/archive/2010/06/26/118781.htmladapterofcomsadapterofcomsSat, 26 Jun 2010 09:30:00 GMThttp://www.shnenglu.com/adapterofcoms/archive/2010/06/26/118781.htmlhttp://www.shnenglu.com/adapterofcoms/comments/118781.htmlhttp://www.shnenglu.com/adapterofcoms/archive/2010/06/26/118781.html#Feedback1http://www.shnenglu.com/adapterofcoms/comments/commentRss/118781.htmlhttp://www.shnenglu.com/adapterofcoms/services/trackbacks/118781.html    CreateIoCompletionPort鐨凬umberOfConcurrentThreads錛?br>1.鍙湁褰撶浜屼釜鍙傛暟ExistingCompletionPort涓篘ULL鏃跺畠鎵嶆湁鏁堬紝瀹冩槸涓猰ax threads limits.
2.澶у鏈夎皝鎶婂畠璁劇疆涓鴻秴鍑篶pu涓暟鐨勫鹼紝褰撶劧涓嶅彧鏄痗pu涓暟鐨?鍊嶏紝鑰屾槸涓嬮潰鐨凪AX_THREADS 100鐢氳嚦鏇村ぇ銆?br>瀵逛簬榪欎釜鍊肩殑璁懼畾錛宮sdn騫舵病鏈夎闈炲緱璁炬垚cpu涓暟鐨?鍊嶏紝鑰屼笖涔熸病鏈夋妸鍑忓皯綰跨▼涔嬮棿涓婁笅鏂囦氦鎹㈣繖浜涘獎鍝嶆壇鍒拌繖閲屾潵銆侷/O Completion Ports MSDN:"If your transaction required a lengthy computation, a larger concurrency value will allow more threads to run. Each completion packet may take longer to finish, but more completion packets will be processed at the same time. "銆?br>    瀵逛簬struct OVERLAPPED錛屾垜浠父浼氬涓嬫墿灞曪紝
typedef struct {
  WSAOVERLAPPED overlapped; //must be first member?   鏄殑錛屽繀欏繪槸絎竴涓傚鏋滀綘涓嶈偗瀹氾紝浣犲彲浠ヨ瘯璇曘?br>  SOCKET client_s;
  SOCKADDR_IN client_addr;
  WORD optCode;//1--read,2--send.  鏈変漢甯鎬細瀹氫箟榪欎釜鏁版嵁鎴愬憳錛屼絾涔熸湁浜轟笉鐢紝浜夎鍦╯end/WSASend,姝ゆ椂鐨勫悓姝ュ拰寮傛鏄惁鏈夊繀瑕侊紵 鑷沖皯鎴戜笅闈㈢殑server鏇存湰灝辨病鐢ㄥ畠銆?br>  char buf[MAX_BUF_SIZE];
  WSABUF wsaBuf;//inited ?  榪欎釜涓嶈蹇樹簡錛?br>  DWORD numberOfBytesTransferred;
  DWORD flags;   

}QSSOverlapped;//for per connection
鎴戜笅闈㈢殑server妗嗘灦鐨勫熀鏈濇兂鏄?
One connection VS one thread in worker thread pool ,worker thread performs completionWorkerRoutine.
A Acceptor thread 涓撻棬鐢ㄦ潵accept socket,鍏寵仈鑷矷OCP,騫禬SARecv:post Recv Completion Packet to IOCP.
在completionWorkerRoutine中有以下的职责:
1.handle request,当忙时增加completionWorkerThread数量但不超过maxThreads,post Recv Completion Packet to IOCP.
2.timeout时检查是否空闲和当前completionWorkerThread数量,当空闲时保持或减少至minThreads数量.
3.瀵規墍鏈堿ccepted-socket綆$悊鐢熷懡鍛ㄦ湡,榪欓噷鍒╃敤緋葷粺鐨刱eepalive probes,鑻ユ兂瀹炵幇涓氬姟灞?蹇冭煩鎺㈡祴"鍙渶灝哘SS_SIO_KEEPALIVE_VALS_TIMEOUT 鏀瑰洖緋葷粺榛樿鐨?灝忔椂.
下面结合源代码,浅析一下IOCP:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <winsock2.h>
#include <mstcpip.h>
/*
 * Keepalive settings copied into struct tcp_keepalive (keepalivetime /
 * keepaliveinterval) for every accepted socket.
 * Expression macros are parenthesized so they expand safely inside larger
 * expressions such as `x / QSS_SIO_KEEPALIVE_VALS_TIMEOUT`.
 */
#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60*1000)
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL (5*1000)

/* Worker-pool sizing and wait-timeout bounds. */
#define MAX_THREADS 100
#define MAX_THREADS_MIN  10
#define MIN_WORKER_WAIT_TIMEOUT  (20*1000)
#define MAX_WORKER_WAIT_TIMEOUT  (60*MIN_WORKER_WAIT_TIMEOUT)

/* Size of the per-connection receive buffer (QSSOverlapped.buf). */
#define MAX_BUF_SIZE 1024

/* Called back when a socket has been accepted and when a socket is closed or hits an error. */
typedef void (*CSocketLifecycleCallback)(SOCKET cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose. Note: at OnClose the socket may no longer be usable; it may already have been closed abnormally or have hit another error.

/* Protocol handler callback. */
typedef int (*InternalProtocolHandler)(LPWSAOVERLAPPED overlapped);//return -1:SOCKET_ERROR

/* Opaque server handle for callers; the implementation casts it to its private QSocketServer. */
typedef struct Q_SOCKET_SERVER SocketServer;
/* Allocates and configures a server object and stores it in *ssp. */
DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout);
/* Starts listening, the worker pool and the acceptor; returns 0 on failure. */
DWORD startSocketServer(SocketServer *ss);
/* Begins shutting down a running server; returns 1 if shutdown was initiated, 0 otherwise. */
DWORD shutdownSocketServer(SocketServer *ss);

#endif
 qsocketserver.c      綆縐?qss,鐩稿簲鐨凮VERLAPPED綆縐皅ssOl.
#include "socketserver.h"
#include "stdio.h"
/* Private state of one IOCP server instance, shared by the acceptor and all workers. */
typedef struct { 
  WORD passive;//daemon: when non-zero the acceptor runs on its own thread, otherwise on the caller's thread
  WORD port;
  WORD minThreads; /* number of workers started initially */
  WORD maxThreads; /* upper bound for workers; also the IOCP concurrency value */
  volatile long lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitKeyPosted,5-stopped
  long  workerWaitTimeout;//wait timeout; passed to GetQueuedCompletionStatus by the workers
  CRITICAL_SECTION QSS_LOCK; /* serialises worker creation in addQSSWorker */
  volatile long workerCounter;      /* number of worker threads currently accounted for */
  volatile long currentBusyWorkers; /* workers currently inside the protocol handler */
  volatile long CSocketsCounter;//reference count of accepted sockets
  CSocketLifecycleCallback cslifecb;    /* optional; may be NULL */
  InternalProtocolHandler protoHandler; /* defaults to the built-in echo handler when NULL is supplied */
  WORD wsaVersion;//=MAKEWORD(2,0);
  WSADATA wsData;  /* filled in by WSAStartup */
  SOCKET server_s; /* listening socket */
  SOCKADDR_IN serv_addr;
  HANDLE iocpHandle; /* the completion port all accepted sockets are associated with */
}QSocketServer;

/* Per-connection context. The code casts between LPWSAOVERLAPPED and
 * QSSOverlapped*, which relies on `overlapped` being the first member. */
typedef struct {
  WSAOVERLAPPED overlapped; 
  SOCKET client_s;         /* the accepted socket */
  SOCKADDR_IN client_addr; /* NOTE(review): not written by the code visible here */
  WORD optCode;            /* NOTE(review): not used by the code visible here */
  char buf[MAX_BUF_SIZE];  /* receive buffer referenced by wsaBuf */
  WSABUF wsaBuf;           /* must be initialised to point at buf before WSARecv is posted */
  DWORD numberOfBytesTransferred; /* passed to WSARecv as the received-bytes output */
  DWORD flags;             /* passed to WSARecv as its in/out flags argument */
}QSSOverlapped;

DWORD  acceptorRoutine(LPVOID);
DWORD  completionWorkerRoutine(LPVOID);

/*
 * Intended to clamp the thread limits and the wait timeout of `qss` to sane
 * values. The whole body is currently commented out, so this function does
 * nothing and the values supplied by the caller are used unchanged.
 */
static void adjustQSSWorkerLimits(QSocketServer *qss){
  /*adjust size and timeout.*/
  /* Disabled logic, kept for reference:
  if(qss->maxThreads <= 0) {
   qss->maxThreads = MAX_THREADS;
        } else if (qss->maxThreads < MAX_THREADS_MIN) {           
         qss->maxThreads = MAX_THREADS_MIN;
        }
        if(qss->minThreads >  qss->maxThreads) {
         qss->minThreads =  qss->maxThreads;
        }
        if(qss->minThreads <= 0) {
            if(1 == qss->maxThreads) {
             qss->minThreads = 1;
            } else {
             qss->minThreads = qss->maxThreads/2;
            }
        }
       
        if(qss->workerWaitTimeout<MIN_WORKER_WAIT_TIMEOUT)
         qss->workerWaitTimeout=MIN_WORKER_WAIT_TIMEOUT;
        if(qss->workerWaitTimeout>MAX_WORKER_WAIT_TIMEOUT)
         qss->workerWaitTimeout=MAX_WORKER_WAIT_TIMEOUT;        */
}

/* Start-up argument handed to a newly created thread; the thread routines
 * visible in this file free it right after reading it. */
typedef struct{
 QSocketServer * qss; /* server the thread works for */
 HANDLE th;           /* handle of the created thread (NULL when run on the caller's thread) */
}QSSWORKER_PARAM;

/*
 * Starts up to `addCounter` additional completion worker threads.
 *
 * Workers are added only when the pool is below minThreads, or when every
 * existing worker is busy and the pool is still below maxThreads; the request
 * is refused as a whole if it would exceed maxThreads.
 *
 * Threads are created suspended so that the parameter block is completely
 * filled in (including the thread's own handle) before the worker can read
 * and free it.
 *
 * Returns the number of workers actually started. Unlike the previous
 * version, a failed CreateThread() is detected: the parameter block is
 * released, no ResumeThread(NULL) is issued and the worker is not counted.
 */
static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
 WORD res=0;
 if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads)){
  int i;
  EnterCriticalSection(&qss->QSS_LOCK);
  if(qss->workerCounter+addCounter<=qss->maxThreads){
   for(i=0;i<addCounter;i++)
   {
    DWORD threadId;
    HANDLE worker;
    QSSWORKER_PARAM * pParam=malloc(sizeof *pParam);
    if(pParam==NULL)
     break;
    pParam->qss=qss;
    pParam->th=NULL;
    worker=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)completionWorkerRoutine,pParam,CREATE_SUSPENDED,&threadId);
    if(worker==NULL){
     free(pParam);
     break;
    }
    pParam->th=worker;
    qss->workerCounter++,res++;
    /* After this call pParam belongs to the worker; do not touch it. */
    ResumeThread(worker);
   }
  }
  LeaveCriticalSection(&qss->QSS_LOCK);
 }
 return res;
}

/* Reports `msg` through perror(), closes socket `s` when it is greater than
 * zero (callers pass 0 for "nothing to close") and, when `clearup` is
 * non-zero, calls WSACleanup(). */
static void SOlogger(const char * msg,SOCKET s,int clearup){
 perror(msg);
 if(s>0){
  closesocket(s);
 }
 if(clearup){
  WSACleanup();
 }
}

/*
 * Default protocol handler: logs the received bytes and echoes them back.
 *
 * `overlapped` is the first member of the connection's QSSOverlapped, so it
 * is cast back to that type. Returns the result of send() (bytes sent, or
 * SOCKET_ERROR).
 *
 * The receive buffer is NOT NUL-terminated, so it is printed with an explicit
 * length ("%.*s") instead of "%s", which read past the received data. The
 * length is clamped to the buffer size, and the DWORD count is printed with a
 * matching format.
 */
static int _InternalEchoProtocolHandler(LPWSAOVERLAPPED overlapped){
 QSSOverlapped *qssOl=(QSSOverlapped *)overlapped;
 DWORD received=qssOl->numberOfBytesTransferred;

 if(received>MAX_BUF_SIZE)
  received=MAX_BUF_SIZE;

 printf("numOfT:%lu,WSARecvd:%.*s,\n",(unsigned long)received,(int)received,qssOl->buf);
 //Sleep(500); 
 return send(qssOl->client_s,qssOl->buf,(int)received,0);
}

/*
 * Allocates a server object, fills in its configuration and stores it in
 * *ssp. The server is left in lifecycle state 0 (created); Winsock is not
 * initialised and nothing is opened here.
 *
 * `passive` is normalised to 0/1. protoHandler may be NULL, in which case the
 * built-in echo handler is used. cslifecb may be NULL.
 *
 * Returns 1 on success. Returns 0 (and sets *ssp to NULL when ssp is valid)
 * if ssp is NULL or memory cannot be allocated.
 */
DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout){
 QSocketServer * qss;

 if(ssp==NULL)
  return 0;
 *ssp=NULL;

 /* calloc so that every field (serv_addr, wsData, counters, ...) starts zeroed */
 qss=calloc(1,sizeof *qss);
 if(qss==NULL)
  return 0;

 qss->passive=passive>0?1:0;
 qss->port=port;
 qss->minThreads=minThreads;
 qss->maxThreads=maxThreads;
 qss->workerWaitTimeout=workerWaitTimeout;
 qss->wsaVersion=MAKEWORD(2,0); 
 qss->lifecycleStatus=0;
 InitializeCriticalSection(&qss->QSS_LOCK);
 qss->workerCounter=0;
 qss->currentBusyWorkers=0;
 qss->CSocketsCounter=0;
 qss->cslifecb=cslifecb;
 qss->protoHandler=protoHandler?protoHandler:_InternalEchoProtocolHandler;
 /* Nothing is open yet; mark the handles invalid rather than leaving garbage. */
 qss->server_s=INVALID_SOCKET;
 qss->iocpHandle=NULL;
 adjustQSSWorkerLimits(qss);
 *ssp=(SocketServer *)qss;
 return 1;
}

/*
 * Starts a server created by initializeSocketServer():
 *   1. moves the lifecycle from 0 (created) to 1 (starting);
 *   2. initialises Winsock, then creates, binds (INADDR_ANY:port) and listens
 *      on the server socket;
 *   3. creates the I/O completion port;
 *   4. prepares the acceptor (a suspended thread in passive mode);
 *   5. starts minThreads completion workers, moves the lifecycle to 2
 *      (running) and lets the acceptor run.
 *
 * In passive mode the acceptor runs on its own thread and this function
 * returns 1. Otherwise the acceptor loop runs on the calling thread and its
 * result is returned once that loop ends.
 *
 * Returns 0 if ss is NULL, the server is not in the "created" state, or any
 * step fails; on failure everything opened here is released and the
 * lifecycle is reset to 0.
 *
 * Changes over the previous version:
 *  - the acceptor thread is created suspended, so its handle is stored in the
 *    parameter block before the thread can read (and free) that block;
 *  - CreateIoCompletionPort(), malloc() and CreateThread() results are checked;
 *  - serv_addr is zeroed before use;
 *  - a failed start no longer leaves the lifecycle stuck at 1.
 */
DWORD startSocketServer(SocketServer *ss){ 
 QSocketServer * qss=(QSocketServer *)ss;
 QSSWORKER_PARAM * pParam=NULL;

 if(qss==NULL||InterlockedCompareExchange(&qss->lifecycleStatus,1,0))
  return 0; 

 memset(&qss->serv_addr,0,sizeof(qss->serv_addr));
 qss->serv_addr.sin_family=AF_INET;
 qss->serv_addr.sin_port=htons(qss->port);
 qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");
 qss->server_s=INVALID_SOCKET;
 qss->iocpHandle=NULL;

 if(WSAStartup(qss->wsaVersion,&qss->wsData)){  
  /* Side note: calling WSAStartup apparently starts an extra thread, which
     exits on its own a little later. Unclear what WSACleanup does in turn. */
  SOlogger("WSAStartup failed.\n",0,0);
  goto fail;
 }
 qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
 if(qss->server_s==INVALID_SOCKET){  
  SOlogger("socket failed.\n",0,1);
  goto fail;
 }
 if(bind(qss->server_s,(LPSOCKADDR)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR){  
  SOlogger("bind failed.\n",qss->server_s,1);
  goto fail;
 }
 /* About the backlog: many people do not know what value to choose (1, 5, 50
    and 100 are all seen in the wild), and larger values are said to cost more
    resources. SOMAXCONN does not mean Windows literally uses SOMAXCONN: "If
    set to SOMAXCONN, the underlying service provider responsible for socket s
    will set the backlog to a maximum reasonable value." Operating systems
    also differ in the TCP queue sizes they support, so it is better to let
    the OS decide. Servers such as Apache use:
      #ifndef DEFAULT_LISTENBACKLOG
      #define DEFAULT_LISTENBACKLOG 511
      #endif
 */
 if(listen(qss->server_s,SOMAXCONN)==SOCKET_ERROR){       
  SOlogger("listen failed.\n",qss->server_s,1);
  goto fail;
 }
 qss->iocpHandle=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,/*NumberOfConcurrentThreads-->*/qss->maxThreads);
 if(qss->iocpHandle==NULL){
  SOlogger("CreateIoCompletionPort failed.\n",qss->server_s,1);
  goto fail;
 }

 /* Prepare the acceptor before any worker exists, so that a failure here can
    still be undone without racing against running threads. */
 pParam=malloc(sizeof *pParam);
 if(pParam==NULL){
  CloseHandle(qss->iocpHandle);
  SOlogger("malloc acceptor param failed.\n",qss->server_s,1);
  goto fail;
 }
 pParam->qss=qss;
 pParam->th=NULL;
 if(qss->passive){
  DWORD threadId;
  pParam->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)acceptorRoutine,pParam,CREATE_SUSPENDED,&threadId); 
  if(pParam->th==NULL){
   free(pParam);
   CloseHandle(qss->iocpHandle);
   SOlogger("CreateThread acceptor failed.\n",qss->server_s,1);
   goto fail;
  }
 }

 //initialize worker for completion routine.
 addQSSWorker(qss,qss->minThreads);  
 qss->lifecycleStatus=2;

 if(qss->passive){
  /* pParam belongs to the acceptor thread once it is resumed. */
  ResumeThread(pParam->th);
  return 1;
 }
 return acceptorRoutine(pParam);

fail:
 /* SOlogger has already closed the socket / cleaned up Winsock as requested. */
 qss->server_s=INVALID_SOCKET;
 qss->iocpHandle=NULL;
 InterlockedExchange(&qss->lifecycleStatus,0);
 return 0;
}

/*
 * Initiates shutdown of a running server.
 *
 * Atomically moves the lifecycle from 2 (running) to 3 (stopping) and closes
 * the listening socket. If no accepted socket is outstanding, the lifecycle
 * advances to 4 and the exit key (-1) is posted to the completion port.
 * Finally WSACleanup() is called; closing the listening socket alone does not
 * close the accepted sockets associated with it.
 *
 * Returns 1 when the shutdown was initiated, 0 if ss is NULL or the server
 * was not running.
 */
DWORD shutdownSocketServer(SocketServer *ss){
 QSocketServer * qss=(QSocketServer *)ss;

 if(qss==NULL)
  return 0;
 if(InterlockedCompareExchange(&qss->lifecycleStatus,3,2)!=2)
  return 0; 

 closesocket(qss->server_s/*listen-socket*/);
 if(qss->CSocketsCounter==0){
  qss->lifecycleStatus=4;
  PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
 }
 WSACleanup();  
 return 1;
}

/*
 * Acceptor loop: accepts connections on the listening socket, enables the
 * system keepalive probes on each one (SO_KEEPALIVE + SIO_KEEPALIVE_VALS; the
 * Linux counterpart is setsockopt with TCP_KEEPIDLE/TCP_KEEPINTVL/TCP_KEEPCNT),
 * associates it with the completion port and posts the first overlapped
 * WSARecv.
 *
 * `ss` is a heap-allocated QSSWORKER_PARAM; it is freed here. The loop ends
 * when accept() fails (e.g. because the listening socket was closed); the
 * routine then triggers shutdownSocketServer(), closes its own thread handle
 * if it has one, and returns 1.
 *
 * Changes over the previous version:
 *  - a socket whose keepalive setup, port association or first WSARecv fails
 *    is closed instead of leaked, and the acceptor keeps serving instead of
 *    shutting the whole server down because of one bad client;
 *  - the address length is reset before every accept() (it is an in/out
 *    argument);
 *  - malloc() and CreateIoCompletionPort() results are checked;
 *  - the peer address is stored in the connection context.
 * NOTE(review): the socket is counted and OnAccepted is reported only after
 * the receive has been posted, as before.
 */
DWORD  acceptorRoutine(LPVOID ss){
 QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;
 QSocketServer * qss=pParam->qss;
 HANDLE curThread=pParam->th;
 QSSOverlapped *qssOl=NULL; /* spare context, reused if a post fails */
 free(pParam);
 while(1){  
  SOCKADDR_IN client_addr;
  int client_addr_leng=sizeof(SOCKADDR_IN);
  struct tcp_keepalive alive,aliveOut;
  int so_keepalive_opt=1;
  DWORD outDW=0;
  SOCKET cs;
  int ret;

  printf("accept starting.....\n");
  cs/*Accepted-socket*/=accept(qss->server_s,(LPSOCKADDR)&client_addr,&client_addr_leng);
  if(cs==INVALID_SOCKET){
   printf("accept failed:%d\n",WSAGetLastError());   
   break;
  }

  if(setsockopt(cs,SOL_SOCKET,SO_KEEPALIVE,(char *)&so_keepalive_opt,sizeof(so_keepalive_opt))){
   printf("setsockopt SO_KEEPALIVE failed:%d\n",WSAGetLastError());   
   closesocket(cs);
   continue;
  }
  alive.onoff=TRUE;
  alive.keepalivetime=QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
  alive.keepaliveinterval=QSS_SIO_KEEPALIVE_VALS_INTERVAL;
  if(WSAIoctl(cs,SIO_KEEPALIVE_VALS,&alive,sizeof(alive),&aliveOut,sizeof(aliveOut),&outDW,NULL,NULL)==SOCKET_ERROR){
   printf("WSAIoctl SIO_KEEPALIVE_VALS failed:%d\n",WSAGetLastError());   
   closesocket(cs);
   continue;
  }

  if(CreateIoCompletionPort((HANDLE)cs,qss->iocpHandle,(ULONG_PTR)cs,0)==NULL){
   printf("CreateIoCompletionPort for accepted socket failed:%lu\n",(unsigned long)GetLastError());
   closesocket(cs);
   continue;
  }

  if(qssOl==NULL){
   qssOl=malloc(sizeof *qssOl);   
   if(qssOl==NULL){
    printf("malloc QSSOverlapped failed.\n");
    closesocket(cs);
    continue;
   }
  }
  memset(qssOl,0,sizeof *qssOl);
  qssOl->client_s=cs;
  qssOl->client_addr=client_addr;
  qssOl->wsaBuf.len=MAX_BUF_SIZE,qssOl->wsaBuf.buf=qssOl->buf;//initialize WSABuf.

  ret=WSARecv(cs,&qssOl->wsaBuf,1,&qssOl->numberOfBytesTransferred,&qssOl->flags,&qssOl->overlapped,NULL);
  if(ret==0||(ret==SOCKET_ERROR&&WSAGetLastError()==WSA_IO_PENDING)){
   InterlockedIncrement(&qss->CSocketsCounter);//count the accepted socket
   if(qss->cslifecb)
    qss->cslifecb(cs,0);
   qssOl=NULL; /* the context now belongs to the completion workers */
   printf("accept flags:%d ,cs:%lu.\n",0,(unsigned long)cs);
  }else{
   printf("accept flags:%d ,cs:%lu.\n",WSAGetLastError(),(unsigned long)cs);
   closesocket(cs); /* the receive was never posted: nobody else will close it */
  }
 }//end while.

 free(qssOl);
 if(qss)
  shutdownSocketServer((SocketServer *)qss);
 if(curThread)
  CloseHandle(curThread);

 return 1;
}

/*
 * Posts another overlapped WSARecv on qssOl->client_s, reusing the buffer,
 * flags and OVERLAPPED stored in qssOl.
 *
 * qssOl             - per-connection context to re-arm.
 * SOErrOccurredCode - value to hand back if the post fails.
 *
 * Returns SOErrOccurredCode when WSARecv reports SOCKET_ERROR with a last
 * error other than WSA_IO_PENDING; returns 0 otherwise.
 *
 * The thread's last-error value is cleared before the call; if it is still 0
 * afterwards, the value that was current on entry is put back.
 */
static int postRecvCompletionPacket(QSSOverlapped * qssOl,int SOErrOccurredCode){
 const DWORD errOnEntry=GetLastError();
 int result=0;
 int recvRc;

 SetLastError(0);
 recvRc=WSARecv(qssOl->client_s,
                &qssOl->wsaBuf,1,
                &qssOl->numberOfBytesTransferred,
                &qssOl->flags,
                &qssOl->overlapped,
                NULL);
 /* SOCKET_ERROR is -1, WSA_IO_PENDING is 997; a pending receive is not a failure. */
 if(recvRc==SOCKET_ERROR){
  if(GetLastError()!=WSA_IO_PENDING){
   /* Last error here may be e.g. 64 or 10054. */
   result=SOErrOccurredCode;
  }
 }

 /* Nothing new was recorded: restore what the caller had. */
 if(GetLastError()==0){
  SetLastError(errOnEntry);
 }

 if(result!=0){
  printf("worker[%d] postRecvCompletionPacket SOErrOccurred=%d,preErr:%d,postedErr:%d\n",GetCurrentThreadId(),result,errOnEntry,GetLastError());
 }
 return result;
}

/*
 * Worker thread routine that drains the server's I/O completion port.
 *
 * ss - a heap-allocated QSSWORKER_PARAM carrying the owning QSocketServer
 *      (qss) and this thread's handle (th). The parameter block is freed here.
 *
 * Each loop iteration dequeues one completion packet:
 *   - key == -1 while lifecycleStatus >= 4: shutdown packet; it is re-posted
 *     for the next worker if more than one worker remains, then the loop ends
 *     (exitCode 1).
 *   - success with bytes > 0: the protocol handler is run, then a new receive
 *     is posted (SOErrOccurred 1 if that post fails, 2 if the handler
 *     returned a negative value).
 *   - success with 0 bytes: treated as a closed socket (SOErrOccurred 3).
 *   - failure with an overlapped context: a new receive is posted anyway and
 *     only a failure of that post counts as a socket error (SOErrOccurred 4).
 *   - failure without an overlapped context: any error other than
 *     WAIT_TIMEOUT ends the loop (exitCode 2); on timeout the worker retires
 *     itself when the pool is idle and above minThreads (exitCode 3).
 *
 * On a socket error the client socket is closed, its context freed and
 * CSocketsCounter decremented; when the counter reaches 0 and
 * lifecycleStatus >= 3 the status is set to 4, a shutdown packet is posted
 * and the loop ends (exitCode 4).
 *
 * The worker that brings workerCounter to 0 with lifecycleStatus >= 4 frees
 * the server's lock, completion port handle and the server object itself.
 *
 * Always returns 1.
 */
DWORD  completionWorkerRoutine(LPVOID ss){
 QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;
 QSocketServer * qss=pParam->qss;
 HANDLE curThread=pParam->th;
 QSSOverlapped * qssOl=NULL;
 DWORD numberOfBytesTransferred=0;
 ULONG_PTR completionKey=0;
 int handleCode=0,exitCode=0,SOErrOccurred=0;
 free(pParam);
 while(!exitCode){
  SetLastError(0);
  if(GetQueuedCompletionStatus(qss->iocpHandle,&numberOfBytesTransferred,&completionKey,(LPOVERLAPPED *)&qssOl,qss->workerWaitTimeout)){
   if(completionKey==(ULONG_PTR)-1&&qss->lifecycleStatus>=4)
   {
    printf("worker[%d] completionKey -1:%d \n",GetCurrentThreadId(),GetLastError());
    /* Hand the shutdown packet on so the remaining workers also wake up. */
    if(qss->workerCounter>1)
     PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
    exitCode=1;
    break;
   }
   if(numberOfBytesTransferred>0){

    InterlockedIncrement(&qss->currentBusyWorkers);
    addQSSWorker(qss,1);
    handleCode=qss->protoHandler((LPWSAOVERLAPPED)qssOl);
    InterlockedDecrement(&qss->currentBusyWorkers);

    if(handleCode>=0){
     SOErrOccurred=postRecvCompletionPacket(qssOl,1);
    }else
     SOErrOccurred=2;
   }else{
    /* completionKey is pointer-sized; narrow it explicitly to match %d. */
    printf("worker[%d] numberOfBytesTransferred==0 ***** closesocket servS or cs *****,%d,%d ,ol is:%d\n",GetCurrentThreadId(),GetLastError(),(int)completionKey,qssOl==NULL?0:1);
    SOErrOccurred=3;
   }
  }else{ /* GetQueuedCompletionStatus returned FALSE: last error e.g. 64, 995, or WAIT_TIMEOUT (258). */
   if(qssOl){
    /* A failed completion still carries our context: try to re-arm first, judge the socket afterwards. */
    SOErrOccurred=postRecvCompletionPacket(qssOl,4);
   }else {

    printf("worker[%d] GetQueuedCompletionStatus F:%d \n",GetCurrentThreadId(),GetLastError());
    if(GetLastError()!=WAIT_TIMEOUT){
     exitCode=2;
    }else{ /* wait timeout */
     /* Cheap unlocked check first, then re-check under the lock before retiring. */
     if(qss->lifecycleStatus!=4&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
      EnterCriticalSection(&qss->QSS_LOCK);
      if(qss->lifecycleStatus!=4&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
       qss->workerCounter--; /* shrink until workerCounter reaches minThreads */
       exitCode=3;
      }
      LeaveCriticalSection(&qss->QSS_LOCK);
     }
    }
   }
  } /* end GetQueuedCompletionStatus */

  /*
   * Tear down the connection only when there is a context to tear down.
   * A zero-byte packet dequeued with a NULL overlapped pointer has no
   * client socket behind it; dereferencing qssOl there would crash, and
   * decrementing CSocketsCounter would miscount.
   */
  if(SOErrOccurred&&qssOl){
   if(qss->cslifecb)
    qss->cslifecb(qssOl->client_s,-1);
   closesocket(qssOl->client_s);
   free(qssOl);
   if(InterlockedDecrement(&qss->CSocketsCounter)==0&&qss->lifecycleStatus>=3){
    /* Last client socket gone: post the -1 shutdown packet for the workers. */
    qss->lifecycleStatus=4;
    PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
    exitCode=4;
   }
  }
  /* Reset per-iteration state for the next pass. */
  qssOl=NULL,numberOfBytesTransferred=0,completionKey=0,SOErrOccurred=0;
 } /* end while */

 /* exitCode 3 already decremented workerCounter under the lock above. */
 if(exitCode!=3){
  int clearup=0;
  EnterCriticalSection(&qss->QSS_LOCK);
  if(!--qss->workerCounter&&qss->lifecycleStatus>=4){ /* last worker out cleans up the server */
    clearup=1;
  }
  LeaveCriticalSection(&qss->QSS_LOCK);
  if(clearup){
   DeleteCriticalSection(&qss->QSS_LOCK);
   CloseHandle(qss->iocpHandle);
   free(qss);
  }
 }
 CloseHandle(curThread);
 return 1;
}
------------------------------------------------------------------------------------------------------------------------
    对于IOCP的LastError的辨别和处理是个难点,所以请注意我的completionWorkerRoutine的while结构,
结构如下:
while(!exitCode){
    if(completionKey==-1){...break;}
    if(GetQueuedCompletionStatus){/*在这个if体中只要你投递的OVERLAPPED is not NULL,那么这里你得到的就是它.*/
        if(numberOfBytesTransferred>0){
               /*在这里handle request,记得要继续投递你的OVERLAPPED哦! */
        }else{
              /*榪欓噷鍙兘瀹㈡埛绔垨鏈嶅姟绔痗losesocket(the socket),浣嗘槸OVERLAPPED is not NULL,鍙浣犳姇閫掔殑涓嶄負NULL!*/
        }
    }else{/*在这里的if体中,虽然GetQueuedCompletionStatus return FALSE,但是不代表OVERLAPPED一定为NULL.特别是OVERLAPPED is not NULL的情况下,不要以为LastError发生了,就代表当前的socket无用或发生致命的异常,比如发生lastError:995这种情况下此时的socket有可能是一切正常的可用的,你不应该关闭它.*/
        if(OVERLAPPED is not NULL){
             /*榪欑鎯呭喌涓?璇蜂笉綆?7,21緇х畫鎶曢掑惂!鍦ㄦ姇閫掑悗鍐嶆嫻嬮敊璇?/strong>.*/
        }else{ 

        }
    }
  if(socket error occured){

  }
  prepare for next while.

    行文仓促,难免有错误或不足之处,希望大家踊跃指正评论,谢谢!

    这个模型在性能上还是有改进的空间哦!<br>



]]>
久久久久久久久久久| 久久播电影网| 精品一区二区久久久久久久网站| 久久99精品久久久久婷婷| 国产精品美女久久久免费| 偷偷做久久久久网站| 久久99国产综合精品女同| 久久天天躁狠狠躁夜夜av浪潮| 香蕉久久夜色精品升级完成| 久久国产精品99久久久久久老狼 | 97r久久精品国产99国产精| 国产国产成人久久精品| 亚洲AV日韩AV永久无码久久| 国内精品久久久久久麻豆| 久久国产乱子伦免费精品| 亚洲а∨天堂久久精品| 伊人色综合久久| 久久国产精品无码HDAV| 久久精品中文无码资源站| 久久久久久久国产免费看| 青青国产成人久久91网| 久久人人爽人人爽人人AV| 区久久AAA片69亚洲| 一本久久a久久精品综合夜夜| 久久人人爽爽爽人久久久| 久久久www免费人成精品| 日批日出水久久亚洲精品tv| 国产激情久久久久影院小草| 国产精品无码久久久久久| 亚洲色大成网站WWW久久九九| 久久久久久久免费视频| 亚洲精品无码久久久| 久久久久久A亚洲欧洲AV冫| 久久99精品国产麻豆蜜芽| 99久久www免费人成精品| 久久香蕉国产线看观看乱码| 国产精品久久网| 国产激情久久久久影院老熟女| 日韩精品久久久久久| 久久精品99无色码中文字幕| 日本国产精品久久|