Skip to content

Concurrency Patterns Advanced

Introduction

Go's concurrency model — goroutines and channels — provides primitives, not patterns. Knowing how to spin up a goroutine is easy; knowing how to orchestrate thousands of them safely, cancel them cleanly, and handle errors without leaking resources is what separates junior from senior Go engineers.

This page covers the canonical concurrency patterns that appear in production Go codebases and interviews alike. Each pattern is a complete, working implementation — not a fragment.

Syntax & Usage

Pipeline Pattern

A pipeline is a series of stages connected by channels. Each stage is a goroutine that receives values from an upstream channel, processes them, and sends results to a downstream channel.

package main

import "fmt"

// Stage 1: Generate numbers
// generate is the pipeline's source stage: it emits each provided number
// on a fresh channel, then closes it. The stage owns its output channel,
// so downstream consumers simply range until closure.
func generate(nums ...int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := range nums {
            ch <- nums[i]
        }
    }()
    return ch
}

// Stage 2: Square each number
// square is a transform stage: it reads every value from in, sends its
// square downstream, and closes its output when in is exhausted.
func square(in <-chan int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for v := range in {
            ch <- v * v
        }
    }()
    return ch
}

// Stage 3: Filter — keep only values above threshold
// filter is a selective stage: it forwards only values strictly greater
// than threshold, dropping the rest, and closes its output once the
// upstream channel closes.
func filter(in <-chan int, threshold int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for v := range in {
            if v <= threshold {
                continue
            }
            ch <- v
        }
    }()
    return ch
}

func main() {
    // Compose the pipeline: generate -> square -> filter.
    // Each stage closes its own output, so the range below terminates
    // once the source is exhausted.
    results := filter(square(generate(1, 2, 3, 4, 5)), 10)

    for v := range results {
        fmt.Println(v) // 16, 25
    }
}
graph LR
    A[generate] -->|chan int| B[square]
    B -->|chan int| C[filter]
    C -->|chan int| D[consumer]

Fan-Out / Fan-In

Fan-out: multiple goroutines read from the same channel, distributing work.
Fan-in: multiple channels are merged into a single channel, collecting results.

package main

import (
    "fmt"
    "sync"
)

// producer emits each item on a fresh channel and closes it afterwards,
// acting as the shared source for the fan-out workers.
func producer(items ...string) <-chan string {
    ch := make(chan string)
    go func() {
        defer close(ch)
        for i := range items {
            ch <- items[i]
        }
    }()
    return ch
}

// worker processes items — fan-out: launch multiple workers on the same input
// worker processes items — fan-out: launch several workers on the same
// input channel and they naturally share the work. Each worker tags its
// output with its own id and closes its output once input is drained.
func worker(id int, in <-chan string) <-chan string {
    ch := make(chan string)
    go func() {
        defer close(ch)
        for task := range in {
            ch <- fmt.Sprintf("worker-%d processed %s", id, task)
        }
    }()
    return ch
}

// merge fans-in multiple channels into one
// merge fans-in: it forwards values from every input channel onto one
// output channel and closes the output only after all inputs are drained,
// so consumers can safely range over the result.
func merge(channels ...<-chan string) <-chan string {
    merged := make(chan string)
    var pending sync.WaitGroup

    // forward drains one source channel into the merged stream.
    forward := func(src <-chan string) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }

    pending.Add(len(channels))
    for _, src := range channels {
        go forward(src)
    }

    // Close the output exactly once, after every forwarder has finished.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    input := producer("a", "b", "c", "d", "e", "f")

    // Fan-out: three workers compete for items on the shared input channel.
    outputs := []<-chan string{
        worker(1, input),
        worker(2, input),
        worker(3, input),
    }

    // Fan-in: merge all worker outputs into a single stream.
    for result := range merge(outputs...) {
        fmt.Println(result)
    }
}

Worker Pool (Bounded Concurrency)

The most common production pattern — limit concurrent work to N goroutines.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Job is one unit of work submitted to the pool: a unique ID plus an
// opaque payload string for a worker to process.
type Job struct {
    ID      int
    Payload string
}

// Result carries the outcome of one Job: the originating job ID plus
// either the produced output or a non-nil error.
// (Fields realigned to be gofmt-clean.)
type Result struct {
    JobID  int
    Output string
    Err    error
}

