Skip to content

Commit

Permalink
Further refactor for greater capability
Browse files Browse the repository at this point in the history
  • Loading branch information
Arusekk committed Feb 8, 2021
1 parent d65f603 commit 1608ab2
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 87 deletions.
69 changes: 69 additions & 0 deletions libutf8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@

import struct
from enum import IntEnum


class U8(IntEnum):
    """Category of a single byte value with respect to UTF-8 encoding.

    The members are numbered consecutively from 0 in the order listed, so
    they can be compared and ordered as plain integers.
    """

    ascii = 0
    cont = 1
    start2 = 2
    start3 = 3
    start4 = 4
    invalid = 5


class u8char(int):
    """Integer that can report which ``U8`` category its value falls in.

    Constructing it from ``None`` yields ``None`` rather than an instance.
    """

    def __new__(cls, val):
        # ``None`` is passed through untouched; no instance is created.
        if val is None:
            return None
        return int.__new__(cls, val)

    @property
    def type(self):
        """Return the ``U8`` member for this value.

        The first upper bound (exclusive) that the value is below decides
        the category; values of 0xf8 and above are ``U8.invalid``.
        """
        upper_bounds = (
            (0x80, U8.ascii),
            (0xc0, U8.cont),
            (0xe0, U8.start2),
            (0xf0, U8.start3),
            (0xf8, U8.start4),
        )
        for bound, category in upper_bounds:
            if self < bound:
                return category
        return U8.invalid

    @property
    def ascii(self):
        """True when the category is exactly ``U8.ascii``."""
        return U8.ascii == self.type

    @property
    def cont(self):
        """True when the category is exactly ``U8.cont``."""
        return U8.cont == self.type

    @property
    def start(self):
        """True when the category is ``U8.start2`` or any later member.

        The comparison is ``>=``, so ``U8.invalid`` also satisfies it.
        """
        return U8.start2 <= self.type

    @property
    def start2(self):
        """True when the category is exactly ``U8.start2``."""
        return U8.start2 == self.type

    @property
    def start3(self):
        """True when the category is ``U8.start3`` or any later member.

        Unlike ``start2`` and ``start4`` this is a ``>=`` comparison, so
        ``U8.start4`` and ``U8.invalid`` satisfy it as well.
        """
        return U8.start3 <= self.type

    @property
    def start4(self):
        """True when the category is exactly ``U8.start4``."""
        return U8.start4 == self.type


def invalidu32(num):
    """Tell whether *num*, packed as a 32-bit unsigned int, fails ``invalid``.

    *num* is packed little-endian into four bytes with ``struct`` and the
    result is handed to ``invalid``; its verdict is returned unchanged.
    """
    packed = struct.pack('<I', num)
    return invalid(packed)


def invalid(bytestring):
    """Return True when *bytestring* cannot be decoded, False otherwise.

    Decoding uses ``bytes.decode()`` with its default arguments; only
    ``UnicodeDecodeError`` is treated as "invalid".
    """
    try:
        bytestring.decode()
    except UnicodeDecodeError:
        undecodable = True
    else:
        undecodable = False
    return undecodable


def maybe_bigger(num):
    """Return the first value at or above *num* that ``invalidu32`` accepts.

    Candidates are tried in increasing order, one at a time, starting with
    *num* itself.
    """
    while True:
        if not invalidu32(num):
            return num
        num += 1
161 changes: 74 additions & 87 deletions utfpyc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,50 @@
import os
import struct

from enum import IntEnum
from itertools import zip_longest
from importlib._bootstrap_external import MAGIC_NUMBER

from libutf8 import U8, u8char, invalid, invalidu32, maybe_bigger

class U8(IntEnum):
    """Byte categories used when classifying values for UTF-8.

    Values run from 0 to 5 in declaration order, which makes the members
    usable in ordering comparisons.
    """

    ascii = 0
    cont = 1
    start2 = 2
    start3 = 3
    start4 = 4
    invalid = 5

def mk_extended_arg(arg, extended):
    """Create an ``EXTENDED_ARG`` instruction carrying *arg*.

    ``arg`` is used for both the ``arg`` and ``argval`` fields.  The
    ``offset``, ``starts_line`` and ``is_jump_target`` fields are copied
    from *extended*; ``argrepr`` is left as ``None``.
    """
    fields = {
        'opname': 'EXTENDED_ARG',
        'opcode': dis.EXTENDED_ARG,
        'arg': arg,
        'argval': arg,
        'argrepr': None,
        'offset': extended.offset,
        'starts_line': extended.starts_line,
        'is_jump_target': extended.is_jump_target,
    }
    return dis.Instruction(**fields)

