Files
gatehouse-api/gatehouse_app/services/totp_service.py
T
nexgen_mirrors 41bbdb4bef feat(email): add provider abstraction and HTML templates
Add pluggable email provider system supporting SMTP, Mailgun, and SendGrid
with factory pattern for runtime provider selection. Includes branded HTML
email templates for verification, password reset, MFA notifications, and
organization invites.

Also rebrands all email content from Gatehouse to Secuird, adds email
provider configuration options, and fixes duplicate log handlers in
development mode.
2026-04-04 16:55:00 +10:30

265 lines
9.9 KiB
Python

"""TOTP (Time-based One-Time Password) service."""
import base64
import io
import logging
import secrets
from datetime import datetime, timezone
from typing import Optional, Tuple
import pyotp
from gatehouse_app.extensions import bcrypt
logger = logging.getLogger(__name__)
# TOTP codes are valid for at most (2*window + 1) * 30s steps.
# With window=1 that's 3 steps = 90 seconds. We use a slightly
# generous TTL of 95 seconds to account for clock skew at boundaries.
_TOTP_USED_CODE_TTL = 95
class TOTPService:
"""Service for TOTP operations."""
# ------------------------------------------------------------------
# Replay-attack prevention helpers
# ------------------------------------------------------------------
@staticmethod
def _used_key(user_id: str, code: str) -> str:
return f"totp:used:{user_id}:{code}"
@staticmethod
def is_code_already_used(user_id: str, code: str) -> bool:
"""Return True if *code* has already been accepted for *user_id*
within the current validity window (prevents replay attacks)."""
try:
from gatehouse_app.extensions import redis_client
if redis_client is None:
return False
return redis_client.exists(TOTPService._used_key(user_id, code)) == 1
except Exception:
logger.warning("Redis unavailable for TOTP replay check; allowing code")
return False
@staticmethod
def mark_code_used(user_id: str, code: str) -> None:
"""Record *code* as consumed for *user_id* so it cannot be reused."""
try:
from gatehouse_app.extensions import redis_client
if redis_client is None:
return
redis_client.setex(
TOTPService._used_key(user_id, code),
_TOTP_USED_CODE_TTL,
"1",
)
except Exception:
logger.warning("Redis unavailable; TOTP used-code not recorded")
@staticmethod
def generate_secret() -> str:
"""
Generate a new TOTP secret.
Returns:
Base32 encoded secret (32 characters)
Note:
The secret is generated using cryptographically secure random bytes
and encoded in base32 format for compatibility with authenticator apps.
"""
# Generate 20 random bytes (160 bits) and encode as base32
random_bytes = secrets.token_bytes(20)
secret = base64.b32encode(random_bytes).decode("utf-8")
logger.debug(f"Generated new TOTP secret: {secret[:8]}...")
return secret
@staticmethod
def generate_provisioning_uri(user_email: str, secret: str, issuer: str = "Secuird") -> str:
"""
Generate provisioning URI for QR code.
Args:
user_email: User's email address
secret: TOTP secret (base32 encoded)
issuer: Issuer name (default: "Secuird")
Returns:
otpauth:// URI for QR code generation
Example:
>>> uri = TOTPService.generate_provisioning_uri("user@example.com", "JBSWY3DPEHPK3PXP")
>>> print(uri)
otpauth://totp/Secuird:user@example.com?secret=JBSWY3DPEHPK3PXP&issuer=Secuird
"""
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(name=user_email, issuer_name=issuer)
logger.debug(f"Generated provisioning URI for user: {user_email}")
return uri
@staticmethod
def verify_code(secret: str, code: str, window: int = 1, client_utc_timestamp: Optional[int] = None) -> bool:
"""
Verify a TOTP code against the secret.
Args:
secret: TOTP secret (base32 encoded)
code: 6-digit TOTP code to verify
window: Time window for code validation (default: 1, allows codes from previous/next time steps)
client_utc_timestamp: Optional client UTC timestamp in seconds since epoch.
If provided, uses client's timestamp instead of server time to handle
timezone mismatches between client and server.
Returns:
True if code is valid, False otherwise
Note:
The window parameter allows for clock skew between the server
and the authenticator app. A window of 1 allows codes from
the previous, current, and next 30-second intervals.
IMPORTANT: Always uses UTC time for verification to ensure
consistency across all timezones.
"""
totp = pyotp.TOTP(secret)
# Use timezone-aware UTC datetime for verification
# IMPORTANT: We must pass a datetime object, NOT a Unix timestamp
# pyotp's internal datetime.utcfromtimestamp() is deprecated and can be
# affected by local timezone settings, causing the 10.5 hour skew issue
if client_utc_timestamp:
# Use client's UTC timestamp to handle timezone mismatches
utc_now = datetime.fromtimestamp(client_utc_timestamp, tz=timezone.utc)
logger.debug(f"[TOTP] Using client UTC timestamp: {client_utc_timestamp}")
else:
# Fallback to server time
utc_now = datetime.now(timezone.utc)
# DEBUG: Log detailed timezone information
logger.debug(f"[TOTP DEBUG] UTC now: {utc_now}")
logger.debug(f"[TOTP DEBUG] UTC now isoformat: {utc_now.isoformat()}")
logger.debug(f"[TOTP DEBUG] UTC timestamp: {utc_now.timestamp()}")
logger.debug(f"[TOTP DEBUG] UTC now tzinfo: {utc_now.tzinfo}")
# Generate what the TOTP code should be at this moment using UTC datetime
expected_code = totp.at(utc_now)
logger.debug(f"[TOTP DEBUG] Expected TOTP code at UTC: {expected_code}")
# Verify with the provided code using UTC datetime object
# Passing a datetime object avoids pyotp's utcfromtimestamp() issues
is_valid = totp.verify(code, valid_window=window, for_time=utc_now)
logger.debug(f"[TOTP DEBUG] TOTP code verification: valid={is_valid}, window={window}")
logger.debug(f"[TOTP DEBUG] Provided code: {code}, Expected code: {expected_code}")
return is_valid
@staticmethod
def generate_backup_codes(count: int = 10) -> Tuple[list[str], list[str]]:
"""
Generate backup codes for TOTP recovery.
Args:
count: Number of backup codes to generate (default: 10)
Returns:
Tuple of (plain_codes, hashed_codes)
- plain_codes: List of plain text backup codes (for display to user)
- hashed_codes: List of bcrypt hashed backup codes (for storage)
Note:
Backup codes are 16-character alphanumeric codes that can be used
to recover access if the TOTP device is lost. Each code can only
be used once.
"""
plain_codes = []
hashed_codes = []
for _ in range(count):
# Generate a 16-character alphanumeric code
code = secrets.token_hex(8).upper()
plain_codes.append(code)
# Hash the code using bcrypt
hashed_code = bcrypt.generate_password_hash(code).decode("utf-8")
hashed_codes.append(hashed_code)
logger.debug(f"Generated {count} backup codes")
return plain_codes, hashed_codes
@staticmethod
def verify_backup_code(hashed_codes: list[str], code: str) -> Tuple[bool, list[str]]:
"""
Verify and consume a backup code.
Args:
hashed_codes: List of bcrypt hashed backup codes
code: Plain text backup code to verify
Returns:
Tuple of (is_valid, remaining_codes)
- is_valid: True if code was valid and consumed, False otherwise
- remaining_codes: List of remaining hashed codes (with consumed code removed)
Note:
Once a backup code is used, it is removed from the list and cannot
be used again. This ensures each code is single-use.
"""
remaining_codes = []
matched = False
for hashed_code in hashed_codes:
if not matched and bcrypt.check_password_hash(hashed_code, code):
# Code found and valid - mark as matched but don't add to remaining codes
matched = True
else:
# Code doesn't match - keep it in remaining codes
remaining_codes.append(hashed_code)
if matched:
return True, remaining_codes
else:
return False, hashed_codes
@staticmethod
def generate_qr_code_data_uri(provisioning_uri: str) -> str:
"""
Generate QR code as data URI for frontend display.
Args:
provisioning_uri: otpauth:// URI to encode in QR code
Returns:
Base64 encoded PNG image as data URI (data:image/png;base64,...)
Note:
If the qrcode library is not installed, returns a placeholder message.
Install with: pip install qrcode[pil]
"""
try:
import qrcode
# Create QR code
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(provisioning_uri)
qr.make(fit=True)
# Generate image
img = qr.make_image(fill_color="black", back_color="white")
# Convert to base64
buffer = io.BytesIO()
img.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
data_uri = f"data:image/png;base64,{img_base64}"
logger.debug("Generated QR code data URI")
return data_uri
except ImportError:
logger.warning("qrcode library not installed, returning placeholder")
return "QR code generation requires the qrcode library. Install with: pip install qrcode[pil]"