From 72c490a7d9dc2d370ca8deac430c66d054c579b3 Mon Sep 17 00:00:00 2001
From: Fish
Date: Tue, 14 Jan 2025 00:51:55 -0700
Subject: [PATCH] Add ClemoryReadOnlyView. (#545)

* Add ClemoryReadOnlyView.

* Add comments to address @rhelmot's concerns.
---
Review notes (this revision differs from the commit named in the first line):

* ClemoryReadOnlyView.load() now raises KeyError when `addr` lies past the end of the nearest preceding
  backer. Previously that case appended an empty view and returned b"".
* Docstrings were added to the new ClemoryReadOnlyView methods. The diffstat and the header of the last
  cle/memory.py hunk were updated to match the extra lines.
* NOTE(review): ClemoryReadOnlyView.backers(addr) yields at most the one backer that contains `addr`.
  Confirm that this is what callers of ClemoryBase.backers() expect.

 cle/__init__.py |   3 +-
 cle/loader.py   |  29 +++++++++-
 cle/memory.py   | 169 +++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 195 insertions(+), 6 deletions(-)

diff --git a/cle/__init__.py b/cle/__init__.py
index bcfaad49..2e6d701d 100644
--- a/cle/__init__.py
+++ b/cle/__init__.py
@@ -74,7 +74,7 @@
 )
 from .gdb import GDB_SEARCH_PATH, convert_info_proc_maps, convert_info_sharedlibrary
 from .loader import Loader
