Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClemoryReadOnlyView. #545

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
)
from .gdb import GDB_SEARCH_PATH, convert_info_proc_maps, convert_info_sharedlibrary
from .loader import Loader
from .memory import Clemory, ClemoryBase, ClemoryTranslator, ClemoryView
from .memory import Clemory, ClemoryBase, ClemoryReadOnlyView, ClemoryTranslator, ClemoryView
from .patched_stream import PatchedStream

__all__ = [
Expand Down Expand Up @@ -133,6 +133,7 @@
"Loader",
"Clemory",
"ClemoryBase",
"ClemoryReadOnlyView",
"ClemoryView",
"ClemoryTranslator",
"PatchedStream",
Expand Down
27 changes: 25 additions & 2 deletions cle/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cle import Symbol
from cle.address_translator import AT
from cle.errors import CLECompatibilityError, CLEError, CLEFileNotFoundError, CLEOperationError
from cle.memory import Clemory
from cle.memory import Clemory, ClemoryReadOnlyView
from cle.utils import ALIGN_UP, key_bisect_floor_key, key_bisect_insort_right, stream_or_path

from .backends import ALL_BACKENDS, ELF, PE, Backend, Blob, ELFCore, MetaELF, Minidump
Expand Down Expand Up @@ -188,6 +188,7 @@ def __init__(

# cache
self._last_object = None
self._memory_ro_view = None

if self._extern_object and self._extern_object._warned_data_import:
log.warning(
Expand Down Expand Up @@ -218,6 +219,12 @@ def memory(self) -> Clemory:
raise ValueError("Cannot access memory before loading is complete")
return result

@property
def memory_ro_view(self) -> ClemoryReadOnlyView | None:
if self._memory is None:
raise ValueError("Cannot access memory_ro_view before loading is complete")
return self._memory_ro_view
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is gonna be a type error - should be if self._memory_ro_view is not None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s actually intended - I’ll add a comment to explain.


@property
def tls(self) -> ThreadManager:
result = self._tls
Expand Down Expand Up @@ -1310,6 +1317,21 @@ def _backend_resolver(backend: str | type[Backend], default: T | None = None) ->
# Memory data loading methods
#

def gen_ro_memview(self) -> None:
"""
Generate a read-only view of the memory, and update self._memory_ro_view for faster data loading. Please call
this method again for updating the read-only view, or discard_ro_memview() to discard any previously generated
read-only views.
"""
if self.memory is not None:
self._memory_ro_view = ClemoryReadOnlyView(self.memory._arch, self.memory)

def discard_ro_memview(self) -> None:
"""
Discard any previously generated read-only views of the memory.
"""
self._memory_ro_view = None

def fast_memory_load_pointer(self, addr: int, size: int | None = None) -> int | None:
"""
Perform a fast memory loading of a pointer.
Expand All @@ -1320,6 +1342,7 @@ def fast_memory_load_pointer(self, addr: int, size: int | None = None) -> int |
"""

try:
return self.memory.unpack_word(addr, size=size)
mem = self.memory_ro_view if self.memory_ro_view is not None else self.memory
return mem.unpack_word(addr, size=size)
except KeyError:
return None
130 changes: 130 additions & 0 deletions cle/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,133 @@ def find(self, data, search_min=None, search_max=None):
:return:
"""
return iter(())


class ClemoryReadOnlyView(ClemoryBase):
"""
Represents an outermost read-only view of a Clemory object that does not allow updates. This class offers quick
accesses to memory reads.
"""

def __init__(self, arch, clemory: Clemory):
super().__init__(arch)
self._clemory = clemory
self._flattened_backers: list[tuple[int, bytearray | list]] = []

# cache
self._last_backer_pos: int | None = None

self._flatten_backers()

def __getitem__(self, k):
# check cache first
if self._last_backer_pos is not None:
start, data = self._flattened_backers[self._last_backer_pos]
if 0 <= k - start < len(data):
return data[k - start]

idx = bisect.bisect_right(self._flattened_backers, k, key=lambda x: x[0])
if idx > 0:
idx -= 1
if idx >= len(self._flattened_backers):
raise KeyError(k)
start, data = self._flattened_backers[idx]
if 0 <= k - start < len(data):
self._last_backer_pos = idx
return data[k - start]
raise KeyError(k)

def __setitem__(self, k, v):
raise NotImplementedError("ClemoryReadOnlyView does not support item assignment")

def load(self, addr: int, n: int):
"""
Read up to `n` bytes at address `addr` in memory and return a bytes object.

Reading will stop at the beginning of the first unallocated region found, or when
`n` bytes have been read.
"""
# check cache first
if self._last_backer_pos is not None:
start, data = self._flattened_backers[self._last_backer_pos]
if 0 <= addr - start < len(data):
offset = addr - start
if offset + n < len(data):
return bytes(memoryview(data)[offset : offset + n])

start_pos = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
if start_pos > 0:
start_pos -= 1
views = []
for i in range(start_pos, len(self._flattened_backers)):
start, data = self._flattened_backers[i]
if start > addr:
break
offset = addr - start
if not views and offset + n < len(data):
# only cache if we do not need to read across backers
self._last_backer_pos = i
return bytes(memoryview(data)[offset : offset + n])
size = len(data) - offset
views.append(memoryview(data)[offset : offset + n])

addr += size
n -= size

if n <= 0:
break

if not views:
raise KeyError(addr)
return b"".join(views)

def store(self, addr, data):
raise NotImplementedError("ClemoryReadOnlyView does not support storing")

def backers(self, addr=0):
start_pos = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
if start_pos > 0:
start_pos -= 1
for idx in range(start_pos, len(self._flattened_backers)):
start, data = self._flattened_backers[idx]
if start > addr:
break
if 0 <= addr - start < len(data):
yield start, data

def unpack(self, addr, fmt):
if self._last_backer_pos is not None:
start, data = self._flattened_backers[self._last_backer_pos]
if 0 <= addr - start < len(data):
try:
return struct.unpack_from(fmt, data, addr - start)
except struct.error as ex:
if len(data) - (addr - start) >= struct.calcsize(fmt):
raise ex
raise KeyError(addr)

idx = bisect.bisect_right(self._flattened_backers, addr, key=lambda x: x[0])
if idx > 0:
idx -= 1
if idx >= len(self._flattened_backers):
raise KeyError(addr)
start, data = self._flattened_backers[idx]
if start > addr:
raise KeyError(addr)
try:
v = struct.unpack_from(fmt, data, addr - start)
self._last_backer_pos = idx
return v
except struct.error as ex:
if len(data) - (addr - start) >= struct.calcsize(fmt):
raise ex
raise KeyError(addr)

def _flatten_backers(self):
for start, backer in self._clemory.backers():
if isinstance(backer, (bytearray, list)):
self._flattened_backers.append((start, backer))
elif isinstance(backer, Clemory):
pass
else:
raise TypeError(f"Unsupported backer type {type(backer)}.")
Loading