Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ def __attrs_post_init__(self):
attribute.labels = self._labels
attribute.init_tracker()

# __dict__ is added to support functools.cached_property
__slots__ = [
"name",
"_config",
"pipeline_index",
"_job_tag_for_cleanup",
"_is_shut_down",
"__dict__",
"__dict__", # __dict__ is added to support functools.cached_property
]

# instance attributes
Expand Down
38 changes: 22 additions & 16 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import logging
import os
import typing
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Type, cast
from typing import TYPE_CHECKING, Any, ClassVar, Iterable, Sequence, Type, cast

from attrs import define, field, validators

Expand Down Expand Up @@ -41,13 +42,13 @@ class ProcessorResult:

processor_name : str
The name of the processor
event: Optional[dict]
event: dict | None
A reference to the event that was processed
data : Optional[list]
data : list | None
The generated extra data
errors : Optional[list]
errors : list | None
The errors that occurred during processing
warnings : Optional[list]
warnings : list | None
The warnings that occurred during processing
"""

Expand Down Expand Up @@ -82,11 +83,11 @@ class Processor(Component):
class Config(Component.Config):
"""Common Configurations"""

rules: List[str] = field(
validator=[
validators.instance_of(list),
validators.deep_iterable(member_validator=validators.instance_of((str, dict))),
]
rules: list[str] = field(
validator=validators.deep_iterable(
member_validator=validators.instance_of((str, dict)),
iterable_validator=validators.instance_of(list),
),
)
"""List of rule locations to load rules from.
In addition to paths to file directories it is possible to retrieve rules from a URI.
Expand Down Expand Up @@ -118,14 +119,19 @@ class Config(Component.Config):

def __init__(self, name: str, configuration: "Processor.Config"):
super().__init__(name, configuration)
self._rule_tree = RuleTree(config=self._config.tree_config)
self.load_rules(rules_targets=self._config.rules)
self._rule_tree = RuleTree(config=self.config.tree_config)
self.load_rules(rules_targets=self.config.rules)
self._result = None
self._bypass_rule_tree = False
if os.environ.get("LOGPREP_BYPASS_RULE_TREE"):
self._bypass_rule_tree = True
logger.debug("Bypassing rule tree for processor %s", self.name)

@property
def config(self) -> Config:
"""Provides the properly typed rule configuration object"""
return typing.cast("Processor.Config", self._config)

@property
def result(self) -> ProcessorResult:
"""Returns the current result object which is guaranteed to be non-None
Expand Down Expand Up @@ -158,7 +164,7 @@ def metric_labels(self) -> dict:
return {
"component": "processor",
"description": self.describe(),
"type": self._config.type,
"type": self.config.type,
"name": self.name,
}

Expand Down Expand Up @@ -208,7 +214,7 @@ def _process_rule(rule, event):
return event

def _process_rule_tree_multiple_times(tree: RuleTree, event: dict):
matching_rules = tree.get_matching_rules(event)
matching_rules: Iterable[Rule] = tree.get_matching_rules(event)
while matching_rules:
for rule in matching_rules:
_process_rule(rule, event)
Expand All @@ -219,7 +225,7 @@ def _process_rule_tree_once(tree: RuleTree, event: dict):
for rule in matching_rules:
_process_rule(rule, event)

if self._config.apply_multiple_times:
if self.config.apply_multiple_times:
_process_rule_tree_multiple_times(tree, event)
else:
_process_rule_tree_once(tree, event)
Expand Down Expand Up @@ -257,7 +263,7 @@ def test_rules(self) -> dict | None:

"""

def load_rules(self, rules_targets: List[str | Dict]) -> None:
def load_rules(self, rules_targets: Sequence[str | dict]) -> None:
"""method to add rules from directories or urls"""
try:
rules = RuleLoader(rules_targets, self.name).rules
Expand Down
5 changes: 3 additions & 2 deletions logprep/filter/expression/filter_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def does_match(self, document: dict) -> bool:
value = self._get_value(self.key, document)

if isinstance(value, list):
return any(filter(self._matcher.match, (str(val) for val in value)))
return any(self._matcher.match(str(v)) for v in value)

match_result = self._matcher.match(str(value))

Expand Down Expand Up @@ -339,6 +339,7 @@ def _normalize_regex(regex: str) -> str:
else:
end_token = "$"
match = RegExFilterExpression.match_parts_pattern.match(regex)
assert match, "regex is designed to always match"
flag, _, pattern = match.groups()
flag = "" if flag is None else flag
pattern = "" if pattern is None else pattern
Expand All @@ -348,7 +349,7 @@ def does_match(self, document: dict) -> bool:
value = self._get_value(self.key, document)

if isinstance(value, list):
return any(filter(self._matcher.match, value))
return any(self._matcher.match(str(v)) for v in value)
return self._matcher.match(str(value)) is not None


Expand Down
33 changes: 17 additions & 16 deletions logprep/filter/lucene_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@
import re
from itertools import chain, zip_longest

# pylint: enable=anomalous-backslash-in-string
from typing import List, Optional, Union

import luqum
from luqum.parser import IllegalCharacterError, ParseSyntaxError, parser
from luqum.tree import (
Expand Down Expand Up @@ -128,6 +125,9 @@
StringFilterExpression,
)

# pylint: enable=anomalous-backslash-in-string


logger = logging.getLogger("LuceneFilter")


Expand All @@ -143,7 +143,7 @@ class LuceneFilter:
end_escaping_pattern = re.compile(r'((?:\\)+"[\s\)]+(?:AND|OR|NOT|$))')

