Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulntotal/datasources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from vulntotal.datasources import safetydb
from vulntotal.datasources import snyk
from vulntotal.datasources import vulnerablecode
from vulntotal.datasources import vulnerablecode_local
from vulntotal.validator import DataSource

DATASOURCE_REGISTRY = {
Expand All @@ -26,4 +27,5 @@
"osv": osv.OSVDataSource,
"snyk": snyk.SnykDataSource,
"vulnerablecode": vulnerablecode.VulnerableCodeDataSource,
"vulnerablecode_local": vulnerablecode_local.LocalVulnerableCodeDataSource,
}
122 changes: 122 additions & 0 deletions vulntotal/datasources/vulnerablecode_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
import os
from typing import Optional
from urllib.parse import urljoin

import requests
from dotenv import load_dotenv
from packageurl import PackageURL

from vulntotal.datasources.vulnerablecode import VulnerableCodeDataSource
from vulntotal.validator import VendorData

logger = logging.getLogger(__name__)


def _is_true(val: Optional[str]) -> bool:
return (val is not None) and str(val).strip().lower() in {"1", "true", "yes", "on"}


class LocalVulnerableCodeDataSource(VulnerableCodeDataSource):
live_eval_api_path = "api/v2/live-evaluation/evaluate"
vc_purl_search_api_path = "api/v2/advisories-packages/bulk_search/"

def __init__(self):
super().__init__()
load_dotenv()

host = os.environ.get("VCIO_HOST", "localhost").rstrip("/")
port = os.environ.get("VCIO_PORT", "8000")

if host.startswith("http://") or host.startswith("https://"):
base = host
else:
base = f"http://{host}:{port}"

self.global_instance = f"{base}/"

self._enable_live_eval = _is_true(os.environ.get("ENABLE_LIVE_EVAL", False))

def _trigger_live_evaluation(self, purl: PackageURL) -> bool:
"""Trigger live evaluation for the given purl on the local VCIO instance.

Returns True if the trigger was accepted and False otherwise.
"""
url = urljoin(self.global_instance, self.live_eval_api_path)
try:
response = requests.post(url, json={"purl_string": str(purl)})
except Exception as e:
logger.error(f"Live evaluation trigger failed for {purl}: {e}")
return False

if response.status_code != 202:
logger.error(
f"Live evaluation trigger for {purl} failed with status {response.status_code}: {response.text}"
)
return False

logger.info(f"Live evaluation accepted for {purl} on {url}")
return True

def fetch_post_json(self, payload):
url = urljoin(self.global_instance, self.vc_purl_search_api_path)
try:
response = requests.post(url, json=payload)
except Exception as e:
logger.error(f"Error while fetching {url}: {e}")
return
if response.status_code != 200:
logger.error(f"Error while fetching {url}")
return
return response.json()

def datasource_advisory(self, purl):
if purl.type not in self.supported_ecosystem() or purl.version is None:
return

if self._enable_live_eval:
self._trigger_live_evaluation(purl)

metadata = self.fetch_post_json({"purls": [str(purl)]})
self._raw_dump.append(metadata)
if not metadata:
return

packages = metadata.get("packages") or []
advisories_map = metadata.get("advisories") or {}
if not packages:
return

pkg_entry = next((pkg for pkg in packages if pkg.get("purl") == str(purl)), packages[0])
affected_map = pkg_entry.get("affected_by_vulnerabilities", {}) or {}

for advisory_id, details in affected_map.items():
fixed_versions = []
fixed_purls = details.get("fixed_by_packages") or []
for fp in fixed_purls:
try:
ver = PackageURL.from_string(fp).version
if ver:
fixed_versions.append(ver)
except Exception:
continue

advisory_key = advisory_id.split("/")[-1]
advisory_obj = advisories_map.get(advisory_key, {})
aliases = advisory_obj.get("aliases") or []

yield VendorData(
purl=PackageURL(purl.type, purl.namespace, purl.name),
aliases=aliases,
affected_versions=[purl.version],
fixed_versions=fixed_versions,
)
142 changes: 142 additions & 0 deletions vulntotal/tests/test_vulnerablecode_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from typing import List

import pytest
from packageurl import PackageURL

from vulntotal.datasources.vulnerablecode_local import LocalVulnerableCodeDataSource


