This commit is contained in:
2026-02-23 13:25:17 +10:30
parent cbdf6185b6
commit f1fff22f3e
8 changed files with 845 additions and 103 deletions
+1 -2
View File
@@ -134,6 +134,5 @@ dmypy.json
Thumbs.db
# Project specific
migrations/versions/*.py
!migrations/versions/.gitkeep
*.db
-101
View File
@@ -76,37 +76,6 @@ python wsgi.py
The API will be available at `http://localhost:5000`
## Project Structure
```
backend/
├── app/
│ ├── __init__.py # Application factory
│ ├── api/ # API endpoints
│ │ ├── __init__.py
│ │ └── v1/
│ │ ├── auth.py # Authentication endpoints
│ │ ├── users.py # User endpoints
│ │ └── organizations.py
│ ├── exceptions/ # Custom exceptions
│ ├── middleware/ # Middleware components
│ ├── models/ # Database models
│ ├── schemas/ # Marshmallow schemas
│ ├── services/ # Business logic layer
│ └── utils/ # Utilities
├── config/ # Configuration files
├── docs/ # Documentation
├── migrations/ # Database migrations
├── scripts/ # Utility scripts
├── tests/ # Test suite
│ ├── integration/
│ └── unit/
├── requirements/ # Dependencies
├── .env.example # Environment variables template
├── pytest.ini # Pytest configuration
├── pyproject.toml # Project metadata
└── wsgi.py # WSGI entry point
```
## API Endpoints
@@ -170,24 +139,6 @@ Error responses:
}
```
## Testing
Run all tests:
```bash
pytest
```
Run with coverage:
```bash
pytest --cov=app --cov-report=html
```
Run specific test types:
```bash
pytest -m unit # Unit tests only
pytest -m integration # Integration tests only
```
## Database Migrations
Create a new migration:
@@ -205,21 +156,6 @@ Rollback:
flask db downgrade
```
## Development
### Code Quality
Run linter:
```bash
flake8 app/ tests/
```
Format code:
```bash
black app/ tests/
isort app/ tests/
```
### Environment Configuration
- **Development**: `FLASK_ENV=development`
@@ -235,24 +171,6 @@ pip install -r requirements/production.txt
gunicorn -w 4 -b 0.0.0.0:8000 wsgi:app
```
### Docker (example)
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements/production.txt .
RUN pip install -r production.txt
COPY . .
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "wsgi:app"]
```
### Environment Variables
Required production environment variables:
- `SECRET_KEY` - Flask secret key (must be random)
- `DATABASE_URL` - PostgreSQL connection string
- `REDIS_URL` - Redis connection string
- `FLASK_ENV=production`
## Security Considerations
@@ -264,25 +182,6 @@ Required production environment variables:
- Session management with secure cookies
- Request ID tracking for audit trails
## License
MIT
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests
5. Run test suite
6. Submit a pull request
## Support
For issues and questions:
- GitHub Issues: [repository-url]/issues
- Documentation: See `docs/` directory
# Boostrap db
python manage.py db upgrade
Binary file not shown.
+124
View File
@@ -0,0 +1,124 @@
"""totp
Revision ID: d2fd4f159054
Revises: 004
Create Date: 2026-02-23 13:21:54.136904
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'd2fd4f159054'
down_revision = '004'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('application_provider_configs',
sa.Column('provider_type', sa.String(length=50), nullable=False),
sa.Column('client_id', sa.String(length=255), nullable=False),
sa.Column('client_secret_encrypted', sa.String(length=512), nullable=True),
sa.Column('is_enabled', sa.Boolean(), nullable=False),
sa.Column('default_redirect_url', sa.String(length=2048), nullable=True),
sa.Column('additional_config', sa.JSON(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id')
)
op.create_index(op.f('ix_application_provider_configs_provider_type'), 'application_provider_configs', ['provider_type'], unique=True)
op.create_table('external_provider_configs',
sa.Column('organization_id', sa.String(length=36), nullable=False),
sa.Column('provider_type', sa.String(length=50), nullable=False),
sa.Column('client_id', sa.String(length=255), nullable=False),
sa.Column('client_secret_encrypted', sa.String(length=512), nullable=True),
sa.Column('auth_url', sa.String(length=2048), nullable=False),
sa.Column('token_url', sa.String(length=2048), nullable=False),
sa.Column('userinfo_url', sa.String(length=2048), nullable=True),
sa.Column('jwks_url', sa.String(length=2048), nullable=True),
sa.Column('scopes', sa.JSON(), nullable=False),
sa.Column('redirect_uris', sa.JSON(), nullable=False),
sa.Column('settings', sa.JSON(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id'),
sa.UniqueConstraint('organization_id', 'provider_type', name='uix_org_provider_type')
)
op.create_index('idx_provider_config_org', 'external_provider_configs', ['organization_id', 'provider_type'], unique=False)
op.create_index(op.f('ix_external_provider_configs_organization_id'), 'external_provider_configs', ['organization_id'], unique=False)
op.create_index(op.f('ix_external_provider_configs_provider_type'), 'external_provider_configs', ['provider_type'], unique=False)
op.create_table('oauth_states',
sa.Column('state', sa.String(length=64), nullable=False),
sa.Column('flow_type', sa.String(length=50), nullable=False),
sa.Column('provider_type', sa.String(length=50), nullable=False),
sa.Column('user_id', sa.String(length=36), nullable=True),
sa.Column('organization_id', sa.String(length=36), nullable=True),
sa.Column('nonce', sa.String(length=128), nullable=True),
sa.Column('code_verifier', sa.String(length=128), nullable=True),
sa.Column('code_challenge', sa.String(length=128), nullable=True),
sa.Column('redirect_uri', sa.String(length=2048), nullable=True),
sa.Column('return_url', sa.String(length=2048), nullable=True),
sa.Column('extra_data', sa.JSON(), nullable=True),
sa.Column('expires_at', sa.DateTime(), nullable=False),
sa.Column('used', sa.Boolean(), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id')
)
op.create_index(op.f('ix_oauth_states_expires_at'), 'oauth_states', ['expires_at'], unique=False)
op.create_index(op.f('ix_oauth_states_organization_id'), 'oauth_states', ['organization_id'], unique=False)
op.create_index(op.f('ix_oauth_states_state'), 'oauth_states', ['state'], unique=True)
op.create_table('organization_provider_overrides',
sa.Column('organization_id', sa.String(length=36), nullable=False),
sa.Column('provider_type', sa.String(length=50), nullable=False),
sa.Column('client_id', sa.String(length=255), nullable=True),
sa.Column('client_secret_encrypted', sa.String(length=512), nullable=True),
sa.Column('is_enabled', sa.Boolean(), nullable=False),
sa.Column('redirect_url_override', sa.String(length=2048), nullable=True),
sa.Column('additional_config', sa.JSON(), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.Column('deleted_at', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['organization_id'], ['organizations.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('id'),
sa.UniqueConstraint('organization_id', 'provider_type', name='uix_org_provider_type')
)
op.create_index(op.f('ix_organization_provider_overrides_organization_id'), 'organization_provider_overrides', ['organization_id'], unique=False)
op.create_index(op.f('ix_organization_provider_overrides_provider_type'), 'organization_provider_overrides', ['provider_type'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_organization_provider_overrides_provider_type'), table_name='organization_provider_overrides')
op.drop_index(op.f('ix_organization_provider_overrides_organization_id'), table_name='organization_provider_overrides')
op.drop_table('organization_provider_overrides')
op.drop_index(op.f('ix_oauth_states_state'), table_name='oauth_states')
op.drop_index(op.f('ix_oauth_states_organization_id'), table_name='oauth_states')
op.drop_index(op.f('ix_oauth_states_expires_at'), table_name='oauth_states')
op.drop_table('oauth_states')
op.drop_index(op.f('ix_external_provider_configs_provider_type'), table_name='external_provider_configs')
op.drop_index(op.f('ix_external_provider_configs_organization_id'), table_name='external_provider_configs')
op.drop_index('idx_provider_config_org', table_name='external_provider_configs')
op.drop_table('external_provider_configs')
op.drop_index(op.f('ix_application_provider_configs_provider_type'), table_name='application_provider_configs')
op.drop_table('application_provider_configs')
# ### end Alembic commands ###
+47
View File
@@ -0,0 +1,47 @@
# Manual TOTP Reset for Testing
Since Bob has TOTP enabled, you have two options to run the full test:
## Option 1: Restart Flask Server (Easiest)
The Flask server running on port 8888 uses an in-memory SQLite database.
Simply restart it to clear all data:
```bash
# Stop the server (Ctrl+C in the terminal)
# Then restart it
cd gatehouse-api
.venv/bin/flask run --debug --port 8888
```
Then run the test:
```bash
.venv/bin/python test_totp_full.py
```
## Option 2: Use the TOTP Secret
If you have the secret from the previous enrollment (check `.totp_test_data.json` if it exists):
1. Edit `test_totp_full.py`
2. Update the `test_data` initialization:
```python
test_data = {
"secret": "YOUR_SECRET_HERE", # From previous enrollment
"backup_codes": ["CODE1", "CODE2", ...], # From previous enrollment
"last_run": None
}
```
3. Run the test
## Option 3: Database Direct Access (if file-based DB)
If using PostgreSQL or file-based SQLite:
```sql
DELETE FROM authentication_methods
WHERE user_id = (SELECT id FROM users WHERE email = 'bob@acme-corp.com')
AND method_type = 'totp';
```
The test will then run through the complete flow and save the new secret/codes to `.totp_test_data.json` for subsequent runs.
+102
View File
@@ -0,0 +1,102 @@
#!/usr/bin/env bash
set -euo pipefail
ISSUER="https://oidctest.wsweet.org"
CLIENT_ID="secret"
CLIENT_SECRET="tardis"
REDIRECT_URI="http://127.0.0.1:5556/callback"
SCOPE="openid profile email offline_access"
# ---------------------------
# Discover OIDC endpoints
# ---------------------------
DISCOVERY=$(curl -s "$ISSUER/.well-known/openid-configuration")
AUTH_ENDPOINT=$(echo "$DISCOVERY" | jq -r .authorization_endpoint)
TOKEN_ENDPOINT=$(echo "$DISCOVERY" | jq -r .token_endpoint)
USERINFO_ENDPOINT=$(echo "$DISCOVERY" | jq -r .userinfo_endpoint)
echo "Auth endpoint : $AUTH_ENDPOINT"
echo "Token endpoint: $TOKEN_ENDPOINT"
echo
# ---------------------------
# PKCE
# ---------------------------
CODE_VERIFIER=$(openssl rand -base64 32 | tr -d '=+/')
CODE_CHALLENGE=$(echo -n "$CODE_VERIFIER" | openssl dgst -sha256 -binary | openssl base64 | tr -d '=+/' | tr '/+' '_-')
STATE=$(openssl rand -hex 16)
NONCE=$(openssl rand -hex 16)
# ---------------------------
# Build auth URL
# ---------------------------
AUTH_URL="$AUTH_ENDPOINT?response_type=code\
&client_id=$CLIENT_ID\
&redirect_uri=$(printf '%s' "$REDIRECT_URI" | jq -s -R -r @uri)\
&scope=$(printf '%s' "$SCOPE" | jq -s -R -r @uri)\
&state=$STATE\
&nonce=$NONCE\
&code_challenge=$CODE_CHALLENGE\
&code_challenge_method=S256"
echo "Open this URL in a browser:"
echo
echo "$AUTH_URL"
echo
echo "After login you will be redirected to:"
echo "$REDIRECT_URI?code=XXXX&state=YYYY"
echo
read -p "Paste the full redirect URL: " REDIRECT
CODE=$(echo "$REDIRECT" | sed -n 's/.*code=\([^&]*\).*/\1/p')
RETURNED_STATE=$(echo "$REDIRECT" | sed -n 's/.*state=\([^&]*\).*/\1/p')
if [ "$RETURNED_STATE" != "$STATE" ]; then
echo "STATE MISMATCH"
exit 1
fi
# ---------------------------
# Exchange code for tokens
# ---------------------------
TOKENS=$(curl -s -X POST "$TOKEN_ENDPOINT" \
-u "$CLIENT_ID:$CLIENT_SECRET" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=authorization_code" \
-d "code=$CODE" \
-d "redirect_uri=$REDIRECT_URI" \
-d "code_verifier=$CODE_VERIFIER")
echo
echo "Token response:"
echo "$TOKENS" | jq .
ACCESS_TOKEN=$(echo "$TOKENS" | jq -r .access_token)
ID_TOKEN=$(echo "$TOKENS" | jq -r .id_token)
# ---------------------------
# JWT decode function
# ---------------------------
decode() {
echo "$1" | awk -F. '{print $2}' | tr '_-' '/+' | base64 -d 2>/dev/null | jq .
}
echo
echo "================ ID TOKEN ================"
decode "$ID_TOKEN"
echo
echo "============== ACCESS TOKEN =============="
decode "$ACCESS_TOKEN"
# ---------------------------
# Userinfo (optional)
# ---------------------------
if [ "$USERINFO_ENDPOINT" != "null" ]; then
echo
echo "=============== USERINFO ================="
curl -s -H "Authorization: Bearer $ACCESS_TOKEN" "$USERINFO_ENDPOINT" | jq .
fi
+70
View File
@@ -0,0 +1,70 @@
#!/bin/bash
# Test script to verify OAuth endpoints work without organization_id
# This tests the fix for the "Google OAuth is not configured for this organization" error
API_BASE="http://localhost:5001/api/v1"
echo "=== Testing OAuth Authorization Endpoint (without organization_id) ==="
echo ""
echo "1. Initiating Google OAuth login flow (NO organization_id)..."
RESPONSE=$(curl -s -X GET "${API_BASE}/auth/external/google/authorize?flow=login")
echo "Response: $RESPONSE"
echo ""
# Check if we get an authorization URL
if echo "$RESPONSE" | grep -q "authorization_url"; then
echo "✅ SUCCESS: Got authorization URL without requiring organization_id"
AUTH_URL=$(echo "$RESPONSE" | jq -r '.data.authorization_url')
STATE=$(echo "$RESPONSE" | jq -r '.data.state')
echo "Authorization URL: $AUTH_URL"
echo "State: $STATE"
else
echo "❌ FAILED: Did not get authorization URL"
echo "Error: $(echo "$RESPONSE" | jq -r '.message')"
fi
echo ""
echo "=== Testing with organization_id hint (should still work) ==="
echo ""
echo "2. Initiating Google OAuth login flow (WITH organization_id hint)..."
# You'll need to replace this with an actual organization ID from your database
ORG_ID="test-org-id"
RESPONSE=$(curl -s -X GET "${API_BASE}/auth/external/google/authorize?flow=login&organization_id=${ORG_ID}")
echo "Response: $RESPONSE"
echo ""
if echo "$RESPONSE" | grep -q "authorization_url"; then
echo "✅ SUCCESS: OAuth works with organization_id hint (backward compatible)"
else
echo "⚠️ Note: This may fail if the organization ID doesn't exist or if app-level config is not set"
fi
echo ""
echo "=== Testing Register Flow ==="
echo ""
echo "3. Initiating Google OAuth register flow (NO organization_id)..."
RESPONSE=$(curl -s -X GET "${API_BASE}/auth/external/google/authorize?flow=register")
echo "Response: $RESPONSE"
echo ""
if echo "$RESPONSE" | grep -q "authorization_url"; then
echo "✅ SUCCESS: Register flow works without organization_id"
else
echo "❌ FAILED: Register flow did not work"
echo "Error: $(echo "$RESPONSE" | jq -r '.message')"
fi
echo ""
echo "=== Summary ==="
echo ""
echo "The key fix addresses the error:"
echo " 'Google OAuth is not configured for this organization'"
echo ""
echo "Now OAuth flows work at the APPLICATION level, not requiring"
echo "an organization context during initial authentication."
echo ""
echo "After OAuth callback:"
echo " - Single org user → Automatic login"
echo " - Multi org user → Organization selection UI"
echo " - New user → Organization creation/selection UI"
+501
View File
@@ -0,0 +1,501 @@
#!/usr/bin/env python3
"""
COMPREHENSIVE TOTP END-TO-END FUNCTIONAL TEST
Tests all aspects of TOTP functionality regardless of current state.
Based on approved proposal in TOTP_TEST_PROPOSAL.md
"""
import requests
import pyotp
import json
import sys
import os
from datetime import datetime, timezone
# Configuration
BASE_URL = "http://localhost:8888/api/v1"
CREDENTIALS = {
"email": "bob@acme-corp.com",
"password": "UserPass123!"
}
DATA_FILE = ".totp_test_data.json"
# Test state
test_data = {
"secret": None,
"backup_codes": [],
"last_run": None
}
def load_test_data():
"""Load test data from previous run."""
global test_data
if os.path.exists(DATA_FILE):
with open(DATA_FILE, 'r') as f:
test_data = json.load(f)
print(f"📂 Loaded test data from {DATA_FILE}")
print(f" Secret: {test_data['secret'][:20] if test_data['secret'] else 'None'}...")
print(f" Backup codes: {len(test_data.get('backup_codes', []))}")
else:
print(f"📂 No previous test data found")
def save_test_data():
"""Save test data for next run."""
test_data['last_run'] = datetime.now(timezone.utc).isoformat()
with open(DATA_FILE, 'w') as f:
json.dump(test_data, f, indent=2)
print(f"\n💾 Saved test data to {DATA_FILE}")
def print_section(step, title):
"""Print test section header."""
print(f"\n{'='*70}")
print(f"[STEP {step}] {title}")
print('='*70)
def main():
"""Run comprehensive TOTP test."""
print("\n" + "="*70)
print("COMPREHENSIVE TOTP END-TO-END TEST")
print(f"User: {CREDENTIALS['email']}")
print(f"Server: {BASE_URL}")
print(f"Time: {datetime.now(timezone.utc).isoformat()}")
print("="*70)
load_test_data()
session = requests.Session()
auth_token = None
totp = None
step = 0
try:
# ==================== PHASE 1: INITIAL LOGIN ====================
step += 1
print_section(step, "Initial Login")
login_response = session.post(f"{BASE_URL}/auth/login", json=CREDENTIALS)
if login_response.status_code != 200:
print(f"❌ Login failed: {login_response.status_code}")
print(json.dumps(login_response.json(), indent=2))
return False
login_data = login_response.json()
# Check if TOTP is required
totp_required = login_data.get("data", {}).get("requires_totp", False)
if totp_required:
print("⚠️ TOTP is ENABLED - login requires verification")
# We need either saved secret or backup code
if test_data.get('secret'):
print("️ Using saved secret to generate TOTP code")
totp = pyotp.TOTP(test_data['secret'])
utc_now = datetime.now(timezone.utc)
code = totp.at(utc_now)
print(f" Generated code: {code}")
print(f" At time: {utc_now.isoformat()}")
verify_response = session.post(
f"{BASE_URL}/auth/totp/verify",
json={"code": code}
)
if verify_response.status_code != 200:
print("❌ TOTP code verification failed")
print(" Trying backup code...")
if test_data.get('backup_codes'):
# Try first unused backup code
for backup_code in test_data['backup_codes']:
verify_response = session.post(
f"{BASE_URL}/auth/totp/verify",
json={"code": backup_code, "is_backup_code": True}
)
if verify_response.status_code == 200:
print(f"✅ Authenticated with backup code: {backup_code}")
# Remove used code
test_data['backup_codes'].remove(backup_code)
break
else:
print("❌ All backup codes failed")
print("\nPlease manually delete Bob's TOTP from database:")
print("DELETE FROM authentication_methods WHERE user_id = (SELECT id FROM users WHERE email = 'bob@acme-corp.com') AND method_type = 'totp';")
return False
else:
print("❌ No backup codes available")
return False
auth_token = verify_response.json()["data"]["token"]
print("✅ Logged in with TOTP verification")
elif test_data.get('backup_codes'):
print("️ Using backup code to authenticate")
for backup_code in test_data['backup_codes']:
verify_response = session.post(
f"{BASE_URL}/auth/totp/verify",
json={"code": backup_code, "is_backup_code": True}
)
if verify_response.status_code == 200:
auth_token = verify_response.json()["data"]["token"]
print(f"✅ Authenticated with backup code: {backup_code}")
test_data['backup_codes'].remove(backup_code)
break
else:
print("❌ No valid backup codes")
return False
else:
print("❌ TOTP enabled but no secret or backup codes available")
print("\nPlease manually delete Bob's TOTP from database:")
print("DELETE FROM authentication_methods WHERE user_id = (SELECT id FROM users WHERE email = 'bob@acme-corp.com') AND method_type = 'totp';")
return False
else:
auth_token = login_data["data"]["token"]
print("✅ Logged in (TOTP not required)")
# ==================== PHASE 2: CHECK STATUS AND DISABLE IF ENABLED ====================
step += 1
print_section(step, "Check TOTP Status")
status_response = session.get(
f"{BASE_URL}/auth/totp/status",
headers={"Authorization": f"Bearer {auth_token}"}
)
if status_response.status_code != 200:
print("❌ Failed to get TOTP status")
return False
status_data = status_response.json()["data"]
print(f"TOTP Enabled: {status_data['totp_enabled']}")
print(f"Verified At: {status_data.get('verified_at', 'N/A')}")
print(f"Backup Codes Remaining: {status_data['backup_codes_remaining']}")
# If TOTP is enabled, disable it
if status_data['totp_enabled']:
step += 1
print_section(step, "Disable TOTP")
disable_response = session.delete(
f"{BASE_URL}/auth/totp/disable",
headers={"Authorization": f"Bearer {auth_token}"},
json={"password": CREDENTIALS["password"]}
)
if disable_response.status_code != 200:
print("❌ Failed to disable TOTP")
print(json.dumps(disable_response.json(), indent=2))
return False
print("✅ TOTP disabled")
# Clear saved secret/codes since we're starting fresh
test_data['secret'] = None
test_data['backup_codes'] = []
else:
print("️ TOTP already disabled, skipping disable step")
# ==================== PHASE 3: LOGOUT AND RE-LOGIN ====================
step += 1
print_section(step, "Logout")
logout_response = session.post(
f"{BASE_URL}/auth/logout",
headers={"Authorization": f"Bearer {auth_token}"}
)
print(f"✅ Logged out (status: {logout_response.status_code})")
step += 1
print_section(step, "Re-login (TOTP should NOT be required)")
session = requests.Session() # Fresh session
login2_response = session.post(f"{BASE_URL}/auth/login", json=CREDENTIALS)
if login2_response.status_code != 200:
print("❌ Re-login failed")
return False
login2_data = login2_response.json()
if login2_data.get("data", {}).get("requires_totp"):
print("❌ Login still requires TOTP (should not after disabling)")
return False
auth_token = login2_data["data"]["token"]
print("✅ Logged in successfully (no TOTP required)")
# ==================== PHASE 4: ENROLL IN TOTP ====================
step += 1
print_section(step, "Enroll in TOTP")
enroll_response = session.post(
f"{BASE_URL}/auth/totp/enroll",
headers={"Authorization": f"Bearer {auth_token}"}
)
if enroll_response.status_code != 201:
print(f"❌ Enrollment failed: {enroll_response.status_code}")
print(json.dumps(enroll_response.json(), indent=2))
return False
enroll_data = enroll_response.json()["data"]
new_secret = enroll_data["secret"]
new_backup_codes = enroll_data["backup_codes"]
provisioning_uri = enroll_data["provisioning_uri"]
qr_code = enroll_data.get("qr_code", "")
print(f"✅ Enrollment initiated")
print(f" Secret: {new_secret}")
print(f" Provisioning URI: {provisioning_uri}")
print(f" QR Code: {'Present (%d bytes)' % len(qr_code) if qr_code else 'Missing'}")
print(f" Backup Codes: {len(new_backup_codes)}")
# Save for later use
test_data['secret'] = new_secret
test_data['backup_codes'] = new_backup_codes.copy()
# ==================== PHASE 5: VERIFY ENROLLMENT ====================
step += 1
print_section(step, "Verify TOTP Enrollment")
totp = pyotp.TOTP(new_secret)
utc_now = datetime.now(timezone.utc)
code = totp.at(utc_now)
print(f"Generated TOTP code: {code}")
print(f"At UTC time: {utc_now.isoformat()}")
print(f"Timestamp: {utc_now.timestamp()}")
verify_enrollment_response = session.post(
f"{BASE_URL}/auth/totp/verify-enrollment",
headers={"Authorization": f"Bearer {auth_token}"},
json={"code": code}
)
if verify_enrollment_response.status_code != 200:
print(f"❌ Verification failed: {verify_enrollment_response.status_code}")
print(json.dumps(verify_enrollment_response.json(), indent=2))
return False
print("✅ TOTP enrollment verified successfully!")
# ==================== PHASE 6: CONFIRM ENROLLMENT ====================
step += 1
print_section(step, "Confirm TOTP is Enabled")
final_status_response = session.get(
f"{BASE_URL}/auth/totp/status",
headers={"Authorization": f"Bearer {auth_token}"}
)
final_status = final_status_response.json()["data"]
if not final_status["totp_enabled"]:
print("❌ TOTP not enabled after verification!")
return False
print(f"✅ TOTP is enabled")
print(f" Verified at: {final_status['verified_at']}")
print(f" Backup codes remaining: {final_status['backup_codes_remaining']}")
# ==================== PHASE 7: TEST LOGIN WITH TOTP ====================
step += 1
print_section(step, "Logout")
session.post(f"{BASE_URL}/auth/logout", headers={"Authorization": f"Bearer {auth_token}"})
print("✅ Logged out")
step += 1
print_section(step, "Login (should REQUIRE TOTP)")
session2 = requests.Session()
login3_response = session2.post(f"{BASE_URL}/auth/login", json=CREDENTIALS)
if login3_response.status_code != 200:
print("❌ Login failed")
return False
login3_data = login3_response.json()
if not login3_data.get("data", {}).get("requires_totp"):
print("❌ Login did NOT require TOTP (it should!)")
return False
print("✅ Login correctly requires TOTP")
# ==================== PHASE 8: VERIFY TOTP DURING LOGIN ====================
step += 1
print_section(step, "Verify TOTP Code During Login")
utc_now = datetime.now(timezone.utc)
login_code = totp.at(utc_now)
print(f"Generated TOTP code: {login_code}")
print(f"At UTC time: {utc_now.isoformat()}")
verify_login_response = session2.post(
f"{BASE_URL}/auth/totp/verify",
json={"code": login_code}
)
if verify_login_response.status_code != 200:
print(f"❌ TOTP login verification failed: {verify_login_response.status_code}")
print(json.dumps(verify_login_response.json(), indent=2))
return False
final_token = verify_login_response.json()["data"]["token"]
print("✅ Successfully logged in with TOTP!")
print(f" Token: {final_token[:30]}...")
# ==================== PHASE 9: TEST /auth/me ====================
step += 1
print_section(step, "Confirm Logged In (/auth/me)")
me_response = session2.get(
f"{BASE_URL}/auth/me",
headers={"Authorization": f"Bearer {final_token}"}
)
if me_response.status_code != 200:
print("❌ /auth/me failed")
return False
me_data = me_response.json()["data"]
print(f"✅ Confirmed logged in as: {me_data['user']['email']}")
print(f" User ID: {me_data['user']['id']}")
# ==================== PHASE 10: TEST BACKUP CODE ====================
step += 1
print_section(step, "Test Backup Code Login")
# Logout
session2.post(f"{BASE_URL}/auth/logout", headers={"Authorization": f"Bearer {final_token}"})
# Fresh login
session3 = requests.Session()
login4_response = session3.post(f"{BASE_URL}/auth/login", json=CREDENTIALS)
if not login4_response.json().get("data", {}).get("requires_totp"):
print("❌ Login should require TOTP")
return False
print(f"️ Using backup code: {test_data['backup_codes'][0]}")
backup_verify_response = session3.post(
f"{BASE_URL}/auth/totp/verify",
json={"code": test_data['backup_codes'][0], "is_backup_code": True}
)
if backup_verify_response.status_code != 200:
print("❌ Backup code login failed")
print(json.dumps(backup_verify_response.json(), indent=2))
return False
backup_token = backup_verify_response.json()["data"]["token"]
print(f"✅ Logged in with backup code!")
# Remove used code
used_code = test_data['backup_codes'].pop(0)
# ==================== PHASE 11: CHECK BACKUP CODES REMAINING ====================
step += 1
print_section(step, "Check Backup Codes Remaining")
status3_response = session3.get(
f"{BASE_URL}/auth/totp/status",
headers={"Authorization": f"Bearer {backup_token}"}
)
status3_data = status3_response.json()["data"]
if status3_data['backup_codes_remaining'] != 9:
print(f"❌ Expected 9 backup codes, got {status3_data['backup_codes_remaining']}")
return False
print(f"✅ Backup codes remaining: {status3_data['backup_codes_remaining']} (was 10, now 9)")
# ==================== PHASE 12: REGENERATE BACKUP CODES ====================
step += 1
print_section(step, "Regenerate Backup Codes")
regen_response = session3.post(
f"{BASE_URL}/auth/totp/regenerate-backup-codes",
headers={"Authorization": f"Bearer {backup_token}"},
json={"password": CREDENTIALS["password"]}
)
if regen_response.status_code != 200:
print("❌ Failed to regenerate backup codes")
print(json.dumps(regen_response.json(), indent=2))
return False
regenerated_codes = regen_response.json()["data"]["backup_codes"]
print(f"✅ Regenerated {len(regenerated_codes)} backup codes")
# Update saved codes
test_data['backup_codes'] = regenerated_codes.copy()
# ==================== SUCCESS ====================
save_test_data()
print("\n" + "="*70)
print("🎉 ALL TESTS PASSED!")
print("="*70)
print("\n✅ TEST SUMMARY:")
print(f" 1. ✅ Initial login (with/without TOTP)")
print(f" 2. ✅ Check TOTP status")
print(f" 3. ✅ Disable TOTP")
print(f" 4. ✅ Logout")
print(f" 5. ✅ Re-login without TOTP")
print(f" 6. ✅ Enroll in TOTP")
print(f" 7. ✅ Verify enrollment")
print(f" 8. ✅ Confirm TOTP enabled")
print(f" 9. ✅ Logout")
print(f" 10. ✅ Login with TOTP required")
print(f" 11. ✅ Verify TOTP during login")
print(f" 12. ✅ Confirm logged in (/auth/me)")
print(f" 13. ✅ Login with backup code")
print(f" 14. ✅ Check backup codes decremented")
print(f" 15. ✅ Regenerate backup codes")
print(f"\n📱 Current TOTP Secret:")
print(f" {test_data['secret']}")
print(f"\n🔑 Current Backup Codes ({len(test_data['backup_codes'])}):")
for i, code in enumerate(test_data['backup_codes'], 1):
print(f" {i:2d}. {code}")
print("\n" + "="*70)
return True
except requests.exceptions.ConnectionError:
print(f"\n❌ CONNECTION ERROR - Server not running at {BASE_URL}")
return False
except KeyError as e:
print(f"\n❌ UNEXPECTED RESPONSE STRUCTURE: Missing key {e}")
import traceback
traceback.print_exc()
return False
except Exception as e:
print(f"\n❌ UNEXPECTED ERROR: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)