Skip to content
Open
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a30a278
process_spoofing plugin
SolitudePy Jun 2, 2025
8d5552e
cosmetics
SolitudePy Jun 2, 2025
a47a8a7
added mechanism for deleted exe
SolitudePy Jun 4, 2025
69c2762
categorize as a malware plugin
SolitudePy Jun 7, 2025
27a951e
Plugins: precise exception handling in process_spoofing
SolitudePy Jun 25, 2025
61f5bb6
Plugins: remove exe_file dereference()
SolitudePy Jun 25, 2025
6a905ee
Plugins: convert useful methods to classmethods
SolitudePy Jun 25, 2025
9ff54fd
Plugins: consistent return values in process_spoofing
SolitudePy Jun 25, 2025
2a66de7
Plugins: process_spoofing log exceptions as debug
SolitudePy Jun 25, 2025
5bb992d
Plugins: more precise exception handling in process_spoofing
SolitudePy Jun 25, 2025
c5ef15e
Plugins: swap notes for boolean flags in process_spoofing
SolitudePy Jun 25, 2025
da47c40
black
SolitudePy Jun 25, 2025
93bdcb3
Plugins: determine process exe deletion structurally
SolitudePy Jun 25, 2025
91ff5eb
Plugins: make get_executable_path more accurate in process_spoofing
SolitudePy Jun 25, 2025
d13b21e
Plugins: utilize linuxutilities.path_for_file (deleted) logic solely
SolitudePy Oct 1, 2025
fad88b8
Plugins: truncate deleted to check spoofing in process_spoofing
SolitudePy Oct 1, 2025
c94b039
Plugins: process_spoofing change extract_process_names to classmethod
SolitudePy Oct 1, 2025
64ef8e3
Plugins: process spoofing handle none values and set more classmethods
SolitudePy Oct 1, 2025
33911c2
black and ruff
SolitudePy Oct 1, 2025
41fa262
Plugins: process_spoofing bump required_framework_version
SolitudePy Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions volatility3/framework/plugins/linux/malware/process_spoofing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from pathlib import PurePosixPath
from typing import Optional, Tuple, Iterator

from volatility3.framework import exceptions, interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.objects import utility
from volatility3.framework.symbols import linux
from volatility3.plugins.linux import pslist

vollog = logging.getLogger(__name__)


# https://github.com/SolitudePy/linux-mal
class ProcessSpoofing(plugins.PluginInterface):
"""Detects process spoofing by comparing executable path to cmdline & comm fields"""

_required_framework_version = (2, 27, 0)
_version = (1, 1, 0)

@classmethod
def get_requirements(cls):
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="pslist", component=pslist.PsList, version=(4, 0, 0)
),
requirements.VersionRequirement(
name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0)
),
requirements.ListRequirement(
name="pid",
description="Filter on specific process IDs",
element_type=int,
optional=True,
),
]

@classmethod
def get_executable_path(
cls,
context: interfaces.context.ContextInterface,
task: interfaces.objects.ObjectInterface,
) -> Optional[str]:
"""
Extract the executable path from task_struct.mm.exe_file

Args:
context: The context to operate on
task: task_struct object of the process

Returns:
Returns executable path or None if not available
"""

try:
mm = task.mm
except (exceptions.InvalidAddressException, AttributeError) as e:
vollog.debug(f"Unable to access mm for task at {task.vol.offset:#x}: {e}")
return None

if not mm or not mm.is_readable():
# Kernel threads don't have mm struct
return None

try:
exe_file = mm.exe_file
except (exceptions.InvalidAddressException, AttributeError) as e:
vollog.debug(
f"Unable to access exe_file for task at {task.vol.offset:#x}: {e}"
)
return None

if not exe_file or not exe_file.is_readable():
return None

try:
exe_path = linux.LinuxUtilities.path_for_file(context, task, exe_file)
except (exceptions.InvalidAddressException, AttributeError) as e:
vollog.debug(
f"Unable to read exe_file path for task at {task.vol.offset:#x}: {e}"
)
return None

return exe_path

@classmethod
def get_cmdline_basename(
cls,
context: interfaces.context.ContextInterface,
task: interfaces.objects.ObjectInterface,
) -> Optional[str]:
"""
Extract the command line arguments and return the basename of the first argument

Args:
context: The context to operate on
task: task_struct object of the process

Returns:
Basename of the first command line argument or None if not available
"""
mm = task.mm
if not mm or not mm.is_readable():
return None

proc_layer_name = task.add_process_layer()
if proc_layer_name is None:
return None

