"""ZeroTier API client — reusable SDK for Secuird integration.

Supports both ZeroTier Central (hosted) and self-hosted controllers.

Central API (default):
    Base URL: https://api.zerotier.com/api/v1
    Auth:     Authorization: token <api-token>
    Docs:     https://docs.zerotier.com/api/central/v1/

Self-hosted Controller API:
    Base URL: http://<host>:9993
    Auth:     X-ZT1-Auth: <contents of authtoken.secret>
    Docs:     https://docs.zerotier.com/api/service/v1/
"""

import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional

import requests

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Validation helpers
# ---------------------------------------------------------------------------

_NETWORK_ID_RE = re.compile(r"^[0-9a-fA-F]{16}$")
_NODE_ID_RE = re.compile(r"^[0-9a-fA-F]{10}$")


def validate_network_id(network_id: str) -> str:
    """Validate and normalise a 16-hex-char ZeroTier network ID.

    Surrounding whitespace is stripped and the ID is lower-cased.

    Raises:
        ValueError: if the result is not exactly 16 hexadecimal characters.
    """
    network_id = network_id.strip().lower()
    if not _NETWORK_ID_RE.match(network_id):
        raise ValueError(
            f"Invalid ZeroTier network ID '{network_id}' — "
            "must be exactly 16 hexadecimal characters."
        )
    return network_id


def validate_node_id(node_id: str) -> str:
    """Validate and normalise a 10-hex-char ZeroTier node ID.

    Surrounding whitespace is stripped and the ID is lower-cased.

    Raises:
        ValueError: if the result is not exactly 10 hexadecimal characters.
    """
    node_id = node_id.strip().lower()
    if not _NODE_ID_RE.match(node_id):
        raise ValueError(
            f"Invalid ZeroTier node ID '{node_id}' — "
            "must be exactly 10 hexadecimal characters."
        )
    return node_id


# ---------------------------------------------------------------------------
# API mode
# ---------------------------------------------------------------------------


class APIMode(Enum):
    """Which ZeroTier API flavour the client targets."""

    CENTRAL = "central"  # Hosted at api.zerotier.com
    CONTROLLER = "controller"  # Self-hosted zerotier-one service


# ---------------------------------------------------------------------------
# Data classes — typed representations of ZeroTier API objects
# ---------------------------------------------------------------------------


@dataclass
class ZTMemberConfig:
    """Subset of the member config block we care about."""

    authorized: bool = False
    active_bridge: bool = False
    ip_assignments: list[str] = field(default_factory=list)
    creation_time: Optional[int] = None
    last_authorized_time: Optional[int] = None
    last_deauthorized_time: Optional[int] = None
    version_major: Optional[int] = None
    version_minor: Optional[int] = None
    version_rev: Optional[int] = None

    @classmethod
    def from_central(cls, data: dict) -> "ZTMemberConfig":
        """Parse config from Central API member.config block."""
        return cls(
            authorized=data.get("authorized", False),
            active_bridge=data.get("activeBridge", False),
            # `or []` also covers an explicit JSON null, not just a missing key.
            ip_assignments=data.get("ipAssignments") or [],
            creation_time=data.get("creationTime"),
            last_authorized_time=data.get("lastAuthorizedTime"),
            last_deauthorized_time=data.get("lastDeauthorizedTime"),
            version_major=data.get("vMajor"),
            version_minor=data.get("vMinor"),
            version_rev=data.get("vRev"),
        )

    @classmethod
    def from_controller(cls, data: dict) -> "ZTMemberConfig":
        """Parse config from self-hosted controller — fields are top-level."""
        return cls(
            authorized=data.get("authorized", False),
            active_bridge=data.get("activeBridge", False),
            ip_assignments=data.get("ipAssignments") or [],
            creation_time=data.get("creationTime"),
            # Controller API does not return lastAuthorizedTime/lastDeauthorizedTime
            version_major=data.get("vMajor"),
            version_minor=data.get("vMinor"),
            version_rev=data.get("vRev"),
        )

    @property
    def version_string(self) -> str:
        """Dotted "major.minor.rev" string, or "unknown" without a major version."""
        if self.version_major is not None:
            return f"{self.version_major}.{self.version_minor}.{self.version_rev}"
        return "unknown"

    def to_dict(self) -> dict:
        """Return a plain-dict view (version fields collapsed to version_string)."""
        return {
            "authorized": self.authorized,
            "active_bridge": self.active_bridge,
            "ip_assignments": self.ip_assignments,
            "creation_time": self.creation_time,
            "last_authorized_time": self.last_authorized_time,
            "last_deauthorized_time": self.last_deauthorized_time,
            "version_string": self.version_string,
        }


