from array import array |
from array import array |
from dis import * |
from dis import * |
from new import code, function |
|
from types import CodeType |
from types import CodeType |
from peak.util.symbols import Symbol |
from peak.util.symbols import Symbol |
from peak.util.decorators import decorate_assignment, decorate |
from peak.util.decorators import decorate_assignment, decorate |
__all__.extend([k for k in globals().keys() if k.startswith('CO_')]) |
__all__.extend([k for k in globals().keys() if k.startswith('CO_')]) |
|
|
|
|
|
|
|
to_code = lambda x: x.tostring() |
|
|
|
try: |
|
from new import code as NEW_CODE, function |
|
except ImportError: |
|
from types import FunctionType as function |
|
NEW_CODE = lambda ac, *args: CodeType(ac, 0, *args) |
|
long = ord = int |
|
unicode = basestring = str |
|
def to_code(x): |
|
global to_code |
|
if not hasattr(x, 'tobytes'): |
|
to_code = lambda x: x.tostring() |
|
else: |
|
to_code = lambda x: x.tostring() |
|
return to_code(x) |
|
|
|
CODE, GLOBALS, DEFAULTS, CLOSURE, FUNC = ( |
|
'__code__', '__globals__', '__defaults__', '__closure__', '__func__' |
|
) |
|
else: |
|
CODE, GLOBALS, DEFAULTS, CLOSURE, FUNC = ( |
|
'func_code', 'func_globals', 'func_defaults', 'func_closure', 'im_func' |
|
) |
|
ord = ord |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Const(object): |
class Const(object): |
"""Wrapper to ensure constants are hashable even if mutable""" |
"""Wrapper to ensure constants are hashable even if mutable""" |
|
|
class Node(tuple): |
class Node(tuple): |
"""Base class for AST nodes""" |
"""Base class for AST nodes""" |
__slots__ = [] |
__slots__ = [] |
|
__hash__ = tuple.__hash__ |
|
|
def nodetype(*mixins, **kw): |
def nodetype(*mixins, **kw): |
|
|
return decorate_assignment(callback) |
return decorate_assignment(callback) |
|
|
|
|
|
|
nodetype() |
nodetype() |
def Global(name, code=None): |
def Global(name, code=None): |
if code is None: |
if code is None: |
return body, tuple(handlers), else_ |
return body, tuple(handlers), else_ |
okay = Label() |
okay = Label() |
done = Label() |
done = Label() |
code( |
code(okay.SETUP_EXCEPT, body, okay.POP_BLOCK) |
okay.SETUP_EXCEPT, |
if 'POP_EXCEPT' in opcode: |
body, |
code.stack_size += 3 |
okay.POP_BLOCK |
|
) |
|
for typ, handler in handlers: |
for typ, handler in handlers: |
next_test = Label() |
next_test = Label() |
Compare(Code.DUP_TOP, [('exception match', typ)], code) |
Compare(Code.DUP_TOP, [('exception match', typ)], code) |
code( |
code( |
next_test.JUMP_IF_FALSE_OR_POP, # remove condition |
next_test.JUMP_IF_FALSE_OR_POP, # remove condition |
Code.POP_TOP, Code.POP_TOP, Code.POP_TOP, # remove exc info |
Code.POP_TOP, Code.POP_TOP, Code.POP_TOP, # remove exc info |
handler |
|
) |
) |
|
if 'POP_EXCEPT' in opcode: |
|
code(Code.POP_EXCEPT) |
|
code(handler) |
if code.stack_size is not None: |
if code.stack_size is not None: |
code(done.JUMP_FORWARD) |
code(done.JUMP_FORWARD) |
code(next_test, Code.POP_TOP) # remove condition |
code(next_test, Code.POP_TOP) # remove condition |
return f |
return f |
except (TypeError,AttributeError): |
except (TypeError,AttributeError): |
return function( |
return function( |
f.func_code, f.func_globals, name, f.func_defaults, f.func_closure |
getattr(f,CODE), getattr(f,GLOBALS), name, getattr(f,DEFAULTS), |
|
getattr(f,CLOSURE) |
) |
) |
|
|
|
|
|
|
EXTRA_JUMPS = 'JUMP_IF_FALSE_OR_POP JUMP_IF_TRUE_OR_POP JUMP_IF_FALSE JUMP_IF_TRUE'.split() |
EXTRA_JUMPS = 'JUMP_IF_FALSE_OR_POP JUMP_IF_TRUE_OR_POP JUMP_IF_FALSE JUMP_IF_TRUE'.split() |
|
|
class Label(object): |
class Label(object): |
return len(self.co_code) |
return len(self.co_code) |
|
|
|
|
|
if 'UNARY_CONVERT' not in opcode: |
|
def UNARY_CONVERT(self): |
|
self(Const(repr)) |
|
self.ROT_TWO() |
|
self.CALL_FUNCTION(1, 0) |
|
|
|
if 'BINARY_DIVIDE' not in opcode: |
|
def BINARY_DIVIDE(self): |
|
self.BINARY_TRUE_DIVIDE() |
|
|
|
if 'DUP_TOPX' not in opcode: |
|
def DUP_TOPX(self, count): |
|
self.stackchange((count,count*2)) |
|
if count==2: |
|
self.emit(DUP_TOP_TWO) |
|
else: |
|
raise RuntimeError("Python 3 only supports DUP_TOP_TWO") |
|
|
|
if 'SLICE_0' not in opcode: |
|
def SLICE_0(self): |
|
self(None, None, Code.SLICE_3) |
|
def SLICE_1(self): |
|
self(None, Code.SLICE_3) |
|
def SLICE_2(self): |
|
self(None, Code.ROT_TWO, Code.SLICE_3) |
|
def SLICE_3(self): |
|
self.BUILD_SLICE(2) |
|
self.BINARY_SUBSCR() |
|
|
|
|
def set_stack_size(self, size): |
def set_stack_size(self, size): |
if size<0: |
if size<0: |
raise AssertionError("Stack underflow") |
raise AssertionError("Stack underflow") |
self.stack_history.extend([self._ss]*bytes) |
self.stack_history.extend([self._ss]*bytes) |
self._ss = size |
self._ss = size |
|
|
|
|
def get_stack_size(self): |
def get_stack_size(self): |
return self._ss |
return self._ss |
|
|
stack_size = property(get_stack_size, set_stack_size) |
stack_size = property(get_stack_size, set_stack_size) |
|
|
def stackchange(self, (inputs,outputs)): |
def stackchange(self, inout): |
|
(inputs,outputs) = inout |
if self._ss is None: |
if self._ss is None: |
raise AssertionError("Unknown stack size at this location") |
raise AssertionError("Unknown stack size at this location") |
self.stack_size -= inputs # check underflow |
self.stack_size -= inputs # check underflow |
self.stack_size += outputs # update maximum height |
self.stack_size += outputs # update maximum height |
|
|
|
|
def stack_unknown(self): |
def stack_unknown(self): |
self._ss = None |
self._ss = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def branch_stack(self, location, expected): |
def branch_stack(self, location, expected): |
if location >= len(self.stack_history): |
if location >= len(self.stack_history): |
if location > len(self.co_code): |
if location > len(self.co_code): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def jump(self, op, arg=None): |
def jump(self, op, arg=None): |
def jump_target(offset): |
def jump_target(offset): |
target = offset |
target = offset |
def __call__(self, *args): |
def __call__(self, *args): |
last = None |
last = None |
for ob in args: |
for ob in args: |
if callable(ob): |
if hasattr(ob, '__call__'): |
last = ob(self) |
last = ob(self) |
else: |
else: |
try: |
try: |
|
|
decorate(classmethod) |
decorate(classmethod) |
def from_function(cls, function, copy_lineno=False): |
def from_function(cls, function, copy_lineno=False): |
code = cls.from_code(function.func_code, copy_lineno) |
code = cls.from_code(getattr(function, CODE), copy_lineno) |
return code |
return code |
|
|
|
|
|
|
def nested(self, name='<lambda>', args=(), var=None, kw=None, cls=None): |
def nested(self, name='<lambda>', args=(), var=None, kw=None, cls=None): |
if cls is None: |
if cls is None: |
cls = Code |
cls = self.__class__ |
code = cls.from_spec(name, args, var, kw) |
code = cls.from_spec(name, args, var, kw) |
code.co_filename=self.co_filename |
code.co_filename=self.co_filename |
return code |
return code |
oparg = code[i+1] + code[i+2]*256 + extended_arg |
oparg = code[i+1] + code[i+2]*256 + extended_arg |
extended_arg = 0 |
extended_arg = 0 |
if op == EXTENDED_ARG: |
if op == EXTENDED_ARG: |
extended_arg = oparg*65536 |
extended_arg = oparg*long(65536) |
i+=3 |
i+=3 |
continue |
continue |
yield i, op, oparg |
yield i, op, oparg |
elif parent is not None and self.co_freevars: |
elif parent is not None and self.co_freevars: |
parent.makecells(self.co_freevars) |
parent.makecells(self.co_freevars) |
|
|
return code( |
return NEW_CODE( |
self.co_argcount, len(self.co_varnames), |
self.co_argcount, len(self.co_varnames), |
self.co_stacksize, flags, self.co_code.tostring(), |
self.co_stacksize, flags, to_code(self.co_code), |
tuple(self.co_consts), tuple(self.co_names), |
tuple(self.co_consts), tuple(self.co_names), |
tuple(self.co_varnames), |
tuple(self.co_varnames), |
self.co_filename, self.co_name, self.co_firstlineno, |
self.co_filename, self.co_name, self.co_firstlineno, |
self.co_lnotab.tostring(), self.co_freevars, self.co_cellvars |
to_code(self.co_lnotab), self.co_freevars, self.co_cellvars |
) |
) |
|
|
|
|
extended_arg = 0 |
extended_arg = 0 |
ptr += 2 |
ptr += 2 |
if op == EXTENDED_ARG: |
if op == EXTENDED_ARG: |
extend = arg*65536L |
extend = arg*long(65536) |
continue |
continue |
if op in hasjrel or op in hasjabs: |
if op in hasjrel or op in hasjabs: |
jump = arg+ptr*(op in hasjrel) |
jump = arg+ptr*(op in hasjrel) |
|
|
def dump(code): |
def dump(code): |
"""Disassemble code in a symbolic manner, i.e., without offsets""" |
"""Disassemble code in a symbolic manner, i.e., without offsets""" |
if hasattr(code, 'im_func'): code = code.im_func |
code = getattr(code, FUNC, code) |
if hasattr(code, 'func_code'): code = code.func_code |
code = getattr(code, CODE, code) |
|
|
co_names = code.co_names |
co_names = code.co_names |
co_consts = [repr(x) for x in code.co_consts] |
co_consts = [repr(x) for x in code.co_consts] |
co_varnames = code.co_varnames |
co_varnames = code.co_varnames |
cmp_ops = cmp_op |
cmp_ops = cmp_op |
free = code.co_cellvars + code.co_freevars |
free = code.co_cellvars + code.co_freevars |
|
|
labels = {} |
labels = {} |
instructions = list(iter_code(code.co_code)) |
instructions = list(iter_code(code.co_code)) |
lbl = [jump for start, op, arg, jump, end in instructions if jump is not None] |
lbl = [jump for start, op, arg, jump, end in instructions if jump is not None] |
i = 0 |
i = 0 |
while i<len(instructions): |
while i<len(instructions): |
start, op, arg, jump, end = instructions[i] |
start, op, arg, jump, end = instructions[i] |
print ' ', labels.get(start, '').ljust(7), |
ln = ' '+labels.get(start, '').ljust(7) |
if op==DUP_TOP and instructions[i+1][1] in (POP_JUMP_IF_FALSE, POP_JUMP_IF_TRUE): |
if op==DUP_TOP and instructions[i+1][1] in (POP_JUMP_IF_FALSE, POP_JUMP_IF_TRUE): |
s, op, arg, jump, end = instructions[i+1] |
s, op, arg, jump, end = instructions[i+1] |
print ['JUMP_IF_FALSE', 'JUMP_IF_TRUE'][op==POP_JUMP_IF_TRUE].ljust(15), |
ln+=' '+['JUMP_IF_FALSE', 'JUMP_IF_TRUE'][op==POP_JUMP_IF_TRUE].ljust(15) |
i+=1 |
i+=1 |
elif op in (JUMP_IF_TRUE_OR_POP, JUMP_IF_FALSE_OR_POP): |
elif op in (JUMP_IF_TRUE_OR_POP, JUMP_IF_FALSE_OR_POP): |
print opname[op][:-7].ljust(15), |
ln += ' ' + opname[op][:-7].ljust(15) |
else: |
else: |
print opname[op].ljust(15), |
ln += ' ' + opname[op].ljust(15) |
if jump is not None: |
if jump is not None: |
print labels[jump][:-1].rjust(10), |
ln += ' ' + labels[jump][:-1].rjust(10) |
elif arg is not None: |
elif arg is not None: |
print repr(arg).rjust(10), |
ln += ' ' + repr(arg).rjust(10) |
if op in argtype: |
if op in argtype: |
print '(%s)' % (locals()[argtype[op]][arg]), |
ln += ' (%s)' % (locals()[argtype[op]][arg]) |
print |
print(ln) |
if op in (JUMP_IF_TRUE_OR_POP, JUMP_IF_FALSE_OR_POP): |
if op in (JUMP_IF_TRUE_OR_POP, JUMP_IF_FALSE_OR_POP): |
print ' ', ''.ljust(7), 'POP_TOP' |
print(' '+''.ljust(7) + ' POP_TOP') |
i+=1 |
i+=1 |
|
|
|
|
|
|
class _se: |
class _se: |
"""Quick way of defining static stack effects of opcodes""" |
"""Quick way of defining static stack effects of opcodes""" |
POP_TOP = END_FINALLY = POP_JUMP_IF_FALSE = POP_JUMP_IF_TRUE = 1,0 |
POP_TOP = END_FINALLY = POP_JUMP_IF_FALSE = POP_JUMP_IF_TRUE = 1,0 |
DELETE_SLICE_0, DELETE_SLICE_1, DELETE_SLICE_2, DELETE_SLICE_3 = \ |
DELETE_SLICE_0, DELETE_SLICE_1, DELETE_SLICE_2, DELETE_SLICE_3 = \ |
(1,0),(2,0),(2,0),(3,0) |
(1,0),(2,0),(2,0),(3,0) |
|
|
STORE_SUBSCR = 3,0 |
STORE_SUBSCR = POP_EXCEPT = 3,0 |
DELETE_SUBSCR = STORE_ATTR = 2,0 |
DELETE_SUBSCR = STORE_ATTR = 2,0 |
DELETE_ATTR = STORE_DEREF = 1,0 |
DELETE_ATTR = STORE_DEREF = 1,0 |
PRINT_EXPR = PRINT_ITEM = PRINT_NEWLINE_TO = IMPORT_STAR = 1,0 |
PRINT_EXPR = PRINT_ITEM = PRINT_NEWLINE_TO = IMPORT_STAR = 1,0 |