From 292432b94de72c2615d06d631309f765524513d1 Mon Sep 17 00:00:00 2001 From: ljstella Date: Fri, 2 May 2025 16:52:18 -0500 Subject: [PATCH] initial stab --- contentctl/actions/inspect.py | 33 +++++++++- contentctl/objects/errors.py | 112 +++++++++++++++++++++++++++++++++- 2 files changed, 143 insertions(+), 2 deletions(-) diff --git a/contentctl/actions/inspect.py b/contentctl/actions/inspect.py index 40c0ff27..7c0cde88 100644 --- a/contentctl/actions/inspect.py +++ b/contentctl/actions/inspect.py @@ -16,6 +16,7 @@ DetectionMissingError, MetadataValidationError, VersionBumpingError, + VersionBumpingNotNeededError, VersionBumpingTooFarError, VersionDecrementedError, ) @@ -427,6 +428,7 @@ def check_detection_metadata(self, config: inspect) -> None: rule_name=rule_name, current_version=current_stanza.metadata.detection_version, previous_version=previous_stanza.metadata.detection_version, + expected_version=previous_stanza.metadata.detection_version + 1, ) ) @@ -434,12 +436,28 @@ def check_detection_metadata(self, config: inspect) -> None: if ( current_stanza.metadata.detection_version > previous_stanza.metadata.detection_version + 1 + ): + if current_stanza.hash != previous_stanza.hash: + validation_errors[rule_name].append( + VersionBumpingTooFarError( + rule_name=rule_name, + current_version=current_stanza.metadata.detection_version, + previous_version=previous_stanza.metadata.detection_version, + expected_version=previous_stanza.metadata.detection_version + + 1, + ) + ) + # Versions should not be bumped if the stanza has not changed + if (current_stanza.hash == previous_stanza.hash) & ( + current_stanza.metadata.detection_version + != previous_stanza.metadata.detection_version ): validation_errors[rule_name].append( - VersionBumpingTooFarError( + VersionBumpingNotNeededError( rule_name=rule_name, current_version=current_stanza.metadata.detection_version, previous_version=previous_stanza.metadata.detection_version, + expected_version=previous_stanza.metadata.detection_version, ) ) @@ -451,12 +469,25 @@ def check_detection_metadata(self, config: inspect) -> None: # Report failure/success print("\nDetection Metadata Validation:") if len(validation_error_list) > 0: + with open("metadata_validation_errors.json", "w") as f: + fixable_errors = [ + x + for x in validation_error_list + if type(x) + in [ + VersionBumpingError, + VersionBumpingNotNeededError, + VersionBumpingTooFarError, + ] + ] + json.dump([x.toJSON() for x in fixable_errors], f, indent=4) # Iterate over each rule and report the failures for rule_name in validation_errors: if len(validation_errors[rule_name]) > 0: print(f"\t❌ {rule_name}") for error in validation_errors[rule_name]: print(f"\t\t🔸 {error.short_message}") + else: # If no errors in the list, report success print( diff --git a/contentctl/objects/errors.py b/contentctl/objects/errors.py index 3da48acc..c05d0b6b 100644 --- a/contentctl/objects/errors.py +++ b/contentctl/objects/errors.py @@ -134,11 +134,16 @@ class VersioningError(MetadataValidationError, ABC): previous_version: int def __init__( - self, rule_name: str, current_version: int, previous_version: int, *args: object + self, + rule_name: str, + current_version: int, + previous_version: int, + *args: object, ) -> None: self.rule_name = rule_name self.current_version = current_version self.previous_version = previous_version + super().__init__(self.long_message, *args) @@ -176,6 +181,20 @@ class VersionBumpingError(VersioningError): An error indicating the detection changed but its version wasn't bumped appropriately """ + # The version expected in the current build, when possible + expected_version: int + + def __init__( + self, + rule_name: str, + current_version: int, + previous_version: int, + expected_version: int, + *args: object, + ) -> None: + self.expected_version = expected_version + super().__init__(rule_name, current_version, previous_version, *args) + @property def long_message(self) -> str: """ @@ -196,12 +215,90 @@ def short_message(self) -> str: """ return f"Detection version in current build should be bumped to {self.previous_version + 1}." + def toJSON(self) -> dict[str, object]: + """ + Convert the error to a JSON-serializable dict + :returns: a dict, the error + """ + return { + "rule_name": self.rule_name, + "message": self.short_message, + "current_version": self.current_version, + "previous_version": self.previous_version, + "expected_version": self.expected_version, + } + + +class VersionBumpingNotNeededError(VersioningError): + """ + An error indicating the detection did not change but its version was bumped + """ + + expected_version: int + + def __init__( + self, + rule_name: str, + current_version: int, + previous_version: int, + expected_version: int, + *args: object, + ) -> None: + self.expected_version = expected_version + super().__init__(rule_name, current_version, previous_version, *args) + + @property + def long_message(self) -> str: + """ + A long-form error message + :returns: a str, the message + """ + return ( + f"Rule '{self.rule_name}' has not changed in current build compared to previous " + "build (stanza hashes are the same); the detection version should not be bumped." + ) + + @property + def short_message(self) -> str: + """ + A short-form error message + :returns: a str, the message + """ + return "Detection version in current build should not be bumped." + + def toJSON(self) -> dict[str, object]: + """ + Convert the error to a JSON-serializable dict + :returns: a dict, the error + """ + return { + "rule_name": self.rule_name, + "message": self.short_message, + "current_version": self.current_version, + "previous_version": self.previous_version, + "expected_version": self.expected_version, + } + class VersionBumpingTooFarError(VersioningError): """ An error indicating the detection changed but its version was bumped too far """ + # The version expected in the current build, when possible + expected_version: int + + def __init__( + self, + rule_name: str, + current_version: int, + previous_version: int, + expected_version: int, + *args: object, + ) -> None: + self.expected_version = expected_version + super().__init__(rule_name, current_version, previous_version, *args) + @property def long_message(self) -> str: """ @@ -221,3 +318,16 @@ def short_message(self) -> str: :returns: a str, the message """ return f"Detection version in current build should be reduced to {self.previous_version + 1}." + + def toJSON(self) -> dict[str, object]: + """ + Convert the error to a JSON-serializable dict + :returns: a dict, the error + """ + return { + "rule_name": self.rule_name, + "current_version": self.current_version, + "message": self.short_message, + "previous_version": self.previous_version, + "expected_version": self.expected_version, + }