"""squish/squash/attest.py — Unified attestation orchestrator.

:class:`AttestPipeline` is the single entry point that CI/CD integrations,
the REST API, the CLI, and platform SDKs all call.  It orchestrates:

1. Security scan (pickle, GGUF, optionally ProtectAI ModelScan)
2. CycloneDX 1.7 ML-BOM generation (dual SHA-256 + SHA-512 hashes)
3. SPDX 2.3 + AI Profile generation
4. Policy evaluation (one or more named policies)
5. Sigstore keyless signing (optional)
6. Training data provenance binding (optional)
7. VEX evaluation against a feed (optional)

Output directory structure written alongside the model::

    ./model-dir/
        input_manifest.json         — SHA-256 of every ingested file (on by default)
        cyclonedx-mlbom.json        — CycloneDX 1.7
        spdx-mlbom.json             — SPDX 2.3 JSON
        spdx-mlbom.spdx             — SPDX 2.3 tag-value
        squash-scan.json            — Security scan results
        squash-policy-<name>.json   — Per-policy evaluation result
        squash-attest.json          — Master attestation record
        squash-vex-report.json      — VEX evaluation result (if fed)
        cyclonedx-mlbom.json.sig.json — Sigstore bundle (if signed)
        tsa_token.json              — RFC 3161 timestamp envelope (if enabled)

Usage::

    result = AttestPipeline.run(AttestConfig(
        model_path=Path("./llama-3.1-8b-q4.gguf"),
        output_dir=Path("./attestation"),
        policies=["eu-ai-act", "enterprise-strict"],
        sign=True,
        fail_on_violation=True,
    ))
    if not result.passed:
        sys.exit(1)
"""

from __future__ import annotations

import datetime
import json
import logging
import platform
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from squash.canon import canonical_bytes
from squash.clock import Clock
from squash.oms_signer import OmsSigner, _is_offline
from squash.policy import AVAILABLE_POLICIES, PolicyEngine, PolicyResult
from squash.sbom_builder import CompressRunMeta, CycloneDXBuilder
from squash.scanner import ModelScanner, ScanResult
from squash.spdx_builder import SpdxBuilder, SpdxOptions

log = logging.getLogger(__name__)


@dataclass
class AttestConfig:
    """Settings that drive one :meth:`AttestPipeline.run` invocation.

    Parameters
    ----------
    model_path:
        Model directory or single model file (GGUF, safetensors, …).  When a
        file is given, the pipeline scans and hashes the file's parent
        directory.
    output_dir:
        Where every attestation artifact is written; created on demand.
        ``None`` means *model_path* itself when that is a directory, otherwise
        ``model_path.parent``.
    model_id:
        Human-readable identifier such as ``"llama-3.1-8b"``.  An empty string
        falls back to the directory name or the file stem.
    hf_repo:
        HuggingFace repository ID used for provenance, e.g.
        ``"meta-llama/Llama-3.1-8B-Instruct"``.  Leave as ``""`` when the
        model does not come from HF; the BOM then records
        ``unknown/<model_id>``.
    model_family:
        Architecture family string (``"llama"``, ``"qwen2"``, …), forwarded to
        the SBOM builders.
    quant_format:
        Quantization label, e.g. ``"INT4"`` or ``"BF16"``.
    policies:
        Policy names to evaluate; each must be a key of
        :data:`squash.policy.AVAILABLE_POLICIES`.  Unknown names are skipped
        with a warning.  An empty list disables policy evaluation.
    sign:
        Sign the CycloneDX BOM (Sigstore keyless, or a local key when
        offline).
    offline:
        Air-gapped mode — skip all OIDC/network calls (also set by
        ``SQUASH_OFFLINE=1``).
    local_signing_key:
        Ed25519 ``.priv.pem`` path for offline signing (requires
        ``sign=True, offline=True``).
    fail_on_violation:
        When ``True``, :meth:`AttestPipeline.run` raises
        :class:`AttestationViolationError` as soon as the security scan is
        ``unsafe``, a policy does not pass, or the VEX report is not clean.
    skip_scan:
        Skip the security scanner (not recommended for production).
    spdx_options:
        Optional SPDX AI Profile enrichment.
    vex_feed_path:
        Optional local VEX feed directory.  Takes precedence over
        *vex_feed_url* when both are set.
    vex_feed_url:
        Optional HTTPS URL of a remote VEX feed.
    training_dataset_ids:
        Optional HuggingFace dataset IDs to embed as training data
        provenance.
    awq_alpha:
        AWQ smooth-quant alpha (for SBOM metadata completeness).
    awq_group_size:
        AWQ group size.
    emit_input_manifest:
        Phase G.3.  When ``True``, write ``input_manifest.json`` with the
        SHA-256 of every ingested file as the first action of the pipeline.
    timestamp_with_tsa:
        Phase G.3.  When ``True``, send the canonical attestation digest to an
        RFC 3161 TSA after signing and embed the response in
        ``tsa_token.json``.  The endpoint is read from ``SQUASH_TSA_URL``
        (default DigiCert).
    tsa_url:
        Override for the TSA endpoint.  ``None`` reads ``SQUASH_TSA_URL`` and
        falls back to DigiCert.
    """

    # -- what to attest -----------------------------------------------------
    model_path: Path
    output_dir: Path | None = None
    model_id: str = ""
    hf_repo: str = ""
    model_family: str | None = None
    quant_format: str = "unknown"

    # -- gating and signing -------------------------------------------------
    policies: list[str] = field(default_factory=lambda: ["enterprise-strict"])
    sign: bool = False
    offline: bool = False
    local_signing_key: Path | None = None
    fail_on_violation: bool = True
    skip_scan: bool = False

    # -- optional enrichment ------------------------------------------------
    spdx_options: SpdxOptions | None = None
    vex_feed_path: Path | None = None
    vex_feed_url: str = ""
    training_dataset_ids: list[str] = field(default_factory=list)
    awq_alpha: float | None = None
    awq_group_size: int | None = None

    # -- Phase G.3: cryptographic chain -------------------------------------
    emit_input_manifest: bool = True
    timestamp_with_tsa: bool = False
    tsa_url: str | None = None


