From e6152cf997ec69c0a770ae211af3e984c6bc94bb Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 21 Aug 2025 15:41:22 +0300 Subject: [PATCH 1/2] Add Elixir Security Live V2 Importer Pipeline #1933 * Add Elixir Security Live V2 Importer * Add tests for the Elixir Security Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/importers/__init__.py | 9 ++ .../v2_importers/elixir_security_importer.py | 49 ++++-- .../elixir_security_live_importer.py | 140 ++++++++++++++++++ .../test_elixir_security_live_importer_v2.py | 98 ++++++++++++ 4 files changed, 283 insertions(+), 13 deletions(-) create mode 100644 vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py create mode 100644 vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 82ee4525a..c2df1061c 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -47,6 +47,9 @@ from vulnerabilities.pipelines.v2_importers import ( elixir_security_importer as elixir_security_importer_v2, ) +from vulnerabilities.pipelines.v2_importers import ( + elixir_security_live_importer as elixir_security_live_importer_v2, +) from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2 from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2 from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2 @@ -117,3 +120,9 @@ oss_fuzz.OSSFuzzImporter, ] ) + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + elixir_security_live_importer_v2.ElixirSecurityLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py index 4fb95ad3b..7258ea727 100644 --- a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py @@ -17,7 +17,7 @@ from univers.version_range import HexVersionRange from vulnerabilities.importer import AdvisoryData -from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import AffectedPackage from vulnerabilities.importer import ReferenceV2 from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 from vulnerabilities.utils import is_cve @@ -69,27 +69,44 @@ def on_failure(self): def process_file(self, file, base_path) -> Iterable[AdvisoryData]: relative_path = str(file.relative_to(base_path)).strip("/") - path_segments = str(file).split("/") - # use the last two segments as the advisory ID - advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") - advisory_url = ( - f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}" - ) advisory_text = None with open(str(file)) as f: advisory_text = f.read() yaml_file = load_yaml(str(file)) + # Delegate to shared builder + yield from self.build_advisory_from_yaml( + yaml_file=yaml_file, + advisory_text=advisory_text or str(yaml_file), + relative_path=relative_path, + ) + + def build_advisory_from_yaml( + self, yaml_file, advisory_text: str, relative_path: str + ) -> Iterable[AdvisoryData]: + """ + Build AdvisoryData objects from a parsed YAML mapping and the repo-relative path. + relative_path example: "packages//.yml" + """ + from pathlib import Path # ensure Path is available + + path_segments = Path(relative_path).parts + # use the last two segments as the advisory ID + advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") + advisory_url = ( + f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}" + ) + summary = yaml_file.get("description") or "" pkg_name = yaml_file.get("package") or "" cve_id = "" cve = yaml_file.get("cve") or "" - if cve and not cve.startswith("CVE-"): + if cve and not str(cve).startswith("CVE-"): cve_id = f"CVE-{cve}" elif cve: - cve_id = cve + cve_id = str(cve) if not cve_id or not is_cve(cve_id): return @@ -105,9 +122,12 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]: patched_versions = yaml_file.get("patched_versions") or [] for version in unaffected_versions: - constraints.append(VersionConstraint.from_string(version_class=vrc, string=version)) + constraints.append( + VersionConstraint.from_string(version_class=vrc, string=str(version)) + ) for version in patched_versions: + version = str(version) if version.startswith("~>"): version = version[2:] constraints.append( @@ -117,7 +137,7 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]: affected_packages = [] if pkg_name: affected_packages.append( - AffectedPackageV2( + AffectedPackage( package=PackageURL(type="hex", name=pkg_name), affected_version_range=HexVersionRange(constraints=constraints), ) @@ -125,7 +145,10 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]: date_published = None if yaml_file.get("disclosure_date"): - date_published = dateparser.parse(yaml_file.get("disclosure_date")) + disclosure = yaml_file.get("disclosure_date") + if not isinstance(disclosure, str): + disclosure = str(disclosure) + date_published = dateparser.parse(disclosure) yield AdvisoryData( advisory_id=advisory_id, @@ -135,5 +158,5 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryData]: affected_packages=affected_packages, url=advisory_url, date_published=date_published, - original_advisory_text=advisory_text or str(yaml_file), + original_advisory_text=advisory_text, ) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py new file mode 100644 index 000000000..6f3e181cb --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py @@ -0,0 +1,140 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from typing import Iterable + +import requests +import yaml +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.elixir_security_importer import ( + ElixirSecurityImporterPipeline, +) + + +class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline): + """ + Elixir Security Advisories Importer Pipeline + + This pipeline imports security advisories for a single elixir PURL. + """ + + pipeline_id = "elixir_security_live_importer_v2" + supported_types = ["hex"] + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.collect_and_store_advisories, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for ElixirSecurityLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + def advisories_count(self) -> int: + if self.purl.type != "hex": + return 0 + + try: + directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}" + response = requests.get(directory_url) + + if response.status_code != 200: + return 0 + + yaml_files = [file for file in response.json() if file["name"].endswith(".yml")] + return len(yaml_files) + except Exception: + return 0 + + def collect_advisories(self) -> Iterable[AdvisoryData]: + if self.purl.type != "hex": + self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer") + return [] + + package_name = self.purl.name + + try: + directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}" + response = requests.get(directory_url) + + if response.status_code != 200: + self.log(f"No advisories found for {package_name} in Elixir Security Database") + return [] + + yaml_entries = [file for file in response.json() if file["name"].endswith(".yml")] + + for entry in yaml_entries: + # entry["path"] looks like: packages//.yml + file_path = entry["path"] + content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}" + content_response = requests.get( + content_url, headers={"Accept": "application/vnd.github.v3.raw"} + ) + + if content_response.status_code != 200: + self.log(f"Failed to fetch file content for {file_path}") + continue + + advisory_text = content_response.text + + try: + yaml_file = yaml.safe_load(advisory_text) or {} + except Exception as e: + self.log(f"Failed to parse YAML for {file_path}: {e}") + continue + + for advisory in self.build_advisory_from_yaml( + yaml_file=yaml_file, advisory_text=advisory_text, relative_path=file_path + ): + if self.purl.version and not self._advisory_affects_version(advisory): + continue + yield advisory + + except Exception as e: + self.log(f"Error fetching advisories for {self.purl}: {str(e)}") + return [] + + def _advisory_affects_version(self, advisory: AdvisoryData) -> bool: + if not self.purl.version: + return True + + for affected_package in advisory.affected_packages: + if affected_package.affected_version_range: + try: + purl_version = SemverVersion(self.purl.version) + + if purl_version in affected_package.affected_version_range: + return True + except Exception as e: + self.log(f"Failed to parse version {self.purl.version}: {str(e)}") + return True + + return False diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py new file mode 100644 index 000000000..b2d267cd3 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py @@ -0,0 +1,98 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import shutil +from pathlib import Path +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from packageurl import PackageURL + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import ( + ElixirSecurityLiveImporterPipeline, +) + + +@pytest.fixture +def test_data_dir(): + return Path(__file__).parent.parent.parent / "test_data" / "elixir_security" + + +@patch("requests.get") +def test_package_first_mode_with_version_filter(mock_get, test_data_dir): + directory_response = MagicMock() + directory_response.status_code = 200 + directory_response.json.return_value = [ + {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"} + ] + + advisory_file_path = test_data_dir / "test_file.yml" + advisory_content = advisory_file_path.read_text() + + content_response = MagicMock() + content_response.status_code = 200 + content_response.text = advisory_content + + mock_get.side_effect = [directory_response, content_response] + + # Version affected + purl = PackageURL(type="hex", name="coherence", version="0.5.1") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 1 + + # Version not affected + mock_get.side_effect = [directory_response, content_response] + purl = PackageURL(type="hex", name="coherence", version="0.5.2") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_no_advisories(mock_get): + mock_response = MagicMock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + purl = PackageURL(type="hex", name="nonexistent-package") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + with pytest.raises(ValueError): + importer.get_purl_inputs() + + +@patch("requests.get") +def test_package_first_mode_api_error(mock_get): + directory_response = MagicMock() + directory_response.status_code = 200 + directory_response.json.return_value = [ + {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"} + ] + + content_response = MagicMock() + content_response.status_code = 500 + + mock_get.side_effect = [directory_response, content_response] + + purl = PackageURL(type="hex", name="coherence", version="0.5.1") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +def test_package_first_mode_non_hex_purl(): + purl = PackageURL(type="npm", name="some-package") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + with pytest.raises(ValueError): + importer.get_purl_inputs() From 0fee3b93a3c86aa64ddc95846950cbc6e8d27972 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 21 Aug 2025 16:08:39 +0300 Subject: [PATCH 2/2] Update Elixir Security V2 importer Signed-off-by: Michael Ehab Mikhail --- .../pipelines/v2_importers/elixir_security_importer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py index 7258ea727..ec1d1a08b 100644 --- a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py @@ -17,7 +17,7 @@ from univers.version_range import HexVersionRange from vulnerabilities.importer import AdvisoryData -from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importer import AffectedPackageV2 from vulnerabilities.importer import ReferenceV2 from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 from vulnerabilities.utils import is_cve @@ -137,7 +137,7 @@ def build_advisory_from_yaml( affected_packages = [] if pkg_name: affected_packages.append( - AffectedPackage( + AffectedPackageV2( package=PackageURL(type="hex", name=pkg_name), affected_version_range=HexVersionRange(constraints=constraints), ) @@ -158,5 +158,5 @@ def build_advisory_from_yaml( affected_packages=affected_packages, url=advisory_url, date_published=date_published, - original_advisory_text=advisory_text, + original_advisory_text=advisory_text or str(yaml_file), )