-from .memory import Clemory, ClemoryBase, ClemoryTranslator, ClemoryView
+from .memory import Clemory, ClemoryBase, ClemoryReadOnlyView, ClemoryTranslator, ClemoryView
 from .patched_stream import PatchedStream
 
 __all__ = [
@@ -133,6 +133,7 @@
     "Loader",
     "Clemory",
     "ClemoryBase",
+    "ClemoryReadOnlyView",
     "ClemoryView",
     "ClemoryTranslator",
     "PatchedStream",
diff --git a/cle/loader.py b/cle/loader.py
index 96888057..599562fe 100644
--- a/cle/loader.py
+++ b/cle/loader.py
@@ -22,7 +22,7 @@
 from cle import Symbol
 from cle.address_translator import AT
 from cle.errors import CLECompatibilityError, CLEError, CLEFileNotFoundError, CLEOperationError
-from cle.memory import Clemory
+from cle.memory import Clemory, ClemoryReadOnlyView
 from cle.utils import ALIGN_UP, key_bisect_floor_key, key_bisect_insort_right, stream_or_path
 
 from .backends import ALL_BACKENDS, ELF, PE, Backend, Blob, ELFCore, MetaELF, Minidump
@@ -188,6 +188,7 @@ def __init__(
 
         # cache
         self._last_object = None
+        self._memory_ro_view = None
 
         if self._extern_object and self._extern_object._warned_data_import:
             log.warning(
@@ -218,6 +219,14 @@ def memory(self) -> Clemory:
             raise ValueError("Cannot access memory before loading is complete")
         return result
 
+    @property
+    def memory_ro_view(self) -> ClemoryReadOnlyView | None:
+        if self._memory is None:
+            # it is intentional to check if self._memory is configured when memory_ro_view is accessed.
+            # memory_ro_view is only set up after gen_ro_memview() is called.
+            raise ValueError("Cannot access memory_ro_view before loading is complete")
+        return self._memory_ro_view
+
     @property
     def tls(self) -> ThreadManager:
         result = self._tls
@@ -1310,6 +1319,21 @@ def _backend_resolver(backend: str | type[Backend], default: T | None = None) ->
     # Memory data loading methods
     #
 
+    def gen_ro_memview(self) -> None:
+        """
+        Generate a read-only view of the memory, and update self._memory_ro_view for faster data loading. Please call
+        this method again for updating the read-only view, or discard_ro_memview() to discard any previously generated
+        read-only views.
+        """
+        if self.memory is not None:
+            self._memory_ro_view = ClemoryReadOnlyView(self.memory._arch, self.memory)
+
+    def discard_ro_memview(self) -> None:
+        """
+        Discard any previously generated read-only views of the memory.
+        """
+        self._memory_ro_view = None
+
     def fast_memory_load_pointer(self, addr: int, size: int | None = None) -> int | None:
         """
         Perform a fast memory loading of a pointer.
@@ -1320,6 +1344,7 @@ def fast_memory_load_pointer(self, addr: int, size: int | None = None) -> int |
         """
 
         try:
-            return self.memory.unpack_word(addr, size=size)
+            mem = self.memory_ro_view if self.memory_ro_view is not None else self.memory
+            return mem.unpack_word(addr, size=size)
         except KeyError:
             return None
diff --git a/cle/memory.py b/cle/memory.py
index 6bd7bd2d..5407268f 100644
--- a/cle/memory.py
+++ b/cle/memory.py
@@ -10,6 +10,10 @@
 
 
 class ClemoryBase:
+    """
+    The base class of all Clemory classes.
+    """
+
     __slots__ = ("_arch", "_pointer")
 
     def __init__(self, arch):
@@ -232,7 +236,7 @@ def add_backer(self, start, data, overwrite=False):
             raise ValueError("Cannot add a root clemory as a backer!")
         if isinstance(data, bytes):
             data = bytearray(data)
-        bisect.insort(self._backers, (start, data))
+        bisect.insort(self._backers, (start, data), key=lambda x: x[0])
         self._update_min_max()
 
     def split_backer(self, addr):
@@ -510,10 +514,12 @@ def _update_min_max(self):
 
 
 class ClemoryView(ClemoryBase):
+    """
+    A Clemory which presents a subset of another Clemory as an address space.
+    """
+
     def __init__(self, backer, start, end, offset=0):
         """
-        A Clemory which presents a subset of another Clemory as an address space
-
         :param backer: The parent clemory to use
         :param start: The address in the parent to start at
         :param end: The address in the parent to end at (exclusive)
@@ -674,3 +680,160 @@ def find(self, data, search_min=None, search_max=None):
         :return:
         """
         return iter(())
+
+
+class ClemoryReadOnlyView(ClemoryBase):
+    """
+    Represents an outermost read-only view of a Clemory object that does not allow updates. This class offers fast
+    memory reads by bisecting a flattened list of the bytearray backers and caching the last backer hit.
+    """
+
+    def __init__(self, arch, clemory: Clemory):
+        """
+        :param arch:    The architecture, passed on to ClemoryBase.
+        :param clemory: The Clemory whose bytearray backers this view reads from.
+        """
+        super().__init__(arch)
+        self._clemory = clemory
+        self._flattened_backers: list[tuple[int, bytearray]] = []
+
+        # cache
+        self._last_backer_pos: int | None = None
+
+        self._flatten_backers()
+
+    def __getitem__(self, k) -> int:
+        """
+        Return the byte (as an int) stored at address `k`, or raise KeyError if `k` is not mapped.
+        """
+        # check cache first
+        if self._last_backer_pos is not None:
+            start, data = self._flattened_backers[self._last_backer_pos]
+            if 0 <= k - start < len(data):
+                return data[k - start]
+
+        idx = bisect.bisect_right(self._flattened_backers, k, key=lambda x: x[0])
+        if idx > 0:
+            idx -= 1
+        if idx >= len(self._flattened_backers):
+            raise KeyError(k)
+        start, data = self._flattened_backers[idx]
+        if 0 <= k - start < len(data):
+            self._last_backer_pos = idx
+            return data[k - start]
+        raise KeyError(k)
+
+    def __setitem__(self, k, v):
+        raise NotImplementedError("ClemoryReadOnlyView does not support item assignment")
+
+    def load(self, addr: int, n: int) -> bytes:
+        """
+        Read up to `n` bytes at address `addr` in memory and return a bytes object.
+
+        Reading will stop at the beginning of the first unallocated region found, or when
+        `n` bytes have been read.
+
+        :param addr:    The address to start reading from.
+        :param n:       The maximum number of bytes to read.
+        :return:        The bytes read; may be shorter than `n` if an unallocated region is hit.
+        :raises KeyError: If no backer contains `addr`.
+        """
+        # check cache first
+        if self._last_backer_pos is not None:
+            start, data = self._flattened_backers[self._last_backer_pos]
+            if 0 <= addr - start < len(data):
+                offset = addr - start
+                if offset + n < len(data):
+                    return bytes(memoryview(data)[offset : offset + n])
+
+        start_pos = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
+        if start_pos > 0:
+            start_pos -= 1
+        views = []
+        for i in range(start_pos, len(self._flattened_backers)):
+            start, data = self._flattened_backers[i]
+            offset = addr - start
+            if not 0 <= offset < len(data):  # addr is before or past this backer, i.e. in a gap: stop reading
+                break
+            if not views and offset + n < len(data):
+                # only cache if we do not need to read across backers
+                self._last_backer_pos = i
+                return bytes(memoryview(data)[offset : offset + n])
+            size = len(data) - offset
+            views.append(memoryview(data)[offset : offset + n])
+
+            addr += size
+            n -= size
+
+            if n <= 0:
+                break
+
+        if not views:
+            raise KeyError(addr)
+        return b"".join(views)
+
+    def store(self, addr, data):
+        raise NotImplementedError("ClemoryReadOnlyView does not support storing")
+
+    def backers(self, addr: int = 0):
+        """
+        Yield ``(start, data)`` for the flattened backer that contains `addr`, if there is one.
+        """
+        start_pos = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
+        if start_pos > 0:
+            start_pos -= 1
+        for idx in range(start_pos, len(self._flattened_backers)):
+            start, data = self._flattened_backers[idx]
+            if start > addr:
+                break
+            if 0 <= addr - start < len(data):
+                yield start, data
+
+    def unpack(self, addr, fmt):
+        """
+        Unpack the struct format `fmt` from memory at `addr` and return the resulting tuple.
+
+        :raises KeyError: If `addr` is unmapped or the backer holding it ends before the struct does.
+        """
+        if self._last_backer_pos is not None:
+            start, data = self._flattened_backers[self._last_backer_pos]
+            if 0 <= addr - start < len(data):
+                try:
+                    return struct.unpack_from(fmt, data, addr - start)
+                except struct.error as ex:
+                    if len(data) - (addr - start) >= struct.calcsize(fmt):
+                        raise ex
+                    raise KeyError(addr) from ex
+
+        idx = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
+        if idx > 0:
+            idx -= 1
+        if idx >= len(self._flattened_backers):
+            raise KeyError(addr)
+        start, data = self._flattened_backers[idx]
+        if start > addr:
+            raise KeyError(addr)
+        try:
+            v = struct.unpack_from(fmt, data, addr - start)
+            self._last_backer_pos = idx
+            return v
+        except struct.error as ex:
+            if len(data) - (addr - start) >= struct.calcsize(fmt):
+                raise ex
+            raise KeyError(addr) from ex
+
+    def _flatten_backers(self):
+        """
+        Collect every bytearray backer of the wrapped Clemory into self._flattened_backers.
+
+        :raises TypeError: If a backer is a list, or of any type other than bytearray and Clemory.
+        """
+        for start, backer in self._clemory.backers():
+            if isinstance(backer, bytearray):
+                self._flattened_backers.append((start, backer))
+            elif isinstance(backer, list):
+                raise TypeError("ClemoryReadOnlyView does not support list-backed clemories")
+            elif isinstance(backer, Clemory):
+                pass
+            else:
+                raise TypeError(f"Unsupported backer type {type(backer)}.")