Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions plugins/modules/crawl_n_mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ def crawl(module, path) -> bool:
# Check if any parent directory (not the root) is excluded
if any(part in EXCLUDED_DIRS for part in rel_path.split(os.sep)):
continue

for f in files:
if not re.search(excluded_file_ext_regex, f):
if mask(module, os.path.join(root, f)):
Expand All @@ -206,7 +205,15 @@ def _get_masked_string(value):
return value[:2] + MASK_STR + value[-2:]


def partial_mask(value):
def format_masked(prefix, value, suffix, extension):
return (
f"{prefix}'{value}'{suffix}"
if extension == "yaml"
else f'{prefix}"{value}"{suffix}'
)


def partial_mask(value, extension):
"""
Check length of the string. If it is too long, take 2 chars
from beginning, then add mask string and add 2 chars from the
Expand All @@ -215,22 +222,21 @@ def partial_mask(value):
"""
if not value.strip():
return

if "'" in value:
parsed_value = value.split("'")
if "'" in value or extension == "json":
parsed_value = value.split('"') if extension == "json" else value.split("'")
if len(parsed_value) > 2 and parsed_value[1] != "":
prefix = parsed_value[0]
value = _get_masked_string(parsed_value[1])
suffix = parsed_value[2]
return f"{prefix}'{value}'{suffix}"
return format_masked(prefix, value, suffix, extension)
else:
match = re.match(r"^(\s*)(.*?)(\n?)$", value)
if match:
parts = list(match.groups())
prefix = parts[0]
value = _get_masked_string(parts[1])
suffix = parts[2]
return f"{prefix}'{value}'{suffix}"
return format_masked(prefix, value, suffix, extension)


def mask(module, path: str) -> bool:
Expand All @@ -240,16 +246,17 @@ def mask(module, path: str) -> bool:
respective masking methods for that file.
"""
changed = False
if (
if path.endswith("json"):
changed = mask_file(module, path, "json")
elif (
path.endswith((tuple(["yaml", "yml"])))
or os.path.basename(path).split(".")[0] in ALLOWED_YAML_FILES
):
extension = "yaml"
changed = mask_file(module, path, extension)
changed = mask_file(module, path, "yaml")
return changed


def mask_yaml(infile, outfile, changed) -> bool:
def mask_by_extension(infile, outfile, changed, extension) -> bool:
"""
Read the file, search for colon (':'), take value and
mask sensitive data
Expand All @@ -259,12 +266,12 @@ def mask_yaml(infile, outfile, changed) -> bool:
if ":" not in line:
outfile.write(line)
continue

key, sep, value = line.partition(":")
comparable_key = key.strip().replace('"', "")
masked_value = value
for word in PROTECT_KEYS:
if key.strip() == word:
masked = partial_mask(value)
if comparable_key == word.strip():
masked = partial_mask(value, extension)
if not masked:
continue
masked_value = masked_value.replace(value, masked)
Expand Down Expand Up @@ -293,10 +300,9 @@ def mask_file(module, path, extension) -> bool:
try:
with file_path.open("r", encoding="utf-8") as infile:
with temp_path.open("w", encoding="utf-8") as outfile:
if extension == "yaml":
changed = mask_yaml(infile, outfile, changed)
replace_file(temp_path, file_path, changed)
return changed
changed = mask_by_extension(infile, outfile, changed, extension)
replace_file(temp_path, file_path, changed)
return changed
except Exception as e:
print(f"An unexpected error occurred on masking file {file_path}: {e}")

Expand Down
36 changes: 15 additions & 21 deletions tests/unit/modules/test_crawl_n_mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,19 @@ def test_crawl_false(self, test_dir, expected_files):
changed = cnm.crawl(module, test_dir)
assert not changed

def test_partial_mask_scenario_1(self):
example_value = " 'test1234'\n"
expected_value = " 'te**********34'\n"
test_value = cnm.partial_mask(example_value)
assert expected_value == test_value

def test_partial_mask_scenario_2(self):
example_value = " osp_ci_framework_keytab\n"
expected_value = " 'os**********ab'\n"
test_value = cnm.partial_mask(example_value)
assert expected_value == test_value

def test_partial_mask_scenario_3(self):
example_value = " ''\n"
test_value = cnm.partial_mask(example_value)
assert test_value is None

def test_partial_mask_scenario_4(self):
example_value = "tet"
expected_value = "'te**********'"
test_value = cnm.partial_mask(example_value)
@pytest.mark.parametrize(
"file_format, example_value, expected_value",
[
("yaml", " 'test1234'\n", " 'te**********34'\n"),
("yaml", " osp_ci_framework_keytab\n", " 'os**********ab'\n"),
("yaml", " ''\n", None),
("yaml", "tet", "'te**********'"),
("json", ' "test1234",\n', ' "te**********34",\n'),
("json", ' "osp_ci_framework_keytab",\n', ' "os**********ab",\n'),
("json", " ''\n", None),
("json", '"tet"', '"te**********"'),
],
)
def test_partial_mask(self, file_format, example_value, expected_value):
test_value = cnm.partial_mask(example_value, file_format)
assert expected_value == test_value