|
1 | 1 | # -*- coding: utf_8 -*- |
2 | 2 | """Semantic Grep Helpers.""" |
3 | 3 | import json |
4 | | -import logging |
5 | 4 | import platform |
6 | | -import multiprocessing |
| 5 | +import subprocess |
7 | 6 |
|
8 | 7 |
|
9 | | -def invoke_semgrep(paths, scan_rules, **kwargs): |
10 | | - """Call Semgrep.""" |
| 8 | +def invoke_semgrep(paths, scan_rules): |
11 | 9 | if platform.system() == 'Windows': |
12 | 10 | return None |
13 | | - from semgrep import semgrep_main |
14 | | - from semgrep.state import get_state |
15 | | - from semgrep.constants import OutputFormat |
16 | | - from semgrep.output import OutputHandler, OutputSettings |
17 | | - try: |
18 | | - cpu_count = multiprocessing.cpu_count() |
19 | | - except NotImplementedError: |
20 | | - cpu_count = 1 # CPU count is not implemented on Windows |
21 | | - # Semgrep output formatting |
22 | | - state = get_state() |
23 | | - state.terminal.configure( |
24 | | - verbose=False, |
25 | | - debug=False, |
26 | | - quiet=True, |
27 | | - force_color=False, |
28 | | - ) |
29 | | - logging.getLogger('semgrep').propagate = False |
30 | | - output_settings = OutputSettings( |
31 | | - output_format=OutputFormat.JSON, |
32 | | - output_destination=None, |
33 | | - output_per_finding_max_lines_limit=None, |
34 | | - output_per_line_max_chars_limit=None, |
35 | | - error_on_findings=False, |
36 | | - verbose_errors=False, |
37 | | - strict=False, |
38 | | - timeout_threshold=3, |
39 | | - ) |
40 | | - output_handler = OutputHandler(output_settings) |
41 | | - ( |
42 | | - filtered_matches_by_rule, |
43 | | - _, |
44 | | - _, |
45 | | - _, |
46 | | - _, |
47 | | - _, |
48 | | - _, |
49 | | - _, |
50 | | - _, |
51 | | - _, |
52 | | - _, |
53 | | - _, |
54 | | - ) = semgrep_main.main( |
55 | | - output_handler=output_handler, |
56 | | - target=[pt.as_posix() for pt in paths], |
57 | | - jobs=cpu_count, |
58 | | - pattern=None, |
59 | | - lang=None, |
60 | | - configs=[scan_rules], |
61 | | - timeout=5, |
62 | | - timeout_threshold=3, |
63 | | - **kwargs, |
64 | | - ) |
65 | | - output_handler.rule_matches = [ |
66 | | - m for ms in filtered_matches_by_rule.values() for m in ms |
| 11 | + ps = [pt.as_posix() for pt in paths] |
| 12 | + command = [ |
| 13 | + 'semgrep', |
| 14 | + '--metrics=off', |
| 15 | + '--no-rewrite-rule-ids', |
| 16 | + '--json', |
| 17 | + '-q', |
| 18 | + '--config', |
| 19 | + scan_rules, |
| 20 | + *ps, |
67 | 21 | ] |
68 | | - return json.loads(output_handler._build_output()) |
| 22 | + try: |
| 23 | + result = subprocess.run(command, capture_output=True, text=True, check=True) |
| 24 | + return json.loads(result.stdout) |
| 25 | + except subprocess.CalledProcessError as e: |
| 26 | + try: |
| 27 | + return json.loads(e.output) |
| 28 | + except json.JSONDecodeError: |
| 29 | + return {'errors': e.output} |
0 commit comments