Skip to content

Commit 9468a8b

Browse files
committed
feat: Add Python SAST workflow with three security analysis tools
Implements Issue #5 - Python SAST workflow that combines:
- Dependency scanning (pip-audit) for CVE detection
- Security linting (Bandit) for vulnerability patterns
- Type checking (Mypy) for type safety issues

## Changes

**New Modules:**
- `DependencyScanner`: Scans Python dependencies for known CVEs using pip-audit
- `BanditAnalyzer`: Analyzes Python code for security issues using Bandit
- `MypyAnalyzer`: Checks Python code for type safety issues using Mypy

**New Workflow:**
- `python_sast`: Temporal workflow that orchestrates all three SAST tools
- Runs tools in parallel for fast feedback (3-5 min vs hours for fuzzing)
- Generates unified SARIF report with findings from all tools
- Supports configurable severity/confidence thresholds

**Updates:**
- Added SAST dependencies to Python worker (bandit, pip-audit, mypy)
- Updated module `__init__.py` files to export new analyzers
- Added type_errors.py test file to vulnerable_app for Mypy validation

## Testing

Workflow tested successfully on vulnerable_app:
- ✅ Bandit: Detected 9 security issues (command injection, unsafe functions)
- ✅ Mypy: Detected 5 type errors
- ✅ DependencyScanner: Ran successfully (no CVEs in test dependencies)
- ✅ SARIF export: Generated valid SARIF with 14 total findings
1 parent 6e4241a commit 9468a8b

File tree

11 files changed

+1556
-2
lines changed

11 files changed

+1556
-2
lines changed

backend/toolbox/modules/analyzer/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@
1010
# Additional attribution and requirements are provided in the NOTICE file.
1111

1212
from .security_analyzer import SecurityAnalyzer
13+
from .bandit_analyzer import BanditAnalyzer
14+
from .mypy_analyzer import MypyAnalyzer
1315

