Skip to content

Database Patterns Advanced

Introduction

Go's database/sql package provides a lightweight, driver-agnostic interface to SQL databases. Understanding it deeply — that sql.DB is a connection pool, not a single connection; that prepared statements and context-aware queries matter for production; that transaction handling requires careful error management — separates production Go engineers from tutorial-level ones.

Why This Matters

Every backend service talks to a database. Interviewers probe whether you understand connection pool tuning, safe transaction patterns, NULL handling, and how to structure data access for testability. These are the patterns that prevent outages.


sql.DB Is a Connection Pool

import (
    "database/sql"
    _ "github.com/lib/pq" // PostgreSQL driver (registers via init)
)

func main() {
    // sql.Open does NOT open a connection — it initializes the pool
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Verify connectivity
    if err := db.PingContext(context.Background()); err != nil {
        log.Fatalf("database unreachable: %v", err)
    }
}

Connection Pool Configuration

db.SetMaxOpenConns(25)                 // max simultaneous connections
db.SetMaxIdleConns(10)                 // max idle connections in pool
db.SetConnMaxLifetime(5 * time.Minute) // recycle connections (DNS changes, LB)
db.SetConnMaxIdleTime(1 * time.Minute) // close idle connections faster
graph TD
    APP[Application] --> POOL[sql.DB Connection Pool]
    POOL -->|"MaxOpenConns=25"| C1[Conn 1]
    POOL --> C2[Conn 2]
    POOL --> C3[...]
    POOL -->|"MaxIdleConns=10"| CN[Conn N]
    C1 --> DB[(PostgreSQL)]
    C2 --> DB
    C3 --> DB
    CN --> DB
Setting Default Production Recommendation Why
MaxOpenConns 0 (unlimited) 25–50 Prevents overwhelming the DB
MaxIdleConns 2 10–25 Avoids reconnection overhead
ConnMaxLifetime 0 (no limit) 5 min Handles DNS changes, LB rotation
ConnMaxIdleTime 0 (no limit) 1 min Frees unused connections faster

Interview Tip

"I always set MaxOpenConns based on the database's connection limit divided by the number of application instances. For example, if PostgreSQL allows 100 connections and I have 4 pods, each pod gets MaxOpenConns=25. I also set ConnMaxLifetime to handle infrastructure changes like database failovers."


Context-Aware Queries

Always use the Context variants in production — they respect deadlines and cancellation.

QueryContext (Multiple Rows)

func (r *userRepo) List(ctx context.Context, limit int) ([]*User, error) {
    rows, err := r.db.QueryContext(ctx,
        "SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1",
        limit,
    )
    if err != nil {
        return nil, fmt.Errorf("query users: %w", err)
    }
    defer rows.Close()

    var users []*User
    for rows.Next() {
        var u User
        if err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.CreatedAt); err != nil {
            return nil, fmt.Errorf("scan user: %w", err)
        }
        users = append(users, &u)
    }

    // CRITICAL: check for errors after iteration
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("rows iteration: %w", err)
    }

    return users, nil
}

QueryRowContext (Single Row)

func (r *userRepo) FindByID(ctx context.Context, id string) (*User, error) {
    var u User
    err := r.db.QueryRowContext(ctx,
        "SELECT id, name, email, created_at FROM users WHERE id = $1", id,
    ).Scan(&u.ID, &u.Name, &u.Email, &u.CreatedAt)

    if err == sql.ErrNoRows {
        return nil, ErrNotFound
    }
    if err != nil {
        return nil, fmt.Errorf("find user %s: %w", id, err)
    }
    return &u, nil
}

ExecContext (INSERT, UPDATE, DELETE)

func (r *userRepo) Save(ctx context.Context, u *User) error {
    result, err := r.db.ExecContext(ctx,
        `INSERT INTO users (id, name, email, created_at)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (id) DO UPDATE SET name = $2, email = $3`,
        u.ID, u.Name, u.Email, u.CreatedAt,
    )
    if err != nil {
        return fmt.Errorf("save user: %w", err)
    }

    affected, err := result.RowsAffected()
    if err != nil {
        return fmt.Errorf("rows affected: %w", err)
    }
    if affected == 0 {
        return fmt.Errorf("no rows affected")
    }
    return nil
}

Transactions

The Safe Transaction Pattern