class u8char(int):
    """``int`` subclass exposing the ``U8`` category of its value.

    ``u8char(None)`` evaluates to ``None`` instead of creating an instance.
    """

    def __new__(cls, val):
        # Pass ``None`` straight through instead of building an int.
        if val is None:
            return None
        return int.__new__(cls, val)

    @property
    def type(self):
        """Classify the value into a ``U8`` member by exclusive upper bound."""
        for limit, kind in (
            (0x80, U8.ascii),
            (0xc0, U8.cont),
            (0xe0, U8.start2),
            (0xf0, U8.start3),
            (0xf8, U8.start4),
        ):
            if self < limit:
                return kind
        # 0xf8 and above fall through every bound
        return U8.invalid

    @property
    def ascii(self):
        """Whether the category equals ``U8.ascii``."""
        kind = self.type
        return kind == U8.ascii

    @property
    def cont(self):
        """Whether the category equals ``U8.cont``."""
        kind = self.type
        return kind == U8.cont

    @property
    def start(self):
        """Whether the category is at least ``U8.start2``.

        Because this is a ``>=`` test, ``U8.invalid`` counts too.
        """
        kind = self.type
        return kind >= U8.start2

    @property
    def start2(self):
        """Whether the category equals ``U8.start2``."""
        kind = self.type
        return kind == U8.start2

    @property
    def start3(self):
        """Whether the category is at least ``U8.start3``.

        This is a ``>=`` test (not ``==``), so ``U8.start4`` and
        ``U8.invalid`` count too.
        """
        kind = self.type
        return kind >= U8.start3

    @property
    def start4(self):
        """Whether the category equals ``U8.start4``."""
        kind = self.type
        return kind == U8.start4


def invalidu32(num):
    """Return True if *num* as four little-endian bytes does not decode.

    *num* is packed with ``struct`` as an unsigned 32-bit integer and the
    bytes are decoded with ``bytes.decode()`` defaults; a
    ``UnicodeDecodeError`` means "invalid".
    """
    packed = struct.pack('<I', num)
    try:
        packed.decode()
    except UnicodeDecodeError:
        return True
    else:
        return False

def _unpack_opargs(code):
    """Yield ``(offset, op, arg)`` for each two-byte unit of *code*.

    ``arg`` is ``None`` for opcodes below ``dis.HAVE_ARGUMENT``; otherwise
    it is the argument byte OR-ed with the value carried over from a
    directly preceding ``EXTENDED_ARG``.
    """
    carried = 0
    for offset in range(0, len(code), 2):
        op = code[offset]
        arg = (code[offset + 1] | carried) if op >= dis.HAVE_ARGUMENT else None
        # in stock dis, this is done only in the HAVE_ARGUMENT branch
        # and that is wrong, since it is different from ceval.c logic
        if op == dis.EXTENDED_ARG:
            carried = arg << 8
        else:
            carried = 0
        yield (offset, op, arg)

def maybe_bigger(num):
    """Advance *num* until ``invalidu32`` no longer rejects it.

    Returns *num* itself when it is already accepted, otherwise the next
    larger integer that is.
    """
    while True:
        if not invalidu32(num):
            return num
        num += 1

# Monkey-patch the standard library: replace the private argument unpacker
# of the dis module with the _unpack_opargs defined in this file.
dis._unpack_opargs = _unpack_opargs

# Filler value (the character code of 'S') that is written into the
# generated bytecode where a padding byte is needed.
ANY_ASCII = ord('S')
# An Instruction whose fields are all None.
# NOTE(review): presumably a placeholder for "no instruction" (e.g. a fill
# value when pairing instructions) - confirm against its users.
empty_instr = dis.Instruction(
opname=None, opcode=None, arg=None, argval=None, argrepr=None,
offset=None, starts_line=None, is_jump_target=None)


class CodeWrapper:
    """Proxy for a code object with some attributes overridden.

    Keyword arguments given to the constructor are stored on the instance
    and therefore take precedence; every other attribute is looked up on
    the wrapped object, which is kept in ``self.code``.
    """

    def __init__(self, code, **attrs):
        """Store the overrides from *attrs*, then the wrapped *code*.

        ``code`` is assigned last, so an override named ``code`` is
        replaced by the wrapped object.
        """
        self.__dict__.update(attrs)
        self.code = code

    def __getattr__(self, attr):
        """Delegate attributes not found on the instance to ``self.code``.

        Raises:
            AttributeError: if the wrapped object lacks *attr*, or if the
                instance has no ``code`` yet.
        """
        # __getattr__ only runs after normal lookup failed.  Fetch ``code``
        # from the instance dict directly: going through ``self.code`` would
        # re-enter __getattr__ forever on an instance whose ``code`` is not
        # set (e.g. one created by copy/pickle before its state is restored).
        try:
            wrapped = self.__dict__['code']
        except KeyError:
            raise AttributeError(attr) from None
        return getattr(wrapped, attr)