// workerPool starts numWorkers goroutines that drain jobs until the
// channel is closed or ctx is cancelled, and streams Results on the
// returned channel. The results channel is closed after every worker
// has exited, so callers can range over it safely.
func workerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
    out := make(chan Result)
    var active sync.WaitGroup
    active.Add(numWorkers)

    // run is the body of one worker goroutine.
    run := func(id int) {
        defer active.Done()
        for job := range jobs {
            // Bail out promptly if cancellation happened between jobs.
            select {
            case <-ctx.Done():
                return
            default:
            }

            // Simulate work
            time.Sleep(100 * time.Millisecond)
            msg := fmt.Sprintf("worker %d processed job %d: %s",
                id, job.ID, job.Payload)

            // Never block forever on the send: abandon it on cancellation
            // so a gone consumer cannot leak this goroutine.
            select {
            case out <- Result{JobID: job.ID, Output: msg}:
            case <-ctx.Done():
                return
            }
        }
    }

    for id := 0; id < numWorkers; id++ {
        go run(id)
    }

    // Close the results channel exactly once, after all workers finish.
    go func() {
        active.Wait()
        close(out)
    }()

    return out
}

func main() {
    // Bound the whole run at 5 seconds; cancel releases timer resources.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Producer: enqueue 20 jobs, then close so workers can drain and exit.
    jobs := make(chan Job, 100)
    go func() {
        defer close(jobs)
        for id := 0; id < 20; id++ {
            jobs <- Job{ID: id, Payload: fmt.Sprintf("task-%d", id)}
        }
    }()

    // Consume until the pool closes the results channel.
    for res := range workerPool(ctx, 5, jobs) {
        if res.Err != nil {
            fmt.Printf("Job %d failed: %v\n", res.JobID, res.Err)
            continue
        }
        fmt.Println(res.Output)
    }
}

Semaphore Pattern (Weighted Concurrency Limiting)

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/semaphore"
)

// Demonstrates a counting semaphore built from a buffered channel:
// at most maxConcurrency goroutines run at once.
func main() {
    const maxConcurrency = 3

    // Channel-based semaphore (no external dependency)
    sem := make(chan struct{}, maxConcurrency)

    for i := 0; i < 10; i++ {
        sem <- struct{}{} // Acquire — blocks if buffer is full
        go func(id int) {
            defer func() { <-sem }() // Release
            fmt.Printf("Processing %d\n", id)
            time.Sleep(time.Second)
        }(i)
    }

    // Drain semaphore to wait for all goroutines
    // (filling the buffer to capacity can only succeed once every
    // outstanding goroutine has released its slot).
    for i := 0; i < maxConcurrency; i++ {
        sem <- struct{}{}
    }
}

// Weighted semaphore from x/sync — allows different costs per operation
func weightedSemaphoreExample() {
    ctx := context.Background()
    sem := semaphore.NewWeighted(10) // total weight capacity: 10

    // Heavy operation costs 5
    sem.Acquire(ctx, 5)
    go func() {
        defer sem.Release(5)
        // ... heavy work ...
    }()

    // Light operation costs 1
    sem.Acquire(ctx, 1)
    go func() {
        defer sem.Release(1)
        // ... light work ...
    }()
}

errgroup (golang.org/x/sync/errgroup)

The most important concurrency utility for production Go. Manages a group of goroutines, collects the first error, and cancels remaining work.

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

type UserProfile struct {
    Name    string
    Posts   []string
    Friends []string
}

// fetchUser retrieves the user's display name. The request carries ctx,
// so an errgroup cancellation aborts the HTTP call in flight.
// The request-construction error is now checked instead of discarded:
// a nil req passed to Do would otherwise panic/fail confusingly.
func fetchUser(ctx context.Context, id string) (string, error) {
    req, err := http.NewRequestWithContext(ctx, "GET",
        "https://api.example.com/users/"+id, nil)
    if err != nil {
        return "", fmt.Errorf("build user request: %w", err)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", fmt.Errorf("fetch user: %w", err)
    }
    defer resp.Body.Close()
    return "Alice", nil // simulated: real code would decode resp.Body
}

// fetchPosts returns the user's posts. The body is a stand-in for a real
// API call; ctx is accepted so cancellation plumbing matches production code.
func fetchPosts(ctx context.Context, id string) ([]string, error) {
    posts := []string{"post1", "post2"}
    return posts, nil
}

// fetchFriends returns the user's friends (simulated API call).
func fetchFriends(ctx context.Context, id string) ([]string, error) {
    friends := []string{"Bob", "Charlie"}
    return friends, nil
}

