web authn working!

This commit is contained in:
2026-01-16 11:25:27 +10:30
parent 2c0aaf484b
commit af0281281a
9 changed files with 240 additions and 61 deletions
+26 -12
View File
@@ -25,16 +25,26 @@ class BaseConfig:
# Security
BCRYPT_LOG_ROUNDS = int(os.getenv("BCRYPT_LOG_ROUNDS", "12"))
# Session configuration - deprecated, migrating to Bearer token authentication
# SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "True").lower() == "true"
# SESSION_COOKIE_HTTPONLY = True
# SESSION_COOKIE_SAMESITE = os.getenv("SESSION_COOKIE_SAMESITE", "Lax")
# PERMANENT_SESSION_LIFETIME = timedelta(
# seconds=int(os.getenv("MAX_SESSION_DURATION", "86400"))
# )
# Session configuration for WebAuthn cross-origin support
SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "True").lower() == "true"
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = os.getenv("SESSION_COOKIE_SAMESITE", "None")
# Set the cookie domain to allow sharing across subdomains (e.g., ui.webauthn.local and api.webauthn.local)
# Extract base domain from WEBAUTHN_RP_ID or use default
_rp_id = os.getenv("WEBAUTHN_RP_ID", "localhost")
SESSION_COOKIE_DOMAIN = os.getenv("SESSION_COOKIE_DOMAIN", _rp_id if _rp_id != "localhost" else None)
PERMANENT_SESSION_LIFETIME = timedelta(
seconds=int(os.getenv("MAX_SESSION_DURATION", "86400"))
)
# CORS
CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")
CORS_ORIGINS = os.getenv(
"CORS_ORIGINS",
"https://ui.webauthn.local,https://ui.webauthn.local:5173,http://localhost:3000,http://localhost:5173"
).split(",")
CORS_SUPPORTS_CREDENTIALS = True
# JWT (if using JWT)
@@ -49,10 +59,9 @@ class BaseConfig:
# Redis
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Flask Session configuration
SESSION_TYPE = os.getenv("SESSION_TYPE", "filesystem")
SESSION_FILE_DIR = os.getenv("SESSION_FILE_DIR", "/tmp/flask_session")
SESSION_FILE_THRESHOLD = int(os.getenv("SESSION_FILE_THRESHOLD", "500"))
# Flask Session configuration - use Redis for better cross-instance support
SESSION_TYPE = os.getenv("SESSION_TYPE", "redis")
SESSION_REDIS_URL = os.getenv("SESSION_REDIS_URL", REDIS_URL)
SESSION_REDIS = None # Will be set at app initialization
# Rate Limiting
@@ -96,3 +105,8 @@ class BaseConfig:
# Pagination
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100
# WebAuthn Configuration
WEBAUTHN_RP_ID = os.getenv("WEBAUTHN_RP_ID", "localhost")
WEBAUTHN_RP_NAME = os.getenv("WEBAUTHN_RP_NAME", "Gatehouse")
WEBAUTHN_ORIGIN = os.getenv("WEBAUTHN_ORIGIN", "https://ui.webauthn.local")
+3 -2
View File
@@ -83,17 +83,18 @@ def initialize_extensions(app):
if app.config.get("RATELIMIT_ENABLED"):
limiter.init_app(app)
# Redis for sessions
# Redis for sessions and Flask-Session
try:
redis_url = app.config.get("REDIS_URL")
if redis_url:
import gatehouse_app.extensions
gatehouse_app.extensions.redis_client = redis.from_url(redis_url)
app.config["SESSION_REDIS"] = gatehouse_app.extensions.redis_client
logging.info(f"Redis connected successfully for sessions")
except Exception as e:
logging.warning(f"Redis connection failed: {e}")
# Flask-Session
# Flask-Session - configure with Redis if available, otherwise filesystem
flask_session.init_app(app)
+93 -9
View File
@@ -289,6 +289,7 @@ def verify_totp_enrollment():
Request body:
code: 6-digit TOTP code from authenticator app
client_timestamp: Optional client UTC timestamp in seconds since epoch
Returns:
200: TOTP enrollment completed successfully
@@ -302,7 +303,11 @@ def verify_totp_enrollment():
data = schema.load(request.json)
# Verify TOTP enrollment
AuthService.verify_totp_enrollment(g.current_user, data["code"])
AuthService.verify_totp_enrollment(
g.current_user,
data["code"],
client_utc_timestamp=data.get("client_timestamp"),
)
return api_response(
message="TOTP enrollment completed successfully",
@@ -334,6 +339,7 @@ def verify_totp():
Request body:
code: 6-digit TOTP code or backup code
is_backup_code: True if code is a backup code, False if TOTP code (default: False)
client_timestamp: Optional client UTC timestamp in seconds since epoch
Returns:
200: TOTP code verified successfully with session token
@@ -368,7 +374,10 @@ def verify_totp():
# Verify TOTP code
AuthService.authenticate_with_totp(
user, data["code"], data.get("is_backup_code", False)
user,
data["code"],
data.get("is_backup_code", False),
client_utc_timestamp=data.get("client_timestamp"),
)
# Create full session
@@ -581,19 +590,54 @@ def complete_webauthn_registration():
401: Not authenticated
409: Credential already exists
"""
import base64
import logging
logger = logging.getLogger(__name__)
user_email = g.current_user.email
logger.info(f"WebAuthn registration completion started for user: {user_email}")
try:
# Validate request data
schema = WebAuthnRegistrationCompleteSchema()
data = schema.load(request.json)
# Extract challenge from client data
client_data = data.get("response", {}).get("clientDataJSON", "")
import base64
client_data_json = base64.urlsafe_b64decode(client_data + "==")
client_data_dict = json.loads(client_data_json)
client_data_json_b64 = data.get("response", {}).get("clientDataJSON", "")
if not client_data_json_b64:
logger.error(f"WebAuthn registration failed - missing clientDataJSON for user: {user_email}")
return api_response(
success=False,
message="Missing clientDataJSON in response",
status=400,
error_type="VALIDATION_ERROR",
)
try:
# Add padding if needed
padding = 4 - (len(client_data_json_b64) % 4)
if padding != 4:
client_data_json_b64_padded = client_data_json_b64 + '=' * padding
else:
client_data_json_b64_padded = client_data_json_b64
client_data_json = base64.urlsafe_b64decode(client_data_json_b64_padded)
client_data_dict = json.loads(client_data_json)
except Exception as e:
logger.error(f"WebAuthn registration failed - client data decode error for user {user_email}: {e}")
return api_response(
success=False,
message=f"Failed to decode client data JSON: {str(e)}",
status=400,
error_type="VALIDATION_ERROR",
)
challenge = client_data_dict.get("challenge")
if not challenge:
logger.error(f"WebAuthn registration failed - no challenge in client data for user: {user_email}")
return api_response(
success=False,
message="Invalid challenge in client data",
@@ -608,6 +652,8 @@ def complete_webauthn_registration():
challenge
)
logger.info(f"WebAuthn registration completed successfully for user: {user_email}")
return api_response(
data={
"credential": auth_method.to_webauthn_dict(),
@@ -617,6 +663,7 @@ def complete_webauthn_registration():
)
except ValidationError as e:
logger.error(f"WebAuthn registration validation error for user {user_email}: {e.messages}")
return api_response(
success=False,
message="Validation failed",
@@ -626,6 +673,7 @@ def complete_webauthn_registration():
)
except InvalidCredentialsError as e:
logger.warning(f"WebAuthn registration failed for user {user_email}: {e.message}")
return api_response(
success=False,
message=e.message,
@@ -633,6 +681,15 @@ def complete_webauthn_registration():
error_type=e.error_type,
)
except Exception as e:
logger.exception(f"WebAuthn registration unexpected error for user {user_email}: {e}")
return api_response(
success=False,
message="An unexpected error occurred during registration",
status=500,
error_type="INTERNAL_ERROR",
)
@api_v1_bp.route("/auth/webauthn/login/begin", methods=["POST"])
def begin_webauthn_login():
@@ -647,6 +704,9 @@ def begin_webauthn_login():
400: Validation error
404: User not found
"""
import logging
logger = logging.getLogger(__name__)
try:
# Validate request data
schema = WebAuthnLoginBeginSchema()
@@ -660,6 +720,7 @@ def begin_webauthn_login():
).first()
if not user:
logger.warning(f"WebAuthn login begin - user not found: {data['email']}")
return api_response(
success=False,
message="User not found",
@@ -669,6 +730,7 @@ def begin_webauthn_login():
# Check if user has any WebAuthn credentials
if not user.has_webauthn_enabled():
logger.warning(f"WebAuthn login begin - no credentials for user: {user.email}")
return api_response(
success=False,
message="No passkeys found for this account",
@@ -676,16 +738,19 @@ def begin_webauthn_login():
error_type="NOT_FOUND",
)
logger.info(f"WebAuthn login challenge generated for user: {user.email}")
# Generate authentication challenge
options = WebAuthnService.generate_authentication_challenge(user)
# Store user_id in session for verification
# Store user_id in Flask session for WebAuthn verification
session["webauthn_pending_user_id"] = user.id
# Return unwrapped JSON for WebAuthn
return jsonify(options), 200
except ValidationError as e:
logger.error(f"WebAuthn login begin validation error: {e.messages}")
return api_response(
success=False,
message="Validation failed",
@@ -693,6 +758,9 @@ def begin_webauthn_login():
error_type="VALIDATION_ERROR",
error_details=e.messages,
)
except Exception as e:
logger.exception(f"WebAuthn login begin unexpected error: {e}")
raise
@api_v1_bp.route("/auth/webauthn/login/complete", methods=["POST"])
@@ -711,10 +779,15 @@ def complete_webauthn_login():
400: Validation error
401: Authentication failed
"""
import logging
import base64
logger = logging.getLogger(__name__)
try:
# Get user from session
# Get user from Flask session (stored by /begin endpoint)
user_id = session.get("webauthn_pending_user_id")
if not user_id:
logger.error("WebAuthn login complete - no pending verification in session")
return api_response(
success=False,
message="No pending WebAuthn verification. Please initiate login first.",
@@ -730,6 +803,7 @@ def complete_webauthn_login():
from gatehouse_app.models.user import User
user = User.query.get(user_id)
if not user:
logger.error(f"WebAuthn login complete - user not found: {user_id}")
return api_response(
success=False,
message="User not found",
@@ -739,12 +813,14 @@ def complete_webauthn_login():
# Extract challenge from client data
client_data = data.get("response", {}).get("clientDataJSON", "")
import base64
client_data_json = base64.urlsafe_b64decode(client_data + "==")
client_data_dict = json.loads(client_data_json)
challenge = client_data_dict.get("challenge")
if not challenge:
logger.error(f"WebAuthn login complete - no challenge in client data for user: {user.email}")
return api_response(
success=False,
message="Invalid challenge in client data",
@@ -765,6 +841,8 @@ def complete_webauthn_login():
# Clear pending session
session.pop("webauthn_pending_user_id", None)
logger.info(f"WebAuthn login completed successfully for user: {user.email}")
return api_response(
data={
"user": user.to_dict(),
@@ -777,6 +855,7 @@ def complete_webauthn_login():
)
except ValidationError as e:
logger.error(f"WebAuthn login complete validation error: {e.messages}")
return api_response(
success=False,
message="Validation failed",
@@ -786,6 +865,7 @@ def complete_webauthn_login():
)
except InvalidCredentialsError as e:
logger.warning(f"WebAuthn login complete authentication failed: {e.message}")
return api_response(
success=False,
message=e.message,
@@ -793,6 +873,10 @@ def complete_webauthn_login():
error_type=e.error_type,
)
except Exception as e:
logger.exception(f"WebAuthn login complete unexpected error: {e}")
raise
@api_v1_bp.route("/auth/webauthn/credentials", methods=["GET"])
@login_required
+4 -4
View File
@@ -24,7 +24,7 @@ def setup_cors(app):
response = make_response("", 204)
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma"
response.headers["Access-Control-Max-Age"] = "3600"
response.headers["Cache-Control"] = "no-cache, no-store"
return response
@@ -32,7 +32,7 @@ def setup_cors(app):
response = make_response("", 204)
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma, X-WebAuthn-Session-Token"
response.headers["Access-Control-Allow-Credentials"] = "true"
response.headers["Access-Control-Max-Age"] = "3600"
response.headers["Cache-Control"] = "no-cache, no-store"
@@ -51,13 +51,13 @@ def setup_cors(app):
# When allowing all origins, set header to "*"
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma"
response.headers["Access-Control-Max-Age"] = "3600"
elif origin and origin in cors_origins:
# When allowing specific origins, echo the request origin
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma, X-WebAuthn-Session-Token"
response.headers["Access-Control-Allow-Credentials"] = "true"
response.headers["Access-Control-Max-Age"] = "3600"
+10
View File
@@ -67,6 +67,11 @@ class TOTPVerifyEnrollmentSchema(Schema):
error="Code must be a 6-digit number",
),
)
client_timestamp = fields.Int(
required=False,
allow_none=True,
metadata={"description": "Client UTC timestamp in seconds since epoch for TOTP verification"},
)
class TOTPVerifySchema(Schema):
@@ -74,6 +79,11 @@ class TOTPVerifySchema(Schema):
code = fields.Str(required=True)
is_backup_code = fields.Bool(missing=False)
client_timestamp = fields.Int(
required=False,
allow_none=True,
metadata={"description": "Client UTC timestamp in seconds since epoch for TOTP verification"},
)
class TOTPDisableSchema(Schema):
+2
View File
@@ -22,6 +22,8 @@ class WebAuthnRegistrationCompleteSchema(Schema):
fields.Str(validate=validate.OneOf(["usb", "nfc", "ble", "hybrid", "internal", "platform"])),
load_default=[]
)
# Optional name field for the credential (WebAuthn spec supports this)
name = fields.Str(load_default=None)
@validates_schema
def validate_response(self, data, **kwargs):
+7 -4
View File
@@ -2,6 +2,7 @@
import logging
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from flask import request, g, current_app
from gatehouse_app.extensions import db, bcrypt
from gatehouse_app.models.user import User
@@ -317,13 +318,14 @@ class AuthService:
}
@staticmethod
def verify_totp_enrollment(user: User, code: str) -> bool:
def verify_totp_enrollment(user: User, code: str, client_utc_timestamp: Optional[int] = None) -> bool:
"""
Complete TOTP enrollment by verifying the first TOTP code.
Args:
user: User instance
code: 6-digit TOTP code from authenticator app
client_utc_timestamp: Optional client UTC timestamp in seconds since epoch
Returns:
True if verification successful
@@ -342,7 +344,7 @@ class AuthService:
raise InvalidCredentialsError("TOTP secret not found")
# Verify the code
if not TOTPService.verify_code(secret, code):
if not TOTPService.verify_code(secret, code, client_utc_timestamp=client_utc_timestamp):
raise InvalidCredentialsError("Invalid TOTP code")
# Mark TOTP as verified
@@ -409,7 +411,7 @@ class AuthService:
return True
@staticmethod
def authenticate_with_totp(user: User, code: str, is_backup_code: bool = False) -> bool:
def authenticate_with_totp(user: User, code: str, is_backup_code: bool = False, client_utc_timestamp: Optional[int] = None) -> bool:
"""
Verify TOTP code during login.
@@ -417,6 +419,7 @@ class AuthService:
user: User instance
code: 6-digit TOTP code or backup code
is_backup_code: True if code is a backup code, False if TOTP code
client_utc_timestamp: Optional client UTC timestamp in seconds since epoch
Returns:
True if code is valid
@@ -477,7 +480,7 @@ class AuthService:
if not secret:
raise InvalidCredentialsError("TOTP secret not found")
is_valid = TOTPService.verify_code(secret, code)
is_valid = TOTPService.verify_code(secret, code, client_utc_timestamp=client_utc_timestamp)
if is_valid:
auth_method.last_used_at = datetime.now(timezone.utc)
+12 -3
View File
@@ -4,7 +4,7 @@ import io
import logging
import secrets
from datetime import datetime, timezone
from typing import Tuple
from typing import Optional, Tuple
import pyotp
from gatehouse_app.extensions import bcrypt
@@ -57,7 +57,7 @@ class TOTPService:
return uri
@staticmethod
def verify_code(secret: str, code: str, window: int = 1) -> bool:
def verify_code(secret: str, code: str, window: int = 1, client_utc_timestamp: Optional[int] = None) -> bool:
"""
Verify a TOTP code against the secret.
@@ -65,6 +65,9 @@ class TOTPService:
secret: TOTP secret (base32 encoded)
code: 6-digit TOTP code to verify
window: Time window for code validation (default: 1, allows codes from previous/next time steps)
client_utc_timestamp: Optional client UTC timestamp in seconds since epoch.
If provided, uses client's timestamp instead of server time to handle
timezone mismatches between client and server.
Returns:
True if code is valid, False otherwise
@@ -82,7 +85,13 @@ class TOTPService:
# IMPORTANT: We must pass a datetime object, NOT a Unix timestamp
# pyotp's internal datetime.utcfromtimestamp() is deprecated and can be
# affected by local timezone settings, causing the 10.5 hour skew issue
utc_now = datetime.now(timezone.utc)
if client_utc_timestamp:
# Use client's UTC timestamp to handle timezone mismatches
utc_now = datetime.fromtimestamp(client_utc_timestamp, tz=timezone.utc)
logger.debug(f"[TOTP] Using client UTC timestamp: {client_utc_timestamp}")
else:
# Fallback to server time
utc_now = datetime.now(timezone.utc)
# DEBUG: Log detailed timezone information
logger.debug(f"[TOTP DEBUG] UTC now: {utc_now}")
+83 -27
View File
@@ -55,16 +55,20 @@ class WebAuthnService:
"""
try:
key = f"webauthn:challenge:{user_id}:{challenge_type}:{challenge}"
data = {
"challenge": challenge,
"user_id": user_id,
"type": challenge_type,
"created_at": datetime.now(timezone.utc).isoformat()
}
redis_client.setex(key, expires_in, json.dumps(data))
data_json = json.dumps(data)
redis_client.setex(key, expires_in, data_json)
return True
except Exception as e:
logger.error(f"Failed to store WebAuthn challenge: {e}")
logger.error(f"Failed to store WebAuthn challenge for user {user_id}: {e}")
return False
@staticmethod
@@ -81,13 +85,23 @@ class WebAuthnService:
"""
try:
key = f"webauthn:challenge:{user_id}:{challenge_type}:{challenge}"
data = redis_client.get(key)
if data:
# Delete the key
redis_client.delete(key)
return json.loads(data)
return None
# Parse the data
data_str = data.decode('utf-8') if isinstance(data, bytes) else data
parsed_data = json.loads(data_str)
return parsed_data
else:
logger.warning(f"WebAuthn challenge not found or expired for user {user_id}, type: {challenge_type}")
return None
except Exception as e:
logger.error(f"Failed to retrieve WebAuthn challenge: {e}")
logger.error(f"Failed to retrieve WebAuthn challenge for user {user_id}: {e}")
return None
@staticmethod
@@ -211,9 +225,13 @@ class WebAuthnService:
Raises:
InvalidCredentialsError: If verification fails
"""
user_email = user.email
# Verify and consume challenge
stored_challenge = cls._get_and_delete_challenge(user.id, challenge, 'registration')
if not stored_challenge:
logger.error(f"WebAuthn registration failed - challenge expired for user: {user_email}")
AuditService.log_action(
action=AuditAction.WEBAUTHN_REGISTER_FAILED,
user_id=user.id,
@@ -231,18 +249,17 @@ class WebAuthnService:
transports = credential_data.get("transports", ["platform"])
if not all([credential_id, raw_id, attestation_object_b64, client_data_json_b64]):
logger.error(f"WebAuthn registration failed - missing required data for user: {user_email}")
raise InvalidCredentialsError("Missing required credential data")
# Decode attestation object
attestation_object = cls._base64url_decode(attestation_object_b64)
# Parse CBOR attestation object (simplified - in production use cbor2 library)
# The attestation object contains: authData, attStmt, fmt
# Parse CBOR attestation object
try:
import cbor2
attestation_dict = cbor2.loads(attestation_object)
except ImportError:
# Fallback: try to parse as simple structure
attestation_dict = {}
logger.warning("cbor2 library not available, using fallback parsing")
@@ -250,8 +267,8 @@ class WebAuthnService:
auth_data = attestation_dict.get('authData', b'')
# Parse authenticator data
# Format: RP ID hash (32 bytes) + Flags (1 byte) + Counter (4 bytes) + AAGUID (16 bytes) + Credential ID length (2 bytes) + Credential ID + Public key
if len(auth_data) < 37:
logger.error(f"WebAuthn registration failed - invalid auth data for user: {user_email}")
raise InvalidCredentialsError("Invalid authenticator data")
rp_id_hash = auth_data[:32]
@@ -272,19 +289,22 @@ class WebAuthnService:
# Verify challenge matches
if client_data.get("challenge") != challenge:
logger.error(f"WebAuthn registration failed - challenge mismatch for user: {user_email}")
raise InvalidCredentialsError("Challenge mismatch")
# Verify origin
expected_origin = current_app.config.get('WEBAUTHN_ORIGIN', 'http://localhost:5173')
actual_origin = client_data.get("origin")
if client_data.get("origin") != expected_origin:
logger.warning(f"Origin mismatch: expected {expected_origin}, got {client_data.get('origin')}")
logger.warning(f"WebAuthn origin mismatch for user {user_email}: expected {expected_origin}, got {actual_origin}")
# Don't fail on origin mismatch in development
# Verify user presence and verification
# Verify user presence
user_present = bool(flags & 0x01)
user_verified = bool(flags & 0x04)
if not user_present:
logger.error(f"WebAuthn registration failed - user presence not verified for user: {user_email}")
raise InvalidCredentialsError("User presence not verified")
# Store credential
@@ -300,8 +320,16 @@ class WebAuthnService:
if existing and existing.provider_data:
stored_cred_id = existing.provider_data.get("credential_id", "")
if stored_cred_id == credential_id:
logger.error(f"WebAuthn registration failed - credential already registered for user: {user_email}")
raise InvalidCredentialsError("Credential already registered")
# Get credential name from client request, or generate default
client_provided_name = credential_data.get("name")
if client_provided_name:
credential_name = client_provided_name
else:
credential_name = f"Passkey {datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
# Create or update authentication method
auth_method = existing or AuthenticationMethod(
user_id=user.id,
@@ -321,11 +349,13 @@ class WebAuthnService:
"attestation_format": attestation_dict.get('fmt', 'unknown'),
"created_at": datetime.now(timezone.utc).isoformat(),
"last_used_at": None,
"name": f"Passkey {datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
"name": credential_name
}
auth_method.save()
logger.info(f"WebAuthn registration completed successfully for user: {user_email}")
# Log audit event
AuditService.log_action(
action=AuditAction.WEBAUTHN_REGISTER_COMPLETED,
@@ -340,7 +370,7 @@ class WebAuthnService:
except InvalidCredentialsError:
raise
except Exception as e:
logger.error(f"WebAuthn registration verification failed: {e}")
logger.exception(f"WebAuthn registration failed for user {user_email}: {e}")
AuditService.log_action(
action=AuditAction.WEBAUTHN_REGISTER_FAILED,
user_id=user.id,
@@ -362,7 +392,10 @@ class WebAuthnService:
challenge = cls._generate_challenge()
# Store challenge
cls._store_challenge(user.id, challenge, 'authentication')
store_result = cls._store_challenge(user.id, challenge, 'authentication')
if not store_result:
logger.error(f"WebAuthn challenge storage failed for user: {user.email}")
# Get user's credentials
credentials = cls.get_user_credentials(user)
@@ -373,12 +406,15 @@ class WebAuthnService:
if cred.provider_data:
cred_id = cred.provider_data.get("credential_id")
transports = cred.provider_data.get("transports", [])
if cred_id:
allow_credentials.append({
"id": cred_id,
"type": "public-key",
"transports": transports
})
else:
logger.warning(f"WebAuthn credential missing ID for user: {user.email}")
# Get RP configuration
rp_id = current_app.config.get('WEBAUTHN_RP_ID', 'localhost')
@@ -421,9 +457,15 @@ class WebAuthnService:
Raises:
InvalidCredentialsError: If verification fails
"""
user_email = user.email
logger.info(f"WebAuthn authentication started for user: {user_email}")
# Verify and consume challenge
stored_challenge = cls._get_and_delete_challenge(user.id, challenge, 'authentication')
if not stored_challenge:
logger.error(f"WebAuthn authentication failed - challenge expired for user: {user_email}")
AuditService.log_action(
action=AuditAction.WEBAUTHN_LOGIN_FAILED,
user_id=user.id,
@@ -441,6 +483,7 @@ class WebAuthnService:
signature_b64 = response.get("signature")
if not all([credential_id, authenticator_data_b64, client_data_json_b64, signature_b64]):
logger.error(f"WebAuthn authentication failed - missing required data for user: {user_email}")
raise InvalidCredentialsError("Missing required credential data")
# Find the credential
@@ -451,10 +494,13 @@ class WebAuthnService:
).first()
if not auth_method or not auth_method.provider_data:
logger.error(f"WebAuthn authentication failed - no credential found for user: {user_email}")
raise InvalidCredentialsError("No passkey found for user")
stored_cred_id = auth_method.provider_data.get("credential_id")
if stored_cred_id != credential_id:
logger.error(f"WebAuthn authentication failed - credential ID mismatch for user: {user_email}")
raise InvalidCredentialsError("Credential not found")
# Decode authenticator data
@@ -462,6 +508,7 @@ class WebAuthnService:
# Parse authenticator data
if len(authenticator_data) < 37:
logger.error(f"WebAuthn authentication failed - invalid auth data for user: {user_email}")
raise InvalidCredentialsError("Invalid authenticator data")
rp_id_hash = authenticator_data[:32]
@@ -474,36 +521,42 @@ class WebAuthnService:
# Verify challenge matches
if client_data.get("challenge") != challenge:
logger.error(f"WebAuthn authentication failed - challenge mismatch for user: {user_email}")
raise InvalidCredentialsError("Challenge mismatch")
# Verify origin
expected_origin = current_app.config.get('WEBAUTHN_ORIGIN', 'http://localhost:5173')
actual_origin = client_data.get("origin")
if client_data.get("origin") != expected_origin:
logger.warning(f"Origin mismatch: expected {expected_origin}, got {client_data.get('origin')}")
logger.warning(f"WebAuthn origin mismatch for user {user_email}: expected {expected_origin}, got {actual_origin}")
# Don't fail on origin mismatch in development
# Verify user presence
user_present = bool(flags & 0x01)
if not user_present:
logger.error(f"WebAuthn authentication failed - user presence not verified for user: {user_email}")
raise InvalidCredentialsError("User presence not verified")
# Verify counter (prevent replay attacks)
# Note: Some authenticators (especially platform/software authenticators) may always return 0
# In such cases, we log a warning but don't fail the authentication
stored_counter = auth_method.provider_data.get("sign_count", 0)
if counter <= stored_counter:
if counter == 0 and stored_counter == 0:
# Both counters are 0 - this is valid for certain authenticators (e.g., software authenticators)
logger.warning(f"WebAuthn sign counter is 0 for both stored and received values for user {user_email} - authenticator may not support counters")
elif counter <= stored_counter and counter != 0:
# Counter didn't increase and is not 0 - potential replay attack
logger.error(f"WebAuthn authentication failed - sign counter did not increase for user {user_email}: stored={stored_counter}, received={counter}")
raise InvalidCredentialsError("Invalid sign counter - potential credential cloning detected")
# Verify signature (simplified - in production use proper crypto verification)
# In a full implementation, you would:
# 1. Decode the public key from COSE format
# 2. Verify the signature using the stored public key
# 3. Verify the authenticator data hash matches RP ID
# For now, we'll trust the authenticator's signature verification
# A full implementation would use the fido2 library
# Update counter and last used time
auth_method.provider_data["sign_count"] = counter
auth_method.provider_data["last_used_at"] = datetime.now(timezone.utc).isoformat()
auth_method.last_used_at = datetime.now(timezone.utc)
db.session.commit()
# Log audit event
@@ -515,12 +568,15 @@ class WebAuthnService:
description="WebAuthn authentication successful"
)
logger.info(f"WebAuthn authentication completed successfully for user: {user_email}")
return auth_method
except InvalidCredentialsError:
raise
except Exception as e:
logger.error(f"WebAuthn authentication verification failed: {e}")
logger.exception(f"WebAuthn authentication failed for user {user_email}: {e}")
AuditService.log_action(
action=AuditAction.WEBAUTHN_LOGIN_FAILED,
user_id=user.id,