协程作为事件处理程序

lua-users home
wiki

这是一个非常基本的 Supervisor(或事件调度器),类似于 Copas 和 ProgrammingInLua? 中的 Supervisor(但没有基于任何一个)。它是在回应邮件列表中的一个问题而写成的,我尝试对其进行了完整的注释;在最后有一个使用它的例子。它试图回答的问题是

那么,如何做类似的事情呢?...

result = block(self, event)
...其中 block() 是一个 SWIG 包装函数,它接受脚本上下文(即状态)和事件作为参数,并阻塞(yield)脚本,直到事件发生,然后将详细信息复制到结果结构(resume)中?

答案是,为什么不在 Lua 中做呢?下面的脚手架可以每秒处理数十万个事件调度,因此在 C 中编写它不太可能有什么帮助;CPU 时间将主要由实际处理请求的成本决定。

local cocreate, resume, yield, costatus =
  coroutine.create, coroutine.resume, coroutine.yield, coroutine.status

local LOG = function(...)
  io.stderr:write(os.date"!%F %T ")
  io.stderr:write(...)
  io.stderr:write"\n"
end

-- This function expects to be a coroutine. When it is started,
-- it initializes itself, and then awaits further instructions.
-- Whenever it has nothing else to do it yields, unless it was
-- told to quit in which case it returns.

function supervisor(...)
  -- each event is a table of sets of coroutines;
  -- we assume we can handle the events in any order.
  local events = {}
  -- This table associates coroutines with names, mostly
  -- for debugging
  local name = {}
  -- to signal that we're done
  local done = false

  -- This function removes all references to a coro
  -- from all events, which we do when the coro dies.
  -- We could use weak tables to do the same thing.
  -- On the other hand, we might want to clean up other
  -- data associated with the coroutine. Anyway, this is easy:

  local function remove(coro)
    for _, coros in pairs(events) do
      coros[coro] = nil
    end
    name[coro] = nil
  end

  -- Convenience function to log tracebacks
  local function log_traceback(coro, msg, err)
    LOG("Coroutine ", name[coro], " ", msg, ":")
    local tb = debug.traceback(coro, err)
    for line in tb:gmatch"[^\n]+" do LOG("  ", line) end
  end

  -- The core routine, which handles the results of a
  -- coroutine.resume. First argument is the coro, rest
  -- are the return values from coro.resume(). The only
  -- thing we're expecting from the coro is a "waitfor"
  -- command, but other ones could be added...
  local function handle_resume(coro, ok, todo, ...)
    if costatus(coro) == "dead" then
      remove(coro) -- always get rid of it.
      -- log a traceback on error
      if not ok then log_traceback(coro, "failed with error", todo) end
    elseif todo ~= "waitfor" then
      -- todo should be "waitfor" and ... the event.
      -- If we had more verbs, we could use a case-table.
      log_traceback(coro, "unknown request "..tostring(todo),
                          "bad return value")
      remove(coro)
    else
      -- We don't care if the event doesn't exist (should we?)
      local q = events[...]
      if q == nil then q = {}; events[...] = q end
      -- We might want to tell the upper level that a new
      -- event needs to be recognized, though.
      -- if next(q) == nil then addevent((...)) end
      q[coro] = true
    end
  end 
      
  -- A table of actions; essentially a case statement
  local handler = {}

  -- do a clean exit (although actually there's no
  -- cleanup necessary, but there might be.)
  function handler.done()
    done = true
  end

  -- debugging: report on status
  function handler.status()
    -- invert the events table for nicer printing
    local n, e = {}, {}
    for evt, coros in pairs(events) do
      for coro in pairs(coros) do
        local who = name[coro]
        n[#n+1] = who
        e[who] = evt
      end
    end
    -- sort the names
    table.sort(n)
    -- and produce the report
    for _, who in ipairs(n) do
      LOG(who, " is waiting for ", tostring(e[who]))
    end
  end

  -- introduce a new actor (coroutine) into the system, and run
  -- it until it blocks
  function handler.introduce(who, func, ...)
    local coro = cocreate(func)
    name[coro] = who
    -- let it initialize itself
    return handle_resume(coro, resume(coro, ...))
  end

  -- send an event to whoever cares
  function handler.signal(what, ...)
    local q = events[what]
    if q and next(q) then
      for coro in pairs(q) do
        q[coro] = nil  -- handled
        handle_resume(coro, resume(coro, what, ...))
      end
      -- Maybe tell the top-level whether the event is
      -- still active?
      return next(q) ~= nil
    else
      -- No-one cares, sniff. "Log" the fact
      LOG("Event ", tostring(what), " dropped into the bit bucket")
    end
  end

  -- Set the __index meta for the handler table to avoid having
  -- to test for bad commands explicitly
  local function logargs(...)
    local t = {n = select("#", ...), ...}
    if t.n > 0 then
      for i = 1, t.n do
        local ti = t[i]
        t[i] = (type(ti) == "string" and "%q" or "%s")
               :format(tostring(ti))
      end
      LOG("..with arguments: ", table.concat(t, ", "))
    end
  end
  function handler:__index(what)
    LOG("Supervisor received unknown message ", what)
    return logargs
  end
  setmetatable(handler, handler)

  -- auxiliary function to handle a command, necessary to
  -- capture multiple returns from yield()
  local function handle(simonsays, ...)
    return handler[simonsays](...)
  end

  -- The main loop is a bit anti-climactic
  LOG"Starting up"
  local rv = handle(...)
  repeat 
    rv = handle(yield(rv))
  until done
  LOG"Shutting down"
end

------------------------------------------------------------------------------
--  Sample very simple processor
------------------------------------------------------------------------------

local function block(event) return yield("waitfor", event) end
local function process(n)
  for i = 1, n do
    local e, howmany = block"TICK"
    assert(e == "TICK" and howmany == i)
  end
end

------------------------------------------------------------------------------
-- Test driver in Lua
------------------------------------------------------------------------------

local super = cocreate(supervisor)

local function check(ok, rv)
  if not ok then
    LOG("supervisor crashed")
    local tb = debug.traceback(super, rv)
    for line in tb:gmatch"[^\n]+" do LOG("  ", line) end
    os.exit(1)
  end
  return rv
end

local function send(msg, ...) return check(resume(super, msg, ...)) end

local nprocess, nreps = tonumber(arg[1]) or 10, tonumber(arg[2] or 12)
for i = 1, tonumber(nprocess) do
  send("introduce", ("p%04i"):format(i), process, nreps)
end
local j = 1
while send("signal", "TICK", j) do
  j = j + 1
end
-- This should be empty
send"status"
send"done"
LOG("Endcount ", j)

定义 Supervisor[1] ? --DavidManura

这个定义来自一篇关于 Erlang 的论文,有点循环,但它很好地总结了: “一种实现特定风格的 supervisor-worker 编程的行为,它允许动态构建监督树。”

换句话说,Supervisor 负责维护一群 worker,组织他们的工作量,向他们发送事件(如本例所示)并处理错误。

[Termite](pdf 文档,如果你喜欢括号的话可以阅读)中也有一些很好的例子。

最近更改 · 偏好设置
编辑 · 历史记录
最后编辑于 2007 年 3 月 2 日凌晨 5:23 GMT (差异)