// getUserProfile fans out three fetches concurrently and assembles one
// UserProfile. Each goroutine writes a distinct field of profile, so no
// mutex is needed; g.Wait() establishes the happens-before edge that
// makes those writes visible to the caller.
func getUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
    // The derived ctx is cancelled as soon as any g.Go func returns an error.
    g, ctx := errgroup.WithContext(ctx)
    profile := &UserProfile{}

    g.Go(func() error {
        name, err := fetchUser(ctx, userID)
        if err != nil {
            return err
        }
        profile.Name = name
        return nil
    })

    g.Go(func() error {
        posts, err := fetchPosts(ctx, userID)
        if err != nil {
            return err
        }
        profile.Posts = posts
        return nil
    })

    g.Go(func() error {
        friends, err := fetchFriends(ctx, userID)
        if err != nil {
            return err
        }
        profile.Friends = friends
        return nil
    })

    // Wait blocks until all goroutines complete or one returns an error.
    // On first error, the derived context is cancelled, signaling other
    // goroutines to abort.
    if err := g.Wait(); err != nil {
        return nil, fmt.Errorf("get user profile: %w", err)
    }

    return profile, nil
}

errgroup with Concurrency Limit

// processAllItems runs processItem over every item with bounded
// concurrency: g.Go blocks once 10 goroutines are in flight, so the
// submission loop itself applies backpressure.
func processAllItems(ctx context.Context, items []Item) error {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(10) // At most 10 goroutines concurrently

    for _, item := range items {
        item := item // capture loop variable (unnecessary in Go 1.22+)
        g.Go(func() error {
            return processItem(ctx, item)
        })
    }

    // Returns the first non-nil error from any processItem call, if any.
    return g.Wait()
}

Generator Pattern

// Fibonacci generator — produces values on demand
func fibonacci(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        a, b := 0, 1
        for {
            select {
            case ch <- a:
                a, b = b, a+b
            case <-ctx.Done():
                return
            }
        }
    }()
    return ch
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Pull values on demand; once a value exceeds 1000, cancel the
    // generator so its goroutine exits, then stop consuming.
    for n := range fibonacci(ctx) {
        fmt.Println(n)
        if n <= 1000 {
            continue
        }
        cancel() // Stop the generator
        break
    }
}

Future / Promise Pattern

// Future holds the eventual result of an asynchronous computation.
// done is closed exactly once, after result and err are assigned, so any
// reader that observes the close also observes both fields (the close
// provides the happens-before edge).
type Future[T any] struct {
    result T
    err    error
    done   chan struct{}
}

// NewFuture starts fn in its own goroutine immediately and returns a
// handle whose Get/GetWithTimeout block until fn completes.
func NewFuture[T any](fn func() (T, error)) *Future[T] {
    fut := &Future[T]{done: make(chan struct{})}
    go func() {
        fut.result, fut.err = fn()
        close(fut.done)
    }()
    return fut
}

// Get blocks until the computation finishes, then returns its outcome.
func (f *Future[T]) Get() (T, error) {
    <-f.done
    return f.result, f.err
}

// GetWithTimeout waits at most timeout; on expiry it returns the zero
// value of T and a timeout error instead of blocking forever.
func (f *Future[T]) GetWithTimeout(timeout time.Duration) (T, error) {
    select {
    case <-time.After(timeout):
        var zero T
        return zero, fmt.Errorf("future timed out after %v", timeout)
    case <-f.done:
        return f.result, f.err
    }
}

// Usage: fire off concurrent work, collect results later
func main() {
    userFuture := NewFuture(func() (User, error) {
        return fetchUserFromDB("123")
    })
    ordersFuture := NewFuture(func() ([]Order, error) {
        return fetchOrdersFromDB("123")
    })

    user, err := userFuture.Get()
    orders, err := ordersFuture.Get()
    // Both fetched concurrently
}

Bounded Parallelism with Rate Limiting

// processWithRateLimit combines two independent limits: at most
// maxConcurrency goroutines in flight (errgroup) and at most
// ratePerSecond submissions per second (token-bucket limiter).
func processWithRateLimit(ctx context.Context, items []string) error {
    const (
        maxConcurrency = 10
        ratePerSecond  = 50
    )

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(maxConcurrency)

    // Bucket of ratePerSecond tokens, refilled at ratePerSecond/sec.
    limiter := rate.NewLimiter(rate.Limit(ratePerSecond), ratePerSecond)

    for _, item := range items {
        item := item
        // Rate-limit submission, not execution: Wait blocks the loop
        // until a token is free or ctx is cancelled.
        if err := limiter.Wait(ctx); err != nil {
            return err
        }
        g.Go(func() error {
            return process(ctx, item)
        })
    }

    return g.Wait()
}

