From 43f49d1643afeadac0f25a84963a7654047afbd4 Mon Sep 17 00:00:00 2001 From: Manuel Blatt <45859907+blattm@users.noreply.github.com> Date: Wed, 13 Nov 2024 15:54:38 +0100 Subject: [PATCH] Improving the Go/Rust Support - Boilerplate Removal and String Recovery (#437) --------- Co-authored-by: joey Co-authored-by: ebehner --- decompiler/frontend/binaryninja/frontend.py | 18 + .../binaryninja/handlers/constants.py | 15 +- .../binaryninja/rust_string_detection.py | 63 +++ .../bitfieldcomparisonunrolling.py | 24 +- decompiler/pipeline/pipeline.py | 4 + decompiler/pipeline/preprocessing/__init__.py | 2 + .../preprocessing/remove_go_prologue.py | 386 ++++++++++++++++++ .../remove_noreturn_boilerplate.py | 102 +++++ .../preprocessing/remove_stack_canary.py | 39 +- decompiler/pipeline/preprocessing/util.py | 125 +++++- decompiler/structures/pseudo/expressions.py | 4 + decompiler/structures/pseudo/operations.py | 2 - decompiler/task.py | 1 + decompiler/util/default.json | 50 +++ 14 files changed, 815 insertions(+), 20 deletions(-) create mode 100644 decompiler/frontend/binaryninja/rust_string_detection.py create mode 100644 decompiler/pipeline/preprocessing/remove_go_prologue.py create mode 100644 decompiler/pipeline/preprocessing/remove_noreturn_boilerplate.py diff --git a/decompiler/frontend/binaryninja/frontend.py b/decompiler/frontend/binaryninja/frontend.py index 11c691c60..b1ff994d5 100644 --- a/decompiler/frontend/binaryninja/frontend.py +++ b/decompiler/frontend/binaryninja/frontend.py @@ -3,10 +3,12 @@ from __future__ import annotations import logging +from typing import List import binaryninja from binaryninja import BinaryView from binaryninja.types import SymbolType +from decompiler.frontend.binaryninja.rust_string_detection import RustStringDetection from decompiler.task import DecompilerTask from decompiler.util.options import Options @@ -68,12 +70,16 @@ def lift(self, task: DecompilerTask): function = self._get_binninja_function(task.function_identifier) 
lifter, parser = self._create_lifter_parser(task.options) + rust_string_detection = RustStringDetection(self._bv, task.options) + rust_string_detection.run() + task.function_return_type = lifter.lift(function.return_type) task.function_parameters = [lifter.lift(param_type) for param_type in function.type.parameters] self._tagging.run(function, task.options) task.cfg = parser.parse(function) + task.function_parameter_locations = self._parameter_locations(function) task.complex_types = parser.complex_types except Exception as e: task.fail("Function lifting", e) @@ -81,6 +87,18 @@ def lift(self, task: DecompilerTask): if task.options.getboolean("pipeline.debug", fallback=False): raise e + def _parameter_locations(self, function: binaryninja.function.Function) -> List[str | None]: + """ + For a given Binary Ninja Function, this method returns a list of its parameters' locations in the correct order. + E.g. if the first parameter is stored in r14, the first entry in the returned list will be 'r14'. + """ + raw_parameters = function.type.parameters + parameter_locations = [] + for parameter in raw_parameters: + name = parameter.location.name if parameter.location is not None else None + parameter_locations.append(name) + return parameter_locations + def get_all_function_names(self): """Returns the entire list of all function names in the binary. 
Ignores blacklisted functions and imported functions.""" functions = list() diff --git a/decompiler/frontend/binaryninja/handlers/constants.py b/decompiler/frontend/binaryninja/handlers/constants.py index 25c897889..fc270a0c9 100644 --- a/decompiler/frontend/binaryninja/handlers/constants.py +++ b/decompiler/frontend/binaryninja/handlers/constants.py @@ -8,12 +8,11 @@ from decompiler.frontend.lifter import Handler from decompiler.structures.pseudo import ( Constant, - CustomType, + FunctionSymbol, GlobalVariable, Integer, NotUseableConstant, OperationType, - Pointer, Symbol, UnaryOperation, ) @@ -61,10 +60,18 @@ def lift_constant_pointer(self, pointer: mediumlevelil.MediumLevelILConstPtr, ** res = self._lifter.lift(variable, view=view, parent=pointer) elif (symbol := view.get_symbol_at(pointer.constant)) and symbol.type != SymbolType.DataSymbol: - return self._lifter.lift(symbol) + if isinstance(result := self._lifter.lift(symbol), FunctionSymbol): + try: + result.can_return = view.get_function_at(pointer.constant).can_return.value + return result + except Exception: + pass + return result elif function := view.get_function_at(pointer.constant): - return self._lifter.lift(function.symbol) + if isinstance(result := self._lifter.lift(function.symbol), FunctionSymbol): + result.can_return = function.can_return.value + return result else: res = self._lifter.lift(DataVariable(view, pointer.constant, Type.void(), False), view=view, parent=pointer) diff --git a/decompiler/frontend/binaryninja/rust_string_detection.py b/decompiler/frontend/binaryninja/rust_string_detection.py new file mode 100644 index 000000000..ea2bf99f1 --- /dev/null +++ b/decompiler/frontend/binaryninja/rust_string_detection.py @@ -0,0 +1,63 @@ +import logging +import sys + +from binaryninja import BinaryView +from decompiler.util.options import Options + + +class RustStringDetection: + """ + This 'stage' detects certain Rust strings (string slices), which are struct based strings. 
+ It requires the RustStringSlicer. A path to the tool needs to be configured via the options. + + The stage is executed before lifting, as it uses the Binary Ninja API to identify string slices + and 'mark' them, by assigning the appropriate type. + It can be configured to run always, never, or for Rust binaries only. + """ + + def __init__(self, binary_view: BinaryView, options: Options): + self._bv = binary_view + self._enabled = options.getboolean("rust-string-detection.enabled", fallback=False) + self._rust_binaries_only = options.getboolean("rust-string-detection.rust_binaries_only", fallback=False) + self._string_slicer_path = options.getstring("rust-string-detection.string_slicer_path", fallback="") + self._debug_submodules = options.getboolean("logging.debug-submodules", fallback=False) + + def is_rust_binary(self) -> bool: + """ + Simple heuristic to determine whether the binary is a Rust binary. + + """ + for _ in self._bv.find_all_data(self._bv.start, self._bv.end, "rustc".encode("utf-8")): + return True + for _ in self._bv.find_all_data(self._bv.start, self._bv.end, "cargo".encode("utf-8")): + return True + return False + + def run(self): + """ + Runs the Rust String Slicer, if the required conditions are met. + + String Slicer's path will be added to Python's path before importing the module. 
+ """ + if not self._enabled: + logging.info("Rust String Slicer not executed") + return + + if self._rust_binaries_only and not self.is_rust_binary(): + logging.info("Rust String Slicer not executed: Not a Rust Binary") + return + + logging.info("Starting Rust String Slicer") + try: + sys.path.append(self._string_slicer_path) + from rust_string_slicer.binja_plugin.actions import RecoverStringFromReadOnlyDataTask, RustStringSlice + + if not RustStringSlice.check_binary_ninja_type_exists(self._bv): + RustStringSlice.create_binary_ninja_type(self._bv) + RecoverStringFromReadOnlyDataTask(bv=self._bv).run() + + except Exception as e: + if self._debug_submodules: + raise RuntimeError(e) + logging.warning("Rust String Slicer failed. Please check if the tool is installed and the path is set correctly!") + return diff --git a/decompiler/pipeline/expressions/bitfieldcomparisonunrolling.py b/decompiler/pipeline/expressions/bitfieldcomparisonunrolling.py index bb7326d45..b2bdb8610 100644 --- a/decompiler/pipeline/expressions/bitfieldcomparisonunrolling.py +++ b/decompiler/pipeline/expressions/bitfieldcomparisonunrolling.py @@ -103,13 +103,13 @@ def _get_folded_case(self, block: BasicBlock) -> Optional[FoldedCase]: if not isinstance(branch_instruction := block[-1], Branch): return None match branch_instruction.condition: - case Condition(OperationType.equal, subexpr, Constant(value=0x0)): + case Condition(operation=OperationType.equal, left=subexpr, right=Constant(value=0x0)): edge_type_to_case_node = FalseCase - case Condition(OperationType.not_equal, subexpr, Constant(value=0x0)): + case Condition(operation=OperationType.not_equal, left=subexpr, right=Constant(value=0x0)): edge_type_to_case_node = TrueCase - case Condition(OperationType.equal, Constant(value=0x0), subexpr): + case Condition(operation=OperationType.equal, left=Constant(value=0x0), right=subexpr): edge_type_to_case_node = FalseCase - case Condition(OperationType.not_equal, Constant(value=0x0), subexpr): + case 
Condition(operation=OperationType.not_equal, left=Constant(value=0x0), right=subexpr): edge_type_to_case_node = TrueCase case _: return None @@ -132,17 +132,19 @@ def _get_switch_var_and_bitfield(self, subexpr: Expression) -> Optional[Tuple[Ex """ match subexpr: case BinaryOperation( - OperationType.bitwise_and, - BinaryOperation( - OperationType.bitwise_and, BinaryOperation(OperationType.left_shift, Constant(value=1), switch_var), Constant() + operation=OperationType.bitwise_and, + left=BinaryOperation( + operation=OperationType.bitwise_and, + left=BinaryOperation(operation=OperationType.left_shift, left=Constant(value=1), right=switch_var), + right=Constant(), ), - Constant() as bit_field, + right=Constant() as bit_field, ) if bit_field.value != 0xFFFFFFFF: return switch_var, bit_field case BinaryOperation( - OperationType.bitwise_and, - BinaryOperation(OperationType.left_shift, Constant(value=1), switch_var), - Constant() as bit_field, + operation=OperationType.bitwise_and, + left=BinaryOperation(operation=OperationType.left_shift, left=Constant(value=1), right=switch_var), + right=Constant() as bit_field, ) if bit_field.value != 0xFFFFFFFF: return switch_var, bit_field case _: diff --git a/decompiler/pipeline/pipeline.py b/decompiler/pipeline/pipeline.py index eb39b423f..bebb8459a 100644 --- a/decompiler/pipeline/pipeline.py +++ b/decompiler/pipeline/pipeline.py @@ -13,6 +13,8 @@ MemPhiConverter, PhiFunctionFixer, RegisterPairHandling, + RemoveGoPrologue, + RemoveNoreturnBoilerplate, RemoveStackCanary, SwitchVariableDetection, ) @@ -28,7 +30,9 @@ PREPROCESSING_STAGES = [ CompilerIdiomHandling, + RemoveGoPrologue, RemoveStackCanary, + RemoveNoreturnBoilerplate, RegisterPairHandling, Coherence, SwitchVariableDetection, diff --git a/decompiler/pipeline/preprocessing/__init__.py b/decompiler/pipeline/preprocessing/__init__.py index 5dfe26cc0..1006fc0da 100644 --- a/decompiler/pipeline/preprocessing/__init__.py +++ b/decompiler/pipeline/preprocessing/__init__.py @@ 
-6,5 +6,7 @@ from .missing_definitions import InsertMissingDefinitions from .phi_predecessors import PhiFunctionFixer from .register_pair_handling import RegisterPairHandling +from .remove_go_prologue import RemoveGoPrologue +from .remove_noreturn_boilerplate import RemoveNoreturnBoilerplate from .remove_stack_canary import RemoveStackCanary from .switch_variable_detection import BackwardSliceSwitchVariableDetection as SwitchVariableDetection diff --git a/decompiler/pipeline/preprocessing/remove_go_prologue.py b/decompiler/pipeline/preprocessing/remove_go_prologue.py new file mode 100644 index 000000000..c0227324c --- /dev/null +++ b/decompiler/pipeline/preprocessing/remove_go_prologue.py @@ -0,0 +1,386 @@ +"""Module for removing go idioms""" + +import logging +from typing import Optional, Set, Tuple + +from decompiler.pipeline.preprocessing.util import get_constant_condition, is_noreturn_node, match_expression +from decompiler.pipeline.stage import PipelineStage +from decompiler.structures.graphs.basicblock import BasicBlock +from decompiler.structures.graphs.branches import ConditionalEdge, FalseCase, TrueCase, UnconditionalEdge +from decompiler.structures.pseudo.expressions import Variable +from decompiler.structures.pseudo.instructions import Assignment, Branch, Comment, Phi +from decompiler.structures.pseudo.operations import Call, OperationType, UnaryOperation +from decompiler.task import DecompilerTask + + +class RemoveGoPrologue(PipelineStage): + """ + RemoveGoPrologue finds and removes Go function prologues, + Caution: this stage changes code semantic + """ + + name = "remove-go-prologue" + + def run(self, task: DecompilerTask): + if task.options.getboolean(f"{self.name}.remove_prologue", fallback=False): + self._cfg = task.graph + self.r14_name = self._get_r14_name(task) + self._function_name = task.name + if self._check_and_remove_go_prologue(): + pass + else: + logging.info("No Go function prologue found") + + def _get_r14_name(self, task: 
DecompilerTask) -> str | None: + """ + Returns the variable name of the parameter stored in r14, e.g. 'arg1'. + If no such parameter exists, None is returned. + """ + r14_parameter_index = None + for i, location in enumerate(task.function_parameter_locations): + if location == "r14": + r14_parameter_index = i + break + if r14_parameter_index is None: + return None + return task.function_parameters[r14_parameter_index].name + + def _is_root_single_indirect_successor(self, node: BasicBlock) -> bool: + """ + Helper function used to verify the graph structure. + + It checks whether there is a path from the given `node` to the root, in which every node only has one successor, zero instructions and just one incoming edge. + In other words, it checks if root is the single successor of the given `node`, but possibly only indirectly via jumps. + """ + successors = self._cfg.get_successors(node) + + if len(successors) != 1: + return False + successor = successors[0] + + if successor == self._cfg.root: + return True + + if len(node.instructions) == 0 and self._cfg.in_degree(node) == 1: + return self._is_root_single_indirect_successor(successor) + + return False + + def _find_morestack_node_in_loop(self, node: BasicBlock) -> BasicBlock: + """ + Helper function used to verify the graph structure. + + If we have a loop connecting the morestack node to the root node, possibly via jumps, we can identify the morestack node as it is the only node with >0 instructions. + """ + if len(node.instructions) != 0: + return node + + successor = self._cfg.get_successors(node)[0] + + # To prevent endless loops + if successor == self._cfg.root: + return node + + return self._find_morestack_node_in_loop(successor) + + def _verify_graph_structure(self) -> Optional[Tuple[BasicBlock, BasicBlock]]: + """ + Verify the graph structure. This method returns start_node and morestack_node (in that order) if graph structure matches go prologue, otherwise None. 
+ + Typically Binary Ninja successfully detected the loop leading from the morestack_node back to the root. + Since 3.5 this is no longer the case. Therefore, we also check if an alternative (loopless) graph structure matches. + """ + return self._verify_graph_structure_loop() or self._verify_graph_structure_loopless() + + def _verify_graph_structure_loopless(self) -> Optional[Tuple[BasicBlock, BasicBlock]]: + """ + Verify the graph structure. This method returns start_node and morestack_node (in that order) if graph structure matches go prologue, otherwise None. + + This method checks for the newer CFGs without loops created by Binary Ninja >= 4.0 + """ + # In a Go function prologue one of the successors (start_node) marks the start of the function. + # The other successor (morestack_node) contains a call to runtime_morestack(_noctxt_abi0) + # and is a leaf node without successors (see _find_morestack_node_loopless). + # root has no predecessor. + # root is the only predecessor of morestack_node. EXCEPT IF A NON-RETURNING FUNCTION RIGHT BEFORE IT IS NOT DETECTED! + + # Function should have a root node + root = self._cfg.root + if root is None: + return None + + # root node should have no incoming node: not even from morestack node + if self._cfg.in_degree(root) != 0: + return None + + # root node needs exactly two successors + successors = self._cfg.get_successors(root) + if len(successors) != 2: + return None + + # The following code determines start_node and morestack_node + morestack_node = None + start_node = None + for successor in successors: + if result := self._find_morestack_node_loopless(successor, set()): + morestack_node = result + else: + start_node = successor + + if (start_node is None) or (morestack_node is None): + return None + + # Don't check (self._cfg.in_degree(morestack_node) != 1), because of non-returning functions... 
+ # however, check that at most one of those edges is conditional + conditional_in_edges = [edge for edge in self._cfg.get_in_edges(morestack_node) if isinstance(edge, ConditionalEdge)] + if len(conditional_in_edges) > 1: # zero is ok, because the graph could be root -> goto_node -> morestack_node + return None + + return start_node, morestack_node + + def _find_morestack_node_loopless(self, node: BasicBlock, visited: Set[BasicBlock]) -> BasicBlock | None: + """ + Helper function used to verify the graph structure. + + For Binary Ninja >=4.0 the morestack node is a non-returning node connected to the root (possibly via jump nodes) + """ + if node in visited: + return None + + visited.add(node) + successors = self._cfg.get_successors(node) + + if len(successors) > 1: + return None + + if len(successors) == 1: + successor = successors[0] + if len(node.instructions) == 0 and self._cfg.in_degree(node) == 1: + return self._find_morestack_node_loopless(successor, visited) + else: + return None + + # zero successors, check for no return + if is_noreturn_node(node): + return node + + return None + + def _verify_graph_structure_loop(self) -> Optional[Tuple[BasicBlock, BasicBlock]]: + """ + Verify the graph structure. This method returns start_node and morestack_node (in that order) if graph structure matches go prologue, otherwise None. + + This method checks for the older CFGs with loops created by Binary Ninja <= 3.5 + """ + # In a Go function prologue one of the successors (start_node) marks the start of the function. + # The other successor (morestack_node) contains a call to runtime_morestack(_noctxt_abi0) + # and has the root as its only successor. + # morestack_node is the only predecessor of root. + # root is the only predecessor of morestack_node. EXCEPT IF A NON-RETURNING FUNCTION RIGHT BEFORE IT IS NOT DETECTED! 
+ + # Function should have a root node + root = self._cfg.root + if root is None: + return None + + # root node should only have an incoming edge from morestack_node + if self._cfg.in_degree(root) != 1: + return None + + # root node needs exactly two successors + successors = self._cfg.get_successors(root) + if len(successors) != 2: + return None + + # The following code determines start_node and morestack_node + morestack_node = None + start_node = None + for successor in successors: + if self._is_root_single_indirect_successor(successor): + morestack_node = self._find_morestack_node_in_loop(successor) + else: + start_node = successor + + if (start_node is None) or (morestack_node is None): + return None + + # Don't check (self._cfg.in_degree(morestack_node) != 1), because of non-returning functions... + # however, check that at most one of those edges is conditional + conditional_in_edges = [edge for edge in self._cfg.get_in_edges(morestack_node) if isinstance(edge, ConditionalEdge)] + if len(conditional_in_edges) > 1: # zero is ok, because the graph could be root -> goto_node -> morestack_node + return None + + return start_node, morestack_node + + def _match_r14(self, variable: Variable) -> bool: + """ + This method is used to check if `variable` corresponds to r14 which has a special meaning in Go prologues. + + It is used for the pattern matching of the root node. + """ + if self.r14_name is not None and variable.name == self.r14_name: + return True + + if variable.name.startswith("r14"): + return True + + return False + + def _check_root_node(self) -> bool: + """ + This method checks if the root node looks as expected for a Go prologue. + + It checks if the node has an if similar to "if((&(__return_addr)) u<= (*(r14 + 0x10)))", + or "if((&(__return_addr)) u<= (*(*(fsbase -8) + 0x10)))", + or any of the other patterns found below. + As the variable on the lhs sometimes differs from __return_addr we just check for the address operator. 
+ """ + + root = self._cfg.root + if root is None: + return False + + root_node_if = root.instructions[-1] + if not isinstance(root_node_if, Branch): + return False + + # check if lhs of condition is an address-of expression (e.g. of __return_addr) + left_expression = root_node_if.condition.left + match left_expression: + case UnaryOperation(operation=OperationType.address): + pass + case _: + return False + + # match stackguard0 within g struct + right_expression = root_node_if.condition.right + + patterns = [ + (self._match_r14, 0x10), # 1.17+ (darwin amd64, linux amd64, windows amd64) + ((("gsbase", 0), -4), 0x8), # linux 386 1.5 -1.18 + (("fsbase", -8), 0x10), # linux amd64 1.5 -1.16 + (("gsbase", 0x468), 0x8), # darwin 386 1.5 -1.10 + (("gsbase", 0x18), 0x8), # darwin 386 1.11 -1.14 + (("gsbase", 0x8A0), 0x10), # darwin amd64 1.5 -1.10 + (("gsbase", 0x30), 0x10), # darwin amd64 1.11 -1.16 + ((("fsbase", 0x14), 0), 0), # windows 386 1.2.2- 1.3 + ((("fsbase", 0x14), 0), 0x8), # windows 386 1.4 -1.18 + ((("gsbase", 0x28), 0), 0), # windows amd64 1.2.2- 1.3 + ((("gsbase", 0x28), 0), 0x10), # windows amd64 1.4 -1.16 + ] + for pattern in patterns: + if match_expression(root, right_expression, pattern): + return True + + return False + + def _verify_morestack_instructions(self, morestack_node: BasicBlock) -> bool: + """ + This helper method verifies if the morestack node is of the expected format: + + - an arbitrary number of assignments, where value is Phi or MemPhi + - n assignments (storing registers) + - a single call + - n assignments (restoring registers) + """ + instructions = morestack_node.instructions + # Find end of Phi / MemPhi Assignments + phi_pos = 0 + for i, instruction in enumerate(instructions): + if not isinstance(instruction, Phi): # covers MemPhi as well + phi_pos = i + break + + # verify there is an odd number of instructions left + num_non_phi_instructions = len(instructions) - phi_pos + if num_non_phi_instructions % 2 == 0: + return False + 
num_assignments = (num_non_phi_instructions - 1) // 2 + + # verify call is in the middle + morestack_instruction = instructions[phi_pos + num_assignments] + if not isinstance(morestack_instruction, Assignment) or not isinstance(morestack_instruction.value, Call): + return False + + # save this so the called function's name can be reported later + self._morestack_instruction = morestack_instruction + return True + + def _remove_go_prologue(self, start_node: BasicBlock, morestack_node: BasicBlock): + """ + This method removes the Go prologue. It is only called if a Go prologue was detected before. + """ + + # get root_node_if + root = self._cfg.root + assert root is not None + root_node_if = root.instructions[-1] + assert isinstance(root_node_if, Branch) + + # "remove" prologue + # Because of Phi functions and Variable Assignments, + # things go wrong if we delete the nodes. + # Instead we change the condition such that morestack_node is never executed. + # The prologue will be optimized away in later stages. + # But before we change the condition, we need to find out if it will be True or False. + + root_edges = self._cfg.get_out_edges(root) + for root_edge in root_edges: + if isinstance(root_edge, TrueCase): + new_condition = root_edge.sink == start_node + break + else: + # This should never happen + raise ValueError("If condition with broken out edges") + + root_node_if.substitute(root_node_if.condition, get_constant_condition(new_condition)) + + # Handle incoming edges to morestack_node from non-returning functions + # We can't simply delete edges without causing problems to Phi functions. + # Therefore, we replace the unconditional edge with a conditional one. + # The added condition at the end of the block makes sure the edge is never taken. + # A conditional edge to a newly created "return_node" is added as well. + # The return_node does nothing. + # After dead code elimination, this will just have the effect of deleting the edge. 
+ return_node = self._cfg.create_block() + unconditional_in_edges = [edge for edge in self._cfg.get_in_edges(morestack_node) if isinstance(edge, UnconditionalEdge)] + for edge in unconditional_in_edges: + self._cfg.remove_edge(edge) + self._cfg.add_edge(FalseCase(edge.source, edge.sink)) + self._cfg.add_edge(TrueCase(edge.source, return_node)) + condition = get_constant_condition(True) + edge.source.add_instruction(Branch(condition)) + + if unconditional_in_edges: + self._dont_crash = True + + ## add comment + function = self._morestack_instruction.value.function + comment_string = f"Removed Go function prologue (calling function '{function}')." + comment = Comment(comment_string) + root.add_instruction_where_possible(comment) + + logging.info(comment_string) + + def _check_and_remove_go_prologue(self) -> bool: + """ + Detect and remove the typical Go function prologue. + + First we check if the CFG matches the expected structure of a Go prologue. + If the match is successful, the graph result will contain the detected start node and morestack node. + If the root node and the morestack node pass some additional checks, we assume that we found a Go prologue and it will be removed. 
+ """ + + if (graph_result := self._verify_graph_structure()) is None: + return False + + start_node, morestack_node = graph_result + + if not self._check_root_node(): + return False + + if not self._verify_morestack_instructions(morestack_node): + return False + + self._remove_go_prologue(start_node, morestack_node) + return True diff --git a/decompiler/pipeline/preprocessing/remove_noreturn_boilerplate.py b/decompiler/pipeline/preprocessing/remove_noreturn_boilerplate.py new file mode 100644 index 000000000..1d92eaaea --- /dev/null +++ b/decompiler/pipeline/preprocessing/remove_noreturn_boilerplate.py @@ -0,0 +1,102 @@ +"""Module for removing boilerplate code""" + +from typing import Iterator, List + +from decompiler.pipeline.preprocessing.util import get_constant_condition, is_noreturn_node +from decompiler.pipeline.stage import PipelineStage +from decompiler.structures.graphs.basicblock import BasicBlock +from decompiler.structures.graphs.branches import ConditionalEdge, FalseCase, TrueCase +from decompiler.structures.graphs.rootedgraph import RootedGraph +from decompiler.structures.pseudo.instructions import Branch, Comment +from decompiler.task import DecompilerTask +from networkx import MultiDiGraph, dominance_frontiers + + +class RemoveNoreturnBoilerplate(PipelineStage): + """ + RemoveNoreturnBoilerplate finds and removes boilerplate related to non-returning functions. + Caution: this stage changes code semantic + """ + + name = "remove-noreturn-boilerplate" + + def run(self, task: DecompilerTask): + if task.options.getboolean(f"{self.name}.remove_noreturn_boilerplate", fallback=False): + self._cfg = task.graph + self._aggressive_removal_postdominators_merged_sinks() + + def _get_noreturn_nodes(self) -> Iterator[BasicBlock]: + """ + Iterate leaf nodes of cfg, yield nodes containing a call to a non-returning funtion. 
+ """ + leaf_nodes = [x for x in self._cfg.nodes if self._cfg.out_degree(x) == 0] + for node in leaf_nodes: + if is_noreturn_node(node): + yield node + + def _patch_condition_edges(self, edges: List[ConditionalEdge]) -> None: + """ + This method removes whatever was detected to be boilerplate. + + It works by changing the conditions leading to the boilerplate in such a way that it is never reached. + """ + for edge in edges: + match edge: + case TrueCase(): + condition = get_constant_condition(False) + case FalseCase(): + condition = get_constant_condition(True) + case _: + continue + instructions = edge.source.instructions + assert isinstance(instructions[-1], Branch) + instructions.pop() + instructions.append(Comment("Removed potential boilerplate code")) + instructions.append(Branch(condition)) + + def _aggressive_removal_postdominators_merged_sinks(self): + """ + Finds and removes boilerplate code, using the heuristic described below. + + Nodes that will always transfer the control flow to a non-returning function are removed, + except when the node is always executed. + In other words, we cut the CFG between the post-dominance frontier and the nodes postdominated by non-returning nodes. + Different non-returning nodes are considered to be 'the same', by adding a merged sink. + + Implementation detail: We calculate the postdominance frontier via the dominance frontier of the reversed CFG. + To add virtual nodes to the (read-only) reversed CFG, we create an editable shallow copy of it. 
+ """ + if len(self._cfg) == 1: + return # do not remove the only node + # determine returning and non-returning leaf nodes + noreturn_nodes = list(self._get_noreturn_nodes()) + leaf_nodes = [x for x in self._cfg.nodes if self._cfg.out_degree(x) == 0] + returning_leaf_nodes = [node for node in leaf_nodes if node not in noreturn_nodes] + # create a virtual merged end node (for post dominance calculation) + # additionally we create another virtual node so that different non-returning nodes are 'merged' + min_used_address = min(block.address for block in self._cfg) + virtual_end_node = BasicBlock(min_used_address - 1) + virtual_merged_noreturn_node = BasicBlock(min_used_address - 2) + # reverse CFG and add virtual nodes to it + reversed_cfg_view: MultiDiGraph = self._cfg._graph.reverse(copy=False) + reversed_cfg_shallow_copy = MultiDiGraph(reversed_cfg_view) + reversed_cfg_shallow_copy.add_node(virtual_end_node) + reversed_cfg_shallow_copy.add_node(virtual_merged_noreturn_node) + # connect virtual nodes to the leafs + for noreturn_node in noreturn_nodes: + reversed_cfg_shallow_copy.add_edge(virtual_merged_noreturn_node, noreturn_node) + reversed_cfg_shallow_copy.add_edge(virtual_end_node, virtual_merged_noreturn_node) + for leaf_node in returning_leaf_nodes: + reversed_cfg_shallow_copy.add_edge(virtual_end_node, leaf_node) + # calculate postdominance-frontier + post_dominance_frontier = dominance_frontiers(reversed_cfg_shallow_copy, virtual_end_node) + # add a root to the graph, so it works with the networkx method + wrapped_reverse_cfg = RootedGraph(reversed_cfg_shallow_copy, virtual_end_node) + # find the edges to be removed + condition_edges = set() + for post_dominator in post_dominance_frontier[virtual_merged_noreturn_node]: + for edge_from_post_dominator in list(self._cfg.get_out_edges(post_dominator)): + if wrapped_reverse_cfg.is_dominating(virtual_merged_noreturn_node, edge_from_post_dominator.sink): + condition_edges.add(edge_from_post_dominator) + # remove 
the edges leading to boilerplate + self._patch_condition_edges(list(condition_edges)) diff --git a/decompiler/pipeline/preprocessing/remove_stack_canary.py b/decompiler/pipeline/preprocessing/remove_stack_canary.py index 3bd000fe0..700295f76 100644 --- a/decompiler/pipeline/preprocessing/remove_stack_canary.py +++ b/decompiler/pipeline/preprocessing/remove_stack_canary.py @@ -2,9 +2,12 @@ from typing import Iterator +from decompiler.pipeline.preprocessing.util import match_expression from decompiler.pipeline.stage import PipelineStage +from decompiler.structures.graphs.branches import BasicBlockEdgeCondition from decompiler.structures.graphs.cfg import BasicBlock, UnconditionalEdge from decompiler.structures.pseudo.instructions import Branch +from decompiler.structures.pseudo.operations import OperationType from decompiler.task import DecompilerTask @@ -39,7 +42,41 @@ def _is_stack_chk_fail(self, node: BasicBlock) -> bool: """ Check if node contains call to __stack_chk_fail """ - return any(self.STACK_FAIL_STR in str(inst) for inst in node.instructions) + return any(self.STACK_FAIL_STR in str(inst) for inst in node.instructions) or self._reached_by_failed_canary_check(node) + + def _reached_by_failed_canary_check(self, node: BasicBlock) -> bool: + """ + Determine if the given `node` is reached by a failed stack canary check. + + This function checks if any incoming edges to the `node` are conditional branches + that failed a stack canary check. It examines the predecessor nodes to see if the + branching condition corresponds to a failed comparison involving the canary value. + + Args: + node (BasicBlock): The basic block to check if it is reached by a failed canary check. + + Returns: + bool: Returns `True` if the node is reached by a failed canary check; otherwise, `False`. + + The function specifically looks for conditions that match the pattern *(fsbase+0x28), + indicating a check involving a stack canary.
It then verifies if the condition's operation + and the type of the edge align with typical patterns of failed canary checks: + - `equal` operation with `false` edge condition, or + - `not_equal` operation with `true` edge condition. + """ + pattern = ("fsbase", 0x28) + for in_edge in self._cfg.get_in_edges(node): + predecessor = in_edge.source + if predecessor.instructions and isinstance(predecessor.instructions[-1], Branch): + condition = predecessor.instructions[-1].condition + # skip edges that are not taken on a failed comparison + if (condition.operation, in_edge.condition_type) not in { + (OperationType.equal, BasicBlockEdgeCondition.false), + (OperationType.not_equal, BasicBlockEdgeCondition.true), + }: + continue + if match_expression(predecessor, condition.left, pattern) or match_expression(predecessor, condition.right, pattern): + return True + return False def _patch_canary(self, node: BasicBlock): """ diff --git a/decompiler/pipeline/preprocessing/util.py b/decompiler/pipeline/preprocessing/util.py index ea7060c3c..4ef4bd84d 100644 --- a/decompiler/pipeline/preprocessing/util.py +++ b/decompiler/pipeline/preprocessing/util.py @@ -1,11 +1,14 @@ """Helper functions for modules in the preprocessing pipeline.""" from collections import defaultdict -from typing import DefaultDict, Dict, Set, Tuple +from typing import Callable, DefaultDict, Dict, Iterator, List, Optional, Set, Tuple from decompiler.structures.graphs.cfg import BasicBlock, ControlFlowGraph from decompiler.structures.maps import DefMap, UseMap -from decompiler.structures.pseudo.expressions import Variable +from decompiler.structures.pseudo import Integer +from decompiler.structures.pseudo.expressions import Constant, Expression, Variable +from decompiler.structures.pseudo.instructions import Assignment, Instruction +from decompiler.structures.pseudo.operations import BinaryOperation, Call, Condition, OperationType, UnaryOperation def _init_maps(cfg: ControlFlowGraph) -> Tuple[DefMap, UseMap]: @@ -57,3 +60,121 @@ def
_init_basicblocks_usages_variable(cfg: ControlFlowGraph) -> DefaultDict[Vari for variable in instruction.requirements: basicblocks_usages_variable[variable].add(node) return basicblocks_usages_variable + + +def _get_last_definition(node: BasicBlock, var: Variable, max_instr_num: int) -> Optional[Tuple[int, Expression]]: + """ + This helper method finds a variable's last definition within a Block. Only instructions up to `max_instr_num` are considered. + It returns the instructions position and the assigned value if a definition exists and none otherwise. + """ + for index in reversed(range(max_instr_num + 1)): + instruction = node.instructions[index] + if isinstance(instruction, Assignment) and instruction.destination == var: + return index, instruction.value + return None + + +def match_expression( + node: BasicBlock, expression: Expression, pattern: Tuple | Callable[[Variable], bool] | str, instr_num: int | None = None +) -> bool: + """ + This function checks whether the given `expression` matches the specified `pattern`. + + The function uses recursion to check whether the provided `expression` matches the given `pattern`. + It also considers the instructions defined earlier in the provided `node` (a `BasicBlock`) to resolve variable definitions. + + Args: + node (BasicBlock): The basic block containing instructions that define variables and their usage. + expression (Expression): The expression to be matched against the `pattern`. + pattern (tuple or Callable or str): The pattern used for matching. + Patterns are nested tuples representing the structure of expressions, constants, and operations. + The innermost (first) entry in a pattern is either: + - A string representing a variable name to be matched exactly. + - A function (Callable) that takes an `expression` and returns `True` if the expression matches some criteria, `False` otherwise. + The rest of the entries are constants representing offsets or operations to be dereferenced.
+ For example: + - (self._match_r14, 0x10) + - ((("gsbase", 0), -4), 0x8) + The latter pattern represents an expression equivalent to *(*(*(gsbase+0) - 4) + 8). + instr_num (int, optional): The instruction number to start searching backwards for variable definitions. + If not provided, it defaults to the last instruction in the `node`. + + Returns: + bool: Returns `True` if the `expression` matches the specified `pattern`, `False` otherwise. + + The function operates as follows: + - If the pattern is not a tuple, it checks if it's a callable or a string: + - If callable, it calls the pattern function with `expression`. + - If string, it checks if the `expression` is a `Variable` and its name matches the string. + - If the pattern is a tuple, it extracts the inner pattern and dereference offset and tries to match: + - If the expression is a `Variable` and there are earlier instructions, it retrieves the last definition of the variable and recursively checks. + - If the expression dereferences a sum `x + c` or a difference `x - c` with a constant `c`, matching continues on `x` only if `c` equals the pattern's offset (or its negation for a difference). + - It also handles simple dereferences when the offset is zero. + - The function returns `False` if no match is found according to the above rules. + """ + if not isinstance(pattern, tuple): + if callable(pattern): + assert isinstance(expression, Variable), "The callable must get a Variable as Input."
+ return pattern(expression) + else: + # pattern is a string in this case + return isinstance(expression, Variable) and expression.name == pattern + + if instr_num is None: + instr_num = len(node.instructions) - 1 + + inner_pattern, deref_offset = pattern + match expression: + case Variable() if instr_num > 0: + last_def = _get_last_definition(node, expression, instr_num - 1) + if last_def is not None: + definition_instruction_num, defined_value = last_def + # important: dont use inner_pattern here + return match_expression(node, defined_value, pattern, definition_instruction_num) + # a bare name inside a class pattern is a capture, not a comparison, so the offsets are checked in guards + case UnaryOperation( + operation=OperationType.dereference, + operand=BinaryOperation(operation=OperationType.plus, left=inner_expression, right=Constant(value=pos_deref_offset)), + ) if pos_deref_offset == deref_offset: + return match_expression(node, inner_expression, inner_pattern, instr_num) + case UnaryOperation( + operation=OperationType.dereference, + operand=BinaryOperation(operation=OperationType.minus, left=inner_expression, right=Constant(value=neg_deref_offset)), + ) if neg_deref_offset == -deref_offset: + return match_expression(node, inner_expression, inner_pattern, instr_num) + case UnaryOperation(operation=OperationType.dereference, operand=inner_expression) if deref_offset == 0: + return match_expression(node, inner_expression, inner_pattern, instr_num) + + return False + + +def _get_called_functions(instructions: List[Instruction]) -> Iterator[Expression]: + """ + Helper method to iterate over all called functions in a list of instructions. + """ + for instruction in instructions: + if isinstance(instruction, Assignment) and isinstance(instruction.value, Call): + yield instruction.value.function + + +def is_noreturn_node(node: BasicBlock) -> bool: + """ + Helper method to check if `node` contains just one call to a non-returning function.
+ """ + called_functions = list(_get_called_functions(node.instructions)) + if len(called_functions) != 1: + return False + # explicit comparison on purpose: a can_return of None (unknown) must not count as non-returning; + # callees without a can_return attribute are treated as possibly returning instead of raising + return getattr(called_functions[0], "can_return", None) == False + + +def get_constant_condition(value: bool) -> Condition: + """ + Helper method creating a Pseudo condition that always evaluates to `True` or `False`, depending on `value`. + """ + int_value = 1 if value else 0 + return Condition( + OperationType.equal, + [ + Constant(1, Integer.int32_t()), + Constant(int_value, Integer.int32_t()), + ], + ) diff --git a/decompiler/structures/pseudo/expressions.py b/decompiler/structures/pseudo/expressions.py index 96749ccd8..4db1389df 100644 --- a/decompiler/structures/pseudo/expressions.py +++ b/decompiler/structures/pseudo/expressions.py @@ -295,6 +295,10 @@ def copy(self) -> Symbol: class FunctionSymbol(Symbol): """Represents a function name""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.can_return = None + def __eq__(self, __value): return isinstance(__value, FunctionSymbol) and super().__eq__(__value) diff --git a/decompiler/structures/pseudo/operations.py b/decompiler/structures/pseudo/operations.py index d3feeea46..fd6c6c4ff 100644 --- a/decompiler/structures/pseudo/operations.py +++ b/decompiler/structures/pseudo/operations.py @@ -473,8 +473,6 @@ def is_write_access(self) -> bool: class BinaryOperation(Operation): """Class representing operations with two operands.""" - __match_args__ = ("operation", "left", "right") - def __eq__(self, __value): return isinstance(__value, BinaryOperation) and super().__eq__(__value) diff --git a/decompiler/task.py b/decompiler/task.py index 2cfe446e1..88f565db4 100644 --- a/decompiler/task.py +++ b/decompiler/task.py @@ -26,6 +26,7 @@ class DecompilerTask: ast: AbstractSyntaxTree | None = None function_return_type: Type = Integer.int32_t() function_parameters: List[Variable] = field(default_factory=list) + function_parameter_locations: List[str | None] = field(default_factory=list)
complex_types: ComplexTypeMap = field(default_factory=ComplexTypeMap) _failure_origin: str | None = field(default=None, init=False) diff --git a/decompiler/util/default.json b/decompiler/util/default.json index f99a95817..5329c87de 100644 --- a/decompiler/util/default.json +++ b/decompiler/util/default.json @@ -109,6 +109,56 @@ "is_hidden_from_cli": false, "argument_name": "--remove-stack-canary" }, + { + "dest": "remove-go-prologue.remove_prologue", + "default": false, + "title": "Remove Go function prologues", + "type": "boolean", + "description": "remove go function prologues", + "is_hidden_from_gui": false, + "is_hidden_from_cli": false, + "argument_name": "--remove-go-prologue" + }, + { + "dest": "remove-noreturn-boilerplate.remove_noreturn_boilerplate", + "default": false, + "title": "Generic no-return boilerplate removal", + "type": "boolean", + "description": "remove boilerplate leading to non-returning functions", + "is_hidden_from_gui": false, + "is_hidden_from_cli": false, + "argument_name": "--remove-no-return-boilerplate" + }, + { + "dest": "rust-string-detection.enabled", + "default": false, + "title": "Detect Rust string slices", + "type": "boolean", + "description": "enable the detection of Rust string slices.
Requires setting up Rust String Slicer", + "is_hidden_from_gui": false, + "is_hidden_from_cli": false, + "argument_name": "--detect-rust-string-slices" + }, + { + "dest": "rust-string-detection.rust_binaries_only", + "default": false, + "title": "Restrict string slice detection to Rust binaries", + "type": "boolean", + "description": "string slices will only be detected for Rust binaries", + "is_hidden_from_gui": false, + "is_hidden_from_cli": false, + "argument_name": "--string-slices-rust-only" + }, + { + "dest": "rust-string-detection.string_slicer_path", + "default": "", + "title": "Rust String Slicer Path", + "type": "string", + "description": "Path to the Rust String Slicer folder", + "is_hidden_from_gui": false, + "is_hidden_from_cli": false, + "argument_name": "--rust-string-slicer-path" + }, { "dest": "array-access-detection.enabled", "default": true,