/*
 * Tunables for the epoll-based socket server.
 * Every macro that expands to an expression is parenthesized so it can be
 * used safely inside a larger expression (e.g. `x / MACRO`).
 */
/* Keepalive settings applied to accepted sockets via TCP_KEEPIDLE / TCP_KEEPINTVL / TCP_KEEPCNT. */
#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60)
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL 5
#define QSS_SIO_KEEPALIVE_VALS_COUNT 3
#define MAX_THREADS 100
#define MAX_THREADS_MIN 10
#define MIN_WORKER_WAIT_TIMEOUT (20*1000)
#define MAX_WORKER_WAIT_TIMEOUT (60*MIN_WORKER_WAIT_TIMEOUT)
#define MAX_THREADPOOLS 32
/* Size of the per-connection receive buffer. */
#define MAX_BUF_SIZE 1024
/* `ulimit -n` caps the open FDs per process. Remember to raise it; otherwise epoll gives no benefit over select. */
/* Seconds a blocking send waits for the socket to become writable again. */
#define BLOCKING_SEND_TIMEOUT 20
/* Connection lifecycle notification for client socket cs. */
typedef void (*CSocketLifecycleCallback)(int cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose
/* Sender handed to protocol handlers: sends nbs bytes of buf on socket cs; senderBase is the opaque context that must be passed back unchanged. */
typedef int (*BlockingSender_t)(void * senderBase,int cs, void * buf, size_t nbs);
/* Protocol handler called by a worker for a ready client event; the worker treats a result <= 0 as "close this connection". */
typedef int (*InternalProtocolHandler)(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase);//return -1:SOCKET_ERROR
/*
 * State of one server instance. It is shared by the accept path, the epoll
 * worker threads and the blocking-send epoller thread; the counters and
 * lifecycleStatus are updated under g_spinlock.
 */
typedef struct {
WORD passive;
WORD port;//uint16_t
WORD minThreads;
WORD maxThreads;
pthread_spinlock_t g_spinlock;//PTHREAD_PROCESS_PRIVATE
volatile int lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitSignaled,5-stopped
int workerWaitTimeout;//wait timeout
volatile int workerCounter;// number of worker threads currently accounted for
volatile int currentBusyWorkers;// workers currently inside the protocol handler
volatile int CSocketsCounter;// accepted client sockets not yet closed
CSocketLifecycleCallback cslifecb;
InternalProtocolHandler protoHandler;
SOCKET server_s;
SOCKADDR_IN serv_addr;
int epollFD;//main epoller.
int BSendEpollFD;//For blocking send.
}QSocketServer;
/*
 * Per-connection context stored in epoll_event.data.ptr.
 * The listening socket also gets one of these, with client_s set to the server socket.
 */
typedef struct {
SOCKET client_s;
SOCKADDR_IN client_addr;
uint32_t curEvents;// epoll event mask the socket was registered with
char buf[MAX_BUF_SIZE];// receive buffer used by the protocol handler
DWORD numberOfBytesTransferred;
char * data;
int BSendEpollFDRelated;// non-zero once writableLock/writableMonitor are initialized and the socket was added to the blocking-send epoll fd
pthread_mutex_t writableLock;
pthread_cond_t writableMonitor;// signalled by the blocking-send epoller when the socket is writable
}QSSEPollEvent;//for per connection
/* Allocates and initializes a server object and stores it in *qss_ptr; a NULL protoHandler selects the built-in handler. */
int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout);
/* Starts a server that is still in the "created" state; returns 1 on success and 0 otherwise. */
int startSocketServer(QSocketServer *qss);
/* Initiates shutdown of a running server; returns 1 if shutdown was started and 0 otherwise. */
int shutdownSocketServer(QSocketServer *qss);
#endif
qsocketserver_model.c //下面的代码离生产环境还差内存池和logger哦！
#include "socketserver.h"
#include <dirent.h>
#include <regex.h>
#define DIGIT_PATTERN_STRING "^[0-9]+$"
void * epollWorkerRoutine(void *);
void * blockingSendEpollerRoutine(void *);
/*
 * Tests whether str consists solely of one or more ASCII decimal digits.
 *
 * Returns 1 when str matches, 0 when it does not (or when str is NULL),
 * and -1 when the pattern itself fails to compile.
 */
int isDigitStr(const char *str){
    int ret=-1;
    regex_t regex;

    if(str==NULL)
        return 0;
    /* Do not pass 0 as the cflags here, otherwise the pattern never matches.
     * REG_NOSUB is added because only match/no-match is needed. */
    if(!regcomp(&regex,DIGIT_PATTERN_STRING,REG_EXTENDED|REG_NOSUB)){
        ret=!regexec(&regex,str,0,NULL,0);
        regfree(&regex);
    }
    return ret;
}
/*
 * Puts descriptor sock into non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * Returns 1 on success. If either fcntl call fails, the failure is reported
 * with perror and -1 is returned.
 */
static int setNonBlocking(int sock)
{
    int flags = fcntl(sock, F_GETFL);

    if (flags == -1) {
        perror("fcntl(sock,GETFL) failed!\n");
        return flags;
    }

    flags = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
    if (flags == -1) {
        perror("fcntl(sock,SETFL,opts) failed!\n");
        return flags;
    }

    return 1;
}
/*
 * Placeholder for normalizing the worker limits of qss.
 * The body is currently empty, so the values passed to createSocketServer are used as-is.
 */
static void adjustQSSWorkerLimits(QSocketServer *qss){
//to adjust availabe size.
}
/* Heap-allocated start argument handed to a newly created thread; the thread routine frees it. */
typedef struct{
QSocketServer * qss;// server the thread works for
pthread_t th;// thread id written by pthread_create
}QSSWORKER_PARAM;
/*
 * Starts up to addCounter additional epoll worker threads for qss.
 *
 * Workers are added only while the pool is below minThreads, or while every
 * existing worker is busy and the pool is below maxThreads. The request is
 * all-or-nothing with respect to maxThreads: if adding addCounter workers
 * would exceed it, no worker is started.
 *
 * Returns the number of workers actually started.
 */
static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
    WORD res=0;

    /* Cheap unlocked pre-check; the maxThreads limit is re-checked under the lock. */
    if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads))
    {
        pthread_attr_t attr;
        int i;

        /* Nobody joins the workers, so create them detached to release
         * their resources when they return. */
        if(pthread_attr_init(&attr)!=0)
            return 0;
        pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);

        pthread_spin_lock(&qss->g_spinlock);
        if(qss->workerCounter+addCounter<=qss->maxThreads)
            for(i=0;i<addCounter;i++)
            {
                QSSWORKER_PARAM *pParam=malloc(sizeof *pParam);
                if(pParam==NULL)
                    continue;
                /* Must be set before the thread starts: the worker reads it immediately. */
                pParam->qss=qss;
                if(pthread_create(&pParam->th,&attr,epollWorkerRoutine,pParam)!=0){
                    /* The thread never ran, so the parameter is still ours to free. */
                    free(pParam);
                    continue;
                }
                /* From here on pParam belongs to the worker, which frees it. */
                qss->workerCounter++,res++;
            }
        pthread_spin_unlock(&qss->g_spinlock);
        pthread_attr_destroy(&attr);
    }
    return res;
}
/*
 * Reports msg together with the current errno text via perror.
 * When s is a positive descriptor it is closed afterwards; pass 0 to only log.
 */
