Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions ssg/build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

from .cce import is_cce_format_valid, is_cce_value_valid
from .yaml import DocumentationNotComplete, open_and_expand
from .utils import required_key, mkdir_p
from .utils import required_key, mkdir_p, safe_evaluate_boolean_filter

from .xml import ElementTree as ET, register_namespaces, parse_file
import ssg.build_stig
Expand Down Expand Up @@ -1619,9 +1619,7 @@ def rule_filter_from_def(filterdef):
return noop_rule_filterfunc

def filterfunc(rule):
# Remove globals for security and only expose
# variables relevant to the rule
return eval(filterdef, {"__builtins__": None}, rule.__dict__)
return safe_evaluate_boolean_filter(filterdef, rule.__dict__)
return filterfunc


Expand Down
5 changes: 2 additions & 3 deletions ssg/entities/profile_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
OSCAP_RULE,
OSCAP_VALUE,
)
from ..utils import safe_evaluate_boolean_filter


def noop_rule_filterfunc(rule):
Expand All @@ -30,9 +31,7 @@ def filterfunc(rule):
c = copy.copy(rule)
if c.platform is None:
c.platform = ''
# Remove globals for security and only expose
# variables relevant to the rule
return eval(filterdef, {"__builtins__": None}, c.__dict__)
return safe_evaluate_boolean_filter(filterdef, c.__dict__)
return filterfunc


Expand Down
69 changes: 68 additions & 1 deletion ssg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

from __future__ import absolute_import

import ast
import multiprocessing
import os
import re
from collections import namedtuple
import hashlib
from typing import Dict
from typing import Any, Dict, Mapping

