Skip to content
Merged
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 234 additions & 0 deletions volatility3/framework/plugins/windows/malware/pebmasquerade.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import logging
from typing import List, Union, Tuple

from volatility3.framework import interfaces, renderers, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.objects import utility
from volatility3.plugins.windows import pslist

vollog = logging.getLogger(__name__)


# https://www.ired.team/offensive-security/defense-evasion/masquerading-processes-in-userland-through-_peb
# https://github.com/FuzzySecurity/PowerShell-Suite/blob/master/Masquerade-PEB.ps1
class PebMasquerade(interfaces.plugins.PluginInterface):
"""Detects potential process name spoofing by comparing EPROCESS and PEB data."""

_version = (1, 0, 0)
_required_framework_version = (2, 0, 0)

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(
name="kernel",
description="Windows kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="pslist", component=pslist.PsList, version=(3, 0, 0)
),
requirements.ListRequirement(
name="pid",
element_type=int,
description="Process ID to include (all other processes are excluded)",
optional=True,
),
]

@classmethod
def get_process_names(cls, proc: interfaces.objects.ObjectInterface) -> Tuple[
Union[str, renderers.NotAvailableValue],
Union[str, renderers.NotAvailableValue],
Union[str, renderers.NotAvailableValue],
Union[str, renderers.NotAvailableValue],
]:
"""Extract process names and related information from various sources (EPROCESS and PEB).

Args:
proc: The process object

Returns:
tuple: (eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline)
"""
eprocess_imagefilename = renderers.NotAvailableValue()
eprocess_seaudit_imagefilename = renderers.NotAvailableValue()
peb_imagefilepath = renderers.NotAvailableValue()
peb_cmdline = renderers.NotAvailableValue()

try:
eprocess_imagefilename = utility.array_to_string(proc.ImageFileName)
except (AttributeError, exceptions.InvalidAddressException):
vollog.debug(
"Unable to read EPROCESS.ImageFileName for PID %d", proc.UniqueProcessId
)
except Exception as e:
vollog.warning(
"Error reading EPROCESS.ImageFileName for PID %d: %s",
proc.UniqueProcessId,
str(e),
)

try:
audit = proc.SeAuditProcessCreationInfo.ImageFileName.Name
audit_string = audit.get_string()
if audit_string:
eprocess_seaudit_imagefilename = audit_string
except exceptions.InvalidAddressException:
vollog.debug(
"Unable to read SeAuditProcessCreationInfo.ImageFileName for PID %d",
proc.UniqueProcessId,
)
except AttributeError:
vollog.debug(
"SeAuditProcessCreationInfo structure not available for PID %d",
proc.UniqueProcessId,
)
except Exception as e:
vollog.warning(
"Error reading SeAuditProcessCreationInfo for PID %d: %s",
proc.UniqueProcessId,
str(e),
)

try:
peb = proc.get_peb()
if peb and peb.ProcessParameters:
# Get ImagePathName
try:
image_path_str = peb.ProcessParameters.ImagePathName.get_string()
if image_path_str:
peb_imagefilepath = image_path_str
except (AttributeError, exceptions.InvalidAddressException):
vollog.debug(
"Unable to read PEB.ImagePathName for PID %d",
proc.UniqueProcessId,
)
except Exception as e:
vollog.warning(
"Error reading PEB.ImagePathName for PID %d: %s",
proc.UniqueProcessId,
str(e),
)

try:
cmdline_str = peb.ProcessParameters.CommandLine.get_string()
if cmdline_str:
peb_cmdline = cmdline_str
except (AttributeError, exceptions.InvalidAddressException):
vollog.debug(
"Unable to read PEB.ProcessParameters.CommandLine for PID %d",
proc.UniqueProcessId,
)
except Exception as e:
vollog.warning(
"Error reading PEB.ProcessParameters.CommandLine for PID %d: %s",
proc.UniqueProcessId,
str(e),
)
except (AttributeError, exceptions.InvalidAddressException):
# Important for cases where PEB does not exist or is inaccessible (e.g SYSTEM process)
vollog.debug("Unable to access PEB for PID %d", proc.UniqueProcessId)
except Exception as e:
vollog.warning(
"Error accessing PEB for PID %d: %s", proc.UniqueProcessId, str(e)
)

return (
eprocess_imagefilename,
eprocess_seaudit_imagefilename,
peb_imagefilepath,
peb_cmdline,
)

def _generator(self, pids, context, kernel_module_name):
pid_filter = pslist.PsList.create_pid_filter(pids)

for proc in pslist.PsList.list_processes(
context=context,
kernel_module_name=kernel_module_name,
filter_func=pid_filter,
):
proc_id = proc.UniqueProcessId
try:
peb = proc.get_peb()
except (exceptions.InvalidAddressException, AttributeError):
vollog.debug(
"Unable to access PEB for PID %d, skipping process", proc_id
)
peb_imagefilepath_length_check = False
peb_cmdline_length_check = False
(
eprocess_imagefilename,
eprocess_seaudit_imagefilename,
peb_imagefilepath,
peb_cmdline,
) = PebMasquerade.get_process_names(proc)

if isinstance(peb_imagefilepath, str) and peb:
try:

# Length values are of type USHORT
peb_imagefilepath_length = (
peb.ProcessParameters.ImagePathName.Length // 2
)
peb_imagefilepath_maxlength = (
peb.ProcessParameters.ImagePathName.MaximumLength // 2 - 1
)

if (peb_imagefilepath_length != len(peb_imagefilepath)) or (
peb_imagefilepath_maxlength != len(peb_imagefilepath)
):
peb_imagefilepath_length_check = True
except Exception as e:
vollog.warning(
"PEB.ImagePathName Length comparison error for PID %d: %s",
proc_id,
str(e),
)

if isinstance(peb_cmdline, str) and peb:
try:
# Length values are of type USHORT
peb_cmdline_length = peb.ProcessParameters.CommandLine.Length // 2
peb_cmdline_maxlength = (
peb.ProcessParameters.CommandLine.MaximumLength // 2 - 1
)

if (peb_cmdline_length != len(peb_cmdline)) or (
peb_cmdline_maxlength != len(peb_cmdline)
):
peb_cmdline_length_check = True
except Exception as e:
vollog.warning(
"PEB.CommandLine Length comparison error for PID %d: %s",
proc_id,
str(e),
)
yield (
0,
(
proc_id,
eprocess_imagefilename,
eprocess_seaudit_imagefilename,
peb_imagefilepath,
peb_cmdline_length_check,
peb_imagefilepath_length_check,
),
)

def run(self):
pids = self.config.get("pid", None)
context = self.context
kernel_module_name = self.config["kernel"]
return renderers.TreeGrid(
[
("PID", int),
("EPROCESS_ImageFileName", str),
("EPROCESS_SeAudit_ImageFileName", str),
("PEB_ImageFilePath", str),
("PEB_ImageFilePath_Spoofed", bool),
("PEB_CommandLine_Spoofed", bool),
],
self._generator(pids, context, kernel_module_name),
)
Loading