From 0283699e7ae6962fa5d1f5b0b37ae46e449286ae Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 21 Aug 2025 23:17:41 +0300 Subject: [PATCH 1/2] Add Local VulnerableCod Datasource in VulnTotal and allow live evaluation #1984 Signed-off-by: Michael Ehab Mikhail --- vulntotal/datasources/__init__.py | 2 + vulntotal/datasources/vulnerablecode_local.py | 121 +++++++++++++++ vulntotal/tests/test_vulnerablecode_local.py | 142 ++++++++++++++++++ 3 files changed, 265 insertions(+) create mode 100644 vulntotal/datasources/vulnerablecode_local.py create mode 100644 vulntotal/tests/test_vulnerablecode_local.py diff --git a/vulntotal/datasources/__init__.py b/vulntotal/datasources/__init__.py index fb269a201..8f685d059 100644 --- a/vulntotal/datasources/__init__.py +++ b/vulntotal/datasources/__init__.py @@ -15,6 +15,7 @@ from vulntotal.datasources import safetydb from vulntotal.datasources import snyk from vulntotal.datasources import vulnerablecode +from vulntotal.datasources import vulnerablecode_local from vulntotal.validator import DataSource DATASOURCE_REGISTRY = { @@ -26,4 +27,5 @@ "osv": osv.OSVDataSource, "snyk": snyk.SnykDataSource, "vulnerablecode": vulnerablecode.VulnerableCodeDataSource, + "vulnerablecode_local": vulnerablecode_local.LocalVulnerableCodeDataSource, } diff --git a/vulntotal/datasources/vulnerablecode_local.py b/vulntotal/datasources/vulnerablecode_local.py new file mode 100644 index 000000000..95ba63424 --- /dev/null +++ b/vulntotal/datasources/vulnerablecode_local.py @@ -0,0 +1,121 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/nexB/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import logging +import os +from urllib.parse import urljoin + +import requests +from dotenv import load_dotenv +from packageurl import PackageURL + +from vulntotal.datasources.vulnerablecode import VulnerableCodeDataSource +from vulntotal.validator import VendorData + +logger = logging.getLogger(__name__) + + +def _is_true(val: str | None) -> bool: + return (val is not None) and str(val).strip().lower() in {"1", "true", "yes", "on"} + + +class LocalVulnerableCodeDataSource(VulnerableCodeDataSource): + live_eval_api_path = "api/v2/live-evaluation/evaluate" + vc_purl_search_api_path = "api/v2/advisories-packages/bulk_search/" + + def __init__(self): + super().__init__() + load_dotenv() + + host = os.environ.get("VCIO_HOST", "localhost").rstrip("/") + port = os.environ.get("VCIO_PORT", "8000") + + if host.startswith("http://") or host.startswith("https://"): + base = host + else: + base = f"http://{host}:{port}" + + self.global_instance = f"{base}/" + + self._enable_live_eval = _is_true(os.environ.get("ENABLE_LIVE_EVAL", False)) + + def _trigger_live_evaluation(self, purl: PackageURL) -> bool: + """Trigger live evaluation for the given purl on the local VCIO instance. + + Returns True if the trigger was accepted and False otherwise. + """ + url = urljoin(self.global_instance, self.live_eval_api_path) + try: + response = requests.post(url, json={"purl_string": str(purl)}) + except Exception as e: + logger.error(f"Live evaluation trigger failed for {purl}: {e}") + return False + + if response.status_code != 202: + logger.error( + f"Live evaluation trigger for {purl} failed with status {response.status_code}: {response.text}" + ) + return False + + logger.info(f"Live evaluation accepted for {purl} on {url}") + return True + + def fetch_post_json(self, payload): + url = urljoin(self.global_instance, self.vc_purl_search_api_path) + try: + response = requests.post(url, json=payload) + except Exception as e: + logger.error(f"Error while fetching {url}: {e}") + return + if response.status_code != 200: + logger.error(f"Error while fetching {url}") + return + return response.json() + + def datasource_advisory(self, purl): + if purl.type not in self.supported_ecosystem() or purl.version is None: + return + + if self._enable_live_eval: + self._trigger_live_evaluation(purl) + + metadata = self.fetch_post_json({"purls": [str(purl)]}) + self._raw_dump.append(metadata) + if not metadata: + return + + packages = metadata.get("packages") or [] + advisories_map = metadata.get("advisories") or {} + if not packages: + return + + pkg_entry = next((pkg for pkg in packages if pkg.get("purl") == str(purl)), packages[0]) + affected_map = pkg_entry.get("affected_by_vulnerabilities", {}) or {} + + for advisory_id, details in affected_map.items(): + fixed_versions = [] + fixed_purls = details.get("fixed_by_packages") or [] + for fp in fixed_purls: + try: + ver = PackageURL.from_string(fp).version + if ver: + fixed_versions.append(ver) + except Exception: + continue + + advisory_key = advisory_id.split("/")[-1] + advisory_obj = advisories_map.get(advisory_key, {}) + aliases = advisory_obj.get("aliases") or [] + + yield VendorData( + purl=PackageURL(purl.type, purl.namespace, purl.name), + aliases=aliases, + affected_versions=[purl.version], + fixed_versions=fixed_versions, + ) diff --git a/vulntotal/tests/test_vulnerablecode_local.py b/vulntotal/tests/test_vulnerablecode_local.py new file mode 100644 index 000000000..821d7e694 --- /dev/null +++ b/vulntotal/tests/test_vulnerablecode_local.py @@ -0,0 +1,142 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import os +from typing import List + +import pytest +from packageurl import PackageURL + +from vulntotal.datasources.vulnerablecode_local import LocalVulnerableCodeDataSource + + +class FakeResponse: + def __init__(self, status_code=200, json_data=None, text=""): + self.status_code = status_code + self._json = json_data + self.text = text or ("" if json_data is None else str(json_data)) + + def json(self): + return self._json + + +def make_v2_advisories_response( + pkg_purl: str, advisory_id: str, aliases: List[str], fixes: List[str] +): + return { + "packages": [ + { + "purl": pkg_purl, + "affected_by_vulnerabilities": { + "live_v2_importer_name/" + + advisory_id: { + "advisory_id": "live_v2_importer_name/" + advisory_id, + "fixed_by_packages": fixes, + "code_fixes": [], + } + }, + } + ], + "advisories": { + advisory_id: { + "advisory_id": "live_v2_importer_name/" + advisory_id, + "aliases": aliases, + } + }, + } + + +def test_local_vulnerablecode_v2_bulk_search_and_vendor_data(monkeypatch): + monkeypatch.setenv("VCIO_HOST", "localhost") + monkeypatch.setenv("VCIO_PORT", "1234") + monkeypatch.setenv("ENABLE_LIVE_EVAL", "0") + + calls = [] + + def fake_post(url, json=None, **kwargs): + calls.append((url, json)) + if url.endswith("/api/v2/advisories-packages/bulk_search/"): + return FakeResponse( + 200, + make_v2_advisories_response( + pkg_purl="pkg:pypi/demo@1.2.3", + advisory_id="ADV-123", + aliases=["CVE-2024-0001", "GHSA-foo"], + fixes=["pkg:pypi/demo@1.2.4", "pkg:pypi/demo@1.3.0"], + ), + ) + + return FakeResponse(404, {"detail": "not found"}) + + monkeypatch.setattr("vulntotal.datasources.vulnerablecode_local.requests.post", fake_post) + + ds = LocalVulnerableCodeDataSource() + purl = PackageURL.from_string("pkg:pypi/demo@1.2.3") + + results = list(ds.datasource_advisory(purl)) + + assert any( + "/api/v2/advisories-packages/bulk_search/" in url for url, _ in calls + ), "v2 advisories bulk_search should be called" + + assert not any( + "/api/v2/live-evaluation/evaluate" in url for url, _ in calls + ), "live evaluation should not be called when disabled" + + assert len(results) == 1 + vd = results[0].to_dict() + assert vd["purl"] == "pkg:pypi/demo" + assert vd["aliases"] == ["CVE-2024-0001", "GHSA-foo"] + assert vd["affected_versions"] == ["1.2.3"] + assert sorted(vd["fixed_versions"]) == ["1.2.4", "1.3.0"] + + +def test_local_vulnerablecode_triggers_live_evaluation_when_enabled(monkeypatch): + monkeypatch.setenv("VCIO_HOST", "localhost") + monkeypatch.setenv("VCIO_PORT", "1234") + monkeypatch.setenv("ENABLE_LIVE_EVAL", "1") + + calls = [] + + def fake_post(url, json=None, **kwargs): # noqa: A002 (shadowing builtins) + calls.append((url, json)) + if url.endswith("/api/v2/live-evaluation/evaluate"): + return FakeResponse(202, {"status": "accepted"}) + if url.endswith("/api/v2/advisories-packages/bulk_search/"): + return FakeResponse( + 200, + make_v2_advisories_response( + pkg_purl="pkg:pypi/demo@1.2.3", + advisory_id="ADV-999", + aliases=["CVE-2025-1111"], + fixes=["pkg:pypi/demo@1.2.5"], + ), + ) + return FakeResponse(404, {"detail": "not found"}) + + monkeypatch.setattr("vulntotal.datasources.vulnerablecode_local.requests.post", fake_post) + + ds = LocalVulnerableCodeDataSource() + purl = PackageURL.from_string("pkg:pypi/demo@1.2.3") + + results = list(ds.datasource_advisory(purl)) + + urls = [u for u, _ in calls] + assert any( + "/api/v2/live-evaluation/evaluate" in url for url in urls + ), "live evaluation endpoint should be called when enabled" + assert any( + "/api/v2/advisories-packages/bulk_search/" in url for url in urls + ), "v2 advisories bulk_search should be called" + + assert len(results) == 1 + vd = results[0].to_dict() + assert vd["aliases"] == ["CVE-2025-1111"] + assert vd["affected_versions"] == ["1.2.3"] + assert vd["fixed_versions"] == ["1.2.5"] From cd9257e0455b8b498559b50ace89e18720cb270c Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 21 Aug 2025 23:36:42 +0300 Subject: [PATCH 2/2] Use Optional for compatibility with older python versions Signed-off-by: Michael Ehab Mikhail --- vulntotal/datasources/vulnerablecode_local.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vulntotal/datasources/vulnerablecode_local.py b/vulntotal/datasources/vulnerablecode_local.py index 95ba63424..7b6046632 100644 --- a/vulntotal/datasources/vulnerablecode_local.py +++ b/vulntotal/datasources/vulnerablecode_local.py @@ -9,6 +9,7 @@ import logging import os +from typing import Optional from urllib.parse import urljoin import requests @@ -21,7 +22,7 @@ logger = logging.getLogger(__name__) -def _is_true(val: str | None) -> bool: +def _is_true(val: Optional[str]) -> bool: return (val is not None) and str(val).strip().lower() in {"1", "true", "yes", "on"}