#!/usr/bin/env python3
"""
tools/refresh-cve-metadata.py — fetch CWE + KEV status for every CVE in the
SKELETONKEY corpus from authoritative federal sources.
Sources:
- CISA Known Exploited Vulnerabilities catalog
https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv
(authoritative for "is this exploited in the wild?")
- NVD CVE API 2.0
https://services.nvd.nist.gov/rest/json/cves/2.0?cveId=...
(authoritative for CWE classification)
The output is intentionally NOT auto-applied to module sources — drift
between an external source and our embedded metadata should surface as
a diff a human reviews. The tool produces:
docs/CVE_METADATA.json machine-readable per-CVE record
docs/KEV_CROSSREF.md human-readable KEV table
core/cve_metadata.c generated C lookup table (same records)
Modules consume the JSON via copy-paste into their struct skeletonkey_module
literal (attack_technique, cwe, in_kev, kev_date_added fields). The
provenance comment in core/module.h points contributors back here.
No API key required; the script throttles to NVD's anonymous 5-req/30s
limit. ~3 minutes total for 26 CVEs.
Usage:
tools/refresh-cve-metadata.py # refresh + write outputs
tools/refresh-cve-metadata.py --check # diff against committed JSON, exit 1 on drift
Dependencies: stdlib only. Python 3.10+ (signature annotations use
`X | None`; `str.removeprefix` is also used).
"""
import argparse
import csv
import io
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
# Repository root: this script is documented as living at
# tools/refresh-cve-metadata.py, so the root is two parents up from the file.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Directory whose sub-directory names are scanned for CVE ids.
MODULES_DIR = REPO_ROOT / "modules"
# Generated artifacts (JSON records, Markdown KEV table, C lookup table).
OUT_JSON = REPO_ROOT / "docs" / "CVE_METADATA.json"
OUT_MD = REPO_ROOT / "docs" / "KEV_CROSSREF.md"
OUT_C = REPO_ROOT / "core" / "cve_metadata.c"
# Upstream sources: CISA KEV catalog (CSV) and the NVD CVE API 2.0 endpoint;
# NVD_URL is a str.format template taking a single `cve` field.
KEV_URL = "https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv"
NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve}"
# Per NVD's anonymous rate limit: 5 requests per 30 seconds, i.e. at least
# 6 s between requests; 7 s leaves a small safety margin.
NVD_DELAY_SECONDS = 7
# Module → ATT&CK technique mapping. Almost all kernel/userspace LPEs
# map to T1068 (Exploitation for Privilege Escalation). The two
# exceptions are noted inline. This mapping is hand-curated; the
# tool doesn't pull ATT&CK from any feed (MITRE doesn't publish a
# clean CVE → technique CSV).
#
# Keys are CVE ids; values are (technique, subtechnique) tuples, where the
# subtechnique may be None. The T1068 default is applied by attack_for_cve().
ATTACK_MAPPING = {
    # Default for every CVE not listed: T1068, no subtechnique.
    "CVE-2022-0492": ("T1611", None),  # cgroup_release_agent — container escape
    "CVE-2023-0458": ("T1082", None),  # entrybleed — kernel info leak, not LPE
}
def discover_cves() -> list[str]:
    """Return the sorted, de-duplicated CVE ids named by module directories.

    Every directory directly under MODULES_DIR whose name contains exactly
    one ``_cve_`` separator contributes one id: the text after the separator
    has its underscores turned into dashes and is prefixed with ``CVE-``
    (``foo_cve_2022_0492`` -> ``CVE-2022-0492``). Plain files and names
    without exactly one separator are ignored.
    """
    found: set[str] = set()
    for entry in MODULES_DIR.iterdir():
        pieces = entry.name.split("_cve_")
        # Module dirs end in _cve_YYYY_NNNNN; anything else is skipped.
        if entry.is_dir() and len(pieces) == 2:
            found.add("CVE-" + pieces[1].replace("_", "-"))
    return sorted(found)
def _parse_kev_csv(data: str) -> dict[str, str]:
    """Map every non-empty ``cveID`` cell of the KEV CSV to its ``dateAdded``."""
    out: dict[str, str] = {}
    for row in csv.DictReader(io.StringIO(data)):
        # DictReader fills cells missing from a short row with None, so
        # normalise to "" before stripping.
        cve = (row.get("cveID") or "").strip()
        if cve:
            out[cve] = (row.get("dateAdded") or "").strip()
    return out


