在一個(gè)系統(tǒng)中有一個(gè)寫線程和若干個(gè)讀線程,讀寫線程通過一個(gè)指針共用了一個(gè)數(shù)據(jù)結(jié)構(gòu),寫線程改寫這個(gè)結(jié)構(gòu),讀線程讀取該結(jié)構(gòu)。在寫線程改寫這個(gè)數(shù)據(jù)結(jié)構(gòu)的過程中,加鎖情況下讀線程由于等待鎖耗時(shí)會增加。
可以利用RCU (Read Copy Update What is rcu)的思想來去除這個(gè)鎖。本文提到的主要實(shí)現(xiàn)代碼:gist
RCU
RCU可以說是一種替代讀寫鎖的方法。其基于一個(gè)事實(shí):當(dāng)寫線程在改變一個(gè)指針時(shí),讀線程獲取這個(gè)指針,要么獲取到老的值,要么獲取到新的值。RCU的基本思想其實(shí)很簡單,參考What is RCU中Toy implementation可以很容易理解。一種簡單的RCU流程可以描述為:
寫線程:
old_ptr = _ptr
tmp_ptr = copy(_ptr) // copy
change(tmp_ptr) // change
_ptr = tmp_ptr // update
synchroize(tmp_ptr)
寫線程要更新_ptr
指向的內(nèi)容時(shí),先復(fù)制一份新的,基于新的進(jìn)行改變,更新_ptr
指針,最后同步釋放老的內(nèi)存。
讀線程:
tmp_ptr = _ptr
use(tmp_ptr)
dereference(tmp_ptr)
讀線程直接使用_ptr
,使用完后需要告訴寫線程自己不再使用_ptr
。讀線程獲取_ptr
時(shí),可能會獲取到老的也可能獲取到新的,無論哪種RCU都需要保證這塊內(nèi)存是有效的。重點(diǎn)在synchroize
和dereference
。synchroize
會等待所有使用老的_ptr
的線程dereference
,對于新的_ptr
使用者其不需要等待。這個(gè)問題說白了就是寫線程如何知道old_ptr
沒有任何讀線程在使用,可以安全地釋放。
這個(gè)問題實(shí)際上在wait-free
的各種實(shí)現(xiàn)中有好些解法,how-when-to-release-memory-in-wait-free-algorithms這里有人總結(jié)了幾種方法,例如Hazard pointers
、Quiescence period based reclamation
。
簡單地使用引用計(jì)數(shù)智能指針是無法解決這個(gè)問題的,因?yàn)橹悄苤羔樧约翰皇蔷€程安全的,例如:
tmp_ptr = _ptr // 1
tmp_ptr->addRef() // 2
use
tmp_ptr->release()
代碼1/2行不是原子的,所以當(dāng)取得tmp_ptr
準(zhǔn)備addRef
時(shí),tmp_ptr
可能剛好被釋放了。
Quiescence period based reclamation
方法指的是讀線程需要聲明自己處于Quiescence period
,也就是不使用_ptr
的時(shí)候,當(dāng)其使用_ptr
的時(shí)候?qū)嶋H是進(jìn)入了一個(gè)邏輯上的臨界區(qū),當(dāng)所有讀線程都不再使用_ptr
的時(shí)候,寫線程就可以對內(nèi)存進(jìn)行安全地釋放。
本文正是描述了一種Quiescence period based reclamation
實(shí)現(xiàn)。這個(gè)實(shí)現(xiàn)可以用于有一個(gè)寫線程和多個(gè)讀線程共用若干個(gè)數(shù)據(jù)的場景。
實(shí)現(xiàn)
該方法本質(zhì)上把數(shù)據(jù)同步分解為基本的內(nèi)存單元讀寫。使用方式上可描述為:
讀線程:
tmp_ptr = _ptr
use
update() // 標(biāo)識自己不再使用任何共享數(shù)據(jù)
寫線程:
old_ptr = _ptr
tmp_ptr = copy(_ptr)
change(tmp_ptr)
_ptr = tmp_ptr
gc()
defer_free(old_ptr)
以下具體描述讀寫線程的實(shí)現(xiàn)。
寫線程
寫線程負(fù)責(zé)標(biāo)識內(nèi)存需要被釋放,以及檢查何時(shí)可以真正釋放內(nèi)存。其維護(hù)了一個(gè)釋放內(nèi)存隊(duì)列:
void *_pending[8]
uint64_t _head, _tail
void defer_free(void *p) {
_head ++
_pending[PENDING_POS(_head)] = p
}
gc() {
for (_tail -> find_free_pos())
free(_pending[_tail])
}
find_free_pos
找到一個(gè)可釋放內(nèi)存位置,在[_tail, find_free_pos())
這個(gè)區(qū)間內(nèi)所有內(nèi)存是可以安全被釋放的。
隊(duì)列位置_head/_tail
一直增大,PENDING_POS
就是對這個(gè)位置取模,限定在隊(duì)列大小范圍內(nèi)也是可行的,無論哪種方式,_head
從邏輯上說一直>=_tail
,但在實(shí)際中可能小于_tail
,所以實(shí)現(xiàn)時(shí)不使用大小判定,而是:
gc() {
pos = find_free_pos()
while (_tail != pos) {
free(_pending[PENDING_POS(_tail)])
_tail ++
}
}
讀線程
讀線程不再使用共享內(nèi)存時(shí),就標(biāo)識自己:
update() {
static __thread int tid
_tmark[tid] = _head
}
讀線程的狀態(tài)會影響寫線程的回收邏輯,其狀態(tài)分為:
- 初始
- 活躍,會調(diào)用到
update
- 暫停,其他地方同步,或被掛起
- 退出
讀線程處于活躍狀態(tài)時(shí),它會不斷地更新自己可釋放內(nèi)存位置(_tmark[tid]
)。寫線程檢查所有讀線程的_tmark[tid]
,[_tail, min(_tmark[]))
是所有讀線程都不再使用的內(nèi)存區(qū)間,可以被安全釋放。
find_free_pos() {
min = MAX_INTEGER
pos = 0
for (tid = 0; tid < max_threads; ++tid) {
tpos = _tmark[tid]
offset = tpos - tail
if (offset < min) {
min = offset
pos = tpos
}
}
return pos
}
當(dāng)讀線程暫停時(shí),其_tmark[tid]
可能會在很長一段時(shí)間里得不到更新,此時(shí)會阻礙寫線程釋放內(nèi)存。所以需要方法來標(biāo)識讀線程是否進(jìn)入暫停狀態(tài)。通過設(shè)置一個(gè)上次釋放內(nèi)存位置_tfreeds[tid]
,標(biāo)識每個(gè)線程當(dāng)前內(nèi)存釋放到的位置。如果一個(gè)線程處于暫停狀態(tài)了,那么在一定時(shí)間后,_tfreeds[tid] == _tmark[tid]
。在查找可釋放位置時(shí),就需要忽略暫停狀態(tài)的讀線程:
find_free_pos() {
min = MAX_INTEGER
pos = _head
for (tid = 0; tid < max_threads; ++tid) {
tpos = _tmark[tid]
if (tpos == _tfreeds[tid]) continue
offset = tpos - tail
if (offset < min) {
min = offset
pos = tpos
}
}
for (tid = 0; tid < max_threads; ++tid) {
if (_tfreeds[tid] != _tmark[tid])
_tfreeds[tid] = pos
}
return pos
}
但是當(dāng)所有線程都處于暫停狀態(tài)時(shí),寫線程可能還在工作,上面的實(shí)現(xiàn)就會返回_head
,此時(shí)寫線程依然可以正常釋放內(nèi)存。
小結(jié),該方法原理可用下圖表示:

