Skip to content

Commit

Permalink
Merge branch 'main' into refactoring_switch_node_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ebehner authored Jun 21, 2024
2 parents abef8b9 + df6e516 commit 95a86d5
Show file tree
Hide file tree
Showing 12 changed files with 590 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ def constant_fold(operation: OperationType, constants: list[Constant], result_ty
)


def _constant_fold_arithmetic_binary(constants: list[Constant], fun: Callable[[int, int], int], norm_sign: Optional[bool] = None) -> int:
def _constant_fold_arithmetic_binary(
constants: list[Constant],
fun: Callable[[int, int], int],
norm_sign: Optional[bool] = None,
allow_mismatched_sizes: bool = False,
) -> int:
"""
Fold an arithmetic binary operation with constants as operands.
Expand All @@ -84,7 +89,7 @@ def _constant_fold_arithmetic_binary(constants: list[Constant], fun: Callable[[i

if len(constants) != 2:
raise IncompatibleOperandCount(f"Expected exactly 2 constants to fold, got {len(constants)}.")
if not all(constant.type.size == constants[0].type.size for constant in constants):
if not allow_mismatched_sizes and not all(constant.type.size == constants[0].type.size for constant in constants):
raise UnsupportedMismatchedSizes(f"Can not fold constants with different sizes: {[constant.type for constant in constants]}")

left, right = constants
Expand Down Expand Up @@ -137,13 +142,19 @@ def _constant_fold_shift(constants: list[Constant], fun: Callable[[int, int], in
return fun(normalize_int(left.value, left.type.size, norm_signed), right.value)


def remainder(n, d):
    """
    Compute the remainder of the truncated (C-style) integer division of n by d.

    Python's `%` operator rounds towards negative infinity, so its result takes the sign of the divisor.
    Here the result takes the sign of the dividend instead, e.g. remainder(-7, 3) == -1 and remainder(7, -3) == 1.

    :param n: The dividend.
    :param d: The divisor.
    :return: The remainder r with abs(r) < abs(d) that is zero or has the same sign as n.
    :raises ZeroDivisionError: If d is zero.
    """
    # Work on magnitudes so the floor semantics of `%` cannot leak into the result, then restore the dividend's sign.
    magnitude = abs(n) % abs(d)
    return -magnitude if n < 0 else magnitude


_OPERATION_TO_FOLD_FUNCTION: dict[OperationType, Callable[[list[Constant]], int]] = {
OperationType.minus: partial(_constant_fold_arithmetic_binary, fun=operator.sub),
OperationType.plus: partial(_constant_fold_arithmetic_binary, fun=operator.add),
OperationType.multiply: partial(_constant_fold_arithmetic_binary, fun=operator.mul, norm_sign=True),
OperationType.multiply_us: partial(_constant_fold_arithmetic_binary, fun=operator.mul, norm_sign=False),
OperationType.divide: partial(_constant_fold_arithmetic_binary, fun=operator.floordiv, norm_sign=True),
OperationType.divide_us: partial(_constant_fold_arithmetic_binary, fun=operator.floordiv, norm_sign=False),
OperationType.modulo: partial(_constant_fold_arithmetic_binary, fun=remainder, norm_sign=True, allow_mismatched_sizes=True),
OperationType.modulo_us: partial(_constant_fold_arithmetic_binary, fun=operator.mod, norm_sign=False, allow_mismatched_sizes=True),
OperationType.negate: partial(_constant_fold_arithmetic_unary, fun=operator.neg),
OperationType.left_shift: partial(_constant_fold_shift, fun=operator.lshift, signed=True),
OperationType.right_shift: partial(_constant_fold_shift, fun=operator.rshift, signed=True),
Expand Down
164 changes: 102 additions & 62 deletions decompiler/pipeline/ssa/dependency_graph.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,114 @@
from typing import Iterable, List, Optional, Set
import itertools
from itertools import combinations
from typing import Iterator

import networkx
from decompiler.structures.graphs.cfg import ControlFlowGraph
from decompiler.structures.interferencegraph import InterferenceGraph
from decompiler.structures.pseudo import Expression, Operation, OperationType
from decompiler.structures.pseudo.expressions import Variable
from decompiler.structures.pseudo.instructions import Assignment
from decompiler.structures.pseudo.operations import Call
from networkx import DiGraph, weakly_connected_components
from decompiler.util.decoration import DecoratedGraph
from networkx import MultiDiGraph

# Multiplicative constant applied to dependency scores when encountering operations, to penalize too much nesting.
OPERATION_PENALTY = 0.9

def _non_call_assignments(cfg: ControlFlowGraph) -> Iterable[Assignment]:

def decorate_dependency_graph(dependency_graph: MultiDiGraph, interference_graph: InterferenceGraph) -> DecoratedGraph:
    """
    Build a decorated graph that combines the given dependency graph with interference information.

    The resulting graph contains:
    - one node per dependency-graph node (a group of variables), keyed by the hash of the group and labelled with one line per variable,
    - one directed edge per dependency edge, labelled with the score of that edge,
    - one red edge without direction for each pair of groups in the same weakly connected component
      where some variable of the first group interferes with some variable of the second group.
    """
    decorated = MultiDiGraph()

    for variables in dependency_graph.nodes:
        label = "\n".join(f"{variable}: {variable.type}, aliased: {variable.is_aliased}" for variable in variables)
        decorated.add_node(hash(variables), label=label)

    for source, sink, attributes in dependency_graph.edges.data():
        decorated.add_edge(hash(source), hash(sink), label=f"{attributes['score']}")

    for component in networkx.weakly_connected_components(dependency_graph):
        for first_group, second_group in combinations(component, 2):
            # A single interfering variable pair is enough to mark the two groups as interfering.
            for variable_1, variable_2 in itertools.product(first_group, second_group):
                if interference_graph.has_edge(variable_1, variable_2):
                    decorated.add_edge(hash(first_group), hash(second_group), color="red", dir="none")
                    break

    return DecoratedGraph(decorated)


def dependency_graph_from_cfg(cfg: ControlFlowGraph) -> MultiDiGraph:
    """
    Construct the dependency graph of the given CFG.

    - Every variable occurring in the CFG becomes a node, represented as a one-element tuple.
    - For every assignment, an edge is added from each defined variable to each variable its value depends on.
    - Each edge carries the dependency score of the used variable in the attribute 'score'; only positive scores produce edges.
    """
    graph = MultiDiGraph()

    for variable in _collect_variables(cfg):
        graph.add_node((variable,))

    for assignment in _assignments_in_cfg(cfg):
        definitions = assignment.definitions
        dependencies = _expression_dependencies(assignment.value)
        for used, score in dependencies.items():
            if not score > 0:
                continue
            for defined in definitions:
                graph.add_edge((defined,), (used,), score=score)

    return graph


def _collect_variables(cfg: ControlFlowGraph) -> Iterator[Variable]:
    """
    Yield every Variable found among the subexpressions of the instructions of the given control flow graph.

    A variable is yielded once per occurrence, so the same variable may be produced several times.
    """
    for instruction in cfg.instructions:
        yield from (expression for expression in instruction.subexpressions() if isinstance(expression, Variable))


def _assignments_in_cfg(cfg: ControlFlowGraph) -> Iterator[Assignment]:
"""Yield all interesting assignments for the dependency graph."""
for instr in cfg.instructions:
# NOTE(review): two filter conditions appear back to back here; this looks like the old (stricter) and the new
# (plain Assignment check) line of the diff shown together - confirm which one is live before changing this code.
if isinstance(instr, Assignment) and isinstance(instr.destination, Variable) and not isinstance(instr.value, Call):
if isinstance(instr, Assignment):
yield instr


class DependencyGraph(DiGraph):
    """
    Directed graph over variables, with an edge from a defined variable to a required variable it may share a name with.

    Each edge carries a 'strength' attribute ("high", "medium" or "low").
    """

    def __init__(self, interference_graph: Optional[InterferenceGraph] = None):
        """
        Create a dependency graph whose nodes are the nodes of the given interference graph.

        :param interference_graph: The interference graph to take the nodes from; if None, the graph starts without nodes.
        """
        super().__init__()
        # The parameter defaults to None, so the nodes can only be copied when a graph was actually given.
        if interference_graph is not None:
            self.add_nodes_from(interference_graph.nodes)
        self.interference_graph = interference_graph

    @classmethod
    def from_cfg(cls, cfg: ControlFlowGraph, interference_graph: InterferenceGraph):
        """
        Construct the dependency graph of the given CFG, i.e. adds an edge between two variables if they depend on each other.
        - Add an edge from the definition to at most one requirement for each instruction.
        - All variables that were not defined via Phi-functions before have out-degree at most 1, because they are defined at most once.
        - Variables that are defined via Phi-functions can have one successor for each required variable of the Phi-function.

        :param cfg: The control flow graph whose assignments are considered.
        :param interference_graph: The interference graph used to decide whether two variables can have the same name.
        :return: The dependency graph of the given CFG.
        """
        dependency_graph = cls(interference_graph)
        for instruction in _non_call_assignments(cfg):
            defined_variable = instruction.destination
            if isinstance(instruction.value, Variable):
                # Plain copy 'a = b'.
                if dependency_graph._variables_can_have_same_name(defined_variable, instruction.value):
                    dependency_graph.add_edge(defined_variable, instruction.requirements[0], strength="high")
            elif len(instruction.requirements) == 1:
                # The value is not a plain variable, but exactly one variable is required.
                if dependency_graph._variables_can_have_same_name(defined_variable, instruction.requirements[0]):
                    dependency_graph.add_edge(defined_variable, instruction.requirements[0], strength="medium")
            else:
                # Any other number of requirements: only link if exactly one of them is a candidate.
                candidate = dependency_graph._non_interfering_requirements(instruction.requirements, defined_variable)
                if candidate is not None:
                    dependency_graph.add_edge(defined_variable, candidate, strength="low")
        return dependency_graph

    def _non_interfering_requirements(self, requirements: List[Variable], defined_variable: Variable) -> Optional[Variable]:
        """
        Get the unique requirement that can have the same name as the defined variable.

        :param requirements: The variables required by the instruction.
        :param defined_variable: The variable defined by the instruction.
        :return: The unique candidate, or None if there is no candidate or more than one.
        """
        non_interfering_requirement = None
        for required_variable in requirements:
            if not self._variables_can_have_same_name(defined_variable, required_variable):
                continue
            # Compare against None explicitly so the result does not depend on the truthiness of a variable object.
            if non_interfering_requirement is not None:
                return None
            non_interfering_requirement = required_variable
        return non_interfering_requirement

    def _variables_can_have_same_name(self, source: Variable, sink: Variable) -> bool:
        """
        Two variables can have the same name if they have the same type, are both aliased or both non-aliased variables, and if they
        do not interfere. Two aliased variables must additionally already share their name.

        :param source: The potential source vertex.
        :param sink: The potential sink vertex.
        :return: True, if the given variables can have the same name, and False otherwise.
        """
        # Without an interference graph (allowed by the constructor default) no interference is known.
        if self.interference_graph is not None and self.interference_graph.are_interfering(source, sink):
            return False
        if source.type != sink.type or source.is_aliased != sink.is_aliased:
            return False
        # Both aliased flags are equal at this point, so checking one of them suffices.
        if source.is_aliased and source.name != sink.name:
            return False
        return True

    def get_components(self) -> Iterable[Set[Variable]]:
        """Returns the weakly connected components of the dependency graph."""
        for component in weakly_connected_components(self):
            yield set(component)
def _expression_dependencies(expression: Expression) -> dict[Variable, float]:
    """
    Compute a dependency score for every variable the given expression depends on.

    - A variable depends on itself with score 1.0.
    - Calls, address, dereference and member-access operations yield no dependencies.
    - For any other operation, the scores of each operand with dependencies are divided by the number of such operands,
      multiplied by OPERATION_PENALTY, and summed up per variable.
    - Any other kind of expression yields no dependencies.

    :param expression: The expression to analyze.
    :return: A mapping from each contributing variable to its score.
    """
    if isinstance(expression, Variable):
        return {expression: 1.0}
    if not isinstance(expression, Operation):
        return {}
    if expression.operation in {
        OperationType.call,
        OperationType.address,
        OperationType.dereference,
        OperationType.member_access,
    }:
        return {}

    # Operands without any dependency (e.g. constants) do not take part in splitting the score.
    contributing = [scores for scores in map(_expression_dependencies, expression.operands) if scores]
    operand_count = len(contributing)

    combined: dict[Variable, float] = {}
    for operand_scores in contributing:
        for variable, operand_score in operand_scores.items():
            # Penalize operations, so that expressions like (a + (a + (a + (a + a)))) get a lower score than just (a).
            weighted = operand_score / operand_count * OPERATION_PENALTY
            if variable in combined:
                combined[variable] += weighted
            else:
                combined[variable] = weighted
    return combined
26 changes: 15 additions & 11 deletions decompiler/pipeline/ssa/outofssatranslation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from collections import defaultdict
from configparser import NoOptionError
from enum import Enum
from typing import DefaultDict, List
from typing import Callable, DefaultDict, List

from decompiler.pipeline.ssa.phi_cleaner import PhiFunctionCleaner
from decompiler.pipeline.ssa.phi_dependency_resolver import PhiDependencyResolver
from decompiler.pipeline.ssa.phi_lifting import PhiFunctionLifter
from decompiler.pipeline.ssa.variable_renaming import MinimalVariableRenamer, SimpleVariableRenamer
from decompiler.pipeline.ssa.variable_renaming import ConditionalVariableRenamer, MinimalVariableRenamer, SimpleVariableRenamer
from decompiler.pipeline.stage import PipelineStage
from decompiler.structures.graphs.cfg import BasicBlock
from decompiler.structures.interferencegraph import InterferenceGraph
Expand Down Expand Up @@ -98,12 +98,13 @@ def _out_of_ssa(self) -> None:
-> There are different optimization levels
"""
try:
self.out_of_ssa_strategy[self._optimization](self)
except KeyError:
error_message = f"The Out of SSA according to the optimization level {self._optimization.value} is not implemented so far."
logging.error(error_message)
raise NotImplementedError(error_message)
strategy = self.out_of_ssa_strategy.get(self._optimization, None)
if strategy is None:
raise NotImplementedError(
f"The Out of SSA according to the optimization level {self._optimization.value} is not implemented so far."
)

