module metadata: CWE + ATT&CK + CISA KEV triage from federal sources
Adds per-CVE triage annotations that turn SKELETONKEY's JSON output
into something a SIEM/CTI/threat-intel pipeline can route on, and a
KEV badge in --list so operators see at-a-glance which modules
cover actively-exploited bugs.
New tool — tools/refresh-cve-metadata.py:
- Discovers CVEs by scanning modules/<dir>/ (no hardcoded list).
- Fetches CISA's Known Exploited Vulnerabilities catalog
(https://www.cisa.gov/.../known_exploited_vulnerabilities.csv).
- Fetches CWE classifications from NVD's CVE API 2.0
(services.nvd.nist.gov), throttled to the anonymous
5-req/30s limit (~3 minutes for 26 CVEs).
- Hand-curated ATT&CK technique mapping (T1068 default; T1611 for
container escapes, T1082 for kernel info leaks — MITRE doesn't
publish a clean CVE→technique feed).
- Generates three outputs:
docs/CVE_METADATA.json machine-readable, drift-checkable
docs/KEV_CROSSREF.md human-readable table
core/cve_metadata.c auto-generated lookup table
- --check mode diffs the committed JSON against a fresh fetch for
CI drift detection.
New core API — core/cve_metadata.{h,c}:
struct cve_metadata { cve, cwe, attack_technique, attack_subtechnique,
in_kev, kev_date_added };
const struct cve_metadata *cve_metadata_lookup(const char *cve);
Lookup keyed by CVE id, not module name — the metadata is properties
of the CVE (two modules covering the same bug see the same metadata).
The opsec_notes field stays on the module struct because exploit
technique varies per-module (different footprints).
Output surfacing:
- --list: new KEV column shows ★ for KEV-listed CVEs.
- --module-info (text): prints cwe / att&ck / 'in CISA KEV: YES (added
YYYY-MM-DD)' between summary and operations.
- --module-info / --scan (JSON): emits a 'triage' subobject with the
full record, plus an 'opsec_notes' field at top level when set.
Initial snapshot:
- 10 of 26 modules cover KEV-listed CVEs (dirty_cow, dirty_pipe,
pwnkit, sudo_samedit, ptrace_traceme, fuse_legacy, nf_tables,
overlayfs, overlayfs_setuid, netfilter_xtcompat).
- 24 of 26 have NVD CWE mappings; 2 unmapped (NVD has no weakness
record for CVE-2019-13272 and CVE-2026-46300 yet).
- All 26 mapped to an ATT&CK technique.
Verification:
- macOS local: 33 kernel_range + clean build, --module-info shows
'in CISA KEV: YES (added 2024-05-30)' for nf_tables, --list KEV
column renders.
- Linux (docker gcc:latest): 33 + 54 = 87 passes, 0 fails.
Follow-up commits will add per-module OPSEC notes and --explain mode.
This commit is contained in:
Executable
+299
@@ -0,0 +1,299 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
tools/refresh-cve-metadata.py — fetch CWE + KEV status for every CVE in the
|
||||
SKELETONKEY corpus from authoritative federal sources.
|
||||
|
||||
Sources:
|
||||
- CISA Known Exploited Vulnerabilities catalog
|
||||
https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv
|
||||
(authoritative for "is this exploited in the wild?")
|
||||
- NVD CVE API 2.0
|
||||
https://services.nvd.nist.gov/rest/json/cves/2.0?cveId=...
|
||||
(authoritative for CWE classification)
|
||||
|
||||
The output is intentionally NOT auto-applied to module sources — drift
|
||||
between an external source and our embedded metadata should surface as
|
||||
a diff a human reviews. The tool produces:
|
||||
|
||||
docs/CVE_METADATA.json machine-readable per-CVE record
|
||||
docs/KEV_CROSSREF.md human-readable KEV table
|
||||
|
||||
Modules consume the JSON via copy-paste into their struct skeletonkey_module
|
||||
literal (attack_technique, cwe, in_kev, kev_date_added fields). The
|
||||
provenance comment in core/module.h points contributors back here.
|
||||
|
||||
No API key required; the script throttles to NVD's anonymous 5-req/30s
|
||||
limit. ~3 minutes total for 26 CVEs.
|
||||
|
||||
Usage:
|
||||
tools/refresh-cve-metadata.py # refresh + write outputs
|
||||
tools/refresh-cve-metadata.py --check # diff against committed JSON, exit 1 on drift
|
||||
|
||||
Dependencies: stdlib only. Python 3.8+.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
MODULES_DIR = REPO_ROOT / "modules"
|
||||
OUT_JSON = REPO_ROOT / "docs" / "CVE_METADATA.json"
|
||||
OUT_MD = REPO_ROOT / "docs" / "KEV_CROSSREF.md"
|
||||
OUT_C = REPO_ROOT / "core" / "cve_metadata.c"
|
||||
|
||||
KEV_URL = "https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv"
|
||||
NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve}"
|
||||
|
||||
# Per NVD's anonymous rate limit: 5 requests per 30 seconds.
|
||||
NVD_DELAY_SECONDS = 7
|
||||
|
||||
# Module → ATT&CK technique mapping. Almost all kernel/userspace LPEs
|
||||
# map to T1068 (Exploitation for Privilege Escalation). The two
|
||||
# exceptions are noted inline. This mapping is hand-curated; the
|
||||
# tool doesn't pull ATT&CK from any feed (MITRE doesn't publish a
|
||||
# clean CVE → technique CSV).
|
||||
ATTACK_MAPPING = {
|
||||
# Default for every CVE not listed: T1068, no subtechnique.
|
||||
"CVE-2022-0492": ("T1611", None), # cgroup_release_agent — container escape
|
||||
"CVE-2023-0458": ("T1082", None), # entrybleed — kernel info leak, not LPE
|
||||
}
|
||||
|
||||
|
||||
def discover_cves() -> list[str]:
|
||||
"""Find every CVE-NNNN-NNNN id by scanning modules/<dir>/."""
|
||||
cves = set()
|
||||
for child in MODULES_DIR.iterdir():
|
||||
if not child.is_dir():
|
||||
continue
|
||||
# Module dirs end in _cve_YYYY_NNNNN
|
||||
parts = child.name.split("_cve_")
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
cve_tail = parts[1].replace("_", "-")
|
||||
cves.add(f"CVE-{cve_tail}")
|
||||
return sorted(cves)
|
||||
|
||||
|
||||
def fetch_kev_catalog() -> dict[str, str]:
|
||||
"""Return {cve_id: date_added_yyyy_mm_dd} from CISA's KEV CSV."""
|
||||
print(f"[*] fetching CISA KEV catalog ({KEV_URL})", file=sys.stderr)
|
||||
try:
|
||||
with urllib.request.urlopen(KEV_URL, timeout=30) as r:
|
||||
data = r.read().decode("utf-8", errors="replace")
|
||||
except urllib.error.URLError as e:
|
||||
print(f"[!] KEV fetch failed: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
out: dict[str, str] = {}
|
||||
reader = csv.DictReader(io.StringIO(data))
|
||||
for row in reader:
|
||||
cve = row.get("cveID", "").strip()
|
||||
date = row.get("dateAdded", "").strip()
|
||||
if cve:
|
||||
out[cve] = date
|
||||
print(f"[+] KEV catalog has {len(out)} entries", file=sys.stderr)
|
||||
return out
|
||||
|
||||
|
||||
def fetch_nvd_cwe(cve: str) -> tuple[str | None, str | None]:
|
||||
"""Return (cwe_id, description) from NVD. Returns (None, None) on miss."""
|
||||
url = NVD_URL.format(cve=cve)
|
||||
req = urllib.request.Request(url, headers={"User-Agent": "skeletonkey-cve-metadata/1"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as r:
|
||||
blob = json.loads(r.read().decode("utf-8"))
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"[!] NVD HTTP {e.code} for {cve}", file=sys.stderr)
|
||||
return None, None
|
||||
except (urllib.error.URLError, json.JSONDecodeError) as e:
|
||||
print(f"[!] NVD parse error for {cve}: {e}", file=sys.stderr)
|
||||
return None, None
|
||||
vulns = blob.get("vulnerabilities") or []
|
||||
if not vulns:
|
||||
return None, None
|
||||
cve_obj = vulns[0].get("cve", {})
|
||||
# weaknesses: [{source, type, description: [{lang, value: "CWE-..."}]}]
|
||||
for w in cve_obj.get("weaknesses", []) or []:
|
||||
for d in w.get("description", []) or []:
|
||||
v = d.get("value", "")
|
||||
if v.startswith("CWE-"):
|
||||
return v, None # description not stored; CWE id alone is what we use
|
||||
return None, None
|
||||
|
||||
|
||||
def attack_for_cve(cve: str) -> tuple[str, str | None]:
|
||||
return ATTACK_MAPPING.get(cve, ("T1068", None))
|
||||
|
||||
|
||||
def short_module_name(cve: str) -> str:
|
||||
"""Find the directory under modules/ that ends with this CVE's tail."""
|
||||
tail = cve.removeprefix("CVE-").replace("-", "_")
|
||||
for child in MODULES_DIR.iterdir():
|
||||
if child.is_dir() and child.name.endswith(f"_cve_{tail}"):
|
||||
return child.name
|
||||
return "?"
|
||||
|
||||
|
||||
def build_records(cves: list[str], kev: dict[str, str]) -> list[dict]:
|
||||
records = []
|
||||
for i, cve in enumerate(cves, 1):
|
||||
print(f"[*] [{i:2d}/{len(cves)}] {cve}: NVD lookup", file=sys.stderr)
|
||||
cwe, _ = fetch_nvd_cwe(cve)
|
||||
tech, subtech = attack_for_cve(cve)
|
||||
in_kev = cve in kev
|
||||
rec = {
|
||||
"cve": cve,
|
||||
"module_dir": short_module_name(cve),
|
||||
"cwe": cwe,
|
||||
"attack_technique": tech,
|
||||
"attack_subtechnique": subtech,
|
||||
"in_kev": in_kev,
|
||||
"kev_date_added": kev.get(cve, ""),
|
||||
}
|
||||
records.append(rec)
|
||||
# Throttle NVD requests
|
||||
if i < len(cves):
|
||||
time.sleep(NVD_DELAY_SECONDS)
|
||||
return records
|
||||
|
||||
|
||||
def _c_str(s: str | None) -> str:
|
||||
"""Render a Python str|None as a C string literal or NULL."""
|
||||
if s is None:
|
||||
return "NULL"
|
||||
# only safe chars in our domain (CVE-/CWE-/T#### / dates) so no escaping needed
|
||||
return f'"{s}"'
|
||||
|
||||
|
||||
def write_c_table(records: list[dict]) -> None:
|
||||
"""Generate core/cve_metadata.c from the JSON records."""
|
||||
lines = [
|
||||
"/*",
|
||||
" * SKELETONKEY — CVE metadata table",
|
||||
" *",
|
||||
" * AUTO-GENERATED by tools/refresh-cve-metadata.py from",
|
||||
" * docs/CVE_METADATA.json. Do not hand-edit; rerun the script.",
|
||||
" * Sources: CISA KEV catalog + NVD CVE API 2.0.",
|
||||
" */",
|
||||
"",
|
||||
'#include "cve_metadata.h"',
|
||||
"",
|
||||
"#include <stddef.h>",
|
||||
"#include <string.h>",
|
||||
"",
|
||||
"const struct cve_metadata cve_metadata_table[] = {",
|
||||
]
|
||||
for r in records:
|
||||
lines.append(" {")
|
||||
lines.append(f" .cve = {_c_str(r['cve'])},")
|
||||
lines.append(f" .cwe = {_c_str(r['cwe'])},")
|
||||
lines.append(f" .attack_technique = {_c_str(r['attack_technique'])},")
|
||||
lines.append(f" .attack_subtechnique = {_c_str(r['attack_subtechnique'])},")
|
||||
lines.append(f" .in_kev = {'true' if r['in_kev'] else 'false'},")
|
||||
lines.append(f" .kev_date_added = {_c_str(r['kev_date_added'])},")
|
||||
lines.append(" },")
|
||||
lines += [
|
||||
"};",
|
||||
"",
|
||||
"const size_t cve_metadata_table_len =",
|
||||
" sizeof(cve_metadata_table) / sizeof(cve_metadata_table[0]);",
|
||||
"",
|
||||
"const struct cve_metadata *cve_metadata_lookup(const char *cve)",
|
||||
"{",
|
||||
" if (!cve) return NULL;",
|
||||
" for (size_t i = 0; i < cve_metadata_table_len; i++) {",
|
||||
" if (strcmp(cve_metadata_table[i].cve, cve) == 0)",
|
||||
" return &cve_metadata_table[i];",
|
||||
" }",
|
||||
" return NULL;",
|
||||
"}",
|
||||
"",
|
||||
]
|
||||
OUT_C.write_text("\n".join(lines))
|
||||
print(f"[+] wrote {OUT_C.relative_to(REPO_ROOT)}", file=sys.stderr)
|
||||
|
||||
|
||||
def write_outputs(records: list[dict]) -> None:
|
||||
OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
|
||||
OUT_JSON.write_text(json.dumps(records, indent=2) + "\n")
|
||||
print(f"[+] wrote {OUT_JSON.relative_to(REPO_ROOT)}", file=sys.stderr)
|
||||
write_c_table(records)
|
||||
|
||||
# KEV cross-reference table
|
||||
in_kev = [r for r in records if r["in_kev"]]
|
||||
not_in_kev = [r for r in records if not r["in_kev"]]
|
||||
lines = [
|
||||
"# CISA KEV Cross-Reference",
|
||||
"",
|
||||
"Which SKELETONKEY modules cover CVEs that CISA has observed exploited",
|
||||
"in the wild per the Known Exploited Vulnerabilities catalog.",
|
||||
"Refreshed via `tools/refresh-cve-metadata.py`.",
|
||||
"",
|
||||
f"**{len(in_kev)} of {len(records)} modules cover KEV-listed CVEs.**",
|
||||
"",
|
||||
"## In KEV (prioritize patching)",
|
||||
"",
|
||||
"| CVE | Date added to KEV | CWE | Module |",
|
||||
"| --- | --- | --- | --- |",
|
||||
]
|
||||
for r in sorted(in_kev, key=lambda r: r["kev_date_added"]):
|
||||
lines.append(
|
||||
f"| {r['cve']} | {r['kev_date_added']} | {r['cwe'] or '?'} | `{r['module_dir']}` |"
|
||||
)
|
||||
lines += [
|
||||
"",
|
||||
"## Not in KEV",
|
||||
"",
|
||||
"Not observed exploited per CISA — but several have public PoC code",
|
||||
"and are technically reachable. \"Not in KEV\" is not the same as",
|
||||
"\"safe to ignore\".",
|
||||
"",
|
||||
"| CVE | CWE | Module |",
|
||||
"| --- | --- | --- |",
|
||||
]
|
||||
for r in sorted(not_in_kev, key=lambda r: r["cve"]):
|
||||
lines.append(f"| {r['cve']} | {r['cwe'] or '?'} | `{r['module_dir']}` |")
|
||||
lines.append("")
|
||||
OUT_MD.write_text("\n".join(lines))
|
||||
print(f"[+] wrote {OUT_MD.relative_to(REPO_ROOT)}", file=sys.stderr)
|
||||
|
||||
|
||||
def check_drift() -> int:
|
||||
"""Exit 1 if the committed JSON differs from a fresh fetch."""
|
||||
if not OUT_JSON.exists():
|
||||
print(f"[!] no committed {OUT_JSON.name} — run without --check first", file=sys.stderr)
|
||||
return 1
|
||||
committed = json.loads(OUT_JSON.read_text())
|
||||
fresh = build_records(discover_cves(), fetch_kev_catalog())
|
||||
if committed == fresh:
|
||||
print("[+] CVE_METADATA.json is current", file=sys.stderr)
|
||||
return 0
|
||||
print("[!] CVE_METADATA.json drifted — refresh via "
|
||||
"`tools/refresh-cve-metadata.py`", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser(description=__doc__.splitlines()[1])
|
||||
ap.add_argument("--check", action="store_true",
|
||||
help="diff against committed metadata; exit 1 on drift")
|
||||
args = ap.parse_args()
|
||||
if args.check:
|
||||
return check_drift()
|
||||
cves = discover_cves()
|
||||
print(f"[*] {len(cves)} CVE(s) in corpus", file=sys.stderr)
|
||||
kev = fetch_kev_catalog()
|
||||
records = build_records(cves, kev)
|
||||
write_outputs(records)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user