"""Consolidate fixed customer fields into custom_fields JSON.

Sprint 12 Phase 1: Move name, phone, address fields into custom_fields.
This eliminates redundancy between fixed columns and flexible JSON storage.

Run with:
    docker exec customer_portal python -m customer_portal.migrations.004_consolidate_customer_fields

Rollback with:
    docker exec customer_portal python -m customer_portal.migrations.004_consolidate_customer_fields --rollback

Verify only (no changes are written):
    docker exec customer_portal python -m customer_portal.migrations.004_consolidate_customer_fields --verify
"""

import json
import os
import sys

from sqlalchemy import create_engine, text

# Fixed columns copied into custom_fields by the migration and copied back by
# the rollback. This tuple is also the whitelist for the column names that are
# interpolated into the rollback UPDATE statement.
_FIXED_COLUMNS = (
    "name",
    "phone",
    "address_street",
    "address_city",
    "address_zip",
)

# Columns whose values are shortened in the progress output.
_PREVIEWED_COLUMNS = frozenset({"address_street", "address_city"})
_PREVIEW_LENGTH = 30

_SELECT_CUSTOMERS = """
    SELECT id, email, name, phone, address_street, address_city,
           address_zip, custom_fields
    FROM customers
    ORDER BY id
"""


def get_engine():
    """Return a SQLAlchemy engine for the customer portal database.

    The connection URL is read from the ``DATABASE_URL`` environment
    variable; when it is unset, a localhost PostgreSQL URL is used.
    """
    database_url = os.environ.get(
        "DATABASE_URL", "postgresql://portal:portal@localhost:5432/customer_portal"
    )
    return create_engine(database_url)


