Feat(Chore, Fix): Refractor, Half Baked Deletion + Admin Privilege

Refractor Codes into sub file/folders
Admin can remove users'/members mfa/2fa, unlink account from  oauth provider
Admin can  add/reset password
Different Email (OIDC + Manual)-Same Account; (Block Linking and authorize if available)
This commit is contained in:
2026-03-04 18:49:04 +05:45
parent ea1bacc794
commit 7cb522b590
63 changed files with 7896 additions and 10863 deletions
@@ -0,0 +1,168 @@
"""ExternalAuthService — public facade re-exporting the full API."""
import logging
from typing import Optional, Tuple
from gatehouse_app.models import AuthenticationMethod, User
from gatehouse_app.models.auth.authentication_method import (
ApplicationProviderConfig,
OrganizationProviderOverride,
OAuthState,
)
from gatehouse_app.utils.constants import AuthMethodType
from gatehouse_app.services.external_auth.models import (
ExternalAuthError,
ExternalProviderConfig,
ProviderConfigAdapter,
)
from gatehouse_app.services.external_auth import app_provider, org_override, linking
from gatehouse_app.services.external_auth._helpers import (
_compute_s256_challenge,
_build_authorization_url,
_exchange_code,
_get_user_info,
_encrypt_provider_data,
_decrypt_provider_data,
)
logger = logging.getLogger(__name__)
class ExternalAuthService:
"""Service for external authentication operations."""
# ── Provider config lookup ──────────────────────────────────────────────
@classmethod
def get_provider_config(
cls,
provider_type: AuthMethodType,
organization_id: Optional[str] = None,
) -> ProviderConfigAdapter:
provider_type_str = provider_type.value if isinstance(provider_type, AuthMethodType) else provider_type
app_config = ApplicationProviderConfig.query.filter_by(
provider_type=provider_type_str
).first()
if not app_config:
raise ExternalAuthError(
f"{provider_type_str.title()} OAuth is not configured for this application",
"PROVIDER_NOT_CONFIGURED",
400,
)
if not app_config.is_enabled:
raise ExternalAuthError(
f"{provider_type_str.title()} OAuth is currently disabled",
"PROVIDER_DISABLED",
400,
)
org_override_obj = None
if organization_id:
org_override_obj = OrganizationProviderOverride.query.filter_by(
organization_id=organization_id,
provider_type=provider_type_str,
).first()
if org_override_obj and not org_override_obj.is_enabled:
raise ExternalAuthError(
f"{provider_type_str.title()} OAuth is disabled for this organization",
"PROVIDER_DISABLED_FOR_ORG",
400,
)
return ProviderConfigAdapter(app_config, org_override_obj)
# ── App-wide provider config ────────────────────────────────────────────
@classmethod
def create_app_provider_config(cls, provider_type, client_id, client_secret, **kwargs):
return app_provider.create_app_provider_config(provider_type, client_id, client_secret, **kwargs)
@classmethod
def update_app_provider_config(cls, provider_type, **updates):
return app_provider.update_app_provider_config(provider_type, **updates)
@classmethod
def get_app_provider_config(cls, provider_type):
return app_provider.get_app_provider_config(provider_type)
@classmethod
def list_app_provider_configs(cls):
return app_provider.list_app_provider_configs()
@classmethod
def delete_app_provider_config(cls, provider_type):
return app_provider.delete_app_provider_config(provider_type)
# ── Org override management ─────────────────────────────────────────────
@classmethod
def create_org_provider_override(cls, organization_id, provider_type, **kwargs):
return org_override.create_org_provider_override(organization_id, provider_type, **kwargs)
@classmethod
def update_org_provider_override(cls, organization_id, provider_type, **updates):
return org_override.update_org_provider_override(organization_id, provider_type, **updates)
@classmethod
def get_org_provider_override(cls, organization_id, provider_type):
return org_override.get_org_provider_override(organization_id, provider_type)
@classmethod
def list_org_provider_overrides(cls, organization_id):
return org_override.list_org_provider_overrides(organization_id)
@classmethod
def delete_org_provider_override(cls, organization_id, provider_type):
return org_override.delete_org_provider_override(organization_id, provider_type)
# ── OAuth link / auth flows ─────────────────────────────────────────────
@classmethod
def initiate_link_flow(cls, user_id, provider_type, organization_id, redirect_uri=None):
return linking.initiate_link_flow(cls.get_provider_config, user_id, provider_type, organization_id, redirect_uri)
@classmethod
def complete_link_flow(cls, provider_type, authorization_code, state, redirect_uri):
return linking.complete_link_flow(cls.get_provider_config, provider_type, authorization_code, state, redirect_uri)
@classmethod
def authenticate_with_provider(cls, provider_type, organization_id, authorization_code, state, redirect_uri):
return linking.authenticate_with_provider(cls.get_provider_config, provider_type, organization_id, authorization_code, state, redirect_uri)
@classmethod
def unlink_provider(cls, user_id, provider_type, organization_id=None):
return linking.unlink_provider(user_id, provider_type, organization_id)
@classmethod
def get_linked_accounts(cls, user_id):
return linking.get_linked_accounts(user_id)
# ── Static helpers (kept as class methods for backward compatibility) ───
@staticmethod
def _compute_s256_challenge(verifier: str) -> str:
return _compute_s256_challenge(verifier)
@staticmethod
def _build_authorization_url(config, state) -> str:
return _build_authorization_url(config, state)
@staticmethod
def _exchange_code(config, code, redirect_uri, code_verifier=None) -> dict:
return _exchange_code(config, code, redirect_uri, code_verifier)
@staticmethod
def _get_user_info(config, access_token) -> dict:
return _get_user_info(config, access_token)
@staticmethod
def _encrypt_provider_data(tokens, user_info) -> dict:
return _encrypt_provider_data(tokens, user_info)
@staticmethod
def _decrypt_provider_data(provider_data) -> dict:
return _decrypt_provider_data(provider_data)
@@ -0,0 +1,183 @@
"""Static helper methods for OAuth flows."""
import logging
from typing import Optional
logger = logging.getLogger(__name__)
def _compute_s256_challenge(verifier: str) -> str:
import hashlib
import base64
digest = hashlib.sha256(verifier.encode()).digest()
return base64.urlsafe_b64encode(digest).decode().rstrip("=")
def _build_authorization_url(config, state) -> str:
from urllib.parse import urlencode
provider = (config.provider_type or "").lower()
params = {
"client_id": config.client_id,
"redirect_uri": state.redirect_uri,
"response_type": "code",
"scope": " ".join(config.scopes or ["openid", "profile", "email"]),
"state": state.state,
}
if provider == "google":
params["access_type"] = (
config.settings.get("access_type", "offline") if config.settings else "offline"
)
params["prompt"] = (
config.settings.get("prompt", "consent") if config.settings else "consent"
)
elif provider == "microsoft":
params["prompt"] = (
config.settings.get("prompt", "select_account") if config.settings else "select_account"
)
else:
if config.settings:
if "prompt" in config.settings:
params["prompt"] = config.settings["prompt"]
if "access_type" in config.settings:
params["access_type"] = config.settings["access_type"]
if state.nonce:
params["nonce"] = state.nonce
if state.code_challenge:
params["code_challenge"] = state.code_challenge
params["code_challenge_method"] = "S256"
full_url = f"{config.auth_url}?{urlencode(params)}"
logger.info(
f"[PKCE DEBUG] Building authorization URL:\n"
f" provider_type: {config.provider_type}\n"
f" state.code_challenge: {state.code_challenge[:20] if state.code_challenge else 'None'}...\n"
f" params has code_challenge: {'code_challenge' in params}\n"
f" Full URL: {full_url}"
)
return full_url
def _exchange_code(config, code: str, redirect_uri: str, code_verifier: str = None) -> dict:
import requests
data = {
"client_id": config.client_id,
"client_secret": config.get_client_secret(),
"code": code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
}
if code_verifier:
data["code_verifier"] = code_verifier
logger.debug(
f"Token exchange request: url={config.token_url}, "
f"client_id={config.client_id}, redirect_uri={redirect_uri}, "
f"has_code_verifier={bool(code_verifier)}"
)
response = requests.post(config.token_url, data=data)
if response.status_code != 200:
logger.error(
f"Token exchange failed: status={response.status_code}, "
f"response={response.text}"
)
response.raise_for_status()
return response.json()
def _get_user_info(config, access_token: str) -> dict:
import re
import requests
provider = (config.provider_type or "").lower()
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get(config.userinfo_url, headers=headers)
response.raise_for_status()
data = response.json()
if provider == "microsoft":
email_verified = data.get("email_verified", True)
else:
email_verified = data.get("email_verified", False)
sub = data.get("sub")
raw_email = data.get("email")
if not raw_email and sub:
if re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", sub):
raw_email = sub
email_verified = True
else:
raw_email = f"{sub}@{provider or 'oauth'}.local"
email_verified = False
raw_name = data.get("name") or data.get("display_name")
if not raw_name and raw_email:
raw_name = raw_email.split("@")[0]
return {
"provider_user_id": sub,
"email": raw_email,
"email_verified": email_verified,
"name": raw_name,
"first_name": data.get("given_name"),
"last_name": data.get("family_name"),
"picture": data.get("picture"),
"raw_data": data,
}
def _encrypt_provider_data(tokens: dict, user_info: dict) -> dict:
from gatehouse_app.utils.encryption import encrypt
return {
"access_token": encrypt(tokens.get("access_token")) if tokens.get("access_token") else None,
"token_type": tokens.get("token_type", "Bearer"),
"expires_in": tokens.get("expires_in"),
"refresh_token": encrypt(tokens.get("refresh_token")) if tokens.get("refresh_token") else None,
"scope": tokens.get("scope", []),
"id_token": encrypt(tokens.get("id_token")) if tokens.get("id_token") else None,
"email": user_info.get("email"),
"name": user_info.get("name"),
"picture": user_info.get("picture"),
"raw_data": user_info.get("raw_data", {}),
}
def _decrypt_provider_data(provider_data: dict) -> dict:
from gatehouse_app.utils.encryption import decrypt
if not provider_data:
return {}
result = {
"token_type": provider_data.get("token_type", "Bearer"),
"expires_in": provider_data.get("expires_in"),
"scope": provider_data.get("scope", []),
"email": provider_data.get("email"),
"name": provider_data.get("name"),
"picture": provider_data.get("picture"),
"raw_data": provider_data.get("raw_data", {}),
}
for field in ("access_token", "refresh_token", "id_token"):
value = provider_data.get(field)
if value:
try:
result[field] = decrypt(value)
except Exception:
result[field] = value
else:
result[field] = None
return result
@@ -0,0 +1,125 @@
"""Application-wide provider configuration management."""
import logging
from gatehouse_app.models.auth.authentication_method import ApplicationProviderConfig
from gatehouse_app.services.external_auth.models import ExternalAuthError
logger = logging.getLogger(__name__)
def create_app_provider_config(
provider_type: str,
client_id: str,
client_secret: str,
**kwargs,
) -> ApplicationProviderConfig:
existing = ApplicationProviderConfig.query.filter_by(
provider_type=provider_type
).first()
if existing:
raise ExternalAuthError(
f"Provider {provider_type} already exists",
"PROVIDER_EXISTS",
400,
)
additional_config = {}
for key in ['auth_url', 'token_url', 'userinfo_url', 'jwks_url', 'scopes']:
if key in kwargs:
additional_config[key] = kwargs.pop(key)
if 'settings' in kwargs:
additional_config.update(kwargs.pop('settings'))
config = ApplicationProviderConfig(
provider_type=provider_type,
client_id=client_id,
is_enabled=kwargs.get('is_enabled', True),
default_redirect_url=kwargs.get('default_redirect_url'),
additional_config=additional_config,
)
config.set_client_secret(client_secret)
config.save()
logger.info(f"Created application provider config for {provider_type}")
return config
def update_app_provider_config(
provider_type: str,
**updates,
) -> ApplicationProviderConfig:
config = ApplicationProviderConfig.query.filter_by(
provider_type=provider_type
).first()
if not config:
raise ExternalAuthError(
f"Provider {provider_type} not found",
"PROVIDER_NOT_FOUND",
404,
)
if 'client_id' in updates:
config.client_id = updates['client_id']
if 'client_secret' in updates:
config.set_client_secret(updates['client_secret'])
if 'is_enabled' in updates:
config.is_enabled = updates['is_enabled']
if 'default_redirect_url' in updates:
config.default_redirect_url = updates['default_redirect_url']
if config.additional_config is None:
config.additional_config = {}
for key in ['auth_url', 'token_url', 'userinfo_url', 'jwks_url', 'scopes']:
if key in updates:
config.additional_config[key] = updates[key]
if 'settings' in updates:
config.additional_config.update(updates['settings'])
config.save()
logger.info(f"Updated application provider config for {provider_type}")
return config
def get_app_provider_config(provider_type: str) -> ApplicationProviderConfig:
config = ApplicationProviderConfig.query.filter_by(
provider_type=provider_type
).first()
if not config:
raise ExternalAuthError(
f"Provider {provider_type} not found",
"PROVIDER_NOT_FOUND",
404,
)
return config
def list_app_provider_configs() -> list:
configs = ApplicationProviderConfig.query.all()
return [config.to_dict() for config in configs]
def delete_app_provider_config(provider_type: str) -> bool:
config = ApplicationProviderConfig.query.filter_by(
provider_type=provider_type
).first()
if not config:
raise ExternalAuthError(
f"Provider {provider_type} not found",
"PROVIDER_NOT_FOUND",
404,
)
config.delete()
logger.info(f"Deleted application provider config for {provider_type}")
return True
@@ -0,0 +1,339 @@
"""Account linking, authentication, and unlinking flows."""
import logging
import secrets
from datetime import datetime
from typing import Optional, Tuple
from gatehouse_app.models import User, AuthenticationMethod
from gatehouse_app.models.auth.authentication_method import OAuthState
from gatehouse_app.utils.constants import AuthMethodType
from gatehouse_app.services.audit_service import AuditService
from gatehouse_app.services.external_auth.models import ExternalAuthError
logger = logging.getLogger(__name__)
def initiate_link_flow(
get_provider_config,
user_id: str,
provider_type: AuthMethodType,
organization_id: str,
redirect_uri: str = None,
) -> Tuple[str, str]:
from gatehouse_app.services.external_auth._helpers import (
_compute_s256_challenge,
_build_authorization_url,
)
provider_type_str = provider_type.value if isinstance(provider_type, AuthMethodType) else provider_type
config = get_provider_config(provider_type, organization_id)
if redirect_uri and not config.is_redirect_uri_allowed(redirect_uri):
raise ExternalAuthError("Invalid redirect URI", "INVALID_REDIRECT_URI", 400)
code_verifier = None
code_challenge = None
if provider_type_str not in ('google', 'microsoft'):
code_verifier = secrets.token_urlsafe(32)
code_challenge = _compute_s256_challenge(code_verifier)
state = OAuthState.create_state(
flow_type="link",
provider_type=provider_type,
user_id=user_id,
organization_id=organization_id,
redirect_uri=redirect_uri or (config.redirect_uris[0] if config.redirect_uris else None),
code_verifier=code_verifier,
code_challenge=code_challenge,
lifetime_seconds=600,
)
auth_url = _build_authorization_url(config=config, state=state)
AuditService.log_external_auth_link_initiated(
user_id=user_id,
organization_id=organization_id,
provider_type=provider_type_str,
state_id=state.id,
)
return auth_url, state.state
def complete_link_flow(
get_provider_config,
provider_type: AuthMethodType,
authorization_code: str,
state: str,
redirect_uri: str,
) -> AuthenticationMethod:
from gatehouse_app.services.external_auth._helpers import (
_exchange_code,
_get_user_info,
_encrypt_provider_data,
)
provider_type_str = provider_type.value if isinstance(provider_type, AuthMethodType) else provider_type
state_record = OAuthState.query.filter_by(state=state).first()
if not state_record or not state_record.is_valid():
AuditService.log_external_auth_link_failed(
user_id=None,
organization_id=None,
provider_type=provider_type_str,
error_message="Invalid or expired OAuth state",
failure_reason="invalid_state",
)
raise ExternalAuthError("Invalid or expired OAuth state", "INVALID_STATE", 400)
if state_record.flow_type != "link":
AuditService.log_external_auth_link_failed(
user_id=state_record.user_id,
organization_id=state_record.organization_id,
provider_type=provider_type_str,
error_message="Invalid flow type for this operation",
failure_reason="invalid_flow_type",
)
raise ExternalAuthError("Invalid flow type for this operation", "INVALID_FLOW_TYPE", 400)
if state_record.provider_type != provider_type_str:
AuditService.log_external_auth_link_failed(
user_id=state_record.user_id,
organization_id=state_record.organization_id,
provider_type=provider_type_str,
error_message="Provider mismatch",
failure_reason="provider_mismatch",
)
raise ExternalAuthError("Provider mismatch", "PROVIDER_MISMATCH", 400)
config = get_provider_config(provider_type, state_record.organization_id)
tokens = _exchange_code(
config=config,
code=authorization_code,
redirect_uri=redirect_uri,
code_verifier=state_record.code_verifier,
)
user_info = _get_user_info(config=config, access_token=tokens["access_token"])
user = User.query.get(state_record.user_id)
if not user:
AuditService.log_external_auth_link_failed(
user_id=None,
organization_id=state_record.organization_id,
provider_type=provider_type_str,
error_message="User not found",
failure_reason="user_not_found",
)
raise ExternalAuthError("User not found", "USER_NOT_FOUND", 400)
conflicting = AuthenticationMethod.query.filter(
AuthenticationMethod.method_type == provider_type,
AuthenticationMethod.provider_user_id == user_info["provider_user_id"],
AuthenticationMethod.user_id != user.id,
AuthenticationMethod.deleted_at == None,
).first()
if conflicting:
raise ExternalAuthError(
f"This {provider_type_str} account is already linked to a different Gatehouse user.",
"PROVIDER_ALREADY_LINKED",
409,
)
auth_method = AuthenticationMethod.query.filter_by(
user_id=user.id,
method_type=provider_type,
provider_user_id=user_info["provider_user_id"],
).first()
if auth_method:
# Restore the row if it was previously soft-deleted (re-linking after admin unlink)
auth_method.deleted_at = None
auth_method.provider_data = _encrypt_provider_data(tokens, user_info)
auth_method.verified = user_info.get("email_verified", False)
auth_method.last_used_at = datetime.utcnow()
auth_method.save()
else:
auth_method = AuthenticationMethod(
user_id=user.id,
method_type=provider_type,
provider_user_id=user_info["provider_user_id"],
provider_data=_encrypt_provider_data(tokens, user_info),
verified=user_info.get("email_verified", False),
is_primary=False,
last_used_at=datetime.utcnow(),
)
auth_method.save()
state_record.mark_used()
AuditService.log_external_auth_link_completed(
user_id=user.id,
organization_id=state_record.organization_id,
provider_type=provider_type_str,
provider_user_id=user_info["provider_user_id"],
auth_method_id=auth_method.id,
)
return auth_method
def authenticate_with_provider(
get_provider_config,
provider_type: AuthMethodType,
organization_id: str,
authorization_code: str,
state: str,
redirect_uri: str,
) -> Tuple[User, dict]:
from gatehouse_app.services.external_auth._helpers import (
_exchange_code,
_get_user_info,
_encrypt_provider_data,
)
provider_type_str = provider_type.value if isinstance(provider_type, AuthMethodType) else provider_type
state_record = OAuthState.query.filter_by(state=state).first()
if not state_record or not state_record.is_valid():
AuditService.log_external_auth_login_failed(
organization_id=organization_id,
provider_type=provider_type_str,
failure_reason="invalid_state",
error_message="Invalid or expired OAuth state",
)
raise ExternalAuthError("Invalid or expired OAuth state", "INVALID_STATE", 400)
config = get_provider_config(provider_type, organization_id)
tokens = _exchange_code(
config=config,
code=authorization_code,
redirect_uri=redirect_uri,
code_verifier=state_record.code_verifier,
)
user_info = _get_user_info(config=config, access_token=tokens["access_token"])
auth_method = AuthenticationMethod.query.filter_by(
method_type=provider_type,
provider_user_id=user_info["provider_user_id"],
).first()
if not auth_method:
existing_user = User.query.filter_by(email=user_info["email"]).first()
if existing_user:
AuditService.log_external_auth_login_failed(
organization_id=organization_id,
provider_type=provider_type_str,
provider_user_id=user_info["provider_user_id"],
email=user_info["email"],
failure_reason="email_exists",
error_message=f"An account with email {user_info['email']} already exists",
)
raise ExternalAuthError(
f"An account with email {user_info['email']} already exists. "
"Please log in with your password and link your Google account from settings.",
"EMAIL_EXISTS",
400,
)
AuditService.log_external_auth_login_failed(
organization_id=organization_id,
provider_type=provider_type_str,
provider_user_id=user_info["provider_user_id"],
email=user_info["email"],
failure_reason="account_not_found",
error_message="No Gatehouse account matches this external account",
)
raise ExternalAuthError(
"No Gatehouse account matches this external account. Please register first.",
"ACCOUNT_NOT_FOUND",
400,
)
user = auth_method.user
auth_method.provider_data = _encrypt_provider_data(tokens, user_info)
auth_method.last_used_at = datetime.utcnow()
auth_method.save()
state_record.mark_used()
from gatehouse_app.services.auth_service import AuthService
session = AuthService.create_session(user=user, organization_id=organization_id)
AuditService.log_external_auth_login(
user_id=user.id,
organization_id=organization_id,
provider_type=provider_type_str,
provider_user_id=user_info["provider_user_id"],
auth_method_id=auth_method.id,
session_id=session.id,
)
return user, session.to_dict()
def unlink_provider(
user_id: str,
provider_type: AuthMethodType,
organization_id: str = None,
) -> bool:
provider_type_str = provider_type.value if isinstance(provider_type, AuthMethodType) else provider_type
auth_method = AuthenticationMethod.query.filter_by(
user_id=user_id,
method_type=provider_type,
).first()
if not auth_method:
raise ExternalAuthError("Provider not linked", "PROVIDER_NOT_LINKED", 400)
other_methods = AuthenticationMethod.query.filter_by(user_id=user_id).count()
if other_methods <= 1:
raise ExternalAuthError(
"Cannot unlink the last authentication method",
"CANNOT_UNLINK_LAST",
400,
)
provider_user_id = auth_method.provider_user_id
auth_method_id = auth_method.id
auth_method.delete()
AuditService.log_external_auth_unlink(
user_id=user_id,
organization_id=organization_id,
provider_type=provider_type_str,
provider_user_id=provider_user_id,
auth_method_id=auth_method_id,
)
return True
def get_linked_accounts(user_id: str) -> list:
from gatehouse_app.utils.constants import AuthMethodType as AMT
methods = AuthenticationMethod.query.filter_by(user_id=user_id, deleted_at=None).all()
external_providers = [AMT.GOOGLE, AMT.GITHUB, AMT.MICROSOFT]
return [
{
"id": m.id,
"provider_type": m.method_type.value if hasattr(m.method_type, 'value') else str(m.method_type),
"provider_user_id": m.provider_user_id,
"email": m.provider_data.get("email") if m.provider_data else None,
"name": m.provider_data.get("name") if m.provider_data else None,
"picture": m.provider_data.get("picture") if m.provider_data else None,
"verified": m.verified,
"linked_at": m.created_at.isoformat() if m.created_at else None,
"last_used_at": m.last_used_at.isoformat() if m.last_used_at else None,
}
for m in methods
if m.method_type in external_providers
or str(m.method_type) in [p.value for p in external_providers]
]
@@ -0,0 +1,173 @@
"""External auth models and adapter classes."""
from typing import Optional
from gatehouse_app.extensions import db
from gatehouse_app.models.base import BaseModel
from gatehouse_app.models.auth.authentication_method import (
ApplicationProviderConfig,
OrganizationProviderOverride,
)
class ExternalAuthError(Exception):
"""Base exception for external auth errors."""
def __init__(self, message: str, error_type: str, status_code: int = 400):
self.message = message
self.error_type = error_type
self.status_code = status_code
super().__init__(message)
class ExternalProviderConfig(BaseModel):
"""OAuth provider configuration per organization.
DEPRECATED: This model is maintained for backward compatibility only.
Use ApplicationProviderConfig and OrganizationProviderOverride instead.
"""
__tablename__ = "external_provider_configs"
organization_id = db.Column(
db.String(36), db.ForeignKey("organizations.id"), nullable=False, index=True
)
provider_type = db.Column(db.String(50), nullable=False, index=True)
client_id = db.Column(db.String(255), nullable=False)
client_secret_encrypted = db.Column(db.String(512), nullable=True)
auth_url = db.Column(db.String(2048), nullable=False)
token_url = db.Column(db.String(2048), nullable=False)
userinfo_url = db.Column(db.String(2048), nullable=True)
jwks_url = db.Column(db.String(2048), nullable=True)
scopes = db.Column(db.JSON, nullable=False, default=list)
redirect_uris = db.Column(db.JSON, nullable=False, default=list)
settings = db.Column(db.JSON, nullable=True)
is_active = db.Column(db.Boolean, default=True, nullable=False)
organization = db.relationship(
"Organization", back_populates="external_provider_configs"
)
__table_args__ = (
db.Index("idx_provider_config_org", "organization_id", "provider_type"),
db.UniqueConstraint(
"organization_id",
"provider_type",
name="uix_org_provider_type",
),
)
def get_client_secret(self) -> str:
from gatehouse_app.utils.encryption import decrypt
if self.client_secret_encrypted:
return decrypt(self.client_secret_encrypted)
return None
def set_client_secret(self, secret: str):
from gatehouse_app.utils.encryption import encrypt
self.client_secret_encrypted = encrypt(secret)
def is_redirect_uri_allowed(self, uri: str) -> bool:
return uri in (self.redirect_uris or [])
def to_dict(self, include_secrets: bool = False) -> dict:
data = {
"id": self.id,
"organization_id": self.organization_id,
"provider_type": self.provider_type,
"client_id": self.client_id,
"auth_url": self.auth_url,
"token_url": self.token_url,
"userinfo_url": self.userinfo_url,
"scopes": self.scopes,
"redirect_uris": self.redirect_uris,
"is_active": self.is_active,
"settings": self.settings,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
if include_secrets and self.client_secret_encrypted:
data["client_secret"] = self.get_client_secret()
return data
class ProviderConfigAdapter:
"""Unified interface for provider configuration.
Merges application-level config with optional organization overrides.
"""
def __init__(
self,
app_config: ApplicationProviderConfig,
org_override: Optional[OrganizationProviderOverride] = None,
):
self.app_config = app_config
self.org_override = org_override
self.provider_type = app_config.provider_type
@property
def client_id(self) -> str:
if self.org_override and self.org_override.client_id:
return self.org_override.client_id
return self.app_config.client_id
def get_client_secret(self) -> str:
if self.org_override and self.org_override.client_secret_encrypted:
return self.org_override.get_client_secret()
return self.app_config.get_client_secret()
@property
def auth_url(self) -> str:
return self._get_provider_endpoint('auth_url')
@property
def token_url(self) -> str:
return self._get_provider_endpoint('token_url')
@property
def userinfo_url(self) -> str:
return self._get_provider_endpoint('userinfo_url')
@property
def jwks_url(self) -> str:
return self._get_provider_endpoint('jwks_url')
@property
def scopes(self) -> list:
base_scopes = self.app_config.additional_config.get('scopes', []) if self.app_config.additional_config else []
if self.org_override and self.org_override.additional_config:
override_scopes = self.org_override.additional_config.get('scopes')
if override_scopes is not None:
return override_scopes
return base_scopes or ['openid', 'profile', 'email']
@property
def redirect_uris(self) -> list:
if self.org_override and self.org_override.redirect_url_override:
return [self.org_override.redirect_url_override]
if self.app_config.default_redirect_url:
return [self.app_config.default_redirect_url]
return []
@property
def settings(self) -> dict:
settings = {}
if self.app_config.additional_config:
settings.update(self.app_config.additional_config)
if self.org_override and self.org_override.additional_config:
settings.update(self.org_override.additional_config)
return settings
@property
def is_active(self) -> bool:
app_enabled = self.app_config.is_enabled
org_enabled = True if not self.org_override else self.org_override.is_enabled
return app_enabled and org_enabled
def is_redirect_uri_allowed(self, uri: str) -> bool:
return uri in self.redirect_uris
def _get_provider_endpoint(self, endpoint_name: str) -> Optional[str]:
if not self.app_config.additional_config:
return None
return self.app_config.additional_config.get(endpoint_name)
@@ -0,0 +1,147 @@
"""Organization-specific provider override management."""
import logging
from gatehouse_app.models.auth.authentication_method import (
ApplicationProviderConfig,
OrganizationProviderOverride,
)
from gatehouse_app.services.external_auth.models import ExternalAuthError
logger = logging.getLogger(__name__)
def create_org_provider_override(
organization_id: str,
provider_type: str,
**kwargs,
) -> OrganizationProviderOverride:
app_config = ApplicationProviderConfig.query.filter_by(
provider_type=provider_type
).first()
if not app_config:
raise ExternalAuthError(
f"Application provider {provider_type} must be configured first",
"PROVIDER_NOT_CONFIGURED",
400,
)
existing = OrganizationProviderOverride.query.filter_by(
organization_id=organization_id,
provider_type=provider_type,
).first()
if existing:
raise ExternalAuthError(
f"Override for {provider_type} already exists for this organization",
"OVERRIDE_EXISTS",
400,
)
additional_config = {}
if 'settings' in kwargs:
additional_config.update(kwargs.pop('settings'))
if 'scopes' in kwargs:
additional_config['scopes'] = kwargs.pop('scopes')
override = OrganizationProviderOverride(
organization_id=organization_id,
provider_type=provider_type,
client_id=kwargs.get('client_id'),
is_enabled=kwargs.get('is_enabled', True),
redirect_url_override=kwargs.get('redirect_url_override'),
additional_config=additional_config if additional_config else None,
)
if 'client_secret' in kwargs:
override.set_client_secret(kwargs['client_secret'])
override.save()
logger.info(f"Created org override for {provider_type} in org {organization_id}")
return override
def update_org_provider_override(
organization_id: str,
provider_type: str,
**updates,
) -> OrganizationProviderOverride:
override = OrganizationProviderOverride.query.filter_by(
organization_id=organization_id,
provider_type=provider_type,
).first()
if not override:
raise ExternalAuthError(
f"Override for {provider_type} not found for this organization",
"OVERRIDE_NOT_FOUND",
404,
)
if 'client_id' in updates:
override.client_id = updates['client_id']
if 'client_secret' in updates:
override.set_client_secret(updates['client_secret'])
if 'is_enabled' in updates:
override.is_enabled = updates['is_enabled']
if 'redirect_url_override' in updates:
override.redirect_url_override = updates['redirect_url_override']
if 'settings' in updates or 'scopes' in updates:
if override.additional_config is None:
override.additional_config = {}
if 'settings' in updates:
override.additional_config.update(updates['settings'])
if 'scopes' in updates:
override.additional_config['scopes'] = updates['scopes']
override.save()
logger.info(f"Updated org override for {provider_type} in org {organization_id}")
return override
def get_org_provider_override(
organization_id: str,
provider_type: str,
) -> OrganizationProviderOverride:
override = OrganizationProviderOverride.query.filter_by(
organization_id=organization_id,
provider_type=provider_type,
).first()
if not override:
raise ExternalAuthError(
f"Override for {provider_type} not found for this organization",
"OVERRIDE_NOT_FOUND",
404,
)
return override
def list_org_provider_overrides(organization_id: str) -> list:
overrides = OrganizationProviderOverride.query.filter_by(
organization_id=organization_id
).all()
return [override.to_dict() for override in overrides]
def delete_org_provider_override(organization_id: str, provider_type: str) -> bool:
override = OrganizationProviderOverride.query.filter_by(
organization_id=organization_id,
provider_type=provider_type,
).first()
if not override:
raise ExternalAuthError(
f"Override for {provider_type} not found for this organization",
"OVERRIDE_NOT_FOUND",
404,
)
override.delete()
logger.info(f"Deleted org override for {provider_type} in org {organization_id}")
return True