AMQP和RabbitMQ概述
AMQP(Advanced Message Queue Protocol)定義了一種消息系統規范。這個規范描述了在一個分布式的系統中各個子系統如何通過消息交互。而RabbitMQ則是AMQP的一種基于erlang的實現。
AMQP將分布式系統中各個子系統隔離開來,子系統之間不再有依賴。子系統僅依賴于消息。子系統不關心消息的發送者,也不關心消息的接受者。
AMQP中有一些概念,用于定義與應用層的交互。這些概念包括:message、queue、exchange、channel, connection, broker、vhost。
注:到目前為止我并沒有打算使用AMQP,所以沒有做更深入的學習,僅為了找個機會寫寫erlang代碼,以下信息僅供參考。
- message,即消息,簡單來說就是應用層需要發送的數據
- queue,即隊列,用于存儲消息
- exchange,有翻譯為“路由”,它用于投遞消息,應用程序在發送消息時并不是指定消息被發送到哪個隊列,而是將消息投遞給路由,由路由投遞到隊列
- channel,幾乎所有操作都在channel中進行,有點類似一個溝通通道
- connection,應用程序與broker的網絡連接
- broker,可簡單理解為實現AMQP的服務,例如RabbitMQ服務
關于AMQP可以通過一篇很有名的文章了解更多:RabbitMQ+Python入門經典 兔子和兔子窩
RabbitMQ的運行需要erlang的支持,erlang和RabbitMQ在windows下都可以直接使用安裝程序,非常簡單。RabbitMQ還支持網頁端的管理,這需要開啟一些RabbitMQ的插件,可以參考官方文檔。
RabbitMQ本質上其實是一個服務器,與這個服務器做交互則是通過AMQP定義的協議,應用可以使用一個實現了AMQP協議的庫來與服務器交互。這里我使用erlang的一個客戶端,對應著RabbitMQ的tutorial,使用erlang實現了一遍。基于這個過程我將一些關鍵實現羅列出來以供記憶:
主要功能使用
關于RabbitMQ erlang client的使用說明可以參考官方文檔。這個client library下載下來后是兩個ez文件,其實就是zip文件,本身是erlang支持的庫打包格式,但據說這個feature還不成熟。總之我是直接解壓,然后在環境變量中指定ERL_LIBS
到解壓目錄。使用時使用include_lib
包含庫文件(類似C語言里的頭文件):
-include_lib("amqp_client/include/amqp_client.hrl").
Connection/Channel
對于連接到本地的RabbitMQ服務:
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
{ok, Channel} = amqp_connection:open_channel(Connection),
創建Queue
每個Queue都有名字,這個名字可以人為指定,也可以由系統分配。Queue創建后如果不顯示刪除,斷開網絡連接是不會自動刪除這個Queue的,這個可以在RabbitMQ的web管理端看到。
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
但也可以指定Queue會在程序退出后被自動刪除,需要指定exclusive
參數:
QDecl = #'queue.declare'{queue = <<>>, exclusive = true},
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QDecl),
上例中queue的名字未指定,由系統分配。
發送消息
一般情況下,消息其實是發送給exchange的:
Payload = <<"hello">>
Publish = #'basic.publish'{exchange = <<"log_exchange">>},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
exchange有一系列規則,決定某個消息將被投遞到哪個隊列。
發送消息時也可以不指定exchange,這個時候消息的投遞將依賴于routing_key
,routing_key
在這種場景下就對應著目標queue的名字:
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
Payload = <<"hello">>,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
接收消息
可以通過注冊一個消息consumer來完成消息的異步接收:
Sub = #'basic.consume' {queue = Q},
#'basic.consume_ok'{consumer_tag = Tag} = amqp_channel:subscribe(Channel, Sub, self()),
以上注冊了了一個consumer,監聽變量Q
指定的隊列。當有消息到達該隊列時,系統就會向consumer進程對應的mailbox投遞一個通知,我們可以使用receive
來接收該通知:
loop(Channel) ->
receive
% This is the first message received (from RabbitMQ)
#'basic.consume_ok'{} ->
loop(Channel);
% a delivery
{#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}} ->
echo(Payload),
% ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
loop(Channel);
...
綁定exchange和queue
綁定(binding)其實也算AMQP里的一個關鍵概念,它用于建立exchange和queue之間的聯系,以方便exchange在收到消息后將消息投遞到隊列。我們不一定需要將隊列和exchange綁定起來。
Binding = #'queue.bind'{queue = Queue, exchange = Exchange, routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
在綁定的時候需要填入一個routing_key
的參數,不同類型的exchange對該值的處理方式不一樣,例如后面提到fanout
類型的exchange時,就不需要該值。
更多細節
通過閱讀RabbitMQ tutorial,我們還會獲得很多細節信息。例如exchange的種類、binding等。
exchange分類
exchange有四種類型,不同類型決定了其在收到消息后,該如何處理這條消息(投遞規則),這四種類型為:
- fanout
- direct
- topic
- headers
fanout類型的exchange是一個廣播exchange,它在收到消息后會將消息廣播給所有綁定到它上面的隊列。綁定(binding)用于將隊列和exchange關聯起來。我們可以在創建exchange的時候指定exchange的類型:
Declare = #'exchange.declare'{exchange = <<"my_exchange">>, type = <<"fanout">>}
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)
direct類型的exchange在收到消息后,會將此消息投遞到發送消息時指定的routing_key
和綁定隊列到exchange上時的routing_key
相同的隊列里。可以多次綁定一個隊列到一個exchange上,每次指定不同的routing_key
,就可以接收多種routing_key
類型的消息。注意,綁定隊列時我們可以填入一個routing_key
,發送消息時也可以指定一個routing_key
。
topic類型的exchange相當于是direct exchange的擴展,direct exchange在投遞消息到隊列時,是單純的對routing_key
做相等判定,而topic exchange則是一個routing_key
的字符串匹配,就像正則表達式一樣。在routing_key
中可以填入一種字符串匹配符號:
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
header exchange tutorial中未提到,我也不深究
消息投遞及回應
每個消息都可以提供回應,以使RabbitMQ確定該消息確實被收到。RabbitMQ重新投遞消息僅依靠與consumer的網絡連接情況,所以只要網絡連接正常,consumer卡死也不會導致RabbitMQ重投消息。如下回應消息:
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
其中Tag
來源于接收到消息時里的Tag
。
如果有多個consumer監聽了一個隊列,RabbitMQ會依次把消息投遞到這些consumer上。這里的投遞原則使用了round robin
方法,也就是輪流方式。如前所述,如果某個consumer的處理邏輯耗時嚴重,則將導致多個consumer出現負載不均衡的情況,而RabbitMQ并不關心consumer的負載。可以通過消息回應機制來避免RabbitMQ使用這種消息數平均的投遞原則:
Prefetch = 1,
amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})
消息可靠性
RabbitMQ可以保證消息的可靠性,這需要設置消息和隊列都為durable的:
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello_queue">>, durable = true}),
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = "", routing_key = Queue},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Msg = #amqp_msg{props = Props, payload = Payload},
amqp_channel:cast(Channel, Publish, Msg)
參考
除了參考RabbitMQ tutorial外,還可以看看別人使用erlang是如何實現這些tutorial的,github上有一個這樣的項目:rabbitmq-tutorials。我自己也實現了一份,包括rabbitmq-tutorials中沒實現的RPC。后來我發現原來rabbitmq erlang client的實現里已經包含了一個RPC模塊。