#!/usr/bin/env python3 """ Compare local dev database schema with production to find missing columns """ import subprocess import json from collections import defaultdict # Get schema from local dev database def get_local_schema(): """Get all tables and columns from local dev database""" print("šŸ” Fetching LOCAL dev schema...") result = subprocess.run( ["docker", "exec", "bmc_hub_dev-postgres-1", "psql", "-U", "bmcnetworks", "-d", "bmc_hub", "-t", "-c", """ SELECT table_name, column_name, data_type, character_maximum_length, is_nullable, column_default FROM information_schema.columns WHERE table_schema = 'public' ORDER BY table_name, ordinal_position; """], capture_output=True, text=True ) schema = defaultdict(list) for line in result.stdout.strip().split('\n'): if not line.strip(): continue parts = [p.strip() for p in line.split('|')] if len(parts) >= 2: table = parts[0] column = parts[1] data_type = parts[2] if len(parts) > 2 else '' schema[table].append({ 'column': column, 'type': data_type, 'full': line }) return schema # Get schema from production via SSH def get_prod_schema(): """Get all tables and columns from production database""" print("šŸ” Fetching PRODUCTION schema...") result = subprocess.run( ["ssh", "bmcadmin@172.16.31.183", "sudo podman exec bmc-hub-postgres-prod psql -U bmc_hub -d bmc_hub -t -c \"SELECT table_name, column_name, data_type, character_maximum_length, is_nullable, column_default FROM information_schema.columns WHERE table_schema = 'public' ORDER BY table_name, ordinal_position;\""], capture_output=True, text=True ) schema = defaultdict(list) for line in result.stdout.strip().split('\n'): if not line.strip(): continue parts = [p.strip() for p in line.split('|')] if len(parts) >= 2: table = parts[0] column = parts[1] data_type = parts[2] if len(parts) > 2 else '' schema[table].append({ 'column': column, 'type': data_type, 'full': line }) return schema def compare_schemas(local, prod): """Compare schemas and find missing columns""" print("\nšŸ“Š Comparing schemas...\n") missing = defaultdict(list) # Check each table in local for table in sorted(local.keys()): local_cols = {col['column']: col for col in local[table]} prod_cols = {col['column']: col for col in prod.get(table, [])} # Find missing columns for col_name, col_info in local_cols.items(): if col_name not in prod_cols: missing[table].append(col_info) return missing def print_missing(missing): """Print missing columns in a readable format""" if not missing: print("āœ… No missing columns! Schemas are in sync.") return print("āŒ MISSING COLUMNS IN PRODUCTION:\n") print("=" * 80) for table in sorted(missing.keys()): print(f"\nšŸ“‹ Table: {table}") print("-" * 80) for col in missing[table]: print(f" āŒ {col['column']} ({col['type']})") print("\n" + "=" * 80) print(f"\nšŸ“Š Summary: {sum(len(cols) for cols in missing.values())} missing columns across {len(missing)} tables") def generate_sql(missing): """Generate SQL to add missing columns""" if not missing: return print("\n\nšŸ”§ SQL TO ADD MISSING COLUMNS:") print("=" * 80) print() for table in sorted(missing.keys()): print(f"-- Table: {table}") for col in missing[table]: col_type = col['type'] # Simplified type mapping - you may need to adjust if 'character varying' in col_type: col_type = 'VARCHAR(255)' elif col_type == 'integer': col_type = 'INTEGER' elif 'numeric' in col_type: col_type = 'NUMERIC(10,2)' elif col_type == 'boolean': col_type = 'BOOLEAN DEFAULT false' elif col_type == 'text': col_type = 'TEXT' elif 'timestamp' in col_type: col_type = 'TIMESTAMP' elif col_type == 'date': col_type = 'DATE' elif col_type == 'jsonb': col_type = 'JSONB' print(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col['column']} {col_type};") print() if __name__ == "__main__": try: local = get_local_schema() prod = get_prod_schema() missing = compare_schemas(local, prod) print_missing(missing) generate_sql(missing) except subprocess.CalledProcessError as e: print(f"āŒ Error: {e}") print(f"Output: {e.output}") except Exception as e: print(f"āŒ Error: {e}")