Skip to content

Commit

Permalink
Enhance debug capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Arusekk committed Feb 9, 2021
1 parent 1608ab2 commit 63aa45b
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.pyc
*.out
19 changes: 18 additions & 1 deletion libutf8.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,27 @@ def invalidu32(num):
return invalid(struct.pack('<I', num))


def invalid(bytestring):
def hexdump(bs):
print(''.join(hexdump_iter(bs)))


def hexdump_iter(bs):
line = [" "] * 16
for i, x in enumerate(bs):
x = u8char(x)
line[i & 15] = f'\33[1;3{x.type + 1}m{x:02x}\33[m'
if i % 16 == 15:
yield f'{i//16:07x}0 {" ".join(line)}\n'
line = [" "] * 16
yield f'{(i+1)//16:07x}0 {" ".join(line)}\n'


def invalid(bytestring, debug=False):
try:
bytestring.decode()
except UnicodeDecodeError:
if debug:
hexdump(bytestring)
return True
return False

Expand Down
21 changes: 21 additions & 0 deletions test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/sh

set -e

# test the script on itself
./utfpyc.py -f utfpyc.py utfpyc.pyc
# run the result again
python3 utfpyc.pyc -f utfpyc.py utfpyc2.pyc
cmp utfpyc.pyc utfpyc2.pyc