static void SOlogger(const char * msg,SOCKET s){
    perror(msg);
    if(s<=0)
        return;
    close(s);
}
/*
 * Default protocol handler: an echo service.
 * Performs one recv into the connection buffer and, if bytes arrived, sends
 * them back through the supplied blocking sender.
 *
 * Returns the sender's result when data was received, otherwise the recv
 * result (0 or -1).
 */
static int _InternalProtocolHandler(struct epoll_event * event,BlockingSender_t _blockingSender,void * senderBase){
    QSSEPollEvent *conn=event->data.ptr;
    int rc;

    printf("_InternalProtocolHandler START pollRes==1,err:%d, ...cs:%d,,,,,th:%lu,\n",errno,conn->client_s,pthread_self());

    rc=recv(conn->client_s,conn->buf,MAX_BUF_SIZE,0);
    if(rc>0){
        /* Echo exactly the bytes that were just read. */
        rc=_blockingSender(senderBase,conn->client_s,conn->buf,rc);
    }

    printf("_InternalProtocolHandler END ret=%d,err:%d,%s, ...cs:%d,,,,,th:%lu,\n",rc,errno,strerror(errno),conn->client_s,pthread_self());
    return rc;
}
/*
 * Allocates a server object and initializes it in the "created" state (0).
 *
 * qss_ptr           receives the new object; set to NULL on failure.
 * passive, port     stored as given.
 * cslifecb          optional connection lifecycle callback (may be NULL).
 * protoHandler      protocol handler; NULL selects the built-in echo handler.
 * minThreads, maxThreads, workerWaitTimeout
 *                   worker pool settings, stored as given.
 *
 * Returns 1 on success, 0 if qss_ptr is NULL, allocation fails or the
 * spinlock cannot be initialized.
 */
int createSocketServer(QSocketServer ** qss_ptr,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,int workerWaitTimeout)
{
    QSocketServer *qss;

    if(qss_ptr==NULL)
        return 0;
    *qss_ptr=NULL;

    /* calloc so that fields not assigned below (addresses, descriptors) start zeroed. */
    qss=calloc(1,sizeof *qss);
    if(qss==NULL)
        return 0;
    if(pthread_spin_init(&qss->g_spinlock,PTHREAD_PROCESS_PRIVATE)!=0){
        free(qss);
        return 0;
    }

    qss->passive=passive;
    qss->port=port;
    qss->minThreads=minThreads;
    qss->maxThreads=maxThreads;
    qss->workerWaitTimeout=workerWaitTimeout;
    qss->lifecycleStatus=0;
    qss->workerCounter=0;
    qss->currentBusyWorkers=0;
    qss->CSocketsCounter=0;
    qss->cslifecb=cslifecb;
    qss->protoHandler=protoHandler?protoHandler:_InternalProtocolHandler;
    adjustQSSWorkerLimits(qss);

    *qss_ptr=qss;
    return 1;
}
/*
 * Starts a server that is in the "created" state (0).
 *
 * Creates the non-blocking listening socket bound to 127.0.0.1:port, the two
 * epoll instances, registers the listener for accept events, starts the
 * blocking-send epoller thread and the initial minThreads workers, then
 * moves the server to "running" (2).
 *
 * Returns 1 on success. Returns 0 if qss is NULL, the server is not in the
 * "created" state, or any setup step fails; on a failed setup everything
 * acquired so far is released and the state is reset to "created".
 */
int startSocketServer(QSocketServer *qss)
{
    QSSEPollEvent *listenEvent=NULL;
    QSSWORKER_PARAM *senderParam=NULL;
    struct epoll_event acceptEvent;
    pthread_attr_t attr;
    int claimed,rc;

    if(qss==NULL)
        return 0;

    /* Only one caller may take the server from "created" to "starting". */
    pthread_spin_lock(&qss->g_spinlock);
    claimed=(qss->lifecycleStatus==0);
    if(claimed)
        qss->lifecycleStatus=1;
    pthread_spin_unlock(&qss->g_spinlock);
    if(!claimed)
        return 0;

    /* Known "not acquired" values so the failure path can release selectively. */
    qss->server_s=INVALID_SOCKET;
    qss->epollFD=-1;
    qss->BSendEpollFD=-1;

    qss->serv_addr=(SOCKADDR_IN){0};
    qss->serv_addr.sin_family=AF_INET;
    qss->serv_addr.sin_port=htons(qss->port);
    inet_aton("127.0.0.1",&(qss->serv_addr.sin_addr));
    //qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");

    qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
    if(qss->server_s==INVALID_SOCKET)
    {
        SOlogger("socket failed.\n",0);
        goto fail;
    }
    if(setNonBlocking(qss->server_s)==-1)
    {
        SOlogger("setNonBlocking server_s failed.\n",0);
        goto fail;
    }
    if(bind(qss->server_s,(struct sockaddr *)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR)
    {
        SOlogger("bind failed.\n",0);
        goto fail;
    }
    /* SOMAXCONN also exists on Windows; here it is 128. A smaller backlog may be used; it affects overhead. */
    if(listen(qss->server_s,SOMAXCONN)==SOCKET_ERROR)
    {
        SOlogger("listen failed.\n",0);
        goto fail;
    }

    /* epoll_create1 rather than epoll_create(size): there is no size hint to guess. */
    qss->epollFD=epoll_create1(0);
    if(qss->epollFD==-1){
        SOlogger("epoll_create1 0, main epollFD failed.\n",0);
        goto fail;
    }
    qss->BSendEpollFD=epoll_create1(0);//for blocking send.
    if(qss->BSendEpollFD==-1){
        SOlogger("epoll_create1 0,BSendEpollFD failed.\n",0);
        goto fail;
    }

    /* Register the listening socket for accept events. */
    listenEvent=calloc(1,sizeof *listenEvent);
    if(listenEvent==NULL){
        SOlogger("malloc accept event failed.\n",0);
        goto fail;
    }
    listenEvent->client_s=qss->server_s;
    acceptEvent.events=listenEvent->curEvents=EPOLLIN|EPOLLET;
    acceptEvent.data.ptr=listenEvent;
    if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,qss->server_s,&acceptEvent)==-1){
        SOlogger("epoll_ctl server_s to accept failed.\n",0);
        goto fail;
    }

    /* Start the blocking-send epoller. It is never joined, so create it detached. */
    senderParam=malloc(sizeof *senderParam);
    if(senderParam==NULL){
        SOlogger("malloc blocking send epoller param failed.\n",0);
        goto fail;
    }
    senderParam->qss=qss;
    if(pthread_attr_init(&attr)!=0){
        fprintf(stderr,"pthread_attr_init for blocking send epoller failed.\n");
        goto fail;
    }
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
    rc=pthread_create(&senderParam->th,&attr,blockingSendEpollerRoutine,senderParam);
    pthread_attr_destroy(&attr);
    if(rc!=0){
        fprintf(stderr,"pthread_create blocking send epoller failed:%d\n",rc);
        goto fail;
    }
    /* Ownership moved: the thread frees senderParam, the epoll registration keeps listenEvent. */
    senderParam=NULL;
    listenEvent=NULL;

    //initialize worker for epoll events.
    addQSSWorker(qss,qss->minThreads);
    pthread_spin_lock(&qss->g_spinlock);
    qss->lifecycleStatus=2;
    pthread_spin_unlock(&qss->g_spinlock);
    return 1;

