过滤器、源和接收器 |
|
作者:DiegoNehab
某些操作可以以过滤器的形式实现。过滤器是一个函数,它处理在连续函数调用中接收到的数据,并逐块返回部分结果。可以作为过滤器实现的操作示例包括文本的换行符规范化、Base64 和 Quoted-Printable 传输内容编码、将文本分解为行、SMTP 字节填充,以及许多其他操作。当我们允许将过滤器链接在一起以创建复合过滤器时,过滤器变得更加强大。过滤器可以被视为数据转换链中的中间节点。源和接收器是这些链的相应端点。源是一个函数,它逐块生成数据,而接收器是一个函数,它逐块接收数据。在本技术说明中,我们定义了一个用于过滤器、源、接收器和链接的优雅接口。我们逐步演化我们的接口,直到我们达到高度的一般性。我们讨论了在实现此接口期间出现的困难,并提供了解决方案和示例。
应用程序有时拥有太多信息无法处理以适合内存,因此被迫以更小的部分处理数据。即使有足够的内存,原子地处理所有数据也可能需要足够长的时间来使想要与应用程序交互的用户感到沮丧。此外,复杂的转换通常可以定义为一系列更简单的操作。几个不同的复杂转换可能共享相同的简单操作,因此需要一个统一的接口来组合它们。以下概念构成了我们对这些问题的解决方案。
过滤器是接受连续输入块并生成连续输出块的函数。此外,连接所有输出数据的結果与将过滤器应用于输入数据的连接的結果相同。因此,边界无关紧要:过滤器必须处理用户任意分割的输入数据。
链 是一个函数,它组合了两个(或多个)其他函数的效果,但其接口与其中一个组件的接口无法区分。因此,链式过滤器可以在任何可以使用原子过滤器的场合使用。但是,它对数据的影響是其组件过滤器的组合效果。需要注意的是,因此,链可以相互链接,以创建任意复杂的运算,这些运算可以像原子运算一样使用。
过滤器可以看作是网络中的内部节点,数据流经这些节点,可能在途中发生转换。链将这些节点连接在一起。为了完整地描述,我们需要源 和接收器 作为网络的初始节点和最终节点。更直观地说,源是一个函数,每次调用时都会产生新的数据。另一方面,接收器是将接收到的数据发送到最终目的地的函数。自然地,源和接收器可以与过滤器链接。
最后,过滤器、链、源和接收器都是被动实体:它们需要被反复调用才能发生任何事情。泵 提供了推动数据通过网络的驱动力,从源到接收器。
希望这些概念通过示例变得清晰。在接下来的部分中,我们从简化的接口开始,并多次改进它,直到我们找不到明显的缺陷。我们展示的演变并非人为编造:它遵循了我们在巩固对这些概念的理解时所遵循的步骤。
一些数据转换更容易实现为过滤器,而不是其他方式。可以实现为过滤器的操作示例包括文本的换行符规范化、Base64 和 Quoted-Printable 传输内容编码、文本的换行、SMTP 字节填充等等。让我们以换行符规范化为示例来定义我们最初的过滤器接口。我们稍后讨论为什么实现可能并不简单。
假设我们得到一段文本,其换行符约定未知(包括可能混合了常见的 Unix(LF)、Mac OS(CR)和 DOS(CRLF)约定)。我们希望能够编写以下代码
input = source.chain(source.file(io.stdin), normalize("\r\n")) output = sink.file(io.stdout) pump(input, output)
该程序应该从标准输入流读取数据,并将换行符规范化为 MIME 标准定义的规范 CRLF 标记,最后将结果发送到标准输出流。为此,我们使用文件源 从标准输入生成数据,并将其与规范化数据的过滤器链接。然后,泵反复从源获取数据,并将其移动到文件接收器,该接收器将其发送到标准输出。
为了使讨论更加具体,我们首先讨论归一化过滤器的实现。normalize
*工厂* 是一个创建此类过滤器的函数。我们最初的过滤器接口如下:过滤器接收一段输入数据,并返回一段处理后的数据。当没有更多输入数据时,用户通过调用过滤器并传入一个 nil
块来通知过滤器。然后过滤器返回最后一段处理后的数据。
虽然接口极其简单,但实现起来并不那么明显。任何符合此接口的过滤器都需要在调用之间保留某种上下文。这是因为块可能会在标记行尾的 CR 和 LF 字符之间被分割。这种对上下文存储的需求正是使用工厂的动机:每次调用工厂时,它都会返回一个带有自己上下文的过滤器,这样我们就可以同时使用多个独立的过滤器。对于归一化过滤器,我们知道显而易见的解决方案(即在生成任何输出之前将所有输入连接到上下文中)并不够好,所以我们必须找到另一种方法。
我们将把实现分为两部分:一个低级过滤器和一个高级过滤器工厂。低级过滤器将用 C 语言实现,并且不会在函数调用之间保留任何上下文。高级过滤器工厂用 Lua 语言实现,将创建并返回一个高级过滤器,该过滤器保留低级过滤器所需的任何上下文,但将用户与它的内部细节隔离开来。这样,我们就可以利用 C 的效率来完成繁重的工作,并利用 Lua 的简单性来进行簿记。
以下是高级行尾归一化过滤器工厂的实现
function filter.cycle(low, ctx, extra) return function(chunk) local ret ret, ctx = low(ctx, chunk, extra) return ret end end function normalize(marker) return cycle(eol, 0, marker) end
normalize
工厂简单地调用了一个更通用的工厂,即 cycle
工厂。该工厂接收一个低级过滤器、一个初始上下文和一些额外的值,并返回相应的 高级过滤器。每次高级过滤器使用新的块调用时,它都会调用低级过滤器,并将之前的上下文、新的块和额外的参数传递给它。低级过滤器会生成处理后的数据块和一个新的上下文。最后,高级过滤器更新其内部上下文,并将处理后的数据块返回给用户。所有工作都是由低级过滤器完成的。请注意,此实现利用了 Lua 5.0 的词法作用域规则,在函数调用之间本地存储上下文。
转向低级过滤器,我们注意到对于换行符规范化问题本身,没有完美的解决方案。困难来自混合输入中空行的定义存在固有的歧义。但是,以下解决方案对于任何一致的输入以及混合输入中的非空行都非常有效。它也对空行做了合理的工作,并作为如何实现低级过滤器的良好示例。
以下是我们的做法:CR 和 LF 被视为换行符的候选者。如果看到其中一个候选者单独出现,或者后面跟着一个不同的候选者,则我们发出一个换行符标记。也就是说,CR CR 和 LF LF 每个发出两个换行符标记,但 CR LF 和 LF CR 只发出一个标记。这个想法照顾了 Mac OS、Mac OS X、VMS 和 Unix、DOS 和 MIME,以及可能的其他更模糊的约定。
低级过滤器被分成两个简单的函数。内部函数实际上执行转换。它依次接收每个输入字符,决定输出什么以及如何修改上下文。上下文告诉上次看到的字符是否是候选者,如果是,它是哪个候选者。
#define candidate(c) (c == CR || c == LF) static int process(int c, int last, const char *marker, luaL_Buffer *buffer) { if (candidate(c)) { if (candidate(last)) { if (c == last) luaL_addstring(buffer, marker); return 0; } else { luaL_addstring(buffer, marker); return c; } } else { luaL_putchar(buffer, c); return 0; } }
内部函数利用 Lua 的辅助库的缓冲区接口来提高效率和易用性。外部函数只是与 Lua 交互。它接收上下文和输入块(以及可选的换行符标记),并返回转换后的输出和新的上下文。
static int eol(lua_State *L) { int ctx = luaL_checkint(L, 1); size_t isize = 0; const char *input = luaL_optlstring(L, 2, NULL, &isize); const char *last = input + isize; const char *marker = luaL_optstring(L, 3, CRLF); luaL_Buffer buffer; luaL_buffinit(L, &buffer); if (!input) { lua_pushnil(L); lua_pushnumber(L, 0); return 2; } while (input < last) ctx = process(*input++, ctx, marker, &buffer); luaL_pushresult(&buffer); lua_pushnumber(L, ctx); return 2; }
请注意,如果输入块是 nil
,则操作被认为已完成。在这种情况下,循环不会执行一次,上下文将重置为初始状态。这允许过滤器无限期地重复使用。如果可能,编写这样的过滤器是一个好主意。
除了上面显示的换行符规范化过滤器之外,还可以使用相同的想法实现许多其他过滤器。例如,Base64 和 Quoted-Printable 传输内容编码、将文本分解成行、SMTP 字节填充等。具有挑战性的部分是决定上下文是什么。例如,对于换行,它可能是当前行中剩余的字节数。对于 Base64 编码,它可能是输入在 3 字节原子中划分后剩余的字节。
当引入链式概念时,过滤器变得更加强大。假设您有一个用于 Quoted-Printable 编码的过滤器,并且您想对某些文本进行编码。根据标准,文本必须在编码之前规范化为其规范形式。一个简化此任务的良好接口是一个工厂,它创建一个复合过滤器,该过滤器将数据通过多个过滤器传递,但可以在任何使用原始过滤器的地方使用。
local function chain2(f1, f2) return function(chunk) local ret = f2(f1(chunk)) if chunk then return ret else return ret .. f2() end end end function filter.chain(...) local f = arg[1] for i = 2, table.getn(arg) do f = chain2(f, arg[i]) end return f end local chain = filter.chain(normalize("\r\n"), encode("quoted-printable")) while 1 do local chunk = io.read(2048) io.write(chain(chunk)) if not chunk then break end end
链式工厂非常简单。它所做的只是返回一个函数,该函数将数据通过所有过滤器传递并将结果返回给用户。它使用更简单的辅助函数,该函数知道如何将两个过滤器链接在一起。在辅助函数中,如果块是最终块,则必须特别注意。这是因为最终块通知必须依次通过两个过滤器。由于链式工厂,执行 Quoted-Printable 转换很容易,如上面的示例所示。
正如我们在引言中提到的,我们迄今为止介绍的过滤器充当转换网络中的内部节点。信息从节点到节点(或者更确切地说,从一个过滤器到下一个过滤器)流动,并在其输出时进行转换。将过滤器链接在一起是我们找到连接网络中节点的方法。但是端节点呢?在网络的开头,我们需要一个提供数据的节点,即源。在网络的末尾,我们需要一个接收数据的节点,即接收器。
我们从两个简单的源开始。第一个是 `empty` 源:它只是不返回任何数据,可能返回错误消息。第二个是 `file` 源,它以分块方式生成文件的內容,并在完成时关闭文件句柄。
function source.empty(err) return function() return nil, err end end function source.file(handle, io_err) if handle then return function() local chunk = handle:read(2048) if not chunk then handle:close() end return chunk end else return source.empty(io_err or "unable to open file") end end
源在每次被调用时返回下一块数据。当没有更多数据时,它只返回 `nil`。如果出现错误,源可以通过返回 `nil` 后跟错误消息来通知调用者。Adrian Sietsma 注意到,虽然并非有意为之,但源的接口与 Lua 5.0 中迭代器的概念兼容。也就是说,数据源可以很好地与 `for` 循环一起使用。使用我们的文件源作为迭代器,我们可以重写第一个示例
local process = normalize("\r\n") for chunk in source.file(io.stdin) do io.write(process(chunk)) end io.write(process(nil))
请注意,最后一次对过滤器的调用获取了处理后的数据的最后一部分。当源返回nil
时,循环终止,因此我们需要在循环之外进行最后一次调用。
通常情况下,源需要在某些事件发生后改变其行为。一个简单的例子是文件源,它希望确保无论调用多少次,在文件结束之后都返回nil
,避免尝试读取文件末尾之后的内容。另一个例子是返回多个文件内容的源,就像它们被连接在一起一样,从一个文件移动到下一个文件,直到最后一个文件的末尾被读取。
实现这种源的一种方法是让工厂声明额外的状态变量,源可以通过词法作用域使用这些变量。我们的文件源可以在检测到文件结束时将文件句柄本身设置为nil
。然后,每次调用源时,它都可以检查句柄是否仍然有效并相应地采取行动。
function source.file(handle, io_err) if handle then return function() if not handle then return nil end local chunk = handle:read(2048) if not chunk then handle:close() handle = nil end return chunk end else return source.empty(io_err or "unable to open file") end end
实现这种行为的另一种方法是改变源接口,使其更加灵活。让我们允许源除了返回下一部分数据之外,还可以返回第二个值。如果返回的块是nil
,则额外的返回值告诉我们发生了什么。第二个nil
意味着没有更多数据,源是空的。任何其他值都被认为是错误消息。另一方面,如果块不是nil
,则第二个返回值告诉我们源是否希望被替换。如果是nil
,我们应该继续使用相同的源。否则,它必须是另一个源,我们必须从那时起使用它来获取剩余的数据。
这种额外的自由度对编写源函数的人来说是件好事,但对于那些必须使用它的人来说却很痛苦。幸运的是,给定一个这样的花哨源,我们可以使用以下工厂将其转换为一个永远不需要被替换的简单源。
function source.simplify(src) return function() local chunk, err_or_new = src() src = err_or_new or src if not chunk then return nil, err_or_new else return chunk end end end
简化工厂允许我们编写花哨的源并像使用简单的源一样使用它们。因此,我们接下来的函数只会生成简单的源,而接受源的函数将假设它们是简单的。
回到我们的文件源,扩展的接口允许更优雅的实现。新的源只要没有更多数据,就要求被一个空的源替换。没有重复检查句柄。为了使事情对用户更简单,工厂本身在将花哨的文件源返回给用户之前对其进行了简化。
function source.file(handle, io_err) if handle then return source.simplify(function() local chunk = handle:read(2048) if not chunk then handle:close() return "", source.empty() end return chunk end) else return source.empty(io_err or "unable to open file") end end
如果我们使用 Lua 5.0 的一项新功能:协程,我们可以使这些想法更强大。协程缺乏宣传,我将在这里发挥我的作用。就像词法作用域一样,协程一开始尝起来很奇怪,但一旦你习惯了这个概念,它就能救你一命。我不得不承认,使用协程来实现我们的文件源会有点过分,所以让我们实现一个串联的源工厂。
function source.cat(...) local co = coroutine.create(function() local i = 1 while i <= table.getn(arg) do local chunk, err = arg[i]() if chunk then coroutine.yield(chunk) elseif err then return nil, err else i = i + 1 end end end) return function() return shift(coroutine.resume(co)) end end
工厂创建两个函数。第一个是辅助函数,它以协程的形式完成所有工作。它从一个源中读取一个块。如果块是 nil
,它会移动到下一个源,否则它只返回块。当它恢复时,它会从停止的地方继续,并尝试读取下一个块。第二个函数是源本身,它只是恢复辅助协程的执行,将它返回的任何块返回给用户(跳过第一个告诉我们协程是否终止的结果)。想象一下在没有协程的情况下编写相同的函数,你会注意到这种实现的简单性。当我们使过滤器接口更强大时,我们将再次使用协程。
将源与过滤器链接意味着什么?最实用的解释是,组合的源过滤器是一个新的源,它生成数据并通过过滤器传递数据,然后再返回。这里有一个工厂可以做到这一点
function source.chain(src, f) return source.simplify(function() local chunk, err = src() if not chunk then return f(nil), source.empty(err) else return f(chunk) end end) end
我们介绍中的激励示例将源与过滤器链接在一起。将源与过滤器链接的想法在考虑可能从源获取输入数据的函数时很有用。通过将一个简单的源与一个或多个过滤器链接起来,即使同一个函数不知道其背后发生的过滤,也可以提供过滤后的数据。
正如我们为初始数据源定义了一个接口一样,我们也可以为最终数据目的地定义一个接口。我们称任何符合该接口的函数为接收器。以下是两个返回接收器的简单工厂。表工厂创建一个接收器,将所有获得的数据存储到一个表中。数据随后可以使用 table.concat
库函数有效地连接成一个字符串。作为另一个示例,我们介绍了 null
接收器:一个简单地丢弃它接收数据的接收器。
function sink.table(t) t = t or {} local f = function(chunk, err) if chunk then table.insert(t, chunk) end return 1 end return f, t end local function null() return 1 end function sink.null() return null end
接收器接收连续的数据块,直到用 nil
块通知数据结束。错误通过 nil
块后的一个额外的参数通知错误消息。如果接收器本身检测到错误并且希望不再被调用,它应该返回 nil
,可以选择在后面加上错误消息。非 nil
的返回值意味着源将接受更多数据。最后,就像源可以选择被替换一样,接收器也可以选择被替换,遵循相同的接口。再次,很容易实现一个 sink.simplify
工厂,它将一个花哨的接收器转换为一个简单的接收器。
例如,让我们创建一个从标准输入读取数据的源,然后将其与一个规范化行尾约定过滤器的过滤器链接起来,并使用一个接收器将所有数据放入一个表中,最后打印结果。
local load = source.chain(source.file(io.stdin), normalize("\r\n")) local store, t = sink.table() while 1 do local chunk = load() store(chunk) if not chunk then break end end print(table.concat(t))
同样,正如我们创建了一个工厂,它从源和过滤器生成一个链接的源过滤器一样,也很容易创建一个工厂,它根据接收器和过滤器生成一个新的接收器。新的接收器将它接收的所有数据通过过滤器传递,然后再传递给原始接收器。以下是实现
function sink.chain(f, snk) return function(chunk, err) local r, e = snk(f(chunk)) if not r then return nil, e end if not chunk then return snk(nil, err) end return 1 end end
在我们的示例中,有一个循环运行了太长时间。它一直都在,因为我们目前设计的所有东西都是被动的。源、接收器、过滤器:它们都不会自己做任何事情。将源可以提供的所有数据泵入接收器的操作非常常见,因此我们将提供几个辅助函数来为我们完成此操作。
function pump.step(src, snk) local chunk, src_err = src() local ret, snk_err = snk(chunk, src_err) return chunk and ret and not src_err and not snk_err, src_err or snk_err end function pump.all(src, snk, step) step = step or pump.step while true do local ret, err = step(src, snk) if not ret then return not err, err end end end
pump.step
函数将一块数据从源移动到接收器。pump.all
函数接受一个可选的 step
函数,并使用它将所有数据从源泵入接收器。现在,我们可以使用我们拥有的所有东西来编写一个程序,该程序从磁盘读取一个二进制文件,并在将其编码为 Base64 传输内容编码后将其存储在另一个文件中。
local load = source.chain( source.file(io.open("input.bin", "rb")), encode("base64") ) local store = sink.chain( wrap(76), sink.file(io.open("output.b64", "w")), ) pump.all(load, store)
我们在这里拆分过滤器的这种方式并非直观,这是故意的。或者,我们可以将 Base64 编码过滤器和换行过滤器链接在一起,然后将生成的过滤器与文件源或文件接收器链接在一起。这并不重要。
事实证明,我们仍然存在一个问题。当 David Burgess 编写他的 gzip 过滤器时,他注意到解压缩过滤器可以将一小块输入数据扩展为大量数据。虽然我们希望可以忽略这个问题,但我们很快达成一致,我们不能这样做。唯一的解决方案是允许过滤器返回部分结果,这就是我们选择做的事情。在调用过滤器传递输入数据后,用户现在必须循环调用过滤器以找出它是否还有更多输出数据要返回。请注意,这些额外的调用不能向过滤器传递更多数据。
更具体地说,在将一段输入数据传递给过滤器并收集第一段输出数据后,用户会反复调用过滤器,传递空字符串,以获取额外的输出数据块。当过滤器本身返回空字符串时,用户就知道没有更多输出数据,可以继续传递下一个输入数据块。最后,在用户传递一个nil
通知过滤器没有更多输入数据后,过滤器可能仍然产生了太多输出数据,无法在一个数据块中返回。用户必须再次循环,这次每次传递nil
,直到过滤器本身返回nil
,通知用户它终于完成了。
大多数过滤器不需要这种额外的自由。幸运的是,新的过滤器接口很容易实现。事实上,我们在引言中创建的换行符翻译过滤器已经符合它。另一方面,链式函数变得更加复杂。如果不是协程,我不会乐意实现它。如果您能找到一个不使用协程的更简单的实现,请告诉我!
local function chain2(f1, f2) local co = coroutine.create(function(chunk) while true do local filtered1 = f1(chunk) local filtered2 = f2(filtered1) local done2 = filtered1 and "" while true do if filtered2 == "" or filtered2 == nil then break end coroutine.yield(filtered2) filtered2 = f2(done2) end if filtered1 == "" then chunk = coroutine.yield(filtered1) elseif filtered1 == nil then return nil else chunk = chunk and "" end end end) return function(chunk) local _, res = coroutine.resume(co, chunk) return res end end
链式源也变得更加复杂,但可以使用协程实现类似的解决方案。链式接收器与以往一样简单。有趣的是,这些修改对不需要额外灵活性的过滤器的性能没有明显负面影响。但是,它们确实极大地提高了像 gzip 过滤器这样的过滤器的效率,这就是我们保留它们的原因。
这些想法是在开发LuaSocket
[1] 2.0 期间产生的,并作为 LTN12 模块提供。因此,LuaSocket
[1] 的实现得到了极大的简化,并且变得更加强大。MIME 模块特别集成到 LTN12 中,并提供了许多其他过滤器。我们认为这些概念值得公开,即使是那些不关心LuaSocket
[1] 的人,因此有了 LTN。
另一个值得一提的额外应用是使用身份过滤器。假设您想在将文件下载到接收器时向用户提供一些反馈。将接收器与身份过滤器(一个简单地返回未修改的接收数据的过滤器)链接起来,您可以动态更新进度计数器。原始接收器不需要修改。另一个有趣的想法是 T 接收器:一个将数据发送到另外两个接收器的接收器。总之,似乎还有很多其他有趣的想法的空间。
在本技术说明中,我们介绍了过滤器、源、接收器和泵。这些是用于一般数据处理的有用工具。源为数据获取提供了一个简单的抽象。接收器为最终数据目的地提供了一个抽象。过滤器定义了数据转换的接口。过滤器、源和接收器的链式连接提供了一种优雅的方式,可以从更简单的转换中创建任意复杂的数据转换。泵只是让机器运转起来。
在使用 Diego 的上述实用程序时,我发现以下关于各种函数原型的简洁总结很有用。
function source() -- we have data return chunk -- we have an error return nil, err -- no more data return nil end
function(chunk, src_err) if chunk == nil then -- no more data to process, we won't receive more chunks if src_err then -- source reports an error, TBD what to do with chunk received up to now else -- do something with concatenation of chunks, all went well end return true -- or anything that evaluates to true elseif chunk == "" then -- this is assumed to be without effect on the sink, but may -- not be if something different than raw text is processed -- do nothing and return true to keep filters happy return true -- or anything that evaluates to true else -- chunk has data, process/store it as appropriate return true -- or anything that evaluates to true end -- in case of error return nil, err end
ret, err = pump.step(source, sink) if ret == 1 then -- all ok, continue pumping elseif err then -- an error occured in the sink or source. If in both, the sink -- error is lost. else -- ret == nil and err == nil -- done, nothing left to pump end ret, err = pump.all(source, sink) if ret == 1 then -- all OK, done elseif err then -- an error occured else -- impossible end
function filter(chunk) -- first two cases are to maintain chaining logic that -- support expanding filters (see below) if chunk == nil then return nil elseif chunk == "" then return "" else -- process chunk and return filtered data return data end end
高级源的概念是使源能够指示哪个其他源从现在开始包含数据。
function source() -- we have data return chunk -- we have an error return nil, err -- no more data return nil, nil -- no more data in current source, but use sourceB from now on -- ("" could be real data, but should not be nil or false) return "", sourceB end
simple = source.simplify(fancy)
高级接收器的概念是使接收器能够指示哪个其他接收器从现在开始处理数据。
function(chunk, src_err) -- same as above (simple sink), except next sinkK sink -- to use indicated after true return true, sinkK end
将高级接收器转换为简单接收器
simple = sink.simplify(fancy)
function filter(chunk) if chunk == nil then -- end of data. If some expanded data is still to be returned return partial_data -- the chains will keep on calling the filter to get all of -- the partial data until the filter return nil return nil elseif chunk == "" then -- a previous filter may have finished returning expanded -- data, now it's our turn to expand the data and return it return partial_data -- the chains will keep on calling the filter to get all of -- the partial data until the filter return "" return "" else -- process chunk and return filtered data, potentially partial. -- In all cases, the filter is called again with "" (see above) return partial_data end end