过滤器和管道重载 |
|
FiltersSourcesAndSinks 是 DiegoNehab 写的一篇非常有趣的文章,它与 PIL 的第 9.2 节 (https://lua.ac.cn/pil/9.2.html) 一起启发我尝试使用管道,并思考符号问题以及管道与我们在命令式编程中习惯使用的内容的不同之处。在这篇文章中,我将讨论管道与链式函数或方法调用有哪些共同点,以及它们的不同之处。然后,我将展示并不断改进一组用于在 Lua 中使用管道的函数。最后,我将使用一些元编程来获得更像管道的符号。
许多计算机应用都涉及将数据从一种表示形式转换为另一种表示形式,可能包括各种形式的混合和匹配其他数据。这种转换自然会变得越来越复杂。
复杂的 数据转换 - 就像任何其他算法一样 - 如果可以分解成更小的部分,每个部分都执行一个简单且定义明确的任务,则更容易理解。如果这些部分可以以可重用的方式构建,形成一组构建块,从中可以轻松地 *组装* 复杂的转换,则会产生额外的力量。
最简单的组合形式可能是按顺序应用多个转换,即使用每个转换的结果作为下一个转换的输入。根据我们工作的环境,符号和内部工作方式会有所不同。
如果您使用的是函数,您可能会写出类似以下的内容
result = transformation3( transformation2( transformation1( data ) ) )
这看起来不太直观,因为转换的编写顺序与应用顺序相反。(如果它们接受额外的参数,符号往往会变得更糟。)但是,符号不会改变正在发生的事情的本质,并且可以在不改变本质的情况下进行增强。如果您有函数组合机制,您可以编写类似以下内容来使转换按应用顺序排列
result = chain( transformation1, transformation2, transformation3 )( data )
(使用更多的高阶函数,您甚至可以将额外的参数与其转换一起塞进去。)
此外,如果您从编译器或某些预处理器获得帮助,这些编译器或预处理器将例如 | 运算符转换为使用左侧操作数作为参数调用其右侧操作数,那么您可以实现更直观的表示法,将您开始使用的数据放在链的开头,并可能写成类似
result = data | transformation1 | transformation2 | transformation3
并且,如果您愿意,在更多帮助下,您甚至可以放弃熟悉的赋值表示法,并将结果的接收者放在链的末尾,最终得到一个看起来非常像 UNIX 管道的表示法,显示数据如何通过一系列转换,例如
data > transformation1 > transformation2 > transformation3 > result
或类似的东西。但请注意,所有这些都不会改变幕后发生的事情:一系列简单的包装函数调用!
在面向对象编程中,包括 C# 扩展方法等机制,您可以免费使用我们上面看到的顺序表示法,使用链式方法调用
result = data.transformation1().transformation2().transformation3()
这种表示法中的括号也使我们能够清楚地看到,我们的数据“流过”的转换“管道”只是一系列函数调用。(顺便说一句,在 OO 中,可能的转换被捆绑为具有其输入数据类型的方法,这绝不是限制,而只是一个实现细节。)
谈到我们讨论的核心,UNIX 为我们带来了一个将一系列转换应用于数据的根本不同的概念:UNIX 管道。它们开箱即用,具有与我们上面创建的类似的直观管道表示法(但实际上当时没有管道!)。
data > transformation1 | transformation2 | transformation3 > result
我们很快就会看到管道与函数或方法调用链的根本区别是什么,但首先让我们注意到,所有三种机制,无论表示法如何,都反映了相同的原则:将一系列转换应用于某些数据。
也就是说,有一个方面从根本上将管道与函数或方法调用区分开来,正如我们上面所看到的,它既不是表示法,也不是从表示法中可见的:管道以块的形式处理数据。这适用于管道的各个阶段(称为过滤器),也适用于整个管道。在任何阶段,只有实际 *需要* 用于生成下一个输出的数据量才会保存在内存或其他临时存储中,并且不会在各个阶段之间组装和存储任何中间结果。
此属性构成了一个重要的区别。管道不仅能够处理无限量的数据,它们甚至可以处理无限的数据流,例如传感器测量值等,并在数据可用时立即产生结果。使用函数或方法调用无法做到这一点,因为既不能预先收集无限的数据序列来进行处理,也不能在数据流结束之前产生任何结果,这意味着永远不会产生任何结果。
这种差异导致了管道中变换连接方式的根本区别。函数或方法调用通过参数和返回值进行通信,而过滤器通过接收和发送数据块进行通信。这种接口的一个重要方面是输入和输出的本质解耦,这意味着并非所有输入都需要产生输出,并非所有输出都需要由输入引起。(我们可以说过滤器是扁平化操作的泛化及其反面,以及两者之间的混合。)
因此,管道的运作方式与函数调用的同步性质 fundamentally different,这使得不可能将管道实现为类似这样的简单读-变换-变换-变换-写操作循环
-- Constructs a pipe connecting the given functions. The pipe will read data -- by calling its first parameter, then transform it by invoking all given -- functions in sequence, and output the result by calling its second parameter. -- This will continue until input is exhausted. -- THIS IS NOT A REAL PIPE! function pipe( ... ) local filters = arg return function( input, output ) for data in input do local current = data for _, filter in ipairs( filters ) do current = filter( current ) end output( current ) end end end p = pipe( transformation1, transformation2, transformation3 ) p( io.lines, print )
这种实现仅适用于最简单的变换,其中每个输入将在任何阶段转换为恰好一个输出。这并非一般情况下过滤器所做的。(然而,这并不意味着管道能够执行使用函数调用无法实现的变换:如果所有要变换的数据都可以被收集并作为一个整体提供给变换,那么它也可以使用普通的函数调用进行处理 - 差异主要在于内存占用。不过,对于无限数据流来说,确实存在差异。另一方面,管道的各个阶段非常 well can 并且通常使用普通函数来完成它们的处理任务。这是因为它们可以自由地收集尽可能多的数据作为具体函数调用的输入。)
Diego 使用函数调用接口实现过滤器,并通过使用一种特殊协议来解决过滤器输入和输出的解耦性质与函数调用的同步性质之间的阻抗不匹配,该协议允许非输出和非输入“过滤器”调用。虽然这不仅使调度器复杂化,而且更重要的是使过滤器实现也复杂化,但它非常适合他使用其框架所针对的处理类型。
当过滤器作为独立进程实现时,每个进程都有自己的控制循环,通过调用系统函数来读取输入和写入输出,过滤器算法的实现将变得更加直接。这就是过滤器在 UNIX 中的实现方式,在 Lua 中,我们可以使用协程来实现相同的效果。所有调度和状态保持都留给运行时,因此过滤器编写者可以专注于他们的算法,使用自然且简单的符号。让我们看看如何在 Lua 中实现管道和过滤器。
在我们开始之前,先说一下我使用的命名约定,这样你 hopefully 不会对下面代码中的变量名感到困惑:以 "p" 为前缀表示函数参数,以 "l" 为前缀表示局部变量,以 "s" 为前缀表示静态(模块全局)变量,以 "g" 为前缀表示全局变量(这里没有使用)。函数名没有前缀(这绝对不意味着函数不能保存在一个强制有前缀的变量中)。你的偏好可能会有所不同,但我从这个命名约定中获得了一些重要的优势
error
或type
,但如果你将它们命名为lError
或pType
等等,就不会有任何问题。)
local lCondition = (pCondition == nil) or pCondition
我将在下面展示的代码是为 Lua 5.0 编写的。
现在让我们先定义一下大局,然后看看我们如何实现它,以及我们可以用它做什么。
首先,我们希望能够将过滤器编写为函数,这些函数接受两个参数:输入函数和输出函数。执行时,它们将从头到尾运行,将所有输入转换为输出。输入将通过连续调用输入函数获得,任何输出将通过调用输出函数写入。此外,我们希望能够在过滤器之间传递**任何类型**的**多个值**,而不仅仅是字符串。因此,过滤器实现将大致遵循以下模板之一
function my_filter( pInput, pOutput ) for ... in pInput do ... pOutput( ... ) ... end
或
function my_filter( pInput, pOutput ) while ... do ... ... = pInput() ... pOutput( ... ) ... end
过滤器目前不会返回任何值。
其次,我们希望能够将以这种方式实现的过滤器连接成链(管道),将一个过滤器的输出馈送到下一个过滤器的输入。第一个过滤器的输入和最后一个过滤器的输出将与管道的输入和输出相同,因此,管道本身将是一个过滤器,因此可以再次与其他过滤器或管道组合。我们想要编写的代码将类似于以下内容,其中 transformation1、transformation2、transformation3 是如上所示实现的过滤器
p = pipe( transformation1, transformation2, transformation3 ) p( io.lines, print )
此示例从标准输入读取所有行,通过按顺序将流通过三个过滤器进行管道传输,并将结果打印到控制台。
如您所见,与 Diego 的解决方案相比,我们不需要任何泵来实现这一点!这很有趣,它源于我们的过滤器与 Diego 的过滤器不同,它们是主动实体,将它们的输入自行泵送到输出。(顺便说一下,这就是我们需要协程来使多个过滤器协同工作的原因。)
正如我们所说,我们希望能够在过滤器之间传递任何类型的多个值。此外,还应允许使用 `nil`,为此,我们定义当输入/输出握手传输的**第一个**值为 `nil` 时,将到达数据流的末尾。
为了在 Lua 5.0 中实现这一点,我们需要一个简单的实用函数来执行与 Lua `unpack()` 相反的操作,因为当存在 `nil` 时,简单地将调用包装在括号中以收集其返回值将无法完成工作。
-- Inverse of Lua unpack(). Returns all its arguments packed into an array. function pack( ... ) return arg end
现在,让我们实现管道。以下函数构建一个管道。它接受多个过滤器并返回一个表示链的新过滤器。返回的函数在调用时,将除最后一个过滤器之外的所有过滤器包装成协程,并为每个过滤器创建一个函数,该函数恢复协程,将过滤器的输入作为输入,将协程的 `yield` 作为输出,并返回 `yield` 的值(即过滤器的输出)。此函数(一个生成器)充当链中下一个过滤器的输入,第一个过滤器获取管道的输入作为输入。在构建链后,最后一个过滤器使用 *其* 输入和管道的输出进行调用,它返回的内容将从管道返回(尾调用)。我们现在不需要返回值,但稍后我们将看到这给了我们什么。
-- Creates a new filter acting as the concatenation of the given filters. A -- filter is a function with two parameters, input and output, that obtains -- its input by calling input and writes output by calling output. Optionally, -- return values may be produced, the return values of the last filter being -- returned by the pipe. function pipe( ... ) local lFilters = arg local lFilterCount = table.getn( lFilters ) return function( pInput, pOutput ) local lCurrentInput = pInput for lFilterPosition = 1, lFilterCount - 1 do local lCurrentFilter = coroutine.create( lFilters[ lFilterPosition ] ) local lPreviousInput = lCurrentInput lCurrentInput = function() local lResult = pack( coroutine.resume( lCurrentFilter, lPreviousInput, coroutine.yield ) ) local lSuccess = table.remove( lResult, 1 ) if lSuccess then if coroutine.status( lCurrentFilter ) == "suspended" then return unpack( lResult ) end else error( lResult[ 1 ] ) end end end return lFilters[ lFilterCount ]( lCurrentInput, pOutput ) end end
关于上面代码的一些补充说明:首先,在恢复协程时,我们传递过滤器的输入和输出。这只有在开始执行协程的第一次恢复时才非常必要。之后,这两个参数将通过每个输出调用返回给过滤器。由于它们将被过滤器忽略,因此不会造成伤害,我们不需要实现特殊处理。其次,由于过滤器可能会 *返回* 一些值(即使我们没有通过契约允许它),我们必须在每次恢复后检查协程是否已 `yield` 或已结束,以决定从恢复中获得的结果是否需要输出。
还要注意,管道函数本身并不构建过滤器链,这将推迟到过滤器实际运行时。否则,就不可能多次执行管道,它也不符合强意义上的过滤器。
这个小函数是我们拥有 Lua 中管道所需的一切!让我们看看我们可以用它做什么。
首先,让我们思考一下过滤器,以及我们的数据将从哪里来,又将去往何处。事实证明,我们的过滤器接口允许两种特殊类型的过滤器:一种不调用其输入参数的过滤器,它会从自身产生输出 - 它是源。相反,一种不调用其输出参数的过滤器只消耗输入 - 它是接收器。显然,源和接收器只能出现在管道的开头或结尾。以源开始的管道本身就是一个源,以接收器结束的管道本身就是一个接收器。以源开始并以接收器结束的管道是一个特殊情况:它不进行任何输入或输出,在调用时将所有数据从源一次性地泵送到接收器。我们很快就会看到这些管道 - 它们具有一些有趣的特性。首先让我们检查一下源和接收器从哪里获取数据,以及它们将数据放到哪里。
在源和接收器的情况下,输入和输出参数分别意味着什么?它们在这里不需要用于获取输入或写入输出,但它们可以用于描述过滤器(源或接收器)的(真实)数据源或目的地,例如,要读取或写入的文件名,或通过解析来提取数据的字符串。包含这些源或接收器的管道将拥有这些参数作为其自身的输入和输出参数,并将它们不变地传递给源和接收器。
现在,想象一下,我们的接收器是一个将接收到的数据收集到例如表格中,或者从数据中计算一些聚合值,例如词频统计或平均值。这就是过滤器返回值发挥作用的地方:接收器可以(或除了)使用其输出参数之外,返回一个值(或可能多个值)。管道(在这种情况下也是接收器)将返回此结果,而不进行修改。
现在,让我们再次看看同时是源和接收器的管道。如前所述,它们不执行任何输入或输出调用。我们可以将这些管道称为密封管道,因为它们将数据流保持在内部,而是公开了一个函数调用接口:它们通过使用参数和返回值来传递输入和输出。也就是说,它们有效地充当具有一个或两个参数并可能返回结果的普通函数!与函数一样,它们可以自由调用,每次调用都会将给定的输入参数中的数据一次性地通过自身“泵送”,并将结果一次性地返回(作为返回值或将其放置在输出参数指定的位置)。请注意,在连续数据之间也不保留任何状态,就像在正常的过滤器操作中一样:每个输入都会获得一个新的管道实例,该实例在数据被处理并返回结果后就会“消失”。因此,密封管道有效地将管道变成了函数!
现在让我们看看在实践中使用管道的样子。
为了实验,我们将定义一个简单的數據源,它生成一个从 1 开始的整数序列,并持续到它作为输入参数接收到的限制为止。
-- Produce a sequence of integers function seq( pLimit, pOutput ) for lValue = 1, pLimit do pOutput( lValue ) end end
我们已经可以测试这个过滤器。事实证明(至少只要 print()
给我们足够的信息来了解我们的值),我们可以调用 *任何* 过滤器或管道进行调试,使用它的输入和 print
作为输出,以将它的输出打印到控制台!很巧妙,对吧?让我们将数字序列源的输出打印到控制台。
> seq( 4, print ) 1 2 3 4
现在我们将定义一个数据接收器,它计算从输入读取的所有值的平均值。
-- Compute the average from a sequence of numbers function avg( pInput ) local lSum, lCount = 0, 0 for lValue in pInput do lSum = lSum + lValue lCount = lCount + 1 end return lSum / lCount end
让我们通过将它馈送到数据源产生的数据来测试它。我们将为此构建一个管道。
> average_of_sequence = pipe( seq, avg ) > return average_of_sequence( 4 ) 2.5
现在,让我们进行一些过滤。以下过滤器从输入读取数字,并将它们的平方写入输出。
-- Produce the squares of the input values function square( pInput, pOutput ) for lValue in pInput do pOutput( lValue * lValue ) end end
使用这个过滤器,我们将构建一个管道,它提供前 n 个平方的序列,然后向它追加另一个平方过滤器,以获得另一个产生 4 的幂序列的管道,最后我们从头开始构建一个管道来计算前 n 个 4 的幂的平均值。
> seq_squares = pipe( seq, square ) > seq_squares( 4, print ) 1 4 9 16 > seq_squares_squared = pipe( seq_squares, square ) > seq_squares_squared( 4, print ) 1 16 81 256 > average_of_seq_of_powers = pipe( seq, square, square, avg ) > return average_of_seq_powers( 4 ) 88.5
这显然运行得很好,但我们的平方过滤器确实不太有趣:它没有利用解耦的输入和输出管道提供的功能。上面的转换完全可以用引言中简化的管道实现来实现。因此,让我们编写两个更有趣的过滤器。第一个将只输出从输入读取的奇数,第二个将收集它的输入成对。
-- Let only odd numbers pass through function odd( pInput, pOutput ) for lValue in pInput do if math.mod( lValue, 2 ) == 1 then pOutput( lValue ) end end end -- Collect input into pairs function pair( pInput, pOutput ) for lFirstValue in pInput do local lSecondValue = pInput() pOutput( lFirstValue, lSecondValue ) if lSecondValue == nil then break end end end
如果我们将过滤器与我们的数字序列源连接起来,我们会看到它们按预期工作。
> seq_odd = pipe( seq, odd ) > seq_odd( 4, print ) 1 3 > seq_pairs = pipe( seq, pair ) > seq_pairs( 4, print ) 1 2 3 4
我们现在不会为一个过滤器构建示例,该过滤器比输入更频繁地调用输出,因为我们将在讨论扁平化过滤器时看到它的实际应用。现在让我们看看如何将我们的管道与我们可能已经拥有或打算编写的代码接口。
您可能已经注意到,过滤器和管道将生成器作为它们第一个(输入)参数。有许多函数可以返回生成器,例如 io.lines()
或 string.gfind()
,我们可以直接将它们用作管道的输入。例如,要使用管道 my_pipe
处理字符串 data
中包含的所有数字,我们可以编写
> my_pipe( string.gfind( data, "%d+" ), print )
但是,如果我们可以从 string.gfind
创建一个数据源,这样我们就可以将它放在管道开头,并改为编写
> my_pipe( data, print )
以下实用函数可以帮助我们实现这一点,它从返回生成器的函数构建数据源。执行时,数据源调用给定函数并将返回的生成器提供的所有数据写入其输出。
-- Wraps a function returning a generator as data source function source( pFunction ) return function( pInput, pOutput ) local lInput = pFunction( pInput ) while true do local lData = pack( lInput() ) if lData[ 1 ] == nil then break end pOutput( unpack( lData ) ) end end end
为了方便起见,我们定义了以下函数,它将一个接受多个参数和一些值的函数包装成一个只接受一个参数的函数,该函数使用其参数和这些值调用给定函数。由于大多数 Lua 库函数将要操作的值(this 值)作为第一个参数,因此此函数在 Lua 中为我们提供了函数式编程中的部分函数应用。
-- Wraps a function returning a generator as data source function curry( pFunction, ... ) return function( pParam ) return pFunction( pParam, unpack( arg ) ) end end
现在我们可以构建一个数据源,它使用 string.gfind
和模式 "%d+" 来查找输入参数字符串中包含的所有数字,并将它们写入其输出(作为字符串)。
> parse_numbers = source( curry( string.gfind, "%d+" ) ) > parse_numbers( "123 5 78 abc 12", print ) 123 5 78 12
在介绍另一个接口函数后,我们将在下一节中使用此数据源。
那么,在我们定义了一个从返回生成器的函数构建数据源的函数之后,如何从我们已经拥有的东西构建数据接收器呢?相反的操作是什么?
事实证明,我们不需要构建任何东西:接受一个值并返回一个生成器的函数的相反操作将是一个接受一个生成器并返回一个值的函数。但这正是我们针对接收器的情况下的过滤器接口的描述:任何接受一个生成器(以及可选的第二个参数)并(可选地)返回某个值的函数本身就是一个接收器 - 无需构建一个!
接下来我们想要做的是从现有函数构建过滤器。模式很简单:读取输入,调用函数并将结果写入输出。以下函数实现了这一点。
-- Wraps a function as filter function pass( pFunction ) return function( pInput, pOutput ) while true do local lData = pack( pInput() ) if lData[ 1 ] == nil then break end pOutput( pFunction( unpack( lData ) ) ) end end end
有了这个函数,我们可以创建一个过滤器,它使用 tonumber()
将从我们的数字解析器传递的字符串转换为数字,这样我们就可以使用它们来计算它们的平均值。我们构建以下管道。
> avg_from_string = pipe( parse_numbers, pass( tonumber ), avg ) > return avg_from_string( "123 5 78 abc 12" ) 54.5
请注意,我们可以使用密封管道的 pass()
将其用作子管道并将数据通过它传递。
以下函数是 pass
的一个变体,它不执行任何输出,有效地消耗所有输入而没有任何返回值。
-- Wraps a function as data sink, consuming any input without any output or -- return function consume( pFunction ) return function( pInput ) while true do local lData = pack( pInput() ) if lData[ 1 ] == nil then break end pFunction( unpack( lData ) ) end end end
现在让我们看看是否可以将管道与在循环中处理数据的自定义程序逻辑进行接口。管道有两个端点需要与之进行接口:处理从管道流出的输出,以及将值馈送到管道。 (在同一个线程中主动馈送过滤器的输入和读取其输出可能没有多大用处,因为这会导致缓冲和同步问题。因此,我们不会深入讨论这一点。)
为了通过循环处理从管道中输出的数据,我们需要将管道转换为生成器。以下函数实现了这一点,将管道(或任何过滤器)包装成一个可重用的函数,该函数带有一个参数,返回一个生成器,当它以参数作为输入时,生成管道的输出(此函数公开了与string.gfind()
、io.lines()
等使用的相同签名模式)。
-- Wraps a filter into a function with one argument returning a generator that -- yields all values produced by the filter from the input given as argument. function drain( pFilter ) return function( pInput ) local lGenerator = coroutine.create( pFilter ) return function() local lResult = pack( coroutine.resume( lGenerator, pInput, coroutine.yield ) ) local lSuccess = table.remove( lResult, 1 ) if lSuccess then if coroutine.status( lGenerator ) == "suspended" then return unpack( lResult ) end else error( lResult[ 1 ] ) end end end end
现在我们可以编写以下类型的代码
for ... in drain( pipe( ... ) )( ... ) do ... end
我们最初指出要排空的管道必须是数据源,但实际上,上面的函数没有对管道进行任何假设,因此不需要管道是数据源。它取决于我们打算传递给返回函数的参数类型:如果我们计划传递一个生成器,则管道不必是数据源。
现在,如何进行相反的绑定,即,将一个数据接收器管道转换为一个可以重复调用以将数据馈送到管道输入的函数?我们可以实现一个返回此类馈送器的函数,如下所示(为简单起见,我们在这里不返回可重用的函数,如drain()
的情况,而是在一步中将输出参数绑定到管道)
-- Wraps a pipe that must be a data sink into a function that feeds its -- arguments into the pipe's input. -- THIS FUNCTION DOES NOT WORK WITH THE PIPES WE USE HERE! function feed( pPipe, pOutput ) local lOutput = coroutine.create( pPipe ) local lPush = function( ... ) local lSuccess, lResult = coroutine.resume( lOutput, unpack( arg ) ) if not lSuccess then error( lResult ) end return coroutine.status( lOutput ) == "suspended" end lPush( function() return coroutine.yield() end, pOutput ) return lPush end
该函数从管道创建协程,将coroutine.yield
作为输入传递给它,然后执行它,直到它产生(请求它的第一个输入),并返回一个包装器,该包装器在每次调用时恢复协程,并将它的参数传递给它。包装器返回true
,如果协程再次产生,也就是说,如果它将接受更多输入。让我们使用它
> feeder = feed( square, print ) stdin:6: attempt to yield across metamethod/C-call boundary stack traceback: [C]: in function `error' stdin:6: in function `lPush' stdin:10: in function `feed' stdin:1: in main chunk [C]: ?
糟糕!我们遇到了 Lua 的一个限制:Lua 不允许从for
循环的头部调用的生成器产生!如果我们重写我们的square
过滤器,我们可以绕过这个限制
> function square2( pInput, pOutput ) >> while true do >> local lValue = pInput() >> if lValue == nil then break end >> pOutput( lValue * lValue ) >> end >> end > feeder = feed( square2, print ) > return feeder( 2 ) 4 true
然而,还有一个问题:feed
函数只适用于简单的过滤器,它不适用于管道。原因是,在我们管道实现中,执行过滤器的协程在 resume/yield 链中一个接一个地“堆叠”,并且接收coroutine.yield
作为输入的过滤器已经被后面的过滤器恢复,因此在产生时,它将返回那里,而不是返回到顶层调用管道的代码。我们不会得到处理器,让管道等待输入,正如我们所期望的那样。
我们可以改进我们的管道实现,使其不再存在此问题:为此,我们将所有包含的过滤器包装到协程中,并让它们在输入和输出时产生,并通过一个主循环来控制它们,该主循环本身调用输入和输出
-- A symmetric pipe implementation. Pipes of this sort can be resumed from -- both ends but prevent the constituting filters to use 'for' loops for -- reading input. Also, sources and sinks cannot use the input and output -- parameters of the pipe. function symmetric_pipe( ... ) local lFilters = arg local lFilterCount = table.getn( lFilters ) return function( pInput, pOutput ) local lHandover local lInput = function() lHandover = nil coroutine.yield() return unpack( lHandover ) end local lOutput = function( ... ) lHandover = arg coroutine.yield() end local lProcessors = {} for _, lFilter in ipairs( lFilters ) do table.insert( lProcessors, coroutine.create( lFilter ) ) end local lCurrentProcessor = lFilterCount while lCurrentProcessor <= lFilterCount do if not lProcessors[ lCurrentProcessor ] then error( "Requesting input from closed pipe" ) end local lSuccess, lResult = coroutine.resume( lProcessors[ lCurrentProcessor ], lInput, lOutput ) if not lSuccess then error( lResult ) end if coroutine.status( lProcessors[ lCurrentProcessor ] ) == "suspended" then if lHandover == nil then lCurrentProcessor = lCurrentProcessor - 1 else lCurrentProcessor = lCurrentProcessor + 1 end if lCurrentProcessor == 0 then lHandover = pack( pInput() ) lCurrentProcessor = 1 elseif lCurrentProcessor > lFilterCount then pOutput( unpack( lHandover ) ) lCurrentProcessor = lFilterCount end else lHandover = {} lProcessors[ lCurrentProcessor ] = nil lCurrentProcessor = lCurrentProcessor + 1 end end end end
此管道实现允许管道也与feed()
一起使用,但是,这些管道还有另一个问题:由于所有过滤器现在将在输入时产生,因此它们会受到我们上面看到的 Lua 限制的影响,实际上禁止我们在任何过滤器中使用for
循环。这是一个过于严重的限制,因此我们放弃了这个原本优雅的实现,接受了无法从循环中将数据馈送到管道的事实。如果我们想编写将数据馈送到管道的代码,我们必须要么使用corourine.yield
和coroutine.wrap
将其包装成一个生成器,要么将其编写为数据源,将其塞到管道前面,然后执行管道。我们的管道必须始终处于控制位置,并且没有办法让我们的循环控制执行!
上面 symmetric_pipe()
实现还有一个问题:它会阻止将管道的输入和输出参数用作数据源和接收器的参数,因为它们永远不会看到这些参数。这样会让我们失去很多灵活性。
使用我们上面提到的 source()
函数,我们可以简单地包装 io.lines
来获得一个数据源,该数据源读取文件并将其内容逐行写入其输出。遗憾的是,标准库中没有我们可以用作文件接收器的函数,我们必须使用 io.write
自己编写。为了对称,我们还使用 io.read
实现了一个文件源。我们以一种可以将其用作源(接受两个参数)或用作返回生成器的函数(当使用一个参数调用时)的方式实现它。这使我们能够根据需要使用它来构建管道以及调用它们。
-- Returns a function that can be used as data source yielding the contents of -- the file named in its input parameter, processed by the given formats, or -- that can be called with a filename alone to return a generator yielding this -- data. If no formats are given, line-by-line is assumed. function filereader( ... ) local lFormats = arg return function( pInput, pOutput ) local lInput = io.open( pInput ) local lOutput = pOutput or coroutine.yield local lFeeder = function() while true do local lData = pack( lInput:read( unpack( lFormats ) ) ) if lData[ 1 ] ~= nil then lOutput( unpack( lData ) ) end if lData[ table.getn( lData ) ] == nil then break end end lInput:close() end if pOutput then lFeeder() else return coroutine.wrap( lFeeder ) end end end -- Returns a data sink that writes its input, optionally formatted by the -- given format string using string.format(), to the file named in its output -- parameter. If no format is given, the processing is analogous to print(). function filewriter( pFormat ) return function( pInput, pOutput ) local lOutput = io.open( pOutput, "w" ) while true do local lData = pack( pInput() ) if lData[ 1 ] == nil then break end if pFormat then lOutput:write( string.format( pFormat, unpack( lData ) ) ) else for lIndex = 1, table.getn( lData ) do if lIndex > 1 then lOutput:write( "\t" ) end lOutput:write( tostring( lData[ lIndex ] ) ) end lOutput:write( "\n" ) end end lOutput:close() end end
让我们使用两者逐行复制文件
> copy = pipe( filereader(), filewriter() ) > copy( "data.in", "data.out" )
如果我们在读取器和写入器之间塞入各种过滤器,我们就可以对通过管道的行进行任何我们想要的处理。一个简单的场景是一个像 grep 这样的实用程序。以下函数构建了一个我们可以用于此的过滤器
-- Returns a filter that filters input by the given regexp function grep( pPattern ) return function( pInput, pOutput ) for lData in pInput do if string.find( lData, pPattern ) then pOutput( lData ) end end end end
我们现在不会测试它,而是构建一个稍微更有趣的示例。你可能已经注意到上面的函数没有以任何方式提及行。实际上,过滤器独立于输入是行还是其他内容。让我们编写另一个过滤器,将行收集成段落,并将这两个过滤器组合在一起,对段落而不是行进行 grep(就像 grep -p
)。
-- Filter collecting lines into paragraphs. The trailing newline is preserved. function paragraphize( pInput, pOutput ) local lBuffer = {} for lLine in pInput do table.insert( lBuffer, lLine ) if lLine == "" then pOutput( table.concat( lBuffer, "\n" ) ) lBuffer = {} end end if next( lBuffer ) then pOutput( table.concat( lBuffer, "\n" ) ) end end
> filter_foobar_paragraphs = pipe( filereader(), paragraphize, grep( "foobar" ), filewriter() ) > filter_foobar_paragraphs( "data.in", "data.out" )
如果我们愿意,我们可以省略文件写入器,在管道中提供 print
而不是输出文件名作为第二个参数,以便将结果打印到控制台。
一种特别有趣的 FileReader 是读取 PIL 第 10.1 节(https://lua.ac.cn/pil/10.1.html)中描述的 Lua 数据文件:我们希望它将从文件中读取的条目一个接一个地写入输出。以下函数类似于 filereader - 作为数据源或返回生成器的函数。它在一个空的沙箱中执行 Lua 文件,利用一些元编程来向执行的代码块提交它调用的用于提交记录(PIL 示例中的 entry
)的函数。这是通过定义沙箱的 __index
元方法来实现的,该方法为代码块提供一个函数,该函数输出或生成记录及其标签名称(在本例中为 entry
)。将标签名称也写入输出使我们能够拥有包含多种数据(实体类型)的输入文件,例如,如果我们希望将整个(足够小的)关系数据库的内容保存在一个数据文件中,我们可以使用表名作为标签名并将所有表的内容写入一个文件。
-- Invocation with a single argument returns a generator yielding the records -- from the named Lua file. When invoked with two arguments this function acts -- as a data source yielding this data on output. function luareader( pInput, pOutput ) if pInput == nil then error( "bad argument #1 to `loadfile' (string expected, got nil)" ) end local lInput, lError = loadfile( pInput ) if not lInput then error( lError ) end local lOutput = pOutput or coroutine.yield local lSandbox = {} setmetatable( lSandbox, { __index = function( pSandbox, pTagname ) local lWorker = function( pRecord ) lOutput( pRecord, pTagname ) end pSandbox[ pTagname ] = lWorker return lWorker end } ) setfenv( lInput, lSandbox ) if pOutput then lInput() else return coroutine.wrap( lInput ) end end
如果我们使用来自 https://lua.ac.cn/pil/10.1.html 的数据(只有一条记录)进行测试,我们会得到
> reader = luareader( "data.lua" ) > return reader() table: 2001af18 entry > return reader() >
现在让我们来研究一下使用管道可以实现的一些更复杂的转换。
你可能已经问过自己,我们如何从多个输入源将数据馈送到管道中,以便管道的输入是这些源输出的串联。
如果你仔细想想,你会发现这种操作的核心并不局限于源,而可以在管道的任何阶段发生:我们需要一个过滤器,它可以从每个输入项生成一个序列,并将所有这些序列的串联写入输出。这本质上是一个扁平化操作。有了这种作为过滤器的泛化,我们就不再需要构建像多源这样的特殊东西:要连接多个输入,我们只需要生成这些输入或描述它们的值的序列,并应用一个过滤器,将每个元素展开到它包含的数据中。我们最好通过调用一个函数来做到这一点,该函数将元素作为参数,并返回一个生成器,生成展开它所产生的数据。
以下函数接受一个返回生成器的函数(如 io.lines
),并从中构建一个过滤器,该过滤器对每个输入调用此函数,并将生成器的输出馈送到其输出
-- Returns a filter that calls the given function for every input and feeds -- the output of the returned generator into its output. function expand( pFunction ) return function( pInput, pOutput, ... ) while true do local lInput = pack( pInput() ) if lInput[ 1 ] == nil then break end local lGenerator = pFunction( unpack( lInput ) ) while true do local lData = pack( lGenerator() ) if lData[ 1 ] == nil then break end pOutput( unpack( lData ) ) end end end end
假设我们有一个文件,其中包含一个文件名列表,每行一个,我们希望将这些文件的内容逐行连接到一个输出文件中。这就是我们如何做到这一点
> concat_files = pipe( filereader(), expand( filereader() ), filewriter() ) > concat_files( "filelist.in", "data.out" )
顺便说一下,这个例子展示了 filereader()
返回的函数的双重性质背后的隐藏力量。
现在让我们考虑相反的操作:将流经管道的数据组装成更大的单元。由于流经管道的数据本质上是扁平的,因此组装过滤器必须自己知道(可能也通过查看数据)在哪里绘制边界,将进入一个组装的数据与进入下一个组装的数据隔开。在扁平化的情况下,没有像输入结束这样的外部提示来从一个组装切换到下一个组装。
描述组装算法的最简单方法是作为接受迭代器作为参数的函数,从迭代器中提取尽可能多的数据,并返回组装。这样的函数可以通过以下函数包装成折叠过滤器
-- Filter that calls the given function with its input until it is exhausted, -- feeding the returned values into its output. function assemble( pFunction ) return function( pInput, pOutput ) local lTerminated = false local function lInput() local lData = pack( pInput() ) if lData[ 1 ] == nil then lTerminated = true end return unpack( lData ) end repeat pOutput( pFunction( lInput ) ) until lTerminated end end
有了这个函数,我们可以为我们之前实现的将行收集到段落的过滤器提供一个更简单的实现
-- Collects a paragraph by pulling lines from a generator. The trailing newline -- is preserved. function collect_paragraph( pInput ) local lBuffer = {} for lLine in pInput do table.insert( lBuffer, lLine ) if lLine == "" then break end end return table.concat( lBuffer, "\n" ) end paragraphize = assemble( collect_paragraph )
在上一节中,我们看到了如何拆卸和组装复合数据(在本例中为文件和段落)。这些复合数据的组成部分与复合数据本身一起沿着同一个线性管道馈送,从而形成一个单一的线性转换路径。(这正是管道的意义所在,不是吗?)关于这种情况,有两点需要注意
毕竟,在一般情况下,后者甚至是不可能的,因为中间的过滤器可以随意地修改分解后的数据,模糊了初始复合体之间的边界。
现在,想象一下,如果我们想要处理具有异构结构的复杂数据,其中组成部分需要进行不同的转换,并且我们也想要使用管道进行这些转换,那么需要使用什么拓扑结构来代替线性管道。想到的拓扑结构将是一组从一个节点出来的并行“子管道”,并连接到另一个节点。问题是,这些子管道的输入和输出如何在这些节点中以及它们之间同步?
显然,并行管道不能“互相抢食”,单独拉取输入,并且任何管道产生的所有内容都必须以某种方式进入从所有管道输出构建的结果。自然地,我们得出了这样的概念:所有子管道必须在同一个节点(过滤器)中拥有它们的输入和输出,并且它们只获得一个输入(由控制子管道的过滤器读取),并且必须产生一个输出(由控制过滤器组装并写入输出)。也就是说,子管道与过滤器的输入和输出分开,过滤器的输入和输出由控制过滤器节点本身提供。事实上,子管道因此作为函数运行,接受参数并产生返回值。众所周知,此类管道必须以数据源(获取输入值)开头,并以数据接收器(返回结果值)结尾。从过滤器节点的角度来看,这与函数调用完全没有区别。
事实证明,使用并行“子管道”处理复杂值的组成部分,对过滤器将它们作为输入/输出没有任何特殊要求——就像调用函数一样。由于管道没有什么特别之处,我不会提供一个函数,通过调用一组函数来从另一个结构化值构建一个结构化值。
但是,我们可以提供一个使用子管道不涉及复杂值的简单示例。我们将使用pass()
运行一个密封管道作为子管道,以处理流经管道的单个值。请记住我们上面计算 4 的前 n
次方的平均值的 average_of_seq_of_powers
示例。我们现在感兴趣的是这个值如何随 n
变化,并通过将每个 n
传递到我们已经拥有的管道 average_of_seq_of_powers
中来构建一个相应的序列,使用它作为子管道。
> seq_of_average_of_seq_of_powers = pipe( seq, pass( average_of_seq_of_powers ) ) > seq_of_average_of_seq_of_powers( 4, print ) 1 8.5 32.666666666667 88.5
这是可能的,因为average_of_seq_of_powers
是一个封闭管道,因此表现得像一个函数。将“传递”用于这种方式使用子管道可能有点误导,因为没有一个活动的子管道读取输入并写入输出——这是一种完全不同的通信模式:对于每个“传递”的值,都会创建一个新的管道实例并运行到结束。但“传递”也是我们在调用函数时使用的术语,而且该值确实设法“通过”子管道,经历了转换,所以这可能是合理的。
除了我们在上一节中讨论的内容之外,我们可以想象另一种使用并行管道的场景,其中子管道仍然是外部管道的管道,而不是伪装成函数:数据流可能在某个阶段由不同类型的数据组成,这些数据必须经过不同的转换才能再次加入主管道。必须有一些开关将数据路由到一组子管道中的一个,并且一些收集器从子管道末端以相应的输入馈送的顺序拉取数据。后一个条件迫使收集数据的节点成为进行输入路由的节点(因为这两个节点没有其他通信通道可用于同步)。与上述复杂数据场景的不同之处在于,子管道不需要在输入/输出行为上同步,甚至可以交错输入和输出。
为了实现这种处理,我们必须能够在输入时挂起管道,在数据可用时馈送数据。这对于我们这里使用的管道类型来说是不可能的,并且需要对称管道,因此我们放弃了这个想法。
出于好奇,让我们问问,在这些前提条件下,是否可以拥有一个真正的 *管道意大利面*,即从一个源头发出并汇聚到一个接收器的 *管道网络*。显然,这样的结构无法通过递归地将不太复杂的结构组装成更复杂的结构来组装,而是需要通过单独连接它们的输入和输出,将过滤器和子管道连接在一起。如果我们考虑数据如何通过这样的网络传输,我们会发现,与我们的拉式通信方案(向生成器请求输入)不同,我们需要一个推式通信方案(调用函数来处理输出)。(我们的拉式方案与开关执行的输出调度类型不协调——能够检查要路由的数据——并且需要根据已经输出的数据来调度 *输入请求*,也就是说,无法检查输入。)有了它,我们甚至可以想象具有多个输入和输出的 *处理网络*。这将是一个有趣的项目,可以实施和实验,看看它能为我们做些什么。
虽然我们通过实现管道所追求的主要目标是获得一种用于 *实现* 数据转换的自然符号(对于长度不确定的数据流而言),然后可以将其组合成更复杂的转换,但让我们看看是否可以增强 *使用* 这些转换的符号。为此,我们将应用一些元编程。
用于调用管道的内联符号,即构建一个单次使用的管道,并在不将其存储在变量中时立即使用它,看起来有点笨拙。以我们的 grep 实用程序为例(省略 paragraphize
)
> pipe( filereader(), grep( "foobar" ), filewriter() )( "data.in", "data.out" )
这与 UNIX 符号的简洁性相去甚远,UNIX 符号大致相当于
$ filereader "data.in" | grep "foobar" | filewriter "data.out"
在考虑如何使用运算符符号来构建管道时,我们首先注意到我们有两个选择。请记住,过滤器是一个函数,它以生成器作为输入参数,以函数作为输出参数。有了生成器(当我们要执行正在构建的管道时,这种情况就会发生),我们可以用 coroutine.yield
代替输出,并将下一个过滤器包装成一个协程,从而得到另一个生成器,它将输入作为过滤器处理后的结果进行生成。然后,我们可以将此作为下一个过滤器的输入,并重复此过程,在最后一个过滤器之后,得到一个生成器,它生成整个管道的输出。我们可以在 for
循环中使用它,但为了 *执行* 管道,我们必须添加另一个结构来清空生成器,或者更好的是,通过其他结构追加接收器,并为其提供输出参数。这样,输出符号将与用于馈送输入的符号不同。此外,如果我们以这种方式构建生成器链,我们将重新实现 pipe()
的大部分功能,但将无法使用此符号来创建可以作为子管道使用的管道,例如。
因此,我选择了我们拥有的第二个选项:构建一个可重用的管道并立即执行它。
构建管道很容易。我们只需要将第一个过滤器包装到一个代理中,即一个带有附加元表的表,该元表定义了实现我们所需运算符的元方法。由于第一个过滤器通常是通过 `source()` 构建的数据源,或者是我们库提供的像 `luareader` 这样的数据源,因此这种包装大多可以隐藏。对于必须显式完成的那些情况,我们使 `bind()` 函数公开可见。
-- Metatable local sPipeOperations -- Binds its argument to the library by wrapping it into a proxy function bind( pValue ) local lProxy = { payload = pValue } setmetatable( lProxy, sPipeOperations ) return lProxy end
元表定义了 `__mul` 元方法来实现连接过滤器的 * 运算符。我选择乘法而不是例如加法或连接,因为塞在一起的过滤器的效果确实会 * 乘*,而不是加。 (您可以在我们上面的 `seq_squares_squared` 示例中非常清楚地看到这一点。)此外,星号不仅看起来很突出,更重要的是,它比大多数其他运算符绑定得更强,这使得可以在不使用括号的情况下将它们应用于管道。运算符也返回一个代理,因此我们可以将其链接起来。
sPipeOperations = { -- Chains a proxy with a function or other proxy __mul = function( pHeadProxy, pFilter ) local lProxy = { head = pHeadProxy, payload = fncompile( pFilter ) } setmetatable( lProxy, sPipeOperations ) return lProxy end, ... }
为了能够透明地调用包装的函数,我们还实现了 `__call` 元方法,它将使用给定的参数执行代理表示的函数。我们使用两个内部实用函数构建代理表示的函数。
-- Compiles a proxy to the function it represents local function fncompile( pSpec ) if type( pSpec ) == "function" then return pSpec else if pSpec.head then return fncompile( pipe( unpack( unchain( pSpec ) ) ) ) else return fncompile( pSpec.payload ) end end end -- Collects the payloads of a proxy chain into an array local function unchain( pProxy ) if not pProxy then return {} end local lResult = unchain( pProxy.head ) table.insert( lResult, pProxy.payload ) return lResult end sPipeOperations = { ... __call = function( pProxy, ... ) return fncompile( pProxy )( unpack( arg ) ) end, ... }
fncompile()
被实现为一个单独的函数,并且也调用 `pipe()` 的结果的原因是,我们希望能够自由地混合和匹配使用 * 或 `pipe()` 构建的管道。为了实现这一点,`pipe()` 与任何其他将函数作为参数的函数一样,使用 `fncompile()` 在使用前将它们转换,并且像任何构建过滤器的函数一样,返回一个 * 绑定* 函数。(我不会在这里再次提供更改后的函数。)
我们现在可以将管道塞在一起,或者调用将管道作为参数的函数,而不再使用 `pipe()`,如果数据源或第一个过滤器是 * 绑定* 的,也不再使用 `bind()`
> seq = bind( seq ) > average_of_seq_of_powers = seq * square * square * avg > return average_of_seq_of_powers( 4 ) 88.5 > filter_foobar_lines = filereader() * grep( "foobar" ) * filewriter() > filter_foobar_lines( "data.in", "data.out" )
关于内联调用,我们到目前为止还没有赢得多少,因为我们现在可以从上面的丑陋表示法中删除四个字母:`pipe`。我们需要在管道构造表达式周围保留括号,因为函数调用运算符比乘法绑定得更强。
> (filereader() * grep( "foobar" ) * filewriter())( "data.in", "data.out" )
所以,这还不是我们想要到达的地方。对于必须使用两个参数调用的管道,可能没有我们可以轻松地做的事情,除非过度复杂化。
但是,我们可以为输入数据实现类似管道的表示法,并结合一个赋值,用于管道在仅使用 * 一个* 输入参数 * 返回* 其结果时进行转换的结果(即,作为具有单个参数的函数调用工作)。请记住我们上面的 `average_of_seq_of_powers` 示例,我们现在将重写如下。
> result = 4 .. seq * square * square * avg
数据转换的结果将进入一个程序变量以供以后处理,这是一个常见的情况,同时我们习惯在这种情况下使用赋值表示法。因此,这种表示法似乎是一个很好的折衷方案,而且如果我们尝试进一步调整表示法,直到我们将结果的接收器放在像 UNIX 这样的管道的最右边,我们可能不会获得更多收益。
如果我们实现了 `__concat` 元方法,该方法在 Lua 在两个操作数中的任何一个提供的情况下,会为 .. 运算符调用,则上述表示法成为可能。
sPipeOperations = { ... __concat = function( pInput, pProxy ) return fncompile( pProxy )( pInput ) end }
(请注意,我们无法通过实现__lt
元方法来使用更好看的>运算符,因为Lua会将其结果转换为布尔值,而这并非我们所需要的。)
最后,让我们以seq_of_average_of_seq_of_powers
子管道示例来说明这种表示法的强大之处。由于我们被迫计算一个值而不是写入输出,我们将添加另一个平均值计算,获取从长度为1到给定数字的序列的平均值的平均值,即4的幂的平均值。
> return 4 .. seq * pass( seq * square * square * avg ) * avg 32.666666666667
这完成了我们的元编程之旅。在进行一些总结性思考之前,让我们注意以下观察结果:如果我们在使用前进行bind
,我们可以使用我们为调用任何接受一个参数的函数实现的..表示法。
> return 1 + "4" .. bind( tonumber ) 5
并且,如果我们手头的所有函数都是绑定的,我们可以通过使用连接运算符来构建整个调用链!(但是,在上面的示例中,像使用其他运算符一样,将受到运算符优先级的约束,因此我不建议这样做。)但是,如果我们的数据表示为表(这种情况很常见),则有一种更好的方法来表示此类调用链:我们可以为我们的数据配备一个元表,该元表定义一个__call
元方法,该方法使用数据和剩余参数调用传递给它的第一个参数的函数)。最终的表示法可能如下所示(map
函数,此处未提供,通过将源表中的值存储在结果表中,并使用传递给它的函数进行转换,并在其原始键下构建一个表)。
t (map, tonumber) (map, square) (avg)
这看起来像一个非常不错的函数调用链,它将表示数字的字符串数组t
转换为数字,对它们进行平方并计算平均值。它计算为
avg( map( map( t, tonumber ), square ) )
这可读性差得多。
请注意,上面的square
和avg
函数不是我们之前定义的那些函数。那些是过滤器,我们可以将它们用于管道来计算相同的结果(前提是我们有一个接受表并返回一个生成其值的生成器的values
函数)。然后,我们可以使用以下表示法之一,第一个表示法利用表的__call
元方法,第二个表示法利用管道代理的__concat
元方法。
t (source( values ) * pass( tonumber ) * square * avg) t .. source( values ) * pass( tonumber ) * square * avg
请注意,虽然第二种表示法看起来更简洁,但它不能像第一种表示法那样用作语句 - Lua 要求我们对返回值进行一些操作。这通常不是问题,因为我们无论如何都希望使用管道提供的结果作为返回值,但如果表达式的结果完全是副作用,这将是一个限制。但在这种情况下,该表示法可能也会因为缺乏提供输出参数而受到影响,而这可以通过第一种表示法轻松实现。
t (source( values ) * pass( tonumber ) * square, print)
我们已经讨论到了最后,让我们对我们实现的机制做一些最后的考虑。
你可能想知道为什么我们不需要在我们的过滤器接口中从输出函数返回一个布尔值来指示过滤器输出是否正常,更重要的是,是否接受更多输出。毕竟,我们实现了过滤器作为主动实体来写入输出,并且必须有人接收它。在 UNIX 上,当过滤器尝试写入下一个过滤器的输入时,如果该过滤器已经完成执行并关闭了其输入,以便不再接受更多数据,我们将看到管道关闭
错误。
我们不需要这样做是因为在我们的实现中,任何过滤器都会在请求输入时被后续过滤器恢复。也就是说,它只能在前面的输出被处理并且接受新的输出的情况下执行(实际上是预期的)。因此,永远不会出现过滤器处理完其输入而无法传递其输出的情况:输出永远不会失败 - 我们永远不会看到我们从 UNIX 中知道的管道关闭错误!这是我们拉取式通信方案的直接结果。这种通信方案也导致了我们的过滤器接口的良好对称性:输入函数不接受任何值并返回数据,而输出函数接受数据并返回无值。
这带来的一个副作用是,没有保证任何过滤器会读取其输入到结束。因此,过早结束输入处理将不会被注意到。实现这样的检查很容易 - 任何过滤器在它的前驱者之前都不能死亡 - 但可能不值得付出努力。(但是,如果例如文件读取器没有被耗尽,但抛出错误并不能解决这个问题,那么可能仍然存在打开的文件。)
如果我们使用推送式通信方案(让过滤器在输入上让步并恢复其输出)来实现管道,我们将遇到另一种情况:当输出返回(从对下一个过滤器的恢复调用)时,它知道该过滤器是否还活着,因此将接受更多输入。它可以将此信息传达给调用过滤器,以便它可以停止处理(或忽略它,在下一个输出上引发管道关闭错误)。然后,输入/输出接口将失去其对称性,构成过滤器实现的循环将不再仅由输入控制。看起来,我们的直观的拉取式实现从这个角度来看也是可取的,并且产生了更自然的实现。
再次思考输出无法失败的问题,似乎与现实存在差异,现实中输出,例如到网络连接的输出,确实可能失败。因此,让我们仔细看看。
管道中除了最后一个过滤器之外,任何过滤器都会将coroutine.yield
作为输出。这个调用确实永远不会失败。(它在协程恢复时返回。)另一方面,管道中的最后一个过滤器应该是一个数据接收器,因此它不会调用其输出,这意味着输出调用也不会为它失败。那么,输出错误从哪里出现呢?显然,是数据接收器调用可能失败的函数的地方。这意味着数据接收器需要与其交付数据的任何内容签订协议,以便能够传达输出通道的状态并处理任何错误。或者,我们可以不编写自定义数据接收器,而是使用自定义程序循环drain()
管道,将它的输出馈送到某个通道,自己处理任何问题。在这两种情况下,任何过滤器都不必处理任何通信错误。(但是,如果我们将可能失败的函数作为输出传递给不以接收器结尾的管道,错误要么会被忽略,要么会被抛出到调用执行管道的代码。)
输入错误呢?任何过滤器中发生的任何错误都会透明地转发(沿着输入链重新抛出)到数据接收器。因此,接收器,或者如果我们喜欢我们编写的从管道末端读取的循环,将处理(或不处理)输入错误以及输出错误。
另一个有趣的观察:请记住从运行在一个线程中的控制代码读取和写入管道输入和输出的场景。
为了解决它带来的同步问题,我们可以决定通过使用回调架构来解耦处理管道两端的代码:通过管道可以获得其输入的callback
,从而使控制代码仅管理管道的输出端。事实证明,这正是我们在本文中使用管道的方式:callback
是传递给管道的输入函数(生成器)!这就是drain()
的场景。
从另一端看,我们可以决定为管道提供一个call forward
,用于传递其输出,并让控制代码仅处理管道的输入端。同样,这就是我们的过滤器接口的意义所在:我们作为输出参数给出的是这样的call forward
函数。这就是feed()
的场景。
如果我们比较这两种场景,我们会发现,在这两种场景中,管道(我们知道,管道本身是活跃的,就像每个过滤器一样)都被包装成由函数表示的被动实体,只对外部刺激(函数调用)做出反应:在我们的拉式管道实现中,从管道输出端拉取数据,或者如果我们以推式方式实现管道,则将数据馈送到管道的输入端,让过滤器在输入而不是输出上产生结果。(这会导致一个比我们这里使用的生成器方法更不自然、更复杂的解决方案,并且此外还会对在过滤器实现中使用 for 循环提出上述限制。)
只要管道只被 *执行*(也就是说,包含一个数据源 *和* 一个数据接收器),两种管道实现之间就不会有外部可见的差异。
在这里再次看看我们来自同一部分的 *过滤器网络*:实际上,这是一个由活跃处理节点网络构建的事件驱动应用程序。它对输入发生的事件(从外部调用输入函数以馈送数据)做出反应,将它们转换为输出上的事件(调用输出函数以传递数据)和/或更改其内部状态(甚至可能重新配置自身)。顺便说一下,可能存在任何类型的处理节点,而不仅仅是我们认为是过滤器的节点!虽然这本质上是一个 call forward
架构,但事件驱动应用程序通常在 callback
架构中被看到。这很有趣,但没有矛盾,一切取决于我们从哪个角度看待调用 - 从被调用方(应用程序)或调用方(框架)代码来看。因此,当我们将我们的 *处理网络* 连接到某个框架,该框架从外部来源(例如鼠标事件)馈送其输入,并且可能还提供它用于输出的函数(例如图形例程)时,我们自然会将我们的输入函数作为 callbacks
提供给它。