線程動(dòng)態(tài)增加/減少
如果讀線程可能中途退出,中途動(dòng)態(tài)增加,那么_tmark[]
就需要被復(fù)用,此時(shí)線程tid
的分配調(diào)整為動(dòng)態(tài)的即可:
class ThreadIdPool {
public:
// 動(dòng)態(tài)獲取一個(gè)線程tid,某線程每次調(diào)用該接口返回相同的值
int get()
// 線程退出時(shí)回收該tid
void put(int id)
}
ThreadIdPool
的實(shí)現(xiàn)無非就是利用TLS,以及在線程退出時(shí)得到通知以回收tid。那么對于讀線程的update
實(shí)現(xiàn)變?yōu)椋?/p>
update() {
tid = _idPool->get()
_tmark[tid] = _head
}
當(dāng)某個(gè)線程退出時(shí),_tmark[tid]
和_tfreeds[tid]
不需要做任何處理,當(dāng)新創(chuàng)建的線程復(fù)用了該tid
時(shí),可以立即復(fù)用_tmark[tid]
和_tfreeds[tid]
,此時(shí)這2個(gè)值必然是相等的。
以上,就是整個(gè)方法的實(shí)現(xiàn)。
線程可讀可寫
以上方法適用場景還是不夠通用。在nbds項(xiàng)目(實(shí)現(xiàn)了一些無鎖數(shù)據(jù)結(jié)構(gòu)的toy project)中有一份雖然簡單但也有啟發(fā)的實(shí)現(xiàn)(rcu.c)。該實(shí)現(xiàn)支持任意線程defer_free
,所有線程update
。update
除了聲明不再使用任何共享內(nèi)存外,還可能回收內(nèi)存。任意線程都可能維護(hù)一些待釋放的內(nèi)存,任意一塊內(nèi)存可能被任意其他線程使用。那么它是如何內(nèi)存回收的?
本文描述的方法是所有讀線程自己聲明自己,然后由寫線程主動(dòng)來檢查。不同于此方法, nbds的實(shí)現(xiàn),基于一種通知擴(kuò)散的方式。該方式以這樣一種方式工作:
當(dāng)某個(gè)線程嘗試內(nèi)存回收時(shí),它需要知道所有其他線程的空閑位置(相當(dāng)于_tmark[tid]
),它通知下一個(gè)線程我需要釋放的范圍。當(dāng)下一個(gè)線程update
時(shí)(離開臨界區(qū)),它會將上個(gè)線程的通知繼續(xù)告訴下一個(gè)線程,直到最后這個(gè)通知回到發(fā)起線程。那么對于發(fā)起線程而言,這個(gè)釋放請求在所有線程中走了一遍,得到了大家的認(rèn)可,可以安全釋放。每個(gè)線程都以這樣的方式工作。
void rcu_defer_free (void *x) {
...
rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
...
}
void rcu_update (void) {
...
for (i = 0; i < num_threads_; ++i) {
...
uint64_t x = rcu_[tid_][i]; // 其它線程發(fā)給自己的通知
rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; // 擴(kuò)散出去
...
}
...
while (q->tail != rcu_[tid_][tid_]) {
free
}
...
}
這個(gè)實(shí)現(xiàn)相對簡單,不支持線程暫停,以及線程動(dòng)態(tài)增加和減少。