import numpy as np
import pickle
import inspect
from enum import Enum
from ..normalization import normalize_cfg_data, Architectures
from .cfg import CFG, CFGBasicBlock, CFGFunction, EdgeType
from ..utils import get_smallest_np_dtype, scatter_nd_numpy, hash_obj, eq_obj, get_module, AtomicTokenDict
from ..utils.type_utils import *
if IN_PYTHON_TYPING_VERSION:
BlockInfoBitDtype = np.int8 # Int type being used to store block info flags
BlockInfoBitMaskFunctionType = Callable[[CFGBasicBlock], Literal[0, 1, True, False]] # Dtype of functions that are used to compute bit flags
[docs]
class MemCFG:
"""A CFG that is more memory/speed efficient.
Keeps only the bare minimum information needed from a CFG. Stores edge connections in a CSR-like format.
Parameters
----------
cfg: `CFG`
a CFG object. Can be a normalized or un-normalized. If un-normalized, then it will be normalized using the
`normalizer` parameter.
normalizer: `Optional[Union[str, Normalizer]]`
the normalizer to use to normalize the incoming CFG (or None if it is already normalized). If the incoming CFG
object has already been normalized, and `normalizer` is not None, then this will attempt to normalize the CFG
again with this normalizer
keep_memory_addresses: `bool`
if True, then memory addresses will also be kept. Otherwise they will be removed to save space
inplace: `bool`
if True and cfg needs to be normalized, it will be normalized inplace
using_tokens: `Union[Dict[str, int], AtomicTokenDict]`
if not None, then a dictionary mapping token strings to integer values. Any tokens in cfg but not in using_tokens
will be added. Can also be an AtomicTokenDict for atomic updates to tokens
force_renormalize: `bool`
by default, this method will only normalize cfg's whose .normalizer != to the passed normalizer. However if
`force_renormalize=True`, then all cfg's will be renormalized even if they have been previously normalized with
the same normalizer.
"""
normalizer: 'NormalizerType'
"""The normalizer used to normalize input before converting to ``MemCFG``
Can be shared with a ``MemCFGDataset`` object if this ``MemCFG`` is a part of one
"""
tokens: 'dict[str, int]'
"""Dictionary mapping token strings to integer values used in this ``MemCFG``
Can be shared with a ``MemCFGDataset`` object if this ``MemCFG`` is a part of one.
Can also be an AtomicTokenDict object for atomic token updates
"""
function_name_to_idx: 'dict[str, int]'
"""Dictionary mapping string function names to their integer ids used in this ``MemCFG``"""
asm_lines: 'np.ndarray'
"""Assembly line information
A contiguous 1-d numpy array of shape (num_asm_lines,) of integer assembly line tokens. Dtype is the smallest
unsigned dtype needed to store the largest token value in this ``MemCFG``
To get the assembly lines for some block index `block_idx`, you must get the assembly line indices from ``block_asm_idx``,
and use those to slice the assembly lines:
>>> block_idx = 7
>>> memcfg.asm_lines[memcfg.block_asm_idx[block_idx]:memcfg.block_asm_idx[block_idx + 1]]
Also see :func:`~bincfg.MemCFG.get_block_asm_lines`
"""
asm_memory_addresses: 'Union[None, np.ndarray]'
"""Memory addresses for all of the assembly lines
Only saved if `keep_memory_addresses=True` when constructing the ``MemCFG``. This will be a 1-d signed integer numpy
array, where a value of -1 means the memory address for that corresponding line was not present in the basic block
"""
block_asm_idx: 'np.ndarray'
"""Indices in ``asm_lines`` that correspond to the assembly lines for each basic block in this ``MemCFG``
A 1-d numpy array of shape (num_blocks + 1,). Dtype is the smallest unsigned dtype needed to store the value
`num_asm_lines`. Assembly tokens for a block at index `i` would have a start index of `block_asm_idx[i]` and an end
index of `block_asm_idx[i + 1]` in ``asm_lines``.
"""
block_func_idx: 'np.ndarray'
"""Integer ids for the function that each basic block belongs to
A 1-d numpy array of shape (num_blocks,) where each element is a function id for the block at that index. The id
can be found in ``function_name_to_idx``. Dtype is the smallest unsigned dtype needed to store the value `num_functions`
Also see :func:`~bincfg.MemCFG.get_block_function_idx` and :func:`~bincfg.MemCFG.get_block_function_name`
"""
block_flags: 'np.ndarray'
"""Integer of bit flags for each basic block
A 1-d numpy array of shape (num_blocks,) where each element is an integer of bit flags. See ``BlockInfoBitMask``
for more info. Dtype is the smallest unsigned dtype with enough bits to store all flags in ``BlockInfoBitMask``
Also see :func:`~bincfg.MemCFG.get_block_flags`
"""
block_memory_addresses: 'Union[np.ndarray, None]'
"""Integer memory addresses of basic blocks.
Only saved if `keep_memory_addresses=True` when constructing the ``MemCFG``. This will be a 1-d unsigned integer numpy
array containing the memory addresses
"""
block_asm_mem_addr_idx: 'Union[np.ndarray, None]'
"""Indices in ``block_memory_addresses`` that correspond to the assembly line memory addresses for basic blocks
A 1-d numpy array of shape (num_blocks + 1,). Dtype is the smallest unsigned dtype needed to store the number of
assembly line memory addresses. Memory addresses for a block at index `i` would have a start index of
`block_asm_mem_addr_idx[i]` and an end index of `block_asm_mem_addr_idx[i + 1]` in ``block_memory_addresses``.
Only saved if `keep_memory_addresses=True` when constructing the ``MemCFG``.
"""
metadata: 'dict'
"""Dictionary of metadata associated with this MemCFG"""
function_metadata: 'list[Union[int, dict]]'
"""Metadata for functions
A list of run length compressed metadata at the function level. We only compress metadata dictionaries that are empty.
Elements are in the same order as the function indices in `block_func_idx`. Elements are either dictionaries (for the
metadata of that current function), or integers indicating we should skip that many functions as they all have
no metadata.
"""
block_metadata: 'list[Union[int, dict]]'
"""Metadata for blocks
A list of run length compressed metadata at the basic block level. We only compress metadata dictionaries that are empty.
Elements are in the same order as the block indices in `block_asm_idx`. Elements are either dictionaries (for the
metadata of that current block), or integers indicating we should skip that many blocks as they all have
no metadata.
"""
graph_c: 'np.ndarray'
"""Array containing all of the outgoing edges for each block in order
1-D numpy array of shape (num_edges,). Dtype will be the smallest unsigned dtype required to store the value
`num_blocks + 1`. Each element is a block index to which that edge connects. Edges will be in the order they appear in
each block's ``edges_out`` attribute, for each block in order of their block_idx.
Also see :func:`~bincfg.MemCFG.get_edges_out`
NOTE: this also contains information on which types of edges they are inherently. If the block is NOT a function call
(stored as bit flag in the block_info array), then all edges for that block are normal edges. If it IS a function
call, then there are 3 cases:
1. it has one outgoing edge: that edge is always a function call
2. it has two outgoing edges, one function call, one normal: the first edge is the function call edge, the second
is a normal edge
3. it has >2 outgoing edges, or 2 function call edges: the edges will be listed first by function call edges,
then by normal edges, with a separator inbetween. The separator will have the max unsigned int value for
graph_c's dtype. This is why we use the dtype that can store `num_blocks + 1`, since we need this extra value
just in case. Whatever exactly it means for a basic block to have >2 outgoing edges while being a function
call is left up to the user. Possibly due to call operators with non-explicit operands (eg: register memory
locations)?
"""
graph_r: 'np.ndarray'
"""Array containing information on the number of outgoing edges for each block
1-D numpy array of shape (num_edges + 1,). Dtype will be the smallest unsigned dtype required to store the value
`num_edges`. This array is a cumulative sum of the number of edges for each basic block. One could get all of the
outgoing edges for a block using:
>>> start_idx = memcfg.graph_r[block_idx]
>>> end_idx = memcfg.graph_r[block_idx + 1]
>>> edges = memcfg.graph_c[start_idx:end_idx]
Also see :func:`~bincfg.MemCFG.get_edges_out`
"""
[docs]
class BlockInfoBitMask(Enum):
"""An Enum for block info bit masks
Each value is a tuple of the bit mask for that boolean, and a function to call with the block that returns a
boolean True if that bit should be set, False otherwise. If True, then that bit will be '1' in that block's
block_flags int.
"""
IS_FUNCTION_CALL: 'Tuple[int, BlockInfoBitMaskFunctionType]' = (1 << 0, lambda block: block.is_function_call)
"""Bit set if this block is a function call. See :py:func:`~bincfg.cfg_basic_block.is_function_call`"""
IS_FUNCTION_ENTRY: 'Tuple[int, BlockInfoBitMaskFunctionType]' = (1 << 1, lambda block: block.is_function_entry)
"""Bit set if this block is a function entry. See :py:func:`~bincfg.cfg_basic_block.is_function_entry`"""
IS_IN_EXTERN_FUNCTION: 'Tuple[int, BlockInfoBitMaskFunctionType]' = (1 << 2, lambda block: block.parent_function.is_extern_function)
"""Bit set if this block is within an external function. See :py:func:`~bincfg.cfg_function.is_extern_function`"""
IS_FUNCTION_JUMP: 'Tuple[int, BlockInfoBitMaskFunctionType]' = (1 << 3, lambda block: block.is_function_jump)
"""Bit set if this block is a function jump. IE: this block has a jump instruction that resolves to a basic block
in a separate function. See :py:func:`~bincfg.cfg_basic_block.is_function_jump`"""
IS_MULTI_FUNCTION_CALL: 'Tuple[int, BlockInfoBitMaskFunctionType]' = (1 << 4, lambda block: 0)
"""Bit set if this block is a multi-function call. IE: this block has either two or more function call edges out,
or one function call and two or more normal edges out. See :py:func:`~bincfg.cfg_basic_block.is_multi_function_call`
Currently not setting the block here in _block_flags_int(), but instead in MemCFG initialization in order to save
time (we don't have to compute get_sorted_edges() multiple times)
"""
@staticmethod
def _block_flags_int(block: 'CFGBasicBlock') -> 'int':
"""Gets all of the block information and stores it as a integer of bit-set flags
Args:
block (CFGBasicBlock): the CFGBasicBlock to get the info int from
Returns:
int: the integer of bit-set flags
"""
block_flags = 0
for bm in MemCFG.BlockInfoBitMask:
block_flags |= bm.value[0] * bm.value[1](block) # Times will convert boolean to 1 or 0
return block_flags
def __init__(self, cfg: 'CFG', normalizer: 'Optional[Union[str, NormalizerType]]' = None, keep_memory_addresses: 'bool' = False,
inplace: 'bool' = False, using_tokens: 'Optional[Union[dict, AtomicTokenDict]]' = None, force_renormalize: 'bool' = False):
# Make sure input is a cfg
if not isinstance(cfg, CFG):
raise TypeError("Can only build a MemCFG out of a CFG object, not '%s'" % type(cfg).__name__)
# Normalize the CFG if needed
if normalizer is not None or cfg.normalizer is None:
# Make sure there is some normalizer to use
if normalizer is None:
raise ValueError("Must pass a normalizer if `cfg` is unnormalized!")
cfg = normalize_cfg_data(cfg, normalizer=normalizer, inplace=inplace, force_renormalize=force_renormalize)
# Keep cfg's normalization if possible
self.normalizer = cfg.normalizer
# Figure out what the tokens should be, updating the token dict if we find new ones
self.tokens = {} if using_tokens is None else using_tokens
if isinstance(self.tokens, AtomicTokenDict):
self.tokens.addtokens(*{l for block in cfg.blocks for l in block.asm_lines})
else:
for block in cfg.blocks:
for l in block.asm_lines:
self.tokens.setdefault(l, len(self.tokens))
max_token = len(self.tokens)
max_asm_addr = default_max((a for b in cfg.blocks for a in b.asm_memory_addresses), 0)
max_block_addr = default_max((b.address for b in cfg.blocks), 0)
# Make mappings from function names to indices. Make sure there aren't duplicates (most likely only going to
# occur in functions with no name)
self.function_name_to_idx = {}
function_addr_to_idx = {}
self.function_metadata = []
for i, f in enumerate(cfg.functions):
if f.name in self.function_name_to_idx:
func_name = f.name
fn_idx = 0
while func_name in self.function_name_to_idx:
func_name = f.name + '_%d' % fn_idx
fn_idx += 1
else:
func_name = f.name
self.function_name_to_idx[func_name] = i
function_addr_to_idx[f.address] = i
self.function_metadata.append(f.metadata)
# Make the data arrays
self.asm_lines = np.empty([cfg.num_asm_lines], dtype=get_smallest_np_dtype(max_token))
self.block_asm_idx = np.empty([cfg.num_blocks + 1], dtype=get_smallest_np_dtype(cfg.num_asm_lines))
self.block_func_idx = np.empty([cfg.num_blocks], dtype=get_smallest_np_dtype(len(cfg.functions_dict)))
self.block_flags = np.empty([cfg.num_blocks], dtype=get_smallest_np_dtype(len(MemCFG.BlockInfoBitMask)))
graph_c = []
self.graph_r = np.empty([cfg.num_blocks + 1])
self.block_labels = {} # Mapping of block indices to block labels
self.block_metadata = []
# Set the initial graph_r start and final block_asm_idx
self.graph_r[0] = 0
self.block_asm_idx[-1] = len(self.asm_lines)
if keep_memory_addresses:
self.block_memory_addresses = np.empty([cfg.num_blocks], dtype=get_smallest_np_dtype(max_block_addr, signed=True))
self.asm_memory_addresses = np.empty(sum(len(b.asm_memory_addresses) for b in cfg.blocks), dtype=get_smallest_np_dtype(max_asm_addr, signed=True))
self.block_asm_mem_addr_idx = np.empty([cfg.num_blocks + 1], dtype=get_smallest_np_dtype(cfg.num_asm_lines))
self.block_asm_mem_addr_idx[-1] = len(self.asm_memory_addresses)
else:
self.asm_memory_addresses, self.block_asm_mem_addr_idx, self.block_memory_addresses = None, None, None
# Copy the metadata from the cfg
self.metadata = cfg.metadata.copy()
# Create temporary mapping of CFGBasicBlock.address to integer index of block (sorted by address for determinism)
block_addr_to_idx = {block.address: i for i, block in enumerate(cfg.blocks)}
# Convert the data at each block to better memory one
asm_line_idx, asm_mem_idx = 0, 0
for block_idx, block in enumerate(cfg.blocks):
# Get the assembly lines and memory addresses, and store the length of the assembly lines
asm_line_end = asm_line_idx + block.num_asm_lines
self.asm_lines[asm_line_idx: asm_line_end] = [self.tokens[l] for l in block.asm_lines]
self.block_asm_idx[block_idx] = asm_line_idx
asm_line_idx = asm_line_end
# Add in the memory addresses if using
if self.asm_memory_addresses is not None:
self.block_memory_addresses[block_idx] = block.address
asm_mem_end = asm_mem_idx + len(block.asm_memory_addresses)
self.asm_memory_addresses[asm_mem_idx: asm_mem_end] = block.asm_memory_addresses
self.block_asm_mem_addr_idx[block_idx] = asm_mem_idx
asm_mem_idx = asm_mem_end
# Get the block's function name
self.block_func_idx[block_idx] = function_addr_to_idx[block.parent_function.address]
# Get all of the edges associated with this block in order: normal edges, function call edges
edge_lists = block.get_sorted_edges(direction='out', as_sets=False)
# Add in the function call edges
graph_c += [block_addr_to_idx[edge.to_block.address] for edge in edge_lists[1]]
# Check for either >1 function call edges, or >2 edges while being a function call.
# We add a -1, which will be rolled over to the unsigned int max value for graph_c array
added_minus_1 = 0
if len(edge_lists[1]) >= 2 or (len(edge_lists[1]) == 1 and len(edge_lists[0]) > 1):
graph_c.append(-1)
added_minus_1 = 1
# Add in the normal edges
graph_c += [block_addr_to_idx[edge.to_block.address] for edge in edge_lists[0]]
# Update the new graph_r, taking into account whether or not we inserted an extra -1 to split function call/normal edges
self.graph_r[block_idx + 1] = sum(len(l) for l in edge_lists) + added_minus_1
# Get the block information flags for this block.
# Set the flag for is_multi_function_call here so we don't have to call get_sorted_edges more than once
self.block_flags[block_idx] = MemCFG._block_flags_int(block) | \
(MemCFG.BlockInfoBitMask.IS_MULTI_FUNCTION_CALL.value[0] * added_minus_1)
# Get the block metadata and memory address
self.block_metadata.append(block.metadata)
# Conver graph_c and graph_r to numpy arrays
graphc_dtype = get_smallest_np_dtype(len(cfg.blocks_dict) + 1, signed=False)
self.max_graph_c_int = np.iinfo(graphc_dtype).max
self.graph_c = np.array([(v if v != -1 else self.max_graph_c_int) for v in graph_c], dtype=graphc_dtype)
self.graph_r = np.cumsum(self.graph_r, dtype=get_smallest_np_dtype(len(self.graph_c)))
# Run length encode the metadata
self.function_metadata = _rl_encode_metadata(self.function_metadata)
self.block_metadata = _rl_encode_metadata(self.block_metadata)
[docs]
def get_block_info(self, block_idx):
"""Returns all the info associated with the given block index as a dictionary
Args:
block_idx (int): integer block index
Returns:
dict: the block info dictionary with keys/values:
* 'asm_lines' (np.ndarray): 1-d numpy array of unsigned integer assembly line tokens in this block
* 'asm_memory_addresses' (np.ndarray): 1-d numpy array of signed integer memory addresses
for the assembly lines in this block. Values will be -1 if the memory addresses do not exist
* 'edges_out' (np.ndarray): 1-d numpy array of unsigned integer block indices for all
of the edges out from this block
* 'edge_types' (np.ndarray): 1-d numpy array of uint8 values for the edge types associated
with all of the edges out. These are the values of objects in the EdgeType enum. Currently: EdgeType.NORMAL == 1,
EdgeType.FUNCTION_CALL == 2
* 'function_index' (int): the integer function index of the function this block resides in
* 'is_function_call' (bool): true if this block is a function call block (has at least one outgoing function call edge)
* 'is_function_entry' (bool): true if this block is a function entry block (has the same memory address
as its parent function)
* 'is_extern_function' (bool): true if this block is within an external function (parent_function.is_extern_function is True)
* 'is_function_jump' (bool): true if this block is a function jump block (has a 'normal' edge to a block
that is within another function)
* 'is_multi_function_call' (bool): true if this block is a multi-function call block (has 2 or more outgoing
function call edges. IE: a call table)
* 'metadata' (dict): dictionary of metadata associated with this block
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
ret = (self.get_block_asm_lines(block_idx),) + (self.get_block_asm_memory_addresses(block_idx),) + self.get_block_edges_out(block_idx, ret_edge_types=True) \
+ (self.get_block_function_idx(block_idx),) + self.get_block_flags(block_idx) + (self.get_block_metadata(block_idx),)
ret = {k: v for k, v in zip(['asm_lines', 'asm_memory_addresses', 'edges_out', 'edge_types', 'function_index', 'is_function_call',
'is_function_entry', 'is_extern_function', 'is_function_jump', 'is_multi_function_call', 'metadata'], ret)}
return ret
[docs]
def get_block_asm_lines(self, block_idx: 'int') -> 'np.ndarray':
"""Get the asm lines associated with this block index
Args:
block_idx (int): integer block index
Returns:
np.ndarray: a 1-d numpy array of unsigned integer assembly tokens
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return self.asm_lines[self.block_asm_idx[block_idx]:self.block_asm_idx[block_idx+1]]
[docs]
def get_block_asm_memory_addresses(self, block_idx: 'int') -> 'np.ndarray':
"""Get the asm memory addresses associated with this block index
Values are -1 if the memory address did not exist in that block
Args:
block_idx (int): integer block index
Returns:
np.ndarray: a 1-d numpy array of signed integer assembly tokens
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return self.asm_memory_addresses[self.block_asm_mem_addr_idx[block_idx]:self.block_asm_mem_addr_idx[block_idx+1]]
[docs]
def get_block_edges_out(self, block_idx: 'int', ret_edge_types: 'bool' = False) -> \
'Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]':
"""Get numpy array of block indices for all edges out associated with the given block index
Args:
block_idx (int): integer block index
ret_edge_types (bool): if True, will also return a numpy array (1-d, dtype np.uint8) containing the edge
type values for each edge with values:
- 1: normal edge
- 2: function call edge
Returns:
Union[np.ndarray, Tuple[np.ndarray, np.ndarray:
either a 1-d numpy array of unsigned integer block indices for all edges out associated with the given
block index, or if `ret_edge_types=True`, then a tuple of (block_edge_inds, edge_types) where the `edge_types`
is a 1-d numpy array of uint8 edge types with the same shape as block_edge_inds that designates the types
of the edges. Edge types will be the values of those in the `EdgeType` enum.
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
# Get all of the edges
ret = self.graph_c[self.graph_r[block_idx]:self.graph_r[block_idx+1]]
# Check if we are returning the edge types as well
if ret_edge_types:
# Check if this block is a function call
if self.is_block_function_call(block_idx):
# Check to see if this block is a multi-function call, in which case we have to split up the array
if self.is_block_multi_function_call(block_idx):
# Get the split index (index in ret that is equal to unsigned dtype max). It should have split_idx
# function call edges, and len(ret) - split_idx - 1 normal edges (to account for the fact that
# the split_idx itself is also stored in the array)
split_idx = np.argwhere(ret == self.max_graph_c_int)[0][0]
return ret[ret != self.max_graph_c_int], \
np.array([EdgeType.FUNCTION_CALL.value] * split_idx + [EdgeType.NORMAL.value] * (len(ret) - split_idx - 1))
# Otherwise it is not a multi-function call. We can simply return ret, and edge types are either
# [function_call] or [function_call, normal]
else:
return ret, np.array(([EdgeType.FUNCTION_CALL.value] if len(ret) == 1 else \
[EdgeType.FUNCTION_CALL.value, EdgeType.NORMAL.value]), dtype=np.uint8)
# It's not a function call, so we can just return ret and all edge types must be normal edges
else:
return ret, np.full(ret.shape, EdgeType.NORMAL.value, dtype=np.uint8)
# Otherwise we are not returning the edge types, just return all values in ret that are not the splitting value
# We only need to remove the splitting value if there are more than two edges, AND it is a function call. Otherwise
# we are doing a useless numpy comparison and memory-grabbing every call which takes a non-neglible amount of time
return ret[ret != self.max_graph_c_int]
[docs]
def get_block_function_idx(self, block_idx: 'int') -> 'int':
"""Get the function index for the given block index
Args:
block_idx (int): integer block index
Returns:
int: the integer function index for the given block index
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return self.block_func_idx[block_idx]
[docs]
def get_block_function_name(self, block_idx: 'int') -> 'str':
"""Get the function name for the given block index
Functions without names will start with '__unnamed_func__'
Args:
block_idx (int): integer block index
Returns:
str: the function name for the given block index
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
func_idx = self.get_block_function_idx(block_idx)
# Make an inverse mapping now that we know we are calling this function
if not hasattr(self, 'function_idx_to_name'):
self.function_idx_to_name = {v: k for k, v in self.function_name_to_idx.items()}
return self.function_idx_to_name[func_idx]
[docs]
def get_block_memory_address(self, block_idx: 'int') -> 'int':
"""Returns the memory address for the given block, if present, -1 if not present
Args:
block_idx (int): integer block index
Returns:
int: the memory address
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return -1 if self.block_memory_addresses is None else self.block_memory_addresses[block_idx]
[docs]
def get_block_flags(self, block_idx: 'int') -> 'Tuple[bool, bool, bool, bool, bool, bool]':
"""Get all block flags for the given block index
Args:
block_idx (int): integer block index
Returns:
Tuple[bool, bool, bool, bool, bool, bool]: (is_block_function_call, is_block_function_entry,
is_block_extern_function, is_block_function_jump, is_block_multi_function_call)
"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return self.is_block_function_call(block_idx), self.is_block_function_entry(block_idx), \
self.is_block_extern_function(block_idx), self.is_block_function_jump(block_idx), \
self.is_block_multi_function_call(block_idx)
[docs]
def get_function_block_inds(self, func_idx: 'int') -> 'list[int]':
"""Returns all of the block indices that are within the given function
Args:
func_idx (int): the integer function index
Returns:
list[int]: list of integer block indices that are within the given function
"""
assert_valid_idx(func_idx, self.num_functions, 'functions')
if func_idx not in self.function_name_to_idx.values():
raise ValueError("Unknown function index: %d" % func_idx)
return [i for i in range(self.num_blocks) if self.block_func_idx[i] == func_idx]
[docs]
def is_block_function_call(self, block_idx: 'int') -> 'bool':
"""True if this block is a function call, False otherwise"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return (self.block_flags[block_idx] & MemCFG.BlockInfoBitMask.IS_FUNCTION_CALL.value[0]) > 0
[docs]
def is_block_function_entry(self, block_idx: 'int') -> 'bool':
"""True if this block is a function entry, False otherwise"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return (self.block_flags[block_idx] & MemCFG.BlockInfoBitMask.IS_FUNCTION_ENTRY.value[0]) > 0
[docs]
def is_block_extern_function(self, block_idx: 'int') -> 'bool':
"""True if this block is in an external function, False otherwise"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return (self.block_flags[block_idx] & MemCFG.BlockInfoBitMask.IS_IN_EXTERN_FUNCTION.value[0]) > 0
[docs]
def is_block_function_jump(self, block_idx: 'int') -> 'bool':
"""True if this block is a function jump, False otherwise"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return (self.block_flags[block_idx] & MemCFG.BlockInfoBitMask.IS_FUNCTION_JUMP.value[0]) > 0
[docs]
def is_block_multi_function_call(self, block_idx: 'int') -> 'bool':
"""True if this block is a multi-function call, False otherwise"""
assert_valid_idx(block_idx, self.num_blocks, 'blocks')
return (self.block_flags[block_idx] & MemCFG.BlockInfoBitMask.IS_MULTI_FUNCTION_CALL.value[0]) > 0
@property
def num_blocks(self) -> 'int':
"""The number of blocks in this ``MemCFG``"""
return len(self.block_func_idx)
@property
def num_edges(self) -> 'int':
"""The number of edges in this ``MemCFG``"""
return len(self.graph_c[self.graph_c != self.max_graph_c_int])
@property
def num_asm_lines(self) -> 'int':
"""The number of assembly lines in this ``MemCFG``"""
return len(self.asm_lines)
@property
def num_functions(self) -> 'int':
"""The number of functions in this ``MemCFG``"""
return len(self.function_name_to_idx)
@property
def inv_tokens(self) -> 'dict[int, str]':
"""Returns the inverse of `self.tokens`: dictionary mapping token integers to their original strings"""
return {v:k for k, v in self.tokens.items()}
[docs]
def set_tokens(self, tokens: 'Union[dict, AtomicTokenDict]') -> 'Self':
"""Sets this MemCFG's tokens to the given tokens, and returns self"""
self.tokens = tokens
return self
[docs]
def normalize(self, normalizer: 'Optional[Union[str, NormalizerType]]' = None, using_tokens: 'Union[dict, AtomicTokenDict]' = None,
inplace: 'bool' = True, force_renormalize: 'bool' = False) -> 'MemCFG':
"""Normalizes this memcfg in-place.
Args:
normalizer (Optional[Union[str, NormalizerType]]): the normalizer to use. Can be a ``Normalizer`` object, or a
string, or None to use the default BaseNormalizer(). Defaults to None.
using_tokens (Union[dict, AtomicTokenDict]): tokens to use when normalizing
inplace (bool): whether or not to normalize inplace. Defaults to True.
force_renormalize (bool): by default, this method will only normalize this cfg if the passed
`normalizer` is != `self.normalizer`. However if `force_renormalize=True`, then this will be renormalized
even if it has been previously normalized with the same normalizer. Defaults to False.
Returns:
MemCFG: this ``MemCFG`` normalized
"""
return normalize_cfg_data(self, normalizer=normalizer, using_tokens=using_tokens, inplace=inplace,
force_renormalize=force_renormalize)
@property
def architecture(self) -> 'Architectures':
"""Returns the architecture being used. Currently a WIP
Checks for an 'arch' or 'architecture' key in the metadata and returns it if it is known. Can currently return:
'java', 'x86'
"""
for k in ['arch', 'architecture']:
if k in self.metadata:
arch = self.metadata[k]
break
else:
raise ValueError("Could not find 'arch' or 'architecture' key in metadata")
if arch in ['x86']:
return Architectures.X86
elif arch in ['java', 'java-bytecode']:
return Architectures.JAVA
else:
raise ValueError("Unknown architecture value: %s" % repr(arch))
[docs]
def get_edge_values(self) -> 'np.ndarray':
"""Returns the edge type values
Returns a 1-d numpy array of length self.num_edges and dtype np.int32 containing an integer type for each
edge depending on if it is a normal or function call edge. Edges are directed and have values from `EdgeType` enum.
Values:
- 1: 'normal' edges
- 2: 'function call' edges
NOTE: this returns as type np.int32 since pytorch can be finicky about what dtypes it wants
Returns:
np.ndarray: a 1-d numpy array of length self.num_edges and dtype np.int32 containing integer edge types
"""
return np.array([v for b in range(self.num_blocks) for v in self.get_block_edges_out(b, ret_edge_types=True)[1]], dtype=np.int32)
[docs]
def get_coo_indices(self) -> 'np.ndarray':
"""Returns the COO indices for this MemCFG
Returns a 2-d numpy array of shape (num_edges, 2) of dtype np.int32. Each row is an edge, column 0 is the 'row'
indexer, and column 1 is the 'column' indexer. EG:
.. highlight:: python
.. code-block:: python
original = np.array([
[0, 1],
[1, 1]
])
coo_indices = np.array([
[0, 1],
[1, 0],
[1, 1]
])
NOTE: this returns as type np.int32 since pytorch can be finicky about what dtypes it wants
NOTE: pytorch sparse_coo_tensor's indicies are the transpose of the array this method returns
Returns:
np.ndarray: a 2-d numpy array of shape (num_edges, 2) of dtype np.int32 containing COO indices
"""
inds = np.empty([self.num_edges, 2], dtype=np.int32)
start = 0
for bi in range(self.num_blocks):
edges_out = self.get_block_edges_out(bi, ret_edge_types=False)
end = start + len(edges_out)
inds[start:end, 0] = bi
inds[start:end, 1] = edges_out
start = end
return inds
[docs]
def to_adjacency_matrix(self, type: 'Literal["np", "numpy", "torch"]' = 'np', sparse: 'bool' = False) ->\
'Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]':
"""Returns an adjacency matrix representation of this memcfg's graph connections
Edges are directed and have values from `EdgeType` enum. Values:
- 1: 'normal' edges
- 2: 'function call' edges
Args:
type (Literal["np", "numpy", "torch"]): the type of matrix to return. Defaults to 'np'. Can be:
- 'np'/'numpy' for a numpy ndarray (dtype: np.int32)
- 'torch'/'pytorch' for a pytorch tensor (type: LongTensor)
sparse (bool): whether or not the return value should be a sparse matrix. Defaults to False. Has
different behaviors based on type:
- numpy array: returns a 2-tuple of sparse COO representation (indices, values).
NOTE: the indices are the transpose of those from `get_coo_indices()`
NOTE: if you want sparse CSR format, you already have it with self.graph_c and self.graph_r
- pytorch tensor: returns a pytorch sparse COO tensor.
NOTE: not using sparse CSR format for now since it seems to have less documentation/supportedness.
Returns:
Union[np.ndarray, Tuple[np.ndarray, np.ndarray:
an adjacency matrix representation of this ``MemCFG``
"""
type = type.lower()
# Return adj mat as intended type
if type in ['np', 'numpy']:
if sparse:
return (self.get_coo_indices().T, self.get_edge_values())
else:
ret = np.zeros((self.num_blocks, self.num_blocks))
return scatter_nd_numpy(ret, self.get_coo_indices(), self.get_edge_values())
elif type in ['torch', 'pytorch']:
torch = get_module('torch', err_message="Cannot find module `torch` required to return pytorch tensors!")
sparse_coo = torch.sparse_coo_tensor(indices=self.get_coo_indices().T, values=self.get_edge_values(), size=(self.num_blocks, self.num_blocks)).coalesce()
return sparse_coo if sparse else sparse_coo.to_dense()
else:
raise ValueError("Unknown adjacency matrix type: '%s'" % type)
[docs]
def to_cfg(self) -> 'CFG':
"""Converts this MemCFG back into a CFG
NOTE: if `keep_memory_addresses=False` when constucting this MemCFG, then memory addresses will not be present
and basic blocks will be given a memory address that is just their index in the block list
"""
cfg = CFG(metadata=self.metadata, normalizer=self.normalizer)
# Build the blocks
edge_types = [None] * (max(et.value for et in EdgeType) + 1)
for et in EdgeType:
edge_types[et.value] = et.name.lower()
block_metadata = self.get_block_metadata(None)
block_addrs = self.block_memory_addresses if self.block_memory_addresses is not None else np.arange(self.num_blocks)
blocks = [CFGBasicBlock(
address = block_addrs[block_idx],
edges_out = [(None, block_addrs[addr], edge_types[et]) for addr, et in zip(*self.get_block_edges_out(block_idx, ret_edge_types=True))],
asm_lines = [self.inv_tokens[t] for t in self.asm_lines[self.block_asm_idx[block_idx]:self.block_asm_idx[block_idx+1]]],
asm_memory_addresses = self.asm_memory_addresses[self.block_asm_mem_addr_idx[block_idx]:self.block_asm_mem_addr_idx[block_idx+1]]\
if self.asm_memory_addresses is not None else None,
metadata = block_metadata[block_idx],
) for block_idx in range(self.num_blocks)]
# Build the functions
func_metadata = self.get_function_metadata(None)
funcs = [CFGFunction(metadata=func_metadata[func_idx]) for func_idx in range(self.num_functions)]
for block_idx in range(self.num_blocks):
func_idx = self.get_block_function_idx(block_idx)
funcs[func_idx].blocks.append(blocks[block_idx])
funcs[func_idx].name = self.get_block_function_name(block_idx)
if self.is_block_extern_function(block_idx):
funcs[func_idx]._is_extern_function = True
if self.is_block_function_entry(block_idx):
funcs[func_idx].address = blocks[block_idx].address
# Add the functions to the cfg and return
cfg.add_function(*funcs)
return cfg
[docs]
def save(self, path: 'str') -> 'None':
"""Saves this MemCFG to the given path"""
with open(path, 'wb') as f:
pickle.dump(self, f)
[docs]
def dumps(self) -> 'str':
"""Returns this object pickled with pickle.dumps()"""
return pickle.dumps(self)
[docs]
def drop_tokens(self) -> 'Self':
"""Sets the tokens in this normalizer to None. Make sure you only do this if tokens are saved elsewhere! Returns self"""
self.tokens = None
return self
[docs]
@classmethod
def load(cls, path: 'str') -> 'MemCFG':
"""Loads a MemCFG from the given path"""
with open(path, 'rb') as f:
return pickle.load(f)
def __str__(self) -> 'str':
return "MemCFG with normalizer: %s and %d functions, %d blocks, %d assembly lines, and %d edges. Metadata:\n%s" % \
(repr(str(self.normalizer)), self.num_functions, self.num_blocks, self.num_asm_lines, self.num_edges, self.metadata)
def __repr__(self) -> 'str':
return self.__str__()
def __getstate__(self) -> 'dict':
"""State for pickling"""
ret = self.__dict__.copy()
if 'function_idx_to_name' in ret:
del ret['function_idx_to_name']
return ret
def __setstate__(self, state: 'dict') -> 'None':
"""Set state for pickling"""
for k, v in state.items():
setattr(self, k, v)
if not hasattr(self, 'max_graph_c_int'):
self.max_graph_c_int = np.iinfo(self.graph_c.dtype).max
def __eq__(self, other) -> 'bool':
return isinstance(other, MemCFG) and all(eq_obj(self, other, selector=s) for s in [
'normalizer', 'tokens', 'function_name_to_idx', 'asm_lines', 'asm_memory_addresses', 'block_asm_idx', 'block_func_idx',
'block_flags', 'metadata', 'function_metadata', 'block_metadata', 'graph_c', 'graph_r', 'block_labels'
])
def __hash__(self) -> 'hash':
return hash_obj([
self.normalizer, self.tokens, self.function_name_to_idx, self.asm_lines, self.asm_memory_addresses,
self.block_asm_idx, self.block_func_idx, self.block_flags, self.metadata, self.function_metadata,
self.block_metadata, self.graph_c, self.graph_r, self.block_labels,
], return_int=True)
def _rl_encode_metadata(meta_list: 'list[dict]') -> 'list[Union[int, dict]]':
"""Run length encodes metadata. Only encodes empty dictionaries for now"""
ret = []
for d in meta_list:
if d is None or len(d) == 0:
# If we need to insert a new integer for rl encoding
if len(ret) == 0 or isinstance(ret[-1], dict):
ret.append(1)
else:
ret[-1] += 1
else:
ret.append(d)
return ret
def _decode_rl_metadata(meta_list: 'list[Union[int, dict]]', idx: 'Optional[int]' = None):
"""Decodes run length encoded metadata lists
Assumes 0 < `idx` < number of functions
Args:
meta_list (list[Union[int, dict]]): the metadata compressed list
idx (Optional[int]): if None, then this will return the full uncompressed list. Otherwise this will return the
element at this index
"""
# If we are just returning the full list of metadata
if idx is None:
ret = []
for e in meta_list:
if isinstance(e, int):
ret += [{} for _ in range(e)]
else:
ret.append(e)
return ret
assert 0 <= idx < len(meta_list) + sum((e - 1) for e in meta_list if isinstance(e, int))
# Otherwise, we can stop early if possible
curr_idx = 0
ml_idx = 0
while True:
if curr_idx == idx:
return {} if isinstance(meta_list[ml_idx], int) else meta_list[ml_idx]
elif isinstance(meta_list[ml_idx], int):
# If our index is in this range, return empty dict
if curr_idx <= idx < curr_idx + meta_list[ml_idx]:
return {}
curr_idx += meta_list[ml_idx]
else:
curr_idx += 1
ml_idx += 1
# We should not be able to reach here
assert False
[docs]
def assert_valid_idx(idx: 'int', max_val: 'int', objects_str: 'str') -> 'None':
"""Asserts that the idx passed is >= 0 and < max_val. `objects_str` is the type of object for error message (IE: 'blocks', 'functions', etc.)"""
func_name = repr(inspect.getframeinfo(inspect.currentframe().f_back)[2])
if idx < 0:
raise ValueError("Index passed to function %s must be non-negative, got: %d" % (func_name, idx))
elif idx >= max_val:
raise ValueError("Index passed to function %s must be < the maximum number of %s (%d), got: %d" % (func_name, objects_str, max_val, idx))
[docs]
def default_max(iterable, default=None):
"""Returns the max value in `iterable`, or `default` value if len(iterable) == 0"""
try:
return max(iterable)
except ValueError:
return default