Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions integrations/checkmarx-one/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.2 (2025-10-07)


### Features

- Added DAST scan environment exporter for fetching scan environments
- Added DAST scan exporter for fetching scans within environments
- Added DAST scan result exporter for fetching scan results with filtering capabilities


## 0.1.1 (2025-10-03)


Expand Down
7 changes: 3 additions & 4 deletions integrations/checkmarx-one/checkmarx_one/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,12 @@ async def send_api_request(
status_code = e.response.status_code
response_text = e.response.text

logger.error(
f"HTTP error {status_code} for {method} {url}: {response_text}"
)

if self._should_ignore_error(e, url, ignored_errors):
return {}

logger.error(
f"HTTP error {status_code} for {method} {url}: {response_text}"
)
# Re-raise the original error for non-ignored errors
raise

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Any
from loguru import logger

from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from checkmarx_one.core.exporters.abstract_exporter import AbstractCheckmarxExporter
from port_ocean.utils.cache import cache_iterator_result


class CheckmarxDastScanEnvironmentExporter(AbstractCheckmarxExporter):
"""Exporter for Checkmarx One DAST scan environments."""

async def get_resource(self, options: Any) -> Any:
raise NotImplementedError(
"Fetching single DAST scan environment is not supported"
)

@cache_iterator_result()
async def get_paginated_resources(
self,
options: None = None,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
async for dast_scan_environments in self.client.send_paginated_request(
"/dast/scans/environments", "environments"
):
logger.info(
f"Fetched batch of {len(dast_scan_environments)} DAST scan environments"
)
yield dast_scan_environments
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import Any, Dict
from loguru import logger

from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from checkmarx_one.core.exporters.abstract_exporter import AbstractCheckmarxExporter
from port_ocean.utils.cache import cache_iterator_result
from checkmarx_one.core.options import ListDastScanOptions


class CheckmarxDastScanExporter(AbstractCheckmarxExporter):
"""Exporter for Checkmarx One DAST scans."""

def _enrich_dast_scan_with_environment_id(
self, dast_scan: Dict[str, Any], environment_id: str
) -> dict[str, Any]:
"""Enrich DAST scan with environment ID."""
dast_scan["__environment_id"] = environment_id
return dast_scan

async def get_resource(self, options: Any) -> Any:
raise NotImplementedError("Fetching single DAST scan is not supported")

@cache_iterator_result()
async def get_paginated_resources(
self,
options: ListDastScanOptions,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
environment_id = options["environment_id"]
params: dict[str, Any] = {"environmentID": environment_id}
if groups := options.get("groups"):
params["groups"] = groups

async for results in self.client.send_paginated_request(
"/dast/scans/scans", "scans", params
):
logger.info(f"Fetched batch of {len(results)} DAST scans")
yield [
self._enrich_dast_scan_with_environment_id(result, environment_id)
for result in results
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Any, Dict
from loguru import logger

from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from checkmarx_one.core.exporters.abstract_exporter import AbstractCheckmarxExporter
from checkmarx_one.core.options import ListDastScanResultOptions


class CheckmarxDastScanResultExporter(AbstractCheckmarxExporter):
"""Exporter for Checkmarx One DAST results per scan."""

def _enrich_scan_result_with_dast_scan_id(
self, dast_scan_result: Dict[str, Any], dast_scan_id: str
) -> dict[str, Any]:
"""Enrich scan result with scan ID."""
dast_scan_result["__dast_scan_id"] = dast_scan_id
return dast_scan_result

async def get_resource(self, options: Any) -> Any:
raise NotImplementedError("Fetching single DAST result is not supported")

def _build_params(self, options: ListDastScanResultOptions) -> dict[str, Any]:
"""Build query parameters for DAST scan results API request."""
params: dict[str, Any] = {}

if severity := options.get("severity"):
params["severity"] = severity
if status := options.get("status"):
params["status"] = status
if state := options.get("state"):
params["state"] = state

return params

async def get_paginated_resources(
self,
options: ListDastScanResultOptions,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
params: dict[str, Any] = self._build_params(options)
dast_scan_id = options["dast_scan_id"]
endpoint = f"/dast/mfe-results/results/{dast_scan_id}"
async for results in self.client.send_paginated_request(
endpoint, "results", params
):
logger.info(
f"Fetched batch of {len(results)} DAST results for scan {dast_scan_id}"
)
yield [
self._enrich_scan_result_with_dast_scan_id(
result,
dast_scan_id,
)
for result in results
]
36 changes: 36 additions & 0 deletions integrations/checkmarx-one/checkmarx_one/core/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,39 @@ class ListSastOptions(TypedDict, total=False):
]
]
visible_columns: NotRequired[Optional[List[str]]]


class ListDastScanEnvironmentOptions(TypedDict):
"""Options for listing DAST scan environments."""

search: NotRequired[Optional[str]]


class ListDastScanOptions(TypedDict):
"""Options for listing DAST scans."""

environment_id: Required[str]
groups: NotRequired[Optional[List[str]]]


class ListDastScanResultOptions(TypedDict):
"""Options for listing DAST results for a scan."""

dast_scan_id: Required[str]
severity: NotRequired[
Optional[List[Literal["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]]]
]
status: NotRequired[Optional[List[Literal["NEW", "RECURRENT"]]]]
state: NotRequired[
Optional[
List[
Literal[
"TO_VERIFY",
"NOT_EXPLOITABLE",
"PROPOSED_NOT_EXPLOITABLE",
"CONFIRMED",
"URGENT",
]
]
]
]
27 changes: 27 additions & 0 deletions integrations/checkmarx-one/checkmarx_one/exporter_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@
from checkmarx_one.core.exporters.scan_result_exporter import (
CheckmarxScanResultExporter,
)
from checkmarx_one.core.exporters.dast_scan_environment_exporter import (
CheckmarxDastScanEnvironmentExporter,
)
from checkmarx_one.core.exporters.dast_scan_exporter import (
CheckmarxDastScanExporter,
)
from checkmarx_one.core.exporters.dast_scan_result_exporter import (
CheckmarxDastScanResultExporter,
)


def create_project_exporter() -> CheckmarxProjectExporter:
Expand Down Expand Up @@ -43,3 +52,21 @@ def create_scan_result_exporter() -> CheckmarxScanResultExporter:
"""Create a scan result exporter with initialized client."""
client = get_checkmarx_client()
return CheckmarxScanResultExporter(client)


def create_dast_scan_environment_exporter() -> CheckmarxDastScanEnvironmentExporter:
"""Create an environment exporter with initialized client."""
client = get_checkmarx_client()
return CheckmarxDastScanEnvironmentExporter(client)


def create_dast_scan_exporter() -> CheckmarxDastScanExporter:
"""Create a DAST scan exporter with initialized client."""
client = get_checkmarx_client()
return CheckmarxDastScanExporter(client)


def create_dast_scan_result_exporter() -> CheckmarxDastScanResultExporter:
"""Create a DAST results exporter with initialized client."""
client = get_checkmarx_client()
return CheckmarxDastScanResultExporter(client)
3 changes: 3 additions & 0 deletions integrations/checkmarx-one/checkmarx_one/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class ObjectKind(StrEnum):
API_SEC = "api-security"
SAST = "sast"
KICS = "kics"
DAST_SCAN_ENVIRONMENT = "dast-scan-environment"
DAST_SCAN = "dast-scan"
DAST_SCAN_RESULT = "dast-scan-result"


class ScanResultObjectKind(StrEnum):
Expand Down
84 changes: 84 additions & 0 deletions integrations/checkmarx-one/fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
from checkmarx_one.exporter_factory import (
create_dast_scan_exporter,
create_dast_scan_result_exporter,
)
from checkmarx_one.core.options import ListDastScanOptions, ListDastScanResultOptions
from loguru import logger
from typing import TYPE_CHECKING, Dict, Any, cast

if TYPE_CHECKING:
from integration import CheckmarxOneDastScanResultSelector
from checkmarx_one.core.exporters.dast_scan_result_exporter import (
CheckmarxDastScanResultExporter,
)


async def fetch_dast_scan_results(
dast_scan_id: str,
selector: "CheckmarxOneDastScanResultSelector",
dast_scan_result_exporter: "CheckmarxDastScanResultExporter",
) -> list[Dict[str, Any]]:
"""Fetch all paginated DAST results for a single scan."""
options = ListDastScanResultOptions(
dast_scan_id=dast_scan_id,
severity=selector.filter.severity,
status=selector.filter.status,
state=selector.filter.state,
)

all_results: list[Dict[str, Any]] = []
async for results_batch in dast_scan_result_exporter.get_paginated_resources(
options
):
all_results.extend(results_batch)

logger.info(f"Fetched {len(all_results)} DAST results for scan {dast_scan_id}")
return all_results


async def fetch_dast_scan_results_for_environment(
dast_scan_environment: Dict[str, Any],
selector: "CheckmarxOneDastScanResultSelector",
) -> list[Dict[str, Any]]:
"""Fetch all scans and their results for a given DAST environment."""

dast_scan_exporter = create_dast_scan_exporter()
dast_scan_result_exporter = create_dast_scan_result_exporter()

env_id = dast_scan_environment["environmentId"]

dast_scan_options = ListDastScanOptions(
environment_id=env_id,
groups=selector.dast_scan_filter.groups,
)

tasks = []
async for dast_scans_batch in dast_scan_exporter.get_paginated_resources(
dast_scan_options
):
for dast_scan in dast_scans_batch:
scan_id = dast_scan["scanId"]
tasks.append(
asyncio.create_task(
fetch_dast_scan_results(
scan_id, selector, dast_scan_result_exporter
)
)
)

if not tasks:
logger.info(f"No scans found for environment {env_id}")
return []

results_batches = await asyncio.gather(*tasks, return_exceptions=True)

merged: list[Dict[str, Any]] = []
for result in results_batches:
if isinstance(result, Exception):
logger.warning(f"Error fetching DAST scan results: {result}")
continue
merged.extend(cast(list[Dict[str, Any]], result))

logger.info(f"Environment {env_id}: {len(merged)} total results")
return merged
Loading