func (r *userRepo) Transfer(ctx context.Context, fromID, toID string, amount int) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    // defer rollback — no-op if tx is already committed
    defer tx.Rollback()

    var fromBalance int
    err = tx.QueryRowContext(ctx,
        "SELECT balance FROM accounts WHERE user_id = $1 FOR UPDATE", fromID,
    ).Scan(&fromBalance)
    if err != nil {
        return fmt.Errorf("get balance: %w", err)
    }

    if fromBalance < amount {
        return fmt.Errorf("insufficient balance: have %d, need %d", fromBalance, amount)
    }

    if _, err := tx.ExecContext(ctx,
        "UPDATE accounts SET balance = balance - $1 WHERE user_id = $2",
        amount, fromID,
    ); err != nil {
        return fmt.Errorf("debit: %w", err)
    }

    if _, err := tx.ExecContext(ctx,
        "UPDATE accounts SET balance = balance + $1 WHERE user_id = $2",
        amount, toID,
    ); err != nil {
        return fmt.Errorf("credit: %w", err)
    }

    if err := tx.Commit(); err != nil {
        return fmt.Errorf("commit: %w", err)
    }
    return nil
}

Reusable Transaction Helper

func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    if err := fn(tx); err != nil {
        return err
    }

    return tx.Commit()
}

// Usage
err := WithTx(ctx, db, func(tx *sql.Tx) error {
    if _, err := tx.ExecContext(ctx, "INSERT INTO orders ...", args...); err != nil {
        return err
    }
    if _, err := tx.ExecContext(ctx, "UPDATE inventory ...", args...); err != nil {
        return err
    }
    return nil
})

Transaction Isolation Levels

tx, err := db.BeginTx(ctx, &sql.TxOptions{
    Isolation: sql.LevelSerializable,
    ReadOnly:  true, // for read-only transactions
})
Level Dirty Read Non-Repeatable Read Phantom Read Use Case
ReadUncommitted Yes Yes Yes Almost never
ReadCommitted No Yes Yes PostgreSQL default
RepeatableRead No No Yes MySQL default
Serializable No No No Financial, strict consistency

NULL Handling

sql.NullXxx Types

type Profile struct {
    ID    string
    Name  string
    Phone sql.NullString  // nullable column
    Age   sql.NullInt64
}

func (r *repo) GetProfile(ctx context.Context, id string) (*Profile, error) {
    var p Profile
    err := r.db.QueryRowContext(ctx,
        "SELECT id, name, phone, age FROM profiles WHERE id = $1", id,
    ).Scan(&p.ID, &p.Name, &p.Phone, &p.Age)

    // Check if null
    if p.Phone.Valid {
        fmt.Println("Phone:", p.Phone.String)
    } else {
        fmt.Println("Phone: not set")
    }
    return &p, err
}

Pointer Fields (Cleaner API)

type Profile struct {
    ID    string
    Name  string
    Phone *string // nil = NULL
    Age   *int
}

func (r *repo) GetProfile(ctx context.Context, id string) (*Profile, error) {
    var p Profile
    err := r.db.QueryRowContext(ctx,
        "SELECT id, name, phone, age FROM profiles WHERE id = $1", id,
    ).Scan(&p.ID, &p.Name, &p.Phone, &p.Age)
    return &p, err
}

// Usage
if profile.Phone != nil {
    fmt.Println("Phone:", *profile.Phone)
}

sqlx (Easier Scanning)

import "github.com/jmoiron/sqlx"

type User struct {
    ID        string    `db:"id"`
    Name      string    `db:"name"`
    Email     string    `db:"email"`
    CreatedAt time.Time `db:"created_at"`
}

db, err := sqlx.Connect("postgres", dsn)

// Scan directly into struct
var user User
err = db.GetContext(ctx, &user,
    "SELECT id, name, email, created_at FROM users WHERE id = $1", id)

// Scan into slice of structs
var users []User
err = db.SelectContext(ctx, &users,
    "SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1", 10)

// Named queries
result, err := db.NamedExecContext(ctx,
    "INSERT INTO users (id, name, email) VALUES (:id, :name, :email)",
    user)

Prepared Statements

// Prepare once, execute many times (useful in loops)
stmt, err := db.PrepareContext(ctx,
    "INSERT INTO events (id, type, payload, created_at) VALUES ($1, $2, $3, $4)")
if err != nil {
    return err
}
defer stmt.Close()

for _, event := range events {
    _, err := stmt.ExecContext(ctx, event.ID, event.Type, event.Payload, event.CreatedAt)
    if err != nil {
        return fmt.Errorf("insert event %s: %w", event.ID, err)
    }
}

Prepared Statement Pitfalls

db.Prepare() pins a connection for the statement. If you prepare many statements without closing them, you can exhaust the pool. In most cases, db.QueryContext() / db.ExecContext() are fine — the driver handles preparation transparently.


Database Migrations

golang-migrate

go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest

# Create migration
migrate create -ext sql -dir migrations -seq add_users_table

# Run migrations
migrate -path migrations -database "postgres://localhost/mydb?sslmode=disable" up

