• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            Khan's Notebook GCC/GNU/Linux Delphi/Window Java/Anywhere

            路漫漫,長修遠,我們不能沒有錢
            隨筆 - 173, 文章 - 0, 評論 - 257, 引用 - 0
            數據加載中……

            boost中asio網絡庫多線程并發處理實現,以及asio在多線程模型中線程的調度情況和線程安全。 (轉載)

            轉載自(http://www.cnblogs.com/lidabo/p/3906055.html)


            1、實現多線程方法:

                其實就是多個線程同時調用io_service::run

            1         for (int i = 0; i != m_nThreads; ++i)
            2         {
            3             boost::shared_ptr<boost::thread> pTh(new boost::thread(
            4                 boost::bind(&boost::asio::io_service::run,&m_ioService)));
            5             m_listThread.push_back(pTh);
            6         }


            2、多線程調度情況:

                asio規定:只能在調用io_service::run的線程中才能調用事件完成處理器。


            注:事件完成處理器就是你async_accept、async_write等注冊的句柄,類似于回調的東西。


            單線程:

                如果只有一個線程調用io_service::run,根據asio的規定,事件完成處理器也只能在這個線程中執行。也就是說,你所有代碼都在同一個線程中運行,因此變量的訪問是安全的。


            多線程:

                如果有多個線程同時調用io_service::run以實現多線程并發處理。對于asio來說,這些線程都是平等的,沒有主次之分。如果你投遞的一個請求比如async_write完成時,asio將隨機的激活調用io_service::run的線程。并在這個線程中調用事件完成處理器(async_write當時注冊的句柄)。如果你的代碼耗時較長,這個時候你投遞的另一個async_write請求完成時,asio將不等待你的代碼處理完成,它將在另外的一個調用io_service::run線程中,調用async_write當時注冊的句柄。也就是說,你注冊的事件完成處理器有可能同時在多個線程中調用。


            當然你可以使用 boost::asio::io_service::strand讓完成事件處理器的調用,在同一時間只有一個, 比如下面的的代碼:

              socket_.async_read_some(boost::asio::buffer(buffer_),
                  strand_.wrap(
                    boost::bind(&connection::handle_read, shared_from_this(),
                      boost::asio::placeholders::error,
                      boost::asio::placeholders::bytes_transferred)));

            ...

            boost::asio::io_service::strand strand_;

             

                此時async_read_som完成后掉用handle_read時,必須等待其它handle_read調用完成時才能被執行(async_read_som引起的handle_read調用)。

                多線程調用時,還有一個重要的問題,那就是無序化。比如說,你短時間內投遞多個async_write,那么完成處理器的調用并不是按照你投遞async_write的順序調用的。asio第一次調用完成事件處理器,有可能是第二次async_write返回的結果,也有可能是第3次的。使用strand也是這樣的。strand只是保證同一時間只運行一個完成處理器,但它并不保證順序。

             


             

            代碼測試:

            服務器:

            將下面的代碼編譯以后,使用cmd命令提示符下傳人參數<IP> <port> <threads>調用

            比如:test.exe 0.0.0.0 3005 10   

            客服端 使用windows自帶的telnet

            cmd命令提示符:

            telnet 127.0.0.1 3005

             

            原理:客戶端連接成功后,同一時間調用100次boost::asio::async_write給客戶端發送數據,并且在完成事件處理器中打印調用序號,和線程ID。

            核心代碼:


             1     void start()
             2     {
             3         for (int i = 0; i != 100; ++i)
             4         {
             5             boost::shared_ptr<string> pStr(new string);
             6             *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
             7             *pStr += "\r\n";
             8             boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
             9                 boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
            10                  boost::asio::placeholders::error,
            11                  boost::asio::placeholders::bytes_transferred,
            12                  pStr,i)
            13                 );
            14         }
            15     }
            16 
            17 //去掉 boost::mutex::scoped_lock lock(m_ioMutex); 效果更明顯。
            18 
            19     void HandleWrite(const boost::system::error_code& error
            20         ,std::size_t bytes_transferred
            21         ,boost::shared_ptr<string> pStr,int nIndex)
            22     {
            23         if (!error)
            24         {
            25             boost::mutex::scoped_lock lock(m_ioMutex);
            26             cout << "發送序號=" << nIndex << ",線程id=" << boost::this_thread::get_id() << endl;
            27         }
            28         else
            29         {
            30             cout << "連接斷開" << endl;
            31         }
            32     }


            完整代碼:

              1 #include <boost/bind.hpp>
              2 #include <boost/shared_ptr.hpp>
              3 #include <boost/enable_shared_from_this.hpp>
              4 #include <boost/asio.hpp>
              5 #include <boost/lexical_cast.hpp>
              6 #include <boost/thread.hpp>
              7 #include <boost/thread/mutex.hpp>
              8 #include <string>
              9 #include <iostream>
             10 
             11 
             12 using std::cout;
             13 using std::endl;
             14 using std::string;
             15 using boost::asio::ip::tcp;
             16 
             17 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
             18 
             19 
             20 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
             21 
             22 class CMyTcpConnection : public boost::enable_shared_from_this<CMyTcpConnection>
             23 {
             24 public:
             25     CMyTcpConnection(boost::asio::io_service &ser)
             26         :m_nSocket(ser)
             27     {
             28     }
             29 
             30     typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;
             31 
             32 
             33     static CPMyTcpCon CreateNew(boost::asio::io_service& io_service) {
             34         return CPMyTcpCon(new CMyTcpConnection(io_service));
             35     }
             36 
             37 
             38    
             39 public:
             40     void start()
             41     {
             42         for (int i = 0; i != 100; ++i) {
             43             boost::shared_ptr<string> pStr(new string);
             44             *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
             45             *pStr += "\r\n";
             46             boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
             47                 boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, pStr,i)
             48                 );
             49         }
             50     }
             51 
             52 
             53     tcp::socket& socket()
             54     {
             55         return m_nSocket;
             56     }
             57 
             58 
             59 private:
             60     void HandleWrite(const boost::system::error_code& error ,std::size_t bytes_transferred ,boost::shared_ptr<string> pStr,int nIndex)
             61     {
             62         if (!error) {
             63             boost::mutex::scoped_lock lock(m_ioMutex);
             64             cout << "發送序號=" << nIndex << ",線程id=" << boost::this_thread::get_id() << endl;
             65         } else {
             66             cout << "連接斷開" << endl;
             67         }
             68     }
             69 
             70 public:
             71 
             72 private:
             73     tcp::socket m_nSocket;
             74     boost::mutex m_ioMutex;
             75 };
             76 
             77 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
             78 
             79 
             80 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
             81 
             82 class CMyService : private boost::noncopyable
             83 {
             84 public:
             85     CMyService(string const &strIP,string const &strPort,int nThreads)
             86         :m_tcpAcceptor(m_ioService)
             87         ,m_nThreads(nThreads)
             88     {
             89         tcp::resolver resolver(m_ioService);
             90         tcp::resolver::query query(strIP,strPort);
             91         tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
             92         boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
             93         m_tcpAcceptor.open(endpoint.protocol());
             94         m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
             95         m_tcpAcceptor.bind(endpoint);
             96         m_tcpAcceptor.listen();
             97 
             98 
             99         StartAccept();
            100     }
            101 
            102     ~CMyService(){Stop();}
            103 
            104 public:
            105     void Stop() 
            106     { 
            107         m_ioService.stop();
            108         for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
            109             it != m_listThread.cend(); ++ it)
            110         {
            111             (*it)->join();
            112         }
            113     }
            114     void Start() {
            115         for (int i = 0; i != m_nThreads; ++i) {
            116             boost::shared_ptr<boost::thread> pTh( new boost::thread( boost::bind(&boost::asio::io_service::run, &m_ioService) ) );
            117             m_listThread.push_back(pTh);
            118         }
            119     }
            120 private:
            121     void HandleAccept(const boost::system::error_code& error ,boost::shared_ptr<CMyTcpConnection> newConnect)
            122     {
            123         if (!error) {
            124             newConnect->start();
            125         }
            126         StartAccept();
            127     }
            128 
            129 
            130     void StartAccept()
            131     {
            132         CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
            133         m_tcpAcceptor.async_accept( newConnect->socket(), vboost::bind(&CMyService::HandleAccept, this, boost::asio::placeholders::error, newConnect) );
            134     }
            135 
            136 
            137 private:
            138     boost::asio::io_service m_ioService;
            139     boost::asio::ip::tcp::acceptor m_tcpAcceptor;
            140     std::vector<boost::shared_ptr<boost::thread>> m_listThread;
            141     std::size_t m_nThreads;
            142 };
            143 
            144 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            145 
            146 
            147 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            148 
            149 int main(int argc, char* argv[])
            150 {
            151     try
            152     {
            153         if (argc != 4)
            154         {
            155             std::cerr << "<IP> <port> <threads>\n";
            156             return 1;
            157         }
            158         int nThreads = boost::lexical_cast<int>(argv[3]);
            159         CMyService mySer(argv[1], argv[2], nThreads);
            160         mySer.Start();
            161         getchar();
            162         mySer.Stop();
            163     } catch (std::exception& e) {
            164         std::cerr << "Exception: " << e.what() << "\n";
            165     }
            166     return 0;
            167 }



            ## 相關函數介紹

            ### boost::asio::ip::tcp::resolver

            iterator resolve(
                const endpoint_type & e,
                boost::system::error_code & ec);
            Return Value

            A forward-only iterator that can be used to traverse the list of endpoint entries. Returns a default constructed iterator if an error occurs.

            Remarks

            A default constructed iterator represents the end of the list.

            A successful call to this function is guaranteed to return at least one entry.


            tcp::resolver一般和tcp::resolver::query結合用,通過query這個詞顧名思義就知道它是用來查詢socket的相應信息,一般而言我們關心socket的東東有address,port而已,通過 tcp::resolver很容易實現設置和查詢,它通過query把字符串格式的ip如192.168.0.200或主機名 http://localhost,端口“8080”等轉化成socket內部表示格式,這樣我們應用的時候可以直接使用字符串的形式,而且不用再擔心 socket的字節順序轉化問題。示例如下:


            1     boost::asio::io_service io_service ;  
            2     boost::asio::ip::tcp::resolver resolver(io_service);  
            3     boost::asio::ip::tcp::resolver::query query("localhost""9000");

            還 有要說明的是, boost::asio把通訊雙方(server, client)都用endpoint的表示,所以endpoint中的address, port 分別封裝了ip和端口。貌似resolver和endpoint不相干,于是乎出現tcp::resolver::iterator了,它是 resolver的迭代器,其實就是endpoint的指針,那么就可以這樣:
             1     boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);  
             2     boost::asio::ip::tcp::resolver::iterator end;     //默認構造的resolver::iterator,用來代表失敗
             3     boost::system::error_code error = boost::asio::error::host_not_found;  
             4     boost::asio::ip::tcp::endpoint endpoint;  
             5     while (error && endpoint_iterator != end) {  
             6         endpoint = *endpoint_iterator ;    //返回的是迭代器,需要解引用成endpoint
             7         socket.close();    //先close,再connect
             8         socket.connect(endpoint, error);  
             9         endpoint_iterator++ ;  
            10     } 

            得到endpoint后就好說啦,endpoint.address().to_string()就能夠返回string格式的ip地址,endpoint.port()返回端口。

            其實endpoint 完全可以自己構造,方法也是很簡單的,tcp::endpoint(tcp::v4(), (unsigned short)9000) 這個是server端的用法,tcp::v4()直接返回自己的address,如果用于client那么需要設置server的ip ,實現如下:

            1 
            2     boost::system::error_code error = boost::asio::error::host_not_found;  
            3     boost::asio::ip::address add;  
            4     add.from_string("127.0.0.1");  
            5     tcp::endpoint endpoint(add, short(9000));  
            6     socket.connect(endpoint, error);  
            7 

            這樣不使用resolver也是可以的。
            還有更神奇的:

             1 
             2     boost::asio::io_service ioservice ;  
             3     boost::asio::io_service my_io_service ;  
             4     boost::asio::ip::tcp::resolver resolver(my_io_service);  
             5     boost::asio::ip::tcp::resolver::query query("www.google.com""http");  
             6     boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);  
             7     boost::asio::ip::tcp::resolver::iterator end; // End marker.  
             8       
             9     while (iter != end) {  
            10       boost::asio::ip::tcp::endpoint endpoint = *iter++;  
            11       std::cout << endpoint << std::endl;  
            12     }  
            13 


            這樣有發現一個新的用途,通過resolver迭代可以得到多個節點endpoint,比如google 就有好幾個ip。
            上面這個例子的運行結果:

            1         74.125.128.106:80  
            2         74.125.128.147:80  
            3         74.125.128.99:80  
            4         74.125.128.103:80  
            5         74.125.128.104:80  
            6         74.125.128.105:80




            ### boost::mutex::scoped_lock

            boost::mutex::scoped_lock lock(m_ioMutex);

            其依賴RAII機制, 在過了作用域之后對鎖進行自動釋放和回收.其實現代碼如圖所示

            boost_1_60_0/boost/asio/detail/scoped_lock.hpp:

              1 //
              2 // detail/scoped_lock.hpp
              3 // ~~~~~~~~~~~~~~~~~~~~~~
              4 //
              5 // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
              6 //
              7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
              8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
              9 //
             10 
             11 #ifndef BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
             12 #define BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
             13 
             14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
             15 # pragma once
             16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
             17 
             18 #include <boost/asio/detail/noncopyable.hpp>
             19 
             20 #include <boost/asio/detail/push_options.hpp>
             21 
             22 namespace boost {
             23 namespace asio {
             24 namespace detail {
             25 
             26 // Helper class to lock and unlock a mutex automatically.
             27 template <typename Mutex>
             28 class scoped_lock
             29   : private noncopyable
             30 {
             31 public:
             32   // Tag type used to distinguish constructors.
             33   enum adopt_lock_t { adopt_lock };
             34 
             35   // Constructor adopts a lock that is already held.
             36   scoped_lock(Mutex& m, adopt_lock_t)
             37     : mutex_(m),
             38       locked_(true)
             39   {
             40   }
             41 
             42   // Constructor acquires the lock.
             43   explicit scoped_lock(Mutex& m)
             44     : mutex_(m)
             45   {
             46     mutex_.lock();
             47     locked_ = true;
             48   }
             49 
             50   // Destructor releases the lock.
             51   ~scoped_lock()
             52   {
             53     if (locked_)
             54       mutex_.unlock();
             55   }
             56 
             57   // Explicitly acquire the lock.
             58   void lock()
             59   {
             60     if (!locked_)
             61     {
             62       mutex_.lock();
             63       locked_ = true;
             64     }
             65   }
             66 
             67   // Explicitly release the lock.
             68   void unlock()
             69   {
             70     if (locked_)
             71     {
             72       mutex_.unlock();
             73       locked_ = false;
             74     }
             75   }
             76 
             77   // Test whether the lock is held.
             78   bool locked() const
             79   {
             80     return locked_;
             81   }
             82 
             83   // Get the underlying mutex.
             84   Mutex& mutex()
             85   {
             86     return mutex_;
             87   }
             88 
             89 private:
             90   // The underlying mutex.
             91   Mutex& mutex_;
             92 
             93   // Whether the mutex is currently locked or unlocked.
             94   bool locked_;
             95 };
             96 
             97 // namespace detail
             98 // namespace asio
             99 // namespace boost
            100 
            101 #include <boost/asio/detail/pop_options.hpp>
            102 
            103 #endif // BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
            104 


            ### lexical_cast

            lexical_cast是boost中一個非常有用,常用,好用的庫,我現在的小數據轉換用的都是lexical_cast。
            lexical_cast最大的特點是安全,包括長度安全,類型安全。

            使用方式:

            1 #include <boost/lexical_cast.hpp> 
            2 
            3 const double PI=3.1415926535;
            4 string str;
            5 str = lexical_cast<string>(PI); 
            6 
            7 string str="3.1415926535";
            8 double PI=lexical_cast<double>(str); 
            9 

            如果轉換失敗, 拋出bad_lexical_cast異常

            其實現方式如下:

            boost_1_60_0/boost/lexical_cast.hpp

              1 // Copyright Kevlin Henney, 2000-2005.
              2 // Copyright Alexander Nasonov, 2006-2010.
              3 // Copyright Antony Polukhin, 2011-2014.
              4 //
              5 // Distributed under the Boost Software License, Version 1.0. (See
              6 // accompanying file LICENSE_1_0.txt or copy at
              7 // http://www.boost.org/LICENSE_1_0.txt)
              8 //
              9 // what:  lexical_cast custom keyword cast
             10 // who:   contributed by Kevlin Henney,
             11 //        enhanced with contributions from Terje Slettebo,
             12 //        with additional fixes and suggestions from Gennaro Prota,
             13 //        Beman Dawes, Dave Abrahams, Daryle Walker, Peter Dimov,
             14 //        Alexander Nasonov, Antony Polukhin, Justin Viiret, Michael Hofmann,
             15 //        Cheng Yang, Matthew Bradbury, David W. Birdsall, Pavel Korzh and other Boosters
             16 // when:  November 2000, March 2003, June 2005, June 2006, March 2011 - 2014
             17 
             18 #ifndef BOOST_LEXICAL_CAST_INCLUDED
             19 #define BOOST_LEXICAL_CAST_INCLUDED
             20 
             21 #include <boost/config.hpp>
             22 #ifdef BOOST_HAS_PRAGMA_ONCE
             23 #   pragma once
             24 #endif
             25 
             26 #if defined(BOOST_NO_STRINGSTREAM) || defined(BOOST_NO_STD_WSTRING)
             27 #define BOOST_LCAST_NO_WCHAR_T
             28 #endif
             29 
             30 #include <boost/range/iterator_range_core.hpp>
             31 #include <boost/lexical_cast/bad_lexical_cast.hpp>
             32 #include <boost/lexical_cast/try_lexical_convert.hpp>
             33 
             34 namespace boost 
             35 {
             36     template <typename Target, typename Source>
             37     inline Target lexical_cast(const Source &arg)
             38     {
             39         Target result;
             40 
             41         if (!boost::conversion::detail::try_lexical_convert(arg, result)) {
             42             boost::conversion::detail::throw_bad_cast<Source, Target>();
             43         }
             44 
             45         return result;
             46     }
             47 
             48     template <typename Target>
             49     inline Target lexical_cast(const char* chars, std::size_t count)
             50     {
             51         return ::boost::lexical_cast<Target>(
             52             ::boost::iterator_range<const char*>(chars, chars + count)
             53         );
             54     }
             55 
             56     template <typename Target>
             57     inline Target lexical_cast(const unsigned char* chars, std::size_t count)
             58     {
             59         return ::boost::lexical_cast<Target>(
             60             ::boost::iterator_range<const unsigned char*>(chars, chars + count)
             61         );
             62     }
             63 
             64     template <typename Target>
             65     inline Target lexical_cast(const signed char* chars, std::size_t count)
             66     {
             67         return ::boost::lexical_cast<Target>(
             68             ::boost::iterator_range<const signed char*>(chars, chars + count)
             69         );
             70     }
             71 
             72 #ifndef BOOST_LCAST_NO_WCHAR_T
             73     template <typename Target>
             74     inline Target lexical_cast(const wchar_t* chars, std::size_t count)
             75     {
             76         return ::boost::lexical_cast<Target>(
             77             ::boost::iterator_range<const wchar_t*>(chars, chars + count)
             78         );
             79     }
             80 #endif
             81 #ifndef BOOST_NO_CXX11_CHAR16_T
             82     template <typename Target>
             83     inline Target lexical_cast(const char16_t* chars, std::size_t count)
             84     {
             85         return ::boost::lexical_cast<Target>(
             86             ::boost::iterator_range<const char16_t*>(chars, chars + count)
             87         );
             88     }
             89 #endif
             90 #ifndef BOOST_NO_CXX11_CHAR32_T
             91     template <typename Target>
             92     inline Target lexical_cast(const char32_t* chars, std::size_t count)
             93     {
             94         return ::boost::lexical_cast<Target>(
             95             ::boost::iterator_range<const char32_t*>(chars, chars + count)
             96         );
             97     }
             98 #endif
             99 
            100 // namespace boost
            101 
            102 #undef BOOST_LCAST_NO_WCHAR_T
            103 
            104 #endif // BOOST_LEXICAL_CAST_INCLUDED
            105 
            106 



            posted on 2017-10-14 11:44 Khan 閱讀(1764) 評論(0)  編輯 收藏 引用 所屬分類: GCC/G++跨平臺開發

            久久久无码人妻精品无码| 久久综合给合综合久久| 久久久精品国产| 日韩久久久久中文字幕人妻| 久久香蕉综合色一综合色88| 97久久超碰国产精品2021| 久久99精品久久久久久hb无码| 性欧美丰满熟妇XXXX性久久久| 久久天天躁狠狠躁夜夜2020一 | 青青热久久国产久精品 | 国内精品久久人妻互换| 久久综合给合久久狠狠狠97色69| 狠狠色综合网站久久久久久久高清 | 99久久99久久精品国产片果冻| 思思久久99热只有频精品66| 久久精品免费一区二区| 国内精品九九久久精品| 97久久婷婷五月综合色d啪蜜芽 | 无码国内精品久久人妻| 国产成年无码久久久久毛片| 天天久久狠狠色综合| 久久最新免费视频| 精品久久久久久久国产潘金莲| 亚洲色婷婷综合久久| 久久99精品国产麻豆宅宅| 国产免费福利体检区久久| 中文精品99久久国产 | 93精91精品国产综合久久香蕉| 久久九九免费高清视频| 成人久久免费网站| 久久青草国产精品一区| 久久笫一福利免费导航| 久久精品国产亚洲AV高清热| 国产亚州精品女人久久久久久| 亚洲精品无码久久久久AV麻豆| 久久久久久午夜成人影院| 久久精品国产亚洲AV不卡| 久久亚洲欧美国产精品| 久久久久久极精品久久久| 久久精品国产99久久久| 91麻豆国产精品91久久久|