Back to Skills
    🦞

    data-lineage-tracker

    Track data origin, transformations, and flow through construction systems

    By @datadrivenconstruction
    View on GitHub
    SKILL.md
    ---
    slug: "data-lineage-tracker"
    display_name: "Data Lineage Tracker"
    description: "Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues."
    ---
    
    # Data Lineage Tracker for Construction
    
    ## Overview
    
    Track the origin, transformations, and flow of construction data through systems. Provides audit trails for compliance, helps debug data issues, and ensures data governance.
    
    ## Business Case
    
    Construction projects require data accountability:
    - **Audit Compliance**: Know where every number came from
    - **Issue Resolution**: Trace data problems to their source
    - **Change Impact**: Understand what downstream systems are affected
    - **Regulatory Requirements**: Maintain data provenance for legal and insurance purposes
    
    ## Technical Implementation
    
    ```python
    from dataclasses import dataclass, field
    from typing import List, Dict, Any, Optional, Set
    from datetime import datetime
    from enum import Enum
    import json
    import hashlib
    import uuid
    
    class TransformationType(Enum):
        """Closed set of labels used to classify a recorded transformation step.

        The string values are what the tracker's trace methods emit under the
        'transformation_type' key, so they form part of the reported output.
        """

        EXTRACT = "extract"
        TRANSFORM = "transform"
        LOAD = "load"
        AGGREGATE = "aggregate"
        JOIN = "join"
        FILTER = "filter"
        CALCULATE = "calculate"
        MANUAL_EDIT = "manual_edit"
        IMPORT = "import"
        EXPORT = "export"
    
    @dataclass
    class DataSource:
        """Description of a registered origin of data."""

        id: str  # "SRC-" followed by 8 hex characters when created by the tracker's register_source
        name: str
        system: str
        location: str
        owner: str
        created_at: datetime  # set from datetime.now() (naive local time) by register_source
    
    @dataclass
    class TransformationStep:
        """One recorded operation that turned input entities into output entities."""

        id: str  # "TRF-" followed by 8 hex characters when created by record_transformation
        transformation_type: TransformationType
        description: str
        input_entities: List[str]  # ids of the entities consumed by the step
        output_entities: List[str]  # ids of the entities produced by the step
        logic: str  # SQL, Python, or description
        performed_by: str  # user or system
        performed_at: datetime  # set from datetime.now() by record_transformation
        parameters: Dict[str, Any] = field(default_factory=dict)  # free-form; empty dict when omitted
    
    @dataclass
    class DataEntity:
        """A tracked piece of data (see entity_type) belonging to a source."""

        id: str  # "ENT-" followed by 8 hex characters when created by register_entity
        name: str  # human-readable label; this is what the trace methods report as 'entity'
        source_id: str  # presumably a DataSource.id; register_entity does not validate it
        entity_type: str  # table, file, field, record
        created_at: datetime  # set from datetime.now() by register_entity
        version: int = 1
        # NOTE(review): presumably meant to hold a calculate_checksum() result;
        # nothing in the visible code assigns it - confirm against callers.
        checksum: Optional[str] = None
        parent_entities: List[str] = field(default_factory=list)  # presumably entity ids - TODO confirm
        metadata: Dict[str, Any] = field(default_factory=dict)  # free-form; empty dict when omitted
    
    @dataclass
    class LineageRecord:
        """Link between one output entity and the transformation that produced it."""

        id: str  # "LIN-" followed by 8 hex characters when created by record_transformation
        entity_id: str  # id of the entity produced by the transformation
        transformation_id: str  # key into the tracker's transformations dict
        upstream_entities: List[str]  # ids of the inputs the entity was derived from
        # Starts empty; record_transformation appends to it later, when this
        # record's entity is used as an input of another transformation.
        downstream_entities: List[str]
        recorded_at: datetime  # timestamp assigned by record_transformation
    
    class ConstructionDataLineageTracker:
        """Track data lineage for construction data flows."""
    
        def __init__(self, project_id: str) -> None:
            """Create an empty tracker bound to one project.

            Args:
                project_id: Identifier stored on the tracker as-is.
            """
            self.project_id = project_id
            # Registries keyed by the generated ids ("SRC-...", "ENT-...", "TRF-...").
            self.sources: Dict[str, DataSource] = {}
            self.entities: Dict[str, DataEntity] = {}
            self.transformations: Dict[str, TransformationStep] = {}
            # Append-only log, in the order transformations were recorded.
            self.lineage_records: List[LineageRecord] = []
    
        def register_source(self, name: str, system: str, location: str, owner: str) -> DataSource:
            """Create a DataSource, store it in ``self.sources`` and return it.

            The source receives a generated id of the form ``SRC-`` plus the first
            eight hex characters of a random UUID, and is stamped with the
            current local time.

            Args:
                name: Display name of the source.
                system: Name of the system the source belongs to.
                location: Where the source can be found.
                owner: Who is responsible for the source.

            Returns:
                The newly registered DataSource.
            """
            source_id = f"SRC-{uuid.uuid4().hex[:8]}"
            registered = DataSource(source_id, name, system, location, owner, datetime.now())
            self.sources[source_id] = registered
            return registered
    
        def register_entity(self, name: str, source_id: str, entity_type: str,
                           parent_entities: Optional[List[str]] = None,
                           metadata: Optional[Dict] = None) -> DataEntity:
            """Register a data entity (table, file, field) and return it.

            The entity receives a generated id of the form ``ENT-`` plus the first
            eight hex characters of a random UUID, is stamped with the current
            local time and is stored in ``self.entities`` under that id.

            Args:
                name: Display name of the entity.
                source_id: Id of the source the entity belongs to (not validated).
                entity_type: Kind of entity, e.g. table, file, field or record.
                parent_entities: Optional ids of parent entities; ``None`` or an
                    empty list results in an empty list.
                metadata: Optional free-form metadata; ``None`` or an empty dict
                    results in an empty dict.

            Returns:
                The newly registered DataEntity.
            """
            entity = DataEntity(
                id=f"ENT-{uuid.uuid4().hex[:8]}",
                name=name,
                source_id=source_id,
                entity_type=entity_type,
                created_at=datetime.now(),
                # Store shallow copies so that the caller mutating its own
                # list/dict afterwards cannot silently alter the registered entity.
                parent_entities=list(parent_entities) if parent_entities else [],
                metadata=dict(metadata) if metadata else {},
            )
            self.entities[entity.id] = entity
            return entity
    
        def calculate_checksum(self, data: Any) -> str:
            """Calculate checksum for data verification."""
            if isinstance(data, str):
                content = data
            else:
                content = json.dumps(data, sort_keys=True, default=str)
            return hashlib.sha256(content.encode()).hexdigest()[:16]
    
        def record_transformation(self,
                                 transformation_type: TransformationType,
                                 description: str,
                                 input_entities: List[str],
                                 output_entities: List[str],
                                 logic: str,
                                 performed_by: str,
                                 parameters: Optional[Dict] = None) -> TransformationStep:
            """Record a data transformation and the lineage it creates.

            Stores a TransformationStep in ``self.transformations`` and appends one
            LineageRecord per output entity to ``self.lineage_records``. Every
            existing record whose entity is one of the inputs also gets the new
            output ids added to its ``downstream_entities``.

            Args:
                transformation_type: Kind of operation performed.
                description: Human-readable summary of the step.
                input_entities: Ids of the entities consumed.
                output_entities: Ids of the entities produced.
                logic: SQL, Python, or a textual description of the logic.
                performed_by: User or system that performed the step.
                parameters: Optional free-form parameters; defaults to an empty dict.

            Returns:
                The stored TransformationStep.
            """
            # Snapshot the caller's containers: the step and each lineage record
            # must not change if the caller later mutates (or reuses) its lists.
            inputs = list(input_entities)
            outputs = list(output_entities)
            input_ids = set(inputs)

            # One timestamp so the step and its lineage records agree exactly.
            recorded_at = datetime.now()

            transformation = TransformationStep(
                id=f"TRF-{uuid.uuid4().hex[:8]}",
                transformation_type=transformation_type,
                description=description,
                input_entities=inputs,
                output_entities=outputs,
                logic=logic,
                performed_by=performed_by,
                performed_at=recorded_at,
                parameters=dict(parameters) if parameters else {},
            )
            self.transformations[transformation.id] = transformation

            # Create lineage records
            for output_id in outputs:
                self.lineage_records.append(
                    LineageRecord(
                        id=f"LIN-{uuid.uuid4().hex[:8]}",
                        entity_id=output_id,
                        transformation_id=transformation.id,
                        # Each record owns its own list, so editing one record's
                        # upstream never leaks into its siblings.
                        upstream_entities=list(inputs),
                        downstream_entities=[],
                        recorded_at=recorded_at,
                    )
                )

                # Update downstream references for input entities. The scan
                # includes the record just appended, so an entity that is both
                # input and output of the step lists itself, as before.
                for existing_record in self.lineage_records:
                    if existing_record.entity_id not in input_ids:
                        continue
                    # Skip ids already present so re-recording the same edge
                    # (or repeating an input id) does not pile up duplicates.
                    if output_id not in existing_record.downstream_entities:
                        existing_record.downstream_entities.append(output_id)

            return transformation
    
        def trace_upstream(self, entity_id: str, depth: int = None) -> List[Dict]:
            """Trace all upstream sources of an entity."""
            visited = set()
            lineage = []
    
            def trace(eid: str, current_depth: int):
                if eid in visited:
                    return
                if depth is not None and current_depth > depth:
                    return
    
                visited.add(eid)
    
                entity = self.entities.get(eid)
                if not entity:
                    return
    
                # Find transformations that produced this entity
                for record in self.lineage_records:
                    if record.entity_id == eid:
                        transformation = self.transformations.get(record.transformation_id)
                        if transformation:
                            lineage.append({
                                'entity': entity.name,
                                'entity_id': eid,
                                'depth': current_depth,
                                'transformation': transformation.description,
                                'transformation_type': transformation.transformation_type.value,
                                'performed_at': transformation.performed_at.isoformat(),
                                'performed_by': transformation.performed_by,
                                'upstream': record.upstream_entities
                            })
    
                            for upstream_id in record.upstream_entities:
                                trace(upstream_id, current_depth + 1)
    
            trace(entity_id, 0)
            return sorted(lineage, key=lambda x: x['depth'])
    
        def trace_downstream(self, entity_id: str, depth: int = None) -> List[Dict]:
            """Trace all downstream dependencies of an entity."""
            visited = set()
            dependencies = []
    
            def trace(eid: str, current_depth: int):
                if eid in visited:
                    return
                if depth is not None and current_depth > depth:
                    return
    
                visited.add(eid)
    
                entity = self.entities.get(eid)
                if not entity:
                    return
    
                # Find entities that use this entity
                for record in self.lineage_records:
                    if eid in record.upstream_entities:
                        transformation = self.transformations.get(record.transformation_id)
                        if transformation:
                            dependencies.append({
                                'entity': self.entities[record.entity_id].name if record.entity_id in self.entities else record.entity_id,
                                'entity_id': record.entity_id,
                                'depth': current_depth,
                                'transformation': transformation.description,
                                'transformation_type': transformation.transformation_type.value
                            })
    
                            trace(record.entity_id, current_depth + 1)
    
            trace(entity_id, 0)
            return sorted(dependencies, key=lambda x: x['depth'])
    
        def get_entity_history(self, entity_id: str) -> List[Dict]:
            """Get complete history of changes to an entity."""
            history = []
    
            for record in self.lineage_records:
                if record.entity_id == entity_id:
                    transformation = self.transformations.get(record.transformation_id)
                    if transformation:
                        history.append({
                            'timestamp': transformation.performed_at.isoformat(),
                            'action': transformation.transformation_type.value,
                            'description': transformation.description,
                            'performed_by': transformation.performed_by,
                            'inputs': [
                                self.entities[eid].name if eid in self.entities else eid
                                for eid in record.upstream_entities
                            ]
                 
    
    ... (truncated)