"""squish/squash/api.py — FastAPI REST microservice for Squash attestation.

Exposes the full Squash engine over HTTP.  Core endpoints:

    POST /attest               — full attestation pipeline
    POST /scan                 — security scan only
    POST /policy/evaluate      — evaluate policy against a submitted SBOM
    POST /vex/evaluate         — VEX feed evaluation against a model inventory
    GET  /policies             — list available policy templates
    POST /vex/publish          — generate an OpenVEX 0.2.0 document
    GET  /vex/status           — current VEX cache metadata (url, age, statement count, stale flag)
    POST /vex/update           — force-refresh the local VEX feed cache from a remote URL
    POST /attest/mlflow        — offline attestation for MLflow artifacts
    POST /attest/wandb         — offline attestation for W&B artifacts
    POST /attest/huggingface   — attestation with optional HuggingFace Hub push
    POST /attest/langchain     — pre-deployment attestation for LangChain pipelines
    POST /attest/mcp           — MCP tool manifest supply-chain attestation

The server is zero-config: it discovers the squish model store automatically.
Intended to run embedded alongside the squish inference server or as a
standalone compliance sidecar.

Start from CLI::

    uvicorn squash.api:app --host 0.0.0.0 --port 4444

Or programmatically::

    from squash.api import app
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=4444)

All endpoints accept and return ``application/json``.  Heavy operations
(attestation, signing) run inside a thread-pool executor so the async
event loop is never blocked.
"""

from __future__ import annotations

import asyncio
import base64
import datetime
import hashlib
import hmac
import json
import logging
import os
import tempfile
import time
import uuid
from collections import OrderedDict, defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

try:
    from fastapi import FastAPI, HTTPException, Request, Response
    from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
    from pydantic import BaseModel, Field
except ImportError as _e:
    raise ImportError(
        "FastAPI is required for squash.api. "
        "Install with: pip install 'squish[squash-api]'"
    ) from _e

from squash.attest import AttestConfig, AttestPipeline, AttestResult
from squash.policy import AVAILABLE_POLICIES, PolicyEngine, PolicyRegistry
from squash.scanner import ModelScanner
from squash.auth import KeyRecord, KeyStore, get_key_store, extract_bearer
from squash.rate_limiter import get_rate_limiter
from squash.quota import QuotaEnforcer
from squash.monitoring import build_health_report, setup_sentry, _squash_version
from squash.metrics import get_collector as _get_metrics
from squash.quick_check import (
    AVAILABLE_FRAMEWORKS as _QC_FRAMEWORKS,
    POLICY_TYPES as _QC_POLICY_TYPES,
    ResultStore as _QCResultStore,
    detect_policy_type as _qc_detect_policy_type,
    get_global_stats as _qc_global_stats,
    is_valid_share_hash as _qc_is_valid_share_hash,
    run_quick_check as _qc_run,
    score_all_frameworks as _qc_score_all,
)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Initialise Sentry at module load (no-op if SQUASH_SENTRY_DSN not set).
# This is an import-time side effect: importing this module triggers it.
setup_sentry()

# Thread-pool for blocking attestation work.  Pool size comes from the
# SQUASH_WORKERS environment variable (default 4); a non-integer value makes
# the import fail with ValueError.
_executor = ThreadPoolExecutor(max_workers=int(os.getenv("SQUASH_WORKERS", "4")))

# In-memory scan job store: job_id → {"status": "pending"|"done"|"error", "result": dict}
# Intended bound is SQUASH_SCAN_JOB_LIMIT entries (default 1000), oldest
# evicted first via the OrderedDict's insertion order.
# NOTE(review): the eviction itself is not in this section — confirm the code
# that inserts jobs enforces _SCAN_JOB_LIMIT.
_SCAN_JOB_LIMIT = int(os.getenv("SQUASH_SCAN_JOB_LIMIT", "1000"))
_scan_jobs: OrderedDict[str, dict[str, Any]] = OrderedDict()

# ── Auth ──────────────────────────────────────────────────────────────────────
# W137: DB-backed API key auth replaces the single SQUASH_API_TOKEN env var.
# Legacy SQUASH_API_TOKEN still works as a master bypass (ops use only).
# Exact request paths listed here bypass auth (health probes, OpenAPI schema,
# metrics).  The security middleware compares request.url.path against this
# set verbatim and still applies the per-IP rate limit to these paths.
_UNAUTHED_PATHS = frozenset({
    "/health", "/health/ping", "/health/detailed",
    "/docs", "/redoc", "/openapi.json", "/metrics",
    "/billing/webhook",  # Stripe verifies its own signature
    "/quick-check",      # W246: viral demo entry point — public by design
    "/demo",             # W258 (Sprint 29): static demo landing page
    "/quick-check/frameworks",
    "/trending",         # Sprint 30: aggregate viral feed — public by design
    "/history",          # P1-B: scan history audit trail — public by design
    "/api/compliance/scan",   # P2-B: multi-framework scan — public by design
    "/api/analysis/cluster",  # P2 clustering — public by design
    "/api/trends/risk",       # P2 risk trend — public by design
    "/api/analyses",          # P2 explicit recorder — public by design
    "/api/extract/obligations",   # P2 obligation extraction — public by design
    "/api/contracts/diff",        # P2 contract redline — public by design
    "/api/alerts",                # P2 alert rules — public by design
})

# Badge + public score paths are dynamic — the middleware matches them with
# str.startswith() against the prefixes below rather than by exact path.
_BADGE_PATH_PREFIX = "/badge/"
_SCORE_PATH_PREFIX = "/v1/score/"   # public procurement score endpoints
_SHARE_PATH_PREFIX = "/r/"          # W247: shareable result permalinks
_DEMO_PATH_PREFIX = "/demo/"        # W258 (Sprint 29): static demo assets
_FRIENDLY_SHARE_PREFIX = "/share/"  # W259 (Sprint 29): HTML result page
_PUBLIC_API_PREFIX = "/api/"        # P2-B+ family: every /api/* route is
                                    # public-by-design (compliance scan,
                                    # cluster, trends, contracts, alerts).
# NOTE(review): because the middleware matches this prefix, any route mounted
# under /api/ is served without authentication, and the "/api/..." entries in
# _UNAUTHED_PATHS are already covered by it — confirm this is intended before
# adding non-public routes under /api/.

# ── Rate limiter ──────────────────────────────────────────────────────────────
# W138: Per-key plan-based rate limiter (replaces per-IP env-var limiter).
# Legacy SQUASH_RATE_LIMIT still used as a global IP-level backstop: it is the
# maximum number of requests one client IP may make to unauthenticated paths
# within a 60-second sliding window.
_RATE_LIMIT = int(os.environ.get("SQUASH_RATE_LIMIT", "60"))
# client IP → deque of time.monotonic() stamps for requests inside the window.
# NOTE(review): keys are never removed in the code visible here, so this dict
# grows with the number of distinct client IPs — confirm that is acceptable.
_rate_window: dict[str, deque] = defaultdict(deque)

# ── Plan gating — W75 ────────────────────────────────────────────────────────
# Set SQUASH_PLAN env var to "professional" or "enterprise" to unlock full
# VEX history and higher rate limits.  Default: community (free tier).
# The value is read once at import time and matched case-sensitively against
# the keys of _PLAN_LIMITS.
SQUASH_PLAN: str = os.environ.get("SQUASH_PLAN", "community")
# Per-plan limits.  A value of None appears only on the enterprise plan.
# NOTE(review): None presumably means "unlimited" — confirm against the
# consumers of these limits.
_PLAN_LIMITS: dict[str, dict[str, int | None | str]] = {
    "community":     {"vex_max_age_days": 7,   "vex_rate_limit": 10,  "export_scope": "summary"},
    "professional":  {"vex_max_age_days": 90,  "vex_rate_limit": 120, "export_scope": "compliance"},
    "enterprise":    {"vex_max_age_days": None, "vex_rate_limit": None,"export_scope": "full"},
}


def _get_plan_limits() -> dict[str, int | None]:  # W75
    """Return the limit dict for the currently configured plan."""
    return _PLAN_LIMITS.get(SQUASH_PLAN, _PLAN_LIMITS["community"])

# ── Prometheus-style counters ─────────────────────────────────────────────────
# Process-local counters keyed by metric name; all start at zero on import.
_COUNTERS: dict[str, int] = {
    "squash_attest_total": 0,
    "squash_scan_total": 0,
    "squash_policy_evaluate_total": 0,
    "squash_vex_evaluate_total": 0,
    "squash_vex_update_total": 0,
    "squash_vex_status_total": 0,
    "squash_sbom_diff_total": 0,
    "squash_policy_violations_total": 0,
}

# ── Cloud dashboard stores (W52-55) ───────────────────────────────────────────
# All stores are in-process / in-memory; a production deployment backs these
# with a database by replacing the helpers below.  The API contract is stable.
# Each per-tenant store is a defaultdict, so indexing it with an unknown
# tenant_id creates an empty entry for that tenant.

# Tenant registry: tenant_id → {name, plan, contact_email, created_at}
# NOTE(review): _TENANT_LIMIT is not enforced by the write helper in this
# section — confirm the registering endpoint checks it.
_TENANT_LIMIT = int(os.getenv("SQUASH_TENANT_LIMIT", "1000"))
_tenants: dict[str, dict[str, Any]] = {}

# Model inventory: tenant_id → deque[record dict]; each tenant's deque is
# capped at _INVENTORY_PER_TENANT entries (oldest dropped first by maxlen).
_INVENTORY_PER_TENANT = int(os.getenv("SQUASH_INVENTORY_PER_TENANT", "500"))
_inventory: defaultdict[str, deque] = defaultdict(
    lambda: deque(maxlen=_INVENTORY_PER_TENANT)
)

# VEX alert feed: tenant_id → deque[alert dict], capped per tenant.
_VEX_ALERTS_PER_TENANT = int(os.getenv("SQUASH_VEX_ALERTS_PER_TENANT", "500"))
_vex_alerts: defaultdict[str, deque] = defaultdict(
    lambda: deque(maxlen=_VEX_ALERTS_PER_TENANT)
)

# Drift event stream: tenant_id → deque[event dict], capped per tenant.
_DRIFT_EVENTS_PER_TENANT = int(os.getenv("SQUASH_DRIFT_EVENTS_PER_TENANT", "500"))
_drift_events: defaultdict[str, deque] = defaultdict(
    lambda: deque(maxlen=_DRIFT_EVENTS_PER_TENANT)
)

# Policy dashboard aggregates: tenant_id → policy_name → {"passed": int, "failed": int}
# Missing tenants and policies are created on first access with zeroed counts.
_policy_stats: defaultdict[str, defaultdict[str, dict[str, int]]] = defaultdict(
    lambda: defaultdict(lambda: {"passed": 0, "failed": 0})
)

# Vertex AI attestation results: tenant_id → deque[result dict]  (W66)
# Capped per tenant at _VERTEX_RESULTS_PER_TENANT entries.
_VERTEX_RESULTS_PER_TENANT = int(os.getenv("SQUASH_VERTEX_RESULTS_PER_TENANT", "500"))
_vertex_results: defaultdict[str, deque] = defaultdict(
    lambda: deque(maxlen=_VERTEX_RESULTS_PER_TENANT)
)

# Azure DevOps attestation results: tenant_id → deque[result dict]  (W67)
# Capped per tenant at _ADO_RESULTS_PER_TENANT entries (deque maxlen).
_ADO_RESULTS_PER_TENANT = int(os.getenv("SQUASH_ADO_RESULTS_PER_TENANT", "500"))
_ado_results: defaultdict[str, deque] = defaultdict(
    lambda: deque(maxlen=_ADO_RESULTS_PER_TENANT)
)

# ── SQLite persistence (W57) ─────────────────────────────────────────────────
# Optional write-through to a SQLite file.  Set SQUASH_CLOUD_DB=/path/to/db to
# enable durability.  When absent (default) all stores remain in-memory only.
try:
    from squash.cloud_db import CloudDB as _CloudDB, _make_db as _make_cloud_db
    _db: "_CloudDB | None" = _make_cloud_db()
except Exception:  # pragma: no cover
    # Best-effort: the API must still start when persistence cannot be set up,
    # but an operator who asked for durability needs to know it was not
    # honoured — so record the failure instead of swallowing it silently.
    logging.getLogger(__name__).warning(
        "CloudDB initialisation failed; falling back to in-memory stores",
        exc_info=True,
    )
    _db = None  # type: ignore[assignment]

# ── Quick-check share store (W247) ────────────────────────────────────────────
# Hash-keyed store for shareable /quick-check results. Backed by a JSON file
# when SQUASH_QUICKCHECK_STORE=/path/to/file.json is set; in-memory otherwise.
# An unset or empty SQUASH_QUICKCHECK_STORE is passed to the store as None.
# Capacity comes from SQUASH_QUICKCHECK_CAPACITY (default 10000).
_quick_check_store = _QCResultStore(
    path=os.environ.get("SQUASH_QUICKCHECK_STORE") or None,
    capacity=int(os.environ.get("SQUASH_QUICKCHECK_CAPACITY", "10000")),
)


def _db_write_tenant(tenant_id: str, record: dict) -> None:
    """Store *record* under *tenant_id*, mirroring the write to SQLite when enabled.

    The in-memory registry is always updated first, replacing any existing
    entry; the CloudDB upsert happens only when a database handle exists.
    """
    _tenants[tenant_id] = record
    if _db is None:
        return
    _db.upsert_tenant(tenant_id, record)


def _db_write_inventory(tenant_id: str, record: dict) -> None:
    """Add *record* to the tenant's inventory, mirroring it to SQLite when enabled.

    The record goes onto the right-hand end of the tenant's in-memory deque
    first; the CloudDB append to the ``inventory`` table follows only when a
    database handle exists.
    """
    _inventory[tenant_id].append(record)
    if _db is None:
        return
    _db.append_record("inventory", tenant_id, record)


def _db_write_vex_alert(tenant_id: str, record: dict) -> None:
    """Add a VEX alert for *tenant_id*, mirroring it to SQLite when enabled.

    The alert is appended to the tenant's in-memory deque first; the CloudDB
    append to the ``vex_alerts`` table follows only when a database handle
    exists.
    """
    _vex_alerts[tenant_id].append(record)
    if _db is None:
        return
    _db.append_record("vex_alerts", tenant_id, record)


def _db_write_drift_event(tenant_id: str, record: dict) -> None:
    """Add a drift event for *tenant_id*, mirroring it to SQLite when enabled.

    The event is appended to the tenant's in-memory deque first; the CloudDB
    append to the ``drift_events`` table follows only when a database handle
    exists.
    """
    _drift_events[tenant_id].append(record)
    if _db is None:
        return
    _db.append_record("drift_events", tenant_id, record)


def _db_inc_policy_stat(tenant_id: str, policy_name: str, *, passed: bool) -> None:
    """Count one policy evaluation outcome for *tenant_id* / *policy_name*.

    Bumps the ``"passed"`` or ``"failed"`` tally in the in-memory aggregate
    (creating the tenant/policy bucket on first use) and then forwards the
    same increment to CloudDB when a database handle exists.
    """
    outcome = "passed" if passed else "failed"
    _policy_stats[tenant_id][policy_name][outcome] += 1
    if _db is None:
        return
    _db.inc_policy_stat(tenant_id, policy_name, passed=passed)


# ── W58: CloudDB read helpers ────────────────────────────────────────────────


def _db_read_inventory(tenant_id: str) -> list[dict[str, Any]]:
    """Return the inventory records for *tenant_id*.

    Reads from SQLite when a database handle exists; otherwise returns a list
    copy of the tenant's in-memory deque (empty for an unknown tenant).
    """
    if _db is not None:
        return _db.read_inventory(tenant_id)
    # .get() rather than [] so that a read for an unknown tenant does not
    # insert an empty deque into the defaultdict-backed store.
    return list(_inventory.get(tenant_id, ()))


def _db_read_vex_alerts(tenant_id: str) -> list[dict[str, Any]]:
    """Return the VEX alerts for *tenant_id*.

    Reads from SQLite when a database handle exists; otherwise returns a list
    copy of the tenant's in-memory deque (empty for an unknown tenant).
    """
    if _db is not None:
        return _db.read_vex_alerts(tenant_id)
    # .get() rather than [] so that a read for an unknown tenant does not
    # insert an empty deque into the defaultdict-backed store.
    return list(_vex_alerts.get(tenant_id, ()))


def _db_read_policy_stats() -> dict[str, dict[str, int]]:
    """Return pass/fail totals per policy, summed over every tenant.

    Delegates to SQLite when a database handle exists.  Otherwise the
    in-memory per-tenant aggregates are folded into one
    ``policy_name → {"passed": int, "failed": int}`` mapping.
    """
    if _db is not None:
        return _db.read_policy_stats()
    totals: dict[str, dict[str, int]] = {}
    for per_tenant in _policy_stats.values():
        for name, tally in per_tenant.items():
            combined = totals.setdefault(name, {"passed": 0, "failed": 0})
            combined["passed"] += tally.get("passed", 0)
            combined["failed"] += tally.get("failed", 0)
    return totals


# ── W59: Tenant delete helper ─────────────────────────────────────────────────

def _db_delete_tenant(tenant_id: str) -> None:
    """Remove a tenant and all associated records from memory and SQLite.

    Every in-memory store keyed by tenant is cleared, including the Vertex AI
    and Azure DevOps attestation results, so that a deleted tenant leaves no
    stale data behind for the in-memory read helpers.  Deleting an unknown
    tenant is a no-op.  When a database handle exists the deletion is
    forwarded to CloudDB as well.
    """
    for store in (
        _tenants,
        _inventory,
        _vex_alerts,
        _drift_events,
        _policy_stats,
        _vertex_results,
        _ado_results,
    ):
        store.pop(tenant_id, None)
    if _db is not None:
        _db.delete_tenant(tenant_id)


# ── W60: Tenant-scoped drift-events + policy-stats reads ─────────────────────

def _db_read_drift_events(tenant_id: str) -> list[dict[str, Any]]:
    """Return the drift events for *tenant_id*.

    Reads from SQLite when a database handle exists; otherwise returns a list
    copy of the tenant's in-memory deque (empty for an unknown tenant).
    """
    if _db is not None:
        return _db.read_drift_events(tenant_id)
    # .get() rather than [] so that a read for an unknown tenant does not
    # insert an empty deque into the defaultdict-backed store.
    return list(_drift_events.get(tenant_id, ()))


def _db_read_tenant_policy_stats(tenant_id: str) -> dict[str, dict[str, int]]:
    """Return ``policy_name → {"passed", "failed"}`` tallies for one tenant.

    Without a database handle the result is a shallow copy of the tenant's
    in-memory aggregate (empty for an unknown tenant); otherwise the lookup is
    delegated to SQLite.
    """
    if _db is None:
        return dict(_policy_stats.get(tenant_id, {}))
    return _db.read_tenant_policy_stats(tenant_id)


def _db_read_tenant_summary(tenant_id: str) -> dict[str, Any]:
    """Return record counts and policy tallies for *tenant_id*.

    Delegates to SQLite when a database handle exists.  Otherwise the summary
    is computed from the in-memory stores and contains ``inventory_count``,
    ``vex_alert_count``, ``drift_event_count`` and ``policy_stats``.  An
    unknown tenant yields zero counts and empty stats.
    """
    if _db is not None:
        return _db.read_tenant_summary(tenant_id)
    # .get() rather than [] throughout: a summary request for an unknown
    # tenant must not insert empty deques into the defaultdict-backed stores.
    return {
        "inventory_count": len(_inventory.get(tenant_id, ())),
        "vex_alert_count": len(_vex_alerts.get(tenant_id, ())),
        "drift_event_count": len(_drift_events.get(tenant_id, ())),
        "policy_stats": dict(_policy_stats.get(tenant_id, {})),
    }


def _db_read_tenant_compliance_score(tenant_id: str) -> dict:
    """Return the compliance score, letter grade and per-policy breakdown.

    Delegates to SQLite when a database handle exists.  The in-memory fallback
    derives everything from the tenant's policy tallies:

    * ``score`` — passed checks as a percentage of all checks, 2 decimals.
      A tenant with no tallies (or zero checks) scores ``100.0``.
    * ``grade`` — ``A`` from 90, ``B`` from 75, ``C`` from 60, ``D`` from 45,
      otherwise ``F``.
    * ``policy_breakdown`` — ``policy_name → {"passed", "failed", "rate"}``.
    """
    if _db is not None:
        return _db.read_tenant_compliance_score(tenant_id)

    # In-memory fallback, per the original note that it replicates the
    # CloudDB scoring logic.
    per_policy = dict(_policy_stats.get(tenant_id, {}))
    if not per_policy:
        return {"score": 100.0, "grade": "A", "policy_breakdown": {}}

    breakdown: dict[str, dict] = {}
    passed_sum = 0
    checks_sum = 0
    for name, tally in per_policy.items():
        ok = tally.get("passed", 0)
        bad = tally.get("failed", 0)
        n_checks = ok + bad
        passed_sum += ok
        checks_sum += n_checks
        breakdown[name] = {
            "passed": ok,
            "failed": bad,
            # A policy that was never evaluated counts as fully passing.
            "rate": round((ok / n_checks * 100) if n_checks else 100.0, 2),
        }

    score = round(passed_sum / checks_sum * 100, 2) if checks_sum else 100.0
    grade_floors = ((90, "A"), (75, "B"), (60, "C"), (45, "D"))
    grade = next((letter for floor, letter in grade_floors if score >= floor), "F")
    return {"score": score, "grade": grade, "policy_breakdown": breakdown}


def _db_read_tenant_compliance_history(tenant_id: str) -> list[dict]:
    """Return one ``{"date", "score", "grade"}`` row per day, oldest first.

    Delegates to SQLite when a database handle exists.  The in-memory fallback
    takes the distinct days on which the tenant has drift events — the first
    ten characters of each event's ``timestamp`` (or ``date``) field — and
    stamps every one of them with the tenant's *current* compliance score and
    grade, since no per-day scores are kept in memory.  Events whose date
    prefix is not ten characters long are ignored.
    """
    if _db is not None:
        return _db.read_tenant_compliance_history(tenant_id)

    prefixes = {
        str(evt.get("timestamp") or evt.get("date") or "")[:10]
        for evt in _drift_events.get(tenant_id, [])
    }
    # Keep only plausible YYYY-MM-DD prefixes.
    days = sorted(prefix for prefix in prefixes if len(prefix) == 10)
    if not days:
        return []

    current = _db_read_tenant_compliance_score(tenant_id)
    return [
        {"date": day, "score": current["score"], "grade": current["grade"]}
        for day in days
    ]


def _db_read_compliance_overview() -> dict:
    """Return the platform-wide compliance aggregate.

    Delegates to SQLite when a database handle exists.  The in-memory fallback
    scores every registered tenant and reports how many reach the 80.0
    threshold, the mean score (4 decimals) and the three lowest-scoring
    tenants below the threshold.  With no tenants, all figures are zero.
    """
    _THRESHOLD = 80.0
    if _db is not None:
        return _db.read_compliance_overview()

    # In-memory fallback: one row per registered tenant.
    rows = []
    for tid in list(_tenants.keys()):
        result = _db_read_tenant_compliance_score(tid)
        rows.append({"tenant_id": tid, "score": result["score"], "grade": result["grade"]})

    if not rows:
        return {
            "total_tenants": 0,
            "compliant_tenants": 0,
            "non_compliant_tenants": 0,
            "average_score": 0.0,
            "top_at_risk": [],
        }

    tenant_count = len(rows)
    compliant_count = sum(1 for row in rows if row["score"] >= _THRESHOLD)
    below_threshold = [row for row in rows if row["score"] < _THRESHOLD]
    below_threshold.sort(key=lambda row: row["score"])
    return {
        "total_tenants": tenant_count,
        "compliant_tenants": compliant_count,
        "non_compliant_tenants": tenant_count - compliant_count,
        "average_score": round(sum(row["score"] for row in rows) / tenant_count, 4),
        "top_at_risk": below_threshold[:3],
    }


def _db_read_vex_feed() -> dict:
    """Return every tenant's VEX alerts merged into one feed.

    Delegates to SQLite when a database handle exists.  The in-memory fallback
    walks the registered tenants in registry order and tags each alert with
    its ``tenant_id``; an alert that already carries a ``tenant_id`` key keeps
    its own value.
    """
    if _db is not None:
        return _db.read_vex_feed()

    tids = list(_tenants.keys())
    merged: list[dict[str, Any]] = [
        {"tenant_id": tid, **alert}
        for tid in tids
        for alert in _db_read_vex_alerts(tid)
    ]
    return {
        "total_alerts": len(merged),
        "tenant_count": len(tids),
        "alerts": merged,
    }


def _db_append_vertex_result(
    tenant_id: str,
    model_resource_name: str,
    passed: bool,
    labels: dict[str, str] | None = None,
) -> None:
    """Store a Vertex AI attestation result in SQLite or in-memory store."""
    record: dict[str, Any] = {
        "model_resource_name": model_resource_name,
        "passed": passed,
        "labels": labels,
    }
    _vertex_results[tenant_id].appendleft(record)
    if _db is not None:
        _db.append_vertex_result(tenant_id, model_resource_name, passed, labels)


def _db_read_vertex_results(tenant_id: str) -> list[dict[str, Any]]:
    """Return Vertex AI attestation results for *tenant_id* from SQLite or in-memory.

    The in-memory path returns shallow copies of the stored records, newest
    first, and an empty list for an unknown tenant.
    """
    if _db is not None:
        return _db.read_vertex_results(tenant_id)
    # .get() rather than [] so that a read for an unknown tenant does not
    # insert an empty deque into the defaultdict-backed store.
    return [dict(r) for r in _vertex_results.get(tenant_id, ())]


def _db_append_ado_result(
    tenant_id: str,
    pipeline_run_id: str,
    passed: bool,
    variables: dict[str, Any] | None = None,
) -> None:
    """Store an Azure DevOps attestation result in SQLite or in-memory store."""
    record: dict[str, Any] = {
        "pipeline_run_id": pipeline_run_id,
        "passed": passed,
        "variables": variables,
    }
    _ado_results[tenant_id].appendleft(record)
    if _db is not None:
        _db.append_ado_result(tenant_id, pipeline_run_id, passed, variables)


def _db_read_ado_results(tenant_id: str) -> list[dict[str, Any]]:
    """Return Azure DevOps attestation results for *tenant_id* from SQLite or in-memory.

    The in-memory path returns shallow copies of the stored records, newest
    first, and an empty list for an unknown tenant.
    """
    if _db is not None:
        return _db.read_ado_results(tenant_id)
    # .get() rather than [] so that a read for an unknown tenant does not
    # insert an empty deque into the defaultdict-backed store.
    return [dict(r) for r in _ado_results.get(tenant_id, ())]


