503 lines
15 KiB
Python
Executable File
503 lines
15 KiB
Python
Executable File
"""Internal API routes for external integrations."""
|
|
|
|
import logging
|
|
|
|
from flask import Blueprint, current_app, jsonify, request
|
|
|
|
from customer_portal.models import get_db
|
|
from customer_portal.models.customer import Customer
|
|
from customer_portal.services.email import EmailService
|
|
from customer_portal.services.otp import OTPService
|
|
from customer_portal.services.token import TokenService
|
|
from customer_portal.services.wordpress_api import WordPressWebhook
|
|
|
|
bp = Blueprint("api", __name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _is_consent_field(field_name: str, field_type: str) -> bool:
    """Check if a field is a consent/checkbox field that should be excluded.

    A field counts as a consent field when its type is "checkbox" or when
    its name contains one of the consent keywords (German and English), such
    as "stimme", "akzeptiere" or "einwilligung".

    Matching is case-insensitive, so the result no longer depends on the
    caller having lowercased the name first; names that are already
    lowercase behave exactly as before.

    Args:
        field_name: The form field name. Non-string values never match a
            keyword.
        field_type: The field type. Non-string values are treated as
            "no type".

    Returns:
        True if this is a consent field.
    """
    consent_keywords = (
        "stimme",
        "akzeptiere",
        "einwilligung",
        "zustimmung",
        "consent",
        "agree",
        "accept",
        "newsletter",
        "marketing",
        "agb",
        "datenschutz",
        "widerruf",
        "kontaktaufnahme",
    )

    # Every checkbox is excluded, regardless of its name.
    if isinstance(field_type, str) and field_type.strip().lower() == "checkbox":
        return True

    if not isinstance(field_name, str):
        return False

    normalized_name = field_name.lower()
    return any(keyword in normalized_name for keyword in consent_keywords)
|
|
|
|
|
|
def _generate_field_mapping(booking_fields: dict) -> dict:
    """Generate field mapping from WordPress booking fields.

    Analyzes the booking fields sent by WordPress to determine which
    form field names correspond to standard customer data fields.

    Sprint 6.7: Improved email field detection to exclude consent fields.

    Matching rules:

    * Consent/checkbox fields (see ``_is_consent_field``) are skipped.
    * Email and name matches are ranked. A stronger match is never replaced
      by a weaker one that happens to come later in the dict; among equally
      strong matches the last one wins.
      Email: ``type == "email"`` > exact name (email/e_mail/e-mail/mail) >
      short name starting with email/e_mail/e-mail.
      Name: field called exactly "name" (any case) > text field whose name
      contains "name" but not "nach", "eltern" or "pferd".
    * Phone: a field whose name contains "telefon" always wins; any other
      phone-like field is only used while no phone field was picked yet.
    * City: "ort" must be a separate word of the field name (split on
      "_", "-" and spaces) or part of "wohnort", so that names such as
      "sportart" or "passwort" are not mistaken for a city field.

    Args:
        booking_fields: Dictionary of booking fields from WordPress
            Format: {"fieldname": {"type": "email", "label": "E-Mail"}, ...}

    Returns:
        Dictionary with field mappings like:
        {
            "email_field": "e_mail",
            "name_field": "name",
            "phone_field": "telefon",
            "address_field": "adresse",
            "zip_field": "plz",
            "city_field": "ort"
        }
        Keys for which no field was detected keep their default
        ("email", "name", "phone", "address", "zip", "city").
    """
    # Default fallbacks
    mapping = {
        "email_field": "email",
        "name_field": "name",
        "phone_field": "phone",
        "address_field": "address",
        "zip_field": "zip",
        "city_field": "city",
    }

    if not booking_fields or not isinstance(booking_fields, dict):
        return mapping

    # Strength of the best match found so far (0 = nothing detected yet).
    email_rank = 0
    name_rank = 0
    # Tracked explicitly instead of comparing against the default "phone",
    # which would confuse "not set yet" with a real field named "phone".
    phone_assigned = False

    # booking_fields structure: {"fieldname": {"type": "email", "label": "E-Mail"}, ...}
    for field_name, field_info in booking_fields.items():
        if not isinstance(field_name, str) or not isinstance(field_info, dict):
            continue

        field_type = field_info.get("type", "")
        name_lower = field_name.lower()

        # Skip consent/checkbox fields entirely
        if _is_consent_field(name_lower, field_type):
            continue

        # Email field detection, strongest indicator first.
        if field_type == "email":
            rank = 3
        elif name_lower in ("email", "e_mail", "e-mail", "mail"):
            rank = 2
        elif (
            name_lower.startswith(("email", "e_mail", "e-mail"))
            # Long names are usually consent texts, not the address field.
            and len(name_lower) < 15
        ):
            rank = 1
        else:
            rank = 0
        if rank and rank >= email_rank:
            mapping["email_field"] = field_name
            email_rank = rank

        # Name field detection.
        if name_lower == "name":
            rank = 2
        elif (
            "name" in name_lower
            and field_type == "text"
            and not any(
                excluded in name_lower for excluded in ("nach", "eltern", "pferd")
            )
        ):
            rank = 1
        else:
            rank = 0
        if rank and rank >= name_rank:
            mapping["name_field"] = field_name
            name_rank = rank

        # Phone field: type="tel" or name contains "telefon"/"phone".
        is_telefon = "telefon" in name_lower
        if field_type == "tel" or is_telefon or "phone" in name_lower:
            # Prefer "telefon" over "mobil"
            if is_telefon or not phone_assigned:
                mapping["phone_field"] = field_name
                phone_assigned = True

        # Address field: name contains "adresse"/"address"/"strasse"
        if any(
            marker in name_lower
            for marker in ("adresse", "address", "strasse", "straße")
        ):
            mapping["address_field"] = field_name

        # ZIP/PLZ field: name contains "plz"/"zip"/"postleitzahl"
        if any(marker in name_lower for marker in ("plz", "zip", "postleitzahl")):
            mapping["zip_field"] = field_name

        # City/Ort field: "ort" as its own word, or "wohnort"/"city"/"stadt".
        name_words = name_lower.replace("-", "_").replace(" ", "_").split("_")
        if (
            "ort" in name_words
            or "wohnort" in name_lower
            or "city" in name_lower
            or "stadt" in name_lower
        ):
            mapping["city_field"] = field_name

    return mapping
|
|
|
|
|
|
@bp.route("/webhook/booking", methods=["POST"])
def webhook_booking():
    """Handle booking webhook from WordPress.

    Verifies the shared secret, then hands the JSON payload to
    ``WordPressWebhook.handle_booking_created``.

    Expected payload:
        {
            "email": "customer@example.com",
            "name": "Customer Name",
            "phone": "optional phone"
        }

    Required header:
        X-Portal-Secret: <WP_API_SECRET>

    Returns:
        The handler result as JSON. Error responses:
        500 if WP_API_SECRET is not configured, 401 if the secret does not
        match, 400 if the body is missing, is not valid JSON, is not a JSON
        object, or the handler result contains an "error" key.
    """
    import hmac

    # Verify secret
    expected_secret = current_app.config.get("WP_API_SECRET")
    if not expected_secret:
        logger.error("WP_API_SECRET not configured")
        return jsonify({"error": "Server configuration error"}), 500

    provided_secret = request.headers.get("X-Portal-Secret") or ""
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    secrets_match = hmac.compare_digest(
        provided_secret.encode("utf-8"),
        str(expected_secret).encode("utf-8"),
    )
    if not secrets_match:
        logger.warning("Unauthorized webhook attempt from %s", request.remote_addr)
        return jsonify({"error": "Unauthorized"}), 401

    # silent=True: malformed bodies yield None instead of raising, so the
    # caller always receives the JSON error below.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No JSON data provided"}), 400

    db = get_db()
    result = WordPressWebhook.handle_booking_created(db, data)

    if "error" in result:
        return jsonify(result), 400

    logger.info(
        "Webhook processed successfully for customer ID: %s",
        result.get("customer_id"),
    )
    return jsonify(result)
|
|
|
|
|
|
@bp.route("/health", methods=["GET"])
def health():
    """Liveness probe: always answers with a static JSON ``status`` of ``ok``."""
    payload = {"status": "ok"}
    return jsonify(payload)
|
|
|
|
|
|
def _verify_portal_secret() -> bool:
    """Verify X-Portal-Secret header matches configured secret.

    Reads the expected value from the ``WP_API_SECRET`` app config entry.

    Returns:
        True if the header matches the configured secret. False if the
        header is missing or different, or if no secret is configured
        (which is also logged as an error).
    """
    import hmac

    expected_secret = current_app.config.get("WP_API_SECRET")
    if not expected_secret:
        logger.error("WP_API_SECRET not configured")
        return False

    provided_secret = request.headers.get("X-Portal-Secret") or ""
    # Constant-time comparison so response timing does not leak how much of
    # the secret was guessed correctly. Bytes are used because
    # compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(
        provided_secret.encode("utf-8"),
        str(expected_secret).encode("utf-8"),
    )
