简介
OpenResty提供了共享内存的机制,可以用它进行Worker进程之间的通讯、传递数据等。
我们使用Kong封装好的模块:lua-resty-worker-events。
此模块提供了一种向Nginx服务器中的其他工作进程发送事件的方法。通信是通过一个共享的存储区进行的,事件数据将存储在该存储区中。
所有工人的事件顺序都是相同的。
工作进程将设置一个计时器来检查后台事件。模块遵循单例模式,因此每个worker运行一次。但是,如果保持最新状态很重要,那么可以将间隔设置为较低的频率,并且在收到每个请求时进行轮询的调用可以确保尽快处理所有事情。
示例与封装
以下是官方给出的使用示例
http {
lua_package_path "/path/to/lua-resty-worker-events/lib/?.lua;;";
# the size depends on the number of event to handle:
lua_shared_dict process_events 1m;
init_worker_by_lua_block {
local ev = require "resty.worker.events"
local handler = function(data, event, source, pid)
print("received event; source=",source,
", event=",event,
", data=", tostring(data),
", from process ",pid)
end
ev.register(handler)
local ok, err = ev.configure {
shm = "process_events", -- defined by "lua_shared_dict"
timeout = 2, -- life time of unique event data in shm
interval = 1, -- poll interval (seconds)
wait_interval = 0.010, -- wait before retry fetching event data
wait_max = 0.5, -- max wait time before discarding event
shm_retries = 999, -- retries for shm fragmentation (no memory)
}
if not ok then
ngx.log(ngx.ERR, "failed to start event system: ", err)
return
end
}
server {
...
# example for polling:
location = /some/path {
default_type text/plain;
content_by_lua_block {
-- manually call `poll` to stay up to date, can be used instead,
-- or together with the timer interval. Polling is efficient,
-- so if staying up-to-date is important, this is preferred.
require("resty.worker.events").poll()
-- do regular stuff here
}
}
}
}
在我们项目使用的过程中,一般会分为两部分。
首先我们会初始化worker events,也就是对事件触发的一些参数进行配置。
function init_worker_events()
local worker_events = require "resty.worker.events"
local ok, err =
worker_events.configure {
shm = "cpia_worker_events", -- defined by "lua_shared_dict"
timeout = 5, -- life time of event data in shm
interval = 0.5, -- poll interval (seconds)
wait_interval = 0.010, -- wait before retry fetching event data
wait_max = 0.5 -- max wait time before discarding event
}
if not ok then
return nil, err
end
return worker_events
end
然后会对它进行简单的封装,根据开发者的使用习惯,使用更通用的命名模式。
function process_event(worker_events)
local m = {}
m.worker_events = worker_events
m.publish = function(source, event, data)
worker_events.post(source, event, data)
end
m.subscribe = function(callback, source, ...)
worker_events.register(callback, source, ...)
end
m.unsubscribe = function(callback, source, ...)
worker_events.unregister(callback, source, ...)
end
return m
end
使用方式
以上我们对其进行了封装,介绍一下如何使用。
- 订阅事件
local callback = function(data, event, source, pid)
--- process events
end
event.subscribe(callback, "load", "ssl")
这个方法订阅了load
模块下的ssl
事件,callback传递的参数详细参考文档。
- 推送事件
local data = {version = "0.1"}
event.publish("load", "ssl", data)
这个方法把数据data
推送给load
模块的ssl
事件。
- 取消订阅事件
event.unsubscribe(callback, "load", "ssl")
这个方法会取消订阅load
模块的ssl
事件,就是这个取消事件,居然还要带上callback方法,实属让人不解。