Files

309 lines
10 KiB
Python
Raw Permalink Normal View History

2026-01-08 01:00:26 +10:30
"""Management script for Flask application."""
import os
2026-03-06 01:36:23 +05:45
import click
2026-01-08 01:00:26 +10:30
from dotenv import load_dotenv
2026-01-08 15:59:53 +10:30
# Load environment variables FIRST, before any app imports
load_dotenv(dotenv_path=os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env'))
from flask.cli import FlaskGroup
2026-01-15 03:40:29 +10:30
from gatehouse_app import create_app
2026-01-08 01:00:26 +10:30
# Create application
app = create_app(os.getenv("FLASK_ENV", "development"))
# Create Flask CLI group
cli = FlaskGroup(create_app=lambda: app)
2026-01-16 17:31:20 +10:30
@cli.command("run_mfa_compliance_job")
def run_mfa_compliance_job():
"""Run the MFA compliance scheduled job.
This command processes MFA compliance transitions:
- Transitions users from PAST_DUE to SUSPENDED status
- Sends deadline reminder notifications
- Updates notification tracking metadata
Usage:
python manage.py run_mfa_compliance_job
This can be called via cron or a task scheduler:
0 * * * * cd /path/to/app && python manage.py run_mfa_compliance_job
"""
from datetime import datetime, timezone
from gatehouse_app.jobs.mfa_compliance_job import process_mfa_compliance, get_job_status
print("=" * 60)
print("MFA Compliance Job")
print("=" * 60)
now = datetime.now(timezone.utc)
print(f"Start time: {now.isoformat()}")
print()
# Show current status before processing
print("Current Compliance Status:")
status = get_job_status(now)
for status_name, count in status["status_counts"].items():
print(f" {status_name}: {count}")
print(f" Approaching deadline: {status['approaching_deadline_count']}")
print(f" Past due: {status['past_due_count']}")
print()
# Run the job
print("Processing compliance...")
result = process_mfa_compliance(now)
print()
print("Job Results:")
print(f" Users suspended: {result['suspended_count']}")
print(f" Notifications sent: {result['notified_count']}")
print(f" Records processed: {result['processed_count']}")
if result['errors']:
print()
print("Errors:")
for error in result['errors']:
print(f" - {error}")
print()
print("=" * 60)
print("Job completed successfully")
print("=" * 60)
@cli.command("run_zerotier_reconciliation")
def run_zerotier_reconciliation():
"""Run the ZeroTier network reconciliation scheduled job.
This command:
- Expires activation sessions past their TTL and deauthorizes ZT members
- Syncs observed ZeroTier membership state into the portal cache
- Reconciles portal membership state against ZT controller state
- Detects and repairs drift between portal and ZT
Usage:
python manage.py run_zerotier_reconciliation
Cron example (every 2 minutes):
*/2 * * * * cd /path/to/app && python manage.py run_zerotier_reconciliation
"""
from datetime import datetime, timezone
from gatehouse_app.jobs.zerotier_reconciliation_job import run_reconciliation
print("=" * 60)
print("ZeroTier Reconciliation Job")
print("=" * 60)
now = datetime.now(timezone.utc)
print(f"Start time: {now.isoformat()}")
print()
result = run_reconciliation()
print()
print("Job Results:")
print(f" Expired activations: {result['expired_activations']}")
print(f" Networks processed: {result['networks_processed']}")
print(f" Authorized: {result['authorized']}")
print(f" Deauthorized: {result['deauthorized']}")
print(f" Purged memberships: {result['deleted_memberships']}")
print(f" Purge errors: {result['delete_errors']}")
print(f" Errors: {result['errors']}")
print()
print("=" * 60)
print("Job completed successfully")
print("=" * 60)
2026-01-16 17:31:20 +10:30
@cli.command("mfa_compliance_status")
def mfa_compliance_status():
"""Show current MFA compliance status.
Usage:
python manage.py mfa_compliance_status
"""
from datetime import datetime, timezone
from gatehouse_app.jobs.mfa_compliance_job import get_job_status
print("=" * 60)
print("MFA Compliance Status Report")
print("=" * 60)
now = datetime.now(timezone.utc)
status = get_job_status(now)
print(f"Report time: {status['timestamp']}")
print()
print("Compliance Records by Status:")
for status_name, count in sorted(status["status_counts"].items()):
bar = "" * min(count, 50)
print(f" {status_name:20s}: {count:5d} {bar}")
print()
print("Summary:")
print(f" Approaching deadline: {status['approaching_deadline_count']}")
print(f" Past due (pending suspension): {status['past_due_count']}")
total = sum(status["status_counts"].values())
compliant = status["status_counts"].get("compliant", 0)
if total > 0:
compliance_rate = (compliant / total) * 100
print(f" Compliance rate: {compliance_rate:.1f}%")
print("=" * 60)
@cli.command("cleanup_sessions")
def cleanup_sessions():
"""Clean up expired user sessions.
Marks sessions as EXPIRED when they have passed their expires_at
timestamp. Safe to run frequently (e.g. every 5 minutes via job_runner).
Usage:
python manage.py cleanup_sessions
"""
from gatehouse_app.services.session_service import SessionService
print("=" * 60)
print("Session Cleanup Job")
print("=" * 60)
from datetime import datetime, timezone
print(f"Start time: {datetime.now(timezone.utc).isoformat()}")
count = SessionService.cleanup_expired_sessions()
print(f"Expired sessions marked: {count}")
print("=" * 60)
@cli.command("configure_oauth")
2026-03-06 01:36:23 +05:45
@click.argument("provider", required=False)
@click.option("--client-id", default=None, help="OAuth client ID")
@click.option("--client-secret", default=None, help="OAuth client secret")
@click.option("--redirect-url", default=None, help="Default redirect URL (e.g. https://yourdomain.com/api/v1/auth/external/<provider>/callback)")
def configure_oauth(provider, client_id, client_secret, redirect_url):
"""Configure an OAuth provider at the application level.
Usage (interactive):
python manage.py configure_oauth
2026-03-06 01:36:23 +05:45
Usage (non-interactive):
python manage.py configure_oauth google --client-id ID --client-secret SECRET
Supported providers: google, github, microsoft
"""
import getpass
2026-03-06 01:36:23 +05:45
from gatehouse_app.models.auth.authentication_method import ApplicationProviderConfig
from gatehouse_app.extensions import db
SUPPORTED = ["google", "github", "microsoft"]
2026-03-06 01:36:23 +05:45
# Well-known endpoints — stored in additional_config so the adapter can
# resolve auth_url / token_url / userinfo_url without extra logic.
PROVIDER_DEFAULTS = {
"google": {
"auth_url": "https://accounts.google.com/o/oauth2/v2/auth",
"token_url": "https://oauth2.googleapis.com/token",
"userinfo_url": "https://www.googleapis.com/oauth2/v3/userinfo",
},
"github": {
"auth_url": "https://github.com/login/oauth/authorize",
"token_url": "https://github.com/login/oauth/access_token",
"userinfo_url": "https://api.github.com/user",
},
"microsoft": {
"auth_url": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize",
"token_url": "https://login.microsoftonline.com/common/oauth2/v2.0/token",
"userinfo_url": "https://graph.microsoft.com/oidc/userinfo",
},
}
if not provider:
print("=" * 60)
print("OAuth Provider Configuration")
print("=" * 60)
print(f"Supported providers: {', '.join(SUPPORTED)}")
provider = input("Provider [google/github/microsoft]: ").strip().lower()
provider = provider.strip().lower()
if provider not in SUPPORTED:
print(f"❌ Unknown provider: {provider}")
return
2026-03-06 01:36:23 +05:45
if not client_id:
client_id = input("Client ID: ").strip()
if not client_id:
print("❌ client_id is required")
return
2026-03-06 01:36:23 +05:45
if not client_secret:
client_secret = getpass.getpass("Client Secret (leave blank to keep existing): ").strip()
if not redirect_url:
base_url = os.getenv("API_BASE_URL", "http://localhost:5000/api/v1")
default = f"{base_url}/auth/external/{provider}/callback"
entered = input(f"Default redirect URL [{default}]: ").strip()
redirect_url = entered or default
additional_config = PROVIDER_DEFAULTS[provider].copy()
with app.app_context():
config = ApplicationProviderConfig.query.filter_by(provider_type=provider).first()
if config:
config.client_id = client_id
if client_secret:
config.set_client_secret(client_secret)
config.is_enabled = True
2026-03-06 01:36:23 +05:45
config.default_redirect_url = redirect_url
config.additional_config = {
**(config.additional_config or {}),
**additional_config,
}
db.session.commit()
print(f"✅ Updated {provider} provider config.")
else:
config = ApplicationProviderConfig(
provider_type=provider,
client_id=client_id,
is_enabled=True,
2026-03-06 01:36:23 +05:45
default_redirect_url=redirect_url,
additional_config=additional_config,
)
if client_secret:
config.set_client_secret(client_secret)
db.session.add(config)
db.session.commit()
print(f"✅ Created {provider} provider config.")
2026-03-06 01:36:23 +05:45
print(f" redirect_url : {redirect_url}")
print(f" auth_url : {additional_config['auth_url']}")
@cli.command("list_oauth")
def list_oauth():
"""List all configured OAuth providers.
Usage:
python manage.py list_oauth
"""
2026-03-06 01:36:23 +05:45
from gatehouse_app.models.auth.authentication_method import ApplicationProviderConfig
with app.app_context():
configs = ApplicationProviderConfig.query.all()
if not configs:
print("No OAuth providers configured.")
return
print(f"{'Provider':<15} {'Client ID':<40} {'Enabled'}")
print("-" * 65)
for c in configs:
print(f"{c.provider_type:<15} {c.client_id:<40} {c.is_enabled}")
2026-01-08 01:00:26 +10:30
if __name__ == "__main__":
cli()