⚙️ Data Transformation Pipeline

How we transform raw Chicago Open Data into actionable business insights through intelligent category mapping, temporal aggregation, and derived business metrics.

🔄 Transformation Overview

The multi-stage process of converting raw data into business intelligence

1️⃣

Raw Data Ingestion

SODA API responses with daily business license counts

2️⃣

Category Mapping

License descriptions → Business categories

3️⃣

Temporal Aggregation

Daily → Weekly → Trend analysis

4️⃣

Business Metrics

Week-over-week changes, momentum indices, and trend analysis
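The four stages above chain together into one pipeline. The sketch below is a minimal, illustrative wiring of stages 2-4, assuming the stage-1 raw frame already carries the columns used by the code later on this page (day, community_area, community_area_name, license_description, and the daily count n); the mapper and helper functions are the ones defined in the sections that follow.

import pandas as pd

def run_pipeline(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Illustrative end-to-end wiring of the transformation stages."""
    # Stage 2: map raw license descriptions to business categories
    mapper = BusinessCategoryMapper()
    raw_df['category'] = raw_df['license_description'].apply(mapper.map_license_to_category)

    # Stage 3: roll daily counts up to Monday-anchored weeks and densify the series
    weekly = aggregate_daily_to_weekly(raw_df)
    weekly = fill_missing_weeks(weekly)

    # Stage 4: derive week-over-week changes and momentum indices
    return calculate_business_metrics(weekly)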

🏷️ Category Mapping Strategy

Converting diverse license descriptions into meaningful business categories

The Challenge

Chicago's business license system has hundreds of different license types, from "Retail Food Establishment" to "Limited Business License" to "Public Place of Amusement." We needed to group these into meaningful business categories for analysis.

Our goal was to collapse these into four high-level categories (plus an "Other" catch-all) that business owners and economic developers could easily understand and act on.

Category Framework

🍽️ Food & Beverage

Restaurants, bars, food trucks, grocery stores, catering services

💼 Professional Services

Consulting, legal, accounting, real estate, financial services

💅 Personal Services

Salons, spas, fitness, healthcare, personal care

🏠 Home Services

Construction, maintenance, landscaping, cleaning, repairs

Implementation Code

import difflib
import pandas as pd
from typing import Any, Dict, List

class BusinessCategoryMapper:
    def __init__(self):
        # Define category keywords with priority order
        self.category_keywords = {
            'Food & Beverage': [
                'food', 'restaurant', 'catering', 'bar', 'liquor', 'grocery',
                'bakery', 'cafe', 'pizzeria', 'tavern', 'brewery'
            ],
            'Professional Services': [
                'consulting', 'legal', 'accounting', 'real estate', 'financial',
                'insurance', 'advertising', 'marketing', 'technology', 'engineering'
            ],
            'Personal Services': [
                'salon', 'spa', 'fitness', 'health', 'beauty', 'wellness',
                'massage', 'therapy', 'medical', 'dental', 'cosmetic'
            ],
            'Home Services': [
                'construction', 'maintenance', 'landscaping', 'cleaning',
                'repair', 'plumbing', 'electrical', 'hvac', 'roofing'
            ]
        }

    def map_license_to_category(self, license_description: str) -> str:
        """Map a license description to a business category"""
        description_lower = license_description.lower()

        # Keyword (substring) matching, checking categories in priority order
        for category, keywords in self.category_keywords.items():
            for keyword in keywords:
                if keyword in description_lower:
                    return category

        # Fallback to fuzzy matching for edge cases
        return self._fuzzy_match(description_lower)

    def _fuzzy_match(self, description: str) -> str:
        """Fuzzy matching for complex license descriptions"""
        # Score the description against every keyword; 0.6 is an illustrative cutoff
        scores = {
            category: max(difflib.SequenceMatcher(None, kw, description).ratio() for kw in keywords)
            for category, keywords in self.category_keywords.items()
        }
        best_category, best_score = max(scores.items(), key=lambda item: item[1])
        return best_category if best_score >= 0.6 else 'Other'  # Default fallback
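A quick sanity check on the license descriptions mentioned earlier (a minimal sketch; descriptions with no keyword hit fall through to the fuzzy fallback and, failing that, to "Other"):

mapper = BusinessCategoryMapper()

print(mapper.map_license_to_category('Retail Food Establishment'))  # 'food' keyword -> Food & Beverage
print(mapper.map_license_to_category('Public Place of Amusement'))  # no keyword hit -> fuzzy fallback
print(mapper.map_license_to_category('Limited Business License'))   # no keyword hit -> fuzzy fallback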

📅 Temporal Aggregation

Converting daily data into weekly trends and business metrics

Daily to Weekly Rollup

We aggregate daily business license counts into weekly totals, ensuring consistent week boundaries (Monday-Sunday) for reliable trend analysis.

def aggregate_daily_to_weekly(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily business license data to weekly totals"""

    # Convert date strings to datetime
    df['day'] = pd.to_datetime(df['day'])

    # Calculate week start (Monday)
    df['week_start'] = df['day'] - pd.to_timedelta(df['day'].dt.weekday, unit='D')

    # Group by week, community area, and category
    weekly_agg = df.groupby([
        'week_start',
        'community_area',
        'community_area_name',
        'category'
    ]).agg({
        'n': 'sum'  # Sum daily counts
    }).reset_index()

    # Rename for clarity
    weekly_agg = weekly_agg.rename(columns={'n': 'new_licenses'})

    return weekly_agg

def fill_missing_weeks(df: pd.DataFrame) -> pd.DataFrame:
    """Fill missing weeks with zero values for consistent time series"""

    # Create complete Monday-anchored date range
    min_date = df['week_start'].min()
    max_date = df['week_start'].max()
    date_range = pd.date_range(start=min_date, end=max_date, freq='W-MON')

    # Get all unique combinations
    areas = df['community_area'].unique()
    categories = df['category'].unique()

    # Keep a lookup so area names can be restored on rows created by the reindex
    area_names = df.drop_duplicates('community_area').set_index('community_area')['community_area_name']

    # Create complete index
    complete_index = pd.MultiIndex.from_product([
        date_range, areas, categories
    ], names=['week_start', 'community_area', 'category'])

    # Reindex, fill missing counts with zero, and restore the area names
    df_complete = df.set_index(['week_start', 'community_area', 'category']).reindex(complete_index)
    df_complete['new_licenses'] = df_complete['new_licenses'].fillna(0).astype(int)
    df_complete = df_complete.reset_index()
    df_complete['community_area_name'] = df_complete['community_area'].map(area_names)

    return df_complete
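As a concrete check of the week_start arithmetic in aggregate_daily_to_weekly, a Wednesday filing date anchors to the Monday two days earlier:

day = pd.Timestamp('2024-03-13')                             # a Wednesday, weekday() == 2
week_start = day - pd.to_timedelta(day.weekday(), unit='D')
print(week_start.date())                                     # 2024-03-11, the preceding Monday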

Business Metrics Calculation

We calculate several key business metrics that provide actionable insights:

def calculate_business_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate week-over-week changes and momentum indices"""

    # Sort by date within each area/category for proper calculations
    df = df.sort_values(['community_area', 'category', 'week_start'])
    grouped = df.groupby(['community_area', 'category'])['new_licenses']

    # Calculate week-over-week change
    df['wow_change'] = grouped.diff()
    df['wow_pct_change'] = grouped.pct_change()

    # Calculate 13-week moving average (quarterly baseline)
    df['avg_13w'] = grouped.rolling(
        window=13, min_periods=1
    ).mean().reset_index(level=[0, 1], drop=True)

    # 13-week rolling standard deviation (momentum denominator)
    rolling_std = grouped.rolling(
        window=13, min_periods=1
    ).std().reset_index(level=[0, 1], drop=True)

    # Calculate momentum index (z-score relative to the 13-week average)
    df['momentum_index'] = (df['new_licenses'] - df['avg_13w']) / rolling_std

    # Cap momentum to reasonable bounds
    df['momentum_index'] = df['momentum_index'].clip(-4, 4)

    return df

def identify_trends(df: pd.DataFrame) -> Dict[str, List]:
    """Identify top performing areas and categories"""

    latest_week = df['week_start'].max()
    latest_data = df[df['week_start'] == latest_week]

    # Top gainers by momentum
    top_gainers = latest_data.nlargest(10, 'momentum_index')[
        ['community_area_name', 'category', 'momentum_index', 'new_licenses']
    ]

    # Top areas by volume
    top_volume = latest_data.groupby('community_area_name')['new_licenses'].sum().nlargest(10)

    # Category performance
    category_performance = latest_data.groupby('category')['new_licenses'].sum().sort_values(ascending=False)

    return {
        'top_gainers': top_gainers.to_dict('records'),
        'top_volume': top_volume.to_dict(),
        'category_performance': category_performance.to_dict()
    }
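Putting the two functions together (a minimal sketch; weekly_df is the filled weekly panel from the previous step):

metrics_df = calculate_business_metrics(weekly_df)
trends = identify_trends(metrics_df)

# A momentum_index near +2 means the latest week sits roughly two rolling
# standard deviations above that area/category's 13-week baseline
for row in trends['top_gainers'][:3]:
    print(row['community_area_name'], row['category'], round(row['momentum_index'], 2))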

✅ Data Quality & Validation

Ensuring data integrity throughout the transformation process

Validation Checks

We implement comprehensive data quality checks at each stage of transformation:

def validate_transformed_data(df: pd.DataFrame) -> Dict[str, Any]:
    """Comprehensive validation of transformed data"""

    validation_results = {
        'total_records': len(df),
        'date_range': {
            'start': df['week_start'].min(),
            'end': df['week_start'].max(),
            'weeks': df['week_start'].nunique()
        },
        'geographic_coverage': {
            'total_areas': df['community_area'].nunique(),
            'areas_with_data': df[df['new_licenses'] > 0]['community_area'].nunique()
        },
        'category_distribution': df['category'].value_counts().to_dict(),
        'data_quality_issues': []
    }

    # Check for missing values
    missing_values = df.isnull().sum()
    if missing_values.any():
        validation_results['data_quality_issues'].append({
            'type': 'missing_values',
            'details': missing_values[missing_values > 0].to_dict()
        })

    # Check for negative values
    negative_licenses = df[df['new_licenses'] < 0]
    if len(negative_licenses) > 0:
        validation_results['data_quality_issues'].append({
            'type': 'negative_values',
            'count': len(negative_licenses)
        })

    # Check for extreme outliers
    q99 = df['new_licenses'].quantile(0.99)
    extreme_outliers = df[df['new_licenses'] > q99 * 3]
    if len(extreme_outliers) > 0:
        validation_results['data_quality_issues'].append({
            'type': 'extreme_outliers',
            'count': len(extreme_outliers),
            'threshold': q99 * 3
        })

    return validation_results

def log_validation_results(results: Dict[str, Any]):
    """Log validation results for monitoring"""

    print("🔍 Data Validation Results:")
    print(f"   Total Records: {results['total_records']:,}")
    print(f"   Date Range: {results['date_range']['start']} to {results['date_range']['end']}")
    print(f"   Geographic Coverage: {results['geographic_coverage']['total_areas']} areas")
    print(f"   Categories: {len(results['category_distribution'])}")

    if results['data_quality_issues']:
        print("   ⚠️  Quality Issues Found:")
        for issue in results['data_quality_issues']:
            print(f"      - {issue['type']}: {issue.get('count', 'N/A')}")
    else:
        print("   ✅ No quality issues detected")

Data Lineage Tracking

We track the lineage of our data transformations to ensure reproducibility and debugging:

class DataLineageTracker:
    def __init__(self):
        self.transformations = []
        self.data_snapshots = {}

    def log_transformation(self, step: str, input_shape: tuple, output_shape: tuple,
                          params: Dict[str, Any]):
        """Log a transformation step"""
        self.transformations.append({
            'step': step,
            'timestamp': pd.Timestamp.now(),
            'input_shape': input_shape,
            'output_shape': output_shape,
            'parameters': params
        })

    def save_snapshot(self, name: str, df: pd.DataFrame):
        """Save a data snapshot for debugging"""
        self.data_snapshots[name] = {
            'timestamp': pd.Timestamp.now(),
            'shape': df.shape,
            'columns': list(df.columns),
            'sample_data': df.head(5).to_dict('records')
        }

    def generate_report(self) -> str:
        """Generate a transformation report"""
        report = "🔄 Data Transformation Report\n\n"

        for i, transform in enumerate(self.transformations, 1):
            report += f"{i}. {transform['step']}\n"
            report += f"   Input: {transform['input_shape']} → Output: {transform['output_shape']}\n"
            report += f"   Time: {transform['timestamp']}\n"
            report += f"   Params: {transform['parameters']}\n\n"

        return report
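A brief usage sketch with illustrative step names and parameters:

tracker = DataLineageTracker()

input_shape = daily_df.shape
weekly_df = aggregate_daily_to_weekly(daily_df)
tracker.log_transformation(
    step='aggregate_daily_to_weekly',
    input_shape=input_shape,
    output_shape=weekly_df.shape,
    params={'week_anchor': 'Monday'}
)
tracker.save_snapshot('weekly_aggregate', weekly_df)

print(tracker.generate_report())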

⚡ Performance Optimization

Techniques to ensure fast and efficient data processing

Memory Management

  • Use appropriate data types (int32 vs int64)
  • Process data in chunks for large datasets
  • Clean up intermediate DataFrames
  • Use categorical types for repeated strings

Processing Efficiency

  • Vectorized operations over loops
  • Efficient groupby operations
  • Minimize DataFrame copies
  • Use NumPy for numeric operations

Optimization Code Example

def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Optimize DataFrame for memory and performance"""

    # Convert string columns to categorical where beneficial
    for col in df.select_dtypes(include=['object']):
        if df[col].nunique() / len(df) < 0.5:  # Less than 50% unique values
            df[col] = df[col].astype('category')

    # Downcast integer columns to the smallest type that safely holds their range
    for col in df.select_dtypes(include=['int64']):
        col_min, col_max = df[col].min(), df[col].max()
        if col_min >= 0:
            if col_max <= 255:
                df[col] = df[col].astype('uint8')
            elif col_max <= 65535:
                df[col] = df[col].astype('uint16')
            elif col_max <= 4294967295:
                df[col] = df[col].astype('uint32')
            # Larger values stay int64 to avoid overflow
        else:
            if col_min >= -128 and col_max <= 127:
                df[col] = df[col].astype('int8')
            elif col_min >= -32768 and col_max <= 32767:
                df[col] = df[col].astype('int16')
            elif col_min >= -2147483648 and col_max <= 2147483647:
                df[col] = df[col].astype('int32')
            # Larger values stay int64 to avoid overflow

    return df

def process_in_chunks(df: pd.DataFrame, process_fn, chunk_size: int = 10000):
    """Process large DataFrames in chunks to manage memory"""

    total_rows = len(df)
    for start_idx in range(0, total_rows, chunk_size):
        end_idx = min(start_idx + chunk_size, total_rows)
        chunk = df.iloc[start_idx:end_idx]

        # Apply the caller-supplied per-chunk processing function
        yield process_fn(chunk)
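A brief usage sketch; the per-chunk function and the high-volume threshold below are purely illustrative, and chunking is only appropriate for transformations that don't need rows outside the chunk:

def flag_high_volume(chunk: pd.DataFrame) -> pd.DataFrame:
    # Illustrative row-wise transformation (threshold chosen arbitrarily)
    chunk = chunk.copy()
    chunk['high_volume'] = chunk['new_licenses'] >= 10
    return chunk

weekly_df = optimize_dataframe(weekly_df)
processed = pd.concat(process_in_chunks(weekly_df, flag_high_volume), ignore_index=True)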

Ready for the Next Chapter?

Now that we've transformed our data into business insights, let's see how we load it into our data warehouse and prepare it for visualization.