129 lines
3.3 KiB
Python
Executable File
129 lines
3.3 KiB
Python
Executable File
"""Authentication service."""
|
|
|
|
import secrets
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
from flask import current_app
|
|
|
|
from customer_portal.models.customer import Customer
|
|
from customer_portal.models.session import Session
|
|
|
|
|
|
class AuthService:
|
|
"""Handle authentication."""
|
|
|
|
@staticmethod
|
|
def get_or_create_customer(db_session, email: str) -> Customer:
|
|
"""Get existing customer or create new one.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
email: Customer email
|
|
|
|
Returns:
|
|
Customer instance
|
|
"""
|
|
customer = db_session.query(Customer).filter(Customer.email == email).first()
|
|
|
|
if not customer:
|
|
customer = Customer.create_with_defaults(
|
|
db_session,
|
|
email=email,
|
|
name=email.split("@")[0], # Temporary name from email
|
|
)
|
|
db_session.add(customer)
|
|
db_session.commit()
|
|
|
|
return customer
|
|
|
|
@staticmethod
|
|
def create_session(
|
|
db_session, customer_id: int, ip_address: str, user_agent: str
|
|
) -> str:
|
|
"""Create login session.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
customer_id: Customer ID
|
|
ip_address: Client IP address
|
|
user_agent: Browser user agent
|
|
|
|
Returns:
|
|
Session token
|
|
"""
|
|
token = secrets.token_urlsafe(32)
|
|
hours = current_app.config.get("SESSION_LIFETIME_HOURS", 24)
|
|
expires_at = datetime.now(UTC) + timedelta(hours=hours)
|
|
|
|
session = Session(
|
|
customer_id=customer_id,
|
|
token=token,
|
|
ip_address=ip_address,
|
|
user_agent=user_agent,
|
|
expires_at=expires_at,
|
|
)
|
|
db_session.add(session)
|
|
|
|
# Update last login
|
|
customer = db_session.query(Customer).get(customer_id)
|
|
if customer:
|
|
customer.last_login_at = datetime.now(UTC)
|
|
|
|
db_session.commit()
|
|
|
|
return token
|
|
|
|
@staticmethod
|
|
def get_customer_by_token(db_session, token: str) -> Customer | None:
|
|
"""Get customer from session token.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
token: Session token
|
|
|
|
Returns:
|
|
Customer if valid session, None otherwise
|
|
"""
|
|
session = (
|
|
db_session.query(Session)
|
|
.filter(
|
|
Session.token == token,
|
|
Session.expires_at > datetime.now(UTC),
|
|
)
|
|
.first()
|
|
)
|
|
|
|
if session:
|
|
return db_session.query(Customer).get(session.customer_id)
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def logout(db_session, token: str) -> None:
|
|
"""Delete session.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
token: Session token
|
|
"""
|
|
db_session.query(Session).filter(Session.token == token).delete()
|
|
db_session.commit()
|
|
|
|
@staticmethod
|
|
def cleanup_expired_sessions(db_session) -> int:
|
|
"""Remove expired sessions.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
|
|
Returns:
|
|
Number of deleted sessions
|
|
"""
|
|
result = (
|
|
db_session.query(Session)
|
|
.filter(Session.expires_at < datetime.now(UTC))
|
|
.delete()
|
|
)
|
|
db_session.commit()
|
|
return result
|