Files
gatehouse-api/gatehouse_app/services/notification_service.py
T

420 lines
15 KiB
Python
Raw Normal View History

2026-01-16 17:31:20 +10:30
"""Notification Service for MFA compliance notifications.
This service handles sending MFA-related notifications to users, including:
- Deadline reminder emails
- Suspension notifications
- Compliance status updates
The service is designed to work with or without email infrastructure:
- If email is configured, it sends actual emails
- If email is not available, it logs notifications for debugging/auditing
Usage:
from gatehouse_app.services.notification_service import NotificationService
NotificationService.send_mfa_deadline_reminder(user, compliance, org_policy)
"""
from datetime import datetime, timezone
from typing import Optional, Dict, Any
import logging
import json
2026-03-29 23:14:20 +05:45
import threading
2026-01-16 17:31:20 +10:30
from gatehouse_app.extensions import db
from gatehouse_app.models.security.mfa_policy_compliance import MfaPolicyCompliance
from gatehouse_app.models.security.organization_security_policy import OrganizationSecurityPolicy
from gatehouse_app.models.user.user import User
2026-01-16 17:31:20 +10:30
from gatehouse_app.services.audit_service import AuditService
from gatehouse_app.services.email_provider import EmailMessage, EmailProviderFactory
from gatehouse_app.services.email_templates import (
build_mfa_deadline_reminder_html,
build_mfa_suspension_html,
)
2026-01-16 17:31:20 +10:30
from gatehouse_app.utils.constants import AuditAction
# Module-level logger; every record from this file is emitted under the module's dotted name.
logger = logging.getLogger(__name__)
class NotificationService:
    """Service for sending MFA compliance notifications.

    Every method is a ``@staticmethod``; the class is used as a namespace and
    is never instantiated by the code in this module. Outgoing mail is handed
    to a background thread by ``_send_email_async``, so the public ``send_*``
    methods report that a notification was *dispatched*, not that the mail
    provider accepted it.
    """

    # Configuration keys for email settings
    EMAIL_ENABLED_KEY = "EMAIL_ENABLED"
    SMTP_HOST_KEY = "SMTP_HOST"
    SMTP_PORT_KEY = "SMTP_PORT"
    SMTP_USERNAME_KEY = "SMTP_USERNAME"
    SMTP_PASSWORD_KEY = "SMTP_PASSWORD"
    SMTP_USE_TLS_KEY = "SMTP_USE_TLS"
    FROM_ADDRESS_KEY = "FROM_ADDRESS"

    # ------------------------------------------------------------------
    # Private helpers shared by the public send_* methods
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_org_name(organization_id: Any) -> Any:
        """Return the organization's name, or the raw ID when no row is found."""
        # Imported lazily, matching the function-scope imports used throughout this file.
        from gatehouse_app.models.organization.organization import Organization

        org = Organization.query.get(organization_id)
        return org.name if org else organization_id

    @staticmethod
    def _describe_mfa_methods(org_policy: OrganizationSecurityPolicy) -> str:
        """Return the human-readable label for the MFA methods the policy requires."""
        from gatehouse_app.utils.constants import MfaPolicyMode

        mode = org_policy.mfa_policy_mode
        labels = (
            (MfaPolicyMode.REQUIRE_TOTP, "Authenticator app (TOTP)"),
            (MfaPolicyMode.REQUIRE_WEBAUTHN, "Passkey (WebAuthn)"),
            (
                MfaPolicyMode.REQUIRE_TOTP_OR_WEBAUTHN,
                "Authenticator app (TOTP) OR Passkey (WebAuthn)",
            ),
        )
        # Compare with ``==`` (not a dict lookup) so the match semantics stay
        # the same regardless of how the mode value hashes.
        for candidate, label in labels:
            if mode == candidate:
                return label
        return "Multi-factor authentication"

    @staticmethod
    def _security_settings_link() -> str:
        """Return the URL of the security settings page used as the MFA setup link."""
        # This import was missing for the send_* methods, which made them fail
        # with NameError (swallowed by their broad ``except``) on every call.
        from flask import current_app

        app_url = current_app.config.get("APP_URL", "http://localhost:8080")
        return f"{app_url}/settings/security"

    @staticmethod
    def _days_until(deadline: datetime, now: Optional[datetime] = None) -> int:
        """Return the whole days from ``now`` to ``deadline`` (negative once passed).

        Args:
            deadline: Deadline timestamp; a naive value is treated as UTC.
            now: Reference time (timezone-aware). Defaults to the current UTC time.

        Returns:
            The ``days`` component of ``deadline - now``.
        """
        if deadline.tzinfo is None:
            deadline = deadline.replace(tzinfo=timezone.utc)
        if now is None:
            now = datetime.now(timezone.utc)
        return (deadline - now).days

    @staticmethod
    def _format_deadline(deadline: Optional[datetime]) -> str:
        """Format ``deadline`` as ``YYYY-MM-DD HH:MM UTC``, or ``Not set`` for ``None``.

        Aware datetimes are converted to UTC first so the ``UTC`` suffix is
        always truthful; naive datetimes are treated as already being in UTC.
        """
        if deadline is None:
            return "Not set"
        if deadline.tzinfo is None:
            deadline = deadline.replace(tzinfo=timezone.utc)
        return deadline.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    @staticmethod
    def _days_phrase(days: int) -> str:
        """Return ``"1 day"`` or ``"<n> days"`` so subjects read grammatically."""
        unit = "day" if days == 1 else "days"
        return f"{days} {unit}"

    # ------------------------------------------------------------------
    # Public notification entry points
    # ------------------------------------------------------------------

    @staticmethod
    def send_mfa_deadline_reminder(
        user: User,
        compliance: MfaPolicyCompliance,
        org_policy: OrganizationSecurityPolicy,
    ) -> bool:
        """Send an MFA deadline reminder notification to a user.

        The reminder includes the days remaining until the deadline, the
        required MFA methods and a link to the security settings page.

        Args:
            user: User to notify.
            compliance: User's compliance record; supplies the deadline and
                the organization ID.
            org_policy: Organization's MFA policy; supplies the policy mode.

        Returns:
            True if the notification was dispatched and audit-logged. False if
            the compliance record has no deadline or any error occurred
            (errors are logged, never raised).
        """
        try:
            deadline = compliance.deadline_at
            if deadline is None:
                # Without a deadline there is nothing meaningful to remind about.
                logger.warning(
                    f"Cannot send MFA deadline reminder to {user.email}: no deadline set"
                )
                return False

            days_until_deadline = NotificationService._days_until(deadline)
            deadline_date = NotificationService._format_deadline(deadline)
            org_name = NotificationService._resolve_org_name(compliance.organization_id)
            mfa_methods = NotificationService._describe_mfa_methods(org_policy)
            setup_link = NotificationService._security_settings_link()

            subject = (
                "Action Required: MFA enrollment deadline in "
                f"{NotificationService._days_phrase(days_until_deadline)}"
            )
            html_body = build_mfa_deadline_reminder_html(
                user_name=user.full_name or user.email,
                org_name=org_name,
                days_remaining=days_until_deadline,
                deadline_date=deadline_date,
                mfa_methods=mfa_methods,
                setup_link=setup_link,
            )

            # Delivery happens on a background thread; this call returns at once.
            NotificationService._send_email_async(
                to_address=user.email,
                subject=subject,
                body=f"MFA enrollment deadline in {days_until_deadline} days: {setup_link}",
                html_body=html_body,
            )
            logger.info(
                f"Sent MFA deadline reminder to {user.email} "
                f"({days_until_deadline} days remaining)"
            )
            AuditService.log_action(
                action=AuditAction.MFA_NOTIFICATION_SENT,
                user_id=user.id,
                organization_id=compliance.organization_id,
                description=f"MFA deadline reminder sent. Days remaining: {days_until_deadline}",
            )
            return True
        except Exception as e:
            # Deliberate best-effort boundary: a failed notification must not
            # break the caller, so log with traceback and report failure.
            logger.exception(f"Error sending MFA deadline reminder to {user.email}: {e}")
            return False

    @staticmethod
    def send_mfa_suspended_notification(
        user: User,
        compliance: MfaPolicyCompliance,
        org_policy: OrganizationSecurityPolicy,
    ) -> bool:
        """Send an MFA suspension notification to a user.

        Tells the user their account access has been restricted for not
        complying with MFA requirements, which methods are required, and
        where to set MFA up.

        Args:
            user: User to notify.
            compliance: User's compliance record; supplies the organization ID.
            org_policy: Organization's MFA policy; supplies the policy mode.

        Returns:
            True if the notification was dispatched and audit-logged, False if
            any error occurred (errors are logged, never raised).
        """
        try:
            org_name = NotificationService._resolve_org_name(compliance.organization_id)
            mfa_methods = NotificationService._describe_mfa_methods(org_policy)
            setup_link = NotificationService._security_settings_link()

            subject = "Account Access Restricted - MFA Enrollment Required"
            html_body = build_mfa_suspension_html(
                user_name=user.full_name or user.email,
                org_name=org_name,
                mfa_methods=mfa_methods,
                setup_link=setup_link,
            )

            # Delivery happens on a background thread; this call returns at once.
            NotificationService._send_email_async(
                to_address=user.email,
                subject=subject,
                body=f"Your account has been suspended. Set up MFA to restore access: {setup_link}",
                html_body=html_body,
            )
            logger.info(f"Sent MFA suspension notification to {user.email}")
            AuditService.log_action(
                action=AuditAction.MFA_SUSPENSION_NOTIFICATION_SENT,
                user_id=user.id,
                organization_id=compliance.organization_id,
                description="MFA compliance suspension notification sent",
            )
            return True
        except Exception as e:
            # Deliberate best-effort boundary: log with traceback, report failure.
            logger.exception(
                f"Error sending MFA suspension notification to {user.email}: {e}"
            )
            return False

    # ------------------------------------------------------------------
    # Plain-text body builders (not called by the send_* methods above)
    # ------------------------------------------------------------------

    @staticmethod
    def _build_deadline_reminder_body(
        user: User,
        compliance: MfaPolicyCompliance,
        org_policy: OrganizationSecurityPolicy,
        days_until_deadline: int,
    ) -> str:
        """Build the plain-text email body for a deadline reminder.

        Args:
            user: User being notified.
            compliance: Compliance record (deadline and organization ID).
            org_policy: Organization policy (MFA policy mode).
            days_until_deadline: Days remaining until the deadline.

        Returns:
            Email body string.
        """
        # The organization ID is used as-is here; no name lookup is performed.
        org_name = compliance.organization_id
        lines = [
            "",
            f"Dear {user.full_name or user.email},",
            "This is a reminder that you need to set up multi-factor authentication (MFA)",
            f'to maintain access to your account in the organization "{org_name}".',
            "**Important Details:**",
            f"- Days remaining: {days_until_deadline}",
            f"- Deadline: {NotificationService._format_deadline(compliance.deadline_at)}",
            "**Required MFA Methods:**",
            f"- {NotificationService._describe_mfa_methods(org_policy)}",
            "",
            "**How to Set Up MFA:**",
            "1. Log in to your account",
            "2. Navigate to Settings > Security",
            "3. Follow the prompts to set up an authenticator app or passkey",
            "If you do not set up MFA by the deadline, your account access will be restricted.",
            "If you have any questions, please contact your organization administrator.",
            "Best regards,",
            "Secuird Security Team",
            "",
        ]
        return "\n".join(lines)

    @staticmethod
    def _build_suspension_body(
        user: User,
        compliance: MfaPolicyCompliance,
        org_policy: OrganizationSecurityPolicy,
    ) -> str:
        """Build the plain-text email body for a suspension notification.

        Args:
            user: User being notified.
            compliance: Compliance record (organization ID).
            org_policy: Organization policy (MFA policy mode).

        Returns:
            Email body string.
        """
        # The organization ID is used as-is here; no name lookup is performed.
        org_name = compliance.organization_id
        lines = [
            "",
            f"Dear {user.full_name or user.email},",
            "Your account access has been restricted because you did not set up",
            "multi-factor authentication (MFA) within the required timeframe for",
            f'the organization "{org_name}".',
            "**What Happened:**",
            "Your MFA compliance deadline passed without MFA being configured.",
            "As a result, your account has been placed in a suspended state.",
            "**How to Restore Access:**",
            "1. Log in to your account (you will see a compliance enrollment screen)",
            "2. Follow the prompts to set up an authenticator app or passkey",
            "3. Once MFA is configured, your access will be restored",
            # Closing "**" added: the heading's bold markup was left unterminated.
            "**Required MFA Methods:**",
            f"- {NotificationService._describe_mfa_methods(org_policy)}",
            "",
            "**Need Help?**",
            "Contact your organization administrator if you have questions.",
            "Best regards,",
            "Secuird Security Team",
            "",
        ]
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Delivery
    # ------------------------------------------------------------------

    @staticmethod
    def _send_email_async(
        to_address: str,
        subject: str,
        body: str,
        html_body: Optional[str] = None,
    ) -> None:
        """Send an email on a daemon thread so the caller returns immediately.

        If the ``EMAIL_ENABLED`` config value is falsy, the message is logged
        instead of sent. Anything raised while building or sending the message
        is caught inside the worker thread and logged. The Flask app is
        captured before the thread starts and its app context is pushed inside
        the thread.

        Note: capturing the app happens on the calling thread, so it needs an
        active Flask application context there and can raise if none exists.
        Because the worker is a daemon thread, a message still in flight at
        interpreter shutdown may be lost.

        Args:
            to_address: Recipient email address.
            subject: Email subject line.
            body: Plain-text body.
            html_body: Optional HTML body.
        """
        from flask import current_app

        # Capture the real app object; the ``current_app`` proxy is not usable
        # from the worker thread until a context is pushed there.
        app = current_app._get_current_object()

        def _send() -> None:
            try:
                with app.app_context():
                    if not app.config.get(NotificationService.EMAIL_ENABLED_KEY, False):
                        logger.info(
                            f"[EMAIL DISABLED] Would have sent to: {to_address} | Subject: {subject}"
                        )
                        return

                    message = EmailMessage(
                        to=to_address,
                        subject=subject,
                        body=body,
                        html_body=html_body,
                        from_address=app.config.get(NotificationService.FROM_ADDRESS_KEY, ""),
                    )
                    provider = EmailProviderFactory.get_provider()
                    if provider.send(message):
                        logger.info(f"[EMAIL] Sent to {to_address} | Subject: {subject}")
                    else:
                        logger.error(f"[EMAIL] Failed to send to {to_address}")
            except Exception:
                # Without this handler a failure would only reach the thread's
                # default excepthook and never the module logger.
                logger.exception(f"[EMAIL] Unexpected error sending to {to_address}")

        threading.Thread(target=_send, daemon=True).start()

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    @staticmethod
    def get_notification_stats(user_id: str) -> Dict[str, Any]:
        """Get notification statistics for a user.

        Aggregates the user's non-deleted ``MfaPolicyCompliance`` records.

        Args:
            user_id: User ID.

        Returns:
            Dictionary with ``total_notifications`` (int), ``last_notification``
            (ISO-8601 string or None) and ``by_organization`` (one entry per
            compliance record with its organization ID, count and last
            notification time).
        """
        records = MfaPolicyCompliance.query.filter_by(
            user_id=user_id, deleted_at=None
        ).all()

        # A missing (None) count is treated as zero so the sum cannot raise.
        total_notifications = sum(rec.notification_count or 0 for rec in records)
        last_notification = max(
            (rec.last_notified_at for rec in records if rec.last_notified_at),
            default=None,
        )
        by_organization = [
            {
                "organization_id": rec.organization_id,
                "notification_count": rec.notification_count,
                "last_notified_at": (
                    rec.last_notified_at.isoformat() if rec.last_notified_at else None
                ),
            }
            for rec in records
        ]

        return {
            "total_notifications": total_notifications,
            "last_notification": (
                last_notification.isoformat() if last_notification else None
            ),
            "by_organization": by_organization,
        }