上一篇中我介紹了一種通過封閉Critical Section對象而方便的使用互斥鎖的方式,文中所有的例子是兩個線程對同一數據一讀一寫,因此需要讓它們在這里互斥,不能同時訪問。而在實際情況中可能會有更復雜的情況出現,就是多個線程訪問同一數據,一部分是讀,一部分是寫。我們知道只有讀-寫或寫-寫同時進行時可能會出現問題,而讀-讀則可以同時進行,因為它們不會對數據進行修改,所以也有必要在C++中封裝一種方便的允許讀-讀并發、讀-寫與寫-寫互斥的鎖。要實現這種鎖,使用臨界區就很困難了,不如改用內核對象,這里我使用的是互斥量(Mutex)。
總體的結構與上一篇中的類似,都是寫出一個對鎖進行封裝的基類,再寫一個用于調用加、解鎖函數的類,通過對第二個類的生命周期的管理實現加鎖和解鎖。這里涉及到兩個新問題,一是加鎖、解鎖動作都有兩種,一種是加/解讀鎖,一種是加/解寫鎖;二是為了允許讀-讀并發,這里只聲明一個Mutex是不夠的,必須要聲明多個Mutex,而且有多少個Mutex就同時允許多少個讀線程并發,之所以這么說,是因為我們要使用的API函數是WaitForMultipleObjects。
WaitForMultipleObjects函數的功能就是等待對象狀態被設置,MSDN中對它的說明為:
Waits until one or all of the specified objects are in the signaled state or the time-out interval elapses.
這是個很好用的函數,我們可以用它來等待某個或某幾個對象,并且允許設置超時時間,等待成功時與超時時返回的值是不同的。如果返回的值比WAIT_ABANDONED小則表示等待成功。“等待成功”對于不同類型的內核對象有不同的意義,例如對于進程或線程對象,等待成功就表示進程或線程執行結束了;對于互斥量對象,則表示此對象現在不被任何其他線程擁有,并且一旦等待成功,當前線程即擁有了此互斥量,其他線程則不能同時擁有,直接調用ReleaseMutex函數主動釋放互斥量。
與WaitForMultipleObjects類似的還有一個函數WaitForSingleObject,它的功能比較簡單,只針對單一個對象,而WaitForMultipleObjects可以同時等待多個對象,并且可以設置是否等待所有對象。
上一篇文章中用的InstanceLockBase類里面封裝了一個Critical Section對象,這里則要封裝一組Mutex的Handle,那么這一組是多少個呢?它應該由使用此類的程序中定義,例如可以用動態數組的方法:
//
基類:
???????
class
?RWLockBase?
//
表示Read/Write?Lock
???????
...
{
??????????????HANDLE
*
?handles;
???????
protected
:

??????????????RWLockBase(
int
?handleCount)?
...
{?handles?
=
?
new
?HANDLE[handleCount];?}
??????????????…
???????}
;
//
子類:
???????
class
?MyClass:?
public
?RWLockBase??????

???????
...
{

??????????????MyClass():?RWLockBase(
3
)?
...
{}
??????????????…
???????}
;
這確實是個不錯的辦法,通過在子類構造函數的初始化段中調用基類構造函數并傳參,使得這個動態數組得以正確初始化,不過這樣看著不太爽,子類必須兩次出現“RWLockBase”一詞,能不能像InstanceLockBase那樣只要繼承了就好呢?答案是肯定的,只要用C++模板即可:
class
?_RWLockBase

...
{
???????friend?
class
?RWLock;
protected
:
???????
virtual
?DWORD?ReadLock(
int
?timeout)?
=
?
0
;
???????
virtual
?
void
?ReadUnlock(
int
?handleIndex)?
=
?
0
;
???????
virtual
?DWORD?WriteLock(
int
?timeout)?
=
?
0
;
???????
virtual
?
void
?WriteUnlock()?
=
?
0
;
}
;
模板類RWLockBase從_RWLockBase繼承,并對四個函數寫出實現:
template?
<
int
?maxReadCount?
=
?
3
>
????????
//
這里給一個缺省參數,盡量減少客戶端代碼量
class
?RWLockBase:?
public
?_RWLockBase