from .constants import (FULL_NAME_TO_PRODUCT_MAPPING,
MAKEFILE_ID_TO_PRODUCT_MAP,
Expand All @@ -24,6 +25,72 @@ class SSGError(RuntimeError):
PRODUCT_NAME_PARSER = re.compile(r"(.+?)([0-9]+)$")


def _safe_eval_filter_node(node: ast.AST, variables: Mapping[str, Any]) -> Any:
if isinstance(node, ast.Expression):
return _safe_eval_filter_node(node.body, variables)

if isinstance(node, ast.BoolOp):
values = [_safe_eval_filter_node(value, variables) for value in node.values]
if isinstance(node.op, ast.And):
return all(values)
if isinstance(node.op, ast.Or):
return any(values)
raise ValueError(f"Unsupported boolean operator in filter expression: {ast.dump(node.op)}")

if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
return not bool(_safe_eval_filter_node(node.operand, variables))

if isinstance(node, ast.Compare):
left = _safe_eval_filter_node(node.left, variables)
for op, comparator in zip(node.ops, node.comparators):
right = _safe_eval_filter_node(comparator, variables)
if isinstance(op, ast.In):
result = left in right
elif isinstance(op, ast.NotIn):
result = left not in right
elif isinstance(op, ast.Eq):
result = left == right
elif isinstance(op, ast.NotEq):
result = left != right
else:
raise ValueError(
f"Unsupported comparison operator in filter expression: {ast.dump(op)}"
)
if not result:
return False
left = right
return True

if isinstance(node, ast.Name):
if node.id not in variables:
raise ValueError(f"Unknown variable '{node.id}' in filter expression")
return variables[node.id]

if isinstance(node, ast.Constant):
return node.value

if isinstance(node, ast.List):
return [_safe_eval_filter_node(element, variables) for element in node.elts]

if isinstance(node, ast.Tuple):
return tuple(_safe_eval_filter_node(element, variables) for element in node.elts)

if isinstance(node, ast.Set):
return {_safe_eval_filter_node(element, variables) for element in node.elts}

raise ValueError(
"Unsupported construct in filter expression: {node}".format(node=ast.dump(node))
)


def safe_evaluate_boolean_filter(filterdef: str, variables: Mapping[str, Any]) -> bool:
try:
expression = ast.parse(filterdef, mode="eval")
except SyntaxError as exc:
raise ValueError(f"Invalid filter expression: {filterdef}") from exc
return bool(_safe_eval_filter_node(expression, variables))


class VersionSpecifierSet(set):
"""
A set-like collection that only accepts VersionSpecifier objects.
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/ssg-module/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,29 @@ def test_ver_specs():
# We support only VersionSpecifier objects as members of VersionSpecifierSet
with pytest.raises(ValueError):
utils.VersionSpecifierSet([utils.VersionSpecifier('>=', evr123), '1.2.3'])


def test_safe_evaluate_boolean_filter_allows_supported_profile_expressions():
variables = {
"platform": "ocp4-node",
"platforms": {"eks-control-plane", "ocp4-node"},
}

assert utils.safe_evaluate_boolean_filter('"ocp4-node" in platform', variables)
assert utils.safe_evaluate_boolean_filter('"eks-node" not in platforms', variables)
assert utils.safe_evaluate_boolean_filter(
'"ocp4-node" in platform or "ocp4-master-node" in platform', variables
)
assert utils.safe_evaluate_boolean_filter(
'not ("ocp4-master-node" in platform)', variables
)


def test_safe_evaluate_boolean_filter_rejects_unsafe_constructs():
variables = {"platform": "ocp4-node", "platforms": {"ocp4-node"}}

with pytest.raises(ValueError):
utils.safe_evaluate_boolean_filter('__import__("os").system("id")', variables)

with pytest.raises(ValueError):
utils.safe_evaluate_boolean_filter('platform.startswith("ocp4")', variables)
53 changes: 41 additions & 12 deletions utils/ansible_playbook_to_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io
import os
import os.path
import subprocess
import sys
import shutil
import re
Expand Down Expand Up @@ -82,6 +83,21 @@ def dict_constructor(loader, node):
os.path.dirname(os.path.abspath(__file__)),
"ansible_galaxy_readme_template.md"
)
GITHUB_IDENTIFIER_RE = re.compile(r"^[A-Za-z0-9_.-]+$")


def _validate_github_identifier(value, field_name):
if not GITHUB_IDENTIFIER_RE.fullmatch(value):
raise ValueError(
"{field_name} contains unsupported characters: {value}".format(
field_name=field_name, value=value
)
)
return value


def _run_command(args, cwd=None):
subprocess.run(args, cwd=cwd, check=True)


def create_empty_repositories(github_new_repos, github_org):
Expand All @@ -98,20 +114,33 @@ def create_empty_repositories(github_new_repos, github_org):


def clone_and_init_repository(parent_dir, organization, repo):
organization = _validate_github_identifier(organization, "organization")
repo = _validate_github_identifier(repo, "repo")
repo_dir = os.path.join(parent_dir, repo)

# 1. Initialize the Ansible role first (creates the directory)
os.system(f"ansible-galaxy init {repo}")
_run_command(["ansible-galaxy", "init", repo], cwd=parent_dir)

# 2. Change directory and initialize git
os.chdir(repo)
try:
os.system("git init --initial-branch=main")
os.system(f"git remote add origin git@github.com:{organization}/{repo}")
os.system('git add .')
os.system('git commit -a -m "Initial commit" --author "%s <%s>"'
% (GIT_COMMIT_AUTHOR_NAME, GIT_COMMIT_AUTHOR_EMAIL))
os.system('git push origin main')
finally:
os.chdir("..")
# 2. Initialize git in the generated role directory
_run_command(["git", "init", "--initial-branch=main"], cwd=repo_dir)
_run_command(
["git", "remote", "add", "origin", f"git@github.com:{organization}/{repo}"],
cwd=repo_dir,
)
_run_command(["git", "add", "."], cwd=repo_dir)
_run_command(
[
"git",
"commit",
"-a",
"-m",
"Initial commit",
"--author",
f"{GIT_COMMIT_AUTHOR_NAME} <{GIT_COMMIT_AUTHOR_EMAIL}>",
],
cwd=repo_dir,
)
_run_command(["git", "push", "origin", "main"], cwd=repo_dir)


def update_repo_release(github, repo):
Expand Down
Loading