def _db_read_attestation_score(tenant_id: str) -> dict[str, Any]:
    """Return combined attestation pass/fail score for *tenant_id*.

    Aggregates GCP Vertex AI (W66) and Azure DevOps (W67) results.  Falls back
    to the in-memory deques when SQLite is not configured.

    Returns:
        ``{"total", "passed", "failed", "pass_rate"}`` where ``pass_rate`` is
        a 0–1 fraction rounded to 4 decimals, or ``0.0`` when the tenant has
        no results at all.
    """
    if _db is not None:
        return _db.read_attestation_score(tenant_id)
    # In-memory fallback: derive from deques.  .get() rather than [] so that
    # scoring an unknown tenant does not insert empty deques into the stores.
    all_rows: list[dict[str, Any]] = [
        *_vertex_results.get(tenant_id, ()),
        *_ado_results.get(tenant_id, ()),
    ]
    total = len(all_rows)
    passed = sum(1 for r in all_rows if r.get("passed"))
    failed = total - passed
    pass_rate = round(passed / total, 4) if total > 0 else 0.0
    return {"total": total, "passed": passed, "failed": failed, "pass_rate": pass_rate}


def _db_read_attestations(tenant_id: str) -> list[dict[str, Any]]:
    """Return merged attestation history for *tenant_id* from SQLite or in-memory.

    Each item includes a ``source`` field (``"vertex"`` or ``"ado"``).
    SQLite path returns results sorted by ts DESC; in-memory path returns
    vertex results (newest first) followed by ado results (newest first).
    A stored record that already has a ``source`` key keeps its own value.
    """
    if _db is not None:
        return _db.read_attestations(tenant_id)
    # In-memory fallback: tag each row with its source, keeping each deque's
    # own order.  .get() rather than [] so that a read for an unknown tenant
    # does not insert empty deques into the defaultdict-backed stores.
    att: list[dict[str, Any]] = [
        {"source": "vertex", **r} for r in _vertex_results.get(tenant_id, ())
    ]
    att.extend({"source": "ado", **r} for r in _ado_results.get(tenant_id, ()))
    return att


def _db_read_attestation_overview() -> dict[str, Any]:
    """Return the platform-wide attestation aggregate.

    Delegates to SQLite when a database handle exists.  The in-memory fallback
    scores every registered tenant and reports the total number of
    attestations, the number of registered tenants, the overall pass rate
    (0–1, 4 decimals, ``0.0`` when there are no attestations) and the tenants
    that have at least one failed attestation.
    """
    if _db is not None:
        return _db.read_attestation_overview()

    tids = list(_tenants.keys())
    if not tids:
        return {
            "total_attestations": 0,
            "tenants_covered": 0,
            "platform_pass_rate": 0.0,
            "tenants_with_failures": [],
        }

    per_tenant = [(tid, _db_read_attestation_score(tid)) for tid in tids]
    grand_total = sum(result["total"] for _, result in per_tenant)
    grand_passed = sum(result["passed"] for _, result in per_tenant)
    failing: list[dict[str, Any]] = [
        {
            "tenant_id": tid,
            "failed": result["failed"],
            "pass_rate": result["pass_rate"],
        }
        for tid, result in per_tenant
        if result["failed"] > 0
    ]
    return {
        "total_attestations": grand_total,
        "tenants_covered": len(tids),
        "platform_pass_rate": (
            round(grand_passed / grand_total, 4) if grand_total > 0 else 0.0
        ),
        "tenants_with_failures": failing,
    }


# EU AI Act enforcement deadline — W74.
_ENFORCEMENT_DATE = datetime.date(2026, 8, 2)


def _enforcement_signal() -> dict[str, Any]:  # W74
    """Build the EU AI Act deadline fields added to in-memory conformance responses.

    ``days_until_enforcement`` is the whole-day difference between the
    enforcement date and today's local date (negative once the deadline has
    passed).  ``enforcement_risk_level`` is ``CRITICAL`` under 30 days,
    ``HIGH`` under 90, ``MODERATE`` under 180 and ``LOW`` otherwise.
    """
    remaining = (_ENFORCEMENT_DATE - datetime.date.today()).days
    if remaining < 30:
        level = "CRITICAL"
    elif remaining < 90:
        level = "HIGH"
    elif remaining < 180:
        level = "MODERATE"
    else:
        level = "LOW"
    return {
        "enforcement_deadline": "2026-08-02",
        "days_until_enforcement": remaining,
        "enforcement_risk_level": level,
    }


def _db_read_tenant_conformance(tenant_id: str) -> dict[str, Any]:  # W71
    """Return the EU AI Act conformance verdict for *tenant_id*.

    Delegates to SQLite when a database handle exists.  The in-memory path
    combines three signals and lists a reason for each one that fails:

    * compliance score below 80.0,
    * attestation pass rate below 0.8,
    * any open VEX alert.

    The tenant is conformant only when no reason is recorded.  The enforcement
    deadline fields are merged into the result.
    """
    if _db is not None:
        return _db.read_tenant_conformance(tenant_id)

    # ── in-memory path ──────────────────────────────────────────────────────
    compliance_score: float = _db_read_tenant_compliance_score(tenant_id).get("score", 100.0)
    attestation_pass_rate: float = _db_read_attestation_score(tenant_id)["pass_rate"]
    open_vex_alerts: int = len(_db_read_vex_alerts(tenant_id))

    checks = (
        (
            compliance_score < 80.0,
            f"compliance_score {compliance_score} < 80.0 (Art. 9/17)",
        ),
        (
            attestation_pass_rate < 0.8,
            f"attestation_pass_rate {attestation_pass_rate} < 0.8 (Art. 12/18)",
        ),
        (
            open_vex_alerts > 0,
            f"{open_vex_alerts} open VEX alert(s) (Art. 9 supply-chain risk)",
        ),
    )
    reasons: list[str] = [message for failed, message in checks if failed]
    return {
        "conformant": not reasons,
        "compliance_score": compliance_score,
        "attestation_pass_rate": attestation_pass_rate,
        "open_vex_alerts": open_vex_alerts,
        "reasons": reasons,
        **_enforcement_signal(),
    }


def _db_read_conformance_report() -> dict[str, Any]:  # W72
    """Return the platform-wide EU AI Act conformance report.

    Delegates to SQLite when a database handle exists.  The in-memory path
    evaluates every registered tenant, counts the conformant ones and lists
    the rest together with their scores and failure reasons.  The enforcement
    deadline fields are merged into the result.
    """
    if _db is not None:
        return _db.read_conformance_report()

    # ── in-memory path ──────────────────────────────────────────────────────
    verdicts = [(tid, _db_read_tenant_conformance(tid)) for tid in list(_tenants.keys())]
    failing: list[dict[str, Any]] = [
        {
            "tenant_id": tid,
            "compliance_score": verdict["compliance_score"],
            "attestation_pass_rate": verdict["attestation_pass_rate"],
            "open_vex_alerts": verdict["open_vex_alerts"],
            "reasons": verdict["reasons"],
        }
        for tid, verdict in verdicts
        if not verdict["conformant"]
    ]
    tenant_count = len(verdicts)
    conformant_count = tenant_count - len(failing)
    return {
        "total_tenants": tenant_count,
        "conformant_tenants": conformant_count,
        "non_conformant_tenants": tenant_count - conformant_count,
        "non_conformant": failing,
        **_enforcement_signal(),
    }


# ── Cloud auth helpers (W52-55) ───────────────────────────────────────────────

def _verify_jwt_hs256(token: str, secret: str) -> dict[str, Any]:
    """Minimal stdlib HS256 JWT verifier — no external dependencies.

    The signature over ``<header>.<payload>`` is always checked as
    HMAC-SHA256 with *secret*; the header segment (including its ``alg``
    field) is not inspected.  The payload is decoded only after the signature
    has been verified.

    Args:
        token: Compact-serialised JWT (three dot-separated base64url segments).
        secret: Shared HMAC secret.

    Returns:
        The decoded claims object.

    Raises:
        ValueError: on invalid structure, undecodable segments, bad signature,
            a payload that is not a JSON object, a non-numeric ``exp`` claim,
            or expiry.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("invalid JWT structure")
    header_b64, payload_b64, sig_b64 = parts

    def _b64url_decode(segment: str) -> bytes:
        # JWT segments omit "=" padding; restore exactly what is missing
        # (zero characters when the length is already a multiple of four).
        return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected_sig = hmac.new(
        secret.encode(), signing_input, hashlib.sha256
    ).digest()
    actual_sig = _b64url_decode(sig_b64)
    # Constant-time comparison to avoid leaking how many bytes matched.
    if not hmac.compare_digest(expected_sig, actual_sig):
        raise ValueError("invalid JWT signature")

    claims = json.loads(_b64url_decode(payload_b64))
    # Callers use dict methods on the result, so anything else is malformed.
    if not isinstance(claims, dict):
        raise ValueError("invalid JWT payload")
    if "exp" in claims:
        exp = claims["exp"]
        # Reject non-numeric expiry up front so the comparison below cannot
        # raise TypeError; bool is excluded although it subclasses int.
        if isinstance(exp, bool) or not isinstance(exp, (int, float)):
            raise ValueError("invalid JWT exp claim")
        if exp < time.time():
            raise ValueError("JWT expired")
    return claims


def _resolve_tenant_id(request: Request) -> str:
    """Work out which tenant the current request belongs to.

    * Without ``SQUASH_JWT_SECRET``: the ``X-Tenant-ID`` header is returned
      as-is (empty string when absent, e.g. single-tenant deployments).
    * With ``SQUASH_JWT_SECRET``: the request must carry
      ``Authorization: Bearer <jwt>``; the token is verified and its
      ``tenant_id`` claim — or ``sub`` as fallback — is returned as a string.

    Raises:
        HTTPException: 401 when the bearer token is missing, fails
            verification, or carries neither claim.
    """
    jwt_secret = os.environ.get("SQUASH_JWT_SECRET", "")
    if not jwt_secret:
        return request.headers.get("X-Tenant-ID", "")

    authorization = request.headers.get("Authorization", "")
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="JWT required for cloud endpoints")

    # Strip the 7-character "Bearer " scheme prefix to get the raw token.
    raw_token = authorization[7:]
    try:
        claims = _verify_jwt_hs256(raw_token, jwt_secret)
    except ValueError as exc:
        raise HTTPException(status_code=401, detail=f"Invalid JWT: {exc}") from exc

    tenant = claims.get("tenant_id") or claims.get("sub", "")
    if tenant:
        return str(tenant)
    raise HTTPException(status_code=401, detail="JWT missing tenant_id claim")


# The ASGI application object.  This is the `app` referenced by
# `uvicorn squash.api:app` and `from squash.api import app` in the module
# docstring.  The interactive docs are served at /docs and /redoc, both of
# which are listed in _UNAUTHED_PATHS and therefore reachable without a key.
app = FastAPI(
    title="Squash — AI-SBOM Attestation API",
    description=(
        "Compliance attestation for AI/ML models. "
        "Generates CycloneDX + SPDX SBOMs, evaluates policy templates, "
        "performs security scanning, and handles Sigstore signing."
    ),
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)


# ──────────────────────────────────────────────────────────────────────────────
# Modular routers (squash.routes.*)
# ──────────────────────────────────────────────────────────────────────────────
# These live in their own package so the legacy flat squash/api.py module
# does not need to be converted into a package (which would invalidate
# `from squash.api import app` imports across the codebase).
# The imports sit here, after `app` is created, rather than at the top of the
# file — hence the E402 suppressions.
from squash.routes.compliance import analysis_router, compliance_router  # noqa: E402
from squash.routes.trends import trends_router  # noqa: E402
from squash.routes.contracts import contracts_router  # noqa: E402
from squash.routes.alerts import alerts_router  # noqa: E402

# Mount each router on the application.
app.include_router(compliance_router)
app.include_router(analysis_router)
app.include_router(trends_router)
app.include_router(contracts_router)
app.include_router(alerts_router)


# ──────────────────────────────────────────────────────────────────────────────
# CORS — demo UI hits the API cross-origin from :8002 (or file://, or any
# locally-developed front-end). All public-by-design routes already live under
# /api/* or the legacy unauth allow-list; authenticated routes still gate on
# the Authorization header, which CORS does not bypass.
# ──────────────────────────────────────────────────────────────────────────────
from fastapi.middleware.cors import CORSMiddleware  # noqa: E402

# Comma-separated allow-list from SQUASH_CORS_ORIGINS; blank entries are
# dropped.  The default "*" is the demo posture — tighten in prod via env.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("SQUASH_CORS_ORIGINS", "*").split(",")
    if origin.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # No allow_origin_regex: the previous conditional evaluated to None on
    # both branches, which is also the middleware's default.
    allow_credentials=False,           # demo paths don't use cookies
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Squash-Version"],
    max_age=600,
)


# ──────────────────────────────────────────────────────────────────────────────
# Middleware — auth + rate limiter
# ──────────────────────────────────────────────────────────────────────────────


@app.middleware("http")
async def _security_middleware(request: Request, call_next):  # noqa: C901
    """W137/W138: DB-backed API key auth + per-key plan-based rate limiter.

    Auth priority:
      1. Path in _UNAUTHED_PATHS (or under a public prefix) → pass through
      2. Legacy SQUASH_API_TOKEN matches → pass through (ops bypass)
      3. Authorization bearer key → key store lookup
      4. No valid auth → 401, unless no auth is configured at all (dev mode)

    Rate limiting (W138):
      - Per-key check via the shared rate limiter, using the key's plan
      - Per-IP 60-second sliding window (``_RATE_LIMIT`` requests) for
        unauthed paths

    Args:
        request: Incoming HTTP request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a 401 / 429 ``JSONResponse``.
    """
    path = request.url.path

    # ── Unauthed paths: IP-level backstop only ────────────────────────────────
    if (
        path in _UNAUTHED_PATHS
        or path.startswith(_BADGE_PATH_PREFIX)
        or path.startswith(_SCORE_PATH_PREFIX)
        or path.startswith(_SHARE_PATH_PREFIX)
        or path.startswith(_DEMO_PATH_PREFIX)
        or path.startswith(_FRIENDLY_SHARE_PREFIX)
        or path.startswith(_PUBLIC_API_PREFIX)
    ):
        client_ip = request.client.host if request.client else "unknown"
        now = time.monotonic()
        window = _rate_window[client_ip]
        cutoff = now - 60.0
        # Drop timestamps that have aged out of the 60-second window.
        while window and window[0] < cutoff:
            window.popleft()
        if len(window) >= _RATE_LIMIT:
            # Seconds until the oldest recorded hit leaves the window.
            retry_after = int(60 - (now - window[0])) + 1
            return JSONResponse(
                {"detail": "Rate limit exceeded"},
                status_code=429,
                headers={"Retry-After": str(retry_after)},
            )
        window.append(now)
        return await call_next(request)

    auth_header = request.headers.get("Authorization", "")
    raw_key = extract_bearer(auth_header)

    # ── Legacy master token bypass (ops/admin use) ────────────────────────────
    master_token = os.environ.get("SQUASH_API_TOKEN", "")
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compared as bytes because compare_digest rejects
    # non-ASCII str input, which a client-supplied header may contain.
    if master_token and hmac.compare_digest(
        auth_header.encode("utf-8"),
        f"Bearer {master_token}".encode("utf-8"),
    ):
        return await call_next(request)

    # ── DB-backed API key verification (W137) ─────────────────────────────────
    key_record: KeyRecord | None = None
    if raw_key:
        key_record = get_key_store().verify(raw_key)

    if key_record is None:
        # Allow unauthenticated access when no auth is configured at all
        # (dev mode: neither SQUASH_API_TOKEN nor any key store keys exist)
        store = get_key_store()
        if not master_token and len(store) == 0:
            return await call_next(request)
        return JSONResponse({"detail": "Unauthorized — provide a valid API key"}, status_code=401)

    # ── Per-key rate limit (W138) ─────────────────────────────────────────────
    rl = get_rate_limiter()
    result = rl.check(key_record.key_id, key_record.plan)
    if not result.allowed:
        return JSONResponse(
            {"detail": "Rate limit exceeded", "retry_after": result.retry_after},
            status_code=429,
            headers={
                "Retry-After": str(result.retry_after),
                "X-RateLimit-Limit": str(result.window_limit),
                "X-RateLimit-Remaining": "0",
            },
        )

    # Stamp last_used and attach record to request state for downstream handlers
    get_key_store().update_last_used(key_record.key_id)
    request.state.key_record = key_record

    response = await call_next(request)

    # Attach rate-limit headers to every response
    response.headers["X-RateLimit-Limit"] = str(result.window_limit)
    response.headers["X-RateLimit-Remaining"] = str(result.window_limit - result.window_used)
    response.headers["X-Key-Id"] = key_record.key_id
    return response


# ──────────────────────────────────────────────────────────────────────────────
# Request / Response models
# ──────────────────────────────────────────────────────────────────────────────


class AttestRequest(BaseModel):
    # Attestation request body. Only ``model_path`` is required; every other
    # field carries a default.

    # ── Model identity and artifact location ──────────────────────────────────
    model_path: str = Field(..., description="Absolute path to model dir or file")
    output_dir: str | None = Field(
        None, description="Destination for artifacts; defaults to model_path dir"
    )
    model_id: str = Field("", description="Human-readable model name")
    hf_repo: str = Field("", description="HuggingFace repo ID")
    model_family: str | None = Field(None, description="Architecture family")
    quant_format: str = Field("unknown", description="Quantization format")

    # ── Policy, signing and scan switches ─────────────────────────────────────
    policies: list[str] = Field(default_factory=lambda: ["enterprise-strict"])
    sign: bool = Field(False)
    offline: bool = Field(
        False,
        description="Air-gapped mode — skip all OIDC/network calls (also set by SQUASH_OFFLINE=1)",
    )
    local_signing_key: str | None = Field(
        None,
        description="Absolute path to an Ed25519 .priv.pem for offline signing (requires sign=True, offline=True)",
    )
    fail_on_violation: bool = Field(False)
    skip_scan: bool = Field(False)

    # ── Training data and VEX feed inputs ─────────────────────────────────────
    training_dataset_ids: list[str] = Field(default_factory=list)
    vex_feed_path: str | None = Field(None)
    vex_feed_url: str = Field("")

    # ── SPDX AI Profile enrichment (mirrors squash attest --spdx-* CLI flags) ──
    spdx_type: str | None = Field(
        None,
        description=(
            "SPDX AI Profile type_of_model "
            "(e.g. text-generation, text-classification, summarization). Default: text-generation"
        ),
    )
    spdx_safety_risk: str | None = Field(
        None,
        description="SPDX AI Profile safetyRiskAssessment: high | medium | low | unspecified",
    )
    spdx_datasets: list[str] = Field(
        default_factory=list,
        description=(
            "HuggingFace dataset IDs or URIs for the SPDX AI Profile "
            '(merged with training_dataset_ids; e.g. ["wikipedia", "c4"])'
        ),
    )
    spdx_training_info: str | None = Field(
        None,
        description="SPDX AI Profile informationAboutTraining free-text. Default: see-model-card",
    )
    spdx_sensitive_data: str | None = Field(
        None,
        description="SPDX AI Profile sensitivePIIInTrainingData: absent | present | unknown",
    )

    # ── Cloud inventory registration ──────────────────────────────────────────
    tenant_id: str = Field(
        "",
        description=(
            "Optional tenant identifier.  When set the attestation result is "
            "automatically registered in the cloud model inventory under this tenant."
        ),
    )


class ScanRequest(BaseModel):
    # Single required field: the filesystem location to scan.
    model_path: str = Field(..., description="Absolute path to model dir or file")


class PolicyEvaluateRequest(BaseModel):
    # A submitted SBOM plus either a named policy or an ad-hoc rule list.
    sbom: dict[str, Any] = Field(..., description="CycloneDX BOM as JSON object")
    policy: str = Field(
        ..., description="Policy name to evaluate (ignored when custom_rules is set)"
    )
    custom_rules: list[dict[str, Any]] | None = Field(
        None,
        description=(
            "Ad-hoc rule list. When provided, evaluated via PolicyEngine.evaluate_custom "
            "and the 'policy' field is used only as the result label."
        ),
    )


class VexUpdateRequest(BaseModel):
    # Both fields are optional: the URL has fallbacks and the timeout defaults
    # to 30 seconds.
    url: str | None = Field(
        None,
        description="VEX feed URL. Falls back to $SQUASH_VEX_URL then VexCache.DEFAULT_URL.",
    )
    timeout: float = Field(30.0, description="HTTP timeout in seconds.")


class VexEvaluateRequest(BaseModel):
    # The BOM path is required; the VEX feed may be given as a local path
    # and/or a remote URL.
    sbom_path: str = Field(..., description="Absolute path to CycloneDX BOM JSON")
    vex_feed_path: str | None = Field(None, description="Path to local VEX feed directory")
    vex_feed_url: str = Field("", description="HTTPS URL to remote VEX feed")


class SbomDiffRequest(BaseModel):
    """Two SBOM file paths to compare."""

    # Baseline first, then the newer document; both are required.
    sbom_a_path: str = Field(
        ..., description="Path to the older (baseline) CycloneDX BOM JSON"
    )
    sbom_b_path: str = Field(..., description="Path to the newer CycloneDX BOM JSON")


class VerifyRequest(BaseModel):
    """Verify a Sigstore bundle for a model's CycloneDX BOM."""

    model_path: str = Field(
        ..., description="Absolute path to model dir (contains cyclonedx-mlbom.json)"
    )
    # Optional override for where the signature bundle lives.
    bundle_path: str | None = Field(
        None, description="Explicit .sig.json bundle path; defaults to <bom>.sig.json"
    )
    # Off by default.
    strict: bool = Field(
        False, description="When True, treat a missing bundle as a verification failure"
    )


class WebhookTestRequest(BaseModel):
    """Send a test event to the configured webhook URL."""

    # Optional per-request destination override.
    webhook_url: str | None = Field(
        None, description="Override URL (uses SQUASH_WEBHOOK_URL env if omitted)"
    )


class PushRequest(BaseModel):
    """Push a CycloneDX BOM to an SBOM registry."""

    # What to push and where to push it (both required).
    model_path: str = Field(..., description="Absolute path to model directory")
    registry_url: str = Field(..., description="Base URL of the SBOM registry")
    # Credentials and registry flavour (both optional).
    api_key: str = Field("", description="API key / bearer token for the registry")
    registry_type: str = Field(
        "squash",
        description="Registry type: 'dtrack' (Dependency-Track), 'guac', or 'squash'",
    )


class ComposedAttestRequest(BaseModel):
    """Composite multi-model attestation request."""

    model_paths: list[str] = Field(
        ..., description="Absolute paths to component model directories"
    )
    output_dir: str | None = Field(None, description="Destination for artifacts")
    # Same defaults as the single-model AttestRequest for these two fields.
    policies: list[str] = Field(default_factory=lambda: ["enterprise-strict"])
    sign: bool = Field(False)


# ── W49: offline / air-gapped request models ─────────────────────────────────

class KeygenRequest(BaseModel):
    """Generate a local Ed25519 keypair for offline BOM signing."""

    key_name: str = Field(..., description="Base filename for the keypair (no extension)")
    # Defaults to "." as the output directory.
    key_dir: str = Field(
        ".", description="Directory to write <key_name>.priv.pem and <key_name>.pub.pem"
    )


class VerifyLocalRequest(BaseModel):
    """Verify a CycloneDX BOM against a local Ed25519 offline signature."""

    # The BOM and the public key are required; the signature path is optional.
    bom_path: str = Field(..., description="Absolute path to the CycloneDX BOM to verify")
    pub_key_path: str = Field(..., description="Absolute path to the Ed25519 .pub.pem file")
    sig_path: str | None = Field(
        None,
        description="Explicit .sig file path; defaults to <bom_path with .sig extension>",
    )


class PackOfflineRequest(BaseModel):
    """Bundle a model directory into a portable .squash-bundle.tar.gz archive."""

    model_dir: str = Field(..., description="Absolute path to the model directory to bundle")
    # Optional explicit archive location.
    output_path: str | None = Field(
        None,
        description=(
            "Output .squash-bundle.tar.gz path; "
            "auto-generated as <model_dir>-<timestamp>.squash-bundle.tar.gz if omitted"
        ),
    )


# ── W52-55: Cloud dashboard request/response models ───────────────────────────


class TenantCreateRequest(BaseModel):
    """Register a new tenant in the Squash Cloud dashboard."""

    # Required identity fields.
    tenant_id: str = Field(..., description="Unique tenant identifier (slug, max 64 chars)")
    name: str = Field(..., description="Human-readable tenant / organisation name")
    # Optional fields with defaults.
    plan: str = Field(
        "community", description="Subscription plan: community | pro | enterprise"
    )
    contact_email: str = Field("", description="Primary contact e-mail")


class TenantUpdateRequest(BaseModel):
    """Partial update fields for a registered tenant.  All fields are optional;
    only the fields explicitly supplied in the request body are changed."""

    # Every field defaults to None.
    name: str | None = Field(
        None, description="New human-readable tenant / organisation name"
    )
    plan: str | None = Field(
        None, description="New subscription plan: community | pro | enterprise"
    )
    contact_email: str | None = Field(None, description="New contact e-mail")


class InventoryRegisterRequest(BaseModel):
    """Register an attestation result in the cloud model inventory.

    Called by CI/CD pipelines after a successful ``POST /attest``.
    """

    # ── Which model, owned by whom ────────────────────────────────────────────
    tenant_id: str = Field(..., description="Tenant that owns this model")
    model_id: str = Field(..., description="Human-readable model identifier")
    model_path: str = Field(..., description="Filesystem path where the model resides")
    bom_path: str = Field("", description="Path to the generated CycloneDX BOM")

    # ── Attestation outcome ───────────────────────────────────────────────────
    attestation_passed: bool = Field(..., description="Whether all policies passed")
    policy_results: dict[str, Any] = Field(
        default_factory=dict,
        description="Map of policy_name → {passed, error_count, warning_count}",
    )
    vex_cves: list[str] = Field(
        default_factory=list,
        description="CVE IDs surfaced by VEX evaluation for this model",
    )
    timestamp: str = Field(
        "", description="ISO-8601 attestation timestamp; auto-set if empty"
    )


class VexAlertRequest(BaseModel):
    """Ingest a VEX alert event into the cloud dashboard feed."""

    # Required: the tenant scope and the CVE being reported.
    tenant_id: str = Field(..., description="Tenant to scope this alert to")
    cve_id: str = Field(..., description="CVE identifier (e.g. CVE-2024-12345)")
    # Optional classification and context.
    severity: str = Field(
        "unknown", description="critical | high | medium | low | unknown"
    )
    model_id: str = Field("", description="Affected model identifier")
    status: str = Field("open", description="open | acknowledged | resolved")
    detail: str = Field("", description="Human-readable detail / remediation note")


