Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
from vulnerabilities.pipelines.v2_importers import pysec_live_importer as pysec_live_importer_v2
from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
Expand Down Expand Up @@ -117,3 +118,9 @@
oss_fuzz.OSSFuzzImporter,
]
)

# Registry of live importer pipelines, built with create_registry();
# the PySec live importer pipeline is its only entry here.
LIVE_IMPORTERS_REGISTRY = create_registry(
[
pysec_live_importer_v2.PySecLiveImporterPipeline,
]
)
122 changes: 122 additions & 0 deletions vulnerabilities/pipelines/v2_importers/pysec_live_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
#

from io import BytesIO
from typing import Iterable
from zipfile import ZipFile

from packageurl import PackageURL
from univers.versions import PypiVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.pysec_importer import PyPIImporterPipeline


class PySecLiveImporterPipeline(PyPIImporterPipeline):
"""
PySec Live Importer Pipeline

Collect advisories from OSV PyPI zip for a single PURL.
"""

pipeline_id = "pysec_live_importer_v2"
supported_types = ["pypi"]

@classmethod
def steps(cls):
    """
    Return the pipeline steps in execution order: validate the PURL
    input, fetch the zip, then collect and store advisories.
    """
    ordered_steps = (
        cls.get_purl_inputs,
        cls.fetch_zip,
        cls.collect_and_store_advisories,
    )
    return ordered_steps

def get_purl_inputs(self):
    """
    Validate the ``purl`` pipeline input and store it as ``self.purl``.

    The input may be a PackageURL or a string, which is parsed with
    ``PackageURL.from_string``. Raise a ValueError when the input is
    falsy, is not a PackageURL after parsing, has a type outside
    ``supported_types``, or carries no version.
    """
    purl = self.inputs["purl"]
    if not purl:
        raise ValueError("PURL is required for PySecLiveImporterPipeline")

    # Strings are parsed; any other object is checked as-is below.
    purl = PackageURL.from_string(purl) if isinstance(purl, str) else purl

    is_package_url = isinstance(purl, PackageURL)
    if not is_package_url:
        raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

    type_is_supported = purl.type in self.supported_types
    if not type_is_supported:
        raise ValueError(
            f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
        )

    # Later steps match advisories against this exact version.
    if not purl.version:
        raise ValueError(f"PURL: {purl!s} is expected to have a version")

    self.purl = purl

def _is_version_affected(self, advisory_dict, version):
affected = advisory_dict.get("affected", [])
try:
v = PypiVersion(version)
except Exception:
return False
Comment on lines +59 to +62
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catch only `InvalidVersion` here instead of every `Exception`.

for entry in affected:
ranges = entry.get("ranges", [])
for r in ranges:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop variable `r` in `for r in ranges:` could be renamed to something clearer, such as `version_range` or `data_range`, for readability.

Comment on lines +64 to +65
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ranges = entry.get("ranges", [])
for r in ranges:
for event in r.get("events", []):

events = r.get("events", [])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
events = r.get("events", [])

introduced = None
fixed = None
for event in events:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for event in events:
for event in r.get("events", []):

if "introduced" in event:
introduced = event["introduced"]
if "fixed" in event:
fixed = event["fixed"]
try:
if introduced:
introduced_v = PypiVersion(introduced)
if v < introduced_v:
continue
if fixed:
fixed_v = PypiVersion(fixed)
if v >= fixed_v:
continue
if introduced:
introduced_v = PypiVersion(introduced)
if (not fixed or v < PypiVersion(fixed)) and v >= introduced_v:
return True
except Exception:
continue
Comment on lines +87 to +88
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

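Use more precise exception handling here: catch only the specific error raised for an invalid version rather than a blanket `Exception`.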

return False

def collect_advisories(self) -> Iterable[AdvisoryData]:
from vulnerabilities.importers.osv import parse_advisory_data_v2

with ZipFile(BytesIO(self.advisory_zip)) as zip_file:
for file_name in zip_file.namelist():
if not file_name.startswith("PYSEC-"):
continue
with zip_file.open(file_name) as f:
import json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move `import json` to the top of the module.


advisory_dict = json.load(f)

affected = advisory_dict.get("affected", [])
found = False
for entry in affected:
pkg = entry.get("package", {})
if pkg.get("name") == self.purl.name:
found = True
break
if not found:
continue
if not self._is_version_affected(advisory_dict, self.purl.version):
continue

f.seek(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need `f.seek(0)` here? Could we read the file once and parse the JSON from that text instead?

advisory_text = f.read().decode("utf-8")
yield parse_advisory_data_v2(
raw_data=advisory_dict,
supported_ecosystems=["pypi"],
advisory_url=self.url,
advisory_text=advisory_text,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import json
from io import BytesIO
from unittest.mock import patch
from zipfile import ZipFile

import pytest
from packageurl import PackageURL

from vulnerabilities.importer import AdvisoryData


@pytest.fixture
def mock_zip_data():
    """
    Return an in-memory zip (BytesIO rewound to the start) holding three
    PYSEC advisories: two for package "foo" covering different version
    ranges and one for package "bar".
    """

    def build_advisory(advisory_id, summary, package_name, introduced, fixed):
        # One affected PyPI package with a single ECOSYSTEM range.
        return {
            "advisory_id": advisory_id,
            "summary": summary,
            "affected": [
                {
                    "package": {"name": package_name, "ecosystem": "PyPI"},
                    "ranges": [
                        {
                            "type": "ECOSYSTEM",
                            "events": [{"introduced": introduced}, {"fixed": fixed}],
                        }
                    ],
                }
            ],
        }

    members = (
        (
            "PYSEC-1001.json",
            build_advisory("PYSEC-1001", "Vuln in foo", "foo", "1.0.0", "2.0.0"),
        ),
        (
            "PYSEC-1002.json",
            build_advisory("PYSEC-1002", "Vuln in foo, later version", "foo", "2.5.0", "3.0.0"),
        ),
        (
            "PYSEC-2000.json",
            build_advisory("PYSEC-2000", "Vuln in bar", "bar", "0.1.0", "0.2.0"),
        ),
    )

    archive = BytesIO()
    with ZipFile(archive, mode="w") as bundle:
        for member_name, advisory in members:
            bundle.writestr(member_name, json.dumps(advisory))

    # Rewind so callers can read the archive from the first byte.
    archive.seek(0)
    return archive


def test_package_with_version_affected(mock_zip_data):
from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

Comment on lines +70 to +71
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the `PySecLiveImporterPipeline` import to the top of the test module instead of importing it inside each test.

Suggested change
from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

purl = PackageURL(type="pypi", name="foo", version="1.5.0")

with patch("requests.get") as mock_get:
mock_get.return_value.content = mock_zip_data.read()

with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse:

def parse_side_effect(raw_data, supported_ecosystems, advisory_url, advisory_text):
return AdvisoryData(
advisory_id=raw_data["advisory_id"],
summary=raw_data["summary"],
references_v2=[{"url": advisory_url}],
affected_packages=[],
weaknesses=[],
url=advisory_url,
)

mock_parse.side_effect = parse_side_effect

pipeline = PySecLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
pipeline.fetch_zip()
advisories = list(pipeline.collect_advisories())

# Only PYSEC-1001 should match
assert len(advisories) == 1
assert advisories[0].advisory_id == "PYSEC-1001"


def test_package_with_version_not_affected(mock_zip_data):
from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline


purl = PackageURL(type="pypi", name="foo", version="2.2.0")

with patch("requests.get") as mock_get:
mock_get.return_value.content = mock_zip_data.read()

with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse:
mock_parse.return_value = AdvisoryData(
advisory_id="PYSEC-1002",
summary="Vuln in foo, later version",
references_v2=[{"url": "dummy"}],
affected_packages=[],
weaknesses=[],
url="dummy",
)

pipeline = PySecLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
pipeline.fetch_zip()
advisories = list(pipeline.collect_advisories())

# No advisories should match
assert len(advisories) == 0


def test_nonexistent_package(mock_zip_data):
from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

Comment on lines +129 to +130
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline

purl = PackageURL(type="pypi", name="baz", version="1.0.0")

with patch("requests.get") as mock_get:
mock_get.return_value.content = mock_zip_data.read()

pipeline = PySecLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
pipeline.fetch_zip()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 0