Files
customer-portal/customer_portal/services/otp.py

110 lines
2.9 KiB
Python
Executable File

"""OTP service."""
import secrets
from datetime import UTC, datetime, timedelta
from customer_portal.models.otp import OTPCode
class OTPService:
"""Generate and verify OTP codes."""
OTP_LENGTH = 6
OTP_LIFETIME_MINUTES = 10
@staticmethod
def generate() -> str:
"""Generate random 6-digit OTP."""
return "".join(
secrets.choice("0123456789") for _ in range(OTPService.OTP_LENGTH)
)
@staticmethod
def create_for_customer(db_session, customer_id: int, purpose: str) -> str:
"""Create OTP for customer.
Args:
db_session: Database session
customer_id: Customer ID
purpose: OTP purpose (login, register, reset)
Returns:
Generated OTP code
"""
# Invalidate any existing unused OTPs for same purpose
db_session.query(OTPCode).filter(
OTPCode.customer_id == customer_id,
OTPCode.purpose == purpose,
OTPCode.used_at.is_(None),
).update({"used_at": datetime.now(UTC)})
code = OTPService.generate()
expires_at = datetime.now(UTC) + timedelta(
minutes=OTPService.OTP_LIFETIME_MINUTES
)
otp = OTPCode(
customer_id=customer_id,
code=code,
purpose=purpose,
expires_at=expires_at,
)
db_session.add(otp)
db_session.commit()
return code
@staticmethod
def verify(db_session, customer_id: int, code: str, purpose: str) -> bool:
"""Verify OTP code.
Args:
db_session: Database session
customer_id: Customer ID
code: OTP code to verify
purpose: OTP purpose
Returns:
True if code is valid, False otherwise
"""
otp = (
db_session.query(OTPCode)
.filter(
OTPCode.customer_id == customer_id,
OTPCode.code == code,
OTPCode.purpose == purpose,
OTPCode.used_at.is_(None),
OTPCode.expires_at > datetime.now(UTC),
)
.first()
)
if otp:
otp.used_at = datetime.now(UTC)
db_session.commit()
return True
return False
@staticmethod
def count_recent_attempts(db_session, customer_id: int, minutes: int = 60) -> int:
"""Count recent OTP attempts for rate limiting.
Args:
db_session: Database session
customer_id: Customer ID
minutes: Time window in minutes
Returns:
Number of OTPs created in time window
"""
since = datetime.now(UTC) - timedelta(minutes=minutes)
return (
db_session.query(OTPCode)
.filter(
OTPCode.customer_id == customer_id,
OTPCode.created_at >= since,
)
.count()
)