"""Device management service.""" import logging import re from gatehouse_app.extensions import db from gatehouse_app.models import Device from gatehouse_app.models.user import User from gatehouse_app.services.audit_service import AuditService from gatehouse_app.exceptions import ( DeviceNotFoundError, DeviceAlreadyExistsError, InvalidNodeIdError, ValidationError, ) logger = logging.getLogger(__name__) _NODE_ID_RE = re.compile(r"^[0-9a-fA-F]{10}$") def _validate_node_id(node_id: str) -> str: node_id = node_id.strip().lower() if not _NODE_ID_RE.match(node_id): raise InvalidNodeIdError( f"Invalid ZeroTier node ID '{node_id}' — must be exactly 10 hex characters." ) return node_id def register_device( user_id: str, organization_id: str, node_id: str, nickname: str | None = None, hostname: str | None = None, asset_tag: str | None = None, serial_number: str | None = None, ) -> Device: """Register a new device for a user in an organization. Args: user_id: ID of the owning user organization_id: ID of the organization node_id: 10-char hex ZeroTier node ID nickname: User-assigned friendly name hostname: Device hostname asset_tag: Corporate asset tag serial_number: Device serial number Returns: The created Device record """ node_id = _validate_node_id(node_id) existing = Device.query.filter( Device.node_id == node_id, Device.deleted_at.is_(None), ).first() if existing: raise DeviceAlreadyExistsError( f"A device with node ID {node_id} is already registered." ) device = Device( user_id=user_id, organization_id=organization_id, node_id=node_id, device_nickname=nickname, hostname=hostname, asset_tag=asset_tag, serial_number=serial_number, ) device.save() AuditService.log_action( action="device.registered", user_id=user_id, organization_id=organization_id, resource_type="device", resource_id=device.id, metadata={"node_id": node_id, "nickname": nickname}, description=f"Device {node_id} registered", success=True, ) return device def list_user_devices(user_id: str, organization_id: str) -> list[Device]: """List all active devices for a user in an organization.""" return Device.query.filter( Device.user_id == user_id, Device.organization_id == organization_id, Device.deleted_at.is_(None), ).all() def get_device(device_id: str, organization_id: str | None = None) -> Device: """Fetch a device by ID, optionally scoped to an organization.""" q = Device.query.filter( Device.id == device_id, Device.deleted_at.is_(None), ) if organization_id: q = q.filter(Device.organization_id == organization_id) device = q.first() if not device: raise DeviceNotFoundError(f"Device {device_id} not found.") return device def get_device_by_node_id(node_id: str, organization_id: str | None = None) -> Device | None: """Find a device by its ZeroTier node ID, optionally scoped to org.""" node_id = _validate_node_id(node_id) q = Device.query.filter( Device.node_id == node_id, Device.deleted_at.is_(None), ) if organization_id: q = q.filter(Device.organization_id == organization_id) return q.first() def update_device( device_id: str, user_id: str, **kwargs, ) -> Device: """Update device fields. Only allows nickname, hostname, asset_tag.""" device = get_device(device_id) if device.user_id != user_id: raise DeviceNotFoundError("Device not found.") allowed = {"device_nickname", "hostname", "asset_tag", "serial_number"} for key in kwargs: if key not in allowed: raise ValidationError(f"Cannot update field: {key}") device.update(**kwargs) AuditService.log_action( action="device.updated", user_id=user_id, organization_id=device.organization_id, resource_type="device", resource_id=device.id, metadata=kwargs, description=f"Device {device.node_id} updated", success=True, ) return device def remove_device(device_id: str, user_id: str) -> None: """Soft-delete a device, deactivate its active memberships, and soft-delete all memberships. Since memberships are device-centric (tied to a specific device/node_id), removing a device means all its memberships are orphaned and must be cleaned up. """ device = get_device(device_id) if device.user_id != user_id: raise DeviceNotFoundError("Device not found.") # Soft-delete all memberships (deactivates active ones first) for membership in device.memberships: if membership.deleted_at is None: from gatehouse_app.services.network_access_service import revoke_membership_soft revoke_membership_soft(membership.id, revoked_by_user_id=user_id) device.delete(soft=True) AuditService.log_action( action="device.removed", user_id=user_id, organization_id=device.organization_id, resource_type="device", resource_id=device.id, metadata={"node_id": device.node_id, "memberships_removed": len([m for m in device.memberships if m.deleted_at is None])}, description=f"Device {device.node_id} removed", success=True, )