Cleanup wide-address jumps to allow code size>64K, as long as the jump target is known. Forward jumps are limited to jumping over a maximum span of 64K bytes. (This was always true, but bad code could be generated before, whereas now an assertion error is raised if the maximum span is exceeded.)
from array import array from dis import * from new import code from types import CodeType from peak.util.symbols import Symbol __all__ = [ 'Code', 'Const', 'Return', 'Global', 'Local', 'Call', 'const_value', 'NotAConstant', 'Label', 'fold_args', 'nodetype', 'Node', 'Pass', 'Compare', 'And', 'Or', 'Getattr', ] opcode = {} for op in range(256): name=opname[op] if name.startswith('<'): continue if name.endswith('+0'): opcode[name[:-2]]=op opcode[name]=op globals().update(opcode) # opcodes are now importable at will # Flags from code.h CO_OPTIMIZED = 0x0001 # use LOAD/STORE_FAST instead of _NAME CO_NEWLOCALS = 0x0002 # only cleared for module/exec code CO_VARARGS = 0x0004 CO_VARKEYWORDS = 0x0008 CO_NESTED = 0x0010 # ??? CO_GENERATOR = 0x0020 CO_NOFREE = 0x0040 # set if no free or cell vars CO_GENERATOR_ALLOWED = 0x1000 # unused CO_FUTURE_DIVISION = 0x2000 CO_FUTURE_ABSOLUTE_IMPORT = 0x4000 # Python 2.5+ only CO_FUTURE_WITH_STATEMENT = 0x8000 # Python 2.5+ only __all__.extend([k for k in globals().keys() if k.startswith('CO_')]) class Const(object): """Wrapper to ensure constants are hashable even if mutable""" __slots__ = 'value', 'hash', 'hashable' def __init__(self, value): self.value = value try: self.hash = hash(value) except TypeError: self.hash = hash(id(value)) self.hashable = False else: self.hashable = True def __repr__(self): return "Const(%s)" % repr(self.value) def __hash__(self): return self.hash def __eq__(self, other): if type(other) is not Const: return False if self.hashable: return self.value == other.value else: return self.value is other.value def __ne__(self, other): return not self==other def __call__(self, code): code.LOAD_CONST(self.value) class Node(tuple): """Base class for AST nodes""" __slots__ = [] def nodetype(*mixins, **kw): def callback(frame, name, func, old_locals): def __new__(cls, *args, **kw): result = func(*args, **kw) if type(result) is tuple: return tuple.__new__(cls, (cls,)+result) else: return result def __repr__(self): r = self.__class__.__name__ + tuple.__repr__(self[1:]) if len(self)==2: return r[:-2]+')' # nix trailing ',' return r def __call__(self, code): return func(*(self[1:]+(code,))) import inspect args = inspect.getargspec(func)[0] d = dict( __new__ = __new__, __repr__ = __repr__, __doc__=func.__doc__, __module__ = func.__module__, __args__ = args, __slots__ = [], __call__ = __call__ ) for p,a in enumerate(args[:-1]): # skip 'code' argument if isinstance(a,str): d[a] = property(lambda self, p=p+1: self[p]) d.update(kw) return type(name, mixins+(Node,), d) from peak.util.decorators import decorate_assignment return decorate_assignment(callback) nodetype() def Global(name, code=None): if code is None: return name, code.LOAD_GLOBAL(name) nodetype() def Local(name, code=None): if code is None: return name, if name in code.co_cellvars or name in code.co_freevars: return code.LOAD_DEREF(name) elif code.co_flags & CO_OPTIMIZED: return code.LOAD_FAST(name) else: return code.LOAD_NAME(name) nodetype() def Return(value=None, code=None): if code is None: return value, return code(value, Code.RETURN_VALUE) class _Pass(Symbol): def __call__(self, code=None): pass def __nonzero__(self): return False Pass = _Pass('Pass', __name__) nodetype() def Getattr(ob, name, code=None): try: name = const_value(name) except NotAConstant: return Call(Const(getattr), [ob, name]) if code is None: return fold_args(Getattr, ob, name) code(ob) code.LOAD_ATTR(name) nodetype() def Call(func, args=(),kwargs=(), star=None,dstar=None, fold=True, code=None): if code is None: data = ( func, tuple(args), tuple(kwargs), star or (), dstar or (), fold ) if fold and (args or kwargs or star or dstar): return fold_args(Call, *data) else: return data code(func, *args) for k,v in kwargs: code(k,v) argc = len(args) kwargc = len(kwargs) if star: if dstar: code(star, dstar) return code.CALL_FUNCTION_VAR_KW(argc, kwargc) else: code(star) return code.CALL_FUNCTION_VAR(argc, kwargc) else: if dstar: code(dstar) return code.CALL_FUNCTION_KW(argc, kwargc) else: return code.CALL_FUNCTION(argc, kwargc) nodetype() def Compare(expr, ops, code=None): if code is None: return fold_args(Compare, expr, tuple(ops)) if len(ops)==1: op, arg = ops[0] code(expr, arg) return code.COMPARE_OP(op) fail = Label() finish = Label() code(expr) for op, arg in ops[:-1]: code(arg) code.DUP_TOP() code.ROT_THREE() code.COMPARE_OP(op) fail.JUMP_IF_FALSE(code) code.POP_TOP() op, arg = ops[-1] code(arg) code.COMPARE_OP(op) finish.JUMP_FORWARD(code) fail(code) code.ROT_TWO() code.POP_TOP() return finish(code) nodetype() def And(values, code=None): if code is None: return fold_args(And, tuple(values)) end = Label() for value in values[:-1]: try: if const_value(value): continue # true constants can be skipped except NotAConstant: # but non-constants require code code(value, end.JUMP_IF_FALSE, Code.POP_TOP) else: # and false constants end the chain right away return code(value, end) code(values[-1], end) nodetype() def Or(values, code=None): if code is None: return fold_args(Or, tuple(values)) end = Label() for value in values[:-1]: try: if not const_value(value): continue # false constants can be skipped except NotAConstant: # but non-constants require code code(value, end.JUMP_IF_TRUE, Code.POP_TOP) else: # and true constants end the chain right away return code(value, end) code(values[-1], end) class Label(object): """A forward-referenceable location in a ``Code`` object""" __slots__ = 'backpatches', 'resolution' def __init__(self): self.backpatches = [] self.resolution = None def SETUP_EXCEPT(self, code): code.SETUP_EXCEPT(); self.backpatches.append(code.blocks[-1][-1]) def SETUP_FINALLY(self, code): code.SETUP_FINALLY(); self.backpatches.append(code.blocks[-1][-1]) def SETUP_LOOP(self, code): code.SETUP_LOOP(); self.backpatches.append(code.blocks[-1][-1]) def POP_BLOCK(self, code): self.backpatches[0] = code.POP_BLOCK() for op in hasjrel+hasjabs: if opname[op] not in locals(): def do_jump(self, code, op=op): method = getattr(code, opname[op]) if self.resolution is None: return self.backpatches.append(method()) else: return method(self.resolution) locals()[opname[op]] = do_jump del do_jump def __call__(self, code): if self.resolution is not None: raise AssertionError("Label previously defined") self.resolution = resolution = len(code.co_code) for p in self.backpatches: if p: p() class Code(object): co_argcount = 0 co_stacksize = 0 co_flags = CO_OPTIMIZED | CO_NEWLOCALS # typical usage co_filename = '<generated code>' co_name = '<lambda>' co_firstlineno = 0 co_freevars = () co_cellvars = () _last_lineofs = 0 _ss = 0 def __init__(self): self.co_code = array('B') self.co_consts = [None] self.co_names = [] self.co_varnames = [] self.co_lnotab = array('B') self.emit = self.co_code.append self.blocks = [] self.stack_history = [] def emit_arg(self, op, arg): emit = self.emit if arg>0xFFFF: emit(EXTENDED_ARG) emit((arg>>16)&255) emit((arg>>24)&255) emit(op) emit(arg&255) emit((arg>>8)&255) def set_lineno(self, lno): if not self.co_firstlineno: self.co_firstlineno = self._last_line = lno return append = self.co_lnotab.append incr_line = lno - self._last_line incr_addr = len(self.co_code) - self._last_lineofs if not incr_line: return assert incr_addr>=0 and incr_line>=0 while incr_addr>255: append(255) append(0) incr_addr -= 255 while incr_line>255: append(incr_addr) append(255) incr_line -= 255 incr_addr = 0 if incr_addr or incr_line: append(incr_addr) append(incr_line) self._last_line = lno self._last_lineofs = len(self.co_code) def LOAD_CONST(self, const): self.stackchange((0,1)) pos = 0 hashable = True try: hash(const) except TypeError: hashable = False while 1: try: arg = self.co_consts.index(const, pos) it = self.co_consts[arg] except ValueError: arg = len(self.co_consts) self.co_consts.append(const) break else: if type(it) is type(const) and (hashable or it is const): break pos = arg+1 continue return self.emit_arg(LOAD_CONST, arg) def CALL_FUNCTION(self, argc=0, kwargc=0, op=CALL_FUNCTION, extra=0): self.stackchange((1+argc+2*kwargc+extra,1)) emit = self.emit emit(op); emit(argc); emit(kwargc) def CALL_FUNCTION_VAR(self, argc=0, kwargc=0): self.CALL_FUNCTION(argc,kwargc,CALL_FUNCTION_VAR, 1) # 1 for *args def CALL_FUNCTION_KW(self, argc=0, kwargc=0): self.CALL_FUNCTION(argc,kwargc,CALL_FUNCTION_KW, 1) # 1 for **kw def CALL_FUNCTION_VAR_KW(self, argc=0, kwargc=0): self.CALL_FUNCTION(argc,kwargc,CALL_FUNCTION_VAR_KW, 2) # 2 *args,**kw def BUILD_TUPLE(self, count): self.stackchange((count,1)) self.emit_arg(BUILD_TUPLE,count) def BUILD_LIST(self, count): self.stackchange((count,1)) self.emit_arg(BUILD_LIST,count) def UNPACK_SEQUENCE(self, count): self.stackchange((1,count)) self.emit_arg(UNPACK_SEQUENCE,count) def RETURN_VALUE(self): self.stackchange((1,0)) self.emit(RETURN_VALUE) self.stack_unknown() def BUILD_SLICE(self, count): assert count in (2,3), "Invalid number of arguments for BUILD_SLICE" self.stackchange((count,1)) self.emit_arg(BUILD_SLICE,count) def DUP_TOPX(self, count): self.stackchange((count,count*2)) self.emit_arg(DUP_TOPX,count) def RAISE_VARARGS(self, argc): assert 0<=argc<=3, "Invalid number of arguments for RAISE_VARARGS" self.stackchange((argc,0)) self.emit_arg(RAISE_VARARGS,argc) def MAKE_FUNCTION(self, ndefaults): self.stackchange((1+ndefaults,1)) self.emit_arg(MAKE_FUNCTION, ndefaults) def MAKE_CLOSURE(self, ndefaults, freevars): self.stackchange((1+freevars+ndefaults,1)) self.emit_arg(MAKE_CLOSURE, ndefaults) def here(self): return len(self.co_code) def set_stack_size(self, size): if size<0: raise AssertionError("Stack underflow") if size>self.co_stacksize: self.co_stacksize = size bytes = len(self.co_code) - len(self.stack_history) + 1 if bytes>0: self.stack_history.extend([self._ss]*bytes) self._ss = size def get_stack_size(self): return self._ss stack_size = property(get_stack_size, set_stack_size) def stackchange(self, (inputs,outputs)): if self._ss is None: raise AssertionError("Unknown stack size at this location") self.stack_size -= inputs # check underflow self.stack_size += outputs # update maximum height def stack_unknown(self): self._ss = None def branch_stack(self, location, expected): if location >= len(self.stack_history): if location > len(self.co_code): raise AssertionError("Forward-looking stack prediction!", location, len(self.co_code) ) actual = self.stack_size if actual is None: self.stack_size = actual = expected self.stack_history[location] = actual else: actual = self.stack_history[location] if actual is None: self.stack_history[location] = actual = expected if actual != expected: raise AssertionError( "Stack level mismatch: actual=%s expected=%s" % (actual, expected) ) def jump(self, op, arg=None): def jump_target(offset): target = offset if op not in hasjabs: target = target - (posn+3) assert target>=0, "Relative jumps can't go backwards" if target>0xFFFF: target = offset - (posn+6) return target def backpatch(offset): target = jump_target(offset) if target>0xFFFF: raise AssertionError("Forward jump span must be <64K bytes") self.co_code[posn+1] = target & 255 self.co_code[posn+2] = (target>>8) & 255 self.branch_stack(offset, old_level) old_level = self.stack_size posn = self.here() if arg is not None: self.emit_arg(op, jump_target(arg)) self.branch_stack(arg, old_level) lbl = None else: self.emit_arg(op, 0) def lbl(code=None): backpatch(self.here()) if op in (JUMP_FORWARD, JUMP_ABSOLUTE, CONTINUE_LOOP): self.stack_unknown() return lbl def COMPARE_OP(self, op): self.stackchange((2,1)) self.emit_arg(COMPARE_OP, compares[op]) def setup_block(self, op): jmp = self.jump(op) self.blocks.append((op,self.stack_size,jmp)) return jmp def SETUP_EXCEPT(self): ss = self.stack_size self.stack_size = ss+3 # simulate the level at "except:" time self.setup_block(SETUP_EXCEPT) self.stack_size = ss # restore the current level def SETUP_FINALLY(self): ss = self.stack_size self.stack_size = ss+3 # allow for exceptions self.stack_size = ss+1 # simulate the level after the None is pushed self.setup_block(SETUP_FINALLY) self.stack_size = ss # restore original level def SETUP_LOOP(self): self.setup_block(SETUP_LOOP) def POP_BLOCK(self): if not self.blocks: raise AssertionError("Not currently in a block") why, level, fwd = self.blocks.pop() self.emit(POP_BLOCK) if why!=SETUP_LOOP: if why==SETUP_FINALLY: self.LOAD_CONST(None) fwd() else: else_ = self.JUMP_FORWARD() fwd() return else_ else: return fwd def assert_loop(self): for why,level,fwd in self.blocks: if why==SETUP_LOOP: return raise AssertionError("Not inside a loop") def BREAK_LOOP(self): self.assert_loop(); self.emit(BREAK_LOOP) self.stack_unknown() def CONTINUE_LOOP(self, label): self.assert_loop() if self.blocks[-1][0]==SETUP_LOOP: op = JUMP_ABSOLUTE # more efficient if not in a nested block else: op = CONTINUE_LOOP return self.jump(op, label) def __call__(self, *args): last = None for ob in args: if callable(ob): last = ob(self) else: try: f = generate_types[type(ob)] except KeyError: raise TypeError("Can't generate", ob) else: last = f(self, ob) return last def return_(self, ob=None): return self(ob, Code.RETURN_VALUE) def from_function(cls, function, copy_lineno=False): code = cls.from_code(function.func_code, copy_lineno) return code from_function = classmethod(from_function) def from_code(cls, code, copy_lineno=False): self = cls() if copy_lineno: self.set_lineno(code.co_firstlineno) self.co_filename = code.co_filename import inspect args, var, kw = inspect.getargs(code) self.co_varnames.extend(args) if var: self.co_varnames.append(var) self.co_flags |= CO_VARARGS if kw: self.co_varnames.append(kw) self.co_flags |= CO_VARKEYWORDS def tuple_arg(args): self.UNPACK_SEQUENCE(len(args)) for arg in args: if isinstance(arg, list): tuple_arg(arg) else: self.STORE_FAST(arg) for narg, arg in enumerate(args): if isinstance(arg,list): dummy_name = '.'+str(narg) self.co_varnames[narg] = dummy_name self.LOAD_FAST(dummy_name) tuple_arg(arg) self.co_argcount = code.co_argcount self.co_name = code.co_name self.co_freevars = code.co_freevars return self from_code = classmethod(from_code) def code(self): if self.blocks: raise AssertionError("%d unclosed block(s)" % len(self.blocks)) flags = self.co_flags & ~CO_NOFREE if not self.co_freevars and not self.co_cellvars: flags |= CO_NOFREE return code( self.co_argcount, len(self.co_varnames), self.co_stacksize, flags, self.co_code.tostring(), tuple(self.co_consts), tuple(self.co_names), tuple(self.co_varnames), self.co_filename, self.co_name, self.co_firstlineno, self.co_lnotab.tostring(), self.co_freevars, self.co_cellvars ) for op in hasfree: if not hasattr(Code, opname[op]): def do_free(self, varname, op=op): self.stackchange(stack_effects[op]) try: arg = list(self.co_cellvars+self.co_freevars).index(varname) except ValueError: raise NameError("Undefined free or cell var", varname) self.emit_arg(op, arg) setattr(Code, opname[op], do_free) compares = {} for value, name in enumerate(cmp_op): compares[value] = value compares[name] = value compares['<>'] = compares['!='] for op in hasname: if not hasattr(Code, opname[op]): def do_name(self, name, op=op): self.stackchange(stack_effects[op]) try: arg = self.co_names.index(name) except ValueError: arg = len(self.co_names) self.co_names.append(name) self.emit_arg(op, arg) if op in (LOAD_NAME, STORE_NAME, DELETE_NAME): # Can't use optimized local vars, so reset flags self.co_flags &= ~CO_OPTIMIZED setattr(Code, opname[op], do_name) for op in haslocal: if not hasattr(Code, opname[op]): def do_local(self, varname, op=op): if not self.co_flags & CO_OPTIMIZED: raise AssertionError( "co_flags must include CO_OPTIMIZED to use fast locals" ) self.stackchange(stack_effects[op]) try: arg = self.co_varnames.index(varname) except ValueError: arg = len(self.co_varnames) self.co_varnames.append(varname) self.emit_arg(op, arg) setattr(Code, opname[op], do_local) for op in hasjrel+hasjabs: if not hasattr(Code, opname[op]): def do_jump(self, address=None, op=op): self.stackchange(stack_effects[op]) return self.jump(op, address) setattr(Code, opname[op], do_jump) def gen_map(code, ob): code.BUILD_MAP(0) for k,v in ob.items(): code.DUP_TOP() code(k, v) code.ROT_THREE() code.STORE_SUBSCR() def gen_tuple(code, ob): code(*ob) return code.BUILD_TUPLE(len(ob)) def gen_list(code, ob): code(*ob) return code.BUILD_LIST(len(ob)) generate_types = { int: Code.LOAD_CONST, long: Code.LOAD_CONST, bool: Code.LOAD_CONST, CodeType: Code.LOAD_CONST, str: Code.LOAD_CONST, unicode: Code.LOAD_CONST, complex: Code.LOAD_CONST, float: Code.LOAD_CONST, type(None): Code.LOAD_CONST, tuple: gen_tuple, list: gen_list, dict: gen_map, } class NotAConstant(Exception): """The supplied value is not a constant expression tree""" def const_value(value): """Return the constant value -- if any -- of an expression tree Raises NotAConstant if the value or any child of the value are not constants. """ t = type(value) if t is Const: value = value.value elif t is tuple: t = tuple(map(const_value,value)) if t==value: return value return t elif generate_types.get(t) != Code.LOAD_CONST: raise NotAConstant(value) return value def fold_args(f, *args): """Return a folded ``Const`` or an argument tuple""" try: for arg in args: if arg is not Pass and arg is not None: const_value(arg) except NotAConstant: return args else: c = Code() f(*args+(c,)) c.RETURN_VALUE() return Const(eval(c.code())) class _se: """Quick way of defining static stack effects of opcodes""" POP_TOP = END_FINALLY = 1,0 ROT_TWO = 2,2 ROT_THREE = 3,3 ROT_FOUR = 4,4 DUP_TOP = 1,2 UNARY_POSITIVE = UNARY_NEGATIVE = UNARY_NOT = UNARY_CONVERT = \ UNARY_INVERT = GET_ITER = LOAD_ATTR = IMPORT_FROM = 1,1 BINARY_POWER = BINARY_MULTIPLY = BINARY_DIVIDE = BINARY_FLOOR_DIVIDE = \ BINARY_TRUE_DIVIDE = BINARY_MODULO = BINARY_ADD = BINARY_SUBTRACT = \ BINARY_SUBSCR = BINARY_LSHIFT = BINARY_RSHIFT = BINARY_AND = \ BINARY_XOR = BINARY_OR = COMPARE_OP = 2,1 INPLACE_POWER = INPLACE_MULTIPLY = INPLACE_DIVIDE = \ INPLACE_FLOOR_DIVIDE = INPLACE_TRUE_DIVIDE = INPLACE_MODULO = \ INPLACE_ADD = INPLACE_SUBTRACT = INPLACE_LSHIFT = INPLACE_RSHIFT = \ INPLACE_AND = INPLACE_XOR = INPLACE_OR = 2,1 SLICE_0, SLICE_1, SLICE_2, SLICE_3 = \ (1,1),(2,1),(2,1),(3,1) STORE_SLICE_0, STORE_SLICE_1, STORE_SLICE_2, STORE_SLICE_3 = \ (2,0),(3,0),(3,0),(4,0) DELETE_SLICE_0, DELETE_SLICE_1, DELETE_SLICE_2, DELETE_SLICE_3 = \ (1,0),(2,0),(2,0),(3,0) STORE_SUBSCR = 3,0 DELETE_SUBSCR = STORE_ATTR = 2,0 DELETE_ATTR = STORE_DEREF = 1,0 PRINT_EXPR = PRINT_ITEM = PRINT_NEWLINE_TO = IMPORT_STAR = 1,0 RETURN_VALUE = YIELD_VALUE = STORE_NAME = STORE_GLOBAL = STORE_FAST = 1,0 PRINT_ITEM_TO = LIST_APPEND = 2,0 LOAD_LOCALS = LOAD_CONST = LOAD_NAME = LOAD_GLOBAL = LOAD_FAST = \ LOAD_CLOSURE = LOAD_DEREF = IMPORT_NAME = BUILD_MAP = 0,1 EXEC_STMT = BUILD_CLASS = 3,0 JUMP_IF_TRUE = JUMP_IF_FALSE = 1,1 stack_effects = [(0,0)]*256 for name in opcode: op = opcode[name] name = name.replace('+','_') if hasattr(_se,name): # update stack effects table from the _se class stack_effects[op] = getattr(_se,name) if not hasattr(Code,name): # Create default method for Code class if op>=HAVE_ARGUMENT: def do_op(self,arg,op=op,se=stack_effects[op]): self.stackchange(se); self.emit_arg(op,arg) else: def do_op(self,op=op,se=stack_effects[op]): self.stackchange(se); self.emit(op) setattr(Code, name, do_op)
cvs-admin@eby-sarna.com Powered by ViewCVS 1.0-dev |
ViewCVS and CVS Help |