fail:
    /* No worker or epoller thread exists on this path, so plain teardown is safe. */
    if(qss->BSendEpollFD!=-1)
        close(qss->BSendEpollFD);
    if(qss->epollFD!=-1)
        close(qss->epollFD);
    if(qss->server_s!=INVALID_SOCKET)
        close(qss->server_s);
    qss->BSendEpollFD=-1;
    qss->epollFD=-1;
    qss->server_s=INVALID_SOCKET;
    free(senderParam);
    free(listenEvent);
    pthread_spin_lock(&qss->g_spinlock);
    qss->lifecycleStatus=0;
    pthread_spin_unlock(&qss->g_spinlock);
    return 0;
}
/*
 * Initiates shutdown of a running server.
 *
 * Moves the server from "running" (2) to "stopping" (3), or straight to
 * "exitSignaled" (4) when no client socket is open, because in that case no
 * connection-close path would ever signal the workers to exit. It then shuts
 * down the listening socket and every socket descriptor listed under
 * /proc/<pid>/fd, so the epoll workers observe the end of each connection and
 * close it.
 *
 * NOTE: this shuts down ALL sockets of the process, not only those owned by
 * this server.
 *
 * The server object is released by the last exiting worker; callers must not
 * use qss after this function returns 1.
 *
 * Returns 1 if shutdown was started, 0 if qss is NULL or the server is not running.
 */
int shutdownSocketServer(QSocketServer *qss){
    char dirBuf[64];
    struct dirent *de;
    DIR *pd;
    SOCKET listener=INVALID_SOCKET;
    int sockFD,wasRunning;

    if(qss==NULL)
        return 0;

    pthread_spin_lock(&qss->g_spinlock);
    wasRunning=(qss->lifecycleStatus==2);
    if(wasRunning){
        /* Copy what is needed while the object is certainly alive; qss is not touched after unlocking. */
        listener=qss->server_s;
        qss->lifecycleStatus=(qss->CSocketsCounter==0)?4:3;
    }
    pthread_spin_unlock(&qss->g_spinlock);
    if(!wasRunning)
        return 0;

    /* Graceful sequence: shutdown --notify--> epoll --> close. Note that shutdown makes the peer see EOF. */
    shutdown(listener,SHUT_RDWR);

    /* Walk /proc/<pid>/fd and shut down every descriptor that is a socket. */
    snprintf(dirBuf,sizeof dirBuf,"/proc/%d/fd/",(int)getpid());
    pd=opendir(dirBuf);
    if(pd!=NULL){
        while((de=readdir(pd))!=NULL){
            /* isDigitStr may return -1 on an internal error; only an exact match counts. */
            if(isDigitStr(de->d_name)!=1)
                continue;
            sockFD=atoi(de->d_name);
            /* Equivalent to fstat(fd,&st) followed by S_ISSOCK(st.st_mode). */
            if(isfdtype(sockFD,S_IFSOCK))
                shutdown(sockFD,SHUT_RDWR);
        }
        closedir(pd);
    }
    return 1;
}
static int onAcceptRoutine(QSocketServer * qss)
{
SOCKADDR_IN client_addr;
unsigned int client_addr_leng=sizeof(SOCKADDR_IN);
SOCKET cs;
struct epoll_event _epEvent;
QSSEPollEvent *qssEPEvent=NULL;
cs=accept(qss->server_s,(struct sockaddr *)&client_addr,&client_addr_leng);
if(cs==INVALID_SOCKET)
{
printf("onAccept failed:%d,%s\n",errno,strerror(errno));
epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qss->server_s,NULL);//EINVAL 22 Invalid argument
return 0;
}
if(setNonBlocking(cs)==-1)
{
printf("onAccept setNonBlocking client_s failed.cs:%d\n",cs);
return 0;
}
{// set keepalive option
int keepAlive = 1;
int keepIdle = QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
int keepInterval = QSS_SIO_KEEPALIVE_VALS_INTERVAL;
int keepCount = QSS_SIO_KEEPALIVE_VALS_COUNT;
if(setsockopt(cs, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive))||
setsockopt(cs, SOL_TCP, TCP_KEEPIDLE, (void *)&keepIdle, sizeof(keepIdle))||
setsockopt(cs, SOL_TCP, TCP_KEEPINTVL, (void *)&keepInterval, sizeof(keepInterval))||
setsockopt(cs, SOL_TCP, TCP_KEEPCNT, (void *)&keepCount, sizeof(keepCount)))
{
printf("onAccept set keepalive option client_s failed.cs:%d,err:%s\n",cs,strerror(errno));
return 0;
}
}
qssEPEvent=malloc(sizeof(QSSEPollEvent));
qssEPEvent->client_s=cs;
{
_epEvent.events=qssEPEvent->curEvents=EPOLLIN|EPOLLET|EPOLLONESHOT;
qssEPEvent->BSendEpollFDRelated=0;
_epEvent.data.ptr=qssEPEvent;/*榪欓噷鍙堝拰鏁欑鐨勪笉涓鏍峰摝錛岀湡姝g殑user data鐢╬tr,鑰屼笉鏄崟涓鐨刦d*/
if(epoll_ctl(qss->epollFD,EPOLL_CTL_ADD,cs,&_epEvent)==-1){
printf("onAccept epoll_ctl client_s failed.cs:%d,err:%d\n",cs,errno);
free(qssEPEvent);
return 0;
}else{
pthread_spin_lock(&qss->g_spinlock);
qss->CSocketsCounter++;
pthread_spin_unlock(&qss->g_spinlock);
if(qss->cslifecb)
qss->cslifecb(cs,0);
}
}
printf("onAccepted flags:err:%d ,cs:%d.\n",errno,cs);
return 1;
}
/* Context passed as senderBase to internalBlockingSender: the server and the connection being served. */
typedef struct{
QSocketServer * qss;
QSSEPollEvent * event;
}InternalSenderBase_t;
static int internalBlockingSender(void * senderBase,int cs, void * _buf, size_t nbs){
InternalSenderBase_t *sb=(InternalSenderBase_t *)senderBase;
char * _sbuf=_buf;
int ret=0,sum=0,curEpoll_ctl_opt,*errno_ptr=&errno;
QSSEPollEvent *qssEPEvent=NULL;
struct epoll_event _epEvent;
struct timespec sendTimeo;
while(1){
*errno_ptr=0;
while(sum<nbs&&(ret=send(cs,_sbuf,nbs-sum,0))>0)
sum+=ret,_sbuf+=ret;
if(sum==nbs||ret==0)
break;
else if(ret==-1){
if(errno==EAGAIN&&sum<nbs){
qssEPEvent=sb->event;
_epEvent.data.ptr=qssEPEvent;
_epEvent.events=EPOLLOUT|EPOLLET|EPOLLONESHOT;
if(qssEPEvent->BSendEpollFDRelated==0){
pthread_mutex_init(&qssEPEvent->writableLock,NULL);
pthread_cond_init(&qssEPEvent->writableMonitor,NULL);
qssEPEvent->BSendEpollFDRelated=1;
curEpoll_ctl_opt=EPOLL_CTL_ADD;
}else{
curEpoll_ctl_opt=EPOLL_CTL_MOD;
}
{//wait writable.
int flag=0;
pthread_mutex_lock(&qssEPEvent->writableLock);
if(epoll_ctl(sb->qss->BSendEpollFD,curEpoll_ctl_opt,qssEPEvent->client_s,&_epEvent)==0){
sendTimeo.tv_nsec=0,sendTimeo.tv_sec=time(NULL)+BLOCKING_SEND_TIMEOUT;
int err=pthread_cond_timedwait(&qssEPEvent->writableMonitor,&qssEPEvent->writableLock,&sendTimeo);
if(err)
flag=-1;
}else
flag=-1;
pthread_mutex_unlock(&qssEPEvent->writableLock);
if(flag==-1)
break;
}
}else{
if(errno==EAGAIN&&sum==nbs)
ret=nbs;//it is ok;
break;
}
}
}//end while.
return ret;
}
/*
 * Thread routine of the blocking-send epoller.
 *
 * Waits on the server's blocking-send epoll instance and wakes the worker
 * parked in internalBlockingSender for each reported connection. Error and
 * hang-up events wake the worker as well, so its next send fails immediately
 * instead of waiting for the send timeout.
 *
 * _param is a heap-allocated QSSWORKER_PARAM that this routine frees.
 * The loop ends when epoll_wait fails with anything other than EINTR.
 * Always returns NULL.
 */
