Filters And Pipes Reloaded

A Coroutines Based Framework

on pipes, call chains and notation

by HartmutSchaefer

Abstract

The very interesting article FiltersSourcesAndSinks by DiegoNehab, together with section 9.2 of PIL (https://lua.ac.cn/pil/9.2.html), inspired me to experiment with pipes and to think about notation issues and about what makes pipes different from what we are used to working with in imperative programming. In this article I will discuss what pipes have in common with chained function or method calls, and what makes them different. I will then present and continually evolve a set of functions for working with pipes in Lua. Finally I will use some metaprogramming to arrive at a more pipe-like notation.

The Tao of Data Transformation

Many applications of computers involve the transformation of data from one representation into another, possibly including various forms of mixing and matching with other data. Such transformations can naturally grow more or less complex.

Complex data transformations - as any other algorithm - are easier to understand if they can be decomposed into smaller pieces, each of which performs a simple and well defined task. Additional power results if such pieces can be built in a reusable way, forming a set of building blocks from which complex transformations can be easily assembled.

The simplest form of composition is probably applying a number of transformations in sequence, that is, using the result of every transformation as input to the next one. Depending on the environment we are working in, the notations and inner workings differ.

If you are working with functions, you will probably write something like

result = transformation3( transformation2( transformation1( data ) ) )

This doesn't look very intuitive, since the transformations are written in the opposite order to that in which they are applied. (If they take additional parameters, the notation gets even worse.) The notation, however, doesn't change the essence of what's going on and can be enhanced without changing that essence. If you have a mechanism for function composition, you can write something like the following to have the transformations in the order they are applied

result = chain( transformation1, transformation2, transformation3 )( data )

(Using more higher-order functions you can even bundle additional parameters with their transformations.)
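
For illustration, a minimal sketch of such a chain() mechanism (hypothetical; not part of the library developed below) could look like this in Lua

-- Composes the given functions so that they are applied left to right
-- (sketch; single-value version)
function chain( ... )
    local lFunctions = arg
    return function( pData )
        local lCurrent = pData
        for _, lFunction in ipairs( lFunctions ) do
            lCurrent = lFunction( lCurrent )
        end
        return lCurrent
    end
end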

Further, if you get help from the compiler or some preprocessor that turns, for instance, the | operator into calling its right operand with the left one as parameter, you could achieve an even more intuitive notation, putting the data you start with at the beginning of the chain and possibly writing something like

result = data | transformation1 | transformation2 | transformation3

and, if you liked, with even more help you could drop the familiar assignment notation and put the receiver for the result at the end of the chain, ending up with a notation that looks much like a UNIX pipe, showing how data travels through a series of transformations like this

data > transformation1 > transformation2 > transformation3 > result

or something similar. Note, though, that all this doesn't change what's happening under the hood: a simple sequence of wrapped function invocations!

In object-oriented programming, including mechanisms like C# extension methods, you can have the sequenced notation we saw above for free using chained method calls

result = data.transformation1().transformation2().transformation3()

The parentheses in this notation also make it fairly visible that the transformation "pipe" through which our data "flows" is simply a series of function calls. (By the way, the fact that in OO the possible transformations are bundled as methods with their input data types is no limitation whatsoever but only an implementation detail.)

Coming to the heart of our discussion, UNIX has brought to us a fundamentally different concept of applying a sequence of transformations to data: UNIX pipes. They come out of the box with an intuitive piping notation like the one we created above (without actually having pipes then!)

data > transformation1 | transformation2 | transformation3 > result

We will see shortly what constitutes the fundamental difference between pipes and function or method call chains, but let's note first that all three mechanisms, independent of notation, reflect the same principle: applying a sequence of transformations to some data.

That said, there is one aspect that fundamentally distinguishes pipes from function or method calls, and, as we saw above, it is neither the notation, nor is it visible from it: Pipes process data in chunks. This applies to the individual stages of the pipeline (called filters) as well as to the pipeline as a whole. Only the amount of data that is actually needed for producing the next output is held in memory or other temporary storage at any stage, and no intermediate results are assembled and stored between the stages whatsoever.

This property makes for an important distinction. Not only are pipes able to process unlimited amounts of data, they can even process infinite streams of data such as sensor measurements or the like, yielding results as soon as they are available. This wouldn't be possible using function or method calls, since neither can an unlimited sequence of data be collected in advance of processing it, nor can there be any result before the end of the data stream - meaning that there could never be any result at all.

This causes a fundamental difference in the interface by which the transformations are connected within pipes. While function or method calls communicate by parameters and return values, filters communicate by receiving and sending chunks of data. An important aspect of this interface is the inherently decoupled nature of input and output, meaning that not every input needs to yield output, and not every output needs to be caused by input. (We could say that a filter is a generalization of a flattening operation, of its opposite, and of any mix between the two.)

The way pipes work is thus fundamentally different from the synchronous nature of function calls and makes it impossible to implement a pipe as a simple loop of read-transform-transform-transform-write operations like this

-- Constructs a pipe connecting the given functions. The pipe will read data
-- by calling its first parameter, then transform it by invoking all given
-- functions in sequence, and output the result by calling its second parameter.
-- This will continue until input is exhausted.
-- THIS IS NOT A REAL PIPE!
function pipe( ... )
    local filters = arg
    return function( input, output )
        for data in input do
            local current = data
            for _, filter in ipairs( filters ) do
                current = filter( current )
            end
            output( current )
        end
    end
end

p = pipe( transformation1, transformation2, transformation3 )
p( io.lines(), print )

Such an implementation will work only for the simplest transformations, where every input is turned into exactly one output at every stage. This is not what filters in the general case do. (This doesn't mean, however, that pipes are capable of transformations that are impossible using function calls: If all data to be transformed can be collected and given to the transformation as a whole, it can just as well be processed using ordinary function calls - the difference will mostly be in memory footprint. There is, however, a difference for infinite data streams. On the other hand, the individual stages of a pipe very well can, and commonly do, use ordinary functions to accomplish their processing tasks. This is because they are free to collect as much data as necessary as input for a concrete function call.)

Diego implements filters using a function call interface and addresses the impedance mismatch between the decoupled nature of input and output of filters vs. the synchronous nature of function calls by using a special protocol that allows for non-output and non-input "filter" calls. While this complicates not only the scheduler, but more importantly the filter implementations as well, it works very well for the sort of processing he targets with his framework.

More straightforward implementations of the filter algorithms become possible when filters are implemented as separate processes, each having its own control loop, reading input and writing output by calling system functions. This is the way filters are implemented in UNIX, and in Lua we can use coroutines to achieve the same effect. All scheduling and state-keeping is left to the runtime, so filter writers can concentrate on their algorithm only, using a natural and easy notation. Let's see how we can implement pipes and filters in Lua.

Exploring Filters and Pipes

Before we begin, a word about the naming convention I use, so you hopefully won't get confused by the variable names in the code below: A prefix of "p" denotes a function parameter, a prefix of "l" a local variable, a prefix of "s" a static (module global) variable, and a prefix of "g" a global variable (not used here). Function names have no prefix (which definitely doesn't mean that a function couldn't be held in a variable, which mandatorily has a prefix). Your preferences may vary, but I get a couple of important advantages from this naming convention, for instance the ability to derive a local from a parameter of the same base name without shadowing it

local lCondition = (pCondition == nil) or pCondition

The code I will present below is written for Lua 5.0.

Let's now first define the big picture, and then see how we can implement it and what we can do with it.

The Big Picture

First, we want to be able to write filters as functions taking as parameters two functions, for input and output. Upon execution they will run from beginning to end, transforming all input into output. Input will be obtained by successively calling the input function, and any output will be written by calling the output function. And, instead of only strings, we want to be able to pass multiple values of any type between filters. Thus, a filter implementation will roughly follow one of the following templates

function my_filter( pInput, pOutput )
    for ... in pInput do
        ...
        pOutput( ... )
        ...
    end
end

or

function my_filter( pInput, pOutput )
    while ... do
        ...
        ... = pInput()
        ...
        pOutput( ... )
        ...
    end
end

For now, our filters won't return any values.

Second, we want to be able to connect filters implemented this way into chains (pipes) that feed the output of one filter into the input of the next. The input of the first filter and the output of the last filter will be identical to the input and output of the pipe; thus, a pipe will itself be a filter and as such can again be combined with other filters or pipes. The code we want to write will look similar to the following, where transformation1, transformation2, transformation3 are filters implemented as shown above

p = pipe( transformation1, transformation2, transformation3 )
p( io.lines(), print )

This example reads all lines from standard input, transforms this stream by piping it through the three filters in sequence, and prints the results to the console.

As you can see, in contrast to Diego's solution, we don't need any pumps for this to happen! This is interesting, and it results from the fact that our filters, in contrast to Diego's, are active entities, pumping their input to output themselves. (This is, by the way, the reason why we need coroutines to have multiple filters work together.)

Implementing Pipes

As we said, we want to be able to pass multiple values of any type between filters. In addition, nils should be allowed too, and for this we define that the end of the data stream is reached when the first of the values transferred by the input/output handshake is nil.

To implement this in Lua 5.0 we need a simple utility function doing the opposite of Lua's unpack(), since simply wrapping a call in braces, as one usually does to collect return values, won't do the job when there are nils

-- Inverse of Lua unpack(). Returns all its arguments packed into an array.
function pack( ... )
    return arg
end

Now, let's implement pipes. The following function constructs a pipe. It takes a number of filters and returns a new filter representing the chain. The returned function, upon invocation, wraps all filters but the last one into coroutines and creates a function for each of them that resumes the coroutine, passing the filter's input as input and coroutine.yield as output, and returns the yielded value(s), that is, the filter's output. This function, a generator, acts as the input for the next filter in the chain; the first filter gets the pipe's input as its input. After building the chain, the last filter is invoked with its input and the output of the pipe, and what it returns is returned from the pipe (a tail call). We don't need the returned value(s) yet, but we will see later what this gives us.

-- Creates a new filter acting as the concatenation of the given filters. A
-- filter is a function with two parameters, input and output, that obtains
-- its input by calling input and writes output by calling output. Optionally,
-- return values may be produced, the return values of the last filter being
-- returned by the pipe.
function pipe( ... )
    local lFilters = arg
    local lFilterCount = table.getn( lFilters )
    return function( pInput, pOutput )
        local lCurrentInput = pInput
        for lFilterPosition = 1, lFilterCount - 1 do
            local lCurrentFilter = coroutine.create( lFilters[ lFilterPosition ] )
            local lPreviousInput = lCurrentInput
            lCurrentInput = function()
                local lResult = pack( coroutine.resume( lCurrentFilter, lPreviousInput, coroutine.yield ) )
                local lSuccess = table.remove( lResult, 1 )
                if lSuccess then
                    if coroutine.status( lCurrentFilter ) == "suspended" then
                        return unpack( lResult )
                    end
                else
                    error( lResult[ 1 ] )
                end
            end
        end
        return lFilters[ lFilterCount ]( lCurrentInput, pOutput )
    end
end

Some additional notes about the code above: First, when resuming the coroutines, we pass the filter's input and output. This would strictly be necessary only for the first resume, which starts the execution of the coroutine. After that, the two arguments will be returned to the filter by every output call. Since they will be ignored by the filter, this doesn't hurt, and we need not implement special handling. Second, since filters may return some value (even if we haven't allowed it by contract), we have to check after every resume whether the coroutine has yielded or has ended, to decide if the result we got from the resume is to be output.
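
The following standalone snippet (not part of the library) demonstrates the handshake: the arguments of each resume reappear as return values of the yield the coroutine is suspended in, where they are simply discarded

demo = coroutine.create( function( pInput, pOutput )
    pOutput( "a" )  -- yields "a"; the next resume's arguments return here
    pOutput( "b" )  -- yields "b"
end )
print( coroutine.resume( demo, nil, coroutine.yield ) )  --> true   a
print( coroutine.resume( demo, "ignored", "ignored" ) )  --> true   b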

Note also that the pipe function doesn't build the filter chain itself; this is deferred until the filter is actually run. Otherwise it would not be possible to execute a pipe more than once, and it wouldn't be a filter in the strict sense.

This small function is all we need to have pipes in Lua! Let's see what we can do with this.

Sources, Sinks and Sealed Pipes

First, let's think about filters and where our data will come from and where it goes. As it turns out, our filter interface allows for two special types of filters: A filter that doesn't call its input parameter produces its output out of itself - it's a source. Conversely, a filter that doesn't call its output parameter only consumes input - it's a sink. Obviously sources and sinks can occur only at the beginning or end of a pipe, respectively. A pipe beginning with a source is itself a source, one ending with a sink is itself a sink. A pipe beginning with a source and ending with a sink is a special case: it doesn't do any input or output, pumping upon invocation all data from the source to the sink in one rush. We'll look at those pipes shortly - they have a number of interesting properties. First let's examine where sources and sinks get their data from and where they put it.

What, then, could the input and output parameters mean in the case of a source or a sink? They are not needed for obtaining input or writing output, but they can be used for describing the (real) data source or destination to the filter (source or sink): for instance, the name of a file to read from or write to, or a string to pull data from by parsing it. The pipe containing these sources or sinks will have these parameters as its own input and output parameters, passing them to the source and sink unchanged.

Now, imagine for a moment that our sink is something that collects the data it receives into, for instance, a table, or that computes some aggregated value from it like a word count or average. This is where the return value of filters comes into play: Sinks can, instead of (or in addition to) using their output parameter, return a value (or possibly multiple values). The pipe (which is in this case also a sink) will return this result without modification.

Now, let's look again at pipes which are at the same time source and sink. As stated, they don't perform any input or output calls. We can call such pipes sealed because they keep their data flow inside and instead expose a function call interface: They communicate input and output by using parameters and return values. That is, they work effectively as ordinary functions with one or two parameters, possibly returning a result! Like functions, they can be freely invoked, upon every invocation "pumping" the data that was given to them in one piece in the input parameter through itself and returning the result again in one piece (either as return value or putting it where specified by the output parameter). Note that there is also no state kept between successive invocations as there is in normal filter operation: Every input gets its own fresh pipe instance that "dies" when this data has been processed and the result returned. So, sealed pipes effectively turn pipes into functions!

Let's now see how using pipes looks in practice.

Using Pipes

For experimentation we will define a simple data source yielding a sequence of integers starting with 1 and continuing until the limit it has been given as input parameter is reached

-- Produce a sequence of integers
function seq( pLimit, pOutput )
    for lValue = 1, pLimit do
        pOutput( lValue )
    end
end

We can test this filter already. As it turns out, we can invoke any filter or pipe for debugging purposes with its input and print as output, to get its output printed to the console (at least as long as print() gives us enough information about our values to be useful). Nifty, eh? Let's print the output of our number sequence source to the console

> seq( 4, print )
1
2
3
4

Now we will define a data sink computing the average of all values read from input

-- Compute the average from a sequence of numbers
function avg( pInput )
    local lSum, lCount = 0, 0
    for lValue in pInput do
        lSum = lSum + lValue
        lCount = lCount + 1
    end
    return lSum / lCount
end

Let's test it by feeding it the data produced by our data source. We will build a pipe for this

> average_of_sequence = pipe( seq, avg )
> return average_of_sequence( 4 )
2.5

Now, let's do some filtering. The following filter reads numbers from input and writes their squares to output

-- Produce the squares of the input values
function square( pInput, pOutput )
    for lValue in pInput do
        pOutput( lValue * lValue )
    end
end

Using this filter, we'll build a pipe delivering a sequence of the first n squares, then append another square filter to it to get another pipe yielding a sequence of fourth powers, and finally we build a pipe from scratch to compute the average of the first n fourth powers

> seq_squares = pipe( seq, square )
> seq_squares( 4, print )
1
4
9
16
> seq_squares_squared = pipe( seq_squares, square )
> seq_squares_squared( 4, print )
1
16
81
256
> average_of_seq_of_powers = pipe( seq, square, square, avg )
> return average_of_seq_of_powers( 4 )
88.5

This apparently works very well, but our square filter really isn't that interesting: It doesn't make use of the decoupled input and output that pipes provide. The transformations above could just as well have been implemented using the simplified pipe implementation from the introduction. Let's therefore write two other, more interesting filters. The first one will output only the odd numbers read from input, and the second will collect its input into pairs

-- Let only odd numbers pass through
function odd( pInput, pOutput )
    for lValue in pInput do
        if math.mod( lValue, 2 ) == 1 then
            pOutput( lValue )
        end
    end
end

-- Collect input into pairs
function pair( pInput, pOutput )
    for lFirstValue in pInput do
        local lSecondValue = pInput()
        pOutput( lFirstValue, lSecondValue )
        if lSecondValue == nil then
            break
        end
    end
end

If we connect the filters with our number sequence source we see that they work as expected

> seq_odd = pipe( seq, odd )
> seq_odd( 4, print )
1
3
> seq_pairs = pipe( seq, pair )
> seq_pairs( 4, print )
1       2
3       4

We won't build an example now of a filter that calls output more often than input, since we'll see this in action when we discuss flattening filters later on. For now, let's have a look at how we can interface our pipes with code we might already have or intend to write.

Interfacing Pipes with Existing Code

Creating Sources and Sinks

As you might have noted, filters and pipes take a generator as their first (input) parameter. A number of functions are available that return generators, like io.lines() or string.gfind(), which we can use directly as input for our pipes. To process, for instance, all numbers contained in the string data with the pipe my_pipe, we could write

> my_pipe( string.gfind( data, "%d+" ), print )

It would be nice, however, if we could make a data source from string.gfind, so we could put this inside our pipe at its beginning, and instead write

> my_pipe( data, print )

The following utility function helps us with this, constructing a data source from a function returning a generator. Upon execution, the data source calls the given function and writes all data delivered by the returned generator to its output

-- Wraps a function returning a generator as data source
function source( pFunction )
    return function( pInput, pOutput )
        local lInput = pFunction( pInput )
        while true do
            local lData = pack( lInput() )
            if lData[ 1 ] == nil then
                break
            end
            pOutput( unpack( lData ) )
        end
    end
end

In passing, for our convenience, we define the following function: it wraps a function taking many arguments, together with some values, into a function taking only one argument that calls the given function with its argument followed by these values. Since most functions of the Lua library take the value to act on (the this value) as the first argument, this function gives us in Lua what partial function application gives us in functional programming

-- Binds the given values to a function as trailing arguments (a simple form
-- of partial application)
function curry( pFunction, ... )
    return function( pParam )
        return pFunction( pParam, unpack( arg ) )
    end
end

Now we can build a data source that uses string.gfind with a pattern "%d+" to find all numbers contained in the string given as input parameter and write them to its output (as strings)

> parse_numbers = source( curry( string.gfind, "%d+" ) )
> parse_numbers( "123 5 78 abc 12", print )
123
5
78
12

We'll work with this data source in the next section, after we've introduced another interface function.

So, now that we've defined a function that builds a data source from a function returning a generator, how about building a data sink from something we already have? What would be the opposite operation?

As it turns out, there's nothing we have to build: The opposite operation to a function taking a value and returning a generator would be a function taking a generator and returning a value. But that's exactly the description of our filter interface for the case of sinks: Any function that takes a generator (and optionally a second argument) and (optionally) returns some value is already a sink - there is no need to build one!

Passing Values through Functions

The next thing we would like is to build filters from existing functions. The pattern is easy: Read input, call the function, and write the result to output. The following function accomplishes this

-- Wraps a function as filter
function pass( pFunction )
    return function( pInput, pOutput )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            pOutput( pFunction( unpack( lData ) ) )
        end
    end
end

With this function at hand we can create a filter that uses tonumber() to convert the strings passed from our number parser to numbers, so we can use them to compute their average. We build the following pipe

> avg_from_string = pipe( parse_numbers, pass( tonumber ), avg )
> return avg_from_string( "123 5 78 abc 12" )
54.5

Note that we can apply pass() to a sealed pipe, using it as a subpipe that data is passed through.

The following function is a variant of pass() that doesn't do any output, effectively consuming all input without returning anything

-- Wraps a function as data sink, consuming any input without any output or
-- return
function consume( pFunction )
    return function( pInput )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            pFunction( unpack( lData ) )
        end
    end
end
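
As a quick (hypothetical) illustration, wrapping print with consume() gives us a sink that prints everything reaching the end of a pipe

> print_all = pipe( seq, consume( print ) )
> print_all( 4 )
1
2
3
4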

Reading from and Writing to Pipes

Let's now see if we can interface pipes with custom program logic that processes data in loops. There are two ends of the pipe to interface with: processing the output flowing out of a pipe, and feeding values into a pipe. (Actively feeding a filter's input and reading its output in the same thread probably wouldn't be of much use because of the buffering and synchronization issues it causes, so we won't go into this.)

To process the data coming out of a pipe in a loop, we need to turn the pipe into a generator. The following function accomplishes this, wrapping a pipe (or any filter) into a reusable function with one argument that returns a generator yielding the output of the pipe when given the argument as input. (This function exposes the same signature pattern as string.gfind(), io.lines() and the like.)

-- Wraps a filter into a function with one argument returning a generator that
-- yields all values produced by the filter from the input given as argument.
function drain( pFilter )
    return function( pInput )
        local lGenerator = coroutine.create( pFilter )
        return function()
            local lResult = pack( coroutine.resume( lGenerator, pInput, coroutine.yield ) )
            local lSuccess = table.remove( lResult, 1 )
            if lSuccess then
                if coroutine.status( lGenerator ) == "suspended" then
                    return unpack( lResult )
                end
            else
                error( lResult[ 1 ] )
            end
        end
    end
end

We can now write code of the following sort

for ... in drain( pipe( ... ) )( ... ) do
    ...
end

One might assume that the pipe to drain has to be a data source, but in fact the function above makes no assumptions about that, and thus does not require the pipe to be a data source. It depends on what kind of parameter we intend to give to the returned function: If we plan to give it a generator, the pipe need not be a data source.
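
For example (a hypothetical session using the filters from earlier), we can loop over the output of a pipe like this

> for lValue in drain( pipe( seq, square ) )( 4 ) do print( lValue ) end
1
4
9
16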

Now, how about the opposite binding, that is, turning a pipe that is a data sink into a function that we can call repeatedly to feed data into the input of the pipe? We can implement a function returning such a feeder as follows (for simplicity we don't return a reusable function here like in the case of drain(), but bind the output parameter to the pipe in one step)

-- Wraps a pipe that must be a data sink into a function that feeds its
-- arguments into the pipe's input.
-- THIS FUNCTION DOES NOT WORK WITH THE PIPES WE USE HERE!
function feed( pPipe, pOutput )
    local lOutput = coroutine.create( pPipe )
    local lPush = function( ... )
        local lSuccess, lResult = coroutine.resume( lOutput, unpack( arg ) )
        if not lSuccess then
            error( lResult )
        end
        return coroutine.status( lOutput ) == "suspended"
    end
    lPush( function() return coroutine.yield() end, pOutput )
    return lPush
end

The function creates a coroutine from the pipe, passing it coroutine.yield as input, then executes it until it yields (requesting its first input), and returns a wrapper that resumes the coroutine on every invocation, passing its arguments to it. The wrapper returns true if the coroutine yields again, that is, if it will accept more input. Let's use it

> feeder = feed( square, print )
stdin:6: attempt to yield across metamethod/C-call boundary
stack traceback:
        [C]: in function `error'
        stdin:6: in function `lPush'
        stdin:10: in function `feed'
        stdin:1: in main chunk
        [C]: ?

Oops! We stumbled upon a limitation of Lua: Lua doesn't allow the generator called from the header of a for loop to yield! If we rewrite our square filter, we can work around this limitation

> function square2( pInput, pOutput )
>>     while true do
>>         local lValue = pInput()
>>         if lValue == nil then break end
>>         pOutput( lValue * lValue )
>>     end
>> end
> feeder = feed( square2, print )
> return feeder( 2 )
4
true

There is, however, another problem: The feed function will only work for simple filters; it won't work for pipes. The reason is that in our pipes implementation the coroutines executing the filters are "stacked" one above the other in a resume/yield chain, and the filter that receives coroutine.yield as input has already been resumed by the filter following it, so upon yield it will return there and not to the code calling the pipe at the top level. We won't get control back with the pipe waiting for input, as we intended.

We can improve our pipes implementation so that it doesn't have this problem: For this, we wrap all contained filters into coroutines and have them yield on input and output, and control them by a main loop that calls input and output itself

-- A symmetric pipe implementation. Pipes of this sort can be resumed from
-- both ends but prevent the constituent filters from using 'for' loops for
-- reading input. Also, sources and sinks cannot use the input and output
-- parameters of the pipe.
function symmetric_pipe( ... )
    local lFilters = arg
    local lFilterCount = table.getn( lFilters )
    return function( pInput, pOutput )
        local lHandover
        local lInput = function()
            lHandover = nil
            coroutine.yield()
            return unpack( lHandover )
        end
        local lOutput = function( ... )
            lHandover = arg
            coroutine.yield()
        end
        local lProcessors = {}
        for _, lFilter in ipairs( lFilters ) do
            table.insert( lProcessors, coroutine.create( lFilter ) )
        end
        local lCurrentProcessor = lFilterCount
        while lCurrentProcessor <= lFilterCount do
            if not lProcessors[ lCurrentProcessor ] then
                error( "Requesting input from closed pipe" )
            end
            local lSuccess, lResult = coroutine.resume( lProcessors[ lCurrentProcessor ], lInput, lOutput )
            if not lSuccess then
                error( lResult )
            end
            if coroutine.status( lProcessors[ lCurrentProcessor ] ) == "suspended" then
                if lHandover == nil then
                    lCurrentProcessor = lCurrentProcessor - 1
                else
                    lCurrentProcessor = lCurrentProcessor + 1
                end
                if lCurrentProcessor == 0 then
                    lHandover = pack( pInput() )
                    lCurrentProcessor = 1
                elseif lCurrentProcessor > lFilterCount then
                    pOutput( unpack( lHandover ) )
                    lCurrentProcessor = lFilterCount
                end
            else
                lHandover = {}
                lProcessors[ lCurrentProcessor ] = nil
                lCurrentProcessor = lCurrentProcessor + 1
            end
        end
    end
end

This pipes implementation allows pipes to be used with feed() too. However, these pipes have another problem: Since all filters will now yield on input, they suffer from the limitation of Lua we saw above, effectively prohibiting us from using for loops in any filter. This is too severe a limitation, so we abandon this otherwise elegant implementation, accepting that there is no way to feed data into pipes from within a loop. If we want to write code that feeds data into a pipe, we have to either wrap it into a generator using coroutine.yield and coroutine.wrap, or write it as a data source, stuff it in front of the pipe, and then execute the pipe. Our pipes must always be in the controlling position; there is no way to have our loop control execution!
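
A sketch of the first workaround: we wrap the feeding loop into a generator using coroutine.wrap(), leaving the pipe in the controlling position (hypothetical example)

> feeder_input = coroutine.wrap( function()
>>     for lValue = 1, 4 do
>>         coroutine.yield( lValue )  -- "feed" a value whenever the pipe asks
>>     end
>> end )
> square( feeder_input, print )
1
4
9
16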

There is another issue with the symmetric_pipe() implementation above: It would prohibit using the input and output parameters of the pipe as parameters for the data sources and sinks, because they never get to see them. We would lose quite a bit of flexibility this way.

Reading and Writing Files

Using our source() function from above, we can simply wrap io.lines to get a data source that reads a file and writes its contents line by line to its output. There is, alas, no function in the standard library that we could use as a file sink, so we have to write our own using io.write. For symmetry, we therefore also implement a file source using io.read. We implement it in such a way that we can use it either as a source (taking two parameters) or as a function returning a generator (when invoked with one parameter). This allows us to use it at our discretion for building pipes as well as for invoking them

-- Returns a function that can be used as data source yielding the contents of
-- the file named in its input parameter, processed by the given formats, or
-- that can be called with a filename alone to return a generator yielding this
-- data. If no formats are given, line-by-line is assumed.
function filereader( ... )
    local lFormats = arg
    return function( pInput, pOutput )
        local lInput = io.open( pInput )
        local lOutput = pOutput or coroutine.yield
        local lFeeder = function()
            while true do
                local lData = pack( lInput:read( unpack( lFormats ) ) )
                if lData[ 1 ] ~= nil then
                    lOutput( unpack( lData ) )
                end
                if lData[ table.getn( lData ) ] == nil then
                    break
                end
            end
            lInput:close()
        end
        if pOutput then
            lFeeder()
        else
            return coroutine.wrap( lFeeder )
        end
    end
end

-- Returns a data sink that writes its input, optionally formatted by the
-- given format string using string.format(), to the file named in its output
-- parameter. If no format is given, the processing is analogous to print().
function filewriter( pFormat )
    return function( pInput, pOutput )
        local lOutput = io.open( pOutput, "w" )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            if pFormat then
                lOutput:write( string.format( pFormat, unpack( lData ) ) )
            else
                for lIndex = 1, table.getn( lData ) do
                    if lIndex > 1 then
                        lOutput:write( "\t" )
                    end
                    lOutput:write( tostring( lData[ lIndex ] ) )
                end
                lOutput:write( "\n" )
            end
        end
        lOutput:close()
    end
end

Let's use both to copy a file line by line

> copy = pipe( filereader(), filewriter() )
> copy( "data.in", "data.out" )

If we stuff various filters between the reader and the writer, we can do any processing we want on the lines traveling through the pipe. A simple scenario is a utility like grep. The following function builds a filter we can use for this

-- Returns a filter that filters input by the given pattern
function grep( pPattern )
    return function( pInput, pOutput )
        for lData in pInput do
            if string.find( lData, pPattern ) then
                pOutput( lData )
            end
        end
    end
end

We won't test this right now but instead compose a slightly more interesting example. You might have noticed that the function above doesn't mention lines in any way. Indeed, the filter is independent of whether the input consists of lines or not. Let's write another filter that collects lines into paragraphs and put the two together to grep on paragraphs instead of lines (like grep -p).

-- Filter collecting lines into paragraphs. The trailing newline is preserved.
function paragraphize( pInput, pOutput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            pOutput( table.concat( lBuffer, "\n" ) )
            lBuffer = {}
        end
    end
    if next( lBuffer ) then
        pOutput( table.concat( lBuffer, "\n" ) )
    end
end

> filter_foobar_paragraphs = pipe( filereader(), paragraphize, grep( "foobar" ), filewriter() )
> filter_foobar_paragraphs( "data.in", "data.out" )

If we liked, we could have omitted the file writer, giving print instead of an output file name as second parameter to the pipe to have the results printed to the console.
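
Spelled out, this variant (the same pipe, minus the writer) would look like this

> filter_foobar_paragraphs = pipe( filereader(), paragraphize, grep( "foobar" ) )
> filter_foobar_paragraphs( "data.in", print )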

An especially interesting kind of file reader is one that reads Lua data files of the sort described in section 10.1 of PIL (https://lua.ac.cn/pil/10.1.html): We want it to write to output, one after the other, the entries read from the file. The following function works - similarly to the filereader - as a data source or as a function returning a generator. It executes the Lua file in an empty sandbox, exploiting some metaprogramming to supply the executing chunk with the function(s) it calls to submit a record (entry in the PIL example). This is done by defining the __index metamethod of the sandbox to provide the chunk with a function that outputs or yields the record together with its tag name (entry in our case). Having the tag name written to output too allows us to have input files carrying more than one kind of data (entity type): for instance, if we wanted to keep the contents of an entire (sufficiently small) relational database in one data file, we could use the table names as tag names and write the contents of all tables to one file.

-- Invocation with a single argument returns a generator yielding the records
-- from the named Lua file. When invoked with two arguments this function acts
-- as a data source yielding this data on output.
function luareader( pInput, pOutput )
    if pInput == nil then
        error( "bad argument #1 to `loadfile' (string expected, got nil)" )
    end
    local lInput, lError = loadfile( pInput )
    if not lInput then
        error( lError )
    end
    local lOutput = pOutput or coroutine.yield
    local lSandbox = {}
    setmetatable( lSandbox, {
        __index = function( pSandbox, pTagname )
            local lWorker = function( pRecord )
                lOutput( pRecord, pTagname )
            end
            pSandbox[ pTagname ] = lWorker
            return lWorker
        end
    } )
    setfenv( lInput, lSandbox )
    if pOutput then
        lInput()
    else
        return coroutine.wrap( lInput )
    end
end
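
For reference, such a data file consists of plain Lua record constructors; the single record from the PIL example looks roughly like this (abbreviated)

-- data.lua: a self-describing record in the style of PIL section 10.1
entry{
    title = "Tecgraf",
    org = "Computer Graphics Technology Group, PUC-Rio",
    url = "http://www.tecgraf.puc-rio.br/",
}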

If we test this with the data from https://lua.ac.cn/pil/10.1.html (only one record) we get

> reader = luareader( "data.lua" )
> return reader()
table: 2001af18 entry
> return reader()
> 

Let's now examine some more complex transformations possible with pipes.

Flattening and Folding Filters

You might have asked yourself already how we would feed data into a pipe from multiple input sources so that the input to the pipe is the concatenation of the output of those sources.

If you think about it, you'll see that the heart of this operation is not restricted to data sources but can happen at any stage of a pipe: What we need is a filter that produces a sequence for every input item and writes the concatenation of all these sequences to its output. This is essentially a flattening operation. With this functionality available as a filter, we never need to build special things like a multisource: To concatenate multiple inputs, we only need to produce a sequence of values representing or describing those inputs, and then apply a filter that expands every element into the data it stands for. This is best done by calling a function that takes the element as argument and returns a generator yielding the data produced by expanding it.

The following function takes a function returning a generator (io.lines, for instance) and builds from it a filter that calls this function for every input, feeding the output of the generator into its own output.

-- Returns a filter that calls the given function for every input and feeds
-- the output of the returned generator into its output.
function expand( pFunction )
    return function( pInput, pOutput, ... )
        while true do
            local lInput = pack( pInput() )
            if lInput[ 1 ] == nil then
                break
            end
            local lGenerator = pFunction( unpack( lInput ) )
            while true do
                local lData = pack( lGenerator() )
                if lData[ 1 ] == nil then
                    break
                end
                pOutput( unpack( lData ) )
            end
        end
    end
end

Suppose we have a file containing a list of file names, one per line, and we want to concatenate the contents of these files, line by line, into one output file. We can do it like this:

> concat_files = pipe( filereader(), expand( filereader() ), filewriter() )
> concat_files( "filelist.in", "data.out" )

By the way, this example shows the hidden power of the dual nature of the function returned by filereader().

Now let's look at the opposite operation: assembling the data flowing through a pipe into larger units. Since the data flowing through a pipe is inherently flat, an assembling filter has to know (possibly by inspecting the data) where to draw the boundary between the data going into one assembly and the data going into the next. There is no external cue, like the end of input in the flattening case, for switching from one assembly to the next.

The simplest way to describe an assembly algorithm is as a function that takes an iterator as argument, pulls the data it needs from it, and returns the assembly. Such a function can be wrapped into a folding filter by the following function.

-- Returns a filter that calls the given function with its input until it is
-- exhausted, feeding the returned values into its output.
function assemble( pFunction )
    return function( pInput, pOutput )
        local lTerminated = false
        local function lInput()
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                lTerminated = true
            end
            return unpack( lData )
        end
        repeat
            pOutput( pFunction( lInput ) )
        until lTerminated
    end
end

With this function at hand, we can give a simpler implementation of the filter collecting lines into paragraphs that we implemented earlier.

-- Collects a paragraph by pulling lines from a generator. The trailing newline
-- is preserved.
function collect_paragraph( pInput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            break
        end
    end
    return table.concat( lBuffer, "\n" )
end

paragraphize = assemble( collect_paragraph )

Nested Pipes

Processing Complex Data

In the previous section we saw how compound data (files and paragraphs, in that case) can be disassembled and assembled. The parts of the compound data travel along the same linear pipe as the compound data itself, forming a single linear transformation path. (That's what pipes are all about, aren't they?) Two things should be noted in this context.

After all, in the general case the latter would not even be possible, since intermediate filters may modify the disassembled data at will, blurring the boundaries of the initial compound data.

Now imagine we wanted to process complex data whose parts have heterogeneous structure and require different transformations, and we wanted to use pipes for those transformations too. What kind of topology, beyond a single linear pipe, would we need? The topology that comes to mind is a set of parallel "subpipes" emerging from one node and converging into another. The question is how the inputs and outputs of these subpipes are synchronized within these nodes, and possibly between them.

Obviously, parallel pipes must not "eat each other's food" by pulling input on their own, and everything any of the pipes produces must somehow make it into the result built from the outputs of all pipes. Naturally we arrive at the notion that the inputs and outputs of all subpipes must lie within the same node (filter), and that they take exactly one input (read by the filter controlling the subpipes) and must produce exactly one output (assembled and written to output by the controlling filter). That is, the subpipes are detached from the input and output served by the filter itself, and the filter node is responsible for providing input and output on its own. In effect, the subpipes operate as functions, taking arguments and producing return values. As we know, such a pipe must begin with a data source (receiving the input values) and end with a data sink (returning the result values). From the filter's perspective, this is indistinguishable from a function call.

It turns out that using parallel "subpipes" to process the parts of complex values places no special requirements on the filter owning them as input/output - it's just like calling functions. Since there is nothing pipe-specific about this, I won't provide a function for building one structured value from another by calling a set of functions.

We can, however, give a simple example of using a subpipe that doesn't involve complex values. We'll use pass() to run a sealed pipe as a subpipe, processing the individual values flowing through a pipe. Recall our average_of_seq_of_powers example above, which computed the average of the first n fourth powers. Now we are interested in how this value evolves with n, and we build the corresponding sequence by passing every n to the pipe average_of_seq_of_powers we already have, using it as a subpipe.

> seq_of_average_of_seq_of_powers = pipe( seq, pass( average_of_seq_of_powers ) )
> seq_of_average_of_seq_of_powers( 4, print )
1
8.5
32.666666666667
88.5

This is possible because average_of_seq_of_powers is a sealed pipe and thus behaves like a function. The term "passing" may be a bit misleading, since it is not an active subpipe reading input and writing output - it is an entirely different communication pattern: for every value "passed", a new pipe instance is created and runs to completion. But "passing" is also the term we use when calling functions, and the values do manage to "pass" through the subpipe, being transformed on the way, so the term is probably justified.

Processing Heterogeneous Data Streams

Besides what we discussed in the previous section, we can imagine another scenario using parallel pipes, where the subpipes remain pipes to the outer pipe instead of masquerading as functions: The data stream may at some stage consist of data of different kinds that need different transformations before joining the main stream again. There would have to be some switch routing the data to a set of subpipes, and a collector pulling the data from the ends of the subpipes in the order the input was fed into them. The latter condition forces the node collecting the data to do the input routing as well (since there is no other communication channel that could be used for synchronization). The difference from the complex data scenario above is that the subpipes would not have to be synchronized in their input/output behaviour and could even have interleaved input and output.

To implement this kind of processing we would have to be able to suspend a pipe on input and feed it data as it becomes available. This isn't possible with the kind of pipes we have, and would require symmetric pipes, so we abandon the idea.

Just out of curiosity, let's ask whether, under these prerequisites, a true pipe spaghetti would be possible, that is, a network of pipes diverging from one source and converging into one sink. Obviously, such a structure could not be built by recursively assembling simpler structures into more complex ones; instead, filters and subpipes would have to be connected by individually wiring their inputs and outputs. If we consider how data would travel through such a network, we see that instead of our pull communication scheme (requesting input from a generator) we would need push communication (calling a function to dispose of output). (Our pull scheme doesn't harmonize with the kind of output dispatching a switch performs - being able to inspect the data to be routed - and would require dispatching input requests based solely on data already output, that is, without being able to inspect the input.) With this, we could even imagine processing networks with multiple inputs and outputs. It would be an interesting project to implement this and experiment with it to see what it could give us.

On Notation

While our main goal in implementing pipes was a natural notation for implementing data transformations (that is, for data streams of undefined length) that can then be combined into more complex transformations, let's see how we can enhance the notation for using those transformations. We will apply some metaprogramming to this end.

The inline notation for invoking a pipe, that is, building a throw-away pipe and - without storing it in a variable - using it immediately, looks somewhat clumsy. Take our grep utility as an example (omitting paragraphize):

> pipe( filereader(), grep( "foobar" ), filewriter() )( "data.in", "data.out" )

This is far from the terseness of the UNIX pipe notation, which would read roughly:

$ filereader "data.in" | grep "foobar" | filewriter "data.out"

When thinking about how operator notation could be used to construct pipes, we first note that we have two options. Remember that a filter is a function taking a generator as input parameter and a function as output parameter. Given a generator (and executing the pipe under construction on the fly), we could substitute coroutine.yield for the output and wrap the following filter into a coroutine, obtaining another generator, one that yields the input processed by the filter. This we could use as input to the next filter, repeating the process until we finally hold a generator yielding the output of the entire pipe. We could use this in a for loop, but to execute the pipe we would have to add another construct to exhaust the generator or, better, attach the sink by some other construct, supplying it with the output parameter. The notation for output would thus differ from the notation used for feeding input. Moreover, if we built chains of generators, we would re-implement most of the functionality of pipe(), yet couldn't use this notation to create pipes usable as subpipes, for instance.
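
Sketched using drain() from earlier, this first option would have looked roughly like this (illustrative only; this is the road not taken)

> for lValue in drain( square )( drain( seq )( 4 ) ) do print( lValue ) end
1
4
9
16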

I therefore chose the second option: building a reusable pipe and executing it immediately.

Building pipes is easy. We only have to wrap the first filter into a proxy, that is, a table with an attached metatable defining the metamethods that implement the operators we want. Since the first filter will usually be a data source built by source() or provided by our library (like luareader), this wrapping can mostly be hidden. For the cases where it has to be done explicitly, we make the bind() function publicly visible.

-- Metatable implementing the pipe operators (defined below)
local sPipeOperations

-- Binds its argument to the library by wrapping it into a proxy
function bind( pValue )
    local lProxy = { payload = pValue }
    setmetatable( lProxy, sPipeOperations )
    return lProxy
end

The metatable defines the __mul metamethod to implement the * operator for chaining filters. I chose multiplication rather than addition or concatenation because the effects of filters combined really do multiply rather than add. (This can be clearly seen in the seq_squares_squared example above.) Besides, the asterisk not only stands out visually but, more importantly, binds more strongly than most other operators, allowing us to apply those to pipes without parentheses. The operator returns a proxy as well, so it can be chained.

sPipeOperations = {
    -- Chains a proxy with a function or other proxy
    __mul = function( pHeadProxy, pFilter )
        local lProxy = { head = pHeadProxy, payload = fncompile( pFilter ) }
        setmetatable( lProxy, sPipeOperations )
        return lProxy
    end,
    ...
}

To be able to call the wrapped function transparently, we also implement the __call metamethod, which works by executing the function the proxy stands for with the given arguments. We use two internal utility functions to build the function a proxy represents.

-- Forward declaration (unchain is used by fncompile below)
local unchain

-- Compiles a proxy to the function it represents
local function fncompile( pSpec )
    if type( pSpec ) == "function" then
        return pSpec
    else
        if pSpec.head then
            return fncompile( pipe( unpack( unchain( pSpec ) ) ) )
        else
            return fncompile( pSpec.payload )
        end
    end
end

-- Collects the payloads of a proxy chain into an array
function unchain( pProxy )
    if not pProxy then return {} end
    local lResult = unchain( pProxy.head )
    table.insert( lResult, pProxy.payload )
    return lResult
end

sPipeOperations = {
    ...
    __call = function( pProxy, ... )
        return fncompile( pProxy )( unpack( arg ) )
    end,
    ...
}

fncompile() is implemented as a separate function, and is also applied to the result of pipe(), because we want to be able to freely mix and match pipes built using * or pipe(). To make this work, pipe(), like any function taking functions as arguments, converts its arguments using fncompile() before calling them, and, like any function building a filter, returns a bound function. (I won't list the changed functions again, though.)
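
For orientation, the adapted pipe() could look roughly like the following sketch (a reconstruction only; it assumes fncompile() is in scope)

local sRawPipe = pipe

-- Compiles all arguments and binds the resulting filter
function pipe( ... )
    local lFilters = {}
    for _, lFilter in ipairs( arg ) do
        table.insert( lFilters, fncompile( lFilter ) )
    end
    return bind( sRawPipe( unpack( lFilters ) ) )
end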

Now we can stack pipes, or call functions taking pipes as arguments, without using pipe() any more, and, if the data source or first filter is bound, even without bind()

> seq = bind( seq )
> average_of_seq_of_powers = seq * square * square * avg
> return average_of_seq_of_powers( 4 )
88.5
> filter_foobar_lines = filereader() * grep( "foobar" ) * filewriter()
> filter_foobar_lines( "data.in", "data.out" )

Regarding inline invocation we haven't gained much so far: all we can now remove from the ugly notation above are four letters: pipe. We still have to keep the parentheses around the pipe construction expression, since the function call operator binds more strongly than multiplication.

> (filereader() * grep( "foobar" ) * filewriter())( "data.in", "data.out" )

So this is not yet where we wanted to be. For pipes that have to be called with two arguments, there is probably not much more we can do without overcomplicating things.

We can, however, provide pipe notation for the input data, combined with an assignment of the transformation result, for the case where the pipe is called with only one input parameter and returns its result (that is, works as a function call with a single argument). Recall our average_of_seq_of_powers example from above, which we would now rewrite like this:

> result = 4 .. seq * square * square * avg

A data transformation whose result is used in further processing is a common case, and we are used to assignment notation in this situation. This notation therefore seems a good compromise, and most likely we wouldn't gain much by trying to tweak the notation further until the pipe's sink sat at the right end of the pipe, as in UNIX.

The notation above becomes possible if we implement the __concat metamethod, which Lua calls for the .. operator whenever it is provided by either of the two operands.

sPipeOperations = {
    ...
    __concat = function( pInput, pProxy )
        return fncompile( pProxy )( pInput )
    end
}

(Note that we can't use the nicer-looking > operator, since Lua converts the result of the __lt metamethod backing it to a boolean, which is not what we need.)

Finally, to demonstrate the power of this notation, let's return to our seq_of_average_of_seq_of_powers subpipe example. Since we are forced to compute a value instead of writing to output, we'll add another average computation, obtaining the average of the averages of the first n fourth powers, for n running from 1 to the given number.

> return 4 .. seq * pass( seq * square * square * avg ) * avg
32.666666666667

This concludes our metaprogramming excursion. Before giving some final thoughts, let's note the following observation: If we bind all the functions we have, we can use the .. notation to call any function taking one argument.

> return 1 + "4" .. bind( tonumber )
5

And, with all our functions bound, we could build entire call chains using the concatenation operator! (Mixing in other operators, as in the example above, is subject to operator precedence, though, so I wouldn't recommend it.) But if our data is represented as a table (a common case), there is a nicer way to express such call chains: We can give our data a metatable defining a __call metamethod that calls the function given to it as its first argument, with the data and the remaining arguments as parameters. The resulting notation could look like this (the map function, not provided here, builds a result table from a source table by storing the transformed values under their original keys):

t (map, tonumber) (map, square) (avg)

This reads like a nice chain of function calls, transforming the array t of strings representing numbers into numbers, squaring them, and computing the average. It evaluates to:

avg( map( map( t, tonumber ), square ) )

which is far less readable.
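
A sketch of this __call metamethod, together with hypothetical map, square and avg table functions (these shadow the filter versions defined earlier and are not part of the library above), could look like this

local sDataOperations = {}

-- Calls the function given as first argument with the data table and the
-- remaining arguments; table results are rewrapped to keep the chain callable
sDataOperations.__call = function( pData, pFunction, ... )
    local lResult = pFunction( pData, unpack( arg ) )
    if type( lResult ) == "table" then
        setmetatable( lResult, sDataOperations )
    end
    return lResult
end

-- Builds a table from a source table by storing the transformed values
-- under their original (array) keys
function map( pTable, pFunction )
    local lResult = {}
    for lIndex, lValue in ipairs( pTable ) do
        lResult[ lIndex ] = pFunction( lValue )
    end
    return lResult
end

function square( pValue ) return pValue * pValue end

function avg( pTable )
    local lSum = 0
    for _, lValue in ipairs( pTable ) do
        lSum = lSum + lValue
    end
    return lSum / table.getn( pTable )
end

t = setmetatable( { "1", "2", "3", "4" }, sDataOperations )
print( t (map, tonumber) (map, square) (avg) )  --> 7.5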

Note that the square and avg functions above are not the ones we defined earlier. Those were filters, and we could use them to compute the same result in a pipe (given a function values that takes a table and returns a generator yielding its values). We could then use one of the following notations, the first making use of the __call metamethod on the table, the second of the __concat metamethod on the pipe proxy:

t (source( values ) * pass( tonumber ) * square * avg)
t .. source( values ) * pass( tonumber ) * square * avg

Note that while the second notation looks much better, it cannot be used as a statement the way the first one can - Lua requires us to do something with the returned value. This usually isn't a problem, since we normally want to use the result delivered by the pipe as a return value, but it would be a limitation if the expression were evaluated entirely for its side effects. In that case, however, the notation would suffer anyway from not being able to supply an output parameter, something the first notation handles easily:

t (source( values ) * pass( tonumber ) * square, print)

Some Final Considerations

Having come to the end of our discussion, let's make some final considerations about the mechanism we have implemented.

You may be wondering why our filter interface doesn't require a boolean return value from the output function indicating whether the output went well and, more importantly, whether more output will be accepted. After all, we implemented our filters as entities actively writing their output, and somebody has to receive it. On UNIX systems we see a "broken pipe" error when a filter attempts to write data to the input of the next filter after that filter has finished execution and closed its input, accepting no more data.

The reason we don't need this is that in our implementation any filter is resumed by the filter following it when the latter requests input. That is, it executes only when its previous output has been handled and new output can be accepted (in fact, is even expected). Hence, the situation where a filter has processed its input but cannot deliver its output never arises: Output can never fail - we will never see the "broken pipe" error we know from UNIX! This is a direct consequence of our pull communication scheme. This communication scheme also gives our filter interface a nice symmetry: the input function takes no values and returns data, while the output function takes data and returns nothing.

A side effect is that there is no guarantee that any filter reads its input to the end. A premature end of input processing will therefore go unnoticed. Implementing such a check would be easy - any filter would have to finish before the filter preceding it does - but it's probably not worth the effort. (A file reader that isn't exhausted may leave an unclosed file behind, however, although throwing an error wouldn't help with that either.)

If we implemented pipes using a push communication scheme (having filters suspend on input and be resumed on output), we would face a different situation: When an output call returns (from the resume call into the next filter), it knows whether that filter is still alive and can therefore accept more input. It could communicate this to the calling filter so that it can stop processing (or ignore it and provoke a "broken pipe" error on the next output). The input/output interface would then lose its symmetry, and the loops constituting the filter implementations would no longer be controlled by input alone. It seems that our intuitive pull implementation is superior from this perspective too, and yields the more natural implementation.

Thinking again about the impossibility of output failures, there seems to be a mismatch with reality: output to a network connection, say, can certainly fail. So let's take a closer look:

Any filter in a pipe except the last one gets coroutine.yield as its output. This call can indeed never fail. (It returns when the coroutine is resumed again.) The last filter in a pipe, on the other hand, is by design a data sink, so it doesn't call its output, meaning that output calls cannot fail for it either. Where, then, do output errors originate? Obviously, when a data sink calls functions that can fail. This means that data sinks need a contract with whatever they deliver their data to that allows communicating the state of the output channel and handling any errors. Alternatively, instead of writing a custom data sink, we can write a custom program loop that drains the pipe, feeding its output into some channel and handling any problems itself. In both cases, no filter ever has to deal with any communication errors. (If we give a function that can fail as output to a pipe not ending in a sink, however, errors will either be ignored or be thrown into the code that invoked the pipe's execution.)

What about input errors? Any error occurring in any filter is transparently propagated (re-thrown along the input chain) up to the data sink. So the sink - or, if we prefer, a loop we have written that reads from the end of the pipe - will handle (or not handle) input errors along with output errors.

Another interesting observation: Recall the scenario of controlling code running in one thread that writes to and reads from a pipe's input and output.

To solve the synchronization problem this poses, we could decide to decouple the code handling the two ends of the pipe by using a callback architecture: give the pipe a callback through which it obtains its input, and let the controlling code deal only with the pipe's output end. It turns out that this is exactly how we have been handling pipes throughout this article: the callback is the input function (generator) given to the pipe! This is the drain() scenario.

Approaching from the other end, we could decide to supply the pipe with a call forward for disposing of its output, and let the controlling code deal only with the pipe's input end. Again, this is exactly what our filter interface is about: what we provide as the output parameter is just such a call forward function. This is the feed() scenario.

If we compare the two scenarios, we see that in both cases pipes (which, as we know, are inherently active, like every filter) are wrapped into passive entities represented by functions that only react to outside stimuli (function calls): pulling data from the pipe's output end in the pull pipe implementation we used here, or, had we implemented pipes using push, feeding data into the pipe's input end, having the filters suspend on input instead of output. (The latter leads to a less natural and more complicated solution than the generator approach and, in addition, carries the limitation mentioned above on using for loops in filter implementations.)

As long as a pipe is only executed (that is, contains both a data source and a data sink), there is no externally visible difference between the two pipe implementations.

Looking once more at the filter network from that same section: This would in fact be an event-driven application built from a network of active processing nodes. It reacts to events on its input side (input functions called from outside to feed data) by transforming them into events on its output side (calls to output functions to deliver data) and/or by changing its internal state (possibly even reconfiguring itself). There could be processing nodes of any kind, not only what we regard as filters, by the way! While this is in essence a call forward architecture, event-driven applications usually present themselves in a callback architecture. This is interesting, but there is no contradiction; it all depends on which side of the call we look at - the called code (the application) or the calling code (the framework). So when we hook our processing network into a framework that feeds it input from outside sources (mouse events, say) and possibly also provides functions for output (graphics routines, say), we will naturally hand it its input functions as callbacks.

