333 lines
10 KiB
Python
Executable File
333 lines
10 KiB
Python
Executable File
"""Authentication routes."""
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from functools import wraps
|
|
|
|
from flask import (
|
|
Blueprint,
|
|
current_app,
|
|
flash,
|
|
g,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
session,
|
|
url_for,
|
|
)
|
|
|
|
from customer_portal.models import get_db
|
|
from customer_portal.models.customer import Customer
|
|
from customer_portal.services import AuthService, EmailService, OTPService
|
|
|
|
bp = Blueprint("auth", __name__)
|
|
|
|
|
|
def login_required(f):
    """Wrap a view so it only runs for a session with a resolvable auth token.

    The wrapper reads ``auth_token`` from the Flask session and resolves it
    with ``AuthService.get_customer_by_token``. If the token is absent, or no
    customer comes back for it, the request is redirected to ``auth.login``
    (a token that resolves to nothing additionally clears the session).
    Otherwise the customer is exposed as ``g.customer`` and the wrapped view
    is called with its original arguments.
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        auth_token = session.get("auth_token")
        if auth_token:
            account = AuthService.get_customer_by_token(get_db(), auth_token)
            if account:
                g.customer = account
                return f(*args, **kwargs)
            # Token is present but did not resolve: drop the whole session.
            session.clear()
        return redirect(url_for("auth.login"))

    return guarded_view
|
|
|
|
|
|
@bp.route("/login", methods=["GET", "POST"])
def login():
    """Login page - step 1: ask for the email address and send a login OTP.

    A session whose ``auth_token`` still resolves to a customer is sent
    straight to ``main.dashboard``. GET renders the login form. POST
    validates the submitted email, obtains the customer through
    ``AuthService.get_or_create_customer``, enforces the OTP request limit,
    sends a code and redirects to ``auth.verify``.
    """
    current_token = session.get("auth_token")
    if current_token and AuthService.get_customer_by_token(get_db(), current_token):
        return redirect(url_for("main.dashboard"))

    if request.method != "POST":
        return render_template("auth/login.html")

    email = request.form.get("email", "").strip().lower()

    # An empty string contains no "@", so this single test also rejects
    # a missing address.
    if "@" not in email:
        flash("Bitte geben Sie eine gueltige E-Mail-Adresse ein.", "error")
        return render_template("auth/login.html")

    db = get_db()
    customer = AuthService.get_or_create_customer(db, email)

    # Rate limiting: the ceiling defaults to 5 (OTP_MAX_ATTEMPTS); which
    # attempts count as "recent" is decided inside OTPService.
    limit = current_app.config.get("OTP_MAX_ATTEMPTS", 5)
    if OTPService.count_recent_attempts(db, customer.id) >= limit:
        flash(
            "Zu viele Anfragen. Bitte warten Sie eine Stunde.",
            "error",
        )
        return render_template("auth/login.html")

    # Generate the code and mail it to the address that was entered.
    otp_code = OTPService.create_for_customer(db, customer.id, "login")
    EmailService.send_otp(email, otp_code, "login")

    # Remember who is logging in for the verification step.
    session["pending_customer_id"] = customer.id
    session["pending_email"] = email

    return redirect(url_for("auth.verify"))
|
|
|
|
|
|
@bp.route("/verify", methods=["GET", "POST"])
def verify():
    """Login - step 2: check the submitted OTP and open an authenticated session.

    Requires ``pending_customer_id`` in the session (set by the login step);
    without it the request is redirected to ``auth.login``. GET renders the
    code form. POST checks the 6-character code through
    ``OTPService.verify``; on success a token from
    ``AuthService.create_session`` is stored as ``auth_token`` and the
    pending keys are removed.
    """
    pending_id = session.get("pending_customer_id")
    pending_email = session.get("pending_email")

    if not pending_id:
        return redirect(url_for("auth.login"))

    def show_form():
        return render_template("auth/verify.html", email=pending_email)

    if request.method != "POST":
        return show_form()

    submitted = request.form.get("code", "").strip()

    # An empty submission has length 0, so the length test covers it too.
    if len(submitted) != 6:
        flash("Bitte geben Sie den 6-stelligen Code ein.", "error")
        return show_form()

    db = get_db()

    if not OTPService.verify(db, pending_id, submitted, "login"):
        flash("Ungueltiger oder abgelaufener Code.", "error")
        return show_form()

    # Code accepted: create the session record with client metadata.
    if request.user_agent:
        agent = request.user_agent.string
    else:
        agent = ""
    auth_token = AuthService.create_session(
        db,
        pending_id,
        request.remote_addr,
        agent,
    )

    # Promote the pending login to an authenticated session.
    session["auth_token"] = auth_token
    for stale_key in ("pending_customer_id", "pending_email"):
        session.pop(stale_key, None)

    flash("Erfolgreich eingeloggt!", "success")
    return redirect(url_for("main.dashboard"))
|
|
|
|
|
|
@bp.route("/resend", methods=["POST"])
def resend():
    """Send a fresh login OTP to the address of the pending login.

    Needs both ``pending_customer_id`` and ``pending_email`` in the session;
    otherwise the request is redirected to ``auth.login``. Every other
    outcome (limit reached or code sent) redirects back to ``auth.verify``
    with a flash message.
    """
    pending_id = session.get("pending_customer_id")
    pending_email = session.get("pending_email")

    if not (pending_id and pending_email):
        return redirect(url_for("auth.login"))

    db = get_db()
    back_to_verify = url_for("auth.verify")

    # Rate limiting: same configurable ceiling as the initial login request.
    limit = current_app.config.get("OTP_MAX_ATTEMPTS", 5)
    if OTPService.count_recent_attempts(db, pending_id) >= limit:
        flash("Zu viele Anfragen. Bitte warten Sie eine Stunde.", "error")
        return redirect(back_to_verify)

    # Issue a new code and mail it.
    new_code = OTPService.create_for_customer(db, pending_id, "login")
    EmailService.send_otp(pending_email, new_code, "login")

    flash("Ein neuer Code wurde gesendet.", "success")
    return redirect(back_to_verify)
|
|
|
|
|
|
@bp.route("/logout")
def logout():
    """Log the visitor out and send them back to the login page.

    If the session carries an ``auth_token`` it is passed to
    ``AuthService.logout``. The Flask session is then cleared, a
    confirmation is flashed and the request is redirected to ``auth.login``.
    """
    if active_token := session.get("auth_token"):
        AuthService.logout(get_db(), active_token)

    # Clear first, flash afterwards, so the message is stored in the
    # emptied session rather than wiped with it.
    session.clear()
    flash("Sie wurden erfolgreich ausgeloggt.", "success")
    return redirect(url_for("auth.login"))
|
|
|
|
|
|
# =============================================================================
|
|
# Registration Routes
|
|
# =============================================================================
|
|
|
|
|
|
@bp.route("/register", methods=["GET", "POST"])
def register():
    """Registration - Step 1: collect the email address and send an OTP.

    A session whose ``auth_token`` still resolves to a customer is sent to
    ``main.dashboard``. GET renders the registration form. POST validates
    the email, redirects to ``auth.login`` if a ``Customer`` with that email
    already exists, and otherwise stores the email, a freshly generated OTP
    and its expiry timestamp in the session, mails the code and redirects to
    ``auth.register_verify``.

    Starting a registration always discards any earlier
    ``register_verified`` flag, so a verification done for one address can
    never be carried over to a different address entered later.
    """
    # Already logged in?
    if session.get("auth_token"):
        db = get_db()
        if AuthService.get_customer_by_token(db, session["auth_token"]):
            return redirect(url_for("main.dashboard"))

    if request.method == "POST":
        email = request.form.get("email", "").strip().lower()

        if not email or "@" not in email:
            flash("Bitte geben Sie eine gueltige E-Mail-Adresse ein.", "error")
            return render_template("auth/register.html")

        db = get_db()

        # Check if customer already exists
        customer = db.query(Customer).filter(Customer.email == email).first()

        if customer:
            flash(
                "Diese E-Mail ist bereits registriert. Bitte melden Sie sich an.",
                "info",
            )
            return redirect(url_for("auth.login"))

        # Store email in session for registration flow
        session["register_email"] = email

        # The profile step only checks that "register_email" and
        # "register_verified" are both set. Without this reset, a client that
        # verified address A could submit address B here and jump straight to
        # the profile step, creating an account for an unverified address.
        session.pop("register_verified", None)

        # Generate OTP
        # NOTE(review): the code is kept in the Flask session. If the app uses
        # Flask's default cookie-based sessions, the payload is signed but
        # readable by the client - confirm the session backend.
        otp_minutes = current_app.config.get("OTP_LIFETIME_MINUTES", 10)
        code = OTPService.generate()
        session["register_otp"] = code
        session["register_otp_expires"] = (
            datetime.now(UTC) + timedelta(minutes=otp_minutes)
        ).isoformat()

        # Send OTP email
        EmailService.send_otp(email, code, "register")

        flash("Ein Bestaetigungscode wurde an Ihre E-Mail gesendet.", "success")
        return redirect(url_for("auth.register_verify"))

    return render_template("auth/register.html")
|
|
|
|
|
|
@bp.route("/register/verify", methods=["GET", "POST"])
def register_verify():
    """Registration - Step 2: verify the OTP that was mailed in step 1.

    Requires ``register_email`` in the session; otherwise redirects to
    ``auth.register``. GET renders the code form. POST compares the
    submitted 6-character code with ``register_otp`` from the session and
    checks ``register_otp_expires``. On success the code is consumed,
    ``register_verified`` is set and the request is redirected to
    ``auth.register_profile``; on failure an error is flashed and the form
    is rendered again.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    email = session.get("register_email")
    if not email:
        return redirect(url_for("auth.register"))

    if request.method == "POST":
        code = request.form.get("code", "").strip()
        stored_code = session.get("register_otp")
        expires_str = session.get("register_otp_expires")

        if not code or len(code) != 6:
            flash("Bitte geben Sie den 6-stelligen Code ein.", "error")
            return render_template("auth/register_verify.html", email=email)

        # Constant-time comparison so response timing does not depend on how
        # many leading characters match. Both sides are encoded to bytes
        # because compare_digest rejects non-ASCII str input, and the
        # submitted code is untrusted. A stored value that is not a str never
        # matches, exactly as with the previous ``==`` comparison.
        code_matches = isinstance(stored_code, str) and hmac.compare_digest(
            code.encode("utf-8"), stored_code.encode("utf-8")
        )

        if code_matches and expires_str:
            # Check if not expired
            expires = datetime.fromisoformat(expires_str)
            if expires > datetime.now(UTC):
                # One-time use: drop the code once it has been accepted.
                session.pop("register_otp", None)
                session.pop("register_otp_expires", None)
                session["register_verified"] = True
                return redirect(url_for("auth.register_profile"))

        # NOTE(review): failed attempts are not counted in this view -
        # consider limiting guesses per issued code.
        flash("Ungueltiger oder abgelaufener Code.", "error")

    return render_template("auth/register_verify.html", email=email)