# Rollback last migration
migrate -path migrations -database "postgres://localhost/mydb?sslmode=disable" down 1
-- migrations/000001_add_users_table.up.sql
CREATE TABLE users (
    id         TEXT PRIMARY KEY,
    name       TEXT NOT NULL,
    email      TEXT UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_users_email ON users(email);

-- migrations/000001_add_users_table.down.sql
DROP TABLE IF EXISTS users;

Running Migrations in Code

import (
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

func runMigrations(dbURL string) error {
    m, err := migrate.New("file://migrations", dbURL)
    if err != nil {
        return fmt.Errorf("create migrator: %w", err)
    }

    if err := m.Up(); err != nil && err != migrate.ErrNoChange {
        return fmt.Errorf("run migrations: %w", err)
    }

    return nil
}

Repository Pattern

type UserRepository interface {
    FindByID(ctx context.Context, id string) (*User, error)
    FindByEmail(ctx context.Context, email string) (*User, error)
    Save(ctx context.Context, user *User) error
    Delete(ctx context.Context, id string) error
    List(ctx context.Context, offset, limit int) ([]*User, error)
}

type postgresUserRepo struct {
    db *sql.DB
}

func NewPostgresUserRepository(db *sql.DB) UserRepository {
    return &postgresUserRepo{db: db}
}

// All methods use the same *sql.DB (connection pool)
// Methods accept context for deadline/cancellation
// Return domain types, not sql types

Supporting Transactions Across Repositories

// DBTX abstracts *sql.DB and *sql.Tx
type DBTX interface {
    ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
    QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
    QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

type postgresUserRepo struct {
    db DBTX // accepts both *sql.DB and *sql.Tx
}

func NewPostgresUserRepository(db DBTX) *postgresUserRepo {
    return &postgresUserRepo{db: db}
}

// Usage: pass the same tx to multiple repositories
func (s *service) TransferAndNotify(ctx context.Context, req TransferRequest) error {
    return WithTx(ctx, s.db, func(tx *sql.Tx) error {
        accountRepo := NewAccountRepository(tx)
        auditRepo := NewAuditRepository(tx)

        if err := accountRepo.Debit(ctx, req.FromID, req.Amount); err != nil {
            return err
        }
        if err := accountRepo.Credit(ctx, req.ToID, req.Amount); err != nil {
            return err
        }
        return auditRepo.Log(ctx, "transfer", req)
    })
}

Testing Database Code

sqlmock

import "github.com/DATA-DOG/go-sqlmock"

func TestFindByID(t *testing.T) {
    db, mock, err := sqlmock.New()
    if err != nil {
        t.Fatal(err)
    }
    defer db.Close()

    rows := sqlmock.NewRows([]string{"id", "name", "email", "created_at"}).
        AddRow("123", "Alice", "alice@example.com", time.Now())

    mock.ExpectQuery("SELECT id, name, email, created_at FROM users WHERE id = \\$1").
        WithArgs("123").
        WillReturnRows(rows)

    repo := NewPostgresUserRepository(db)
    user, err := repo.FindByID(context.Background(), "123")
    if err != nil {
        t.Fatal(err)
    }
    if user.Name != "Alice" {
        t.Errorf("got %q, want %q", user.Name, "Alice")
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unfulfilled expectations: %v", err)
    }
}

Integration Test with testcontainers

//go:build integration

func TestUserRepoIntegration(t *testing.T) {
    ctx := context.Background()

    container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: testcontainers.ContainerRequest{
            Image:        "postgres:16-alpine",
            ExposedPorts: []string{"5432/tcp"},
            Env: map[string]string{
                "POSTGRES_USER":     "test",
                "POSTGRES_PASSWORD": "test",
                "POSTGRES_DB":       "testdb",
            },
            WaitingFor: wait.ForListeningPort("5432/tcp"),
        },
        Started: true,
    })
    if err != nil {
        t.Fatal(err)
    }
    defer container.Terminate(ctx)

    host, _ := container.Host(ctx)
    port, _ := container.MappedPort(ctx, "5432")
    dsn := fmt.Sprintf("postgres://test:test@%s:%s/testdb?sslmode=disable", host, port.Port())

    db, err := sql.Open("postgres", dsn)
    if err != nil {
        t.Fatal(err)
    }
    defer db.Close()

    // Run migrations
    runMigrations(dsn)

    repo := NewPostgresUserRepository(db)

    t.Run("save and find", func(t *testing.T) {
        user := &User{ID: "1", Name: "Alice", Email: "alice@test.com", CreatedAt: time.Now()}
        if err := repo.Save(ctx, user); err != nil {
            t.Fatal(err)
        }

        got, err := repo.FindByID(ctx, "1")
        if err != nil {
            t.Fatal(err)
        }
        if got.Name != "Alice" {
            t.Errorf("got %q, want %q", got.Name, "Alice")
        }
    })
}

Quick Reference

Operation Method Notes
Open pool sql.Open(driver, dsn) Does NOT connect — call Ping to verify
Single row db.QueryRowContext(ctx, q, args...) Returns *sql.Row, call .Scan()
Multiple rows db.QueryContext(ctx, q, args...) Returns *sql.Rows, iterate with .Next()
Execute db.ExecContext(ctx, q, args...) INSERT, UPDATE, DELETE; returns sql.Result
Begin tx db.BeginTx(ctx, opts) Always defer tx.Rollback()
Commit tx.Commit() Rollback is no-op after commit
Prepare db.PrepareContext(ctx, q) Reuse for repeated queries; defer stmt.Close()
NULL column sql.NullString or *string Handles SQL NULL values
Pool config db.SetMaxOpenConns(n) Always set in production

Best Practices

  1. Always set MaxOpenConns — unbounded connections can overwhelm the database
  2. Always use Context variantsQueryContext, ExecContext, BeginTx for deadline propagation
  3. Always defer rows.Close() — leaked rows hold connections from the pool
  4. Always check rows.Err() after iteration — partial results from network errors are silent
  5. Use defer tx.Rollback() immediately after BeginTx — it's a no-op after Commit
  6. Wrap errors with %w — enables errors.Is() checks on sentinel errors like sql.ErrNoRows
  7. Use the repository pattern — abstracts storage, enables mocking, separates concerns
  8. Use the DBTX interface — lets the same repository code work with both *sql.DB and *sql.Tx

Common Pitfalls

Not Closing Rows

Forgetting rows.Close() leaks connections. Eventually the pool exhausts and all queries block.

// BAD: rows never closed if error occurs in loop
rows, _ := db.QueryContext(ctx, "SELECT ...")
for rows.Next() {
    // if this errors, rows stays open
}

// GOOD: always defer close
rows, err := db.QueryContext(ctx, "SELECT ...")
if err != nil {
    return err
}
defer rows.Close()

Not Checking rows.Err()

rows.Next() returns false both on completion and on error. Without rows.Err(), you silently process partial results.

for rows.Next() {
    rows.Scan(&v)
}
// CRITICAL: check for iteration errors
if err := rows.Err(); err != nil {
    return err
}

Using sql.DB as Single Connection

sql.DB is a pool. Each Query/Exec can use a different connection. For operations that must use the same connection (e.g., temp tables, session variables), use an explicit transaction or db.Conn(ctx).

Scanning into Wrong Types

Scanning a nullable column into a non-pointer type panics or produces zero values. Use sql.NullString or *string for nullable columns.

Forgetting ConnMaxLifetime

Without ConnMaxLifetime, connections live forever. If the database IP changes (failover, DNS rotation), stale connections cause silent failures.


Performance Considerations

  • Connection pooling is automatic — don't open/close sql.DB per request
  • Prepared statements reduce parsing overhead for repeated identical queries
  • Batch inserts (using COPY or multi-value INSERT) are 10–100x faster than individual INSERTs
  • pgx (native PostgreSQL driver) is faster than lib/pq — use it for new projects
  • Index your queriesEXPLAIN ANALYZE is your friend; no amount of Go optimization helps if the DB does a full scan
  • Connection pool exhaustion is the #1 production database issue — monitor db.Stats():
    stats := db.Stats()
    slog.Info("db pool",
        "open", stats.OpenConnections,
        "in_use", stats.InUse,
        "idle", stats.Idle,
        "wait_count", stats.WaitCount,
        "wait_duration", stats.WaitDuration,
    )
    

Interview Tips

Interview Tip

"The most common production database bug I've seen is connection pool exhaustion — from leaked rows, unbounded pool size, or long-running transactions. I always set MaxOpenConns, defer rows.Close(), and monitor db.Stats() via Prometheus."

Interview Tip

"I use the DBTX interface pattern so my repository methods work with both *sql.DB and *sql.Tx. This lets me compose operations into transactions without changing the repository code — I just pass a tx instead of db."

Interview Tip

"For testing database code, I use sqlmock for unit tests (fast, no external dependencies) and testcontainers for integration tests (real PostgreSQL in Docker). Build tags separate them so go test ./... stays fast."


Key Takeaways

  1. sql.DB is a connection pool — open once, configure pool settings, share across the application
  2. Always use Context variantsQueryContext, ExecContext, BeginTx for proper timeout and cancellation
  3. defer rows.Close() and check rows.Err() — prevent pool leaks and silent data loss
  4. Transaction pattern: BeginTxdefer Rollback() → operations → Commit() — safe and clean
  5. The DBTX interface enables repositories that work with both connections and transactions
  6. Set MaxOpenConns and ConnMaxLifetime in production — defaults are unsafe
  7. Repository pattern + interfaces = testable, swappable data access layer
  8. Use sqlmock for unit tests, testcontainers for integration tests — fast feedback + real confidence