...
{
???????HANDLE?handles[maxReadCount];
???????DWORD?ReadLock(
int
?timeout)???
//
加讀鎖,只要等到一個互斥量返回即可
???????
...
{
??????????????
return
?::WaitForMultipleObjects(maxReadCount,?handles,?FALSE,?timeout);
???????}
???????
void
?ReadUnlock(
int
?handleIndex)?
//
解讀鎖,釋放已獲得的互斥量
???????
...
{
??????????????::ReleaseMutex(handles[handleIndex]);
???????}
???????DWORD?WriteLock(
int
?timeout)???
//
加寫鎖,等到所有互斥量,從而與其他所有線程互斥
???????
...
{
??????????????
return
?::WaitForMultipleObjects(maxReadCount,?handles,?TRUE,?timeout);
???????}
???????
void
?WriteUnlock()??????????????????????
//
解寫鎖,釋放所有的互斥量
???????
...
{
??????????????
for
(
int
?i?
=
?
0
;?i?
<
?maxReadCount;?i
++
)
?????????????????????::ReleaseMutex(handles[i]);
???????}
protected
:
???????RWLockBase()????????????????????????????
//
構造函數,初始化每個互斥量
???????
...
{
??????????????
for
(
int
?i?
=
?
0
;?i?
<
?maxReadCount;?i
++
)
?????????????????????handles[i]?
=
?::CreateMutex(
0
,?FALSE,?
0
);
???????}
???????
~
RWLockBase()??????????????????????????
//
析構函數,銷毀對象
???????
...
{
??????????????
for
(
int
?i?
=
?
0
;?i?
<
?maxReadCount;?i
++
)
?????????????????????::CloseHandle(handles[i]);
???????}
}
;
而相應的鎖類也會稍復雜一些:
class
?RWLock

...
{
???????
bool
?lockSuccess;????????????????
//
因為有可能超時,需要保存是否等待成功
???????
int
?readLockHandleIndex;??????
//
對于讀鎖,需要知道獲得的是哪個互斥量
???????_RWLockBase
*
?_pObj;?????????
//
目標對象基類指針
public
:
???????
//
這里通過第二個參數決定是加讀鎖還是寫鎖,第三個參數為超時的時間
???????RWLock(_RWLockBase
*
?pObj,?
bool
?readLock?
=
?
true
,?
int
?timeout?
=
?
3000
)

???????
...
{
??????????????_pObj?
=
?pObj;
??????????????lockSuccess?
=
?FALSE;
??????????????readLockHandleIndex?
=
?
-
1
;
??????????????
if
(NULL?
==
?_pObj)
?????????????????????
return
;
?
??????????????
if
(readLock)??????????
//
讀鎖
??????????????
...
{
?????????????????????DWORD?retval?
=
?_pObj
->
ReadLock(timeout);
?????????????????????
if
(retval?
<
?WAIT_ABANDONED)?
//
返回值小于WAIT_ABANDONED表示成功
?????????????????????
...
{???????????????????????????????????????????????
//
其值減WAIT_OBJECT_0就是數組下標
????????????????????????????readLockHandleIndex?
=
?retval?
-
?WAIT_OBJECT_0;
????????????????????????????lockSuccess?
=
?TRUE;
?????????????????????}
??????????????}
??????????????
else
??????????????
...
{
?????????????????????DWORD?retval?
=
?_pObj
->
WriteLock(timeout);
?????????????????????
if
(retval?
<
?WAIT_ABANDONED)?
//
寫鎖時獲得了所有互斥量,無需保存下標
????????????????????????????lockSuccess?
=
?TRUE;
??????????????}
???????}
???????
~
RWLock()

???????
...
{
??????????????
if
(NULL?
==
?_pObj)
?????????????????????
return
;
??????????????
if
(readLockHandleIndex?
>
?
-
1
)
?????????????????????_pObj
->
ReadUnlock(readLockHandleIndex);
??????????????
else
?????????????????????_pObj
->
WriteUnlock();
???????}
???????
bool
?IsLockSuccess()?
const
?
...
{?
return
?lockSuccess;?}
}
;
?
這樣一來,讀/寫鎖的類也就完成了,使用時與InstanceLock類似:
1 被鎖對象從RWLockBase<>類繼承
2 需要加讀鎖時,聲明一個RWLock實例,并指出要加的是讀鎖
3 需要加寫鎖時,聲明一個RWLock實例,并指出要加的是寫鎖
?
這里還是要多說兩句,雖然使用純虛函數結合模板類,使得客戶端代碼量減到最少,但性能上有一些影響,因為聲明了虛函數,則實例中必然存在4個字節的VPTR,調用虛函數時則要查找VTABLE,空間和時間上都有微小的犧牲。而如果不使用模板類,則沒有虛函數的代價,但也有犧牲:不使用模板類則需要使用動態數組,動態數組本身需要程序運行時在堆上分配,這也需要時間;指向動態數組的指針也需要占用內存,所以空間上的開鎖是一樣的,時間上雖然動態分配內存需要的時間應該比虛函數的調用要慢一點,但初始化只需要一次,總體來說也是值得的。所以最終要使用哪一種,就看具體需要了。
?
這里也給出一個實驗。這里所用的被鎖類也上一篇類似,簡單的從RWLockBase類繼承:
class
?MyClass2:?
public
?RWLockBase
<>
...
{}
;
MyClass2?mc2;
看看兩個線程函數:
//
讀線程
DWORD?CALLBACK?ReadThreadProc(LPVOID?param)