class DriftEventRequest(BaseModel):
    """Ingest a drift event into the cloud dashboard stream."""

    # ── What drifted (all required) ───────────────────────────────────────────
    tenant_id: str = Field(..., description="Tenant to scope this event to")
    model_id: str = Field(..., description="Model that changed")
    bom_a: str = Field(..., description="Path or identifier of the baseline BOM")
    bom_b: str = Field(..., description="Path or identifier of the new BOM")

    # ── Component-level delta (each list defaults to empty) ───────────────────
    added: list[str] = Field(
        default_factory=list, description="Component hashes / IDs added"
    )
    removed: list[str] = Field(
        default_factory=list, description="Component hashes / IDs removed"
    )
    changed: list[str] = Field(
        default_factory=list, description="Component hashes / IDs changed"
    )

    # ── Classification ────────────────────────────────────────────────────────
    severity: str = Field("info", description="info | warning | critical")
    timestamp: str = Field("", description="ISO-8601 event timestamp; auto-set if empty")


# ──────────────────────────────────────────────────────────────────────────────
# Endpoints
# ──────────────────────────────────────────────────────────────────────────────


@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe — used by Fly.io health checks and Better Uptime.

    Always answers with a constant ``status: ok`` body; no dependencies are
    consulted.
    """
    liveness: dict[str, str] = {"status": "ok"}
    return liveness


@app.get("/health/ping")
async def health_ping() -> str:
    """Ultra-light ping endpoint for Better Uptime / uptime monitors.

    Returns the constant string ``pong``.
    """
    reply = "pong"
    return reply


@app.get("/health/detailed")
async def health_detailed() -> JSONResponse:
    """W144 — Detailed health report with DB and component status.

    Responds with HTTP 200 when the report's ``status`` is ``"ok"`` and
    HTTP 503 for any other status; the report itself is the response body.
    """
    report = build_health_report(db=_db)
    if report["status"] == "ok":
        return JSONResponse(report, status_code=200)
    return JSONResponse(report, status_code=503)


@app.post("/billing/webhook")
async def billing_webhook(request: Request) -> JSONResponse:
    """W141 — Stripe webhook endpoint.  Verifies signature and updates tenant plans.

    The raw request body, the ``Stripe-Signature`` header, and the
    ``SQUASH_STRIPE_WEBHOOK_SECRET`` environment value are handed to
    ``StripeWebhookHandler.handle``; its result is returned as JSON.
    """
    from squash.billing import StripeWebhookHandler

    raw_body = await request.body()
    signature_header = request.headers.get("Stripe-Signature", "")
    # NOTE(review): an unset secret is forwarded as "" — confirm the handler
    # rejects events in that case instead of skipping verification.
    signing_secret = os.environ.get("SQUASH_STRIPE_WEBHOOK_SECRET", "")
    outcome = StripeWebhookHandler(get_key_store()).handle(
        raw_body,
        stripe_signature=signature_header,
        webhook_secret=signing_secret,
    )
    return JSONResponse(outcome.to_dict())


class _CheckoutRequest(BaseModel):
    # Request body for POST /billing/checkout.
    # Required; the handler accepts only pro / startup / team / enterprise.
    plan: str
    # Optional; when empty the handler falls back to the authenticated key's
    # tenant, then to "anonymous".
    tenant_id: str = ""
    # Optional; passed through to checkout-session creation.
    customer_email: str = ""
    # Optional redirect targets; when empty the handler derives defaults from
    # SQUASH_BASE_URL.
    success_url: str = ""
    cancel_url: str = ""


@app.post("/billing/checkout")
async def billing_checkout(body: _CheckoutRequest, request: Request) -> JSONResponse:
    """W155 — Create a Stripe Checkout session for plan upgrade.

    Returns a ``checkout_url`` to redirect the user to Stripe's hosted payment page.
    Requires ``SQUASH_STRIPE_SECRET_KEY`` and the matching
    ``SQUASH_STRIPE_PRICE_{PLAN}`` env var to be set.

    Plans: ``pro`` ($299/mo) · ``startup`` ($499/mo) · ``team`` ($899/mo) · ``enterprise``

    Args:
        body: Requested plan plus optional tenant, e-mail and redirect URLs.
        request: Used to read the authenticated key record, if any.

    Returns:
        HTTP 201 with ``checkout_url``, ``session_id`` and ``plan``.

    Raises:
        HTTPException: 422 for an unknown plan; 503 when session creation
            raises ``ImportError``, ``RuntimeError`` or ``ValueError``.
    """
    from squash.billing import create_checkout_session

    valid_plans = {"pro", "startup", "team", "enterprise"}
    if body.plan not in valid_plans:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid plan {body.plan!r}. Must be one of: {sorted(valid_plans)}",
        )

    # Strip a trailing slash so a base such as "https://host/" does not yield
    # "https://host//billing/success" in the default redirect URLs.
    base = os.environ.get("SQUASH_BASE_URL", "https://getsquash.dev").rstrip("/")
    success_url = body.success_url or f"{base}/billing/success?plan={body.plan}"
    cancel_url = body.cancel_url or f"{base}/pricing"

    # Prefer the tenant named in the body, then the authenticated key's tenant.
    tenant_id = body.tenant_id
    if not tenant_id:
        rec: KeyRecord | None = getattr(request.state, "key_record", None)
        if rec:
            tenant_id = rec.tenant_id

    try:
        result = create_checkout_session(
            tenant_id=tenant_id or "anonymous",
            plan=body.plan,
            success_url=success_url,
            cancel_url=cancel_url,
            customer_email=body.customer_email,
        )
        return JSONResponse({
            "checkout_url": result.url,
            "session_id": result.session_id,
            "plan": result.plan,
        }, status_code=201)
    except (ImportError, RuntimeError, ValueError) as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc


@app.get("/account/status")
async def account_status(request: Request) -> JSONResponse:
    """W143 — Current plan, quota, and rate-limit status for the authenticated key.

    Responds 401 when the request carries no key record on ``request.state``.
    """
    key_rec: KeyRecord | None = getattr(request.state, "key_record", None)
    if key_rec is None:
        raise HTTPException(status_code=401, detail="Valid API key required")

    from squash.quota import QuotaEnforcer

    usage = QuotaEnforcer(get_key_store()).check(key_rec)
    summary = dict(
        key_id=key_rec.key_id,
        key_name=key_rec.name,
        tenant_id=key_rec.tenant_id,
        plan=key_rec.plan,
        billing_period_start=key_rec.billing_period_start,
        quota_used=usage.used,
        quota_limit=usage.limit,
        quota_remaining=usage.remaining,
        rate_limit_per_minute=key_rec.rate_per_min,
        last_used_at=key_rec.last_used_at,
    )
    return JSONResponse(summary)


@app.get("/account/usage")
async def account_usage(request: Request) -> JSONResponse:
    """W143 — Attestation usage summary for the current billing period.

    Responds 401 when the request carries no key record on ``request.state``.
    All values are read directly from that key record.
    """
    key_rec: KeyRecord | None = getattr(request.state, "key_record", None)
    if key_rec is None:
        raise HTTPException(status_code=401, detail="Valid API key required")

    summary = dict(
        key_id=key_rec.key_id,
        tenant_id=key_rec.tenant_id,
        period_start=key_rec.billing_period_start,
        total_attestations=key_rec.attestation_count,
        monthly_quota=key_rec.monthly_quota,
        quota_remaining=key_rec.quota_remaining,
    )
    return JSONResponse(summary)


@app.post("/keys")
async def create_api_key(request: Request) -> JSONResponse:
    """W137 — Generate a new API key for the authenticated tenant.

    Body fields (all optional): ``tenant_id``, ``plan`` (default ``"free"``),
    ``name``, and ``test`` (truthy → non-live key).

    Returns:
        HTTP 201 with the new key record plus the plaintext ``key``, which is
        returned only in this response.

    Raises:
        HTTPException: 401 when unauthenticated and the key store is not
            empty; 400 when the body is not a JSON object or the key store
            rejects the request with ``ValueError``.
    """
    rec: KeyRecord | None = getattr(request.state, "key_record", None)
    # Only allow if authenticated OR in dev mode (no keys configured)
    store = get_key_store()
    if rec is None and len(store) > 0:
        raise HTTPException(status_code=401, detail="Valid API key required")

    # Malformed or non-object JSON is a client error, not a server fault.
    try:
        body = await request.json()
    except (json.JSONDecodeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="invalid JSON body") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    # NOTE(review): tenant_id and plan are taken from the request body as-is,
    # so an authenticated caller can name a different tenant or plan unless
    # store.generate() rejects it — confirm that is intended.
    tenant_id = body.get("tenant_id") or (rec.tenant_id if rec else "default")
    plan = body.get("plan", "free")
    name = body.get("name", "")
    live = not body.get("test", False)
    try:
        plaintext, new_rec = store.generate(tenant_id, plan=plan, name=name, live=live)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    resp = new_rec.to_dict()
    resp["key"] = plaintext  # returned only once
    return JSONResponse(resp, status_code=201)


@app.delete("/keys/{key_id}")
async def revoke_api_key(key_id: str, request: Request) -> JSONResponse:
    """W137 — Revoke an API key.

    Responds 404 when ``key_id`` is unknown and 403 when the caller's key
    record belongs to a different tenant than the target key. Requests with no
    key record attached skip the tenant check.
    """
    caller: KeyRecord | None = getattr(request.state, "key_record", None)
    key_store = get_key_store()

    victim = key_store.get(key_id)
    if victim is None:
        raise HTTPException(status_code=404, detail="Key not found")

    # Only the key's own tenant may revoke it
    same_tenant = not caller or caller.tenant_id == victim.tenant_id
    if not same_tenant:
        raise HTTPException(status_code=403, detail="Cannot revoke a key belonging to another tenant")

    key_store.revoke(key_id)
    return JSONResponse({"key_id": key_id, "status": "revoked"})


@app.get("/metrics")
async def metrics() -> PlainTextResponse:
    """Prometheus-compatible metrics export (text exposition format 0.0.4).

    The body is the metrics collector's rendered output. For backward
    compatibility with existing dashboards, every entry in the legacy
    ``_COUNTERS`` mapping whose name does not already occur in that output is
    appended as a ``counter`` sample.
    """
    rendered = _get_metrics().render()

    # Legacy counter shim: emit only the names the collector did not render.
    extras: list[str] = []
    for counter_name, counter_value in _COUNTERS.items():
        if f"{counter_name}" in rendered:
            continue
        extras.extend(
            (f"# TYPE {counter_name} counter", f"{counter_name} {counter_value}")
        )

    body = rendered
    if extras:
        body = rendered + "\n" + "\n".join(extras)
    return PlainTextResponse(
        body,
        media_type="text/plain; version=0.0.4; charset=utf-8",
    )


# ── Badge SVG endpoint (W161) ─────────────────────────────────────────────────

_BADGE_SVG_TEMPLATE = """\
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="{width}" height="20" role="img" aria-label="{label_left}: {label_right}">
  <title>{label_left}: {label_right}</title>
  <linearGradient id="s" x2="0" y2="100%">
    <stop offset="0" stop-color="#bbb" stop-opacity=".1"/>
    <stop offset="1" stop-opacity=".1"/>
  </linearGradient>
  <clipPath id="r"><rect width="{width}" height="20" rx="3" fill="#fff"/></clipPath>
  <g clip-path="url(#r)">
    <rect width="{left_width}" height="20" fill="#555"/>
    <rect x="{left_width}" width="{right_width}" height="20" fill="{color}"/>
    <rect width="{width}" height="20" fill="url(#s)"/>
  </g>
  <g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="110">
    <text x="{left_cx}" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="{left_tl}" lengthAdjust="spacing">{label_left}</text>
    <text x="{left_cx}" y="140" transform="scale(.1)" textLength="{left_tl}" lengthAdjust="spacing">{label_left}</text>
    <text x="{right_cx}" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="{right_tl}" lengthAdjust="spacing">{label_right}</text>
    <text x="{right_cx}" y="140" transform="scale(.1)" textLength="{right_tl}" lengthAdjust="spacing">{label_right}</text>
  </g>
