7cb522b590
Refractor Codes into sub file/folders Admin can remove users'/members mfa/2fa, unlink account from oauth provider Admin can add/reset password Different Email (OIDC + Manual)-Same Account; (Block Linking and authorize if available)
197 lines
6.3 KiB
Python
197 lines
6.3 KiB
Python
"""OIDC authorization code generation and validation."""
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Tuple
|
|
|
|
from flask import current_app
|
|
|
|
from gatehouse_app.models import User, OIDCAuthCode
|
|
from gatehouse_app.exceptions.validation_exceptions import ValidationError, NotFoundError
|
|
from gatehouse_app.services.oidc_audit_service import OIDCAuditService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _hash_value(value: str) -> str:
|
|
import hashlib
|
|
return hashlib.sha256(value.encode()).hexdigest()
|
|
|
|
|
|
def _compute_code_challenge(verifier: str, method: str = "S256") -> str:
|
|
import hashlib
|
|
import base64
|
|
if method == "S256":
|
|
digest = hashlib.sha256(verifier.encode()).digest()
|
|
return base64.urlsafe_b64encode(digest).decode().rstrip("=")
|
|
return verifier
|
|
|
|
|
|
def generate_authorization_code(
|
|
client_id: str,
|
|
user_id: str,
|
|
redirect_uri: str,
|
|
scope: list,
|
|
state: str,
|
|
nonce: str,
|
|
code_challenge: str = None,
|
|
code_challenge_method: str = None,
|
|
ip_address: str = None,
|
|
user_agent: str = None,
|
|
) -> str:
|
|
import secrets
|
|
|
|
from gatehouse_app.models import OIDCClient
|
|
|
|
logger.debug("[OIDC SERVICE] generate_authorization_code called")
|
|
logger.debug("[OIDC SERVICE] client_id=%s, user_id=%s", client_id, user_id)
|
|
|
|
client = OIDCClient.query.filter_by(client_id=client_id).first()
|
|
|
|
if current_app.config.get('ENV') == 'development':
|
|
logger.debug(f"[OIDC] Generate auth code - Client validation: client_id={client_id}, exists={client is not None}")
|
|
|
|
if not client:
|
|
raise NotFoundError("Client not found")
|
|
|
|
if not client.is_active:
|
|
raise ValidationError("Client is not active")
|
|
|
|
if not client.is_redirect_uri_allowed(redirect_uri):
|
|
raise ValidationError("Invalid redirect_uri")
|
|
|
|
allowed_scopes = client.scopes or []
|
|
valid_scopes = [s for s in scope if s in allowed_scopes]
|
|
|
|
if not valid_scopes:
|
|
raise ValidationError("Invalid scopes")
|
|
|
|
code = secrets.token_urlsafe(32)
|
|
code_hash = _hash_value(code)
|
|
|
|
auth_code = OIDCAuthCode.create_code(
|
|
client_id=client.id,
|
|
user_id=user_id,
|
|
code_hash=code_hash,
|
|
redirect_uri=redirect_uri,
|
|
scope=valid_scopes,
|
|
nonce=nonce,
|
|
code_verifier=code_challenge,
|
|
ip_address=ip_address,
|
|
user_agent=user_agent,
|
|
lifetime_seconds=600,
|
|
)
|
|
logger.debug("[OIDC SERVICE] Auth code created, expires_at=%s", auth_code.expires_at.isoformat())
|
|
|
|
OIDCAuditService.log_authorization_event(
|
|
client_id=client.id,
|
|
user_id=user_id,
|
|
success=True,
|
|
redirect_uri=redirect_uri,
|
|
scope=valid_scopes,
|
|
)
|
|
|
|
return code
|
|
|
|
|
|
def validate_authorization_code(
|
|
code: str,
|
|
client_id: str,
|
|
redirect_uri: str,
|
|
code_verifier: str = None,
|
|
ip_address: str = None,
|
|
user_agent: str = None,
|
|
) -> Tuple[Dict, User]:
|
|
from gatehouse_app.models import OIDCClient
|
|
from gatehouse_app.exceptions.auth_exceptions import InvalidTokenError
|
|
|
|
logger.debug("[OIDC SERVICE] validate_authorization_code called, client_id=%s", client_id)
|
|
|
|
client = OIDCClient.query.filter_by(client_id=client_id).first()
|
|
if not client:
|
|
logger.error(f"[OIDC] Validate auth code - Client not found: client_id={client_id}")
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("Invalid client")
|
|
|
|
code_hash = _hash_value(code)
|
|
auth_code = OIDCAuthCode.query.filter_by(
|
|
code_hash=code_hash,
|
|
client_id=client.id,
|
|
deleted_at=None,
|
|
).first()
|
|
|
|
if not auth_code:
|
|
OIDCAuditService.log_authorization_event(
|
|
client_id=client.id,
|
|
success=False,
|
|
error_code="invalid_grant",
|
|
error_description="Invalid or expired authorization code",
|
|
)
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("Invalid or expired authorization code")
|
|
|
|
if auth_code.is_used:
|
|
OIDCAuditService.log_authorization_event(
|
|
client_id=client.id,
|
|
user_id=auth_code.user_id,
|
|
success=False,
|
|
error_code="invalid_grant",
|
|
error_description="Authorization code already used",
|
|
)
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("Authorization code already used")
|
|
|
|
expires_at = auth_code.expires_at
|
|
if expires_at.tzinfo is None:
|
|
expires_at = expires_at.replace(tzinfo=timezone.utc)
|
|
logger.debug(
|
|
"[OIDC SERVICE] Time until expiration (seconds): %s",
|
|
(expires_at - datetime.now(timezone.utc)).total_seconds(),
|
|
)
|
|
|
|
if auth_code.is_expired():
|
|
OIDCAuditService.log_authorization_event(
|
|
client_id=client.id,
|
|
user_id=auth_code.user_id,
|
|
success=False,
|
|
error_code="invalid_grant",
|
|
error_description="Authorization code expired",
|
|
)
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("Authorization code expired")
|
|
|
|
if auth_code.redirect_uri != redirect_uri:
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("Invalid redirect_uri")
|
|
|
|
if client.require_pkce and auth_code.code_verifier:
|
|
if not code_verifier:
|
|
raise ValidationError("code_verifier is required")
|
|
expected_challenge = _compute_code_challenge(code_verifier, "S256")
|
|
if expected_challenge != auth_code.code_verifier:
|
|
OIDCAuditService.log_authorization_event(
|
|
client_id=client.id,
|
|
user_id=auth_code.user_id,
|
|
success=False,
|
|
error_code="invalid_grant",
|
|
error_description="Invalid code_verifier",
|
|
)
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("Invalid code_verifier")
|
|
|
|
auth_code.mark_as_used()
|
|
|
|
user = User.query.get(auth_code.user_id)
|
|
if not user:
|
|
from gatehouse_app.services.oidc import InvalidGrantError
|
|
raise InvalidGrantError("User not found")
|
|
|
|
claims = {
|
|
"user_id": auth_code.user_id,
|
|
"client_id": client_id,
|
|
"redirect_uri": redirect_uri,
|
|
"scope": auth_code.scope,
|
|
"nonce": auth_code.nonce,
|
|
}
|
|
|
|
return claims, user
|