@dataclass
class ZTMember:
    """A member (device) on a ZeroTier network."""

    id: str  # composite "{networkId}-{nodeId}" on Central, just nodeId on controller
    network_id: str
    node_id: str
    name: Optional[str] = None
    description: Optional[str] = None
    hidden: bool = False
    config: ZTMemberConfig = field(default_factory=ZTMemberConfig)
    last_online: Optional[int] = None
    last_seen: Optional[int] = None
    physical_address: Optional[str] = None
    client_version: Optional[str] = None
    controller_id: Optional[str] = None
    raw: dict = field(default_factory=dict, repr=False)

    @classmethod
    def from_central(cls, data: dict) -> "ZTMember":
        """Parse member from Central API response."""
        # `or {}` guards against "config": null as well as a missing key.
        config_data = data.get("config") or {}
        return cls(
            id=data.get("id", ""),
            network_id=data.get("networkId", ""),
            node_id=data.get("nodeId", ""),
            name=data.get("name"),
            description=data.get("description"),
            hidden=data.get("hidden", False),
            config=ZTMemberConfig.from_central(config_data),
            last_online=data.get("lastOnline"),
            last_seen=data.get("lastSeen"),
            physical_address=data.get("physicalAddress"),
            client_version=data.get("clientVersion"),
            controller_id=data.get("controllerId"),
            raw=data,
        )

    @classmethod
    def from_controller(cls, data: dict, network_id: str = "") -> "ZTMember":
        """Parse member from self-hosted controller API response.

        Controller responses are flat — authorized, ipAssignments etc. are
        top-level keys, not nested under a config block. ``network_id`` is
        used only when the payload carries no ``nwid`` of its own.
        """
        node_id = data.get("id", data.get("address", ""))
        nwid = data.get("nwid", network_id)
        return cls(
            id=f"{nwid}-{node_id}",
            network_id=nwid,
            node_id=node_id,
            name=None,  # controller API doesn't have name/description
            description=None,
            hidden=False,
            config=ZTMemberConfig.from_controller(data),
            last_online=None,  # not available in controller API
            last_seen=None,
            physical_address=None,
            client_version=None,
            controller_id=None,
            raw=data,
        )

    @property
    def is_authorized(self) -> bool:
        """Whether the member is currently authorized on the network."""
        return self.config.authorized

    @property
    def display_name(self) -> str:
        """Human-friendly label: the member name, falling back to the node ID."""
        return self.name or self.node_id

    @property
    def ip_list(self) -> str:
        """Comma-separated assigned IPs, or "none" when there are none."""
        if not self.config.ip_assignments:
            return "none"
        return ", ".join(self.config.ip_assignments)

    @property
    def last_seen_str(self) -> str:
        """Relative age of ``last_seen`` (epoch milliseconds), e.g. "5m ago"."""
        if not self.last_seen:
            return "never"
        seen_at = datetime.fromtimestamp(self.last_seen / 1000, tz=timezone.utc)
        age = datetime.now(tz=timezone.utc) - seen_at
        seconds = age.total_seconds()
        if seconds < 120:
            return "just now"
        if seconds < 3600:
            return f"{int(seconds / 60)}m ago"
        if seconds < 86400:
            return f"{int(seconds / 3600)}h ago"
        return f"{age.days}d ago"

    def to_dict(self) -> dict:
        """Return a plain-dict view including the derived display fields."""
        return {
            "id": self.id,
            "network_id": self.network_id,
            "node_id": self.node_id,
            "name": self.name,
            "description": self.description,
            "hidden": self.hidden,
            "is_authorized": self.is_authorized,
            "display_name": self.display_name,
            "ip_list": self.ip_list,
            "last_online": self.last_online,
            "last_seen": self.last_seen,
            "last_seen_str": self.last_seen_str,
            "client_version": self.client_version,
            "controller_id": self.controller_id,
            "config": self.config.to_dict(),
        }