Graceful Shutdown

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

// Server bundles an HTTP server with background workers so both can be
// stopped in a coordinated, phased shutdown.
type Server struct {
    httpServer *http.Server
    workers    sync.WaitGroup // tracks background goroutines for the final wait
    shutdown   chan struct{}  // NOTE(review): never used in this example — remove or wire up; verify no callers rely on it
}

// NewServer builds a Server listening on addr with a single handler that
// replies "OK" to every request.
func NewServer(addr string) *Server {
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "OK")
    })
    return &Server{
        shutdown:   make(chan struct{}),
        httpServer: &http.Server{Addr: addr, Handler: mux},
    }
}

// StartBackgroundWorker launches a ticker-driven goroutine that does
// periodic work until ctx is cancelled. It registers with s.workers so
// shutdown can wait for it; Add happens before the goroutine starts,
// avoiding a race with workers.Wait().
func (s *Server) StartBackgroundWorker(ctx context.Context) {
    s.workers.Add(1)
    go func() {
        defer s.workers.Done()
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop() // release ticker resources on exit

        for {
            select {
            case <-ticker.C:
                fmt.Println("Background work tick")
            case <-ctx.Done():
                fmt.Println("Worker shutting down...")
                return
            }
        }
    }()
}

// main wires up the three-phase graceful shutdown: (1) stop accepting
// HTTP requests, (2) cancel background work, (3) wait — with a deadline —
// for in-flight work to finish.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    server := NewServer(":8080")
    server.StartBackgroundWorker(ctx)

    // Start HTTP server
    go func() {
        // ListenAndServe returns http.ErrServerClosed after Shutdown;
        // anything else is a real failure worth logging.
        if err := server.httpServer.ListenAndServe(); err != http.ErrServerClosed {
            fmt.Printf("HTTP server error: %v\n", err)
        }
    }()

    // Wait for interrupt signal (buffered so the signal isn't dropped
    // if it arrives before we block on the receive).
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh
    fmt.Println("\nShutdown signal received...")

    // Phase 1: Stop accepting new requests
    shutdownCtx, shutdownCancel := context.WithTimeout(
        context.Background(), 30*time.Second)
    defer shutdownCancel()
    server.httpServer.Shutdown(shutdownCtx)

    // Phase 2: Cancel background work
    cancel()

    // Phase 3: Wait for in-flight work to complete
    // (Wait runs in a goroutine so we can race it against the deadline.)
    done := make(chan struct{})
    go func() {
        server.workers.Wait()
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("Graceful shutdown complete")
    case <-shutdownCtx.Done():
        fmt.Println("Shutdown timed out, forcing exit")
    }
}

Quick Reference

Pattern Use When Key Mechanism
Pipeline Sequential processing stages Chained channels
Fan-out/Fan-in Parallelize CPU-bound work Multiple goroutines on one channel + merge
Worker Pool Bounded concurrent I/O Fixed goroutine count reading from job channel
Semaphore Variable-cost concurrency limiting Buffered channel or x/sync/semaphore
errgroup Concurrent tasks with error handling errgroup.Group with Wait()
Generator Lazy sequence production Goroutine + channel, cancelled via context
Future/Promise Fire-and-forget with deferred result Channel or struct with done channel
Rate Limiting API calls, external resource limits rate.Limiter + errgroup
Graceful Shutdown Production servers Signal handling + context cancellation

Best Practices

  1. Always use context.Context for cancellation propagation. Every long-running operation should accept a context as its first parameter.

  2. Always close channels from the sender side, never the receiver. The sender knows when there's no more data.

  3. Use errgroup as your default for concurrent tasks with error handling. It replaces most hand-rolled sync.WaitGroup + error channel patterns.

  4. Set concurrency limits — unbounded goroutine creation is the #1 cause of Go service OOMs in production.

  5. Prefer select with ctx.Done() in every goroutine loop to ensure clean cancellation.

  6. Use defer for cleanup — defer close(ch), defer wg.Done(), defer cancel().

Common Pitfalls