@dataclass
class AttestResult:
    """Outcome of one :meth:`AttestPipeline.run` call.

    Artifact path fields stay ``None`` for any artifact the run did not
    produce.
    """

    model_id: str
    output_dir: Path
    # Starts True; the pipeline flips it to False on an unsafe scan, a failed
    # policy, a VEX report that is not clean, or a failed CycloneDX write.
    passed: bool
    scan_result: ScanResult | None = None
    # Keyed by policy name.
    policy_results: dict[str, PolicyResult] = field(default_factory=dict)
    cyclonedx_path: Path | None = None
    spdx_json_path: Path | None = None
    spdx_tv_path: Path | None = None
    signature_path: Path | None = None
    master_record_path: Path | None = None
    vex_report_path: Path | None = None
    # Phase G.3: path to input_manifest.json, written when
    # ``AttestConfig.emit_input_manifest`` is True (the default).
    input_manifest_path: Path | None = None
    # Phase G.3: path to the RFC 3161 TSA timestamp envelope. None when
    # timestamping was disabled or skipped (offline / network failure).
    tsa_token_path: Path | None = None
    error: str = ""

    def summary(self) -> str:
        """Return ``"[PASS|FAIL] <model_id>: <policy summaries joined by '; '>"``."""
        verdict = "FAIL"
        if self.passed:
            verdict = "PASS"
        per_policy = [outcome.summary() for outcome in self.policy_results.values()]
        return "[{}] {}: {}".format(verdict, self.model_id, "; ".join(per_policy))


class AttestationViolationError(Exception):
    """Raised when :attr:`AttestConfig.fail_on_violation` is True and a check fails.

    :meth:`AttestPipeline.run` raises it for an ``unsafe`` security scan, a
    policy evaluation that does not pass, or a VEX report that is not clean.
    The message names the failing check and the artifact file to inspect.
    """