@staticmethod
def create(query_string: str, special_fields: dict = None) -> FilterExpression:
def create(query_string: str, special_fields: dict | None = None) -> FilterExpression:
"""Create a FilterExpression from a lucene query string.

Parameters
Expand Down Expand Up @@ -232,22 +232,22 @@ def _escape_ends_of_expressions(query_string):
class LuceneTransformer:
"""A transformer that converts a luqum tree into a FilterExpression."""

_special_fields_map = {
_special_fields_map: dict[str, type[RegExFilterExpression] | type[SigmaFilterExpression]] = {
"regex_fields": RegExFilterExpression,
"sigma_fields": SigmaFilterExpression,
}

find_unescaping_quote_pattern = re.compile(r'(?:\\)*"')
find_unescaping_end_pattern = re.compile(r"(?:\\)*$")

def __init__(self, tree: luqum.tree, special_fields: dict = None):
def __init__(self, tree: luqum.tree, special_fields: dict | None = None):
self._tree = tree

self._special_fields = {}

special_fields = special_fields if special_fields else {}
for key in self._special_fields_map:
self._special_fields[key] = special_fields.get(key) if special_fields.get(key) else []
self._special_fields[key] = special_fields.get(key, [])

self._last_search_field = None

Expand Down Expand Up @@ -308,6 +308,7 @@ def _create_field_group_expression(self, tree: luqum.tree) -> FilterExpression:
Parsed filter expression.

"""
assert self._last_search_field is not None
key = self._last_search_field.split(".")
value = self._strip_quote_from_string(tree.value)
value = self._remove_lucene_escaping(value)
Expand All @@ -317,13 +318,13 @@ def _create_field_group_expression(self, tree: luqum.tree) -> FilterExpression:
else:
return self._get_filter_expression(key, value)

def _collect_children(self, tree: luqum.tree) -> List[FilterExpression]:
def _collect_children(self, tree: luqum.tree) -> list[FilterExpression]:
expressions = []
for child in tree.children:
expressions.append(self._parse_tree(child))
return expressions

def _create_field(self, tree: luqum.tree) -> Optional[FilterExpression]:
def _create_field(self, tree: luqum.tree) -> FilterExpression:
if isinstance(tree.expr, (Phrase, Word)):
key = tree.name.replace("\\", "")
key = key.split(".")
Expand All @@ -333,7 +334,7 @@ def _create_field(self, tree: luqum.tree) -> Optional[FilterExpression]:
value = self._strip_quote_from_string(tree.expr.value)
value = self._remove_lucene_escaping(value)
return self._get_filter_expression(key, value)
elif isinstance(tree.expr, Regex):
if isinstance(tree.expr, Regex):
key = tree.name.replace("\\", "")
key = key.split(".")
if tree.expr.value == "null":
Expand All @@ -342,7 +343,7 @@ def _create_field(self, tree: luqum.tree) -> Optional[FilterExpression]:
value = self._strip_quote_from_string(tree.expr.value)
value = self._remove_lucene_escaping(value)
return self._get_filter_expression_regex(key, value)
return None
raise LuceneFilterError(f'The expression "{str(tree)}" is invalid!')

@staticmethod
def _check_key_and_modifier(key, value):
Expand All @@ -353,8 +354,8 @@ def _check_key_and_modifier(key, value):
return None

def _get_filter_expression(
self, key: List[str], value
) -> Union[RegExFilterExpression, StringFilterExpression]:
self, key: list[str], value
) -> RegExFilterExpression | StringFilterExpression | SigmaFilterExpression:

key_and_modifier_check = LuceneTransformer._check_key_and_modifier(key, value)
if key_and_modifier_check is not None:
Expand All @@ -376,8 +377,8 @@ def _get_filter_expression(
return StringFilterExpression(key, value)

def _get_filter_expression_regex(
self, key: List[str], value
) -> Union[RegExFilterExpression, StringFilterExpression]:
self, key: list[str], value
) -> RegExFilterExpression | StringFilterExpression:

key_and_modifier_check = LuceneTransformer._check_key_and_modifier(key, value)
if key_and_modifier_check is not None:
Expand All @@ -387,7 +388,7 @@ def _get_filter_expression_regex(
return RegExFilterExpression(key, value)

@staticmethod
def _create_value_expression(word: luqum.tree) -> Union[Exists, Always]:
def _create_value_expression(word: luqum.tree) -> Exists | Always:
value = word.value.replace("\\", "")
value = value.split(".")
if value == ["*"]:
Expand Down
3 changes: 1 addition & 2 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# type: ignore

# mypy: ignore-errors
"""This module contains all Pipeline functionality.

Pipelines contain a list of processors that can be executed in order to process input log data.
Expand Down
6 changes: 3 additions & 3 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def __init__(self, configuration: Configuration):
self.restart_count: int = 0
self.restart_timeout_ms: int = random.randint(100, 1000)
self.metrics = self.Metrics(labels={"component": "manager"})
self.loghandler: LogprepMPQueueListener = None
self.loghandler: LogprepMPQueueListener | None = None
self.error_queue: multiprocessing.Queue | None = None
self._error_listener: OutputQueueListener | None = None
self._configuration: Configuration = configuration
Expand Down Expand Up @@ -246,7 +246,7 @@ def _increase_to_count(self, count: int):
def _decrease_to_count(self, count: int):
while len(self._pipelines) > count:
pipeline_process = self._pipelines.pop()
pipeline_process.stop()
pipeline_process.stop() # type: ignore
pipeline_process.join()
self.metrics.number_of_pipeline_stops += 1

Expand Down Expand Up @@ -328,7 +328,7 @@ def _create_pipeline(self, index) -> multiprocessing.Process:
process = multiprocessing.Process(
target=pipeline.run, daemon=True, name=f"Pipeline-{index}"
)
process.stop = pipeline.stop
process.stop = pipeline.stop # type: ignore
process.start()
logger.info("Created new pipeline")
return process
Expand Down
Loading
Loading