|
|
|
|
|
|
@bp.route("/register/resend", methods=["POST"])
def register_resend():
    """Issue and mail a fresh registration OTP for the pending email.

    Requires ``register_email`` in the session; otherwise redirects to
    ``auth.register``. The new code and its expiry replace
    ``register_otp`` / ``register_otp_expires`` in the session, then the
    request is redirected to ``auth.register_verify``.
    """
    target_email = session.get("register_email")
    if not target_email:
        return redirect(url_for("auth.register"))

    # Lifetime in minutes comes from config and defaults to 10.
    lifetime = timedelta(
        minutes=current_app.config.get("OTP_LIFETIME_MINUTES", 10)
    )

    fresh_code = OTPService.generate()
    session["register_otp"] = fresh_code

    # Stored as an ISO-8601 string built from a timezone-aware UTC timestamp.
    expires_at = datetime.now(UTC) + lifetime
    session["register_otp_expires"] = expires_at.isoformat()

    EmailService.send_otp(target_email, fresh_code, "register")

    flash("Ein neuer Code wurde gesendet.", "success")
    return redirect(url_for("auth.register_verify"))
|
|
|
|
|
|
@bp.route("/register/profile", methods=["GET", "POST"])
def register_profile():
    """Registration - Step 3: collect profile data and create the customer.

    Requires both ``register_email`` and a truthy ``register_verified`` in
    the session; otherwise redirects to ``auth.register``. GET renders the
    profile form. POST requires a non-empty name, re-checks that no
    ``Customer`` with the email exists, creates and commits the customer,
    removes the registration keys from the session, logs the new customer in
    via ``AuthService.create_session``, sends the welcome mail and redirects
    to ``main.dashboard``.
    """
    email = session.get("register_email")
    is_verified = session.get("register_verified")

    if not (email and is_verified):
        return redirect(url_for("auth.register"))

    def forget_registration():
        # Remove every key the registration flow keeps in the session.
        for key in (
            "register_email",
            "register_otp",
            "register_otp_expires",
            "register_verified",
        ):
            session.pop(key, None)

    if request.method != "POST":
        return render_template("auth/register_profile.html", email=email)

    name = request.form.get("name", "").strip()
    phone = request.form.get("phone", "").strip()

    if not name:
        flash("Bitte geben Sie Ihren Namen ein.", "error")
        return render_template("auth/register_profile.html", email=email)

    db = get_db()

    # Re-check the email: another request may have registered it since step 1.
    already_there = db.query(Customer).filter(Customer.email == email).first()
    if already_there:
        forget_registration()
        flash("Diese E-Mail ist bereits registriert.", "info")
        return redirect(url_for("auth.login"))

    # Create the customer; an empty phone field is passed on as None.
    new_customer = Customer.create_with_defaults(
        db,
        email=email,
        name=name,
        phone=phone or None,
    )
    db.add(new_customer)
    db.commit()

    # The registration flow is finished; its session data is no longer needed.
    forget_registration()

    # Auto-login after registration
    if request.user_agent:
        agent = request.user_agent.string
    else:
        agent = ""
    session["auth_token"] = AuthService.create_session(
        db,
        new_customer.id,
        request.remote_addr,
        agent,
    )

    # Send welcome email
    EmailService.send_welcome(email, name)

    flash("Registrierung erfolgreich! Willkommen im Kundenportal.", "success")
    return redirect(url_for("main.dashboard"))
|