Concurrency Patterns Advanced¶
Introduction¶
Go's concurrency model — goroutines and channels — provides primitives, not patterns. Knowing how to spin up a goroutine is easy; knowing how to orchestrate thousands of them safely, cancel them cleanly, and handle errors without leaking resources is what separates junior from senior Go engineers.
This page covers the canonical concurrency patterns that appear in production Go codebases and interviews alike. Each pattern is a complete, working implementation — not a fragment.
Syntax & Usage¶
Pipeline Pattern¶
A pipeline is a series of stages connected by channels. Each stage is a goroutine that receives values from an upstream channel, processes them, and sends results to a downstream channel.
package main
import "fmt"
// Stage 1: Generate numbers
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 2: Square each number
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// Stage 3: Filter — keep only values above threshold
func filter(in <-chan int, threshold int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n > threshold {
out <- n
}
}
}()
return out
}
func main() {
// Compose the pipeline: generate -> square -> filter
ch := filter(square(generate(1, 2, 3, 4, 5)), 10)
for v := range ch {
fmt.Println(v) // 16, 25
}
}
graph LR
A[generate] -->|chan int| B[square]
B -->|chan int| C[filter]
C -->|chan int| D[consumer]
Fan-Out / Fan-In¶
Fan-out: multiple goroutines read from the same channel, distributing work.
Fan-in: multiple channels are merged into a single channel, collecting results.
package main
import (
"fmt"
"sync"
)
func producer(items ...string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for _, item := range items {
out <- item
}
}()
return out
}
// worker processes items — fan-out: launch multiple workers on the same input
func worker(id int, in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for item := range in {
out <- fmt.Sprintf("worker-%d processed %s", id, item)
}
}()
return out
}
// merge fans-in multiple channels into one
func merge(channels ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan string) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
input := producer("a", "b", "c", "d", "e", "f")
// Fan-out: 3 workers share the input channel
w1 := worker(1, input)
w2 := worker(2, input)
w3 := worker(3, input)
// Fan-in: merge all worker outputs
for result := range merge(w1, w2, w3) {
fmt.Println(result)
}
}
Worker Pool (Bounded Concurrency)¶
The most common production pattern — limit concurrent work to N goroutines.
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Payload string
}
type Result struct {
JobID int
Output string
Err error
}
func workerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func(workerID int) {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return
default:
}
// Simulate work
time.Sleep(100 * time.Millisecond)
output := fmt.Sprintf("worker %d processed job %d: %s",
workerID, job.ID, job.Payload)
select {
case results <- Result{JobID: job.ID, Output: output}:
case <-ctx.Done():
return
}
}
}(i)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
jobs := make(chan Job, 100)
go func() {
defer close(jobs)
for i := 0; i < 20; i++ {
jobs <- Job{ID: i, Payload: fmt.Sprintf("task-%d", i)}
}
}()
results := workerPool(ctx, 5, jobs)
for r := range results {
if r.Err != nil {
fmt.Printf("Job %d failed: %v\n", r.JobID, r.Err)
} else {
fmt.Println(r.Output)
}
}
}
Semaphore Pattern (Weighted Concurrency Limiting)¶
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
const maxConcurrency = 3
// Channel-based semaphore (no external dependency)
sem := make(chan struct{}, maxConcurrency)
for i := 0; i < 10; i++ {
sem <- struct{}{} // Acquire — blocks if buffer is full
go func(id int) {
defer func() { <-sem }() // Release
fmt.Printf("Processing %d\n", id)
time.Sleep(time.Second)
}(i)
}
// Drain semaphore to wait for all goroutines
for i := 0; i < maxConcurrency; i++ {
sem <- struct{}{}
}
}
// Weighted semaphore from x/sync — allows different costs per operation
func weightedSemaphoreExample() {
ctx := context.Background()
sem := semaphore.NewWeighted(10) // total weight capacity: 10
// Heavy operation costs 5
sem.Acquire(ctx, 5)
go func() {
defer sem.Release(5)
// ... heavy work ...
}()
// Light operation costs 1
sem.Acquire(ctx, 1)
go func() {
defer sem.Release(1)
// ... light work ...
}()
}
errgroup (golang.org/x/sync/errgroup)¶
The most important concurrency utility for production Go. Manages a group of goroutines, collects the first error, and cancels remaining work.
package main
import (
"context"
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
type UserProfile struct {
Name string
Posts []string
Friends []string
}
func fetchUser(ctx context.Context, id string) (string, error) {
req, _ := http.NewRequestWithContext(ctx, "GET",
"https://api.example.com/users/"+id, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("fetch user: %w", err)
}
defer resp.Body.Close()
return "Alice", nil
}
func fetchPosts(ctx context.Context, id string) ([]string, error) {
// Simulated API call
return []string{"post1", "post2"}, nil
}
func fetchFriends(ctx context.Context, id string) ([]string, error) {
return []string{"Bob", "Charlie"}, nil
}
func getUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
g, ctx := errgroup.WithContext(ctx)
profile := &UserProfile{}
g.Go(func() error {
name, err := fetchUser(ctx, userID)
if err != nil {
return err
}
profile.Name = name
return nil
})
g.Go(func() error {
posts, err := fetchPosts(ctx, userID)
if err != nil {
return err
}
profile.Posts = posts
return nil
})
g.Go(func() error {
friends, err := fetchFriends(ctx, userID)
if err != nil {
return err
}
profile.Friends = friends
return nil
})
// Wait blocks until all goroutines complete or one returns an error.
// On first error, the derived context is cancelled, signaling other
// goroutines to abort.
if err := g.Wait(); err != nil {
return nil, fmt.Errorf("get user profile: %w", err)
}
return profile, nil
}
errgroup with Concurrency Limit¶
func processAllItems(ctx context.Context, items []Item) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // At most 10 goroutines concurrently
for _, item := range items {
item := item // capture loop variable (unnecessary in Go 1.22+)
g.Go(func() error {
return processItem(ctx, item)
})
}
return g.Wait()
}
Generator Pattern¶
// Fibonacci generator — produces values on demand
func fibonacci(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
a, b := 0, 1
for {
select {
case ch <- a:
a, b = b, a+b
case <-ctx.Done():
return
}
}
}()
return ch
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for v := range fibonacci(ctx) {
fmt.Println(v)
if v > 1000 {
cancel() // Stop the generator
break
}
}
}
Future / Promise Pattern¶
type Future[T any] struct {
result T
err error
done chan struct{}
}
func NewFuture[T any](fn func() (T, error)) *Future[T] {
f := &Future[T]{done: make(chan struct{})}
go func() {
f.result, f.err = fn()
close(f.done)
}()
return f
}
func (f *Future[T]) Get() (T, error) {
<-f.done
return f.result, f.err
}
func (f *Future[T]) GetWithTimeout(timeout time.Duration) (T, error) {
select {
case <-f.done:
return f.result, f.err
case <-time.After(timeout):
var zero T
return zero, fmt.Errorf("future timed out after %v", timeout)
}
}
// Usage: fire off concurrent work, collect results later
func main() {
userFuture := NewFuture(func() (User, error) {
return fetchUserFromDB("123")
})
ordersFuture := NewFuture(func() ([]Order, error) {
return fetchOrdersFromDB("123")
})
user, err := userFuture.Get()
orders, err := ordersFuture.Get()
// Both fetched concurrently
}
Bounded Parallelism with Rate Limiting¶
func processWithRateLimit(ctx context.Context, items []string) error {
const (
maxConcurrency = 10
ratePerSecond = 50
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(maxConcurrency)
limiter := rate.NewLimiter(rate.Limit(ratePerSecond), ratePerSecond)
for _, item := range items {
item := item
if err := limiter.Wait(ctx); err != nil {
return err
}
g.Go(func() error {
return process(ctx, item)
})
}
return g.Wait()
}
Graceful Shutdown¶
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
type Server struct {
httpServer *http.Server
workers sync.WaitGroup
shutdown chan struct{}
}
func NewServer(addr string) *Server {
s := &Server{
shutdown: make(chan struct{}),
}
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "OK")
})
s.httpServer = &http.Server{Addr: addr, Handler: mux}
return s
}
func (s *Server) StartBackgroundWorker(ctx context.Context) {
s.workers.Add(1)
go func() {
defer s.workers.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("Background work tick")
case <-ctx.Done():
fmt.Println("Worker shutting down...")
return
}
}
}()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
server := NewServer(":8080")
server.StartBackgroundWorker(ctx)
// Start HTTP server
go func() {
if err := server.httpServer.ListenAndServe(); err != http.ErrServerClosed {
fmt.Printf("HTTP server error: %v\n", err)
}
}()
// Wait for interrupt signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
fmt.Println("\nShutdown signal received...")
// Phase 1: Stop accepting new requests
shutdownCtx, shutdownCancel := context.WithTimeout(
context.Background(), 30*time.Second)
defer shutdownCancel()
server.httpServer.Shutdown(shutdownCtx)
// Phase 2: Cancel background work
cancel()
// Phase 3: Wait for in-flight work to complete
done := make(chan struct{})
go func() {
server.workers.Wait()
close(done)
}()
select {
case <-done:
fmt.Println("Graceful shutdown complete")
case <-shutdownCtx.Done():
fmt.Println("Shutdown timed out, forcing exit")
}
}
Quick Reference¶
| Pattern | Use When | Key Mechanism |
|---|---|---|
| Pipeline | Sequential processing stages | Chained channels |
| Fan-out/Fan-in | Parallelize CPU-bound work | Multiple goroutines on one channel + merge |
| Worker Pool | Bounded concurrent I/O | Fixed goroutine count reading from job channel |
| Semaphore | Variable-cost concurrency limiting | Buffered channel or x/sync/semaphore |
| errgroup | Concurrent tasks with error handling | errgroup.Group with Wait() |
| Generator | Lazy sequence production | Goroutine + channel, cancelled via context |
| Future/Promise | Fire-and-forget with deferred result | Channel or struct with done channel |
| Rate Limiting | API calls, external resource limits | rate.Limiter + errgroup |
| Graceful Shutdown | Production servers | Signal handling + context cancellation |
Best Practices¶
-
Always use
context.Contextfor cancellation propagation. Every long-running operation should accept a context as its first parameter. -
Always close channels from the sender side, never the receiver. The sender knows when there's no more data.
-
Use
errgroupas your default for concurrent tasks with error handling. It replaces most hand-rolledsync.WaitGroup+ error channel patterns. -
Set concurrency limits — unbounded goroutine creation is the #1 cause of Go service OOMs in production.
-
Prefer
selectwithctx.Done()in every goroutine loop to ensure clean cancellation. -
Use
deferfor cleanup —defer close(ch),defer wg.Done(),defer cancel().
Common Pitfalls¶
Goroutine Leaks
The most common concurrency bug. A goroutine blocked on a channel send/receive that will never complete stays alive forever, consuming memory.
// ❌ LEAK: if nobody reads from ch, the goroutine never exits
func leaky() <-chan int {
ch := make(chan int)
go func() {
ch <- expensiveComputation() // Blocked forever if consumer is gone
}()
return ch
}
// ✅ FIX: use context for cancellation
func safe(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
result := expensiveComputation()
select {
case ch <- result:
case <-ctx.Done():
}
}()
return ch
}
Closing a Channel Twice
Closing an already-closed channel causes a panic. Ensure only one goroutine (the sender) closes the channel.
Sending on a Closed Channel
Sending on a closed channel also panics. Design so that senders close the channel after all sends are done.
Loop Variable Capture (pre-Go 1.22)
// ❌ Before Go 1.22: all goroutines capture the same 'i'
for i := 0; i < 10; i++ {
go func() {
fmt.Println(i) // Likely prints "10" ten times
}()
}
// ✅ Fix: shadow the variable
for i := 0; i < 10; i++ {
i := i
go func() {
fmt.Println(i) // Prints 0-9 in some order
}()
}
// Note: Go 1.22+ fixes this — each iteration gets its own variable
Missing WaitGroup Coordination
Forgetting wg.Add() before launching a goroutine, or calling wg.Add() inside the goroutine (race condition with wg.Wait()).
Performance Considerations¶
-
Goroutine cost: ~2-8 KB stack initially, grows as needed. 100K goroutines use ~200 MB–800 MB of stack space alone.
-
Channel overhead: Unbuffered channels require synchronization (mutex lock + goroutine parking). Buffered channels are faster for producer-consumer patterns but use more memory.
-
Worker pool sizing: For CPU-bound work,
runtime.NumCPU()workers is optimal. For I/O-bound work, use more (10x–100x) depending on latency. -
Avoid excessive channel hopping: Each pipeline stage adds latency (channel send/receive ~50-100ns). For simple transformations, merge stages.
-
sync.Poolfor reducing allocations in worker pools — reuse buffers, structs, etc. -
Profile goroutine counts: Use
runtime.NumGoroutine()or pprof's goroutine profile to detect leaks in production.
Interview Tips¶
Interview Tip
When asked "How would you process 1 million items concurrently?", the answer is never "launch 1 million goroutines." Start with errgroup + SetLimit(N) or a worker pool. Discuss how you'd choose N (CPU-bound vs I/O-bound), how you'd handle errors, and how you'd implement graceful shutdown.
Interview Tip
The pipeline pattern is the go-to answer for "How would you design a data processing system in Go?" Draw the stage diagram, explain channel ownership (each stage owns its output channel), and show how you'd add cancellation with context.
Interview Tip
Know errgroup cold. It's the standard answer for "fetch 3 APIs concurrently and return the results." It handles cancellation, error collection, and goroutine lifecycle — exactly what interviewers want to see. Mention SetLimit() for bonus points.
Interview Tip
For graceful shutdown questions, show the three-phase approach: (1) stop accepting new work (close listener), (2) cancel in-flight work (context cancellation), (3) wait with timeout (don't hang forever). Mention signal.Notify and http.Server.Shutdown.
Key Takeaways¶
- Worker pool + errgroup covers 80% of real-world concurrency needs.
- Every goroutine must have an exit path — use context cancellation, channel closure, or WaitGroup.
- Fan-out/fan-in is the core pattern for parallel processing — distribute work, collect results.
- Pipelines compose beautifully for data processing, but each stage adds latency overhead.
- errgroup is the production standard for concurrent tasks with error handling — prefer it over hand-rolled WaitGroup patterns.
- Graceful shutdown requires signal handling, context propagation, and bounded wait times.