class AttestPipeline:
    """Orchestrate a full attestation run for a model artifact.

    All work is done in :meth:`run`.  The class itself is stateless — every
    call creates a fresh pipeline.
    """

    @staticmethod
    def run(config: AttestConfig) -> AttestResult:
        """Execute the attestation pipeline.

        Steps run in this order: input manifest, security scan, CycloneDX
        BOM, SPDX, training-data provenance, policy evaluation, VEX
        evaluation, signing, master record, and (optionally) an RFC 3161
        timestamp over the master record.

        Parameters
        ----------
        config:
            :class:`AttestConfig` describing what to attest and how.

        Returns
        -------
        AttestResult
            Aggregate result with paths to all written artifacts.

        Raises
        ------
        FileNotFoundError
            If ``config.model_path`` does not exist.
        AttestationViolationError
            If ``fail_on_violation=True`` and the security scan status is
            ``"unsafe"``, a policy evaluation does not pass, or the VEX
            report is not clean.
        OSError
            If ``fail_on_violation=True`` and writing the CycloneDX BOM
            fails.
        """
        # Function-scope stdlib imports, matching the lazy-import style used
        # for the optional squash modules below.
        import copy
        import hashlib

        model_path = config.model_path.resolve()
        if not model_path.exists():
            raise FileNotFoundError(f"Model path does not exist: {model_path}")

        # Resolve output directory
        if config.output_dir is not None:
            out_dir = config.output_dir.resolve()
        elif model_path.is_dir():
            out_dir = model_path
        else:
            out_dir = model_path.parent
        out_dir.mkdir(parents=True, exist_ok=True)

        # Resolve model_id
        model_id = config.model_id or (model_path.name if model_path.is_dir() else model_path.stem)

        # Determine weight directory for hashing. For a single-file model this
        # is the file's parent directory.
        weight_dir = model_path if model_path.is_dir() else model_path.parent

        result = AttestResult(
            model_id=model_id,
            output_dir=out_dir,
            passed=True,
        )

        # ── Step 0: Phase G.3 — Input manifest (FIRST action) ────────────
        # Hash every ingested file before any analysis runs. Every later
        # finding cites a digest in this manifest, and squash self-verify
        # walks the chain backward from the cert to the manifest to the
        # on-disk bytes.
        if config.emit_input_manifest:
            try:
                from squash.input_manifest import build_input_manifest

                manifest = build_input_manifest(weight_dir)
                manifest_path = manifest.write(out_dir / "input_manifest.json")
                result.input_manifest_path = manifest_path
                log.info(
                    "Input manifest: %d file(s), root=%s, sha256=%s",
                    manifest.file_count,
                    manifest.root_path_basename,
                    manifest.manifest_sha256[:12],
                )
            except Exception as exc:
                # Deliberately best-effort: the manifest is enrichment.
                log.warning("Input manifest emission failed (non-fatal): %s", exc)

        # ── Step 1: Security scan ──────────────────────────────────────────
        scan_result: ScanResult | None = None
        if not config.skip_scan:
            log.info("Running security scan on %s …", weight_dir)
            scan_result = ModelScanner.scan_directory(weight_dir)
            result.scan_result = scan_result

            scan_out = out_dir / "squash-scan.json"
            _write_json(scan_out, _scan_to_dict(scan_result))
            log.info("  Scan result: %s", scan_result.summary())

            if scan_result.status == "unsafe":
                result.passed = False
                if config.fail_on_violation:
                    raise AttestationViolationError(
                        f"Security scan UNSAFE for {model_id}: "
                        f"{scan_result.critical_count} critical finding(s). "
                        f"See {scan_out}"
                    )

        # ── Step 2: CycloneDX BOM ─────────────────────────────────────────
        log.info("Building CycloneDX 1.7 ML-BOM …")
        meta = CompressRunMeta(
            model_id=model_id,
            hf_mlx_repo=config.hf_repo or f"unknown/{model_id}",
            model_family=config.model_family,
            quant_format=config.quant_format,
            awq_alpha=config.awq_alpha,
            awq_group_size=config.awq_group_size,
            output_dir=out_dir,
        )
        # Pre-bind so the annotation check below is safe when the write fails
        # and fail_on_violation is False (previously an UnboundLocalError).
        cdx_path: Path | None = None
        try:
            cdx_path = CycloneDXBuilder.from_compress_run(meta)
            result.cyclonedx_path = cdx_path
        except OSError as e:
            result.error = str(e)
            result.passed = False
            if config.fail_on_violation:
                raise
            log.error("CycloneDX BOM write failed: %s", e)

        # Annotate BOM with scan result
        if scan_result is not None and cdx_path is not None:
            _annotate_bom_with_scan(cdx_path, scan_result)

        # ── Step 3: SPDX output ───────────────────────────────────────────
        log.info("Building SPDX 2.3 + AI Profile …")
        try:
            spdx_opts = config.spdx_options or SpdxOptions()
            if config.training_dataset_ids:
                # Shallow-copy caller-supplied options before overriding
                # dataset_ids so the caller's instance is left untouched.
                if spdx_opts is config.spdx_options:
                    spdx_opts = copy.copy(spdx_opts)
                spdx_opts.dataset_ids = list(config.training_dataset_ids)
            spdx_artifacts = SpdxBuilder.from_compress_run(meta, spdx_opts)
            result.spdx_json_path = spdx_artifacts.json_path
            result.spdx_tv_path = spdx_artifacts.tagvalue_path
        except OSError as e:
            log.warning("SPDX write failed (non-fatal): %s", e)

        # ── Step 4: Training data provenance ──────────────────────────────
        if config.training_dataset_ids and result.cyclonedx_path:
            log.info(
                "Resolving training data provenance for %d dataset(s) …",
                len(config.training_dataset_ids),
            )
            _bind_training_provenance(result.cyclonedx_path, config.training_dataset_ids)

        # ── Step 5: Policy evaluation ──────────────────────────────────────
        if config.policies and result.cyclonedx_path:
            log.info("Evaluating policies: %s …", config.policies)
            sbom_dict = json.loads(result.cyclonedx_path.read_text(encoding="utf-8"))

            # Inject scan result as a top-level squash: key for policy checks
            if scan_result is not None:
                sbom_dict["squash:scan_result"] = scan_result.status

            for policy_name in config.policies:
                if policy_name not in AVAILABLE_POLICIES:
                    log.warning("Unknown policy '%s' — skipping", policy_name)
                    continue
                pr = PolicyEngine.evaluate(sbom_dict, policy_name)
                result.policy_results[policy_name] = pr
                log.info("  %s", pr.summary())

                policy_out = out_dir / f"squash-policy-{policy_name}.json"
                _write_json(policy_out, _policy_result_to_dict(pr))

                if not pr.passed:
                    result.passed = False
                    if config.fail_on_violation:
                        raise AttestationViolationError(
                            f"Policy {policy_name!r} FAILED for {model_id}: "
                            f"{pr.error_count} error(s). See {policy_out}"
                        )

        # ── Step 6: VEX evaluation ────────────────────────────────────────
        if (config.vex_feed_path or config.vex_feed_url) and result.cyclonedx_path:
            log.info("Evaluating VEX feed …")
            try:
                from squash.vex import (
                    ModelInventory,
                    ModelInventoryEntry,
                    VexEvaluator,
                    VexFeed,
                )

                # A local feed directory wins over a URL when both are set.
                feed = (
                    VexFeed.from_directory(config.vex_feed_path)
                    if config.vex_feed_path
                    else VexFeed.from_url(config.vex_feed_url)
                )
                bom = json.loads(result.cyclonedx_path.read_text(encoding="utf-8"))
                # Tolerate a missing or empty ``components`` list.
                primary = (bom.get("components") or [{}])[0]
                purl = primary.get("purl", "")
                hashes = primary.get("hashes", [])
                sha256 = next(
                    (h.get("content", "") for h in hashes if h.get("alg") == "SHA-256"),
                    "",
                )
                inv = ModelInventory(
                    entries=[
                        ModelInventoryEntry(
                            model_id=model_id,
                            purl=purl,
                            sbom_path=result.cyclonedx_path,
                            composite_sha256=sha256,
                        )
                    ]
                )
                vex_report = VexEvaluator.evaluate(feed, inv)
                vex_out = out_dir / "squash-vex-report.json"
                _write_json(vex_out, vex_report.to_dict())
                result.vex_report_path = vex_out
                log.info("  VEX: %s", vex_report.summary())

                if not vex_report.is_clean:
                    result.passed = False
                    if config.fail_on_violation:
                        raise AttestationViolationError(
                            f"VEX evaluation found {len(vex_report.affected_models)} "
                            f"affected model(s) for CVEs. See {vex_out}"
                        )
            except ImportError:
                log.warning("VEX engine unavailable — skipping (import error)")

        # ── Step 7: Signing (Sigstore or local Ed25519 in offline mode) ────
        if config.sign and result.cyclonedx_path:
            _offline = config.offline or _is_offline()
            if _offline and config.local_signing_key:
                log.info("Signing CycloneDX BOM with local Ed25519 key (offline mode) …")
                try:
                    sig_path = OmsSigner.sign_local(result.cyclonedx_path, config.local_signing_key)
                    result.signature_path = sig_path
                    log.info("  Signed (offline) → %s", sig_path)
                except Exception as exc:
                    log.warning("  Local signing failed (non-fatal): %s", exc)
            elif _offline:
                log.warning(
                    "  Signing skipped — offline mode active and no local_signing_key provided"
                )
            else:
                log.info("Signing CycloneDX BOM via Sigstore …")
                sig_path = OmsSigner.sign(result.cyclonedx_path)
                result.signature_path = sig_path
                if sig_path:
                    log.info("  Signed → %s", sig_path)
                else:
                    log.warning("  Signing skipped (sigstore not installed or no OIDC)")

        # ── Step 8: Master attestation record ─────────────────────────────
        master = _build_master_record(config, result)
        # Phase G.3: embed input_manifest_sha256 in the master record so
        # the cert is content-addressed back to the ingested file set.
        if result.input_manifest_path is not None:
            try:
                manifest_dict = json.loads(
                    result.input_manifest_path.read_text(encoding="utf-8")
                )
                master["input_manifest_sha256"] = manifest_dict.get("manifest_sha256", "")
            except Exception as exc:
                # Still best-effort, but no longer silent: a missing digest
                # breaks the chain back to the manifest, so say why.
                log.warning(
                    "Could not embed input_manifest_sha256 in master record (non-fatal): %s",
                    exc,
                )
        master_out = out_dir / "squash-attest.json"
        _write_json(master_out, master)
        result.master_record_path = master_out

        # Phase G.3: optional RFC 3161 trusted timestamping over the
        # canonical bytes of the master record. Network failures are
        # logged at WARN and never fail the cert (use --strict-tsa to
        # opt into the strict mode in the CLI).
        if config.timestamp_with_tsa:
            try:
                from squash.tsa import maybe_timestamp

                body_bytes = canonical_bytes(master)
                tsa_result = maybe_timestamp(body_bytes, url=config.tsa_url)
                if tsa_result is not None:
                    tsa_out = out_dir / "tsa_token.json"
                    tsa_payload = {
                        "schema": "squash.tsa-token/v1",
                        "tsa_url": tsa_result.tsa_url,
                        "request_b64": tsa_result.request_b64,
                        "response_b64": tsa_result.response_b64,
                        "nonce": tsa_result.nonce,
                        "subject_sha256": hashlib.sha256(body_bytes).hexdigest(),
                    }
                    _write_canonical(tsa_out, tsa_payload)
                    result.tsa_token_path = tsa_out
                    log.info("RFC 3161 TSA timestamp embedded → %s", tsa_out)
                else:
                    log.warning("RFC 3161 TSA timestamp skipped (network or offline)")
            except Exception as exc:
                log.warning("RFC 3161 TSA timestamp failed (non-fatal): %s", exc)

        log.info(
            "Attestation complete for %s → %s [%s]",
            model_id,
            out_dir,
            "PASS" if result.passed else "FAIL",
        )
        return result