class FakeResponse:
def __init__(self, status_code=200, json_data=None, text=""):
self.status_code = status_code
self._json = json_data
self.text = text or ("" if json_data is None else str(json_data))

def json(self):
return self._json


def make_v2_advisories_response(
pkg_purl: str, advisory_id: str, aliases: List[str], fixes: List[str]
):
return {
"packages": [
{
"purl": pkg_purl,
"affected_by_vulnerabilities": {
"live_v2_importer_name/"
+ advisory_id: {
"advisory_id": "live_v2_importer_name/" + advisory_id,
"fixed_by_packages": fixes,
"code_fixes": [],
}
},
}
],
"advisories": {
advisory_id: {
"advisory_id": "live_v2_importer_name/" + advisory_id,
"aliases": aliases,
}
},
}


def test_local_vulnerablecode_v2_bulk_search_and_vendor_data(monkeypatch):
monkeypatch.setenv("VCIO_HOST", "localhost")
monkeypatch.setenv("VCIO_PORT", "1234")
monkeypatch.setenv("ENABLE_LIVE_EVAL", "0")

calls = []

def fake_post(url, json=None, **kwargs):
calls.append((url, json))
if url.endswith("/api/v2/advisories-packages/bulk_search/"):
return FakeResponse(
200,
make_v2_advisories_response(
pkg_purl="pkg:pypi/[email protected]",
advisory_id="ADV-123",
aliases=["CVE-2024-0001", "GHSA-foo"],
fixes=["pkg:pypi/[email protected]", "pkg:pypi/[email protected]"],
),
)

return FakeResponse(404, {"detail": "not found"})

monkeypatch.setattr("vulntotal.datasources.vulnerablecode_local.requests.post", fake_post)

ds = LocalVulnerableCodeDataSource()
purl = PackageURL.from_string("pkg:pypi/[email protected]")

results = list(ds.datasource_advisory(purl))

assert any(
"/api/v2/advisories-packages/bulk_search/" in url for url, _ in calls
), "v2 advisories bulk_search should be called"

assert not any(
"/api/v2/live-evaluation/evaluate" in url for url, _ in calls
), "live evaluation should not be called when disabled"

assert len(results) == 1
vd = results[0].to_dict()
assert vd["purl"] == "pkg:pypi/demo"
assert vd["aliases"] == ["CVE-2024-0001", "GHSA-foo"]
assert vd["affected_versions"] == ["1.2.3"]
assert sorted(vd["fixed_versions"]) == ["1.2.4", "1.3.0"]


def test_local_vulnerablecode_triggers_live_evaluation_when_enabled(monkeypatch):
monkeypatch.setenv("VCIO_HOST", "localhost")
monkeypatch.setenv("VCIO_PORT", "1234")
monkeypatch.setenv("ENABLE_LIVE_EVAL", "1")

calls = []

def fake_post(url, json=None, **kwargs): # noqa: A002 (shadowing builtins)
calls.append((url, json))
if url.endswith("/api/v2/live-evaluation/evaluate"):
return FakeResponse(202, {"status": "accepted"})
if url.endswith("/api/v2/advisories-packages/bulk_search/"):
return FakeResponse(
200,
make_v2_advisories_response(
pkg_purl="pkg:pypi/[email protected]",
advisory_id="ADV-999",
aliases=["CVE-2025-1111"],
fixes=["pkg:pypi/[email protected]"],
),
)
return FakeResponse(404, {"detail": "not found"})

monkeypatch.setattr("vulntotal.datasources.vulnerablecode_local.requests.post", fake_post)

ds = LocalVulnerableCodeDataSource()
purl = PackageURL.from_string("pkg:pypi/[email protected]")

results = list(ds.datasource_advisory(purl))

urls = [u for u, _ in calls]
assert any(
"/api/v2/live-evaluation/evaluate" in url for url in urls
), "live evaluation endpoint should be called when enabled"
assert any(
"/api/v2/advisories-packages/bulk_search/" in url for url in urls
), "v2 advisories bulk_search should be called"

assert len(results) == 1
vd = results[0].to_dict()
assert vd["aliases"] == ["CVE-2025-1111"]
assert vd["affected_versions"] == ["1.2.3"]
assert vd["fixed_versions"] == ["1.2.5"]