---
name: database-operations
version: 1.0.0
description: Use when designing database schemas, writing migrations, optimizing SQL queries, fixing N+1 problems, creating indexes, setting up PostgreSQL, configuring EF Core, implementing caching, partitioning tables, or any database performance question.
triggers:
  - database
  - schema
  - migration
  - SQL
  - query optimization
  - index
  - PostgreSQL
  - Postgres
  - N+1
  - slow query
  - EXPLAIN
  - partitioning
  - caching
  - Redis
  - connection pool
  - EF Core migration
  - database design
role: specialist
scope: implementation
output-format: code
---
    
# Database Operations

Comprehensive database design, migration, and optimization specialist. Adapted from buildwithclaude by Dave Poon (MIT).

## Role Definition

You are a database optimization expert specializing in PostgreSQL, query performance, schema design, and EF Core migrations. You measure first, optimize second, and always plan rollback procedures.

## Core Principles

1. **Measure first** — always use `EXPLAIN ANALYZE` before optimizing
2. **Index strategically** — based on query patterns, not every column
3. **Denormalize selectively** — only when justified by read patterns
4. **Cache expensive computations** — Redis/materialized views for hot paths (see the sketch after this list)
5. **Plan rollback** — every migration has a reverse migration
6. **Zero-downtime migrations** — additive changes first, destructive later
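
Principle 4's materialized-view option, as a minimal sketch; the `daily_revenue` view and its columns are illustrative, not part of the schemas below:

```sql
-- Hypothetical reporting rollup: precompute daily revenue instead of
-- aggregating orders on every dashboard request.
CREATE MATERIALIZED VIEW daily_revenue AS
SELECT date_trunc('day', created_at) AS day,
       SUM(total) AS revenue
FROM orders
GROUP BY 1;

-- REFRESH ... CONCURRENTLY requires a unique index, and in exchange
-- rebuilds the view without blocking readers.
CREATE UNIQUE INDEX idx_daily_revenue_day ON daily_revenue (day);

REFRESH MATERIALIZED VIEW CONCURRENTLY daily_revenue;
```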
    
---

## Schema Design Patterns

### User Management

```sql
CREATE TYPE user_status AS ENUM ('active', 'inactive', 'suspended', 'pending');

CREATE TABLE users (
  id BIGSERIAL PRIMARY KEY,
  email VARCHAR(255) UNIQUE NOT NULL,
  username VARCHAR(50) UNIQUE NOT NULL,
  password_hash VARCHAR(255) NOT NULL,
  first_name VARCHAR(100) NOT NULL,
  last_name VARCHAR(100) NOT NULL,
  status user_status DEFAULT 'active',
  email_verified BOOLEAN DEFAULT FALSE,
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
  deleted_at TIMESTAMPTZ,  -- Soft delete

  CONSTRAINT users_email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'),
  CONSTRAINT users_names_not_empty CHECK (LENGTH(TRIM(first_name)) > 0 AND LENGTH(TRIM(last_name)) > 0)
);

-- Strategic indexes
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_status ON users(status) WHERE status != 'active';
CREATE INDEX idx_users_created_at ON users(created_at);
CREATE INDEX idx_users_deleted_at ON users(deleted_at) WHERE deleted_at IS NULL;
```

### Audit Trail

```sql
CREATE TYPE audit_operation AS ENUM ('INSERT', 'UPDATE', 'DELETE');

CREATE TABLE audit_log (
  id BIGSERIAL PRIMARY KEY,
  table_name VARCHAR(255) NOT NULL,
  record_id BIGINT NOT NULL,
  operation audit_operation NOT NULL,
  old_values JSONB,
  new_values JSONB,
  changed_fields TEXT[],
  user_id BIGINT REFERENCES users(id),
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_audit_table_record ON audit_log(table_name, record_id);
CREATE INDEX idx_audit_user_time ON audit_log(user_id, created_at);

-- Trigger function
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'DELETE' THEN
    INSERT INTO audit_log (table_name, record_id, operation, old_values)
    VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', to_jsonb(OLD));
    RETURN OLD;
  ELSIF TG_OP = 'UPDATE' THEN
    INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values)
    VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW));
    RETURN NEW;
  ELSIF TG_OP = 'INSERT' THEN
    INSERT INTO audit_log (table_name, record_id, operation, new_values)
    VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', to_jsonb(NEW));
    RETURN NEW;
  END IF;
END;
$$ LANGUAGE plpgsql;

-- Apply to any table
CREATE TRIGGER audit_users
  AFTER INSERT OR UPDATE OR DELETE ON users
  FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
```

### Soft Delete Pattern

```sql
-- Query filter view
CREATE VIEW active_users AS
SELECT * FROM users WHERE deleted_at IS NULL;

-- Soft delete function
CREATE OR REPLACE FUNCTION soft_delete(p_table TEXT, p_id BIGINT)
RETURNS VOID AS $$
BEGIN
  EXECUTE format('UPDATE %I SET deleted_at = CURRENT_TIMESTAMP WHERE id = $1 AND deleted_at IS NULL', p_table)
    USING p_id;
END;
$$ LANGUAGE plpgsql;
```

### Full-Text Search

```sql
ALTER TABLE products ADD COLUMN search_vector tsvector
  GENERATED ALWAYS AS (
    to_tsvector('english',
      COALESCE(name, '') || ' ' || COALESCE(description, '') || ' ' || COALESCE(sku, ''))
  ) STORED;

CREATE INDEX idx_products_search ON products USING gin(search_vector);

-- Query
SELECT * FROM products
WHERE search_vector @@ to_tsquery('english', 'laptop & gaming');
```

---

## Query Optimization

### Analyze Before Optimizing

```sql
-- Always start here
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.id, u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2024-01-01'
GROUP BY u.id, u.name
ORDER BY order_count DESC;
```
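
When the plan shows a sequential scan you did not expect, check whether the planner can use an index at all before creating new ones. A quick diagnostic sketch, session-local only — the query is illustrative and assumes the `lower(email)` expression index from the next section; `enable_seqscan` is a real planner setting but should never be toggled for production traffic:

```sql
-- Penalize seq scans for this transaction only, then re-read the plan.
-- If the planner still will not use the index, the query shape is the
-- problem (functions or casts on the column), not the cost estimates.
BEGIN;
SET LOCAL enable_seqscan = off;
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM users WHERE lower(email) = 'alice@example.com';
ROLLBACK;
```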
### Indexing Strategy

```sql
-- Single column for exact lookups
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

-- Composite for multi-column queries (order matters!)
CREATE INDEX CONCURRENTLY idx_orders_user_status ON orders(user_id, status, created_at);

-- Partial index for filtered queries
CREATE INDEX CONCURRENTLY idx_products_low_stock ON products(inventory_quantity)
  WHERE inventory_tracking = true AND inventory_quantity <= 5;

-- Covering index (includes extra columns to avoid table lookup)
CREATE INDEX CONCURRENTLY idx_orders_covering ON orders(user_id, status)
  INCLUDE (total, created_at);

-- GIN index for JSONB
CREATE INDEX CONCURRENTLY idx_products_attrs ON products USING gin(attributes);

-- Expression index
CREATE INDEX CONCURRENTLY idx_users_email_lower ON users(lower(email));
```

### Find Unused Indexes

```sql
SELECT schemaname, tablename, indexname,
       idx_scan as scans,
       pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
```

### Find Missing Indexes (Slow Queries)

```sql
-- Enable pg_stat_statements first
SELECT query, calls, total_exec_time, mean_exec_time, rows
FROM pg_stat_statements
WHERE mean_exec_time > 100  -- ms
ORDER BY total_exec_time DESC
LIMIT 20;
```

### N+1 Query Detection

```sql
-- Look for repeated similar queries in pg_stat_statements
SELECT query, calls, mean_exec_time
FROM pg_stat_statements
WHERE calls > 100 AND query LIKE '%WHERE%id = $1%'
ORDER BY calls DESC;
```

---

## Migration Patterns

### Safe Column Addition

```sql
-- +migrate Up notransaction
-- Always use CONCURRENTLY for indexes in production
-- (CONCURRENTLY cannot run inside a transaction, hence notransaction)
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
CREATE INDEX CONCURRENTLY idx_users_phone ON users(phone) WHERE phone IS NOT NULL;

-- +migrate Down
DROP INDEX IF EXISTS idx_users_phone;
ALTER TABLE users DROP COLUMN IF EXISTS phone;
```

### Safe Column Rename (Zero-Downtime)

```sql
-- Step 1: Add new column
ALTER TABLE users ADD COLUMN display_name VARCHAR(100);
UPDATE users SET display_name = name;
ALTER TABLE users ALTER COLUMN display_name SET NOT NULL;

-- Step 2: Deploy code that writes to both columns
-- Step 3: Deploy code that reads from new column
-- Step 4: Drop old column
ALTER TABLE users DROP COLUMN name;
```

### Table Partitioning

```sql
-- Create partitioned table
CREATE TABLE orders (
  id BIGSERIAL,
  user_id BIGINT NOT NULL,
  total DECIMAL(10,2),
  created_at TIMESTAMPTZ NOT NULL,
  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions
CREATE TABLE orders_2024_01 PARTITION OF orders
  FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02 PARTITION OF orders
  FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Auto-create partitions
CREATE OR REPLACE FUNCTION create_monthly_partition(p_table TEXT, p_date DATE)
RETURNS VOID AS $$
DECLARE
  partition_name TEXT := p_table || '_' || to_char(p_date, 'YYYY_MM');
  next_date DATE := p_date + INTERVAL '1 month';
BEGIN
  EXECUTE format(
    'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
    partition_name, p_table, p_date, next_date
  );
END;
$$ LANGUAGE plpgsql;
```

---

## EF Core Migrations (.NET)

### Create and Apply

```bash
# Add migration
dotnet ef migrations add AddPhoneToUsers -p src/Infrastructure -s src/Api

# Apply
dotnet ef database update -p src/Infrastructure -s src/Api

# Generate idempotent SQL script for production
dotnet ef migrations script -p src/Infrastructure -s src/Api -o migration.sql --idempotent

# Rollback
dotnet ef database update PreviousMigrationName -p src/Infrastructure -s src/Api
```
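
EF Core runs each migration inside a transaction by default, which conflicts with `CREATE INDEX CONCURRENTLY`. One escape hatch is raw SQL with the transaction suppressed; a sketch under assumed names (`AddUsersPhoneIndex` and the index are illustrative):

```csharp
using Microsoft.EntityFrameworkCore.Migrations;

public partial class AddUsersPhoneIndex : Migration
{
    protected override void Up(MigrationBuilder migrationBuilder)
    {
        // suppressTransaction: true lets CONCURRENTLY run outside the
        // migration's ambient transaction, which it requires.
        migrationBuilder.Sql(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_phone " +
            "ON users(phone) WHERE phone IS NOT NULL;",
            suppressTransaction: true);
    }

    protected override void Down(MigrationBuilder migrationBuilder)
    {
        migrationBuilder.Sql(
            "DROP INDEX CONCURRENTLY IF EXISTS idx_users_phone;",
            suppressTransaction: true);
    }
}
```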
### EF Core Configuration Best Practices

```csharp
// Use AsNoTracking for read queries
var users = await _db.Users
    .AsNoTracking()
    .Where(u => u.Status == UserStatus.Active)
    .Select(u => new UserDto { Id = u.Id, Name = u.Name })
    .ToListAsync(ct);

// Avoid N+1 with Include
var orders = await _db.Orders
    .Include(o => o.Items)
    .ThenInclude(i => i.Product)
    .Where(o => o.UserId == userId)
    .ToListAsync(ct);

// Better: projection fetches only the columns you need
var orderDtos = await _db.Orders
    .Where(o => o.UserId == userId)
    .Select(o => new OrderDto
    {
        Id = o.Id,
        Total = o.Total,
        Items = o.Items.Select(i => new OrderItemDto
        {
            ProductName = i.Product.Name,
            Quantity = i.Quantity,
        }).ToList(),
    })
    .ToListAsync(ct);
```

---

## Caching Strategy

### Redis Query Cache

```typescript
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL)

async function cachedQuery<T>(
  key: string,
  queryFn: () => Promise<T>,
  ttlSeconds: number = 300
): Promise<T> {
  const cached = await redis.get(key)
  if (cached) return JSON.parse(cached)

  const result = await queryFn()
  await redis.setex(key, ttlSeconds, JSON.stringify(result))
  return result
}

// Usage
const product ... (truncated)
```
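
Cached reads need a matching invalidation path on writes. A minimal sketch using ioredis key scanning; the `product:` prefix is an assumption about how keys were named when calling `cachedQuery`:

```typescript
// Delete every cache key under a prefix, e.g. after a product update.
// scanStream batches keys via SCAN, so large keyspaces stay safe.
async function invalidatePrefix(prefix: string): Promise<void> {
  const stream = redis.scanStream({ match: `${prefix}*`, count: 100 })
  for await (const keys of stream) {
    if (keys.length > 0) await redis.del(...keys)
  }
}

// After a write (illustrative): await invalidatePrefix('product:')
```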