////////////////////////////////////////////////////////////////////////
/*
?* Copyright (c) 2006-2008
?* Author: Weiming Zhou
?*
?* Permission to use, copy, modify, distribute and sell this software
?* and its documentation for any purpose is hereby granted without fee,
?* provided that the above copyright notice appear in all copies and
?* that both that copyright notice and this permission notice appear
?* in supporting documentation.?
?*/
?
/*
?*?? CNestTaskScheduler.h
?*
?* ? DESCRIPTION
?*?????? Module for Task Scheduler class
?*
?*?? HISTORY
?*?????? 06-08-2008??? create by zhouweiming.
?*
?*/
#ifndef
__CNESTTASKSCHEDULER_H__
#define
__CNESTTASKSCHEDULER_H__
?
#include
"CLocalQueue.h"
#include
"CStealQueue.h"
#include
"CDistributedQueue.h"
?
class
CNestTaskScheduler {
private
:
??? CThreadPool???? m_ThreadPool;
//(TaskScheduler_StartFunc, NULL, 0);
??? CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;
??? THREADFUNC????? m_StartFunc;?
//
為線程池使用的線程入口函數指針
??? LONG?
volatile
? m_lTaskId;???
//Task Id,
用于判斷是否喚醒對應的線程
?
public
:
??? CNestTaskScheduler();
???
virtual
~CNestTaskScheduler(){};
?
?
???
//
下面兩個函數為調度器本身直接使用
???
void
SetStartFunc(THREADFUNC StartFunc);
???
int
GetTask(TASK &Task);
??? CThreadPool & GetThreadPool();
??? LONG AtomicIncrementTaskId();
???
???
//
下面三個函數為調度器的使用者使用
???
void
SpawnLocalTask(TASK &Task);
???
void
SpawnTask(TASK &Task);
???
void
BeginRootThread(TASK &Task);
};
?
#endif
//__CNESTTASKSCHEDULER_H__
?
////////////////////////////////////////////////////////////////////////
/*
?* Copyright (c) 2006-2008
?* Author: Weiming Zhou
?*
?* Permission to use, copy, modify, distribute and sell this software
?* and its documentation for any purpose is hereby granted without fee,
?* provided that the above copyright notice appear in all copies and
?* that both that copyright notice and this permission notice appear
?* in supporting documentation.?
?*/
?
#include
"CapiGlobal.h"
#include
"CThreadPool.h"
#include
"CLocalQueue.h"
#include
"CStealQueue.h"
#include
"CDistributedQueue.h"
#include
"CNestTaskScheduler.h"
?
static
unsigned
int
WINAPI NestTaskScheduler_StartFunc(
void
*pArgs);
?
#define
???? NESTTASK_QUEUE_SIZE???? 128
?
/**?
嵌套任務調度的構造函數
?
???? @return? constructor -
無??
*/
CNestTaskScheduler::CNestTaskScheduler()
{
??? m_StartFunc = NestTaskScheduler_StartFunc;
?
???
int
n = m_ThreadPool.GetThreadCount();
??? m_DQueue.Create(NESTTASK_QUEUE_SIZE, n, NESTTASK_QUEUE_SIZE, 0,
??????? ThreadPool_GetThreadId, &m_ThreadPool);
}
?
/**?
嵌套任務調度的設置線程池的入口函數
?
???? @param?? THREADFUNC StartFunc -
線程池的入口函數??
???? @return? void -
無
*/
void
CNestTaskScheduler::SetStartFunc(THREADFUNC StartFunc)
{
??? m_StartFunc = StartFunc;
}
?
/**?
嵌套任務調度的獲取任務函數
?
???? @param?? TASK &Task -
接收從分布式隊列中獲取的任務
???? @return? int -
成功返回CAPI_SUCCESS, 失敗返回CAPI_FAILED.??
*/
int
CNestTaskScheduler::GetTask(TASK &Task)
{
???
//
先從本地隊列獲取任務
???
//
本地獲取任務失敗后從共享隊列獲取任務
???
return
m_DQueue.DeQueue(Task);
};
?
?
/**?
嵌套任務調度的獲取線程池函數
?
??? @return?? CThreadPool & -
返回線程池對象??
*/
CThreadPool & CNestTaskScheduler::GetThreadPool()
{
???
return
m_ThreadPool;
}
?
/**?
嵌套任務調度的原子增加任務Id函數
?
??? @return?? int -
返回原子加后的任務Id.
*/
LONG CNestTaskScheduler::AtomicIncrementTaskId()
{
??? LONG Id = AtomicIncrement(&m_lTaskId);
???
return
Id;
}
?
?
?
/**?
嵌套任務調度的生成當前線程的本地任務
???
任務被放入當前線程的本地隊列中
?
???? @param?? TASK &Task -
待執行的任務??
???? @return? void -
無
*/
void
CNestTaskScheduler::SpawnLocalTask(TASK &Task)
{
???
//
將任務放入本地隊列中
??? m_DQueue.PushToLocalQueue(Task);
};
?
?
/**?
嵌套任務調度的生成任務函數
???
生成的任務被放入分布式隊列中
?
???? @param?? TASK &Task -
待執行的任務??
???? @return? void -
無
*/
void
CNestTaskScheduler::SpawnTask(TASK &Task)
{
???
if
( m_lTaskId < m_ThreadPool.GetThreadCount() )
??? {
???????
//
依次喚醒各個掛起的線程
??????? LONG Id = AtomicIncrement(&m_lTaskId);
???????
if
( Id < m_ThreadPool.GetThreadCount() )
??????? {
???????????
//
下面之所以可以對其他線程的本地隊列進行無同步的操作,是因為
???????????
//
訪問這些隊列的線程在進隊操作之后才開始運行
??????????? m_DQueue.PushToLocalQueue(Task, Id);
??????????? m_ThreadPool.ExecThread(Id);
??????? }
???????
else
??????? {
??????????? m_DQueue.EnQueue(Task);
??????? }
??? }
???
else
??? {
???????
//
先判斷共享隊列是否滿,如果未滿則放入共享隊列中
???????
//
如果滿了則放入本地隊列中
??????? m_DQueue.EnQueue(Task);
??? }
};
?
?
/**?
嵌套任務調度的啟動根線程函數
?
???? @param?? TASK &Task -
要執行的最初任務???
???? @return? void -
無
*/
void
CNestTaskScheduler::BeginRootThread(TASK &Task)
{
??? m_lTaskId = 0;
?
??? m_ThreadPool.CreateThreadPool(m_StartFunc,
this
, 0);
?
??? m_DQueue.PushToLocalQueue(Task, 0);
?
??? m_ThreadPool.ExecThread( 0 );?
?
??? m_ThreadPool.WaitAllThread();
}
?
/**?
嵌套任務調度的線程池入口函數
?
???? @param?? void *pArgs - CNestTaskScheduler
類型的參數????
???? @return? unsigned int WINAPI -
返回?
*/
unsigned
int
WINAPI NestTaskScheduler_StartFunc(
void
*pArgs)
{
??? CNestTaskScheduler? *pSched = (CNestTaskScheduler *)pArgs;
?
??? TASK??? Task;
???
int
???? nRet;
?
???
for
( ;; )
??? {
??????? nRet = pSched->GetTask(Task);
?
???????
if
( nRet == CAPI_FAILED )
??????? {
??????????? CThreadPool &ThreadPool = pSched->GetThreadPool();
???????????
???????????
//
喚醒一個掛起的線程,防止任務數量小于CPU核數時,
???????????
//
仍然有任務處于掛起狀態,從而導致WaitAllThread()處于死等狀態
???????????
//
這個喚醒過程是一個串行的過程,被喚醒的任務會繼續喚醒一個掛起線程
??????????? LONG Id = pSched->AtomicIncrementTaskId();
???????????
if
( Id < ThreadPool.GetThreadCount() )
??????????? {
??????????????? ThreadPool.ExecThread(Id);
??????????? }
???????????
break
;
??????? }
?
??????? (*(Task.func))(Task.pArg);
??? }
?
???
return
0;
}