Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
from vulnerabilities.pipelines.v2_importers import cvelistv5_importer as cvelistv5_importer_v2
from vulnerabilities.pipelines.v2_importers import (
elixir_security_importer as elixir_security_importer_v2,
)
Expand All @@ -69,6 +70,7 @@
elixir_security_importer_v2.ElixirSecurityImporterPipeline,
npm_importer_v2.NpmImporterPipeline,
vulnrichment_importer_v2.VulnrichImporterPipeline,
cvelistv5_importer_v2.CVEListV5ImporterPipeline,
apache_httpd_v2.ApacheHTTPDImporterPipeline,
pypa_importer_v2.PyPaImporterPipeline,
gitlab_importer_v2.GitLabImporterPipeline,
Expand Down
162 changes: 162 additions & 0 deletions vulnerabilities/importers/cve_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import re

import dateparser

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.models import VulnerabilityReference
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_reference_id
from vulnerabilities.utils import ssvc_calculator


def parse_cve_v5_advisory(raw_data, advisory_url):
cve_metadata = raw_data.get("cveMetadata", {})
cve_id = cve_metadata.get("cveId")

date_published = cve_metadata.get("datePublished")
if date_published:
date_published = dateparser.parse(
date_published,
settings={
"TIMEZONE": "UTC",
"RETURN_AS_TIMEZONE_AWARE": True,
"TO_TIMEZONE": "UTC",
},
)

# Extract containers
containers = raw_data.get("containers", {})
cna_data = containers.get("cna", {})
adp_data = containers.get("adp", {})

# Extract descriptions
summary = ""
description_list = cna_data.get("descriptions", [])
for description_dict in description_list:
if not description_dict.get("lang") in ["en", "en-US"]:
continue
summary = description_dict.get("value")

# Extract metrics
severities = []
metrics = cna_data.get("metrics", []) + [
adp_metrics for data in adp_data for adp_metrics in data.get("metrics", [])
]

cve_scoring_system = {
"cvssV4_0": SCORING_SYSTEMS["cvssv4"],
"cvssV3_1": SCORING_SYSTEMS["cvssv3.1"],
"cvssV3_0": SCORING_SYSTEMS["cvssv3"],
"cvssV2_0": SCORING_SYSTEMS["cvssv2"],
"other": {
"ssvc": SCORING_SYSTEMS["ssvc"],
}, # ignore kev
}

for metric in metrics:
for metric_type, metric_value in metric.items():
if metric_type not in cve_scoring_system:
continue

if metric_type == "other":
other_types = metric_value.get("type")
if other_types == "ssvc":
content = metric_value.get("content", {})
vector_string, decision = ssvc_calculator(content)
scoring_system = cve_scoring_system[metric_type][other_types]
severity = VulnerabilitySeverity(
system=scoring_system, value=decision, scoring_elements=vector_string
)
severities.append(severity)
# ignore kev
else:
vector_string = metric_value.get("vectorString")
base_score = metric_value.get("baseScore")
scoring_system = cve_scoring_system[metric_type]
severity = VulnerabilitySeverity(
system=scoring_system, value=base_score, scoring_elements=vector_string
)
severities.append(severity)

# Extract references cpes and ignore affected products
cpes = set()
for affected_product in cna_data.get("affected", []):
if type(affected_product) != dict:
continue
cpes.update(affected_product.get("cpes") or [])

references = []
for ref in cna_data.get("references", []):
# https://github.com/CVEProject/cve-schema/blob/main/schema/tags/reference-tags.json
# We removed all unwanted reference types and set the default reference type to 'OTHER'.
ref_type = VulnerabilityReference.OTHER
vul_ref_types = {
"exploit": VulnerabilityReference.EXPLOIT,
"issue-tracking": VulnerabilityReference.BUG,
"mailing-list": VulnerabilityReference.MAILING_LIST,
"third-party-advisory": VulnerabilityReference.ADVISORY,
"vendor-advisory": VulnerabilityReference.ADVISORY,
"vdb-entry": VulnerabilityReference.ADVISORY,
}

for tag_type in ref.get("tags", []):
if tag_type in vul_ref_types:
ref_type = vul_ref_types.get(tag_type)

url = ref.get("url")
reference = ReferenceV2(
reference_id=get_reference_id(url),
url=url,
reference_type=ref_type,
)

references.append(reference)

cpes_ref = [
ReferenceV2(
reference_id=cpe,
reference_type=VulnerabilityReference.OTHER,
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}",
)
for cpe in sorted(list(cpes))
]
references.extend(cpes_ref)

weaknesses = set()
for problem_type in cna_data.get("problemTypes", []):
descriptions = problem_type.get("descriptions", [])
for description in descriptions:
cwe_id = description.get("cweId")
if cwe_id:
weaknesses.add(get_cwe_id(cwe_id))

description_text = description.get("description")
if description_text:
pattern = r"CWE-(\d+)"
match = re.search(pattern, description_text)
if match:
weaknesses.add(int(match.group(1)))

return AdvisoryData(
advisory_id=cve_id,
aliases=[],
summary=summary,
references_v2=references,
date_published=date_published,
weaknesses=sorted(weaknesses),
url=advisory_url,
severities=severities,
original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False),
)
115 changes: 1 addition & 114 deletions vulnerabilities/importers/vulnrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_reference_id
from vulnerabilities.utils import ssvc_calculator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -184,117 +185,3 @@ def parse_cve_advisory(raw_data, advisory_url):
weaknesses=sorted(weaknesses),
url=advisory_url,
)