void * blockingSendEpollerRoutine(void *_param){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
    QSocketServer * qss=pParam->qss;
    /* Cached up front: the last worker frees qss during shutdown while this thread may still be running. */
    const int sendEpollFD=qss->BSendEpollFD;
    /* epoll_wait rejects maxevents <= 0 and a zero-length VLA is undefined, so use at least 1. */
    const int capacity=qss->maxThreads>0?qss->maxThreads:1;
    struct epoll_event epEvents[capacity];
    QSSEPollEvent *qssEPEvent=NULL;
    int pollRes,i;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
    free(pParam);
    for(;;){
        pollRes=epoll_wait(sendEpollFD,epEvents,capacity,-1);
        if(pollRes==-1){
            if(errno==EINTR)
                continue;/* interrupted by a signal: not a failure */
            printf("blockingSendEpollerRoutine pollRes==-1,err:%d, errno...%s\n",errno,strerror(errno));
            break;
        }
        /* This epoll fd should do nothing but the wake-up below; keep it minimal so it stays fast. */
        for(i=0;i<pollRes;i++){
            if(!(epEvents[i].events&(EPOLLOUT|EPOLLERR|EPOLLHUP)))
                continue;
            qssEPEvent=epEvents[i].data.ptr;
            pthread_mutex_lock(&qssEPEvent->writableLock);
            pthread_cond_signal(&qssEPEvent->writableMonitor);
            pthread_mutex_unlock(&qssEPEvent->writableLock);
        }
    }
    return NULL;
}
void * epollWorkerRoutine(void * _param){
QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)_param;
QSocketServer * qss=pParam->qss;
pthread_t * curThread=&pParam->th;
struct epoll_event _epEvent;
QSSEPollEvent *qssEPEvent=NULL;
InternalSenderBase_t _senderBase;
int pollRes=0,handleCode=0,exitCode=0,SOErrOccurred=0,*errno_ptr=&errno;
_senderBase.qss=qss;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);
free(pParam);
while(!exitCode){
*errno_ptr=0,SOErrOccurred=0,qssEPEvent=NULL;
pollRes=epoll_wait(qss->epollFD,&_epEvent,1,qss->workerWaitTimeout);
if(pollRes==1){
qssEPEvent=(QSSEPollEvent *)_epEvent.data.ptr;
if(qssEPEvent->client_s==qss->server_s)
{//Accepted Socket.
onAcceptRoutine(qss);
continue;
}else{
if(qss->protoHandler){
_senderBase.event=_epEvent.data.ptr;
pthread_spin_lock(&qss->g_spinlock);
qss->currentBusyWorkers++;
pthread_spin_unlock(&qss->g_spinlock);
addQSSWorker(qss,1);
handleCode=qss->protoHandler(&_epEvent,internalBlockingSender,&_senderBase);
pthread_spin_lock(&qss->g_spinlock);
qss->currentBusyWorkers--;
pthread_spin_unlock(&qss->g_spinlock);
if(handleCode>0){
_epEvent.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
if(epoll_ctl(qss->epollFD,EPOLL_CTL_MOD,qssEPEvent->client_s,&_epEvent)==-1)
SOErrOccurred=2;
}else{
SOErrOccurred=1;//maybe socket closed 0. Or -1 socket error.
}
}
}
}else if(pollRes==0){//timeout
printf("pollRes==0,err:%d, timeout...th:%lu\n",*errno_ptr,*curThread);
if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads)
{
pthread_spin_lock(&qss->g_spinlock);
if(qss->lifecycleStatus<=3&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
exitCode=2;
}
pthread_spin_unlock(&qss->g_spinlock);
}else if(qss->lifecycleStatus>=4)
exitCode=4;
}else if(pollRes==-1){//errno
printf("pollRes==-1,err:%d, errno...%s\n",*errno_ptr,strerror(*errno_ptr));
exitCode=1;
}
if(SOErrOccurred){
if(qss->cslifecb)
qss->cslifecb(qssEPEvent->client_s,-1);
/*if(qssEPEvent)*/{
epoll_ctl(qss->epollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
epoll_ctl(qss->BSendEpollFD,EPOLL_CTL_DEL,qssEPEvent->client_s,NULL);
close(qssEPEvent->client_s);
if(qssEPEvent->BSendEpollFDRelated){
pthread_cond_destroy(&qssEPEvent->writableMonitor);
pthread_mutex_destroy(&qssEPEvent->writableLock);
}
free(qssEPEvent);
}
pthread_spin_lock(&qss->g_spinlock);
if(--qss->CSocketsCounter==0&&qss->lifecycleStatus>=3){
//for qss workerSize,
qss->lifecycleStatus=4;
exitCode=3;
}
pthread_spin_unlock(&qss->g_spinlock);
}//SOErrOccurred handle;
}//end main while.
if(exitCode!=2){
int clearup=0;
pthread_spin_lock(&qss->g_spinlock);
if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
clearup=1;
}
pthread_spin_unlock(&qss->g_spinlock);
if(clearup){
close(qss->epollFD);
close(qss->BSendEpollFD);
pthread_spin_destroy(&qss->g_spinlock);
free(qss);
}
}//exitCode handle;
return NULL;
}
}QSSOverlapped;//for per connection
我下面的server框架的基本思想是:
One connection VS one thread in worker thread pool ,worker thread performs completionWorkerRoutine.
A Acceptor thread 专门用来accept socket,关联至IOCP,并WSARecv:post Recv Completion Packet to IOCP.
在completionWorkerRoutine中有以下的职责:
1.handle request,当忙时增加completionWorkerThread数量但不超过maxThreads,post Recv Completion Packet to IOCP.
2.timeout时检查是否空闲和当前completionWorkerThread数量,当空闲时保持或减少至minThreads数量.
3.对所有Accepted-socket管理生命周期,这里利用系统的keepalive probes,若想实现业务层"心跳探测"只需将QSS_SIO_KEEPALIVE_VALS_TIMEOUT 改回系统默认的2小时.
下面结合源代码,浅析一下IOCP:
socketserver.h
#ifndef __Q_SOCKET_SERVER__
#define __Q_SOCKET_SERVER__
#include <winsock2.h>
#include <mstcpip.h>
/*
 * Tunables for the IOCP-based socket server.
 * Every macro that expands to an expression is parenthesized so it can be
 * used safely inside a larger expression (e.g. `x / MACRO`).
 */