</svg>"""

_BADGE_COLORS = {
    "compliant": "#4c1",     # green
    "passing": "#4c1",
    "non-compliant": "#e05d44",  # red
    "failing": "#e05d44",
    "partial": "#dfb317",    # yellow
    "unknown": "#9f9f9f",    # grey
}


def _make_badge_svg(framework: str, status: str) -> str:
    left = f"squash | {framework}"
    right = status
    color = _BADGE_COLORS.get(status.lower(), "#9f9f9f")
    lw = max(6 * len(left) + 20, 80)
    rw = max(6 * len(right) + 20, 60)
    w = lw + rw
    return _BADGE_SVG_TEMPLATE.format(
        width=w, left_width=lw, right_width=rw,
        left_cx=lw * 5, right_cx=lw * 10 + rw * 5,
        left_tl=(len(left)) * 60, right_tl=(len(right)) * 60,
        label_left=left, label_right=right,
        color=color,
    )


@app.get("/badge/{framework}/{status}")
async def compliance_badge(framework: str, status: str = "unknown") -> Response:
    """Shields.io-compatible SVG compliance badge.

    Usage in GitHub README::

        ![Squash Compliant](https://api.getsquash.dev/badge/eu-ai-act/compliant)

    Parameters
    ----------
    framework:
        Policy framework name (eu-ai-act, nist-ai-rmf, iso-42001, …).
        Hyphens become spaces and the text is upper-cased for display.
    status:
        Attestation status: compliant | non-compliant | partial | unknown.

    Returns
    -------
    Response
        ``image/svg+xml`` body with ``Cache-Control`` and ``ETag`` headers.
    """
    svg = _make_badge_svg(framework.replace("-", " ").upper(), status)
    # Content-derived ETag. The builtin hash() is salted per process for
    # strings, so it would give different ETags across workers and restarts.
    etag = hashlib.sha256(svg.encode("utf-8")).hexdigest()[:32]
    return Response(
        content=svg,
        media_type="image/svg+xml",
        headers={
            "Cache-Control": "no-cache, max-age=0",
            "ETag": f'"{etag}"',
        },
    )


@app.get("/policies")
async def list_policies() -> dict[str, list[str]]:
    """Return the names of all built-in policy templates.

    The names come from ``AVAILABLE_POLICIES`` and are returned sorted under
    the ``policies`` key.
    """
    names = sorted(AVAILABLE_POLICIES)
    return {"policies": names}


# ── Sprint 28 W246/W247: Quick-check + shareable permalinks ──────────────────


class _QuickCheckRequest(BaseModel):
    """JSON body for ``POST /quick-check``.

    The endpoint also accepts ``Content-Type: text/plain`` — in that case the
    raw body is treated as ``text`` and ``framework`` defaults to ``general``.
    """

    # Required: the document to score.
    text: str = Field(
        ...,
        description="Policy / ToS / DPA / cookie notice text to score.",
    )
    # Optional: clause library selector.
    framework: str = Field(
        default="general",
        description="Clause library: gdpr | ccpa | eu-ai-act | general | auto",
    )
    # Optional: sharing is on by default.
    share: bool = Field(
        default=True,
        description="If true, store the result and return a /r/{hash} share permalink.",
    )


def _quick_check_response(text: str, framework: str, share: bool, base_url: str) -> dict[str, Any]:
    """Score ``text`` and assemble the ``/quick-check`` response envelope.

    Steps, in order:

    1. Run the quick check and convert the result to a dict.
    2. Detect the policy type, add it to the payload as ``policy_type``, and
       record the ``(policy_type, verdict)`` pair in the global stats tracker.
    3. When ``share`` is true, store the payload and add ``share_hash``,
       ``share_url`` and ``card_url`` (built from ``base_url``) to the envelope.
    4. Append the scan to the global scan history. This is best-effort: any
       exception is logged as a warning and swallowed, so a broken history
       sink never breaks the user-facing quick-check.

    Returns:
        ``{"result": payload}`` plus the share fields when sharing is enabled.
    """
    payload = _qc_run(text, framework=framework).to_dict()
    detected_type = _qc_detect_policy_type(text)
    payload["policy_type"] = detected_type
    _qc_global_stats().record(detected_type, payload["verdict"])

    envelope: dict[str, Any] = {"result": payload}
    share_hash = ""  # stays empty when the caller opted out of sharing
    if share:
        share_hash = _quick_check_store.put(payload)
        envelope["share_hash"] = share_hash
        envelope["share_url"] = f"{base_url.rstrip('/')}/r/{share_hash}"
        envelope["card_url"] = f"{base_url.rstrip('/')}/r/{share_hash}/card.svg"

    try:
        from squash.scan_history import global_history

        global_history().record(
            text=text,
            framework=payload.get("framework") or framework,
            verdict=payload["verdict"],
            score=int(payload["score"]),
            share_hash=share_hash,
        )
    except Exception as exc:  # noqa: BLE001
        log.warning("scan_history.record failed (non-fatal): %s", exc)

    return envelope


@app.post("/quick-check")
async def quick_check(request: Request) -> JSONResponse:
    """One-click compliance scan for pasted policy text — no auth required.

    Accepts either ``application/json`` (``{text, framework?, share?}``) or
    ``text/plain`` (raw body is the text; ``framework`` and ``share`` are read
    from the query string). Returns a pass / warn / fail verdict with score
    and clause breakdown in <2 seconds. When ``share`` is true (default), the
    response includes a ``share_url`` permalink that anyone can visit at
    ``GET /r/{hash}``.

    Available frameworks: ``gdpr``, ``ccpa``, ``eu-ai-act``, ``general``, ``auto``.

    Errors: 400 for a non-UTF-8 text body, invalid JSON, or a non-object JSON
    body; 422 for a body that fails validation or a ``ValueError`` from scoring.
    """
    header_value = request.headers.get("content-type") or ""
    media_type = header_value.split(";")[0].strip().lower()
    base_url = str(request.base_url)

    if media_type == "text/plain":
        # Raw-text mode: options travel in the query string.
        raw = await request.body()
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise HTTPException(status_code=400, detail="text body must be UTF-8") from exc
        framework = request.query_params.get("framework", "general")
        share_flag = request.query_params.get("share", "true").lower()
        share = share_flag not in {"false", "0", "no"}
    else:
        # Default mode: a JSON object validated against _QuickCheckRequest.
        try:
            body = await request.json()
        except (json.JSONDecodeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail="invalid JSON body") from exc
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="JSON body must be an object")
        try:
            parsed = _QuickCheckRequest(**body)
        except Exception as exc:  # pydantic.ValidationError
            raise HTTPException(status_code=422, detail=str(exc)) from exc
        text, framework, share = parsed.text, parsed.framework, parsed.share

    # Both modes share the same scoring call and error mapping.
    try:
        return JSONResponse(_quick_check_response(text, framework, share, base_url))
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc


@app.get("/quick-check/frameworks")
async def quick_check_frameworks() -> dict[str, list[str]]:
    """Return the list of frameworks accepted by ``POST /quick-check``.

    The registered framework names are sorted, and ``auto`` is appended last.
    """
    accepted = sorted(_QC_FRAMEWORKS)
    accepted.append("auto")
    return {"frameworks": accepted}


@app.get("/r/{share_hash}")
async def quick_check_share(share_hash: str) -> JSONResponse:
    """Resolve a shareable permalink to its stored quick-check payload.

    Returns the stored ``result`` dict together with the ``share_hash``.
    400 if the hash is malformed; 404 if nothing is stored under it (evicted
    or never stored).
    """
    if not _qc_is_valid_share_hash(share_hash):
        raise HTTPException(status_code=400, detail="malformed share hash")

    stored = _quick_check_store.get(share_hash)
    if stored is not None:
        return JSONResponse({"share_hash": share_hash, "result": stored})
    raise HTTPException(status_code=404, detail="share not found")


# ── P1-A · P1-C — Clause-level remediation + financial exposure ──────────────


@app.get("/r/{share_hash}/remediation")
async def quick_check_remediation(share_hash: str) -> JSONResponse:
    """Return per-clause redline + suggested fix + USD exposure band.

    One entry per missing clause from the stored quick-check result. Each
    entry includes the issue (what is wrong), an ``original`` representative
    phrase, a drafted ``suggested_fix`` the user can paste in, the
    ``risk_level``, and a financial exposure range
    (``dollar_low_usd`` / ``dollar_high_usd``). The envelope carries the
    aggregate exposure across all failing clauses, plus the ``share_hash``,
    ``framework`` and ``verdict`` of the stored result.

    404 if the share hash is unknown; 400 if malformed.
    """
    if not _qc_is_valid_share_hash(share_hash):
        raise HTTPException(status_code=400, detail="malformed share hash")

    stored = _quick_check_store.get(share_hash)
    if stored is None:
        raise HTTPException(status_code=404, detail="share not found")

    from squash.clause_remediation import build_remediation

    # A missing or empty "missing" list is treated as no failing clauses.
    missing_clauses = stored.get("missing") or []
    envelope = build_remediation(missing_clauses).to_dict()
    envelope["share_hash"] = share_hash
    envelope["framework"] = stored.get("framework", "")
    envelope["verdict"] = stored.get("verdict", "")
    return JSONResponse(envelope)


# ── P1-B — Audit trail / scan history ────────────────────────────────────────


@app.get("/history")
async def quick_check_history(
    limit: int = 20,
    offset: int = 0,
    framework: str | None = None,
    verdict: str | None = None,
    sparkline: bool = True,
    sparkline_points: int = 24,
    sparkline_bucket_seconds: int = 3600,
) -> JSONResponse:
    """Return the most recent quick-check scans, newest first.

    Optional filters: ``framework`` (gdpr/ccpa/eu-ai-act/general/soc2) and
    ``verdict`` (pass/warn/fail). Pagination via ``limit`` (max 500) +
    ``offset``. When ``sparkline=true`` (default) the response also
    includes a pass-rate sparkline aligned to the configured bucket size,
    suitable for direct rendering as an SVG polyline, plus the history
    store's ``stats``.

    Raises:
        HTTPException: 400 when any query parameter is out of range, or when
            the history store rejects the query with a ``ValueError``.
    """
    if limit < 1 or limit > 500:
        raise HTTPException(status_code=400, detail="limit must be in 1..500")
    if offset < 0:
        raise HTTPException(status_code=400, detail="offset must be >= 0")
    if verdict and verdict not in {"pass", "warn", "fail"}:
        raise HTTPException(
            status_code=400, detail="verdict must be one of pass / warn / fail",
        )
    # Validate the sparkline knobs up front so a bad request is rejected
    # before the history store is queried at all.
    if sparkline:
        if sparkline_points < 1 or sparkline_points > 200:
            raise HTTPException(
                status_code=400, detail="sparkline_points must be in 1..200",
            )
        if sparkline_bucket_seconds < 60:
            raise HTTPException(
                status_code=400, detail="sparkline_bucket_seconds must be >= 60",
            )

    from squash.scan_history import global_history
    h = global_history()
    try:
        records = h.list(
            limit=limit, offset=offset,
            framework=framework, verdict=verdict,
        )
        total = h.count(framework=framework, verdict=verdict)
        body: dict[str, Any] = {
            "total": total,
            "limit": limit,
            "offset": offset,
            "entries": [r.to_dict() for r in records],
        }
        if sparkline:
            body["pass_rate_sparkline"] = h.pass_rate_sparkline(
                points=sparkline_points,
                bucket_seconds=sparkline_bucket_seconds,
                framework=framework,
            )
            body["stats"] = h.stats()
        return JSONResponse(body)
    except ValueError as exc:
        # Surface any ValueError raised while querying or serialising as a
        # client error rather than a 500.
        raise HTTPException(status_code=400, detail=str(exc)) from exc


# ── Sprint 30: viral SVG score card + trending feed ──────────────────────────

# SVG palette is derived from the demo UI:
#   bg      #06060f  — same near-black panel
#   ink     #f4f4ff  — primary text
#   purple  #7c3aed  — Konjo accent / scan line
#   pass    #4ade80  — green
#   warn    #fbbf24  — amber
#   fail    #f43f5e  — pink-red

# Colour tokens used by the SVG score-card renderer. The three verdict keys
# ("pass" / "warn" / "fail") double as the verdict -> accent colour mapping.
_CARD_PALETTE = {
    "bg":     "#06060f",  # card background
    "panel":  "#0d0d1c",  # fill for sub-score chips and the glyph circle
    "ink":    "#f4f4ff",  # primary text
    "ink2":   "#a4a4c8",  # secondary text
    "ink3":   "#5b5b80",  # tertiary text, chip borders, score-bar track
    "purple": "#7c3aed",  # accent: dot grid and footer link
    "pass":   "#4ade80",  # green
    "warn":   "#fbbf24",  # amber
    "fail":   "#f43f5e",  # pink-red
}

# Glyph drawn inside the verdict ring, keyed by verdict string.
_VERDICT_GLYPH = {"pass": "✦", "warn": "△", "fail": "✗"}


def _verdict_color(verdict: str) -> str:
    """Map a verdict string to its palette colour.

    Only ``"pass"``, ``"warn"`` and ``"fail"`` are recognised. Anything else
    — including other palette keys such as ``"bg"`` or ``"ink"`` — falls back
    to the fail colour, so a stray value can never pick a non-verdict colour.
    """
    if verdict in ("pass", "warn", "fail"):
        return _CARD_PALETTE[verdict]
    return _CARD_PALETTE["fail"]


def _render_card_svg(
    *,
    verdict: str,
    score: int,
    framework: str,
    policy_type: str,
    sub_scores: dict[str, dict[str, Any]],
    timestamp: str,
    share_hash: str,
) -> str:
    """Render a 600×340 SVG score card. Pure stdlib — no svgwrite dependency.

    The card embeds verdict + score + GDPR/CCPA/SOC2 sub-scores + timestamp +
    share_hash in a single standalone SVG string.

    Args:
        verdict: Primary verdict; unknown values render with the fail glyph.
        score: Primary score shown as ``{score}/100``.
        framework: Framework label shown in the subtitle and aria-label.
        policy_type: Policy type; underscores are shown as spaces.
        sub_scores: Mapping of ``gdpr`` / ``ccpa`` / ``soc2`` to dicts with
            ``score`` and ``verdict``. A missing key renders as 0 / fail.
        timestamp: Pre-formatted timestamp for the footer.
        share_hash: Share identifier, used in the footer and the pattern id.

    Returns:
        The SVG document as a string. Every caller-supplied string is
        XML-escaped before interpolation, in text nodes and attributes alike.
    """
    glyph = _VERDICT_GLYPH.get(verdict, _VERDICT_GLYPH["fail"])
    accent = _verdict_color(verdict)
    p = _CARD_PALETTE

    # Escape every caller-supplied string exactly once, up front, so no raw
    # value can reach the markup (the aria-label is an attribute value).
    safe_verdict = _xml_escape(verdict.upper())
    safe_framework = _xml_escape(framework)
    safe_type = _xml_escape(policy_type.replace("_", " "))
    safe_timestamp = _xml_escape(timestamp)
    safe_hash = _xml_escape(share_hash)

    # Sub-score chips: x positions are evenly spaced across the lower half.
    chip_specs = ((60, "GDPR", "gdpr"), (235, "CCPA", "ccpa"), (410, "SOC 2", "soc2"))
    chips = []
    for x, label, key in chip_specs:
        info = sub_scores.get(key)
        if info is None:
            sc, vd = 0, "fail"
        else:
            sc = int(info.get("score", 0))
            vd = str(info.get("verdict", "fail"))
        col = _verdict_color(vd)
        # Clamp the bar to the 115px track: a negative width is invalid SVG
        # and a score above 100 would overflow the chip.
        bar_width = int(115 * min(max(sc, 0), 100) / 100)
        # Each chip: rounded box + label + numeric score + verdict bar.
        chips.append(
            f'<g transform="translate({x},220)">'
            f'<rect width="155" height="80" rx="14" fill="{p["panel"]}" '
            f'stroke="{p["ink3"]}" stroke-width="1"/>'
            f'<text x="20" y="30" font-family="ui-sans-serif,system-ui,sans-serif" '
            f'font-size="13" fill="{p["ink2"]}" letter-spacing="2">{label}</text>'
            f'<text x="20" y="62" font-family="ui-sans-serif,system-ui,sans-serif" '
            f'font-size="30" font-weight="700" fill="{p["ink"]}">{sc}</text>'
            f'<rect x="20" y="68" width="115" height="3" rx="1.5" fill="{p["ink3"]}"/>'
            f'<rect x="20" y="68" width="{bar_width}" height="3" rx="1.5" fill="{col}"/>'
            f"</g>"
        )

    chips_svg = "".join(chips)

    return (
        f'<svg xmlns="http://www.w3.org/2000/svg" width="600" height="340" '
        f'viewBox="0 0 600 340" role="img" '
        f'aria-label="Squash compliance card — {safe_verdict} {score}/100 ({safe_framework})">'
        # background
        f'<rect width="600" height="340" rx="20" fill="{p["bg"]}"/>'
        # subtle dot grid
        f'<defs><pattern id="dots-{safe_hash}" x="0" y="0" width="20" height="20" '
        f'patternUnits="userSpaceOnUse"><circle cx="2" cy="2" r="1" '
        f'fill="{p["purple"]}" fill-opacity="0.08"/></pattern></defs>'
        f'<rect width="600" height="340" rx="20" fill="url(#dots-{safe_hash})"/>'
        # accent ring around verdict glyph
        f'<circle cx="80" cy="100" r="46" fill="{p["panel"]}" '
        f'stroke="{accent}" stroke-width="2"/>'
        f'<circle cx="80" cy="100" r="56" fill="none" '
        f'stroke="{accent}" stroke-width="1" stroke-opacity="0.25"/>'
        # verdict glyph
        f'<text x="80" y="120" text-anchor="middle" font-size="58" '
        f'fill="{accent}" font-family="ui-sans-serif,system-ui,sans-serif">{glyph}</text>'
        # main title
        f'<text x="160" y="80" font-family="ui-sans-serif,system-ui,sans-serif" '
        f'font-size="14" letter-spacing="3" fill="{p["ink3"]}">SQUASH · COMPLIANCE</text>'
        f'<text x="160" y="118" font-family="ui-sans-serif,system-ui,sans-serif" '
        f'font-size="42" font-weight="700" fill="{p["ink"]}">{safe_verdict}</text>'
        f'<text x="160" y="148" font-family="ui-sans-serif,system-ui,sans-serif" '
        f'font-size="14" fill="{p["ink2"]}">{score}/100 · {safe_framework} '
        f'· {safe_type}</text>'
        # sub-score row
        f"{chips_svg}"
        # footer: timestamp + share hash
        f'<text x="60" y="320" font-family="ui-monospace,SFMono-Regular,Menlo,monospace" '
        f'font-size="11" fill="{p["ink3"]}">{safe_timestamp} · '
        f'r/{safe_hash}</text>'
        f'<text x="540" y="320" text-anchor="end" font-family="ui-sans-serif,system-ui,sans-serif" '
        f'font-size="11" fill="{p["purple"]}">getsquash.dev</text>'
        f"</svg>"
    )


def _xml_escape(value: str) -> str:
    """Minimal XML-attribute-safe escape for SVG text content."""
    return (
        value.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )


@app.get("/r/{share_hash}/card.svg")
async def quick_check_card(share_hash: str) -> Response:
    """Render an SVG score card for a stored quick-check result.

    The card shows the verdict glyph, primary score and framework, three
    sub-scores (GDPR / CCPA / SOC 2) and the share hash. The footer timestamp
    is the UTC time at which the card is rendered, not the time of the
    original scan.

    Raises:
        HTTPException: 400 on a malformed hash, 404 when no result is stored
            under the hash.
    """
    if not _qc_is_valid_share_hash(share_hash):
        raise HTTPException(status_code=400, detail="malformed share hash")
    payload = _quick_check_store.get(share_hash)
    if payload is None:
        raise HTTPException(status_code=404, detail="share not found")

    sub_scores: dict[str, dict[str, Any]] = {}
    primary_fw = str(payload.get("framework", ""))
    primary_score = int(payload.get("score", 0))
    primary_verdict = str(payload.get("verdict", "fail"))

    # We can't recover the original text from the share payload — by design.
    # So sub_scores are reconstructed from the matched/missing clause lists
    # for the primary framework, and we re-derive companion frameworks from
    # whether the primary already covers them. For non-primary frameworks
    # without a re-runnable input, we report the primary as a hint.
    matched = list(payload.get("matched") or [])
    missing = list(payload.get("missing") or [])

    def _scaled(prefix: str) -> dict[str, Any]:
        """Approximate a framework-specific score from the stored matched/missing
        lists, when the clause IDs share a known prefix (e.g. ``GDPR-``,
        ``CCPA-``, ``SOC2-``). When no clause ID carries the prefix, return the
        primary score so the card still renders something meaningful.
        """
        all_ids = list(matched) + [m.get("id", "") for m in missing if isinstance(m, dict)]
        relevant = [i for i in all_ids if isinstance(i, str) and i.upper().startswith(prefix)]
        if not relevant:
            return {"score": primary_score, "verdict": primary_verdict}
        ok = sum(1 for i in matched if isinstance(i, str) and i.upper().startswith(prefix))
        score = round(100 * ok / len(relevant))
        if score >= 80:
            verdict = "pass"
        elif score >= 50:
            verdict = "warn"
        else:
            verdict = "fail"
        return {"score": score, "verdict": verdict}

    for key, prefix in (("gdpr", "GDPR-"), ("ccpa", "CCPA-"), ("soc2", "SOC2-")):
        if primary_fw == key:
            sub_scores[key] = {"score": primary_score, "verdict": primary_verdict}
        else:
            sub_scores[key] = _scaled(prefix)

    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
    # The formatted string is unchanged.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y-%m-%d %H:%M UTC"
    )
    svg = _render_card_svg(
        verdict=primary_verdict,
        score=primary_score,
        framework=primary_fw or "unknown",
        policy_type=str(payload.get("policy_type", "other")),
        sub_scores=sub_scores,
        timestamp=timestamp,
        share_hash=share_hash,
    )
    return Response(
        content=svg,
        media_type="image/svg+xml",
        headers={
            "Cache-Control": "public, max-age=300",
            "X-Squash-Share": share_hash,
        },
    )


@app.get("/trending")
async def trending(top: int = 5) -> JSONResponse:
    """Public feed of the most frequent policy types.

    ``top`` is clamped to ``[1, 20]`` and forwarded to the global quick-check
    stats' ``trending`` call. The response is that feed with a
    ``policy_types`` list (a copy of ``_QC_POLICY_TYPES``) placed first.
    """
    clamped = min(20, int(top))
    if clamped < 1:
        clamped = 1

    feed = _qc_global_stats().trending(top=clamped)

    payload = {"policy_types": list(_QC_POLICY_TYPES)}
    payload.update(feed)
    return JSONResponse(payload)


# ── Sprint 29 W258/W259: Static demo page + sample policies + HTML share view ─

# All static demo files live under demo/ at the repo root, sibling to squash/.
# Resolved once at import time so the path-traversal guards in the demo
# handlers compare against a canonical absolute path.
_DEMO_ROOT = (Path(__file__).resolve().parent.parent / "demo").resolve()
# Directory holding the sample policies served by /demo/sample_policies/{name}.
_SAMPLE_POLICIES_DIR = (_DEMO_ROOT / "sample_policies").resolve()
# Allowlist of sample policy filenames — pinned to prevent path traversal even
# if symlinks are added to the directory later. Must match files on disk.
_SAMPLE_POLICY_ALLOWLIST = frozenset({
    "01_privacy_policy.txt",
    "02_terms_of_service.txt",
    "03_gdpr_dpa.txt",
    "04_ccpa_notice.txt",
    "05_cookie_policy.txt",
    "README.md",
})


def _serve_demo_html(filename: str) -> HTMLResponse:
    """Return a demo HTML file from ``_DEMO_ROOT`` with no-cache headers.

    Raises:
        HTTPException: 404 when the resolved path escapes ``_DEMO_ROOT`` or
            does not point at a regular file.
    """
    target = (_DEMO_ROOT / filename).resolve()

    # Path traversal guard — the resolved target must be the demo root itself
    # or live underneath it. The file check only runs once that holds.
    root_prefix = str(_DEMO_ROOT) + os.sep
    inside_root = target == _DEMO_ROOT or str(target).startswith(root_prefix)
    if not inside_root or not target.is_file():
        raise HTTPException(status_code=404, detail="not found")

    html = target.read_text(encoding="utf-8")
    return HTMLResponse(
        content=html,
        headers={"Cache-Control": "no-cache, max-age=0"},
    )


@app.get("/demo")
@app.get("/demo/")
async def demo_index() -> HTMLResponse:
    """Serve the demo landing page, ``index.html``, for both /demo and /demo/."""
    landing_page = "index.html"
    return _serve_demo_html(landing_page)


@app.get("/demo/sample_policies/{name}")
async def demo_sample_policy(name: str) -> PlainTextResponse:
    """Return one of the allowlisted sample policies as plain text.

    Only names in ``_SAMPLE_POLICY_ALLOWLIST`` are served; the response is
    cacheable for five minutes.

    Raises:
        HTTPException: 404 ("unknown sample") for a name off the allowlist;
            404 ("not found") when the resolved file falls outside the
            sample directory or is not a regular file.
    """
    if name not in _SAMPLE_POLICY_ALLOWLIST:
        raise HTTPException(status_code=404, detail="unknown sample")

    target = (_SAMPLE_POLICIES_DIR / name).resolve()
    # Defence in depth behind the allowlist: a symlinked entry must still
    # resolve to a file inside the sample directory.
    dir_prefix = str(_SAMPLE_POLICIES_DIR) + os.sep
    served_from_dir = str(target).startswith(dir_prefix)
    if not served_from_dir or not target.is_file():
        raise HTTPException(status_code=404, detail="not found")

    text = target.read_text(encoding="utf-8")
    return PlainTextResponse(
        content=text,
        headers={"Cache-Control": "public, max-age=300"},
    )


@app.get("/share/{share_hash}")
async def share_html(share_hash: str) -> HTMLResponse:
    """Browser-friendly HTML view of a shared quick-check result.

    The page is a thin shell: it serves ``result.html``. The hash is validated
    here so bad URLs fail immediately instead of after the page has loaded.

    Raises:
        HTTPException: 400 when the hash is malformed, 404 when no result is
            stored under it.
    """
    if not _qc_is_valid_share_hash(share_hash):
        raise HTTPException(status_code=400, detail="malformed share hash")

    stored = _quick_check_store.get(share_hash)
    if stored is None:
        raise HTTPException(status_code=404, detail="share not found")

    return _serve_demo_html("result.html")


@app.post("/attest")
async def attest(req: AttestRequest, request: Request) -> JSONResponse:
    """Run the full attestation pipeline for a model artifact.

    This is the primary endpoint for CI/CD integrations.  The pipeline runs
    in the thread-pool executor so the event loop is not blocked.

    Flow: quota check (when the request carries a key record) → path
    validation → pipeline run → counter + quota consumption → optional
    inventory registration when ``tenant_id`` is set.

    Returns:
        200 with the master record on success; 422 with the same body when
        ``fail_on_violation`` is set and the attestation did not pass; 429
        when the key's quota is exhausted.
    """
    # W142: quota enforcement before any compute
    key_rec: KeyRecord | None = getattr(request.state, "key_record", None)
    if key_rec is not None:
        from squash.quota import QuotaEnforcer
        enforcer = QuotaEnforcer(get_key_store())
        quota_result = enforcer.check(key_rec)
        if not quota_result.allowed:
            return JSONResponse(
                {
                    "detail": "Monthly attestation quota exceeded",
                    "quota_used": quota_result.used,
                    "quota_limit": quota_result.limit,
                    "plan": key_rec.plan,
                },
                status_code=429,
                headers=quota_result.headers,
            )

    _require_path(req.model_path)

    # Merge training_dataset_ids with spdx_datasets. dict.fromkeys keeps the
    # first occurrence of every id, so the result is deduplicated across AND
    # within both lists while preserving order.
    all_datasets: list[str] = list(
        dict.fromkeys([*req.training_dataset_ids, *req.spdx_datasets])
    )

    # Construct SpdxOptions when any SPDX enrichment field is populated.
    spdx_options = None
    if any([req.spdx_type, req.spdx_safety_risk, req.spdx_training_info,
            req.spdx_sensitive_data, req.spdx_datasets]):
        from squash.spdx_builder import SpdxOptions
        spdx_options = SpdxOptions(
            type_of_model=req.spdx_type or "text-generation",
            safety_risk_assessment=req.spdx_safety_risk or "unspecified",
            dataset_ids=all_datasets,
            information_about_training=req.spdx_training_info or "see-model-card",
            sensitive_personal_information=req.spdx_sensitive_data or "absent",
        )

    config = AttestConfig(
        model_path=Path(req.model_path),
        output_dir=Path(req.output_dir) if req.output_dir else None,
        model_id=req.model_id,
        hf_repo=req.hf_repo,
        model_family=req.model_family,
        quant_format=req.quant_format,
        policies=req.policies,
        sign=req.sign,
        offline=req.offline,
        local_signing_key=Path(req.local_signing_key) if req.local_signing_key else None,
        fail_on_violation=False,  # never raise from HTTP handler; return 422 instead
        skip_scan=req.skip_scan,
        training_dataset_ids=all_datasets,
        vex_feed_path=Path(req.vex_feed_path) if req.vex_feed_path else None,
        vex_feed_url=req.vex_feed_url,
        spdx_options=spdx_options,
    )

    loop = asyncio.get_running_loop()
    result: AttestResult = await loop.run_in_executor(
        _executor, AttestPipeline.run, config
    )

    master_data = _result_to_dict(result)

    _COUNTERS["squash_attest_total"] += 1

    # W142: consume quota once the pipeline has completed. An exception above
    # skips this; a completed run that fails policy still counts.
    if key_rec is not None:
        from squash.quota import QuotaEnforcer
        QuotaEnforcer(get_key_store()).consume(key_rec.key_id)

    # ── Auto-register in cloud inventory when tenant_id is provided ───────────
    if req.tenant_id:
        _ts = _ts_now()
        _rec: dict[str, Any] = {
            "model_id": result.model_id or req.model_id,
            "model_path": req.model_path,
            "bom_path": str(result.cyclonedx_path) if result.cyclonedx_path else "",
            "attestation_passed": result.passed,
            "policy_results": master_data.get("policy_results", {}),
            "vex_cves": [],
            "timestamp": _ts,
            "record_id": str(uuid.uuid4()),
        }
        _db_write_inventory(req.tenant_id, _rec)
        for policy_name, pr in _rec["policy_results"].items():
            _db_inc_policy_stat(req.tenant_id, policy_name, passed=bool(pr.get("passed")))

    # The pipeline itself never raises on violations (see AttestConfig above);
    # the caller's preference is honoured here via the status code.
    if req.fail_on_violation and not result.passed:
        return JSONResponse(status_code=422, content=master_data)

    return JSONResponse(content=master_data)


@app.post("/scan")
async def scan(req: ScanRequest) -> JSONResponse:
    """Queue a security scan job and return ``{"job_id": "..."}`` with a 202.

    The scan itself runs in the thread-pool executor; poll
    ``GET /scan/{job_id}`` for the outcome. When ``model_path`` is a file,
    its parent directory is scanned.

    The job table is in-memory. Once it holds ``_SCAN_JOB_LIMIT`` entries,
    the oldest entry is evicted to make room for the new job.
    """
    _require_path(req.model_path)
    target = Path(req.model_path)
    scan_dir = target if target.is_dir() else target.parent

    job_id = str(uuid.uuid4())

    # Make room before registering: drop the oldest job when at capacity.
    if len(_scan_jobs) >= _SCAN_JOB_LIMIT:
        _scan_jobs.popitem(last=False)
    _scan_jobs[job_id] = {"status": "pending", "result": None}

    loop = asyncio.get_running_loop()

    def _finding_to_dict(finding: Any) -> dict:
        """Flatten one scanner finding into its JSON shape."""
        return {
            "severity": finding.severity,
            "id": finding.finding_id,
            "title": finding.title,
            "detail": finding.detail,
            "file": finding.file_path,
            "cve": finding.cve,
        }

    def _do_scan() -> None:
        """Run the scan and record either its result or its failure."""
        try:
            outcome = ModelScanner.scan_directory(scan_dir)
            summary = {
                "status": outcome.status,
                "is_safe": outcome.is_safe,
                "critical": outcome.critical_count,
                "high": outcome.high_count,
            }
            summary["findings"] = [_finding_to_dict(f) for f in outcome.findings]
            _scan_jobs[job_id] = {"status": "done", "result": summary}
        except Exception as exc:  # noqa: BLE001
            log.warning("scan job %s failed: %s", job_id, exc)
            _scan_jobs[job_id] = {"status": "error", "result": {"error": str(exc)}}

    _COUNTERS["squash_scan_total"] += 1
    # Fire and forget: the worker records its own outcome in _scan_jobs.
    loop.run_in_executor(_executor, _do_scan)
    return JSONResponse(status_code=202, content={"job_id": job_id})


@app.get("/scan/{job_id}")
async def scan_result(job_id: str) -> JSONResponse:
    """Look up a queued scan job and report its current state.

    Responses:
    - ``200``  ``{"status": "done",    "result": {...}}``
    - ``200``  ``{"status": "error",   "result": {"error": "..."}}``
    - ``202``  ``{"status": "pending"}``  — scan still running
    - ``404``  job_id unknown
    """
    job = _scan_jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Unknown job_id: {job_id}")

    status = job["status"]
    if status == "pending":
        return JSONResponse(status_code=202, content={"status": "pending"})

    body = {"status": status, "result": job["result"]}
    return JSONResponse(content=body)


@app.get("/scan/{job_id}/sarif")
async def scan_result_sarif(job_id: str) -> JSONResponse:
    """Convert a finished scan job's result into a SARIF document.

    Responses:
    - ``200``  SARIF document (``application/json``)
    - ``202``  scan still pending
    - ``404``  job_id unknown
    - ``400``  job ended with an error (no scan result to convert)
    """
    from squash.sarif import SarifBuilder  # noqa: PLC0415

    job = _scan_jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Unknown job_id: {job_id}")

    status = job["status"]
    if status == "pending":
        return JSONResponse(status_code=202, content={"status": "pending"})
    if status == "error":
        failure = job["result"].get("error", "unknown")
        raise HTTPException(
            status_code=400,
            detail=f"Scan job {job_id} ended with an error: {failure}",
        )

    document = SarifBuilder.from_payload(job["result"])
    return JSONResponse(content=document)


@app.post("/policy/evaluate")
async def evaluate_policy(req: PolicyEvaluateRequest) -> JSONResponse:
    """Evaluate a submitted CycloneDX BOM against a named policy template or custom rules.

    - When ``custom_rules`` is provided, evaluates the ad-hoc rule list.
      The ``policy`` field is used only as a label (``"custom"`` when empty).
    - When ``custom_rules`` is absent, evaluates the named built-in policy.

    The SBOM is posted as a JSON body; no files are read from disk.

    Returns:
        200 when the policy passed, 422 when it did not; both carry the
        counts and the per-rule findings.

    Raises:
        HTTPException: 400 for invalid custom rules or an unknown policy name.
    """
    if req.custom_rules is not None:
        # Validate custom rules before running
        errors = PolicyRegistry.validate_rules(req.custom_rules)
        if errors:
            raise HTTPException(
                status_code=400,
                detail={"message": "Invalid custom rules", "errors": errors},
            )
        policy_result = PolicyEngine.evaluate_custom(
            req.sbom,
            req.custom_rules,
            policy_name=req.policy or "custom",
        )
    else:
        if req.policy not in AVAILABLE_POLICIES:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown policy '{req.policy}'. Available: {sorted(AVAILABLE_POLICIES)}",
            )
        policy_result = PolicyEngine.evaluate(req.sbom, req.policy)

    _COUNTERS["squash_policy_evaluate_total"] += 1
    if not policy_result.passed:
        _COUNTERS["squash_policy_violations_total"] += policy_result.error_count
    status_code = 200 if policy_result.passed else 422
    return JSONResponse(
        status_code=status_code,
        content={
            "policy": req.policy,
            "passed": policy_result.passed,
            "error_count": policy_result.error_count,
            "warning_count": policy_result.warning_count,
            "pass_count": policy_result.pass_count,
            "findings": [
                {
                    "rule_id": f.rule_id,
                    "severity": f.severity,
                    "passed": f.passed,
                    "field": f.field,
                    "rationale": f.rationale,
                    "remediation": f.remediation,
                    "remediation_link": f.remediation_link,
                    # Test for None explicitly: falsy-but-real values such as
                    # 0, False or "" must be reported, not collapsed to null.
                    "actual_value": (
                        str(f.actual_value) if f.actual_value is not None else None
                    ),
                }
                for f in policy_result.findings
            ],
        },
    )


@app.post("/vex/evaluate")
async def evaluate_vex(req: VexEvaluateRequest) -> JSONResponse:
    """Evaluate a VEX feed against a single model's SBOM.

    Loads the CycloneDX BOM from disk, builds a one-entry inventory from its
    first component (purl + SHA-256 hash, both defaulting to ``""``), and
    runs ``VexEvaluator`` in the thread-pool executor.

    Raises:
        HTTPException: 500 when the VEX engine cannot be imported; 400 when
            the SBOM cannot be read, is not valid JSON, is not a JSON object,
            or when neither ``vex_feed_path`` nor ``vex_feed_url`` is given.
    """
    try:
        from squash.vex import (
            VexFeed,
            VexEvaluator,
            ModelInventory,
            ModelInventoryEntry,
        )
    except ImportError as exc:
        raise HTTPException(500, "VEX engine not available (import error)") from exc

    _require_path(req.sbom_path)
    bom_path = Path(req.sbom_path)

    try:
        # JSON interchange is UTF-8; don't depend on the locale encoding.
        bom = json.loads(bom_path.read_text(encoding="utf-8"))
    except OSError as e:
        raise HTTPException(400, f"Cannot read SBOM: {e}") from e
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a malformed document is a client error, not a 500.
        raise HTTPException(400, f"SBOM is not valid JSON: {e}") from e
    if not isinstance(bom, dict):
        raise HTTPException(400, "SBOM must be a JSON object")

    if not req.vex_feed_path and not req.vex_feed_url:
        raise HTTPException(400, "Provide either vex_feed_path or vex_feed_url")

    # Pull identity fields from the first component, tolerating a missing or
    # empty ``components`` list and malformed hash entries.
    components = bom.get("components")
    has_first = isinstance(components, list) and components
    first = components[0] if has_first and isinstance(components[0], dict) else {}
    purl = first.get("purl", "")
    hashes = first.get("hashes") or []
    sha256 = next(
        (
            h.get("content", "")
            for h in hashes
            if isinstance(h, dict) and h.get("alg") == "SHA-256"
        ),
        "",
    )

    loop = asyncio.get_running_loop()

    def _run_vex() -> dict:
        """Load the feed and evaluate it against the one-entry inventory."""
        feed = (
            VexFeed.from_directory(Path(req.vex_feed_path))
            if req.vex_feed_path
            else VexFeed.from_url(req.vex_feed_url)
        )
        inv = ModelInventory(
            entries=[
                ModelInventoryEntry(
                    model_id=bom_path.stem,
                    purl=purl,
                    sbom_path=bom_path,
                    composite_sha256=sha256,
                )
            ]
        )
        report = VexEvaluator.evaluate(feed, inv)
        return report.to_dict()

    _COUNTERS["squash_vex_evaluate_total"] += 1
    report_dict = await loop.run_in_executor(_executor, _run_vex)
    return JSONResponse(content=report_dict)


@app.get("/vex/status")
async def vex_status() -> JSONResponse:
    """Report metadata about the local VEX feed cache.

    Built from the cache manifest. An empty manifest yields
    ``{"empty": true}``; otherwise the body carries ``url``,
    ``last_fetched``, ``statement_count`` and the cache's ``stale`` flag.
    Callers should check ``empty`` before relying on the other fields.

    Raises:
        HTTPException: 500 when the VEX engine cannot be imported.
    """
    try:
        from squash.vex import VexCache  # noqa: PLC0415
    except ImportError:
        raise HTTPException(500, "VEX engine not available (import error)")

    cache = VexCache()
    manifest = cache.manifest()

    _COUNTERS["squash_vex_status_total"] += 1

    if manifest:
        status = {
            "empty": False,
            "url": manifest.get("url", ""),
            "last_fetched": manifest.get("last_fetched", ""),
            "statement_count": manifest.get("statement_count", 0),
            "stale": cache.is_stale(),
        }
        return JSONResponse(content=status)

    return JSONResponse(content={"empty": True})


@app.post("/vex/update")
async def vex_update(req: VexUpdateRequest) -> JSONResponse:
    """Force-refresh the local VEX feed cache from a remote URL.

    URL precedence:
    1. ``req.url`` if provided
    2. ``$SQUASH_VEX_URL`` environment variable
    3. :attr:`VexCache.DEFAULT_URL`

    The refresh runs in the thread-pool executor. Returns 200 with the URL
    and the total statement count; any exception from the refresh becomes
    a 502.

    Raises:
        HTTPException: 500 when the VEX engine cannot be imported, 502 when
            the refresh fails.
    """
    try:
        from squash.vex import VexCache  # noqa: PLC0415
    except ImportError:
        raise HTTPException(500, "VEX engine not available (import error)")

    url = req.url
    if not url:
        url = os.environ.get("SQUASH_VEX_URL", VexCache.DEFAULT_URL)

    loop = asyncio.get_running_loop()

    def _do_update() -> dict:
        """Fetch the feed with ``force=True`` and summarise it."""
        feed = VexCache().load_or_fetch(url, timeout=req.timeout, force=True)
        statement_total = 0
        for document in feed.documents:
            statement_total += len(document.statements)
        return {
            "url": url,
            "statement_count": statement_total,
            "updated": True,
        }

    try:
        _COUNTERS["squash_vex_update_total"] += 1
        summary = await loop.run_in_executor(_executor, _do_update)
    except Exception as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    return JSONResponse(content=summary)


@app.post("/sbom/diff")
async def sbom_diff(req: SbomDiffRequest) -> JSONResponse:
    """Compare two CycloneDX BOMs and return a diff summary.

    Both paths are validated with ``_require_path``; the files are then
    loaded and compared in the thread-pool executor.
    """
    from squash.sbom_builder import SbomDiff

    _require_path(req.sbom_a_path)
    _require_path(req.sbom_b_path)

    def _run() -> dict:
        """Load both BOMs and flatten the diff into a JSON-friendly dict."""
        # Decode explicitly as UTF-8: a bare open() uses the locale encoding,
        # which mis-reads non-ASCII JSON on some platforms.
        with open(req.sbom_a_path, encoding="utf-8") as fh:
            bom_a = json.load(fh)
        with open(req.sbom_b_path, encoding="utf-8") as fh:
            bom_b = json.load(fh)
        diff = SbomDiff.compare(bom_a, bom_b)
        return {
            "hash_changed": diff.hash_changed,
            "score_delta": diff.score_delta,
            "policy_status_changed": diff.policy_status_changed,
            "new_findings": diff.new_findings,
            "resolved_findings": diff.resolved_findings,
            # Each change value is converted to a list for the JSON body.
            "metadata_changes": {k: list(v) for k, v in diff.metadata_changes.items()},
            "has_regressions": diff.has_regressions,
        }

    _COUNTERS["squash_sbom_diff_total"] += 1
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(_executor, _run)
    return JSONResponse(content=result)


@app.post("/attest/verify")
async def attest_verify(req: VerifyRequest) -> JSONResponse:
    """Verify the signature bundle for a model's CycloneDX BOM.

    When ``model_path`` is a directory, the BOM is ``cyclonedx-mlbom.json``
    inside it; otherwise ``model_path`` itself is treated as the BOM.

    Returns ``{"verified": bool, "skipped": bool, "reason": str}``. A missing
    bundle counts as skipped unless ``strict`` is set.

    Raises:
        HTTPException: 404 when the model path or the BOM does not exist.
    """
    from squash.oms_signer import OmsVerifier

    model_path = Path(req.model_path)
    if not model_path.exists():
        raise HTTPException(status_code=404, detail=f"Model path not found: {req.model_path}")

    if model_path.is_dir():
        bom_path = model_path / "cyclonedx-mlbom.json"
    else:
        bom_path = model_path
    if not bom_path.exists():
        raise HTTPException(
            status_code=404,
            detail=f"CycloneDX BOM not found: {bom_path}",
        )

    bundle_path = None
    if req.bundle_path:
        bundle_path = Path(req.bundle_path)

    def _outcome(verified: bool, skipped: bool, reason: str) -> dict:
        """Assemble the response body."""
        return {"verified": verified, "skipped": skipped, "reason": reason}

    def _run() -> dict:
        """Call the verifier and translate its tri-state result."""
        res = OmsVerifier.verify(bom_path, bundle_path)
        if res is None:
            # None means no bundle was found; strict mode reports that as a
            # failure rather than a skip.
            if req.strict:
                return _outcome(False, False, "no bundle (strict)")
            return _outcome(False, True, "no bundle found")
        if not res:
            return _outcome(False, False, "bundle verification failed")
        return _outcome(True, False, "")

    loop = asyncio.get_running_loop()
    body = await loop.run_in_executor(_executor, _run)
    return JSONResponse(content=body)


@app.get("/report")
async def get_report(model_path: str) -> Any:
    """Generate an HTML compliance report for a model.

    Query parameter: ``model_path`` — path to the model directory.
    The report is generated in the thread-pool executor and returned as
    ``text/html``.

    Raises:
        HTTPException: 404 when ``model_path`` does not exist.
    """
    from fastapi.responses import HTMLResponse
    from squash.report import ComplianceReporter

    if not Path(model_path).exists():
        raise HTTPException(status_code=404, detail=f"Model path not found: {model_path}")

    loop = asyncio.get_running_loop()
    rendered = await loop.run_in_executor(
        _executor,
        lambda: ComplianceReporter.generate_html(Path(model_path)),
    )
    return HTMLResponse(content=rendered)


@app.post("/webhooks/test")
async def webhooks_test(req: WebhookTestRequest) -> JSONResponse:
    """Fire a synthetic test event at a webhook.

    The target is ``req.webhook_url`` when given, otherwise the
    ``SQUASH_WEBHOOK_URL`` environment variable.  Responds 400 when neither
    yields a URL.  The body reports ``sent`` (the value returned by
    ``PolicyWebhook.notify_raw``) and the ``url`` that was used.
    """
    from squash.policy import PolicyWebhook

    target_url = req.webhook_url
    if not target_url:
        target_url = os.environ.get("SQUASH_WEBHOOK_URL", "")
    if not target_url:
        raise HTTPException(
            status_code=400,
            detail="No webhook URL provided; set SQUASH_WEBHOOK_URL or pass webhook_url",
        )

    def _send() -> dict:
        event = {
            "event": "squash_webhook_test",
            "message": "This is a test event from Squash.",
        }
        delivered = PolicyWebhook.notify_raw(event, target_url)
        return {"sent": delivered, "url": target_url}

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _send)
    return JSONResponse(content=payload)


@app.post("/attest/composed")
async def attest_composed(req: ComposedAttestRequest) -> JSONResponse:
    """Attest several models together and report the composite outcome.

    Responds 404 for the first entry of ``req.model_paths`` that does not
    exist.  Otherwise runs ``CompositeAttestPipeline`` on the shared executor
    and returns ``passed``, ``component_count``, ``parent_bom_path`` (or
    ``null``), ``output_dir`` and ``error``.
    """
    from squash.attest import CompositeAttestConfig, CompositeAttestPipeline

    missing = next((mp for mp in req.model_paths if not Path(mp).exists()), None)
    if missing is not None:
        raise HTTPException(status_code=404, detail=f"Model path not found: {missing}")

    def _attest() -> dict:
        destination = Path(req.output_dir) if req.output_dir else None
        outcome = CompositeAttestPipeline.run(
            CompositeAttestConfig(
                model_paths=[Path(entry) for entry in req.model_paths],
                output_dir=destination,
                policies=req.policies,
                sign=req.sign,
            )
        )
        parent_bom = None
        if outcome.parent_bom_path:
            parent_bom = str(outcome.parent_bom_path)
        return {
            "passed": outcome.passed,
            "component_count": len(outcome.component_results),
            "parent_bom_path": parent_bom,
            "output_dir": str(outcome.output_dir),
            "error": outcome.error,
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _attest)
    return JSONResponse(content=payload)


@app.post("/sbom/push")
async def sbom_push(req: PushRequest) -> JSONResponse:
    """Upload a model's CycloneDX BOM to an SBOM registry.

    Supported ``registry_type`` values are ``dtrack``, ``guac`` and
    ``squash``.  Checks run in this order: 404 if ``model_path`` is missing,
    404 if it holds no ``cyclonedx-mlbom.json``, 400 for an unknown registry
    type.  The body carries ``pushed``, the ``registry_url`` returned by the
    push call, and ``bom_path``.
    """
    from squash.sbom_builder import SbomRegistry

    model_root = Path(req.model_path)
    if not model_root.exists():
        raise HTTPException(status_code=404, detail=f"Model path not found: {req.model_path}")

    bom_file = model_root / "cyclonedx-mlbom.json"
    if not bom_file.exists():
        raise HTTPException(status_code=404, detail=f"CycloneDX BOM not found: {bom_file}")

    valid_types = ("dtrack", "guac", "squash")
    if req.registry_type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"registry_type must be one of {sorted(valid_types)}",
        )

    def _push() -> dict:
        kind = req.registry_type
        if kind == "guac":
            # The GUAC push is the only one called without an API key.
            destination = SbomRegistry.push_guac(bom_file, req.registry_url)
        elif kind == "dtrack":
            destination = SbomRegistry.push_dtrack(bom_file, req.registry_url, req.api_key)
        else:
            destination = SbomRegistry.push_squash(bom_file, req.registry_url, req.api_key)
        return {"pushed": True, "registry_url": destination, "bom_path": str(bom_file)}

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _push)
    return JSONResponse(content=payload)


# ── Wave 20 — NTIA minimum elements validation ────────────────────────────────

class NtiaRequest(BaseModel):
    """Request body for POST /ntia/validate."""

    # Path of the CycloneDX BOM to validate; the handler answers 404 when it
    # does not exist.
    bom_path: str
    # Forwarded unchanged as ``strict=`` to ``NtiaValidator.check``.
    strict: bool = False


@app.post("/ntia/validate")
async def ntia_validate(req: NtiaRequest) -> JSONResponse:
    """Check a CycloneDX BOM for the NTIA minimum elements.

    Responds 404 when ``req.bom_path`` does not exist.  The body contains
    ``passed``, ``completeness_score`` (0–1), ``missing_fields``,
    ``present_fields`` and ``bom_path`` (the validator's own path when it
    reports one, otherwise the path from the request).
    """
    _require_path(req.bom_path)
    from squash.policy import NtiaValidator

    def _validate() -> dict:
        verdict = NtiaValidator.check(Path(req.bom_path), strict=req.strict)
        if verdict.bom_path:
            reported_path = str(verdict.bom_path)
        else:
            reported_path = req.bom_path
        return {
            "passed": verdict.passed,
            "completeness_score": verdict.completeness_score,
            "missing_fields": verdict.missing_fields,
            "present_fields": verdict.present_fields,
            "bom_path": reported_path,
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _validate)
    return JSONResponse(content=payload)


# ── Wave 21 — SLSA provenance attestation ─────────────────────────────────────

class SlsaAttestRequest(BaseModel):
    """Request body for POST /slsa/attest."""

    # Model directory to attest; the handler answers 404 when it does not exist.
    model_dir: str
    # Converted with ``SlsaLevel(level)`` by the handler.
    level: int = 1
    # Forwarded as ``builder_id=`` to ``SlsaProvenanceBuilder.build``.
    builder_id: str = "https://squish.local/squash/builder"
    # NOTE(review): the /slsa/attest handler never reads this flag; the
    # "signed" value in its response is derived from ``level >= 2`` instead.
    sign: bool = False


@app.post("/slsa/attest")
async def slsa_attest(req: SlsaAttestRequest) -> JSONResponse:
    """Generate a SLSA 1.0 provenance statement for the given model directory.

    Responds 404 when ``req.model_dir`` does not exist and 422 when
    ``req.level`` is not a value accepted by ``SlsaLevel``.

    Returns ``output_path``, ``level``, ``subject_name``, ``subject_sha256``,
    ``signed`` and ``invocation_id``.  ``signed`` is derived from
    ``req.level >= 2``; ``req.sign`` is not consulted here.
    """
    _require_path(req.model_dir)
    from squash.slsa import SlsaLevel, SlsaProvenanceBuilder

    # Resolve the level up front so a bad value is a client error (422)
    # rather than an unhandled ValueError surfacing from the executor as 500.
    # Assumes SlsaLevel is an Enum keyed by int (``.value`` is read below).
    try:
        level = SlsaLevel(req.level)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Unsupported SLSA level: {req.level}",
        ) from exc

    def _run() -> dict:
        attest = SlsaProvenanceBuilder.build(
            Path(req.model_dir),
            level=level,
            builder_id=req.builder_id,
        )
        return {
            "output_path": str(attest.output_path),
            "level": attest.level.value,
            "subject_name": attest.subject_name,
            "subject_sha256": attest.subject_sha256,
            "signed": req.level >= 2,
            "invocation_id": attest.invocation_id,
        }

    loop = asyncio.get_running_loop()
    return JSONResponse(content=await loop.run_in_executor(_executor, _run))


# ── Wave 22 — BOM merge ────────────────────────────────────────────────────────

class BomMergeRequest(BaseModel):
    """Request body for POST /sbom/merge."""

    # Paths of the CycloneDX BOMs to merge; each must exist or the handler
    # answers 404.
    bom_paths: list[str]
    # Destination path handed to ``BomMerger.merge``.
    output_path: str
    # Optional metadata for the merged BOM; an empty dict is passed on as
    # ``None``.  A factory replaces the former shared ``{}`` literal, matching
    # the other request models in this file.
    metadata: dict = Field(default_factory=dict)


@app.post("/sbom/merge")
async def sbom_merge(req: BomMergeRequest) -> JSONResponse:
    """Combine several CycloneDX BOMs into a single canonical BOM.

    Deduplicates by PURL and unions vulnerabilities.  Responds 404 for the
    first input path that does not exist.  The body reports ``merged``,
    ``output_path`` and the component / vulnerability counts of the result.
    """
    for candidate in req.bom_paths:
        _require_path(candidate)
    from squash.sbom_builder import BomMerger

    def _merge() -> dict:
        sources = [Path(item) for item in req.bom_paths]
        combined = BomMerger.merge(
            sources,
            Path(req.output_path),
            metadata=req.metadata or None,
        )
        components = combined.get("components", [])
        vulnerabilities = combined.get("vulnerabilities", [])
        return {
            "merged": True,
            "output_path": req.output_path,
            "component_count": len(components),
            "vulnerability_count": len(vulnerabilities),
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _merge)
    return JSONResponse(content=payload)


# ── Wave 23 — AI risk assessment ──────────────────────────────────────────────

class RiskAssessRequest(BaseModel):
    """Request body for POST /risk/assess."""

    # Model directory; the handler reads ``cyclonedx-mlbom.json`` inside it.
    model_dir: str
    # Which assessment(s) to run: "eu-ai-act", "nist-rmf", or "both".
    framework: str = "both"


@app.post("/risk/assess")
async def risk_assess(req: RiskAssessRequest) -> JSONResponse:
    """Assess AI risk per EU AI Act and/or NIST AI RMF.

    ``framework`` must be ``"eu-ai-act"``, ``"nist-rmf"``, or ``"both"``
    (default).  Responds 404 when ``model_dir`` does not exist and 400 for
    any other ``framework`` value (previously such requests returned an
    empty ``{}`` with status 200).

    The body has an ``eu_ai_act`` and/or ``nist_rmf`` key, each holding
    ``passed``, ``category``, ``rationale`` and ``mitigation_required``.
    """
    _require_path(req.model_dir)

    allowed = ("eu-ai-act", "nist-rmf", "both")
    if req.framework not in allowed:
        raise HTTPException(
            status_code=400,
            detail=f"framework must be one of {list(allowed)}",
        )

    from squash.risk import AiRiskAssessor

    def _summarise(assessment: Any) -> dict:
        # Both assessors return objects exposing the same four attributes.
        return {
            "passed": assessment.passed,
            "category": assessment.category.value,
            "rationale": assessment.rationale,
            "mitigation_required": assessment.mitigation_required,
        }

    def _run() -> dict:
        bom_path = Path(req.model_dir) / "cyclonedx-mlbom.json"
        result: dict = {}
        if req.framework in ("eu-ai-act", "both"):
            result["eu_ai_act"] = _summarise(AiRiskAssessor.assess_eu_ai_act(bom_path))
        if req.framework in ("nist-rmf", "both"):
            result["nist_rmf"] = _summarise(AiRiskAssessor.assess_nist_rmf(bom_path))
        return result

    loop = asyncio.get_running_loop()
    return JSONResponse(content=await loop.run_in_executor(_executor, _run))


# ── Wave 24 — Drift monitoring ─────────────────────────────────────────────────

class MonitorSnapshotRequest(BaseModel):
    """Request body for POST /monitor/snapshot."""

    # Model directory passed to ``DriftMonitor.snapshot``; 404 if it is missing.
    model_dir: str


class MonitorCompareRequest(BaseModel):
    """Request body for POST /monitor/compare."""

    # Model directory whose current state is compared; 404 if it is missing.
    model_dir: str
    # Baseline handed to ``DriftMonitor.compare`` — presumably a value
    # previously returned by /monitor/snapshot; confirm against DriftMonitor.
    baseline_snapshot: str


@app.post("/monitor/snapshot")
async def monitor_snapshot(req: MonitorSnapshotRequest) -> JSONResponse:
    """Fingerprint *model_dir* and return the SHA-256 snapshot.

    Responds 404 when the directory does not exist.  The body echoes
    ``model_dir`` next to the ``snapshot`` value.
    """
    _require_path(req.model_dir)
    from squash.governor import DriftMonitor

    def _snapshot() -> dict:
        fingerprint = DriftMonitor.snapshot(Path(req.model_dir))
        return {"snapshot": fingerprint, "model_dir": req.model_dir}

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _snapshot)
    return JSONResponse(content=payload)


@app.post("/monitor/compare")
async def monitor_compare(req: MonitorCompareRequest) -> JSONResponse:
    """Diff the current *model_dir* state against *baseline_snapshot*.

    Responds 404 when the directory does not exist.  The body contains
    ``drift_detected``, ``event_count`` and ``events``, one dict per drift
    event reported by ``DriftMonitor.compare``.
    """
    _require_path(req.model_dir)
    from squash.governor import DriftMonitor

    def _describe(event: Any) -> dict:
        return {
            "event_type": event.event_type,
            "component": event.component,
            "old_value": event.old_value,
            "new_value": event.new_value,
            "detected_at": event.detected_at,
        }

    def _compare() -> dict:
        drift = DriftMonitor.compare(Path(req.model_dir), req.baseline_snapshot)
        total = len(drift)
        return {
            "drift_detected": total > 0,
            "event_count": total,
            "events": [_describe(item) for item in drift],
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _compare)
    return JSONResponse(content=payload)


# ── Wave 25 — CI/CD integration ────────────────────────────────────────────────

class CiRunRequest(BaseModel):
    """Request body for POST /cicd/report."""

    # Model directory to run the pipeline against; 404 if it is missing.
    model_dir: str
    # Forwarded as ``report_format=`` to ``CicdAdapter.run_pipeline``.
    report_format: str = "text"


@app.post("/cicd/report")
async def cicd_report(req: CiRunRequest) -> JSONResponse:
    """Execute the squash check pipeline and return its CI report.

    Runs NTIA validation, AI risk assessment, and drift detection.
    Responds 404 when ``model_dir`` does not exist.  The body contains
    ``passed``, the detected CI ``env``, the ``ntia`` / ``risk`` /
    ``drift_events`` sections and a ``summary`` from
    ``CicdAdapter.job_summary``.
    """
    _require_path(req.model_dir)
    from squash.cicd import CicdAdapter

    def _pipeline() -> dict:
        ci_report = CicdAdapter.run_pipeline(
            Path(req.model_dir), report_format=req.report_format
        )
        ci_env = ci_report.env
        environment = {
            "env_name": ci_env.env_name,
            "job_id": ci_env.job_id,
            "repo": ci_env.repo,
            "branch": ci_env.branch,
        }
        return {
            "passed": ci_report.passed,
            "env": environment,
            "ntia": ci_report.ntia,
            "risk": ci_report.risk,
            "drift_events": ci_report.drift_events,
            "summary": CicdAdapter.job_summary(ci_report),
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _pipeline)
    return JSONResponse(content=payload)


# ── Wave 30 — VEX publish + integration attestation REST endpoints ─────────────


class VexPublishRequest(BaseModel):
    """Request body for POST /vex/publish."""

    # Statement entries, passed as-is to ``VexFeedManifest.generate``.
    entries: list[dict] = Field(default_factory=list)
    # Forwarded as ``author=`` to ``VexFeedManifest.generate``.
    author: str = Field(default="squash")
    # Optional document id; an empty or missing value is forwarded as ``None``.
    doc_id: str | None = Field(default=None)


@app.post("/vex/publish")
async def vex_publish(req: VexPublishRequest) -> JSONResponse:
    """Build an OpenVEX 0.2.0 document from the submitted statement entries.

    The document is validated before it is returned.  Validation errors are
    joined with ``"; "`` and reported as HTTP 422; any other ``ValueError``
    raised while building the document is reported the same way.  On success
    the full OpenVEX document is the response body.
    """
    from squash.vex import VexFeedManifest

    def _build() -> dict:
        document = VexFeedManifest.generate(
            req.entries,
            author=req.author,
            doc_id=req.doc_id or None,
        )
        problems = VexFeedManifest.validate(document)
        if not problems:
            return document
        raise ValueError("; ".join(problems))

    try:
        document = await asyncio.get_running_loop().run_in_executor(_executor, _build)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    return JSONResponse(content=document)


class AttestIntegrationRequest(BaseModel):
    """Shared request body for offline integration attestation endpoints."""

    # Used by /attest/mlflow, /attest/wandb and /attest/langchain.

    # Artifact to attest; the handlers answer 404 when it does not exist and
    # write output to a "squash" directory next to it.
    model_path: str
    # ``None`` makes the handlers fall back to ["enterprise-strict"].
    policies: list[str] | None = Field(default=None)
    # Forwarded as ``sign=`` to ``AttestConfig``.
    sign: bool = False
    # Selects the status code for a failed attestation: 422 when true, 400
    # when false.  It is never forwarded to the pipeline itself.
    fail_on_violation: bool = False


class McpAttestRequest(BaseModel):
    """Request body for POST /attest/mcp — MCP tool manifest attestation."""

    # The handler answers 404 when this path does not exist.
    catalog_path: str = Field(description="Absolute path to MCP tool catalog JSON file")
    # Passed as the second argument to ``McpScanner.scan_file``.
    policy: str = Field(default="mcp-strict", description="Policy to apply (default: mcp-strict)")
    # When true the handler calls ``McpSigner.sign`` on the catalog after the scan.
    sign: bool = Field(default=False, description="Sign catalog with Sigstore after scanning")
    # The handler applies this only when the scan result's "status" is "unsafe".
    fail_on_violation: bool = Field(
        default=False,
        description="Return HTTP 422 if any error-severity finding is present",
    )


# ── Wave 47 — RAG KB integrity ────────────────────────────────────────────────


class RagIndexRequest(BaseModel):
    """Request body for POST /rag/index."""

    # Passed to ``RagScanner.index`` as the corpus root.
    corpus_dir: str = Field(description="Absolute path to the corpus directory to index")
    # Forwarded as ``glob=`` to ``RagScanner.index``.
    glob: str = Field(default="**/*", description='File glob pattern (default "**/*")')


class RagVerifyRequest(BaseModel):
    """Request body for POST /rag/verify."""

    # Passed to ``RagScanner.verify`` as the corpus root.
    corpus_dir: str = Field(description="Absolute path to the corpus directory to verify")


# ── Wave 48 — Model transformation lineage ────────────────────────────────────


class LineageRecordRequest(BaseModel):
    """Request body for POST /lineage/record."""

    # The /lineage/record handler creates this directory (including parents)
    # when it does not exist yet.
    model_dir: str = Field(description="Absolute path to the model artefact directory")
    # Forwarded as ``operation=`` to ``LineageChain.create_event``.
    operation: str = Field(description='Operation label, e.g. "compress", "sign", "export"')
    # Empty string means "use the final component of model_dir".
    model_id: str = Field(default="", description="Model identifier (default: directory name)")
    # Empty string means "use model_dir itself".
    input_dir: str = Field(default="", description="Source directory (default: model_dir)")
    # Forwarded as ``params=`` to ``LineageChain.create_event``.
    params: dict = Field(default_factory=dict, description="Operation-specific key/value params")


class LineageVerifyRequest(BaseModel):
    """Request body for POST /lineage/verify."""

    # Must exist (else the handler answers 404); passed to ``LineageChain.verify``.
    model_dir: str = Field(description="Absolute path to the model artefact directory")


@app.post("/attest/mlflow")
async def attest_mlflow(req: AttestIntegrationRequest) -> JSONResponse:
    """Attest an MLflow model artifact offline via ``AttestPipeline``.

    Equivalent to ``squash attest-mlflow <model_path>``.

    Status codes: 404 when ``model_path`` does not exist, 200 when the
    attestation passed, 422 when it failed and ``fail_on_violation`` was
    requested, 400 when it failed otherwise.  The body is always the
    serialised attestation result.
    """
    _require_path(req.model_path)

    def _attest() -> dict:
        artifact = Path(req.model_path)
        policy_names = ["enterprise-strict"] if req.policies is None else req.policies
        # The pipeline is told never to raise on violations; the HTTP status
        # is derived from the result instead.
        outcome = AttestPipeline.run(
            AttestConfig(
                model_path=artifact,
                policies=policy_names,
                sign=req.sign,
                fail_on_violation=False,
                output_dir=artifact.parent / "squash",
            )
        )
        return _result_to_dict(outcome)

    report = await asyncio.get_running_loop().run_in_executor(_executor, _attest)
    if report.get("passed"):
        code = 200
    elif req.fail_on_violation:
        code = 422
    else:
        code = 400
    return JSONResponse(content=report, status_code=code)


@app.post("/attest/wandb")
async def attest_wandb(req: AttestIntegrationRequest) -> JSONResponse:
    """Attest a W&B artifact directory offline via ``AttestPipeline``.

    Equivalent to ``squash attest-wandb <model_path>``.

    Status codes: 404 when ``model_path`` does not exist, 200 when the
    attestation passed, 422 when it failed and ``fail_on_violation`` was
    requested, 400 when it failed otherwise.  The body is always the
    serialised attestation result.
    """
    _require_path(req.model_path)

    def _attest() -> dict:
        artifact = Path(req.model_path)
        policy_names = ["enterprise-strict"] if req.policies is None else req.policies
        # Violations never raise inside the handler; they only pick the
        # status code below.
        settings = AttestConfig(
            model_path=artifact,
            policies=policy_names,
            sign=req.sign,
            fail_on_violation=False,
            output_dir=artifact.parent / "squash",
        )
        return _result_to_dict(AttestPipeline.run(settings))

    report = await asyncio.get_running_loop().run_in_executor(_executor, _attest)
    if report.get("passed"):
        code = 200
    elif req.fail_on_violation:
        code = 422
    else:
        code = 400
    return JSONResponse(content=report, status_code=code)


class AttestHuggingFaceRequest(BaseModel):
    """Request body for POST /attest/huggingface."""

    # Local model path; the handler answers 404 when it does not exist.
    model_path: str
    # When set, the handler attests and pushes via ``HFSquash.attest_and_push``;
    # when empty or missing it runs the offline ``AttestPipeline`` instead.
    repo_id: str | None = Field(default=None)
    # Hub token for the push; ``None`` is forwarded as an empty string.
    hf_token: str | None = Field(default=None)
    # ``None`` makes the handler fall back to ["enterprise-strict"].
    policies: list[str] | None = Field(default=None)
    # Forwarded as ``sign=`` to whichever attestation path runs.
    sign: bool = False
    # Selects the status code for a failed attestation: 422 when true, 400
    # when false.  It is never forwarded to the attestation call itself.
    fail_on_violation: bool = False


@app.post("/attest/huggingface")
async def attest_huggingface(req: AttestHuggingFaceRequest) -> JSONResponse:
    """Attest a HuggingFace model, optionally pushing the artefacts to the Hub.

    With ``repo_id`` set the work is delegated to
    ``HFSquash.attest_and_push()``; without it an offline
    ``AttestPipeline.run()`` is executed.

    Equivalent to ``squash attest-huggingface <model_path> [--repo-id ...]``.

    Status codes: 404 when ``model_path`` does not exist, 502 when the
    attestation call raises, 200 when it passed, 422 when it failed and
    ``fail_on_violation`` was requested, 400 when it failed otherwise.
    """
    _require_path(req.model_path)

    def _attest() -> dict:
        artifact = Path(req.model_path)
        policy_names = ["enterprise-strict"] if req.policies is None else req.policies

        if not req.repo_id:
            # Offline path: no Hub interaction at all.
            settings = AttestConfig(
                model_path=artifact,
                policies=policy_names,
                sign=req.sign,
                fail_on_violation=False,  # never raise from HTTP handler
                output_dir=artifact.parent / "squash",
            )
            return _result_to_dict(AttestPipeline.run(settings))

        # Imported only on the push path.
        from squash.integrations.huggingface import HFSquash

        pushed = HFSquash.attest_and_push(
            req.repo_id,
            artifact,
            hf_token=req.hf_token or "",
            policies=policy_names,
            sign=req.sign,
            fail_on_violation=False,  # never raise from HTTP handler
            repo_prefix="squash-attestations",
        )
        return _result_to_dict(pushed)

    try:
        report = await asyncio.get_running_loop().run_in_executor(_executor, _attest)
    except Exception as exc:  # HF push failures (auth, network, missing dep)
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    if report.get("passed"):
        code = 200
    elif req.fail_on_violation:
        code = 422
    else:
        code = 400
    return JSONResponse(content=report, status_code=code)


@app.post("/attest/langchain")
async def attest_langchain(req: AttestIntegrationRequest) -> JSONResponse:
    """Attest a LangChain pipeline artifact before deployment.

    Equivalent to ``squash attest-langchain <model_path>``.

    Status codes: 404 when ``model_path`` does not exist, 200 when the
    attestation passed, 422 when it failed and ``fail_on_violation`` was
    requested, 400 when it failed otherwise.  The body is always the
    serialised attestation result.
    """
    _require_path(req.model_path)

    def _attest() -> dict:
        artifact = Path(req.model_path)
        if req.policies is None:
            policy_names = ["enterprise-strict"]
        else:
            policy_names = req.policies
        # fail_on_violation stays False here so the pipeline never raises
        # from inside the HTTP handler.
        outcome = AttestPipeline.run(
            AttestConfig(
                model_path=artifact,
                policies=policy_names,
                sign=req.sign,
                fail_on_violation=False,
                output_dir=artifact.parent / "squash",
            )
        )
        return _result_to_dict(outcome)

    report = await asyncio.get_running_loop().run_in_executor(_executor, _attest)
    if report.get("passed"):
        code = 200
    elif req.fail_on_violation:
        code = 422
    else:
        code = 400
    return JSONResponse(content=report, status_code=code)


@app.post("/attest/mcp")
async def attest_mcp(req: McpAttestRequest) -> JSONResponse:
    """Scan an MCP tool manifest catalog for supply-chain threats.

    Equivalent to ``squash attest-mcp <catalog_path> --policy mcp-strict``.

    Addresses EU AI Act Art. 9(2)(d): adversarial input resilience for
    agentic AI systems that invoke MCP tools at runtime.

    Responds 404 when ``catalog_path`` does not exist.  The scan result is
    the body; the status is 422 only when ``fail_on_violation`` is set and
    the result's ``status`` is ``"unsafe"``.  Each completed scan increments
    the ``squash_attest_total`` counter.
    """
    _require_path(req.catalog_path)

    from squash.mcp import McpScanner, McpSigner  # lazy — keeps api.py import-fast

    def _scan() -> dict:
        catalog = Path(req.catalog_path)
        findings = McpScanner.scan_file(catalog, req.policy)
        if req.sign:
            # Signing happens after the scan, on a freshly built path object.
            McpSigner.sign(Path(req.catalog_path))
        return findings.to_dict()

    scan_report = await asyncio.get_running_loop().run_in_executor(_executor, _scan)
    _COUNTERS["squash_attest_total"] += 1

    rejected = req.fail_on_violation and scan_report.get("status") == "unsafe"
    if rejected:
        return JSONResponse(content=scan_report, status_code=422)
    return JSONResponse(content=scan_report)


def _require_path(p: str) -> None:
    if not Path(p).exists():
        raise HTTPException(status_code=404, detail=f"Path not found: {p}")


# ── Wave 46 — Agent audit trail endpoint ─────────────────────────────────────

@app.get("/audit/trail")
async def get_audit_trail(
    limit: int = 100,
    log: str | None = None,
) -> JSONResponse:
    """Return the last *limit* entries from the agent audit trail.

    The audit trail is an append-only JSONL file maintained by
    :class:`~squish.squash.governor.AgentAuditLogger`.  Each entry contains a
    SHA-256 hash of the LLM input/output, the event type, session ID, and a
    forward hash-chain link for tamper evidence.

    Addresses EU AI Act Art. 12: mandatory logging for high-risk AI systems.

    The log is read on the shared executor so the file access does not block
    the event loop, in line with the other handlers in this module.

    Query parameters
    ----------------
    limit:
        Maximum number of most-recent entries to return (default 100).
        Values outside 1–1000 are clamped into that range.
    log:
        Override the audit log file path (default: ``SQUASH_AUDIT_LOG`` env or
        ``~/.squash/audit.jsonl``).

    Returns
    -------
    JSON with ``count``, ``log_path`` and ``entries``.
    """
    from squash.governor import AgentAuditLogger

    capped = max(1, min(limit, 1000))

    def _read() -> dict:
        # NOTE(review): ``log`` is caller-controlled and handed straight to
        # the logger, so a client can point this endpoint at any file the
        # process can read.  Consider restricting it to an allow-listed
        # directory.
        audit_log = AgentAuditLogger(log_path=log)
        tail = audit_log.read_tail(capped)
        return {
            "count": len(tail),
            "log_path": str(audit_log.path),
            "entries": tail,
        }

    loop = asyncio.get_running_loop()
    return JSONResponse(content=await loop.run_in_executor(_executor, _read))


# ── Wave 47 — RAG KB integrity ────────────────────────────────────────────────


@app.post("/rag/index")
async def post_rag_index(req: RagIndexRequest) -> JSONResponse:
    """Hash every document in *corpus_dir* and write ``.rag_manifest.json``.

    Equivalent to ``squash scan-rag index <corpus_dir>``.

    Addresses the #1 enterprise RAG failure: silently poisoned or drifted
    knowledge bases.  The returned ``manifest_hash`` is a deterministic
    content fingerprint suitable for CI/CD gating.

    Responds 404 unless ``corpus_dir`` is an existing directory.  The body
    holds ``corpus_dir``, ``file_count``, ``manifest_path``,
    ``manifest_hash`` and ``indexed_at``.
    """
    from squash.rag import RagScanner

    corpus_root = Path(req.corpus_dir)
    if not corpus_root.is_dir():
        raise HTTPException(status_code=404, detail=f"corpus_dir not found: {req.corpus_dir}")

    def _index() -> dict:
        built = RagScanner.index(corpus_root, glob=req.glob)
        manifest_file = Path(built.corpus_dir) / RagScanner.MANIFEST_FILENAME
        return {
            "corpus_dir": built.corpus_dir,
            "file_count": built.file_count,
            "manifest_path": str(manifest_file),
            "manifest_hash": built.manifest_hash,
            "indexed_at": built.indexed_at,
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _index)
    return JSONResponse(content=payload)


@app.post("/rag/verify")
async def post_rag_verify(req: RagVerifyRequest) -> JSONResponse:
    """Compare live corpus against the stored manifest.

    Equivalent to ``squash scan-rag verify <corpus_dir>``.

    Returns HTTP 200 with ``"ok": true`` when no drift is detected.
    Returns HTTP 200 with ``"ok": false`` when drift exists — callers
    should gate deployments on the ``ok`` field, not the status code.

    Responds 404 unless ``corpus_dir`` is an existing directory.  This is the
    same guard ``/rag/index`` applies; a regular file is rejected here
    instead of being handed to the scanner.
    """
    from squash.rag import RagScanner

    corpus = Path(req.corpus_dir)
    if not corpus.is_dir():
        raise HTTPException(status_code=404, detail=f"corpus_dir not found: {req.corpus_dir}")

    def _run() -> dict:
        result = RagScanner.verify(corpus)
        return result.to_dict()

    loop = asyncio.get_running_loop()
    content = await loop.run_in_executor(_executor, _run)
    return JSONResponse(content=content)


# ── Wave 48 — Model transformation lineage ────────────────────────────────────


@app.post("/lineage/record")
async def post_lineage_record(req: LineageRecordRequest) -> JSONResponse:
    """Append a model transformation event to the lineage chain.

    Equivalent to ``squash lineage record <model_dir> --operation <op>``.

    ``model_dir`` is created (with parents) when missing.  An empty
    ``model_id`` falls back to the directory name and an empty ``input_dir``
    to ``model_dir`` itself.  The body reports ``event_hash``, ``event_id``
    and ``model_dir``.
    """
    from squash.lineage import LineageChain

    target_dir = Path(req.model_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    def _record() -> dict:
        event = LineageChain.create_event(
            operation=req.operation,
            model_id=req.model_id if req.model_id else target_dir.name,
            input_dir=req.input_dir if req.input_dir else str(target_dir),
            output_dir=str(target_dir),
            params=req.params,
        )
        digest = LineageChain.record(target_dir, event)
        return {
            "event_hash": digest,
            "event_id": event.event_id,
            "model_dir": str(target_dir),
        }

    payload = await asyncio.get_running_loop().run_in_executor(_executor, _record)
    return JSONResponse(content=payload)


@app.get("/lineage/show")
async def get_lineage_show(model_dir: str) -> JSONResponse:
    """List every lineage event recorded for a model directory.

    Equivalent to ``squash lineage show <model_dir>``.

    Responds 404 when ``model_dir`` does not exist.  The body holds
    ``model_dir``, ``event_count`` and the serialised ``events``.
    """
    from squash.lineage import LineageChain

    root = Path(model_dir)
    if not root.exists():
        raise HTTPException(status_code=404, detail=f"model_dir not found: {model_dir}")

    def _load() -> list[dict]:
        return [entry.to_dict() for entry in LineageChain.load(root)]

    history = await asyncio.get_running_loop().run_in_executor(_executor, _load)
    body = {
        "model_dir": str(root),
        "event_count": len(history),
        "events": history,
    }
    return JSONResponse(content=body)


@app.post("/lineage/verify")
async def post_lineage_verify(req: LineageVerifyRequest) -> JSONResponse:
    """Check the integrity of a model's lineage chain.

    Equivalent to ``squash lineage verify <model_dir>``.
    Returns HTTP 200 with ``"ok": true`` when the chain is intact.
    Returns HTTP 200 with ``"ok": false`` on tampering or missing chain.
    Responds 404 when ``model_dir`` itself does not exist.
    """
    from squash.lineage import LineageChain

    root = Path(req.model_dir)
    if not root.exists():
        raise HTTPException(status_code=404, detail=f"model_dir not found: {req.model_dir}")

    def _verify() -> dict:
        return LineageChain.verify(root).to_dict()

    verdict = await asyncio.get_running_loop().run_in_executor(_executor, _verify)
    return JSONResponse(content=verdict)


# ── W49: offline / air-gapped endpoints ──────────────────────────────────────


@app.post("/keygen")
async def keygen(req: KeygenRequest) -> JSONResponse:
    """Generate an Ed25519 keypair for offline BOM signing.

    Equivalent to ``squash keygen <name> --key-dir <dir>``.
    Returns the absolute paths to the generated ``.priv.pem`` and ``.pub.pem`` files.

    Responds 503 when key generation raises ``ImportError`` (reported with
    the error's message); the original exception is chained as the cause.
    """
    from squash.oms_signer import OmsSigner

    def _run() -> tuple[str, str]:
        priv, pub = OmsSigner.keygen(req.key_name, req.key_dir)
        return str(priv), str(pub)

    loop = asyncio.get_running_loop()
    try:
        priv_path, pub_path = await loop.run_in_executor(_executor, _run)
    except ImportError as exc:
        # Chain explicitly so the traceback shows the ImportError as the
        # cause rather than as an unrelated error during handling.
        raise HTTPException(status_code=503, detail=str(exc)) from exc

    return JSONResponse(content={"priv_path": priv_path, "pub_path": pub_path})


@app.post("/attest/verify-local")
async def verify_local(req: VerifyLocalRequest) -> JSONResponse:
    """Verify a CycloneDX BOM against a local Ed25519 offline signature.

    Equivalent to ``squash verify-local <bom_path> --key <pub_key_path>``.
    Returns ``{"ok": true}`` on valid signature, ``{"ok": false}`` on failure;
    the body also echoes ``bom_path``.

    Responds 404 when ``bom_path`` or ``pub_key_path`` does not exist, and
    503 when verification raises ``ImportError`` (the original exception is
    chained as the cause).  ``sig_path`` is optional and forwarded as
    ``None`` when empty.
    """
    from squash.oms_signer import OmsVerifier

    bom = Path(req.bom_path)
    if not bom.exists():
        raise HTTPException(status_code=404, detail=f"bom_path not found: {req.bom_path}")

    pub = Path(req.pub_key_path)
    if not pub.exists():
        raise HTTPException(status_code=404, detail=f"pub_key_path not found: {req.pub_key_path}")

    sig = Path(req.sig_path) if req.sig_path else None

    def _run() -> bool:
        return OmsVerifier.verify_local(bom, pub, sig)

    loop = asyncio.get_running_loop()
    try:
        ok = await loop.run_in_executor(_executor, _run)
    except ImportError as exc:
        # Chain explicitly so the ImportError is recorded as the cause.
        raise HTTPException(status_code=503, detail=str(exc)) from exc

    return JSONResponse(content={"ok": ok, "bom_path": str(bom)})


@app.post("/pack/offline")
async def pack_offline(req: PackOfflineRequest) -> JSONResponse:
    """Bundle a model directory into a portable .squash-bundle.tar.gz archive.

    Equivalent to ``squash pack-offline <model_dir>``.
    Returns the bundle path and its size in bytes, plus ``model_dir``.

    Responds 404 when ``model_dir`` does not exist and 500 (with the error
    message as detail) when packing or sizing the bundle fails.  The bundle
    is sized on the executor too, so no filesystem call runs on the event
    loop.
    """
    from squash.oms_signer import OmsSigner

    mdir = Path(req.model_dir)
    if not mdir.exists():
        raise HTTPException(status_code=404, detail=f"model_dir not found: {req.model_dir}")

    out = Path(req.output_path) if req.output_path else None

    def _run() -> tuple[str, int]:
        bundle = str(OmsSigner.pack_offline(mdir, out))
        return bundle, Path(bundle).stat().st_size

    loop = asyncio.get_running_loop()
    try:
        bundle_path, size_bytes = await loop.run_in_executor(_executor, _run)
    except Exception as exc:
        # Chain explicitly so the underlying failure is kept as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return JSONResponse(content={
        "bundle_path": bundle_path,
        "size_bytes": size_bytes,
        "model_dir": str(mdir),
    })


# ──────────────────────────────────────────────────────────────────────────────
# W52-55: Squash Cloud — tenant management + dashboard endpoints
# ──────────────────────────────────────────────────────────────────────────────
# All /cloud/* endpoints are multi-tenant.  Tenant identity is resolved via:
#   • SQUASH_JWT_SECRET set → JWT Bearer auth (tenant_id claim)
#   • No SQUASH_JWT_SECRET  → X-Tenant-ID header (single-tenant or trusted proxy)
#
# For the hosted Squash Cloud product the Next.js dashboard authenticates with
# the SSO provider (OIDC), receives a signed JWT, and forwards it here.  The
# squish API validates the JWT signature with the shared SQUASH_JWT_SECRET
# (HS256) and extracts the tenant_id claim.
# ──────────────────────────────────────────────────────────────────────────────


def _ts_now() -> str:
    """Return current UTC time as ISO-8601 string."""
    import datetime
    return datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")


# ── Tenant management ─────────────────────────────────────────────────────────

@app.post("/cloud/tenant", status_code=201)
async def cloud_create_tenant(req: TenantCreateRequest) -> JSONResponse:
    """Register a new tenant in the Squash Cloud dashboard.

    Idempotent: re-posting the same ``tenant_id`` updates the record and
    keeps its original ``created_at``.

    Used by the Next.js onboarding flow and by enterprise provisioning scripts.

    Responds 400 when ``tenant_id`` is empty or longer than 64 characters,
    and 507 when a *new* tenant would exceed the tenant limit.  Updating an
    existing tenant is always allowed, even when the store is full.
    Returns the stored record with status 201.
    """
    if not req.tenant_id or len(req.tenant_id) > 64:
        raise HTTPException(status_code=400, detail="tenant_id must be 1–64 chars")

    # Only a genuinely new tenant grows the store, so only that case is
    # subject to the limit; otherwise idempotent re-posts would start
    # failing once the limit is reached.
    is_new = req.tenant_id not in _tenants
    if is_new and len(_tenants) >= _TENANT_LIMIT:
        raise HTTPException(status_code=507, detail="Tenant limit reached")

    # One timestamp for both fields so a fresh record has
    # created_at == updated_at.
    now = _ts_now()
    record: dict[str, Any] = {
        "tenant_id": req.tenant_id,
        "name": req.name,
        "plan": req.plan,
        "contact_email": req.contact_email,
        "created_at": _tenants.get(req.tenant_id, {}).get("created_at", now),
        "updated_at": now,
    }
    _db_write_tenant(req.tenant_id, record)
    return JSONResponse(status_code=201, content=record)


@app.get("/cloud/tenant/{tenant_id}")
async def cloud_get_tenant(tenant_id: str) -> JSONResponse:
    """Fetch the stored metadata of one registered tenant.

    Used by the dashboard header / org switcher to display plan and quota info.
    Responds 404 when no record is stored for ``tenant_id``.
    """
    tenant = _tenants.get(tenant_id)
    if tenant is not None:
        return JSONResponse(content=tenant)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/tenants")
async def cloud_list_tenants(request: Request) -> JSONResponse:
    """Return every registered tenant together with the tenant count.

    Intended as an admin-only endpoint (SQUASH_API_TOKEN auth).
    NOTE(review): no token check happens inside this handler and ``request``
    is unused here — presumably auth is enforced by middleware; confirm.
    """
    tenant_records = list(_tenants.values())
    body = {"count": len(tenant_records), "tenants": tenant_records}
    return JSONResponse(content=body)


@app.patch("/cloud/tenant/{tenant_id}")
async def cloud_update_tenant(tenant_id: str, req: TenantUpdateRequest) -> JSONResponse:
    """Apply a partial update to an existing tenant record.

    Request fields left as ``None`` are not touched; each of ``name``,
    ``plan`` and ``contact_email`` that is provided overwrites the stored
    value.  ``updated_at`` is always refreshed.  Responds with HTTP 404 when
    *tenant_id* is not registered.
    """
    current = _tenants.get(tenant_id)
    if current is None:
        raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")
    # Work on a copy; the stored record is only replaced via the write helper.
    merged = {**current}
    for field_name in ("name", "plan", "contact_email"):
        new_value = getattr(req, field_name)
        if new_value is not None:
            merged[field_name] = new_value
    merged["updated_at"] = _ts_now()
    _db_write_tenant(tenant_id, merged)
    return JSONResponse(content=merged)


@app.delete("/cloud/tenant/{tenant_id}", status_code=204)
async def cloud_delete_tenant(tenant_id: str) -> Response:
    """Remove a registered tenant.

    Delegates the actual removal to ``_db_delete_tenant``, which is expected
    to purge the tenant's inventory, VEX alerts, drift events and policy
    stats from memory and from SQLite when ``SQUASH_CLOUD_DB`` is set.
    Responds with HTTP 204 No Content on success and HTTP 404 when
    *tenant_id* is not registered.
    """
    if tenant_id in _tenants:
        _db_delete_tenant(tenant_id)
        return Response(status_code=204)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


# ── Model inventory ───────────────────────────────────────────────────────────

@app.post("/cloud/inventory/register", status_code=201)
async def cloud_register_inventory(
    req: InventoryRegisterRequest, request: Request
) -> JSONResponse:
    """Store one attestation result in the tenant's cloud model inventory.

    Invoked by ``POST /attest`` when a ``tenant_id`` is supplied, or directly
    by CI/CD scripts for out-of-band registrations.  Besides persisting the
    record, every entry of ``policy_results`` bumps the tenant's per-policy
    pass/fail counters.  Responds with HTTP 400 when ``tenant_id`` is empty.
    """
    tenant = req.tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="tenant_id required")
    entry: dict[str, Any] = {
        "model_id": req.model_id,
        "model_path": req.model_path,
        "bom_path": req.bom_path,
        "attestation_passed": req.attestation_passed,
        "policy_results": req.policy_results,
        "vex_cves": req.vex_cves,
        # Fall back to the server clock when the caller sent no timestamp.
        "timestamp": req.timestamp if req.timestamp else _ts_now(),
        "record_id": str(uuid.uuid4()),
    }
    _db_write_inventory(tenant, entry)
    # Keep the per-policy aggregates in step with the new record.
    for name, outcome in req.policy_results.items():
        _db_inc_policy_stat(tenant, name, passed=bool(outcome.get("passed")))
    return JSONResponse(status_code=201, content=entry)


@app.get("/cloud/inventory")
async def cloud_get_inventory(
    request: Request,
    limit: int = 50,
    passed: bool | None = None,
) -> JSONResponse:
    """Return the model inventory for the resolved tenant.

    Query parameters
    ----------------
    limit:
        Maximum number of most-recent records to return (default 50, max 500).
    passed:
        If ``true``, return only models whose attestation passed.
        If ``false``, return only models whose attestation did not pass.
        Omit to return all.

    The response carries ``total`` (records matching the filter) and
    ``count`` (records actually returned after applying ``limit``).
    """
    tenant_id = _resolve_tenant_id(request)
    limit = max(1, min(limit, 500))
    records: list[dict[str, Any]] = list(_inventory[tenant_id])
    if passed is not None:
        # Compare by truthiness rather than identity: a record that stores
        # 0/1 (or lacks the key) would never satisfy ``is True`` / ``is False``
        # and a missing key would raise.  This matches how the policy
        # dashboard classifies ``attestation_passed``.
        records = [
            r for r in records if bool(r.get("attestation_passed")) is passed
        ]
    page = records[-limit:]
    return JSONResponse(content={
        "tenant_id": tenant_id,
        "count": len(page),
        "total": len(records),
        "models": page,
    })


# ── VEX alert feed ────────────────────────────────────────────────────────────

@app.post("/cloud/vex/alert", status_code=201)
async def cloud_post_vex_alert(
    req: VexAlertRequest, request: Request
) -> JSONResponse:
    """Append a VEX alert to the tenant's cloud dashboard feed.

    Called by the VEX poller / ``squash vex subscribe`` daemon when a new
    CVE shows up in the feed for a subscribed tenant.  The stored alert gets
    a fresh ``alert_id`` and a server-side ``created_at`` timestamp.
    Responds with HTTP 400 when ``tenant_id`` is empty.
    """
    tenant = req.tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="tenant_id required")
    alert: dict[str, Any] = {
        "alert_id": str(uuid.uuid4()),
        "cve_id": req.cve_id,
        "severity": req.severity,
        "model_id": req.model_id,
        "status": req.status,
        "detail": req.detail,
        "tenant_id": tenant,
        "created_at": _ts_now(),
    }
    _db_write_vex_alert(tenant, alert)
    return JSONResponse(status_code=201, content=alert)


@app.get("/cloud/vex/alerts")
async def cloud_get_vex_alerts(
    request: Request,
    limit: int = 50,
    status: str | None = None,
    severity: str | None = None,
) -> JSONResponse:
    """List the resolved tenant's VEX alerts, optionally filtered.

    Backs the real-time CVE exposure dashboard (CISO-level requirement).

    Query parameters
    ----------------
    limit:
        Maximum recent alerts (default 50, max 500).
    status:
        Filter by alert status: ``open`` | ``acknowledged`` | ``resolved``.
    severity:
        Filter by severity: ``critical`` | ``high`` | ``medium`` | ``low`` | ``unknown``.
    """
    tenant_id = _resolve_tenant_id(request)
    limit = max(1, min(limit, 500))
    # An empty/absent filter value matches everything.
    matching: list[dict[str, Any]] = [
        alert
        for alert in _vex_alerts[tenant_id]
        if (not status or alert["status"] == status)
        and (not severity or alert["severity"] == severity)
    ]
    page = matching[-limit:]
    return JSONResponse(content={
        "tenant_id": tenant_id,
        "count": len(page),
        "total": len(matching),
        "alerts": page,
    })


# ── Drift event stream ────────────────────────────────────────────────────────

@app.post("/cloud/drift/event", status_code=201)
async def cloud_post_drift_event(
    req: DriftEventRequest, request: Request
) -> JSONResponse:
    """Append a drift event to the tenant's cloud dashboard stream.

    Called by CI/CD after ``squash drift-check`` detects BOM divergence;
    feeds the "supply chain integrity over time" charts.  The stored event
    gets a fresh ``event_id``; the caller's timestamp is kept when given,
    otherwise the server clock is used.  Responds with HTTP 400 when
    ``tenant_id`` is empty.
    """
    tenant = req.tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="tenant_id required")
    event: dict[str, Any] = {
        "event_id": str(uuid.uuid4()),
        "model_id": req.model_id,
        "bom_a": req.bom_a,
        "bom_b": req.bom_b,
        "added": req.added,
        "removed": req.removed,
        "changed": req.changed,
        "severity": req.severity,
        "tenant_id": tenant,
        "timestamp": req.timestamp if req.timestamp else _ts_now(),
    }
    _db_write_drift_event(tenant, event)
    return JSONResponse(status_code=201, content=event)


@app.get("/cloud/drift/events")
async def cloud_get_drift_events(
    request: Request,
    limit: int = 50,
    model_id: str | None = None,
    severity: str | None = None,
) -> JSONResponse:
    """List the resolved tenant's drift events, optionally filtered.

    Supports EU AI Act Art. 16(d): ongoing monitoring of model changes.

    Query parameters
    ----------------
    limit:
        Maximum recent events (default 50, max 500).
    model_id:
        Filter to a specific model.
    severity:
        Filter by: ``info`` | ``warning`` | ``critical``.
    """
    tenant_id = _resolve_tenant_id(request)
    limit = max(1, min(limit, 500))
    # An empty/absent filter value matches everything.
    matching: list[dict[str, Any]] = [
        event
        for event in _drift_events[tenant_id]
        if (not model_id or event["model_id"] == model_id)
        and (not severity or event["severity"] == severity)
    ]
    page = matching[-limit:]
    return JSONResponse(content={
        "tenant_id": tenant_id,
        "count": len(page),
        "total": len(matching),
        "events": page,
    })


# ── Policy dashboard ──────────────────────────────────────────────────────────

@app.get("/cloud/policy/dashboard")
async def cloud_policy_dashboard(request: Request) -> JSONResponse:
    """Summarise policy pass/fail counts for the resolved tenant.

    The response has two parts: ``by_policy`` (one row per policy with
    passed / failed / total / pass_rate) and ``overall`` (the same figures
    computed over inventory records, based on ``attestation_passed``).
    Pass rates are rounded to four decimals and are ``0.0`` when there is
    nothing to count.

    Feeds the compliance reporting dashboard; addresses CMMC Level 2/3,
    NIST AI RMF, and ISO/IEC 42001 audit requirements.
    """

    def _ratio(hits: int, out_of: int) -> float:
        return round(hits / out_of, 4) if out_of else 0.0

    tenant_id = _resolve_tenant_id(request)

    by_policy: list[dict[str, Any]] = []
    for name, tally in _policy_stats[tenant_id].items():
        n_pass, n_fail = tally["passed"], tally["failed"]
        n_all = n_pass + n_fail
        by_policy.append({
            "policy": name,
            "passed": n_pass,
            "failed": n_fail,
            "total": n_all,
            "pass_rate": _ratio(n_pass, n_all),
        })

    # Model-level aggregate: did each registered deployment pass attestation?
    inventory = list(_inventory[tenant_id])
    model_total = len(inventory)
    model_passed = len([m for m in inventory if m.get("attestation_passed")])
    overall = {
        "passed": model_passed,
        "failed": model_total - model_passed,
        "total": model_total,
        "pass_rate": _ratio(model_passed, model_total),
    }
    return JSONResponse(content={
        "tenant_id": tenant_id,
        "overall": overall,
        "by_policy": by_policy,
    })


# ── Tenant-scoped audit log ───────────────────────────────────────────────────

@app.get("/cloud/audit")
async def cloud_get_audit(
    request: Request,
    limit: int = 100,
    log: str | None = None,
) -> JSONResponse:
    """Return up to *limit* audit trail entries, scoped to the resolved tenant.

    Delegates to the existing AgentAuditLogger hash-chain log.  When the
    resolved ``tenant_id`` is non-empty, entries are filtered to those whose
    ``session_id`` is exactly the tenant id or starts with ``<tenant_id>-``
    (CI pipelines should set ``SQUASH_AUDIT_SESSION_PREFIX=<tenant_id>-``).
    Matching on the delimiter keeps tenant ``acme`` from seeing sessions of
    a tenant named ``acme2``.

    Query parameters
    ----------------
    limit:
        Maximum number of entries to return (default 100, clamped to 1–1000).
    log:
        Optional audit log path handed to ``AgentAuditLogger``.

    Addresses EU AI Act Art. 12 and SEC cybersecurity disclosure requirements.
    """
    from squash.governor import AgentAuditLogger

    tenant_id = _resolve_tenant_id(request)
    limit = max(1, min(limit, 1000))
    # NOTE(review): ``log`` is a caller-supplied filesystem path passed
    # straight to the logger — confirm the logger (or upstream auth)
    # restricts which files can be read through this endpoint.
    audit_log = AgentAuditLogger(log_path=log)
    # Over-read when filtering so enough tenant entries survive the filter.
    entries = audit_log.read_tail(limit * 10 if tenant_id else limit)

    if tenant_id:
        prefix = f"{tenant_id}-"

        def _belongs_to_tenant(entry: dict[str, Any]) -> bool:
            session_id = str(entry.get("session_id", ""))
            return session_id == tenant_id or session_id.startswith(prefix)

        entries = [e for e in entries if _belongs_to_tenant(e)]
    entries = entries[:limit]

    return JSONResponse(content={
        "tenant_id": tenant_id,
        "count": len(entries),
        "log_path": str(audit_log.path),
        "entries": entries,
    })


# ── W58: CloudDB-backed read endpoints ───────────────────────────────────────


@app.get("/cloud/tenants/{tenant_id}/inventory")
async def cloud_get_tenant_inventory_db(tenant_id: str) -> JSONResponse:
    """Return the inventory records of the tenant named in the URL path.

    Differs from ``GET /cloud/inventory`` in that the tenant is given
    explicitly instead of being resolved from JWT/header, which suits admin
    dashboards and cross-tenant tooling.  Records come from
    ``_db_read_inventory`` (CloudDB when active).  Responds with HTTP 404
    when *tenant_id* is not registered.
    """
    if tenant_id in _tenants:
        models = _db_read_inventory(tenant_id)
        body = {"tenant_id": tenant_id, "count": len(models), "models": models}
        return JSONResponse(content=body)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/tenants/{tenant_id}/vex-alerts")
async def cloud_get_tenant_vex_alerts_db(tenant_id: str) -> JSONResponse:
    """Return the VEX alerts of the tenant named in the URL path.

    Alerts come from ``_db_read_vex_alerts`` (CloudDB when active).
    Responds with HTTP 404 when *tenant_id* is not registered.
    """
    if tenant_id in _tenants:
        found = _db_read_vex_alerts(tenant_id)
        body = {"tenant_id": tenant_id, "count": len(found), "alerts": found}
        return JSONResponse(content=body)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/tenants/{tenant_id}/drift-events")
async def cloud_get_tenant_drift_events(tenant_id: str) -> JSONResponse:
    """Return the drift events of the tenant named in the URL path.

    Events come from ``_db_read_drift_events`` (CloudDB when active).
    Responds with HTTP 404 when *tenant_id* is not registered.
    """
    if tenant_id in _tenants:
        found = _db_read_drift_events(tenant_id)
        body = {"tenant_id": tenant_id, "count": len(found), "events": found}
        return JSONResponse(content=body)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/tenants/{tenant_id}/policy-stats")
async def cloud_get_tenant_policy_stats(tenant_id: str) -> JSONResponse:
    """Return per-policy pass/fail counts of the tenant named in the URL path.

    Stats come from ``_db_read_tenant_policy_stats`` (CloudDB when active).
    Responds with HTTP 404 when *tenant_id* is not registered.
    """
    if tenant_id in _tenants:
        per_policy = _db_read_tenant_policy_stats(tenant_id)
        body = {"tenant_id": tenant_id, "count": len(per_policy), "stats": per_policy}
        return JSONResponse(content=body)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/policy-stats")
async def cloud_get_policy_stats() -> JSONResponse:
    """Return policy pass/fail aggregates across ALL tenants (global fleet view).

    Stats come from ``_db_read_policy_stats`` (CloudDB when active).  The
    handler performs no tenant resolution — it is intended for admin
    dashboards and SLA reporting.
    """
    fleet_stats = _db_read_policy_stats()
    body = {"count": len(fleet_stats), "stats": fleet_stats}
    return JSONResponse(content=body)


# ── W61: Tenant summary endpoint ─────────────────────────────────────────────


@app.get("/cloud/tenants/{tenant_id}/summary")
async def cloud_get_tenant_summary(tenant_id: str) -> JSONResponse:
    """Return the tenant record plus its aggregated compliance summary.

    Merges the stored tenant metadata with whatever
    ``_db_read_tenant_summary`` reports (described upstream as inventory,
    VEX alert and drift-event counts plus policy pass/fail stats), so a
    dashboard needs a single round-trip instead of four.  Responds with
    HTTP 404 when *tenant_id* is not registered.

    Addresses EU AI Act Art. 9 (risk management) and NIST AI RMF Govern 1.2
    audit trail requirements.
    """
    if tenant_id in _tenants:
        body: dict[str, Any] = {"tenant_id": tenant_id, "tenant": _tenants[tenant_id]}
        # Summary keys are merged last, so they win on any name clash.
        body.update(_db_read_tenant_summary(tenant_id))
        return JSONResponse(content=body)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/tenants/{tenant_id}/compliance-score")  # W62
async def cloud_get_tenant_compliance_score(tenant_id: str) -> JSONResponse:
    """Return the compliance score (0–100) and letter grade for *tenant_id*.

    The figures are produced by ``_db_read_tenant_compliance_score`` from
    the tenant's per-policy pass/fail history (documented formula:
    ``sum(passed) / sum(passed + failed) * 100`` across all policies; a
    tenant with no policy checks is documented to score 100.0 / grade 'A').

    Responds with HTTP 404 when *tenant_id* is not registered.

    Addresses NIST AI RMF Govern 1.7 (quantified risk metrics) and EU AI Act
    Art. 9 obligation to maintain auditable risk management records.
    """
    if tenant_id in _tenants:
        body: dict[str, Any] = {"tenant_id": tenant_id}
        body.update(_db_read_tenant_compliance_score(tenant_id))
        return JSONResponse(content=body)
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/tenants/{tenant_id}/compliance-history")  # W63
async def cloud_get_tenant_compliance_history(tenant_id: str) -> JSONResponse:
    """Return the time-bucketed compliance history for *tenant_id*.

    The ``history`` list comes from ``_db_read_tenant_compliance_history``;
    it is documented as one entry per calendar day with at least one drift
    event, carrying the compliance score and grade, sorted ascending by
    date, and empty for a tenant without drift events.  Responds with
    HTTP 404 when *tenant_id* is not registered.

    Addresses NIST AI RMF Govern 1.7 (quantified risk metrics over time) and
    EU AI Act Art. 9 obligation to maintain auditable longitudinal risk records.
    """
    if tenant_id in _tenants:
        timeline = _db_read_tenant_compliance_history(tenant_id)
        return JSONResponse(content={"tenant_id": tenant_id, "history": timeline})
    raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")


@app.get("/cloud/compliance-overview")  # W64
async def cloud_get_compliance_overview() -> JSONResponse:
    """Return the platform-wide compliance aggregate across all tenants.

    The payload is exactly what ``_db_read_compliance_overview`` produces
    (documented as total and compliant tenant counts, the average compliance
    score and the three tenants most at risk; zero counts and an empty
    ``top_at_risk`` list on an empty platform).  The handler itself never
    raises an HTTP error.

    Addresses EU AI Act Art. 9 / Art. 17 obligations for systematic
    portfolio-level risk monitoring and NIST AI RMF Govern 1.7.
    """
    overview = _db_read_compliance_overview()
    return JSONResponse(content=overview)


@app.get("/cloud/vex-feed")  # W65 / W75
async def cloud_get_vex_feed() -> JSONResponse:
    """Return hosted cross-tenant VEX advisory feed.

    EU AI Act Art. 9 / ISO 42001 §8.4: operators must maintain a live,
    machine-readable feed of known-vulnerability advisories (VEX) across
    the full model inventory for real-time supply-chain transparency.

    W75 plan gating (driven by ``vex_max_age_days`` from the plan limits):
    - community:    alerts filtered to the last 7 days; 402 if history exists
                    but all alerts are outside the window.
    - professional: alerts filtered to the last 90 days.
    - enterprise:   full history, no age filter.
    All responses include ``X-Squash-Plan: <plan>`` header.

    Alerts whose ``created_at`` is missing or unparseable are kept (treated
    as recent) rather than dropped.
    """
    payload = _db_read_vex_feed()
    limits = _get_plan_limits()
    max_age = limits["vex_max_age_days"]

    if max_age is not None:
        cutoff = datetime.date.today() - datetime.timedelta(days=max_age)
        all_alerts: list[dict[str, Any]] = payload.get("alerts", [])

        def _alert_in_window(a: dict[str, Any]) -> bool:
            try:
                stamp = a["created_at"]
                # Timestamps written by this service end in "Z", which
                # datetime.fromisoformat() only accepts from Python 3.11 on.
                # Normalise it so the age filter also works on older runtimes
                # instead of silently treating every alert as recent.
                if stamp.endswith(("Z", "z")):
                    stamp = f"{stamp[:-1]}+00:00"
                return datetime.datetime.fromisoformat(stamp).date() >= cutoff
            except (KeyError, TypeError, ValueError, AttributeError):
                return True  # no/invalid timestamp — treat as recent

        filtered = [a for a in all_alerts if _alert_in_window(a)]
        # 402: history exists but none fall within the community window.
        if SQUASH_PLAN == "community" and all_alerts and not filtered:
            return JSONResponse(
                status_code=402,
                content={
                    "error": "plan_upgrade_required",
                    "plan": "community",
                    "upgrade_to": "professional",
                    "reason": "vex history beyond 7 days requires a professional plan",
                },
                headers={"X-Squash-Plan": SQUASH_PLAN},
            )
        payload = {**payload, "alerts": filtered, "total_alerts": len(filtered)}

    return JSONResponse(
        content=payload,
        headers={"X-Squash-Plan": SQUASH_PLAN},
    )


@app.post("/cloud/tenants/{tenant_id}/vertex-result", status_code=201)  # W66
async def cloud_post_vertex_result(
    tenant_id: str,
    body: dict[str, Any],
) -> JSONResponse:
    """Ingest a GCP Vertex AI attestation result for *tenant_id*.

    Request body fields:
    - ``model_resource_name`` (str, required): full Vertex AI Model resource name.
    - ``passed`` (bool): whether the attestation run passed.  A missing or
      ``null`` value is recorded as ``False``; ``0``/``1`` are accepted.
    - ``labels`` (dict, optional): GCP labels applied to the Model resource.

    Returns ``{"status": "ok", "tenant_id": …, "passed": …}`` on success.

    Raises
    ------
    HTTPException
        422 when ``model_resource_name`` is missing/empty or ``passed`` is
        not a boolean-like value.
    """
    mrn = body.get("model_resource_name")
    if not mrn:
        raise HTTPException(status_code=422, detail="model_resource_name is required")
    raw_passed = body.get("passed", False)
    # Plain bool() would turn the string "false" into True and record a
    # failed attestation as passed, so reject anything that is not
    # unambiguously boolean.
    if not isinstance(raw_passed, bool) and raw_passed not in (None, 0, 1):
        raise HTTPException(status_code=422, detail="passed must be a boolean")
    passed = bool(raw_passed)
    labels: dict[str, str] | None = body.get("labels") or None
    _db_append_vertex_result(tenant_id, str(mrn), passed, labels)
    return JSONResponse(
        content={"status": "ok", "tenant_id": tenant_id, "passed": passed},
        status_code=201,
    )


@app.get("/cloud/tenants/{tenant_id}/vertex-results")  # W66
async def cloud_get_vertex_results(tenant_id: str) -> JSONResponse:
    """List the stored Vertex AI attestation results for *tenant_id*.

    Responds with ``{"tenant_id": …, "results": […]}``; the list is whatever
    ``_db_read_vertex_results`` returns (documented as newest first, each
    entry shaped ``{model_resource_name, passed, labels, ts}``).  No tenant
    existence check is made here.
    """
    stored = _db_read_vertex_results(tenant_id)
    body = {"tenant_id": tenant_id, "results": stored}
    return JSONResponse(content=body)


@app.post("/cloud/tenants/{tenant_id}/ado-result", status_code=201)  # W67
async def cloud_post_ado_result(
    tenant_id: str,
    body: dict[str, Any],
) -> JSONResponse:
    """Ingest an Azure DevOps pipeline attestation result for *tenant_id*.

    Request body fields:
    - ``pipeline_run_id`` (str, required): ADO pipeline run identifier.
    - ``passed`` (bool): whether the attestation pipeline run passed.  A
      missing or ``null`` value is recorded as ``False``; ``0``/``1`` are
      accepted.
    - ``variables`` (dict, optional): pipeline output variables to store.

    Returns ``{"status": "ok", "tenant_id": …, "passed": …}`` on success.

    Raises
    ------
    HTTPException
        422 when ``pipeline_run_id`` is missing/empty or ``passed`` is not a
        boolean-like value.
    """
    run_id = body.get("pipeline_run_id")
    if not run_id:
        raise HTTPException(status_code=422, detail="pipeline_run_id is required")
    raw_passed = body.get("passed", False)
    # Plain bool() would turn the string "false" into True and record a
    # failed pipeline run as passed, so reject anything that is not
    # unambiguously boolean.
    if not isinstance(raw_passed, bool) and raw_passed not in (None, 0, 1):
        raise HTTPException(status_code=422, detail="passed must be a boolean")
    passed = bool(raw_passed)
    variables: dict[str, Any] | None = body.get("variables") or None
    _db_append_ado_result(tenant_id, str(run_id), passed, variables)
    return JSONResponse(
        content={"status": "ok", "tenant_id": tenant_id, "passed": passed},
        status_code=201,
    )


@app.get("/cloud/tenants/{tenant_id}/ado-results")  # W67
async def cloud_get_ado_results(tenant_id: str) -> JSONResponse:
    """List the stored Azure DevOps attestation results for *tenant_id*.

    Responds with ``{"tenant_id": …, "results": […]}``; the list is whatever
    ``_db_read_ado_results`` returns (documented as newest first, each entry
    shaped ``{pipeline_run_id, passed, variables, ts}``).  No tenant
    existence check is made here.
    """
    stored = _db_read_ado_results(tenant_id)
    body = {"tenant_id": tenant_id, "results": stored}
    return JSONResponse(content=body)


@app.get("/cloud/tenants/{tenant_id}/attestation-score")  # W68
async def cloud_get_attestation_score(tenant_id: str) -> JSONResponse:
    """Return the combined attestation pass/fail score for *tenant_id*.

    The figures come from ``_db_read_attestation_score``, documented as an
    aggregate over GCP Vertex AI (W66) and Azure DevOps (W67) results with
    the shape ``{total, passed, failed, pass_rate}`` (``pass_rate`` 0.0 when
    nothing is recorded).  ``tenant_id`` is added to the response; no tenant
    existence check is made here.

    Supports EU AI Act Art. 9 supply-chain integrity continuous-assessment
    obligations: operators can poll this endpoint to verify that upstream
    CI/CD and ML-platform pipelines are passing attestation checks.
    """
    body: dict[str, Any] = {"tenant_id": tenant_id}
    body.update(_db_read_attestation_score(tenant_id))
    return JSONResponse(content=body)


@app.get("/cloud/tenants/{tenant_id}/attestations")  # W69
async def cloud_get_attestations(tenant_id: str) -> JSONResponse:
    """Return the merged attestation history for *tenant_id*.

    Responds with ``{tenant_id, attestations: [...]}``; the list is whatever
    ``_db_read_attestations`` returns (documented as Vertex AI (W66) and
    Azure DevOps (W67) records merged newest first, each item including
    ``{source, passed, ts, ...source-specific fields}``).  No tenant
    existence check is made here.

    Supports EU AI Act Art. 12 + Art. 18 technical-documentation and
    record-keeping obligations by providing a complete, auditable trail.
    """
    history = _db_read_attestations(tenant_id)
    body = {"tenant_id": tenant_id, "attestations": history}
    return JSONResponse(content=body)


@app.get("/cloud/attestation-overview")  # W70
async def cloud_get_attestation_overview() -> JSONResponse:
    """Return the cross-tenant attestation health overview.

    The payload is exactly what ``_db_read_attestation_overview`` produces,
    documented as ``{total_attestations, tenants_covered, platform_pass_rate,
    tenants_with_failures}`` with zero counts and an empty list on an empty
    platform.  The handler itself never raises an HTTP error.

    Supports EU AI Act Art. 9 / Art. 17 portfolio-level supply-chain risk monitoring.
    """
    overview = _db_read_attestation_overview()
    return JSONResponse(content=overview)


@app.get("/cloud/tenants/{tenant_id}/conformance")  # W71
async def cloud_get_tenant_conformance(tenant_id: str) -> JSONResponse:
    """Return the EU AI Act conformance status for *tenant_id*.

    The evaluation is done by ``_db_read_tenant_conformance``; it is
    documented to apply three gates:
    - ``compliance_score >= 80.0``  (policy hygiene)
    - ``attestation_pass_rate >= 0.8``  (model attestation coverage)
    - ``open_vex_alerts == 0``  (no unresolved supply-chain alerts)

    Response shape: ``{tenant_id, conformant, compliance_score,
    attestation_pass_rate, open_vex_alerts, reasons}``.  This handler makes
    no tenant existence check, so unknown tenants get a regular 200 response
    (documented as ``conformant=False`` because their pass rate is 0.0).

    Supports EU AI Act Art. 9 + Art. 17 operator self-assessment obligations.
    """
    body: dict[str, Any] = {"tenant_id": tenant_id}
    body.update(_db_read_tenant_conformance(tenant_id))
    return JSONResponse(content=body)


@app.get("/cloud/conformance-report")  # W72
async def cloud_get_conformance_report() -> JSONResponse:
    """Return the platform-wide EU AI Act conformance report.

    The payload is exactly what ``_db_read_conformance_report`` produces,
    documented as ``{total_tenants, conformant_tenants,
    non_conformant_tenants, non_conformant: [{tenant_id, compliance_score,
    attestation_pass_rate, open_vex_alerts, reasons}]}`` with all-zero
    counts on an empty platform.  The handler itself never raises an HTTP
    error.

    Supports EU AI Act Art. 9 + 12 + 17 + 18 platform-level audit obligations.
    """
    report = _db_read_conformance_report()
    return JSONResponse(content=report)


def _result_to_dict(r: AttestResult) -> dict[str, Any]:
    """Flatten an attestation result into a JSON-serialisable dict.

    Paths are rendered as strings (``None`` when the artifact is absent or
    falsy), ``scan_status`` falls back to ``"skipped"`` when no scan result
    is attached, and each policy result is reduced to its pass flag and
    error / warning / pass counters.
    """

    def _path_or_none(value: Any) -> str | None:
        return str(value) if value else None

    policies: dict[str, dict[str, Any]] = {}
    for policy_name, outcome in r.policy_results.items():
        policies[policy_name] = {
            "passed": outcome.passed,
            "error_count": outcome.error_count,
            "warning_count": outcome.warning_count,
            "pass_count": outcome.pass_count,
        }

    scan = r.scan_result
    artifacts = {
        "cyclonedx": _path_or_none(r.cyclonedx_path),
        "spdx_json": _path_or_none(r.spdx_json_path),
        "spdx_tv": _path_or_none(r.spdx_tv_path),
        "signature": _path_or_none(r.signature_path),
        "vex_report": _path_or_none(r.vex_report_path),
        "master_record": _path_or_none(r.master_record_path),
    }
    return {
        "model_id": r.model_id,
        "passed": r.passed,
        "output_dir": str(r.output_dir),
        "scan_status": scan.status if scan else "skipped",
        "policy_results": policies,
        "artifacts": artifacts,
        "error": r.error,
    }


# ── On-demand drift check ─────────────────────────────────────────────────────


class DriftCheckRequest(BaseModel):
    """Payload accepted by ``POST /drift-check``.

    Both fields are required and name locations on the squash server's own
    filesystem, not on the client.
    """

    # Directory holding the compressed model whose files are verified.
    model_dir: str = Field(
        ...,
        description="Absolute path (on the squash server) to the compressed model directory",
    )
    # CycloneDX BOM sidecar that the directory is compared against.
    bom_path: str = Field(
        ...,
        description="Absolute path (on the squash server) to the CycloneDX BOM JSON sidecar",
    )


@app.post("/drift-check")
async def post_drift_check(req: DriftCheckRequest) -> JSONResponse:
    """Compare BOM-attested SHA-256 digests against the model directory on disk.

    Drives CMMC / EU AI Act Art. 9 supply-chain integrity verification from
    CI/CD pipelines.  Both ``model_dir`` and ``bom_path`` must be absolute
    paths accessible to the squash server process.  The comparison itself
    runs in the default executor so the event loop is not blocked.

    Returns
    -------
    JSON with keys ``ok`` (bool), ``files_checked`` (int), ``hits`` (list),
    and ``summary`` (str).  Each hit has ``path``, ``expected_digest``,
    ``actual_digest``, ``missing``, and ``tampered`` fields.

    Raises
    ------
    HTTPException
        400 when ``model_dir`` is not a directory or ``bom_path`` is not a
        file; 422 when the drift check raises ``ValueError``; 500 when it
        raises ``OSError``.
    """
    from squash.drift import DriftConfig, DriftResult, check_drift

    # NOTE(review): both paths are caller-supplied and resolved on the
    # server's filesystem — confirm upstream auth restricts who may call this.
    model_path = Path(req.model_dir).resolve()
    bom = Path(req.bom_path).resolve()

    if not model_path.is_dir():
        raise HTTPException(status_code=400, detail=f"model_dir not found: {model_path}")
    if not bom.is_file():
        raise HTTPException(status_code=400, detail=f"bom_path not found: {bom}")

    # Inside a coroutine the running loop is the one to use;
    # get_event_loop() is the legacy spelling.
    loop = asyncio.get_running_loop()
    try:
        result: DriftResult = await loop.run_in_executor(
            None, check_drift, DriftConfig(bom_path=bom, model_dir=model_path)
        )
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    except OSError as exc:
        raise HTTPException(
            status_code=500, detail=f"I/O error reading drift inputs: {exc}"
        ) from exc

    return JSONResponse(content={
        "ok": result.ok,
        "files_checked": result.files_checked,
        "summary": result.summary,
        "hits": [
            {
                "path": h.path,
                "expected_digest": h.expected_digest,
                "actual_digest": h.actual_digest,
                "missing": h.missing,
                "tampered": h.tampered,
            }
            for h in result.hits
        ],
    })


# ── W76: Tenant audit export bundle ──────────────────────────────────────────


def _db_build_tenant_export(tenant_id: str) -> dict[str, Any]:  # W76
    """Compose a complete compliance export bundle for *tenant_id*.

    Export scope is gated by ``SQUASH_PLAN`` (via the ``export_scope`` plan
    limit, defaulting to ``summary``):

    * ``summary`` (community): compliance score/grade, conformance, attestation
      score, and enforcement signal.
    * ``compliance`` (professional): + VEX alerts (plan-age-filtered per W75),
      policy stats, inventory, and compliance history.
    * ``full`` (enterprise): + merged attestation history, drift events, GCP
      Vertex AI results, and Azure DevOps attestation results.

    VEX alerts whose ``created_at`` is missing or unparseable are kept
    (treated as recent) rather than dropped.

    Returns a dict suitable for direct JSON serialisation.  Callers are
    responsible for checking that *tenant_id* is in ``_tenants`` before
    invoking this helper.
    """
    # Fetch the plan limits once; both the scope and the VEX window use them.
    limits = _get_plan_limits()
    scope: str = str(limits.get("export_scope", "summary"))

    # ── Always-included sections (all plans) ─────────────────────────────────
    bundle: dict[str, Any] = {
        "tenant_id": tenant_id,
        "tenant": _tenants.get(tenant_id, {}),
        "compliance": _db_read_tenant_compliance_score(tenant_id),
        "conformance": _db_read_tenant_conformance(tenant_id),
        "attestation_score": _db_read_attestation_score(tenant_id),
        **_enforcement_signal(),
    }

    # ── Professional+ sections ────────────────────────────────────────────────
    if scope in ("compliance", "full"):
        # VEX alerts apply the same plan-age window as the W75 feed endpoint.
        max_age = limits.get("vex_max_age_days")
        raw_alerts = _db_read_vex_alerts(tenant_id)
        if max_age is not None:
            cutoff = datetime.date.today() - datetime.timedelta(days=int(max_age))

            def _alert_in_window(a: dict[str, Any]) -> bool:
                try:
                    stamp = a["created_at"]
                    # Timestamps written by this service end in "Z", which
                    # datetime.fromisoformat() only accepts from Python 3.11
                    # on.  Normalise it so the window is enforced on older
                    # runtimes too instead of keeping every alert.
                    if stamp.endswith(("Z", "z")):
                        stamp = f"{stamp[:-1]}+00:00"
                    return datetime.datetime.fromisoformat(stamp).date() >= cutoff
                except (KeyError, TypeError, ValueError, AttributeError):
                    return True  # no/invalid timestamp — treat as recent

            filtered_alerts = [a for a in raw_alerts if _alert_in_window(a)]
        else:
            filtered_alerts = raw_alerts

        bundle["policy_stats"] = _db_read_tenant_policy_stats(tenant_id)
        bundle["inventory"] = _db_read_inventory(tenant_id)
        bundle["compliance_history"] = _db_read_tenant_compliance_history(tenant_id)
        bundle["vex_alerts"] = filtered_alerts

    # ── Enterprise-only sections ──────────────────────────────────────────────
    if scope == "full":
        bundle["attestations"] = _db_read_attestations(tenant_id)
        bundle["drift_events"] = _db_read_drift_events(tenant_id)
        bundle["vertex_results"] = _db_read_vertex_results(tenant_id)
        bundle["ado_results"] = _db_read_ado_results(tenant_id)

    return bundle


@app.get("/cloud/tenants/{tenant_id}/export")  # W76
async def cloud_get_tenant_export(tenant_id: str) -> JSONResponse:
    """Export the compliance bundle for *tenant_id* as a single JSON document.

    The tenant sections are produced by ``_db_build_tenant_export``; this
    handler puts an envelope in front of them carrying the export timestamp
    (UTC, ISO-8601), the active ``SQUASH_PLAN`` and the plan's
    ``export_scope`` (``"summary"`` when the plan limits do not define one).
    The document is meant for regulatory submission, audit evidence packages,
    and offline storage.

    **Plan gating** (set ``SQUASH_PLAN`` env var):

    * ``community``  — compliance score, conformance, attestation score,
      EU AI Act enforcement signal.
    * ``professional`` — + VEX alerts (90-day window), policy stats,
      inventory, compliance history.
    * ``enterprise``  — full export including attestation history, drift events,
      GCP Vertex AI results, and Azure DevOps pipeline results.

    The ``X-Squash-Plan`` response header carries the same value as the
    ``squash_plan`` field so API consumers can detect gating.

    Addresses EU AI Act Art. 12 + Art. 18 obligations for maintaining and
    producing technical documentation on request.

    Raises
    ------
    HTTPException
        404 when *tenant_id* is not a registered tenant.
    """
    if tenant_id not in _tenants:
        raise HTTPException(status_code=404, detail=f"Tenant not found: {tenant_id}")

    # Build the tenant sections first so the timestamp reflects completion.
    sections = _db_build_tenant_export(tenant_id)

    envelope: dict[str, Any] = {
        "exported_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "squash_plan": SQUASH_PLAN,
        "export_scope": str(_get_plan_limits().get("export_scope", "summary")),
    }
    # Envelope keys stay first; tenant sections follow in their own order.
    envelope.update(sections)

    return JSONResponse(content=envelope, headers={"X-Squash-Plan": SQUASH_PLAN})


@app.get("/cloud/export")  # W76
async def cloud_get_platform_export() -> JSONResponse:
    """Export the compliance bundles of every registered tenant in one document.

    ``_db_build_tenant_export`` is called once per tenant (over a snapshot of
    the tenant ids taken at the start of the request) and the results are
    returned as a list.  Same plan gating as the per-tenant endpoint applies
    to each tenant's section.

    Response shape::

        {
          "exported_at": "...",
          "squash_plan": "...",
          "export_scope": "...",
          "tenant_count": N,
          "tenants": [ { ...per-tenant bundle... }, ... ]
        }

    Always HTTP 200; empty platform returns ``tenant_count=0`` and an empty
    ``tenants`` list.  The active plan is also sent in the ``X-Squash-Plan``
    header.  Addresses EU AI Act Art. 9 + Art. 18 obligations for
    platform-level technical documentation.
    """
    bundles: list[dict[str, Any]] = []
    for tid in list(_tenants.keys()):
        bundles.append(_db_build_tenant_export(tid))

    document = {
        "exported_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "squash_plan": SQUASH_PLAN,
        "export_scope": str(_get_plan_limits().get("export_scope", "summary")),
        "tenant_count": len(bundles),
        "tenants": bundles,
    }
    return JSONResponse(content=document, headers={"X-Squash-Plan": SQUASH_PLAN})


# ── W80: Per-Tenant Risk Profile ──────────────────────────────────────────────

# EU AI Act risk tiers (Article 6/9 hierarchy): UNACCEPTABLE > HIGH > LIMITED > MINIMAL.
# Derived entirely from stored inventory fields — no disk I/O, no BOM parsing.
_RISK_TIER_UNACCEPTABLE = "UNACCEPTABLE"
_RISK_TIER_HIGH = "HIGH"
_RISK_TIER_LIMITED = "LIMITED"
_RISK_TIER_MINIMAL = "MINIMAL"


def _compute_model_risk_tier(record: dict[str, Any], open_vex: int) -> str:
    """Derive the EU AI Act risk tier for a single inventory record.

    Decision logic (first match wins):

    - UNACCEPTABLE: more than half of the record's policies failed AND the
      tenant has at least one open VEX alert.
    - HIGH: ``attestation_passed`` is falsy and the policies report a
      non-zero total ``error_count``.
    - LIMITED: open VEX alerts exist.  This applies whether or not the
      attestation passed, so a failed attestation never ranks below a passed
      one with the same VEX exposure.
    - MINIMAL: everything else (no open VEX alerts and no failed attestation
      with policy errors).

    Parameters
    ----------
    record:
        A single inventory record dict.  Only ``policy_results`` (mapping of
        policy name to a dict with optional ``passed`` / ``error_count``) and
        ``attestation_passed`` are read; a missing or ``None``
        ``policy_results`` is treated as "no policies", a missing
        ``attestation_passed`` as ``False``.
    open_vex:
        Number of open VEX alerts for the tenant.

    Returns
    -------
    str
        One of: ``"UNACCEPTABLE"``, ``"HIGH"``, ``"LIMITED"``, ``"MINIMAL"``.
    """
    # ``or {}`` also covers records that store an explicit ``None``.
    policy_results: dict[str, Any] = record.get("policy_results") or {}
    outcomes = list(policy_results.values())
    failed_count = sum(1 for pr in outcomes if not pr.get("passed", True))
    # A stored ``None`` error_count counts as zero instead of breaking sum().
    error_count = sum(pr.get("error_count", 0) or 0 for pr in outcomes)
    attestation_passed = bool(record.get("attestation_passed", False))
    has_vex_exposure = open_vex > 0

    # UNACCEPTABLE: strict majority of policies failed + active VEX exposure.
    if outcomes and failed_count / len(outcomes) > 0.5 and has_vex_exposure:
        return _RISK_TIER_UNACCEPTABLE

    # HIGH: attestation failed with errors.
    if not attestation_passed and error_count > 0:
        return _RISK_TIER_HIGH

    # LIMITED: any VEX exposure, independent of the attestation outcome.
    if has_vex_exposure:
        return _RISK_TIER_LIMITED

    # MINIMAL: nothing above matched.
    return _RISK_TIER_MINIMAL


@app.get("/cloud/tenants/{tenant_id}/risk-profile")  # W80
async def cloud_get_tenant_risk_profile(tenant_id: str) -> JSONResponse:
    """Report the EU AI Act risk tier of every inventory model of *tenant_id*.

    Each inventory record is classified by ``_compute_model_risk_tier`` from
    the record itself plus the tenant's VEX alert count; that same count is
    reported as ``open_vex_alerts`` on every model.  ``overall_risk_tier`` is
    the most severe tier found across the models, or ``MINIMAL`` when the
    tenant has no inventory.

    Response shape::

        {
          "tenant_id": "...",
          "overall_risk_tier": "HIGH",
          "model_count": N,
          "models": [
            {
              "model_id": "...",
              "risk_tier": "HIGH",
              "attestation_passed": false,
              "open_vex_alerts": 2,
              "policy_failure_rate": 0.5
            },
            ...
          ],
          "enforcement_deadline": "2026-08-02",
          "days_until_enforcement": N,
          "enforcement_risk_level": "MODERATE"
        }

    Unknown tenants get HTTP 404 with a ``{"detail": ...}`` body.
    """
    if tenant_id not in _tenants:
        return JSONResponse(
            status_code=404,
            content={"detail": f"tenant not found: {tenant_id}"},
        )

    records = _db_read_inventory(tenant_id)
    alert_count = len(_db_read_vex_alerts(tenant_id))

    # Severity rank, 0 = worst.  A tier missing from the table ranks after
    # every known tier.
    severity = {
        _RISK_TIER_UNACCEPTABLE: 0,
        _RISK_TIER_HIGH: 1,
        _RISK_TIER_LIMITED: 2,
        _RISK_TIER_MINIMAL: 3,
    }

    def _describe(entry: dict[str, Any]) -> dict[str, Any]:
        """Build the per-model section of the response for one record."""
        model_tier = _compute_model_risk_tier(entry, alert_count)
        outcomes = entry.get("policy_results", {})
        n_policies = len(outcomes)
        n_failed = sum(
            1 for outcome in outcomes.values() if not outcome.get("passed", True)
        )
        # Share of failed policies, rounded to 4 places; 0.0 without policies.
        rate = round(n_failed / n_policies, 4) if n_policies > 0 else 0.0
        return {
            "model_id": entry.get("model_id", ""),
            "risk_tier": model_tier,
            "attestation_passed": bool(entry.get("attestation_passed", False)),
            "open_vex_alerts": alert_count,
            "policy_failure_rate": rate,
        }

    model_profiles: list[dict[str, Any]] = [_describe(entry) for entry in records]

    # Overall = worst tier observed; stays MINIMAL for an empty inventory.
    overall = _RISK_TIER_MINIMAL
    if model_profiles:
        overall = min(
            (profile["risk_tier"] for profile in model_profiles),
            key=lambda name: severity.get(name, len(severity)),
        )

    body = {
        "tenant_id": tenant_id,
        "overall_risk_tier": overall,
        "model_count": len(model_profiles),
        "models": model_profiles,
    }
    body.update(_enforcement_signal())
    return JSONResponse(content=body)


@app.get("/cloud/risk-overview")  # W80
async def cloud_get_risk_overview() -> JSONResponse:
    """Summarise the overall risk tier of every registered tenant.

    For each tenant the overall tier is the most severe tier that
    ``_compute_model_risk_tier`` assigns to any of its inventory records
    (``MINIMAL`` for a tenant without inventory).  ``risk_summary`` counts
    tenants per tier.

    Response shape::

        {
          "total_tenants": N,
          "risk_summary": {
            "UNACCEPTABLE": 0,
            "HIGH": 1,
            "LIMITED": 2,
            "MINIMAL": 5
          },
          "tenants": [
            {"tenant_id": "...", "overall_risk_tier": "HIGH", "model_count": 3},
            ...
          ],
          "enforcement_deadline": "2026-08-02",
          "days_until_enforcement": N,
          "enforcement_risk_level": "MODERATE"
        }

    Always HTTP 200; empty platform returns all-zero risk_summary.
    """
    # Most severe first; the same sequence seeds the summary counters so the
    # response keeps the UNACCEPTABLE → MINIMAL key order.
    ordered_tiers = (
        _RISK_TIER_UNACCEPTABLE,
        _RISK_TIER_HIGH,
        _RISK_TIER_LIMITED,
        _RISK_TIER_MINIMAL,
    )
    severity = {name: rank for rank, name in enumerate(ordered_tiers)}
    summary: dict[str, int] = dict.fromkeys(ordered_tiers, 0)
    tenant_rows: list[dict[str, Any]] = []

    for tid in list(_tenants.keys()):
        records = _db_read_inventory(tid)
        alert_count = len(_db_read_vex_alerts(tid))

        worst = _RISK_TIER_MINIMAL
        if records:
            model_tiers = [_compute_model_risk_tier(rec, alert_count) for rec in records]
            worst = min(
                model_tiers,
                key=lambda name: severity.get(name, len(severity)),
            )

        summary[worst] += 1
        tenant_rows.append(
            {
                "tenant_id": tid,
                "overall_risk_tier": worst,
                "model_count": len(records),
            }
        )

    body = {
        "total_tenants": len(tenant_rows),
        "risk_summary": summary,
        "tenants": tenant_rows,
    }
    body.update(_enforcement_signal())
    return JSONResponse(content=body)


@app.get("/cloud/tenants/{tenant_id}/remediation-plan")  # W81
async def cloud_get_remediation_plan(tenant_id: str) -> JSONResponse:
    """Return a prioritised remediation plan for all models under *tenant_id*.

    The tenant's overall risk tier is computed exactly as in the risk-profile
    endpoint (worst ``_compute_model_risk_tier`` result, ``MINIMAL`` for an
    empty inventory).  Policy results are merged across models by policy
    name, with a failing result replacing a passing one.  The tenant counts
    as attested only if every record has a truthy ``attestation_passed``; a
    record without that field is treated as *not* attested, matching
    ``_compute_model_risk_tier`` and the risk-profile endpoint.  The merged
    state is passed to ``squash.risk.generate_remediation_plan``.

    Response shape::

        {
          "tenant_id": "acme",
          "risk_tier": "HIGH",
          "total_steps": 2,
          "critical_count": 1,
          "steps": [
            {
              "id": "obtain_attestation",
              "priority": 1,
              "action": "Obtain attestation",
              "description": "...",
              "evidence_required": "...",
              "estimated_effort": "1d"
            },
            ...
          ],
          "enforcement_deadline": "2026-08-02",
          "days_until_enforcement": N,
          "enforcement_risk_level": "MODERATE"
        }

    ``critical_count`` is the number of steps whose ``priority`` equals 1.

    Returns HTTP 404 when *tenant_id* is not registered.
    Returns HTTP 200 with ``total_steps == 0`` when the generated plan is
    empty (tenant fully compliant).
    """
    from squash.risk import generate_remediation_plan  # lazy — avoids circular at module load

    if tenant_id not in _tenants:
        raise HTTPException(status_code=404, detail=f"Tenant '{tenant_id}' not found")

    inventory = _db_read_inventory(tenant_id)
    open_vex = len(_db_read_vex_alerts(tenant_id))

    tier_order = [
        _RISK_TIER_UNACCEPTABLE,
        _RISK_TIER_HIGH,
        _RISK_TIER_LIMITED,
        _RISK_TIER_MINIMAL,
    ]

    # Compute tenant overall risk tier (same logic as risk-profile).
    if inventory:
        overall = min(
            (_compute_model_risk_tier(rec, open_vex) for rec in inventory),
            key=lambda t: tier_order.index(t) if t in tier_order else len(tier_order),
        )
    else:
        overall = _RISK_TIER_MINIMAL

    # A missing ``attestation_passed`` defaults to False here, the same
    # default the risk-tier computation uses, so both views of a record agree.
    # An empty inventory is vacuously "all attested".
    all_attested = all(bool(rec.get("attestation_passed", False)) for rec in inventory)

    # Merge policy results by name: first occurrence is kept unless a later
    # model reports a failure for the same policy (worst case wins).
    all_policy_results: dict[str, Any] = {}
    for rec in inventory:
        for pname, presult in (rec.get("policy_results") or {}).items():
            if pname not in all_policy_results or not presult.get("passed", True):
                all_policy_results[pname] = presult

    steps = generate_remediation_plan(
        risk_tier=overall,
        policy_results=all_policy_results,
        open_vex=open_vex,
        attestation_passed=all_attested,
    )

    critical_count = sum(1 for s in steps if s.priority == 1)

    return JSONResponse(
        content={
            "tenant_id": tenant_id,
            "risk_tier": overall,
            "total_steps": len(steps),
            "critical_count": critical_count,
            "steps": [
                {
                    "id": s.id,
                    "priority": s.priority,
                    "action": s.action,
                    "description": s.description,
                    "evidence_required": s.evidence_required,
                    "estimated_effort": s.estimated_effort,
                }
                for s in steps
            ],
            **_enforcement_signal(),
        }
    )


# ── B5 (Track B) — gateway runtime gate verifier endpoint ────────────────────

@app.post("/v1/gateway/verify")
async def gateway_verify(request: Request) -> JSONResponse:
    """Inspect a presented squash attestation token and return an allow/deny.

    Used by the Kong plugin (handler.lua POSTs here) and the AWS Lambda
    authorizer (handler.py urllib POSTs here). Body shape:
        {"token": "att://...", "min_score": 0.8,
         "max_age_days": 30, "required_frameworks": ["eu-ai-act"]}

    ``min_score`` defaults to 0.8 when absent.  An empty or missing
    ``required_frameworks`` is passed on as ``None``.

    Responds with HTTP 400 and ``{"allow": false, "reason": "BAD_REQUEST",
    ...}`` when the body is not valid JSON, is not a JSON object, or carries
    a ``min_score`` that cannot be converted to a float.  Otherwise the
    result of ``evaluate_token(...).to_dict()`` is returned as the JSON body.
    """
    from squash.attestation_registry import AttestationRegistry
    from squash.integrations.gateway import evaluate_token

    def _bad_request(detail: str) -> JSONResponse:
        """Build the deny-shaped 400 response shared by all validation failures."""
        return JSONResponse(status_code=400, content={
            "allow": False, "reason": "BAD_REQUEST",
            "http_status": 400, "detail": detail,
        })

    try:
        body = await request.json()
    except Exception:  # noqa: BLE001 — malformed JSON → 400
        return _bad_request("Body must be JSON")

    # Valid JSON that is not an object (list, string, number, null) has no
    # ``.get``; reject it explicitly instead of failing with a 500.
    if not isinstance(body, dict):
        return _bad_request("Body must be a JSON object")

    token = body.get("token")
    try:
        min_score = float(body.get("min_score", 0.8))
    except (TypeError, ValueError):
        # A gate must not guess a threshold: deny with 400 rather than crash.
        return _bad_request("min_score must be a number")

    max_age_days = body.get("max_age_days")
    required_frameworks = body.get("required_frameworks") or None
    if max_age_days is not None:
        try:
            max_age_days = int(max_age_days)
        except (TypeError, ValueError):
            # NOTE(review): an unparseable max_age_days is dropped, which
            # presumably disables the age check in evaluate_token — confirm
            # this lenient behaviour is intended for a deny/allow gate.
            max_age_days = None

    with AttestationRegistry() as registry:
        decision = evaluate_token(
            token=token,
            registry=registry,
            min_score=min_score,
            max_age_days=max_age_days,
            required_frameworks=required_frameworks,
        )
    return JSONResponse(content=decision.to_dict())


# ── D3 (Track D) — Procurement Scoring API (Sprint 28 W246–W248) ─────────────
#
# The credit-score API for AI compliance. Freemium: basic score is public;
# breakdown requires Pro auth; history requires Enterprise auth.
#
# Rate limiting: 60 req/min per IP for unauthenticated calls (same window
# as the existing rate-limiter bucket keyed on "anon:{ip}").

def _procurement_scorer() -> "ProcurementScorer":  # noqa: F821 — lazy import
    """Build a ``ProcurementScorer`` bound to the public Squash base URL.

    The base URL is read from the ``SQUASH_PUBLIC_URL`` environment variable
    and falls back to ``https://squash.works``.  The scorer class is imported
    at call time, not at module load.
    """
    from squash.procurement_scoring import ProcurementScorer

    public_url = os.environ.get("SQUASH_PUBLIC_URL", "https://squash.works")
    return ProcurementScorer(base_url=public_url)


def _caller_plan(request: Request) -> str:
    """Resolve the caller's plan from the ``Authorization`` header.

    Documented values are 'enterprise' | 'pro' | 'startup' | 'team' | 'free';
    the value actually returned for a known key is whatever the key-store
    record's ``plan`` attribute holds.

    Falls back to 'free' when the header is missing or empty, when the key
    store has no record for the bearer token, and when anything raises while
    resolving it (no error is surfaced — the endpoints using this are public).
    """
    header = request.headers.get("Authorization", "")
    if not header:
        return "free"

    plan = "free"
    try:
        bearer = extract_bearer(header)
        key_record = get_key_store().lookup(bearer)
        if key_record:
            plan = key_record.plan
    except Exception:  # noqa: BLE001
        # Deliberate best-effort: any lookup failure downgrades to 'free'.
        plan = "free"
    return plan


@app.get("/v1/score/{vendor}")
async def procurement_score(vendor: str, request: Request) -> JSONResponse:
    """Return the AI compliance score for *vendor*.

    The caller's plan is resolved by ``_caller_plan``; missing or invalid
    credentials are treated as ``free``.

    ## Tier gating

    | Field         | Free | Pro | Startup | Team | Enterprise |
    |---------------|------|-----|---------|------|------------|
    | score + tier  | ✓    | ✓   | ✓       | ✓    | ✓          |
    | breakdown     | —    | ✓   | ✓       | ✓    | ✓          |
    | history       | —    | —   | —       | —    | ✓          |

    History, when included, covers 12 months.

    ## Rate limits
    Unauthenticated: 60 req / min / IP.
    Authenticated: governed by plan rate_per_min.
    (This handler performs no rate limiting itself.)

    ## Errors
    HTTP 400 with ``{"error": "Invalid vendor name"}`` when *vendor* is empty
    or longer than 128 characters.
    """
    if not vendor or len(vendor) > 128:
        return JSONResponse(status_code=400, content={"error": "Invalid vendor name"})

    plan = _caller_plan(request)
    include_breakdown = plan in ("pro", "startup", "team", "enterprise")
    include_history = plan == "enterprise"

    # The scorer comes from the shared factory, which does the lazy import.
    scorer = _procurement_scorer()
    vs = scorer.score_vendor(vendor)

    if include_history:
        # Attached to the score object; presumably emitted by
        # ``to_dict(include_history=True)`` below.
        vs.history = scorer.score_history(vendor, months=12)

    return JSONResponse(content=vs.to_dict(
        include_breakdown=include_breakdown,
        include_history=include_history,
    ))


@app.get("/v1/score/{vendor}/history")
async def procurement_score_history(vendor: str, request: Request) -> JSONResponse:
    """Return the score time-series for *vendor*.

    Requires authentication.  The ``enterprise`` plan receives a 12-month
    window; every other non-free plan (e.g. Pro/Team) receives 3 months.

    Responses:

    * HTTP 200 — ``{"vendor", "months", "history", "plan"}``.
    * HTTP 400 — *vendor* is empty or longer than 128 characters.
    * HTTP 402 — the caller resolves to the ``free`` plan (which includes
      missing or invalid credentials, see ``_caller_plan``); the body carries
      an ``upgrade_url``.
    """
    if not vendor or len(vendor) > 128:
        return JSONResponse(status_code=400, content={"error": "Invalid vendor name"})

    plan = _caller_plan(request)
    if plan == "free":
        return JSONResponse(
            status_code=402,
            content={"error": "Score history requires Pro or higher plan",
                     "upgrade_url": "https://squash.works/pricing"},
        )

    months = 12 if plan == "enterprise" else 3
    # The scorer comes from the shared factory, which does the lazy import.
    history = _procurement_scorer().score_history(vendor, months=months)

    return JSONResponse(content={
        "vendor":   vendor,
        "months":   months,
        "history":  history,
        "plan":     plan,
    })


@app.get("/v1/score/{vendor}/badge")
async def vendor_badge(vendor: str) -> Response:
    """Return an embeddable shields.io-style SVG badge for *vendor*.

    Public, unauthenticated. Safe to embed directly in a README or
    procurement portal. The badge renders the vendor's current tier
    and score in Squash brand colours.

    Example:
        <img src="https://squash.works/v1/score/acme-corp/badge" />

    The response is served as ``image/svg+xml`` with a 5-minute public cache
    lifetime; the score and tier are also exposed in the ``X-Squash-Score``
    and ``X-Squash-Tier`` headers.  An empty *vendor* or one longer than 128
    characters yields HTTP 400 with the body ``invalid``.
    """
    if not vendor or len(vendor) > 128:
        return Response(content="invalid", status_code=400)

    # Same factory as the other /v1/score endpoints, so the base URL
    # (SQUASH_PUBLIC_URL) is resolved in exactly one place.
    scorer = _procurement_scorer()
    vs = scorer.score_vendor(vendor)
    # NOTE(review): *vendor* is caller-controlled and ends up in SVG markup;
    # presumably badge_svg escapes it — confirm.
    svg = scorer.badge_svg(vendor, vs.score, vs.tier)
    return Response(
        content=svg,
        media_type="image/svg+xml",
        headers={
            "Cache-Control": "max-age=300, public",
            "X-Squash-Score": str(vs.score),
            "X-Squash-Tier":  vs.tier,
        },
    )
