diff --git a/CLI.md b/CLI.md index f4d1e2eb447..d59e025cca5 100644 --- a/CLI.md +++ b/CLI.md @@ -106,6 +106,7 @@ Options: -snv, --strip-none-values Strip None values from the rule -lc, --local-creation-date Preserve the local creation date of the rule -lu, --local-updated-date Preserve the local updated date of the rule + -lr, --load-rule-loading Enable arbitrary rule loading from the rules directories (Can be very slow!) -h, --help Show this message and exit. ``` @@ -507,6 +508,7 @@ Options: -lu, --local-updated-date Preserve the local updated date of the rule -cro, --custom-rules-only Only export custom rules -eq, --export-query TEXT Apply a query filter to exporting rules e.g. "alert.attributes.tags: \"test\"" to filter for rules that have the tag "test" + -lr, --load-rule-loading Enable arbitrary rule loading from the rules directories (Can be very slow!) -h, --help Show this message and exit. ``` diff --git a/detection_rules/kbwrap.py b/detection_rules/kbwrap.py index 6f5c9a156ed..3eaae364848 100644 --- a/detection_rules/kbwrap.py +++ b/detection_rules/kbwrap.py @@ -8,7 +8,7 @@ import re import sys from pathlib import Path -from typing import Any +from typing import Any, cast import click import kql # type: ignore[reportMissingTypeStubs] @@ -27,7 +27,8 @@ from .main import root from .misc import add_params, get_kibana_client, kibana_options, nested_set, raise_client_error from .rule import TOMLRule, TOMLRuleContents, downgrade_contents_from_rule -from .rule_loader import RuleCollection, update_metadata_from_file +from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file +from .schemas import definitions # noqa: TC001 from .utils import format_command_options, rulename_to_filename RULES_CONFIG = parse_rules_config() @@ -250,6 +251,12 @@ def _process_imported_items( '"alert.attributes.tags: \\"test\\"" to filter for rules that have the tag "test"' ), ) +@click.option( + "--load-rule-loading", + "-lr", + is_flag=True, + help="Enable arbitrary rule loading from the rules directories (Can be very slow!)", +) @click.pass_context def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915 ctx: click.Context, @@ -268,15 +275,20 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915 local_updated_date: bool = False, custom_rules_only: bool = False, export_query: str | None = None, + load_rule_loading: bool = False, ) -> list[TOMLRule]: """Export custom rules from Kibana.""" kibana = ctx.obj["kibana"] - kibana_include_details = export_exceptions or export_action_connectors + kibana_include_details = export_exceptions or export_action_connectors or custom_rules_only or export_query # Only allow one of rule_id or rule_name if rule_name and rule_id: raise click.UsageError("Cannot use --rule-id and --rule-name together. Please choose one.") + raw_rule_collection = RawRuleCollection() + if load_rule_loading: + raw_rule_collection = raw_rule_collection.default() + with kibana: # Look up rule IDs by name if --rule-name was provided if rule_name: @@ -365,6 +377,16 @@ def kibana_export_rules( # noqa: PLR0912, PLR0913, PLR0915 rule_name = rulename_to_filename(rule_resource.get("name"), tactic_name=tactic_name) # type: ignore[reportUnknownMemberType] save_path = directory / f"{rule_name}" + + # Get local rule data if load_rule_loading is enabled. If not enabled rules variable will be None. + local_rule: dict[str, Any] = params.get("rule", {}) + input_rule_id: str | None = None + + if local_rule: + input_rule_id = cast("definitions.UUIDString", local_rule.get("rule_id")) + + if input_rule_id and input_rule_id in raw_rule_collection.id_map: + save_path = raw_rule_collection.id_map[input_rule_id].path or save_path params.update( update_metadata_from_file( save_path, {"creation_date": local_creation_date, "updated_date": local_updated_date} diff --git a/detection_rules/main.py b/detection_rules/main.py index 5751284b9b8..7de190520d2 100644 --- a/detection_rules/main.py +++ b/detection_rules/main.py @@ -33,7 +33,7 @@ from .misc import add_client, nested_set, parse_user_config, raise_client_error from .rule import DeprecatedRule, QueryRuleData, TOMLRule, TOMLRuleContents from .rule_formatter import toml_write -from .rule_loader import RuleCollection, update_metadata_from_file +from .rule_loader import RawRuleCollection, RuleCollection, update_metadata_from_file from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file from .utils import ( Ndjson, @@ -157,6 +157,12 @@ def generate_rules_index( @click.option("--strip-none-values", "-snv", is_flag=True, help="Strip None values from the rule") @click.option("--local-creation-date", "-lc", is_flag=True, help="Preserve the local creation date of the rule") @click.option("--local-updated-date", "-lu", is_flag=True, help="Preserve the local updated date of the rule") +@click.option( + "--load-rule-loading", + "-lr", + is_flag=True, + help="Enable arbitrary rule loading from the rules directories (Can be very slow!)", +) def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 input_file: tuple[Path, ...] | None, required_only: bool, @@ -171,6 +177,7 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 strip_none_values: bool, local_creation_date: bool, local_updated_date: bool, + load_rule_loading: bool, ) -> None: """Import rules from json, toml, or yaml files containing Kibana exported rule(s).""" errors: list[str] = [] @@ -189,6 +196,10 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 if not file_contents: click.echo("Must specify at least one file!") + raw_rule_collection = RawRuleCollection() + if load_rule_loading: + raw_rule_collection = raw_rule_collection.default() + exceptions_containers = {} exceptions_items = {} @@ -210,7 +221,12 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 base_path = rulename_to_filename(base_path) if base_path else base_path if base_path is None: raise ValueError(f"Invalid rule file, please ensure the rule has a name field: {contents}") - rule_path = Path(os.path.join(str(save_directory) if save_directory else RULES_DIRS[0], base_path)) # noqa: PTH118 + + rule_base_path = Path(save_directory or RULES_DIRS[0]) + rule_path = rule_base_path / base_path + rule_id = contents.get("rule_id") + if rule_id in raw_rule_collection.id_map: + rule_path = raw_rule_collection.id_map[rule_id].path or rule_path # handle both rule json formats loaded from kibana and toml data_view_id = contents.get("data_view_id") or contents.get("rule", {}).get("data_view_id") @@ -226,7 +242,7 @@ def import_rules_into_repo( # noqa: PLR0912, PLR0913, PLR0915 contents.update( update_metadata_from_file( - Path(rule_path), {"creation_date": local_creation_date, "updated_date": local_updated_date} + rule_path, {"creation_date": local_creation_date, "updated_date": local_updated_date} ) ) diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 41dc20d059f..f251a7d5d63 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -82,18 +82,18 @@ def metadata(self) -> dict[str, Any]: @property def data(self) -> dict[str, Any]: - """Rule portion of TOML file rule.""" - return self.contents.get("data") or self.contents + """Rule portion of TOML file rule. Supports nested and flattened rule dictionaries""" + return self.contents.get("data", {}) or self.contents or self.contents.get("rule", {}) @property def id(self) -> str: - """Get the rule ID.""" - return self.data["rule_id"] # type: ignore[reportUnknownMemberType] + """Get the rule ID. Supports nested and flattened rule dictionaries.""" + return self.data.get("rule_id") or self.data.get("rule", {}).get("rule_id") @property def name(self) -> str: - """Get the rule name.""" - return self.data["name"] # type: ignore[reportUnknownMemberType] + """Get the rule name. Supports nested and flattened rule dictionaries""" + return self.data.get("name") or self.data.get("rule", {}).get("name") def __hash__(self) -> int: """Get the hash of the rule.""" diff --git a/pyproject.toml b/pyproject.toml index a9d8668fedc..48f36fe14ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.3.17" +version = "1.3.18" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"