Files
gatehouse-api/tests/integration/test_cli.py
T

553 lines
20 KiB
Python

"""CLI integration tests.
Runs ``gatehouse-cli.py`` as a subprocess against a real Flask server
with a file-based SQLite database. Each test is self-contained:
it creates its own user, org, CA, and SSH keys via the API, then
caches the auth token and invokes the CLI.
"""
import json
import os
import socket
import subprocess
import sys
import threading
import time
import uuid
import pytest
import requests
from gatehouse_app import create_app
from gatehouse_app.extensions import db
# ---------------------------------------------------------------------------
# Module-scoped server fixture
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def cli_server(tmp_path_factory):
"""Flask test server on ``127.0.0.1`` with a random port + temp SQLite DB.
Yields ``(server_url, app)``. A single DB file is shared by all tests
in the module; each test uses UUID-suffixed names so there are no
collisions.
"""
server_dir = tmp_path_factory.mktemp("cli-server")
db_path = server_dir / "test.db"
session_dir = server_dir / "sessions"
session_dir.mkdir()
app = create_app(config_name="testing")
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}"
app.config["SESSION_FILE_DIR"] = str(session_dir)
with app.app_context():
db.create_all()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 0))
port = sock.getsockname()[1]
sock.close()
server_url = f"http://127.0.0.1:{port}"
t = threading.Thread(
target=lambda: app.run(
host="127.0.0.1", port=port, use_reloader=False, debug=False,
),
daemon=True,
)
t.start()
for _ in range(50):
try:
requests.get(f"{server_url}/api/v1/auth/login", timeout=1)
break
except requests.ConnectionError:
time.sleep(0.1)
else:
raise RuntimeError("Server did not start in time")
yield server_url, app
# ---------------------------------------------------------------------------
# Per-test home-directory fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def home_dir(tmp_path):
"""Isolated ``~/.gatehouse`` + ``~/.ssh`` per test."""
return tmp_path
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _register_user(server_url, email, password, full_name="Test User"):
"""Register a user via ``POST /api/v1/auth/register``, return (user, token)."""
resp = requests.post(
f"{server_url}/api/v1/auth/register",
json={
"email": email,
"password": password,
"password_confirm": password,
"full_name": full_name,
},
)
assert resp.status_code == 201, (
f"Register failed ({resp.status_code}): {resp.text}"
)
data = resp.json()["data"]
return data["user"], data["token"]
def _auth_headers(token):
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def _create_org(server_url, token, name):
"""Create an org (caller becomes owner)."""
resp = requests.post(
f"{server_url}/api/v1/organizations",
json={"name": name, "slug": name.lower().replace(" ", "-")},
headers=_auth_headers(token),
)
assert resp.status_code == 201, (
f"Create org failed ({resp.status_code}): {resp.text}"
)
return resp.json()["data"]["organization"]
def _create_ca(server_url, token, org_id, ca_type="user"):
"""Create a real CA via ``POST /organizations/{id}/cas``."""
resp = requests.post(
f"{server_url}/api/v1/organizations/{org_id}/cas",
json={
"name": f"Test-CA-{uuid.uuid4().hex[:6]}",
"ca_type": ca_type,
"key_type": "ed25519",
},
headers=_auth_headers(token),
)
assert resp.status_code == 201, (
f"Create CA failed ({resp.status_code}): {resp.text}"
)
return resp.json()["data"]["ca"]
def _create_principal(server_url, token, org_id, name="deploy"):
"""Create a principal in an org."""
resp = requests.post(
f"{server_url}/api/v1/organizations/{org_id}/principals",
json={"name": name, "description": f"Principal {name}"},
headers=_auth_headers(token),
)
assert resp.status_code == 201, (
f"Create principal failed ({resp.status_code}): {resp.text}"
)
return resp.json()["data"]["principal"]
def _add_principal_member(server_url, token, org_id, principal_id, email):
"""Add a user as a direct member of a principal (``POST .../members``)."""
resp = requests.post(
f"{server_url}/api/v1/organizations/{org_id}/principals/{principal_id}/members",
json={"email": email},
headers=_auth_headers(token),
)
# 201 = created; 409 = already exists (OK)
assert resp.status_code in (201, 409), (
f"Add principal member failed ({resp.status_code}): {resp.text}"
)
def _add_ssh_key(server_url, token, public_key):
"""Add an SSH public key via the API."""
resp = requests.post(
f"{server_url}/api/v1/ssh/keys",
json={"key": public_key, "description": "CLI test key"},
headers=_auth_headers(token),
)
assert resp.status_code == 201, (
f"Add SSH key failed ({resp.status_code}): {resp.text}"
)
return resp.json()["data"]
def _mark_key_verified(app, key_id):
"""Bypass signature check — mark a key as ``verified`` directly in the DB."""
from gatehouse_app.models.ssh_ca.ssh_key import SSHKey
with app.app_context():
key = db.session.get(SSHKey, key_id)
if key:
key.verified = True
key.verified_at = __import__(
"datetime"
).datetime.now(__import__("pytz").UTC)
db.session.commit()
def _cache_token(home_dir, token):
"""Write the token to ``~/.gatehouse/token_cache.json``."""
cache_dir = home_dir / ".gatehouse"
cache_dir.mkdir(exist_ok=True)
(cache_dir / "token_cache.json").write_text(json.dumps({"token": token}))
def _run_cli(server_url, home_dir, *args):
"""Run ``gatehouse-cli.py`` as a subprocess, return ``CompletedProcess``."""
env = os.environ.copy()
env["SIGN_URL"] = server_url
env["HOME"] = str(home_dir)
proc = subprocess.run(
[sys.executable, "client/gatehouse-cli.py", *args],
capture_output=True,
text=True,
env=env,
timeout=30,
)
return proc
def _output(result: subprocess.CompletedProcess) -> str:
"""Combine stdout + stderr for message-level assertions.
The CLI emits via coloredlogs (stderr) and occasional ``print()``
calls (stdout), so we check both.
"""
return result.stdout + result.stderr
def _cleanup_shared_files():
"""Remove temp files the CLI writes to known paths."""
for p in ("/tmp/challenge.txt", "/tmp/challenge.txt.sig", "/tmp/ssh-cert"):
if os.path.exists(p):
os.remove(p)
# ---- unique-name helpers ---------------------------------------------------
def _uniq(prefix="test"):
return f"{prefix}-{uuid.uuid4().hex[:8]}"
def _email():
return f"{_uniq('cli')}@example.com"
# ===========================================================================
# Tests
# ===========================================================================
class TestCLI:
"""CLI integration tests — each test exercises a different subcommand."""
# ------------------------------------------------------------------
# 1. Add SSH key (real ssh-keygen flow)
# ------------------------------------------------------------------
def test_cli_add_key(self, cli_server, home_dir):
"""``gatehouse-cli.py -a -k <pubkey>`` — add + verify a key."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
# Generate a real Ed25519 keypair
key_path = os.path.join(home_dir, "id_ed25519")
gen = subprocess.run(
["ssh-keygen", "-t", "ed25519", "-f", key_path, "-N", "", "-C", email],
capture_output=True,
)
if gen.returncode != 0:
pytest.skip(f"ssh-keygen not available: {gen.stderr.decode()}")
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "-a", "-k", key_path + ".pub")
assert result.returncode == 0, (
f"CLI add-key failed:\nstderr: {result.stderr}\nstdout: {result.stdout}"
)
assert "added successfully" in _output(result), (
f"Expected 'added successfully' in:\n{_output(result)}"
)
# ------------------------------------------------------------------
# 2. List SSH keys
# ------------------------------------------------------------------
def test_cli_list_keys(self, cli_server, home_dir):
"""``gatehouse-cli.py --list-keys`` — shows registered keys."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
# Add two SSH keys via API (using unique generated keys)
key1 = _add_ssh_key(server_url, token, self._gen_pubkey("key1"))
key2 = _add_ssh_key(server_url, token, self._gen_pubkey("key2"))
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "--list-keys")
assert result.returncode == 0, (
f"CLI list-keys failed:\n{result.stderr}"
)
assert key1["id"][:8] in _output(result), (
f"Key1 ID prefix not in output:\n{_output(result)}"
)
assert key2["id"][:8] in _output(result), (
f"Key2 ID prefix not in output:\n{_output(result)}"
)
_key_counter = 0
def _gen_pubkey(self, tag: str) -> str:
"""Return a unique-looking but structurally valid Ed25519 public key."""
unique = uuid.uuid4().hex[:32]
padding = "A" * (43 - 32)
return (
f"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI{unique}{padding} {tag}@example.com"
)
def _real_pubkey(self, tag: str) -> str:
"""Generate a real Ed25519 keypair and return the public key string."""
TestCLI._key_counter += 1
tmp = f"/tmp/cli_test_key_{TestCLI._key_counter}_{uuid.uuid4().hex[:8]}"
subprocess.run(
["ssh-keygen", "-t", "ed25519", "-f", tmp, "-N", "", "-C", tag],
capture_output=True,
)
with open(f"{tmp}.pub") as f:
pub = f.read().strip()
for p in (tmp, f"{tmp}.pub"):
try:
os.unlink(p)
except OSError:
pass
return pub
# ------------------------------------------------------------------
# 3. Remove SSH key
# ------------------------------------------------------------------
def test_cli_remove_key(self, cli_server, home_dir):
"""``gatehouse-cli.py --remove-key <id>`` — deletes a key."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
key = _add_ssh_key(server_url, token, self._gen_pubkey("remove-me"))
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "--remove-key", key["id"])
assert result.returncode == 0, (
f"CLI remove-key failed:\n{result.stderr}"
)
assert "removed successfully" in _output(result), (
f"Expected removal confirmation in:\n{_output(result)}"
)
# ------------------------------------------------------------------
# 4. List organizations
# ------------------------------------------------------------------
def test_cli_list_orgs(self, cli_server, home_dir):
"""``gatehouse-cli.py --list-orgs`` — shows org memberships."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
org_a = _create_org(server_url, token, f"Alpha-{uuid.uuid4().hex[:6]}")
org_b = _create_org(server_url, token, f"Beta-{uuid.uuid4().hex[:6]}")
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "--list-orgs")
assert result.returncode == 0, (
f"CLI list-orgs failed:\n{result.stderr}"
)
assert org_a["name"] in _output(result), (
f"Org A name not in output:\n{_output(result)}"
)
assert org_b["name"] in _output(result), (
f"Org B name not in output:\n{_output(result)}"
)
# ------------------------------------------------------------------
# 5. Certificate signing — single org (no --org-id needed)
# ------------------------------------------------------------------
def test_cli_request_cert_single_org(self, cli_server, home_dir):
"""``gatehouse-cli.py -r`` — auto-selects the only org."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
org = _create_org(server_url, token, f"Single-{uuid.uuid4().hex[:6]}")
_create_ca(server_url, token, org["id"], ca_type="user")
princ = _create_principal(server_url, token, org["id"], name="deploy")
_add_principal_member(server_url, token, org["id"], princ["id"], email)
key = _add_ssh_key(server_url, token, self._real_pubkey("cert1"))
_mark_key_verified(app, key["id"])
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "-r")
assert result.returncode == 0, (
f"CLI request-cert failed:\nstderr: {result.stderr}\nstdout: {result.stdout}"
)
assert "Certificate signed successfully" in _output(result), (
f"Expected success in:\n{_output(result)}"
)
assert princ["name"] in _output(result), (
f"Principal name not in output:\n{_output(result)}"
)
# ------------------------------------------------------------------
# 6. Certificate signing — multi org WITHOUT --org-id
# ------------------------------------------------------------------
def test_cli_request_cert_multi_org_no_org(self, cli_server, home_dir):
"""``gatehouse-cli.py -r`` w/o ``--org-id`` — ambiguous error."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
org1 = _create_org(server_url, token, f"MultiA-{uuid.uuid4().hex[:6]}")
_create_ca(server_url, token, org1["id"], ca_type="user")
org2 = _create_org(server_url, token, f"MultiB-{uuid.uuid4().hex[:6]}")
_create_ca(server_url, token, org2["id"], ca_type="user")
princ1 = _create_principal(server_url, token, org1["id"], name="deploy")
_add_principal_member(server_url, token, org1["id"], princ1["id"], email)
key = _add_ssh_key(server_url, token, self._real_pubkey("multi"))
_mark_key_verified(app, key["id"])
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "-r")
assert result.returncode == 1, (
f"Expected exit code 1, got {result.returncode}\n"
f"stdout: {result.stdout}\nstderr: {result.stderr}"
)
assert "multiple organizations" in _output(result).lower()
# ------------------------------------------------------------------
# 7. Certificate signing — multi org WITH --org-id
# ------------------------------------------------------------------
def test_cli_request_cert_multi_org_with_org(self, cli_server, home_dir):
"""``gatehouse-cli.py -r --org-id <id>`` — specifies the org."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
org1 = _create_org(server_url, token, f"WithA-{uuid.uuid4().hex[:6]}")
_create_ca(server_url, token, org1["id"], ca_type="user")
princ1 = _create_principal(server_url, token, org1["id"], name="deploy")
_add_principal_member(server_url, token, org1["id"], princ1["id"], email)
org2 = _create_org(server_url, token, f"WithB-{uuid.uuid4().hex[:6]}")
_create_ca(server_url, token, org2["id"], ca_type="user")
key = _add_ssh_key(server_url, token, self._real_pubkey("with-org"))
_mark_key_verified(app, key["id"])
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "-r", "--org-id", org1["id"])
assert result.returncode == 0, (
f"CLI request-cert with org-id failed:\n"
f"stderr: {result.stderr}\nstdout: {result.stdout}"
)
assert "Certificate signed successfully" in _output(result)
# ------------------------------------------------------------------
# 8. Install known hosts
# ------------------------------------------------------------------
def test_cli_install_known_hosts(self, cli_server, home_dir):
"""``gatehouse-cli.py --install-known-hosts`` — fetches Host CA."""
_cleanup_shared_files()
server_url, app = cli_server
email = _email()
pw = "MyPassword123!"
_, token = _register_user(server_url, email, pw)
org = _create_org(server_url, token, f"HostCA-{uuid.uuid4().hex[:6]}")
ca = _create_ca(server_url, token, org["id"], ca_type="host")
_cache_token(home_dir, token)
result = _run_cli(server_url, home_dir, "--install-known-hosts")
assert result.returncode == 0, (
f"CLI install-known-hosts failed:\n{result.stderr}"
)
assert "Successfully installed Host CA" in _output(result)
# Verify the key was written to known_hosts
known_hosts = home_dir / ".ssh" / "known_hosts"
assert known_hosts.exists(), "known_hosts file was not created"
content = known_hosts.read_text()
assert ca["public_key"].strip() in content, (
"CA public key not found in known_hosts"
)
# ------------------------------------------------------------------
# 9. Clear cache
# ------------------------------------------------------------------
def test_cli_clear_cache(self, cli_server, home_dir):
"""``gatehouse-cli.py --clear-cache`` — removes cached token."""
# Create a bogus cache file first
cache_dir = home_dir / ".gatehouse"
cache_dir.mkdir(exist_ok=True)
(cache_dir / "token_cache.json").write_text('{"token":"dummy"}')
result = _run_cli("http://localhost:0", home_dir, "--clear-cache")
assert result.returncode == 0, (
f"CLI clear-cache failed:\n{result.stderr}"
)
assert "Cached token removed" in _output(result)
assert not (cache_dir / "token_cache.json").exists()
# ------------------------------------------------------------------
# 10. Check certificate (no cert file)
# ------------------------------------------------------------------
def test_cli_check_cert(self, cli_server, home_dir):
"""``gatehouse-cli.py -c`` — reports missing cert."""
_cleanup_shared_files()
result = _run_cli("http://localhost:0", home_dir, "-c")
assert result.returncode == 1, (
f"Expected exit code 1, got {result.returncode}\n"
f"stdout: {result.stdout}"
)
assert "Certificate does not exist" in _output(result)