1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -64,6 +64,7 @@

### Development

- rules: prune rules with incompatible OS/arch/format requirements before analysis to skip them across all scopes #2127
- doc: document that default output shows top-level matches only; -v/-vv show nested matches @devs6186 #1410
- doc: fix typo in usage.md, add documentation links to README @devs6186 #2274
- doc: add table comparing ways to consume capa output (CLI, IDA, Ghidra, dynamic sandbox, web) @devs6186 #2273
10 changes: 10 additions & 0 deletions capa/capabilities/dynamic.py
@@ -281,6 +281,16 @@ def find_dynamic_capabilities(
# Tuples are immutable, so `t += (x,)` copies the entire tuple each time.
process_feature_counts: list[rdoc.ProcessFeatureCount] = []

# Prune rules that cannot match this binary's global features (OS, arch, format)
# once, before the per-process matching loop. This eliminates rules that can
# never match (e.g. Windows-specific rules when analyzing a Linux trace) from all
# per-process, per-thread, and per-call evaluations.
# See: https://github.com/mandiant/capa/issues/2127
global_features: FeatureSet = collections.defaultdict(set)
for feature, addr in extractor.extract_global_features():
global_features[feature].add(addr)
ruleset = ruleset.filter_rules_by_meta_features(global_features)

assert isinstance(extractor, DynamicFeatureExtractor)
processes: list[ProcessHandle] = list(extractor.get_processes())
n_processes: int = len(processes)
10 changes: 10 additions & 0 deletions capa/capabilities/static.py
@@ -162,6 +162,16 @@ def find_static_capabilities(
function_feature_counts: list[rdoc.FunctionFeatureCount] = []
library_functions_list: list[rdoc.LibraryFunction] = []

# Prune rules that cannot match this binary's global features (OS, arch, format)
# once, before the per-function matching loop. This eliminates rules that can
# never match (e.g. Windows-specific rules when analyzing a Linux ELF) from all
# per-function, per-basic-block, and per-instruction evaluations.
# See: https://github.com/mandiant/capa/issues/2127
global_features: FeatureSet = collections.defaultdict(set)
for feature, addr in extractor.extract_global_features():
global_features[feature].add(addr)
ruleset = ruleset.filter_rules_by_meta_features(global_features)

assert isinstance(extractor, StaticFeatureExtractor)
functions: list[FunctionHandle] = list(extractor.get_functions())
n_funcs: int = len(functions)
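Both hunks above build the same structure. A minimal sketch of what the global FeatureSet holds for a Linux ELF, assuming capa's ``OS``/``Arch``/``Format`` feature classes and the ``NO_ADDRESS`` sentinel (values here are illustrative):

import collections

import capa.features.common
from capa.features.address import NO_ADDRESS

global_features = collections.defaultdict(set)
for feature in (
    capa.features.common.OS("linux"),
    capa.features.common.Arch("amd64"),
    capa.features.common.Format("elf"),
):
    global_features[feature].add(NO_ADDRESS)

# a rule requiring `os: windows` can never evaluate to True against this set,
# so filter_rules_by_meta_features() may drop it before any matching loop runs.
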
187 changes: 187 additions & 0 deletions capa/rules/__init__.py
@@ -1442,6 +1442,12 @@ def __init__(
self.rules = {rule.name: rule for rule in rules}
self.rules_by_namespace = index_rules_by_namespace(rules)
self.rules_by_scope = {scope: self._get_rules_for_scope(rules, scope) for scope in scopes}
self._dependencies_by_rule_name = {
rule.name: set(rule.get_dependencies(self.rules_by_namespace)) for rule in rules
}
self._rules_with_global_features = {
rule.name for rule in rules if self._statement_uses_global_features(rule.statement)
}

# these structures are unstable and may change before the next major release.
scores_by_rule: dict[str, int] = {}
@@ -1920,6 +1926,184 @@ def filter_rules_by_meta(self, tag: str) -> "RuleSet":
break
return RuleSet(list(rules_filtered))

@staticmethod
def _statement_uses_global_features(node: Union[Feature, Statement]) -> bool:
if isinstance(node, capa.features.common.Feature):
return capa.features.common.is_global_feature(node)

return any(RuleSet._statement_uses_global_features(child) for child in node.get_children())
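
A quick sketch of what this predicate reports, assuming capa's ``And`` statement and ``OS``/``String`` feature classes (and the ``is_global_feature`` helper this PR relies on):

import capa.engine as ceng
import capa.features.common as common

stmt = ceng.And([common.OS("windows"), common.String("cmd.exe")])
# the OS child is a global feature, so the whole tree "uses global features":
assert RuleSet._statement_uses_global_features(stmt)
# a tree with only non-global features does not:
assert not RuleSet._statement_uses_global_features(common.String("cmd.exe"))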

def _clone_with_rule_subset(self, rule_names: set[str]) -> "RuleSet":
"""
Return a shallow-then-filtered clone of this RuleSet restricted to ``rule_names``.

All derived data structures are filtered so the hot ``_match`` path needs no runtime
guards to skip pruned rules. The cost — O(total index entries across all scopes) — is
paid once at analysis start rather than on every per-function / per-instruction call.
"""
clone = copy.copy(self)

clone.rules = {name: self.rules[name] for name in self.rules if name in rule_names}
clone._rules_with_global_features = self._rules_with_global_features & rule_names
clone._dependencies_by_rule_name = {
name: deps for name, deps in self._dependencies_by_rule_name.items() if name in rule_names
}

# Filter per-scope rule lists (preserves topological order; used by paranoid mode).
clone.rules_by_scope = {
scope: [rule for rule in scope_rules if rule.name in rule_names]
for scope, scope_rules in self.rules_by_scope.items()
}

# Filter topological index: gaps in index values are fine because the values are only
# used as sort keys, and the relative order of surviving rules is already correct.
clone._rule_index_by_scope = {
scope: {name: idx for name, idx in rule_index.items() if name in rule_names}
for scope, rule_index in self._rule_index_by_scope.items()
}

# Filter namespace index (values are lists of Rule objects, not strings).
clone.rules_by_namespace = {
namespace: [rule for rule in ns_rules if rule.name in rule_names]
for namespace, ns_rules in self.rules_by_namespace.items()
}
# Drop namespaces that became empty after pruning.
clone.rules_by_namespace = {ns: rules for ns, rules in clone.rules_by_namespace.items() if rules}

# Filter feature indexes: remove pruned rule names from every set in rules_by_feature,
# and drop string/bytes scan entries for pruned rules.
clone._feature_indexes_by_scopes = {}
for scope, feature_index in self._feature_indexes_by_scopes.items():
new_rules_by_feature: dict[Feature, set[str]] = {}
for feature, rule_set in feature_index.rules_by_feature.items():
filtered_set = rule_set & rule_names
if filtered_set:
new_rules_by_feature[feature] = filtered_set
clone._feature_indexes_by_scopes[scope] = RuleSet._RuleFeatureIndex(
new_rules_by_feature,
{name: feats for name, feats in feature_index.string_rules.items() if name in rule_names},
{name: feats for name, feats in feature_index.bytes_rules.items() if name in rule_names},
)

return clone
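
The clone relies on ``copy.copy`` sharing attribute references until each filtered structure is rebound, so the original RuleSet is never mutated. A toy sketch of the pattern (hypothetical ``Toy`` class, not capa code):

import copy

class Toy:
    def __init__(self):
        self.index = {"a": 1, "b": 2}

original = Toy()
clone = copy.copy(original)  # attributes start as shared references
clone.index = {k: v for k, v in original.index.items() if k != "b"}  # rebind, don't mutate

assert original.index == {"a": 1, "b": 2}  # original untouched
assert clone.index == {"a": 1}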

def filter_rules_by_meta_features(self, features: FeatureSet) -> "RuleSet":
"""
Return a new RuleSet that omits rules whose global-feature requirements
cannot be satisfied by the binary under analysis.

Global features (OS, architecture, and format) are determined once at the
start of analysis from the binary's headers. Any rule that requires, for
example, ``os: windows`` while we are analyzing a Linux ELF can never match
and is safe to discard before the per-function matching loop begins.

The filtering is conservative: a rule is removed only when its global-feature
constraints are *provably* unsatisfiable. Rules with no global-feature
constraints, or with ``os: any``-style wildcards, are always kept.

Transitive dependencies of surviving rules are also retained, so the
returned RuleSet always satisfies its internal dependency invariants.

Args:
features: the global FeatureSet for the binary (typically the output of
``extractor.extract_global_features()``).

Returns:
A new :class:`RuleSet` with incompatible rules removed, or *self* if
no rules were pruned.
"""
global_features: FeatureSet = {
feature: locations
for feature, locations in features.items()
if capa.features.common.is_global_feature(feature)
}

if not global_features:
return self

rules_with_global_features = getattr(self, "_rules_with_global_features", None)
if rules_with_global_features is None:
rules_with_global_features = {
rule.name for rule in self.rules.values() if self._statement_uses_global_features(rule.statement)
}
self._rules_with_global_features = rules_with_global_features

if not rules_with_global_features:
return self

def can_match(node) -> bool:
"""
Return True if *node* might be satisfiable given the known global features.
Returns False only when provably unsatisfiable.
"""
if isinstance(node, capa.features.common.Feature):
if capa.features.common.is_global_feature(node):
return bool(node.evaluate(global_features))
return True

if isinstance(node, ceng.Not):
return True

if isinstance(node, ceng.And):
return all(can_match(child) for child in node.children)

if isinstance(node, ceng.Or):
return any(can_match(child) for child in node.children)

if isinstance(node, ceng.Some):
if node.count == 0:
return True
return sum(1 for child in node.children if can_match(child)) >= node.count

if isinstance(node, ceng.Range):
if node.min == 0:
return True
return can_match(node.child)

return True

compatible_rule_names = set(self.rules) - rules_with_global_features
compatible_rule_names.update(
rule_name for rule_name in rules_with_global_features if can_match(self.rules[rule_name].statement)
)

if len(compatible_rule_names) == len(self.rules):
return self

# Collect the surviving rules plus all of their transitive dependencies
# to ensure RuleSet dependency invariants are maintained.
dependencies_by_rule_name = getattr(self, "_dependencies_by_rule_name", None)
if dependencies_by_rule_name is None:
dependencies_by_rule_name = {
rule.name: set(rule.get_dependencies(self.rules_by_namespace)) for rule in self.rules.values()
}
self._dependencies_by_rule_name = dependencies_by_rule_name

rules_to_keep: set[str] = set(compatible_rule_names)
stack: list[str] = list(compatible_rule_names)
while stack:
rule_name = stack.pop()
for dependency_name in dependencies_by_rule_name.get(rule_name, ()):
if dependency_name not in self.rules:
continue
if dependency_name in rules_to_keep:
continue
rules_to_keep.add(dependency_name)
stack.append(dependency_name)

pruned_count = len(self.rules) - len(rules_to_keep)
if pruned_count == 0:
return self

logger.debug(
"pruned %d rules incompatible with global features (%s)",
pruned_count,
", ".join(f"{f.name}: {f.value!r}" for f in global_features),
)

return self._clone_with_rule_subset(rules_to_keep)
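
A worked sketch of the decision ``can_match`` makes, assuming capa's ``OS`` feature class and ``NO_ADDRESS`` sentinel (feature values are illustrative):

import capa.features.common as common
from capa.features.address import NO_ADDRESS

# global features for a Linux ELF:
linux_only = {common.OS("linux"): {NO_ADDRESS}}

# a required global feature either evaluates True against this set...
assert bool(common.OS("linux").evaluate(linux_only))
# ...or is provably unsatisfiable:
assert not bool(common.OS("windows").evaluate(linux_only))

# can_match() lifts this through the statement tree: `or` needs at least one
# satisfiable child, `and` needs all of them, and `not` subtrees plus
# non-global features are always treated as potentially satisfiable.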

# this routine is unstable and may change before the next major release.
@staticmethod
def _sort_rules_by_index(rule_index_by_rule_name: dict[str, int], rules: list[Rule]):
@@ -2093,6 +2277,9 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea
new_candidates.extend(feature_index.rules_by_feature.get(new_feature, ()))

if new_candidates:
new_candidates = [
rule_name
# dedupe within this batch too: two new features may index the same rule
for rule_name in dict.fromkeys(new_candidates)
if rule_name not in candidate_rule_names
]
candidate_rule_names.update(new_candidates)
candidate_rules.extend([self.rules[rule_name] for rule_name in new_candidates])
RuleSet._sort_rules_by_index(rule_index_by_rule_name, candidate_rules)
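The filter above matters because several newly-indexed features can point at the same rule; without it, ``candidate_rules`` would accumulate duplicate Rule objects and evaluate them repeatedly. A toy illustration (strings stand in for Rule objects):

candidate_rule_names = {"a"}
candidate_rules = ["a"]

# two newly-matched features both index rule "b"; one also re-indexes "a":
new_candidates = ["b", "a", "b"]

new_candidates = [n for n in dict.fromkeys(new_candidates) if n not in candidate_rule_names]
candidate_rule_names.update(new_candidates)
candidate_rules.extend(new_candidates)

assert candidate_rules == ["a", "b"]  # "b" added once, "a" not re-added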