基于云計算的價格查詢實現
上篇博客提到價格查詢功能,當時正在考慮做成云計算模式,所以當時連多線程都沒考慮,就是準備將功能都交給云計算系統的,由云計算內部管理線程和調度問題,所以當時實現就根本不用考慮多線程,現在功能基本實現,下面大致講講我的做法。
國內很多人談到全文檢索就必提lucene,提到云計算就必提google的map/reduce、開源的hadoop、amazon的ec2,似乎只有那些東西才叫云計算,咱是實戰派,沒興趣口舌之爭,在俺看來分布式存儲+分布式計算就叫云計算,俺就看了看google的map/reduce論文,照其思想在win下做了個簡單的job/task調度系統,使其能支撐俺的第一個實戰應用價格查詢,圖示如下:

① 、adminclient承擔管理功能,可查看任務及執行情況,可查看Tasknode機器情況,如果需要可管理Task,目前只支持簡單的幾條命令,adminclient主動連jobserver登錄成功后可發送管理命令。
② 、JobOwner提交一個Job之后返回一個jobid,如果意外斷開可通過下次重連的時候提交jobid和一個sessionid可提取job結果數據,job提交通過提交一個zip包即可,參數等文件都打在包里面,tasknode可直接解包執行里面的dll。Jobowner主動連jobserver,登錄成功后可發job命令。
③ TaskNode是執行具體任務的客戶端,job包用zip打包后發布給tasknode,tasknode參與計算并反饋結果。TaskNode設計成多線程模式,一個線程保持和jobserver的通信,其他線程參與運算,Tasknode可同時執行多個不同的任務,如a線程執行價格查詢,b線程執行hash破解等。Tasknode主動連jobserver,登錄后可接受jobserver分派的任務,由于tasknode是主動連jobserver的,所以即使是內網機器或者任意有閑置資源的機器都可作為Tasknode,不管它是家里的、公司的、還是網吧的,這也是該系統基于windows實現的一個重要前提,因為win的機器是如此的多,在國內win的機器無處不在。
JobServer是job調度器,管理包分發以及任務分割、調度,典型的執行流程是這樣,jobowner提交一個命名的包給jobserver,jobserver將該包部署管理,之后jobowner 可給jobserver提交任務,jobserver收到任務后根據任務指定的包配置執行,如部署包后裝載dll并執行任務分割操作,分割是將一個job分割為多個task,之后再將每個task提交給一個tasknode執行,并管理tasknode的輸出以及可能的出錯,出錯現在的處理是交給另一個tasknode執行,當剩下最后一個tasknode的時候會將該tsaknode同步叫給另一個不同的tasknode執行,不管誰最后成功執行這個tasknode,只要該task執行成功立即結束整個job,并將結果反饋給jobowner,jobowner也可在執行中提交查詢命令,jobserver會將被查詢job當前的輸出返回,這樣碰到需要長時間執行的任務也能適用。
從以上介紹可以看到,具體任務是由包執行的,這個包實際上可能是一個dll,也可能是幾個dll加上一些配置文件組成,之所以設計成這種模式,主要是考慮整個系統在win上方便部署,主dll需要支持幾個固定的接口:
//任務dll初始化函數
typedef bool (*jobtask_init_)(jobtaskfunc *jtfunc, bool tasknode);
//map分割函數
typedef size_t (*jobtask_split_)(jobtaskfunc *jtfunc,
const char *input, size_t len,
std::vector<CAutoBuffer *> &vbuf);
//reduce打包函數
typedef size_t (*jobtask_reduce_)(jobtaskfunc *jtfunc,
std::vector<CAutoBuffer *> &vbuf,
CAutoBuffer &buf);
//Task執行函數
typedef bool (*jobtask_map_)(jobtaskfunc *jtfunc, const char *cmdline, CAutoBuffer &outbuf);
//釋放函數
typedef bool (*jobtask_free_)(jobtaskfunc *jtfunc);
上面init函數主要執行線程相關的初始化,該函數典型的可能是空,或者是
CoInitialize(NULL); 等
Split函數是用來將job輸入分割為N個tasknode輸入的,該函數由jobserver調用,每個tasknode輸入就是map函數的輸入,tasknode的任務就是調用map函數,并傳遞輸入,最后將輸出返回給jobserver,jobserver在需要的時候調用reduce將各個tasknode的輸出打包返回,free函數是個輔助函數,釋放資源的。
熟悉google的map/reduce的應該知道,我的實現簡化了reduce,在我的實現里面并沒有獨立的reduce worker,該任務由jobserver自己做了,這一方面是簡化實現,另方面也是適應需求的結果,畢竟在我的需求里面輸入是很少的(一個典型任務100字節量級),tasknode的計算是很多的,輸出也是不多的(1k量級),所以由jobserver打包整個輸出也很輕松,用不著一組獨立的reduce來管理輸出。另外可以看到上面接口用了我的自定義類CAutoBuffer,這個類主要管理不定長數據的,其實用vector<char>也可,但考慮方便,我的實現內部都用了CAutoBuffer。一個典型的分布式應用只要做一個dll,有上面幾個函數,并輸出一個
struct jobtaskfunc
{
//初始化函數
jobtask_init_ init;
//釋放函數
jobtask_free_ free;
//以下被tasknode調用
jobtask_map_ map;
//以下被jobserver調用
jobtask_split_ split;
jobtask_reduce_ reduce;
};
typedef jobtaskfunc *(WINAPI *create_jobtask_)();
函數即可。
學習map/reduce重要的是學習其思想,并不拘泥于實現形式,我想這大概正是國內環境欠缺的,國內能說得頭頭是道的人太多,能動手干出結果來的人很少,真正坐下來做實事的不多,只喜歡抄抄概念,拿別人的東西過來架設一下,就是這樣的人也能混成大拿。我從map/reduce思想出發,學習其思想,簡化其實現,為實際應用服務,雖然這個東西很簡單,甚至可以說有些簡陋,但實際效果不錯,雖然現在只部署了兩個點,但總體上還是令人滿意的。
實現這個jobserver/tasknode系統并部署價格查詢花了不到兩周時間,實際上花在jobserver、tasknode上的時間大概只有一周多一點,ppsget.dll(具體干活的dll)用正則表達式分析網頁并提取輸出,該dll被應用到多線程環境后也出了一些問題,用boost::reg的時候居然偶爾會出現異常,原以為boost::reg這樣的應用應該是非常明確的,要么找到,要么沒有找到,除此不應該有第三態,沒想到boost::reg這個不爭氣的東西不但不是二態的,還容易出現異常,試用了一下tr1::regex也是類似的問題,無奈只能在外面包了一層異常處理,雖然不再被異常搞死,但一旦出現異常就是很慢的,要10s左右才返回,現在也沒有特別好的辦法,只在異常的時候將頁面保存,事后分析并改寫正則表達式,盡量將正則表達式做小,將非貪婪式查找用少一點。
下面看看我們價格查詢網站 http://www.shprog.com/pps.aspx 的輸出:

那個360的價格居然是圖片,ocr模塊是俺同事搞的,現在識別率能達到99%以上,還是很不錯的。