Pipelines
All the code used here is on my GitHub: pabloos. Keep an eye on it: for clarity I will not reproduce every function here. Each section has its own branch.
1. Original Sketch
A pipeline is not unlike an assembly line: a queue of jobs, each of which transforms its input and sends it to the next stage. In Go an obvious implementation is based on channels (following an idea from the official blog): each function represents a separate stage, connected to the next one by a channel.
type pipe <-chan int
Thus, the stage’s header should be:
func stage(in pipe) pipe
Also, two more stages are needed for the start and the end of the queue:
func source(numbers ...int) pipe
func end(in pipe) []int
The source receives the input, and the end stage returns an integer slice. The point here is that all the async actions are wrapped between the stages through channels, so we only need to take care of the input and output types.
Here is an example of a whole pipeline that performs a simple job on each stage:
func source(numbers ...int) pipe {
	out := make(chan int)
	go func() {
		for _, n := range numbers {
			out <- n
		}
		close(out)
	}()
	return out
}

func firstStage(in pipe) pipe {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
func secondStage(in pipe) pipe {
	... // same logic as firstStage above
}
...
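// requires: import "sync"
func end(in pipe) []int {
	out := make([]int, 0)
	var wg sync.WaitGroup // barrier: blocks until the collector goroutine finishes
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The loop ends when the previous stage closes its channel.
		for n := range in {
			out = append(out, n)
		}
	}()
	wg.Wait()
	return out
}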
As you can see, the end function uses a WaitGroup as a barrier: it does not return until the collecting goroutine has read every value sent by the previous stage.
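Because ranging over a channel already blocks until that channel is closed, the same result can also be obtained without the extra goroutine. The following is only a sketch of that alternative: `endSimple` is a hypothetical name introduced here, and `source` is copied from the code above so the example runs on its own.

```go
package main

import "fmt"

type pipe <-chan int

// source emits every number on a fresh channel and closes it afterwards.
func source(numbers ...int) pipe {
	out := make(chan int)
	go func() {
		for _, n := range numbers {
			out <- n
		}
		close(out)
	}()
	return out
}

// endSimple (hypothetical name) drains the channel in the calling goroutine.
// The range loop only finishes once the previous stage closes the channel,
// so no WaitGroup is involved.
func endSimple(in pipe) []int {
	out := make([]int, 0)
	for n := range in {
		out = append(out, n)
	}
	return out
}

func main() {
	fmt.Println(endSimple(source(2, 3, 2, 34))) // [2 3 2 34]
}
```

Which variant to prefer is a matter of taste; the WaitGroup version makes the barrier explicit, while this one relies on the blocking behaviour of the range loop.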
In order to run the pipeline you need to nest every stage into the next one:
func main() {
	for _, n := range end(thirdStage(secondStage(firstStage(source(2, 3, 2, 34))))) {
		fmt.Println(n)
	}
}
2. A refactor proposal
There are some obvious improvements to the code above:
- All the stages have the same header, so it could be a user-defined type; we will use this later:
type stage func(pipe) pipe
- All of them also have the same statements inside. The only difference is the mutation, so…
type modifier func(int) int
- The previous approach mixes the structure with the business logic, which is not desirable. We can decouple both by injecting the modifier, in a function-composition manner:
func firstStage(in pipe, mod modifier) pipe {
	out := make(chan int)
	go send(out, in, mod)
	return out
}
- Also, the async function has only one responsibility: sending the result. So it is another candidate for injection, as we did above with the modifier:
func send(outChan chan int, inChan pipe, mod modifier) {
	for n := range inChan {
		outChan <- mod(n)
	}
	close(outChan)
}
Decoupling all these things makes the code easy to test. That covers unit testing… but what about treating the pattern as a single structure that can be checked with integration tests?
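To make the testing claim concrete, here is a minimal sketch of how `send` could be exercised in isolation. The helper `collect` and the modifier `double` are hypothetical names introduced only for this example; in a real project the check would more likely live in a `_test.go` file and use the `testing` package.

```go
package main

import "fmt"

type (
	pipe     <-chan int
	modifier func(int) int
)

// send applies mod to every value read from inChan, forwards the result to
// outChan, and closes outChan once inChan is exhausted.
func send(outChan chan int, inChan pipe, mod modifier) {
	for n := range inChan {
		outChan <- mod(n)
	}
	close(outChan)
}

// collect (hypothetical helper) pushes values through send and gathers
// whatever comes out, so the result can be compared with an expected slice.
func collect(mod modifier, values ...int) []int {
	in := make(chan int)
	out := make(chan int)
	go func() {
		for _, v := range values {
			in <- v
		}
		close(in)
	}()
	go send(out, in, mod)

	got := make([]int, 0, len(values))
	for n := range out {
		got = append(got, n)
	}
	return got
}

func main() {
	double := func(n int) int { return n * 2 } // hypothetical modifier
	fmt.Println(collect(double, 1, 2, 3))      // [2 4 6]
}
```

Since the modifier is a plain `func(int) int`, it can also be tested on its own without any channel at all.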
3. Pattern as an object. Autogenerate the stages
We could present all the elements as a struct:
type (
	pipe   <-chan int
	source func(...int) pipe
	end    func(pipe) []int
	stage  func(pipe) pipe
	stages []stage

	Pipeline struct {
		source
		stages
		end
	}
)
Also, stages can be autogenerated as we can inject the modifiers:
func NewPipeline(source source, end end, modifiers ...modifier) *Pipeline {
	return &Pipeline{
		source,
		genStages(modifiers...),
		end,
	}
}

func genStages(modifiers ...modifier) stages {
	stages := make(stages, 0)
	for _, modifier := range modifiers {
		stages = append(stages, getStage(modifier))
	}
	return stages
}

func getStage(mod modifier) stage {
	return func(input pipe) pipe {
		output := make(chan int)
		go send(output, input, mod)
		return output
	}
}
So, when we want to run the pipeline we only need to chain the stages in the same order as we specified them, feeding the output stream of each stage into the next one:
func (pip *Pipeline) Exec(input ...int) []int {
	// Start the stream, then wrap it with each stage in declaration order.
	result := pip.source(input...)
	for _, st := range pip.stages {
		result = st(result)
	}
	// With no stages at all, the source is simply connected to the end.
	return pip.end(result)
}
The call is cleaner than the previous ones, as you can see:
for _, number := range NewPipeline(start, final, add2, square, add2).Exec(1, 2, 3, 4, 5) {
	fmt.Println(number)
}
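The call above uses `start`, `final`, `add2` and `square`, which are not shown in this article. The sketch below assumes the simplest possible definitions for them (a source that emits its arguments, an end that collects into a slice, and two one-line modifiers) and puts the pieces of this section together in one runnable file. It is an illustration under those assumptions, not necessarily the code in the repository; `Exec` is written here as a plain loop over the stages.

```go
package main

import "fmt"

type (
	pipe     <-chan int
	modifier func(int) int
	source   func(...int) pipe
	end      func(pipe) []int
	stage    func(pipe) pipe
	stages   []stage

	Pipeline struct {
		source
		stages
		end
	}
)

// start (assumed definition) emits every input number and closes the channel.
func start(numbers ...int) pipe {
	out := make(chan int)
	go func() {
		for _, n := range numbers {
			out <- n
		}
		close(out)
	}()
	return out
}

// final (assumed definition) collects the stream into a slice.
func final(in pipe) []int {
	out := make([]int, 0)
	for n := range in {
		out = append(out, n)
	}
	return out
}

// Assumed modifiers matching the names used in the call.
func add2(n int) int   { return n + 2 }
func square(n int) int { return n * n }

func send(outChan chan int, inChan pipe, mod modifier) {
	for n := range inChan {
		outChan <- mod(n)
	}
	close(outChan)
}

func getStage(mod modifier) stage {
	return func(input pipe) pipe {
		output := make(chan int)
		go send(output, input, mod)
		return output
	}
}

func NewPipeline(src source, snk end, modifiers ...modifier) *Pipeline {
	sts := make(stages, 0, len(modifiers))
	for _, mod := range modifiers {
		sts = append(sts, getStage(mod))
	}
	return &Pipeline{src, sts, snk}
}

// Exec chains the stages in order and hands the last stream to the end.
func (pip *Pipeline) Exec(input ...int) []int {
	result := pip.source(input...)
	for _, st := range pip.stages {
		result = st(result)
	}
	return pip.end(result)
}

func main() {
	// Each number goes through add2, square, add2: 1 -> 3 -> 9 -> 11, and so on.
	fmt.Println(NewPipeline(start, final, add2, square, add2).Exec(1, 2, 3, 4, 5)) // [11 18 27 38 51]
}
```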
4. The flow lasts a bit longer…
There are still plenty of features to think about for this refactor. A simple TODO list would be:
- fan in/out
- cancellation
- when to use buffered channels
- errors
- …
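As a taste of the cancellation item, here is one possible sketch built on the standard `context` package. It is not part of the refactor above: the extra `ctx` parameter changes the stage signatures, and `firstN` is a hypothetical helper that reads a few results and then cancels the rest of the stream.

```go
package main

import (
	"context"
	"fmt"
)

type (
	pipe     <-chan int
	modifier func(int) int
)

// source emits numbers until they run out or ctx is cancelled.
func source(ctx context.Context, numbers ...int) pipe {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range numbers {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// stage applies mod to each value, giving up as soon as ctx is cancelled.
func stage(ctx context.Context, in pipe, mod modifier) pipe {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- mod(n):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstN (hypothetical helper) reads at most count results and then cancels
// the context, so the goroutines upstream stop instead of blocking forever.
func firstN(count int, mod modifier, numbers ...int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	results := stage(ctx, source(ctx, numbers...), mod)
	got := make([]int, 0, count)
	for len(got) < count {
		n, ok := <-results
		if !ok {
			break // the stream ended before count values arrived
		}
		got = append(got, n)
	}
	return got
}

func main() {
	square := func(n int) int { return n * n }
	fmt.Println(firstN(2, square, 1, 2, 3, 4, 5)) // [1 4]
}
```

Without the `select` on `ctx.Done()`, a consumer that stops reading early would leave the upstream goroutines blocked on their sends.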