diff --git a/volatility3/framework/constants/linux/__init__.py b/volatility3/framework/constants/linux/__init__.py index 0567b8574c..7c485d3c39 100644 --- a/volatility3/framework/constants/linux/__init__.py +++ b/volatility3/framework/constants/linux/__init__.py @@ -339,3 +339,11 @@ class PT_FLAGS(Flag): def flags(self) -> str: """Returns the ptrace flags string""" return str(self).replace(self.__class__.__name__ + ".", "") + + +# Valid sizes for modules. Note that the Linux kernel does not define these values; they +# are based on empirical observations of typical memory allocations for kernel modules. +# We use this to verify that the found module falls within reasonable limits. +MODULE_MAXIMUM_CORE_SIZE = 20000000 +MODULE_MAXIMUM_CORE_TEXT_SIZE = 20000000 +MODULE_MINIMUM_SIZE = 4096 diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py new file mode 100644 index 0000000000..fd4b289430 --- /dev/null +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -0,0 +1,246 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# +import logging +from typing import List, Set, Tuple, Iterable +from volatility3.framework import renderers, interfaces, exceptions, objects +from volatility3.framework.constants import architectures +from volatility3.framework.renderers import format_hints +from volatility3.framework.configuration import requirements +from volatility3.plugins.linux import lsmod + +vollog = logging.getLogger(__name__) + + +class Hidden_modules(interfaces.plugins.PluginInterface): + """Carves memory to find hidden kernel modules""" + + _required_framework_version = (2, 10, 0) + + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + 
name="kernel", + description="Linux kernel", + architectures=architectures.LINUX_ARCHS, + ), + requirements.PluginRequirement( + name="lsmod", plugin=lsmod.Lsmod, version=(2, 0, 0) + ), + ] + + @staticmethod + def get_modules_memory_boundaries( + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> Tuple[int, int]: + """Determine the boundaries of the module allocation area + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + + Returns: + A tuple containing the minimum and maximum addresses for the module allocation area. + """ + vmlinux = context.modules[vmlinux_module_name] + if vmlinux.has_symbol("mod_tree"): + # Kernel >= 5.19 58d208de3e8d87dbe196caf0b57cc58c7a3836ca + mod_tree = vmlinux.object_from_symbol("mod_tree") + modules_addr_min = mod_tree.addr_min + modules_addr_max = mod_tree.addr_max + elif vmlinux.has_symbol("module_addr_min"): + # 2.6.27 <= kernel < 5.19 3a642e99babe0617febb6f402e1e063479f489db + modules_addr_min = vmlinux.object_from_symbol("module_addr_min") + modules_addr_max = vmlinux.object_from_symbol("module_addr_max") + + if isinstance(modules_addr_min, objects.Void): + raise exceptions.VolatilityException( + "Your ISF symbols lack type information. You may need to update the " + "ISF using the latest version of dwarf2json" + ) + else: + raise exceptions.VolatilityException( + "Cannot find the module memory allocation area. Unsupported kernel" + ) + + return modules_addr_min, modules_addr_max + + @classmethod + def _get_module_address_alignment( + cls, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> int: + """Obtain the module memory address alignment. + + struct module is aligned to the L1 cache line, which is typically 64 bytes for most + common i386/AMD64/ARM64 configurations. In some cases, it can be 128 bytes, but this + will still work.
+ + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + + Returns: + The struct module alignment + """ + # FIXME: When dwarf2json/ISF supports type alignments, read it directly from the type metadata. + # Additionally, while 'context' and 'vmlinux_module_name' are currently unused, they will be + # essential for retrieving type metadata in the future. + return 64 + + @staticmethod + def _validate_alignment_patterns( + addresses: Iterable[int], + address_alignment: int, + ) -> bool: + """Check if the memory addresses meet our alignment patterns + + Args: + addresses: Iterable with the address values + address_alignment: Number of bytes for alignment validation + + Returns: + True if all the addresses meet the alignment + """ + return all(addr % address_alignment == 0 for addr in addresses) + + @classmethod + def get_hidden_modules( + cls, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + known_module_addresses: Set[int], + modules_memory_boundaries: Tuple, + ) -> Iterable[interfaces.objects.ObjectInterface]: + """Enumerate hidden modules by taking advantage of memory address alignment patterns + + This technique is much faster and uses less memory than the traditional scan method + in Volatility2, but it doesn't work with older kernels. + + Since kernel 4.2, struct module allocations are aligned to the L1 cache line size. + In i386/amd64/arm64 this is typically 64 bytes. However, this can be changed in + the Linux kernel configuration via CONFIG_X86_L1_CACHE_SHIFT. The alignment can + also be obtained from the DWARF info i.e. DW_AT_alignment<64>, but dwarf2json + doesn't support this feature yet. + In kernels < 4.2, alignment attributes are absent in the struct module, meaning + alignment cannot be guaranteed. Therefore, for older kernels, it's better to use + the traditional scan technique.
+ + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + known_module_addresses: Set with known module addresses + modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. + Yields: + module objects + """ + vmlinux = context.modules[vmlinux_module_name] + vmlinux_layer = context.layers[vmlinux.layer_name] + + module_addr_min, module_addr_max = modules_memory_boundaries + module_address_alignment = cls._get_module_address_alignment( + context, vmlinux_module_name + ) + if not cls._validate_alignment_patterns( + known_module_addresses, module_address_alignment + ): + vollog.warning( + f"Module addresses aren't aligned to {module_address_alignment} bytes. " + "Switching to 1 byte alignment scan method." + ) + module_address_alignment = 1 + + mkobj_offset = vmlinux.get_type("module").relative_child_offset("mkobj") + mod_offset = vmlinux.get_type("module_kobject").relative_child_offset("mod") + offset_to_mkobj_mod = mkobj_offset + mod_offset + mod_member_template = vmlinux.get_type("module_kobject").child_template("mod") + mod_size = mod_member_template.size + mod_member_data_format = mod_member_template.data_format + + for module_addr in range( + module_addr_min, module_addr_max, module_address_alignment + ): + if module_addr in known_module_addresses: + continue + + try: + # This is just a pre-filter.
Module readability and consistency are verified in module.is_valid() + self_referential_bytes = vmlinux_layer.read( + module_addr + offset_to_mkobj_mod, mod_size + ) + self_referential = objects.convert_data_to_value( + self_referential_bytes, int, mod_member_data_format + ) + if self_referential != module_addr: + continue + except ( + exceptions.PagedInvalidAddressException, + exceptions.InvalidAddressException, + ): + continue + + module = vmlinux.object("module", offset=module_addr, absolute=True) + if module and module.is_valid(): + yield module + + @classmethod + def get_lsmod_module_addresses( + cls, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> Set[int]: + """Obtain a set of the known module addresses from the linux.lsmod plugin + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + + Returns: + A set containing known kernel module addresses + """ + vmlinux = context.modules[vmlinux_module_name] + vmlinux_layer = context.layers[vmlinux.layer_name] + + known_module_addresses = { + vmlinux_layer.canonicalize(module.vol.offset) + for module in lsmod.Lsmod.list_modules(context, vmlinux_module_name) + } + return known_module_addresses + + def _generator(self): + vmlinux_module_name = self.config["kernel"] + known_module_addresses = self.get_lsmod_module_addresses( + self.context, vmlinux_module_name + ) + modules_memory_boundaries = self.get_modules_memory_boundaries( + self.context, vmlinux_module_name + ) + for module in self.get_hidden_modules( + self.context, + vmlinux_module_name, + known_module_addresses, + modules_memory_boundaries, + ): + module_addr = module.vol.offset + module_name = module.get_name() or renderers.NotAvailableValue() + fields = (format_hints.Hex(module_addr), module_name) + yield (0, fields) + + def run(self): + if self.context.symbol_space.verify_table_versions( + "dwarf2json", lambda version, _:
(not version) or version < (0, 8, 0) + ): + raise exceptions.SymbolSpaceError( + "Invalid symbol table, please ensure the ISF table produced by dwarf2json was created with version 0.8.0 or later" + ) + + headers = [ + ("Address", format_hints.Hex), + ("Name", str), + ] + return renderers.TreeGrid(headers, self._generator()) diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index 462edefdd6..339896e9de 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -13,12 +13,7 @@ from volatility3.framework import constants, exceptions, objects, interfaces, symbols from volatility3.framework.renderers import conversion -from volatility3.framework.constants.linux import SOCK_TYPES, SOCK_FAMILY -from volatility3.framework.constants.linux import IP_PROTOCOLS, IPV6_PROTOCOLS -from volatility3.framework.constants.linux import TCP_STATES, NETLINK_PROTOCOLS -from volatility3.framework.constants.linux import ETH_PROTOCOLS, BLUETOOTH_STATES -from volatility3.framework.constants.linux import BLUETOOTH_PROTOCOLS, SOCKET_STATES -from volatility3.framework.constants.linux import CAPABILITIES, PT_FLAGS +from volatility3.framework.constants import linux as linux_constants from volatility3.framework.layers import linear from volatility3.framework.objects import utility from volatility3.framework.symbols import generic, linux, intermed @@ -31,107 +26,141 @@ class module(generic.GenericIntelProcess): + def is_valid(self): + """Determine whether it is a valid module object by verifying the self-referential + in module_kobject. This also confirms that the module is actively allocated and + not a remnant of freed memory or a failed module load attempt by verifying the + module memory section sizes. 
+ """ + layer = self._context.layers[self.vol.layer_name] + # Make sure the entire module content is readable + if not layer.is_valid(self.vol.offset, self.vol.size): + return False + + core_size = self.get_core_size() + core_text_size = self.get_core_text_size() + init_size = self.get_init_size() + if not ( + 0 < core_text_size <= linux_constants.MODULE_MAXIMUM_CORE_TEXT_SIZE + and 0 < core_size <= linux_constants.MODULE_MAXIMUM_CORE_SIZE + and core_size + init_size >= linux_constants.MODULE_MINIMUM_SIZE + ): + return False + + if not ( + self.mkobj + and self.mkobj.mod + and self.mkobj.mod.is_readable() + and self.mkobj.mod == self.vol.offset + ): + return False + + return True + @functools.cached_property def mod_mem_type(self) -> Dict: """Return the mod_mem_type enum choices if available or an empty dict if not""" # mod_mem_type and module_memory were added in kernel 6.4 which replaces # module_layout for storing the information around core_layout etc. # see commit ac3b43283923440900b4f36ca5f9f0b1ca43b70e for more information - symbol_table_name = self.get_symbol_table_name() - mod_mem_type_symname = symbol_table_name + constants.BANG + "mod_mem_type" - symbol_space = self._context.symbol_space - try: - mod_mem_type = symbol_space.get_enumeration(mod_mem_type_symname).choices - except exceptions.SymbolError: - mod_mem_type = {} - vollog.debug( - "Unable to find mod_mem_type enum. This message can be ignored for kernels < 6.4" + + # functools.cached_property stores the result on first access, so no separate cache attribute is kept + try: + mod_mem_type = self._context.symbol_space.get_enumeration( + self.get_symbol_table_name() + constants.BANG + "mod_mem_type" + ).choices + except exceptions.SymbolError: + vollog.debug( + "Unable to find mod_mem_type enum.
This message can be ignored for kernels < 6.4" + ) + # an empty dict tells callers that the enum is not available for this kernel + mod_mem_type = {} + return mod_mem_type + + def _get_mem_type(self, mod_mem_type_name): + module_mem_index = self.mod_mem_type.get(mod_mem_type_name) + if module_mem_index is None: + raise AttributeError(f"Unknown module memory type '{mod_mem_type_name}'") + + if not (0 <= module_mem_index < self.mem.count): + raise AttributeError( + f"Invalid module memory type index '{module_mem_index}'" ) - return mod_mem_type + return self.mem[module_mem_index] + + def _get_mem_size(self, mod_mem_type_name): + return self._get_mem_type(mod_mem_type_name).size + + def _get_mem_base(self, mod_mem_type_name): + return self._get_mem_type(mod_mem_type_name).base def get_module_base(self): if self.has_member("mem"): # kernels 6.4+ - try: - return self.mem[self.mod_mem_type["MOD_TEXT"]].base - except KeyError: - raise AttributeError( - "module -> get_module_base: Unable to get module base. Cannot read base from MOD_TEXT." - ) + return self._get_mem_base("MOD_TEXT") elif self.has_member("core_layout"): return self.core_layout.base elif self.has_member("module_core"): return self.module_core - raise AttributeError("module -> get_module_base: Unable to get module base") + + raise AttributeError("Unable to get module base") def get_init_size(self): if self.has_member("mem"): # kernels 6.4+ - try: - return ( - self.mem[self.mod_mem_type["MOD_INIT_TEXT"]].size - + self.mem[self.mod_mem_type["MOD_INIT_DATA"]].size - + self.mem[self.mod_mem_type["MOD_INIT_RODATA"]].size - ) - except KeyError: - raise AttributeError( - "module -> get_init_size: Unable to determine .init section size of module.
Cannot read size of MOD_INIT_TEXT, MOD_INIT_DATA, and MOD_INIT_RODATA" - ) + return ( + self._get_mem_size("MOD_INIT_TEXT") + + self._get_mem_size("MOD_INIT_DATA") + + self._get_mem_size("MOD_INIT_RODATA") + ) elif self.has_member("init_layout"): return self.init_layout.size elif self.has_member("init_size"): return self.init_size - raise AttributeError( - "module -> get_init_size: Unable to determine .init section size of module" - ) + + raise AttributeError("Unable to determine .init section size of module") def get_core_size(self): if self.has_member("mem"): # kernels 6.4+ - try: - return ( - self.mem[self.mod_mem_type["MOD_TEXT"]].size - + self.mem[self.mod_mem_type["MOD_DATA"]].size - + self.mem[self.mod_mem_type["MOD_RODATA"]].size - + self.mem[self.mod_mem_type["MOD_RO_AFTER_INIT"]].size - ) - except KeyError: - raise AttributeError( - "module -> get_core_size: Unable to determine core size of module. Cannot read size of MOD_TEXT, MOD_DATA, MOD_RODATA, and MOD_RO_AFTER_INIT." - ) + return ( + self._get_mem_size("MOD_TEXT") + + self._get_mem_size("MOD_DATA") + + self._get_mem_size("MOD_RODATA") + + self._get_mem_size("MOD_RO_AFTER_INIT") + ) elif self.has_member("core_layout"): return self.core_layout.size elif self.has_member("core_size"): return self.core_size - raise AttributeError( - "module -> get_core_size: Unable to determine core size of module" - ) + + raise AttributeError("Unable to determine core size of module") + + def get_core_text_size(self): + if self.has_member("mem"): # kernels 6.4+ + return self._get_mem_size("MOD_TEXT") + elif self.has_member("core_layout"): + return self.core_layout.text_size + elif self.has_member("core_text_size"): + return self.core_text_size + + raise AttributeError("Unable to determine core text size of module") def get_module_core(self): if self.has_member("mem"): # kernels 6.4+ - try: - return self.mem[self.mod_mem_type["MOD_TEXT"]].base - except KeyError: - raise AttributeError( - "module -> get_module_core: 
Unable to get module core. Cannot read base from MOD_TEXT." - ) + return self._get_mem_base("MOD_TEXT") elif self.has_member("core_layout"): return self.core_layout.base elif self.has_member("module_core"): return self.module_core - raise AttributeError("module -> get_module_core: Unable to get module core") + raise AttributeError("Unable to get module core") def get_module_init(self): if self.has_member("mem"): # kernels 6.4+ - try: - return self.mem[self.mod_mem_type["MOD_INIT_TEXT"]].base - except KeyError: - raise AttributeError( - "module -> get_module_core: Unable to get module init. Cannot read base from MOD_INIT_TEXT." - ) + return self._get_mem_base("MOD_INIT_TEXT") elif self.has_member("init_layout"): return self.init_layout.base elif self.has_member("module_init"): return self.module_init - raise AttributeError("module -> get_module_init: Unable to get module init") + raise AttributeError("Unable to get module init") def get_name(self): """Get the name of the module as a string""" @@ -333,7 +362,7 @@ def is_kernel_thread(self) -> bool: Returns: bool: True, if this task is a kernel thread. Otherwise, False. 
""" - return (self.flags & constants.linux.PF_KTHREAD) != 0 + return (self.flags & linux_constants.PF_KTHREAD) != 0 @property def is_thread_group_leader(self) -> bool: @@ -410,7 +439,11 @@ def get_ptrace_tracee_tids(self) -> List[int]: def get_ptrace_tracee_flags(self) -> Optional[str]: """Returns a string with the ptrace flags""" - return PT_FLAGS(self.ptrace).flags if self.is_being_ptraced else None + return ( + linux_constants.PT_FLAGS(self.ptrace).flags + if self.is_being_ptraced + else None + ) class fs_struct(objects.StructType): @@ -1517,18 +1550,18 @@ def get_inode(self): def get_state(self): socket_state_idx = self.state - if 0 <= socket_state_idx < len(SOCKET_STATES): - return SOCKET_STATES[socket_state_idx] + if 0 <= socket_state_idx < len(linux_constants.SOCKET_STATES): + return linux_constants.SOCKET_STATES[socket_state_idx] class sock(objects.StructType): def get_family(self): family_idx = self.__sk_common.skc_family - if 0 <= family_idx < len(SOCK_FAMILY): - return SOCK_FAMILY[family_idx] + if 0 <= family_idx < len(linux_constants.SOCK_FAMILY): + return linux_constants.SOCK_FAMILY[family_idx] def get_type(self): - return SOCK_TYPES.get(self.sk_type, "") + return linux_constants.SOCK_TYPES.get(self.sk_type, "") def get_inode(self): if not self.sk_socket: @@ -1562,8 +1595,8 @@ def get_state(self): # Unix socket states reuse (a subset) of the inet_sock states contants if self.sk.get_type() == "STREAM": state_idx = self.sk.__sk_common.skc_state - if 0 <= state_idx < len(TCP_STATES): - return TCP_STATES[state_idx] + if 0 <= state_idx < len(linux_constants.TCP_STATES): + return linux_constants.TCP_STATES[state_idx] else: # Return the generic socket state return self.sk.sk_socket.get_state() @@ -1575,15 +1608,15 @@ def get_inode(self): class inet_sock(objects.StructType): def get_family(self): family_idx = self.sk.__sk_common.skc_family - if 0 <= family_idx < len(SOCK_FAMILY): - return SOCK_FAMILY[family_idx] + if 0 <= family_idx < 
len(linux_constants.SOCK_FAMILY): + return linux_constants.SOCK_FAMILY[family_idx] def get_protocol(self): # If INET6 family and a proto is defined, we use that specific IPv6 protocol. # Otherwise, we use the standard IP protocol. - protocol = IP_PROTOCOLS.get(self.sk.sk_protocol) + protocol = linux_constants.IP_PROTOCOLS.get(self.sk.sk_protocol) if self.get_family() == "AF_INET6": - protocol = IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol) + protocol = linux_constants.IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol) return protocol def get_state(self): @@ -1591,8 +1624,8 @@ def get_state(self): if self.sk.get_type() == "STREAM": state_idx = self.sk.__sk_common.skc_state - if 0 <= state_idx < len(TCP_STATES): - return TCP_STATES[state_idx] + if 0 <= state_idx < len(linux_constants.TCP_STATES): + return linux_constants.TCP_STATES[state_idx] else: # Return the generic socket state return self.sk.sk_socket.get_state() @@ -1675,8 +1708,8 @@ def get_dst_addr(self): class netlink_sock(objects.StructType): def get_protocol(self): protocol_idx = self.sk.sk_protocol - if 0 <= protocol_idx < len(NETLINK_PROTOCOLS): - return NETLINK_PROTOCOLS[protocol_idx] + if 0 <= protocol_idx < len(linux_constants.NETLINK_PROTOCOLS): + return linux_constants.NETLINK_PROTOCOLS[protocol_idx] def get_state(self): # Return the generic socket state @@ -1718,8 +1751,8 @@ def get_protocol(self): eth_proto = socket_module.htons(self.num) if eth_proto == 0: return None - elif eth_proto in ETH_PROTOCOLS: - return ETH_PROTOCOLS[eth_proto] + elif eth_proto in linux_constants.ETH_PROTOCOLS: + return linux_constants.ETH_PROTOCOLS[eth_proto] else: return f"0x{eth_proto:x}" @@ -1731,13 +1764,13 @@ def get_state(self): class bt_sock(objects.StructType): def get_protocol(self): type_idx = self.sk.sk_protocol - if 0 <= type_idx < len(BLUETOOTH_PROTOCOLS): - return BLUETOOTH_PROTOCOLS[type_idx] + if 0 <= type_idx < len(linux_constants.BLUETOOTH_PROTOCOLS): + return 
linux_constants.BLUETOOTH_PROTOCOLS[type_idx] def get_state(self): state_idx = self.sk.__sk_common.skc_state - if 0 <= state_idx < len(BLUETOOTH_STATES): - return BLUETOOTH_STATES[state_idx] + if 0 <= state_idx < len(linux_constants.BLUETOOTH_STATES): + return linux_constants.BLUETOOTH_STATES[state_idx] class xdp_sock(objects.StructType): @@ -1855,7 +1888,7 @@ def get_last_cap_value(cls) -> int: Returns: int: The latest capability ID supported by the framework. """ - return len(CAPABILITIES) - 1 + return len(linux_constants.CAPABILITIES) - 1 def get_kernel_cap_full(self) -> int: """Return the maximum value allowed for this kernel for a capability @@ -1884,7 +1917,7 @@ def capabilities_to_string(cls, capabilities_bitfield: int) -> List[str]: """ capabilities = [] - for bit, name in enumerate(CAPABILITIES): + for bit, name in enumerate(linux_constants.CAPABILITIES): if capabilities_bitfield & (1 << bit) != 0: capabilities.append(name) @@ -1945,10 +1978,10 @@ def has_capability(self, capability: str) -> bool: Returns: bool: "True" if the given capability is enabled. """ - if capability not in CAPABILITIES: + if capability not in linux_constants.CAPABILITIES: raise AttributeError(f"Unknown capability with name '{capability}'") - cap_value = 1 << CAPABILITIES.index(capability) + cap_value = 1 << linux_constants.CAPABILITIES.index(capability) return cap_value & self.get_capabilities() != 0