? 動(dòng)態(tài)任務(wù)分解方式分類:非嵌套型動(dòng)態(tài)任務(wù),嵌套型動(dòng)態(tài)任務(wù)。
1、非嵌套型動(dòng)態(tài)任務(wù)
??????使用分布式隊(duì)列
2、嵌套型動(dòng)態(tài)任務(wù)
??????嵌套型任務(wù)通常有一個(gè)或多個(gè)開始任務(wù),其他任務(wù)的產(chǎn)生均源自于開始任務(wù)。
??????調(diào)度方法:每個(gè)線程都有一個(gè)本地隊(duì)列,另外所有線程擁有一個(gè)共享隊(duì)列。當(dāng)每個(gè)線程產(chǎn)生N個(gè)任務(wù)時(shí),首先選擇一個(gè)任務(wù)送入本地隊(duì)列,然后將其他任務(wù)送入共享隊(duì)列。
??????對(duì)應(yīng)的CAPI代碼如下:
////////////////////////////////////////////////////////////////////////
/*
?* Copyright (c) 2006-2008
?* Author: Weiming Zhou
?*
?* Permission to use, copy, modify, distribute and sell this software
?* and its documentation for any purpose is hereby granted without fee,
?* provided that the above copyright notice appear in all copies and
?* that both that copyright notice and this permission notice appear
?* in supporting documentation.?
?*/
?
/*
?*?? CNestTaskScheduler.h
?*
?* ? DESCRIPTION
?*?????? Module for Task Scheduler class
?*
?*?? HISTORY
?*?????? 06-08-2008??? create by zhouweiming.
?*
?*/
#ifndef
__CNESTTASKSCHEDULER_H__
#define
__CNESTTASKSCHEDULER_H__
?
#include
"CLocalQueue.h"
#include
"CStealQueue.h"
#include
"CDistributedQueue.h"
?
class
CNestTaskScheduler {
private
:
??? CThreadPool???? m_ThreadPool;
//(TaskScheduler_StartFunc, NULL, 0);
??? CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;
??? THREADFUNC????? m_StartFunc;?
//
為線程池使用的線程入口函數(shù)指針
??? LONG?
volatile
? m_lTaskId;???
//Task Id,
用于判斷是否喚醒對(duì)應(yīng)的線程
?
public
:
??? CNestTaskScheduler();
???
virtual
~CNestTaskScheduler(){};
?
?
???
//
下面兩個(gè)函數(shù)為調(diào)度器本身直接使用
???
void
SetStartFunc(THREADFUNC StartFunc);
???
int
GetTask(TASK &Task);
??? CThreadPool & GetThreadPool();
??? LONG AtomicIncrementTaskId();
???
???
//
下面三個(gè)函數(shù)為調(diào)度器的使用者使用
???
void
SpawnLocalTask(TASK &Task);
???
void
SpawnTask(TASK &Task);
???
void
BeginRootThread(TASK &Task);
};
?
#endif
//__CNESTTASKSCHEDULER_H__
?
////////////////////////////////////////////////////////////////////////
/*
?* Copyright (c) 2006-2008
?* Author: Weiming Zhou
?*
?* Permission to use, copy, modify, distribute and sell this software
?* and its documentation for any purpose is hereby granted without fee,
?* provided that the above copyright notice appear in all copies and
?* that both that copyright notice and this permission notice appear
?* in supporting documentation.?
?*/
?
#include
"CapiGlobal.h"
#include
"CThreadPool.h"
#include
"CLocalQueue.h"
#include
"CStealQueue.h"
#include
"CDistributedQueue.h"
#include
"CNestTaskScheduler.h"
?
static
unsigned
int
WINAPI NestTaskScheduler_StartFunc(
void
*pArgs);
?
#define
???? NESTTASK_QUEUE_SIZE???? 128
?
/**?
嵌套任務(wù)調(diào)度的構(gòu)造函數(shù)
?
???? @return? constructor -
無??
*/
CNestTaskScheduler::CNestTaskScheduler()
{
??? m_StartFunc = NestTaskScheduler_StartFunc;
?
???
int
n = m_ThreadPool.GetThreadCount();
??? m_DQueue.Create(NESTTASK_QUEUE_SIZE, n, NESTTASK_QUEUE_SIZE, 0,
??????? ThreadPool_GetThreadId, &m_ThreadPool);
}
?
/**?
嵌套任務(wù)調(diào)度的設(shè)置線程池的入口函數(shù)
?
???? @param?? THREADFUNC StartFunc -
線程池的入口函數(shù)??
???? @return? void -
無
*/
void
CNestTaskScheduler::SetStartFunc(THREADFUNC StartFunc)
{
??? m_StartFunc = StartFunc;
}
?
/**?
嵌套任務(wù)調(diào)度的獲取任務(wù)函數(shù)
?
???? @param?? TASK &Task -
接收從分布式隊(duì)列中獲取的任務(wù)
???? @return? int -
成功返回CAPI_SUCCESS, 失敗返回CAPI_FAILED.??
*/
int
CNestTaskScheduler::GetTask(TASK &Task)
{
???
//
先從本地隊(duì)列獲取任務(wù)
???
//
本地獲取任務(wù)失敗后從共享隊(duì)列獲取任務(wù)
???
return
m_DQueue.DeQueue(Task);
};
?
?
/**?
嵌套任務(wù)調(diào)度的獲取線程池函數(shù)
?
??? @return?? CThreadPool & -
返回線程池對(duì)象??
*/
CThreadPool & CNestTaskScheduler::GetThreadPool()
{
???
return
m_ThreadPool;
}
?
/**?
嵌套任務(wù)調(diào)度的原子增加任務(wù)Id函數(shù)
?
??? @return?? int -
返回原子加后的任務(wù)Id.
*/
LONG CNestTaskScheduler::AtomicIncrementTaskId()
{
??? LONG Id = AtomicIncrement(&m_lTaskId);
???
return
Id;
}
?
?
?
/**?
嵌套任務(wù)調(diào)度的生成當(dāng)前線程的本地任務(wù)
???
任務(wù)被放入當(dāng)前線程的本地隊(duì)列中
?
???? @param?? TASK &Task -
待執(zhí)行的任務(wù)??
???? @return? void -
無
*/
void
CNestTaskScheduler::SpawnLocalTask(TASK &Task)
{
???
//
將任務(wù)放入本地隊(duì)列中
??? m_DQueue.PushToLocalQueue(Task);
};
?
?
/**?
嵌套任務(wù)調(diào)度的生成任務(wù)函數(shù)
???
生成的任務(wù)被放入分布式隊(duì)列中
?
???? @param?? TASK &Task -
待執(zhí)行的任務(wù)??
???? @return? void -
無
*/
void
CNestTaskScheduler::SpawnTask(TASK &Task)
{
???
if
( m_lTaskId < m_ThreadPool.GetThreadCount() )
??? {
???????
//
依次喚醒各個(gè)掛起的線程
??????? LONG Id = AtomicIncrement(&m_lTaskId);
???????
if
( Id < m_ThreadPool.GetThreadCount() )
??????? {
???????????
//
下面之所以可以對(duì)其他線程的本地隊(duì)列進(jìn)行無同步的操作,是因?yàn)?span lang="EN-US">
???????????
//
訪問這些隊(duì)列的線程在進(jìn)隊(duì)操作之后才開始運(yùn)行
??????????? m_DQueue.PushToLocalQueue(Task, Id);
??????????? m_ThreadPool.ExecThread(Id);
??????? }
???????
else
??????? {
??????????? m_DQueue.EnQueue(Task);
??????? }
??? }
???
else
??? {
???????
//
先判斷共享隊(duì)列是否滿,如果未滿則放入共享隊(duì)列中
???????
//
如果滿了則放入本地隊(duì)列中
??????? m_DQueue.EnQueue(Task);
??? }
};
?
?
/**?
嵌套任務(wù)調(diào)度的啟動(dòng)根線程函數(shù)
?
???? @param?? TASK &Task -
要執(zhí)行的最初任務(wù)???
???? @return? void -
無
*/
void
CNestTaskScheduler::BeginRootThread(TASK &Task)
{
??? m_lTaskId = 0;
?
??? m_ThreadPool.CreateThreadPool(m_StartFunc,
this
, 0);
?
??? m_DQueue.PushToLocalQueue(Task, 0);
?
??? m_ThreadPool.ExecThread( 0 );?
?
??? m_ThreadPool.WaitAllThread();
}
?
/**?
嵌套任務(wù)調(diào)度的線程池入口函數(shù)
?
???? @param?? void *pArgs - CNestTaskScheduler
類型的參數(shù)????
???? @return? unsigned int WINAPI -
返回?
*/
unsigned
int
WINAPI NestTaskScheduler_StartFunc(
void
*pArgs)
{
??? CNestTaskScheduler? *pSched = (CNestTaskScheduler *)pArgs;
?
??? TASK??? Task;
???
int
???? nRet;
?
???
for
( ;; )
??? {
??????? nRet = pSched->GetTask(Task);
?
???????
if
( nRet == CAPI_FAILED )
??????? {
??????????? CThreadPool &ThreadPool = pSched->GetThreadPool();
???????????
???????????
//
喚醒一個(gè)掛起的線程,防止任務(wù)數(shù)量小于CPU核數(shù)時(shí),
???????????
//
仍然有任務(wù)處于掛起狀態(tài),從而導(dǎo)致WaitAllThread()處于死等狀態(tài)
???????????
//
這個(gè)喚醒過程是一個(gè)串行的過程,被喚醒的任務(wù)會(huì)繼續(xù)喚醒一個(gè)掛起線程
??????????? LONG Id = pSched->AtomicIncrementTaskId();
???????????
if
( Id < ThreadPool.GetThreadCount() )
??????????? {
??????????????? ThreadPool.ExecThread(Id);
??????????? }
???????????
break
;
??????? }
?
??????? (*(Task.func))(Task.pArg);
??? }
?
???
return
0;
}
?
??????
posted on 2010-01-17 17:57
小王 閱讀(683)
評(píng)論(0) 編輯 收藏 引用 所屬分類:
多核編程