http://www.tuicool.com/articles/zYfAnea
最近實(shí)現(xiàn)一個(gè)二維碼掃描登錄的功能,當(dāng)用戶用移動(dòng)設(shè)備掃描PC端頁面的二維碼之后,移動(dòng)設(shè)備通過常規(guī)HTTP短連接向服務(wù)器獲取認(rèn)證數(shù)據(jù),認(rèn)證通過后,服務(wù)器向PC瀏覽器主動(dòng)推送帳號(hào)相關(guān)信息以完成PC端頁面的登錄。
服務(wù)器主動(dòng)向?yàn)g覽器推送數(shù)據(jù),基本上就是ajax輪詢、iframe stream、websocket等等,可以參見 《Comet (web技術(shù))》
推送服務(wù)器有很多種,當(dāng)然用強(qiáng)大穩(wěn)定又順手的nginx了。 nginx相關(guān)的推送插件模塊有nginx-push-stream-module、nginx_http_push_module,但是很遺憾,可配置不可編程。又到我們的主角 OpenResty (OpenResty (aka. ngx_openresty) is a full-fledged web application server by bundling the standard Nginx core, lots of 3rd-party Nginx modules, as well as most of their external dependencies)出馬的時(shí)候了。
1、PC瀏覽器向php發(fā)起ajax請(qǐng)求,獲取一個(gè)與當(dāng)前session相關(guān)的唯一的二維碼URL,和一個(gè)唯一的sub訂閱URL。
2、PC瀏覽器顯示二維碼,并對(duì)sub訂閱URL發(fā)起ajax長(zhǎng)連接或者websocket連接,這個(gè)請(qǐng)求將直接由nginx來hold住,超時(shí)時(shí)間由配置參數(shù)push_free_timeout決定。
3、手機(jī)端掃描并解析二維碼,向php發(fā)起認(rèn)證。
4、php收到移動(dòng)設(shè)備的請(qǐng)求后,解出sessionid,向nginx的pub接口發(fā)布數(shù)據(jù),該數(shù)據(jù)將被直接投遞到對(duì)應(yīng)的sub接口,并回傳到瀏覽器。
5、如果sub接口在push_free_timeout指定的時(shí)間內(nèi)一直沒有收到數(shù)據(jù),將主動(dòng)斷開與瀏覽器端的連接。此時(shí),瀏覽器可以根據(jù)業(yè)務(wù)場(chǎng)景決定是否重新發(fā)起連接。
出于性能考慮,使用ngx.shared共享內(nèi)存存儲(chǔ)消息,只能共享于一個(gè)ngx實(shí)例內(nèi),對(duì)于10k級(jí)別的聊天室并發(fā)連接應(yīng)該是夠用了。
使用redis作為外部存儲(chǔ),也是可以的,如果100k的并發(fā),需要注意ngx對(duì)nosql發(fā)起連接時(shí)耗盡socket,當(dāng)然這個(gè)是可以解決的。
更大規(guī)模的并發(fā),值得自研推送服務(wù)器。
在生產(chǎn)環(huán)境某個(gè)LB節(jié)點(diǎn)上試運(yùn)行過,openresty跑著waf\fastcgi proxy\http proxy\comet。常態(tài)1k并發(fā)連接數(shù),load 0.01,40k并發(fā)時(shí),load只有0.15。
下面是相關(guān)測(cè)試代碼,如果有空完善了再托管到github上,考慮寫一個(gè)聊天室的完整demo。
resty.push基礎(chǔ)模塊(需要使用到ngx.shared共享內(nèi)存來存儲(chǔ)消息,在nginx.conf的http段配置lua_shared_dict push 10m;)
--[[
-- /usr/local/openresty/lualib/resty/push.lua
-- push.lua ,resty.push 基于nginx_lua的push推送方案
-- 支持多對(duì)多頻道
-- 支持long-pooling, stream, websocket
--
-- Author: chuyinfeng.com <Liujiaxiong@kingsoft.com>
-- 2014.03.12
--]]
local _M = {_VERSION = '0.01'}
local function debug(msg)
--ngx.say(msg)
--ngx.flush(true)
end
-- 配置信息
_M.config = {
-- 推送間隔,1s
['push_interval'] = 1,
-- 消息隊(duì)列最大長(zhǎng)度
['msglist_len'] = 100,
-- 消息生存周期
['msg_lefttime'] = 3,
-- 頻道空閑超時(shí)
['channel_timeout'] = 30,
-- 推送空閑超時(shí),在改時(shí)間段內(nèi)無消息則關(guān)閉當(dāng)前推送連接
['push_free_timeout'] = 10,
-- 共享內(nèi)存名
['store_name'] = 'push',
-- 頻道號(hào)
['channels'] = {1, 2},
}
-- 頻道數(shù)量
_M.channels_len = 0
-- 當(dāng)前讀位置
_M.idx_read = 0
-- 共享內(nèi)存
_M.store = nil
-- cjson 模塊
local cjson = require "cjson"
--[[
-- 設(shè)置
--]]
_M.opt = function(self, k, v)
local t = type(k)
if t == 'table' then
for key, val in pairs(k) do
self.config[key] = val
end
end
if t == 'string' then
self.config[k] = v
end
self.channels_len = table.maxn(self.config['channels'])
self.store = ngx.shared[self.config['store_name']]
end
--[[
-- 向頻道寫入消息
--
-- @param ngx.shared.dict, 共享內(nèi)存
-- @param string channel_id,可用ngx.crc32_long生成
-- @param int channel_timeout, 頻道空閑超時(shí)時(shí)間
-- @param string msg,消息內(nèi)容 必須為字符串
-- @param int msg_lefttime, 消息生存周期
-- @param int msglist_len, 消息隊(duì)列長(zhǎng)度
-- @return boolean
--]]
local function _write(store, channel_id, channel_timeout, msg, msg_lefttime, msglist_len)
local idx, ok, err
-- 消息當(dāng)前讀取位置計(jì)數(shù)器+1
idx, err = store:incr(channel_id, 1)
-- 如果異常,則新建頻道
if err then
ok, err = store:set(channel_id, 1, channel_timeout)
if err then return 0 end
idx = 1
else
store:replace(channel_id, idx, channel_timeout)
end
-- 寫入消息
debug("write " .. channel_id .. idx .. " , lefttime: " .. msg_lefttime.. " , msg: " .. msg)
ok, err = store:set('m' .. channel_id .. idx, msg, msg_lefttime)
if err then return 0 end
-- 清除隊(duì)列之前的舊消息
if idx > msglist_len then
store:delete('m' .. channel_id .. (idx - msglist_len))
end
return idx
end
--[[
-- 從頻道讀取消息
--
-- @param int channel_id, 必須為整形,可用ngx.crc32_long生成
-- @param int offset,歷史偏移量,最小為0
-- @return int len, 剩余消息數(shù)量
-- @return string msg, 消息
--]]
local _read = function (store, channel_id, msglist_len, idx_read)
local idx_msg, err, msg
-- 獲取最新消息的位置
idx_msg, _ = store:get(channel_id)
idx_msg = idx_msg or 0
if idx_msg == 0 then
idx_read = 0
end
if idx_msg - idx_read > msglist_len then
idx_read = idx_msg - msglist_len
end
if idx_read < idx_msg then
idx_read = idx_read + 1
msg, _ = store:get('m' .. channel_id .. idx_read)
end
-- 返回讀的位置和消息的最大位置,以及消息
return idx_read, idx_msg, msg
end
--[[
-- 推送消息
-- @param callback wrapper, 消息包裝回調(diào)函數(shù)
--]]
_M.push = function(self, wrapper)
local flag_work = true
local flag_read = true
local idx_read, idx_msg, msg, err
local time_last_msg = ngx.time()
while flag_work do
for i = 1, self.channels_len do
-- 循環(huán)讀取當(dāng)前頻道,直到EOF
flag_read = true
while flag_read do
debug("read from idx_read: " .. self.idx_read)
self.idx_read, idx_msg, msg = _read(self.store, self.config['channels'][i], self.config['msglist_len'], self.idx_read)
if msg ~= nil then
debug("got msg and wrapper msg: " .. msg)
time_last_msg = ngx.time()
wrapper(msg)
end
debug("idx_read: " .. self.idx_read .. ", idx_msg: " .. idx_msg)
if self.idx_read == idx_msg then flag_read = false end
end
end
debug("push_free: " .. ngx.time() - time_last_msg)
if ngx.time() - time_last_msg >= self.config['push_free_timeout'] then
debug("push_timeout: " .. " last: " .. time_last_msg .. " , now: " .. ngx.time())
flag_work = false
end
debug("sleep: " .. self.config['push_interval'])
ngx.sleep(self.config['push_interval'])
end
end
--[[
-- 發(fā)送消息到指定頻道
--]]
_M.send = function(self, msg)
local idx = 0
for i = 1, self.channels_len do
idx = _write(self.store, self.config['channels'][i], self.config['channel_timeout'], msg, self.config['msg_lefttime'], self.config['msglist_len'])
end
return true
end
--[[
-- jsonp格式化
--]]
_M.jsonp = function(self, data, cb)
if cb then
return cb .. "(" .. cjson.encode(data) .. ");"
else
return cjson.encode(data)
end
end
--[[
-- 公開成員
--]]
_M.new = function(self)
return setmetatable({}, { __index = _M })
end
return _M
public發(fā)布
local push = require "resty.push"
local function exit(is_ws)
if is_ws == nil then ngx.eof() end
ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end
local pub = push:new()
pub:opt({
['channels'] = 123,
['push_interval'] = 0.1,
['push_free_timeout'] = 27,
})
ngx.req.read_body()
local body, err = ngx.req.get_post_args()
if err then exit() end
if pub:send(body['data'] or '') then
ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
ngx.status = ngx.HTTP_OK
ngx.say(pub:jsonp({['status'] = 1}, args['callback']))
end
subscribe 訂閱接口
local push = require "resty.push"
local function exit(is_ws)
if is_ws == nil then ngx.eof() end
ngx.exit(444)
end
local ok, err = ngx.on_abort(exit)
if err then return end
local args = ngx.req.get_uri_args()
local sub = push:new()
sub:opt({
['channels'] = 123,
['push_interval'] = 0.1,
['push_free_timeout'] = 27,
})
local wrapper = function(msg)
ngx.header['Content-Type'] = 'text/javascript;charset=UTF-8'
ngx.status = ngx.HTTP_OK
ngx.say(sub:jsonp(msg, args['callback']))
exit()
end
sub:push(wrapper)
wrapper(sub:jsonp({['status'] = 1, ['tips'] = 'timeout'}))
posted on 2016-08-04 08:55
思月行云 閱讀(4363)
評(píng)論(0) 編輯 收藏 引用 所屬分類:
Nginx\Openresty