Skip to content

Commit

Permalink
Add ClemoryReadOnlyView. (#545)
Browse files Browse the repository at this point in the history
* Add ClemoryReadOnlyView.

* Add comments to address @rhelmot's concerns.
  • Loading branch information
ltfish authored Jan 14, 2025
1 parent 8650e8b commit 72c490a
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
)
from .gdb import GDB_SEARCH_PATH, convert_info_proc_maps, convert_info_sharedlibrary
from .loader import Loader
from .memory import Clemory, ClemoryBase, ClemoryTranslator, ClemoryView
from .memory import Clemory, ClemoryBase, ClemoryReadOnlyView, ClemoryTranslator, ClemoryView
from .patched_stream import PatchedStream

__all__ = [
Expand Down Expand Up @@ -133,6 +133,7 @@
"Loader",
"Clemory",
"ClemoryBase",
"ClemoryReadOnlyView",
"ClemoryView",
"ClemoryTranslator",
"PatchedStream",
Expand Down
29 changes: 27 additions & 2 deletions cle/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cle import Symbol
from cle.address_translator import AT
from cle.errors import CLECompatibilityError, CLEError, CLEFileNotFoundError, CLEOperationError
from cle.memory import Clemory
from cle.memory import Clemory, ClemoryReadOnlyView
from cle.utils import ALIGN_UP, key_bisect_floor_key, key_bisect_insort_right, stream_or_path

from .backends import ALL_BACKENDS, ELF, PE, Backend, Blob, ELFCore, MetaELF, Minidump
Expand Down Expand Up @@ -188,6 +188,7 @@ def __init__(

# cache
self._last_object = None
self._memory_ro_view = None

if self._extern_object and self._extern_object._warned_data_import:
log.warning(
Expand Down Expand Up @@ -218,6 +219,14 @@ def memory(self) -> Clemory:
raise ValueError("Cannot access memory before loading is complete")
return result

@property
def memory_ro_view(self) -> ClemoryReadOnlyView | None:
if self._memory is None:
# it is intentional to check if self._memory is configured when memory_ro_view is accessed.
# memory_ro_view is only set up after gen_ro_memview() is called.
raise ValueError("Cannot access memory_ro_view before loading is complete")
return self._memory_ro_view

@property
def tls(self) -> ThreadManager:
result = self._tls
Expand Down Expand Up @@ -1310,6 +1319,21 @@ def _backend_resolver(backend: str | type[Backend], default: T | None = None) ->
# Memory data loading methods
#

def gen_ro_memview(self) -> None:
"""
Generate a read-only view of the memory, and update self._memory_ro_view for faster data loading. Please call
this method again for updating the read-only view, or discard_ro_memview() to discard any previously generated
read-only views.
"""
if self.memory is not None:
self._memory_ro_view = ClemoryReadOnlyView(self.memory._arch, self.memory)

def discard_ro_memview(self) -> None:
"""
Discard any previously generated read-only views of the memory.
"""
self._memory_ro_view = None

def fast_memory_load_pointer(self, addr: int, size: int | None = None) -> int | None:
"""
Perform a fast memory loading of a pointer.
Expand All @@ -1320,6 +1344,7 @@ def fast_memory_load_pointer(self, addr: int, size: int | None = None) -> int |
"""

try:
return self.memory.unpack_word(addr, size=size)
mem = self.memory_ro_view if self.memory_ro_view is not None else self.memory
return mem.unpack_word(addr, size=size)
except KeyError:
return None
144 changes: 141 additions & 3 deletions cle/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@


class ClemoryBase:
"""
The base class of all Clemory classes.
"""

__slots__ = ("_arch", "_pointer")

def __init__(self, arch):
Expand Down Expand Up @@ -232,7 +236,7 @@ def add_backer(self, start, data, overwrite=False):
raise ValueError("Cannot add a root clemory as a backer!")
if isinstance(data, bytes):
data = bytearray(data)
bisect.insort(self._backers, (start, data))
bisect.insort(self._backers, (start, data), key=lambda x: x[0])
self._update_min_max()

def split_backer(self, addr):
Expand Down Expand Up @@ -510,10 +514,12 @@ def _update_min_max(self):


class ClemoryView(ClemoryBase):
"""
A Clemory which presents a subset of another Clemory as an address space.
"""

def __init__(self, backer, start, end, offset=0):
"""
A Clemory which presents a subset of another Clemory as an address space
:param backer: The parent clemory to use
:param start: The address in the parent to start at
:param end: The address in the parent to end at (exclusive)
Expand Down Expand Up @@ -674,3 +680,135 @@ def find(self, data, search_min=None, search_max=None):
:return:
"""
return iter(())


class ClemoryReadOnlyView(ClemoryBase):
"""
Represents an outermost read-only view of a Clemory object that does not allow updates. This class offers quick
accesses to memory reads.
"""

def __init__(self, arch, clemory: Clemory):
super().__init__(arch)
self._clemory = clemory
self._flattened_backers: list[tuple[int, bytearray]] = []

# cache
self._last_backer_pos: int | None = None

self._flatten_backers()

def __getitem__(self, k) -> int:
# check cache first
if self._last_backer_pos is not None:
start, data = self._flattened_backers[self._last_backer_pos]
if 0 <= k - start < len(data):
return data[k - start]

idx = bisect.bisect_right(self._flattened_backers, k, key=lambda x: x[0])
if idx > 0:
idx -= 1
if idx >= len(self._flattened_backers):
raise KeyError(k)
start, data = self._flattened_backers[idx]
if 0 <= k - start < len(data):
self._last_backer_pos = idx
return data[k - start]
raise KeyError(k)

def __setitem__(self, k, v):
raise NotImplementedError("ClemoryReadOnlyView does not support item assignment")

def load(self, addr: int, n: int) -> bytes:
"""
Read up to `n` bytes at address `addr` in memory and return a bytes object.
Reading will stop at the beginning of the first unallocated region found, or when
`n` bytes have been read.
"""
# check cache first
if self._last_backer_pos is not None:
start, data = self._flattened_backers[self._last_backer_pos]
if 0 <= addr - start < len(data):
offset = addr - start
if offset + n < len(data):
return bytes(memoryview(data)[offset : offset + n])

start_pos = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
if start_pos > 0:
start_pos -= 1
views = []
for i in range(start_pos, len(self._flattened_backers)):
start, data = self._flattened_backers[i]
if start > addr:
break
offset = addr - start
if not views and offset + n < len(data):
# only cache if we do not need to read across backers
self._last_backer_pos = i
return bytes(memoryview(data)[offset : offset + n])
size = len(data) - offset
views.append(memoryview(data)[offset : offset + n])

addr += size
n -= size

if n <= 0:
break

if not views:
raise KeyError(addr)
return b"".join(views)

def store(self, addr, data):
raise NotImplementedError("ClemoryReadOnlyView does not support storing")

def backers(self, addr: int = 0):
start_pos = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
if start_pos > 0:
start_pos -= 1
for idx in range(start_pos, len(self._flattened_backers)):
start, data = self._flattened_backers[idx]
if start > addr:
break
if 0 <= addr - start < len(data):
yield start, data

def unpack(self, addr, fmt):
if self._last_backer_pos is not None:
start, data = self._flattened_backers[self._last_backer_pos]
if 0 <= addr - start < len(data):
try:
return struct.unpack_from(fmt, data, addr - start)
except struct.error as ex:
if len(data) - (addr - start) >= struct.calcsize(fmt):
raise ex
raise KeyError(addr) from ex

idx = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
if idx > 0:
idx -= 1
if idx >= len(self._flattened_backers):
raise KeyError(addr)
start, data = self._flattened_backers[idx]
if start > addr:
raise KeyError(addr)
try:
v = struct.unpack_from(fmt, data, addr - start)
self._last_backer_pos = idx
return v
except struct.error as ex:
if len(data) - (addr - start) >= struct.calcsize(fmt):
raise ex
raise KeyError(addr) from ex

def _flatten_backers(self):
for start, backer in self._clemory.backers():
if isinstance(backer, bytearray):
self._flattened_backers.append((start, backer))
elif isinstance(backer, list):
raise TypeError("ClemoryReadOnlyView does not support list-backed clemories")
elif isinstance(backer, Clemory):
pass
else:
raise TypeError(f"Unsupported backer type {type(backer)}.")

0 comments on commit 72c490a

Please sign in to comment.