/* Values assigned to tcp_keepalive.keepalivetime / keepaliveinterval for SIO_KEEPALIVE_VALS. */
#define QSS_SIO_KEEPALIVE_VALS_TIMEOUT (30*60*1000)
#define QSS_SIO_KEEPALIVE_VALS_INTERVAL (5*1000)
#define MAX_THREADS 100
#define MAX_THREADS_MIN 10
#define MIN_WORKER_WAIT_TIMEOUT (20*1000)
#define MAX_WORKER_WAIT_TIMEOUT (60*MIN_WORKER_WAIT_TIMEOUT)
/* Size of the per-connection receive buffer. */
#define MAX_BUF_SIZE 1024
/* CSocketLifecycleCallback is invoked when a socket has been accepted and when a socket is closed or hits an error. */
typedef void (*CSocketLifecycleCallback)(SOCKET cs,int lifecycle);//lifecycle:0:OnAccepted,-1:OnClose. Note: at OnClose the socket is not necessarily usable; it may already have been closed abnormally or be in some other error state.
/* Protocol handling callback. */
typedef int (*InternalProtocolHandler)(LPWSAOVERLAPPED overlapped);//return -1:SOCKET_ERROR
/* Opaque server handle exposed to callers. */
typedef struct Q_SOCKET_SERVER SocketServer;
/* Allocates a server object and stores it in *ssp; a NULL protoHandler selects the built-in echo handler. */
DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout);
/* Starts a server that is still in the "created" state; returns 0 on failure. */
DWORD startSocketServer(SocketServer *ss);
/* Initiates shutdown of a running server; returns 1 if shutdown was started and 0 otherwise. */
DWORD shutdownSocketServer(SocketServer *ss);
#endif
qsocketserver.c 简称 qss,相应的OVERLAPPED简称qssOl.
#include "socketserver.h"
#include "stdio.h"
/*
 * State of one IOCP server instance, shared by the acceptor and the
 * completion worker threads.
 */
typedef struct {
WORD passive;//daemon
WORD port;
WORD minThreads;
WORD maxThreads;
volatile long lifecycleStatus;//0-created,1-starting, 2-running,3-stopping,4-exitKeyPosted,5-stopped
long workerWaitTimeout;//wait timeout
CRITICAL_SECTION QSS_LOCK;
volatile long workerCounter;
volatile long currentBusyWorkers;
volatile long CSocketsCounter;//reference count of accepted sockets
CSocketLifecycleCallback cslifecb;
InternalProtocolHandler protoHandler;
WORD wsaVersion;//=MAKEWORD(2,0);
WSADATA wsData;
SOCKET server_s;
SOCKADDR_IN serv_addr;
HANDLE iocpHandle;
}QSocketServer;
/*
 * Per-connection context. The WSAOVERLAPPED member must stay first: the
 * code casts between LPWSAOVERLAPPED / LPOVERLAPPED and QSSOverlapped *.
 */
typedef struct {
WSAOVERLAPPED overlapped;
SOCKET client_s;
SOCKADDR_IN client_addr;
WORD optCode;
char buf[MAX_BUF_SIZE];// receive buffer that wsaBuf points at
WSABUF wsaBuf;
DWORD numberOfBytesTransferred;
DWORD flags;
}QSSOverlapped;
DWORD acceptorRoutine(LPVOID);
DWORD completionWorkerRoutine(LPVOID);
/*
 * Placeholder for normalizing the worker limits and wait timeout of qss.
 * The whole body is commented out, so this function currently does nothing
 * and the values passed to initializeSocketServer are used as-is.
 */
static void adjustQSSWorkerLimits(QSocketServer *qss){
/*adjust size and timeout.*/
/*if(qss->maxThreads <= 0) {
qss->maxThreads = MAX_THREADS;
} else if (qss->maxThreads < MAX_THREADS_MIN) {
qss->maxThreads = MAX_THREADS_MIN;
}
if(qss->minThreads > qss->maxThreads) {
qss->minThreads = qss->maxThreads;
}
if(qss->minThreads <= 0) {
if(1 == qss->maxThreads) {
qss->minThreads = 1;
} else {
qss->minThreads = qss->maxThreads/2;
}
}
if(qss->workerWaitTimeout<MIN_WORKER_WAIT_TIMEOUT)
qss->workerWaitTimeout=MIN_WORKER_WAIT_TIMEOUT;
if(qss->workerWaitTimeout>MAX_WORKER_WAIT_TIMEOUT)
qss->workerWaitTimeout=MAX_WORKER_WAIT_TIMEOUT; */
}
/* Heap-allocated start argument handed to a newly created thread; the thread routine frees it. */
typedef struct{
QSocketServer * qss;// server the thread works for
HANDLE th;// handle of the thread itself; the thread routine closes it on exit
}QSSWORKER_PARAM;
/*
 * Starts up to addCounter additional completion worker threads for qss.
 *
 * Workers are added only while the pool is below minThreads, or while every
 * existing worker is busy and the pool is below maxThreads. If adding
 * addCounter workers would exceed maxThreads, none is started.
 *
 * Returns the number of workers actually started.
 */
static WORD addQSSWorker(QSocketServer *qss,WORD addCounter){
    WORD res=0;

    /* Cheap unlocked pre-check; the maxThreads limit is re-checked under the lock. */
    if(qss->workerCounter<qss->minThreads||(qss->currentBusyWorkers==qss->workerCounter&&qss->workerCounter<qss->maxThreads)){
        DWORD threadId;
        int i;

        EnterCriticalSection(&qss->QSS_LOCK);
        if(qss->workerCounter+addCounter<=qss->maxThreads)
            for(i=0;i<addCounter;i++)
            {
                QSSWORKER_PARAM *pParam=malloc(sizeof *pParam);
                if(pParam==NULL)
                    continue;
                pParam->qss=qss;
                /* Created suspended so that pParam->th is filled in before the worker reads it. */
                pParam->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)completionWorkerRoutine,pParam,CREATE_SUSPENDED,&threadId);
                if(pParam->th==NULL){
                    /* The thread never existed, so the parameter is still ours to free. */
                    free(pParam);
                    continue;
                }
                qss->workerCounter++,res++;
                /* After this call pParam belongs to the worker, which frees it. */
                ResumeThread(pParam->th);
            }
        LeaveCriticalSection(&qss->QSS_LOCK);
    }
    return res;
}
/*
 * Reports msg via perror, then optionally releases resources:
 * closes socket s when it is greater than 0, and calls WSACleanup when
 * clearup is non-zero.
 */
