⚙️ Data Transformation Pipeline
How we transform raw Chicago Open Data into actionable business insights through intelligent category mapping, temporal aggregation, and business logic.
🔄 Transformation Overview
The multi-stage process of converting raw data into business intelligence
1. Raw Data Ingestion: SODA API responses with daily business license counts
2. Category Mapping: License descriptions → business categories
3. Temporal Aggregation: Daily counts → weekly totals → trend analysis
4. Business Metrics: Week-over-week changes, momentum indices, and trend analysis
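Chained together, the four stages look roughly like the sketch below. This is a minimal outline, assuming the BusinessCategoryMapper class and the aggregate_daily_to_weekly, fill_missing_weeks, and calculate_business_metrics functions defined in the sections that follow, plus a hypothetical fetch_daily_license_counts() helper standing in for the SODA ingestion step (column names are assumptions based on those snippets).

import pandas as pd

def run_pipeline() -> pd.DataFrame:
    """Sketch of the end-to-end transformation, not the production entry point."""
    # 1. Raw data ingestion (hypothetical wrapper around the SODA API client);
    #    expected columns: day, community_area, community_area_name, license_description, n
    daily = fetch_daily_license_counts()

    # 2. Category mapping: license description → business category
    mapper = BusinessCategoryMapper()
    daily['category'] = daily['license_description'].apply(mapper.map_license_to_category)

    # 3. Temporal aggregation: daily counts → weekly totals with gaps filled
    weekly = fill_missing_weeks(aggregate_daily_to_weekly(daily))

    # 4. Business metrics: week-over-week change and momentum index
    return calculate_business_metrics(weekly)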
🏷️ Category Mapping Strategy
Converting diverse license descriptions into meaningful business categories
The Challenge
Chicago's business license system has hundreds of different license types, from "Retail Food Establishment" to "Limited Business License" to "Public Place of Amusement." We needed to group these into meaningful business categories for analysis.
Our goal was to create 4-5 high-level categories that business owners and economic developers could easily understand and act upon.
Category Framework
🍽️ Food & Beverage: Restaurants, bars, food trucks, grocery stores, catering services
💼 Professional Services: Consulting, legal, accounting, real estate, financial services
💅 Personal Services: Salons, spas, fitness, healthcare, personal care
🏠 Home Services: Construction, maintenance, landscaping, cleaning, repairs
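As a quick illustration, here is how the mapper (implemented below) is expected to behave on the license types mentioned above; with the stubbed fuzzy fallback, anything without a keyword hit lands in 'Other':

mapper = BusinessCategoryMapper()

mapper.map_license_to_category("Retail Food Establishment")  # → 'Food & Beverage' (matches the 'food' keyword)
mapper.map_license_to_category("Public Place of Amusement")  # → no keyword hit, falls through to the fuzzy fallback → 'Other'
mapper.map_license_to_category("Limited Business License")   # → no keyword hit → 'Other'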
Implementation Code
import pandas as pd
from typing import Dict, List, Any

class BusinessCategoryMapper:
    def __init__(self):
        # Define category keywords with priority order
        self.category_keywords = {
            'Food & Beverage': [
                'food', 'restaurant', 'catering', 'bar', 'liquor', 'grocery',
                'bakery', 'cafe', 'pizzeria', 'tavern', 'brewery'
            ],
            'Professional Services': [
                'consulting', 'legal', 'accounting', 'real estate', 'financial',
                'insurance', 'advertising', 'marketing', 'technology', 'engineering'
            ],
            'Personal Services': [
                'salon', 'spa', 'fitness', 'health', 'beauty', 'wellness',
                'massage', 'therapy', 'medical', 'dental', 'cosmetic'
            ],
            'Home Services': [
                'construction', 'maintenance', 'landscaping', 'cleaning',
                'repair', 'plumbing', 'electrical', 'hvac', 'roofing'
            ]
        }

    def map_license_to_category(self, license_description: str) -> str:
        """Map a license description to a business category"""
        description_lower = license_description.lower()

        # Try keyword (substring) matches first, in priority order
        for category, keywords in self.category_keywords.items():
            for keyword in keywords:
                if keyword in description_lower:
                    return category

        # Fall back to fuzzy matching for edge cases
        return self._fuzzy_match(description_lower)

    def _fuzzy_match(self, description: str) -> str:
        """Fuzzy matching for complex license descriptions"""
        # Implementation of fuzzy matching logic
        # returns the best match based on similarity scores
        return 'Other'  # Default fallback

📅 Temporal Aggregation
Converting daily data into weekly trends and business metrics
Daily to Weekly Rollup
We aggregate daily business license counts into weekly totals, ensuring consistent week boundaries (Monday-Sunday) for reliable trend analysis.
def aggregate_daily_to_weekly(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily business license data to weekly totals"""
    # Convert date strings to datetime
    df['day'] = pd.to_datetime(df['day'])

    # Calculate week start (Monday)
    df['week_start'] = df['day'] - pd.to_timedelta(df['day'].dt.weekday, unit='D')

    # Group by week, community area, and category
    weekly_agg = df.groupby([
        'week_start',
        'community_area',
        'community_area_name',
        'category'
    ]).agg({
        'n': 'sum'  # Sum daily counts
    }).reset_index()

    # Rename for clarity
    weekly_agg = weekly_agg.rename(columns={'n': 'new_licenses'})

    return weekly_agg
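# Worked example of the week_start calculation above (illustrative dates, not
# from the dataset): 2024-03-06 is a Wednesday, so weekday == 2 and
# week_start = 2024-03-06 - 2 days = 2024-03-04, the preceding Monday.
# Dates that already fall on a Monday (weekday == 0) are left unchanged.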
def fill_missing_weeks(df: pd.DataFrame) -> pd.DataFrame:
    """Fill missing weeks with zero values for consistent time series"""
    # Create complete date range
    min_date = df['week_start'].min()
    max_date = df['week_start'].max()
    date_range = pd.date_range(start=min_date, end=max_date, freq='W-MON')

    # Get all unique combinations
    areas = df['community_area'].unique()
    categories = df['category'].unique()

    # Create complete index
    complete_index = pd.MultiIndex.from_product([
        date_range, areas, categories
    ], names=['week_start', 'community_area', 'category'])

    # Reindex and fill missing values
    df_complete = df.set_index(['week_start', 'community_area', 'category']).reindex(complete_index)
    df_complete['new_licenses'] = df_complete['new_licenses'].fillna(0)
    df_complete = df_complete.reset_index()

    # Filled-in rows have no community_area_name; recover it from the original data
    name_map = df.drop_duplicates('community_area').set_index('community_area')['community_area_name']
    df_complete['community_area_name'] = df_complete['community_area'].map(name_map)

    return df_complete

Business Metrics Calculation
We calculate several key business metrics that provide actionable insights:
def calculate_business_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate week-over-week changes and momentum indices"""
    # Sort by area, category, and date so window calculations line up
    df = df.sort_values(['community_area', 'category', 'week_start'])
    grouped = df.groupby(['community_area', 'category'])['new_licenses']

    # Week-over-week change
    df['wow_change'] = grouped.diff()
    df['wow_pct_change'] = grouped.pct_change()

    # 13-week moving average (quarterly baseline); drop the group levels so the
    # rolling result aligns back to the original row index
    df['avg_13w'] = grouped.rolling(window=13, min_periods=1).mean().droplevel([0, 1])

    # Momentum index: z-score of the current week relative to the 13-week baseline
    std_13w = grouped.rolling(window=13, min_periods=1).std().droplevel([0, 1])
    df['momentum_index'] = (df['new_licenses'] - df['avg_13w']) / std_13w

    # Cap momentum to reasonable bounds
    df['momentum_index'] = df['momentum_index'].clip(-4, 4)

    return df
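# Worked example of the momentum index above (hypothetical numbers): an
# area/category pair that averaged 4 new licenses per week over the trailing
# 13 weeks with a standard deviation of 2, and posts 9 new licenses this week,
# scores (9 - 4) / 2 = 2.5, a strong positive signal. The ±4 cap keeps a
# single extreme week from dominating the rankings.
latest, avg_13w, std_13w = 9, 4.0, 2.0
momentum_index = (latest - avg_13w) / std_13w  # 2.5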
def identify_trends(df: pd.DataFrame) -> Dict[str, List]:
    """Identify top performing areas and categories"""
    latest_week = df['week_start'].max()
    latest_data = df[df['week_start'] == latest_week]

    # Top gainers by momentum
    top_gainers = latest_data.nlargest(10, 'momentum_index')[
        ['community_area_name', 'category', 'momentum_index', 'new_licenses']
    ]

    # Top areas by volume
    top_volume = latest_data.groupby('community_area_name')['new_licenses'].sum().nlargest(10)

    # Category performance
    category_performance = latest_data.groupby('category')['new_licenses'].sum().sort_values(ascending=False)

    return {
        'top_gainers': top_gainers.to_dict('records'),
        'top_volume': top_volume.to_dict(),
        'category_performance': category_performance.to_dict()
    }

✅ Data Quality & Validation
Ensuring data integrity throughout the transformation process
Validation Checks
We implement comprehensive data quality checks at each stage of transformation:
def validate_transformed_data(df: pd.DataFrame) -> Dict[str, Any]:
    """Comprehensive validation of transformed data"""
    validation_results = {
        'total_records': len(df),
        'date_range': {
            'start': df['week_start'].min(),
            'end': df['week_start'].max(),
            'weeks': df['week_start'].nunique()
        },
        'geographic_coverage': {
            'total_areas': df['community_area'].nunique(),
            'areas_with_data': df[df['new_licenses'] > 0]['community_area'].nunique()
        },
        'category_distribution': df['category'].value_counts().to_dict(),
        'data_quality_issues': []
    }

    # Check for missing values
    missing_values = df.isnull().sum()
    if missing_values.any():
        validation_results['data_quality_issues'].append({
            'type': 'missing_values',
            'details': missing_values[missing_values > 0].to_dict()
        })

    # Check for negative values
    negative_licenses = df[df['new_licenses'] < 0]
    if len(negative_licenses) > 0:
        validation_results['data_quality_issues'].append({
            'type': 'negative_values',
            'count': len(negative_licenses)
        })

    # Check for extreme outliers
    q99 = df['new_licenses'].quantile(0.99)
    extreme_outliers = df[df['new_licenses'] > q99 * 3]
    if len(extreme_outliers) > 0:
        validation_results['data_quality_issues'].append({
            'type': 'extreme_outliers',
            'count': len(extreme_outliers),
            'threshold': q99 * 3
        })

    return validation_results

def log_validation_results(results: Dict[str, Any]):
    """Log validation results for monitoring"""
    print("🔍 Data Validation Results:")
    print(f"   Total Records: {results['total_records']:,}")
    print(f"   Date Range: {results['date_range']['start']} to {results['date_range']['end']}")
    print(f"   Geographic Coverage: {results['geographic_coverage']['total_areas']} areas")
    print(f"   Categories: {len(results['category_distribution'])}")

    if results['data_quality_issues']:
        print("   ⚠️ Quality Issues Found:")
        for issue in results['data_quality_issues']:
            print(f"     - {issue['type']}: {issue.get('count', 'N/A')}")
    else:
        print("   ✅ No quality issues detected")

Data Lineage Tracking
We track the lineage of our data transformations to ensure reproducibility and debugging:
class DataLineageTracker:
    def __init__(self):
        self.transformations = []
        self.data_snapshots = {}

    def log_transformation(self, step: str, input_shape: tuple, output_shape: tuple,
                           params: Dict[str, Any]):
        """Log a transformation step"""
        self.transformations.append({
            'step': step,
            'timestamp': pd.Timestamp.now(),
            'input_shape': input_shape,
            'output_shape': output_shape,
            'parameters': params
        })

    def save_snapshot(self, name: str, df: pd.DataFrame):
        """Save a data snapshot for debugging"""
        self.data_snapshots[name] = {
            'timestamp': pd.Timestamp.now(),
            'shape': df.shape,
            'columns': list(df.columns),
            'sample_data': df.head(5).to_dict('records')
        }

    def generate_report(self) -> str:
        """Generate a transformation report"""
        report = "🔄 Data Transformation Report\n\n"
        for i, transform in enumerate(self.transformations, 1):
            report += f"{i}. {transform['step']}\n"
            report += f"   Input: {transform['input_shape']} → Output: {transform['output_shape']}\n"
            report += f"   Time: {transform['timestamp']}\n"
            report += f"   Params: {transform['parameters']}\n\n"
        return report

⚡ Performance Optimization
Techniques to ensure fast and efficient data processing
Memory Management
• Use appropriate data types (int32 vs int64)
• Process data in chunks for large datasets
• Clean up intermediate DataFrames
• Use categorical types for repeated strings
Processing Efficiency
• Vectorized operations over loops (see the sketch after this list)
• Efficient groupby operations
• Minimize DataFrame copies
• Use NumPy for numeric operations
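To make the "vectorized operations over loops" point concrete, here is a small sketch comparing a row-by-row loop with the vectorized groupby().diff() used for the week-over-week change. Column names follow the weekly table above; this illustrates the general pandas guideline rather than benchmarking this pipeline.

import pandas as pd

def wow_change_loop(df: pd.DataFrame) -> pd.Series:
    """Row-by-row version: easy to read, but slow on large frames."""
    df = df.sort_values(['community_area', 'category', 'week_start'])
    changes, prev_key, prev_val = [], None, None
    for row in df.itertuples():
        key = (row.community_area, row.category)
        changes.append(row.new_licenses - prev_val if key == prev_key else None)
        prev_key, prev_val = key, row.new_licenses
    return pd.Series(changes, index=df.index)

def wow_change_vectorized(df: pd.DataFrame) -> pd.Series:
    """Vectorized version: a single groupby().diff() does the same work in compiled code."""
    df = df.sort_values(['community_area', 'category', 'week_start'])
    return df.groupby(['community_area', 'category'])['new_licenses'].diff()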
Optimization Code Example
def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Optimize DataFrame for memory and performance"""
    # Convert string columns to categorical where beneficial
    for col in df.select_dtypes(include=['object']):
        if df[col].nunique() / len(df) < 0.5:  # Less than 50% unique values
            df[col] = df[col].astype('category')

    # Downcast integers to the smallest type that fits the observed range
    for col in df.select_dtypes(include=['int64']):
        if df[col].min() >= 0:
            if df[col].max() <= 255:
                df[col] = df[col].astype('uint8')
            elif df[col].max() <= 65535:
                df[col] = df[col].astype('uint16')
            else:
                df[col] = df[col].astype('uint32')
        else:
            if df[col].min() >= -128 and df[col].max() <= 127:
                df[col] = df[col].astype('int8')
            elif df[col].min() >= -32768 and df[col].max() <= 32767:
                df[col] = df[col].astype('int16')
            else:
                df[col] = df[col].astype('int32')

    return df

def process_in_chunks(df: pd.DataFrame, chunk_size: int = 10000):
    """Process large DataFrames in chunks to manage memory"""
    total_rows = len(df)

    for start_idx in range(0, total_rows, chunk_size):
        end_idx = min(start_idx + chunk_size, total_rows)
        chunk = df.iloc[start_idx:end_idx]

        # process_chunk is the caller-supplied per-chunk transformation
        yield process_chunk(chunk)

Ready for the Next Chapter?
Now that we've transformed our data into business insights, let's see how we load it into our data warehouse and prepare it for visualization.