diff --git a/volatility3/framework/plugins/windows/malware/pebmasquerade.py b/volatility3/framework/plugins/windows/malware/pebmasquerade.py new file mode 100644 index 0000000000..dd85fb5701 --- /dev/null +++ b/volatility3/framework/plugins/windows/malware/pebmasquerade.py @@ -0,0 +1,234 @@ +import logging +from typing import List, Union, Tuple + +from volatility3.framework import interfaces, renderers, exceptions +from volatility3.framework.configuration import requirements +from volatility3.framework.objects import utility +from volatility3.plugins.windows import pslist + +vollog = logging.getLogger(__name__) + + +# https://www.ired.team/offensive-security/defense-evasion/masquerading-processes-in-userland-through-_peb +# https://github.com/FuzzySecurity/PowerShell-Suite/blob/master/Masquerade-PEB.ps1 +class PebMasquerade(interfaces.plugins.PluginInterface): + """Detects potential process name spoofing by comparing EPROCESS and PEB data.""" + + _version = (1, 0, 0) + _required_framework_version = (2, 27, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.ListRequirement( + name="pid", + element_type=int, + description="Process ID to include (all other processes are excluded)", + optional=True, + ), + ] + + @classmethod + def get_process_names(cls, proc: interfaces.objects.ObjectInterface) -> Tuple[ + Union[str, renderers.NotAvailableValue], + Union[str, renderers.NotAvailableValue], + Union[str, renderers.NotAvailableValue], + Union[str, renderers.NotAvailableValue], + ]: + """Extract process names and related information from various sources (EPROCESS and PEB). + + Args: + proc: The process object + + Returns: + tuple: (eprocess_imagefilename, eprocess_seaudit_imagefilename, peb_imagefilepath, peb_cmdline) + """ + eprocess_imagefilename = renderers.NotAvailableValue() + eprocess_seaudit_imagefilename = renderers.NotAvailableValue() + peb_imagefilepath = renderers.NotAvailableValue() + peb_cmdline = renderers.NotAvailableValue() + + try: + eprocess_imagefilename = utility.array_to_string(proc.ImageFileName) + except (AttributeError, exceptions.InvalidAddressException): + vollog.debug( + "Unable to read EPROCESS.ImageFileName for PID %d", proc.UniqueProcessId + ) + except Exception as e: + vollog.warning( + "Error reading EPROCESS.ImageFileName for PID %d: %s", + proc.UniqueProcessId, + str(e), + ) + + try: + audit = proc.SeAuditProcessCreationInfo.ImageFileName.Name + audit_string = audit.get_string() + if audit_string: + eprocess_seaudit_imagefilename = audit_string + except exceptions.InvalidAddressException: + vollog.debug( + "Unable to read SeAuditProcessCreationInfo.ImageFileName for PID %d", + proc.UniqueProcessId, + ) + except AttributeError: + vollog.debug( + "SeAuditProcessCreationInfo structure not available for PID %d", + proc.UniqueProcessId, + ) + except Exception as e: + vollog.warning( + "Error reading SeAuditProcessCreationInfo for PID %d: %s", + proc.UniqueProcessId, + str(e), + ) + + try: + peb = proc.get_peb() + if peb and peb.ProcessParameters: + # Get ImagePathName + try: + image_path_str = peb.ProcessParameters.ImagePathName.get_string() + if image_path_str: + peb_imagefilepath = image_path_str + except (AttributeError, exceptions.InvalidAddressException): + vollog.debug( + "Unable to read PEB.ImagePathName for PID %d", + proc.UniqueProcessId, + ) + except Exception as e: + vollog.warning( + "Error reading PEB.ImagePathName for PID %d: %s", + proc.UniqueProcessId, + str(e), + ) + + try: + cmdline_str = peb.ProcessParameters.CommandLine.get_string() + if cmdline_str: + peb_cmdline = cmdline_str + except (AttributeError, exceptions.InvalidAddressException): + vollog.debug( + "Unable to read PEB.ProcessParameters.CommandLine for PID %d", + proc.UniqueProcessId, + ) + except Exception as e: + vollog.warning( + "Error reading PEB.ProcessParameters.CommandLine for PID %d: %s", + proc.UniqueProcessId, + str(e), + ) + except (AttributeError, exceptions.InvalidAddressException): + # Important for cases where PEB does not exist or is inaccessible (e.g SYSTEM process) + vollog.debug("Unable to access PEB for PID %d", proc.UniqueProcessId) + except Exception as e: + vollog.warning( + "Error accessing PEB for PID %d: %s", proc.UniqueProcessId, str(e) + ) + + return ( + eprocess_imagefilename, + eprocess_seaudit_imagefilename, + peb_imagefilepath, + peb_cmdline, + ) + + def _generator(self, pids, context, kernel_module_name): + pid_filter = pslist.PsList.create_pid_filter(pids) + + for proc in pslist.PsList.list_processes( + context=context, + kernel_module_name=kernel_module_name, + filter_func=pid_filter, + ): + proc_id = proc.UniqueProcessId + try: + peb = proc.get_peb() + except (exceptions.InvalidAddressException, AttributeError): + vollog.debug( + "Unable to access PEB for PID %d, skipping process", proc_id + ) + peb_imagefilepath_length_check = False + peb_cmdline_length_check = False + ( + eprocess_imagefilename, + eprocess_seaudit_imagefilename, + peb_imagefilepath, + peb_cmdline, + ) = PebMasquerade.get_process_names(proc) + + if isinstance(peb_imagefilepath, str) and peb: + try: + + # Length values are of type USHORT + peb_imagefilepath_length = ( + peb.ProcessParameters.ImagePathName.Length // 2 + ) + peb_imagefilepath_maxlength = ( + peb.ProcessParameters.ImagePathName.MaximumLength // 2 - 1 + ) + + if (peb_imagefilepath_length != len(peb_imagefilepath)) or ( + peb_imagefilepath_maxlength != len(peb_imagefilepath) + ): + peb_imagefilepath_length_check = True + except Exception as e: + vollog.warning( + "PEB.ImagePathName Length comparison error for PID %d: %s", + proc_id, + str(e), + ) + + if isinstance(peb_cmdline, str) and peb: + try: + # Length values are of type USHORT + peb_cmdline_length = peb.ProcessParameters.CommandLine.Length // 2 + peb_cmdline_maxlength = ( + peb.ProcessParameters.CommandLine.MaximumLength // 2 - 1 + ) + + if (peb_cmdline_length != len(peb_cmdline)) or ( + peb_cmdline_maxlength != len(peb_cmdline) + ): + peb_cmdline_length_check = True + except Exception as e: + vollog.warning( + "PEB.CommandLine Length comparison error for PID %d: %s", + proc_id, + str(e), + ) + yield ( + 0, + ( + proc_id, + eprocess_imagefilename, + eprocess_seaudit_imagefilename, + peb_imagefilepath, + peb_cmdline_length_check, + peb_imagefilepath_length_check, + ), + ) + + def run(self): + pids = self.config.get("pid", None) + context = self.context + kernel_module_name = self.config["kernel"] + return renderers.TreeGrid( + [ + ("PID", int), + ("EPROCESS_ImageFileName", str), + ("EPROCESS_SeAudit_ImageFileName", str), + ("PEB_ImageFilePath", str), + ("PEB_ImageFilePath_Spoofed", bool), + ("PEB_CommandLine_Spoofed", bool), + ], + self._generator(pids, context, kernel_module_name), + )