def _load_custom_fields(raw):
    """Decode a ``custom_fields`` column value into a dict.

    Handles both storage styles: a text column holding serialized JSON and a
    JSON/JSONB column that the driver has already decoded into a dict.

    Returns:
        A new dict for an empty value or a JSON object, or ``None`` when the
        value is not valid JSON or is valid JSON that is not an object.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        # Copy so callers can mutate the result without touching the row.
        return dict(raw)
    try:
        decoded = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        return None
    return decoded if isinstance(decoded, dict) else None


def _describe_change(column, value):
    """Format one ``column=value`` entry for the progress output."""
    if column in _PREVIEWED_COLUMNS:
        preview = value[:_PREVIEW_LENGTH]
        # Only mark the value as cut when something was actually removed.
        if len(value) > _PREVIEW_LENGTH:
            preview += "..."
        return f"{column}={preview}"
    return f"{column}={value}"


def _merge_fixed_columns(custom_fields, column_values):
    """Copy non-empty fixed column values into ``custom_fields`` in place.

    A key that already exists in ``custom_fields`` is never overwritten.

    Returns:
        The list of ``column=value`` descriptions for the keys that were added.
    """
    changes = []
    for column in _FIXED_COLUMNS:
        value = column_values[column]
        if value and column not in custom_fields:
            custom_fields[column] = value
            changes.append(_describe_change(column, value))
    return changes


def _collect_rollback_updates(custom_fields, column_values):
    """Return ``{column: value}`` for fixed columns that differ from custom_fields."""
    return {
        column: custom_fields[column]
        for column in _FIXED_COLUMNS
        if column in custom_fields and custom_fields[column] != column_values[column]
    }


def run_migration():
    """Move fixed field values into custom_fields JSON.

    For every customer, each non-empty fixed column whose key is not yet
    present in ``custom_fields`` is added to it, and the row is updated when
    at least one key was added. All updates run in a single transaction that
    is rolled back if any statement fails.
    """
    engine = get_engine()

    print("Sprint 12 Phase 1: Consolidating customer fields...")
    print("=" * 60)

    migrated = 0
    skipped = 0

    # engine.begin() commits when the block exits normally and rolls back on
    # an exception, so a failure never leaves a half-migrated table.
    with engine.begin() as conn:
        customers = conn.execute(text(_SELECT_CUSTOMERS)).fetchall()

        for row in customers:
            customer_id, email = row[0], row[1]
            column_values = dict(zip(_FIXED_COLUMNS, row[2:7]))

            custom_fields = _load_custom_fields(row[7])
            if custom_fields is None:
                # Unreadable data is replaced by a fresh object; say so
                # instead of discarding it silently.
                print(
                    f" [WARN] Customer {customer_id} ({email}): custom_fields is "
                    "not a JSON object; starting from an empty object"
                )
                custom_fields = {}

            changes = _merge_fixed_columns(custom_fields, column_values)
            if not changes:
                skipped += 1
                continue

            conn.execute(
                text(
                    """
                    UPDATE customers
                    SET custom_fields = :custom_fields
                    WHERE id = :id
                    """
                ),
                {
                    "custom_fields": json.dumps(custom_fields, ensure_ascii=False),
                    "id": customer_id,
                },
            )
            migrated += 1
            print(f" [OK] Customer {customer_id} ({email}): {', '.join(changes)}")

    # Reached only after the transaction has been committed.
    print("=" * 60)
    print(
        f"Migration completed: {migrated} migrated, {skipped} skipped (already consolidated)"
    )


def run_rollback():
    """Restore fixed fields from custom_fields (for rollback).

    For every customer, each fixed column whose key exists in
    ``custom_fields`` with a different value is set to the ``custom_fields``
    value. Rows whose ``custom_fields`` cannot be read as a JSON object are
    left untouched. All updates run in a single transaction.
    """
    engine = get_engine()

    print("Rolling back Sprint 12 Phase 1...")
    print("=" * 60)

    rolled_back = 0

    with engine.begin() as conn:
        customers = conn.execute(text(_SELECT_CUSTOMERS)).fetchall()

        for row in customers:
            customer_id, email = row[0], row[1]
            column_values = dict(zip(_FIXED_COLUMNS, row[2:7]))

            custom_fields = _load_custom_fields(row[7])
            if custom_fields is None:
                continue

            updates = _collect_rollback_updates(custom_fields, column_values)
            if not updates:
                continue

            # Column names come only from _FIXED_COLUMNS, so interpolating
            # them is safe; the values are always bound as parameters.
            set_clauses = ", ".join(f"{column} = :{column}" for column in updates)
            conn.execute(
                text(f"UPDATE customers SET {set_clauses} WHERE id = :id"),
                # Build the bind parameters separately so "id" is not
                # reported below as a restored column.
                {**updates, "id": customer_id},
            )
            rolled_back += 1
            print(
                f" [OK] Customer {customer_id} ({email}): restored {list(updates)}"
            )

    # Reached only after the transaction has been committed.
    print("=" * 60)
    print(f"Rollback completed: {rolled_back} customers restored")


def verify_migration():
    """Verify migration by checking custom_fields content.

    Prints, for up to 10 customers with non-empty ``custom_fields``, whether
    the ``name``, ``phone`` and ``address_street`` keys are present. This is
    read-only.
    """
    engine = get_engine()

    print("\nVerifying migration...")
    print("=" * 60)

    with engine.connect() as conn:
        result = conn.execute(
            text(
                """
                SELECT id, email, custom_fields
                FROM customers
                WHERE custom_fields IS NOT NULL AND custom_fields != '{}'
                ORDER BY id
                LIMIT 10
                """
            )
        )

        for row in result:
            customer_id, email = row[0], row[1]

            custom_fields = _load_custom_fields(row[2])
            if custom_fields is None:
                print(f" Customer {customer_id} ({email}): INVALID JSON")
                continue

            has_name = "name" in custom_fields
            has_phone = "phone" in custom_fields
            has_address = "address_street" in custom_fields

            print(f" Customer {customer_id} ({email}):")
            print(f" name: {'OK' if has_name else 'MISSING'}")
            print(f" phone: {'OK' if has_phone else 'MISSING'}")
            print(f" address: {'OK' if has_address else 'MISSING'}")

    print("=" * 60)


if __name__ == "__main__":
    if "--rollback" in sys.argv:
        run_rollback()
    elif "--verify" in sys.argv:
        verify_migration()
    else:
        run_migration()
        verify_migration()