# ──────────────────────────────────────────────────────────────────────────────
# Private helpers
# ──────────────────────────────────────────────────────────────────────────────


def _write_json(path: Path, obj: Any) -> None:
    """Write *obj* to *path* as sorted, indented JSON via a temp-file rename.

    Phase G.2: replaces ``json.dumps(obj, indent=2, default=str)``. The
    output is byte-stable across hosts/runs so that a re-run with the
    same inputs produces the same SHA-256. ``default=str`` is gone — any
    unsupported type now raises :class:`squash.canon.CanonError` at the
    boundary, where the conversion rule is reviewable.

    Parameters
    ----------
    path:
        Destination file. The data is first written to a sibling file with
        the suffix replaced by ``.tmp`` and then renamed over *path*.
    obj:
        JSON-serialisable value accepted by
        :func:`squash.canon.canonical_bytes`.
    """
    # Normalise via ``canonical_bytes`` first so we fail fast on unknown
    # types; then re-emit with sort_keys+indent for human readability.
    # Both forms are byte-stable on rerun and produce the same SHA-256
    # under :func:`squash.canon.canonical_bytes` (which renormalises).
    parsed = json.loads(canonical_bytes(obj))
    text = json.dumps(parsed, indent=2, sort_keys=True, ensure_ascii=False)
    tmp = path.with_suffix(".tmp")
    # Encode explicitly and write bytes. ``write_text`` without an encoding
    # uses the locale encoding and platform newline translation, which makes
    # the bytes host-dependent and can fail on non-ASCII text because
    # ``ensure_ascii=False`` is used.
    tmp.write_bytes(text.encode("utf-8"))
    tmp.replace(path)


