Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: "jumpdest" Address Analysis Enhancement #13

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ IDA Pro 7.0 or newer is required to use IDA-EVM.
# Installation
* Copy `evm-loader.py` to `%IDA%/loaders`
* Copy `evm-cpu.py` and `known_hashes.py` to `%IDA%/procs`
* Restart IDA
* Restart IDA
168 changes: 139 additions & 29 deletions evm-cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
'''

class EVMAsm(object):
reverse_table = {}
'''
EVM Instruction factory

Expand Down Expand Up @@ -451,19 +452,19 @@ def is_arithmetic(self):
}

@staticmethod
#@memoized
def _get_reverse_table():
''' Build an internal table used in the assembler '''
reverse_table = {}
for (opcode, (name, immediate_operand_size, pops, pushes, gas, description)) in EVMAsm._table.items():
mnemonic = name
if name == 'PUSH':
mnemonic = '%s%d'%(name, (opcode&0x1f) + 1)
elif name in ('SWAP', 'LOG', 'DUP'):
mnemonic = '%s%d'%(name, (opcode&0xf) + 1)
if len(EVMAsm.reverse_table) == 0:

reverse_table[mnemonic] = opcode, name, immediate_operand_size, pops, pushes, gas, description
return reverse_table
for (opcode, (name, immediate_operand_size, pops, pushes, gas, description)) in EVMAsm._table.items():
mnemonic = name
if name == 'PUSH':
mnemonic = '%s%d'%(name, (opcode&0x1f) + 1)
elif name in ('SWAP', 'LOG', 'DUP'):
mnemonic = '%s%d'%(name, (opcode&0xf) + 1)

EVMAsm.reverse_table[mnemonic] = opcode, name, immediate_operand_size, pops, pushes, gas, description
return EVMAsm.reverse_table

@staticmethod
def assemble_one(assembler, offset=0):
Expand Down Expand Up @@ -732,6 +733,84 @@ class EVMProcessor(idaapi.processor_t):
"a_sizeof_fmt": "size %s",
}

# dup, swap
def __trace_stop(self, insn, ret_pos):
# determine output, if static return, else None
if insn.get_canon_mnem().startswith("PUSH"):
jump_addr = self.get_operand(insn[0])
# elif insn.get_canon_mnem().startswith("PUSH"):
# jump_addr = self.get_operand(insn[0])
else:
# print "__trace_stop else"
jump_addr = None
# print "__trace_stop end", insn.get_canon_mnem(), hex(self.get_operand(insn[0])), 'ret_pos =',ret_pos
return jump_addr, ret_pos

def add_jump(self, from_ea, to_ea, jp_type):
add_cref(from_ea, to_ea, jp_type)
if to_ea not in self.dst2src:
self.dst2src[to_ea] = []
# print 'add_jump', hex(from_ea), hex(to_ea)
if from_ea not in self.dst2src[to_ea]:
self.dst2src[to_ea].append(from_ea)

def add_jumps(self, from_ea, to_ea_list, ret_pos_list, jp_type_list):
if len(to_ea_list) > 1: # note: currently in this case, to_ea_list is not the jump dst addr, but the uppderstream branchs
cmtstr = "cant determine, have multiple upperstream branchs: " + ' '.join([hex(ea).strip('L') for ea in to_ea_list])
ida_bytes.set_cmt(from_ea, cmtstr, True)
elif len(to_ea_list) == 1:
dst_addr = to_ea_list[0]
self.add_jump(from_ea, dst_addr, jp_type_list[0])
cmtstr = "JUMP TO: " + hex(dst_addr).strip('L')
ida_bytes.set_cmt(from_ea, cmtstr, True)
else:
return

def get_all_preceding_insn_on_controlflow(self, insn):
if insn.get_canon_mnem() == 'JUMPDEST' and True: #TODO: and previous insn is not reachable
if insn.ea in self.dst2src:
prev_insn_branchs = [idautils.DecodeInstruction(ea) for ea in self.dst2src[insn.ea]]
else:
prev_insn_branchs = [idautils.DecodePreviousInstruction(insn.ea)]
else:
prev_insn_branchs = [idautils.DecodePreviousInstruction(insn.ea)]

return prev_insn_branchs

def trace_jumpdest(self, insn, current_stack_offset):
# if output == 0, keep trace, else stop

# prev_insn, fl = idautils.DecodePrecedingInstruction(insn.ea)
prev_insn_branchs = self.get_all_preceding_insn_on_controlflow(insn)
if len(prev_insn_branchs) > 1:
# print 'multiple prev_insn_branchs:', [hex(ins.ea) for ins in prev_insn_branchs]
# prev_insn = None
# note: here the return[0] is not the jump dst addr, but the upperstream branchs
return [_i.ea for _i in prev_insn_branchs], [0]*len(prev_insn_branchs)
elif len(prev_insn_branchs) == 1:
prev_insn = prev_insn_branchs[0]
# print 'in trace_jumpdest, cur insn:', hex(insn.ea), 'pre insn:', hex(prev_insn.ea) if prev_insn else None

