接上篇,本篇談談一些實現細節。
這個爬蟲程序主要的問題在于如何獲取P2P網絡中分享的資源,獲取到資源后索引到數據庫中,搜索就是自然而然的事情。
DHT
DHT網絡本質上是一個用于查詢的網絡,其用于查詢一個資源有哪些計算機正在下載。每個資源都有一個20字節長度的ID用于標示,稱為infohash。當一個程序作為DHT節點加入這個網絡時,就會有其他節點來向你查詢,當你做出回應后,對方就會記錄下你。對方還會詢問其他節點,當對方開始下載這個infohash對應的資源時,他就會告訴所有曾經詢問過的節點,包括你。這個時候就可以確定,這個infohash對應的資源在這個網絡中是有效的。
關于這個網絡的工作原理,參看:P2P中DHT網絡爬蟲以及寫了個磁力搜索的網頁。
獲取到infohash后能做什么?關鍵點在于,我們現在使用的磁力鏈接(magnet url),是和infohash對應起來的。也就是拿到infohash,就等于拿到一個磁力鏈接。但是這個爬蟲還需要建立資源的信息,這些信息來源于種子文件。種子文件其實也是對應到一個資源,種子文件包含資源名、描述、文件列表、文件大小等信息。獲取到infohash時,其實也獲取到了對應的計算機地址,我們可以在這些計算機上下載到對應的種子文件。
但是我為了簡單,在獲取到infohash后,從一些提供映射磁力鏈到種子文件服務的網站上直接下載了對應的種子。dhtcrawler里使用了以下網站:
http://torrage.com
https://zoink.it
http://bt.box.n0808.com
使用這些網站時,需提供磁力哈希(infohash可直接轉換),構建特定的URL,發出HTTP請求即可。
U1 = "http://torrage.com/torrent/" ++ MagHash ++ ".torrent",
U2 = "https://zoink.it/torrent/" ++ MagHash ++ ".torrent",
U3 = format_btbox_url(MagHash),
format_btbox_url(MagHash) ->
H = lists:sublist(MagHash, 2),
T = lists:nthtail(38, MagHash),
"http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent".
但是,以一個節點的身份加入DHT網絡,是無法獲取大量查詢的。在DHT網絡中,每個節點都有一個ID。每個節點在查詢信息時,僅詢問離信息較近的節點。這里的信息除了infohash外還包含節點,即節點詢問一個節點,這個節點在哪里。DHT的典型實現中(Kademlia),使用兩個ID的xor操作來確定距離。既然距離的計算是基于ID的,為了盡可能獲取整個DHT網絡交換的信息,爬蟲程序就可以建立盡可能多的DHT節點,讓這些節點的ID均勻地分布在ID取值區間內,以這樣的方式加入網絡。
在dhtcrawler中,我使用以下方式產生了N個大致均勻分布的ID:
create_discrete_ids(1) ->
[dht_id:random()];
create_discrete_ids(Count) ->
Max = dht_id:max(),
Piece = Max div Count,
[random:uniform(Piece) + Index * Piece || Index <- lists:seq(0, Count - 1)].
除了盡可能多地往DHT網絡里部署節點之外,對單個節點而言,也有些注意事項。例如應盡可能快地將自己告訴盡可能多的節點,這可以在啟動時進行大量的隨機infohash的查詢。隨著查詢過程的深入,該節點會與更多的節點打交道。因為DHT網絡里的節點實際上是不穩定的,它今天在線,明天后天可能不在線,所以計算你的ID固定,哪些節點與你較近,本身就是個相對概念。節點在程序退出時,也最好將自己的路由信息(與自己交互的節點列表)保存起來,這樣下次啟動時就可以更快地加入網絡。
在dhtcrawler的實現中,每個節點每個一定時間,都會向網絡中隨機查詢一個infohash,這個infohash是隨機產生的。其查詢目的不在于infohash,而在于告訴更多的節點,以及在其他節點上保持自己的活躍。
handle_event(startup, {MyID}) ->
timer:apply_interval(?QUERY_INTERVAL, ?MODULE, start_tell_more_nodes, [MyID]).
start_tell_more_nodes(MyID) ->
spawn(?MODULE, tell_more_nodes, [MyID]).
tell_more_nodes(MyID) ->
[search:get_peers(MyID, dht_id:random()) || _ <- lists:seq(1, 3)].
DHT節點的完整實現是比較繁瑣的,涉及到查詢以及繁雜的各種對象的超時(節點、桶、infohash),而超時的處理并不是粗暴地做刪除操作。因為本身是基于UDP協議,你得對這些超時對象做進一步的查詢才能正確地進一步做其他事情。而搜索也是個繁雜的事情,遞歸地查詢節點,感覺上,你不一定離目標越來越近,由于被查詢節點的不確定性(無法確定對方是否在玩弄你,或者本身對方就是個傻逼),你很可能接下來要查詢的節點反而離目標變遠了。
在我第一次的DHT實現中,我使用了類似transmission里DHT實現的方法,不斷無腦遞歸,當搜索有太久時間沒得到響應后終止搜索。第二次實現時,我就使用了etorrent里的實現。這個搜索更聰明,它記錄搜索過的節點,并且檢查是否離目標越來越遠。當遠離目標時,就認為搜索是不太有效的,不太有效的搜索嘗試幾次就可以放棄。
實際上,爬蟲的實現并不需要完整地實現DHT節點的正常功能。爬蟲作為一個DHT節點的唯一動機僅是獲取網絡里其他節點的查詢。而要完成這個功能,你只需要裝得像個正常人就行。這里不需要保存infohash對應的peer列表,面臨每一次查詢,你隨便回復幾個節點地址就可以。但是這里有個責任問題,如果整個DHT網絡有2000個節點,而你這個爬蟲就有1000個節點,那么你的隨意回復,就可能導致對方根本找不到正確的信息,這樣你依然得不到有效的資源。(可以利用這一點破壞DHT網絡)
DHT的實現沒有使用第三方庫。
種子
種子文件的格式同DHT網絡消息格式一樣,使用一種稱為bencode的文本格式來編碼。種子文件分為兩類:單個文件和多個文件。
文件的信息無非就是文件名、大小。文件名可能包含utf8編碼的名字,為了后面處理的方便,dhtcrawler都會優先使用utf8編碼。
{ok, {dict, Info}} = dict:find(<<"info">>, TD),
case type(Info) of
single -> {single, parse_single(Info)};
multi -> {multi, parse_multi(Info)}
end.
parse_single(Info) ->
Name = read_string("name", Info),
{ok, Length} = dict:find(<<"length">>, Info),
{Name, Length}.
parse_multi(Info) ->
Root = read_string("name", Info),
{ok, {list, Files}} = dict:find(<<"files">>, Info),
FileInfo = [parse_file_item(Item) || {dict, Item} <- Files],
{Root, FileInfo}.
數據庫
我最開始在選用數據庫時,為了不使用第三方庫,打算使用erlang自帶的mnesia。但是因為涉及到字符串匹配搜索,mnesia的查詢語句在我看來太不友好,在經過一些資料查閱后就直接放棄了。
然后我打算使用couchdb,因為它是erlang寫的,而我正在用erlang寫程序。第一次接觸非關系型數據庫,發現NoSQL數據庫使用起來比SQL類的簡單多了。但是在erlang里要使用couchdb實在太折騰了。我使用的客戶端庫是couchbeam。
因為couchdb暴露的API都是基于HTTP協議的,其數據格式使用了json,所以couchbeam實際上就是對各種HTTP請求、回應和json的包裝。但是它竟然使用了ibrowse這個第三方HTTP客戶端庫,而不是erlang自帶的。ibrowse又使用了jiffy這個解析json的庫。這個庫更慘烈的是它的解析工作都是交給C語言寫的動態庫來完成,我還得編譯那個C庫。
couchdb看起來不支持字符串查詢,我得自己創建一個view,這個view里我通過翻閱了一些資料寫了一個將每個doc的name拆分成若干次查詢結果的map。這個map在處理每一次查詢時,我都得動態更新之。couchdb是不支持局部更新的,這還不算大問題。然后很高興,終于支持字符串查詢了。這里的字符串查詢都是基于字符串的子串查詢。但是問題在于,太慢了。每一次在WEB端的查詢,都直接導致erlang進程的call超時。
要讓couchdb支持字符串查詢,要快速,當然是有解決方案的。但是這個時候我已經沒有心思繼續折騰,任何一個庫、程序如果接口設計得如此不方便,那就可以考慮換一個其他的。
我選擇了mongodb。同樣的基于文檔的數據庫。2.4版本還支持全文搜索。什么是全文搜索呢,這是一種基于單詞的全文搜索方式。hello world
我可以搜索hello
,基于單詞。mongodb會自動拆詞。更關鍵更讓人爽的是,要開啟這個功能非常簡單:設置啟動參數、建立索引。沒了。mongodb的erlang客戶端庫mongodb-erlang也只依賴一個bson-erlang庫。然后我又埋頭苦干,幾個小時候我的這個爬蟲程序就可以在瀏覽器端搜索關鍵字了。
后來我發現,mongodb的全文搜索是不支持中文的。因為它還不知道中文該怎么拆詞。恰好我有個同事做過中文拆詞的研究,看起來涉及到很復雜的算法。直到這個時候,我他媽才醒悟,我為什么需要基于單詞的搜索。我們大部分的搜索其實都是基于子字符串的搜索。
于是,我將種子文件的名字拆分成了若干個子字符串,將這些子字符串以數組的形式作為種子文檔的一個鍵值存儲,而我依然還可以使用全文索引,因為全文索引會將整個字符串作為單詞比較。實際上,基于一般的查詢方式也是可以的。當然,索引還是得建立。
使用mongodb時唯一讓我很不爽的是mongodb-erlang這個客戶端庫的文檔太欠缺。這還不算大問題,因為看看源碼參數還是可以大概猜到用法。真正悲劇的是mongodb的有些查詢功能它是不支持的。例如通過cursor來排序來限制數量。在cursor模塊并沒有對應的mongodb接口。最終我只好通過以下方式查詢,我不明白batchsize,但它可以工作:
search_announce_top(Conn, Count) ->
Sel = {'$query', {}, '$orderby', {announce, -1}},
List = mongo_do(Conn, fun() ->
Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count),
mongo_cursor:rest(Cursor)
end),
[decode_torrent_item(Item) || Item <- List].
另一個悲劇的是,mongodb-erlang還不支持文檔的局部更新,它的update接口直接要求傳入整個文檔。幾經折騰,我可以通過runCommand來完成:
inc_announce(Conn, Hash) when is_list(Hash) ->
Cmd = {findAndModify, ?COLLNAME, query, {'_id', list_to_binary(Hash)},
update, {'$inc', {announce, 1}},
new, true},
Ret = mongo_do(Conn, fun() ->
mongo:command(Cmd)
end).
Unicode
不知道在哪里我看到過erlang說自己其實是不需要支持unicode的,因為這門語言本身是通過list來模擬字符串。對于unicode而言,對應的list保存的本身就是整數值。但是為了方便處理,erlang還是提供了一些unicode操作的接口。
因為我需要將種子的名字按字拆分,對于a中文
這樣的字符串而言,我需要拆分成以下結果:
a
a中
a中文
中
中文
文
那么,在erlang中當我獲取到一個字符串list時,我就需要知道哪幾個整數合起來實際上對應著一個漢字。erlang里unicode模塊里有幾個函數可以將unicode字符串list對應的整數合起來,例如:[111, 222, 333]
可能表示的是一個漢字,將其轉換以下可得到[111222333]
這樣的形式。
split(Str) when is_list(Str) ->
B = list_to_binary(Str), % 必須轉換為binary
case unicode:characters_to_list(B) of
{error, L, D} ->
{error, L, D};
{incomplete, L, D} ->
{incomplete, L, D};
UL ->
{ok, subsplit(UL)}
end.
subsplit([]) ->
[];
subsplit(L) ->
[_|R] = L,
{PreL, _} = lists:splitwith(fun(Ch) -> not is_spliter(Ch) end, L),
[unicode:characters_to_binary(lists:sublist(PreL, Len))
|| Len <- lists:seq(1, length(PreL))] ++ subsplit(R).
除了這里的拆字之外,URL的編碼、數據庫的存儲都還好,沒遇到問題。
注意,以上針對數據庫本身的吐槽,完全基于我不熟悉該數據庫的情況下,不建議作為你工具選擇的參考。
erlang的穩定性
都說可以用erlang來編寫高容錯的服務器程序。看看它的supervisor,監視子進程,自動重啟子進程。天生的容錯功能,就算你宕個幾次,單個進程自動重啟,整個程序看起來還穩健地在運行,多牛逼啊。再看看erlang的進程,輕量級的語言特性,就像OOP語言里的一個對象一樣輕量。如果說使用OOP語言寫程序得think in object,那用erlang你就得think in process,多牛逼多駭人啊。
實際上,以我的經驗來看,你還得以傳統的思維去看待erlang的進程。一些多線程程序里的問題,在erlang的進程環境中依然存在,例如死鎖。
在erlang中,對于一些異步操作,你可以通過進程間的交互將這個操作包裝成同步接口,例如ping的實現,可以等到對方回應之后再返回。被阻塞的進程反正很輕量,其包含的邏輯很單一。這不但是一種良好的包裝,甚至可以說是一種erlang-style。但這很容易帶來死鎖。在最開始的時候我沒有注意這個問題,當爬蟲節點數上升的時候,網絡數據復雜的時候,似乎就出現了死鎖型宕機(進程互相等待太久,直接timeout)。
另一個容易在多進程環境下出現的問題就是消息依賴的上下文改變問題。當投遞一個消息到某個進程,到這個消息被處理之前,這段時間這個消息關聯的邏輯運算所依賴的上下文環境改變了,例如某個ets元素不見了,在處理這個消息時,你還得以多線程編程的思維來編寫代碼。
至于supervisor,這玩意你得端正態度。它不是用來包容你的傻逼錯誤的。當你寫下傻逼代碼導致進程頻繁崩潰的時候,supervisor屁用沒有。supervisor的唯一作用,僅僅是在一個確實本身可靠的系統,確實人品問題萬分之一崩潰了,重啟它。畢竟,一個重啟頻率的推薦值,是一個小時4次。