Skip to content

Commit d803c6a

Browse files
committed
Add NPM Live V2 Importer Pipeline #1936
* Add NPM Live V2 Importer * Add tests for the NPM Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail <[email protected]>
1 parent dcb0511 commit d803c6a

File tree

3 files changed

+228
-0
lines changed

3 files changed

+228
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
5353
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
5454
from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2
55+
from vulnerabilities.pipelines.v2_importers import npm_live_importer as npm_live_importer_v2
5556
from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2
5657
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
5758
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
@@ -117,3 +118,9 @@
117118
oss_fuzz.OSSFuzzImporter,
118119
]
119120
)
121+
122+
LIVE_IMPORTERS_REGISTRY = create_registry(
123+
[
124+
npm_live_importer_v2.NpmLiveImporterPipeline,
125+
]
126+
)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from pathlib import Path
11+
from typing import Iterable
12+
13+
from packageurl import PackageURL
14+
from univers.versions import SemverVersion
15+
16+
from vulnerabilities.importer import AdvisoryData
17+
from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline
18+
from vulnerabilities.utils import load_json
19+
20+
21+
class NpmLiveImporterPipeline(NpmImporterPipeline):
22+
"""
23+
Node.js Security Working Group importer pipeline
24+
25+
Import advisories from nodejs security working group including node proper advisories and npm advisories for a single PURL.
26+
"""
27+
28+
pipeline_id = "nodejs_security_wg_live_importer"
29+
supported_types = ["npm"]
30+
31+
@classmethod
32+
def steps(cls):
33+
return (
34+
cls.get_purl_inputs,
35+
cls.clone,
36+
cls.collect_and_store_advisories,
37+
cls.clean_downloads,
38+
)
39+
40+
def get_purl_inputs(self):
41+
purl = self.inputs["purl"]
42+
if not purl:
43+
raise ValueError("PURL is required for NpmLiveImporterPipeline")
44+
45+
if isinstance(purl, str):
46+
purl = PackageURL.from_string(purl)
47+
48+
if not isinstance(purl, PackageURL):
49+
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")
50+
51+
if purl.type not in self.supported_types:
52+
raise ValueError(
53+
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
54+
)
55+
56+
if not purl.version:
57+
raise ValueError(f"PURL: {purl!s} is expected to have a version")
58+
59+
self.purl = purl
60+
61+
def collect_advisories(self) -> Iterable[AdvisoryData]:
62+
vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm"
63+
advisory_files = list(vuln_directory.glob("*.json"))
64+
65+
package_name = self.purl.name
66+
filtered_files = []
67+
for advisory_file in advisory_files:
68+
try:
69+
data = load_json(advisory_file)
70+
if data.get("module_name") == package_name:
71+
affected_package = self.get_affected_package(data, package_name)
72+
if not self.purl.version or self._version_is_affected(affected_package):
73+
filtered_files.append(advisory_file)
74+
except Exception as e:
75+
self.log(f"Error processing advisory file {advisory_file}: {str(e)}")
76+
advisory_files = filtered_files
77+
78+
for advisory in list(advisory_files):
79+
result = self.to_advisory_data(advisory)
80+
if result:
81+
yield result
82+
83+
def _version_is_affected(self, affected_package):
84+
if not self.purl.version or not affected_package.affected_version_range:
85+
return True
86+
87+
purl_version = SemverVersion(self.purl.version)
88+
return purl_version in affected_package.affected_version_range
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import json
11+
import os
12+
from pathlib import Path
13+
from types import SimpleNamespace
14+
15+
import pytest
16+
from packageurl import PackageURL
17+
from univers.version_constraint import VersionConstraint
18+
from univers.version_range import NpmVersionRange
19+
from univers.versions import SemverVersion
20+
21+
from vulnerabilities.importer import AffectedPackageV2
22+
from vulnerabilities.pipelines.v2_importers.npm_live_importer import NpmLiveImporterPipeline
23+
24+
TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "npm"
25+
26+
27+
def test_package_first_mode_valid_npm_package(tmp_path):
28+
vuln_dir = tmp_path / "vuln" / "npm"
29+
vuln_dir.mkdir(parents=True)
30+
31+
npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json")
32+
with open(npm_sample_file) as f:
33+
sample_data = json.load(f)
34+
35+
advisory_file = vuln_dir / "152.json"
36+
advisory_file.write_text(json.dumps(sample_data))
37+
38+
mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None)
39+
40+
purl = PackageURL(type="npm", name="npm", version="1.2.0")
41+
pipeline = NpmLiveImporterPipeline(purl=purl)
42+
pipeline.vcs_response = mock_vcs_response
43+
44+
pipeline.get_purl_inputs()
45+
advisories = list(pipeline.collect_advisories())
46+
47+
assert len(advisories) == 1
48+
assert advisories[0].aliases == ["CVE-2013-4116"]
49+
assert len(advisories[0].affected_packages) == 1
50+
assert advisories[0].affected_packages[0].package.name == "npm"
51+
52+
53+
def test_package_first_mode_unaffected_version(tmp_path):
54+
vuln_dir = tmp_path / "vuln" / "npm"
55+
vuln_dir.mkdir(parents=True)
56+
57+
npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json")
58+
with open(npm_sample_file) as f:
59+
sample_data = json.load(f)
60+
61+
advisory_file = vuln_dir / "152.json"
62+
advisory_file.write_text(json.dumps(sample_data))
63+
64+
mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None)
65+
66+
purl = PackageURL(type="npm", name="npm", version="1.4.0")
67+
pipeline = NpmLiveImporterPipeline(purl=purl)
68+
pipeline.vcs_response = mock_vcs_response
69+
70+
pipeline.get_purl_inputs()
71+
advisories = list(pipeline.collect_advisories())
72+
73+
assert len(advisories) == 0
74+
75+
76+
def test_package_first_mode_invalid_package_type(tmp_path):
77+
vuln_dir = tmp_path / "vuln" / "npm"
78+
vuln_dir.mkdir(parents=True)
79+
80+
mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None)
81+
82+
purl = PackageURL(type="pypi", name="django", version="3.0.0")
83+
pipeline = NpmLiveImporterPipeline(purl=purl)
84+
pipeline.vcs_response = mock_vcs_response
85+
86+
with pytest.raises(ValueError):
87+
pipeline.get_purl_inputs()
88+
89+
90+
def test_package_first_mode_package_not_found(tmp_path):
91+
vuln_dir = tmp_path / "vuln" / "npm"
92+
vuln_dir.mkdir(parents=True)
93+
94+
npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json")
95+
with open(npm_sample_file) as f:
96+
sample_data = json.load(f)
97+
98+
sample_data["module_name"] = "some-other-package"
99+
100+
advisory_file = vuln_dir / "152.json"
101+
advisory_file.write_text(json.dumps(sample_data))
102+
103+
mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None)
104+
105+
purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0")
106+
pipeline = NpmLiveImporterPipeline(purl=purl)
107+
pipeline.vcs_response = mock_vcs_response
108+
109+
pipeline.get_purl_inputs()
110+
advisories = list(pipeline.collect_advisories())
111+
112+
assert len(advisories) == 0
113+
114+
115+
def test_version_is_affected():
116+
purl = PackageURL(type="npm", name="npm", version="1.2.0")
117+
pipeline = NpmLiveImporterPipeline(purl=purl)
118+
pipeline.get_purl_inputs()
119+
120+
affected_package = AffectedPackageV2(
121+
package=PackageURL(type="npm", name="npm"),
122+
affected_version_range=NpmVersionRange(
123+
constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),)
124+
),
125+
)
126+
127+
assert pipeline._version_is_affected(affected_package) == True
128+
129+
pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0")
130+
assert pipeline._version_is_affected(affected_package) == False
131+
132+
pipeline.purl = PackageURL(type="npm", name="npm")
133+
assert pipeline._version_is_affected(affected_package) == True

0 commit comments

Comments
 (0)