diff --git a/config/base.py b/config/base.py index 2bf5690..09548b6 100644 --- a/config/base.py +++ b/config/base.py @@ -25,16 +25,26 @@ class BaseConfig: # Security BCRYPT_LOG_ROUNDS = int(os.getenv("BCRYPT_LOG_ROUNDS", "12")) - # Session configuration - deprecated, migrating to Bearer token authentication - # SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "True").lower() == "true" - # SESSION_COOKIE_HTTPONLY = True - # SESSION_COOKIE_SAMESITE = os.getenv("SESSION_COOKIE_SAMESITE", "Lax") - # PERMANENT_SESSION_LIFETIME = timedelta( - # seconds=int(os.getenv("MAX_SESSION_DURATION", "86400")) - # ) + + # Session configuration for WebAuthn cross-origin support + SESSION_COOKIE_SECURE = os.getenv("SESSION_COOKIE_SECURE", "True").lower() == "true" + SESSION_COOKIE_HTTPONLY = True + SESSION_COOKIE_SAMESITE = os.getenv("SESSION_COOKIE_SAMESITE", "None") + + # Set the cookie domain to allow sharing across subdomains (e.g., ui.webauthn.local and api.webauthn.local) + # Extract base domain from WEBAUTHN_RP_ID or use default + _rp_id = os.getenv("WEBAUTHN_RP_ID", "localhost") + SESSION_COOKIE_DOMAIN = os.getenv("SESSION_COOKIE_DOMAIN", _rp_id if _rp_id != "localhost" else None) + + PERMANENT_SESSION_LIFETIME = timedelta( + seconds=int(os.getenv("MAX_SESSION_DURATION", "86400")) + ) # CORS - CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",") + CORS_ORIGINS = os.getenv( + "CORS_ORIGINS", + "https://ui.webauthn.local,https://ui.webauthn.local:5173,http://localhost:3000,http://localhost:5173" + ).split(",") CORS_SUPPORTS_CREDENTIALS = True # JWT (if using JWT) @@ -49,10 +59,9 @@ class BaseConfig: # Redis REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") - # Flask Session configuration - SESSION_TYPE = os.getenv("SESSION_TYPE", "filesystem") - SESSION_FILE_DIR = os.getenv("SESSION_FILE_DIR", "/tmp/flask_session") - SESSION_FILE_THRESHOLD = int(os.getenv("SESSION_FILE_THRESHOLD", "500")) + # Flask Session configuration - use Redis for better cross-instance support + SESSION_TYPE = os.getenv("SESSION_TYPE", "redis") + SESSION_REDIS_URL = os.getenv("SESSION_REDIS_URL", REDIS_URL) SESSION_REDIS = None # Will be set at app initialization # Rate Limiting @@ -96,3 +105,8 @@ class BaseConfig: # Pagination DEFAULT_PAGE_SIZE = 20 MAX_PAGE_SIZE = 100 + + # WebAuthn Configuration + WEBAUTHN_RP_ID = os.getenv("WEBAUTHN_RP_ID", "localhost") + WEBAUTHN_RP_NAME = os.getenv("WEBAUTHN_RP_NAME", "Gatehouse") + WEBAUTHN_ORIGIN = os.getenv("WEBAUTHN_ORIGIN", "https://ui.webauthn.local") diff --git a/gatehouse_app/__init__.py b/gatehouse_app/__init__.py index 15ce775..a4e1043 100644 --- a/gatehouse_app/__init__.py +++ b/gatehouse_app/__init__.py @@ -83,17 +83,18 @@ def initialize_extensions(app): if app.config.get("RATELIMIT_ENABLED"): limiter.init_app(app) - # Redis for sessions + # Redis for sessions and Flask-Session try: redis_url = app.config.get("REDIS_URL") if redis_url: import gatehouse_app.extensions gatehouse_app.extensions.redis_client = redis.from_url(redis_url) app.config["SESSION_REDIS"] = gatehouse_app.extensions.redis_client + logging.info(f"Redis connected successfully for sessions") except Exception as e: logging.warning(f"Redis connection failed: {e}") - # Flask-Session + # Flask-Session - configure with Redis if available, otherwise filesystem flask_session.init_app(app) diff --git a/gatehouse_app/api/v1/auth.py b/gatehouse_app/api/v1/auth.py index 4edfe9f..36fdff9 100644 --- a/gatehouse_app/api/v1/auth.py +++ b/gatehouse_app/api/v1/auth.py @@ -289,6 +289,7 @@ def verify_totp_enrollment(): Request body: code: 6-digit TOTP code from authenticator app + client_timestamp: Optional client UTC timestamp in seconds since epoch Returns: 200: TOTP enrollment completed successfully @@ -302,7 +303,11 @@ def verify_totp_enrollment(): data = schema.load(request.json) # Verify TOTP enrollment - AuthService.verify_totp_enrollment(g.current_user, data["code"]) + AuthService.verify_totp_enrollment( + g.current_user, + data["code"], + client_utc_timestamp=data.get("client_timestamp"), + ) return api_response( message="TOTP enrollment completed successfully", @@ -334,6 +339,7 @@ def verify_totp(): Request body: code: 6-digit TOTP code or backup code is_backup_code: True if code is a backup code, False if TOTP code (default: False) + client_timestamp: Optional client UTC timestamp in seconds since epoch Returns: 200: TOTP code verified successfully with session token @@ -368,7 +374,10 @@ def verify_totp(): # Verify TOTP code AuthService.authenticate_with_totp( - user, data["code"], data.get("is_backup_code", False) + user, + data["code"], + data.get("is_backup_code", False), + client_utc_timestamp=data.get("client_timestamp"), ) # Create full session @@ -581,19 +590,54 @@ def complete_webauthn_registration(): 401: Not authenticated 409: Credential already exists """ + import base64 + import logging + logger = logging.getLogger(__name__) + + user_email = g.current_user.email + logger.info(f"WebAuthn registration completion started for user: {user_email}") + try: # Validate request data schema = WebAuthnRegistrationCompleteSchema() data = schema.load(request.json) # Extract challenge from client data - client_data = data.get("response", {}).get("clientDataJSON", "") - import base64 - client_data_json = base64.urlsafe_b64decode(client_data + "==") - client_data_dict = json.loads(client_data_json) + client_data_json_b64 = data.get("response", {}).get("clientDataJSON", "") + + if not client_data_json_b64: + logger.error(f"WebAuthn registration failed - missing clientDataJSON for user: {user_email}") + return api_response( + success=False, + message="Missing clientDataJSON in response", + status=400, + error_type="VALIDATION_ERROR", + ) + + try: + # Add padding if needed + padding = 4 - (len(client_data_json_b64) % 4) + if padding != 4: + client_data_json_b64_padded = client_data_json_b64 + '=' * padding + else: + client_data_json_b64_padded = client_data_json_b64 + + client_data_json = base64.urlsafe_b64decode(client_data_json_b64_padded) + client_data_dict = json.loads(client_data_json) + + except Exception as e: + logger.error(f"WebAuthn registration failed - client data decode error for user {user_email}: {e}") + return api_response( + success=False, + message=f"Failed to decode client data JSON: {str(e)}", + status=400, + error_type="VALIDATION_ERROR", + ) + challenge = client_data_dict.get("challenge") if not challenge: + logger.error(f"WebAuthn registration failed - no challenge in client data for user: {user_email}") return api_response( success=False, message="Invalid challenge in client data", @@ -608,6 +652,8 @@ def complete_webauthn_registration(): challenge ) + logger.info(f"WebAuthn registration completed successfully for user: {user_email}") + return api_response( data={ "credential": auth_method.to_webauthn_dict(), @@ -617,6 +663,7 @@ def complete_webauthn_registration(): ) except ValidationError as e: + logger.error(f"WebAuthn registration validation error for user {user_email}: {e.messages}") return api_response( success=False, message="Validation failed", @@ -626,12 +673,22 @@ def complete_webauthn_registration(): ) except InvalidCredentialsError as e: + logger.warning(f"WebAuthn registration failed for user {user_email}: {e.message}") return api_response( success=False, message=e.message, status=e.status_code, error_type=e.error_type, ) + + except Exception as e: + logger.exception(f"WebAuthn registration unexpected error for user {user_email}: {e}") + return api_response( + success=False, + message="An unexpected error occurred during registration", + status=500, + error_type="INTERNAL_ERROR", + ) @api_v1_bp.route("/auth/webauthn/login/begin", methods=["POST"]) @@ -647,6 +704,9 @@ def begin_webauthn_login(): 400: Validation error 404: User not found """ + import logging + logger = logging.getLogger(__name__) + try: # Validate request data schema = WebAuthnLoginBeginSchema() @@ -660,6 +720,7 @@ def begin_webauthn_login(): ).first() if not user: + logger.warning(f"WebAuthn login begin - user not found: {data['email']}") return api_response( success=False, message="User not found", @@ -669,6 +730,7 @@ def begin_webauthn_login(): # Check if user has any WebAuthn credentials if not user.has_webauthn_enabled(): + logger.warning(f"WebAuthn login begin - no credentials for user: {user.email}") return api_response( success=False, message="No passkeys found for this account", @@ -676,16 +738,19 @@ def begin_webauthn_login(): error_type="NOT_FOUND", ) + logger.info(f"WebAuthn login challenge generated for user: {user.email}") + # Generate authentication challenge options = WebAuthnService.generate_authentication_challenge(user) - # Store user_id in session for verification + # Store user_id in Flask session for WebAuthn verification session["webauthn_pending_user_id"] = user.id # Return unwrapped JSON for WebAuthn return jsonify(options), 200 except ValidationError as e: + logger.error(f"WebAuthn login begin validation error: {e.messages}") return api_response( success=False, message="Validation failed", @@ -693,6 +758,9 @@ def begin_webauthn_login(): error_type="VALIDATION_ERROR", error_details=e.messages, ) + except Exception as e: + logger.exception(f"WebAuthn login begin unexpected error: {e}") + raise @api_v1_bp.route("/auth/webauthn/login/complete", methods=["POST"]) @@ -711,10 +779,15 @@ def complete_webauthn_login(): 400: Validation error 401: Authentication failed """ + import logging + import base64 + logger = logging.getLogger(__name__) + try: - # Get user from session + # Get user from Flask session (stored by /begin endpoint) user_id = session.get("webauthn_pending_user_id") if not user_id: + logger.error("WebAuthn login complete - no pending verification in session") return api_response( success=False, message="No pending WebAuthn verification. Please initiate login first.", @@ -730,6 +803,7 @@ def complete_webauthn_login(): from gatehouse_app.models.user import User user = User.query.get(user_id) if not user: + logger.error(f"WebAuthn login complete - user not found: {user_id}") return api_response( success=False, message="User not found", @@ -739,12 +813,14 @@ def complete_webauthn_login(): # Extract challenge from client data client_data = data.get("response", {}).get("clientDataJSON", "") - import base64 + client_data_json = base64.urlsafe_b64decode(client_data + "==") client_data_dict = json.loads(client_data_json) + challenge = client_data_dict.get("challenge") if not challenge: + logger.error(f"WebAuthn login complete - no challenge in client data for user: {user.email}") return api_response( success=False, message="Invalid challenge in client data", @@ -765,6 +841,8 @@ def complete_webauthn_login(): # Clear pending session session.pop("webauthn_pending_user_id", None) + logger.info(f"WebAuthn login completed successfully for user: {user.email}") + return api_response( data={ "user": user.to_dict(), @@ -777,6 +855,7 @@ def complete_webauthn_login(): ) except ValidationError as e: + logger.error(f"WebAuthn login complete validation error: {e.messages}") return api_response( success=False, message="Validation failed", @@ -786,12 +865,17 @@ def complete_webauthn_login(): ) except InvalidCredentialsError as e: + logger.warning(f"WebAuthn login complete authentication failed: {e.message}") return api_response( success=False, message=e.message, status=e.status_code, error_type=e.error_type, ) + + except Exception as e: + logger.exception(f"WebAuthn login complete unexpected error: {e}") + raise @api_v1_bp.route("/auth/webauthn/credentials", methods=["GET"]) diff --git a/gatehouse_app/middleware/cors.py b/gatehouse_app/middleware/cors.py index 4c088a8..defe68c 100644 --- a/gatehouse_app/middleware/cors.py +++ b/gatehouse_app/middleware/cors.py @@ -24,7 +24,7 @@ def setup_cors(app): response = make_response("", 204) response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS" - response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma" + response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma" response.headers["Access-Control-Max-Age"] = "3600" response.headers["Cache-Control"] = "no-cache, no-store" return response @@ -32,7 +32,7 @@ def setup_cors(app): response = make_response("", 204) response.headers["Access-Control-Allow-Origin"] = origin response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS" - response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma" + response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma, X-WebAuthn-Session-Token" response.headers["Access-Control-Allow-Credentials"] = "true" response.headers["Access-Control-Max-Age"] = "3600" response.headers["Cache-Control"] = "no-cache, no-store" @@ -51,13 +51,13 @@ def setup_cors(app): # When allowing all origins, set header to "*" response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS" - response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma" + response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma" response.headers["Access-Control-Max-Age"] = "3600" elif origin and origin in cors_origins: # When allowing specific origins, echo the request origin response.headers["Access-Control-Allow-Origin"] = origin response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, OPTIONS" - response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Request-ID, Cache-Control, Pragma" + response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization, X-Requested-With, X-Request-ID, Cache-Control, Pragma, X-WebAuthn-Session-Token" response.headers["Access-Control-Allow-Credentials"] = "true" response.headers["Access-Control-Max-Age"] = "3600" diff --git a/gatehouse_app/schemas/auth_schema.py b/gatehouse_app/schemas/auth_schema.py index 360b92a..f865671 100644 --- a/gatehouse_app/schemas/auth_schema.py +++ b/gatehouse_app/schemas/auth_schema.py @@ -67,6 +67,11 @@ class TOTPVerifyEnrollmentSchema(Schema): error="Code must be a 6-digit number", ), ) + client_timestamp = fields.Int( + required=False, + allow_none=True, + metadata={"description": "Client UTC timestamp in seconds since epoch for TOTP verification"}, + ) class TOTPVerifySchema(Schema): @@ -74,6 +79,11 @@ class TOTPVerifySchema(Schema): code = fields.Str(required=True) is_backup_code = fields.Bool(missing=False) + client_timestamp = fields.Int( + required=False, + allow_none=True, + metadata={"description": "Client UTC timestamp in seconds since epoch for TOTP verification"}, + ) class TOTPDisableSchema(Schema): diff --git a/gatehouse_app/schemas/webauthn_schema.py b/gatehouse_app/schemas/webauthn_schema.py index 6807a8d..4018a6a 100644 --- a/gatehouse_app/schemas/webauthn_schema.py +++ b/gatehouse_app/schemas/webauthn_schema.py @@ -22,6 +22,8 @@ class WebAuthnRegistrationCompleteSchema(Schema): fields.Str(validate=validate.OneOf(["usb", "nfc", "ble", "hybrid", "internal", "platform"])), load_default=[] ) + # Optional name field for the credential (WebAuthn spec supports this) + name = fields.Str(load_default=None) @validates_schema def validate_response(self, data, **kwargs): diff --git a/gatehouse_app/services/auth_service.py b/gatehouse_app/services/auth_service.py index 488f11a..b1d833e 100644 --- a/gatehouse_app/services/auth_service.py +++ b/gatehouse_app/services/auth_service.py @@ -2,6 +2,7 @@ import logging import secrets from datetime import datetime, timedelta, timezone +from typing import Optional from flask import request, g, current_app from gatehouse_app.extensions import db, bcrypt from gatehouse_app.models.user import User @@ -317,13 +318,14 @@ class AuthService: } @staticmethod - def verify_totp_enrollment(user: User, code: str) -> bool: + def verify_totp_enrollment(user: User, code: str, client_utc_timestamp: Optional[int] = None) -> bool: """ Complete TOTP enrollment by verifying the first TOTP code. Args: user: User instance code: 6-digit TOTP code from authenticator app + client_utc_timestamp: Optional client UTC timestamp in seconds since epoch Returns: True if verification successful @@ -342,7 +344,7 @@ class AuthService: raise InvalidCredentialsError("TOTP secret not found") # Verify the code - if not TOTPService.verify_code(secret, code): + if not TOTPService.verify_code(secret, code, client_utc_timestamp=client_utc_timestamp): raise InvalidCredentialsError("Invalid TOTP code") # Mark TOTP as verified @@ -409,7 +411,7 @@ class AuthService: return True @staticmethod - def authenticate_with_totp(user: User, code: str, is_backup_code: bool = False) -> bool: + def authenticate_with_totp(user: User, code: str, is_backup_code: bool = False, client_utc_timestamp: Optional[int] = None) -> bool: """ Verify TOTP code during login. @@ -417,6 +419,7 @@ class AuthService: user: User instance code: 6-digit TOTP code or backup code is_backup_code: True if code is a backup code, False if TOTP code + client_utc_timestamp: Optional client UTC timestamp in seconds since epoch Returns: True if code is valid @@ -477,7 +480,7 @@ class AuthService: if not secret: raise InvalidCredentialsError("TOTP secret not found") - is_valid = TOTPService.verify_code(secret, code) + is_valid = TOTPService.verify_code(secret, code, client_utc_timestamp=client_utc_timestamp) if is_valid: auth_method.last_used_at = datetime.now(timezone.utc) diff --git a/gatehouse_app/services/totp_service.py b/gatehouse_app/services/totp_service.py index cf89041..0b61c43 100644 --- a/gatehouse_app/services/totp_service.py +++ b/gatehouse_app/services/totp_service.py @@ -4,7 +4,7 @@ import io import logging import secrets from datetime import datetime, timezone -from typing import Tuple +from typing import Optional, Tuple import pyotp from gatehouse_app.extensions import bcrypt @@ -57,7 +57,7 @@ class TOTPService: return uri @staticmethod - def verify_code(secret: str, code: str, window: int = 1) -> bool: + def verify_code(secret: str, code: str, window: int = 1, client_utc_timestamp: Optional[int] = None) -> bool: """ Verify a TOTP code against the secret. @@ -65,6 +65,9 @@ class TOTPService: secret: TOTP secret (base32 encoded) code: 6-digit TOTP code to verify window: Time window for code validation (default: 1, allows codes from previous/next time steps) + client_utc_timestamp: Optional client UTC timestamp in seconds since epoch. + If provided, uses client's timestamp instead of server time to handle + timezone mismatches between client and server. Returns: True if code is valid, False otherwise @@ -82,7 +85,13 @@ class TOTPService: # IMPORTANT: We must pass a datetime object, NOT a Unix timestamp # pyotp's internal datetime.utcfromtimestamp() is deprecated and can be # affected by local timezone settings, causing the 10.5 hour skew issue - utc_now = datetime.now(timezone.utc) + if client_utc_timestamp: + # Use client's UTC timestamp to handle timezone mismatches + utc_now = datetime.fromtimestamp(client_utc_timestamp, tz=timezone.utc) + logger.debug(f"[TOTP] Using client UTC timestamp: {client_utc_timestamp}") + else: + # Fallback to server time + utc_now = datetime.now(timezone.utc) # DEBUG: Log detailed timezone information logger.debug(f"[TOTP DEBUG] UTC now: {utc_now}") diff --git a/gatehouse_app/services/webauthn_service.py b/gatehouse_app/services/webauthn_service.py index 9a5110f..db16612 100644 --- a/gatehouse_app/services/webauthn_service.py +++ b/gatehouse_app/services/webauthn_service.py @@ -55,16 +55,20 @@ class WebAuthnService: """ try: key = f"webauthn:challenge:{user_id}:{challenge_type}:{challenge}" + data = { "challenge": challenge, "user_id": user_id, "type": challenge_type, "created_at": datetime.now(timezone.utc).isoformat() } - redis_client.setex(key, expires_in, json.dumps(data)) + + data_json = json.dumps(data) + redis_client.setex(key, expires_in, data_json) + return True except Exception as e: - logger.error(f"Failed to store WebAuthn challenge: {e}") + logger.error(f"Failed to store WebAuthn challenge for user {user_id}: {e}") return False @staticmethod @@ -81,13 +85,23 @@ class WebAuthnService: """ try: key = f"webauthn:challenge:{user_id}:{challenge_type}:{challenge}" + data = redis_client.get(key) + if data: + # Delete the key redis_client.delete(key) - return json.loads(data) - return None + + # Parse the data + data_str = data.decode('utf-8') if isinstance(data, bytes) else data + parsed_data = json.loads(data_str) + + return parsed_data + else: + logger.warning(f"WebAuthn challenge not found or expired for user {user_id}, type: {challenge_type}") + return None except Exception as e: - logger.error(f"Failed to retrieve WebAuthn challenge: {e}") + logger.error(f"Failed to retrieve WebAuthn challenge for user {user_id}: {e}") return None @staticmethod @@ -211,9 +225,13 @@ class WebAuthnService: Raises: InvalidCredentialsError: If verification fails """ + user_email = user.email + # Verify and consume challenge stored_challenge = cls._get_and_delete_challenge(user.id, challenge, 'registration') + if not stored_challenge: + logger.error(f"WebAuthn registration failed - challenge expired for user: {user_email}") AuditService.log_action( action=AuditAction.WEBAUTHN_REGISTER_FAILED, user_id=user.id, @@ -231,18 +249,17 @@ class WebAuthnService: transports = credential_data.get("transports", ["platform"]) if not all([credential_id, raw_id, attestation_object_b64, client_data_json_b64]): + logger.error(f"WebAuthn registration failed - missing required data for user: {user_email}") raise InvalidCredentialsError("Missing required credential data") # Decode attestation object attestation_object = cls._base64url_decode(attestation_object_b64) - # Parse CBOR attestation object (simplified - in production use cbor2 library) - # The attestation object contains: authData, attStmt, fmt + # Parse CBOR attestation object try: import cbor2 attestation_dict = cbor2.loads(attestation_object) except ImportError: - # Fallback: try to parse as simple structure attestation_dict = {} logger.warning("cbor2 library not available, using fallback parsing") @@ -250,8 +267,8 @@ class WebAuthnService: auth_data = attestation_dict.get('authData', b'') # Parse authenticator data - # Format: RP ID hash (32 bytes) + Flags (1 byte) + Counter (4 bytes) + AAGUID (16 bytes) + Credential ID length (2 bytes) + Credential ID + Public key if len(auth_data) < 37: + logger.error(f"WebAuthn registration failed - invalid auth data for user: {user_email}") raise InvalidCredentialsError("Invalid authenticator data") rp_id_hash = auth_data[:32] @@ -272,19 +289,22 @@ class WebAuthnService: # Verify challenge matches if client_data.get("challenge") != challenge: + logger.error(f"WebAuthn registration failed - challenge mismatch for user: {user_email}") raise InvalidCredentialsError("Challenge mismatch") # Verify origin expected_origin = current_app.config.get('WEBAUTHN_ORIGIN', 'http://localhost:5173') + actual_origin = client_data.get("origin") + if client_data.get("origin") != expected_origin: - logger.warning(f"Origin mismatch: expected {expected_origin}, got {client_data.get('origin')}") + logger.warning(f"WebAuthn origin mismatch for user {user_email}: expected {expected_origin}, got {actual_origin}") # Don't fail on origin mismatch in development - # Verify user presence and verification + # Verify user presence user_present = bool(flags & 0x01) - user_verified = bool(flags & 0x04) if not user_present: + logger.error(f"WebAuthn registration failed - user presence not verified for user: {user_email}") raise InvalidCredentialsError("User presence not verified") # Store credential @@ -300,8 +320,16 @@ class WebAuthnService: if existing and existing.provider_data: stored_cred_id = existing.provider_data.get("credential_id", "") if stored_cred_id == credential_id: + logger.error(f"WebAuthn registration failed - credential already registered for user: {user_email}") raise InvalidCredentialsError("Credential already registered") + # Get credential name from client request, or generate default + client_provided_name = credential_data.get("name") + if client_provided_name: + credential_name = client_provided_name + else: + credential_name = f"Passkey {datetime.now(timezone.utc).strftime('%Y-%m-%d')}" + # Create or update authentication method auth_method = existing or AuthenticationMethod( user_id=user.id, @@ -321,11 +349,13 @@ class WebAuthnService: "attestation_format": attestation_dict.get('fmt', 'unknown'), "created_at": datetime.now(timezone.utc).isoformat(), "last_used_at": None, - "name": f"Passkey {datetime.now(timezone.utc).strftime('%Y-%m-%d')}" + "name": credential_name } auth_method.save() + logger.info(f"WebAuthn registration completed successfully for user: {user_email}") + # Log audit event AuditService.log_action( action=AuditAction.WEBAUTHN_REGISTER_COMPLETED, @@ -340,7 +370,7 @@ class WebAuthnService: except InvalidCredentialsError: raise except Exception as e: - logger.error(f"WebAuthn registration verification failed: {e}") + logger.exception(f"WebAuthn registration failed for user {user_email}: {e}") AuditService.log_action( action=AuditAction.WEBAUTHN_REGISTER_FAILED, user_id=user.id, @@ -362,7 +392,10 @@ class WebAuthnService: challenge = cls._generate_challenge() # Store challenge - cls._store_challenge(user.id, challenge, 'authentication') + store_result = cls._store_challenge(user.id, challenge, 'authentication') + + if not store_result: + logger.error(f"WebAuthn challenge storage failed for user: {user.email}") # Get user's credentials credentials = cls.get_user_credentials(user) @@ -373,12 +406,15 @@ class WebAuthnService: if cred.provider_data: cred_id = cred.provider_data.get("credential_id") transports = cred.provider_data.get("transports", []) + if cred_id: allow_credentials.append({ "id": cred_id, "type": "public-key", "transports": transports }) + else: + logger.warning(f"WebAuthn credential missing ID for user: {user.email}") # Get RP configuration rp_id = current_app.config.get('WEBAUTHN_RP_ID', 'localhost') @@ -421,9 +457,15 @@ class WebAuthnService: Raises: InvalidCredentialsError: If verification fails """ + user_email = user.email + logger.info(f"WebAuthn authentication started for user: {user_email}") + # Verify and consume challenge stored_challenge = cls._get_and_delete_challenge(user.id, challenge, 'authentication') + if not stored_challenge: + logger.error(f"WebAuthn authentication failed - challenge expired for user: {user_email}") + AuditService.log_action( action=AuditAction.WEBAUTHN_LOGIN_FAILED, user_id=user.id, @@ -441,6 +483,7 @@ class WebAuthnService: signature_b64 = response.get("signature") if not all([credential_id, authenticator_data_b64, client_data_json_b64, signature_b64]): + logger.error(f"WebAuthn authentication failed - missing required data for user: {user_email}") raise InvalidCredentialsError("Missing required credential data") # Find the credential @@ -451,10 +494,13 @@ class WebAuthnService: ).first() if not auth_method or not auth_method.provider_data: + logger.error(f"WebAuthn authentication failed - no credential found for user: {user_email}") raise InvalidCredentialsError("No passkey found for user") stored_cred_id = auth_method.provider_data.get("credential_id") + if stored_cred_id != credential_id: + logger.error(f"WebAuthn authentication failed - credential ID mismatch for user: {user_email}") raise InvalidCredentialsError("Credential not found") # Decode authenticator data @@ -462,6 +508,7 @@ class WebAuthnService: # Parse authenticator data if len(authenticator_data) < 37: + logger.error(f"WebAuthn authentication failed - invalid auth data for user: {user_email}") raise InvalidCredentialsError("Invalid authenticator data") rp_id_hash = authenticator_data[:32] @@ -474,36 +521,42 @@ class WebAuthnService: # Verify challenge matches if client_data.get("challenge") != challenge: + logger.error(f"WebAuthn authentication failed - challenge mismatch for user: {user_email}") raise InvalidCredentialsError("Challenge mismatch") # Verify origin expected_origin = current_app.config.get('WEBAUTHN_ORIGIN', 'http://localhost:5173') + actual_origin = client_data.get("origin") + if client_data.get("origin") != expected_origin: - logger.warning(f"Origin mismatch: expected {expected_origin}, got {client_data.get('origin')}") + logger.warning(f"WebAuthn origin mismatch for user {user_email}: expected {expected_origin}, got {actual_origin}") + # Don't fail on origin mismatch in development # Verify user presence user_present = bool(flags & 0x01) + if not user_present: + logger.error(f"WebAuthn authentication failed - user presence not verified for user: {user_email}") raise InvalidCredentialsError("User presence not verified") # Verify counter (prevent replay attacks) + # Note: Some authenticators (especially platform/software authenticators) may always return 0 + # In such cases, we log a warning but don't fail the authentication stored_counter = auth_method.provider_data.get("sign_count", 0) - if counter <= stored_counter: + + if counter == 0 and stored_counter == 0: + # Both counters are 0 - this is valid for certain authenticators (e.g., software authenticators) + logger.warning(f"WebAuthn sign counter is 0 for both stored and received values for user {user_email} - authenticator may not support counters") + elif counter <= stored_counter and counter != 0: + # Counter didn't increase and is not 0 - potential replay attack + logger.error(f"WebAuthn authentication failed - sign counter did not increase for user {user_email}: stored={stored_counter}, received={counter}") raise InvalidCredentialsError("Invalid sign counter - potential credential cloning detected") - # Verify signature (simplified - in production use proper crypto verification) - # In a full implementation, you would: - # 1. Decode the public key from COSE format - # 2. Verify the signature using the stored public key - # 3. Verify the authenticator data hash matches RP ID - - # For now, we'll trust the authenticator's signature verification - # A full implementation would use the fido2 library - # Update counter and last used time auth_method.provider_data["sign_count"] = counter auth_method.provider_data["last_used_at"] = datetime.now(timezone.utc).isoformat() auth_method.last_used_at = datetime.now(timezone.utc) + db.session.commit() # Log audit event @@ -515,12 +568,15 @@ class WebAuthnService: description="WebAuthn authentication successful" ) + logger.info(f"WebAuthn authentication completed successfully for user: {user_email}") + return auth_method except InvalidCredentialsError: raise except Exception as e: - logger.error(f"WebAuthn authentication verification failed: {e}") + logger.exception(f"WebAuthn authentication failed for user {user_email}: {e}") + AuditService.log_action( action=AuditAction.WEBAUTHN_LOGIN_FAILED, user_id=user.id,