Skip to content

Commit e6152cf

Browse files
committed
Add Elixir Security Live V2 Importer Pipeline #1933
* Add Elixir Security Live V2 Importer * Add tests for the Elixir Security Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail <[email protected]>
1 parent dcb0511 commit e6152cf

File tree

4 files changed

+283
-13
lines changed

4 files changed

+283
-13
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
from vulnerabilities.pipelines.v2_importers import (
4848
elixir_security_importer as elixir_security_importer_v2,
4949
)
50+
from vulnerabilities.pipelines.v2_importers import (
51+
elixir_security_live_importer as elixir_security_live_importer_v2,
52+
)
5053
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
5154
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
5255
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
@@ -117,3 +120,9 @@
117120
oss_fuzz.OSSFuzzImporter,
118121
]
119122
)
123+
124+
LIVE_IMPORTERS_REGISTRY = create_registry(
125+
[
126+
elixir_security_live_importer_v2.ElixirSecurityLiveImporterPipeline,
127+
]
128+
)

vulnerabilities/pipelines/v2_importers/elixir_security_importer.py

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from univers.version_range import HexVersionRange
1818

1919
from vulnerabilities.importer import AdvisoryData
20-
from vulnerabilities.importer import AffectedPackageV2
20+
from vulnerabilities.importer import AffectedPackage
2121
from vulnerabilities.importer import ReferenceV2
2222
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
2323
from vulnerabilities.utils import is_cve
@@ -69,27 +69,44 @@ def on_failure(self):
6969

7070
def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
7171
relative_path = str(file.relative_to(base_path)).strip("/")
72-
path_segments = str(file).split("/")
73-
# use the last two segments as the advisory ID
74-
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
75-
advisory_url = (
76-
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
77-
)
7872
advisory_text = None
7973
with open(str(file)) as f:
8074
advisory_text = f.read()
8175

8276
yaml_file = load_yaml(str(file))
8377

78+
# Delegate to shared builder
79+
yield from self.build_advisory_from_yaml(
80+
yaml_file=yaml_file,
81+
advisory_text=advisory_text or str(yaml_file),
82+
relative_path=relative_path,
83+
)
84+
85+
def build_advisory_from_yaml(
86+
self, yaml_file, advisory_text: str, relative_path: str
87+
) -> Iterable[AdvisoryData]:
88+
"""
89+
Build AdvisoryData objects from a parsed YAML mapping and the repo-relative path.
90+
relative_path example: "packages/<pkg>/<file>.yml"
91+
"""
92+
from pathlib import Path # ensure Path is available
93+
94+
path_segments = Path(relative_path).parts
95+
# use the last two segments as the advisory ID
96+
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
97+
advisory_url = (
98+
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
99+
)
100+
84101
summary = yaml_file.get("description") or ""
85102
pkg_name = yaml_file.get("package") or ""
86103

87104
cve_id = ""
88105
cve = yaml_file.get("cve") or ""
89-
if cve and not cve.startswith("CVE-"):
106+
if cve and not str(cve).startswith("CVE-"):
90107
cve_id = f"CVE-{cve}"
91108
elif cve:
92-
cve_id = cve
109+
cve_id = str(cve)
93110

94111
if not cve_id or not is_cve(cve_id):
95112
return
@@ -105,9 +122,12 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
105122
patched_versions = yaml_file.get("patched_versions") or []
106123

107124
for version in unaffected_versions:
108-
constraints.append(VersionConstraint.from_string(version_class=vrc, string=version))
125+
constraints.append(
126+
VersionConstraint.from_string(version_class=vrc, string=str(version))
127+
)
109128

