在一個系統(tǒng)中有一個寫線程和若干個讀線程,讀寫線程通過一個指針共用了一個數(shù)據(jù)結(jié)構(gòu),寫線程改寫這個結(jié)構(gòu),讀線程讀取該結(jié)構(gòu)。在寫線程改寫這個數(shù)據(jù)結(jié)構(gòu)的過程中,加鎖情況下讀線程由于等待鎖耗時會增加。
可以利用RCU (Read Copy Update What is rcu)的思想來去除這個鎖。本文提到的主要實現(xiàn)代碼:gist
RCU
RCU可以說是一種替代讀寫鎖的方法。其基于一個事實:當(dāng)寫線程在改變一個指針時,讀線程獲取這個指針,要么獲取到老的值,要么獲取到新的值。RCU的基本思想其實很簡單,參考What is RCU中Toy implementation可以很容易理解。一種簡單的RCU流程可以描述為:
寫線程:
old_ptr = _ptr
tmp_ptr = copy(_ptr) // copy
change(tmp_ptr) // change
_ptr = tmp_ptr // update
synchroize(tmp_ptr)
寫線程要更新_ptr
指向的內(nèi)容時,先復(fù)制一份新的,基于新的進(jìn)行改變,更新_ptr
指針,最后同步釋放老的內(nèi)存。
讀線程:
tmp_ptr = _ptr
use(tmp_ptr)
dereference(tmp_ptr)
讀線程直接使用_ptr
,使用完后需要告訴寫線程自己不再使用_ptr
。讀線程獲取_ptr
時,可能會獲取到老的也可能獲取到新的,無論哪種RCU都需要保證這塊內(nèi)存是有效的。重點在synchroize
和dereference
。synchroize
會等待所有使用老的_ptr
的線程dereference
,對于新的_ptr
使用者其不需要等待。這個問題說白了就是寫線程如何知道old_ptr
沒有任何讀線程在使用,可以安全地釋放。
這個問題實際上在wait-free
的各種實現(xiàn)中有好些解法,how-when-to-release-memory-in-wait-free-algorithms這里有人總結(jié)了幾種方法,例如Hazard pointers
、Quiescence period based reclamation
。
簡單地使用引用計數(shù)智能指針是無法解決這個問題的,因為智能指針自己不是線程安全的,例如:
tmp_ptr = _ptr // 1
tmp_ptr->addRef() // 2
use
tmp_ptr->release()
代碼1/2行不是原子的,所以當(dāng)取得tmp_ptr
準(zhǔn)備addRef
時,tmp_ptr
可能剛好被釋放了。
Quiescence period based reclamation
方法指的是讀線程需要聲明自己處于Quiescence period
,也就是不使用_ptr
的時候,當(dāng)其使用_ptr
的時候?qū)嶋H是進(jìn)入了一個邏輯上的臨界區(qū),當(dāng)所有讀線程都不再使用_ptr
的時候,寫線程就可以對內(nèi)存進(jìn)行安全地釋放。
本文正是描述了一種Quiescence period based reclamation
實現(xiàn)。這個實現(xiàn)可以用于有一個寫線程和多個讀線程共用若干個數(shù)據(jù)的場景。
實現(xiàn)
該方法本質(zhì)上把數(shù)據(jù)同步分解為基本的內(nèi)存單元讀寫。使用方式上可描述為:
讀線程:
tmp_ptr = _ptr
use
update() // 標(biāo)識自己不再使用任何共享數(shù)據(jù)
寫線程:
old_ptr = _ptr
tmp_ptr = copy(_ptr)
change(tmp_ptr)
_ptr = tmp_ptr
gc()
defer_free(old_ptr)
以下具體描述讀寫線程的實現(xiàn)。
寫線程
寫線程負(fù)責(zé)標(biāo)識內(nèi)存需要被釋放,以及檢查何時可以真正釋放內(nèi)存。其維護(hù)了一個釋放內(nèi)存隊列:
void *_pending[8]
uint64_t _head, _tail
void defer_free(void *p) {
_head ++
_pending[PENDING_POS(_head)] = p
}
gc() {
for (_tail -> find_free_pos())
free(_pending[_tail])
}
find_free_pos
找到一個可釋放內(nèi)存位置,在[_tail, find_free_pos())
這個區(qū)間內(nèi)所有內(nèi)存是可以安全被釋放的。
隊列位置_head/_tail
一直增大,PENDING_POS
就是對這個位置取模,限定在隊列大小范圍內(nèi)也是可行的,無論哪種方式,_head
從邏輯上說一直>=_tail
,但在實際中可能小于_tail
,所以實現(xiàn)時不使用大小判定,而是:
gc() {
pos = find_free_pos()
while (_tail != pos) {
free(_pending[PENDING_POS(_tail)])
_tail ++
}
}
讀線程
讀線程不再使用共享內(nèi)存時,就標(biāo)識自己:
update() {
static __thread int tid
_tmark[tid] = _head
}
讀線程的狀態(tài)會影響寫線程的回收邏輯,其狀態(tài)分為:
- 初始
- 活躍,會調(diào)用到
update
- 暫停,其他地方同步,或被掛起
- 退出
讀線程處于活躍狀態(tài)時,它會不斷地更新自己可釋放內(nèi)存位置(_tmark[tid]
)。寫線程檢查所有讀線程的_tmark[tid]
,[_tail, min(_tmark[]))
是所有讀線程都不再使用的內(nèi)存區(qū)間,可以被安全釋放。
find_free_pos() {
min = MAX_INTEGER
pos = 0
for (tid = 0; tid < max_threads; ++tid) {
tpos = _tmark[tid]
offset = tpos - tail
if (offset < min) {
min = offset
pos = tpos
}
}
return pos
}
當(dāng)讀線程暫停時,其_tmark[tid]
可能會在很長一段時間里得不到更新,此時會阻礙寫線程釋放內(nèi)存。所以需要方法來標(biāo)識讀線程是否進(jìn)入暫停狀態(tài)。通過設(shè)置一個上次釋放內(nèi)存位置_tfreeds[tid]
,標(biāo)識每個線程當(dāng)前內(nèi)存釋放到的位置。如果一個線程處于暫停狀態(tài)了,那么在一定時間后,_tfreeds[tid] == _tmark[tid]
。在查找可釋放位置時,就需要忽略暫停狀態(tài)的讀線程:
find_free_pos() {
min = MAX_INTEGER
pos = _head
for (tid = 0; tid < max_threads; ++tid) {
tpos = _tmark[tid]
if (tpos == _tfreeds[tid]) continue
offset = tpos - tail
if (offset < min) {
min = offset
pos = tpos
}
}
for (tid = 0; tid < max_threads; ++tid) {
if (_tfreeds[tid] != _tmark[tid])
_tfreeds[tid] = pos
}
return pos
}
但是當(dāng)所有線程都處于暫停狀態(tài)時,寫線程可能還在工作,上面的實現(xiàn)就會返回_head
,此時寫線程依然可以正常釋放內(nèi)存。
小結(jié),該方法原理可用下圖表示:

