在一個系統中有一個寫線程和若干個讀線程,讀寫線程通過一個指針共用了一個數據結構,寫線程改寫這個結構,讀線程讀取該結構。在寫線程改寫這個數據結構的過程中,加鎖情況下讀線程由于等待鎖耗時會增加。
可以利用RCU (Read Copy Update What is rcu)的思想來去除這個鎖。本文提到的主要實現代碼:gist
RCU
RCU可以說是一種替代讀寫鎖的方法。其基于一個事實:當寫線程在改變一個指針時,讀線程獲取這個指針,要么獲取到老的值,要么獲取到新的值。RCU的基本思想其實很簡單,參考What is RCU中Toy implementation可以很容易理解。一種簡單的RCU流程可以描述為:
寫線程:
old_ptr = _ptr
tmp_ptr = copy(_ptr) // copy
change(tmp_ptr) // change
_ptr = tmp_ptr // update
synchroize(tmp_ptr)
寫線程要更新_ptr
指向的內容時,先復制一份新的,基于新的進行改變,更新_ptr
指針,最后同步釋放老的內存。
讀線程:
tmp_ptr = _ptr
use(tmp_ptr)
dereference(tmp_ptr)
讀線程直接使用_ptr
,使用完后需要告訴寫線程自己不再使用_ptr
。讀線程獲取_ptr
時,可能會獲取到老的也可能獲取到新的,無論哪種RCU都需要保證這塊內存是有效的。重點在synchroize
和dereference
。synchroize
會等待所有使用老的_ptr
的線程dereference
,對于新的_ptr
使用者其不需要等待。這個問題說白了就是寫線程如何知道old_ptr
沒有任何讀線程在使用,可以安全地釋放。
這個問題實際上在wait-free
的各種實現中有好些解法,how-when-to-release-memory-in-wait-free-algorithms這里有人總結了幾種方法,例如Hazard pointers
、Quiescence period based reclamation
。
簡單地使用引用計數智能指針是無法解決這個問題的,因為智能指針自己不是線程安全的,例如:
tmp_ptr = _ptr // 1
tmp_ptr->addRef() // 2
use
tmp_ptr->release()
代碼1/2行不是原子的,所以當取得tmp_ptr
準備addRef
時,tmp_ptr
可能剛好被釋放了。
Quiescence period based reclamation
方法指的是讀線程需要聲明自己處于Quiescence period
,也就是不使用_ptr
的時候,當其使用_ptr
的時候實際是進入了一個邏輯上的臨界區,當所有讀線程都不再使用_ptr
的時候,寫線程就可以對內存進行安全地釋放。
本文正是描述了一種Quiescence period based reclamation
實現。這個實現可以用于有一個寫線程和多個讀線程共用若干個數據的場景。
實現
該方法本質上把數據同步分解為基本的內存單元讀寫。使用方式上可描述為:
讀線程:
tmp_ptr = _ptr
use
update() // 標識自己不再使用任何共享數據
寫線程:
old_ptr = _ptr
tmp_ptr = copy(_ptr)
change(tmp_ptr)
_ptr = tmp_ptr
gc()
defer_free(old_ptr)
以下具體描述讀寫線程的實現。
寫線程
寫線程負責標識內存需要被釋放,以及檢查何時可以真正釋放內存。其維護了一個釋放內存隊列:
void *_pending[8]
uint64_t _head, _tail
void defer_free(void *p) {
_head ++
_pending[PENDING_POS(_head)] = p
}
gc() {
for (_tail -> find_free_pos())
free(_pending[_tail])
}
find_free_pos
找到一個可釋放內存位置,在[_tail, find_free_pos())
這個區間內所有內存是可以安全被釋放的。
隊列位置_head/_tail
一直增大,PENDING_POS
就是對這個位置取模,限定在隊列大小范圍內也是可行的,無論哪種方式,_head
從邏輯上說一直>=_tail
,但在實際中可能小于_tail
,所以實現時不使用大小判定,而是:
gc() {
pos = find_free_pos()
while (_tail != pos) {
free(_pending[PENDING_POS(_tail)])
_tail ++
}
}
讀線程
讀線程不再使用共享內存時,就標識自己:
update() {
static __thread int tid
_tmark[tid] = _head
}
讀線程的狀態會影響寫線程的回收邏輯,其狀態分為:
- 初始
- 活躍,會調用到
update
- 暫停,其他地方同步,或被掛起
- 退出
讀線程處于活躍狀態時,它會不斷地更新自己可釋放內存位置(_tmark[tid]
)。寫線程檢查所有讀線程的_tmark[tid]
,[_tail, min(_tmark[]))
是所有讀線程都不再使用的內存區間,可以被安全釋放。
find_free_pos() {
min = MAX_INTEGER
pos = 0
for (tid = 0; tid < max_threads; ++tid) {
tpos = _tmark[tid]
offset = tpos - tail
if (offset < min) {
min = offset
pos = tpos
}
}
return pos
}
當讀線程暫停時,其_tmark[tid]
可能會在很長一段時間里得不到更新,此時會阻礙寫線程釋放內存。所以需要方法來標識讀線程是否進入暫停狀態。通過設置一個上次釋放內存位置_tfreeds[tid]
,標識每個線程當前內存釋放到的位置。如果一個線程處于暫停狀態了,那么在一定時間后,_tfreeds[tid] == _tmark[tid]
。在查找可釋放位置時,就需要忽略暫停狀態的讀線程:
find_free_pos() {
min = MAX_INTEGER
pos = _head
for (tid = 0; tid < max_threads; ++tid) {
tpos = _tmark[tid]
if (tpos == _tfreeds[tid]) continue
offset = tpos - tail
if (offset < min) {
min = offset
pos = tpos
}
}
for (tid = 0; tid < max_threads; ++tid) {
if (_tfreeds[tid] != _tmark[tid])
_tfreeds[tid] = pos
}
return pos
}
但是當所有線程都處于暫停狀態時,寫線程可能還在工作,上面的實現就會返回_head
,此時寫線程依然可以正常釋放內存。
小結,該方法原理可用下圖表示:

線程動態增加/減少
如果讀線程可能中途退出,中途動態增加,那么_tmark[]
就需要被復用,此時線程tid
的分配調整為動態的即可:
class ThreadIdPool {
public:
// 動態獲取一個線程tid,某線程每次調用該接口返回相同的值
int get()
// 線程退出時回收該tid
void put(int id)
}
ThreadIdPool
的實現無非就是利用TLS,以及在線程退出時得到通知以回收tid。那么對于讀線程的update
實現變為:
update() {
tid = _idPool->get()
_tmark[tid] = _head
}
當某個線程退出時,_tmark[tid]
和_tfreeds[tid]
不需要做任何處理,當新創建的線程復用了該tid
時,可以立即復用_tmark[tid]
和_tfreeds[tid]
,此時這2個值必然是相等的。
以上,就是整個方法的實現。
線程可讀可寫
以上方法適用場景還是不夠通用。在nbds項目(實現了一些無鎖數據結構的toy project)中有一份雖然簡單但也有啟發的實現(rcu.c)。該實現支持任意線程defer_free
,所有線程update
。update
除了聲明不再使用任何共享內存外,還可能回收內存。任意線程都可能維護一些待釋放的內存,任意一塊內存可能被任意其他線程使用。那么它是如何內存回收的?
本文描述的方法是所有讀線程自己聲明自己,然后由寫線程主動來檢查。不同于此方法, nbds的實現,基于一種通知擴散的方式。該方式以這樣一種方式工作:
當某個線程嘗試內存回收時,它需要知道所有其他線程的空閑位置(相當于_tmark[tid]
),它通知下一個線程我需要釋放的范圍。當下一個線程update
時(離開臨界區),它會將上個線程的通知繼續告訴下一個線程,直到最后這個通知回到發起線程。那么對于發起線程而言,這個釋放請求在所有線程中走了一遍,得到了大家的認可,可以安全釋放。每個線程都以這樣的方式工作。
void rcu_defer_free (void *x) {
...
rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
...
}
void rcu_update (void) {
...
for (i = 0; i < num_threads_; ++i) {
...
uint64_t x = rcu_[tid_][i]; // 其它線程發給自己的通知
rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; // 擴散出去
...
}
...
while (q->tail != rcu_[tid_][tid_]) {
free
}
...
}
這個實現相對簡單,不支持線程暫停,以及線程動態增加和減少。