148 lines
4.7 KiB
Python
Executable File
148 lines
4.7 KiB
Python
Executable File
"""Token service for secure form pre-filling.
|
|
|
|
Generates signed tokens for cross-system communication between
|
|
Customer Portal and WordPress kurs-booking plugin.
|
|
|
|
Sprint 6.6: Extended to include all custom_fields for dynamic sync.
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from typing import Any
|
|
|
|
from flask import current_app
|
|
|
|
|
|
class TokenService:
|
|
"""Generate and validate signed tokens for cross-system communication."""
|
|
|
|
@staticmethod
|
|
def generate_prefill_token(customer: Any) -> str:
|
|
"""
|
|
Generate signed token for WordPress form pre-fill.
|
|
|
|
The token contains all customer data (core fields + custom_fields)
|
|
signed with the shared WP_API_SECRET. WordPress validates the
|
|
signature and uses the data to pre-fill the booking form.
|
|
|
|
Sprint 6.6: Now includes all custom_fields for complete sync.
|
|
|
|
Args:
|
|
customer: Customer model instance with id, name, email, phone, custom_fields
|
|
|
|
Returns:
|
|
Base64-URL-safe encoded signed token string
|
|
|
|
Raises:
|
|
ValueError: If WP_API_SECRET is not configured
|
|
"""
|
|
secret = current_app.config.get("WP_API_SECRET", "")
|
|
expiry = current_app.config.get("PREFILL_TOKEN_EXPIRY", 300)
|
|
|
|
if not secret:
|
|
raise ValueError("WP_API_SECRET not configured")
|
|
|
|
# Sprint 12: Use display properties - all data from custom_fields
|
|
addr = customer.display_address
|
|
payload = {
|
|
"customer_id": customer.id,
|
|
"name": customer.display_name,
|
|
"email": customer.email or "",
|
|
"phone": customer.display_phone,
|
|
"address_street": addr.get("street", ""),
|
|
"address_city": addr.get("city", ""),
|
|
"address_zip": addr.get("zip", ""),
|
|
"exp": int(time.time()) + expiry,
|
|
}
|
|
|
|
# Add all custom_fields to payload for full data access
|
|
custom_fields = customer.get_custom_fields()
|
|
if custom_fields:
|
|
payload["custom_fields"] = custom_fields
|
|
|
|
# JSON encode with minimal whitespace.
|
|
payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
|
|
|
|
# Create HMAC-SHA256 signature.
|
|
signature = hmac.new(
|
|
secret.encode("utf-8"),
|
|
payload_json.encode("utf-8"),
|
|
hashlib.sha256,
|
|
).hexdigest()
|
|
|
|
# Combine payload and signature.
|
|
token_data = f"{payload_json}.{signature}"
|
|
|
|
# Base64 URL-safe encode.
|
|
return base64.urlsafe_b64encode(token_data.encode("utf-8")).decode("utf-8")
|
|
|
|
@staticmethod
|
|
def validate_prefill_token(token: str) -> dict | None:
|
|
"""
|
|
Validate a prefill token and extract customer data.
|
|
|
|
This method is primarily for testing purposes. The actual validation
|
|
happens in WordPress (class-frontend.php).
|
|
|
|
Args:
|
|
token: Base64-URL-safe encoded signed token
|
|
|
|
Returns:
|
|
Dictionary with customer data if valid, None otherwise
|
|
"""
|
|
secret = current_app.config.get("WP_API_SECRET", "")
|
|
|
|
if not secret:
|
|
return None
|
|
|
|
try:
|
|
# Decode base64.
|
|
decoded = base64.urlsafe_b64decode(token.encode("utf-8")).decode("utf-8")
|
|
|
|
# Split payload and signature.
|
|
parts = decoded.split(".", 1)
|
|
if len(parts) != 2:
|
|
return None
|
|
|
|
payload_json, signature = parts
|
|
|
|
# Verify signature.
|
|
expected_sig = hmac.new(
|
|
secret.encode("utf-8"),
|
|
payload_json.encode("utf-8"),
|
|
hashlib.sha256,
|
|
).hexdigest()
|
|
|
|
if not hmac.compare_digest(expected_sig, signature):
|
|
return None
|
|
|
|
# Parse payload.
|
|
payload = json.loads(payload_json)
|
|
|
|
# Check expiration.
|
|
if payload.get("exp", 0) < time.time():
|
|
return None
|
|
|
|
# Sprint 6.6: Return all fields including custom_fields
|
|
result = {
|
|
"customer_id": payload.get("customer_id"),
|
|
"name": payload.get("name", ""),
|
|
"email": payload.get("email", ""),
|
|
"phone": payload.get("phone", ""),
|
|
"address_street": payload.get("address_street", ""),
|
|
"address_city": payload.get("address_city", ""),
|
|
"address_zip": payload.get("address_zip", ""),
|
|
}
|
|
|
|
# Include custom_fields if present
|
|
if "custom_fields" in payload:
|
|
result["custom_fields"] = payload["custom_fields"]
|
|
|
|
return result
|
|
|
|
except (ValueError, json.JSONDecodeError, UnicodeDecodeError):
|
|
return None
|