"""Usage tracking service for superadmin operations.""" import logging from datetime import datetime, timedelta, timezone from gatehouse_app.models.organization.organization import Organization from gatehouse_app.models.user.session import Session from gatehouse_app.models.organization.organization_member import OrganizationMember logger = logging.getLogger(__name__) class UsageMetric: """Usage metric types.""" USERS = "users" SESSIONS = "sessions" ACTIVE_SESSIONS = "active_sessions" API_CALLS = "api_calls" class SuperadminUsageService: """Service for tracking and retrieving usage metrics.""" @staticmethod def get_current_usage(org_id: str) -> dict: """Get current period usage for an organization. Args: org_id: Organization UUID Returns: Current usage metrics including user count and active sessions """ org = Organization.query.get(org_id) if not org: raise ValueError("Organization not found") # Get active member count member_count = org.get_member_count() # Get active sessions count member_user_ids = [m.user_id for m in org.members if m.deleted_at is None] active_sessions = Session.query.filter( Session.user_id.in_(member_user_ids), Session.deleted_at.is_(None), Session.status == "active", ).count() # Get max concurrent sessions this month now = datetime.now(timezone.utc) period_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) # For simplicity, we'll track peak concurrent sessions # In production, you'd want a separate tracking table max_sessions_this_month = active_sessions # Placeholder return { "organization_id": org_id, "period_start": period_start.isoformat() + "Z", "period_end": now.isoformat() + "Z", "metrics": { "users": { "current": member_count, "limit": None, # Will come from plan "description": "Total organization members", }, "active_sessions": { "current": active_sessions, "max_this_month": max_sessions_this_month, "description": "Currently active user sessions", }, }, } @staticmethod def get_usage_history( org_id: str, metric: str, days: int = 30, ) -> dict: """Get usage history for a specific metric. Args: org_id: Organization UUID metric: Metric type (users, sessions) days: Number of days of history Returns: List of daily usage data points """ org = Organization.query.get(org_id) if not org: raise ValueError("Organization not found") now = datetime.now(timezone.utc) start_date = now - timedelta(days=days) # Get member history (simplified - would need a history table in production) history = [] current_count = org.get_member_count() # Generate daily data points (placeholder - real implementation needs history table) for i in range(days): date = start_date + timedelta(days=i) history.append({ "date": date.strftime("%Y-%m-%d"), "value": current_count, # Simplified }) return { "organization_id": org_id, "metric": metric, "period_start": start_date.isoformat() + "Z", "period_end": now.isoformat() + "Z", "history": history, } @staticmethod def get_seat_count_for_period(org_id: str, year: int, month: int) -> dict: """Calculate maximum seat count used in a given month. For billing purposes - tracks the peak number of users. Args: org_id: Organization UUID year: Year month: Month Returns: Seat count data for the period """ org = Organization.query.get(org_id) if not org: raise ValueError("Organization not found") # Calculate first and last day of month first_day = datetime(year, month, 1, tzinfo=timezone.utc) if month == 12: last_day = datetime(year + 1, 1, 1, tzinfo=timezone.utc) - timedelta(seconds=1) else: last_day = datetime(year, month + 1, 1, tzinfo=timezone.utc) - timedelta(seconds=1) # Get all members that existed during this period members = OrganizationMember.query.filter( OrganizationMember.organization_id == org_id, OrganizationMember.deleted_at.is_(None), ).all() # Count unique users who were members at any point during the month max_seats = len(members) # Current count at end of month current_seats = len([m for m in members if m.deleted_at is None or m.deleted_at > last_day]) return { "organization_id": org_id, "period": f"{year}-{month:02d}", "max_seats": max_seats, "current_seats": current_seats, "period_start": first_day.isoformat() + "Z", "period_end": last_day.isoformat() + "Z", } @staticmethod def adjust_usage( org_id: str, metric: str, adjustment: int, reason: str, superadmin_id: str, ) -> dict: """Apply a manual usage adjustment (credit or charge). Args: org_id: Organization UUID metric: Metric to adjust adjustment: Positive (credit) or negative (charge) reason: Reason for adjustment superadmin_id: Superadmin making the adjustment Returns: Adjustment confirmation """ org = Organization.query.get(org_id) if not org: raise ValueError("Organization not found") # In production, you'd create a UsageAdjustment record # For now, just log and return logger.warning( f"[SuperadminUsage] Adjustment: org={org_id}, metric={metric}, " f"adjustment={adjustment}, reason={reason}, by={superadmin_id}" ) return { "organization_id": org_id, "metric": metric, "adjustment": adjustment, "reason": reason, "applied_at": datetime.now(timezone.utc).isoformat() + "Z", }