https://www.aliyun.com/jiaocheng/647017.html
https://github.com/chenxiaofa/p
前言作為一個游戲從業(yè)者不可能不使用推方案,以前一直使用 nginx-push-stream-module這個模塊的 Forever Iframe模式來實現(xiàn)推方案。
最近決定研究下 lua-resty-websocket來實現(xiàn)一個更加高效好用推方案
不推薦使用的場景由于 OpenResty目前還不能做到跨 worker通信,所以想到實現(xiàn)指定推送需要中轉一次,效率上可能不如其他語言如 golang等
過于復雜的業(yè)務邏輯 頻繁的指定推送(單對單、組、Tag等) 廣播過多 一起工作的好基友們想要推的優(yōu)雅以下幾個基友的幫忙是不可或缺的
ngx.semaphore它可以讓你在想要發(fā)消息的地方優(yōu)雅的進入到發(fā)送階段,也可以讓你來優(yōu)雅的控制一個鏈接超時的關閉。
ngx.shared由于目前我們無法做到跨 worker的通信,所以必須借助共享內存來中轉不屬于當前 worker的消息。
lua-resty-websocket由于貪圖方便還是直接使用了現(xiàn)成的庫,喜歡折騰的小伙伴請移步 stream-lua-nginx-module
大概的思路由于不能跨 worker通信所以我給每個 worker申請了一個 shared共享內存來保存消息。
理論上 shared的數(shù)量等于 worker的數(shù)量最佳。
然后每個 worker啟動一個 timer來判斷當前 worker的 message id和 shared中的 message id是否有變化。
這里為什么不用 shared的有序列表來做,容我先賣個關子。
當發(fā)生變化時,判斷消息的目標是否在自己的 session hash中,如果在則發(fā)之。
開始準備工作 修改配置文件首先修改 nginx.conf配置,增加以下設置
lua_shared_dict message_1 10m;
lua_shared_dict message_2 10m;
lua_shared_dict message_n 10m;
init_worker_by_lua_file scripts/init_worker_by_lua.lua;
init_worker_by_lua
local ngx = ngx
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_timer_at = ngx.timer.at
local require = require
local socketMgr = require("socketMgr")
local delay = 1
local loopMessage
loopMessage = function(premature)
if premature then
ngx_log(ngx_ERR, "timer was shut: ", err)
return
end
socketMgr:loopMessages()
local ok, err = ngx_timer_at(delay, loopMessage)
if not ok then
ngx_log(ngx_ERR, "failed to create the timer: ", err)
return
end
end
loopMessage()
loopMessages判斷 local message id和 shared message id是否不等。
隨后每次 local message id+ 1 從 shared拉取數(shù)據(jù),進行消息推送邏輯。
建立連接不做過多說明,自行查看 lua-resty-websocket的 wiki
當連接監(jiān)聽好之后,要進行一系列的管理。如:
session id和 user id的雙向映射 session id和 group name的雙向映射后面再詳細說明
生成 session id我是用 ( worker id+ 1) * 100000 + worker's local incr id來生成唯一 session id比較簡陋,但是夠用。
這么做的原因是,通過對 session id進行取余可以很方便的得知 worker id,可以方便的給 shared寫消息。
local ngx_worker_id = ngx.worker.id()local _incr_id = 0local _gen_session_id = function()_incr_id = _incr_id + 1return (ngx_worker_id + 1) * 100000 + _incr_idend 設置消息映射這個可以用于收到當前 worker所屬的 shared message判斷是否在當前進程。
_messages[session_id] = {}_semaphores[session_id] = semaphore.new(0) 接收消息&;發(fā)送消息代碼和 官方例子類同不做過多說明,只說我改了什么。
在接收消息中管理了一個變量即 close_flag用于管理 send message輕線程的退出。
以下是一段偽代碼,含義的話請聯(lián)系上下文。
local session_id = sessionMgr:gen_session_id()
local send_semaphore = sessionMgr:get_semaphore(session_id)
local close_flag = false
local function _push_thread_function()
while close_flag == false do
local ok, err = send_semaphore:wait(300)
if ok then
local messages = socketMgr:getMessages(session_id)
while messages and #messages > 0 do
local message = messages[1]
table_remove(messages, 1)
--- your send message function handler
end
end
if close_flag then
socketMgr:destory(session_id)
break
end
end
end
local push_thread = ngx_thread_spawn(_push_thread_function)
while true do
local data, typ, err = wbsocket:recv_frame()
while err == "again" do
local cut_data cut_data, _, err = wbsocket:recv_frame()
data = data .. cut_data
end
if not data then
close_flag = true
send_semaphore:post(1)
break
elseif typ == 'close' then
close_flag = true
send_semaphore:post(1)
break
elseif typ == 'ping' then
local bytes, err = wbsocket:send_pong(data)
if not bytes then
close_flag = true
send_semaphore:post(1)
break
end
elseif typ == 'pong' then
elseif typ == 'text' then
-- your receive function handler
elseif typ == 'continuation' then
elseif typ == 'binary' then
end
end
ngx_thread_wait(push_thread)
wbsocket:send_close() 消息推送現(xiàn)在說說為什么不用 shared的有序列表來存儲消息,我是使用了 shared的 set方法中的 flag屬性來存放 session id。
這樣在獲得一個消息的時候,能很方便的知道消息是發(fā)給哪個 session id的。
繼續(xù)一段偽代碼。
local ngx_shared = ngx.shared
local _shared_dicts = {ngx_shared.message_1,ngx_shared.message_2,ngx_shared.message_n,}
local current_shared = _shared_dicts[ngx_worker_id + 1]
local current_message_id = 1
--- 如果在當前進程
if _messages[session_id] then
table.insert(_messages[session_id], "message data")
_semaphores[session_id]:post(1)
--- 會進入到,上述 _push_thread_function 方法中,進行發(fā)送邏輯
else
local shared_id = session_id % 100000
local message_shared = _shared_dicts[shared_id]
local message_id = message_shared:incr("id", 1, 0)
message_shared:set("message." .. message_id, "message data", 60, session_id)
end
其他https://github.com/chenxiaofa/p
借鑒了這位同學的設計思路,實現(xiàn)了額外邏輯。如:
加入、退出、銷毀組 各 worker之間的 cmd內部命令執(zhí)行 熱更新的特殊處理 等