|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Fetch the MITRE CWE catalog and generate a Python lookup table. |
| 3 | +
|
| 4 | +Downloads the full CWE catalog from MITRE, parses it, and generates |
| 5 | +a Python dict at socket_basics/core/connector/opengrep/cwe_catalog.py. |
| 6 | +
|
| 7 | +Usage: |
| 8 | + python scripts/update_cwe_catalog.py |
| 9 | +
|
| 10 | +Re-run anytime MITRE publishes new CWE entries to update the local table. |
| 11 | +""" |
| 12 | + |
| 13 | +import csv |
| 14 | +import io |
| 15 | +import re |
| 16 | +import sys |
| 17 | +import textwrap |
| 18 | +import urllib.request |
| 19 | +import zipfile |
| 20 | +from datetime import datetime, timezone |
| 21 | +from pathlib import Path |
| 22 | + |
# Repository root, taken as the directory two levels above this script file.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Destination of the generated lookup table inside the package tree.
OUTPUT_FILE = PROJECT_ROOT.joinpath(
    "socket_basics", "core", "connector", "opengrep", "cwe_catalog.py"
)

# MITRE CWE Research Concepts view (View 1000), published as a zipped CSV.
CWE_CSV_URL = "https://cwe.mitre.org/data/csv/1000.csv.zip"
| 35 | + |
# ---------------------------------------------------------------------------
# OWASP Top 10 (2021) reverse mapping: CWE-ID -> OWASP category
# Source: https://owasp.org/Top10/
# ---------------------------------------------------------------------------
# Categories are listed in Top 10 order. A CWE number that appears under more
# than one category keeps the first category it is listed under.
_OWASP_CWE_IDS: tuple[tuple[str, tuple[int, ...]], ...] = (
    # A01:2021 — Broken Access Control
    ("A01:2021", (
        22, 23, 35, 59, 200, 219, 264, 275, 276, 284, 285, 352, 359, 377,
        402, 425, 441, 497, 538, 540, 548, 552, 566, 601, 639, 651, 668,
        706, 862, 863, 913, 922, 1275,
    )),
    # A02:2021 — Cryptographic Failures
    ("A02:2021", (
        261, 296, 310, 319, 321, 322, 323, 324, 325, 326, 327, 328, 329,
        330, 331, 335, 336, 337, 338, 340, 347, 523, 720, 757, 759, 760,
        780, 798, 916,
    )),
    # A03:2021 — Injection
    ("A03:2021", (
        20, 74, 75, 77, 78, 79, 80, 83, 87, 88, 89, 90, 91, 93, 94, 95,
        96, 97, 98, 99, 100, 113, 116, 117, 134, 138, 184, 470, 471, 564,
        610, 643, 644, 652, 917, 943, 1236, 1321, 1336,
    )),
    # A04:2021 — Insecure Design
    ("A04:2021", (
        73, 183, 209, 213, 235, 256, 257, 266, 269, 280, 311, 312, 313,
        316, 419, 430, 434, 444, 451, 472, 501, 522, 525, 539, 579, 598,
        602, 642, 646, 650, 653, 656, 657, 799, 807, 840, 841, 927, 1021,
        1173,
    )),
    # A05:2021 — Security Misconfiguration
    ("A05:2021", (
        2, 11, 13, 15, 16, 260, 315, 489, 497, 520, 526, 537, 541, 547,
        611, 614, 693, 732, 756, 776, 942, 1004, 1032, 1174,
    )),
    # A06:2021 — Vulnerable and Outdated Components
    ("A06:2021", (477, 1104, 1059)),
    # A07:2021 — Identification and Authentication Failures
    ("A07:2021", (
        255, 259, 287, 288, 290, 294, 295, 297, 300, 302, 304, 306, 307,
        346, 384, 521, 613, 620, 640, 798, 940, 1216,
    )),
    # A08:2021 — Software and Data Integrity Failures
    ("A08:2021", (345, 353, 426, 494, 502, 565, 784, 829, 830, 915)),
    # A09:2021 — Security Logging and Monitoring Failures
    ("A09:2021", (117, 223, 532, 778)),
    # A10:2021 — Server-Side Request Forgery
    ("A10:2021", (918,)),
)

_OWASP_MAPPING: dict[str, str] = {}
for _category, _ids in _OWASP_CWE_IDS:
    for _id in _ids:
        # setdefault keeps the earliest category for IDs listed more than once
        # (497, 798 and 117 each appear under two categories).
        _OWASP_MAPPING.setdefault(f"CWE-{_id}", _category)
| 104 | + |
| 105 | + |
# ---------------------------------------------------------------------------
# Vulnerability category classification
# ---------------------------------------------------------------------------

# Explicit overrides for CWEs where keyword matching would be wrong.
# Keys are "CWE-<number>" identifiers and values are the category label to
# report. _classify_cwe consults this table before any keyword rule, so an
# entry here always wins.
_CATEGORY_OVERRIDES: dict[str, str] = {
    # XSS variants
    "CWE-79": "Cross-Site Scripting (XSS)",
    "CWE-80": "Cross-Site Scripting (XSS)",
    "CWE-83": "Cross-Site Scripting (XSS)",
    "CWE-87": "Cross-Site Scripting (XSS)",
    # Specific vulnerability classes
    "CWE-502": "Insecure Deserialization",
    "CWE-611": "XML External Entity (XXE)",
    "CWE-918": "Server-Side Request Forgery (SSRF)",
    "CWE-352": "Cross-Site Request Forgery (CSRF)",
    "CWE-434": "Unrestricted File Upload",
    "CWE-1321": "Prototype Pollution",
    "CWE-1336": "Template Injection",
    "CWE-470": "Unsafe Reflection",
    # CWEs used in our rules that keyword matching misses
    "CWE-16": "Security Misconfiguration",
    "CWE-98": "Injection Vulnerability",
    "CWE-117": "Sensitive Data Exposure",
    "CWE-131": "Memory Safety Violation",
    "CWE-208": "Cryptographic Weakness",
    "CWE-242": "Memory Safety Violation",
    "CWE-310": "Cryptographic Weakness",
    "CWE-330": "Cryptographic Weakness",
    "CWE-345": "Insecure Deserialization",
    "CWE-353": "Insecure Deserialization",
    "CWE-477": "Security Misconfiguration",
    "CWE-479": "Memory Safety Violation",
    "CWE-494": "Insecure Deserialization",
    "CWE-667": "Insecure Design",
    "CWE-693": "Security Misconfiguration",
    "CWE-697": "Insecure Design",
    "CWE-704": "Insecure Design",
    "CWE-915": "Injection Vulnerability",
    "CWE-926": "Security Misconfiguration",
    "CWE-942": "Security Misconfiguration",
    "CWE-943": "Injection Vulnerability",
    "CWE-1059": "Security Misconfiguration",
    "CWE-1104": "Security Misconfiguration",
}
| 151 | + |
# Ordered keyword rules — first match wins.
# Each rule is (category label, keywords). _classify_cwe lower-cases the CWE
# name and tests every keyword as a plain substring, so keywords must be
# lower-case and the order of the rules decides ties between categories.
# NOTE(review): because matching is by substring, a longer phrase listed after
# a shorter one it contains (e.g. "path traversal" after "traversal") can never
# be the deciding keyword; short tokens such as "rng" or "tls" also match
# inside longer words.
_CATEGORY_KEYWORDS: list[tuple[str, list[str]]] = [
    ("Injection Vulnerability", [
        "sql injection", "os command", "command injection", "code injection",
        "eval injection", "ldap injection", "xpath injection", "injection",
        "nosql", "format string", "argument delimiter",
    ]),
    ("Cross-Site Scripting (XSS)", [
        "cross-site scripting", "xss",
    ]),
    ("Cryptographic Weakness", [
        "cryptograph", "cipher", "hash", "random number", "prng", "rng",
        "certificate", "tls", "ssl", "encrypt", "key exchange", "key manage",
        "cleartext transmission", "password hash",
    ]),
    ("Authentication Weakness", [
        "authenticat", "credential", "password", "session fixation", "brute force",
        "login", "hard-coded password",
    ]),
    ("Access Control Violation", [
        "access control", "authorization", "traversal", "path traversal",
        "redirect", "permission", "privilege", "idor", "direct object",
    ]),
    ("Memory Safety Violation", [
        "buffer overflow", "buffer over-read", "buffer underwrite", "buffer copy",
        "heap-based", "stack-based", "out-of-bounds", "use after free",
        "double free", "null pointer", "integer overflow", "integer underflow",
        "memory", "free of pointer", "uninitialized",
    ]),
    ("Security Misconfiguration", [
        "misconfigur", "debug", "default", "configuration", "verbose error",
        "information exposure", "information leak", "error message",
        "sensitive cookie", "cors",
    ]),
    ("Insecure Deserialization", [
        "deserializ", "pickle", "unmarshall", "untrusted data",
    ]),
    ("Denial of Service", [
        "denial of service", "resource consumption", "regular expression",
        "redos", "decompression bomb", "amplification", "loop",
    ]),
    ("Sensitive Data Exposure", [
        "sensitive information", "log file", "cleartext storage", "plaintext",
        "insufficient logging",
    ]),
    ("Insecure File Operation", [
        "temporary file", "file name", "file path", "symlink", "race condition",
    ]),
    ("Improper Error Handling", [
        "exception", "error handling", "exceptional condition",
    ]),
    ("Insecure Design", [
        "input validation", "improper validation", "missing validation",
    ]),
    ("Server-Side Request Forgery (SSRF)", [
        "server-side request forgery", "ssrf",
    ]),
]
| 210 | + |
| 211 | + |
def _classify_cwe(cwe_id: str, name: str) -> str:
    """Return the vulnerability category for one CWE.

    An entry in ``_CATEGORY_OVERRIDES`` for ``cwe_id`` takes precedence.
    Otherwise the lower-cased ``name`` is checked against the keyword rules in
    ``_CATEGORY_KEYWORDS`` in order, and the first rule with any keyword
    occurring as a substring supplies the category. ``"Other"`` is returned
    when nothing matches.
    """
    forced = _CATEGORY_OVERRIDES.get(cwe_id)
    if forced is not None:
        return forced

    haystack = name.lower()
    matches = (
        label
        for label, needles in _CATEGORY_KEYWORDS
        if any(needle in haystack for needle in needles)
    )
    return next(matches, "Other")
| 221 | + |
| 222 | + |
def _clean_description(
    desc: str, *, max_length: int = 300, min_sentence_cut: int = 100
) -> str:
    """Normalize whitespace and truncate excessively long descriptions.

    Every run of whitespace is collapsed to a single space and the result is
    stripped. Text longer than ``max_length`` characters is shortened:

    * preferably at the last sentence end (``". "``) that keeps the result
      within ``max_length`` characters, provided that period lies beyond
      index ``min_sentence_cut`` (so the result is not reduced to a fragment);
    * otherwise by a hard cut followed by ``"..."``, for a total of
      ``max_length`` characters.

    Args:
        desc: Raw description text.
        max_length: Maximum number of characters in the returned text.
        min_sentence_cut: A sentence boundary is used only when its period
            sits at an index greater than this value.

    Returns:
        The cleaned, possibly truncated description.
    """
    desc = re.sub(r"\s+", " ", desc).strip()
    if len(desc) <= max_length:
        return desc

    # Search one character past the limit: a period that is exactly the last
    # allowed character is only recognisable by the space that follows it.
    cut = desc[: max_length + 1].rfind(". ")
    if cut > min_sentence_cut:
        return desc[: cut + 1]

    # No usable sentence boundary: hard cut, reserving room for the ellipsis.
    return desc[: max(max_length - 3, 0)] + "..."
| 235 | + |
| 236 | + |
def _download_cwe_csv_text() -> str:
    """Download the MITRE ZIP archive and return the text of its first CSV member.

    Raises:
        RuntimeError: If the archive contains no ``.csv`` member.
    """
    print(f"Downloading CWE catalog from {CWE_CSV_URL} ...")
    req = urllib.request.Request(CWE_CSV_URL, headers={"User-Agent": "socket-basics/1.0"})
    with urllib.request.urlopen(req, timeout=60) as resp:
        zip_data = resp.read()

    print(f"Downloaded {len(zip_data)} bytes, extracting ...")
    with zipfile.ZipFile(io.BytesIO(zip_data)) as zf:
        csv_names = [n for n in zf.namelist() if n.endswith(".csv")]
        if not csv_names:
            raise RuntimeError(f"No CSV found in ZIP. Contents: {zf.namelist()}")
        # "utf-8-sig" drops a leading byte-order mark if one is present, so it
        # cannot end up glued to the first column name.
        return zf.read(csv_names[0]).decode("utf-8-sig")


def _make_entry(cwe_id: str, name: str, description: str) -> dict:
    """Build one catalog entry, deriving category and OWASP mapping from the ID/name."""
    return {
        "id": cwe_id,
        "name": name,
        "description": description,
        "category": _classify_cwe(cwe_id, name),
        "owasp": _OWASP_MAPPING.get(cwe_id, ""),
    }


def _parse_cwe_rows(csv_text: str) -> list[dict]:
    """Convert the CSV text into catalog entries, skipping unusable rows."""

    def field(row: dict, key: str) -> str:
        # csv.DictReader fills columns missing from a short row with None, so
        # row.get(key, "") can still return None; coerce before stripping.
        return (row.get(key) or "").strip()

    entries = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        cwe_num = field(row, "CWE-ID")
        name = field(row, "Name")
        if not cwe_num or not name:
            continue
        # Skip deprecated/obsolete entries
        if field(row, "Status").lower() in ("deprecated", "obsolete"):
            continue
        entries.append(
            _make_entry(
                f"CWE-{cwe_num}",
                name,
                _clean_description(field(row, "Description")),
            )
        )
    return entries


def fetch_cwe_csv() -> list[dict]:
    """Download and parse the MITRE CWE CSV catalog.

    Returns:
        One dict per CWE with the keys ``id``, ``name``, ``description``,
        ``category`` and ``owasp`` (``""`` when no OWASP category is mapped).
        Rows without an ID or name and rows whose status is deprecated or
        obsolete are omitted. Entries from ``_SYNTHETIC_ENTRIES`` are appended
        for IDs that the download did not provide.

    Raises:
        RuntimeError: If the downloaded archive contains no CSV file.
    """
    entries = _parse_cwe_rows(_download_cwe_csv_text())

    # Add synthetic entries for deprecated/pillar CWEs that our rules still use
    # but MITRE removed from the Research Concepts view.
    seen = {e["id"] for e in entries}
    for cwe_id, info in _SYNTHETIC_ENTRIES.items():
        if cwe_id not in seen:
            # Synthetic descriptions are used verbatim (not re-cleaned).
            entries.append(_make_entry(cwe_id, info["name"], info["description"]))

    return entries
| 292 | + |
| 293 | + |
# Hand-written records for deprecated/pillar CWEs that our rule YAML files
# still reference. The Research Concepts CSV does not list them, yet the
# generated catalog must contain them.
_SYNTHETIC_ENTRIES: dict[str, dict[str, str]] = {
    cwe_id: {"name": name, "description": description}
    for cwe_id, name, description in (
        (
            "CWE-16",
            "Configuration",
            "The application uses an insecure or incorrect "
            "configuration setting, which may weaken its "
            "overall security posture.",
        ),
        (
            "CWE-310",
            "Cryptographic Issues",
            "The application contains a general cryptographic "
            "weakness, such as misuse of primitives or improper "
            "key management, that may undermine data protection.",
        ),
    )
}
| 313 | + |
| 314 | + |
def generate_python(entries: list[dict]) -> str:
    """Generate the cwe_catalog.py source code.

    Args:
        entries: Catalog entries with the keys ``id`` (``"CWE-<number>"``),
            ``name``, ``description``, ``category`` and ``owasp``.

    Returns:
        Source text of a module that defines ``CWE_CATALOG``, a dict keyed by
        CWE ID in ascending numeric order. The ``"owasp"`` key is written only
        for entries whose ``owasp`` value is non-empty.

    Raises:
        ValueError: If an entry ID does not have a numeric part after the dash.
    """
    # Function-scope import: json is not among this module's top-level imports.
    import json

    def literal(text: str) -> str:
        # Escaping only '"' is not enough: CWE names contain backslashes
        # (e.g. "'\\UNC\share\name\'"), which would turn into escape sequences
        # or an unterminated string in the generated module. JSON string
        # escapes are all valid Python escapes; ensure_ascii=False keeps
        # non-ASCII text readable instead of emitting surrogate pairs.
        return json.dumps(text, ensure_ascii=False)

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    lines = [
        '"""CWE Catalog — auto-generated lookup table.',
        "",
        "Source: MITRE CWE Research Concepts (View 1000)",
        f"URL: {CWE_CSV_URL}",
        f"Generated: {now}",
        f"Entries: {len(entries)}",
        "",
        "Run `python scripts/update_cwe_catalog.py` to regenerate.",
        '"""',
        "",
        "",
        "CWE_CATALOG: dict[str, dict[str, str]] = {",
    ]

    # Sort numerically so that CWE-9 precedes CWE-10.
    for e in sorted(entries, key=lambda x: int(x["id"].split("-")[1])):
        lines.append(f"    {literal(e['id'])}: {{")
        lines.append(f"        \"name\": {literal(e['name'])},")
        lines.append(f"        \"description\": {literal(e['description'])},")
        lines.append(f"        \"category\": {literal(e['category'])},")
        if e["owasp"]:
            lines.append(f"        \"owasp\": {literal(e['owasp'])},")
        lines.append("    },")

    lines.append("}")
    lines.append("")
    return "\n".join(lines)
| 351 | + |
| 352 | + |
def main() -> None:
    """Fetch the catalog, print summary statistics, and write the generated module."""
    entries = fetch_cwe_csv()
    print(f"Parsed {len(entries)} CWE entries")

    # Stats: number of entries per category, most frequent first.
    tally: dict[str, int] = {}
    for entry in entries:
        label = entry["category"]
        tally[label] = tally.get(label, 0) + 1
    print("\nCategory distribution:")
    ranked = sorted(tally.items(), key=lambda item: -item[1])
    for label, count in ranked:
        print(f" {label}: {count}")

    mapped = [entry for entry in entries if entry["owasp"]]
    print(f"\nOWASP mapped: {len(mapped)}/{len(entries)}")

    source = generate_python(entries)
    # Create the package directory first so the write cannot fail on a missing path.
    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    OUTPUT_FILE.write_text(source, encoding="utf-8")
    print(f"\nWrote {OUTPUT_FILE} ({len(source)} bytes, {len(entries)} entries)")


if __name__ == "__main__":
    main()
0 commit comments