strategy(self)

def _simple_out_of_ssa(self) -> None:
"""
Expand Down Expand Up @@ -158,12 +159,15 @@ def _conditional_out_of_ssa(self) -> None:
This is a more advanced algorithm for out of SSA:
- We first remove the circular dependency of the Phi-functions
- Then, we remove the Phi-functions by lifting them to their predecessor basic blocks.
- Afterwards, we rename the variables, by considering their dependency on each other.
- Afterwards, we rename the variables by considering their dependency on each other.
"""
pass
PhiDependencyResolver(self._phi_functions_of).resolve()
self.interference_graph = InterferenceGraph(self.task.graph)
PhiFunctionLifter(self.task.graph, self.interference_graph, self._phi_functions_of).lift()
ConditionalVariableRenamer(self.task, self.interference_graph).rename()

# This translator maps the optimization levels to the functions.
out_of_ssa_strategy = {
out_of_ssa_strategy: dict[SSAOptions, Callable[["OutOfSsaTranslation"], None]] = {
SSAOptions.simple: _simple_out_of_ssa,
SSAOptions.minimization: _minimization_out_of_ssa,
SSAOptions.lift_minimal: _lift_minimal_out_of_ssa,
Expand Down
Loading

0 comments on commit 95a86d5

Please sign in to comment.