Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,22 @@
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.windows import pslist, pe_symbols
from volatility3.plugins.windows.malware import inlinehooks

vollog = logging.getLogger(__name__)


# EtwpEventWriteFull -> https://github.com/SolitudePy/Stealthy-ETW-Patch
# CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-event-tracing-for-windows-function.yml
class EtwPatch(interfaces.plugins.PluginInterface):
"""Identifies ETW (Event Tracing for Windows) patching techniques used by malware to evade detection.

This plugin examines the first opcode of key ETW functions in ntdll.dll and advapi32.dll
to detect common ETW bypass techniques such as return pointer manipulation (RET) or function
redirection (JMP). Attackers often patch these functions to prevent security tools from
receiving telemetry about process execution, API calls, and other system events.
"""
# ETW CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-event-tracing-for-windows-function.yml
# AMSI CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-antimalware-scan-interface-function.yml
# AMSI patch -> https://github.com/okankurtuluss/AMSIBypassPatch
class AvPatch(interfaces.plugins.PluginInterface):
"""Detects ETW & AMSI in-memory patching used by malware for defense evasion."""

_version = (1, 0, 0)
_required_framework_version = (2, 26, 0)

etw_functions = {
av_functions = {
"ntdll.dll": {
pe_symbols.wanted_names_identifier: [
"EtwEventWrite",
Expand All @@ -41,6 +38,14 @@ class EtwPatch(interfaces.plugins.PluginInterface):
"advapi32.dll": {
pe_symbols.wanted_names_identifier: ["EventWrite", "TraceEvent"],
},
"amsi.dll": {
pe_symbols.wanted_names_identifier: [
"AmsiScanBuffer",
"AmsiScanString",
"AmsiInitialize",
"AmsiOpenSession",
],
},
}

@classmethod
Expand All @@ -57,6 +62,9 @@ def get_requirements(cls):
requirements.VersionRequirement(
name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0)
),
requirements.VersionRequirement(
name="inlinehooks", component=inlinehooks.InlineHooks, version=(1, 0, 0)
),
requirements.ListRequirement(
name="pid",
description="Filter on specific process IDs",
Expand All @@ -66,16 +74,18 @@ def get_requirements(cls):
]

def _generator(self):
# Get all ETW function addresses before looping through processes
# Get all ETW & AMSI function addresses before looping through processes
found_symbols = pe_symbols.PESymbols.addresses_for_process_symbols(
context=self.context,
config_path=self.config_path,
kernel_module_name=self.config["kernel"],
symbols=self.etw_functions,
symbols=self.av_functions,
)

filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

inlineHooks = inlinehooks.InlineHooks(self.context, self.config_path)

for proc in pslist.PsList.list_processes(
context=self.context,
kernel_module_name=self.config["kernel"],
Expand All @@ -89,20 +99,19 @@ def _generator(self):
vollog.debug(f"Unable to create process layer for PID {proc_id}")
continue

# Map of opcodes to their instruction names
opcode_map = {
0xC3: "RET",
0xE9: "JMP",
}

for dll_name, functions in found_symbols.items():
for func_name, func_addr in functions:
try:
opcode = self.context.layers[proc_layer_name].read(
func_addr, 1
)[0]
if opcode in opcode_map:
instruction = opcode_map[opcode]
data = self.context.layers[proc_layer_name].read(func_addr, 24)
disasm = renderers.Disassembly(data, func_addr)
inline_hook_check = inlineHooks.check_inline_hook(
data=data, addr=func_addr
)

if inline_hook_check:
vollog.debug(
f"Inline hook detected at {func_addr:#x} in process {proc_id} ({proc_name}) for function {func_name}"
)
yield (
0,
(
Expand All @@ -111,7 +120,13 @@ def _generator(self):
dll_name,
func_name,
format_hints.Hex(func_addr),
f"{opcode:02x} ({instruction})",
inline_hook_check[1],
(
format_hints.HexBytes(inline_hook_check[0])
if inline_hook_check[0]
else format_hints.HexBytes(b"")
),
disasm,
),
)
except exceptions.InvalidAddressException:
Expand All @@ -126,8 +141,10 @@ def run(self):
("Process", str),
("DLL", str),
("Function", str),
("Offset", format_hints.Hex),
("Opcode", str),
("Hook Address", format_hints.Hex),
("Hook Info", str),
("Hook Hexdump", format_hints.HexBytes),
("Disasm", renderers.Disassembly),
],
self._generator(),
)
Loading
Loading