feat(zerotier): add ZeroTier network governance module

Add comprehensive ZeroTier integration for managing network access:

- Portal networks: manager-created ZeroTier network bindings
- Device registration: user-owned ZeroTier node endpoints
- Approval workflows: request/approve/revoke network access
- Activation sessions: time-limited network authorization
- Kill switch: emergency access revocation
- Reconciliation job: sync portal state with ZeroTier controller

Includes ZeroTier client SDK supporting both Central and self-hosted
controller APIs, with full CRUD operations for networks and members.
This commit is contained in:
2026-03-20 21:50:20 +10:30
parent 49e724222f
commit 1789590167
27 changed files with 4862 additions and 4 deletions
+78
View File
@@ -213,3 +213,81 @@ class MfaRequirementOverride(str, Enum):
INHERIT = "inherit"
REQUIRED = "required"
EXEMPT = "exempt"
# ── ZeroTier / Portal Network ────────────────────────────────────────────────
class NetworkEnvironment(str, Enum):
"""Environment tag for a portal network."""
PRODUCTION = "production"
STAGING = "staging"
DEVELOPMENT = "development"
LAB = "lab"
class NetworkRequestMode(str, Enum):
"""How users request access to a portal network."""
OPEN = "open" # anyone in the org can request
APPROVAL_REQUIRED = "approval_required" # manager must approve
INVITE_ONLY = "invite_only" # only managers can assign
class ApprovalGrantType(str, Enum):
"""How a user was granted network access."""
REQUESTED = "requested" # user initiated
ASSIGNED = "assigned" # manager initiated
class ApprovalState(str, Enum):
"""State of a user network approval record."""
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
REVOKED = "revoked"
SUSPENDED = "suspended"
class MembershipState(str, Enum):
"""State of a device network membership record."""
PENDING_DEVICE_REGISTRATION = "pending_device_registration"
PENDING_REQUEST = "pending_request"
PENDING_MANAGER_APPROVAL = "pending_manager_approval"
APPROVED_INACTIVE = "approved_inactive"
JOINED_DEAUTHORIZED = "joined_deauthorized"
ACTIVE_AUTHORIZED = "active_authorized"
ACTIVATION_EXPIRED = "activation_expired"
SUSPENDED = "suspended"
REVOKED = "revoked"
REJECTED = "rejected"
class ActivationEndReason(str, Enum):
"""Why an activation session ended."""
EXPIRED = "expired"
LOGOUT = "logout"
KILL_SWITCH = "kill_switch"
MANUAL_REVOKE = "manual_revoke"
APPROVAL_REVOKED = "approval_revoked"
ADMIN_ACTION = "admin_action"
class KillSwitchScope(str, Enum):
"""Scope of a kill switch event."""
ORGANIZATION = "organization"
GLOBAL = "global"
SELECTED_NETWORKS = "selected_networks"
class DeviceStatus(str, Enum):
"""Status of a registered device."""
ACTIVE = "active"
INACTIVE = "inactive"
+808
View File
@@ -0,0 +1,808 @@
"""ZeroTier API client — reusable SDK for Secuird integration.
Supports both ZeroTier Central (hosted) and self-hosted controllers.
Central API (default):
Base URL: https://api.zerotier.com/api/v1
Auth: Authorization: token <api_token>
Docs: https://docs.zerotier.com/api/central/v1/
Self-hosted Controller API:
Base URL: http://<host>:9993
Auth: X-ZT1-Auth: <authtoken.secret>
Docs: https://docs.zerotier.com/api/service/v1/
"""
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
import requests
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Validation helpers
# ---------------------------------------------------------------------------
_NETWORK_ID_RE = re.compile(r"^[0-9a-fA-F]{16}$")
_NODE_ID_RE = re.compile(r"^[0-9a-fA-F]{10}$")
def validate_network_id(network_id: str) -> str:
"""Validate and normalise a 16-hex-char ZeroTier network ID."""
network_id = network_id.strip().lower()
if not _NETWORK_ID_RE.match(network_id):
raise ValueError(
f"Invalid ZeroTier network ID '{network_id}'"
"must be exactly 16 hexadecimal characters."
)
return network_id
def validate_node_id(node_id: str) -> str:
"""Validate and normalise a 10-hex-char ZeroTier node ID."""
node_id = node_id.strip().lower()
if not _NODE_ID_RE.match(node_id):
raise ValueError(
f"Invalid ZeroTier node ID '{node_id}'"
"must be exactly 10 hexadecimal characters."
)
return node_id
# ---------------------------------------------------------------------------
# API mode
# ---------------------------------------------------------------------------
class APIMode(Enum):
"""Which ZeroTier API flavour the client targets."""
CENTRAL = "central" # Hosted at api.zerotier.com
CONTROLLER = "controller" # Self-hosted zerotier-one service
# ---------------------------------------------------------------------------
# Data classes — typed representations of ZeroTier API objects
# ---------------------------------------------------------------------------
@dataclass
class ZTMemberConfig:
"""Subset of the member config block we care about."""
authorized: bool = False
active_bridge: bool = False
ip_assignments: list[str] = field(default_factory=list)
creation_time: Optional[int] = None
last_authorized_time: Optional[int] = None
last_deauthorized_time: Optional[int] = None
version_major: Optional[int] = None
version_minor: Optional[int] = None
version_rev: Optional[int] = None
@classmethod
def from_central(cls, data: dict) -> "ZTMemberConfig":
"""Parse config from Central API member.config block."""
return cls(
authorized=data.get("authorized", False),
active_bridge=data.get("activeBridge", False),
ip_assignments=data.get("ipAssignments", []),
creation_time=data.get("creationTime"),
last_authorized_time=data.get("lastAuthorizedTime"),
last_deauthorized_time=data.get("lastDeauthorizedTime"),
version_major=data.get("vMajor"),
version_minor=data.get("vMinor"),
version_rev=data.get("vRev"),
)
@classmethod
def from_controller(cls, data: dict) -> "ZTMemberConfig":
"""Parse config from self-hosted controller — fields are top-level."""
return cls(
authorized=data.get("authorized", False),
active_bridge=data.get("activeBridge", False),
ip_assignments=data.get("ipAssignments", []),
creation_time=data.get("creationTime"),
# Controller API does not return lastAuthorizedTime/lastDeauthorizedTime
version_major=data.get("vMajor"),
version_minor=data.get("vMinor"),
version_rev=data.get("vRev"),
)
@property
def version_string(self) -> str:
if self.version_major is not None:
return f"{self.version_major}.{self.version_minor}.{self.version_rev}"
return "unknown"
def to_dict(self) -> dict:
return {
"authorized": self.authorized,
"active_bridge": self.active_bridge,
"ip_assignments": self.ip_assignments,
"creation_time": self.creation_time,
"last_authorized_time": self.last_authorized_time,
"last_deauthorized_time": self.last_deauthorized_time,
"version_string": self.version_string,
}
@dataclass
class ZTMember:
"""A member (device) on a ZeroTier network."""
id: str # composite "{networkId}-{nodeId}" on Central, just nodeId on controller
network_id: str
node_id: str
name: Optional[str] = None
description: Optional[str] = None
hidden: bool = False
config: ZTMemberConfig = field(default_factory=ZTMemberConfig)
last_online: Optional[int] = None
last_seen: Optional[int] = None
physical_address: Optional[str] = None
client_version: Optional[str] = None
controller_id: Optional[str] = None
raw: dict = field(default_factory=dict, repr=False)
@classmethod
def from_central(cls, data: dict) -> "ZTMember":
"""Parse member from Central API response."""
config_data = data.get("config", {})
return cls(
id=data.get("id", ""),
network_id=data.get("networkId", ""),
node_id=data.get("nodeId", ""),
name=data.get("name"),
description=data.get("description"),
hidden=data.get("hidden", False),
config=ZTMemberConfig.from_central(config_data),
last_online=data.get("lastOnline"),
last_seen=data.get("lastSeen"),
physical_address=data.get("physicalAddress"),
client_version=data.get("clientVersion"),
controller_id=data.get("controllerId"),
raw=data,
)
@classmethod
def from_controller(cls, data: dict, network_id: str = "") -> "ZTMember":
"""Parse member from self-hosted controller API response.
Controller responses are flat — authorized, ipAssignments etc. are
top-level keys, not nested under a config block.
"""
node_id = data.get("id", data.get("address", ""))
nwid = data.get("nwid", network_id)
return cls(
id=f"{nwid}-{node_id}",
network_id=nwid,
node_id=node_id,
name=None, # controller API doesn't have name/description
description=None,
hidden=False,
config=ZTMemberConfig.from_controller(data),
last_online=None, # not available in controller API
last_seen=None,
physical_address=None,
client_version=None,
controller_id=None,
raw=data,
)
@property
def is_authorized(self) -> bool:
return self.config.authorized
@property
def display_name(self) -> str:
return self.name or self.node_id
@property
def ip_list(self) -> str:
return ", ".join(self.config.ip_assignments) if self.config.ip_assignments else "none"
@property
def last_seen_str(self) -> str:
if not self.last_seen or self.last_seen == 0:
return "never"
dt = datetime.fromtimestamp(self.last_seen / 1000, tz=timezone.utc)
delta = datetime.now(tz=timezone.utc) - dt
if delta.total_seconds() < 120:
return "just now"
if delta.total_seconds() < 3600:
return f"{int(delta.total_seconds() / 60)}m ago"
if delta.total_seconds() < 86400:
return f"{int(delta.total_seconds() / 3600)}h ago"
return f"{int(delta.days)}d ago"
def to_dict(self) -> dict:
return {
"id": self.id,
"network_id": self.network_id,
"node_id": self.node_id,
"name": self.name,
"description": self.description,
"hidden": self.hidden,
"is_authorized": self.is_authorized,
"display_name": self.display_name,
"ip_list": self.ip_list,
"last_online": self.last_online,
"last_seen": self.last_seen,
"last_seen_str": self.last_seen_str,
"client_version": self.client_version,
"controller_id": self.controller_id,
"config": self.config.to_dict(),
}
@dataclass
class ZTNetworkConfig:
"""Subset of the network config block."""
name: str = ""
private: bool = True
creation_time: Optional[int] = None
ip_assignment_pools: list[dict] = field(default_factory=list)
routes: list[dict] = field(default_factory=list)
@classmethod
def from_central(cls, data: dict) -> "ZTNetworkConfig":
"""Parse from Central API network.config block."""
return cls(
name=data.get("name", ""),
private=data.get("private", True),
creation_time=data.get("creationTime"),
ip_assignment_pools=data.get("ipAssignmentPools", []),
routes=data.get("routes", []),
)
@classmethod
def from_controller(cls, data: dict) -> "ZTNetworkConfig":
"""Parse from self-hosted controller — fields are top-level."""
return cls(
name=data.get("name", ""),
private=data.get("private", True),
creation_time=data.get("creationTime"),
ip_assignment_pools=data.get("ipAssignmentPools", []),
routes=data.get("routes", []),
)
def to_dict(self) -> dict:
return {
"name": self.name,
"private": self.private,
"creation_time": self.creation_time,
"ip_assignment_pools": self.ip_assignment_pools,
"routes": self.routes,
}
@dataclass
class ZTNetwork:
"""A ZeroTier virtual network."""
id: str
config: ZTNetworkConfig = field(default_factory=ZTNetworkConfig)
description: Optional[str] = None
owner_id: Optional[str] = None
online_member_count: int = 0
authorized_member_count: int = 0
total_member_count: int = 0
raw: dict = field(default_factory=dict, repr=False)
@classmethod
def from_central(cls, data: dict) -> "ZTNetwork":
"""Parse from Central API response."""
config_data = data.get("config", {})
return cls(
id=data.get("id", ""),
config=ZTNetworkConfig.from_central(config_data),
description=data.get("description"),
owner_id=data.get("ownerId"),
online_member_count=data.get("onlineMemberCount", 0),
authorized_member_count=data.get("authorizedMemberCount", 0),
total_member_count=data.get("totalMemberCount", 0),
raw=data,
)
@classmethod
def from_controller(cls, data: dict) -> "ZTNetwork":
"""Parse from self-hosted controller response — flat structure."""
return cls(
id=data.get("id", data.get("nwid", "")),
config=ZTNetworkConfig.from_controller(data),
description=None, # controller API doesn't have description
owner_id=None,
online_member_count=0, # not available from controller
authorized_member_count=0,
total_member_count=0,
raw=data,
)
@property
def name(self) -> str:
return self.config.name or self.id
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"owner_id": self.owner_id,
"online_member_count": self.online_member_count,
"authorized_member_count": self.authorized_member_count,
"total_member_count": self.total_member_count,
"config": self.config.to_dict(),
}
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class ZeroTierAPIError(Exception):
"""Base exception for ZeroTier API errors."""
def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[dict] = None):
super().__init__(message)
self.status_code = status_code
self.response = response
class ZeroTierAuthError(ZeroTierAPIError):
"""401/403 from ZeroTier — bad or missing token."""
pass
class ZeroTierNotFoundError(ZeroTierAPIError):
"""404 — network or member not found."""
pass
class ZeroTierRateLimitError(ZeroTierAPIError):
"""429 — rate limited."""
pass
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class ZeroTierClient:
"""HTTP client for ZeroTier — supports both Central and self-hosted controllers.
Designed to be lifted into Secuird's backend as-is. All methods are
instance methods so the client can be instantiated with different tokens
(e.g. per-organization tokens in a multi-tenant setup).
Usage — Central (default):
client = ZeroTierClient(api_token="your-central-token")
Usage — Self-hosted controller:
client = ZeroTierClient(
api_token="contents-of-authtoken.secret",
base_url="http://my-controller:9993",
mode=APIMode.CONTROLLER,
)
Then:
network = client.get_network("8056c2e21c000001")
members = client.list_members("8056c2e21c000001")
client.authorize_member("8056c2e21c000001", "abcdef0123")
client.deauthorize_member("8056c2e21c000001", "abcdef0123")
"""
DEFAULT_CENTRAL_URL = "https://api.zerotier.com/api/v1"
DEFAULT_CONTROLLER_URL = "http://localhost:9993"
DEFAULT_TIMEOUT = 15 # seconds
def __init__(
self,
api_token: str,
base_url: Optional[str] = None,
mode: APIMode = APIMode.CENTRAL,
timeout: int = DEFAULT_TIMEOUT,
max_retries: int = 2,
):
if not api_token:
raise ValueError("ZeroTier API token is required.")
self._api_token = api_token
self._mode = mode
self._timeout = timeout
self._max_retries = max_retries
# Default base URL depends on mode
if base_url:
self._base_url = base_url.rstrip("/")
elif mode == APIMode.CENTRAL:
self._base_url = self.DEFAULT_CENTRAL_URL
else:
self._base_url = self.DEFAULT_CONTROLLER_URL
# Build session with correct auth header
self._session = requests.Session()
self._session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
}
)
if mode == APIMode.CENTRAL:
self._session.headers["Authorization"] = f"token {self._api_token}"
else:
self._session.headers["X-ZT1-Auth"] = self._api_token
@property
def mode(self) -> APIMode:
return self._mode
@property
def base_url(self) -> str:
return self._base_url
# ------------------------------------------------------------------
# Low-level HTTP
# ------------------------------------------------------------------
def _request(
self,
method: str,
path: str,
json_body: Optional[dict] = None,
params: Optional[dict] = None,
) -> Any:
"""Execute an HTTP request with retry and error mapping."""
url = f"{self._base_url}{path}"
last_error: Optional[Exception] = None
for attempt in range(1, self._max_retries + 1):
try:
logger.debug(f"[ZT API] {method} {url} (attempt {attempt})")
resp = self._session.request(
method=method,
url=url,
json=json_body,
params=params,
timeout=self._timeout,
)
return self._handle_response(resp)
except ZeroTierRateLimitError:
if attempt < self._max_retries:
wait = 2 ** attempt
logger.warning(f"[ZT API] Rate limited, retrying in {wait}s...")
time.sleep(wait)
else:
raise
except requests.exceptions.Timeout as exc:
last_error = exc
if attempt < self._max_retries:
wait = 2 ** attempt
logger.warning(f"[ZT API] Timeout, retrying in {wait}s...")
time.sleep(wait)
else:
raise ZeroTierAPIError(f"Request timed out after {self._timeout}s") from exc
except requests.exceptions.ConnectionError as exc:
last_error = exc
if attempt < self._max_retries:
wait = 2 ** attempt
logger.warning(f"[ZT API] Connection error, retrying in {wait}s...")
time.sleep(wait)
else:
raise ZeroTierAPIError(
f"Connection to ZeroTier API failed at {self._base_url}"
) from exc
# Should not reach here, but just in case
raise ZeroTierAPIError("Request failed after retries") from last_error
def _handle_response(self, resp: requests.Response) -> Any:
"""Map HTTP status codes to typed exceptions."""
if resp.status_code == 200:
if resp.content:
return resp.json()
return None
# Try to parse error body
error_body = None
try:
error_body = resp.json()
except (ValueError, requests.exceptions.JSONDecodeError):
pass
msg = f"ZeroTier API error {resp.status_code}"
if error_body and isinstance(error_body, dict):
msg = error_body.get("message", msg)
if resp.status_code in (401, 403):
raise ZeroTierAuthError(msg, status_code=resp.status_code, response=error_body)
if resp.status_code == 404:
raise ZeroTierNotFoundError(msg, status_code=resp.status_code, response=error_body)
if resp.status_code == 429:
raise ZeroTierRateLimitError(msg, status_code=resp.status_code, response=error_body)
raise ZeroTierAPIError(msg, status_code=resp.status_code, response=error_body)
# ------------------------------------------------------------------
# Path builders — abstract away Central vs Controller URL differences
# ------------------------------------------------------------------
def _network_path(self, network_id: str = "") -> str:
"""Build the path prefix for network endpoints."""
if self._mode == APIMode.CENTRAL:
return f"/network/{network_id}" if network_id else "/network"
else:
return f"/controller/network/{network_id}" if network_id else "/controller/network"
def _member_path(self, network_id: str, node_id: str = "") -> str:
"""Build the path for member endpoints."""
base = self._network_path(network_id) + "/member"
if node_id:
base += f"/{node_id}"
return base
# ------------------------------------------------------------------
# Status / connectivity check
# ------------------------------------------------------------------
def get_status(self) -> dict:
"""Verify token and connectivity.
Central: GET /status — returns account/user info
Controller: GET /controller — returns controller status
"""
if self._mode == APIMode.CENTRAL:
return self._request("GET", "/status")
else:
# Combine /status (node info) and /controller (controller flag)
node_status = self._request("GET", "/status")
try:
ctrl_status = self._request("GET", "/controller")
node_status["_controller"] = ctrl_status
except ZeroTierAPIError:
node_status["_controller"] = None
return node_status
# ------------------------------------------------------------------
# Network operations
# ------------------------------------------------------------------
def list_networks(self) -> list[ZTNetwork]:
"""List all networks the token has access to."""
data = self._request("GET", self._network_path())
if self._mode == APIMode.CENTRAL:
# Central returns an array of full network objects
return [ZTNetwork.from_central(n) for n in data]
else:
# Controller returns an array of network ID strings
# e.g. ["3e245e31af000001", "3e245e31af000002"]
# Fetch each one for full details
networks = []
for nwid in data:
try:
net = self.get_network(nwid)
networks.append(net)
except ZeroTierAPIError:
logger.warning(f"[ZT API] Failed to fetch network {nwid}, skipping")
return networks
def get_network(self, network_id: str) -> ZTNetwork:
"""Fetch a single network by ID."""
network_id = validate_network_id(network_id)
data = self._request("GET", self._network_path(network_id))
if self._mode == APIMode.CENTRAL:
return ZTNetwork.from_central(data)
else:
return ZTNetwork.from_controller(data)
# ------------------------------------------------------------------
# Member operations — the core of what Secuird needs
# ------------------------------------------------------------------
def list_members(self, network_id: str) -> list[ZTMember]:
"""List all members on a network."""
network_id = validate_network_id(network_id)
data = self._request("GET", self._member_path(network_id))
if self._mode == APIMode.CENTRAL:
# Central returns an array of full member objects
return [ZTMember.from_central(m) for m in data]
else:
# Controller returns {"nodeId": revisionCounter, ...}
# We must fetch each member individually for full details
members = []
for node_id in data:
try:
member = self.get_member(network_id, node_id)
members.append(member)
except ZeroTierAPIError:
logger.warning(
f"[ZT API] Failed to fetch member {node_id} on "
f"network {network_id}, skipping"
)
return members
def get_member(self, network_id: str, node_id: str) -> ZTMember:
"""Fetch a single member on a network."""
network_id = validate_network_id(network_id)
node_id = validate_node_id(node_id)
data = self._request("GET", self._member_path(network_id, node_id))
if self._mode == APIMode.CENTRAL:
return ZTMember.from_central(data)
else:
return ZTMember.from_controller(data, network_id=network_id)
def _build_auth_body(self, authorized: bool) -> dict:
"""Build the JSON body for authorize/deauthorize calls.
Central nests under config: {"config": {"authorized": true}}
Controller is flat: {"authorized": true}
"""
if self._mode == APIMode.CENTRAL:
return {"config": {"authorized": authorized}}
else:
return {"authorized": authorized}
def authorize_member(self, network_id: str, node_id: str) -> ZTMember:
"""Authorize a member on a network.
This is the enforcement action: the device can now communicate
on the network.
"""
network_id = validate_network_id(network_id)
node_id = validate_node_id(node_id)
logger.info(f"[ZT API] Authorizing member {node_id} on network {network_id}")
data = self._request(
"POST",
self._member_path(network_id, node_id),
json_body=self._build_auth_body(True),
)
if self._mode == APIMode.CENTRAL:
return ZTMember.from_central(data)
return ZTMember.from_controller(data, network_id=network_id)
def deauthorize_member(self, network_id: str, node_id: str) -> ZTMember:
"""De-authorize a member on a network.
The member remains in the member list but cannot communicate.
This is the standard ZeroTier pattern for revoking access without
deleting the member record.
"""
network_id = validate_network_id(network_id)
node_id = validate_node_id(node_id)
logger.info(f"[ZT API] De-authorizing member {node_id} on network {network_id}")
data = self._request(
"POST",
self._member_path(network_id, node_id),
json_body=self._build_auth_body(False),
)
if self._mode == APIMode.CENTRAL:
return ZTMember.from_central(data)
return ZTMember.from_controller(data, network_id=network_id)
def add_member(self, network_id: str, node_id: str, authorized: bool = False) -> ZTMember:
"""Manually add a member to a network.
Creates the member record on the controller side. By default the
member is de-authorized (matching the Secuird workflow where
activation is a separate step).
"""
network_id = validate_network_id(network_id)
node_id = validate_node_id(node_id)
logger.info(
f"[ZT API] Adding member {node_id} to network {network_id} "
f"(authorized={authorized})"
)
data = self._request(
"POST",
self._member_path(network_id, node_id),
json_body=self._build_auth_body(authorized),
)
if self._mode == APIMode.CENTRAL:
return ZTMember.from_central(data)
return ZTMember.from_controller(data, network_id=network_id)
def delete_member(self, network_id: str, node_id: str) -> None:
"""Remove a member entirely.
Use with caution. In most Secuird workflows we de-authorize rather
than delete, so the member record persists for audit purposes.
Note: the self-hosted controller API does not document a DELETE
member endpoint; this may only work on Central.
"""
network_id = validate_network_id(network_id)
node_id = validate_node_id(node_id)
logger.warning(f"[ZT API] Deleting member {node_id} from network {network_id}")
self._request("DELETE", self._member_path(network_id, node_id))
def update_member(
self,
network_id: str,
node_id: str,
*,
name: Optional[str] = None,
description: Optional[str] = None,
authorized: Optional[bool] = None,
ip_assignments: Optional[list[str]] = None,
) -> ZTMember:
"""Update member metadata or config fields."""
network_id = validate_network_id(network_id)
node_id = validate_node_id(node_id)
if self._mode == APIMode.CENTRAL:
body: dict[str, Any] = {}
if name is not None:
body["name"] = name
if description is not None:
body["description"] = description
config: dict[str, Any] = {}
if authorized is not None:
config["authorized"] = authorized
if ip_assignments is not None:
config["ipAssignments"] = ip_assignments
if config:
body["config"] = config
else:
# Controller API is flat
body = {}
if authorized is not None:
body["authorized"] = authorized
if ip_assignments is not None:
body["ipAssignments"] = ip_assignments
# name/description not supported on controller API
data = self._request(
"POST",
self._member_path(network_id, node_id),
json_body=body,
)
if self._mode == APIMode.CENTRAL:
return ZTMember.from_central(data)
return ZTMember.from_controller(data, network_id=network_id)
# ------------------------------------------------------------------
# Bulk operations — useful for kill-switch and reconciliation
# ------------------------------------------------------------------
def deauthorize_all_members(self, network_id: str) -> list[ZTMember]:
"""De-authorize every member on a network. Returns updated members."""
members = self.list_members(network_id)
results = []
for m in members:
if m.is_authorized:
updated = self.deauthorize_member(network_id, m.node_id)
results.append(updated)
return results
def get_authorized_members(self, network_id: str) -> list[ZTMember]:
"""Return only currently authorized members."""
return [m for m in self.list_members(network_id) if m.is_authorized]
def get_online_members(self, network_id: str) -> list[ZTMember]:
"""Return members seen in the last 5 minutes (Central API only).
The self-hosted controller API does not expose lastOnline, so this
will return an empty list in controller mode.
"""
cutoff = (int(time.time()) - 300) * 1000 # ms
return [
m for m in self.list_members(network_id)
if m.last_online and m.last_online > cutoff
]