昨天在上篇blog里描寫了如何把STL容器放到共享內存里去,不過由于好久不寫blog,發覺詞匯組織能力差了很多,不少想寫的東西寫的很零散,今天剛好翻看自己的書簽,看到一篇挺老的文章,不過從共享內存到STL容器講述得蠻全面,還提供了學習的實例,所以順便翻譯過來,并附上原文地址。
共享內存(shm)是當前主流UNIX系統中的一種IPC方法,它允許多個進程把同一塊物理內存段(segment)映射(map)到它們的地址空間中去。既然內存段對于各自附著(attach)的進程是共享的,這些進程可以很方便的通過這塊共享內存上的共有數據進行通信。因此,顧名思義,共享內存就是進程之間共享的一組內存段。當一個進程附著到一塊共享內存上后,它得到一個指向這塊共享內存的指針;該進程可以像使用其他內存一樣使用這塊共享內存。當然,由于這塊內存同樣會被其他進程訪問或寫入,所以必須要注意進程同步問題。
參考如下代碼,這是UNIX系統上使用共享內存的一般方法(注:本文調用的是POSIX函數):
//Get shared memory id //shared memory key const key_t ipckey = 24568; //shared memory permission; can be //read and written by anybody const int perm = 0666; //shared memory segment size size_t shmSize = 4096; //Create shared memory if not //already created with specified //permission int shmId = shmget (ipckey,shmSize,IPC_CREAT|perm); if (shmId ==-1) { //Error } //Attach the shared memory segment void* shmPtr = shmat(shmId,NULL,0); struct commonData* dp = (struct commonData*)shmPtr; //detach shared memory shmdt(shmPtr); |
存放在共享內存中的數據結構
當保存數據到共享內存中時需要留意,參考如下結構:
struct commonData { int sharedInt; float sharedFloat; char* name; Struct CommonData* next; }; |
進程A把數據寫入共享內存:
//Attach shared memory struct commonData* dp = (struct commonData*) shmat (shmId,NULL,0); dp->sharedInt = 5; . . dp->name = new char [20]; strcpy(dp->name,"My Name"); dp->next = new struct commonData(); |
稍后,進程B把數據讀出:
struct commonData* dp = (struct commonData*) shmat (shmId,NULL,0); //count = 5; int count = dp->sharedInt; //problem printf("name = [%s]\n",dp->name); dp = dp->next; //problem |
結構 commonData
的成員 name
和指向下一個結構的 next
所指向的內存分別從進程A的地址空間中的堆上分配,顯然 name 和 next 指向的內存也只有進程A可以訪問。當進程B訪問 dp->name
或者 dp->next
時候,由于它在訪問自己地址空間以外的內存空間,所以這將是非法操作(memory violation),它無法正確得到 name
和 next
所指向的內存。因此,所有的共享內存中的指針必須同樣指向共享內存中的地址。(這也是為什么包含虛函數繼承的C++類對象不能放到共享內存中的原因——這是另外一個話題。注:因為虛函數的具體實現可能會在其他的內存空間中)由于這些條件限制,放入共享內存中的結構應該簡單簡單。(注:我覺得最好避免使用指針)
共享內存中的STL容器
想像一下把STL容器,例如map, vector, list等等,放入共享內存中,IPC一旦有了這些強大的通用數據結構做輔助,無疑進程間通信的能力一下子強大了很多。我們沒必要再為共享內存設計其他額外的數據結構,另外,STL的高度可擴展性將為IPC所驅使。STL容器被良好的封裝,默認情況下有它們自己的內存管理方案。當一個元素被插入到一個STL列表(list)中時,列表容器自動為其分配內存,保存數據。考慮到要將STL容器放到共享內存中,而容器卻自己在堆上分配內存。一個最笨拙的辦法是在堆上構造STL容器,然后把容器復制到共享內存,并且確保所有容器的內部分配的內存指向共享內存中的相應區域,這基本是個不可能完成的任務。例如下邊進程A所做的事情:
//Attach to shared memory void* rp = (void*)shmat(shmId,NULL,0); //Construct the vector in shared //memory using placement new vector<int>* vpInA = new(rp) vector<int>*; //The vector is allocating internal data //from the heap in process A's address //space to hold the integer value (*vpInA)[0] = 22; |
然后進程B希望從共享內存中取出數據:
vector<int>* vpInB = (vector<int>*) shmat(shmId,NULL,0); //problem - the vector contains internal //pointers allocated in process A's address //space and are invalid here int i = *(vpInB)[0]; |
重用STL allocator
進一步考察STL容器,我們發現它的模板定義中有第二個默認參數,也就是allocator 類,該類實際是一個內存分配模型。默認的allocator是從堆上分配內存(注:這就是STL容器的默認表現,我們甚至可以改造它從一個網絡數據庫中分配空間,保存數據)。下邊是 vector 類的一部分定義:
template<class T, class A = allocator<T> > class vector { //other stuff }; |
考慮如下聲明:
//User supplied allocator myAlloc vector<int,myAlloc<int> > alocV; |
假設 myAlloc
從共享內存上分配內存,則 alocV
將完全在共享內存上被構造,所以進程A可以如下:
//Attach to shared memory void* rp = (void*)shmat(shmId,NULL,0); //Construct the vector in shared memory //using placement new vector<int>* vpInA = new(rp) vector<int,myAlloc<int> >*; //The vector uses myAlloc<int> to allocate //memory for its internal data structure //from shared memory (*v)[0] = 22; |
進程B可以如下讀出數據:
vector<int>* vpInB = (vector<int,myAlloc<int> >*) shmat (shmId,NULL,0); //Okay since all of the vector is //in shared memory int i = *(vpInB)[0]; |
所有附著在共享內存上的進程都可以安全的使用該vector。在這個例子中,該類的所有內存都在共享內存上分配,同時可以被其他的進程訪問。只要提供一個用戶自定義的allocator,任何STL容器都可以安全的放置到共享內存上。
一個基于共享內存的STL Allocator
清單 shared_allocator.hh 是一個STL Allocator的實現,SharedAllocator
是一個模板類。而 Pool
類完成共享內存的分配與回收。
template<class T>class SharedAllocator { private: Pool pool_; // pool of elements of sizeof(T) public: typedef T value_type; typedef unsigned int size_type; typedef ptrdiff_t difference_type; typedef T* pointer; typedef const T* const_pointer; typedef T& reference; typedef const T& const_reference; pointer address(reference r) const { return &r; } const_pointer address(const_reference r) const {return &r;} SharedAllocator() throw():pool_(sizeof(T)) {} template<class U> SharedAllocator (const SharedAllocator<U>& t) throw(): pool_(sizeof(T)) {} ~SharedAllocator() throw() {}; // space for n Ts pointer allocate(size_t n, const void* hint=0) { return(static_cast<pointer> (pool_.alloc(n))); } // deallocate n Ts, don't destroy void deallocate(pointer p,size_type n) { pool_.free((void*)p,n); return; } // initialize *p by val void construct(pointer p, const T& val) { new(p) T(val); } // destroy *p but don't deallocate void destroy(pointer p) { p->~T(); } size_type max_size() const throw() { pool_.maxSize(); } template<class U> // in effect: typedef SharedAllocator<U> other struct rebind { typedef SharedAllocator<U> other; }; }; template<class T>bool operator==(const SharedAllocator<T>& a, const SharedAllocator<T>& b) throw() { return(a.pool_ == b.pool_); } template<class T>bool operator!=(const SharedAllocator<T>& a, const SharedAllocator<T>& b) throw() { return(!(a.pool_ == b.pool_)); } |
清單pool.hh是 Pool
類定義,其中靜態成員shm_
是類型 shmPool
,保證每個進程只有唯一的一個shmPool
實例。shmPool
ctor 創建并附著所需大小的內存到共享內存上。共享內存的參數,比如 鍵值、段數目、段大小,都通過環境變量傳遞給 shmPool
ctor。成員 segs_
是共享段的數目,segSize_
是每個共享段的大小,成員path_
和key_
用來創建唯一的 ipckey
。shmPool
為每個共享段創建一個信號量(semaphore)用于同步。shmPool
還在為每個共享段構造了一個 Chunk
類,一個 Chunk
代表一個共享段。每個共享段的標識是shmId_
, 信號量 semId_
控制該段的訪問許可,一個指向 Link
結構的指針表明 Chunk
類的剩余列表。
class Pool { private: class shmPool { private: struct Container { containerMap* cont; }; class Chunk { public: Chunk() Chunk(Chunk&); ~Chunk() {} void* alloc(size_t size); void free (void* p,size_t size); private: int shmId_; int semId_; int lock_() }; int key_; char* path_; Chunk** chunks_; size_t segs_; size_t segSize_; Container* contPtr_; int contSemId_; public: shmPool(); ~shmPool(); size_t maxSize(); void* alloc(size_t size); void free(void* p, size_t size); int shmPool::lockContainer() int unLockContainer() containerMap* getContainer() void shmPool::setContainer(containerMap* container) }; private: static shmPool shm_; size_t elemSize_; public: Pool(size_t elemSize); ~Pool() {} size_t maxSize(); void* alloc(size_t size); void free(void* p, size_t size); int lockContainer(); int unLockContainer(); containerMap* getContainer(); void setContainer(containerMap* container); }; inline bool operator==(const Pool& a,const Pool& b) { return(a.compare(b)); } |
把STL容器放入共享內存
假設進程A在共享內存中放入了數個容器,進程B如何找到這些容器呢?一個方法就是進程A把容器放在共享內存中的確定地址上(fixed offsets),則進程B可以從該已知地址上獲取容器。另外一個改進點的辦法是,進程A先在共享內存某塊確定地址上放置一個map容器,然后進程A再創建其他容器,然后給其取個名字和地址一并保存到這個map容器里。進程B知道如何獲取該保存了地址映射的map容器,然后同樣再根據名字取得其他容器的地址。清單container_factory.hh是一個容器工廠類。類Pool
的方法setContainer
把map容器放置在一個已知地址上,方法getContainer
可以重新獲取這個map。該工廠的方法用來在共享內存中創建、獲取和刪除容器。當然,傳遞給容器工廠的容器需要以SharedAllocator
作為allocator。
struct keyComp { bool operator()(const char* key1,const char* key2) { return(strcmp(key1,key2) < 0); } }; class containerMap: public map<char*,void*,keyComp,SharedAllocator<char* > > {}; class containerFactory { public: containerFactory():pool_(sizeof(containerMap)){} ~containerFactory() {} template<class Container> Container* createContainer (char* key,Container* c=NULL); template<class Container> Container* getContainer (char* key,Container* c=NULL); template<class Container> int removeContainer (char* key,Container* c=NULL); private: Pool pool_; int lock_(); int unlock_(); }; |
結論
本文描述的方案可以在共享內存中創建STL容器,其中的一個缺陷是,在分配共享內存之前,應該保證共享內存的總大小(segs_* segSize_
)大于你要保存STL容器的最大長度,因為一旦類Pool
超出了共享內存的,該類無法再分配新的共享內存。
完整的源代碼可以從這里下載:www.cuj.com/code
參考文獻
- Bjarne Stroustrup. The C++ Programming Language, Third Edition (Addison-Wesley, 1997).
- Matthew H. Austern. Generic Programming and the STL: Using and
Extending the C++ Standard Template Library (Addison-Wesley, 1999).
關于作者
Grum Ketema has Masters degrees in Electrical Engineering and Computer Science. With 17 years of experience in software development, he has been using C since 1985, C++ since 1988, and Java since 1997. He has worked at AT&T Bell Labs, TASC, Massachusetts Institute of Technology, SWIFT, BEA Systems, and Northrop.