for script in test/*.py; do
# compare the original script and self-compiled script results on tests
./utfpyc.py "$script" "${script%.py}.pyc"
python3 utfpyc.pyc "$script" "${script%.py}.pyc.pyc"
cmp "${script%.py}.pyc" "${script%.py}.pyc.pyc"
# run the test
python3 "$script" >"$script.out"
python3 "${script%.py}.pyc" >"${script%.py}.pyc.out"
cmp "$script.out" "${script%.py}.pyc.out"
done

74 changes: 48 additions & 26 deletions utfpyc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from itertools import zip_longest
from importlib._bootstrap_external import MAGIC_NUMBER

from libutf8 import U8, u8char, invalid, invalidu32, maybe_bigger
from libutf8 import U8, u8char, invalid, invalidu32, maybe_bigger, hexdump


def mk_extended_arg(arg, extended):
Expand Down Expand Up @@ -63,46 +63,46 @@ def __init__(self, codeobj, force=False, verbose=False):
def maybe_insert_cont(self):
if self.was_extended_arg:
print('Warn: insert after ext arg would change semantics (1)')
if self.state == U8.start2:
elif self.state == U8.start2:
self.newcode.extend((dis.EXTENDED_ARG, 0))
self.state = U8.ascii
if self.state == U8.start3:
elif self.state == U8.start3:
self.newcode.extend((dis.EXTENDED_ARG, 0x80))
self.state = U8.cont
self.need_ignore = True
if self.state == U8.start4:
elif self.state == U8.start4:
self.newcode.extend((dis.EXTENDED_ARG, 0x80,
dis.EXTENDED_ARG, 0))
dis.EXTENDED_ARG, None))
self.state = U8.ascii
self.need_ignore = True

def maybe_insert_start(self, val):
def maybe_insert_start(self, val, instr):
if self.newcode[-1:] == [None]:
self.newcode[-1] = val
elif self.was_extended_arg:
print('Warn: insert after ext arg would change semantics (2)')
val = 0
val = ANY_ASCII
else:
self.newcode.extend((dis.opmap['NOP'], val))
self.need_ignore = False
self.state = u8char(val).type

def process(self, x, nextx):
startlen = len(self.newcode)

opcode, arg, nextopcode, nextarg = map(
u8char,
(x.opcode, x.arg, nextx.opcode, nextx.arg)
)

# if we are recursive
if arg and arg >= 256 and not self.was_extended_arg:
if arg and arg >= 256:
self.process(mk_extended_arg(arg >> 8, x), x)
arg = u8char(arg % 256)

need_close = (self.state >= U8.start2 and not opcode.cont
or self.state >= U8.start3 and not arg.cont
or self.state == U8.start2 and arg.cont)
or self.state == U8.start2 and arg.cont
or self.state == U8.start4 and not nextopcode.cont
and opcode == dis.EXTENDED_ARG)
self.need_ignore = False

if self.verbose > 2:
Expand All @@ -118,19 +118,24 @@ def process(self, x, nextx):
val = 0xe1 # escape arg as well
if nextopcode.cont and not nextarg.cont:
val = 0xf1 # escape next opcode as well
self.maybe_insert_start(val)
self.maybe_insert_start(val, x)

if self.need_ignore and x.opcode >= dis.HAVE_ARGUMENT:
if self.need_ignore and opcode >= dis.HAVE_ARGUMENT:
self.newcode.extend((dis.opmap['NOP'], None))

if self.newcode[-1:] == [None]:
self.newcode[-1] = ANY_ASCII

self.places[x.offset] = startlen, len(self.newcode)
self.newcode.extend((x.opcode, arg))
self.newcode.extend((opcode, arg))

self.was_extended_arg = opcode == dis.EXTENDED_ARG

if opcode.ascii and arg and arg.cont:
print('Warn: opcode is low and arg is '
f'0x80 <= {arg:#02x} < 0xc2')
if self.verbose > 1:
dis.disassemble(self.codeobj, x.offset)

if not arg:
self.state = U8.ascii
elif arg.start:
Expand Down Expand Up @@ -162,10 +167,11 @@ def fixjump(self, x):
if oldrep == newrep:
break
if not any(oldrep):
print('need new EXTENDED_ARG for '
f'{self.codeobj.co_name} in '
f'{self.codeobj.co_filename}'
f':{self.codeobj.co_firstlineno}')
if self.verbose:
print('need new EXTENDED_ARG for '
f'{self.codeobj.co_name} in '
f'{self.codeobj.co_filename}'
f':{self.codeobj.co_firstlineno}')
assert not any(newrep[1:])
self.newcode[pl + 1] |= newrep[0] << 8
break
Expand All @@ -177,22 +183,33 @@ def adjumps(self):
if x.opcode in dis.hasjrel or x.opcode in dis.hasjabs:
self.fixjump(x)

def transcode(self):
def transcode(self, can_recurse=False):
for x, nextx in zip_longest(self.bcode, self.nextcode,
fillvalue=empty_instr):
self.process(x, nextx)
minoff = len(self.newcode)
if x.opcode != dis.EXTENDED_ARG:
self.process(x, nextx)
maxoff = len(self.newcode)
if maxoff > minoff:
maxoff -= 2
self.places[x.offset] = minoff, maxoff

if self.newcode[-1:] == [None]:
self.newcode[-1] = ANY_ASCII

self.adjumps()

if any(x >= 256 for x in self.newcode):
print("Re-rolling...")
if (any(x >= 256 for x in self.newcode) or invalid(
bytes(self.newcode), self.verbose > 1)) and can_recurse:
if self.verbose:
print(f"Invalid code {self.codeobj.co_name} detected, "
f"retrying ({len(self.newcode)=})")
if self.verbose > 1:
hexdump(self.newcode)
return Transcoder(
CodeWrapper(self.codeobj, co_code=self.newcode),
self.force,
self.verbose).transcode()
self.verbose).transcode(can_recurse - 1)

# adjust code length
while invalidu32(len(self.newcode)):
Expand All @@ -208,7 +225,12 @@ def transcode(self):
if self.verbose > 1:
dis.dis(codeobj)
print(repr(newcode))
print(repr(newcode.decode()))
try:
print(repr(newcode.decode()))
except UnicodeDecodeError:
pass
if self.verbose > 1:
hexdump(newcode)

# make sure UTF-8 magic really worked
assert self.force or newcode.decode()
Expand Down Expand Up @@ -272,7 +294,7 @@ def dump_tuple(self, t):

def dump_code(self, co):
self.fp.write(b'c')
co = Transcoder(co, self.force, self.verbose).transcode()
co = Transcoder(co, self.force, self.verbose).transcode(4)
self.u32(co.co_argcount)
self.u32(co.co_posonlyargcount)
self.u32(co.co_kwonlyargcount)
Expand Down

0 comments on commit 63aa45b

Please sign in to comment.