14-
__all__ = ["SecurityAnalyzer"]
16+
__all__ = ["SecurityAnalyzer", "BanditAnalyzer", "MypyAnalyzer"]
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
"""
2+
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
3+
"""
4+
5+
# Copyright (c) 2025 FuzzingLabs
6+
#
7+
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
8+
# at the root of this repository for details.
9+
#
10+
# After the Change Date (four years from publication), this version of the
11+
# Licensed Work will be made available under the Apache License, Version 2.0.
12+
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
13+
#
14+
# Additional attribution and requirements are provided in the NOTICE file.
15+
16+
import asyncio
17+
import json
18+
import logging
19+
import time
20+
from pathlib import Path
21+
from typing import Dict, Any, List
22+
import uuid
23+
24+
try:
25+
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
26+
except ImportError:
27+
try:
28+
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
29+
except ImportError:
30+
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
31+
32+
logger = logging.getLogger(__name__)
33+
34+
35+
class BanditAnalyzer(BaseModule):
    """
    Analyzes Python code for security issues using Bandit.

    This module:
    - Runs Bandit security linter on Python files
    - Detects common security issues (SQL injection, hardcoded secrets, etc.)
    - Reports findings with severity levels
    """

    # Severity mapping from Bandit levels to our standard
    SEVERITY_MAP = {
        "LOW": "low",
        "MEDIUM": "medium",
        "HIGH": "high"
    }

    # Supported severity/confidence levels, ordered from least to most severe.
    # The position in this tuple is the rank used for threshold comparisons.
    _LEVELS = ("low", "medium", "high")

    @classmethod
    def _level_rank(cls, level: Any) -> int:
        """
        Return the rank of a severity/confidence level name.

        The comparison is case-insensitive. Anything that is not one of
        ``_LEVELS`` (for example a missing value or a level name this module
        does not know) is ranked as the lowest level instead of raising, which
        matches the ``"low"`` fallback used when mapping severities.

        Args:
            level: Level name such as "low", "MEDIUM" or "High"

        Returns:
            Index of the level in ``_LEVELS``, or 0 when unrecognised
        """
        normalized = str(level).lower() if level is not None else ""
        try:
            return cls._LEVELS.index(normalized)
        except ValueError:
            return 0

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="bandit_analyzer",
            version="1.0.0",
            description="Analyzes Python code for security issues using Bandit",
            author="FuzzForge Team",
            category="analyzer",
            tags=["python", "security", "bandit", "sast"],
            input_schema={
                "severity_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Minimum severity level to report",
                    "default": "low"
                },
                "confidence_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Minimum confidence level to report",
                    "default": "medium"
                },
                "exclude_tests": {
                    "type": "boolean",
                    "description": "Exclude test files from analysis",
                    "default": True
                },
                "skip_ids": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of Bandit test IDs to skip",
                    "default": []
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "List of security issues found by Bandit"
                }
            },
            requires_workspace=True
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """
        Validate module configuration.

        Args:
            config: Module configuration

        Returns:
            True when the configuration is valid

        Raises:
            ValueError: If severity_level or confidence_level is not one of
                low/medium/high, or skip_ids is not a list of strings
        """
        severity = config.get("severity_level", "low")
        if severity not in self._LEVELS:
            raise ValueError("severity_level must be one of: low, medium, high")

        confidence = config.get("confidence_level", "medium")
        if confidence not in self._LEVELS:
            raise ValueError("confidence_level must be one of: low, medium, high")

        skip_ids = config.get("skip_ids", [])
        if not isinstance(skip_ids, list):
            raise ValueError("skip_ids must be a list")
        # The IDs are joined with "," for the command line; a non-string item
        # would otherwise fail inside _run_bandit and be swallowed there,
        # yielding an empty (and misleading) result.
        if not all(isinstance(test_id, str) for test_id in skip_ids):
            raise ValueError("skip_ids must be a list of strings")

        return True

    async def _run_bandit(
        self,
        workspace: Path,
        severity_level: str,
        confidence_level: str,
        exclude_tests: bool,
        skip_ids: List[str]
    ) -> Dict[str, Any]:
        """
        Run Bandit on the workspace.

        Bandit is invoked without any severity/confidence flag so that it
        emits every issue; thresholds are applied afterwards in
        ``_convert_to_findings``. ``severity_level`` and ``confidence_level``
        are therefore accepted for interface compatibility but not forwarded
        to the command line.

        Any failure (Bandit missing, unexpected exit code, unparsable output)
        is logged and reported as an empty result rather than raised.

        Args:
            workspace: Path to workspace
            severity_level: Minimum severity to report (not used here)
            confidence_level: Minimum confidence to report (not used here)
            exclude_tests: Whether to exclude test files
            skip_ids: List of test IDs to skip

        Returns:
            Bandit JSON output as dict, or ``{"results": []}`` on failure
        """
        try:
            # Build bandit command. No "-l"/"-ll" flag: "-ll" would restrict
            # the report to MEDIUM and above and hide every LOW finding.
            cmd = [
                "bandit",
                "-r", str(workspace),
                "-f", "json",
            ]

            # Add exclude patterns for test files
            if exclude_tests:
                cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])

            # Add skip IDs if specified
            if skip_ids:
                cmd.extend(["-s", ",".join(skip_ids)])

            logger.info(f"Running Bandit on: {workspace}")
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, stderr = await process.communicate()

            # Bandit returns non-zero if issues found, which is expected
            if process.returncode not in (0, 1):
                logger.error(f"Bandit failed: {stderr.decode(errors='replace')}")
                return {"results": []}

            # Parse JSON output
            result = json.loads(stdout.decode())
            if not isinstance(result, dict):
                logger.error("Bandit failed: unexpected JSON output")
                return {"results": []}
            return result

        except Exception as e:
            # Deliberate best-effort boundary: a broken scanner must not take
            # the whole module down.
            logger.error(f"Error running Bandit: {e}")
            return {"results": []}

    def _should_include_finding(
        self,
        issue: Dict[str, Any],
        min_severity: str,
        min_confidence: str
    ) -> bool:
        """
        Determine if a Bandit issue should be included based on severity/confidence.

        Levels are compared case-insensitively by their rank in ``_LEVELS``.
        A missing or unrecognised level on the issue is ranked as "low"
        instead of raising.

        Args:
            issue: Bandit issue dict
            min_severity: Minimum severity threshold
            min_confidence: Minimum confidence threshold

        Returns:
            True if issue should be included
        """
        severity_rank = self._level_rank(issue.get("issue_severity") or "LOW")
        confidence_rank = self._level_rank(issue.get("issue_confidence") or "LOW")

        return (
            severity_rank >= self._level_rank(min_severity)
            and confidence_rank >= self._level_rank(min_confidence)
        )

    def _convert_to_findings(
        self,
        bandit_result: Dict[str, Any],
        workspace: Path,
        min_severity: str,
        min_confidence: str
    ) -> List[ModuleFinding]:
        """
        Convert Bandit results to ModuleFindings.

        Issues below the severity or confidence threshold are dropped.

        Args:
            bandit_result: Bandit JSON output
            workspace: Workspace path for relative paths
            min_severity: Minimum severity to include
            min_confidence: Minimum confidence to include

        Returns:
            List of ModuleFindings
        """
        findings = []

        for issue in bandit_result.get("results") or []:
            # Filter by severity and confidence
            if not self._should_include_finding(issue, min_severity, min_confidence):
                continue

            # Extract issue details
            test_id = issue.get("test_id", "B000")
            test_name = issue.get("test_name", "unknown")
            issue_text = issue.get("issue_text", "No description")
            raw_severity = str(issue.get("issue_severity") or "LOW").upper()
            severity = self.SEVERITY_MAP.get(raw_severity, "low")

            # File location
            filename = issue.get("filename") or ""
            line_number = issue.get("line_number", 0)
            code = issue.get("code", "")

            # Prefer a workspace-relative path; fall back to the bare file
            # name when the reported path is not under the workspace.
            try:
                rel_path = Path(filename).relative_to(workspace)
            except (ValueError, TypeError):
                rel_path = Path(filename).name

            cwe_info = issue.get("issue_cwe")
            cwe_id = cwe_info.get("id") if isinstance(cwe_info, dict) else None

            # Create finding
            finding = self.create_finding(
                title=f"{test_name} ({test_id})",
                description=issue_text,
                severity=severity,
                category="security-issue",
                file_path=str(rel_path),
                line_start=line_number,
                line_end=line_number,
                code_snippet=code.strip() if code else None,
                recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
                metadata={
                    "test_id": test_id,
                    "test_name": test_name,
                    "confidence": str(issue.get("issue_confidence") or "LOW").lower(),
                    "cwe": cwe_id,
                    "more_info": issue.get("more_info", "")
                }
            )
            findings.append(finding)

        return findings

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the Bandit analyzer module.

        Validates the inputs, runs Bandit over the workspace, filters the
        reported issues by the configured thresholds and summarises them.

        Args:
            config: Module configuration
            workspace: Path to workspace

        Returns:
            ModuleResult with security findings

        Raises:
            ValueError: If the configuration is invalid (see validate_config)
        """
        start_time = time.time()
        metadata = self.get_metadata()

        # Validate inputs
        self.validate_config(config)
        self.validate_workspace(workspace)

        # Get configuration
        severity_level = config.get("severity_level", "low")
        confidence_level = config.get("confidence_level", "medium")
        exclude_tests = config.get("exclude_tests", True)
        skip_ids = config.get("skip_ids", [])

        # Run Bandit
        logger.info("Starting Bandit analysis...")
        bandit_result = await self._run_bandit(
            workspace,
            severity_level,
            confidence_level,
            exclude_tests,
            skip_ids
        )

        # Convert to findings
        findings = self._convert_to_findings(
            bandit_result,
            workspace,
            severity_level,
            confidence_level
        )

        # Calculate summary
        severity_counts: Dict[str, int] = {}
        for finding in findings:
            sev = finding.severity
            severity_counts[sev] = severity_counts.get(sev, 0) + 1

        execution_time = time.time() - start_time
        generated_at = bandit_result.get("generated_at", "unknown")

        return ModuleResult(
            module=metadata.name,
            version=metadata.version,
            status="success",
            execution_time=execution_time,
            findings=findings,
            summary={
                "total_issues": len(findings),
                "by_severity": severity_counts,
                # Counts distinct files that have at least one reported
                # finding, not every file Bandit scanned.
                "files_analyzed": len(set(f.file_path for f in findings if f.file_path))
            },
            metadata={
                # NOTE(review): this value is the report's "generated_at"
                # field, not a version string; the key is kept so existing
                # consumers do not break. Prefer "generated_at" below.
                "bandit_version": generated_at,
                "generated_at": generated_at,
                "metrics": bandit_result.get("metrics", {})
            }
        )

0 commit comments

Comments
 (0)