線程動態(tài)增加/減少
如果讀線程可能中途退出,中途動態(tài)增加,那么_tmark[]
就需要被復(fù)用,此時線程tid
的分配調(diào)整為動態(tài)的即可:
class ThreadIdPool {
public:
// 動態(tài)獲取一個線程tid,某線程每次調(diào)用該接口返回相同的值
int get()
// 線程退出時回收該tid
void put(int id)
}
ThreadIdPool
的實現(xiàn)無非就是利用TLS,以及在線程退出時得到通知以回收tid。那么對于讀線程的update
實現(xiàn)變?yōu)椋?/p>
update() {
tid = _idPool->get()
_tmark[tid] = _head
}
當(dāng)某個線程退出時,_tmark[tid]
和_tfreeds[tid]
不需要做任何處理,當(dāng)新創(chuàng)建的線程復(fù)用了該tid
時,可以立即復(fù)用_tmark[tid]
和_tfreeds[tid]
,此時這2個值必然是相等的。
以上,就是整個方法的實現(xiàn)。
線程可讀可寫
以上方法適用場景還是不夠通用。在nbds項目(實現(xiàn)了一些無鎖數(shù)據(jù)結(jié)構(gòu)的toy project)中有一份雖然簡單但也有啟發(fā)的實現(xiàn)(rcu.c)。該實現(xiàn)支持任意線程defer_free
,所有線程update
。update
除了聲明不再使用任何共享內(nèi)存外,還可能回收內(nèi)存。任意線程都可能維護(hù)一些待釋放的內(nèi)存,任意一塊內(nèi)存可能被任意其他線程使用。那么它是如何內(nèi)存回收的?
本文描述的方法是所有讀線程自己聲明自己,然后由寫線程主動來檢查。不同于此方法, nbds的實現(xiàn),基于一種通知擴(kuò)散的方式。該方式以這樣一種方式工作:
當(dāng)某個線程嘗試內(nèi)存回收時,它需要知道所有其他線程的空閑位置(相當(dāng)于_tmark[tid]
),它通知下一個線程我需要釋放的范圍。當(dāng)下一個線程update
時(離開臨界區(qū)),它會將上個線程的通知繼續(xù)告訴下一個線程,直到最后這個通知回到發(fā)起線程。那么對于發(fā)起線程而言,這個釋放請求在所有線程中走了一遍,得到了大家的認(rèn)可,可以安全釋放。每個線程都以這樣的方式工作。
void rcu_defer_free (void *x) {
...
rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
...
}
void rcu_update (void) {
...
for (i = 0; i < num_threads_; ++i) {
...
uint64_t x = rcu_[tid_][i]; // 其它線程發(fā)給自己的通知
rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; // 擴(kuò)散出去
...
}
...
while (q->tail != rcu_[tid_][tid_]) {
free
}
...
}
這個實現(xiàn)相對簡單,不支持線程暫停,以及線程動態(tài)增加和減少。