def _write_canonical(path: Path, obj: Any) -> None:
    """Persist *obj* at *path* as bare RFC 8785 canonical bytes.

    Intended for files that are directly signed or anchored: with no
    indentation applied, the bytes on disk are exactly the bytes that were
    hashed. The payload goes to a sibling ``.tmp`` file first and is then
    renamed over *path*.
    """
    staging = path.with_suffix(".tmp")
    payload = canonical_bytes(obj)
    staging.write_bytes(payload)
    staging.replace(path)


def _scan_to_dict(r: ScanResult) -> dict[str, Any]:
    return {
        "scanned_path": r.scanned_path,
        "status": r.status,
        "scanner_version": r.scanner_version,
        "critical": r.critical_count,
        "high": r.high_count,
        "findings": [
            {
                "severity": f.severity,
                "id": f.finding_id,
                "title": f.title,
                "detail": f.detail,
                "file": f.file_path,
                "cve": f.cve,
            }
            for f in r.findings
        ],
    }


def _policy_result_to_dict(r: PolicyResult) -> dict[str, Any]:
    return {
        "policy": r.policy_name,
        "passed": r.passed,
        "error_count": r.error_count,
        "warning_count": r.warning_count,
        "pass_count": r.pass_count,
        "findings": [
            {
                "rule_id": f.rule_id,
                "severity": f.severity,
                "passed": f.passed,
                "field": f.field,
                "rationale": f.rationale,
                "remediation": f.remediation,
            }
            for f in r.findings
        ],
    }


