📥 Data Loading & Storage
How we load transformed data into Google Sheets as our data warehouse and prepare it for seamless integration with Looker Studio dashboards.
🏗️ Storage Strategy
Why we chose Google Sheets as our data warehouse solution
The Decision
For the Chicago SMB Market Radar, we needed a data storage solution that was cost-effective, reliable, and seamlessly integrated with our visualization layer. After evaluating various options, we chose Google Sheets for several compelling reasons.
Why Google Sheets?
✅ Advantages
- Zero infrastructure costs
- Built-in version control
- Real-time collaboration
- Seamless Looker Studio integration
- Familiar interface for stakeholders
⚠️ Considerations
- Cell limit (10 million cells per spreadsheet)
- API rate limits
- Limited data types
- No advanced querying
Alternative Solutions Considered
PostgreSQL/MySQL
Too complex for this use case; requires ongoing infrastructure management
BigQuery
Overkill for our data volume; adds complexity and cost
CSV Files + Git
Lacks real-time updates and collaboration features
🔌 Google Sheets Integration
How we programmatically interact with Google Sheets using Python
Authentication Setup
We use Google Service Account authentication to securely access and modify our Google Sheets without requiring user interaction.
```python
import gspread
from google.oauth2.service_account import Credentials


def setup_google_sheets_client():
    """Setup Google Sheets client with service account"""
    # Define the scope
    scope = [
        'https://spreadsheets.google.com/feeds',
        'https://www.googleapis.com/auth/drive'
    ]

    # Load credentials from service account file
    credentials = Credentials.from_service_account_file(
        'path/to/service-account.json',
        scopes=scope
    )

    # Create client
    client = gspread.authorize(credentials)
    return client


def get_or_create_sheet(client, sheet_name: str, sheet_id: str = None):
    """Get existing sheet or create new one"""
    if sheet_id:
        # Open existing sheet by ID
        sheet = client.open_by_key(sheet_id)
    else:
        # Create new sheet
        sheet = client.create(sheet_name)
    return sheet
```
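A minimal usage sketch of these helpers (the spreadsheet ID below is a placeholder, not a real project value):

```python
# Authenticate once and reuse the client for all loading operations
client = setup_google_sheets_client()

# Open the warehouse spreadsheet by ID, or create one by name if no ID is given
# "YOUR_SHEET_ID" is a placeholder for the project's actual spreadsheet key
sheet = get_or_create_sheet(client, "Chicago SMB Market Radar", sheet_id="YOUR_SHEET_ID")
print(f"Connected to spreadsheet: {sheet.title}")
```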
Data Loading Operations
We implement several data loading patterns to efficiently update our sheets:
```python
import gspread
import pandas as pd


class GoogleSheetsLoader:
    def __init__(self, client, sheet_id: str):
        self.client = client
        self.sheet = client.open_by_key(sheet_id)

    def load_weekly_data(self, df: pd.DataFrame, tab_name: str):
        """Load weekly aggregated data to specified tab"""
        # Get or create the tab
        try:
            worksheet = self.sheet.worksheet(tab_name)
        except gspread.WorksheetNotFound:
            worksheet = self.sheet.add_worksheet(tab_name, 1000, 20)

        # Clear existing data
        worksheet.clear()

        # Prepare headers
        headers = list(df.columns)
        worksheet.append_row(headers)

        # Load data in batches
        batch_size = 1000
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i + batch_size]
            rows = [list(row) for _, row in batch.iterrows()]
            worksheet.append_rows(rows)

        # Format the worksheet
        self._format_worksheet(worksheet, headers)

    def _format_worksheet(self, worksheet, headers):
        """Apply formatting to the worksheet"""
        # Format headers: blue background, bold white text
        worksheet.format('A1:Z1', {
            'backgroundColor': {'red': 0.2, 'green': 0.6, 'blue': 0.9},
            'textFormat': {'bold': True, 'foregroundColor': {'red': 1, 'green': 1, 'blue': 1}}
        })

        # Freeze header row
        worksheet.freeze(rows=1)

        # Auto-resize columns
        worksheet.columns_auto_resize(0, len(headers))
```

📋 Data Warehouse Schema
The structure and organization of our Google Sheets data warehouse
Sheet Organization
We organize our data into logical tabs that serve different analytical purposes:
📊 Data Tabs
- licenses_weekly: Weekly business license data
- permits_weekly: Weekly building permit data
- cta_weekly: Weekly transit ridership data
- summary_latest: Current period summaries
🔧 Configuration Tabs
- community_areas: Geographic reference data
- category_mapping: Business category definitions
- metadata: Data lineage and version info
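To illustrate how the data tabs connect to the loader above, a sketch that publishes each weekly DataFrame to its tab (the DataFrame variables and sheet ID are hypothetical placeholders for outputs of the transformation step):

```python
# Map each data tab to the DataFrame produced earlier in the pipeline
# (licenses_df, permits_df, and cta_df are placeholders for the transform step's outputs)
weekly_outputs = {
    "licenses_weekly": licenses_df,
    "permits_weekly": permits_df,
    "cta_weekly": cta_df,
}

loader = GoogleSheetsLoader(client, sheet_id="YOUR_SHEET_ID")
for tab_name, df in weekly_outputs.items():
    loader.load_weekly_data(df, tab_name)
```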
Data Structure Example
licenses_weekly Tab Structure:

| week_start | community_area | community_area_name | category | new_licenses | wow_change | avg_13w | momentum_index |
|------------|----------------|---------------------|----------|--------------|------------|---------|----------------|
| 2025-01-06 | 8 | Near North Side | Food | 12 | 2 | 10.5 | 0.8 |
| 2025-01-06 | 8 | Near North Side | Retail | 8 | -1 | 7.2 | 0.4 |
| 2025-01-06 | 32 | Loop | Food | 15 | 3 | 12.1 | 1.2 |
| 2025-01-06 | 32 | Loop | Services | 6 | 0 | 5.8 | 0.1 |

Key Features:
- Consistent date formatting (YYYY-MM-DD)
- Numeric community area codes
- Standardized category names
- Calculated business metrics
- No missing values
Data Quality Constraints
- Data Types: Enforce proper data types (dates, numbers, text)
- Required Fields: Ensure all critical fields are populated
- Value Ranges: Validate numeric values are within expected ranges
- Referential Integrity: Maintain consistency across related tabs
- Format Standards: Consistent date formats and naming conventions
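A minimal sketch of how a pre-load check along these lines could look; the column names and rules are illustrative assumptions rather than the project's exact constraints:

```python
import pandas as pd

# Hypothetical required fields for the licenses_weekly tab
REQUIRED_COLUMNS = ["week_start", "community_area", "category", "new_licenses"]


def validate_weekly_frame(df: pd.DataFrame) -> list[str]:
    """Return a list of validation errors; an empty list means the frame passes."""
    errors = []

    # Required fields must exist and contain no nulls
    for col in REQUIRED_COLUMNS:
        if col not in df.columns:
            errors.append(f"missing required column: {col}")
        elif df[col].isna().any():
            errors.append(f"null values in required column: {col}")

    # Data types: week_start must parse as a date, counts must be numeric and non-negative
    if "week_start" in df.columns:
        if pd.to_datetime(df["week_start"], errors="coerce").isna().any():
            errors.append("week_start contains unparseable dates")
    if "new_licenses" in df.columns:
        if not pd.api.types.is_numeric_dtype(df["new_licenses"]):
            errors.append("new_licenses is not numeric")
        elif (df["new_licenses"] < 0).any():
            errors.append("new_licenses contains negative values")

    return errors
```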
🔄 Loading Patterns
Different strategies for loading data efficiently and reliably
Full Refresh vs Incremental
🔄 Full Refresh
Complete replacement of all data in a tab
- Simple and reliable
- Ensures data consistency
- Good for small datasets
- Easy to debug
📈 Incremental Update
Update only new or changed records (see the sketch after this list)
- Faster execution
- Lower API usage
- More complex logic
- Risk of data inconsistency
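The full-refresh pattern is essentially what `load_weekly_data` above already does: clear the tab, then rewrite it. As a sketch of the incremental alternative, assuming rows are keyed by an ISO-formatted `week_start` column (the `incremental_append` helper is hypothetical, written as an additional `GoogleSheetsLoader` method):

```python
def incremental_append(self, df: pd.DataFrame, tab_name: str):
    """Append only rows newer than the latest week already present in the tab."""
    worksheet = self.sheet.worksheet(tab_name)

    # Read the existing week_start values (first column, skipping the header row)
    existing_weeks = worksheet.col_values(1)[1:]
    latest_loaded = max(existing_weeks) if existing_weeks else ""

    # ISO dates (YYYY-MM-DD) sort correctly as strings, so a string comparison
    # is enough to find rows newer than what is already loaded
    new_rows = df[df["week_start"].astype(str) > latest_loaded]
    if new_rows.empty:
        return 0

    worksheet.append_rows(new_rows.astype(str).values.tolist())
    return len(new_rows)
```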
Atomic Operations
We implement atomic operations to ensure data consistency and prevent partial updates that could corrupt our data warehouse.
```python
import time


# Additional method of GoogleSheetsLoader
def atomic_sheet_update(self, tab_name: str, new_data: pd.DataFrame):
    """Update sheet atomically using temporary tab"""
    # Create temporary tab
    temp_tab_name = f"{tab_name}_temp_{int(time.time())}"
    temp_worksheet = self.sheet.add_worksheet(temp_tab_name, 1000, 20)

    try:
        # Load data to temporary tab
        self._load_data_to_worksheet(temp_worksheet, new_data)

        # Validate data integrity
        if not self._validate_worksheet_data(temp_worksheet, new_data):
            raise ValueError("Data validation failed")

        # Get original worksheet
        original_worksheet = self.sheet.worksheet(tab_name)

        # Swap names atomically
        original_worksheet.update_title(f"{tab_name}_old_{int(time.time())}")
        temp_worksheet.update_title(tab_name)

        # Clean up old worksheet
        self.sheet.del_worksheet(original_worksheet)

        return True

    except Exception as e:
        # Clean up temporary tab on failure
        try:
            self.sheet.del_worksheet(temp_worksheet)
        except Exception:
            pass
        raise e
```

Batch Processing
For large datasets, we process data in batches to manage memory usage and API rate limits effectively.
```python
def batch_load_data(self, df: pd.DataFrame, batch_size: int = 1000):
    """Load data in batches to manage memory and API limits"""
    total_rows = len(df)
    batches_processed = 0

    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        batch = df.iloc[start_idx:end_idx]

        # Process batch
        self._process_batch(batch)
        batches_processed += 1

        # Progress tracking
        progress = (end_idx / total_rows) * 100
        print(f"Progress: {progress:.1f}% ({batches_processed} batches)")

        # Rate limiting
        if batches_processed % 5 == 0:
            time.sleep(1)  # Pause every 5 batches

    return batches_processed
```

⚠️ Error Handling & Recovery
Robust error handling to ensure data loading reliability
Common Failure Scenarios
🚨 API Failures
- Rate limit exceeded
- Authentication expired
- Network timeouts
- Service unavailable
⚠️ Data Issues
- Invalid data formats
- Missing required fields
- Data type mismatches
- Size limit exceeded
Recovery Strategies
```python
def robust_data_loading(self, df: pd.DataFrame, max_retries: int = 3):
    """Robust data loading with comprehensive error handling"""
    for attempt in range(max_retries):
        try:
            # Attempt to load data
            result = self._load_data_with_validation(df)
            if result['success']:
                print(f"Data loaded successfully on attempt {attempt + 1}")
                return result

        except gspread.exceptions.APIError as e:
            if e.response.status_code == 429:  # Rate limited
                wait_time = (2 ** attempt) * 60  # Exponential backoff
                print(f"Rate limited, waiting {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"API error: {e}")

        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")

        # Wait before retry
        if attempt < max_retries - 1:
            time.sleep(5)

    # All retries failed
    raise Exception(f"Failed to load data after {max_retries} attempts")


def _load_data_with_validation(self, df: pd.DataFrame):
    """Load data with comprehensive validation"""
    # Pre-load validation
    validation_result = self._validate_data_before_load(df)
    if not validation_result['is_valid']:
        return {'success': False, 'errors': validation_result['errors']}

    # Load data
    load_result = self._execute_data_load(df)

    # Post-load validation
    post_validation = self._validate_data_after_load(df)

    return {
        'success': load_result['success'] and post_validation['is_valid'],
        'rows_loaded': load_result['rows_loaded'],
        'warnings': post_validation['warnings']
    }
```

⚡ Performance Optimization
Techniques to optimize data loading performance and efficiency
API Optimization
- Batch API calls
- Efficient authentication
- Connection pooling
- Rate limit management
Data Processing
- Memory-efficient operations
- Parallel processing
- Streaming data handling
- Optimized data structures
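To make batching API calls concrete, one option is to write an entire tab with a single `update()` call instead of appending row by row. A sketch under that assumption (not the project's exact code):

```python
import pandas as pd


def bulk_update_tab(worksheet, df: pd.DataFrame):
    """Write headers plus all data rows with one API call instead of one call per row."""
    values = [list(df.columns)] + df.astype(str).values.tolist()
    worksheet.clear()
    # A single update() starting at A1 replaces hundreds of append_row() calls,
    # which keeps the pipeline well under the Sheets API rate limits
    worksheet.update(values)
    return len(values) - 1  # number of data rows written
```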
Performance Monitoring
We track performance metrics to identify bottlenecks and optimize accordingly:
```python
import time


class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'load_times': [],
            'api_calls': 0,
            'rows_processed': 0,
            'errors': 0
        }

    def start_timer(self):
        return time.time()

    def end_timer(self, start_time, operation: str):
        duration = time.time() - start_time
        self.metrics['load_times'].append({
            'operation': operation,
            'duration': duration,
            'timestamp': time.time()
        })
        print(f"{operation} completed in {duration:.2f} seconds")

    def log_api_call(self):
        self.metrics['api_calls'] += 1

    def log_rows_processed(self, count: int):
        self.metrics['rows_processed'] += count

    def get_performance_summary(self):
        if not self.metrics['load_times']:
            return "No performance data available"

        avg_load_time = sum(t['duration'] for t in self.metrics['load_times']) / len(self.metrics['load_times'])

        return {
            'total_operations': len(self.metrics['load_times']),
            'average_load_time': avg_load_time,
            'total_api_calls': self.metrics['api_calls'],
            'total_rows_processed': self.metrics['rows_processed'],
            'error_rate': self.metrics['errors'] / max(1, self.metrics['api_calls'])
        }
```

Ready for the Next Chapter?
Now that we've loaded our data into Google Sheets, let's see how we transform it into compelling visualizations and business insights.