110129
for version in patched_versions:
130+
version = str(version)
111131
if version.startswith("~>"):
112132
version = version[2:]
113133
constraints.append(
@@ -117,15 +137,18 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
117137
affected_packages = []
118138
if pkg_name:
119139
affected_packages.append(
120-
AffectedPackageV2(
140+
AffectedPackage(
121141
package=PackageURL(type="hex", name=pkg_name),
122142
affected_version_range=HexVersionRange(constraints=constraints),
123143
)
124144
)
125145

126146
date_published = None
127147
if yaml_file.get("disclosure_date"):
128-
date_published = dateparser.parse(yaml_file.get("disclosure_date"))
148+
disclosure = yaml_file.get("disclosure_date")
149+
if not isinstance(disclosure, str):
150+
disclosure = str(disclosure)
151+
date_published = dateparser.parse(disclosure)
129152

130153
yield AdvisoryData(
131154
advisory_id=advisory_id,
@@ -135,5 +158,5 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
135158
affected_packages=affected_packages,
136159
url=advisory_url,
137160
date_published=date_published,
138-
original_advisory_text=advisory_text or str(yaml_file),
161+
original_advisory_text=advisory_text,
139162
)
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from typing import Iterable
11+
12+
import requests
13+
import yaml
14+
from packageurl import PackageURL
15+
from univers.versions import SemverVersion
16+
17+
from vulnerabilities.importer import AdvisoryData
18+
from vulnerabilities.pipelines.v2_importers.elixir_security_importer import (
19+
ElixirSecurityImporterPipeline,
20+
)
21+
22+
23+
class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline):
24+
"""
25+
Elixir Security Advisories Importer Pipeline
26+
27+
This pipeline imports security advisories for a single elixir PURL.
28+
"""
29+
30+
pipeline_id = "elixir_security_live_importer_v2"
31+
supported_types = ["hex"]
32+
33+
@classmethod
34+
def steps(cls):
35+
return (
36+
cls.get_purl_inputs,
37+
cls.collect_and_store_advisories,
38+
)
39+
40+
def get_purl_inputs(self):
41+
purl = self.inputs["purl"]
42+
if not purl:
43+
raise ValueError("PURL is required for ElixirSecurityLiveImporterPipeline")
44+
45+
if isinstance(purl, str):
46+
purl = PackageURL.from_string(purl)
47+
48+
if not isinstance(purl, PackageURL):
49+
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")
50+
51+
if purl.type not in self.supported_types:
52+
raise ValueError(
53+
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
54+
)
55+
56+
if not purl.version:
57+
raise ValueError(f"PURL: {purl!s} is expected to have a version")
58+
59+
self.purl = purl
60+
61+
def advisories_count(self) -> int:
62+
if self.purl.type != "hex":
63+
return 0
64+
65+
try:
66+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}"
67+
response = requests.get(directory_url)
68+
69+
if response.status_code != 200:
70+
return 0
71+
72+
yaml_files = [file for file in response.json() if file["name"].endswith(".yml")]
73+
return len(yaml_files)
74+
except Exception:
75+
return 0
76+
77+
def collect_advisories(self) -> Iterable[AdvisoryData]:
78+
if self.purl.type != "hex":
79+
self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer")
80+
return []
81+
82+
package_name = self.purl.name
83+
84+
try:
85+
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
86+
response = requests.get(directory_url)
87+
88+
if response.status_code != 200:
89+
self.log(f"No advisories found for {package_name} in Elixir Security Database")
90+
return []
91+
92+
yaml_entries = [file for file in response.json() if file["name"].endswith(".yml")]
93+
94+
for entry in yaml_entries:
95+
# entry["path"] looks like: packages/<pkg>/<file>.yml
96+
file_path = entry["path"]
97+
content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
98+
content_response = requests.get(
99+
content_url, headers={"Accept": "application/vnd.github.v3.raw"}
100+
)
101+
102+
if content_response.status_code != 200:
103+
self.log(f"Failed to fetch file content for {file_path}")
104+
continue
105+
106+
advisory_text = content_response.text
107+
108+
try:
109+
yaml_file = yaml.safe_load(advisory_text) or {}
110+
except Exception as e:
111+
self.log(f"Failed to parse YAML for {file_path}: {e}")
112+
continue
113+
114+
for advisory in self.build_advisory_from_yaml(
115+
yaml_file=yaml_file, advisory_text=advisory_text, relative_path=file_path
116+
):
117+
if self.purl.version and not self._advisory_affects_version(advisory):
118+
continue
119+
yield advisory
120+
121+
except Exception as e:
122+
self.log(f"Error fetching advisories for {self.purl}: {str(e)}")
123+
return []
124+
125+
def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
126+
if not self.purl.version:
127+
return True
128+
129+
for affected_package in advisory.affected_packages:
130+
if affected_package.affected_version_range:
131+
try:
132+
purl_version = SemverVersion(self.purl.version)
133+
134+
if purl_version in affected_package.affected_version_range:
135+
return True
136+
except Exception as e:
137+
self.log(f"Failed to parse version {self.purl.version}: {str(e)}")
138+
return True
139+
140+
return False
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import shutil
11+
from pathlib import Path
12+
from unittest.mock import MagicMock
13+
from unittest.mock import patch
14+
15+
import pytest
16+
from packageurl import PackageURL
17+
18+
from vulnerabilities.importer import AdvisoryData
19+
from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import (
20+
ElixirSecurityLiveImporterPipeline,
21+
)
22+
23+
24+
@pytest.fixture
25+
def test_data_dir():
26+
return Path(__file__).parent.parent.parent / "test_data" / "elixir_security"
27+
28+
29+
@patch("requests.get")
30+
def test_package_first_mode_with_version_filter(mock_get, test_data_dir):
31+
directory_response = MagicMock()
32+
directory_response.status_code = 200
33+
directory_response.json.return_value = [
34+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
35+
]
36+
37+
advisory_file_path = test_data_dir / "test_file.yml"
38+
advisory_content = advisory_file_path.read_text()
39+
40+
content_response = MagicMock()
41+
content_response.status_code = 200
42+
content_response.text = advisory_content
43+
44+
mock_get.side_effect = [directory_response, content_response]
45+
46+
# Version affected
47+
purl = PackageURL(type="hex", name="coherence", version="0.5.1")
48+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
49+
importer.get_purl_inputs()
50+
advisories = list(importer.collect_advisories())
51+
assert len(advisories) == 1
52+
53+
# Version not affected
54+
mock_get.side_effect = [directory_response, content_response]
55+
purl = PackageURL(type="hex", name="coherence", version="0.5.2")
56+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
57+
importer.get_purl_inputs()
58+
advisories = list(importer.collect_advisories())
59+
assert len(advisories) == 0
60+
61+
62+
@patch("requests.get")
63+
def test_package_first_mode_no_advisories(mock_get):
64+
mock_response = MagicMock()
65+
mock_response.status_code = 404
66+
mock_get.return_value = mock_response
67+
68+
purl = PackageURL(type="hex", name="nonexistent-package")
69+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
70+
with pytest.raises(ValueError):
71+
importer.get_purl_inputs()
72+
73+
74+
@patch("requests.get")
75+
def test_package_first_mode_api_error(mock_get):
76+
directory_response = MagicMock()
77+
directory_response.status_code = 200
78+
directory_response.json.return_value = [
79+
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
80+
]
81+
82+
content_response = MagicMock()
83+
content_response.status_code = 500
84+
85+
mock_get.side_effect = [directory_response, content_response]
86+
87+
purl = PackageURL(type="hex", name="coherence", version="0.5.1")
88+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
89+
importer.get_purl_inputs()
90+
advisories = list(importer.collect_advisories())
91+
assert len(advisories) == 0
92+
93+
94+
def test_package_first_mode_non_hex_purl():
95+
purl = PackageURL(type="npm", name="some-package")
96+
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
97+
with pytest.raises(ValueError):
98+
importer.get_purl_inputs()

0 commit comments

Comments
 (0)