• <ins id="pjuwb"></ins>
    <blockquote id="pjuwb"><pre id="pjuwb"></pre></blockquote>
    <noscript id="pjuwb"></noscript>
          <sup id="pjuwb"><pre id="pjuwb"></pre></sup>
            <dd id="pjuwb"></dd>
            <abbr id="pjuwb"></abbr>

            loop_in_codes

            低調(diào)做技術(shù)__歡迎移步我的獨(dú)立博客 codemaro.com 微博 kevinlynx

            erlang和RabbitMQ學(xué)習(xí)總結(jié)

            AMQP和RabbitMQ概述

            AMQP(Advanced Message Queue Protocol)定義了一種消息系統(tǒng)規(guī)范。這個規(guī)范描述了在一個分布式的系統(tǒng)中各個子系統(tǒng)如何通過消息交互。而RabbitMQ則是AMQP的一種基于erlang的實(shí)現(xiàn)。

            AMQP將分布式系統(tǒng)中各個子系統(tǒng)隔離開來,子系統(tǒng)之間不再有依賴。子系統(tǒng)僅依賴于消息。子系統(tǒng)不關(guān)心消息的發(fā)送者,也不關(guān)心消息的接受者。

            AMQP中有一些概念,用于定義與應(yīng)用層的交互。這些概念包括:message、queue、exchange、channel, connection, broker、vhost。

            注:到目前為止我并沒有打算使用AMQP,所以沒有做更深入的學(xué)習(xí),僅為了找個機(jī)會寫寫erlang代碼,以下信息僅供參考。

            • message,即消息,簡單來說就是應(yīng)用層需要發(fā)送的數(shù)據(jù)
            • queue,即隊列,用于存儲消息
            • exchange,有翻譯為“路由”,它用于投遞消息,應(yīng)用程序在發(fā)送消息時并不是指定消息被發(fā)送到哪個隊列,而是將消息投遞給路由,由路由投遞到隊列
            • channel,幾乎所有操作都在channel中進(jìn)行,有點(diǎn)類似一個溝通通道
            • connection,應(yīng)用程序與broker的網(wǎng)絡(luò)連接
            • broker,可簡單理解為實(shí)現(xiàn)AMQP的服務(wù),例如RabbitMQ服務(wù)

            關(guān)于AMQP可以通過一篇很有名的文章了解更多:RabbitMQ+Python入門經(jīng)典 兔子和兔子窩

            RabbitMQ的運(yùn)行需要erlang的支持,erlang和RabbitMQ在windows下都可以直接使用安裝程序,非常簡單。RabbitMQ還支持網(wǎng)頁端的管理,這需要開啟一些RabbitMQ的插件,可以參考官方文檔

            RabbitMQ本質(zhì)上其實(shí)是一個服務(wù)器,與這個服務(wù)器做交互則是通過AMQP定義的協(xié)議,應(yīng)用可以使用一個實(shí)現(xiàn)了AMQP協(xié)議的庫來與服務(wù)器交互。這里我使用erlang的一個客戶端,對應(yīng)著RabbitMQ的tutorial,使用erlang實(shí)現(xiàn)了一遍。基于這個過程我將一些關(guān)鍵實(shí)現(xiàn)羅列出來以供記憶:

            主要功能使用

            關(guān)于RabbitMQ erlang client的使用說明可以參考官方文檔。這個client library下載下來后是兩個ez文件,其實(shí)就是zip文件,本身是erlang支持的庫打包格式,但據(jù)說這個feature還不成熟。總之我是直接解壓,然后在環(huán)境變量中指定ERL_LIBS到解壓目錄。使用時使用include_lib包含庫文件(類似C語言里的頭文件):

                -include_lib("amqp_client/include/amqp_client.hrl").
            

            Connection/Channel

            對于連接到本地的RabbitMQ服務(wù):

                {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
                {ok, Channel} = amqp_connection:open_channel(Connection),
            

            創(chuàng)建Queue

            每個Queue都有名字,這個名字可以人為指定,也可以由系統(tǒng)分配。Queue創(chuàng)建后如果不顯示刪除,斷開網(wǎng)絡(luò)連接是不會自動刪除這個Queue的,這個可以在RabbitMQ的web管理端看到。

                #'queue.declare_ok'{queue = Q}
                    = amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
            

            但也可以指定Queue會在程序退出后被自動刪除,需要指定exclusive參數(shù):

                QDecl = #'queue.declare'{queue = <<>>, exclusive = true},
                #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QDecl),
            

            上例中queue的名字未指定,由系統(tǒng)分配。

            發(fā)送消息

            一般情況下,消息其實(shí)是發(fā)送給exchange的:

                Payload = <<"hello">>
                Publish = #'basic.publish'{exchange = <<"log_exchange">>},
                amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
            

            exchange有一系列規(guī)則,決定某個消息將被投遞到哪個隊列。

            發(fā)送消息時也可以不指定exchange,這個時候消息的投遞將依賴于routing_keyrouting_key在這種場景下就對應(yīng)著目標(biāo)queue的名字:

                #'queue.declare_ok'{queue = Q}
                    = amqp_channel:call(Channel, #'queue.declare'{queue = <<"rpc_queue">>}),
                Payload = <<"hello">>,
                Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
                amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
            

            接收消息

            可以通過注冊一個消息consumer來完成消息的異步接收:

                Sub = #'basic.consume' {queue = Q},
                #'basic.consume_ok'{consumer_tag = Tag} = amqp_channel:subscribe(Channel, Sub, self()),
            

            以上注冊了了一個consumer,監(jiān)聽變量Q指定的隊列。當(dāng)有消息到達(dá)該隊列時,系統(tǒng)就會向consumer進(jìn)程對應(yīng)的mailbox投遞一個通知,我們可以使用receive來接收該通知:

                loop(Channel) ->
                    receive 
                        % This is the first message received (from RabbitMQ)
                        #'basic.consume_ok'{} -> 
                            loop(Channel);
                        % a delivery
                        {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}} ->
                            echo(Payload),
                            % ack the message
                            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
                            loop(Channel);
                    ...
            

            綁定exchange和queue

            綁定(binding)其實(shí)也算AMQP里的一個關(guān)鍵概念,它用于建立exchange和queue之間的聯(lián)系,以方便exchange在收到消息后將消息投遞到隊列。我們不一定需要將隊列和exchange綁定起來。

                Binding = #'queue.bind'{queue = Queue, exchange = Exchange, routing_key = RoutingKey},
                #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
            

            在綁定的時候需要填入一個routing_key的參數(shù),不同類型的exchange對該值的處理方式不一樣,例如后面提到fanout類型的exchange時,就不需要該值。

            更多細(xì)節(jié)

            通過閱讀RabbitMQ tutorial,我們還會獲得很多細(xì)節(jié)信息。例如exchange的種類、binding等。

            exchange分類

            exchange有四種類型,不同類型決定了其在收到消息后,該如何處理這條消息(投遞規(guī)則),這四種類型為:

            • fanout
            • direct
            • topic
            • headers

            fanout類型的exchange是一個廣播exchange,它在收到消息后會將消息廣播給所有綁定到它上面的隊列。綁定(binding)用于將隊列和exchange關(guān)聯(lián)起來。我們可以在創(chuàng)建exchange的時候指定exchange的類型:

                Declare = #'exchange.declare'{exchange = <<"my_exchange">>, type = <<"fanout">>}
                #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)
            

            direct類型的exchange在收到消息后,會將此消息投遞到發(fā)送消息時指定的routing_key和綁定隊列到exchange上時的routing_key相同的隊列里。可以多次綁定一個隊列到一個exchange上,每次指定不同的routing_key,就可以接收多種routing_key類型的消息。注意,綁定隊列時我們可以填入一個routing_key,發(fā)送消息時也可以指定一個routing_key

            topic類型的exchange相當(dāng)于是direct exchange的擴(kuò)展,direct exchange在投遞消息到隊列時,是單純的對routing_key做相等判定,而topic exchange則是一個routing_key的字符串匹配,就像正則表達(dá)式一樣。在routing_key中可以填入一種字符串匹配符號:

            * (star) can substitute for exactly one word.
            # (hash) can substitute for zero or more words.
            

            header exchange tutorial中未提到,我也不深究

            消息投遞及回應(yīng)

            每個消息都可以提供回應(yīng),以使RabbitMQ確定該消息確實(shí)被收到。RabbitMQ重新投遞消息僅依靠與consumer的網(wǎng)絡(luò)連接情況,所以只要網(wǎng)絡(luò)連接正常,consumer卡死也不會導(dǎo)致RabbitMQ重投消息。如下回應(yīng)消息:

                amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
            

            其中Tag來源于接收到消息時里的Tag

            如果有多個consumer監(jiān)聽了一個隊列,RabbitMQ會依次把消息投遞到這些consumer上。這里的投遞原則使用了round robin方法,也就是輪流方式。如前所述,如果某個consumer的處理邏輯耗時嚴(yán)重,則將導(dǎo)致多個consumer出現(xiàn)負(fù)載不均衡的情況,而RabbitMQ并不關(guān)心consumer的負(fù)載。可以通過消息回應(yīng)機(jī)制來避免RabbitMQ使用這種消息數(shù)平均的投遞原則:

                Prefetch = 1,
                amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})
            

            消息可靠性

            RabbitMQ可以保證消息的可靠性,這需要設(shè)置消息和隊列都為durable的:

                #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, #'queue.declare'{queue = <<"hello_queue">>, durable = true}),
            
                Payload = <<"foobar">>,
                Publish = #'basic.publish'{exchange = "", routing_key = Queue},
                Props = #'P_basic'{delivery_mode = 2}, %% persistent message
                Msg = #amqp_msg{props = Props, payload = Payload},
                amqp_channel:cast(Channel, Publish, Msg)
            

            參考

            除了參考RabbitMQ tutorial外,還可以看看別人使用erlang是如何實(shí)現(xiàn)這些tutorial的,github上有一個這樣的項目:rabbitmq-tutorials。我自己也實(shí)現(xiàn)了一份,包括rabbitmq-tutorials中沒實(shí)現(xiàn)的RPC。后來我發(fā)現(xiàn)原來rabbitmq erlang client的實(shí)現(xiàn)里已經(jīng)包含了一個RPC模塊。

            posted on 2013-04-12 21:27 Kevin Lynx 閱讀(8675) 評論(0)  編輯 收藏 引用 所屬分類: erlang

            久久久WWW成人免费精品| 久久综合色区| 欧美久久精品一级c片片| 久久精品一区二区国产| 久久婷婷五月综合97色直播| 久久狠狠爱亚洲综合影院| 国产一久久香蕉国产线看观看 | 人妻少妇精品久久| 久久天堂AV综合合色蜜桃网 | 日韩电影久久久被窝网| 97精品久久天干天天天按摩| 亚洲午夜福利精品久久| 99久久国产综合精品网成人影院 | 人妻无码αv中文字幕久久琪琪布| 久久精品国产亚洲av高清漫画 | 94久久国产乱子伦精品免费| 国产精品99久久久精品无码| 久久久久97国产精华液好用吗| 欧美丰满熟妇BBB久久久| 亚洲七七久久精品中文国产| 国产精品亚洲综合专区片高清久久久 | 国产精品成人精品久久久| 久久亚洲中文字幕精品有坂深雪 | 久久久无码精品亚洲日韩京东传媒 | 久久青青草原精品国产软件| 久久久青草久久久青草| 男女久久久国产一区二区三区| 亚洲人成无码久久电影网站| 日韩亚洲国产综合久久久| 久久国产乱子伦精品免费午夜| 久久这里只精品国产99热| 国产精品久久一区二区三区| www.久久热.com| 国产69精品久久久久9999| 久久久久久a亚洲欧洲aⅴ| 国产亚洲欧美成人久久片| 99久久人人爽亚洲精品美女| 亚洲成色999久久网站| 久久99精品综合国产首页| 久久免费精品一区二区| 亚洲天堂久久精品|