#!/usr/bin/env python3
"""
tools/refresh-cve-metadata.py — fetch CWE + KEV status for every CVE in the
SKELETONKEY corpus from authoritative federal sources.

Sources:
  - CISA Known Exploited Vulnerabilities catalog
    https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv
    (authoritative for "is this exploited in the wild?")
  - NVD CVE API 2.0
    https://services.nvd.nist.gov/rest/json/cves/2.0?cveId=...
    (authoritative for CWE classification)

The output is intentionally NOT auto-applied to module sources — drift
between an external source and our embedded metadata should surface as
a diff a human reviews. The tool produces:

  docs/CVE_METADATA.json    machine-readable per-CVE record
  docs/KEV_CROSSREF.md      human-readable KEV table
  core/cve_metadata.c       generated C lookup table built from the same records

Modules consume the JSON via copy-paste into their struct
skeletonkey_module literal (attack_technique, cwe, in_kev,
kev_date_added fields). The provenance comment in core/module.h
points contributors back here.

No API key required; the script throttles to NVD's anonymous
5-req/30s limit. ~3 minutes total for 26 CVEs.

Usage:
  tools/refresh-cve-metadata.py           # refresh + write outputs
  tools/refresh-cve-metadata.py --check   # diff against committed JSON, exit 1 on drift

Dependencies: stdlib only. Python 3.10+ (function signatures use
`X | None` annotations, which are evaluated at definition time).
`curl` on PATH is used only as a fallback when urlopen cannot fetch
the KEV catalog.
"""

import argparse
import csv
import io
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path

# This file lives in <repo>/tools/, so the repository root is two levels up.
REPO_ROOT = Path(__file__).resolve().parent.parent
MODULES_DIR = REPO_ROOT / "modules"
OUT_JSON = REPO_ROOT / "docs" / "CVE_METADATA.json"
OUT_MD = REPO_ROOT / "docs" / "KEV_CROSSREF.md"
OUT_C = REPO_ROOT / "core" / "cve_metadata.c"

KEV_URL = "https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv"
NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve}"

# Per NVD's anonymous rate limit: 5 requests per 30 seconds.
# 30 / 5 = 6 s per request; 7 s leaves a one-second safety margin.
NVD_DELAY_SECONDS = 7