def ssvc_calculator(ssvc_data):
"""
Return the ssvc vector and the decision value
"""
options = ssvc_data.get("options", [])
timestamp = ssvc_data.get("timestamp")

# Extract the options into a dictionary
options_dict = {k: v.lower() for option in options for k, v in option.items()}

# We copied the table value from this link.
# https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf

# Determining Mission and Well-Being Impact Value
mission_well_being_table = {
# (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being"
("minimal", "minimal"): "low",
("minimal", "material"): "medium",
("minimal", "irreversible"): "high",
("support", "minimal"): "medium",
("support", "material"): "medium",
("support", "irreversible"): "high",
("essential", "minimal"): "high",
("essential", "material"): "high",
("essential", "irreversible"): "high",
}

if "Mission Prevalence" not in options_dict:
options_dict["Mission Prevalence"] = "minimal"

if "Public Well-being Impact" not in options_dict:
options_dict["Public Well-being Impact"] = "material"

options_dict["Mission & Well-being"] = mission_well_being_table[
(options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"])
]

decision_key = (
options_dict.get("Exploitation"),
options_dict.get("Automatable"),
options_dict.get("Technical Impact"),
options_dict.get("Mission & Well-being"),
)

decision_points = {
"Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}},
"Automatable": {"A": {"no": "N", "yes": "Y"}},
"Technical Impact": {"T": {"partial": "P", "total": "T"}},
"Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}},
"Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}},
"Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}},
}

# Create the SSVC vector
ssvc_vector = "SSVCv2/"
for key, value_map in options_dict.items():
options_key = decision_points.get(key)
for lhs, rhs_map in options_key.items():
ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/"

# "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}},
decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}

decision_lookup = {
("none", "no", "partial", "low"): "Track",
("none", "no", "partial", "medium"): "Track",
("none", "no", "partial", "high"): "Track",
("none", "no", "total", "low"): "Track",
("none", "no", "total", "medium"): "Track",
("none", "no", "total", "high"): "Track*",
("none", "yes", "partial", "low"): "Track",
("none", "yes", "partial", "medium"): "Track",
("none", "yes", "partial", "high"): "Attend",
("none", "yes", "total", "low"): "Track",
("none", "yes", "total", "medium"): "Track",
("none", "yes", "total", "high"): "Attend",
("poc", "no", "partial", "low"): "Track",
("poc", "no", "partial", "medium"): "Track",
("poc", "no", "partial", "high"): "Track*",
("poc", "no", "total", "low"): "Track",
("poc", "no", "total", "medium"): "Track*",
("poc", "no", "total", "high"): "Attend",
("poc", "yes", "partial", "low"): "Track",
("poc", "yes", "partial", "medium"): "Track",
("poc", "yes", "partial", "high"): "Attend",
("poc", "yes", "total", "low"): "Track",
("poc", "yes", "total", "medium"): "Track*",
("poc", "yes", "total", "high"): "Attend",
("active", "no", "partial", "low"): "Track",
("active", "no", "partial", "medium"): "Track",
("active", "no", "partial", "high"): "Attend",
("active", "no", "total", "low"): "Track",
("active", "no", "total", "medium"): "Attend",
("active", "no", "total", "high"): "Act",
("active", "yes", "partial", "low"): "Attend",
("active", "yes", "partial", "medium"): "Attend",
("active", "yes", "partial", "high"): "Act",
("active", "yes", "total", "low"): "Attend",
("active", "yes", "total", "medium"): "Act",
("active", "yes", "total", "high"): "Act",
}

decision = decision_lookup.get(decision_key, "")

if decision:
ssvc_vector += f"D:{decision_values.get(decision)}/"

if timestamp:
timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")

ssvc_vector += f"{timestamp_formatted}/"
return ssvc_vector, decision
71 changes: 71 additions & 0 deletions vulnerabilities/pipelines/v2_importers/cvelistv5_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import logging
from pathlib import Path
from typing import Iterable

from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importers.cve_schema import parse_cve_v5_advisory
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import get_advisory_url

logger = logging.getLogger(__name__)


class CVEListV5ImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
pipeline_id = "cvelistv5_importer_v2"
# license PR: https://github.com/CVEProject/cvelistV5/pull/65
spdx_license_expression = "CC0-1.0"
license_url = "https://github.com/CVEProject/cvelistV5/blob/main/LICENSE"
repo_url = "git+https://github.com/CVEProject/cvelistV5"

@classmethod
def steps(cls):
return (
cls.clone,
cls.collect_and_store_advisories,
cls.clean_downloads,
)

def clone(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)

def advisories_count(self):
vuln_directory = Path(self.vcs_response.dest_dir) / "cves"
return sum(1 for _ in vuln_directory.glob("*.json"))

def collect_advisories(self) -> Iterable[AdvisoryData]:
base_directory = Path(self.vcs_response.dest_dir)
vulns_directory = base_directory / "cves"

for file in vulns_directory.rglob("*.json"):
if not file.name.startswith("CVE-"):
continue

advisory_url = get_advisory_url(
file=file,
base_path=base_directory,
url="https://github.com/CVEProject/cvelistV5/blob/main/",
)

with open(file) as f:
raw_data = json.load(f)
yield parse_cve_v5_advisory(raw_data, advisory_url)

def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
Loading