def _annotate_bom_with_scan(bom_path: Path, scan: ScanResult) -> None:
    """Inject scan findings as CycloneDX vulnerabilities into an existing BOM.

    The BOM at *bom_path* is rewritten in place (temp file + rename) as
    canonical bytes. ``vulnerabilities`` is set only when the scan yields
    any; ``squash:scan_result`` is always set to the scan status.

    Best-effort: an I/O error or an undecodable/malformed BOM is logged as a
    warning and leaves the BOM untouched.
    """
    try:
        # Read as UTF-8 explicitly: this function writes the BOM back as
        # canonical (UTF-8) bytes, so the locale encoding must not be used.
        bom: dict = json.loads(bom_path.read_text(encoding="utf-8"))
        vulns = scan.to_cdx_vulnerabilities()
        if vulns:
            bom["vulnerabilities"] = vulns
        bom["squash:scan_result"] = scan.status
        # Phase G.2: canonical bytes — the BOM is the signed body.
        tmp = bom_path.with_suffix(".tmp")
        tmp.write_bytes(canonical_bytes(bom))
        tmp.replace(bom_path)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning("Could not annotate BOM with scan result: %s", e)


def _bind_training_provenance(bom_path: Path, dataset_ids: list[str]) -> None:
    """Attach HF dataset provenance for *dataset_ids* to the BOM at *bom_path*.

    Best-effort enrichment: any failure, including a missing provenance
    module, is logged as a warning and never propagates.
    """
    try:
        from squash.provenance import ProvenanceCollector

        ProvenanceCollector.from_hf_datasets(dataset_ids).bind_to_sbom(bom_path)
    except Exception as err:  # deliberately broad — enrichment must not gate the run
        log.warning("Training data provenance binding failed (non-fatal): %s", err)