@dataclass
class ZTNetworkConfig:
    """Subset of the network config block."""

    name: str = ""
    private: bool = True
    creation_time: Optional[int] = None
    ip_assignment_pools: list[dict] = field(default_factory=list)
    routes: list[dict] = field(default_factory=list)

    @classmethod
    def _from_fields(cls, data: dict) -> "ZTNetworkConfig":
        """Build from a dict holding the config keys (same keys in both APIs)."""
        return cls(
            name=data.get("name", ""),
            private=data.get("private", True),
            creation_time=data.get("creationTime"),
            # `or []` also covers an explicit JSON null.
            ip_assignment_pools=data.get("ipAssignmentPools") or [],
            routes=data.get("routes") or [],
        )

    @classmethod
    def from_central(cls, data: dict) -> "ZTNetworkConfig":
        """Parse from Central API network.config block."""
        return cls._from_fields(data)

    @classmethod
    def from_controller(cls, data: dict) -> "ZTNetworkConfig":
        """Parse from self-hosted controller — fields are top-level."""
        return cls._from_fields(data)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the network config."""
        return {
            "name": self.name,
            "private": self.private,
            "creation_time": self.creation_time,
            "ip_assignment_pools": self.ip_assignment_pools,
            "routes": self.routes,
        }


@dataclass
class ZTNetwork:
    """A ZeroTier virtual network."""

    id: str
    config: ZTNetworkConfig = field(default_factory=ZTNetworkConfig)
    description: Optional[str] = None
    owner_id: Optional[str] = None
    online_member_count: int = 0
    authorized_member_count: int = 0
    total_member_count: int = 0
    raw: dict = field(default_factory=dict, repr=False)

    @classmethod
    def from_central(cls, data: dict) -> "ZTNetwork":
        """Parse from Central API response."""
        # `or {}` guards against "config": null as well as a missing key.
        config_data = data.get("config") or {}
        return cls(
            id=data.get("id", ""),
            config=ZTNetworkConfig.from_central(config_data),
            description=data.get("description"),
            owner_id=data.get("ownerId"),
            online_member_count=data.get("onlineMemberCount", 0),
            authorized_member_count=data.get("authorizedMemberCount", 0),
            total_member_count=data.get("totalMemberCount", 0),
            raw=data,
        )

    @classmethod
    def from_controller(cls, data: dict) -> "ZTNetwork":
        """Parse from self-hosted controller response — flat structure."""
        return cls(
            id=data.get("id", data.get("nwid", "")),
            config=ZTNetworkConfig.from_controller(data),
            description=None,  # controller API doesn't have description
            owner_id=None,
            online_member_count=0,  # not available from controller
            authorized_member_count=0,
            total_member_count=0,
            raw=data,
        )

    @property
    def name(self) -> str:
        """Configured network name, falling back to the network ID."""
        return self.config.name or self.id

    def to_dict(self) -> dict:
        """Return a plain-dict view of the network."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "owner_id": self.owner_id,
            "online_member_count": self.online_member_count,
            "authorized_member_count": self.authorized_member_count,
            "total_member_count": self.total_member_count,
            "config": self.config.to_dict(),
        }


# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------


class ZeroTierAPIError(Exception):
    """Base exception for ZeroTier API errors.

    Attributes:
        status_code: HTTP status of the failed response, if there was one.
        response: Decoded JSON error body, if the server sent one.
    """

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        response: Optional[dict] = None,
    ):
        super().__init__(message)
        self.status_code = status_code
        self.response = response


class ZeroTierAuthError(ZeroTierAPIError):
    """401/403 from ZeroTier — bad or missing token."""


class ZeroTierNotFoundError(ZeroTierAPIError):
    """404 — network or member not found."""


class ZeroTierRateLimitError(ZeroTierAPIError):
    """429 — rate limited."""


# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------


