• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            loop_in_codes

            低調做技術__歡迎移步我的獨立博客 codemaro.com 微博 kevinlynx

            #

            Dhtcrawler2換用sphinx搜索

            dhtcrawler2最開始使用mongodb自帶的全文搜索引擎搜索資源。搜索一些短關鍵字時很容易導致erlang進程call timeout,也就是查詢時間太長。對于像avi這種關鍵字,搜索時間長達十幾秒。搜索的資源數量200萬左右。這其中大部分資源只是對root文件名進行了索引,即對于多文件資源而言沒有索引單個文件名。索引方式有部分資源是按照字符串子串的形式,沒有拆詞,非常占用存儲空間;有部分是使用了rmmseg(我編譯了rmmseg-cpp作為erlang nif庫調用 erl-rmmseg)進行了拆詞,占用空間小了很多,但由于詞庫問題很多片里的詞匯沒拆出來。

            很早以前我以為搜索耗時的原因是因為數據庫太忙,想部署個mongodb集群出來。后來發現數據庫沒有任何讀寫的狀態下,查詢依然慢。終于只好放棄mongodb自帶的文本搜索。于是我改用sphinx。簡單起見,我直接下載了coreseek4.1(sphinx的一個支持中文拆詞的包裝)。

            現在,已經導入了200多萬的資源進sphinx,并且索引了所有文件名,索引文件達800M。對于avi關鍵字的搜索大概消耗0.2秒的時間。搜索試試

            以下記錄下sphinx在dhtcrawler的應用

            sphinx簡介

            sphinx包含兩個主要的程序:indexer和searchd。indexer用于建立文本內容的索引,然后searchd基于這些索引提供文本搜索功能,而要使用該功能,可以遵循searchd的網絡協議連接searchd這個服務來使用。

            indexer可以通過多種方式來獲取這些文本內容,文本內容的來源稱為數據源。sphinx內置mysql這種數據源,意思是可以直接從mysql數據庫中取得數據。sphinx還支持xmlpipe2這種數據源,其數據以xml格式提供給indexer。要導入mongodb數據庫里的內容,可以選擇使用xmlpipe2這種方式。

            sphinx document

            xmlpipe2數據源需要按照以下格式提交:

            <sphinx:docset>
                <sphinx:schema>
                    <sphinx:field name="subject"/>
                    <sphinx:field name="files"/>
                    <sphinx:attr name="hash1" type="int" bits="32"/>
                    <sphinx:attr name="hash2" type="int" bits="32"/>
                </sphinx:schema>
                <sphinx:document id="1">
                    <subject>this is the subject</subject>
                    <files>file content</files>
                    <hash1>111</hash1>
                </sphinx:document>
            </sphinx:docset>
            

            該文件包含兩大部分:schemadocuments,其中schema又包含兩部分:fieldattr,其中由field標識的字段就會被indexer讀取并全部作為輸入文本建立索引,而attr則標識查詢結果需要附帶的信息;documents則是由一個個sphinx:document組成,即indexer真正要處理的數據。注意其中被schema引用的屬性名。

            document一個很重要的屬性就是它的id。這個id對應于sphinx需要唯一,查詢結果也會包含此id。一般情況下,此id可以直接是數據庫主鍵,可用于查詢到詳細信息。searchd搜索關鍵字,其實可以看作為搜索這些document,搜索出來的結果也是這些document,搜索結果中主要包含schema中指定的attr。

            增量索引

            數據源的數據一般是變化的,新增的數據要加入到sphinx索引文件中,才能使得searchd搜索到新錄入的數據。要不斷地加入新數據,可以使用增量索引機制。增量索引機制中,需要一個主索引和一個次索引(delta index)。每次新增的數據都建立為次索引,然后一段時間后再合并進主索引。這個過程主要還是使用indexer和searchd程序。實際上,searchd是一個需要一直運行的服務,而indexer則是一個建立完索引就退出的工具程序。所以,這里的增量索引機制,其中涉及到的“每隔一定時間就合并”這種工作,需要自己寫程序來協調(或通過其他工具)

            sphinx與mongodb

            上面提到,一般sphinx document的id都是使用的數據庫主鍵,以方便查詢。但mongodb中默認情況不使用數字作為主鍵。dhtcrawler的資源數據庫使用的是資源info-hash作為主鍵,這無法作為sphinx document的id。一種解決辦法是,將該hash按位拆分,拆分成若干個sphinx document attr支持位數的整數。例如,info-hash是一個160位的id,如果使用32位的attr(高版本的sphinx支持64位的整數),那么可以把該info-hash按位拆分成5個attr。而sphinx document id則可以使用任意數字,只要保證不沖突就行。當獲得查詢結果時,取得對應的attr,組合為info-hash即可。

            mongodb默認的Object id也可以按這種方式拆分。

            dhtcrawler2與sphinx

            dhtcrawler2中我自己寫了一個導入程序。該程序從mongodb中讀出數據,數據到一定量時,就輸出為xmlpipe2格式的xml文件,然后建立為次索引,最后合并進主索引。過程很簡單,包含兩次啟動外部進程的工作,這個可以通過erlang中os:cmd完成。

            值得注意的是,在從mongodb中讀數據時,使用skip基本是不靠譜的,skip 100萬個數據需要好幾分鐘,為了不增加額外的索引字段,我只好在created_at字段上加索引,然后按時間段來讀取資源,這一切都是為了支持程序關閉重啟后,可以繼續上次工作,而不是重頭再來。200萬的數據,已經處理了好幾天了。

            后頭數據建立好了,需要在前臺展示出來。erlang中似乎只有一個sphinx客戶端庫:giza。這個庫有點老,寫成的時候貌似還在使用sphinx0.9版本。其中查詢代碼包含了版本判定,已經無法在我使用的sphinx2.x版本中使用。無奈之下我只好修改了這個庫的源碼,幸運的是查詢功能居然是正常的,意味著sphinx若干個版本了也沒改動通信協議?后來,我為了取得查詢的統計信息,例如消耗時間以及總結果,我再一次修改了giza的源碼。新的版本可以在我的github上找到:my giza,看起來我沒侵犯版本協議吧?

            目前dhtcrawler的搜索,先是基于sphinx搜索出hash列表,然后再去mongodb中搜索hash對應的資源。事實上,可以為sphinx的document直接附加這些資源的描述信息,就可以避免去數據庫查詢。但我想,這樣會增加sphinx索引文件的大小,擔心會影響搜索速度。實際測試時,發現數據庫查詢有時候還真的很消耗時間,盡管我做了分頁,以使得單頁僅對數據庫進行少量查詢。

            xml unicode

            在導入xml到sphinx的索引過程中,本身我輸出的內容都是unicode的,但有很多資源會導致indexer解析xml出錯。出錯后indexer直接停止對當前xml的處理。后來查閱資料發現是因為這些無法被indexer處理的xml內容包含unicode里的控制字符,例如 ä (U+00E4)。我的解決辦法是直接過濾掉這些控制字符。unicode的控制字符參看UTF-8 encoding table and Unicode characters。在erlang中干這個事居然不復雜:

            strip_invalid_unicode(<<>>) ->
                <<>>;
            strip_invalid_unicode(<<C/utf8, R/binary>>) ->
                case is_valid_unicode(C) of
                    true ->
                        RR = strip_invalid_unicode(R),
                        <<C/utf8, RR/binary>>;
                    false ->
                        strip_invalid_unicode(R)
                end;
            strip_invalid_unicode(<<_, R/binary>>) ->
                strip_invalid_unicode(R).
                
            is_valid_unicode(C) when C < 16#20 ->
                false;
            is_valid_unicode(C) when C >= 16#7f, C =< 16#ff ->
                false;
            is_valid_unicode(_) ->
                true.
            

            posted @ 2013-08-08 23:04 Kevin Lynx 閱讀(2736) | 評論 (0)編輯 收藏

            磁力搜索第二版-dhtcrawler2

            上篇

            下載使用

            目前為止dhtcrawler2相對dhtcrawler而言,數據庫部分調整很大,DHT部分基本沿用之前。但單純作為一個爬資源的程序而言,DHT部分可以進行大幅削減,這個以后再說。這個版本更快、更穩定。為了方便,我將編譯好的erlang二進制文件作為git的主分支,我還添加了一些Windows下的批處理腳本,總之基本上下載源碼以后即可運行。

            項目地址:https://github.com/kevinlynx/dhtcrawler2

            使用方法

            • 下載erlang,我測試的是R16B版本,確保erl等程序被加入Path環境變量
            • 下載mongodb,解壓即用:

                mongod --dbpath xxx --setParameter textSearchEnabled=true
              
            • 下載dhtcrawler2

                git clone https://github.com/kevinlynx/dhtcrawler2.git
              
            • 運行win_start_crawler.bat

            • 運行win_start_hash.bat
            • 運行win_start_http.bat
            • 打開localhost:8000查看stats

            爬蟲每次運行都會保存DHT節點狀態,早期運行的時候收集速度會不夠。dhtcrawler2將程序分為3部分:

            • crawler,即DHT爬蟲部分,僅負責收集hash
            • hash,準確來講叫hash reader,處理爬蟲收集的hash,處理過程主要涉及到下載種子文件
            • http,使用hash處理出來的數據庫,以作為Web端接口

            我沒有服務器,但程序有被部署在別人的服務器上:bt.cmhttp://222.175.114.126:8000/

            其他工具

            為了提高資源索引速度,我陸續寫了一些工具,包括:

            • import_tors,用于導入本地種子文件到數據庫
            • tor_cache,用于下載種子到本地,僅僅提供下載的功能,hash_reader在需要種子文件時,可以先從本地取
            • cache_indexer,目前hash_reader取種子都是從torrage.com之類的種子緩存站點取,這些站點提供了種子列表,cache_indexer將這些列表導入數據庫,hash_reader在請求種子文件前可以通過該數據庫檢查torrage.com上有無此種子,從而減少多余的http請求

            這些工具的代碼都被放在dhtcrawler2中,可以查看對應的啟動腳本來查看具體如何啟動。

            OS/Database

            根據實際的測試效果來看,當收集的資源量過百萬時(目前bt.cm錄入近160萬資源),4G內存的Windows平臺,mongodb很容易就會掛掉。掛掉的原因全是1455,頁面文件太小。有人建議不要在Windows下使用mongodb,Linux下我自己沒做過測試。

            mongodb可以部署為集群形式(replica-set),當初我想把http部分的查詢放在一個只讀的mongodb實例上,但因為建立集群時,要同步已有的10G數據庫,而每次同步都以mongodb掛掉結束,遂放棄。在目前bt.cm的配置中,數據庫torrent的鎖比例(db lock)很容易上50%,這也讓http在搜索時,經常出現搜索超時的情況。

            技術信息

            dhtcrawler最早的版本有很多問題,修復過的最大的一個問題是關于erlang定時器的,在DHT實現中,需要對每個節點每個peer做超時處理,在erlang中的做法直接是針對每個節點注冊了一個定時器。這不是問題,問題在于定時器資源就像沒有GC的內存資源一樣,是會由于程序員的代碼問題而出現資源泄漏。所以,dhtcrawler第一個版本在節點數配置在100以上的情況下,用不了多久就會內存耗盡,最終導致erlang虛擬機core dump。

            除了這個問題以外,dhtcrawler的資源收錄速度也不是很快。這當然跟數據庫和獲取種子的速度有直接關系。尤其是獲取種子,使用的是一些提供info-hash到種子映射的網站,通過HTTP請求來下載種子文件。我以為通過BT協議直接下載種子會快些,并且實時性也要高很多,因為這個種子可能未被這些緩存網站收錄,但卻可以直接向對方請求得到。為此,我還特地翻閱了相關協議,并且用erlang實現了(以后的文章我會講到具體實現這個協議)。

            后來我懷疑get_peers的數量會不會比announce_peer多,但是理論上一般的客戶端在get_peers之后都是announce_peer,但是如果get_peers查詢的peers恰好不在線呢?這意味著很多資源雖然已經存在,只不過你恰好暫時請求不到。實際測試時,發現get_peers基本是announce_peer數量的10倍。

            將hash的獲取方式做了調整后,dhtcrawler在幾分鐘以內以幾乎每秒上百個新增種子的速度工作。然后,程序掛掉。

            從dhtcrawler到今天為止的dhtcrawler2,中間間隔了剛好1個月。我的所有業余時間全部撲在這個項目上,面臨的問題一直都是程序的內存泄漏、資源收錄的速度不夠快,到后來又變為數據庫壓力過大。每一天我都以為我將會完成一個穩定版本,然后終于可以去干點別的事情,但總是干不完,目前完沒完都還在觀察。我始終明白在做優化前需要進行詳盡的數據收集和分析,從而真正地優化到正確的點上,但也總是憑直覺和少量數據分析就開始嘗試。

            這里談談遇到的一些問題。

            erlang call timeout

            最開始遇到erlang中gen_server:call出現timeout錯誤時,我還一直以為是進程死鎖了。相關代碼讀來讀去,實在覺得不可能發生死鎖。后來發現,當erlang虛擬機壓力上去后,例如內存太大,但沒大到耗盡系統所有內存(耗進所有內存基本就core dump了),進程間的調用就會出現timeout。

            當然,內存占用過大可能只是表象。其進程過多,進程消息隊列太長,也許才是導致出現timeout的根本原因。消息隊列過長,也可能是由于發生了消息泄漏的緣故。消息泄漏我指的是這樣一種情況,進程自己給自己發消息(當然是cast或info),這個消息被處理時又會發送相同的消息,正常情況下,gen_server處理了一個該消息,就會從消息隊列里移除它,然后再發送相同的消息,這不會出問題。但是當程序邏輯出問題,每次處理該消息時,都會發生多余一個的同類消息,那消息隊列自然就會一直增長。

            保持進程邏輯簡單,以避免這種邏輯錯誤。

            erlang gb_trees

            我在不少的地方使用了gb_trees,dht_crawler里就可能出現gb_trees:get(xxx, nil)這種錯誤。乍一看,我以為我真的傳入了一個nil值進去。然后我苦看代碼,以為在某個地方我會把這個gb_trees對象改成了nil。但事情不是這樣的,gb_tress使用一個tuple作為tree的節點,當某個節點沒有子節點時,就會以nil表示。

            gb_trees:get(xxx, nil)類似的錯誤,實際指的是xxx沒有在這個gb_trees中找到。

            erlang httpc

            dht_crawler通過http協議從torrage.com之類的緩存網站下載種子。最開始我為了盡量少依賴第三方庫,使用的是erlang自帶的httpc。后來發現程序有內存泄漏,google發現erlang自帶的httpc早為人詬病,當然也有大神說在某個版本之后這個httpc已經很不錯。為了省事,我直接換了ibrowse,替換之后正常很多。但是由于沒有具體分析測試過,加之時間有點遠了,我也記不太清細節。因為早期的http請求部分,沒有做數量限制,也可能是由于我的使用導致的問題。

            某個版本后,我才將http部分嚴格地與hash處理部分區分開來。相較數據庫操作而言,http請求部分慢了若干數量級。在hash_reader中將這兩塊分開,嚴格限制了提交給httpc的請求數,以獲得穩定性。

            對于一個復雜的網絡系統而言,分清哪些是耗時的哪些是不大耗時的,才可能獲得性能的提升。對于hash_reader而言,處理一個hash的速度,雖然很大程度取決于數據庫,但相較http請求,已經快很多。它在處理這些hash時,會將數據庫已收錄的資源和待下載的資源分離開,以盡快的速度處理已存在的,而將待下載的處理速度交給httpc的響應速度。

            erlang httpc ssl

            ibrowse處理https請求時,默認和erlang自帶的httpc使用相同的SSL實現。這經常導致出現tls_connection進程掛掉的錯誤,具體原因不明。

            erlang調試

            首先合理的日志是任何系統調試的必備。

            我面臨的大部分問題都是內存泄漏相關,所以依賴的erlang工具也是和內存相關的:

            • 使用etop,可以檢查內存占用多的進程、消息隊列大的進程、CPU消耗多的進程等等:

                spawn(fun() -> etop:start([{output, text}, {interval, 10}, {lines, 20}, {sort, msg_q }]) end).
              
            • 使用erlang:system_info(allocated_areas).檢查內存使用情況,其中會輸出系統timer數量

            • 使用erlang:process_info查看某個具體的進程,這個甚至會輸出消息隊列里的消息

            hash_writer/crawler

            crawler本身僅收集hash,然后寫入數據庫,所以可以稱crawler為hash_writer。這些hash里存在大量的重復。hash_reader從數據庫里取出這些hash然后做處理。處理過程會首先判定該hash對應的資源是否被收錄,沒有收錄就先通過http獲取種子。

            在某個版本之后,crawler會簡單地預先處理這些hash。它緩存一定數量的hash,接收到新hash時,就合并到hash緩存里,以保證緩存里沒有重復的hash。這個重復率經過實際數據分析,大概是50%左右,即收到的100個請求里,有50個是重復的。這樣的優化,不僅會降低hash數據庫的壓力,hash_reader處理的hash數量少了,也會對torrent數據庫有很大提升。

            當然進一步的方案可以將crawler和hash_reader之間交互的這些hash直接放在內存中處理,省去中間數據庫。但是由于mongodb大量使用虛擬內存的緣故(內存映射文件),經常導致服務器內存不夠(4G),內存也就成了珍稀資源。當然這個方案還有個弊端是難以權衡hash緩存的管理。crawler收到hash是一個不穩定的過程,在某些時間點這些hash可能爆多,而hash_reader處理hash的速度也會不太穩定,受限于收到的hash類別(是新增資源還是已存在資源)、種子請求速度、是否有效等。

            當然,也可以限制緩存大小,以及對hash_reader/crawler處理速度建立關系來解決這些問題。但另一方面,這里的優化是否對目前的系統有提升,是否是目前系統面臨的最大問題,卻是需要考究的事情。

            cache indexer

            dht_crawler是從torrage.com等網站獲取種子文件,這些網站看起來都是使用了相同的接口,其都有一個sync目錄,里面存放了每天每個月索引的種子hash,例如 http://torrage.com/sync/。這個網站上是否有某個hash對應的種子,就可以從這些索引中檢查。

            hash_reader在處理新資源時,請求種子的過程中發現大部分在這些服務器上都沒有找到,也就是發起的很多http請求都是404回應,這不但降低了系統的處理能力、帶寬,也降低了索引速度。所以我寫了一個工具,先手工將sync目錄下的所有文件下載到本地,然后通過這個工具 (cache indexer) 將這些索引文件里的hash全部導入數據庫。在以后的運行過程中,該工具僅下載當天的索引文件,以更新數據庫。 hash_reader 根據配置,會首先檢查某個hash是否存在該數據庫中,存在的hash才可能在torrage.com上下載得到。

            種子緩存

            hash_reader可以通過配置,將下載得到的種子保存在本地文件系統或數據庫中。這可以建立自己的種子緩存,但保存在數據庫中會對數據庫造成壓力,尤其在當前測試服務器硬件環境下;而保存為本地文件,又特別占用硬盤空間。

            基于BT協議的種子下載

            通過http從種子緩存里取種子文件,可能會沒有直接從P2P網絡里取更實時。目前還沒來得及查看這些種子緩存網站的實現原理。但是通過BT協議獲取種子會有點麻煩,因為dht_crawler是根據get_peer請求索引資源的,所以如果要通過BT協議取種子,那么這里還得去DHT網絡里查詢該種子,這個查詢過程可能會較長,相比之下會沒有http下載快。而如果通過announce_peer來索引新資源的話,其索引速度會大大降低,因為announce_peer請求比get_peer請求少很多,幾乎10倍。

            所以,這里的方案可能會結合兩者,新開一個服務,建立自己的種子緩存。

            中文分詞

            mongodb的全文索引是不支持中文的。我在之前提到,為了支持搜索中文,我將字符串拆成了若干子串。這樣的后果就是字符串索引會稍稍偏大,而且目前這一塊的代碼還特別簡單,會將很多非文字字符也算在內。后來我加了個中文分詞庫,使用的是rmmseg-cpp。我將其C++部分抽離出來編譯成erlang nif,這可以在我的github上找到。

            但是這個庫拆分中文句子依賴于詞庫,而這個詞庫不太新,dhtcrawler爬到的大部分資源類型你們也懂,那些詞匯拆出來的比率不太高,這會導致搜索出來的結果沒你想的那么直白。當然更新詞庫應該是可以解決這個問題的,目前還沒有時間顧這一塊。

            總結

            一個老外對我說過,”i have 2 children to feed, so i will not do this only for fun”。

            你的大部分編程知識來源于網絡,所以稍稍回饋一下不會讓你丟了飯碗。

            我很窮,如果你能讓我收獲金錢和編程成就,還不會嫌我穿得太邋遢,that’s really kind of you。

            posted @ 2013-07-20 16:37 Kevin Lynx 閱讀(5701) | 評論 (1)編輯 收藏

            使用erlang實現P2P磁力搜索-實現

            上篇,本篇談談一些實現細節。

            這個爬蟲程序主要的問題在于如何獲取P2P網絡中分享的資源,獲取到資源后索引到數據庫中,搜索就是自然而然的事情。

            DHT

            DHT網絡本質上是一個用于查詢的網絡,其用于查詢一個資源有哪些計算機正在下載。每個資源都有一個20字節長度的ID用于標示,稱為infohash。當一個程序作為DHT節點加入這個網絡時,就會有其他節點來向你查詢,當你做出回應后,對方就會記錄下你。對方還會詢問其他節點,當對方開始下載這個infohash對應的資源時,他就會告訴所有曾經詢問過的節點,包括你。這個時候就可以確定,這個infohash對應的資源在這個網絡中是有效的。

            關于這個網絡的工作原理,參看:P2P中DHT網絡爬蟲以及寫了個磁力搜索的網頁

            獲取到infohash后能做什么?關鍵點在于,我們現在使用的磁力鏈接(magnet url),是和infohash對應起來的。也就是拿到infohash,就等于拿到一個磁力鏈接。但是這個爬蟲還需要建立資源的信息,這些信息來源于種子文件。種子文件其實也是對應到一個資源,種子文件包含資源名、描述、文件列表、文件大小等信息。獲取到infohash時,其實也獲取到了對應的計算機地址,我們可以在這些計算機上下載到對應的種子文件。

            但是我為了簡單,在獲取到infohash后,從一些提供映射磁力鏈到種子文件服務的網站上直接下載了對應的種子。dhtcrawler里使用了以下網站:

            http://torrage.com
            https://zoink.it
            http://bt.box.n0808.com
            

            使用這些網站時,需提供磁力哈希(infohash可直接轉換),構建特定的URL,發出HTTP請求即可。

               U1 = "http://torrage.com/torrent/" ++ MagHash ++ ".torrent",
                U2 = "https://zoink.it/torrent/" ++ MagHash ++ ".torrent",
                U3 = format_btbox_url(MagHash),
            
            format_btbox_url(MagHash) ->
                H = lists:sublist(MagHash, 2),
                T = lists:nthtail(38, MagHash),
                "http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent".
            

            但是,以一個節點的身份加入DHT網絡,是無法獲取大量查詢的。在DHT網絡中,每個節點都有一個ID。每個節點在查詢信息時,僅詢問離信息較近的節點。這里的信息除了infohash外還包含節點,即節點詢問一個節點,這個節點在哪里。DHT的典型實現中(Kademlia),使用兩個ID的xor操作來確定距離。既然距離的計算是基于ID的,為了盡可能獲取整個DHT網絡交換的信息,爬蟲程序就可以建立盡可能多的DHT節點,讓這些節點的ID均勻地分布在ID取值區間內,以這樣的方式加入網絡。

            在dhtcrawler中,我使用以下方式產生了N個大致均勻分布的ID:

            create_discrete_ids(1) ->
                [dht_id:random()];
            create_discrete_ids(Count) ->
                Max = dht_id:max(),
                Piece = Max div Count,
                [random:uniform(Piece) + Index * Piece || Index <- lists:seq(0, Count - 1)].
            

            除了盡可能多地往DHT網絡里部署節點之外,對單個節點而言,也有些注意事項。例如應盡可能快地將自己告訴盡可能多的節點,這可以在啟動時進行大量的隨機infohash的查詢。隨著查詢過程的深入,該節點會與更多的節點打交道。因為DHT網絡里的節點實際上是不穩定的,它今天在線,明天后天可能不在線,所以計算你的ID固定,哪些節點與你較近,本身就是個相對概念。節點在程序退出時,也最好將自己的路由信息(與自己交互的節點列表)保存起來,這樣下次啟動時就可以更快地加入網絡。

            在dhtcrawler的實現中,每個節點每個一定時間,都會向網絡中隨機查詢一個infohash,這個infohash是隨機產生的。其查詢目的不在于infohash,而在于告訴更多的節點,以及在其他節點上保持自己的活躍。

            handle_event(startup, {MyID}) ->
                timer:apply_interval(?QUERY_INTERVAL, ?MODULE, start_tell_more_nodes, [MyID]).
            
            start_tell_more_nodes(MyID) ->
                spawn(?MODULE, tell_more_nodes, [MyID]).
            
            tell_more_nodes(MyID) ->
                [search:get_peers(MyID, dht_id:random()) || _ <- lists:seq(1, 3)].
            

            DHT節點的完整實現是比較繁瑣的,涉及到查詢以及繁雜的各種對象的超時(節點、桶、infohash),而超時的處理并不是粗暴地做刪除操作。因為本身是基于UDP協議,你得對這些超時對象做進一步的查詢才能正確地進一步做其他事情。而搜索也是個繁雜的事情,遞歸地查詢節點,感覺上,你不一定離目標越來越近,由于被查詢節點的不確定性(無法確定對方是否在玩弄你,或者本身對方就是個傻逼),你很可能接下來要查詢的節點反而離目標變遠了。

            在我第一次的DHT實現中,我使用了類似transmission里DHT實現的方法,不斷無腦遞歸,當搜索有太久時間沒得到響應后終止搜索。第二次實現時,我就使用了etorrent里的實現。這個搜索更聰明,它記錄搜索過的節點,并且檢查是否離目標越來越遠。當遠離目標時,就認為搜索是不太有效的,不太有效的搜索嘗試幾次就可以放棄。

            實際上,爬蟲的實現并不需要完整地實現DHT節點的正常功能。爬蟲作為一個DHT節點的唯一動機僅是獲取網絡里其他節點的查詢。而要完成這個功能,你只需要裝得像個正常人就行。這里不需要保存infohash對應的peer列表,面臨每一次查詢,你隨便回復幾個節點地址就可以。但是這里有個責任問題,如果整個DHT網絡有2000個節點,而你這個爬蟲就有1000個節點,那么你的隨意回復,就可能導致對方根本找不到正確的信息,這樣你依然得不到有效的資源。(可以利用這一點破壞DHT網絡)

            DHT的實現沒有使用第三方庫。

            種子

            種子文件的格式同DHT網絡消息格式一樣,使用一種稱為bencode的文本格式來編碼。種子文件分為兩類:單個文件和多個文件。

            文件的信息無非就是文件名、大小。文件名可能包含utf8編碼的名字,為了后面處理的方便,dhtcrawler都會優先使用utf8編碼。

               {ok, {dict, Info}} = dict:find(<<"info">>, TD),
                case type(Info) of
                    single -> {single, parse_single(Info)};
                    multi -> {multi, parse_multi(Info)}
                end.
            parse_single(Info) ->
                Name = read_string("name", Info),
                {ok, Length} = dict:find(<<"length">>, Info),
                {Name, Length}.
            
            parse_multi(Info) ->
                Root = read_string("name", Info),
                {ok, {list, Files}} = dict:find(<<"files">>, Info),
                FileInfo = [parse_file_item(Item) || {dict, Item} <- Files],
                {Root, FileInfo}.
            

            數據庫

            我最開始在選用數據庫時,為了不使用第三方庫,打算使用erlang自帶的mnesia。但是因為涉及到字符串匹配搜索,mnesia的查詢語句在我看來太不友好,在經過一些資料查閱后就直接放棄了。

            然后我打算使用couchdb,因為它是erlang寫的,而我正在用erlang寫程序。第一次接觸非關系型數據庫,發現NoSQL數據庫使用起來比SQL類的簡單多了。但是在erlang里要使用couchdb實在太折騰了。我使用的客戶端庫是couchbeam。

            因為couchdb暴露的API都是基于HTTP協議的,其數據格式使用了json,所以couchbeam實際上就是對各種HTTP請求、回應和json的包裝。但是它竟然使用了ibrowse這個第三方HTTP客戶端庫,而不是erlang自帶的。ibrowse又使用了jiffy這個解析json的庫。這個庫更慘烈的是它的解析工作都是交給C語言寫的動態庫來完成,我還得編譯那個C庫。

            couchdb看起來不支持字符串查詢,我得自己創建一個view,這個view里我通過翻閱了一些資料寫了一個將每個doc的name拆分成若干次查詢結果的map。這個map在處理每一次查詢時,我都得動態更新之。couchdb是不支持局部更新的,這還不算大問題。然后很高興,終于支持字符串查詢了。這里的字符串查詢都是基于字符串的子串查詢。但是問題在于,太慢了。每一次在WEB端的查詢,都直接導致erlang進程的call超時。

            要讓couchdb支持字符串查詢,要快速,當然是有解決方案的。但是這個時候我已經沒有心思繼續折騰,任何一個庫、程序如果接口設計得如此不方便,那就可以考慮換一個其他的。

            我選擇了mongodb。同樣的基于文檔的數據庫。2.4版本還支持全文搜索。什么是全文搜索呢,這是一種基于單詞的全文搜索方式。hello world我可以搜索hello,基于單詞。mongodb會自動拆詞。更關鍵更讓人爽的是,要開啟這個功能非常簡單:設置啟動參數、建立索引。沒了。mongodb的erlang客戶端庫mongodb-erlang也只依賴一個bson-erlang庫。然后我又埋頭苦干,幾個小時候我的這個爬蟲程序就可以在瀏覽器端搜索關鍵字了。

            后來我發現,mongodb的全文搜索是不支持中文的。因為它還不知道中文該怎么拆詞。恰好我有個同事做過中文拆詞的研究,看起來涉及到很復雜的算法。直到這個時候,我他媽才醒悟,我為什么需要基于單詞的搜索。我們大部分的搜索其實都是基于子字符串的搜索。

            于是,我將種子文件的名字拆分成了若干個子字符串,將這些子字符串以數組的形式作為種子文檔的一個鍵值存儲,而我依然還可以使用全文索引,因為全文索引會將整個字符串作為單詞比較。實際上,基于一般的查詢方式也是可以的。當然,索引還是得建立。

            使用mongodb時唯一讓我很不爽的是mongodb-erlang這個客戶端庫的文檔太欠缺。這還不算大問題,因為看看源碼參數還是可以大概猜到用法。真正悲劇的是mongodb的有些查詢功能它是不支持的。例如通過cursor來排序來限制數量。在cursor模塊并沒有對應的mongodb接口。最終我只好通過以下方式查詢,我不明白batchsize,但它可以工作:

            search_announce_top(Conn, Count) ->
                Sel = {'$query', {}, '$orderby', {announce, -1}},
                List = mongo_do(Conn, fun() ->
                    Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count), 
                    mongo_cursor:rest(Cursor)
                end),
                [decode_torrent_item(Item) || Item <- List].
            

            另一個悲劇的是,mongodb-erlang還不支持文檔的局部更新,它的update接口直接要求傳入整個文檔。幾經折騰,我可以通過runCommand來完成:

            inc_announce(Conn, Hash) when is_list(Hash) ->
                Cmd = {findAndModify, ?COLLNAME, query, {'_id', list_to_binary(Hash)}, 
                    update, {'$inc', {announce, 1}},
                    new, true},
                Ret = mongo_do(Conn, fun() ->
                    mongo:command(Cmd)
                end).
            

            Unicode

            不知道在哪里我看到過erlang說自己其實是不需要支持unicode的,因為這門語言本身是通過list來模擬字符串。對于unicode而言,對應的list保存的本身就是整數值。但是為了方便處理,erlang還是提供了一些unicode操作的接口。

            因為我需要將種子的名字按字拆分,對于a中文這樣的字符串而言,我需要拆分成以下結果:

            a
            a中
            a中文
            中
            中文
            文
            

            那么,在erlang中當我獲取到一個字符串list時,我就需要知道哪幾個整數合起來實際上對應著一個漢字。erlang里unicode模塊里有幾個函數可以將unicode字符串list對應的整數合起來,例如:[111, 222, 333]可能表示的是一個漢字,將其轉換以下可得到[111222333]這樣的形式。

            split(Str) when is_list(Str) ->
                B = list_to_binary(Str), % 必須轉換為binary
                case unicode:characters_to_list(B) of
                    {error, L, D} ->
                        {error, L, D};
                    {incomplete, L, D} ->
                        {incomplete, L, D};
                    UL ->
                    {ok, subsplit(UL)}
                end.
            
            subsplit([]) ->
                [];
            
            subsplit(L) ->
                [_|R] = L,
                {PreL, _} = lists:splitwith(fun(Ch) -> not is_spliter(Ch) end, L),
                [unicode:characters_to_binary(lists:sublist(PreL, Len)) 
                    || Len <- lists:seq(1, length(PreL))] ++ subsplit(R).
            

            除了這里的拆字之外,URL的編碼、數據庫的存儲都還好,沒遇到問題。

            注意,以上針對數據庫本身的吐槽,完全基于我不熟悉該數據庫的情況下,不建議作為你工具選擇的參考。

            erlang的穩定性

            都說可以用erlang來編寫高容錯的服務器程序。看看它的supervisor,監視子進程,自動重啟子進程。天生的容錯功能,就算你宕個幾次,單個進程自動重啟,整個程序看起來還穩健地在運行,多牛逼啊。再看看erlang的進程,輕量級的語言特性,就像OOP語言里的一個對象一樣輕量。如果說使用OOP語言寫程序得think in object,那用erlang你就得think in process,多牛逼多駭人啊。

            實際上,以我的經驗來看,你還得以傳統的思維去看待erlang的進程。一些多線程程序里的問題,在erlang的進程環境中依然存在,例如死鎖。

            在erlang中,對于一些異步操作,你可以通過進程間的交互將這個操作包裝成同步接口,例如ping的實現,可以等到對方回應之后再返回。被阻塞的進程反正很輕量,其包含的邏輯很單一。這不但是一種良好的包裝,甚至可以說是一種erlang-style。但這很容易帶來死鎖。在最開始的時候我沒有注意這個問題,當爬蟲節點數上升的時候,網絡數據復雜的時候,似乎就出現了死鎖型宕機(進程互相等待太久,直接timeout)。

            另一個容易在多進程環境下出現的問題就是消息依賴的上下文改變問題。當投遞一個消息到某個進程,到這個消息被處理之前,這段時間這個消息關聯的邏輯運算所依賴的上下文環境改變了,例如某個ets元素不見了,在處理這個消息時,你還得以多線程編程的思維來編寫代碼。

            至于supervisor,這玩意你得端正態度。它不是用來包容你的傻逼錯誤的。當你寫下傻逼代碼導致進程頻繁崩潰的時候,supervisor屁用沒有。supervisor的唯一作用,僅僅是在一個確實本身可靠的系統,確實人品問題萬分之一崩潰了,重啟它。畢竟,一個重啟頻率的推薦值,是一個小時4次。

            posted @ 2013-06-20 20:40 Kevin Lynx 閱讀(5221) | 評論 (1)編輯 收藏

            使用erlang實現P2P磁力搜索(開源)

            接上回對DHT網絡的研究,我用erlang克隆了一個磁力搜索引擎。我這個實現包含了完整的功能,DHT網絡的加入、infohash的接收、種子的獲取、資源信息的索引、搜索。

            如下圖:

            screenshot

            在我的筆記本上,我開啟了100個DHT節點,大致均勻地分布在DHT網絡里,資源索引速度大概在1小時一萬個左右(包含重復資源)。

            這個程序包含三大部分:

            這兩個項目總共包含大概2500行的erlang代碼。其中,DHT實現部分將DHT網絡的加入包裝成一個庫,爬蟲部分在搜索種子時,暫時沒有使用P2P里的種子下載方式,而是使用現成的磁力鏈轉種子的網站服務,這樣我只需要使用erlang自帶的HTTP客戶端就可以獲取種子信息。爬蟲在獲取到種子信息后,將數據存儲到mongodb里。WEB端我為了盡量少用第三方庫,我只好使用erlang自帶的HTTP服務器,因此網頁內容的創建沒有模板系統可用,只好通過字符串構建,編寫起來不太方便。

            使用

            整個程序依賴了兩個庫:bson-erlang和mongodb-erlang,但下載依賴庫的事都可以通過rebar解決,項目文件里我已經包含了rebar的執行程序。我僅在Windows7上測試過,但理論上在所有erlang支持的系統上都可以。

            • 下載安裝mongodb
            • 進入mongodb bin目錄啟動mongodb,數據庫目錄保存在db下,需手動建立該目錄

                mongod --dbpath db --setParameter textSearchEnabled=true
              
            • 下載erlang,我使用的是R16B版本

            • 下載dhtcrawler,不需要單獨下載kdht,待會下載依賴項的時候會自動下載

                git clone git@github.com:kevinlynx/dhtcrawler.git
              
            • cmd進入dhtcrawler目錄,下載依賴項前需保證環境變量里有git,例如D:\Program Files (x86)\Git\cmd,需注意不要將bash的目錄加入進來,使用以下命令下載依賴項

                rebar get-deps
              
            • 編譯

                rebar compile
              
            • 在dhtcrawler目錄下,啟動erlang

                erl -pa ebin
              
            • 在erlang shell里運行爬蟲,erlang語句以點號(.)作為結束

                crawler_app:start().
              
            • erlang shell里運行HTTP服務器

                crawler_http:start().
              
            • 瀏覽器里輸入localhost:8000/index.html,這個時候還沒有索引到資源,建議監視網絡流量以觀察爬蟲程序是否正確工作

            爬蟲程序啟動時會讀取priv/dhtcrawler.config配置文件,該文件里配置了DHT節點的UDP監聽端口、節點數量、數據庫地址等,可自行配置。

            接下來我會談談各部分的實現方法。

            posted @ 2013-06-20 14:44 Kevin Lynx 閱讀(7819) | 評論 (9)編輯 收藏

            使用ActionScript開發Ice Web客戶端

            我們目前的項目服務器端使用了Ice來構建。Ice有一套自己的網絡協議,客戶端和服務器端可以基于此協議來交互。由于Ice使用Slice這種中間語言來描述服務器和客戶端的交互接口,所以它可以做到極大限度地屏蔽網絡協議這個細節。也就是說,我們只要借助Ice和Slice,我們可以輕松地編寫網絡程序。

            然后,我們的后端現在需要一個運行在Web瀏覽器上的客戶端。要與Ice做交互,如果使用TCP協議的話,得保證是長連接的。但HTTP是短連接的。而另一方面,我們還需要選擇一個Ice支持的和Web相關的語言來做這件事情。如果要在瀏覽器端直接與Ice服務建立連接,可供選擇的語言/平臺包括:

            • Flash
            • Silverlight

            因為我之前用erlang簡單寫了個Ice的客戶端庫,所以我對Ice底層協議有一定了解,可以不一定使用Ice支持的語言,所以HTML5其實也是個選擇。此外,如果在瀏覽器端使用Applet,Java可能也是個選擇。

            其實幾個月前在這塊的技術選擇問題上我就做過簡單的研究,當時確定的方案是使用Flash。但是,后來人員招聘上遇到了問題,看起來要招一個會ActionScript和前端頁面技術的程序員來做我們這種項目,似乎大材小用,成本顯高了。

            那么,考慮到團隊里有現成的Java程序員,而且看起來招一個會用Java寫網站的程序員簡單又便宜,似乎是排除技術原因的最好選擇。但是,如果不在瀏覽器端直接連接服務器來做交互,而是讓Web服務器端來做中轉的話,會面臨不少問題:

            • 瀏覽器端操作結果的獲取問題,說白了就是非實時了,得用Ajax等等技術去模擬實時,代價就是不斷輪訓,也就是通常說的poll
            • Web服務器端需要編寫大量代碼:對用戶操作的映射,結果緩存等等

            如果能用Flash包裝與服務器交互的部分,而把UI相關的東西留給HTML/JS/CSS去做,那是不是可行一點?如果只是用ActionScript編寫與服務器端的交互邏輯代碼,我就不需要花時間去系統學習ActionScript,甚至如何用Flash做界面,我甚至不用搞懂這些技術之間的關系。基本上看些Ice for ActionScript的例子代碼,就可以完成這件事情。

            以下記錄一些主要的過程/方法:

            ActionScript程序的開發

            開發一個嵌入到網頁中的FLASH,只需要Flex SDK。SDK里自帶了一些編譯器相關工具。我不打算使用IDE,因為看起來IDE更復雜。簡單的google之后,基本就可以構建出一個Flash文件:

            • 構建基本的程序需要一個mxml文件,這個文件里主要用來捕獲Flash在頁面上初始化完成這個事件,以初始化內部邏輯
            • 編寫ActionScript源碼,看起來其文件、類的組織方式很像Java
            • 使用Flex SDK中的mxmlc程序來編譯,生成swf文件,例如:

              mxmlc myflexapp.mxml -library-path+=xxx.swc

            • 嵌入到網頁中,簡單的做法可以借助swfobject.js這個庫,嵌入的方式:

                 <script type="text/javascript" src="swfobject.js"></script>
                <script type="text/javascript">
                    var flashvars = {};
                    var params = {};
                  params.play = "true";
                    params.quality = "high";
                    params.bgcolor = "white";
                    params.allowscriptaccess = "always";
                    params.allowfullscreen = "true";
                    var attributes = {};
                    attributes.id = "asclient";
                    attributes.name = "asclient";
                    attributes.align = "middle";
                    swfobject.embedSWF("asclient.swf", "flashContent", "1", "1",
                        "0", "", 
                        flashvars, params, attributes);
                    swfobject.createCSS("#flashContent", "display:none;");
                </script>
            

            自然,頁面中需加入flashContent這個div:

                 <div id="flashContent">
                    <p>no flash</p>
                </div>
            

            我的mxml文件也很簡單:

            <?xml version="1.0" encoding="utf-8"?>
            <s:Application 
                xmlns:fx="http://ns.adobe.com/mxml/2009" 
                xmlns:s="library://ns.adobe.com/flex/spark" 
                xmlns:mx="library://ns.adobe.com/flex/mx"
                applicationComplete="doApplicationComplete()" >
                <fx:Script>
                <![CDATA[
                   import ASClient.Coordinator;
                   import flash.external.ExternalInterface;
            
                   private var _coordinator:Coordinator;
            
                   public function doApplicationComplete():void
                   {
                        trace("doApplicationComplete");
                        _coordinator = new Coordinator();
                        _coordinator.reg_methods();
                        ExternalInterface.call("as_ready"); 
                   } 
                 ]]>
                </fx:Script>
            </s:Application>
            

            ActionScript日志

            我通過日志來調試ActionScript代碼。最簡單的方式就是通過trace函數來輸出日志。要成功輸出日志包含以下步驟:

            • 給瀏覽器安裝調試版本的Flash Player
            • 日志是輸出到用戶目錄下的,并且需要手動創建日志目錄(Administrator替換為用戶名):

                C:\Users\Administrator\AppData\Roaming\Macromedia\Flash Player\Logs
              
            • 用戶目錄下新建配置文件mm.cfg:

                AS3StaticProfile=0
                AS3Verbose=0
                TraceOutputFileEnable=1 
                TraceOutputBuffered=0
                ErrorReportingEnable=1  
                AS3Trace=0
              
            • 編譯DEBUG版本的Flash文件,可以修改flex sdk下的flex-config.xml文件,里面增加debug=true配置即可

            在開發過程中需要注意瀏覽器緩存問題,當編譯出新的Flash文件后,瀏覽器即使頁面刷新也可能使用的是緩存里的Flash。當然,最重要的,是通過瀏覽器來訪問這個包含了Flash的網頁,Web服務器隨意。

            Flash Policy文件

            在Flash的某個版本后,Flash中如果要向外建立socket連接,是首先要取得目標主機返回的policy文件的。也就是在建立連接前,Flash底層會先向目標主機詢問得到一個描述訪問權限的文件。

            簡單來說,目標主機需要在843端口上建立TCP監聽,一旦有網絡連接,就發送以下內容,內容后需添加0x00用于標示結束。(當然,具體細節還挺多,自行google)

            <cross-domain-policy>
                 <allow-access-from domain="*" to-ports="*" />
            </cross-domain-policy>
            

            最開始我使用的是朋友給的現成的Policy服務,雖然我寫的Flash可以成功連接我的Ice服務,但始終要等待2秒以上的時間。google Flash Policy相關內容,可以發現確實存在一個延時,但那是因為目標主機沒有在843端口服務。后來我自己用erlang寫了個Policy服務,延時就沒有了。猜測可能是他的Policy服務沒有添加0x00作為結束導致。

            ActionScript與JavaScript的交互

            既然我使用ActionScript來包裝與服務器的交互,那么JavaScript就必然需要和ActionScript通信。這個通信過程也就是在JavaScript中調用ActionScript中的函數,反過來亦然。這個過程很簡單:

            在JavaScript中調用ActionScript函數:

            首先是ActionScript需要注冊哪些函數可以被調用:

            ExternalInterface.addCallback("service_loadall", loadAll);
            

            通過ExternalInterface.addCallback注冊的函數,其實是個closure,所以在類中注冊自己的成員函數都可以(因為成員函數會使用this,形成了一個closure)。

            然后在JavaScript中調用:

                function asObject() {
                    // asclient是嵌入Flash時填入的name和(或?)id
                    return window.document.asclient;
                }
                var as = asObject();
                as.service_loadall();
            

            在ActionScript中調用JavaScript中調用則更簡單,一句話:

            ExternalInterface.call("service_load_done", args);
            

            至于在兩者之間的函數參數傳遞,其類型都可以自動映射。但因為我的應用里數據較為復雜,我就將數據轉換為JSON格式,在JavaScript這邊操作較為簡單。

            頁面切換

            這里我們需要的Web前端頁面,更像是一個管理系統,所以頁面切換是很有可能的。問題在于,當出現頁面跳轉時,Flash對象會重新初始化,新的頁面無法使用前一個頁面建立好的網絡連接(或者能?)。為了解決這個問題,服務器當然可以設計一種重登錄機制,方便客戶端以一種特殊的方式進入系統,繞過正常的登錄環節。但是我們使用了Glacier2這個網關,在這個網關上有針對連接的超時管理,這樣反復建立新的連接對資源太浪費了。

            綜上,我想只能通過前端去規避這個問題。例如,前端開發人員依然可以分開設計很多頁面,頁面里也可以使用正常的鏈接。我們編寫一些JavaScript代碼,將頁面里的鏈接替換成對應的JavaScript代碼,動態載入新的頁面內容,然后對頁面內的部分內容進行替換,從而盡可能讓頁面設計人員編寫正常的網頁,同時也解決頁面切換問題。

            這是個蹩腳的方法,但在我有限的前端知識體系下,似乎也只能這樣干了。

            posted @ 2013-06-10 21:30 Kevin Lynx 閱讀(2150) | 評論 (0)編輯 收藏

            P2P中DHT網絡爬蟲

            DHT網絡爬蟲基于DHT網絡構建了一個P2P資源搜索引擎。這個搜索引擎不但可以用于構建DHT網絡中活躍的資源索引(活躍的資源意味著該網絡中肯定有人至少持有該資源的部分數據),還可以分析出該網絡中的熱門分享資源。小蝦不久前發布了一個這樣的搜索引擎:磁力搜索。他也寫博客對此稍作了介紹:寫了個磁力搜索的網頁 - 收錄最近熱門分享的資源。網絡上其實也有其他人做了類似的應用:DHT monitoringCrawling Bittorrent DHT

            但是他的這篇文章僅介紹了DHT網絡的大致工作原理,并且這個爬蟲的具體工作原理也沒有提到。對此我查閱了些文章及代碼,雖然從原理上梳理出了整個實現方案,但很多細節還是不甚清楚。所以本文僅作概要介紹。

            DHT/Magnet/Torrent

            在P2P網絡中,要通過種子文件下載一個資源,需要知道整個P2P網絡中有哪些計算機正在下載/上傳該資源。這里將這些提供某個資源下載的計算機定義為peer。傳統的P2P網絡中,存在一些tracker服務器,這些服務器的作用主要用于跟蹤某個資源有哪些關聯的peer。下載這個資源當然得首先取得這些peer。

            DHT的出現用于解決當tracker服務器不可用時,P2P客戶端依然可以取得某個資源的peer。DHT解決這個問題,是因為它將原來tracker上的資源peer信息分散到了整個網絡中。這里將實現了DHT協議的計算機定義為節點(node)。通常一個P2P客戶端程序既是peer也是節點。DHT網絡有多種實現算法,例如Kademlia。

            當某個P2P客戶端通過種子文件下載資源時,如果沒有tracker服務器,它就會向DHT網絡查詢這個資源對應的peer列表。資源的標識在DHT網絡中稱為infohash,是一個20字節長的字符串,一般通過sha1算法獲得,也就是一個類似UUID的東西。

            實際上,種子文件本身就對應著一個infohash,這個infohash是通過種子文件的文件描述信息動態計算得到。一個種子文件包含了對應資源的描述信息,例如文件名、文件大小等。Magnet,這里指的是磁力鏈接,它是一個類似URL的字符串地址。P2P軟件通過磁力鏈接,會下載到一個種子文件,然后根據該種子文件繼續真實資源的下載。

            磁力鏈接中包含的最重要的信息就是infohash。這個infohash一般為40字節或32字節,它其實只是資源infohash(20字節)的一種編碼形式。

            Kademlia

            Kademlia是DHT網絡的一種實現。網絡上關于這個算法的文章,主要是圍繞整個DHT網絡的實現原理進行論述。個人覺得這些文章很蛋疼,基本上讀了之后對于要如何去實現一個DHT客戶端還是沒有概念。這里主要可參考P2P中DHT網絡介紹,以及BitTorrent網站上的DHT協議描述

            Kad的主要目的是用于查詢某個資源對應的peer列表,而這個peer列表實際上是分散在整個網絡中。網絡中節點數量很大,如果要獲得peer列表,最簡單的做法無非就是依次詢問網絡中的每個節點。這當然不可行。所以在Kad算法中,設立了一個路由表。每一個節點都有一份路由表。這個是按照節點之間的距離關系構建出來的。節點之間的距離當然也有特定的算法定義,在Kad中通過對兩個節點的ID進行異或操作得到。節點的ID和infohash通過相同算法構建,都是20字節長度。節點和infohash之間也有距離關系,實際上表示的是節點和資源的距離關系。

            有了這個路由表之后,再通過一個基于距離關系的查找算法,就可以實現不用挨個遍歷就找到特定的節點。而查找資源peer這個操作,正是基于節點查找這個過程。

            路由表的實現,按我的理解,有點類似一般的hash表結構。在這個表中有160個桶,稱為K桶,這個桶的數量在實現上可以動態增長。每個桶保存有限個元素,例如K取值為8,指的就是這個桶最多保存8個元素。每個元素就是一個節點,節點包含節點ID、地址信息以及peer信息。這些桶可以通過距離值索引得到,即距離值會經過一個hash算法,使其值落到桶的索引范圍內。

            要加入一個DHT網絡,需要首先知道這個網絡中的任意一個節點。如何獲得這個節點?在一些開源的P2P軟件中,會提供一些節點地址,例如transmission中提供的dht.transmissionbt.com:6881。

            協議

            Kad定義了節點之間的交互協議。這些協議支撐了整個DHT網絡里信息分布式存儲的實現。這些協議都是使用UDP來傳送。其協議格式使用一種稱為bencode的編碼方式來編碼協議數據。bencode是一種文本格式的編碼,它還用于種子文件內的信息編碼。

            Kad協議具體格式可參考BitTorrent的定義:DHT Protocol。這些協議包括4種請求:ping,find_node,get_peer,announce_peer。在有些文檔中這些請求的名字會有不同,例如announce_peer又被稱為store,get_peer被稱為find_value。這4種請求中,都會有對應的回應消息。其中最重要的消息是get_peer,其目的在于在網絡中查找某個資源對應的peer列表。

            值得一提的是,所有這些請求,包括各種回應,都可以用于處理該消息的節點構建路由表。因為路由表本質就是存儲網絡中的節點信息。

            ping

            用于確定某個節點是否在線。這個請求主要用于輔助路由表的更新。

            find_node

            用于查找某個節點,以獲得其地址信息。當某個節點接收到該請求后,如果目標節點不在自己的路由表里,那么就返回離目標節點較近的K個節點。這個消息可用于節點啟動時構建路由表。通過find_node方式構建路由表,其實現方式為向DHT網絡查詢自己。那么,接收該查詢的節點就會一直返回其他節點了列表,查詢者遞歸查詢,直到無法查詢為止。那么,什么時候無法繼續查詢呢?這一點我也不太清楚。每一次查詢得到的都是離目標節點更接近的節點集,那么理論上經過若干次遞歸查詢后,就無法找到離目標節點更近的節點了,因為最近的節點是自己,但自己還未完全加入網絡。這意味著最后所有節點都會返回空的節點集合,這樣就算查詢結束?

            實際上,通過find_node來構建路由表,以及順帶加入DHT網絡,這種方式什么時候停止在我看來并不重要。路由表的構建并不需要在啟動時構建完成,在以后與其他節點的交互過程中,路由表本身就會慢慢地得到構建。在初始階段盡可能地通過find_node去與其他節點交互,最大的好處無非就是盡早地讓網絡中的其他節點認識自己。

            get_peer

            通過資源的infohash獲得資源對應的peer列表。當查詢者獲得資源的peer列表后,它就可以通過這些peer進行資源下載了。收到該請求的節點會在自己的路由表中查找該infohash,如果有收錄,就返回對應的peer列表。如果沒有,則返回離該infohash較近的若干個節點。查詢者若收到的是節點列表,那么就會遞歸查找。這個過程同find_node一樣。

            值得注意的是,get_peer的回應消息里會攜帶一個token,該token會用于稍后的announce_peer請求。

            announce_peer

            該請求主要目的在于通知,通知其他節點自己開始下載某個資源。這個消息用于構建網絡中資源的peer列表。當一個已經加入DHT網絡的P2P客戶端通過種子文件開始下載資源時,首先在網絡中查詢該資源的peer列表,這個過程通過get_peer完成。當某個節點從get_peer返回peer時,查詢者開始下載,然后通過announce_peer告訴返回這個peer的節點。

            announce_peer中會攜帶get_peer回應消息里的token。關于這一點,我有一個疑問是,在P2P中DHT網絡介紹文檔中提到:

            (announce_peer)同時會把自己的peer信息發送給先前的告訴者和自己K桶中的k個最近的節點存儲該peer-list信息

            不管這里提到的K的最近的節點是離自己最近,還是離資源infohash最近的節點,因為處理announce_peer消息時,有一個token的驗證過程。但是這K個節點中,并沒有在之前創建對應的token。我通過transmission中的DHT實現做了個數據收集,可以證明的是,announce_peer消息是不僅僅會發給get_peer的回應者的。

            DHT爬蟲

            DHT爬蟲是一個遵循Kad協議的假節點程序。具體可以參考小蝦發布的那個網站應用:磁力搜索

            這個爬蟲的實現方式,主要包含以下內容:

            • 通過其他節點的announce_peer發來的infohash確認網絡中有某個資源可被下載
            • 通過從網絡中獲取這個資源的種子文件,來獲得該資源的描述

            通過累計收集得到的資源信息,就可以提供一個資源搜索引擎,或者構建資源統計信息。以下進一步描述實現細節。整個爬蟲的實現依賴了一個很重要的信息,那就是資源的infohash實際上就是一個磁力鏈接(當然需要包裝一下數據)。這意味著一旦我們獲得了一個infohash,我們就等于獲得了一個種子。

            獲得資源通知

            當爬蟲程序加入DHT網絡后,它總會收到其他節點發來的announce_peer消息。announce_peer消息與get_peer消息里都帶了資源的infohash,但是get_peer里的infohash對應的資源在該網絡中不一定存在,即該資源沒有任何可用peer。而announce_peer則表示已經確認了該網絡中有節點正在下載該資源,也即該資源的數據確實存在該網絡中。

            所以,爬蟲程序需要盡最大努力地獲取其他節點發來的announce_peer消息。如果announce_peer消息會發送給離消息發送節點較近的節點,那么,一方面,爬蟲程序應該將自己告訴網絡中盡可能多的節點。這可以通過一次完整的find_node操作實現。另一方面,爬蟲程序內部實現可以部署多個DHT節點,總之目的在于盡可能地讓爬蟲程序稱為其他節點的較近者。

            當收集到infohash之后,爬蟲程序還需要通過該infohash獲得對應資源的描述信息。

            獲取資源信息

            獲得資源描述信息,其實就是通過infohash獲得對應的種子文件。這需要實現P2P協議里的文件分享協議。種子文件的獲取其實就是一個文件下載過程,下載到種子文件之后,就可以獲取到資源描述。這個過程一種簡單的方法,就是從infohash構建出一個磁力鏈接,然后交給一個支持磁力下載的程序下載種子。

            從infohash構建出磁力鏈接非常簡單,只需要將infohash編碼成磁力鏈接的xt字段即可,構建實現可以從transmission源碼里找到:

            /* 這個算法其實和printf("%02x", sha1[i])一樣 */
            void tr_sha1_to_hex (char *out, const unsigned char *sha1)
            {
            int i;
            static const char hex[] = "0123456789abcdef";
            for (i=0; i<20; ++i) {
            const unsigned int val = *sha1++;
            *out++ = hex[val >> 4];
            *out++ = hex[val & 0xf];
            }
            *out = '\0';
            }
            void appendMagnet(FILE *fp, const unsigned char *info_hash) {
            char out[48];
            tr_sha1_to_hex(out, info_hash);
            fprintf(fp, "magnet:?xt=urn:btih:%s", out);
            }
            

            現在你就可以做一個實驗,在transmission的DHT實現中,在announce_peer消息的處理代碼中,將收到的infohash通過上面的appendMagnet轉換為磁力鏈接輸出到日志文件里。然后,可以通過支持磁力鏈接的程序(例如QQ旋風)直接下載。有趣的是,當QQ旋風開始下載該磁力鏈接對應的種子文件時,你自己的測試程序能收到QQ旋風程序發出的announce_peer消息。當然,你得想辦法讓這個測試程序盡可能地讓其他節點知道你,這可以通過很多方式實現。

            總結

            最終的DHT爬蟲除了需要實現DHT協議之外,還需要實現P2P文件下載協議,甚至包括一個種子文件解析模塊。看起來包含不小的工作量。而如果要包裝為一個資源搜索引擎,還會涉及到資源存儲以及搜索,更別說前端呈現了。這個時候,如果你使用的語言不包含這些工具庫,那實在是太悲劇了。沒錯,我就沒找到一個erlang DHT庫(倒是有erlang實現的BT客戶端,懶得去剝了)。

            UPDATE

            通過詳細閱讀transmission里的DHT實現,一些之前的疑惑隨之解開。

            announce_peer會發給哪些節點

            在一次對infohash的查詢過程中,所有對本節點發出的get_peer作出回應的節點(不論這個回應節點回應的是nodes還是peers),當本節點取得peer信息時,就會對所有這些節點發出announce_peer。get_peer的回應消息里,不論是peer還是node,都會攜帶一個token,這樣在將來收到對方的announce_peer時,就可以驗證該token。

            節點和bucket狀態

            在本地的路由表中,保存的node是有狀態之分的。狀態分為三種:good/dubious/bad。good節點基本可以斷定該節點是一個在線的并且可以正常回應消息的節點;而bad節點則是可確定的無效節點,通常會盡快從路由表中移除;而dubious則是介于good和bad節點之間,表示可能有問題的節點,需要進一步發送例如ping消息來確認其狀態。路由表中應該盡可能保證保存的是good節點,對查詢消息的回應里也需攜帶好的節點。

            bucket也是有狀態的,當一個bucket中的所有節點在一定時間之內都沒有任何活動的時候,該bucket則應該考慮進行狀態的確認,確認方式可以隨機選擇該bucket中的節點進行find_node操作(這也是find_node除了用于啟動之外的唯一作用,但具體實現不見得使用這種方式)。沒有消息來往的bucket則應該考慮移除。DHT中幾乎所有操作都會涉及到bucket的索引,如果索引到一個所有節點都有問題的bucket,那么該操作可能就無法完成。

            search在何時停止

            首先,某次發起的search,無論是對node還是對peer,都可能導致進一步產生若干個search。這些search都是基于transaction id來標識的。由一次search導致產生的所有子search都擁有相同的transaction id,以使得在該search成功或失敗時可以通過該transaction id來刪除對應的所有search。transaction id也就是DHT中每個消息消息頭”t”的值。

            但是search何時停止?transmission中是通過超時機制來停止。在search過程中,如果長時間沒有收到跟該search關聯的節點發來的回應消息,那么就撤銷該search,表示搜索失敗。

            參考資料

            posted @ 2013-05-19 21:51 Kevin Lynx 閱讀(13761) | 評論 (0)編輯 收藏

            Erlang使用感受

            用erlang也算寫了些代碼了,主要包括使用RabbitMQ的練習,以及最近寫的kl_tservericerl。其中icerl是一個實現了Ice的erlang庫。

            erlang的書較少,我主要讀過<Programming Erlang>和<Erlang/OTP in Action>。其實erlang本身就語言來說的話比較簡單,同ruby一樣,類似這種本身目標是應用于實際軟件項目的語言都比較簡單,對應的語法書很快可以翻完。

            這里我僅談談自己在編寫erlang代碼過程中的一些感受。

            語法

            erlang語法很簡單,接觸過函數式語言的程序員上手會很快。它沒有類似common lisp里宏這種較復雜的語言特性。其語法元素很緊湊,不存在一些用處不大的特性。在這之前,我學習過ruby和common lisp。ruby代碼寫的比common lisp多。但是在學習erlang的過程中我的腦海里卻不斷出現common lisp里的語法特性。這大概是因為common lisp的語法相對ruby來說,更接近erlang。

            編程模式

            erlang不是一個面向對象的語言,它也不同common lisp提供多種編程模式。它的代碼就是靠一個個函數組織出來的。面向對象語言在語法上有一點讓我很爽的是,其函數調用更自然。erlang的接口調用就像C語言里接口的調用一樣:

            func(Obj, args)
            Obj->func(args)
            

            即需要在函數第一個參數傳遞操作對象。但是面向對象語言也會帶來一些語法的復雜性。如果一門語言可以用很少的語法元素表達很多信息,那么我覺得這門語言就是門優秀的語言。

            表達式/語句

            erlang里沒有語句,全部是表達式,意思是所有語法元素都是有返回值的。這實在太好了,全世界都有返回值可以讓代碼寫起來簡單多了:

                Flag = case func() of 1 -> true; 0 -> false end, 
            

            命名

            我之所以不想寫一行python代碼的很大一部分原因在于這門語言居然要求我必須使用代碼縮進來編程,真是不敢相信。erlang里雖然沒有此規定,卻也有不同的語法元素有大小寫的限定。變量首字母必須大寫,atom必須以小寫字母開頭,更霸氣的是模塊命名必須和文件名相同。

            變量

            erlang里的變量是不可更改的。實際上給一個變量賦值,嚴格來說應該叫bound,即綁定。這個特性完全就是函數式語言里的特性。其帶來的好處就像函數式語言宣揚的一樣,這會使得代碼沒有副作用(side effect)。因為程序里的所有函數不論怎樣調用,其程序狀態都不會改變,因為變量無法被改變。

            變量不可更改,直接意味著全局變量沒有存在的意義,也就意味著不論你的系統是多么復雜地被構建出來,當系統崩潰時,其崩潰所在位置的上下文就足夠找到問題。

            但是變量不可改變也會帶來一些代碼編寫上的不便。我想這大概是編程思維的轉變問題。erlang的語法特性會強迫人編寫非常短小的函數,你大概不愿意看到你的函數實現里出現Var1/Var2/Var3這樣的變量,而實際上這樣的命名在命令式語言里其實指的是同一個變量,只不過其值不同而已。

            但是我們的程序總是應該有狀態的。在erlang里我們通過不斷創建新的變量來存儲這個狀態。我們需要通過將這個狀態隨著我們的程序流程不斷地通過函數參數和返回值傳遞下去。

            atom

            atom這個語法特性本身沒問題,它就同lisp里的atom一樣,沒什么意義,就是一個名字。它主要用在增加代碼的可讀性上。但是這個atom帶來的好處,直接導致erlang不去內置諸如true/false這種關鍵字。erlang使用true/false這兩個atom來作為boolean operator的返回值。但erlang里嚴格來說是沒有布爾類型的。這其實沒什么,糟糕的是,對于一些較常見的函數返回值,例如true/false,erlang程序員之間就得做約定。要表示一個函數執行失敗了,我可以返回false、null、failed、error、nil,甚至what_the_fuck,這一度讓我迷惘。

            list/tuple

            erlang里的list當然沒有lisp里的list牛逼,別人整個世界就是由list構成的。在一段時間里,我一直以為list里只能保存相同類型的元素,而tuple才是用于保存不同類型元素的容器。直到有一天我發現tuple的操作不能滿足我的需求了,我才發現list居然是可以保存不同類型的。

            list相對于tuple而言,更厲害的地方就在于頭匹配,意思是可以通過匹配來拆分list的頭和剩余部分。

            匹配(match)

            erlang的匹配機制是個好東西。這個東西貫穿了整個語言。在我理解看來,匹配機制減少了很多判斷代碼。它試圖用一個期望的類型去匹配另一個東西,如果這個東西出了錯,它就無法完成這個匹配。無法完成匹配就導致程序斷掉。

            匹配還有個方便的地方在于可以很方便地取出record里的成員,或者tuple和list的某個部分,這其實增強了其他語法元素的能力。

            循環

            erlang里沒有循環語法元素,這真是太好了。函數式語言里為什么要有循環語法呢?common lisp干毛要加上那些復雜的循環(宏),每次我遇到需要寫循環的場景時,我都誠惶誠恐,最后還是用遞歸來解決。

            同樣,在erlang里我們也是用函數遞歸來解決循環問題。甚至,我們還有list comprehension。當我寫C++代碼時,我很不情愿用循環去寫那些容器遍歷代碼,幸運的是在C++11里通過lambda和STL里那些算法我終于不用再寫這樣的循環代碼了。

            if/case/guard

            erlang里有條件判定語法if,甚至還有類似C語言里的switch…case。這個我一時半會還不敢評價,好像haskell里也保留了if。erlang里同haskell一樣有guard的概念,這其實是一種變相的條件判斷,只不過其使用場景不一樣。

            進程

            并發性支持屬于erlang的最大亮點。erlang里的進程概念非常簡單,基于消息機制,程序員從來不需要擔心同步問題。每個進程都有一個mailbox,用于緩存發送到此進程的消息。erlang提供內置的語法元素來發送和接收消息。

            erlang甚至提供分布式支持,更酷的是你往網絡上的其他進程發送消息,其語法和往本地進程發送是一樣的。

            模塊加載

            如果我寫了一個erlang庫,該如何在另一個erlang程序里加載這個庫?這個問題一度讓我迷惘。erlang里貌似有對庫打包的功能(.ez?),按理說應該提供一種整個庫加載的方式,然后可以通過手動調用函數或者指定代碼依賴項來加載。結果不是這樣。

            erlang不是按整個庫來加載的,因為也沒有方式去描述一個庫(應該有第三方的)。當我們調用某個模塊里的函數時,erlang會自動從某個目錄列表里去搜索對應的beam文件。所以,可以通過在啟動erlang添加這個模塊文件所在目錄來實現加載,這還是自動的。當然,也可以在erlang shell里通過函數添加這個目錄。

            OTP

            使用erlang來編寫程序,最大的優勢可能就是其OTP了。OTP基本上就是一些隨erlang一起發布的庫。這些庫中最重要的一個概念是behaviour。behaviour其實就是提供了一種編程框架,應用層提供各種回調函數給這個框架,從而獲得一個健壯的并發程序。

            application behaviour

            application behaviour用于組織一個erlang程序,通過一個配置文件,和提供若干回調,就可以讓我們編寫的erlang程序以一種統一的方式啟動。我之前寫的都是erlang庫,并不需要啟動,而是提供給應用層使用,所以也沒使用該behaviour。

            gen_server behaviour

            這個behaviour應該是使用頻率很高的。它封裝了進程使用的細節,本質上也就是將主動收取消息改成了自動收取,收取后再回調給你的模塊。

            supervisor behaviour

            這個behaviour看起來很厲害,通過對它進行一些配置,你可以把你的并發程序里的所有進程建立成樹狀結構。這個結構的牛逼之處在于,當某個進程掛掉之后,通過supervisor可以自動重新啟動這個掛掉的進程,當然重啟沒這么簡單,它提供多種重啟規則,以讓整個系統確實通過重啟變成正常狀態。這實在太牛逼了,這意味著你的服務器可以7x24小時地運行了,就算有問題你也可以立刻獲得一個重寫工作的系統。

            熱更新

            代碼熱更新對于一個動態語言而言其實根本算不上什么優點,基本上動態語言都能做到這一點。但是把熱更新這個功能加到一個用于開發并發程序的語言里,那就很牛逼了。你再一次可以確保你的服務器7x24小時不停機維護。

            gen_tcp

            最開始我以為erlang將網絡部分封裝得已經認不出有socket這個概念了。至少,你也得有一個牛逼的網絡庫吧。結果發現依然還是socket那一套。然后我很失望。直到后來,發現使用一些behaviour,加上調整gen_tcp的一些option,居然可以以很少的代碼寫出一個維護大量連接的TCP服務器。是啊,erlang天生就是并發的,在傳統的網絡模型中,我們會覺得使用one-thread-per-connection雖然簡單卻不是可行的,因為thread是OS資源,太昂貴。但是在erlang里,one-process-per-connection卻是再自然不過的事情。你要是寫個erlang程序里面卻只有一個process你都不好意思告訴別人你寫的是erlang。process是高效的(對我們這種二流程序員而言),它就像C++里一個很普通的對象一樣。

            在使用gen_tcp的過程中我發現一個問題,不管我使用哪一種模型,我竟然找不到一種溫柔的關閉方式。我查看了幾個tutorial,這些混蛋竟然沒有一個人提到如何去正常關閉一個erlang TCP服務器。后來,我沒有辦法,只好使用API強制關閉服務器進程。

            Story

            其實,我和erlang之間是有故事的。我并不是這個月開始才接觸erlang。早在2009年夏天的時候我就學習過這門語言。那時候我還沒接觸過任何函數式語言,那時候lua里的閉包都讓我覺得新奇。然后無意間,我莫名其妙地接觸了haskell(<Real World Haskell>),在我決定開始寫點什么haskell練習時,我發現我無從下手,最后,Monads把我嚇哭了。haskell實在太可怕了。

            緊接著我懷揣著對函數式語言的濃烈好奇心看到了erlang。當我看到了concurrent programming的章節時,在一個燥熱難耐的下午我的領導找到了我,同我探討起erlang對我們的網游服務器有什么好處。然后,我結束我了的erlang之旅。

            時隔四年,這種小眾語言,居然進入了中國程序員的視野,并被用于開發網頁游戲服務器。時代在進步,我們總是被甩在后面。

            posted @ 2013-05-09 21:24 Kevin Lynx 閱讀(5474) | 評論 (0)編輯 收藏

            erlang和RabbitMQ學習總結

            AMQP和RabbitMQ概述

            AMQP(Advanced Message Queue Protocol)定義了一種消息系統規范。這個規范描述了在一個分布式的系統中各個子系統如何通過消息交互。而RabbitMQ則是AMQP的一種基于erlang的實現。

            AMQP將分布式系統中各個子系統隔離開來,子系統之間不再有依賴。子系統僅依賴于消息。子系統不關心消息的發送者,也不關心消息的接受者。

            AMQP中有一些概念,用于定義與應用層的交互。這些概念包括:message、queue、exchange、channel, connection, broker、vhost。

            注:到目前為止我并沒有打算使用AMQP,所以沒有做更深入的學習,僅為了找個機會寫寫erlang代碼,以下信息僅供參考。

            • message,即消息,簡單來說就是應用層需要發送的數據
            • queue,即隊列,用于存儲消息
            • exchange,有翻譯為“路由”,它用于投遞消息,應用程序在發送消息時并不是指定消息被發送到哪個隊列,而是將消息投遞給路由,由路由投遞到隊列
            • channel,幾乎所有操作都在channel中進行,有點類似一個溝通通道
            • connection,應用程序與broker的網絡連接
            • broker,可簡單理解為實現AMQP的服務,例如RabbitMQ服務

            關于AMQP可以通過一篇很有名的文章了解更多:RabbitMQ+Python入門經典 兔子和兔子窩

            RabbitMQ的運行需要erlang的支持,erlang和RabbitMQ在windows下都可以直接使用安裝程序,非常簡單。RabbitMQ還支持網頁端的管理,這需要開啟一些RabbitMQ的插件,可以參考官方文檔

            RabbitMQ本質上其實是一個服務器,與這個服務器做交互則是通過AMQP定義的協議,應用可以使用一個實現了AMQP協議的庫來與服務器交互。這里我使用erlang的一個客戶端,對應著RabbitMQ的tutorial,使用erlang實現了一遍。基于這個過程我將一些關鍵實現羅列出來以供記憶:

            主要功能使用

            關于RabbitMQ erlang client的使用說明可以參考官方文檔。這個client library下載下來后是兩個ez文件,其實就是zip文件,本身是erlang支持的庫打包格式,但據說這個feature還不成熟。總之我是直接解壓,然后在環境變量中指定ERL_LIBS到解壓目錄。使用時使用include_lib包含庫文件(類似C語言里的頭文件):

                -include_lib("amqp_client/include/amqp_client.hrl").
            

            Connection/Channel

            對于連接到本地的RabbitMQ服務:

                {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
                {ok, Channel} = amqp_connection:open_channel(Connection),
            

            創建Queue

            每個Queue都有名字,這個名字可以人為指定,也可以由系統分配。Queue創建后如果不顯示刪除,斷開網絡連接是不會自動刪除這個Queue的,這個可以在RabbitMQ的web管理端看到。

                #'queue.declare_ok'{queue = Q}
                    = amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
            

            但也可以指定Queue會在程序退出后被自動刪除,需要指定exclusive參數:

                QDecl = #'queue.declare'{queue = <<>>, exclusive = true},
                #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QDecl),
            

            上例中queue的名字未指定,由系統分配。

            發送消息

            一般情況下,消息其實是發送給exchange的:

                Payload = <<"hello">>
                Publish = #'basic.publish'{exchange = <<"log_exchange">>},
                amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
            

            exchange有一系列規則,決定某個消息將被投遞到哪個隊列。

            發送消息時也可以不指定exchange,這個時候消息的投遞將依賴于routing_keyrouting_key在這種場景下就對應著目標queue的名字:

                #'queue.declare_ok'{queue = Q}
                    = amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
                Payload = <<"hello">>,
                Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
                amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
            

            接收消息

            可以通過注冊一個消息consumer來完成消息的異步接收:

                Sub = #'basic.consume' {queue = Q},
                #'basic.consume_ok'{consumer_tag = Tag} = amqp_channel:subscribe(Channel, Sub, self()),
            

            以上注冊了了一個consumer,監聽變量Q指定的隊列。當有消息到達該隊列時,系統就會向consumer進程對應的mailbox投遞一個通知,我們可以使用receive來接收該通知:

                loop(Channel) ->
                    receive 
                        % This is the first message received (from RabbitMQ)
                        #'basic.consume_ok'{} -> 
                            loop(Channel);
                        % a delivery
                        {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}} ->
                            echo(Payload),
                            % ack the message
                            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
                            loop(Channel);
                    ...
            

            綁定exchange和queue

            綁定(binding)其實也算AMQP里的一個關鍵概念,它用于建立exchange和queue之間的聯系,以方便exchange在收到消息后將消息投遞到隊列。我們不一定需要將隊列和exchange綁定起來。

                Binding = #'queue.bind'{queue = Queue, exchange = Exchange, routing_key = RoutingKey},
                #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
            

            在綁定的時候需要填入一個routing_key的參數,不同類型的exchange對該值的處理方式不一樣,例如后面提到fanout類型的exchange時,就不需要該值。

            更多細節

            通過閱讀RabbitMQ tutorial,我們還會獲得很多細節信息。例如exchange的種類、binding等。

            exchange分類

            exchange有四種類型,不同類型決定了其在收到消息后,該如何處理這條消息(投遞規則),這四種類型為:

            • fanout
            • direct
            • topic
            • headers

            fanout類型的exchange是一個廣播exchange,它在收到消息后會將消息廣播給所有綁定到它上面的隊列。綁定(binding)用于將隊列和exchange關聯起來。我們可以在創建exchange的時候指定exchange的類型:

                Declare = #'exchange.declare'{exchange = <<"my_exchange">>, type = <<"fanout">>}
                #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)
            

            direct類型的exchange在收到消息后,會將此消息投遞到發送消息時指定的routing_key和綁定隊列到exchange上時的routing_key相同的隊列里。可以多次綁定一個隊列到一個exchange上,每次指定不同的routing_key,就可以接收多種routing_key類型的消息。注意,綁定隊列時我們可以填入一個routing_key,發送消息時也可以指定一個routing_key

            topic類型的exchange相當于是direct exchange的擴展,direct exchange在投遞消息到隊列時,是單純的對routing_key做相等判定,而topic exchange則是一個routing_key的字符串匹配,就像正則表達式一樣。在routing_key中可以填入一種字符串匹配符號:

            * (star) can substitute for exactly one word.
            # (hash) can substitute for zero or more words.
            

            header exchange tutorial中未提到,我也不深究

            消息投遞及回應

            每個消息都可以提供回應,以使RabbitMQ確定該消息確實被收到。RabbitMQ重新投遞消息僅依靠與consumer的網絡連接情況,所以只要網絡連接正常,consumer卡死也不會導致RabbitMQ重投消息。如下回應消息:

                amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
            

            其中Tag來源于接收到消息時里的Tag

            如果有多個consumer監聽了一個隊列,RabbitMQ會依次把消息投遞到這些consumer上。這里的投遞原則使用了round robin方法,也就是輪流方式。如前所述,如果某個consumer的處理邏輯耗時嚴重,則將導致多個consumer出現負載不均衡的情況,而RabbitMQ并不關心consumer的負載。可以通過消息回應機制來避免RabbitMQ使用這種消息數平均的投遞原則:

                Prefetch = 1,
                amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})
            

            消息可靠性

            RabbitMQ可以保證消息的可靠性,這需要設置消息和隊列都為durable的:

                #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello_queue">>, durable = true}),
            
                Payload = <<"foobar">>,
                Publish = #'basic.publish'{exchange = "", routing_key = Queue},
                Props = #'P_basic'{delivery_mode = 2}, %% persistent message
                Msg = #amqp_msg{props = Props, payload = Payload},
                amqp_channel:cast(Channel, Publish, Msg)
            

            參考

            除了參考RabbitMQ tutorial外,還可以看看別人使用erlang是如何實現這些tutorial的,github上有一個這樣的項目:rabbitmq-tutorials。我自己也實現了一份,包括rabbitmq-tutorials中沒實現的RPC。后來我發現原來rabbitmq erlang client的實現里已經包含了一個RPC模塊。

            posted @ 2013-04-12 21:27 Kevin Lynx 閱讀(8668) | 評論 (0)編輯 收藏

            淺析軟件工程開發方法學RUP

            前言

            因為之前一直處在游戲開發行業,由于種種原因一直對軟件工程中的項目管理、項目開發方法缺乏體驗。雖然項目中也曾倡導編寫更多的文檔,無論是模塊說明文檔還是設計文檔,但效果一直不好。不甚理想的地方主要體現在文檔的規范性欠缺、不統一、浮于表面沒有實質內容。文檔的編寫缺乏詳盡的方法指導,那么所謂的設計文檔要么是用來敷衍上級要么就是隨著開發人員的水平不一而千差萬別。

            當我開始目前這個非游戲項目時,我也曾想,前期做好結構設計,制定好關鍵問題的解決方案,那么要完成這個項目就不在話下了。但是我很快就面臨了一個問題:需求不定。回想身處游戲公司的那些日子,程序員總是抱怨策劃需求變更過快過多,在每一次策劃提出一個需求變更時,謹慎的程序員都會再三讓策劃保證:放心,不會變了。而我面臨的問題則更為嚴峻。我意識到,項目的需求,就連用戶也無法一一羅列出來。我們需要的是需求調研。但就算你將客戶的所有需求全部挖掘出來后(這幾乎不可能,因為他們自己也不太清楚自己想要什么),當你交付了第一個軟件版本,幾乎可以肯定客戶會提出一大堆的需求變更:我要的不是這個,我要的那個怎么沒有,哦,我當初以為你說的是另一個意思。

            當然,需求調研這種工作不是讓程序員去做的(那會更悲劇,無論是對客戶還是對程序員而言,他們都是在對牛彈琴)。但需求的不確定性也總是存在的。

            事實上,需求變化本身就是一個很正常的現象。我一向愿意更悲觀地處理軟件開發方面的問題,所謂小心使得萬年船。基于此,我決定擺好心態學學軟件開發的方法學。

            概要

            本文簡要描述、總結了RUP開發方法學的主要內容,結合我自己的感受闡述了一些RUP的核心原則。我相信我所理解的內容是膚淺的,對于非代碼的表達我更相信其是存在歧義的。所以本文僅當是一種經驗參考,不必當真。

            RUP據傳是用于指導大型甚至超大型項目開發的,我們做的不是這樣規模的項目。但是我們需要記錄下整個項目的開發過程,通過這個過程中產出的工件任何一個人可以看出這個項目是如何實現出來的,其目的在于規避唯有從海量代碼中才能熟悉項目實現這種問題。這里出現了一個概念:工件,其指的是軟件項目開發過程中任何留下記錄的事物,例如文檔、圖、代碼等。RUP的一個重要思想,在于其整個軟件開發過程都是可推導的。例如我們通常說的軟件架構,或小一點的模塊結構,都是通過開發過程中前面階段產出的工件推導得出,而不是憑借程序員的經驗拍腦袋想出來的(經驗不太可靠,并且千差萬別,而推導意味著將每個環節變得可靠)。我們借助RUP的這個特性,創建這些工件,用以建立起軟件實現的可靠知識庫。

            RUP概覽

            以下均摘自<Thinking in UML>中對RUP的描述:

            統一過程歸納和集成了軟件開發活動中的最佳實踐,它定義了軟件開發過程中最重要的階段和工作(四個階段和九個核心工作流),定義了參與軟件開發過程的各種角色和他們的職責,還定義了軟件生產過程中產生的工件,并提供了模板。最后,采用演進式軟件生命周期(迭代)將工作、角色和工件串在一起,形成了統一過程。

            統一過程是一種追求穩定的軟件方法,它追求開發穩定架構,控制變更

            統一過程集成了面向對象方法、UML語言、核心工作流、工件模板和過程指導等知識

            簡單來說,RUP作為一種軟件項目開發方法學,它定義了軟件開發的每一個過程,最重要的是它指導了在每一個過程需要產出什么,這些產出又是怎樣得到。它試圖規范化整個流程,以規避需求變更,項目參與者水平不一等帶來的項目不可控等問題,以期一個軟件產品穩定地開發出來。在一個項目開發過程中,最核心的資源是人,最不可控的亦是人。

            RUP過程與實踐

            我覺得要快速學習一種知識,需要首先獲得這門知識的總體框架。另一方面,在我們獲得更多信息后,我們需要挖掘出這門知識的核心思想。學習RUP我覺得從這兩方面入手是相對比較快速和有效的手段。

            RUP框架

            RUP定義了軟件開發過程的四個階段,以及9個工作流程。一張極為經典的RUP開發過程框架圖如下:

            rup

            RUP將整個軟件開發過程分為四個階段:

            • 先啟(Inception)、
            • 精化(Elaboration)
            • 構建(Construction)
            • 產品化(Transition)

            每一個階段的工作分為9個流程:

            • 業務建模
            • 需求
            • 分析設計
            • 實施
            • 測試
            • 部署
            • 配置與變更管理
            • 項目管理
            • 環境

            其中,前6個流程被統稱為”engineering disciplines”,后3個流程被稱為”supporting disciplines”。當然,我們主要關注前6個流程。那么,這些工作流程和開發階段又有什么關系呢?上圖中其實已經闡明了這些關系。

            RUP指導迭代開發。在軟件開發的這4個階段中,每一個階段會被分為若干次迭代。而每一次迭代則涵蓋了這9個工作流程。隨著開發階段向產品化靠近,自然而然地,需求的變更、增加自然會減少,所以從圖中可以看出,開發過程越到后期,其工作流程中關于需求的工作則越少。同樣,在先啟階段,其需求相關的工作則占據了該階段的主要工作內容。

            RUP中的迭代要求在每一次迭代中,都會完整地實施一遍整個工作流程。在軟件實施階段,甚至會在每一個迭代過程完后輸出一個可運行的軟件版本。這個版本可能會被交付給客戶,以期進一步地在功能需求上取得與客戶一致的意見。這倒是同敏捷開發有點類似。

            既然制定了工作流程,那每一個工作流程該如何去實施呢?RUP制定了每個工作流程需要參與的角色,這些角色需要從事的活動,以及這些活動產生的工件。

            這句話實際上反映了RUP的一個重要信息,摘自wiki:

            RUP is based on a set of building blocks, or content elements, describing what is to be produced, the necessary skills required and the step-by-step explanation describing how specific development goals are to be achieved. The main building blocks, or content elements, are the following:

            • Roles (who) – A Role defines a set of related skills, competencies and responsibilities.
            • Work Products (what) – A Work Product represents something resulting from a task, including all the documents and models produced while working through the process.
            • Tasks (how) – A Task describes a unit of work assigned to a Role that provides a meaningful result.

            RUP建模

            在我看來,RUP每個工作流程所完成的工作,就是一個建模的過程。所謂建模,簡單來說就是將需要描述的事物通過更系統的形式表達出來,以期獲得對該事物更深入的理解。<Thinking in UML>中定義建模概念為:

            建模(modeling),是指通過對客觀事物建立一種抽象的方法用以表征事物并獲得對事物本身的理解,同時把這種理解概念化,將這些邏輯概念組織起來,構成一種對所觀察的對象的內部結構和工作原理的便于理解的表達。

            在這里,建模的過程需要使用一些工具。在RUP中建模使用UML來完成。在<Thinking in UML>中講述了UML的核心模型,包括:

            • 業務用例模型
            • 概念用例模型
            • 系統用例模型
            • 領域模型
            • 分析模型
            • 軟件架構和框架
            • 設計模型
            • 組件模型
            • 實施模型

            可能在大家的普遍認識中,UML無非就是幾種圖,并且粗看一眼理解起來也不困難,甚至還能用來畫畫類圖做下代碼結構設計。但UML的作用不僅僅如此。

            以上所描述的UML核心模型中,每個模型并不單指的的是一種UML圖。每個模型實際上都會包含幾種UML圖,會包含若干張UML圖。這些模型基本上滲透于RUP的9個工作流程中,只不過不同的工作流程使用的模型比重不一而已。

            例如在“分析設計”工作流程中,可能會使用到系統用例模型、分析模型、軟件架構和框架、設計模型等,而業務用例模型可能在這個流程中根本不會用到;相反,業務用例模型則可能在“業務建模”流程中被廣泛使用。

            前已述及在RUP的每個工作流程中,RUP定義了該流程需要參與的角色,以及這些角色需要進行的活動,例如這里可以看看“分析設計”流程中的角色和活動集(摘自<Thinking in UML>):

            analyse-action

            相應的,在該工作流程中需要產出的工件集為(摘自<Thinking in UML>):

            analyse-ar

            既然使用了UML作為建模工具,所以可以簡單地說這些工件主要就是UML圖,當然也會有其他文檔性質的事物,例如網絡協議結構、數據庫表等UML無法描述的東西則通過普通文字性文檔描述。

            RUP最佳實踐

            到目前為止我們已經了解到RUP定義了開發過程(phase),定義了每個過程包含的若干工作流程,還定義了每個工作流程需要哪些角色從事哪些活動來完成哪些工件。除此之外,RUP還提供了6條最佳實踐用以指導軟件開發:

            • 迭代開發
            • 管理需求
            • 使用基于組件的架構
            • 可視建模
            • 持續的質量驗證
            • 控制變更

            這些實踐在我看來僅僅是一些項目開發的指導原則,它們滲透到每一個過程,每一個工作流程。在項目過程中實踐這些原則,用以確保項目的成功。例如我們使用UML建模,以達到“可視建模”。我們通過建立需求用例,以“管理需求”。

            RUP核心思想

            似乎沒有文檔來專門闡述RUP的核心思想,但我覺得掌握其核心思想才是學習的要點所在。要理解一種軟件開發方法學的核心思想,其實最好是將其與別的方法學做比較。這里先就我的一些感想做闡述。

            用例驅動

            用例驅動指的是整個軟件項目的推進過程,是依靠“用例”來完成。<Thinking in UML>:

            在實際的軟件項目中,一個軟件要實現的功能通過用例來捕獲,接下來的所有分析、設計、實現、測試都由用例來取得,即以實現用例為目標。在統一過程中用例能夠驅動的不僅僅是分析設計。

            用例簡單來說就是描述了一個系統功能。但必須注意的是,這僅僅是它定義的一小部分。用例主要分布在“業務建模”、“需求”、“分析設計”這些工作過程中。在不同的過程中用例的粒度和性質都不一樣。例如對于一個借書系統而言,在業務建模階段,我們可以獲取出一個“借書”用例,其系統邊界甚至不是系統而可能僅關注這個業務本身(因為這個階段還沒有考慮到計算機如何實現這個借書業務);在系統分析階段,我們就可以將“借書”這個用例細化為用戶和計算機軟件系統的交互;進一步地,我們可能會進一步精化這個用例,例如用戶通過網頁終端“借書”。(這里描述了很多建模的細節,可不必深究,本文只給出一個概要性的介紹)

            我們說用例驅動軟件開發,但它如何驅動的呢?我在實際的建模過程中,最明顯的感受就是用例驅動了整個建模過程。

            • 在需求分析階段,我以系統使用者的角度繪制出了一份用例圖,用于表達使用者對該系統的需求
            • 然后我繪制序列圖(活動圖等)來實現這些用例,也就是闡述使用者具體是如何與系統交互的
            • 從之前的建模過程中我獲得對系統功能需求方面的認識
            • 基于前面的分析我可以繪制出系統用例圖,以明確系統的各個功能需求
            • 同樣針對用例繪制用例實現圖
            • 用例本身應該包含更多的文檔,因此我編寫用例規約用以詳細描述各個用例
            • 從用例規約、用例實現中我可以抽離出一些分析類(較設計類更高抽象的類),包含用例場景中涉及到的實體,控制邏輯
            • 細化這些分析類,將分析類組織起來形成系統,我會用界面類去銜接各個控制類
            • 將得到的分析類按模塊來劃分,從而可以得到一個初步的系統架構
            • 初步考慮系統實現,我甚至會得到一個初步是的系統部署圖
            • 回過頭不斷審視系統用例,以確認我是否實現了所有用例,這可以保證我的分析實現了所有需求,我不用枚舉所有系統特性是否被我考慮周全,我僅需在已有用例圖中核實
            • 基于模塊實現各個用例,或者基于分析類來實現系統用例
            • 通過重新繪制以及核實用例,可以進一步精化分析類,分析類在很大程度上會一一對應到設計類,而設計類則對應到實際的代碼
            • 可以進入設計階段,設計階段會考慮到系統的實現細節,例如使用的語言,使用的框架等

            進入設計階段后,雖然可以進一步建模,得到會直接映射到代碼的類圖、序列圖等,但這樣的圖在面臨需求變更時,基本上會面臨修改,這意味著維護這些文檔需要耗費精力。所以,<Thinking in UML>中主張將精力放在維護分析類模型中,而通過其他約定實現從分析類到實際代碼的轉換。我覺得這個也在理。

            規范化整個過程

            我個人覺得RUP的一大特點在于規范化了整個軟件開發過程,每一個步驟需要哪些角色參與,該干什么,怎么去干都有指導。加之這些活動的”可推導性“,這意味著不論參與角色屬于什么水平,都可以穩固地推進項目進程。

            此外,這種規范化也會給項目留下詳細的演化過程。你可以明確地看到整個軟件是如何演化出最終的產品代碼,可以深入地理解項目代碼中的設計。

            總結

            我只是一個RUP新手,即便如此,我依然不覺得RUP是軟件開發的萬能藥。我相信任何軟件開發方法都是有局限性的。我們在實際使用的時候也只是吸取其精華。不同的開發方法其適用范圍也是不一樣的。正如有人將RUP和XP做比較時說,如果你使用RUP去開一個雜貨鋪,在沒開張之前你就已經破產了;同樣如果你用XP去做飛機,飛機毀了十來次也許才能做出來(from <Thinking in UML> again)。

            參考文檔

            posted @ 2013-03-21 21:41 Kevin Lynx 閱讀(3099) | 評論 (0)編輯 收藏

            分布式程序開發平臺ICE概覽

            本文基于ICE Manual及相關文檔就ICE的一些主要特性做一個概覽,它不是一個tutorial,不是一個guid,更不是manual。

            概覽

            ICE,Internet Communications Engine,按照官方介紹,是一個支持C++、.Net、Java、Python、Objective-C、Ruby、PHP及ActionScript等語言的分布式程序開發平臺。按照我的理解,簡單來說它是一個核心功能包裝RPC的庫。要把這個RPC包裝得漂亮,自然而然,對于使用者而言,調用一個遠端的接口和調用一個本地的接口沒有什么區別,例如:

                Object *obj = xxx
                obj->sayHello(); 
            

            ICE包裝sayHello接口,當應用層調用該接口時,ICE發送調用請求到遠端服務器,接收返回結果并返回給應用層。ICE在接口提供方面,做到了這一點。

            以下,我將逐個給出ICE中的一些工具、組件、特性說明,以展示它作為一個分布式程序開發平臺所擁有的能力。到目前為止,所有這些信息均來自于ICE相關文檔,總結出來權當為自己服務。

            Slice

            Slice(Specification Language for Ice)是ICE定義的一種中間語言,其語法類似于C++。對于一個RPC過程而言,例如上面調用遠端的sayHello接口,其主要涉及到調用這個接口的參數和返回值傳遞,當然接口本身的傳遞不在話下,ICE為了包裝這個過程,其使用了這樣一種方式:使用者使用Slice語言描述RPC過程中調用的接口,例如該接口屬于哪個類,該接口有哪些參數哪些返回值;然后使用者使用ICE提供的Slice編譯器(實際上是一個語言翻譯程序)將Slice源碼翻譯成目標語言。而這個目標語言,則是使用者開發應用程序的開發語言,即上文提到的C++、.Net、Java等。

            這些翻譯出來的目標代碼,就封裝了sayHello底層實現的一切細節。當然事情沒有這么簡單,但我們目前只需關注簡單的這一部分。ICE之所以支持那么多種開發語言,正是Slice立下的功勞。Slice語言本身的語言特性,實際上受限于目標語言的語言特性,例如Slice支持異常,恰是因為Slice轉換的所有語言都包含異常這個語法特性。

            Slice還有一個重要特性,在于一份Slice源碼被翻譯出來的目標代碼,一般情況是被服務器和客戶端同時使用。

            開發步驟

            使用ICE開發應用程序,其步驟遵循:

            1. 編寫Slice,說明整個RPC中涉及到的接口調用,編譯它
            2. 基于Slice目標代碼和ICE庫編寫Server
            3. 基于Slice目標帶啊和ICE庫編寫Client

            一個例子

            有必要展示一個例子,以獲得使用ICE開發應用程序的感性認識。這個例子是一個簡單的hello world程序,客戶端讓服務器打印一個字符串。

            • 編寫Slice
                // Printer.ice,Slice源碼后綴為ice
                module Demo {
                    interface Printer {
                        void printString(string s);
                    };
                };
            

            使用ICE提供的程序翻譯為C++代碼:

                $ slice2cpp Printer.ice
            

            得到Printer.cpp和Printer.h。Slice翻譯出來的目標代碼除了封裝RPC交互的一些細節外,最重要的,因為本身Slice文件其實是定義接口,但接口的實現,則需要應用層來做。

            • 服務器端使用生成的Printer.cpp/.h,并實現Printer接口
                // 翻譯出來的Printer.h中有對應于Slice中定義的Printer類,及需要實現的printString接口
                class PrinterI : public Printer {
                public:
                    virtual void printString(const string& s, const Ice::Current&) {
                        count << s << endl;
                    }
                };
            
            • 客戶端使用生成的Printer.cpp/.h,通過ICE獲得一個Printer對象,然后調用其printString接口
                // don't care about this
                PrinterPrx printer = PrinterPrx::checkedCast(base);
                printer->printString("Hello World!");
            

            使用ICE開發應用程序,整體過程即為以上展示。

            概念

            ICE包含了很多概念,作為一個開發平臺而言,有其專有術語一點不過分,熟悉這些概念可以更容易學習ICE。這里羅列一些關鍵概念。

            服務器端和客戶端

            ICE中的服務器端和客戶端和一般網絡程序中的概念不太一樣。在若干個交互的網絡程序中,我們都很好理解這樣一種現象:某個程序有多個角色,它可能是作為A程序的服務器端,也可能是作為B程序的客戶端。ICE中的服務器和客戶端角色更容易變換。

            以Printer例子為例,如果我們的printString接口有一個回調函數參數(這在ICE中很常見),服務器實現printString時,當其打印出字符串后,需通過該回調函數通知客戶端。這樣的回調機制在ICE的實現中,會創建一個新的網絡連接,而此時,這個原有的服務器端就變成了原有客戶端的客戶。當然,你也可以避免這樣的情況出現。

            ICE Objects/Object Adapter/Facet

            對于Printer例子,一個Printer對象可以被看作是一個ICE Objects。Object可以說是服務器端提供給客戶端的接口。所以在服務器端通常會創建出很多個Object。服務器端使用Object Adapter對象去保存這些Object。例如,一個典型的ICE對象在初始化時可能包含以下代碼:

                // 創建一個Object Adapter
                Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapter("Hello");
                // 創建一個Object,形如Printer
                Demo::HelloPtr hello = new HelloI;
                // 將Object加入到Object Adapter
                adapter->add(hello, communicator()->stringToIdentity("hello"));
            

            Facet是Object的一部分,或者說Object是Facet的一個集合,摘Ice manual中的一句話:

            An Ice object is actually a collection of sub-objects known as facets whose types are not necessarily related.

            Proxy

            Proxy是ICE客戶端里的概念。客戶端通過Proxy訪問服務器端上的Object,通過Proxy調用服務器端Object上提供的接口。在客戶端上一般有類似以下代碼:

                Ice::ObjectPrx base = ic->stringToProxy("SimplePrinter:default -p 10000");
                // Printer Proxy
                PrinterPrx printer = PrinterPrx::checkedCast(base);
                printer->printString("Hello World!");
            

            Proxy又分為幾種,包括:

            Direct Proxy

            Direct Proxy,這里的direct意指這個proxy訪問的object時,是否攜帶了地址(EndPoint)信息,例如上面例子中SimplePrinter:default -p 10000就是一個地址。

            Indirect Proxy

            Indirect Proxy相對Direct Proxy而言,其沒有具體的地址,僅僅是一個符號。通常包含兩種形式:

            • SimplePrinter
            • SimplePrinter@PrinterAdapter

            為了獲取真正的地址,客戶端需要一個定位服務(location service)來獲取這個符號對應的地址。ICE中提供了一些默認的服務程序,IceGrid就是其中之一,而IceGrid的作用就包括定位具體的地址,即翻譯符號地址到具體的地址。

            這里Indirect Proxy可以看作一個域名,而Direct Proxy可以看作是IP地址。Indirect Proxy使用時,就需要借助DNS翻譯得到域名對應的IP地址。

            Fixed Proxy

            由于Proxy是用于與服務器端的Object通信的,客戶端借助Proxy來訪問服務器端的Object,所以Proxy通常都會對應一個真實的網絡連接。在ICE中,一般的Proxy于網絡連接(Connection)實際上是沒有太大關聯的。一個Proxy可以沒有Connection,也可以在建立這個Connection后又斷開之。但是,ICE提供了一種特殊的Proxy,Fixed Proxy,這種Proxy緊密地與一個Connection綁定在一起,其生命周期被強制關聯起來。

            關于Fixed Proxy可以參看ICE Manual Connection Management

            其他

            • AMI

            Asynchronous Method Invocation,對于客戶端而言,用于表示某個服務器端接口是異步操作,需在Slice中使用metadata來修飾這個接口,例如:

                ["ami"]  void sayHello(int delay)
            
            • AMD

            Asynchronous method dispatch,這個針對于服務器端,同樣表示這個接口是異步操作,需在Slice中使用metadata來修飾這個接口:

                ["ami", "amd"]  void sayHello(int delay)
            

            通常對于這種異步接口而言,都需要使用Slice metadata amiamd同時修飾。

            • idempotent

            idempotent是Slice中的概念,同const一樣用于修飾某個接口的特性。idempotent表示該接口無論調用多少次,其執行結果都是相同的,例如一般的get類接口。

            • batched invocation

            客戶端調用服務器端的接口這個動作稱為invocation。就像網絡層的數據緩存一樣,ICE對于接口調用也可能暫時緩存,當多個提交請求緩存起來后,然后調用刷新接口全部刷新到服務器端,則稱為batched invocation

            服務

            ICE除了提供一個庫之外,還提供了一些應用程序。這些應用程序本身也是一些服務器,提供了一些方便的功能方便我們開發分布式程序。

            Freeze

            Freeze用于將Slice對象持久化到數據庫中,按照Manual里的說法,它應該是一個編譯器,可以生成一些持久化操作的代碼。Freeze持久化對象時使用的數據庫是Berkeley DB。

            Ice has a built-in object persistence service, known as Freeze. Freeze makes it easy to store object state in a database: you define the state stored by your objects in Slice, and the Freeze compiler generates code that stores and retrieves object state to and from a database. Freeze uses Berkeley DB as its database.

            FreezeScript有點類似于Rails中的數據庫操作工具,可用于操作持久化到數據庫中的對象數據。

            Ice also offers a tool set collectively called FreezeScript that makes it easier to maintain databases and to migrate the contents of existing databases to a new schema if the type definitions of objects change.

            IceBox

            IceBox可用于管理服務器中的動態組件。這些動態組件本質上也是提供服務的ICE程序。在形式上,這些組件可以是動態連接庫。

            IceBox is a simple application server that can orchestrate the starting and stopping of a number of application components. Application components can be deployed as a dynamic library instead of as a process.

            IceGrid

            IceGrid相當于一個DNS解析服務,可以讓服務器不用配置EndPoint,客戶端也不用指定服務器的EndPoint,以方便大量的服務器部署。在一般的應用中,我們需要為ICE服務器指定綁定的網絡地址(IP和端口),同時也需要為客戶端指定服務器端的地址信息。當服務增加到一定數量時,就會存在管理上和配置上的麻煩。而IceGrid則是用于避免這種麻煩,將服務器端和客戶端上的地址信息通過一個符號代替,就像我們把Internet上的服務器使用域名來標識一樣。

            但IceGrid的作用不僅如此,通過配合部署一系列稱為IceGrid Node的程序,IceGrid還可以管理各個服務器的啟動、關閉、宕機重啟等,其中甚至包括負載均衡。

            IceGrid provides a facility to activate servers on demand, when a client first invokes an operation. Server activation is taken care of by IceGrid nodes. You must run an IceGrid node on each machine on which you want IceGrid to start servers on demand.

            簡要介紹可以參看ICE Manual Teach Yourself IceGrid in 10 minutes

            Glacier2

            Glacier2 is a lightweight firewall traversal solution for Ice applications.

            按我的理解,Glacier2就像一個網關服務器。它被部署在服務器和客戶端之間,我們的服務器群部署在內網,外網不可訪問,然后通過Glacier2,外部網絡的客戶端就可以訪問內網的服務器群提供的服務。

            對于服務器的開發而言,使用Glacier2,服務器端不需要做任何改動。客戶端需要配置Glacier2服務的地址信息,也需要配置要使用服務器的地址信息。Glacier2通過客戶端欲訪問的服務器地址,在內網定位到真實的服務器,并轉發請求提供服務。

            Glacier2支持驗證客戶端,從這一點看來,它又有點像一個驗證服務器。通過驗證客戶端,以提供被正確授權的客戶端以完整服務。

            Glacier2的工作過程可以描述為:

            When a client invokes an operation on a routed proxy, the client connects to one of Glacier2’s client endpoints and sends the request as if Glacier2 is the server. Glacier2 then establishes an outgoing connection to the client’s intended server in the private network, forwards the request to that server, and returns the reply (if any) to the client. Glacier2 is essentially acting as a local client on behalf of the remote client.

            一個Glacier2可服務于若干個客戶端和服務器。

            詳細參看ICE Manual Glacier2

            管理

            ICE服務器可以提供給外部一定的管理功能,包括:關閉服務器、讀取服務器配置。這個功能是通過操作Ice.Admin這個Ice Object來實現的。這個Object包含兩個Facet:Process和Property,分別對應于關閉服務器和讀取服務器配置功能。

            對于需要管理服務器的客戶端而言,可以大致通過如下代碼來完成:

                // 可以通過communicator來獲取這個admin object
                Ice::ObjectPrx adminObj = ...;
                // 獲取admin object里的property facet
                Ice::PropertiesAdminPrx propAdmin = Ice::PropertiesAdminPrx::checkedCast(adminObj, "Properties");
                Ice::PropertyDict props = propAdmin->getPropertiesForPrefix("");
            

            詳細參看ICE Manual Administrative Facility

            連接管理

            前已述及,ICE中的網絡連接隱藏于Proxy之下。Proxy有兩個接口可以獲取這個連接對象:

                ice_getConnection
                ice_getCachedConnection
            

            例如:

                HelloPrx hello = HelloPrx::uncheckedCast(communicator->stringToProxy("hello:tcp -h remote.host.com -p 10000"));
                ConnectionPtr conn = hello->ice_getConnection();
            

            ICE隱藏了網絡連接的細節。當ICE發現需要建立連接時才會去建立,例如以上例子中當獲得一個Proxy時(這里是HelloPrx),ICE并不建立網絡連接,當某個時刻通過該Proxy調用服務器端的某個接口時,ICE發現對應的網絡連接沒有建立,則發起網絡連接。

            以上例子在獲取Proxy時,使用了uncheckCast,關于checkedCastuncheckedCast,也影響著網絡連接的建立邏輯:

            On the other hand, if the code were to use a checkedCast instead, then connection establishment would take place as part of the checkedCast, because a checked cast requires a remote call to determine whether the target object supports the specified interface.

            關于連接管理,ICE使用了一個稱為ACM的機制,即Active connection management。當某個連接非active一段時間后,ICE就會主動關閉此連接。應用層當然可以控制這個行為。

            詳細參看ICE Manual Connection Management

            posted @ 2013-02-15 15:24 Kevin Lynx 閱讀(6429) | 評論 (0)編輯 收藏

            僅列出標題
            共12頁: 1 2 3 4 5 6 7 8 9 Last 
            国内精品久久久久影院亚洲| 国内精品久久国产| 久久婷婷五月综合色高清| 久久国产综合精品五月天| 狠狠狠色丁香婷婷综合久久俺| 精品无码久久久久国产动漫3d| 三级韩国一区久久二区综合| 狠狠人妻久久久久久综合| 97久久精品人人澡人人爽| 久久亚洲欧美日本精品| 97久久天天综合色天天综合色hd| 午夜久久久久久禁播电影| 色综合久久久久综合体桃花网| 婷婷久久五月天| 伊人久久综合无码成人网 | 久久久久女人精品毛片| 亚洲AV乱码久久精品蜜桃| 亚洲精品无码久久千人斩| 狠狠色婷婷久久一区二区| 伊人久久大香线蕉AV色婷婷色| 日日躁夜夜躁狠狠久久AV| 日产精品久久久久久久性色| 国内精品久久久久伊人av| 99久久成人国产精品免费| 成人亚洲欧美久久久久| 久久青青草原精品国产软件| 麻豆久久久9性大片| 午夜精品久久久久久久久| 久久精品国产精品国产精品污| 成人精品一区二区久久| 日韩欧美亚洲综合久久影院Ds| 国内精品久久久久久久久电影网| 久久综合狠狠综合久久综合88 | 99久久精品九九亚洲精品| 久久精品亚洲精品国产欧美| 麻豆久久久9性大片| 国产美女久久精品香蕉69| 国内精品久久久久久久影视麻豆| 伊人久久大香线蕉综合热线| 久久精品午夜一区二区福利| 精品免费久久久久国产一区|