feat(auth): implement TOTP two-factor authentication with enrollment and verification

Adds TOTP (Time-based One-Time Password) two-factor authentication support including:
- New TOTP service with secret generation, QR code provisioning, and code verification
- New auth endpoints for enrollment, verification, status, and backup code management
- New TOTP authentication method type and user methods for TOTP management
- Backup codes generation and verification for account recovery
- Updated OIDC endpoints with timezone-aware datetime handling and RFC-compliant responses
- Added "roles" scope support for OIDC userinfo and ID tokens
- New pyotp dependency for TOTP operations
- Comprehensive unit tests for TOTP service
This commit is contained in:
2026-01-14 18:06:17 +10:30
parent 977abf66df
commit cfd79190ee
26 changed files with 2176 additions and 263 deletions
+313
View File
@@ -11,6 +11,7 @@ from app.utils.constants import AuthMethodType, SessionStatus, UserStatus, Audit
from app.exceptions.auth_exceptions import InvalidCredentialsError, AccountSuspendedError, AccountInactiveError
from app.exceptions.validation_exceptions import EmailAlreadyExistsError
from app.services.audit_service import AuditService
from app.services.totp_service import TOTPService
logger = logging.getLogger(__name__)
@@ -234,3 +235,315 @@ class AuthService:
resource_id=session.id,
description=f"Session revoked: {reason or 'User logout'}",
)
@staticmethod
def enroll_totp(user: User) -> dict:
"""
Initiate TOTP enrollment for a user.
Args:
user: User instance
Returns:
Dictionary containing:
- secret: TOTP secret (base32 encoded)
- provisioning_uri: otpauth:// URI for QR code
- qr_code: Base64 encoded QR code as data URI
- backup_codes: List of plain text backup codes
Raises:
ConflictError: If user already has TOTP enabled
"""
from app.exceptions.validation_exceptions import ConflictError
# Check if user already has TOTP enabled
if user.has_totp_enabled():
raise ConflictError("TOTP is already enabled for this account")
# Generate TOTP secret
secret = TOTPService.generate_secret()
# Generate provisioning URI
provisioning_uri = TOTPService.generate_provisioning_uri(
user_email=user.email,
secret=secret,
issuer="Gatehouse",
)
# Generate QR code data URI
qr_code = TOTPService.generate_qr_code_data_uri(provisioning_uri)
# Generate backup codes
backup_codes, hashed_backup_codes = TOTPService.generate_backup_codes()
# Create unverified TOTP authentication method
auth_method = AuthenticationMethod(
user_id=user.id,
method_type=AuthMethodType.TOTP,
verified=False,
is_primary=False,
)
auth_method.save()
# Store TOTP data in provider_data (since totp_secret field is commented out)
auth_method.provider_data = {
"secret": secret,
"backup_codes": hashed_backup_codes,
}
db.session.commit()
# Log TOTP enrollment initiation
AuditService.log_action(
action=AuditAction.TOTP_ENROLL_INITIATED,
user_id=user.id,
resource_type="authentication_method",
resource_id=auth_method.id,
description="TOTP enrollment initiated",
)
return {
"secret": secret,
"provisioning_uri": provisioning_uri,
"qr_code": qr_code,
"backup_codes": backup_codes,
}
@staticmethod
def verify_totp_enrollment(user: User, code: str) -> bool:
"""
Complete TOTP enrollment by verifying the first TOTP code.
Args:
user: User instance
code: 6-digit TOTP code from authenticator app
Returns:
True if verification successful
Raises:
InvalidCredentialsError: If code is invalid or TOTP method not found
"""
# Get user's TOTP authentication method
auth_method = user.get_totp_method()
if not auth_method:
raise InvalidCredentialsError("TOTP enrollment not found")
# Get secret from provider_data
secret = auth_method.provider_data.get("secret") if auth_method.provider_data else None
if not secret:
raise InvalidCredentialsError("TOTP secret not found")
# Verify the code
if not TOTPService.verify_code(secret, code):
raise InvalidCredentialsError("Invalid TOTP code")
# Mark TOTP as verified
auth_method.verified = True
auth_method.totp_verified_at = datetime.utcnow()
db.session.commit()
# Log TOTP enrollment completion
AuditService.log_action(
action=AuditAction.TOTP_ENROLL_COMPLETED,
user_id=user.id,
resource_type="authentication_method",
resource_id=auth_method.id,
description="TOTP enrollment completed",
)
return True
@staticmethod
def disable_totp(user: User, password: str) -> bool:
"""
Disable TOTP for a user.
Args:
user: User instance
password: User's current password for verification
Returns:
True if TOTP disabled successfully
Raises:
InvalidCredentialsError: If password is invalid or TOTP method not found
"""
# Verify user's password
auth_method = AuthenticationMethod.query.filter_by(
user_id=user.id,
method_type=AuthMethodType.PASSWORD,
deleted_at=None,
).first()
if not auth_method or not auth_method.password_hash:
raise InvalidCredentialsError("No password authentication method found")
if not bcrypt.check_password_hash(auth_method.password_hash, password):
raise InvalidCredentialsError("Invalid password")
# Get user's TOTP authentication method
totp_method = user.get_totp_method()
if not totp_method:
raise InvalidCredentialsError("TOTP is not enabled for this account")
# Soft-delete the TOTP authentication method
totp_method.delete(soft=True)
# Log TOTP disabled
AuditService.log_action(
action=AuditAction.TOTP_DISABLED,
user_id=user.id,
resource_type="authentication_method",
resource_id=totp_method.id,
description="TOTP disabled",
)
return True
@staticmethod
def authenticate_with_totp(user: User, code: str, is_backup_code: bool = False) -> bool:
"""
Verify TOTP code during login.
Args:
user: User instance
code: 6-digit TOTP code or backup code
is_backup_code: True if code is a backup code, False if TOTP code
Returns:
True if code is valid
Raises:
InvalidCredentialsError: If code is invalid or TOTP method not found
"""
# Get user's TOTP authentication method
auth_method = user.get_totp_method()
if not auth_method:
raise InvalidCredentialsError("TOTP is not enabled for this account")
if is_backup_code:
# Verify backup code
backup_codes = (
auth_method.provider_data.get("backup_codes")
if auth_method.provider_data
else []
)
is_valid, remaining_codes = TOTPService.verify_backup_code(backup_codes, code)
if is_valid:
# Update remaining backup codes
auth_method.provider_data = {
"secret": auth_method.provider_data.get("secret"),
"backup_codes": remaining_codes,
}
auth_method.last_used_at = datetime.utcnow()
db.session.commit()
# Log backup code usage
AuditService.log_action(
action=AuditAction.TOTP_BACKUP_CODE_USED,
user_id=user.id,
resource_type="authentication_method",
resource_id=auth_method.id,
description="Backup code used for authentication",
)
else:
# Log failed verification
AuditService.log_action(
action=AuditAction.TOTP_VERIFY_FAILED,
user_id=user.id,
resource_type="authentication_method",
resource_id=auth_method.id,
description="Invalid backup code provided",
)
raise InvalidCredentialsError("Invalid backup code")
else:
# Verify TOTP code
secret = (
auth_method.provider_data.get("secret")
if auth_method.provider_data
else None
)
if not secret:
raise InvalidCredentialsError("TOTP secret not found")
is_valid = TOTPService.verify_code(secret, code)
if is_valid:
auth_method.last_used_at = datetime.utcnow()
db.session.commit()
# Log successful verification
AuditService.log_action(
action=AuditAction.TOTP_VERIFY_SUCCESS,
user_id=user.id,
resource_type="authentication_method",
resource_id=auth_method.id,
description="TOTP code verified successfully",
)
else:
# Log failed verification
AuditService.log_action(
action=AuditAction.TOTP_VERIFY_FAILED,
user_id=user.id,
resource_type="authentication_method",
resource_id=auth_method.id,
description="Invalid TOTP code provided",
)
raise InvalidCredentialsError("Invalid TOTP code")
return True
@staticmethod
def regenerate_totp_backup_codes(user: User, password: str) -> list[str]:
"""
Generate new backup codes for TOTP.
Args:
user: User instance
password: User's current password for verification
Returns:
List of new plain text backup codes
Raises:
InvalidCredentialsError: If password is invalid or TOTP method not found
"""
# Verify user's password
auth_method = AuthenticationMethod.query.filter_by(
user_id=user.id,
method_type=AuthMethodType.PASSWORD,
deleted_at=None,
).first()
if not auth_method or not auth_method.password_hash:
raise InvalidCredentialsError("No password authentication method found")
if not bcrypt.check_password_hash(auth_method.password_hash, password):
raise InvalidCredentialsError("Invalid password")
# Get user's TOTP authentication method
totp_method = user.get_totp_method()
if not totp_method:
raise InvalidCredentialsError("TOTP is not enabled for this account")
# Generate new backup codes
backup_codes, hashed_backup_codes = TOTPService.generate_backup_codes()
# Update the authentication method with new backup codes
totp_method.provider_data = {
"secret": totp_method.provider_data.get("secret"),
"backup_codes": hashed_backup_codes,
}
db.session.commit()
# Log backup codes regeneration
AuditService.log_action(
action=AuditAction.TOTP_BACKUP_CODES_REGENERATED,
user_id=user.id,
resource_type="authentication_method",
resource_id=totp_method.id,
description="TOTP backup codes regenerated",
)
return backup_codes