static void SOlogger(const char * msg,SOCKET s,int clearup){
    const int mustClose=(s>0);

    perror(msg);
    if(mustClose){
        closesocket(s);
    }
    if(clearup!=0){
        WSACleanup();
    }
}
/*
 * Default protocol handler: an echo service.
 * Logs the received bytes and sends them back on the same socket.
 *
 * overlapped must be the WSAOVERLAPPED embedded at the start of a
 * QSSOverlapped whose numberOfBytesTransferred holds the received length.
 *
 * Returns the result of send (bytes sent, or SOCKET_ERROR).
 */
static int _InternalEchoProtocolHandler(LPWSAOVERLAPPED overlapped){
    QSSOverlapped *qssOl=(QSSOverlapped *)overlapped;
    int received=(int)qssOl->numberOfBytesTransferred;

    /* Never read past the buffer, whatever length was recorded. */
    if(received>MAX_BUF_SIZE)
        received=MAX_BUF_SIZE;
    /* buf is raw socket data without a terminating NUL, so bound the print with an explicit precision. */
    printf("numOfT:%d,WSARecvd:%.*s,\n",received,received,qssOl->buf);
    //Sleep(500);
    return send(qssOl->client_s,qssOl->buf,received,0);
}
/*
 * Allocates a server object and initializes it in the "created" state (0).
 *
 * ssp               receives the new object; set to NULL on failure.
 * passive           any value > 0 is stored as 1, otherwise 0.
 * port              stored as given.
 * cslifecb          optional connection lifecycle callback (may be NULL).
 * protoHandler      protocol handler; NULL selects the built-in echo handler.
 * minThreads, maxThreads, workerWaitTimeout
 *                   worker pool settings, stored as given.
 *
 * Returns 1 on success, 0 if ssp is NULL or allocation fails.
 */
DWORD initializeSocketServer(SocketServer ** ssp,WORD passive,WORD port,CSocketLifecycleCallback cslifecb,InternalProtocolHandler protoHandler,WORD minThreads,WORD maxThreads,long workerWaitTimeout){
    QSocketServer *qss;

    if(ssp==NULL)
        return 0;
    *ssp=NULL;

    /* calloc so that fields not assigned below (addresses, handles) start zeroed. */
    qss=calloc(1,sizeof *qss);
    if(qss==NULL)
        return 0;

    qss->passive=passive>0?1:0;
    qss->port=port;
    qss->minThreads=minThreads;
    qss->maxThreads=maxThreads;
    qss->workerWaitTimeout=workerWaitTimeout;
    qss->wsaVersion=MAKEWORD(2,0);
    qss->lifecycleStatus=0;
    InitializeCriticalSection(&qss->QSS_LOCK);
    qss->workerCounter=0;
    qss->currentBusyWorkers=0;
    qss->CSocketsCounter=0;
    qss->cslifecb=cslifecb;
    qss->protoHandler=protoHandler?protoHandler:_InternalEchoProtocolHandler;
    adjustQSSWorkerLimits(qss);

    *ssp=(SocketServer *)qss;
    return 1;
}
/*
 * Starts a server that is in the "created" state (0).
 *
 * Initializes Winsock, creates the listening socket bound to INADDR_ANY:port,
 * creates the completion port, starts the initial minThreads workers and
 * moves the server to "running" (2). The accept loop then runs either in a
 * new thread (passive != 0) or in the calling thread (passive == 0); in the
 * latter case this function returns only when the accept loop ends.
 *
 * Returns 0 if ss is NULL, the server is not in the "created" state, or a
 * setup step fails. A setup failure before any thread is started resets the
 * state to "created". In passive mode returns 1 on success; otherwise
 * returns the result of acceptorRoutine.
 */
DWORD startSocketServer(SocketServer *ss){
    QSocketServer * qss=(QSocketServer *)ss;
    QSSWORKER_PARAM *pParam;
    DWORD threadId;

    if(qss==NULL||InterlockedCompareExchange(&qss->lifecycleStatus,1,0))
        return 0;

    memset(&qss->serv_addr,0,sizeof(qss->serv_addr));
    qss->serv_addr.sin_family=AF_INET;
    qss->serv_addr.sin_port=htons(qss->port);
    qss->serv_addr.sin_addr.s_addr=INADDR_ANY;//inet_addr("127.0.0.1");
    if(WSAStartup(qss->wsaVersion,&qss->wsData)){
        /* Side note: calling WSAStartup spawns an extra thread which exits by itself shortly afterwards; it is unclear how WSACleanup behaves in that respect. */
        SOlogger("WSAStartup failed.\n",0,0);
        goto fail;
    }
    qss->server_s=socket(AF_INET,SOCK_STREAM,IPPROTO_IP);
    if(qss->server_s==INVALID_SOCKET){
        SOlogger("socket failed.\n",0,1);
        goto fail;
    }
    if(bind(qss->server_s,(LPSOCKADDR)&qss->serv_addr,sizeof(SOCKADDR_IN))==SOCKET_ERROR){
        SOlogger("bind failed.\n",qss->server_s,1);
        goto fail;
    }
    /*
     * About the backlog: people use values such as 1, 5, 50 or 100, and a
     * larger value is said to cost more resources. Passing SOMAXCONN does not
     * mean Windows really uses SOMAXCONN: "If set to SOMAXCONN, the
     * underlying service provider responsible for socket s will set the
     * backlog to a maximum reasonable value." Operating systems differ in
     * the TCP queue sizes they support, so it is better to let the system
     * decide. Servers such as Apache default to
     * DEFAULT_LISTENBACKLOG 511.
     */
    if(listen(qss->server_s,SOMAXCONN)==SOCKET_ERROR)
    {
        SOlogger("listen failed.\n",qss->server_s,1);
        goto fail;
    }
    qss->iocpHandle=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,/*NumberOfConcurrentThreads-->*/qss->maxThreads);
    if(qss->iocpHandle==NULL){
        SOlogger("CreateIoCompletionPort failed.\n",qss->server_s,1);
        goto fail;
    }
    /* Allocate the acceptor argument before any thread exists, so this failure can still be unwound simply. */
    pParam=malloc(sizeof *pParam);
    if(pParam==NULL){
        CloseHandle(qss->iocpHandle);
        qss->iocpHandle=NULL;
        SOlogger("malloc acceptor param failed.\n",qss->server_s,1);
        goto fail;
    }
    pParam->qss=qss;
    pParam->th=NULL;

    //initialize worker for completion routine.
    addQSSWorker(qss,qss->minThreads);
    qss->lifecycleStatus=2;

    if(!qss->passive)
        return acceptorRoutine(pParam);

    /* Created suspended so that pParam->th is filled in before the acceptor reads it. */
    pParam->th=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)acceptorRoutine,pParam,CREATE_SUSPENDED,&threadId);
    if(pParam->th==NULL){
        /* Workers are already running: stop them through the regular shutdown path. */
        free(pParam);
        shutdownSocketServer(ss);
        return 0;
    }
    ResumeThread(pParam->th);
    return 1;