|
|
|
|
|
|
@bp.route("/customer/exists", methods=["POST"])
def customer_exists():
    """Check if a customer with given email exists.

    WordPress calls this when customer enters email in booking form
    to automatically detect existing customers.

    Expected payload:
        {
            "email": "customer@example.com"
        }

    Required header:
        X-Portal-Secret: <WP_API_SECRET>

    Returns:
        {
            "exists": true/false
        }
        401 if the portal secret check fails; 400 if the body is not a JSON
        object or "email" is missing, not a string, or blank.
    """
    if not _verify_portal_secret():
        logger.warning("Unauthorized customer check from %s", request.remote_addr)
        return jsonify({"error": "Unauthorized"}), 401

    # silent=True: malformed bodies yield None and get the JSON error below.
    data = request.get_json(silent=True)
    raw_email = data.get("email") if isinstance(data, dict) else None
    # Reject non-string values up front; calling .lower() on them would
    # otherwise raise and surface as a 500.
    if not isinstance(raw_email, str) or not raw_email.strip():
        return jsonify({"error": "Email required"}), 400

    email = raw_email.lower().strip()
    db = get_db()

    customer = Customer.get_by_email(db, email)

    # Don't reveal too much - just exists or not
    return jsonify({"exists": customer is not None})
|
|
|
|
|
|
@bp.route("/prefill/request", methods=["POST"])
def prefill_request():
    """Request OTP for form pre-filling from WordPress.

    WordPress calls this when customer clicks "Ich bin bereits Kunde"
    and enters their email. If the customer exists, we send an OTP.

    Expected payload:
        {
            "email": "customer@example.com"
        }

    Required header:
        X-Portal-Secret: <WP_API_SECRET>

    Returns:
        {
            "success": true,
            "message": "OTP sent if customer exists"
        }
        The success response is identical whether or not the customer
        exists and whether or not sending succeeded. 401 if the portal
        secret check fails; 400 if the body is not a JSON object or "email"
        is missing, not a string, or blank.
    """
    if not _verify_portal_secret():
        logger.warning("Unauthorized prefill request from %s", request.remote_addr)
        return jsonify({"error": "Unauthorized"}), 401

    # silent=True: malformed bodies yield None and get the JSON error below.
    data = request.get_json(silent=True)
    raw_email = data.get("email") if isinstance(data, dict) else None
    # Reject non-string values up front; calling .lower() on them would
    # otherwise raise and surface as a 500.
    if not isinstance(raw_email, str) or not raw_email.strip():
        return jsonify({"error": "Email required"}), 400

    email = raw_email.lower().strip()
    db = get_db()

    # Check if customer exists
    customer = Customer.get_by_email(db, email)

    if customer:
        # Generate OTP and send email
        try:
            code = OTPService.create_for_customer(db, customer.id, purpose="prefill")
            EmailService.send_otp(customer.email, code, purpose="prefill")
            # Log the customer id rather than the email address to keep
            # personal data out of the logs.
            logger.info("Prefill OTP sent to customer %s", customer.id)
        except Exception as e:
            # Deliberately best-effort: the failure is logged with its
            # traceback, but the response below stays the same so the
            # caller cannot tell whether the customer exists.
            logger.exception("Failed to send prefill OTP: %s", e)

    # Always return success to prevent email enumeration
    return jsonify(
        {
            "success": True,
            "message": "Falls Sie bei uns registriert sind, erhalten Sie einen Code per E-Mail.",
        }
    )