def fetch_kev_catalog() -> dict[str, str]:
    """Return {cve_id: date_added_yyyy_mm_dd} from CISA's KEV CSV.

    Python's urlopen sometimes times out on CISA's endpoint even though
    curl works fine; we try urlopen first with a 60s budget, then fall
    back to shelling out to curl. Either way we end up with the same CSV
    bytes.

    Any OSError (which covers URLError/HTTPError *and* the bare
    TimeoutError raised when the timeout hits while reading the body) or
    http.client.HTTPException from the urlopen attempt triggers the curl
    fallback. The payload is decoded as ``utf-8-sig`` so a leading BOM
    cannot corrupt the first header name.

    Exits the process with status 1 if both download paths fail, or if
    the payload yields no entries at all — an empty catalog would
    otherwise silently mark every CVE as "not in KEV".
    """
    import http.client
    import subprocess

    print(f"[*] fetching CISA KEV catalog ({KEV_URL})", file=sys.stderr)
    data: str | None = None
    try:
        with urllib.request.urlopen(KEV_URL, timeout=60) as r:
            data = r.read().decode("utf-8-sig", errors="replace")
    except (OSError, http.client.HTTPException) as e:
        print(f"[!] urlopen failed ({e}); falling back to curl", file=sys.stderr)
    if data is None:
        try:
            data = subprocess.check_output(
                ["curl", "-fsSL", "--max-time", "60", KEV_URL],
                stderr=subprocess.DEVNULL,
            ).decode("utf-8-sig", errors="replace")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable curl binary.
            print(f"[!] curl fallback also failed: {e}", file=sys.stderr)
            sys.exit(1)
    out = _parse_kev_csv(data)
    if not out:
        print("[!] KEV catalog parsed to 0 entries; refusing to continue",
              file=sys.stderr)
        sys.exit(1)
    print(f"[+] KEV catalog has {len(out)} entries", file=sys.stderr)
    return out
def fetch_nvd_cwe(cve: str) -> tuple[str | None, str | None]:
    """Return (cwe_id, description) for *cve* from the NVD CVE API 2.0.

    The description slot is always None: only the CWE id is used. Returns
    (None, None) on any miss — HTTP error status, network failure or
    timeout, undecodable/invalid JSON, an unexpected payload shape, or a
    record with no ``CWE-…`` weakness value. Failures are logged to stderr
    and never raised, so one bad lookup cannot abort a multi-minute run.
    """
    url = NVD_URL.format(cve=cve)
    req = urllib.request.Request(url, headers={"User-Agent": "skeletonkey-cve-metadata/1"})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            blob = json.loads(r.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        # Must precede OSError: HTTPError is a URLError, which is an OSError.
        print(f"[!] NVD HTTP {e.code} for {cve}", file=sys.stderr)
        return None, None
    except OSError as e:
        # URLError plus the bare TimeoutError raised when the timeout
        # expires while the response body is being read.
        print(f"[!] NVD network error for {cve}: {e}", file=sys.stderr)
        return None, None
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"[!] NVD parse error for {cve}: {e}", file=sys.stderr)
        return None, None
    if not isinstance(blob, dict):
        return None, None
    vulns = blob.get("vulnerabilities") or []
    if not vulns:
        return None, None
    cve_obj = vulns[0].get("cve") or {}
    # weaknesses: [{source, type, description: [{lang, value: "CWE-..."}]}]
    for w in cve_obj.get("weaknesses") or []:
        for d in w.get("description") or []:
            v = d.get("value") or ""
            if v.startswith("CWE-"):
                return v, None  # description not stored; CWE id alone is what we use
    return None, None
def attack_for_cve(cve: str) -> tuple[str, str | None]:
    """Return the ATT&CK (technique, subtechnique) pair for *cve*.

    CVEs absent from ATTACK_MAPPING get the default ``("T1068", None)``.
    """
    try:
        return ATTACK_MAPPING[cve]
    except KeyError:
        return ("T1068", None)
def short_module_name(cve: str) -> str:
    """Return the name of the modules/ directory that ends with this CVE's tail.

    ``CVE-2022-0492`` matches any directory whose name ends in
    ``_cve_2022_0492``. Returns ``"?"`` when nothing matches. If several
    directories match, the lexicographically smallest name is returned:
    Path.iterdir() yields entries in arbitrary order, so picking "the first"
    would make the generated metadata differ between machines.
    """
    suffix = "_cve_" + cve.removeprefix("CVE-").replace("-", "_")
    candidates = (
        child.name
        for child in MODULES_DIR.iterdir()
        if child.is_dir() and child.name.endswith(suffix)
    )
    return min(candidates, default="?")