fail:
    /* No thread has been started on this path; allow a later retry. */
    InterlockedExchange(&qss->lifecycleStatus,0);
    return 0;
}
/*
 * Initiates shutdown of a running server.
 *
 * Atomically moves the server from "running" (2) to "stopping" (3), closes
 * the listening socket and, if no accepted socket is outstanding, marks the
 * server "exitKeyPosted" (4) and posts the exit packet (completion key -1)
 * to the completion port. Finally calls WSACleanup.
 *
 * Returns 1 if shutdown was started, 0 if ss is NULL or the server was not running.
 */
DWORD shutdownSocketServer(SocketServer *ss){
    QSocketServer * qss=(QSocketServer *)ss;

    if(qss==NULL)
        return 0;
    if(InterlockedCompareExchange(&qss->lifecycleStatus,3,2)!=2)
        return 0;

    /* Closing the listen socket does not close the sockets accepted from it; only WSACleanup does. */
    closesocket(qss->server_s);
    if(qss->CSocketsCounter==0){
        qss->lifecycleStatus=4;
        PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
    }
    WSACleanup();
    return 1;
}
/*
 * Accept loop of the IOCP server.
 *
 * For every accepted socket: enables keepalive probing, associates the socket
 * with the completion port (completion key = the socket), and posts the first
 * overlapped WSARecv. Once that receive is posted the per-connection context
 * belongs to the completion workers.
 *
 * The loop ends when accept fails or keepalive cannot be configured; the
 * routine then requests server shutdown.
 *
 * ss is a heap-allocated QSSWORKER_PARAM that this routine frees; its thread
 * handle (if any) is closed on exit. Always returns 1.
 */
DWORD acceptorRoutine(LPVOID ss){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;
    QSocketServer * qss=pParam->qss;
    HANDLE curThread=pParam->th;
    QSSOverlapped *qssOl=NULL;
    SOCKADDR_IN client_addr;
    int client_addr_leng;
    SOCKET cs;

    free(pParam);
    while(1){
        struct tcp_keepalive alive,aliveOut;
        int so_keepalive_opt=1;
        DWORD outDW;

        printf("accept starting.....\n");
        /* addrlen is an in/out argument, so reset it for every call. */
        client_addr_leng=sizeof(SOCKADDR_IN);
        cs/*Accepted-socket*/=accept(qss->server_s,(LPSOCKADDR)&client_addr,&client_addr_leng);
        if(cs==INVALID_SOCKET)
        {
            printf("accept failed:%d\n",(int)GetLastError());
            break;
        }

        /* SO_KEEPALIVE + SIO_KEEPALIVE_VALS: rely on the system's keepalive probes as the "heartbeat".
         * Linux equivalent: setsockopt with SOL_TCP: TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT. */
        if(setsockopt(cs,SOL_SOCKET,SO_KEEPALIVE,(char *)&so_keepalive_opt,sizeof(so_keepalive_opt))){
            printf("setsockopt SO_KEEPALIVE failed:%d\n",(int)GetLastError());
            closesocket(cs);
            break;
        }
        alive.onoff=TRUE;
        alive.keepalivetime=QSS_SIO_KEEPALIVE_VALS_TIMEOUT;
        alive.keepaliveinterval=QSS_SIO_KEEPALIVE_VALS_INTERVAL;
        if(WSAIoctl(cs,SIO_KEEPALIVE_VALS,&alive,sizeof(alive),&aliveOut,sizeof(aliveOut),&outDW,NULL,NULL)==SOCKET_ERROR){
            printf("WSAIoctl SIO_KEEPALIVE_VALS failed:%d\n",(int)GetLastError());
            closesocket(cs);
            break;
        }

        if(CreateIoCompletionPort((HANDLE)cs,qss->iocpHandle,cs,0)==NULL){
            printf("CreateIoCompletionPort for cs failed:%d\n",(int)GetLastError());
            closesocket(cs);
            continue;
        }
        /* A context left over from a previously failed WSARecv is reused. */
        if(qssOl==NULL){
            qssOl=malloc(sizeof(QSSOverlapped));
            if(qssOl==NULL){
                printf("malloc QSSOverlapped failed,cs:%d.\n",(int)cs);
                closesocket(cs);
                continue;
            }
        }
        qssOl->client_s=cs;
        qssOl->client_addr=client_addr;
        qssOl->wsaBuf.len=MAX_BUF_SIZE,qssOl->wsaBuf.buf=qssOl->buf,qssOl->numberOfBytesTransferred=0,qssOl->flags=0;//initialize WSABuf.
        memset(&qssOl->overlapped,0,sizeof(WSAOVERLAPPED));
        {
            DWORD lastErr=GetLastError();
            int ret=0;
            SetLastError(0);
            ret=WSARecv(cs,&qssOl->wsaBuf,1,&qssOl->numberOfBytesTransferred,&qssOl->flags,&qssOl->overlapped,NULL);
            if(ret==0||(ret==SOCKET_ERROR&&GetLastError()==WSA_IO_PENDING)){
                InterlockedIncrement(&qss->CSocketsCounter);//one more accepted socket
                if(qss->cslifecb)
                    qss->cslifecb(cs,0);
                /* The posted receive now owns the context; a completion worker releases it. */
                qssOl=NULL;
            }else{
                /* Nothing was posted, so no worker will ever see this socket: close it here. */
                closesocket(cs);
            }
            if(!GetLastError())
                SetLastError(lastErr);
        }
        printf("accept flags:%d ,cs:%d.\n",(int)GetLastError(),(int)cs);
    }//end while.
    free(qssOl);
    if(qss)
        shutdownSocketServer((SocketServer *)qss);
    if(curThread)
        CloseHandle(curThread);
    return 1;
}
/*
 * Posts the next overlapped WSARecv for the connection described by qssOl.
 *
 * The thread's last-error value is cleared before the call and restored
 * afterwards if the call left it at 0.
 *
 * Returns 0 when the receive completed or is pending, or SOErrOccurredCode
 * when WSARecv failed with anything other than WSA_IO_PENDING.
 */