...
{
???????
int
?i?
=
?(
int
)param;
???????RWLock?
lock
(
&
mc2);??????????
//
加讀鎖
???????
if
(
lock
.IsLockSuccess())?????????????
//
如果加鎖成功
???????
...
{
??????????????Say(
"
read?thread?%d?started
"
,?i);???
//
為了代碼短一些,假設Say函數有這種能力
??????????????Sleep(
1000
);
??????????????Say(
"
read?thread?%d?ended
"
,?i);
???????}
???????
else
?????????????????????????????????????
//
加鎖超時,則顯示超時信息
??????????????Say(
"
read?thread?%d?timeout
"
,?i);
???????
return
?
0
;
}
//
寫線程
DWORD?CALLBACK?WriteThreadProc(LPVOID?param)

...
{
???????
int
?i?
=
?(
int
)param;
???????RWLock?
lock
(
&
mc2,?
false
);?
//
加寫鎖。
???????
if
(
lock
.IsLockSuccess())

???????
...
{
??????????????Say(
"
write?thread?%d?started
"
,?i);
??????????????Sleep(
600
);
??????????????Say(
"
write?thread?%d?ended
"
,?i);
???????}
???????
else
??????????????Say(
"
write?thread?%d?timeout
"
,?i);
???????
return
?
0
;
}
?
主線程:
???????
int
?i;
???????
for
(i?
=
?
0
;?i?
<
?
5
;?i
++
)
??????????????::CreateThread(
0
,?
0
,?ReadThreadProc,?(LPVOID)i,?
0
,?
0
);
???????
for
(i?
=
?
0
;?i?
<
?
5
;?i
++
)
??????????????::CreateThread(
0
,?
0
,?WriteThreadProc,?(LPVOID)i,?
0
,?
0
);
程序共開10個線程,5個讀5個寫。從RWLockBase類繼承時我們使用了默認的模板參數,所以最多同時允許3個讀線程。程序的運行結果如下:
001 [15:07:28.484]read thread 0 started
002 [15:07:28.484]read thread 1 started
003 [15:07:28.484]read thread 2 started
004 [15:07:29.484]read thread 0 ended
005 [15:07:29.484]read thread 3 started
006 [15:07:29.484]read thread 1 ended
007 [15:07:29.484]read thread 4 started
008 [15:07:29.484]read thread 2 ended
009 [15:07:30.484]read thread 3 ended
010 [15:07:30.484]read thread 4 ended
011 [15:07:30.484]write thread 0 started
012 [15:07:31.078]write thread 0 ended
013 [15:07:31.078]write thread 1 started
014 [15:07:31.484]write thread 2 timeout
015 [15:07:31.484]write thread 3 timeout
016 [15:07:31.484]write thread 4 timeout
017 [15:07:31.687]write thread 1 ended
前三行三個讀線程取得讀鎖,之后等一秒(第4-8行),三個讀線程都結束了,并且余下的兩個讀線程取得讀鎖,雖然這時剩下了一個互斥量沒有使用,但因為其他的線程都請求加寫鎖,寫鎖與其他所有線程互斥,所以還不能取得寫鎖。再過一秒(第9-11行),后來的兩個取得讀鎖的線程也結束了,則第一個寫線程取得寫鎖。600毫秒之后(第12-13行)第一個寫線程結束,第二個寫線程開始。400毫秒之后(第14-16行)余下的三個寫線程都超時了,再后第二個寫線程也結束了。
posted on 2006-10-19 14:35
Jerry Cat 閱讀(1503)
評論(1) 編輯 收藏 引用