⚠️ How to go about this series?
1. Run Every Example
2. Experiment and Break Things
3. Reason About Behavior
4. Build Mental Models
In our previous post, we explored the Generator concurrency pattern, the building block of Go’s other concurrency patterns.
Now, let’s look at how these primitives combine to form powerful patterns that solve real-world problems.
In this post we’ll cover the Pipeline pattern and try to visualize it. So let’s gear up, as we’ll be hands-on throughout the process.
Pipeline Pattern
A pipeline is like an assembly line in a factory, where each stage performs a specific task on the data and passes the result to the next stage.
We build pipelines by connecting goroutines with channels, where each goroutine represents a stage that receives data, processes it, and sends it to the next stage.
Let’s implement a simple pipeline that:
- Generates numbers
- Squares them
- Prints the results
package main

import "fmt"

// Stage 1: Generate numbers
// generate sends each input on a channel, then closes the channel.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// Stage 2: Square numbers
// square reads until its input is closed, then closes its own output.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// Stage 3: Print numbers
// Note: this name shadows Go's builtin print. That is legal, but worth knowing.
func print(in <-chan int) {
	for n := range in {
		fmt.Printf("%d ", n)
	}
	fmt.Println()
}

func main() {
	// Connect the pipeline
	numbers := generate(2, 3, 4) // Stage 1
	squares := square(numbers)   // Stage 2
	print(squares)               // Stage 3, prints: 4 9 16
}
✏️ Quick byte
<-chan int denotes a receive-only channel. A channel of type <-chan int can only be used to receive values, not to send them. This is useful to enforce stricter communication patterns and prevent accidental writes to the channel by the receiver.
chan int denotes a bidirectional channel. A channel of type chan int can be used to both send and receive values.
Let’s go ahead and visualize the above example:
Here you can see that the building blocks of the pipeline are goroutines following the generator pattern. This means that as soon as data is ready at any step, the next step in the pipeline can start processing it, unlike in sequential processing where each step waits for the previous one to finish everything.
Error Handling in Pipelines
Core principles should be:
- Each stage knows exactly what to do with both good and bad values
- Errors can’t get lost in the pipeline
- Bad values don’t cause panics
- The error message carries context about what went wrong
- The pipeline can be extended with more stages, and they’ll all handle errors consistently
Let’s update our code with some proper error handling.
package main

import "fmt"

// Result carries either a value or an error through the pipeline.
type Result struct {
	Value int
	Err   error
}

func generateWithError(nums ...int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for _, n := range nums {
			if n < 0 {
				out <- Result{Err: fmt.Errorf("negative number: %d", n)}
				return // Stop generating after the first bad input
			}
			out <- Result{Value: n}
		}
	}()
	return out
}

func squareWithError(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for r := range in {
			if r.Err != nil {
				out <- r // Forward the error
				continue
			}
			out <- Result{Value: r.Value * r.Value}
		}
	}()
	return out
}

func main() {
	// Using pipeline with error handling.
	// Prints "Result: 4", then "Error: negative number: -3".
	// The 4 is never emitted because the generator stops at the first error.
	for result := range squareWithError(generateWithError(2, -3, 4)) {
		if result.Err != nil {
			fmt.Printf("Error: %v\n", result.Err)
			continue
		}
		fmt.Printf("Result: %d\n", result.Value)
	}
}
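The generator above stops at the first bad value. If you would rather skip bad inputs and keep going, one possible variation is sketched below. It also shows one way to attach context to an error by wrapping a sentinel with `%w`. Note that `ErrNegative`, `generateAll`, and `squareAll` are hypothetical names made up for this sketch; they are not part of the code above.

```go
package main

import (
	"errors"
	"fmt"
)

// Result mirrors the type used above: a value or an error.
type Result struct {
	Value int
	Err   error
}

// ErrNegative is a hypothetical sentinel so consumers can match it with errors.Is.
var ErrNegative = errors.New("negative number")

// generateAll emits every input. Unlike generateWithError, it reports a bad
// value and then continues with the remaining inputs.
func generateAll(nums ...int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for i, n := range nums {
			if n < 0 {
				// %w wraps the sentinel; the prefix adds stage name and position.
				out <- Result{Err: fmt.Errorf("generate: input %d (%d): %w", i, n, ErrNegative)}
				continue
			}
			out <- Result{Value: n}
		}
	}()
	return out
}

// squareAll squares good values and forwards errors unchanged.
func squareAll(in <-chan Result) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for r := range in {
			if r.Err != nil {
				out <- r
				continue
			}
			out <- Result{Value: r.Value * r.Value}
		}
	}()
	return out
}

func main() {
	for r := range squareAll(generateAll(2, -3, 4)) {
		switch {
		case errors.Is(r.Err, ErrNegative):
			fmt.Println("skipped:", r.Err)
		case r.Err != nil:
			fmt.Println("error:", r.Err)
		default:
			fmt.Println("result:", r.Value)
		}
	}
}
```

Whether to stop or skip on a bad value depends on your use case. The point is that the decision lives in one stage, and every later stage only has to forward what it receives.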
Why Use Pipeline Pattern?
Let’s take an example to understand this better. We have a data processing workflow that follows the pipeline pattern, as shown below.
- Each stage in a pipeline operates independently, communicating only through channels. This enables several benefits:
👉 Each stage can be developed, tested, and modified independently
👉 Changes to one stage’s internals don’t affect other stages
👉 Easy to add new stages or modify existing ones
👉 Clear separation of concerns
- Pipeline patterns naturally enable parallel/concurrent processing. Each stage can process different data simultaneously as soon as the data is available.
And the best part? We can run multiple instances of each stage (workers) when we need more concurrency, like so:
🤔💡 Hey but isn’t that the Fan-In and Fan-Out Concurrency Pattern?
Bingo! Good catch right there. It is indeed the Fan-Out, Fan-In pattern, which is a specific type of pipeline pattern. We are going to cover it in detail in our next post, so fret not ;)
Real-world use case
Processing images in a pipeline:
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

type Image struct {
	Data   []byte
	Format string
}

// Stage 1: Load images
func loadImages(filenames ...string) <-chan Image {
	out := make(chan Image)
	go func() {
		defer close(out)
		for _, f := range filenames {
			data, err := os.ReadFile(f)
			if err != nil {
				// Skip unreadable files instead of passing empty data downstream
				log.Printf("skipping %s: %v", f, err)
				continue
			}
			out <- Image{Data: data, Format: filepath.Ext(f)}
		}
	}()
	return out
}

// Stage 2: Resize images
func resize(in <-chan Image) <-chan Image {
	out := make(chan Image)
	go func() {
		defer close(out)
		for img := range in {
			// Simulate resize operation
			resizedData := img.Data // In reality, you'd resize here
			out <- Image{Data: resizedData, Format: img.Format}
		}
	}()
	return out
}

// Stage 3: Save images
func save(in <-chan Image) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for img := range in {
			filename := fmt.Sprintf("resized%s", img.Format)
			// Simulate save operation
			out <- fmt.Sprintf("Saved: %s", filename)
		}
	}()
	return out
}

func main() {
	// Wire the stages together. The file names are placeholders for illustration.
	for msg := range save(resize(loadImages("a.png", "b.jpg"))) {
		fmt.Println(msg)
	}
}
Or something as complicated as a log processing pipeline:
Pipeline scaling patterns
Horizontal Scaling (Fan-Out, Fan-In)
This pattern is ideal for CPU-bound operations where work can be processed independently. The pipeline distributes work across multiple workers and then recombines the results. This is particularly effective when:
- Processing is CPU-intensive (data transformations, calculations)
- Tasks can be processed independently
- You have multiple CPU cores available
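As a small preview (the next post covers this properly), here is a minimal sketch of horizontal scaling. It assumes the `generate` and `square` stages from earlier in this post; `merge` is a hypothetical helper name introduced only for this example, and the worker count of 3 is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
)

func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans in: it copies values from every input channel onto a single
// output channel and closes that channel once all inputs are drained.
func merge(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	numbers := generate(1, 2, 3, 4, 5, 6)

	// Fan-out: three square workers all read from the same input channel.
	workers := make([]<-chan int, 3)
	for i := range workers {
		workers[i] = square(numbers)
	}

	// Fan-in: results arrive in no guaranteed order, so we sum them.
	sum := 0
	for v := range merge(workers...) {
		sum += v
	}
	fmt.Println("sum of squares:", sum) // 1+4+9+16+25+36 = 91
}
```

Notice that output order is no longer guaranteed once several workers share an input. If order matters, you would need to carry an index along with each value.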
Buffered Pipeline
This pattern helps manage speed mismatches between pipeline stages. The buffer acts as a shock absorber, allowing fast stages to work ahead without being blocked by slower stages. This is useful when:
- Different stages have varying processing speeds
- You want to maintain steady throughput
- Memory usage for buffering is acceptable
- You need to handle burst processing
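A minimal sketch of the idea, assuming a fast producer and a deliberately slow consumer. The names `produce` and `slowDouble`, the buffer size of 4, and the 10ms delay are all made up for illustration; pick a real buffer size by measuring.

```go
package main

import (
	"fmt"
	"time"
)

// bufSize is an illustrative value, not a recommendation.
const bufSize = 4

// produce writes into a buffered channel, so it can run up to bufSize
// items ahead of whoever is reading.
func produce(n int) <-chan int {
	out := make(chan int, bufSize)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// slowDouble simulates a slower downstream stage.
func slowDouble(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			time.Sleep(10 * time.Millisecond) // pretend this is real work
			out <- v * 2
		}
	}()
	return out
}

func main() {
	for v := range slowDouble(produce(8)) {
		fmt.Printf("%d ", v)
	}
	fmt.Println()
}
```

The only change from an unbuffered stage is the second argument to `make`. Keep in mind that a buffer smooths out bursts but does not fix a stage that is slower on average; once the buffer fills, the producer blocks again.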
Batched Processing
This pattern optimizes I/O-bound operations by grouping multiple items into a single batch. Instead of processing items one at a time, it collects them into groups and processes them together. This is effective when:
- You’re working with external systems (databases, APIs)
- Network round-trips are expensive
- The operation has significant fixed overhead per request
- You need to optimize throughput over latency
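Here is one way a batching stage could look. This is a sketch under assumptions: `batch` is a hypothetical helper, the batch size of 3 is arbitrary, and printing stands in for the real bulk operation (a database insert, an API call, and so on).

```go
package main

import "fmt"

func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// batch groups incoming values into slices of up to size items.
// It assumes size > 0. The final batch may be shorter than size.
func batch(in <-chan int, size int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		buf := make([]int, 0, size)
		for v := range in {
			buf = append(buf, v)
			if len(buf) == size {
				out <- buf
				// Allocate a fresh slice: the receiver now owns the old one.
				buf = make([]int, 0, size)
			}
		}
		if len(buf) > 0 {
			out <- buf // flush the partial batch when input ends
		}
	}()
	return out
}

func main() {
	for b := range batch(generate(1, 2, 3, 4, 5, 6, 7), 3) {
		// In a real pipeline this would be one bulk request per batch.
		fmt.Println("processing batch:", b)
	}
}
```

A production version would probably also flush on a timer, so that a slow trickle of items does not sit in the buffer waiting for the batch to fill. That is left out here to keep the sketch short.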
Each of these patterns can be combined as needed. For example, you might use batched processing with horizontal scaling, where multiple workers each process batches of items. The key is understanding your bottlenecks and choosing the appropriate pattern to address them.
That wraps up our deep dive into the Pipeline pattern! Coming up next, we’ll explore the Fan-Out, Fan-In concurrency pattern, where we’ll see how to run multiple workers per stage and merge their results.
If you found this post helpful, have any questions, or want to share your own experiences with pipelines, I’d love to hear from you in the comments below. Your insights and questions help make these explanations even better for everyone.
If you missed the post on the visual guide to Golang’s goroutines and channels, check it out :)
Stay tuned for more Go concurrency patterns! 🚀