class ZeroTierClient:
    """HTTP client for ZeroTier — supports both Central and self-hosted controllers.

    Designed to be lifted into Secuird's backend as-is. All methods are
    instance methods so the client can be instantiated with different tokens
    (e.g. per-organization tokens in a multi-tenant setup).

    Usage — Central (default):
        client = ZeroTierClient(api_token="your-central-token")

    Usage — Self-hosted controller:
        client = ZeroTierClient(
            api_token="contents-of-authtoken.secret",
            base_url="http://my-controller:9993",
            mode=APIMode.CONTROLLER,
        )

    Then:
        network = client.get_network("8056c2e21c000001")
        members = client.list_members("8056c2e21c000001")
        client.authorize_member("8056c2e21c000001", "abcdef0123")
        client.deauthorize_member("8056c2e21c000001", "abcdef0123")

    The client owns a ``requests.Session``; call ``close()`` or use the client
    as a context manager to release its pooled connections.
    """

    DEFAULT_CENTRAL_URL = "https://api.zerotier.com/api/v1"
    DEFAULT_CONTROLLER_URL = "http://localhost:9993"
    DEFAULT_TIMEOUT = 15  # seconds

    def __init__(
        self,
        api_token: str,
        base_url: Optional[str] = None,
        mode: APIMode = APIMode.CENTRAL,
        timeout: int = DEFAULT_TIMEOUT,
        max_retries: int = 2,
    ):
        """Create a client.

        Args:
            api_token: Central API token, or the controller's authtoken.secret.
            base_url: Override for the API root; defaults depend on ``mode``.
            mode: Which API flavour to speak.
            timeout: Per-request timeout in seconds.
            max_retries: Total number of attempts per request. Values below 1
                are treated as 1 so a request is always sent at least once.

        Raises:
            ValueError: if ``api_token`` is empty.
        """
        if not api_token:
            raise ValueError("ZeroTier API token is required.")

        self._api_token = api_token
        self._mode = mode
        self._timeout = timeout
        self._max_retries = max_retries

        # Default base URL depends on mode
        if base_url:
            self._base_url = base_url.rstrip("/")
        elif mode == APIMode.CENTRAL:
            self._base_url = self.DEFAULT_CENTRAL_URL
        else:
            self._base_url = self.DEFAULT_CONTROLLER_URL

        # Build session with correct auth header
        self._session = requests.Session()
        self._session.headers.update(
            {
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
        )
        if mode == APIMode.CENTRAL:
            self._session.headers["Authorization"] = f"token {self._api_token}"
        else:
            self._session.headers["X-ZT1-Auth"] = self._api_token

    @property
    def mode(self) -> APIMode:
        """The API flavour this client targets."""
        return self._mode

    @property
    def base_url(self) -> str:
        """API root URL, without a trailing slash."""
        return self._base_url

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()

    def __enter__(self) -> "ZeroTierClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Low-level HTTP
    # ------------------------------------------------------------------

    def _request(
        self,
        method: str,
        path: str,
        json_body: Optional[dict] = None,
        params: Optional[dict] = None,
    ) -> Any:
        """Execute an HTTP request with retry and error mapping.

        Rate-limit responses, timeouts and connection errors are retried with
        exponential back-off (2, 4, 8… seconds) until the attempt budget is
        spent. Every other error is raised immediately.

        Returns:
            The decoded JSON body, or None for an empty body.

        Raises:
            ZeroTierRateLimitError: still rate limited on the final attempt.
            ZeroTierAPIError: timeout or connection failure on the final
                attempt, or any non-retryable API error.
        """
        url = f"{self._base_url}{path}"
        # Always send the request at least once, even if max_retries <= 0.
        attempts = max(1, self._max_retries)

        for attempt in range(1, attempts + 1):
            is_last = attempt == attempts
            try:
                logger.debug("[ZT API] %s %s (attempt %s)", method, url, attempt)
                resp = self._session.request(
                    method=method,
                    url=url,
                    json=json_body,
                    params=params,
                    timeout=self._timeout,
                )
                return self._handle_response(resp)
            except ZeroTierRateLimitError:
                if is_last:
                    raise
                reason = "Rate limited"
            # Timeout is listed before ConnectionError on purpose: a connect
            # timeout is a subclass of both and should be reported as a timeout.
            except requests.exceptions.Timeout as exc:
                if is_last:
                    raise ZeroTierAPIError(
                        f"Request timed out after {self._timeout}s"
                    ) from exc
                reason = "Timeout"
            except requests.exceptions.ConnectionError as exc:
                if is_last:
                    raise ZeroTierAPIError(
                        f"Connection to ZeroTier API failed at {self._base_url}"
                    ) from exc
                reason = "Connection error"

            wait = 2 ** attempt
            logger.warning("[ZT API] %s, retrying in %ss...", reason, wait)
            time.sleep(wait)

        # Unreachable: the final attempt either returns or raises above.
        raise ZeroTierAPIError("Request failed after retries")

    def _handle_response(self, resp: requests.Response) -> Any:
        """Map HTTP status codes to typed exceptions.

        Returns:
            The decoded JSON body for any 2xx response, or None if it is empty.

        Raises:
            ZeroTierAuthError: 401 or 403.
            ZeroTierNotFoundError: 404.
            ZeroTierRateLimitError: 429.
            ZeroTierAPIError: any other non-2xx status, or a 2xx response whose
                body is not valid JSON.
        """
        status = resp.status_code

        if 200 <= status < 300:
            if not resp.content:
                return None
            try:
                return resp.json()
            except ValueError as exc:
                # Surface a bad body as an API error rather than a bare ValueError.
                raise ZeroTierAPIError(
                    f"ZeroTier API returned a non-JSON body (HTTP {status})",
                    status_code=status,
                ) from exc

        # Try to parse error body. requests' JSONDecodeError is a ValueError
        # subclass, so catching ValueError covers it on every version.
        try:
            error_body = resp.json()
        except ValueError:
            error_body = None

        msg = f"ZeroTier API error {status}"
        if isinstance(error_body, dict):
            # Keep the generic message when "message" is absent, null or empty.
            msg = error_body.get("message") or msg

        if status in (401, 403):
            error_cls = ZeroTierAuthError
        elif status == 404:
            error_cls = ZeroTierNotFoundError
        elif status == 429:
            error_cls = ZeroTierRateLimitError
        else:
            error_cls = ZeroTierAPIError
        raise error_cls(msg, status_code=status, response=error_body)

    # ------------------------------------------------------------------
    # Path builders — abstract away Central vs Controller URL differences
    # ------------------------------------------------------------------

    def _network_path(self, network_id: str = "") -> str:
        """Build the path prefix for network endpoints."""
        root = "/network" if self._mode == APIMode.CENTRAL else "/controller/network"
        return f"{root}/{network_id}" if network_id else root

    def _member_path(self, network_id: str, node_id: str = "") -> str:
        """Build the path for member endpoints."""
        base = self._network_path(network_id) + "/member"
        if node_id:
            base += f"/{node_id}"
        return base

    # ------------------------------------------------------------------
    # Response parsers — pick the right dataclass constructor for the mode
    # ------------------------------------------------------------------

    def _parse_member(self, data: dict, network_id: str) -> ZTMember:
        """Turn a member payload into a ZTMember for the current API mode."""
        if self._mode == APIMode.CENTRAL:
            return ZTMember.from_central(data)
        return ZTMember.from_controller(data, network_id=network_id)

    def _post_member(self, network_id: str, node_id: str, body: dict) -> ZTMember:
        """POST ``body`` to a member endpoint and parse the returned member.

        Both IDs must already be validated by the caller.
        """
        data = self._request(
            "POST",
            self._member_path(network_id, node_id),
            json_body=body,
        )
        return self._parse_member(data, network_id)

    # ------------------------------------------------------------------
    # Status / connectivity check
    # ------------------------------------------------------------------

    def get_status(self) -> dict:
        """Verify token and connectivity.

        Central: GET /status — returned as-is.
        Controller: GET /status, with the result of GET /controller added
        under the ``"_controller"`` key (None if that second call fails).
        """
        if self._mode == APIMode.CENTRAL:
            return self._request("GET", "/status")

        # Combine /status (node info) and /controller (controller flag).
        # `or {}` keeps the merge below working if /status has an empty body.
        node_status = self._request("GET", "/status") or {}
        try:
            node_status["_controller"] = self._request("GET", "/controller")
        except ZeroTierAPIError:
            node_status["_controller"] = None
        return node_status

    # ------------------------------------------------------------------
    # Network operations
    # ------------------------------------------------------------------

    def list_networks(self) -> list[ZTNetwork]:
        """List all networks the token has access to.

        In controller mode each network is fetched individually; networks that
        cannot be fetched are logged and skipped.
        """
        data = self._request("GET", self._network_path()) or []

        if self._mode == APIMode.CENTRAL:
            # Central returns an array of full network objects
            return [ZTNetwork.from_central(n) for n in data]

        # Controller returns an array of network ID strings
        # e.g. ["3e245e31af000001", "3e245e31af000002"]
        # Fetch each one for full details
        networks = []
        for nwid in data:
            try:
                networks.append(self.get_network(nwid))
            except (ZeroTierAPIError, ValueError):
                # ValueError: the controller listed an ID we consider malformed.
                logger.warning("[ZT API] Failed to fetch network %s, skipping", nwid)
        return networks

    def get_network(self, network_id: str) -> ZTNetwork:
        """Fetch a single network by ID.

        Raises:
            ValueError: if ``network_id`` is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        data = self._request("GET", self._network_path(network_id))
        if self._mode == APIMode.CENTRAL:
            return ZTNetwork.from_central(data)
        return ZTNetwork.from_controller(data)

    # ------------------------------------------------------------------
    # Member operations — the core of what Secuird needs
    # ------------------------------------------------------------------

    def list_members(self, network_id: str) -> list[ZTMember]:
        """List all members on a network.

        In controller mode each member is fetched individually; members that
        cannot be fetched are logged and skipped.

        Raises:
            ValueError: if ``network_id`` is malformed.
            ZeroTierAPIError: if the member listing itself fails.
        """
        network_id = validate_network_id(network_id)
        data = self._request("GET", self._member_path(network_id)) or []

        if self._mode == APIMode.CENTRAL:
            # Central returns an array of full member objects
            return [ZTMember.from_central(m) for m in data]

        # Controller returns {"nodeId": revisionCounter, ...}
        # We must fetch each member individually for full details
        members = []
        for node_id in data:
            try:
                members.append(self.get_member(network_id, node_id))
            except (ZeroTierAPIError, ValueError):
                # ValueError: the controller listed an ID we consider malformed.
                logger.warning(
                    "[ZT API] Failed to fetch member %s on network %s, skipping",
                    node_id,
                    network_id,
                )
        return members

    def get_member(self, network_id: str, node_id: str) -> ZTMember:
        """Fetch a single member on a network.

        Raises:
            ValueError: if either ID is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        node_id = validate_node_id(node_id)
        data = self._request("GET", self._member_path(network_id, node_id))
        return self._parse_member(data, network_id)

    def _build_auth_body(self, authorized: bool) -> dict:
        """Build the JSON body for authorize/deauthorize calls.

        Central nests under config: {"config": {"authorized": true}}
        Controller is flat: {"authorized": true}
        """
        if self._mode == APIMode.CENTRAL:
            return {"config": {"authorized": authorized}}
        return {"authorized": authorized}

    def authorize_member(self, network_id: str, node_id: str) -> ZTMember:
        """Authorize a member on a network.

        This is the enforcement action: the device can now communicate on
        the network.

        Raises:
            ValueError: if either ID is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        node_id = validate_node_id(node_id)
        logger.info("[ZT API] Authorizing member %s on network %s", node_id, network_id)
        return self._post_member(network_id, node_id, self._build_auth_body(True))

    def deauthorize_member(self, network_id: str, node_id: str) -> ZTMember:
        """De-authorize a member on a network.

        The member remains in the member list but cannot communicate. This is
        the standard ZeroTier pattern for revoking access without deleting the
        member record.

        Raises:
            ValueError: if either ID is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        node_id = validate_node_id(node_id)
        logger.info(
            "[ZT API] De-authorizing member %s on network %s", node_id, network_id
        )
        return self._post_member(network_id, node_id, self._build_auth_body(False))

    def add_member(
        self, network_id: str, node_id: str, authorized: bool = False
    ) -> ZTMember:
        """Manually add a member to a network.

        Creates the member record on the controller side. By default the
        member is de-authorized (matching the Secuird workflow where
        activation is a separate step).

        Raises:
            ValueError: if either ID is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        node_id = validate_node_id(node_id)
        logger.info(
            "[ZT API] Adding member %s to network %s (authorized=%s)",
            node_id,
            network_id,
            authorized,
        )
        return self._post_member(
            network_id, node_id, self._build_auth_body(authorized)
        )

    def delete_member(self, network_id: str, node_id: str) -> None:
        """Remove a member entirely. Use with caution.

        In most Secuird workflows we de-authorize rather than delete, so the
        member record persists for audit purposes.

        Note: the self-hosted controller API does not document a DELETE member
        endpoint; this may only work on Central.

        Raises:
            ValueError: if either ID is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        node_id = validate_node_id(node_id)
        logger.warning(
            "[ZT API] Deleting member %s from network %s", node_id, network_id
        )
        self._request("DELETE", self._member_path(network_id, node_id))

    def update_member(
        self,
        network_id: str,
        node_id: str,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        authorized: Optional[bool] = None,
        ip_assignments: Optional[list[str]] = None,
    ) -> ZTMember:
        """Update member metadata or config fields.

        Only arguments that are not None are sent. ``name`` and
        ``description`` are not sent in controller mode; passing them there
        logs a warning.

        Raises:
            ValueError: if either ID is malformed.
            ZeroTierAPIError: on any API failure.
        """
        network_id = validate_network_id(network_id)
        node_id = validate_node_id(node_id)

        # The config-level fields are the same in both modes; only their
        # position in the body differs.
        config: dict[str, Any] = {}
        if authorized is not None:
            config["authorized"] = authorized
        if ip_assignments is not None:
            config["ipAssignments"] = ip_assignments

        body: dict[str, Any]
        if self._mode == APIMode.CENTRAL:
            body = {}
            if name is not None:
                body["name"] = name
            if description is not None:
                body["description"] = description
            if config:
                body["config"] = config
        else:
            # Controller API is flat
            body = config
            # name/description not supported on controller API
            if name is not None or description is not None:
                logger.warning(
                    "[ZT API] name/description are ignored in controller mode "
                    "(member %s on network %s)",
                    node_id,
                    network_id,
                )

        return self._post_member(network_id, node_id, body)

    # ------------------------------------------------------------------
    # Bulk operations — useful for kill-switch and reconciliation
    # ------------------------------------------------------------------

    def deauthorize_all_members(self, network_id: str) -> list[ZTMember]:
        """De-authorize every member on a network. Returns updated members.

        Every authorized member is attempted even if an earlier one fails, so
        a single bad member cannot leave the rest of the network authorized.

        Raises:
            ValueError: if ``network_id`` is malformed.
            ZeroTierAPIError: if listing members fails.
            ZeroTierAPIError | ValueError: the first per-member failure,
                re-raised after all members have been attempted.
        """
        updated_members = []
        first_error: Optional[Exception] = None

        for member in self.list_members(network_id):
            if not member.is_authorized:
                continue
            try:
                updated_members.append(
                    self.deauthorize_member(network_id, member.node_id)
                )
            except (ZeroTierAPIError, ValueError) as exc:
                logger.error(
                    "[ZT API] Failed to de-authorize member %s on network %s: %s",
                    member.node_id,
                    network_id,
                    exc,
                )
                if first_error is None:
                    first_error = exc

        if first_error is not None:
            raise first_error
        return updated_members

    def get_authorized_members(self, network_id: str) -> list[ZTMember]:
        """Return only currently authorized members."""
        return [m for m in self.list_members(network_id) if m.is_authorized]

    def get_online_members(self, network_id: str) -> list[ZTMember]:
        """Return members seen in the last 5 minutes (Central API only).

        The self-hosted controller API does not expose lastOnline, so this
        will return an empty list in controller mode.
        """
        cutoff = (int(time.time()) - 300) * 1000  # ms
        return [
            m
            for m in self.list_members(network_id)
            if m.last_online and m.last_online > cutoff
        ]