_tbl = EVMAsm._get_reverse_table()
opname = prev_insn.get_canon_mnem()
info = _tbl[opname]
pops, pushes = info[3], info[4]

# print "trace_jumpdest", hex(prev_insn.ea), prev_insn.get_canon_mnem(), pops, pushes, current_stack_offset
update_stack_offset = current_stack_offset - pops + pushes
assert current_stack_offset <=0, "current_stack_offset > 0, impossible, should be addressed in previous trace_jumpdest call"
if pushes > -current_stack_offset:
jump_addr, ret_pos = self.__trace_stop(prev_insn, -current_stack_offset)
if jump_addr is not None:
return [jump_addr], [ret_pos]
else:
return None, None
else:
return self.trace_jumpdest(prev_insn, update_stack_offset)
else:
return None, None
# TODO: implement stack modeling to resolve actual top value of stack


def trace_sp(self, insn):
pass
Expand Down Expand Up @@ -773,27 +852,45 @@ def notify_emu(self, insn):
# add ref to next instruction for false branch
add_cref(insn.ea, insn.ea + insn.size, fl_JN)

# maybe we have a simple puch
prev_insn = idautils.DecodePreviousInstruction(insn.ea)
if prev_insn:
if prev_insn.get_canon_mnem().startswith("PUSH"):
jump_addr = self.get_operand(prev_insn[0])
add_cref(insn.ea, jump_addr, fl_JN)
# # maybe we have a simple puch
# prev_insn = idautils.DecodePreviousInstruction(insn.ea)
# if prev_insn:
# if prev_insn.get_canon_mnem().startswith("PUSH"):
# jump_addr = self.get_operand(prev_insn[0])
# add_cref(insn.ea, jump_addr, fl_JN)
jump_addr_list, ret_pos_list = self.trace_jumpdest(insn, 0)
if jump_addr_list is not None and len(jump_addr_list)>0:
#TODO: use ret_pos
self.add_jumps(insn.ea, jump_addr_list, ret_pos_list, [fl_JN]*len(jump_addr_list))
else:
pass

elif mnemonic == "JUMP":
prev_insn = idautils.DecodePreviousInstruction(insn.ea)
if prev_insn:
# TODO: implement stack modeling to resolve actual top value of stack
if prev_insn.get_canon_mnem().startswith("PUSH"):
jump_addr = self.get_operand(prev_insn[0])
jump_addr_list, ret_pos_list = self.trace_jumpdest(insn, 0)
if jump_addr_list is not None and len(jump_addr_list)>0:
#TODO: use ret_pos
self.add_jumps(insn.ea, jump_addr_list, ret_pos_list, [fl_JN]*len(jump_addr_list))
else:
pass

# prev_insn = idautils.DecodePreviousInstruction(insn.ea)
# if prev_insn:
# # TODO: implement stack modeling to resolve actual top value of stack
# if prev_insn.get_canon_mnem().startswith("PUSH"):
# jump_addr = self.get_operand(prev_insn[0])
#print "found jump to", hex(jump_addr)
add_cref(insn.ea, jump_addr, fl_JN)
# add_cref(insn.ea, jump_addr, fl_JN)
# print "testxhyu"

# TODO: adjust function boundary to include all code
#func = get_func(insn.ea)
#if func:
# #print "appending new tail"
# #append_func_tail(func, jump_addr, BADADDR)
# #reanalyze_function(func)
# func = get_func(insn.ea)
# if func:
# success = append_func_tail(func.start_ea, jump_addr, BADADDR)#BADADDR
# print "appending new tail", type(insn.ea), success
# print '---', get_func_name(insn.ea), insn.ea, func.start_ea, jump_addr, BADADDR
# print 'insn.ea', insn.ea, 'insn.ip', insn.ip
# reanalyze_function(func)


flows = (feature & CF_STOP) == 0
if flows:
Expand Down Expand Up @@ -975,8 +1072,21 @@ def __init__(self):
i += 1

self.instruc_end = len(self.instruc)

self.has_rebuild_cf = False
self.dst2src = {}

def notify_out_header(self, outctx):
idc.auto_wait()
self.rebuild_cf()

def rebuild_cf(self):
if self.has_rebuild_cf:
return
self.has_rebuild_cf = True
for func_ea in idautils.Functions():
print('rebuild_cf func_ea', func_ea,get_func(func_ea),idc.get_func_name(func_ea))
reanalyze_function(get_func(func_ea))


def PROCESSOR_ENTRY():
return EVMProcessor()
return EVMProcessor()