diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 82ee4525a..fb040019d 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -49,6 +49,7 @@ ) from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2 from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2 +from vulnerabilities.pipelines.v2_importers import gitlab_live_importer as gitlab_live_importer_v2 from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2 from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2 from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2 @@ -117,3 +118,9 @@ oss_fuzz.OSSFuzzImporter, ] ) + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + gitlab_live_importer_v2.GitLabLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/gitlab_advisory_utils.py b/vulnerabilities/pipelines/v2_importers/gitlab_advisory_utils.py new file mode 100644 index 000000000..f70f3cca7 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/gitlab_advisory_utils.py @@ -0,0 +1,168 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +import logging +import traceback + +import pytz +from dateutil import parser as dateparser +from univers.version_range import RANGE_CLASS_BY_SCHEMES +from univers.version_range import from_gitlab_native + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.utils import build_description +from vulnerabilities.utils import get_cwe_id + + +def advisory_dict_to_advisory_data( + advisory: dict, + *, + purl_type_by_gitlab_scheme, + gitlab_scheme_by_purl_type, + logger, + get_purl_fn, + purl=None, + advisory_url=None, +): + """ + Convert a GitLab advisory mapping (already loaded from YAML or JSON) to an + `AdvisoryData` instance. + Returns None when no affected or fixed version range can be derived. + + Parameters: + - advisory: dict per GitLab schema (identifier, package_slug, ...) + - purl_type_by_gitlab_scheme: mapping of GitLab package type to PackageURL type + - gitlab_scheme_by_purl_type: inverse mapping of PackageURL type to GitLab type + - logger: callable like pipeline.log(message, level=logging.LEVEL) + - get_purl_fn: function to build a version-less PURL from package_slug + - purl: optional PURL (may include version); used only for context, ranges use + a version-less PURL derived from package_slug via get_purl_fn + - advisory_url: optional URL; if not provided, a default URL will be built when possible + """ + + aliases = list(advisory.get("identifiers", []) or []) + identifier = advisory.get("identifier") or "" + package_slug = advisory.get("package_slug") + + advisory_id = f"{package_slug}/{identifier}" if package_slug else identifier + if advisory_id in aliases: + try: + aliases.remove(advisory_id) + except ValueError: + pass + + summary = build_description(advisory.get("title"), advisory.get("description")) + urls = advisory.get("urls") or [] + references = [ReferenceV2.from_url(u) for u in urls] + + cwe_ids = advisory.get("cwe_ids") or [] + cwe_list = list(map(get_cwe_id, cwe_ids)) + + date_published = dateparser.parse(advisory.get("pubdate")) if advisory.get("pubdate") else None + if date_published: + date_published = date_published.replace(tzinfo=pytz.UTC) + + # Prefer a version-less PURL derived from package_slug for affected/fixed ranges + purl_for_package = None + if package_slug: + purl_for_package = get_purl_fn( + package_slug=package_slug, + purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme, + logger=logger, + ) + + if not purl_for_package: + logger( + f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}", + level=logging.ERROR, + ) + return AdvisoryData( + advisory_id=advisory_id, + aliases=aliases, + summary=summary, + references_v2=references, + date_published=date_published, + url=advisory_url, + original_advisory_text=json.dumps(advisory, indent=2, ensure_ascii=False), + ) + + # Compute affected and fixed ranges + affected_version_range = None + fixed_versions = advisory.get("fixed_versions") or [] + affected_range = advisory.get("affected_range") + gitlab_native_schemes = {"pypi", "gem", "npm", "go", "packagist", "conan"} + vrc = RANGE_CLASS_BY_SCHEMES[purl_for_package.type] + gitlab_scheme = gitlab_scheme_by_purl_type[purl_for_package.type] + try: + if affected_range: + if gitlab_scheme in gitlab_native_schemes: + affected_version_range = from_gitlab_native( + gitlab_scheme=gitlab_scheme, string=affected_range + ) + else: + affected_version_range = vrc.from_native(affected_range) + except Exception as e: + logger( + ( + "advisory_dict_to_advisory_data: affected_range is not parsable: " + f"{affected_range!r} for: {purl_for_package!s} error: {e!r}\n {traceback.format_exc()}" + ), + level=logging.ERROR, + ) + + parsed_fixed_versions = [] + for fixed_version in fixed_versions: + try: + fixed_version = vrc.version_class(fixed_version) + parsed_fixed_versions.append(fixed_version.string) + except Exception as e: + logger( + ( + "advisory_dict_to_advisory_data: fixed_version is not parsable`: " + f"{fixed_version!r} error: {e!r}\n {traceback.format_exc()}" + ), + level=logging.ERROR, + ) + + if affected_version_range: + vrc = affected_version_range.__class__ + + fixed_version_range = vrc.from_versions(parsed_fixed_versions) + if not fixed_version_range and not affected_version_range: + return + + affected_package = AffectedPackageV2( + package=purl_for_package, + affected_version_range=affected_version_range, + fixed_version_range=fixed_version_range, + ) + + # Build a default advisory URL if not provided + if not advisory_url and package_slug and identifier: + from urllib.parse import urljoin + + advisory_url = urljoin( + "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/", + package_slug + "/" + identifier + ".yml", + ) + + return AdvisoryData( + advisory_id=advisory_id, + aliases=aliases, + summary=summary, + references_v2=references, + date_published=date_published, + affected_packages=[affected_package], + weaknesses=cwe_list, + url=advisory_url, + original_advisory_text=json.dumps(advisory, indent=2, ensure_ascii=False), + ) diff --git a/vulnerabilities/pipelines/v2_importers/gitlab_importer.py b/vulnerabilities/pipelines/v2_importers/gitlab_importer.py index e28ab7520..aa8d211ab 100644 --- a/vulnerabilities/pipelines/v2_importers/gitlab_importer.py +++ b/vulnerabilities/pipelines/v2_importers/gitlab_importer.py @@ -7,28 +7,21 @@ # See https://aboutcode.org for more information about nexB OSS projects. # -import json import logging -import traceback from pathlib import Path from typing import Iterable from typing import Tuple -import pytz import saneyaml -from dateutil import parser as dateparser from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL -from univers.version_range import RANGE_CLASS_BY_SCHEMES -from univers.version_range import from_gitlab_native from vulnerabilities.importer import AdvisoryData -from vulnerabilities.importer import AffectedPackageV2 -from vulnerabilities.importer import ReferenceV2 from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 -from vulnerabilities.utils import build_description +from vulnerabilities.pipelines.v2_importers.gitlab_advisory_utils import ( + advisory_dict_to_advisory_data as shared_advisory_dict_to_advisory_data, +) from vulnerabilities.utils import get_advisory_url -from vulnerabilities.utils import get_cwe_id class GitLabImporterPipeline(VulnerableCodeBaseImporterPipelineV2): @@ -208,97 +201,18 @@ def parse_gitlab_advisory( ) return - # refer to schema here https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json - aliases = gitlab_advisory.get("identifiers") - advisory_id = gitlab_advisory.get("identifier") - package_slug = gitlab_advisory.get("package_slug") - advisory_id = f"{package_slug}/{advisory_id}" if package_slug else advisory_id - if advisory_id in aliases: - aliases.remove(advisory_id) - summary = build_description(gitlab_advisory.get("title"), gitlab_advisory.get("description")) - urls = gitlab_advisory.get("urls") - references = [ReferenceV2.from_url(u) for u in urls] - - cwe_ids = gitlab_advisory.get("cwe_ids") or [] - cwe_list = list(map(get_cwe_id, cwe_ids)) - - date_published = dateparser.parse(gitlab_advisory.get("pubdate")) - date_published = date_published.replace(tzinfo=pytz.UTC) + # Build a stable URL to the advisory file within the repo for traceability advisory_url = get_advisory_url( file=file, base_path=base_path, url="https://gitlab.com/gitlab-org/advisories-community/-/blob/main/", ) - purl: PackageURL = get_purl( - package_slug=package_slug, + + return shared_advisory_dict_to_advisory_data( + advisory=gitlab_advisory, purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme, + gitlab_scheme_by_purl_type=gitlab_scheme_by_purl_type, logger=logger, - ) - if not purl: - logger( - f"parse_yaml_file: purl is not valid: {file!r} {package_slug!r}", level=logging.ERROR - ) - return AdvisoryData( - advisory_id=advisory_id, - aliases=aliases, - summary=summary, - references_v2=references, - date_published=date_published, - url=advisory_url, - original_advisory_text=json.dumps(gitlab_advisory, indent=2, ensure_ascii=False), - ) - affected_version_range = None - fixed_versions = gitlab_advisory.get("fixed_versions") or [] - affected_range = gitlab_advisory.get("affected_range") - gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"]) - vrc = RANGE_CLASS_BY_SCHEMES[purl.type] - gitlab_scheme = gitlab_scheme_by_purl_type[purl.type] - try: - if affected_range: - if gitlab_scheme in gitlab_native_schemes: - affected_version_range = from_gitlab_native( - gitlab_scheme=gitlab_scheme, string=affected_range - ) - else: - affected_version_range = vrc.from_native(affected_range) - except Exception as e: - logger( - f"parse_yaml_file: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}", - level=logging.ERROR, - ) - - parsed_fixed_versions = [] - for fixed_version in fixed_versions: - try: - fixed_version = vrc.version_class(fixed_version) - parsed_fixed_versions.append(fixed_version.string) - except Exception as e: - logger( - f"parse_yaml_file: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}", - level=logging.ERROR, - ) - - if affected_version_range: - vrc = affected_version_range.__class__ - - fixed_version_range = vrc.from_versions(parsed_fixed_versions) - if not fixed_version_range and not affected_version_range: - return - - affected_package = AffectedPackageV2( - package=purl, - affected_version_range=affected_version_range, - fixed_version_range=fixed_version_range, - ) - - return AdvisoryData( - advisory_id=advisory_id, - aliases=aliases, - summary=summary, - references_v2=references, - date_published=date_published, - affected_packages=[affected_package], - weaknesses=cwe_list, - url=advisory_url, - original_advisory_text=json.dumps(gitlab_advisory, indent=2, ensure_ascii=False), + get_purl_fn=get_purl, + advisory_url=advisory_url, ) diff --git a/vulnerabilities/pipelines/v2_importers/gitlab_live_importer.py b/vulnerabilities/pipelines/v2_importers/gitlab_live_importer.py new file mode 100644 index 000000000..3134f2bc9 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/gitlab_live_importer.py @@ -0,0 +1,117 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from typing import Iterable + +from packageurl import PackageURL + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 +from vulnerabilities.pipelines.v2_importers.gitlab_advisory_utils import ( + advisory_dict_to_advisory_data as shared_advisory_dict_to_advisory_data, +) +from vulnerabilities.pipelines.v2_importers.gitlab_importer import get_purl +from vulntotal.datasources.gitlab import get_casesensitive_slug +from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl +from vulntotal.datasources.gitlab_api import get_estimated_advisories_count + + +class GitLabLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """ + GitLab Live Importer Pipeline + + Collect advisory from GitLab Advisory Database (Open Source Edition) for a single PURL. + """ + + pipeline_id = "gitlab_live_importer_v2" + spdx_license_expression = "MIT" + license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE" + supported_types = ["pypi", "npm", "maven", "nuget", "composer", "conan", "gem"] + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.collect_and_store_advisories, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for GitLabLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + purl_type_by_gitlab_scheme = { + "conan": "conan", + "gem": "gem", + # Entering issue to parse go package names https://github.com/nexB/vulnerablecode/issues/742 + # "go": "golang", + "maven": "maven", + "npm": "npm", + "nuget": "nuget", + "packagist": "composer", + "pypi": "pypi", + } + + gitlab_scheme_by_purl_type = {v: k for k, v in purl_type_by_gitlab_scheme.items()} + + def advisories_count(self): + return get_estimated_advisories_count( + self.purl, self.gitlab_scheme_by_purl_type, get_casesensitive_slug + ) + + def collect_advisories(self) -> Iterable[AdvisoryData]: + advisories = fetch_gitlab_advisories_for_purl( + self.purl, self.gitlab_scheme_by_purl_type, get_casesensitive_slug + ) + + for advisory in advisories: + advisory_data = self._advisory_dict_to_advisory_data(advisory) + if not advisory_data: + continue + # Filter by the input version: keep only advisories where the given version is affected + from univers.version_range import RANGE_CLASS_BY_SCHEMES + + input_version = self.purl.version + vrc = RANGE_CLASS_BY_SCHEMES[self.purl.type] + version_obj = vrc.version_class(input_version) if input_version else None + + affected = False + for affected_package in advisory_data.affected_packages: + vrange = affected_package.affected_version_range + if vrange and version_obj in vrange: + affected = True + break + if affected: + yield advisory_data + + def _advisory_dict_to_advisory_data(self, advisory): + return shared_advisory_dict_to_advisory_data( + advisory=advisory, + purl_type_by_gitlab_scheme=self.purl_type_by_gitlab_scheme, + gitlab_scheme_by_purl_type=self.gitlab_scheme_by_purl_type, + logger=self.log, + get_purl_fn=get_purl, + purl=self.purl, + ) diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_gitlab_live_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_gitlab_live_importer_v2.py new file mode 100644 index 000000000..dce68a8ee --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_gitlab_live_importer_v2.py @@ -0,0 +1,48 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# + +from pathlib import Path +from unittest import mock + +import saneyaml +from packageurl import PackageURL + +from vulnerabilities.pipelines.v2_importers.gitlab_live_importer import GitLabLiveImporterPipeline +from vulnerabilities.tests import util_tests + +TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "gitlab" + + +@mock.patch( + "vulnerabilities.pipelines.v2_importers.gitlab_live_importer.fetch_gitlab_advisories_for_purl" +) +def test_gitlab_importer_package_first_mode_found_with_version(mock_fetch): + pkg_type = "pypi" + response_file = TEST_DATA / f"{pkg_type}.yaml" + expected_file = TEST_DATA / f"{pkg_type}-live-importer-expected.json" + + with open(response_file) as f: + advisory_dict = saneyaml.load(f) + + mock_fetch.return_value = [advisory_dict] + purl = PackageURL(type="pypi", name="flask", version="0.9") + pipeline = GitLabLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() + advisories = list(pipeline.collect_advisories()) + util_tests.check_results_against_json(advisories[0].to_dict(), expected_file) + + +@mock.patch( + "vulnerabilities.pipelines.v2_importers.gitlab_live_importer.fetch_gitlab_advisories_for_purl" +) +def test_gitlab_importer_package_first_mode_none_found(mock_fetch): + mock_fetch.return_value = [] + purl = PackageURL(type="pypi", name="flask", version="1.2") + pipeline = GitLabLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() + advisories = list(pipeline.collect_advisories()) + assert advisories == [] diff --git a/vulnerabilities/tests/test_data/gitlab/pypi-live-importer-expected.json b/vulnerabilities/tests/test_data/gitlab/pypi-live-importer-expected.json new file mode 100644 index 000000000..3d739217e --- /dev/null +++ b/vulnerabilities/tests/test_data/gitlab/pypi-live-importer-expected.json @@ -0,0 +1,35 @@ +{ + "advisory_id": "pypi/Flask/CVE-2019-1010083", + "aliases": ["CVE-2019-1010083"], + "summary": "Denial of service\nDenial of Service due to unexpected memory usage in the Pallets Project Flask", + "affected_packages": [ + { + "package": { + "type": "pypi", + "namespace": "", + "name": "flask", + "version": "", + "qualifiers": "", + "subpath": "" + }, + "affected_version_range": "vers:pypi/<1.0", + "fixed_version_range": "vers:pypi/1.0" + } + ], + "references_v2": [ + { + "reference_id": "CVE-2019-1010083", + "reference_type": "", + "url": "https://nvd.nist.gov/vuln/detail/CVE-2019-1010083" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://www.palletsprojects.com/blog/flask-1-0-released/" + } + ], + "severities": [], + "date_published": "2019-07-17T00:00:00+00:00", + "weaknesses": [1035, 937], + "url": "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/pypi/Flask/CVE-2019-1010083.yml" +} diff --git a/vulntotal/datasources/gitlab.py b/vulntotal/datasources/gitlab.py index dbf84dce7..8237ffc0d 100644 --- a/vulntotal/datasources/gitlab.py +++ b/vulntotal/datasources/gitlab.py @@ -19,6 +19,7 @@ from fetchcode import fetch from packageurl import PackageURL +from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl from vulntotal.validator import DataSource from vulntotal.validator import VendorData from vulntotal.vulntotal_utils import gitlab_constraints_satisfied @@ -40,18 +41,12 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]: Yields: VendorData instance containing the advisory information for the package. """ - package_slug = get_package_slug(purl) - directory_files = fetch_directory_contents(package_slug) - if not directory_files: - path = self.supported_ecosystem()[purl.type] - casesensitive_package_slug = get_casesensitive_slug(path, package_slug) - directory_files = fetch_directory_contents(casesensitive_package_slug) + advisories = fetch_gitlab_advisories_for_purl( + purl, self.supported_ecosystem(), get_casesensitive_slug + ) - if directory_files: - yml_files = [file for file in directory_files if file["name"].endswith(".yml")] - - interesting_advisories = parse_interesting_advisories(yml_files, purl) - return interesting_advisories + if advisories: + return parse_interesting_advisories(advisories, purl) @classmethod def supported_ecosystem(cls): @@ -67,45 +62,6 @@ def supported_ecosystem(cls): } -def fetch_directory_contents(package_slug): - url = f"https://gitlab.com/api/v4/projects/12006272/repository/tree?path={package_slug}" - response = requests.get(url) - if response.status_code == 200: - return response.json() - - -def fetch_yaml(file_path): - response = requests.get( - f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}" - ) - if response.status_code == 200: - return response.text - - -def get_package_slug(purl): - """ - Constructs a package slug from a given purl. - - Parameters: - purl: A PackageURL instance representing the package to query. - - Returns: - A string representing the package slug, or None if the purl type is not supported by GitLab. - """ - supported_ecosystem = GitlabDataSource.supported_ecosystem() - - if purl.type not in supported_ecosystem: - return - - ecosystem = supported_ecosystem[purl.type] - package_name = purl.name - - if purl.type in ("maven", "composer", "golang"): - package_name = f"{purl.namespace}/{purl.name}" - - return f"{ecosystem}/{package_name}" - - def get_casesensitive_slug(path, package_slug): payload = [ { @@ -163,12 +119,12 @@ def get_casesensitive_slug(path, package_slug): has_next = paginated_tree["pageInfo"]["hasNextPage"] -def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]: +def parse_interesting_advisories(advisories, purl) -> Iterable[VendorData]: """ Parses advisories from YAML files in a given location that match a given version. Parameters: - yml_files: An array having the paths of yml files to parse. + advisories: A list of advisory dictionaries fetched from the GitLab API. purl: PURL for the advisory. Yields: @@ -176,14 +132,12 @@ def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]: """ version = purl.version - for file in yml_files: - yml_data = fetch_yaml(file["path"]) - gitlab_advisory = saneyaml.load(yml_data) - affected_range = gitlab_advisory["affected_range"] + for advisory in advisories: + affected_range = advisory.get("affected_range") if gitlab_constraints_satisfied(affected_range, version): yield VendorData( purl=PackageURL(purl.type, purl.namespace, purl.name), - aliases=gitlab_advisory["identifiers"], + aliases=advisory.get("identifiers", []), affected_versions=[affected_range], - fixed_versions=gitlab_advisory["fixed_versions"], + fixed_versions=advisory.get("fixed_versions", []), ) diff --git a/vulntotal/datasources/gitlab_api.py b/vulntotal/datasources/gitlab_api.py new file mode 100644 index 000000000..278c51138 --- /dev/null +++ b/vulntotal/datasources/gitlab_api.py @@ -0,0 +1,56 @@ +import requests +import saneyaml + + +def fetch_directory_contents(package_slug): + url = f"https://gitlab.com/api/v4/projects/12006272/repository/tree?path={package_slug}" + response = requests.get(url) + if response.status_code == 200: + return response.json() + return [] + + +def fetch_yaml(file_path): + response = requests.get( + f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}" + ) + if response.status_code == 200: + return response.text + return None + + +def get_package_slug(purl, supported_ecosystem): + if purl.type not in supported_ecosystem: + return + ecosystem = supported_ecosystem[purl.type] + package_name = purl.name + if purl.type in ("maven", "composer", "golang"): + package_name = f"{purl.namespace}/{purl.name}" + return f"{ecosystem}/{package_name}" + + +def get_directory_yml_files(purl, supported_ecosystem, get_casesensitive_slug): + package_slug = get_package_slug(purl, supported_ecosystem) + directory_files = fetch_directory_contents(package_slug) + if not directory_files: + path = supported_ecosystem[purl.type] + casesensitive_package_slug = get_casesensitive_slug(path, package_slug) + directory_files = fetch_directory_contents(casesensitive_package_slug) + if not directory_files: + return [] + return [file for file in directory_files if file["name"].endswith(".yml")] + + +def fetch_gitlab_advisories_for_purl(purl, supported_ecosystem, get_casesensitive_slug): + yml_files = get_directory_yml_files(purl, supported_ecosystem, get_casesensitive_slug) + + advisories = [] + for file in yml_files: + yml_data = fetch_yaml(file["path"]) + if yml_data: + advisories.append(saneyaml.load(yml_data)) + return advisories + + +def get_estimated_advisories_count(purl, supported_ecosystem, get_casesensitive_slug): + return len(get_directory_yml_files(purl, supported_ecosystem, get_casesensitive_slug)) diff --git a/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json b/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json index 1a4e4a024..6391ed740 100644 --- a/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json +++ b/vulntotal/tests/test_data/gitlab/parsed_advisory-expected.json @@ -1,51 +1,26 @@ [ { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<=2.7.1" - ], - "fixed_versions": [ - "2.7.2" - ], - "aliases": [ - "CVE-2014-1402" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<=2.7.1"], + "fixed_versions": ["2.7.2"], + "aliases": ["CVE-2014-1402"] }, { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<2.8.1" - ], - "fixed_versions": [ - "2.8.1" - ], - "aliases": [ - "GHSA-hj2j-77xm-mc5v", - "CVE-2016-10745" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<2.8.1"], + "fixed_versions": ["2.8.1"], + "aliases": ["GHSA-hj2j-77xm-mc5v", "CVE-2016-10745"] }, { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<2.10.1" - ], - "fixed_versions": [ - "2.10.1" - ], - "aliases": [ - "CVE-2019-10906" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<2.10.1"], + "fixed_versions": ["2.10.1"], + "aliases": ["CVE-2019-10906"] }, { - "purl": "pkg:generic/namespace/test", - "affected_versions": [ - "<2.11.3" - ], - "fixed_versions": [ - "2.11.3" - ], - "aliases": [ - "CVE-2020-28493" - ] + "purl": "pkg:pypi/namespace/test", + "affected_versions": ["<2.11.3"], + "fixed_versions": ["2.11.3"], + "aliases": ["CVE-2020-28493"] } -] \ No newline at end of file +] diff --git a/vulntotal/tests/test_gitlab.py b/vulntotal/tests/test_gitlab.py index 2870e81dc..3f8e993a4 100644 --- a/vulntotal/tests/test_gitlab.py +++ b/vulntotal/tests/test_gitlab.py @@ -14,6 +14,7 @@ from vulnerabilities.tests import util_tests from vulntotal.datasources import gitlab +from vulntotal.datasources import gitlab_api class TestGitlab(testcase.FileBasedTesting): @@ -28,12 +29,26 @@ def test_generate_package_advisory_url(self): "pkg:composer/bolt/core@0.1", "pkg:nuget/moment.js@2.18.0", ] - results = [gitlab.get_package_slug(PackageURL.from_string(purl)) for purl in purls] + supported_ecosystem = gitlab.GitlabDataSource.supported_ecosystem() + results = [ + gitlab_api.get_package_slug(PackageURL.from_string(purl), supported_ecosystem) + for purl in purls + ] expected_file = self.get_test_loc("package_advisory_url-expected.json", must_exist=False) util_tests.check_results_against_json(results, expected_file) - @mock.patch("vulntotal.datasources.gitlab.fetch_yaml") - def test_parse_interesting_advisories(self, mock_fetch_yaml): + @mock.patch("vulntotal.datasources.gitlab_api.fetch_yaml") + @mock.patch("vulntotal.datasources.gitlab_api.fetch_directory_contents") + def test_parse_interesting_advisories(self, mock_fetch_directory_contents, mock_fetch_yaml): + # Mock the directory contents response + mock_fetch_directory_contents.return_value = [ + {"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"}, + {"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"}, + {"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"}, + {"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"}, + {"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"}, + ] + # Mock the yaml file responses advisory_folder = ( Path(__file__) @@ -51,17 +66,15 @@ def test_parse_interesting_advisories(self, mock_fetch_yaml): mock_fetch_yaml.side_effect = yaml_files - purl = PackageURL("generic", "namespace", "test", "0.1.1") + purl = PackageURL("pypi", "namespace", "test", "0.1.1") - yml_files = [ - {"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"}, - {"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"}, - {"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"}, - {"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"}, - {"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"}, - ] + supported_ecosystem = gitlab.GitlabDataSource.supported_ecosystem() + + advisories = gitlab_api.fetch_gitlab_advisories_for_purl( + purl, supported_ecosystem, gitlab.get_casesensitive_slug + ) - results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(yml_files, purl)] + results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(advisories, purl)] expected_file = self.get_test_loc("parsed_advisory-expected.json", must_exist=False) util_tests.check_results_against_json(results, expected_file)