Skip to content

fix: Allow different order of the metadata fields in ESQL queries #4956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,9 +934,15 @@ def validates_esql_data(self, data: dict[str, Any], **_: Any) -> None:
# Convert the query string to lowercase to handle case insensitivity
query_lower = data["query"].lower()

# Combine both patterns using an OR operator and compile the regex
# Combine both patterns using an OR operator and compile the regex.
# The first alternative matches the metadata fields in the `from` clause, allowing one or
# more comma-separated indices and any order of the metadata fields.
# The second alternative matches a `stats` command that has a `by` clause.
combined_pattern = re.compile(
r"(from\s+\S+\s+metadata\s+_id,\s*_version,\s*_index)|(\bstats\b.*?\bby\b)", re.DOTALL
r"(from\s+(?:\S+\s*,\s*)*\S+\s+metadata\s+"
r"(?:_id|_version|_index)(?:,\s*(?:_id|_version|_index)){2})"
r"|(\bstats\b.*?\bby\b)",
re.DOTALL,
)

# Ensure that non-aggregate queries have metadata
Expand All @@ -948,7 +954,9 @@ def validates_esql_data(self, data: dict[str, Any], **_: Any) -> None:
)

# Enforce KEEP command for ESQL rules
if "| keep" not in query_lower:
# Match | followed by optional whitespace/newlines and then 'keep'
keep_pattern = re.compile(r"\|\s*keep\b", re.IGNORECASE | re.DOTALL)
if not keep_pattern.search(query_lower):
raise ValidationError(
f"Rule: {data['name']} does not contain a 'keep' command -> Add a 'keep' command to the query."
)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.18"
version = "1.3.19"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
54 changes: 54 additions & 0 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
import copy
import unittest
import uuid
from pathlib import Path

import eql
import pytest
import pytoml
from marshmallow import ValidationError
from semver import Version

from detection_rules import utils
from detection_rules.config import load_current_package_version
from detection_rules.rule import TOMLRuleContents
from detection_rules.rule_loader import RuleCollection
from detection_rules.schemas import RULES_CONFIG, downgrade
from detection_rules.version_lock import VersionLockFile

Expand Down Expand Up @@ -302,3 +306,53 @@ def test_stack_schema_map(self):
stack_map = utils.load_etc_dump(["stack-schema-map.yaml"])
err_msg = f"There is no entry defined for the current package ({package_version}) in the stack-schema-map"
self.assertIn(package_version, [Version.parse(v) for v in stack_map], err_msg)


class TestESQLValidation(unittest.TestCase):
    """Exercise the ESQL query checks that run when a rule is loaded."""

    def test_esql_data_validation(self):
        """Load one rule with several substituted queries and check which ones are accepted."""

        # An arbitrary existing ESQL rule serves as a template; only its query is swapped out.
        template_path = Path("rules/windows/defense_evasion_posh_obfuscation_index_reversal.toml")
        rule_contents = pytoml.loads(template_path.read_text())

        def load_with_query(esql_query):
            """Put the query into the template and load it through a fresh RuleCollection."""
            rule_contents["rule"]["query"] = esql_query
            return RuleCollection().load_dict(rule_contents, path=template_path)

        # Metadata fields in the most commonly used order: accepted if loading does not raise.
        load_with_query(
            "\n"
            "FROM logs-windows.powershell_operational* METADATA _id, _version, _index\n"
            '| WHERE event.code == "4104"\n'
            "| KEEP event.count\n"
        )

        # Metadata fields in the order used by the example in the docs -
        # https://www.elastic.co/guide/en/security/8.17/rules-ui-create.html#esql-non-agg-query
        load_with_query(
            "\n"
            "FROM logs-windows.powershell_operational* METADATA _id, _index, _version\n"
            '| WHERE event.code == "4104"\n'
            "| KEEP event.count\n"
        )

        # A different set of metadata fields must be rejected.
        with pytest.raises(ValidationError):
            load_with_query(
                "\n"
                "FROM logs-windows.powershell_operational* METADATA _foo, _index\n"
                '| WHERE event.code == "4104"\n'
                "| KEEP event.count\n"
            )

        # A query without a `keep` command must be rejected.
        with pytest.raises(ValidationError):
            load_with_query(
                "\n"
                "FROM logs-windows.powershell_operational* METADATA _id, _index, _version\n"
                '| WHERE event.code == "4104"\n'
            )
Loading