简介

OpenResty提供了共享内存的机制,可以用它进行Worker进程之间的通讯、传递数据等。

我们使用Kong封装好的模块:lua-resty-worker-events

此模块提供了一种向Nginx服务器中的其他工作进程发送事件的方法。通信是通过一个共享的存储区进行的,事件数据将存储在该存储区中。

所有工人的事件顺序都是相同的。

工作进程将设置一个计时器来检查后台事件。模块遵循单例模式,因此每个worker运行一次。但是,如果保持最新状态很重要,那么可以将间隔设置为较低的频率,并且在收到每个请求时进行轮询的调用可以确保尽快处理所有事情。

示例与封装

以下是官方给出的使用示例

http {
    lua_package_path "/path/to/lua-resty-worker-events/lib/?.lua;;";

    # the size depends on the number of event to handle:
    lua_shared_dict process_events 1m;

    init_worker_by_lua_block {
        local ev = require "resty.worker.events"

        local handler = function(data, event, source, pid)
            print("received event; source=",source,
                  ", event=",event,
                  ", data=", tostring(data),
                  ", from process ",pid)
        end

        ev.register(handler)

        local ok, err = ev.configure {
            shm = "process_events", -- defined by "lua_shared_dict"
            timeout = 2,            -- life time of unique event data in shm
            interval = 1,           -- poll interval (seconds)

            wait_interval = 0.010,  -- wait before retry fetching event data
            wait_max = 0.5,         -- max wait time before discarding event
            shm_retries = 999,      -- retries for shm fragmentation (no memory)
        }
        if not ok then
            ngx.log(ngx.ERR, "failed to start event system: ", err)
            return
        end
    }

    server {
        ...

        # example for polling:
        location = /some/path {

            default_type text/plain;
            content_by_lua_block {
                -- manually call `poll` to stay up to date, can be used instead,
                -- or together with the timer interval. Polling is efficient,
                -- so if staying up-to-date is important, this is preferred.
                require("resty.worker.events").poll()

                -- do regular stuff here

            }
        }
    }
}

在我们项目使用的过程中,一般会分为两部分。

首先我们会初始化worker events,也就是对事件触发的一些参数进行配置。

function init_worker_events()
    local worker_events = require "resty.worker.events"
    
    local ok, err =
        worker_events.configure {
        shm = "cpia_worker_events", -- defined by "lua_shared_dict"
        timeout = 5, -- life time of event data in shm
        interval = 0.5, -- poll interval (seconds)
        wait_interval = 0.010, -- wait before retry fetching event data
        wait_max = 0.5 -- max wait time before discarding event
    }
    if not ok then
        return nil, err
    end

    return worker_events
end

然后会对它进行简单的封装,根据开发者的使用习惯,使用更通用的命名模式。

function process_event(worker_events)
    local m = {}

    m.worker_events = worker_events

    m.publish = function(source, event, data)
        worker_events.post(source, event, data)
    end

    m.subscribe = function(callback, source, ...)
        worker_events.register(callback, source, ...)
    end

    m.unsubscribe = function(callback, source, ...)
        worker_events.unregister(callback, source, ...)
    end

    return m
end

使用方式

以上我们对其进行了封装,介绍一下如何使用。

  • 订阅事件
local callback = function(data, event, source, pid)
    --- process events
end
event.subscribe(callback, "load", "ssl")

这个方法订阅了load模块下的ssl事件,callback传递的参数详细参考文档。

  • 推送事件
local data = {version = "0.1"}
event.publish("load", "ssl",  data)

这个方法把数据data推送给load模块的ssl事件。

  • 取消订阅事件
event.unsubscribe(callback, "load", "ssl")

这个方法会取消订阅load模块的ssl事件,就是这个取消事件,居然还要带上callback方法,实属让人不解。