|
|
|
|
|
|
@bp.route("/prefill/verify", methods=["POST"])
def prefill_verify():
    """Verify OTP and return prefill token.

    WordPress calls this after customer enters the OTP code.

    Expected payload:
        {
            "email": "customer@example.com",
            "code": "123456"
        }
        An optional "booking_fields" object may be included; it is used to
        build the "field_mapping" part of the response. "code" may also be
        sent as a JSON number.

    Required header:
        X-Portal-Secret: <WP_API_SECRET>

    Returns on success:
        {
            "success": true,
            "token": "base64_encoded_token",
            "customer": {
                "name": "Max Mustermann",
                "email": "max@example.com",
                "phone": "+43...",
                "address_street": "...",
                "address_city": "...",
                "address_zip": "...",
                "custom_fields": {...}   (only when non-empty)
            },
            "field_mapping": {...}
        }

    Returns on failure:
        {
            "success": false,
            "error": "Invalid or expired code"
        }
        Unknown email and wrong/expired code produce the same 400 response.
        401 if the portal secret check fails; 500 if token generation
        raises ValueError.
    """
    if not _verify_portal_secret():
        logger.warning("Unauthorized prefill verify from %s", request.remote_addr)
        return jsonify({"error": "Unauthorized"}), 401

    # silent=True: malformed bodies yield None and get the JSON error below.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No data provided"}), 400

    raw_email = data.get("email")
    raw_code = data.get("code")

    email = raw_email.lower().strip() if isinstance(raw_email, str) else ""
    # Clients may send the code as a JSON number; bool is excluded because
    # it is a subclass of int in Python.
    if isinstance(raw_code, (str, int)) and not isinstance(raw_code, bool):
        code = str(raw_code).strip()
    else:
        code = ""

    if not email or not code:
        return jsonify({"success": False, "error": "Email and code required"}), 400

    db = get_db()

    # Same response for "unknown customer" and "wrong code", so this
    # endpoint does not reveal whether an email address is registered.
    invalid_code_response = (
        jsonify({"success": False, "error": "Ungueltiger oder abgelaufener Code"}),
        400,
    )

    # Get customer
    customer = Customer.get_by_email(db, email)
    if not customer:
        return invalid_code_response

    # Verify OTP
    if not OTPService.verify(db, customer.id, code, purpose="prefill"):
        return invalid_code_response

    # Generate prefill token
    try:
        token = TokenService.generate_prefill_token(customer)
    except ValueError as e:
        logger.error("Token generation failed: %s", e)
        return jsonify({"success": False, "error": "Server error"}), 500

    logger.info("Prefill token generated for customer %s", customer.id)

    # Sprint 12: All customer data comes from custom_fields
    # Use display properties for backwards compatibility
    addr = customer.display_address or {}
    customer_data = {
        "name": customer.display_name,
        "email": customer.email,
        "phone": customer.display_phone,
        "address_street": addr.get("street", ""),
        "address_city": addr.get("city", ""),
        "address_zip": addr.get("zip", ""),
    }

    # Include all custom_fields for full data access
    custom_fields = customer.get_custom_fields()
    if custom_fields:
        customer_data["custom_fields"] = custom_fields

    # Generate field mapping from booking_fields sent by WordPress
    booking_fields = data.get("booking_fields", {})
    field_mapping = _generate_field_mapping(booking_fields)

    # Debug logging: DEBUG level, and only the field names of the customer
    # record, so personal data does not end up in the logs.
    logger.debug("Customer data fields: %s", sorted(customer_data))
    logger.debug("Booking fields from WP: %s", booking_fields)
    logger.debug("Generated field mapping: %s", field_mapping)

    return jsonify(
        {
            "success": True,
            "token": token,
            "customer": customer_data,
            "field_mapping": field_mapping,
        }
    )
