diff --git a/.gitignore b/.gitignore index a145224..747bd85 100644 --- a/.gitignore +++ b/.gitignore @@ -134,6 +134,5 @@ dmypy.json Thumbs.db # Project specific -migrations/versions/*.py -!migrations/versions/.gitkeep + *.db diff --git a/README.md b/README.md index b93c081..bdce491 100644 --- a/README.md +++ b/README.md @@ -76,37 +76,6 @@ python wsgi.py The API will be available at `http://localhost:5000` -## Project Structure - -``` -backend/ -├── app/ -│ ├── __init__.py # Application factory -│ ├── api/ # API endpoints -│ │ ├── __init__.py -│ │ └── v1/ -│ │ ├── auth.py # Authentication endpoints -│ │ ├── users.py # User endpoints -│ │ └── organizations.py -│ ├── exceptions/ # Custom exceptions -│ ├── middleware/ # Middleware components -│ ├── models/ # Database models -│ ├── schemas/ # Marshmallow schemas -│ ├── services/ # Business logic layer -│ └── utils/ # Utilities -├── config/ # Configuration files -├── docs/ # Documentation -├── migrations/ # Database migrations -├── scripts/ # Utility scripts -├── tests/ # Test suite -│ ├── integration/ -│ └── unit/ -├── requirements/ # Dependencies -├── .env.example # Environment variables template -├── pytest.ini # Pytest configuration -├── pyproject.toml # Project metadata -└── wsgi.py # WSGI entry point -``` ## API Endpoints @@ -170,24 +139,6 @@ Error responses: } ``` -## Testing - -Run all tests: -```bash -pytest -``` - -Run with coverage: -```bash -pytest --cov=app --cov-report=html -``` - -Run specific test types: -```bash -pytest -m unit # Unit tests only -pytest -m integration # Integration tests only -``` - ## Database Migrations Create a new migration: @@ -205,21 +156,6 @@ Rollback: flask db downgrade ``` -## Development - -### Code Quality - -Run linter: -```bash -flake8 app/ tests/ -``` - -Format code: -```bash -black app/ tests/ -isort app/ tests/ -``` - ### Environment Configuration - **Development**: `FLASK_ENV=development` @@ -235,24 +171,6 @@ pip install -r requirements/production.txt gunicorn -w 4 -b 0.0.0.0:8000 wsgi:app ``` -### Docker (example) - -```dockerfile -FROM python:3.11-slim -WORKDIR /app -COPY requirements/production.txt . -RUN pip install -r production.txt -COPY . . -CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "wsgi:app"] -``` - -### Environment Variables - -Required production environment variables: -- `SECRET_KEY` - Flask secret key (must be random) -- `DATABASE_URL` - PostgreSQL connection string -- `REDIS_URL` - Redis connection string -- `FLASK_ENV=production` ## Security Considerations @@ -264,25 +182,6 @@ Required production environment variables: - Session management with secure cookies - Request ID tracking for audit trails -## License - -MIT - -## Contributing - -1. Fork the repository -2. Create a feature branch -3. Make your changes -4. Add tests -5. Run test suite -6. Submit a pull request - -## Support - -For issues and questions: -- GitHub Issues: [repository-url]/issues -- Documentation: See `docs/` directory - # Boostrap db python manage.py db upgrade diff --git a/flask_session/2029240f6d1128be89ddc32729463129 b/flask_session/2029240f6d1128be89ddc32729463129 new file mode 100644 index 0000000..60b84f8 Binary files /dev/null and b/flask_session/2029240f6d1128be89ddc32729463129 differ diff --git a/migrations/versions/d2fd4f159054_totp.py b/migrations/versions/d2fd4f159054_totp.py new file mode 100644 index 0000000..2f9b464 --- /dev/null +++ b/migrations/versions/d2fd4f159054_totp.py @@ -0,0 +1,124 @@ +"""totp + +Revision ID: d2fd4f159054 +Revises: 004 +Create Date: 2026-02-23 13:21:54.136904 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'd2fd4f159054' +down_revision = '004' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('application_provider_configs', + sa.Column('provider_type', sa.String(length=50), nullable=False), + sa.Column('client_id', sa.String(length=255), nullable=False), + sa.Column('client_secret_encrypted', sa.String(length=512), nullable=True), + sa.Column('is_enabled', sa.Boolean(), nullable=False), + sa.Column('default_redirect_url', sa.String(length=2048), nullable=True), + sa.Column('additional_config', sa.JSON(), nullable=True), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('id') + ) + op.create_index(op.f('ix_application_provider_configs_provider_type'), 'application_provider_configs', ['provider_type'], unique=True) + op.create_table('external_provider_configs', + sa.Column('organization_id', sa.String(length=36), nullable=False), + sa.Column('provider_type', sa.String(length=50), nullable=False), + sa.Column('client_id', sa.String(length=255), nullable=False), + sa.Column('client_secret_encrypted', sa.String(length=512), nullable=True), + sa.Column('auth_url', sa.String(length=2048), nullable=False), + sa.Column('token_url', sa.String(length=2048), nullable=False), + sa.Column('userinfo_url', sa.String(length=2048), nullable=True), + sa.Column('jwks_url', sa.String(length=2048), nullable=True), + sa.Column('scopes', sa.JSON(), nullable=False), + sa.Column('redirect_uris', sa.JSON(), nullable=False), + sa.Column('settings', sa.JSON(), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('id'), + sa.UniqueConstraint('organization_id', 'provider_type', name='uix_org_provider_type') + ) + op.create_index('idx_provider_config_org', 'external_provider_configs', ['organization_id', 'provider_type'], unique=False) + op.create_index(op.f('ix_external_provider_configs_organization_id'), 'external_provider_configs', ['organization_id'], unique=False) + op.create_index(op.f('ix_external_provider_configs_provider_type'), 'external_provider_configs', ['provider_type'], unique=False) + op.create_table('oauth_states', + sa.Column('state', sa.String(length=64), nullable=False), + sa.Column('flow_type', sa.String(length=50), nullable=False), + sa.Column('provider_type', sa.String(length=50), nullable=False), + sa.Column('user_id', sa.String(length=36), nullable=True), + sa.Column('organization_id', sa.String(length=36), nullable=True), + sa.Column('nonce', sa.String(length=128), nullable=True), + sa.Column('code_verifier', sa.String(length=128), nullable=True), + sa.Column('code_challenge', sa.String(length=128), nullable=True), + sa.Column('redirect_uri', sa.String(length=2048), nullable=True), + sa.Column('return_url', sa.String(length=2048), nullable=True), + sa.Column('extra_data', sa.JSON(), nullable=True), + sa.Column('expires_at', sa.DateTime(), nullable=False), + sa.Column('used', sa.Boolean(), nullable=False), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('id') + ) + op.create_index(op.f('ix_oauth_states_expires_at'), 'oauth_states', ['expires_at'], unique=False) + op.create_index(op.f('ix_oauth_states_organization_id'), 'oauth_states', ['organization_id'], unique=False) + op.create_index(op.f('ix_oauth_states_state'), 'oauth_states', ['state'], unique=True) + op.create_table('organization_provider_overrides', + sa.Column('organization_id', sa.String(length=36), nullable=False), + sa.Column('provider_type', sa.String(length=50), nullable=False), + sa.Column('client_id', sa.String(length=255), nullable=True), + sa.Column('client_secret_encrypted', sa.String(length=512), nullable=True), + sa.Column('is_enabled', sa.Boolean(), nullable=False), + sa.Column('redirect_url_override', sa.String(length=2048), nullable=True), + sa.Column('additional_config', sa.JSON(), nullable=True), + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.Column('deleted_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('id'), + sa.UniqueConstraint('organization_id', 'provider_type', name='uix_org_provider_type') + ) + op.create_index(op.f('ix_organization_provider_overrides_organization_id'), 'organization_provider_overrides', ['organization_id'], unique=False) + op.create_index(op.f('ix_organization_provider_overrides_provider_type'), 'organization_provider_overrides', ['provider_type'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_organization_provider_overrides_provider_type'), table_name='organization_provider_overrides') + op.drop_index(op.f('ix_organization_provider_overrides_organization_id'), table_name='organization_provider_overrides') + op.drop_table('organization_provider_overrides') + op.drop_index(op.f('ix_oauth_states_state'), table_name='oauth_states') + op.drop_index(op.f('ix_oauth_states_organization_id'), table_name='oauth_states') + op.drop_index(op.f('ix_oauth_states_expires_at'), table_name='oauth_states') + op.drop_table('oauth_states') + op.drop_index(op.f('ix_external_provider_configs_provider_type'), table_name='external_provider_configs') + op.drop_index(op.f('ix_external_provider_configs_organization_id'), table_name='external_provider_configs') + op.drop_index('idx_provider_config_org', table_name='external_provider_configs') + op.drop_table('external_provider_configs') + op.drop_index(op.f('ix_application_provider_configs_provider_type'), table_name='application_provider_configs') + op.drop_table('application_provider_configs') + # ### end Alembic commands ### diff --git a/scripts/manual_totp_reset.md b/scripts/manual_totp_reset.md new file mode 100644 index 0000000..790a3cf --- /dev/null +++ b/scripts/manual_totp_reset.md @@ -0,0 +1,47 @@ +# Manual TOTP Reset for Testing + +Since Bob has TOTP enabled, you have two options to run the full test: + +## Option 1: Restart Flask Server (Easiest) +The Flask server running on port 8888 uses an in-memory SQLite database. +Simply restart it to clear all data: + +```bash +# Stop the server (Ctrl+C in the terminal) +# Then restart it +cd gatehouse-api +.venv/bin/flask run --debug --port 8888 +``` + +Then run the test: +```bash +.venv/bin/python test_totp_full.py +``` + +## Option 2: Use the TOTP Secret + +If you have the secret from the previous enrollment (check `.totp_test_data.json` if it exists): + +1. Edit `test_totp_full.py` +2. Update the `test_data` initialization: +```python +test_data = { + "secret": "YOUR_SECRET_HERE", # From previous enrollment + "backup_codes": ["CODE1", "CODE2", ...], # From previous enrollment + "last_run": None +} +``` + +3. Run the test + +## Option 3: Database Direct Access (if file-based DB) + +If using PostgreSQL or file-based SQLite: + +```sql +DELETE FROM authentication_methods +WHERE user_id = (SELECT id FROM users WHERE email = 'bob@acme-corp.com') + AND method_type = 'totp'; +``` + +The test will then run through the complete flow and save the new secret/codes to `.totp_test_data.json` for subsequent runs. diff --git a/scripts/oidc_test.sh b/scripts/oidc_test.sh new file mode 100644 index 0000000..f5abb1a --- /dev/null +++ b/scripts/oidc_test.sh @@ -0,0 +1,102 @@ +#!/usr/bin/env bash +set -euo pipefail + +ISSUER="https://oidctest.wsweet.org" +CLIENT_ID="secret" +CLIENT_SECRET="tardis" +REDIRECT_URI="http://127.0.0.1:5556/callback" +SCOPE="openid profile email offline_access" + +# --------------------------- +# Discover OIDC endpoints +# --------------------------- +DISCOVERY=$(curl -s "$ISSUER/.well-known/openid-configuration") + +AUTH_ENDPOINT=$(echo "$DISCOVERY" | jq -r .authorization_endpoint) +TOKEN_ENDPOINT=$(echo "$DISCOVERY" | jq -r .token_endpoint) +USERINFO_ENDPOINT=$(echo "$DISCOVERY" | jq -r .userinfo_endpoint) + +echo "Auth endpoint : $AUTH_ENDPOINT" +echo "Token endpoint: $TOKEN_ENDPOINT" +echo + +# --------------------------- +# PKCE +# --------------------------- +CODE_VERIFIER=$(openssl rand -base64 32 | tr -d '=+/') +CODE_CHALLENGE=$(echo -n "$CODE_VERIFIER" | openssl dgst -sha256 -binary | openssl base64 | tr -d '=+/' | tr '/+' '_-') + +STATE=$(openssl rand -hex 16) +NONCE=$(openssl rand -hex 16) + +# --------------------------- +# Build auth URL +# --------------------------- +AUTH_URL="$AUTH_ENDPOINT?response_type=code\ +&client_id=$CLIENT_ID\ +&redirect_uri=$(printf '%s' "$REDIRECT_URI" | jq -s -R -r @uri)\ +&scope=$(printf '%s' "$SCOPE" | jq -s -R -r @uri)\ +&state=$STATE\ +&nonce=$NONCE\ +&code_challenge=$CODE_CHALLENGE\ +&code_challenge_method=S256" + +echo "Open this URL in a browser:" +echo +echo "$AUTH_URL" +echo +echo "After login you will be redirected to:" +echo "$REDIRECT_URI?code=XXXX&state=YYYY" +echo +read -p "Paste the full redirect URL: " REDIRECT + +CODE=$(echo "$REDIRECT" | sed -n 's/.*code=\([^&]*\).*/\1/p') +RETURNED_STATE=$(echo "$REDIRECT" | sed -n 's/.*state=\([^&]*\).*/\1/p') + +if [ "$RETURNED_STATE" != "$STATE" ]; then + echo "STATE MISMATCH" + exit 1 +fi + +# --------------------------- +# Exchange code for tokens +# --------------------------- +TOKENS=$(curl -s -X POST "$TOKEN_ENDPOINT" \ + -u "$CLIENT_ID:$CLIENT_SECRET" \ + -H "Content-Type: application/x-www-form-urlencoded" \ + -d "grant_type=authorization_code" \ + -d "code=$CODE" \ + -d "redirect_uri=$REDIRECT_URI" \ + -d "code_verifier=$CODE_VERIFIER") + +echo +echo "Token response:" +echo "$TOKENS" | jq . + +ACCESS_TOKEN=$(echo "$TOKENS" | jq -r .access_token) +ID_TOKEN=$(echo "$TOKENS" | jq -r .id_token) + +# --------------------------- +# JWT decode function +# --------------------------- +decode() { + echo "$1" | awk -F. '{print $2}' | tr '_-' '/+' | base64 -d 2>/dev/null | jq . +} + +echo +echo "================ ID TOKEN ================" +decode "$ID_TOKEN" + +echo +echo "============== ACCESS TOKEN ==============" +decode "$ACCESS_TOKEN" + +# --------------------------- +# Userinfo (optional) +# --------------------------- +if [ "$USERINFO_ENDPOINT" != "null" ]; then + echo + echo "=============== USERINFO =================" + curl -s -H "Authorization: Bearer $ACCESS_TOKEN" "$USERINFO_ENDPOINT" | jq . +fi + diff --git a/scripts/test_oauth_without_org.sh b/scripts/test_oauth_without_org.sh new file mode 100755 index 0000000..fdf4d76 --- /dev/null +++ b/scripts/test_oauth_without_org.sh @@ -0,0 +1,70 @@ +#!/bin/bash + +# Test script to verify OAuth endpoints work without organization_id +# This tests the fix for the "Google OAuth is not configured for this organization" error + +API_BASE="http://localhost:5001/api/v1" + +echo "=== Testing OAuth Authorization Endpoint (without organization_id) ===" +echo "" +echo "1. Initiating Google OAuth login flow (NO organization_id)..." +RESPONSE=$(curl -s -X GET "${API_BASE}/auth/external/google/authorize?flow=login") +echo "Response: $RESPONSE" +echo "" + +# Check if we get an authorization URL +if echo "$RESPONSE" | grep -q "authorization_url"; then + echo "✅ SUCCESS: Got authorization URL without requiring organization_id" + AUTH_URL=$(echo "$RESPONSE" | jq -r '.data.authorization_url') + STATE=$(echo "$RESPONSE" | jq -r '.data.state') + echo "Authorization URL: $AUTH_URL" + echo "State: $STATE" +else + echo "❌ FAILED: Did not get authorization URL" + echo "Error: $(echo "$RESPONSE" | jq -r '.message')" +fi + +echo "" +echo "=== Testing with organization_id hint (should still work) ===" +echo "" +echo "2. Initiating Google OAuth login flow (WITH organization_id hint)..." +# You'll need to replace this with an actual organization ID from your database +ORG_ID="test-org-id" +RESPONSE=$(curl -s -X GET "${API_BASE}/auth/external/google/authorize?flow=login&organization_id=${ORG_ID}") +echo "Response: $RESPONSE" +echo "" + +if echo "$RESPONSE" | grep -q "authorization_url"; then + echo "✅ SUCCESS: OAuth works with organization_id hint (backward compatible)" +else + echo "⚠️ Note: This may fail if the organization ID doesn't exist or if app-level config is not set" +fi + +echo "" +echo "=== Testing Register Flow ===" +echo "" +echo "3. Initiating Google OAuth register flow (NO organization_id)..." +RESPONSE=$(curl -s -X GET "${API_BASE}/auth/external/google/authorize?flow=register") +echo "Response: $RESPONSE" +echo "" + +if echo "$RESPONSE" | grep -q "authorization_url"; then + echo "✅ SUCCESS: Register flow works without organization_id" +else + echo "❌ FAILED: Register flow did not work" + echo "Error: $(echo "$RESPONSE" | jq -r '.message')" +fi + +echo "" +echo "=== Summary ===" +echo "" +echo "The key fix addresses the error:" +echo " 'Google OAuth is not configured for this organization'" +echo "" +echo "Now OAuth flows work at the APPLICATION level, not requiring" +echo "an organization context during initial authentication." +echo "" +echo "After OAuth callback:" +echo " - Single org user → Automatic login" +echo " - Multi org user → Organization selection UI" +echo " - New user → Organization creation/selection UI" diff --git a/scripts/test_totp_full.py b/scripts/test_totp_full.py new file mode 100644 index 0000000..a745654 --- /dev/null +++ b/scripts/test_totp_full.py @@ -0,0 +1,501 @@ +#!/usr/bin/env python3 +""" +COMPREHENSIVE TOTP END-TO-END FUNCTIONAL TEST +Tests all aspects of TOTP functionality regardless of current state. + +Based on approved proposal in TOTP_TEST_PROPOSAL.md +""" +import requests +import pyotp +import json +import sys +import os +from datetime import datetime, timezone + +# Configuration +BASE_URL = "http://localhost:8888/api/v1" +CREDENTIALS = { + "email": "bob@acme-corp.com", + "password": "UserPass123!" +} +DATA_FILE = ".totp_test_data.json" + +# Test state +test_data = { + "secret": None, + "backup_codes": [], + "last_run": None +} + +def load_test_data(): + """Load test data from previous run.""" + global test_data + if os.path.exists(DATA_FILE): + with open(DATA_FILE, 'r') as f: + test_data = json.load(f) + print(f"📂 Loaded test data from {DATA_FILE}") + print(f" Secret: {test_data['secret'][:20] if test_data['secret'] else 'None'}...") + print(f" Backup codes: {len(test_data.get('backup_codes', []))}") + else: + print(f"📂 No previous test data found") + +def save_test_data(): + """Save test data for next run.""" + test_data['last_run'] = datetime.now(timezone.utc).isoformat() + with open(DATA_FILE, 'w') as f: + json.dump(test_data, f, indent=2) + print(f"\n💾 Saved test data to {DATA_FILE}") + +def print_section(step, title): + """Print test section header.""" + print(f"\n{'='*70}") + print(f"[STEP {step}] {title}") + print('='*70) + +def main(): + """Run comprehensive TOTP test.""" + + print("\n" + "="*70) + print("COMPREHENSIVE TOTP END-TO-END TEST") + print(f"User: {CREDENTIALS['email']}") + print(f"Server: {BASE_URL}") + print(f"Time: {datetime.now(timezone.utc).isoformat()}") + print("="*70) + + load_test_data() + + session = requests.Session() + auth_token = None + totp = None + step = 0 + + try: + # ==================== PHASE 1: INITIAL LOGIN ==================== + + step += 1 + print_section(step, "Initial Login") + + login_response = session.post(f"{BASE_URL}/auth/login", json=CREDENTIALS) + + if login_response.status_code != 200: + print(f"❌ Login failed: {login_response.status_code}") + print(json.dumps(login_response.json(), indent=2)) + return False + + login_data = login_response.json() + + # Check if TOTP is required + totp_required = login_data.get("data", {}).get("requires_totp", False) + + if totp_required: + print("⚠️ TOTP is ENABLED - login requires verification") + + # We need either saved secret or backup code + if test_data.get('secret'): + print("ℹ️ Using saved secret to generate TOTP code") + totp = pyotp.TOTP(test_data['secret']) + utc_now = datetime.now(timezone.utc) + code = totp.at(utc_now) + print(f" Generated code: {code}") + print(f" At time: {utc_now.isoformat()}") + + verify_response = session.post( + f"{BASE_URL}/auth/totp/verify", + json={"code": code} + ) + + if verify_response.status_code != 200: + print("❌ TOTP code verification failed") + print(" Trying backup code...") + + if test_data.get('backup_codes'): + # Try first unused backup code + for backup_code in test_data['backup_codes']: + verify_response = session.post( + f"{BASE_URL}/auth/totp/verify", + json={"code": backup_code, "is_backup_code": True} + ) + if verify_response.status_code == 200: + print(f"✅ Authenticated with backup code: {backup_code}") + # Remove used code + test_data['backup_codes'].remove(backup_code) + break + else: + print("❌ All backup codes failed") + print("\nPlease manually delete Bob's TOTP from database:") + print("DELETE FROM authentication_methods WHERE user_id = (SELECT id FROM users WHERE email = 'bob@acme-corp.com') AND method_type = 'totp';") + return False + else: + print("❌ No backup codes available") + return False + + auth_token = verify_response.json()["data"]["token"] + print("✅ Logged in with TOTP verification") + + elif test_data.get('backup_codes'): + print("ℹ️ Using backup code to authenticate") + + for backup_code in test_data['backup_codes']: + verify_response = session.post( + f"{BASE_URL}/auth/totp/verify", + json={"code": backup_code, "is_backup_code": True} + ) + if verify_response.status_code == 200: + auth_token = verify_response.json()["data"]["token"] + print(f"✅ Authenticated with backup code: {backup_code}") + test_data['backup_codes'].remove(backup_code) + break + else: + print("❌ No valid backup codes") + return False + else: + print("❌ TOTP enabled but no secret or backup codes available") + print("\nPlease manually delete Bob's TOTP from database:") + print("DELETE FROM authentication_methods WHERE user_id = (SELECT id FROM users WHERE email = 'bob@acme-corp.com') AND method_type = 'totp';") + return False + else: + auth_token = login_data["data"]["token"] + print("✅ Logged in (TOTP not required)") + + # ==================== PHASE 2: CHECK STATUS AND DISABLE IF ENABLED ==================== + + step += 1 + print_section(step, "Check TOTP Status") + + status_response = session.get( + f"{BASE_URL}/auth/totp/status", + headers={"Authorization": f"Bearer {auth_token}"} + ) + + if status_response.status_code != 200: + print("❌ Failed to get TOTP status") + return False + + status_data = status_response.json()["data"] + print(f"TOTP Enabled: {status_data['totp_enabled']}") + print(f"Verified At: {status_data.get('verified_at', 'N/A')}") + print(f"Backup Codes Remaining: {status_data['backup_codes_remaining']}") + + # If TOTP is enabled, disable it + if status_data['totp_enabled']: + step += 1 + print_section(step, "Disable TOTP") + + disable_response = session.delete( + f"{BASE_URL}/auth/totp/disable", + headers={"Authorization": f"Bearer {auth_token}"}, + json={"password": CREDENTIALS["password"]} + ) + + if disable_response.status_code != 200: + print("❌ Failed to disable TOTP") + print(json.dumps(disable_response.json(), indent=2)) + return False + + print("✅ TOTP disabled") + + # Clear saved secret/codes since we're starting fresh + test_data['secret'] = None + test_data['backup_codes'] = [] + else: + print("ℹ️ TOTP already disabled, skipping disable step") + + # ==================== PHASE 3: LOGOUT AND RE-LOGIN ==================== + + step += 1 + print_section(step, "Logout") + + logout_response = session.post( + f"{BASE_URL}/auth/logout", + headers={"Authorization": f"Bearer {auth_token}"} + ) + print(f"✅ Logged out (status: {logout_response.status_code})") + + step += 1 + print_section(step, "Re-login (TOTP should NOT be required)") + + session = requests.Session() # Fresh session + login2_response = session.post(f"{BASE_URL}/auth/login", json=CREDENTIALS) + + if login2_response.status_code != 200: + print("❌ Re-login failed") + return False + + login2_data = login2_response.json() + if login2_data.get("data", {}).get("requires_totp"): + print("❌ Login still requires TOTP (should not after disabling)") + return False + + auth_token = login2_data["data"]["token"] + print("✅ Logged in successfully (no TOTP required)") + + # ==================== PHASE 4: ENROLL IN TOTP ==================== + + step += 1 + print_section(step, "Enroll in TOTP") + + enroll_response = session.post( + f"{BASE_URL}/auth/totp/enroll", + headers={"Authorization": f"Bearer {auth_token}"} + ) + + if enroll_response.status_code != 201: + print(f"❌ Enrollment failed: {enroll_response.status_code}") + print(json.dumps(enroll_response.json(), indent=2)) + return False + + enroll_data = enroll_response.json()["data"] + new_secret = enroll_data["secret"] + new_backup_codes = enroll_data["backup_codes"] + provisioning_uri = enroll_data["provisioning_uri"] + qr_code = enroll_data.get("qr_code", "") + + print(f"✅ Enrollment initiated") + print(f" Secret: {new_secret}") + print(f" Provisioning URI: {provisioning_uri}") + print(f" QR Code: {'Present (%d bytes)' % len(qr_code) if qr_code else 'Missing'}") + print(f" Backup Codes: {len(new_backup_codes)}") + + # Save for later use + test_data['secret'] = new_secret + test_data['backup_codes'] = new_backup_codes.copy() + + # ==================== PHASE 5: VERIFY ENROLLMENT ==================== + + step += 1 + print_section(step, "Verify TOTP Enrollment") + + totp = pyotp.TOTP(new_secret) + utc_now = datetime.now(timezone.utc) + code = totp.at(utc_now) + + print(f"Generated TOTP code: {code}") + print(f"At UTC time: {utc_now.isoformat()}") + print(f"Timestamp: {utc_now.timestamp()}") + + verify_enrollment_response = session.post( + f"{BASE_URL}/auth/totp/verify-enrollment", + headers={"Authorization": f"Bearer {auth_token}"}, + json={"code": code} + ) + + if verify_enrollment_response.status_code != 200: + print(f"❌ Verification failed: {verify_enrollment_response.status_code}") + print(json.dumps(verify_enrollment_response.json(), indent=2)) + return False + + print("✅ TOTP enrollment verified successfully!") + + # ==================== PHASE 6: CONFIRM ENROLLMENT ==================== + + step += 1 + print_section(step, "Confirm TOTP is Enabled") + + final_status_response = session.get( + f"{BASE_URL}/auth/totp/status", + headers={"Authorization": f"Bearer {auth_token}"} + ) + + final_status = final_status_response.json()["data"] + if not final_status["totp_enabled"]: + print("❌ TOTP not enabled after verification!") + return False + + print(f"✅ TOTP is enabled") + print(f" Verified at: {final_status['verified_at']}") + print(f" Backup codes remaining: {final_status['backup_codes_remaining']}") + + # ==================== PHASE 7: TEST LOGIN WITH TOTP ==================== + + step += 1 + print_section(step, "Logout") + + session.post(f"{BASE_URL}/auth/logout", headers={"Authorization": f"Bearer {auth_token}"}) + print("✅ Logged out") + + step += 1 + print_section(step, "Login (should REQUIRE TOTP)") + + session2 = requests.Session() + login3_response = session2.post(f"{BASE_URL}/auth/login", json=CREDENTIALS) + + if login3_response.status_code != 200: + print("❌ Login failed") + return False + + login3_data = login3_response.json() + if not login3_data.get("data", {}).get("requires_totp"): + print("❌ Login did NOT require TOTP (it should!)") + return False + + print("✅ Login correctly requires TOTP") + + # ==================== PHASE 8: VERIFY TOTP DURING LOGIN ==================== + + step += 1 + print_section(step, "Verify TOTP Code During Login") + + utc_now = datetime.now(timezone.utc) + login_code = totp.at(utc_now) + + print(f"Generated TOTP code: {login_code}") + print(f"At UTC time: {utc_now.isoformat()}") + + verify_login_response = session2.post( + f"{BASE_URL}/auth/totp/verify", + json={"code": login_code} + ) + + if verify_login_response.status_code != 200: + print(f"❌ TOTP login verification failed: {verify_login_response.status_code}") + print(json.dumps(verify_login_response.json(), indent=2)) + return False + + final_token = verify_login_response.json()["data"]["token"] + print("✅ Successfully logged in with TOTP!") + print(f" Token: {final_token[:30]}...") + + # ==================== PHASE 9: TEST /auth/me ==================== + + step += 1 + print_section(step, "Confirm Logged In (/auth/me)") + + me_response = session2.get( + f"{BASE_URL}/auth/me", + headers={"Authorization": f"Bearer {final_token}"} + ) + + if me_response.status_code != 200: + print("❌ /auth/me failed") + return False + + me_data = me_response.json()["data"] + print(f"✅ Confirmed logged in as: {me_data['user']['email']}") + print(f" User ID: {me_data['user']['id']}") + + # ==================== PHASE 10: TEST BACKUP CODE ==================== + + step += 1 + print_section(step, "Test Backup Code Login") + + # Logout + session2.post(f"{BASE_URL}/auth/logout", headers={"Authorization": f"Bearer {final_token}"}) + + # Fresh login + session3 = requests.Session() + login4_response = session3.post(f"{BASE_URL}/auth/login", json=CREDENTIALS) + + if not login4_response.json().get("data", {}).get("requires_totp"): + print("❌ Login should require TOTP") + return False + + print(f"ℹ️ Using backup code: {test_data['backup_codes'][0]}") + + backup_verify_response = session3.post( + f"{BASE_URL}/auth/totp/verify", + json={"code": test_data['backup_codes'][0], "is_backup_code": True} + ) + + if backup_verify_response.status_code != 200: + print("❌ Backup code login failed") + print(json.dumps(backup_verify_response.json(), indent=2)) + return False + + backup_token = backup_verify_response.json()["data"]["token"] + print(f"✅ Logged in with backup code!") + + # Remove used code + used_code = test_data['backup_codes'].pop(0) + + # ==================== PHASE 11: CHECK BACKUP CODES REMAINING ==================== + + step += 1 + print_section(step, "Check Backup Codes Remaining") + + status3_response = session3.get( + f"{BASE_URL}/auth/totp/status", + headers={"Authorization": f"Bearer {backup_token}"} + ) + + status3_data = status3_response.json()["data"] + if status3_data['backup_codes_remaining'] != 9: + print(f"❌ Expected 9 backup codes, got {status3_data['backup_codes_remaining']}") + return False + + print(f"✅ Backup codes remaining: {status3_data['backup_codes_remaining']} (was 10, now 9)") + + # ==================== PHASE 12: REGENERATE BACKUP CODES ==================== + + step += 1 + print_section(step, "Regenerate Backup Codes") + + regen_response = session3.post( + f"{BASE_URL}/auth/totp/regenerate-backup-codes", + headers={"Authorization": f"Bearer {backup_token}"}, + json={"password": CREDENTIALS["password"]} + ) + + if regen_response.status_code != 200: + print("❌ Failed to regenerate backup codes") + print(json.dumps(regen_response.json(), indent=2)) + return False + + regenerated_codes = regen_response.json()["data"]["backup_codes"] + print(f"✅ Regenerated {len(regenerated_codes)} backup codes") + + # Update saved codes + test_data['backup_codes'] = regenerated_codes.copy() + + # ==================== SUCCESS ==================== + + save_test_data() + + print("\n" + "="*70) + print("🎉 ALL TESTS PASSED!") + print("="*70) + + print("\n✅ TEST SUMMARY:") + print(f" 1. ✅ Initial login (with/without TOTP)") + print(f" 2. ✅ Check TOTP status") + print(f" 3. ✅ Disable TOTP") + print(f" 4. ✅ Logout") + print(f" 5. ✅ Re-login without TOTP") + print(f" 6. ✅ Enroll in TOTP") + print(f" 7. ✅ Verify enrollment") + print(f" 8. ✅ Confirm TOTP enabled") + print(f" 9. ✅ Logout") + print(f" 10. ✅ Login with TOTP required") + print(f" 11. ✅ Verify TOTP during login") + print(f" 12. ✅ Confirm logged in (/auth/me)") + print(f" 13. ✅ Login with backup code") + print(f" 14. ✅ Check backup codes decremented") + print(f" 15. ✅ Regenerate backup codes") + + print(f"\n📱 Current TOTP Secret:") + print(f" {test_data['secret']}") + + print(f"\n🔑 Current Backup Codes ({len(test_data['backup_codes'])}):") + for i, code in enumerate(test_data['backup_codes'], 1): + print(f" {i:2d}. {code}") + + print("\n" + "="*70) + + return True + + except requests.exceptions.ConnectionError: + print(f"\n❌ CONNECTION ERROR - Server not running at {BASE_URL}") + return False + except KeyError as e: + print(f"\n❌ UNEXPECTED RESPONSE STRUCTURE: Missing key {e}") + import traceback + traceback.print_exc() + return False + except Exception as e: + print(f"\n❌ UNEXPECTED ERROR: {e}") + import traceback + traceback.print_exc() + return False + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1)