Advanced Database Patterns¶
Introduction¶
Go's database/sql package provides a lightweight, driver-agnostic interface to SQL databases. Understanding it deeply — that sql.DB is a connection pool, not a single connection; that prepared statements and context-aware queries matter for production; that transaction handling requires careful error management — separates production Go engineers from tutorial-level ones.
Why This Matters
Every backend service talks to a database. Interviewers probe whether you understand connection pool tuning, safe transaction patterns, NULL handling, and how to structure data access for testability. These are the patterns that prevent outages.
sql.DB Is a Connection Pool¶
```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/lib/pq" // PostgreSQL driver (registers via init)
)

func main() {
	// sql.Open does NOT open a connection — it initializes the pool
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Verify connectivity
	if err := db.PingContext(context.Background()); err != nil {
		log.Fatalf("database unreachable: %v", err)
	}
}
```
Connection Pool Configuration¶
```go
db.SetMaxOpenConns(25)                 // max simultaneous connections
db.SetMaxIdleConns(10)                 // max idle connections in pool
db.SetConnMaxLifetime(5 * time.Minute) // recycle connections (DNS changes, LB)
db.SetConnMaxIdleTime(1 * time.Minute) // close idle connections faster
```
```mermaid
graph TD
    APP[Application] --> POOL[sql.DB Connection Pool]
    POOL -->|"MaxOpenConns=25"| C1[Conn 1]
    POOL --> C2[Conn 2]
    POOL --> C3[...]
    POOL -->|"MaxIdleConns=10"| CN[Conn N]
    C1 --> DB[(PostgreSQL)]
    C2 --> DB
    C3 --> DB
    CN --> DB
```
| Setting | Default | Production Recommendation | Why |
|---|---|---|---|
| `MaxOpenConns` | 0 (unlimited) | 25–50 | Prevents overwhelming the DB |
| `MaxIdleConns` | 2 | 10–25 | Avoids reconnection overhead |
| `ConnMaxLifetime` | 0 (no limit) | 5 min | Handles DNS changes, LB rotation |
| `ConnMaxIdleTime` | 0 (no limit) | 1 min | Frees unused connections faster |
Interview Tip
"I always set MaxOpenConns based on the database's connection limit divided by the number of application instances. For example, if PostgreSQL allows 100 connections and I have 4 pods, each pod gets MaxOpenConns=25. I also set ConnMaxLifetime to handle infrastructure changes like database failovers."
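The sizing arithmetic in that tip can be sketched as a small helper. The function name and the `reserved` parameter are illustrative, not part of any library; the idea is simply to divide the database's connection limit across instances, keeping a little headroom for admin tooling.

```go
package main

import "fmt"

// maxOpenConnsPerInstance splits a database's connection limit across
// application instances, reserving some connections for admin and
// monitoring tools. The reserve value is an assumption; tune it.
func maxOpenConnsPerInstance(dbConnLimit, instances, reserved int) int {
	return (dbConnLimit - reserved) / instances
}

func main() {
	// PostgreSQL max_connections=100, 4 pods, 4 connections reserved
	fmt.Println(maxOpenConnsPerInstance(100, 4, 4)) // 24
}
```

At startup you would pass the result to `db.SetMaxOpenConns`.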
Context-Aware Queries¶
Always use the Context variants in production — they respect deadlines and cancellation.
QueryContext (Multiple Rows)¶
```go
func (r *userRepo) List(ctx context.Context, limit int) ([]*User, error) {
	rows, err := r.db.QueryContext(ctx,
		"SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1",
		limit,
	)
	if err != nil {
		return nil, fmt.Errorf("query users: %w", err)
	}
	defer rows.Close()

	var users []*User
	for rows.Next() {
		var u User
		if err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.CreatedAt); err != nil {
			return nil, fmt.Errorf("scan user: %w", err)
		}
		users = append(users, &u)
	}
	// CRITICAL: check for errors after iteration
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows iteration: %w", err)
	}
	return users, nil
}
```
QueryRowContext (Single Row)¶
```go
func (r *userRepo) FindByID(ctx context.Context, id string) (*User, error) {
	var u User
	err := r.db.QueryRowContext(ctx,
		"SELECT id, name, email, created_at FROM users WHERE id = $1", id,
	).Scan(&u.ID, &u.Name, &u.Email, &u.CreatedAt)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("find user %s: %w", id, err)
	}
	return &u, nil
}
```
ExecContext (INSERT, UPDATE, DELETE)¶
```go
func (r *userRepo) Save(ctx context.Context, u *User) error {
	result, err := r.db.ExecContext(ctx,
		`INSERT INTO users (id, name, email, created_at)
		 VALUES ($1, $2, $3, $4)
		 ON CONFLICT (id) DO UPDATE SET name = $2, email = $3`,
		u.ID, u.Name, u.Email, u.CreatedAt,
	)
	if err != nil {
		return fmt.Errorf("save user: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}
	if affected == 0 {
		return fmt.Errorf("no rows affected")
	}
	return nil
}
```
Transactions¶
The Safe Transaction Pattern¶
```go
func (r *userRepo) Transfer(ctx context.Context, fromID, toID string, amount int) error {
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	// defer rollback — no-op if tx is already committed
	defer tx.Rollback()

	var fromBalance int
	err = tx.QueryRowContext(ctx,
		"SELECT balance FROM accounts WHERE user_id = $1 FOR UPDATE", fromID,
	).Scan(&fromBalance)
	if err != nil {
		return fmt.Errorf("get balance: %w", err)
	}
	if fromBalance < amount {
		return fmt.Errorf("insufficient balance: have %d, need %d", fromBalance, amount)
	}

	if _, err := tx.ExecContext(ctx,
		"UPDATE accounts SET balance = balance - $1 WHERE user_id = $2",
		amount, fromID,
	); err != nil {
		return fmt.Errorf("debit: %w", err)
	}
	if _, err := tx.ExecContext(ctx,
		"UPDATE accounts SET balance = balance + $1 WHERE user_id = $2",
		amount, toID,
	); err != nil {
		return fmt.Errorf("credit: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}
```
Reusable Transaction Helper¶
```go
func WithTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	defer tx.Rollback() // no-op after a successful Commit

	if err := fn(tx); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit tx: %w", err)
	}
	return nil
}

// Usage
err := WithTx(ctx, db, func(tx *sql.Tx) error {
	if _, err := tx.ExecContext(ctx, "INSERT INTO orders ...", args...); err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx, "UPDATE inventory ...", args...); err != nil {
		return err
	}
	return nil
})
```
Transaction Isolation Levels¶
```go
tx, err := db.BeginTx(ctx, &sql.TxOptions{
	Isolation: sql.LevelSerializable,
	ReadOnly:  true, // for read-only transactions
})
```
| Level | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case |
|---|---|---|---|---|
| `ReadUncommitted` | Yes | Yes | Yes | Almost never |
| `ReadCommitted` | No | Yes | Yes | PostgreSQL default |
| `RepeatableRead` | No | No | Yes | MySQL default |
| `Serializable` | No | No | No | Financial, strict consistency |
NULL Handling¶
sql.NullXxx Types¶
```go
type Profile struct {
	ID    string
	Name  string
	Phone sql.NullString // nullable column
	Age   sql.NullInt64
}

func (r *repo) GetProfile(ctx context.Context, id string) (*Profile, error) {
	var p Profile
	err := r.db.QueryRowContext(ctx,
		"SELECT id, name, phone, age FROM profiles WHERE id = $1", id,
	).Scan(&p.ID, &p.Name, &p.Phone, &p.Age)
	if err != nil {
		return nil, err
	}

	// Check for NULL before reading the value
	if p.Phone.Valid {
		fmt.Println("Phone:", p.Phone.String)
	} else {
		fmt.Println("Phone: not set")
	}
	return &p, nil
}
```
Pointer Fields (Cleaner API)¶
```go
type Profile struct {
	ID    string
	Name  string
	Phone *string // nil = NULL
	Age   *int
}

func (r *repo) GetProfile(ctx context.Context, id string) (*Profile, error) {
	var p Profile
	err := r.db.QueryRowContext(ctx,
		"SELECT id, name, phone, age FROM profiles WHERE id = $1", id,
	).Scan(&p.ID, &p.Name, &p.Phone, &p.Age)
	return &p, err
}

// Usage
if profile.Phone != nil {
	fmt.Println("Phone:", *profile.Phone)
}
```
sqlx (Easier Scanning)¶
```go
import "github.com/jmoiron/sqlx"

type User struct {
	ID        string    `db:"id"`
	Name      string    `db:"name"`
	Email     string    `db:"email"`
	CreatedAt time.Time `db:"created_at"`
}

db, err := sqlx.Connect("postgres", dsn)

// Scan directly into a struct
var user User
err = db.GetContext(ctx, &user,
	"SELECT id, name, email, created_at FROM users WHERE id = $1", id)

// Scan into a slice of structs
var users []User
err = db.SelectContext(ctx, &users,
	"SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1", 10)

// Named queries
result, err := db.NamedExecContext(ctx,
	"INSERT INTO users (id, name, email) VALUES (:id, :name, :email)",
	user)
```
Prepared Statements¶
```go
// Prepare once, execute many times (useful in loops)
stmt, err := db.PrepareContext(ctx,
	"INSERT INTO events (id, type, payload, created_at) VALUES ($1, $2, $3, $4)")
if err != nil {
	return err
}
defer stmt.Close()

for _, event := range events {
	_, err := stmt.ExecContext(ctx, event.ID, event.Type, event.Payload, event.CreatedAt)
	if err != nil {
		return fmt.Errorf("insert event %s: %w", event.ID, err)
	}
}
```
Prepared Statement Pitfalls
A *sql.Stmt is not tied to a single connection; database/sql re-prepares it on whichever pooled connection executes it, so many unclosed statements consume server-side resources across the pool. In most cases, db.QueryContext() / db.ExecContext() are fine, since the driver handles preparation transparently.
Database Migrations¶
golang-migrate¶
```bash
go install -tags 'postgres' github.com/golang-migrate/migrate/v4/cmd/migrate@latest

# Create migration
migrate create -ext sql -dir migrations -seq add_users_table

# Run migrations
migrate -path migrations -database "postgres://localhost/mydb?sslmode=disable" up

# Rollback last migration
migrate -path migrations -database "postgres://localhost/mydb?sslmode=disable" down 1
```
```sql
-- migrations/000001_add_users_table.up.sql
CREATE TABLE users (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users(email);

-- migrations/000001_add_users_table.down.sql
DROP TABLE IF EXISTS users;
```
Running Migrations in Code¶
```go
import (
	"github.com/golang-migrate/migrate/v4"
	_ "github.com/golang-migrate/migrate/v4/database/postgres"
	_ "github.com/golang-migrate/migrate/v4/source/file"
)

func runMigrations(dbURL string) error {
	m, err := migrate.New("file://migrations", dbURL)
	if err != nil {
		return fmt.Errorf("create migrator: %w", err)
	}
	if err := m.Up(); err != nil && err != migrate.ErrNoChange {
		return fmt.Errorf("run migrations: %w", err)
	}
	return nil
}
```
Repository Pattern¶
```go
type UserRepository interface {
	FindByID(ctx context.Context, id string) (*User, error)
	FindByEmail(ctx context.Context, email string) (*User, error)
	Save(ctx context.Context, user *User) error
	Delete(ctx context.Context, id string) error
	List(ctx context.Context, offset, limit int) ([]*User, error)
}

type postgresUserRepo struct {
	db *sql.DB
}

func NewPostgresUserRepository(db *sql.DB) UserRepository {
	return &postgresUserRepo{db: db}
}
```

- All methods share the same `*sql.DB` (connection pool)
- Methods accept a `context.Context` for deadline/cancellation
- Methods return domain types, not `sql` types
Supporting Transactions Across Repositories¶
```go
// DBTX abstracts *sql.DB and *sql.Tx
type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

type postgresUserRepo struct {
	db DBTX // accepts both *sql.DB and *sql.Tx
}

func NewPostgresUserRepository(db DBTX) *postgresUserRepo {
	return &postgresUserRepo{db: db}
}

// Usage: pass the same tx to multiple repositories
func (s *service) TransferAndNotify(ctx context.Context, req TransferRequest) error {
	return WithTx(ctx, s.db, func(tx *sql.Tx) error {
		accountRepo := NewAccountRepository(tx)
		auditRepo := NewAuditRepository(tx)
		if err := accountRepo.Debit(ctx, req.FromID, req.Amount); err != nil {
			return err
		}
		if err := accountRepo.Credit(ctx, req.ToID, req.Amount); err != nil {
			return err
		}
		return auditRepo.Log(ctx, "transfer", req)
	})
}
```
Testing Database Code¶
sqlmock¶
```go
import "github.com/DATA-DOG/go-sqlmock"

func TestFindByID(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	rows := sqlmock.NewRows([]string{"id", "name", "email", "created_at"}).
		AddRow("123", "Alice", "alice@example.com", time.Now())
	mock.ExpectQuery("SELECT id, name, email, created_at FROM users WHERE id = \\$1").
		WithArgs("123").
		WillReturnRows(rows)

	repo := NewPostgresUserRepository(db)
	user, err := repo.FindByID(context.Background(), "123")
	if err != nil {
		t.Fatal(err)
	}
	if user.Name != "Alice" {
		t.Errorf("got %q, want %q", user.Name, "Alice")
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unfulfilled expectations: %v", err)
	}
}
```
Integration Test with testcontainers¶
```go
//go:build integration

func TestUserRepoIntegration(t *testing.T) {
	ctx := context.Background()
	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:        "postgres:16-alpine",
			ExposedPorts: []string{"5432/tcp"},
			Env: map[string]string{
				"POSTGRES_USER":     "test",
				"POSTGRES_PASSWORD": "test",
				"POSTGRES_DB":       "testdb",
			},
			WaitingFor: wait.ForListeningPort("5432/tcp"),
		},
		Started: true,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer container.Terminate(ctx)

	host, _ := container.Host(ctx)
	port, _ := container.MappedPort(ctx, "5432")
	dsn := fmt.Sprintf("postgres://test:test@%s:%s/testdb?sslmode=disable", host, port.Port())

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatal(err)
	}
	defer db.Close()

	// Run migrations
	if err := runMigrations(dsn); err != nil {
		t.Fatal(err)
	}

	repo := NewPostgresUserRepository(db)
	t.Run("save and find", func(t *testing.T) {
		user := &User{ID: "1", Name: "Alice", Email: "alice@test.com", CreatedAt: time.Now()}
		if err := repo.Save(ctx, user); err != nil {
			t.Fatal(err)
		}
		got, err := repo.FindByID(ctx, "1")
		if err != nil {
			t.Fatal(err)
		}
		if got.Name != "Alice" {
			t.Errorf("got %q, want %q", got.Name, "Alice")
		}
	})
}
```
Quick Reference¶
| Operation | Method | Notes |
|---|---|---|
| Open pool | `sql.Open(driver, dsn)` | Does NOT connect — call `Ping` to verify |
| Single row | `db.QueryRowContext(ctx, q, args...)` | Returns `*sql.Row`; call `.Scan()` |
| Multiple rows | `db.QueryContext(ctx, q, args...)` | Returns `*sql.Rows`; iterate with `.Next()` |
| Execute | `db.ExecContext(ctx, q, args...)` | INSERT, UPDATE, DELETE; returns `sql.Result` |
| Begin tx | `db.BeginTx(ctx, opts)` | Always `defer tx.Rollback()` |
| Commit | `tx.Commit()` | Rollback is a no-op after commit |
| Prepare | `db.PrepareContext(ctx, q)` | Reuse for repeated queries; `defer stmt.Close()` |
| NULL column | `sql.NullString` or `*string` | Handles SQL NULL values |
| Pool config | `db.SetMaxOpenConns(n)` | Always set in production |
Best Practices¶
- Always set `MaxOpenConns` — unbounded connections can overwhelm the database
- Always use `Context` variants — `QueryContext`, `ExecContext`, `BeginTx` for deadline propagation
- Always `defer rows.Close()` — leaked rows hold connections from the pool
- Always check `rows.Err()` after iteration — partial results from network errors are silent
- Use `defer tx.Rollback()` immediately after `BeginTx` — it's a no-op after `Commit`
- Wrap errors with `%w` — enables `errors.Is()` checks on sentinel errors like `sql.ErrNoRows`
- Use the repository pattern — abstracts storage, enables mocking, separates concerns
- Use the `DBTX` interface — lets the same repository code work with both `*sql.DB` and `*sql.Tx`
Common Pitfalls¶
Not Closing Rows
Forgetting rows.Close() leaks connections. Eventually the pool exhausts and all queries block.
Not Checking rows.Err()
rows.Next() returns false both on completion and on error. Without rows.Err(), you silently process partial results.
Using sql.DB as Single Connection
sql.DB is a pool. Each Query/Exec can use a different connection. For operations that must use the same connection (e.g., temp tables, session variables), use an explicit transaction or db.Conn(ctx).
Scanning into Wrong Types
Scanning a NULL value into a non-nullable destination like a plain string makes Scan return an error, so the query fails at runtime even though it compiles. Use sql.NullString or *string for nullable columns.
Forgetting ConnMaxLifetime
Without ConnMaxLifetime, connections live forever. If the database IP changes (failover, DNS rotation), stale connections cause silent failures.
Performance Considerations¶
- Connection pooling is automatic — don't open/close `sql.DB` per request
- Prepared statements reduce parsing overhead for repeated identical queries
- Batch inserts (using `COPY` or multi-value INSERT) are 10–100x faster than individual INSERTs
- `pgx` (native PostgreSQL driver) is faster than `lib/pq` — use it for new projects
- Index your queries — `EXPLAIN ANALYZE` is your friend; no amount of Go optimization helps if the DB does a full scan
- Connection pool exhaustion is the #1 production database issue — monitor `db.Stats()`
Interview Tips¶
Interview Tip
"The most common production database bug I've seen is connection pool exhaustion — from leaked rows, unbounded pool size, or long-running transactions. I always set MaxOpenConns, defer rows.Close(), and monitor db.Stats() via Prometheus."
Interview Tip
"I use the DBTX interface pattern so my repository methods work with both *sql.DB and *sql.Tx. This lets me compose operations into transactions without changing the repository code — I just pass a tx instead of db."
Interview Tip
"For testing database code, I use sqlmock for unit tests (fast, no external dependencies) and testcontainers for integration tests (real PostgreSQL in Docker). Build tags separate them so go test ./... stays fast."
Key Takeaways¶
sql.DBis a connection pool — open once, configure pool settings, share across the application- Always use
Contextvariants —QueryContext,ExecContext,BeginTxfor proper timeout and cancellation defer rows.Close()and checkrows.Err()— prevent pool leaks and silent data loss- Transaction pattern:
BeginTx→defer Rollback()→ operations →Commit()— safe and clean - The
DBTXinterface enables repositories that work with both connections and transactions - Set
MaxOpenConnsandConnMaxLifetimein production — defaults are unsafe - Repository pattern + interfaces = testable, swappable data access layer
- Use sqlmock for unit tests, testcontainers for integration tests — fast feedback + real confidence