110 lines
2.9 KiB
Python
Executable File
110 lines
2.9 KiB
Python
Executable File
"""OTP service."""
|
|
|
|
import secrets
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
from customer_portal.models.otp import OTPCode
|
|
|
|
|
|
class OTPService:
|
|
"""Generate and verify OTP codes."""
|
|
|
|
OTP_LENGTH = 6
|
|
OTP_LIFETIME_MINUTES = 10
|
|
|
|
@staticmethod
|
|
def generate() -> str:
|
|
"""Generate random 6-digit OTP."""
|
|
return "".join(
|
|
secrets.choice("0123456789") for _ in range(OTPService.OTP_LENGTH)
|
|
)
|
|
|
|
@staticmethod
|
|
def create_for_customer(db_session, customer_id: int, purpose: str) -> str:
|
|
"""Create OTP for customer.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
customer_id: Customer ID
|
|
purpose: OTP purpose (login, register, reset)
|
|
|
|
Returns:
|
|
Generated OTP code
|
|
"""
|
|
# Invalidate any existing unused OTPs for same purpose
|
|
db_session.query(OTPCode).filter(
|
|
OTPCode.customer_id == customer_id,
|
|
OTPCode.purpose == purpose,
|
|
OTPCode.used_at.is_(None),
|
|
).update({"used_at": datetime.now(UTC)})
|
|
|
|
code = OTPService.generate()
|
|
expires_at = datetime.now(UTC) + timedelta(
|
|
minutes=OTPService.OTP_LIFETIME_MINUTES
|
|
)
|
|
|
|
otp = OTPCode(
|
|
customer_id=customer_id,
|
|
code=code,
|
|
purpose=purpose,
|
|
expires_at=expires_at,
|
|
)
|
|
db_session.add(otp)
|
|
db_session.commit()
|
|
|
|
return code
|
|
|
|
@staticmethod
|
|
def verify(db_session, customer_id: int, code: str, purpose: str) -> bool:
|
|
"""Verify OTP code.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
customer_id: Customer ID
|
|
code: OTP code to verify
|
|
purpose: OTP purpose
|
|
|
|
Returns:
|
|
True if code is valid, False otherwise
|
|
"""
|
|
otp = (
|
|
db_session.query(OTPCode)
|
|
.filter(
|
|
OTPCode.customer_id == customer_id,
|
|
OTPCode.code == code,
|
|
OTPCode.purpose == purpose,
|
|
OTPCode.used_at.is_(None),
|
|
OTPCode.expires_at > datetime.now(UTC),
|
|
)
|
|
.first()
|
|
)
|
|
|
|
if otp:
|
|
otp.used_at = datetime.now(UTC)
|
|
db_session.commit()
|
|
return True
|
|
|
|
return False
|
|
|
|
@staticmethod
|
|
def count_recent_attempts(db_session, customer_id: int, minutes: int = 60) -> int:
|
|
"""Count recent OTP attempts for rate limiting.
|
|
|
|
Args:
|
|
db_session: Database session
|
|
customer_id: Customer ID
|
|
minutes: Time window in minutes
|
|
|
|
Returns:
|
|
Number of OTPs created in time window
|
|
"""
|
|
since = datetime.now(UTC) - timedelta(minutes=minutes)
|
|
return (
|
|
db_session.query(OTPCode)
|
|
.filter(
|
|
OTPCode.customer_id == customer_id,
|
|
OTPCode.created_at >= since,
|
|
)
|
|
.count()
|
|
)
|