# Module → ATT&CK technique mapping.
Almost all kernel/userspace LPEs # map to T1068 (Exploitation for Privilege Escalation). The two # exceptions are noted inline. This mapping is hand-curated; the # tool doesn't pull ATT&CK from any feed (MITRE doesn't publish a # clean CVE → technique CSV). ATTACK_MAPPING = { # Default for every CVE not listed: T1068, no subtechnique. "CVE-2022-0492": ("T1611", None), # cgroup_release_agent — container escape "CVE-2023-0458": ("T1082", None), # entrybleed — kernel info leak, not LPE } def discover_cves() -> list[str]: """Find every CVE-NNNN-NNNN id by scanning modules//.""" cves = set() for child in MODULES_DIR.iterdir(): if not child.is_dir(): continue # Module dirs end in _cve_YYYY_NNNNN parts = child.name.split("_cve_") if len(parts) != 2: continue cve_tail = parts[1].replace("_", "-") cves.add(f"CVE-{cve_tail}") return sorted(cves) def fetch_kev_catalog() -> dict[str, str]: """Return {cve_id: date_added_yyyy_mm_dd} from CISA's KEV CSV. Python's urlopen sometimes times out on CISA's HTTP/2 endpoint even though curl works fine; we try urlopen first with a 60s budget, then fall back to shelling out to curl. Either way we end up with the same CSV bytes.""" print(f"[*] fetching CISA KEV catalog ({KEV_URL})", file=sys.stderr) data: str | None = None try: with urllib.request.urlopen(KEV_URL, timeout=60) as r: data = r.read().decode("utf-8", errors="replace") except urllib.error.URLError as e: print(f"[!] urlopen failed ({e}); falling back to curl", file=sys.stderr) if data is None: import subprocess try: data = subprocess.check_output( ["curl", "-fsSL", "--max-time", "60", KEV_URL], stderr=subprocess.DEVNULL, ).decode("utf-8", errors="replace") except (subprocess.CalledProcessError, FileNotFoundError) as e: print(f"[!] 
curl fallback also failed: {e}", file=sys.stderr) sys.exit(1) out: dict[str, str] = {} reader = csv.DictReader(io.StringIO(data)) for row in reader: cve = row.get("cveID", "").strip() date = row.get("dateAdded", "").strip() if cve: out[cve] = date print(f"[+] KEV catalog has {len(out)} entries", file=sys.stderr) return out def fetch_nvd_cwe(cve: str) -> tuple[str | None, str | None]: """Return (cwe_id, description) from NVD. Returns (None, None) on miss.""" url = NVD_URL.format(cve=cve) req = urllib.request.Request(url, headers={"User-Agent": "skeletonkey-cve-metadata/1"}) try: with urllib.request.urlopen(req, timeout=30) as r: blob = json.loads(r.read().decode("utf-8")) except urllib.error.HTTPError as e: print(f"[!] NVD HTTP {e.code} for {cve}", file=sys.stderr) return None, None except (urllib.error.URLError, json.JSONDecodeError) as e: print(f"[!] NVD parse error for {cve}: {e}", file=sys.stderr) return None, None vulns = blob.get("vulnerabilities") or [] if not vulns: return None, None cve_obj = vulns[0].get("cve", {}) # weaknesses: [{source, type, description: [{lang, value: "CWE-..."}]}] for w in cve_obj.get("weaknesses", []) or []: for d in w.get("description", []) or []: v = d.get("value", "") if v.startswith("CWE-"): return v, None # description not stored; CWE id alone is what we use return None, None def attack_for_cve(cve: str) -> tuple[str, str | None]: return ATTACK_MAPPING.get(cve, ("T1068", None)) def short_module_name(cve: str) -> str: """Find the directory under modules/ that ends with this CVE's tail.""" tail = cve.removeprefix("CVE-").replace("-", "_") for child in MODULES_DIR.iterdir(): if child.is_dir() and child.name.endswith(f"_cve_{tail}"): return child.name return "?" 
def build_records(cves: list[str], kev: dict[str, str]) -> list[dict]: records = [] for i, cve in enumerate(cves, 1): print(f"[*] [{i:2d}/{len(cves)}] {cve}: NVD lookup", file=sys.stderr) cwe, _ = fetch_nvd_cwe(cve) tech, subtech = attack_for_cve(cve) in_kev = cve in kev rec = { "cve": cve, "module_dir": short_module_name(cve), "cwe": cwe, "attack_technique": tech, "attack_subtechnique": subtech, "in_kev": in_kev, "kev_date_added": kev.get(cve, ""), } records.append(rec) # Throttle NVD requests if i < len(cves): time.sleep(NVD_DELAY_SECONDS) return records def _c_str(s: str | None) -> str: """Render a Python str|None as a C string literal or NULL.""" if s is None: return "NULL" # only safe chars in our domain (CVE-/CWE-/T#### / dates) so no escaping needed return f'"{s}"' def write_c_table(records: list[dict]) -> None: """Generate core/cve_metadata.c from the JSON records.""" lines = [ "/*", " * SKELETONKEY — CVE metadata table", " *", " * AUTO-GENERATED by tools/refresh-cve-metadata.py from", " * docs/CVE_METADATA.json. 
Do not hand-edit; rerun the script.", " * Sources: CISA KEV catalog + NVD CVE API 2.0.", " */", "", '#include "cve_metadata.h"', "", "#include ", "#include ", "", "const struct cve_metadata cve_metadata_table[] = {", ] for r in records: lines.append(" {") lines.append(f" .cve = {_c_str(r['cve'])},") lines.append(f" .cwe = {_c_str(r['cwe'])},") lines.append(f" .attack_technique = {_c_str(r['attack_technique'])},") lines.append(f" .attack_subtechnique = {_c_str(r['attack_subtechnique'])},") lines.append(f" .in_kev = {'true' if r['in_kev'] else 'false'},") lines.append(f" .kev_date_added = {_c_str(r['kev_date_added'])},") lines.append(" },") lines += [ "};", "", "const size_t cve_metadata_table_len =", " sizeof(cve_metadata_table) / sizeof(cve_metadata_table[0]);", "", "const struct cve_metadata *cve_metadata_lookup(const char *cve)", "{", " if (!cve) return NULL;", " for (size_t i = 0; i < cve_metadata_table_len; i++) {", " if (strcmp(cve_metadata_table[i].cve, cve) == 0)", " return &cve_metadata_table[i];", " }", " return NULL;", "}", "", ] OUT_C.write_text("\n".join(lines)) print(f"[+] wrote {OUT_C.relative_to(REPO_ROOT)}", file=sys.stderr) def write_outputs(records: list[dict]) -> None: OUT_JSON.parent.mkdir(parents=True, exist_ok=True) OUT_JSON.write_text(json.dumps(records, indent=2) + "\n") print(f"[+] wrote {OUT_JSON.relative_to(REPO_ROOT)}", file=sys.stderr) write_c_table(records) # KEV cross-reference table in_kev = [r for r in records if r["in_kev"]] not_in_kev = [r for r in records if not r["in_kev"]] lines = [ "# CISA KEV Cross-Reference", "", "Which SKELETONKEY modules cover CVEs that CISA has observed exploited", "in the wild per the Known Exploited Vulnerabilities catalog.", "Refreshed via `tools/refresh-cve-metadata.py`.", "", f"**{len(in_kev)} of {len(records)} modules cover KEV-listed CVEs.**", "", "## In KEV (prioritize patching)", "", "| CVE | Date added to KEV | CWE | Module |", "| --- | --- | --- | --- |", ] for r in sorted(in_kev, key=lambda r: 
r["kev_date_added"]): lines.append( f"| {r['cve']} | {r['kev_date_added']} | {r['cwe'] or '?'} | `{r['module_dir']}` |" ) lines += [ "", "## Not in KEV", "", "Not observed exploited per CISA — but several have public PoC code", "and are technically reachable. \"Not in KEV\" is not the same as", "\"safe to ignore\".", "", "| CVE | CWE | Module |", "| --- | --- | --- |", ] for r in sorted(not_in_kev, key=lambda r: r["cve"]): lines.append(f"| {r['cve']} | {r['cwe'] or '?'} | `{r['module_dir']}` |") lines.append("") OUT_MD.write_text("\n".join(lines)) print(f"[+] wrote {OUT_MD.relative_to(REPO_ROOT)}", file=sys.stderr) def check_drift() -> int: """Exit 1 if the committed JSON differs from a fresh fetch.""" if not OUT_JSON.exists(): print(f"[!] no committed {OUT_JSON.name} — run without --check first", file=sys.stderr) return 1 committed = json.loads(OUT_JSON.read_text()) fresh = build_records(discover_cves(), fetch_kev_catalog()) if committed == fresh: print("[+] CVE_METADATA.json is current", file=sys.stderr) return 0 print("[!] CVE_METADATA.json drifted — refresh via " "`tools/refresh-cve-metadata.py`", file=sys.stderr) return 1 def main() -> int: ap = argparse.ArgumentParser(description=__doc__.splitlines()[1]) ap.add_argument("--check", action="store_true", help="diff against committed metadata; exit 1 on drift") args = ap.parse_args() if args.check: return check_drift() cves = discover_cves() print(f"[*] {len(cves)} CVE(s) in corpus", file=sys.stderr) kev = fetch_kev_catalog() records = build_records(cves, kev) write_outputs(records) return 0 if __name__ == "__main__": sys.exit(main())