|
|
|
|
|
|
# Sprint 6.6: Schema endpoint for WordPress field sync
|
|
|
|
|
|
@bp.route("/schema", methods=["GET"])
def get_schema():
    """Get field schema from WordPress.

    Proxies the WordPress schema endpoint for frontend use.
    Enables dynamic form generation based on WordPress settings.

    Query params:
        kurs_id: Optional kurs ID for kurs-specific fields. Values that
            cannot be parsed as an integer are treated as absent.

    Returns:
        Schema dictionary from WordPress, or a 500 JSON error if fetching
        or serializing the schema raises.
    """
    from customer_portal.services.wordpress_api import WordPressAPI

    kurs_id = request.args.get("kurs_id", type=int)

    try:
        schema = WordPressAPI.get_schema(kurs_id)
        return jsonify(schema)
    except Exception as e:
        # logger.exception keeps the traceback; the client only gets a
        # generic error message.
        logger.exception("Error fetching schema: %s", e)
        return jsonify({"error": "Failed to fetch schema"}), 500
|
|
|
|
|
|
@bp.route("/customer/fields", methods=["GET", "PATCH"])
def customer_fields():
    """Get or update customer custom fields.

    Requires authentication via session cookie.

    GET: Returns all custom fields for current customer, plus a
        "core_fields" object (name, email, phone, address parts).
    PATCH: Updates custom fields for current customer. Payload:
        {
            "core_fields": {"phone": ..., "address_street": ...,
                            "address_city": ..., "address_zip": ...},
            "custom_fields": {...}
        }
        Both keys are optional. Only the four listed core keys are taken
        from "core_fields"; every key in "custom_fields" is stored.

    Returns:
        GET: {"custom_fields": {...}, "core_fields": {...}}
        PATCH: {"success": true, "updated": [<field names>]}
        401 if no customer is authenticated; 400 if the PATCH body is
        empty, not a JSON object, or "core_fields"/"custom_fields" is not
        an object.
    """
    from customer_portal.web.decorators import get_current_customer

    customer = get_current_customer()
    if not customer:
        return jsonify({"error": "Not authenticated"}), 401

    db = get_db()

    if request.method == "GET":
        # Sprint 12: All data comes from custom_fields
        # Return display properties for backwards compatibility
        addr = customer.display_address or {}
        return jsonify(
            {
                "custom_fields": customer.get_custom_fields(),
                "core_fields": {
                    "name": customer.display_name,
                    "email": customer.email,
                    "phone": customer.display_phone,
                    "address_street": addr.get("street", ""),
                    "address_city": addr.get("city", ""),
                    "address_zip": addr.get("zip", ""),
                },
            }
        )

    # PATCH: Update fields - Sprint 12: All updates go to custom_fields
    # silent=True: malformed bodies yield None and get the JSON error below.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No data provided"}), 400

    core_fields = data.get("core_fields") or {}
    custom_fields = data.get("custom_fields") or {}
    # A string or list here would otherwise fail further down (substring
    # "in" checks, dict.update on a non-mapping) and surface as a 500.
    if not isinstance(core_fields, dict) or not isinstance(custom_fields, dict):
        return (
            jsonify({"error": "core_fields and custom_fields must be objects"}),
            400,
        )

    updated = []
    # Work on a copy so the stored value is only replaced through
    # set_custom_fields(); also tolerates a None result.
    existing = dict(customer.get_custom_fields() or {})

    # Update core fields (now stored in custom_fields)
    for key in ("phone", "address_street", "address_city", "address_zip"):
        if key in core_fields:
            existing[key] = core_fields[key]
            updated.append(key)

    # Update custom fields
    if custom_fields:
        existing.update(custom_fields)
        updated.extend(custom_fields)

    # Save all to custom_fields JSON
    if updated:
        customer.set_custom_fields(existing)
        db.commit()

    logger.info("Customer %s fields updated: %s", customer.id, updated)

    return jsonify({"success": True, "updated": updated})
|