def _build_master_record(
    config: AttestConfig,
    result: AttestResult,
    clock: Clock | None = None,
) -> dict[str, Any]:
    """Assemble the dict that becomes ``squash-attest.json``.

    Parameters
    ----------
    config:
        The run configuration; supplies ``model_path`` and the requested
        policy names.
    result:
        The in-progress run result; supplies the verdict, scan status,
        per-policy outcomes and artifact paths.
    clock:
        Phase G.2: zero-argument callable returning a datetime, injectable so
        reproducibility tests can freeze time. ``None`` uses
        :func:`squash.clock.get_default_clock`.

    Returns
    -------
    dict
        The master record. ``attested_at`` is UTC, truncated to whole
        seconds, formatted ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    import squash as squish  # version reference

    if clock is not None:
        clk = clock
    else:
        # Phase G.2: route through the module-level default so
        # `with_clock(FrozenClock(...))` propagates into the signed body.
        # Tests rely on this; see
        # tests/test_reproducibility.py::TestPipelineReproducibility.
        from squash.clock import get_default_clock

        clk = get_default_clock()

    utc_instant = clk().astimezone(datetime.timezone.utc).replace(microsecond=0)
    now = utc_instant.strftime("%Y-%m-%dT%H:%M:%SZ")

    policies_summary: dict[str, dict[str, Any]] = {}
    for name, pr in result.policy_results.items():
        policies_summary[name] = {
            "passed": pr.passed,
            "errors": pr.error_count,
            "warnings": pr.warning_count,
        }

    def _str_or_none(artifact: Path | None) -> str | None:
        # Artifacts that were not produced are recorded as null.
        return str(artifact) if artifact else None

    scan_status = result.scan_result.status if result.scan_result else "skipped"
    artifacts = {
        "cyclonedx": _str_or_none(result.cyclonedx_path),
        "spdx_json": _str_or_none(result.spdx_json_path),
        "spdx_tv": _str_or_none(result.spdx_tv_path),
        "signature": _str_or_none(result.signature_path),
        "vex_report": _str_or_none(result.vex_report_path),
    }
    host = {
        "python": sys.version,
        "os": platform.platform(),
    }

    return {
        "squash_version": squish.__version__,
        "attested_at": now,
        "model_id": result.model_id,
        "model_path": str(config.model_path),
        "output_dir": str(result.output_dir),
        "passed": result.passed,
        "scan_status": scan_status,
        "policies_evaluated": list(config.policies),
        "policy_results": policies_summary,
        "artifacts": artifacts,
        "platform": host,
    }


# ─────────────────────────────────────────────────────────────────────────────
# Wave 18 — Composite multi-model attestation
# ─────────────────────────────────────────────────────────────────────────────


@dataclass
class CompositeAttestConfig:
    """Settings for attesting several models in one pass.

    Each model is attested independently using :class:`AttestPipeline`, then
    the individual CycloneDX BOMs are assembled into a parent BOM whose
    ``dependencies`` list references each component by ``serialNumber``.

    Parameters
    ----------
    model_paths:
        Paths to each model directory to attest (minimum 2).
    output_dir:
        Destination for the parent BOM and summary record.  Defaults to the
        first model's directory.
    policies:
        Policy names to evaluate against each component.
    sign:
        Sign each component BOM with Sigstore after attestation.
    """

    model_paths: list[Path]
    output_dir: Path | None = None
    policies: list[str] = field(default_factory=lambda: ["enterprise-strict"])
    sign: bool = False


@dataclass
class CompositeAttestResult:
    """Outcome of a composite multi-model attestation.

    Parameters
    ----------
    component_results:
        Individual attestation result for each input model.
    parent_bom_path:
        Path to the written parent CycloneDX BOM, or ``None`` on failure.
    output_dir:
        Directory where parent artefacts were written.
    passed:
        ``True`` only when *all* component attestations pass.
    error:
        Non-empty when a fatal error prevented the run.
    """

    component_results: list[AttestResult]
    parent_bom_path: Path | None
    output_dir: Path
    passed: bool
    error: str = ""


class CompositeAttestPipeline:
    """Attest N models and assemble a parent CycloneDX composition BOM.

    :meth:`run` attests every path in ``model_paths`` with
    :class:`AttestPipeline` and then builds the parent BOM from the component
    BOMs those runs produced.  The parent BOM's metadata component has type
    ``"application"`` and its ``"dependencies"`` clause lists each
    component's ``serialNumber``.

    Example::

        cfg = CompositeAttestConfig(
            model_paths=[Path("model-a"), Path("model-b")],
            policies=["enterprise-strict"],
        )
        result = CompositeAttestPipeline.run(cfg)
        assert result.passed
    """

    @staticmethod
    def run(config: CompositeAttestConfig) -> CompositeAttestResult:
        """Run attestation on every model in *config.model_paths* and compose.

        Component failures (including policy or scan violations raised by
        :class:`AttestPipeline`) are captured as failed
        :class:`AttestResult` entries rather than propagated.  An empty
        ``model_paths`` list or an output directory that cannot be created
        yields a result with ``passed=False`` and ``error`` set.  A failure
        while assembling the parent BOM is logged and leaves
        ``parent_bom_path`` as ``None``.

        Returns :class:`CompositeAttestResult` — never raises.
        """
        # Guard the empty case: indexing model_paths[0] below would raise,
        # and ``all([])`` would otherwise report an empty run as passed.
        if not config.model_paths:
            reason = "no model paths supplied"
            log.warning("CompositeAttestPipeline: %s", reason)
            return CompositeAttestResult(
                component_results=[],
                parent_bom_path=None,
                output_dir=config.output_dir or Path("."),
                passed=False,
                error=reason,
            )

        # Default to the first model's directory; when that path is a file,
        # use its parent, since mkdir on an existing file raises.
        first_model = config.model_paths[0]
        output_dir = config.output_dir or (
            first_model.parent if first_model.is_file() else first_model
        )
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            log.warning(
                "CompositeAttestPipeline: cannot create output dir %s — %s", output_dir, exc
            )
            return CompositeAttestResult(
                component_results=[],
                parent_bom_path=None,
                output_dir=output_dir,
                passed=False,
                error=str(exc),
            )

        component_results: list[AttestResult] = []
        for mp in config.model_paths:
            try:
                cfg = AttestConfig(
                    model_path=mp,
                    policies=config.policies,
                    sign=config.sign,
                )
                r = AttestPipeline.run(cfg)
                component_results.append(r)
            except Exception as exc:
                # Build a minimal failed result so we can surface the error
                log.warning("CompositeAttestPipeline: error attesting %s — %s", mp, exc)
                failed = AttestResult(
                    model_id=mp.name,
                    output_dir=mp,
                    passed=False,
                    error=str(exc),
                )
                component_results.append(failed)

        all_passed = all(r.passed for r in component_results)

        # Build parent CycloneDX composition BOM
        parent_bom_path: Path | None
        try:
            parent_bom = CompositeAttestPipeline._build_parent_bom(
                component_results, config, output_dir
            )
            parent_bom_path = output_dir / "cyclonedx-composed.json"
            tmp = parent_bom_path.with_suffix(".tmp")
            # Phase G.2: canonical bytes for parent BOM (signable surface).
            tmp.write_bytes(canonical_bytes(parent_bom))
            tmp.replace(parent_bom_path)
        except Exception as exc:
            log.warning("CompositeAttestPipeline: parent BOM assembly failed — %s", exc)
            parent_bom_path = None

        return CompositeAttestResult(
            component_results=component_results,
            parent_bom_path=parent_bom_path,
            output_dir=output_dir,
            passed=all_passed,
        )

    @staticmethod
    def _build_parent_bom(
        results: list[AttestResult],
        config: CompositeAttestConfig,
        output_dir: Path,
        clock: Clock | None = None,
    ) -> dict[str, Any]:
        """Return a CycloneDX 1.5 JSON document referencing component BOMs.

        Phase G.2: clock-injected timestamp and deterministic serial numbers
        keyed on component content so the parent BOM is byte-identical on
        rerun given the same inputs.

        Parameters
        ----------
        results:
            Component results; each readable ``cyclonedx_path`` contributes
            its first component and its ``serialNumber``.
        config:
            The composite configuration (not read by this method).
        output_dir:
            Supplies the parent component name and is part of the payload the
            parent serial number is derived from.
        clock:
            Zero-argument callable returning a datetime; ``None`` uses
            :func:`squash.clock.get_default_clock`.
        """
        from squash._ids import cert_id as _cert_id

        # Phase G.2: clock injection — reproducibility tests freeze this.
        if clock is None:
            from squash.clock import get_default_clock

            clk = get_default_clock()
        else:
            clk = clock
        now = (
            clk()
            .astimezone(datetime.timezone.utc)
            .replace(microsecond=0)
            .strftime("%Y-%m-%dT%H:%M:%SZ")
        )

        # Collect component serial numbers from each result's CycloneDX BOM
        components: list[dict[str, Any]] = []
        dep_refs: list[str] = []

        for r in results:
            cdx_path = r.cyclonedx_path
            if cdx_path and cdx_path.exists():
                try:
                    cdx = json.loads(cdx_path.read_text(encoding="utf-8"))
                    # Fall back to a serial derived from the BOM path when
                    # the component BOM carries none.
                    serial = cdx.get("serialNumber") or (
                        "urn:uuid:"
                        + _cert_id(
                            "cdx", canonical_payload=canonical_bytes({"path": str(cdx_path)})
                        )
                    )
                    # Tolerate a missing or empty ``components`` list so the
                    # BOM's real serial is still referenced, and never emit
                    # an empty ``{}`` component into the parent.
                    comp = (cdx.get("components") or [{}])[0]
                    if comp:
                        components.append(comp)
                    dep_refs.append(serial)
                except Exception as _e:
                    log.warning("CompositeAttestPipeline: could not read BOM %s — %s", cdx_path, _e)
                    dep_refs.append(
                        "urn:uuid:"
                        + _cert_id(
                            "cdx",
                            canonical_payload=canonical_bytes(
                                {"path": str(cdx_path), "err": str(_e)}
                            ),
                        )
                    )
            else:
                # No component BOM on disk: reference a placeholder keyed on
                # the model id.
                dep_refs.append(
                    "urn:uuid:"
                    + _cert_id("cdx", canonical_payload=canonical_bytes({"model_id": r.model_id}))
                )

        # Phase G.2: parent serial keyed on all component refs — deterministic.
        parent_serial = "urn:uuid:" + _cert_id(
            "comp", canonical_payload=canonical_bytes({"refs": dep_refs, "output": str(output_dir)})
        )

        return {
            "bomFormat": "CycloneDX",
            "specVersion": "1.5",
            "version": 1,
            "serialNumber": parent_serial,
            "metadata": {
                "timestamp": now,
                "component": {
                    "type": "application",
                    "name": output_dir.name,
                    "version": "composed",
                },
            },
            "components": components,
            "dependencies": [
                {
                    "ref": parent_serial,
                    "dependsOn": dep_refs,
                }
            ],
        }