def build_records(cves: list[str], kev: dict[str, str]) -> list[dict]:
    """Assemble one metadata record per CVE, in the order given.

    For each id this performs an NVD lookup for the CWE, resolves the
    ATT&CK pair and the module directory, and records KEV membership plus
    the KEV date ("" when the CVE is not in *kev*). Progress goes to
    stderr. Between consecutive lookups (not after the last) the function
    sleeps NVD_DELAY_SECONDS to respect NVD's rate limit.
    """
    total = len(cves)
    collected: list[dict] = []
    for position, cve_id in enumerate(cves, start=1):
        print(f"[*] [{position:2d}/{total}] {cve_id}: NVD lookup", file=sys.stderr)
        cwe_id = fetch_nvd_cwe(cve_id)[0]
        technique, subtechnique = attack_for_cve(cve_id)
        collected.append(
            {
                "cve": cve_id,
                "module_dir": short_module_name(cve_id),
                "cwe": cwe_id,
                "attack_technique": technique,
                "attack_subtechnique": subtechnique,
                "in_kev": cve_id in kev,
                "kev_date_added": kev.get(cve_id, ""),
            }
        )
        # Throttle NVD requests; no need to wait once the last one is done.
        if position != total:
            time.sleep(NVD_DELAY_SECONDS)
    return collected
def _c_str(s: str | None) -> str:
"""Render a Python str|None as a C string literal or NULL."""
if s is None:
return "NULL"
# only safe chars in our domain (CVE-/CWE-/T#### / dates) so no escaping needed
return f'"{s}"'
def write_c_table(records: list[dict]) -> None:
    """Generate core/cve_metadata.c from the JSON records.

    Emits a ``cve_metadata_table[]`` array with one designated-initializer
    entry per record, its length constant, and a linear-search
    ``cve_metadata_lookup()``. The generated code uses size_t/NULL and
    strcmp, hence the <stddef.h> and <string.h> includes. The file is
    written as UTF-8 (the header comment contains a non-ASCII dash) and
    the target directory is created if missing.
    """
    lines = [
        "/*",
        " * SKELETONKEY — CVE metadata table",
        " *",
        " * AUTO-GENERATED by tools/refresh-cve-metadata.py from",
        " * docs/CVE_METADATA.json. Do not hand-edit; rerun the script.",
        " * Sources: CISA KEV catalog + NVD CVE API 2.0.",
        " */",
        "",
        '#include "cve_metadata.h"',
        "",
        "#include <stddef.h>",
        "#include <string.h>",
        "",
        "const struct cve_metadata cve_metadata_table[] = {",
    ]
    for r in records:
        lines.append(" {")
        lines.append(f" .cve = {_c_str(r['cve'])},")
        lines.append(f" .cwe = {_c_str(r['cwe'])},")
        lines.append(f" .attack_technique = {_c_str(r['attack_technique'])},")
        lines.append(f" .attack_subtechnique = {_c_str(r['attack_subtechnique'])},")
        lines.append(f" .in_kev = {'true' if r['in_kev'] else 'false'},")
        lines.append(f" .kev_date_added = {_c_str(r['kev_date_added'])},")
        lines.append(" },")
    lines += [
        "};",
        "",
        "const size_t cve_metadata_table_len =",
        " sizeof(cve_metadata_table) / sizeof(cve_metadata_table[0]);",
        "",
        "const struct cve_metadata *cve_metadata_lookup(const char *cve)",
        "{",
        " if (!cve) return NULL;",
        " for (size_t i = 0; i < cve_metadata_table_len; i++) {",
        " if (strcmp(cve_metadata_table[i].cve, cve) == 0)",
        " return &cve_metadata_table[i];",
        " }",
        " return NULL;",
        "}",
        "",
    ]
    OUT_C.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the locale default would make the committed file's
    # bytes depend on the machine that ran the tool.
    OUT_C.write_text("\n".join(lines), encoding="utf-8")
    print(f"[+] wrote {OUT_C.relative_to(REPO_ROOT)}", file=sys.stderr)