proc_layer = context.layers[proc_layer_name]
start = task.mm.arg_start
size_to_read = task.mm.arg_end - task.mm.arg_start

if not (0 < size_to_read <= 4096):
return None

try:
argv = proc_layer.read(start, size_to_read)
except exceptions.InvalidAddressException as e:
vollog.debug(
f"Unable to read cmdline for task at {task.vol.offset:#x}: {e}"
)
return None

# Parse the arguments - they are null byte terminated
args_str = argv.decode(encoding="utf8", errors="replace")
args_list = args_str.split("\x00")
if args_list and args_list[0]:
basename = PurePosixPath(args_list[0]).name
return basename
else:
return None

@classmethod
def get_comm(cls, task: interfaces.objects.ObjectInterface) -> Optional[str]:
"""
Extract the comm field from task_struct

Args:
task: task_struct object of the process

Returns:
Process name from comm field or None if not available
"""
try:
return utility.array_to_string(task.comm)
except (exceptions.InvalidAddressException, AttributeError) as e:
vollog.debug(f"Unable to read comm for task at {task.vol.offset:#x}: {e}")
return None

@classmethod
def extract_process_names(
cls,
context: interfaces.context.ContextInterface,
task: interfaces.objects.ObjectInterface,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], bool]:
"""
Extract all process name sources for comparison

Returns:
Tuple of (exe_path, exe_basename, cmdline_basename, comm)
"""
exe_path = cls.get_executable_path(context, task)
exe_basename = PurePosixPath(exe_path).name if exe_path else None
if exe_basename and exe_basename.endswith(" (deleted)"):
exe_basename = exe_basename[: -len(" (deleted)")]
cmdline_basename = cls.get_cmdline_basename(context, task)
comm = cls.get_comm(task)

return exe_path, exe_basename, cmdline_basename, comm

def _detect_spoofing(
self,
exe_basename: Optional[str],
cmdline_basename: Optional[str],
comm: Optional[str],
) -> Tuple[bool, bool]:
"""
Analyze the three name sources to detect potential spoofing

Args:
exe_basename: Basename from exe_file path
cmdline_basename: Basename from command line
comm: Name from comm field

Returns:
Tuple of (cmdline_spoofed, comm_spoofed) boolean flags
"""
# Skip kernel threads - need at least 2 sources for comparison
available_sources = sum(
1 for name in [exe_basename, cmdline_basename, comm] if name
)
if available_sources < 2:
return False, False

# Check for cmdline spoofing
cmdline_spoofed = False
if exe_basename and cmdline_basename:
cmdline_spoofed = exe_basename != cmdline_basename

# Check for comm spoofing (comm is truncated to 15 characters)
comm_spoofed = False
if exe_basename and comm:
comm_spoofed = exe_basename[:15] != comm

return cmdline_spoofed, comm_spoofed

def _generator(self, tasks) -> Iterator[Tuple[int, Tuple]]:
"""
Generate process spoofing detection results

Args:
tasks: Iterator of task_struct objects

Yields:
Tuple containing process information and spoofing analysis
"""
for task in tasks:
try:
pid = task.pid
ppid = task.get_parent_pid()

exe_path, exe_basename, cmdline_basename, comm = (
self.extract_process_names(self.context, task)
)

cmdline_spoofed, comm_spoofed = self._detect_spoofing(
exe_basename, cmdline_basename, comm
)

is_deleted = exe_path.endswith(" (deleted)") if exe_path else False

# Convert None values to strings for TreeGrid compatibility
exe_path_render = exe_path if exe_path else "N/A"
cmdline_render = cmdline_basename if cmdline_basename else "N/A"
comm_render = comm if comm else "N/A"

yield (
0,
(
pid,
ppid,
exe_path_render,
cmdline_render,
comm_render,
cmdline_spoofed,
comm_spoofed,
is_deleted,
),
)

except (exceptions.InvalidAddressException, AttributeError) as e:
vollog.warning(
f"Unable to process task PID {getattr(task, 'pid', 'unknown')} at {task.vol.offset:#x}: {e}"
)
continue

def run(self):
filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))

return renderers.TreeGrid(
[
("PID", int),
("PPID", int),
("Exe_Path", str),
("Cmdline_Basename", str),
("Comm", str),
("Cmdline_Spoofed", bool),
("Comm_Spoofed", bool),
("Exe_Deleted", bool),
],
self._generator(
pslist.PsList.list_tasks(
self.context, self.config["kernel"], filter_func=filter_func
)
),
)
Loading