static int postRecvCompletionPacket(QSSOverlapped * qssOl,int SOErrOccurredCode){
    const DWORD savedErr=GetLastError();
    int failed=0;
    int rc;

    SetLastError(0);
    //SOCKET_ERROR:-1,WSA_IO_PENDING:997
    rc=WSARecv(qssOl->client_s,&qssOl->wsaBuf,1,&qssOl->numberOfBytesTransferred,&qssOl->flags,&qssOl->overlapped,NULL);
    if(rc==SOCKET_ERROR){
        if(GetLastError()!=WSA_IO_PENDING){
            //this case lastError maybe 64, 10054
            failed=SOErrOccurredCode;
        }
    }
    if(GetLastError()==0){
        SetLastError(savedErr);
    }
    if(failed){
        printf("worker[%d] postRecvCompletionPacket SOErrOccurred=%d,preErr:%d,postedErr:%d\n",GetCurrentThreadId(),failed,savedErr,GetLastError());
    }
    return failed;
}
/*
 * Thread routine of a completion worker.
 *
 * Each iteration dequeues one completion packet:
 *  - exit packet (key -1 while the server is at status >= 4): forward the
 *    packet to the next worker if any remain, then exit;
 *  - data received: run the protocol handler and post the next receive;
 *  - zero bytes: the connection is treated as closed;
 *  - dequeue failed with a context: post a receive again and only treat the
 *    connection as broken if that fails;
 *  - dequeue failed without a context: exit on anything but a timeout; on a
 *    timeout exit only if the pool is idle and above minThreads.
 *
 * The worker that closes the last client during shutdown posts the exit
 * packet; the last worker to leave releases the server object.
 *
 * ss is a heap-allocated QSSWORKER_PARAM that this routine frees; its thread
 * handle is closed on exit. Always returns 1.
 */
DWORD completionWorkerRoutine(LPVOID ss){
    QSSWORKER_PARAM * pParam=(QSSWORKER_PARAM *)ss;
    QSocketServer * qss=pParam->qss;
    HANDLE curThread=pParam->th;
    QSSOverlapped * qssOl=NULL;
    DWORD numberOfBytesTransferred=0;
    ULONG_PTR completionKey=0;
    int handleCode=0,exitCode=0,SOErrOccurred=0;

    free(pParam);
    while(!exitCode){
        SetLastError(0);
        if(GetQueuedCompletionStatus(qss->iocpHandle,&numberOfBytesTransferred,&completionKey,(LPOVERLAPPED *)&qssOl,qss->workerWaitTimeout)){
            if(completionKey==(ULONG_PTR)-1&&qss->lifecycleStatus>=4)
            {
                printf("worker[%d] completionKey -1:%d \n",(int)GetCurrentThreadId(),(int)GetLastError());
                /* Pass the exit packet on so every remaining worker gets one. */
                if(qss->workerCounter>1)
                    PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
                exitCode=1;
                break;
            }
            if(qssOl&&numberOfBytesTransferred>0){
                /* The count reported with the completion packet is the authoritative one;
                 * store it where the protocol handler reads it. */
                qssOl->numberOfBytesTransferred=numberOfBytesTransferred;
                InterlockedIncrement(&qss->currentBusyWorkers);
                /* Grow the pool if every worker is now busy. */
                addQSSWorker(qss,1);
                handleCode=qss->protoHandler((LPWSAOVERLAPPED)qssOl);
                InterlockedDecrement(&qss->currentBusyWorkers);
                if(handleCode>=0){
                    SOErrOccurred=postRecvCompletionPacket(qssOl,1);
                }else
                    SOErrOccurred=2;
            }else{
                printf("worker[%d] numberOfBytesTransferred==0 ***** closesocket servS or cs *****,%d,%d ,ol is:%d\n",(int)GetCurrentThreadId(),(int)GetLastError(),(int)completionKey,qssOl==NULL?0:1);
                SOErrOccurred=3;
            }
        }else{ //GetQueuedCompletionStatus rtn FALSE, lastError 64 ,995[timeout worker thread exit.] ,WAIT_TIMEOUT:258
            if(qssOl){
                SOErrOccurred=postRecvCompletionPacket(qssOl,4);
            }else {
                printf("worker[%d] GetQueuedCompletionStatus F:%d \n",(int)GetCurrentThreadId(),(int)GetLastError());
                if(GetLastError()!=WAIT_TIMEOUT){
                    exitCode=2;
                }else{//wait timeout
                    if(qss->lifecycleStatus!=4&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                        /* Re-check under the lock so only surplus workers leave. */
                        EnterCriticalSection(&qss->QSS_LOCK);
                        if(qss->lifecycleStatus!=4&&qss->currentBusyWorkers==0&&qss->workerCounter>qss->minThreads){
                            qss->workerCounter--;//until qss->workerCounter decrease to qss->minThreads
                            exitCode=3;
                        }
                        LeaveCriticalSection(&qss->QSS_LOCK);
                    }
                }
            }
        }//end GetQueuedCompletionStatus.
        /* Without a context there is no connection to tear down. */
        if(SOErrOccurred&&qssOl){
            if(qss->cslifecb)
                qss->cslifecb(qssOl->client_s,-1);
            closesocket(qssOl->client_s);
            free(qssOl);
            if(InterlockedDecrement(&qss->CSocketsCounter)==0&&qss->lifecycleStatus>=3){
                //for qss workerSize,PostQueuedCompletionStatus -1
                qss->lifecycleStatus=4,PostQueuedCompletionStatus(qss->iocpHandle,0,-1,NULL);
                exitCode=4;
            }
        }
        qssOl=NULL,numberOfBytesTransferred=0,completionKey=0,SOErrOccurred=0;//for next while.
    }//end while.
    //last to do
    if(exitCode!=3){
        /* exitCode 3 already decremented workerCounter in the timeout branch. */
        int clearup=0;
        EnterCriticalSection(&qss->QSS_LOCK);
        if(!--qss->workerCounter&&qss->lifecycleStatus>=4){//clearup QSS
            clearup=1;
        }
        LeaveCriticalSection(&qss->QSS_LOCK);
        if(clearup){
            DeleteCriticalSection(&qss->QSS_LOCK);
            CloseHandle(qss->iocpHandle);
            free(qss);
        }
    }
    CloseHandle(curThread);
    return 1;
}
------------------------------------------------------------------------------------------------------------------------
对于IOCP的LastError的辨别和处理是个难点,所以请注意我的completionWorkerRoutine的while结构,
结构如下:
while(!exitCode){
if(completionKey==-1){...break;}
if(GetQueuedCompletionStatus){/*在这个if体中只要你投递的OVERLAPPED is not NULL,那么这里你得到的就是它.*/
if(numberOfBytesTransferred>0){
/*在这里handle request,记得要继续投递你的OVERLAPPED哦! */
}else{
/*这里可能客户端或服务端closesocket(the socket),但是OVERLAPPED is not NULL,只要你投递的不为NULL!*/
}
}else{/*在这里的if体中,虽然GetQueuedCompletionStatus return FALSE,但是不代表OVERLAPPED一定为NULL.特别是OVERLAPPED is not NULL的情况下,不要以为LastError发生了,就代表当前的socket无用或发生致命的异常,比如发生lastError:995这种情况下此时的socket有可能是一切正常的可用的,你不应该关闭它.*/
if(OVERLAPPED is not NULL){
/*这种情况下,请不管37,21继续投递吧!在投递后再检测错误.*/
}else{
}
}
if(socket error occured){
}
prepare for next while.
}
行文仓促,难免有错误或不足之处,希望大家踊跃指正评论,谢谢!
这个模型在性能上还是有改进的空间哦！