def _render_kev_markdown(records: list[dict]) -> str:
    """Render the KEV cross-reference Markdown document for *records*."""
    in_kev = [r for r in records if r["in_kev"]]
    not_in_kev = [r for r in records if not r["in_kev"]]
    lines = [
        "# CISA KEV Cross-Reference",
        "",
        "Which SKELETONKEY modules cover CVEs that CISA has observed exploited",
        "in the wild per the Known Exploited Vulnerabilities catalog.",
        "Refreshed via `tools/refresh-cve-metadata.py`.",
        "",
        f"**{len(in_kev)} of {len(records)} modules cover KEV-listed CVEs.**",
        "",
        "## In KEV (prioritize patching)",
        "",
        "| CVE | Date added to KEV | CWE | Module |",
        "| --- | --- | --- | --- |",
    ]
    # Oldest KEV additions first; the sort is stable, so ties keep input order.
    for r in sorted(in_kev, key=lambda r: r["kev_date_added"]):
        lines.append(
            f"| {r['cve']} | {r['kev_date_added']} | {r['cwe'] or '?'} | `{r['module_dir']}` |"
        )
    lines += [
        "",
        "## Not in KEV",
        "",
        "Not observed exploited per CISA — but several have public PoC code",
        "and are technically reachable. \"Not in KEV\" is not the same as",
        "\"safe to ignore\".",
        "",
        "| CVE | CWE | Module |",
        "| --- | --- | --- |",
    ]
    for r in sorted(not_in_kev, key=lambda r: r["cve"]):
        lines.append(f"| {r['cve']} | {r['cwe'] or '?'} | `{r['module_dir']}` |")
    lines.append("")
    return "\n".join(lines)


def write_outputs(records: list[dict]) -> None:
    """Write every generated artifact for *records*.

    In order: the JSON records (OUT_JSON, 2-space indent, trailing
    newline), the C table via write_c_table(), and the KEV cross-reference
    Markdown (OUT_MD). A missing CWE is shown as ``?`` in the tables.
    Files are written as UTF-8 explicitly — the Markdown contains a
    non-ASCII dash, and the locale default would make the committed bytes
    depend on the machine that ran the tool.
    """
    OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    OUT_JSON.write_text(json.dumps(records, indent=2) + "\n", encoding="utf-8")
    print(f"[+] wrote {OUT_JSON.relative_to(REPO_ROOT)}", file=sys.stderr)
    write_c_table(records)
    # KEV cross-reference table
    OUT_MD.parent.mkdir(parents=True, exist_ok=True)
    OUT_MD.write_text(_render_kev_markdown(records), encoding="utf-8")
    print(f"[+] wrote {OUT_MD.relative_to(REPO_ROOT)}", file=sys.stderr)
def check_drift() -> int:
    """Compare the committed JSON with a fresh fetch; return an exit status.

    Returns 0 when the records loaded from OUT_JSON equal freshly built
    ones, and 1 when they differ or when OUT_JSON does not exist yet.
    Only the JSON is compared; the outcome is reported on stderr.
    """
    if not OUT_JSON.exists():
        print(f"[!] no committed {OUT_JSON.name} — run without --check first", file=sys.stderr)
        return 1
    on_disk = json.loads(OUT_JSON.read_text())
    corpus = discover_cves()
    catalog = fetch_kev_catalog()
    rebuilt = build_records(corpus, catalog)
    if on_disk != rebuilt:
        print("[!] CVE_METADATA.json drifted — refresh via "
              "`tools/refresh-cve-metadata.py`", file=sys.stderr)
        return 1
    print("[+] CVE_METADATA.json is current", file=sys.stderr)
    return 0
def main() -> int:
    """Parse the command line and run the requested mode; return the exit code.

    With ``--check`` the result of check_drift() is returned. Otherwise the
    corpus is discovered, the KEV catalog and NVD data are fetched, all
    outputs are written, and 0 is returned. The argparse description is
    the second line of the module docstring.
    """
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[1])
    parser.add_argument(
        "--check",
        action="store_true",
        help="diff against committed metadata; exit 1 on drift",
    )
    options = parser.parse_args()
    if options.check:
        return check_drift()
    corpus = discover_cves()
    print(f"[*] {len(corpus)} CVE(s) in corpus", file=sys.stderr)
    catalog = fetch_kev_catalog()
    write_outputs(build_records(corpus, catalog))
    return 0
# Script entry point: propagate main()'s return value as the process exit
# status (0 on success / no drift, 1 on drift or missing committed JSON).
if __name__ == "__main__":
    sys.exit(main())