平臺(tái)服務(wù)和腳本服務(wù)接口對(duì)后端PostgreSQL數(shù)據(jù)庫(kù)的使用目前采用短暫連接方式,造成多次調(diào)用服務(wù)時(shí)頻繁連接和斷開(kāi)數(shù)據(jù)庫(kù),效率很低。
如果共享數(shù)據(jù)庫(kù)連接,則會(huì)造成多線程訪問(wèn)數(shù)據(jù)庫(kù)時(shí)的事務(wù)沖突,故必須采用連接池來(lái)管理對(duì)數(shù)據(jù)庫(kù)的并發(fā)訪問(wèn),某一線程連接到數(shù)據(jù)庫(kù)使用完畢后,不斷開(kāi)數(shù)據(jù)庫(kù)連接,而是把連接歸還給連接池。
另一線程訪問(wèn)數(shù)據(jù)庫(kù)時(shí)會(huì)首先向連接池申請(qǐng)已經(jīng)存在的連接,如果連接池中沒(méi)有空閑連接,或者申請(qǐng)到得連接已經(jīng)超時(shí)失效,再建立新的連接,使用完畢后同樣歸還到連接池。
這樣連接池中的連接數(shù)會(huì)隨著線程壓力的增加逐漸增長(zhǎng),直到所有的線程同時(shí)工作,達(dá)到最多連接數(shù)。
由于一個(gè)線程可能同時(shí)申請(qǐng)多個(gè)連接,故連接數(shù)可能會(huì)大于線程數(shù)。連接池在程序結(jié)束時(shí)銷(xiāo)毀全部連接,或者線程在申請(qǐng)到的某一連接失效時(shí)銷(xiāo)毀該連接。
1
2 class dbConnection
3 {
4 friend class dbConnectionPool;
5 bool is_using;
6
7 public:
8 pqxx::connection pqxx_conn;
9
10 public:
11 dbConnection(string opt)
12 : pqxx_conn(opt),is_using(false)
13 {}
14 };
15
16 typedef boost::shared_ptr<dbConnection> dbConnection_ptr;
17
18 class dbConnectionPool
19 {
20 string m_opt;
21 size_t m_max_num;
22 std::vector<dbConnection_ptr> m_pool;
23 cppx::thread_mutex m_mutex;
24
25 public:
26 dbConnectionPool(string opt,size_t max_num)
27 : m_opt(opt),m_max_num(max_num)
28 {}
29
30 dbConnection_ptr GetConnection(void){
31 ACE_DEBUG((LM_DEBUG,"(%t) LINE %d : %C\n", __LINE__ ,__FUNCTION__));
32
33 cppx::scoped_lock lock(m_mutex);
34 foreach(dbConnection_ptr ptr,m_pool){
35 if( ptr->is_using == false ){
36 ptr->is_using = true;
37
38 if( ptr->pqxx_conn.is_open() ){
39 ACE_DEBUG((LM_DEBUG,"(%t) 找到連接池空閑連接。\n"));
40 return ptr;
41 }
42 }
43 }
44
45 ACE_DEBUG((LM_DEBUG,"(%t) 沒(méi)有空閑連接,創(chuàng)建新連接。[%d]\n",m_pool.size()));
46
47 // 沒(méi)有空閑連接,創(chuàng)建新連接
48 dbConnection_ptr ptr(new dbConnection(m_opt));
49 ptr->is_using = true;
50
51 // 找到一個(gè)失效的連接,用新連接覆蓋
52 bool cover = false;
53 for(size_t idx = 0; idx < m_pool.size(); idx++){
54 dbConnection_ptr ptr2 = m_pool[idx];
55 if( ptr2->is_using == false && ptr2->pqxx_conn.is_open() == false ){
56 m_pool[idx] = ptr;
57 cover = true;
58 ACE_DEBUG((LM_DEBUG,"(%t) 覆蓋失效的連接[%d]。\n",idx));
59 break;
60 }
61 }
62 if( !cover ){
63 m_pool.push_back(ptr);
64 }
65
66 return ptr;
67 }
68
69 void ReleaseConnection(dbConnection_ptr ptr_){
70 ACE_DEBUG((LM_DEBUG,"(%t) LINE %d : %C\n", __LINE__ ,__FUNCTION__));
71 ACE_DEBUG((LM_DEBUG,"(%t) 連接使用完畢,歸還到連接池。[%d]\n",m_pool.size()));
72
73 cppx::scoped_lock lock(m_mutex);
74 ptr_->is_using = false;
75 }
76 };
77
78 class dbConnectionUser
79 {
80 dbConnectionPool & pool;
81
82 public:
83 dbConnection_ptr conn;
84
85 public:
86 dbConnectionUser(dbConnectionPool & pool_) : pool(pool_) {
87 conn = pool.GetConnection();
88 }
89 ~dbConnectionUser(void){
90 pool.ReleaseConnection(conn);
91 }
92 };
93
2 class dbConnection
3 {
4 friend class dbConnectionPool;
5 bool is_using;
6
7 public:
8 pqxx::connection pqxx_conn;
9
10 public:
11 dbConnection(string opt)
12 : pqxx_conn(opt),is_using(false)
13 {}
14 };
15
16 typedef boost::shared_ptr<dbConnection> dbConnection_ptr;
17
18 class dbConnectionPool
19 {
20 string m_opt;
21 size_t m_max_num;
22 std::vector<dbConnection_ptr> m_pool;
23 cppx::thread_mutex m_mutex;
24
25 public:
26 dbConnectionPool(string opt,size_t max_num)
27 : m_opt(opt),m_max_num(max_num)
28 {}
29
30 dbConnection_ptr GetConnection(void){
31 ACE_DEBUG((LM_DEBUG,"(%t) LINE %d : %C\n", __LINE__ ,__FUNCTION__));
32
33 cppx::scoped_lock lock(m_mutex);
34 foreach(dbConnection_ptr ptr,m_pool){
35 if( ptr->is_using == false ){
36 ptr->is_using = true;
37
38 if( ptr->pqxx_conn.is_open() ){
39 ACE_DEBUG((LM_DEBUG,"(%t) 找到連接池空閑連接。\n"));
40 return ptr;
41 }
42 }
43 }
44
45 ACE_DEBUG((LM_DEBUG,"(%t) 沒(méi)有空閑連接,創(chuàng)建新連接。[%d]\n",m_pool.size()));
46
47 // 沒(méi)有空閑連接,創(chuàng)建新連接
48 dbConnection_ptr ptr(new dbConnection(m_opt));
49 ptr->is_using = true;
50
51 // 找到一個(gè)失效的連接,用新連接覆蓋
52 bool cover = false;
53 for(size_t idx = 0; idx < m_pool.size(); idx++){
54 dbConnection_ptr ptr2 = m_pool[idx];
55 if( ptr2->is_using == false && ptr2->pqxx_conn.is_open() == false ){
56 m_pool[idx] = ptr;
57 cover = true;
58 ACE_DEBUG((LM_DEBUG,"(%t) 覆蓋失效的連接[%d]。\n",idx));
59 break;
60 }
61 }
62 if( !cover ){
63 m_pool.push_back(ptr);
64 }
65
66 return ptr;
67 }
68
69 void ReleaseConnection(dbConnection_ptr ptr_){
70 ACE_DEBUG((LM_DEBUG,"(%t) LINE %d : %C\n", __LINE__ ,__FUNCTION__));
71 ACE_DEBUG((LM_DEBUG,"(%t) 連接使用完畢,歸還到連接池。[%d]\n",m_pool.size()));
72
73 cppx::scoped_lock lock(m_mutex);
74 ptr_->is_using = false;
75 }
76 };
77
78 class dbConnectionUser
79 {
80 dbConnectionPool & pool;
81
82 public:
83 dbConnection_ptr conn;
84
85 public:
86 dbConnectionUser(dbConnectionPool & pool_) : pool(pool_) {
87 conn = pool.GetConnection();
88 }
89 ~dbConnectionUser(void){
90 pool.ReleaseConnection(conn);
91 }
92 };
93
使用連接池的方法,只要在建立數(shù)據(jù)庫(kù)連接的類(lèi)中增加連接池成員即可:
1
2 struct ALEE_PlatformService_iPqxx::pimpl_t
3 {
4 #if defined(USE_POOL_DBLINK) && (USE_POOL_DBLINK == 1)
5
6 public:
7 dbConnectionPool m_pool;
8
9 #else
10
2 struct ALEE_PlatformService_iPqxx::pimpl_t
3 {
4 #if defined(USE_POOL_DBLINK) && (USE_POOL_DBLINK == 1)
5
6 public:
7 dbConnectionPool m_pool;
8
9 #else
10
為了方便,定義如下的宏:
1 #define DB_CONNECTION() \
2 dbConnectionUser user(pimpl_->m_pool); \
3 if( !user.conn || !user.conn->pqxx_conn.is_open() ) return false; \
4 pqxx::work X(user.conn->pqxx_conn);
5
2 dbConnectionUser user(pimpl_->m_pool); \
3 if( !user.conn || !user.conn->pqxx_conn.is_open() ) return false; \
4 pqxx::work X(user.conn->pqxx_conn);
5
需要調(diào)用數(shù)據(jù)庫(kù)查詢時(shí),只需要把上面的宏插入try塊中:
1
2 try{
3 DB_CONNECTION();
4
5 fuStringCommand cmd;
6 cmd = pimpl_->m_sqls["apx_get_moi_attr_enum"];
7 cmd << X.quote(moi_hash)
8 << X.quote(attr_code);
9
10 result = new defs::xml_row;
11 return pimpl_->QueryRecord(X, cmd, *result);
12 }
13 catch_pqxx_error();
14
2 try{
3 DB_CONNECTION();
4
5 fuStringCommand cmd;
6 cmd = pimpl_->m_sqls["apx_get_moi_attr_enum"];
7 cmd << X.quote(moi_hash)
8 << X.quote(attr_code);
9
10 result = new defs::xml_row;
11 return pimpl_->QueryRecord(X, cmd, *result);
12 }
13 catch_pqxx_error();
14
一個(gè)簡(jiǎn)單的數(shù)據(jù)庫(kù)連接池就完成了,由于多線程并發(fā)時(shí)動(dòng)態(tài)申請(qǐng)數(shù)據(jù)庫(kù)連接,既可以充分發(fā)揮數(shù)據(jù)庫(kù)并發(fā)的好處有避免了共享數(shù)據(jù)庫(kù)連接造成事務(wù)沖突。完全滿足簡(jiǎn)單的數(shù)據(jù)庫(kù)查詢需要。
等燈。等燈。 Cppx Inside