
📥 Data Loading & Storage

How we load transformed data into Google Sheets as our data warehouse and prepare it for seamless integration with Looker Studio dashboards.

🏗️ Storage Strategy

Why we chose Google Sheets as our data warehouse solution

The Decision

For the Chicago SMB Market Radar, we needed a data storage solution that was cost-effective, reliable, and seamlessly integrated with our visualization layer. After evaluating various options, we chose Google Sheets for several compelling reasons.

Why Google Sheets?

✅ Advantages
  • Zero infrastructure costs
  • Built-in version history
  • Real-time collaboration
  • Seamless Looker Studio integration
  • Familiar interface for stakeholders
⚠️ Considerations
  • Cell limit (10 million cells per spreadsheet)
  • API rate limits
  • Limited data types
  • No advanced querying

Alternative Solutions Considered

PostgreSQL/MySQL

Too complex for this use case and requires ongoing infrastructure management

BigQuery

Overkill for our data volume; it adds complexity and cost

CSV Files + Git

Lacks real-time updates and collaboration features

🔌 Google Sheets Integration

How we programmatically interact with Google Sheets using Python

Authentication Setup

We use Google Service Account authentication to securely access and modify our Google Sheets without requiring user interaction.

import gspread
from google.oauth2.service_account import Credentials

def setup_google_sheets_client():
    """Setup Google Sheets client with service account"""

    # Define the OAuth scopes for Sheets and Drive access
    scope = [
        'https://www.googleapis.com/auth/spreadsheets',
        'https://www.googleapis.com/auth/drive'
    ]

    # Load credentials from service account file
    credentials = Credentials.from_service_account_file(
        'path/to/service-account.json',
        scopes=scope
    )

    # Create client
    client = gspread.authorize(credentials)

    return client

def get_or_create_sheet(client, sheet_name: str, sheet_id: str = None):
    """Get existing sheet or create new one"""

    if sheet_id:
        # Open existing sheet by ID
        sheet = client.open_by_key(sheet_id)
    else:
        # Create new sheet
        sheet = client.create(sheet_name)

    return sheet

Data Loading Operations

We implement several data loading patterns to efficiently update our sheets:

import time

import gspread
import pandas as pd

class GoogleSheetsLoader:
    def __init__(self, client, sheet_id: str):
        self.client = client
        self.sheet = client.open_by_key(sheet_id)

    def load_weekly_data(self, df: pd.DataFrame, tab_name: str):
        """Load weekly aggregated data to specified tab"""

        # Get or create the tab
        try:
            worksheet = self.sheet.worksheet(tab_name)
        except gspread.WorksheetNotFound:
            worksheet = self.sheet.add_worksheet(tab_name, 1000, 20)

        # Clear existing data
        worksheet.clear()

        # Prepare headers
        headers = list(df.columns)
        worksheet.append_row(headers)

        # Load data in batches to stay under per-request payload limits
        batch_size = 1000
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i + batch_size]
            # .tolist() converts NumPy scalars to native Python types for JSON serialization;
            # dates should already be formatted as YYYY-MM-DD strings upstream
            rows = batch.values.tolist()
            worksheet.append_rows(rows)

        # Format the worksheet
        self._format_worksheet(worksheet, headers)

    def _format_worksheet(self, worksheet, headers):
        """Apply formatting to the worksheet"""

        # Format headers
        worksheet.format('A1:Z1', {
            'backgroundColor': {'red': 0.2, 'green': 0.6, 'blue': 0.9},
            'textFormat': {'bold': True, 'foregroundColor': {'red': 1, 'green': 1, 'blue': 1}}
        })

        # Freeze header row
        worksheet.freeze(rows=1)

        # Auto-resize columns
        worksheet.columns_auto_resize(0, len(headers))
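
Putting the pieces together, a typical load might look like the sketch below; the spreadsheet ID and CSV path are placeholders, not values from this project:

import pandas as pd

# Wire up the client and loader defined above (placeholder spreadsheet ID)
client = setup_google_sheets_client()
loader = GoogleSheetsLoader(client, sheet_id="YOUR_SPREADSHEET_ID")

# Load a transformed weekly extract into its tab (hypothetical file path)
licenses_weekly = pd.read_csv("data/processed/licenses_weekly.csv")
loader.load_weekly_data(licenses_weekly, tab_name="licenses_weekly")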

📋 Data Warehouse Schema

The structure and organization of our Google Sheets data warehouse

Sheet Organization

We organize our data into logical tabs that serve different analytical purposes; a small tab-registry sketch follows the list below:

📊 Data Tabs
  • licenses_weekly: Weekly business license data
  • permits_weekly: Weekly building permit data
  • cta_weekly: Weekly transit ridership data
  • summary_latest: Current period summaries
🔧 Configuration Tabs
  • community_areas: Geographic reference data
  • category_mapping: Business category definitions
  • metadata: Data lineage and version info
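
One lightweight way to keep this organization explicit in code is a small tab registry that the loader can consult. This is only a sketch; the constant and its fields are illustrative conventions, not part of the project code:

# Illustrative registry of warehouse tabs; names match the tabs listed above
WAREHOUSE_TABS = {
    "licenses_weekly":  {"kind": "data",   "refresh": "weekly"},
    "permits_weekly":   {"kind": "data",   "refresh": "weekly"},
    "cta_weekly":       {"kind": "data",   "refresh": "weekly"},
    "summary_latest":   {"kind": "data",   "refresh": "weekly"},
    "community_areas":  {"kind": "config", "refresh": "manual"},
    "category_mapping": {"kind": "config", "refresh": "manual"},
    "metadata":         {"kind": "config", "refresh": "pipeline"},
}

def data_tabs():
    """Return the names of tabs that hold weekly analytical data."""
    return [name for name, spec in WAREHOUSE_TABS.items() if spec["kind"] == "data"]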

Data Structure Example

licenses_weekly Tab Structure:

| week_start | community_area | community_area_name | category | new_licenses | wow_change | avg_13w | momentum_index |
|------------|----------------|---------------------|----------|--------------|------------|---------|----------------|
| 2025-01-06 | 8             | Near North Side     | Food     | 12           | 2          | 10.5    | 0.8            |
| 2025-01-06 | 8             | Near North Side     | Retail   | 8            | -1         | 7.2     | 0.4            |
| 2025-01-06 | 32            | Loop                | Food     | 15           | 3          | 12.1    | 1.2            |
| 2025-01-06 | 32            | Loop                | Services | 6            | 0          | 5.8     | 0.1            |

Key Features (a preparation sketch follows this list):
- Consistent date formatting (YYYY-MM-DD)
- Numeric community area codes
- Standardized category names
- Calculated business metrics
- No missing values
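
A minimal preparation sketch for enforcing these conventions before loading, assuming the transform step hands us a pandas DataFrame with the columns shown above (the helper name is illustrative):

import pandas as pd

def prepare_licenses_weekly(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce a transformed frame to the conventions listed above."""
    out = df.copy()
    # Consistent date formatting (YYYY-MM-DD), serialized as plain strings
    out["week_start"] = pd.to_datetime(out["week_start"]).dt.strftime("%Y-%m-%d")
    # Numeric community area codes
    out["community_area"] = out["community_area"].astype(int)
    # No missing values: zero-fill metrics, empty-string-fill everything else
    metric_cols = ["new_licenses", "wow_change", "avg_13w", "momentum_index"]
    out[metric_cols] = out[metric_cols].fillna(0)
    return out.fillna("")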

Data Quality Constraints

  • Data Types: Enforce proper data types (dates, numbers, text); see the validation sketch after this list
  • Required Fields: Ensure all critical fields are populated
  • Value Ranges: Validate numeric values are within expected ranges
  • Referential Integrity: Maintain consistency across related tabs
  • Format Standards: Consistent date formats and naming conventions
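
A hedged validation sketch covering required fields, value ranges, and format standards, written as a pre-load check that returns a list of violations (the function and its required-column list are illustrative assumptions, not the project's actual validator):

import pandas as pd

def validate_before_load(df: pd.DataFrame) -> list:
    """Return a list of constraint violations; an empty list means the frame is clean."""
    errors = []
    required = ["week_start", "community_area", "category", "new_licenses"]
    # Required fields: present and fully populated
    for col in required:
        if col not in df.columns:
            errors.append(f"missing column: {col}")
        elif df[col].isna().any():
            errors.append(f"null values in required field: {col}")
    # Format standards: dates already serialized as YYYY-MM-DD strings
    if "week_start" in df.columns:
        dates_ok = df["week_start"].astype(str).str.fullmatch(r"\d{4}-\d{2}-\d{2}").all()
        if not dates_ok:
            errors.append("week_start is not formatted as YYYY-MM-DD")
    # Value ranges: license counts cannot be negative
    if "new_licenses" in df.columns and (df["new_licenses"] < 0).any():
        errors.append("new_licenses contains negative values")
    return errors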

🔄 Loading Patterns

Different strategies for loading data efficiently and reliably

Full Refresh vs Incremental

🔄 Full Refresh

Complete replacement of all data in a tab

  • Simple and reliable
  • Ensures data consistency
  • Good for small datasets
  • Easy to debug

📈 Incremental Update

Update only new or changed records (see the sketch after this comparison)

  • Faster execution
  • Lower API usage
  • More complex logic
  • Risk of data inconsistency
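
The full-refresh path is what load_weekly_data above implements. For the incremental path, a minimal sketch might key rows on (week_start, community_area, category) and append only keys that are not already present; this helper is illustrative and assumes the tab fits comfortably in memory:

def incremental_update(self, tab_name: str, new_data: pd.DataFrame,
                       key_cols=("week_start", "community_area", "category")):
    """Append only rows whose key is not already present in the tab."""
    worksheet = self.sheet.worksheet(tab_name)

    # Keys already in the sheet (compare as strings, since values round-trip through Sheets)
    existing = pd.DataFrame(worksheet.get_all_records())
    existing_keys = set()
    if not existing.empty:
        existing_keys = {
            tuple(str(v) for v in row)
            for row in existing[list(key_cols)].itertuples(index=False)
        }

    # Keep only rows whose key is new, then append them in one call
    keys = new_data[list(key_cols)].astype(str).apply(tuple, axis=1)
    to_append = new_data[~keys.isin(existing_keys)]
    if not to_append.empty:
        worksheet.append_rows(to_append.values.tolist())

    return len(to_append)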

Atomic Operations

We implement atomic operations to ensure data consistency and prevent partial updates that could corrupt our data warehouse.

def atomic_sheet_update(self, tab_name: str, new_data: pd.DataFrame):
    """Update sheet atomically using temporary tab"""

    # Create temporary tab
    temp_tab_name = f"{tab_name}_temp_{int(time.time())}"
    temp_worksheet = self.sheet.add_worksheet(temp_tab_name, 1000, 20)

    try:
        # Load data to temporary tab
        self._load_data_to_worksheet(temp_worksheet, new_data)

        # Validate data integrity
        if not self._validate_worksheet_data(temp_worksheet, new_data):
            raise ValueError("Data validation failed")

        # Get original worksheet
        original_worksheet = self.sheet.worksheet(tab_name)

        # Swap names atomically
        original_worksheet.update_title(f"{tab_name}_old_{int(time.time())}")
        temp_worksheet.update_title(tab_name)

        # Clean up old worksheet
        self.sheet.del_worksheet(original_worksheet)

        return True

    except Exception:
        # Clean up the temporary tab on failure, then re-raise the original error
        try:
            self.sheet.del_worksheet(temp_worksheet)
        except gspread.exceptions.APIError:
            pass
        raise

Batch Processing

For large datasets, we process data in batches to manage memory usage and API rate limits effectively.

def batch_load_data(self, df: pd.DataFrame, batch_size: int = 1000):
    """Load data in batches to manage memory and API limits"""

    total_rows = len(df)
    batches_processed = 0

    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        batch = df.iloc[start_idx:end_idx]

        # Process batch
        self._process_batch(batch)
        batches_processed += 1

        # Progress tracking
        progress = (end_idx / total_rows) * 100
        print(f"Progress: {progress:.1f}% ({batches_processed} batches)")

        # Rate limiting
        if batches_processed % 5 == 0:
            time.sleep(1)  # Pause every 5 batches

    return batches_processed

⚠️ Error Handling & Recovery

Robust error handling to ensure data loading reliability

Common Failure Scenarios

🚨 API Failures
  • Rate limit exceeded
  • Authentication expired
  • Network timeouts
  • Service unavailable
⚠️ Data Issues
  • Invalid data formats
  • Missing required fields
  • Data type mismatches
  • Size limit exceeded

Recovery Strategies

def robust_data_loading(self, df: pd.DataFrame, max_retries: int = 3):
    """Robust data loading with comprehensive error handling"""

    for attempt in range(max_retries):
        try:
            # Attempt to load data
            result = self._load_data_with_validation(df)

            if result['success']:
                print(f"Data loaded successfully on attempt {attempt + 1}")
                return result

        except gspread.exceptions.APIError as e:
            if e.response.status_code == 429:  # Rate limited
                wait_time = (2 ** attempt) * 60  # Exponential backoff
                print(f"Rate limited, waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"API error: {e}")

        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")

        # Wait before retry
        if attempt < max_retries - 1:
            time.sleep(5)

    # All retries failed
    raise Exception(f"Failed to load data after {max_retries} attempts")

def _load_data_with_validation(self, df: pd.DataFrame):
    """Load data with comprehensive validation"""

    # Pre-load validation
    validation_result = self._validate_data_before_load(df)
    if not validation_result['is_valid']:
        return {'success': False, 'errors': validation_result['errors']}

    # Load data
    load_result = self._execute_data_load(df)

    # Post-load validation
    post_validation = self._validate_data_after_load(df)

    return {
        'success': load_result['success'] and post_validation['is_valid'],
        'rows_loaded': load_result['rows_loaded'],
        'warnings': post_validation['warnings']
    }

⚡ Performance Optimization

Techniques to optimize data loading performance and efficiency

API Optimization

  • Batch API calls (see the sketch after these lists)
  • Efficient authentication
  • Connection pooling
  • Rate limit management

Data Processing

  • Memory-efficient operations
  • Parallel processing
  • Streaming data handling
  • Optimized data structures
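
As a concrete example of batching API calls: rather than issuing one append_row request per row, an entire tab can be written with a single ranged batch_update. This is a sketch under the assumption that the frame is already serialized to JSON-friendly values and that the worksheet has at least as many rows and columns as the data:

import gspread
import pandas as pd

def write_tab_in_one_call(worksheet, df: pd.DataFrame):
    """Write headers plus all rows with a single values request."""
    values = [list(df.columns)] + df.values.tolist()
    last_cell = gspread.utils.rowcol_to_a1(len(values), len(df.columns))
    # One ranged update instead of hundreds of per-row appends
    worksheet.batch_update([{"range": f"A1:{last_cell}", "values": values}])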

Performance Monitoring

We track performance metrics to identify bottlenecks and optimize accordingly:

import time

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'load_times': [],
            'api_calls': 0,
            'rows_processed': 0,
            'errors': 0
        }

    def start_timer(self):
        return time.time()

    def end_timer(self, start_time, operation: str):
        duration = time.time() - start_time
        self.metrics['load_times'].append({
            'operation': operation,
            'duration': duration,
            'timestamp': time.time()
        })

        print(f"{operation} completed in {duration:.2f} seconds")

    def log_api_call(self):
        self.metrics['api_calls'] += 1

    def log_rows_processed(self, count: int):
        self.metrics['rows_processed'] += count

    def get_performance_summary(self):
        if not self.metrics['load_times']:
            return "No performance data available"

        avg_load_time = sum(t['duration'] for t in self.metrics['load_times']) / len(self.metrics['load_times'])

        return {
            'total_operations': len(self.metrics['load_times']),
            'average_load_time': avg_load_time,
            'total_api_calls': self.metrics['api_calls'],
            'total_rows_processed': self.metrics['rows_processed'],
            'error_rate': self.metrics['errors'] / max(1, self.metrics['api_calls'])
        }

Ready for the Next Chapter?

Now that we've loaded our data into Google Sheets, let's see how we transform it into compelling visualizations and business insights.