    senior-data-engineer

    Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure.

    By @alirezarezvani
    SKILL.md
    ---
    name: senior-data-engineer
    description: Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Expertise in Python, SQL, Spark, Airflow, dbt, Kafka, and modern data stack. Includes data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.
    ---
    
    # Senior Data Engineer
    
    Production-grade data engineering skill for building scalable, reliable data systems.
    
    ## Table of Contents
    
    1. [Trigger Phrases](#trigger-phrases)
    2. [Quick Start](#quick-start)
    3. [Workflows](#workflows)
       - [Building a Batch ETL Pipeline](#workflow-1-building-a-batch-etl-pipeline)
       - [Implementing Real-Time Streaming](#workflow-2-implementing-real-time-streaming)
       - [Data Quality Framework Setup](#workflow-3-data-quality-framework-setup)
    4. [Architecture Decision Framework](#architecture-decision-framework)
    5. [Tech Stack](#tech-stack)
    6. [Reference Documentation](#reference-documentation)
    7. [Troubleshooting](#troubleshooting)
    
    ---
    
    ## Trigger Phrases
    
    Activate this skill when you see:
    
    **Pipeline Design:**
    - "Design a data pipeline for..."
    - "Build an ETL/ELT process..."
    - "How should I ingest data from..."
    - "Set up data extraction from..."
    
    **Architecture:**
    - "Should I use batch or streaming?"
    - "Lambda vs Kappa architecture"
    - "How to handle late-arriving data"
    - "Design a data lakehouse"
    
    **Data Modeling:**
    - "Create a dimensional model..."
    - "Star schema vs snowflake"
    - "Implement slowly changing dimensions"
    - "Design a data vault"
    
    **Data Quality:**
    - "Add data validation to..."
    - "Set up data quality checks"
    - "Monitor data freshness"
    - "Implement data contracts"
    
    **Performance:**
    - "Optimize this Spark job"
    - "Query is running slow"
    - "Reduce pipeline execution time"
    - "Tune Airflow DAG"
    
    ---
    
    ## Quick Start
    
    ### Core Tools
    
    ```bash
    # Generate pipeline orchestration config
    python scripts/pipeline_orchestrator.py generate \
      --type airflow \
      --source postgres \
      --destination snowflake \
      --schedule "0 5 * * *"
    
    # Validate data quality
    python scripts/data_quality_validator.py validate \
      --input data/sales.parquet \
      --schema schemas/sales.json \
      --checks freshness,completeness,uniqueness
    
    # Optimize ETL performance
    python scripts/etl_performance_optimizer.py analyze \
      --query queries/daily_aggregation.sql \
      --engine spark \
      --recommend
    ```
    
    ---
    
    ## Workflows
    
    ### Workflow 1: Building a Batch ETL Pipeline
    
    **Scenario:** Extract data from PostgreSQL, load it into Snowflake, and transform it there with dbt (ELT).
    
    #### Step 1: Define Source Schema
    
    ```sql
    -- Document source tables
    SELECT
        table_name,
        column_name,
        data_type,
        is_nullable
    FROM information_schema.columns
    WHERE table_schema = 'source_schema'
    ORDER BY table_name, ordinal_position;
    ```
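The query output can be captured as a per-table column spec and committed next to the pipeline code. A minimal sketch, assuming the rows arrive as `(table_name, column_name, data_type, is_nullable)` tuples in the query's order; `build_column_spec` and its output shape are hypothetical, not part of the skill's scripts.

```python
import json


def build_column_spec(rows):
    """Group information_schema.columns rows into {table_name: [column, ...]}.

    Rows are (table_name, column_name, data_type, is_nullable) tuples, already
    ordered by table_name, ordinal_position as in the SQL query.
    """
    spec = {}
    for table_name, column_name, data_type, is_nullable in rows:
        spec.setdefault(table_name, []).append({
            "name": column_name,
            "type": data_type,
            # information_schema reports nullability as the strings 'YES' / 'NO'
            "nullable": is_nullable == "YES",
        })
    return spec


# Example rows, shaped like the query's result set
rows = [
    ("customers", "customer_id", "integer", "NO"),
    ("orders", "order_id", "integer", "NO"),
    ("orders", "updated_at", "timestamp without time zone", "YES"),
]
spec = build_column_spec(rows)
print(json.dumps(spec, indent=2))
```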
    
    #### Step 2: Generate Extraction Config
    
    ```bash
    python scripts/pipeline_orchestrator.py generate \
      --type airflow \
      --source postgres \
      --tables orders,customers,products \
      --mode incremental \
      --watermark updated_at \
      --output dags/extract_source.py
    ```
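`--mode incremental --watermark updated_at` implies watermark-based extraction: pull only rows whose `updated_at` is newer than the last value seen, then persist the new maximum for the next run. The sketch below illustrates that idea against an in-memory SQLite table standing in for PostgreSQL; `extract_incremental` is a hypothetical helper, not how `pipeline_orchestrator.py` is actually implemented.

```python
import sqlite3


def extract_incremental(conn, table, watermark_col, last_watermark):
    """Return (rows changed since last_watermark, new watermark).

    Hypothetical helper illustrating watermark extraction only.
    """
    # Identifiers cannot be bound as parameters, so they are interpolated:
    # pass only trusted, validated table/column names.
    query = (
        f"SELECT * FROM {table} WHERE {watermark_col} > ? "
        f"ORDER BY {watermark_col}"
    )
    cur = conn.execute(query, (last_watermark,))
    rows = cur.fetchall()
    if not rows:
        return rows, last_watermark  # nothing new: keep the old watermark
    idx = [d[0] for d in cur.description].index(watermark_col)
    return rows, max(r[idx] for r in rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    (1, "placed", "2024-01-01T10:00:00"),
    (2, "shipped", "2024-01-02T09:30:00"),
    (3, "placed", "2024-01-02T11:15:00"),
])

# ISO-8601 strings compare correctly as text, so '>' works on them
rows, wm = extract_incremental(conn, "orders", "updated_at", "2024-01-01T23:59:59")
print([r[0] for r in rows], wm)
```

A strict `>` can miss rows that share the boundary timestamp but commit after the extraction ran. A common mitigation is to re-read a small overlap window and rely on an idempotent upsert downstream, which the `unique_key` on `fct_orders` provides.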
    
    #### Step 3: Create dbt Models
    
    ```sql
    -- models/staging/stg_orders.sql
    WITH source AS (
        SELECT * FROM {{ source('postgres', 'orders') }}
    ),
    
    renamed AS (
        SELECT
            order_id,
            customer_id,
            order_date,
            total_amount,
            status,
            _extracted_at
        FROM source
        -- No date filter here: fct_orders applies the incremental filter, so a
        -- full refresh (or first build) still sees the complete order history.
    )
    
    SELECT * FROM renamed
    ```
    
    ```sql
    -- models/marts/fct_orders.sql
    {{
        config(
            materialized='incremental',
            unique_key='order_id',
            cluster_by=['order_date']
        )
    }}
    
    SELECT
        o.order_id,
        o.customer_id,
        c.customer_segment,
        o.order_date,
        o.total_amount,
        o.status,
        o._extracted_at  -- required: the incremental filter reads MAX(_extracted_at) from this table
    FROM {{ ref('stg_orders') }} o
    LEFT JOIN {{ ref('dim_customers') }} c
        ON o.customer_id = c.customer_id
    
    {% if is_incremental() %}
    WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
    {% endif %}
    ```
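With `unique_key='order_id'`, an incremental run updates existing rows and inserts new ones (on Snowflake, dbt's default incremental strategy is `merge`). The filter reads `MAX(_extracted_at)` from the existing table, so the model must store that column. The sketch below imitates the behavior with SQLite's `INSERT ... ON CONFLICT DO UPDATE`; it illustrates the semantics only and is not the SQL dbt generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE stg_orders (
        order_id INTEGER, status TEXT, total_amount REAL, _extracted_at TEXT
    );
    CREATE TABLE fct_orders (
        order_id INTEGER PRIMARY KEY, status TEXT, total_amount REAL, _extracted_at TEXT
    );
""")


def incremental_merge(conn):
    """Upsert staging rows newer than anything already in fct_orders."""
    # COALESCE covers the first run, when fct_orders is empty
    # (dbt would do a full build in that case instead).
    conn.execute("""
        INSERT INTO fct_orders (order_id, status, total_amount, _extracted_at)
        SELECT order_id, status, total_amount, _extracted_at
        FROM stg_orders
        WHERE _extracted_at > (SELECT COALESCE(MAX(_extracted_at), '') FROM fct_orders)
        ON CONFLICT(order_id) DO UPDATE SET
            status = excluded.status,
            total_amount = excluded.total_amount,
            _extracted_at = excluded._extracted_at
    """)


# Run 1: two new orders
conn.executemany("INSERT INTO stg_orders VALUES (?, ?, ?, ?)", [
    (1, "placed", 50.0, "2024-01-01T05:00:00"),
    (2, "placed", 20.0, "2024-01-01T05:00:00"),
])
incremental_merge(conn)

# Run 2: order 1 changed status, order 3 is new
conn.executemany("INSERT INTO stg_orders VALUES (?, ?, ?, ?)", [
    (1, "shipped", 50.0, "2024-01-02T05:00:00"),
    (3, "placed", 75.0, "2024-01-02T05:00:00"),
])
incremental_merge(conn)

result = conn.execute("SELECT order_id, status FROM fct_orders ORDER BY order_id").fetchall()
print(result)
```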
    
    #### Step 4: Configure Data Quality Tests
    
    ```yaml
    # models/marts/schema.yml
    version: 2
    
    models:
      - name: fct_orders
        description: "Order fact table"
        tests:
          # dbt_utils.recency is a model-level test, so it sits here rather than under a column
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: total_amount
            tests:
              - not_null
              - dbt_utils.accepted_range:
                  min_value: 0
                  max_value: 1000000
          - name: order_date
            tests:
              - not_null
    ```
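The `schema.yml` tests boil down to simple assertions over the table. This plain-Python sketch applies the same checks to a list of row dicts, which can help when reasoning about what each test catches; `check_fct_orders` is hypothetical, and recency is simplified to whole days (dbt_utils compares against the current timestamp).

```python
from datetime import date, timedelta


def check_fct_orders(rows, today, max_amount=1_000_000):
    """Return failure messages for the schema.yml checks (empty list = all pass).

    rows: dicts with order_id, total_amount, and order_date (datetime.date).
    """
    failures = []

    ids = [r["order_id"] for r in rows]
    if any(i is None for i in ids):
        failures.append("order_id: not_null")
    present_ids = [i for i in ids if i is not None]
    if len(present_ids) != len(set(present_ids)):
        failures.append("order_id: unique")

    amounts = [r["total_amount"] for r in rows]
    if any(a is None for a in amounts):
        failures.append("total_amount: not_null")
    # accepted_range is inclusive by default; NULLs are left to not_null
    if any(a is not None and not 0 <= a <= max_amount for a in amounts):
        failures.append("total_amount: accepted_range")

    dates = [r["order_date"] for r in rows if r["order_date"] is not None]
    if len(dates) != len(rows):
        failures.append("order_date: not_null")
    # recency (simplified to whole days): newest order_date within 1 day of today
    if not dates or max(dates) < today - timedelta(days=1):
        failures.append("order_date: recency")

    return failures


rows = [
    {"order_id": 1, "total_amount": 120.0, "order_date": date(2024, 1, 10)},
    {"order_id": 2, "total_amount": 0.0, "order_date": date(2024, 1, 9)},
]
print(check_fct_orders(rows, today=date(2024, 1, 10)))
```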
    
    #### Step 5: Create Airflow DAG
    
    ```python
    # dags/daily_etl.py
    from datetime import timedelta
    
    import pendulum
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'email_on_failure': True,
        'email': ['data-alerts@company.com'],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    }
    
    with DAG(
        'daily_etl_pipeline',
        default_args=default_args,
        description='Daily ETL from PostgreSQL to Snowflake',
        schedule='0 5 * * *',  # daily at 05:00; `schedule` supersedes `schedule_interval` (Airflow 2.4+)
        start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),  # fixed date; days_ago() is deprecated
        catchup=False,
        tags=['etl', 'daily'],
    ) as dag:
    
        extract = BashOperator(
            task_id='extract_source_data',
            bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
        )
    
        transform = BashOperator(
            task_id='run_dbt_models',
            bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
        )
    
        test = BashOperator(
            task_id='run_dbt_tests',
            bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
        )
    
        notify = BashOperator(
            task_id='send_notification',
            bash_command='python /opt/airflow/scripts/notify.py --status success',
            trigger_rule='all_success',
        )
    
        extract >> transform >> test >> notify
    ```
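Airflow counts `retries` as additional attempts, so `'retries': 2` gives each task up to three tries with `retry_delay` between them. The hypothetical helper below models only that behavior (no exponential backoff, no timeouts) and takes an injectable `sleep` so it can be exercised without waiting; it is not how Airflow executes retries internally.

```python
import time


def run_with_retries(task, retries=2, retry_delay_s=300.0, sleep=time.sleep):
    """Call task() up to retries + 1 times, waiting retry_delay_s between attempts.

    Hypothetical helper modelling retries=2 / retry_delay=5 minutes; Airflow itself
    reschedules the task instance rather than looping in-process.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return task()
        except Exception:
            if attempt > retries:
                raise  # out of retries: the task is marked failed
            sleep(retry_delay_s)


# Example: a task that fails twice with a transient error, then succeeds
calls = []


def flaky_extract():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient connection error")
    return "ok"


waits = []
print(run_with_retries(flaky_extract, sleep=waits.append), len(calls), waits)
```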
    
    #### Step 6: Validate Pipeline
    
    ```bash
    # Test locally
    dbt run --select stg_orders fct_orders
    dbt test --select fct_orders
    
    # Validate data quality
    python scripts/data_quality_validator.py validate \
      --table fct_orders \
      --checks all \
      --output reports/quality_report.json
    ```
    
    ---
    
    ### Workflow 2: Implementing Real-Time Streaming
    
    **Scenario:** Stream events from Kafka, process with Flink/Spark Streaming, sink to data lake.
    
    #### Step 1: Define Event Schema
    
    ```json
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "UserEvent",
      "type": "object",
      "required": ["event_id", "user_id", "event_type", "timestamp"],
      "properties": {
        "event_id": {"type": "string", "format": "uuid"},
        "user_id": {"type": "string"},
        "event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
        "timestamp": {"type": "string", "format": "date-time"},
        "properties": {"type": "object"}
      }
    }
    ```
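Producers and consumers can reject malformed events before they reach the topic or the lake. The sketch below checks the same constraints as the schema using only the standard library; it is an approximation rather than a JSON Schema validator (for example, `uuid.UUID` also accepts hex strings without hyphens), and in practice a JSON Schema library or a schema registry would enforce the contract. `validate_event` is a hypothetical name.

```python
import uuid
from datetime import datetime

REQUIRED = ("event_id", "user_id", "event_type", "timestamp")
EVENT_TYPES = ("page_view", "click", "purchase")


def validate_event(event):
    """Return a list of constraint violations; an empty list means the event passed."""
    errors = [f"missing {field}" for field in REQUIRED if field not in event]
    if errors:
        return errors

    try:
        uuid.UUID(event["event_id"])  # approximates "format": "uuid"
    except (TypeError, ValueError, AttributeError):
        errors.append("event_id is not a UUID")

    if not isinstance(event["user_id"], str):
        errors.append("user_id must be a string")

    if event["event_type"] not in EVENT_TYPES:
        errors.append(f"unknown event_type {event['event_type']!r}")

    try:
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11, so normalize it
        datetime.fromisoformat(str(event["timestamp"]).replace("Z", "+00:00"))
    except ValueError:
        errors.append("timestamp is not an ISO 8601 date-time")

    if "properties" in event and not isinstance(event["properties"], dict):
        errors.append("properties must be an object")

    return errors


event = {
    "event_id": "123e4567-e89b-12d3-a456-426614174000",
    "user_id": "u-42",
    "event_type": "click",
    "timestamp": "2024-05-01T12:00:00Z",
    "properties": {"page": "/checkout"},
}
print(validate_event(event))
```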
    
    #### Step 2: Create Kafka Topic
    
    ```bash
    # Create the topic: 12 partitions, replication factor 3, 7-day retention (604800000 ms)
    kafka-topics.sh --create \
      --bootstrap-server localhost:9092 \
      --topic user-events \
      --partitions 12 \
      --replication-factor 3 \
      --config retention.ms=604800000 \
      --config cleanup.policy=delete
    
    # Verify topic
    kafka-topics.sh --describe \
      --bootstrap-server localhost:9092 \
      --topic user-events
    ```
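Kafka guarantees ordering only within a partition, so producing with `user_id` as the message key keeps each user's events in order across the 12 partitions. Kafka's default partitioner maps a keyed record to `murmur2(key) % num_partitions`; the sketch swaps in `zlib.crc32` purely for illustration, so its partition numbers will not match a real producer's.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 12  # matches --partitions 12 above


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a message key to a partition (illustrative: CRC32, not Kafka's murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(events):
    """Route (user_id, event_type) pairs to partitions, preserving send order."""
    partitions = defaultdict(list)
    for user_id, event_type in events:
        partitions[partition_for(user_id)].append((user_id, event_type))
    return partitions


events = [("u1", "page_view"), ("u2", "click"), ("u1", "click"), ("u1", "purchase")]
parts = assign(events)
u1_partition = partition_for("u1")
# Every u1 event lands in one partition, in the order it was sent
print(u1_partition, [t for uid, t in parts[u1_partition] if uid == "u1"])
```

Because the mapping depends on the partition count, adding partitions later reroutes existing keys, so it pays to size the partition count up front.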
    
    #### Step 3: Implement Spark Streaming Job
    
    ```python
    # streaming/user_events_processor.py
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        from_json, col, window, count,
        approx_count_distinct, to_timestamp
    )
    from pyspark.sql.types import (
        StructType, StructField, StringType, MapType
    )
    
    # Initialize Spark
    spark = SparkSession.builder \
        .appName("UserEventsProcessor") \
        .config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
        .config("spark.sql.shuffle.partitions", "12") \
        .getOrCreate()
    
    # Define schema
    event_schema = StructType([
        StructField("event_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("event_type", StringType(), False),
        StructField("timestamp", StringType(), False),
        StructField("properties", MapType(StringType(), StringType()), True)
    ])
    
    # Read from Kafka
    events_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "user-events") \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .load()
    
    # Parse JSON
    parsed_df = events_df \
        .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
        .select("data.*") \
        .withColumn("event_timestamp", to_timestamp(col("timestamp")))
    
    # Windowed aggregation
    aggregated_df = parsed_df \
        .withWatermark("event_timestamp", "10 minutes") \
        .groupBy(
            window(col("event_timestamp"), "5 minutes"),
            col("event_type")
        ) \
        .agg(
            count("*").alias("event_count"),
            approx_count_distinct("user_id").alias("unique_users")
        )
    
    # Write to Delta Lake
    query = aggregated_df.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/checkpoints/user-events-aggregated") \
        .option("path", "/data/lake/user_events_aggregated") \
        .trigger(processingTime="1 minute") \
        .start()
    
    query.awaitTermination()
    ```
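The watermark and window settings determine when an aggregate is final. The toy model below (pure Python, hypothetical names) mirrors them: 5-minute tumbling windows aligned to the Unix epoch as Spark's `window()` does by default, a watermark of max event time minus 10 minutes, and append-mode output that emits a window only once the watermark passes its end. Simplifications: Spark advances the watermark per micro-batch rather than per event, only guarantees to keep data within the delay (later data may or may not be dropped), and `approx_count_distinct` is approximate whereas the sketch counts users exactly.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # window(col("event_timestamp"), "5 minutes")
DELAY = timedelta(minutes=10)   # withWatermark("event_timestamp", "10 minutes")
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Floor a naive UTC timestamp to the start of its 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


class WindowedAggregator:
    """Toy model of a watermarked tumbling-window aggregation in append mode."""

    def __init__(self):
        self.max_event_time = None
        self.state = defaultdict(lambda: {"count": 0, "users": set()})
        self.dropped = 0

    @property
    def watermark(self):
        # Watermark = latest event time seen so far minus the allowed delay
        if self.max_event_time is None:
            return None
        return self.max_event_time - DELAY

    def add(self, event_time, event_type, user_id):
        start = window_start(event_time)
        wm = self.watermark
        if wm is not None and start + WINDOW <= wm:
            self.dropped += 1  # its window is already closed, so the late event is discarded
            return
        bucket = self.state[(start, event_type)]
        bucket["count"] += 1
        bucket["users"].add(user_id)
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    def emit_closed(self):
        """Emit, then forget, every window whose end is at or before the watermark."""
        wm = self.watermark
        if wm is None:
            return []
        out = []
        for start, event_type in sorted(k for k in self.state if k[0] + WINDOW <= wm):
            bucket = self.state.pop((start, event_type))
            out.append((start, event_type, bucket["count"], len(bucket["users"])))
        return out


def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)


agg = WindowedAggregator()
agg.add(at(12, 1), "click", "u1")
agg.add(at(12, 3), "click", "u2")
agg.add(at(12, 3), "click", "u1")
agg.add(at(12, 16), "page_view", "u3")  # watermark advances to 12:06
first = agg.emit_closed()               # the [12:00, 12:05) click window is final
agg.add(at(12, 4), "click", "u4")       # too late: that window already closed
print(first, agg.dropped)
```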
    
    #### Step 4: Handle Late Data and Errors
    
    ```python
    # Dead letter queue for failed records
    from pyspark.sql.functions import col, current_timestamp, lit
    
    def process_with_error_handling(batch_df, batch_id):
        try:
            # Attempt processing
            valid_df = batch_df.filter(col("event_id").isNotNull())
            invalid_df = batch_df.filter(col("event_id").isNull())
    
            # Write valid records
            valid_df.write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/user_events")
    
            # Write invalid to DLQ
           
    
    ... (truncated)