Files

333 lines
10 KiB
Python
Executable File

"""Authentication routes."""
from datetime import UTC, datetime, timedelta
from functools import wraps
from flask import (
Blueprint,
current_app,
flash,
g,
redirect,
render_template,
request,
session,
url_for,
)
from customer_portal.models import get_db
from customer_portal.models.customer import Customer
from customer_portal.services import AuthService, EmailService, OTPService
bp = Blueprint("auth", __name__)
def login_required(f):
"""Decorator to require authentication."""
@wraps(f)
def decorated_function(*args, **kwargs):
token = session.get("auth_token")
if not token:
return redirect(url_for("auth.login"))
db = get_db()
customer = AuthService.get_customer_by_token(db, token)
if not customer:
session.clear()
return redirect(url_for("auth.login"))
g.customer = customer
return f(*args, **kwargs)
return decorated_function
@bp.route("/login", methods=["GET", "POST"])
def login():
"""Login page - enter email."""
# Already logged in?
if session.get("auth_token"):
db = get_db()
if AuthService.get_customer_by_token(db, session["auth_token"]):
return redirect(url_for("main.dashboard"))
if request.method == "POST":
email = request.form.get("email", "").strip().lower()
if not email or "@" not in email:
flash("Bitte geben Sie eine gueltige E-Mail-Adresse ein.", "error")
return render_template("auth/login.html")
db = get_db()
# Get or create customer
customer = AuthService.get_or_create_customer(db, email)
# Rate limiting: max 5 OTPs per hour
max_attempts = current_app.config.get("OTP_MAX_ATTEMPTS", 5)
recent = OTPService.count_recent_attempts(db, customer.id)
if recent >= max_attempts:
flash(
"Zu viele Anfragen. Bitte warten Sie eine Stunde.",
"error",
)
return render_template("auth/login.html")
# Generate and send OTP
code = OTPService.create_for_customer(db, customer.id, "login")
EmailService.send_otp(email, code, "login")
# Store customer ID for verification step
session["pending_customer_id"] = customer.id
session["pending_email"] = email
return redirect(url_for("auth.verify"))
return render_template("auth/login.html")
@bp.route("/verify", methods=["GET", "POST"])
def verify():
"""Verify OTP code."""
customer_id = session.get("pending_customer_id")
email = session.get("pending_email")
if not customer_id:
return redirect(url_for("auth.login"))
if request.method == "POST":
code = request.form.get("code", "").strip()
if not code or len(code) != 6:
flash("Bitte geben Sie den 6-stelligen Code ein.", "error")
return render_template("auth/verify.html", email=email)
db = get_db()
if OTPService.verify(db, customer_id, code, "login"):
# Create session
token = AuthService.create_session(
db,
customer_id,
request.remote_addr,
request.user_agent.string if request.user_agent else "",
)
# Store session token
session["auth_token"] = token
session.pop("pending_customer_id", None)
session.pop("pending_email", None)
flash("Erfolgreich eingeloggt!", "success")
return redirect(url_for("main.dashboard"))
flash("Ungueltiger oder abgelaufener Code.", "error")
return render_template("auth/verify.html", email=email)
@bp.route("/resend", methods=["POST"])
def resend():
"""Resend OTP code."""
customer_id = session.get("pending_customer_id")
email = session.get("pending_email")
if not customer_id or not email:
return redirect(url_for("auth.login"))
db = get_db()
# Rate limiting
max_attempts = current_app.config.get("OTP_MAX_ATTEMPTS", 5)
recent = OTPService.count_recent_attempts(db, customer_id)
if recent >= max_attempts:
flash("Zu viele Anfragen. Bitte warten Sie eine Stunde.", "error")
return redirect(url_for("auth.verify"))
# Generate new code
code = OTPService.create_for_customer(db, customer_id, "login")
EmailService.send_otp(email, code, "login")
flash("Ein neuer Code wurde gesendet.", "success")
return redirect(url_for("auth.verify"))
@bp.route("/logout")
def logout():
"""Logout and clear session."""
token = session.get("auth_token")
if token:
db = get_db()
AuthService.logout(db, token)
session.clear()
flash("Sie wurden erfolgreich ausgeloggt.", "success")
return redirect(url_for("auth.login"))
# =============================================================================
# Registration Routes
# =============================================================================
@bp.route("/register", methods=["GET", "POST"])
def register():
"""Registration - Step 1: Enter email."""
# Already logged in?
if session.get("auth_token"):
db = get_db()
if AuthService.get_customer_by_token(db, session["auth_token"]):
return redirect(url_for("main.dashboard"))
if request.method == "POST":
email = request.form.get("email", "").strip().lower()
if not email or "@" not in email:
flash("Bitte geben Sie eine gueltige E-Mail-Adresse ein.", "error")
return render_template("auth/register.html")
db = get_db()
# Check if customer already exists
customer = db.query(Customer).filter(Customer.email == email).first()
if customer:
flash(
"Diese E-Mail ist bereits registriert. Bitte melden Sie sich an.",
"info",
)
return redirect(url_for("auth.login"))
# Store email in session for registration flow
session["register_email"] = email
# Generate OTP
otp_minutes = current_app.config.get("OTP_LIFETIME_MINUTES", 10)
code = OTPService.generate()
session["register_otp"] = code
session["register_otp_expires"] = (
datetime.now(UTC) + timedelta(minutes=otp_minutes)
).isoformat()
# Send OTP email
EmailService.send_otp(email, code, "register")
flash("Ein Bestaetigungscode wurde an Ihre E-Mail gesendet.", "success")
return redirect(url_for("auth.register_verify"))
return render_template("auth/register.html")
@bp.route("/register/verify", methods=["GET", "POST"])
def register_verify():
"""Registration - Step 2: Verify OTP."""
email = session.get("register_email")
if not email:
return redirect(url_for("auth.register"))
if request.method == "POST":
code = request.form.get("code", "").strip()
stored_code = session.get("register_otp")
expires_str = session.get("register_otp_expires")
if not code or len(code) != 6:
flash("Bitte geben Sie den 6-stelligen Code ein.", "error")
return render_template("auth/register_verify.html", email=email)
if stored_code and code == stored_code and expires_str:
# Check if not expired
expires = datetime.fromisoformat(expires_str)
if expires > datetime.now(UTC):
session["register_verified"] = True
return redirect(url_for("auth.register_profile"))
flash("Ungueltiger oder abgelaufener Code.", "error")
return render_template("auth/register_verify.html", email=email)
@bp.route("/register/resend", methods=["POST"])
def register_resend():
"""Resend registration OTP."""
email = session.get("register_email")
if not email:
return redirect(url_for("auth.register"))
# Generate new OTP
otp_minutes = current_app.config.get("OTP_LIFETIME_MINUTES", 10)
code = OTPService.generate()
session["register_otp"] = code
session["register_otp_expires"] = (
datetime.now(UTC) + timedelta(minutes=otp_minutes)
).isoformat()
# Send OTP email
EmailService.send_otp(email, code, "register")
flash("Ein neuer Code wurde gesendet.", "success")
return redirect(url_for("auth.register_verify"))
@bp.route("/register/profile", methods=["GET", "POST"])
def register_profile():
"""Registration - Step 3: Enter profile data."""
email = session.get("register_email")
verified = session.get("register_verified")
if not email or not verified:
return redirect(url_for("auth.register"))
if request.method == "POST":
name = request.form.get("name", "").strip()
phone = request.form.get("phone", "").strip()
if not name:
flash("Bitte geben Sie Ihren Namen ein.", "error")
return render_template("auth/register_profile.html", email=email)
db = get_db()
# Double-check email doesn't exist (race condition protection)
existing = db.query(Customer).filter(Customer.email == email).first()
if existing:
session.pop("register_email", None)
session.pop("register_otp", None)
session.pop("register_otp_expires", None)
session.pop("register_verified", None)
flash("Diese E-Mail ist bereits registriert.", "info")
return redirect(url_for("auth.login"))
# Create customer with defaults from settings
customer = Customer.create_with_defaults(
db,
email=email,
name=name,
phone=phone if phone else None,
)
db.add(customer)
db.commit()
# Clear registration session data
session.pop("register_email", None)
session.pop("register_otp", None)
session.pop("register_otp_expires", None)
session.pop("register_verified", None)
# Auto-login after registration
token = AuthService.create_session(
db,
customer.id,
request.remote_addr,
request.user_agent.string if request.user_agent else "",
)
session["auth_token"] = token
# Send welcome email
EmailService.send_welcome(email, name)
flash("Registrierung erfolgreich! Willkommen im Kundenportal.", "success")
return redirect(url_for("main.dashboard"))
return render_template("auth/register_profile.html", email=email)