class Transcoder:
def __init__(self, codeobj, force=False, verbose=False):
self.bcode = dis.Bytecode(codeobj)
Expand Down Expand Up @@ -125,6 +95,11 @@ def process(self, x, nextx):
(x.opcode, x.arg, nextx.opcode, nextx.arg)
)

# if we are recursive
if arg and arg >= 256 and not self.was_extended_arg:
self.process(mk_extended_arg(arg >> 8, x), x)
arg = u8char(arg % 256)

need_close = (self.state >= U8.start2 and not opcode.cont
or self.state >= U8.start3 and not arg.cont
or self.state == U8.start2 and arg.cont)
Expand Down Expand Up @@ -152,7 +127,7 @@ def process(self, x, nextx):
self.newcode[-1] = ANY_ASCII

self.places[x.offset] = startlen, len(self.newcode)
self.newcode.extend((x.opcode, x.arg))
self.newcode.extend((x.opcode, arg))

self.was_extended_arg = opcode == dis.EXTENDED_ARG

Expand All @@ -165,37 +140,42 @@ def process(self, x, nextx):
elif self.state >= U8.start2:
self.state -= 2

def fixjump(self, x):
    """Rewrite the argument of jump instruction *x* in ``self.newcode``.

    ``self.places`` maps an original offset to a pair of positions in
    ``self.newcode``; the second element of the pair for ``x.offset`` is
    where the opcode of *x* sits.  The jump is left alone when ``x.arg``
    already lies within the pair stored for ``x.argval`` (made relative
    for opcodes in ``dis.hasjrel``); otherwise it is moved to the nearer
    end of that range, one byte at a time, walking back through the
    preceding ``EXTENDED_ARG`` instructions for the higher bytes.
    """
    _, pos = self.places[x.offset]
    lowest, highest = self.places[x.argval]
    if x.opcode in dis.hasjrel:
        # relative jumps are measured from the following instruction
        base = pos + 2
        lowest -= base
        highest -= base

    if lowest <= x.arg <= highest:
        return
    target = lowest if x.arg < lowest else highest

    old_bytes = x.arg.to_bytes(4, 'little')
    new_bytes = target.to_bytes(4, 'little')
    # At most four rounds: after the last byte both remainders are empty
    # and therefore equal.
    for idx in range(4):
        self.newcode[pos + 1] = new_bytes[idx]
        old_rest = old_bytes[idx + 1:]
        new_rest = new_bytes[idx + 1:]
        if old_rest == new_rest:
            break
        if not any(old_rest):
            # No EXTENDED_ARG exists for the higher byte: park it in the
            # same slot, which makes that slot exceed one byte.
            print('need new EXTENDED_ARG for '
                  f'{self.codeobj.co_name} in '
                  f'{self.codeobj.co_filename}'
                  f':{self.codeobj.co_firstlineno}')
            assert not any(new_rest[1:])
            self.newcode[pos + 1] |= new_rest[0] << 8
            break
        pos -= 2
        assert self.newcode[pos] == dis.EXTENDED_ARG

def adjumps(self):
    """Adjust the argument of every jump instruction in ``self.bcode``.

    Each instruction whose opcode is listed in ``dis.hasjrel`` or
    ``dis.hasjabs`` is handed to ``self.fixjump``, which performs the
    actual patching; all other instructions are ignored.
    """
    # The byte-patching logic lives in fixjump only; keeping a second
    # inline copy here would patch each jump twice.
    for x in self.bcode:
        if x.opcode in dis.hasjrel or x.opcode in dis.hasjabs:
            self.fixjump(x)

def transcode(self):
for x, nextx in zip_longest(self.bcode, self.nextcode,
Expand All @@ -207,6 +187,13 @@ def transcode(self):

self.adjumps()

if any(x >= 256 for x in self.newcode):
print("Re-rolling...")
return Transcoder(
CodeWrapper(self.codeobj, co_code=self.newcode),
self.force,
self.verbose).transcode()

# adjust code length
while invalidu32(len(self.newcode)):
self.newcode.append(ANY_ASCII)
Expand Down

0 comments on commit 1608ab2

Please sign in to comment.