Goroutine Leaks

The most common concurrency bug. A goroutine blocked on a channel send/receive that will never complete stays alive forever, consuming memory.

// ❌ LEAK: if nobody reads from ch, the goroutine never exits
func leaky() <-chan int {
    ch := make(chan int)
    go func() {
        ch <- expensiveComputation() // Blocked forever if consumer is gone
    }()
    return ch
}

// ✅ FIX: use context for cancellation
func safe(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        result := expensiveComputation()
        select {
        case ch <- result:
        case <-ctx.Done():
        }
    }()
    return ch
}

Closing a Channel Twice

Closing an already-closed channel causes a panic. Ensure only one goroutine (the sender) closes the channel.

// ❌ Multiple goroutines closing the same channel
go func() { close(ch) }()
go func() { close(ch) }() // PANIC

// ✅ Use sync.Once if multiple goroutines need to trigger close
var once sync.Once
closeCh := func() { once.Do(func() { close(ch) }) }

Sending on a Closed Channel

Sending on a closed channel also panics. Design so that senders close the channel after all sends are done.

Loop Variable Capture (pre-Go 1.22)

// ❌ Before Go 1.22: all goroutines capture the same 'i'
for i := 0; i < 10; i++ {
    go func() {
        fmt.Println(i) // Likely prints "10" ten times
    }()
}

// ✅ Fix: shadow the variable
for i := 0; i < 10; i++ {
    i := i
    go func() {
        fmt.Println(i) // Prints 0-9 in some order
    }()
}
// Note: Go 1.22+ fixes this — each iteration gets its own variable

Missing WaitGroup Coordination

Forgetting wg.Add() before launching a goroutine, or calling wg.Add() inside the goroutine (race condition with wg.Wait()).

// ❌ Race: wg.Wait() might execute before wg.Add()
go func() {
    wg.Add(1)
    defer wg.Done()
    // ...
}()
wg.Wait()

// ✅ Always Add before launching
wg.Add(1)
go func() {
    defer wg.Done()
    // ...
}()
wg.Wait()

Performance Considerations

  • Goroutine cost: ~2-8 KB stack initially, grows as needed. 100K goroutines use ~200 MB–800 MB of stack space alone.

  • Channel overhead: Unbuffered channels require synchronization (mutex lock + goroutine parking). Buffered channels are faster for producer-consumer patterns but use more memory.

  • Worker pool sizing: For CPU-bound work, runtime.NumCPU() workers is optimal. For I/O-bound work, use more (10x–100x) depending on latency.

  • Avoid excessive channel hopping: Each pipeline stage adds latency (channel send/receive ~50-100ns). For simple transformations, merge stages.

  • sync.Pool for reducing allocations in worker pools — reuse buffers, structs, etc.

  • Profile goroutine counts: Use runtime.NumGoroutine() or pprof's goroutine profile to detect leaks in production.

Interview Tips

Interview Tip

When asked "How would you process 1 million items concurrently?", the answer is never "launch 1 million goroutines." Start with errgroup + SetLimit(N) or a worker pool. Discuss how you'd choose N (CPU-bound vs I/O-bound), how you'd handle errors, and how you'd implement graceful shutdown.

Interview Tip

The pipeline pattern is the go-to answer for "How would you design a data processing system in Go?" Draw the stage diagram, explain channel ownership (each stage owns its output channel), and show how you'd add cancellation with context.

Interview Tip

Know errgroup cold. It's the standard answer for "fetch 3 APIs concurrently and return the results." It handles cancellation, error collection, and goroutine lifecycle — exactly what interviewers want to see. Mention SetLimit() for bonus points.

Interview Tip

For graceful shutdown questions, show the three-phase approach: (1) stop accepting new work (close listener), (2) cancel in-flight work (context cancellation), (3) wait with timeout (don't hang forever). Mention signal.Notify and http.Server.Shutdown.

Key Takeaways

  • Worker pool + errgroup covers 80% of real-world concurrency needs.
  • Every goroutine must have an exit path — use context cancellation, channel closure, or WaitGroup.
  • Fan-out/fan-in is the core pattern for parallel processing — distribute work, collect results.
  • Pipelines compose beautifully for data processing, but each stage adds latency overhead.
  • errgroup is the production standard for concurrent tasks with error handling — prefer it over hand-rolled WaitGroup patterns.
  • Graceful shutdown requires signal handling, context propagation, and bounded wait times.