• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>
            隨筆-167  評論-8  文章-0  trackbacks-0
            這是一個基于ACE的線程庫實現,屬于半同步半異步類型的線程池,感覺實現得非常優雅,代碼是由網上下的好幾份代碼拼湊而成的(ACE的源碼包中的tests目錄下有大量的實例,研究這些例子是學習ACE的好辦法,只是由于注釋都是一堆堆的英文,有時候感覺頭疼,就懶得去看它了)。這個線程池由一個線程池管理器管理著五個線程來處理消息,當五個處理線程都在處理消息時,接收新的消息將導致線程管理器被阻塞。消息處理線程處理完發給自己的消息后將被阻塞,其將重新被管理器管理器放入隊列中。越發感覺到ACE的強大,只可惜我們的程序用不上。一個原因是我們程序本身處理的數據量并不會太大;另外我們的程序只要求跑在Solaris上面,不會出現異構的平臺;最后ACE庫本身太繁雜了,很多地方比如網絡相關的函數我們是不會用的,不過如果現在我們正在使用的網絡庫也使用ACE的話,那么使用ACE簡直再好不過了。
            #include "ace/OS.h"
            #include 
            "ace/Task.h"
            #include  
            "ace/Thread.h"
            #include 
            "ace/Synch.h"

            class Worker;

            class IManager
            {
            public:
                
            virtual int return_to_work (Worker *worker) = 0;
            };

            class Worker : public ACE_Task<ACE_MT_SYNCH>
            {
            public:
                Worker (IManager 
            *manager) : manager_(manager) { }

                
            //線程啟動之后進入本函數
                virtual int svc (void)
                {
                    thread_id_ 
            = ACE_Thread::self();

                    
            //工作線程啟動之后只有收到MB_HANGUP類型的消息它才會退出
                    while (1)
                    {
                        ACE_Message_Block 
            *mb = 0;

                        
            //如果隊列中沒有數據,本線程將被阻塞
                        if (this->getq(mb) == -1)
                            ACE_ERROR_BREAK((LM_ERROR, ACE_TEXT (
            "%p "), ACE_TEXT ("getq")));

                        
            // 如果是MB_HANGUP消息,就結束線程
                        if (mb->msg_type() == ACE_Message_Block::MB_HANGUP)
                        {
                            ACE_DEBUG ((LM_INFO,
                                ACE_TEXT (
            "(%t) Shutting down ")));
                                mb
            ->release();

                            
            break;
                        }

                        
            // Process the message.
                        process_message (mb);

                        
            // Return to work.
                        
            // 這里會將自己放到線程池中,并通過workers_cond_來通知manager
                        this->manager_->return_to_work (this);
                    }

                    
            return 0;
                }

                ACE_thread_t thread_id(
            void)
                {
                    
            return thread_id_;
                }

            private:

                
            //處理消息
                void process_message (ACE_Message_Block *mb)
                {
                    ACE_TRACE (ACE_TEXT (
            "Worker::process_message"));

                    
            int msgId;

                    ACE_OS::memcpy (
            &msgId, mb->rd_ptr(), sizeof(int));

                    mb
            ->release();

                    ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT (
            "(%t) Started processing message %d "),
                        msgId));

                    ACE_OS::sleep(
            3);

                    ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT (
            "(%t) Finished processing message %d "),
                        msgId));
                }

                
            //指向線程池管理器
                IManager *manager_;

                
            //保存本線程id號
                ACE_thread_t thread_id_;
            };

            class Manager : public ACE_Task<ACE_MT_SYNCH>public IManager
            {
            public:
                
            enum {POOL_SIZE = 5, MAX_TIMEOUT = 5};

                Manager ()
                    : shutdown_(
            0), workers_lock_(), workers_cond_(workers_lock_)
                {
                    ACE_TRACE (ACE_TEXT (
            "Manager::Manager"));
                }

                
            /* 線程處理函數 */
                
            int svc (void)
                {
                    ACE_TRACE (ACE_TEXT (
            "Manager::svc"));

                    ACE_DEBUG ((LM_INFO, ACE_TEXT (
            "(%t) Manager started ")));

                    
            // Create pool.
                    create_worker_pool();

                    
            while (true)
                    {
                        ACE_Message_Block 
            *mb = 0;
                        ACE_Time_Value tv ((
            long)MAX_TIMEOUT);
                        tv 
            += ACE_OS::time (0);

                        
            // Get a message request.
                        if (this->getq (mb, &tv) < 0)
                        {
                            shut_down ();
                            
            break;
                        }

                        
            // Choose a worker.
                        Worker *worker = 0;

                        
            /* 
                        這對大括號中的代碼從worker線程池中獲取一個工作線程,線程池由
                        this->workers_lock_互斥體加以保護,如果沒有worker可用,manager
                        會阻塞在workers_lock_條件變量上,等待某個線程回來工作
                        
            */
                        {
                            ACE_GUARD_RETURN (ACE_Thread_Mutex,
                                worker_mon, 
            this->workers_lock_, -1);

                            
            /* 
                            阻塞在workers_lock_.wait()上直到有worker可用,當某個worker回來后
                            會把自己放到線程池隊列上,同時通過觸發workers_cond_來通知manager
                            
            */
                            
            while (this->workers_.is_empty ())
                                workers_cond_.wait ();

                            
            /* 將獲取的worker從線程池隊列中刪除 */
                            
            this->workers_.dequeue_head (worker);
                        }

                        
            // Ask the worker to do the job.
                        
            // 將請求消息放入到worker的消息隊列中
                        worker->putq (mb);
                    }

                    
            return 0;
                }

                
            int shut_down (void)
                {
                     ACE_TRACE (ACE_TEXT (
            "ACE_ThreadPool::DestroyPool"));

                     ACE_Unbounded_Queue
            <Worker* >::ITERATOR iter = this->workers_.begin();

                    Worker
            ** worker_ptr = NULL;

                     
            do
                    {
                        iter.next (worker_ptr);

                        Worker 
            *worker = (*worker_ptr);

                        
            // Send the hangup message.
                        ACE_Message_Block *mb;
                        ACE_NEW_RETURN(
                            mb,
                            ACE_Message_Block(
            0,
                            ACE_Message_Block::MB_HANGUP),
                            
            -1);

                        worker
            ->putq(mb);

                        
            // Wait for the exit.
                        worker->wait();

                        ACE_ASSERT (worker
            ->msg_queue()->is_empty ());

                        delete worker;
                     }
            while (iter.advance());

                     
            return 0;
                };

                ACE_thread_t thread_id (Worker 
            *worker);

                
            /* 提供給worker的接口,用于在worker完成處理后,將自己放入到線程池隊列,并通知manager */
                
            virtual int return_to_work (Worker *worker)
                {
                    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                        worker_mon, 
            this->workers_lock_, -1);

                    ACE_DEBUG ((LM_DEBUG,
                        ACE_TEXT (
            "(%t) Worker %u returning to work. "),
                        worker
            ->thr_mgr()->thr_self()));

                    
            // 將worker放入到線程池隊列
                    this->workers_.enqueue_tail (worker);

                    
            // 觸發條件變量,通知manager
                    this->workers_cond_.signal ();

                    
            return 0;
                }

            private:
            // 創建worker線程池
                int create_worker_pool (void)
                {
                    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                        worker_mon,
                        
            this->workers_lock_,
                        
            -1);

                    
            for (int i = 0; i < POOL_SIZE; i++)
                    {
                        Worker 
            *worker;

                        
            // 創建worker
                        ACE_NEW_RETURN (worker, Worker (this), -1);

                        
            // 放入線程池隊列
                        this->workers_.enqueue_tail (worker);

                        
            // 激活線程,調用該函數后,worker線程被創建,由于worker
                        
            // 是ACE_Task的子類,線程激活后,從svc函數開始執行
                        worker->activate ();
                    }

                    
            return 0;
                }

            private:
                
            int shutdown_;

                
            /* workers_lock_ 線程池隊列的互斥體,在對線程池進行操作時,需要通過互斥鎖來保護
                所以在所有的線程池隊列隊列操作前都有這樣的語句:
                    ACE_GUARD_RETURN (ACE_Thread_Mutex,
                    worker_mon, this->workers_lock_, -1);
                    
            */
                ACE_Thread_Mutex workers_lock_;
                ACE_Condition
            <ACE_Thread_Mutex> workers_cond_;

                
            /* 線程池隊列 */
                ACE_Unbounded_Queue
            <Worker* > workers_;
            };

            int ACE_TMAIN (int, ACE_TCHAR *[])
            {
                Manager tp;
                tp.activate ();

                
            // Wait for a moment every time you send a message.
                ACE_Time_Value tv;
                tv.msec (
            100);

                ACE_Message_Block 
            *mb;
                
            for (int i = 0; i < 10; i++)
                {
                    ACE_NEW_RETURN(mb, ACE_Message_Block(
            sizeof(int)), -1);

                    ACE_OS::memcpy (mb
            ->wr_ptr(), &i, sizeof(int));

                    ACE_OS::sleep(tv);

                    
            // Add a new work item.
                    
            // 這里將請求消息首先發到了manager線程,由manager線程負責分發
                    tp.putq (mb);
                }

                
            // 主線程等待子線程結束
                ACE_Thread_Manager::instance()->wait();

                
            return 0;
            }


            posted on 2009-11-02 18:13 老馬驛站 閱讀(2208) 評論(0)  編輯 收藏 引用 所屬分類: ACE
            久久精品国产免费| 亚洲中文字幕无码久久精品1| 久久精品国产久精国产思思| 99久久久精品免费观看国产| 久久99免费视频| 亚洲国产香蕉人人爽成AV片久久| 狠狠色婷婷久久一区二区| 精品久久久久久中文字幕| 久久精品99无色码中文字幕| 色欲综合久久中文字幕网| 久久国产一区二区| 久久精品人人做人人爽电影| 久久青草国产精品一区| 模特私拍国产精品久久| 国产一区二区三精品久久久无广告 | 久久久无码一区二区三区| 亚洲精品国产成人99久久| 伊色综合久久之综合久久| 色诱久久久久综合网ywww| 一本久久久久久久| 三上悠亚久久精品| 亚洲国产综合久久天堂| 久久91精品国产91久久户| 色婷婷综合久久久中文字幕| 亚洲欧洲中文日韩久久AV乱码| 99精品久久精品一区二区| 2021国产精品久久精品| 久久天天躁狠狠躁夜夜不卡| 欧美777精品久久久久网| 欧美黑人激情性久久| 精品久久久久久中文字幕大豆网| 品成人欧美大片久久国产欧美| 久久婷婷五月综合色奶水99啪| 要久久爱在线免费观看| 精品久久人人妻人人做精品| 久久综合九色综合精品| 国产精品久久久久…| 久久Av无码精品人妻系列| 久久亚洲春色中文字幕久久久| 日韩AV无码久久一区二区 | 香蕉久久夜色精品国产2020|