from wsgiref.util import shift_path_info, application_uri |
from wsgiref.util import shift_path_info, application_uri |
|
|
__all__ = [ |
__all__ = [ |
"Page", "form_handler", "HTML", "Text", "Template", "HTTP", |
"Page", "form_handler", "HTML", "Text", "Template", "HTTP", "test", |
"EvalTemplate", "EvalMap", "Method", |
"EvalTemplate", "EvalMap", "Method", |
] |
] |
|
|
def __delitem__(self, key): # needed for listcomp exprs! |
def __delitem__(self, key): # needed for listcomp exprs! |
del self.extra[key] |
del self.extra[key] |
|
|
|
def __contains__(self, key): |
|
try: self[key] |
|
except KeyError: return False |
|
return True |
|
|
def __getitem__(self, key): |
def __getitem__(self, key): |
if key.startswith('(?'): |
if key.startswith('(?'): |
return eval( |
return eval(key[2:].rstrip(')').rstrip('?').strip(), |
key[2:].rstrip(')').rstrip('?').strip(), |
sys.modules[self.module].__dict__, self) |
sys.modules[self.module].__dict__, self |
|
) |
|
elif key in self.extra: |
elif key in self.extra: |
return self.extra[key] |
return self.extra[key] |
else: |
else: |
class EvalTemplate(string.Template): |
class EvalTemplate(string.Template): |
idpattern = r'[_a-z][_a-z0-9]*|\(\?[^?]*\?\)' |
idpattern = r'[_a-z][_a-z0-9]*|\(\?[^?]*\?\)' |
|
|
|
|
|
|
|
|
class Text(Method): |
class Text(Method): |
"""Text template w/string substitution that can be used as a method |
"""Text template w/string substitution that can be used as a method |
|
|
setattr(self,k,v) |
setattr(self,k,v) |
self.setup() # perform any dynamic initialization |
self.setup() # perform any dynamic initialization |
|
|
def HEAD(self): |
|
environ['REQUEST_METHOD'] = 'GET' |
|
resp = iter(self.invoke_method()) # forward to 'GET' |
|
list(resp) # questionable hack to exhaust the response |
|
return resp # ensure that .close() gets called, if any |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def HEAD(self): |
|
def write(txt): |
|
sr.length += len(txt) |
|
|
|
def sr(status, headers, exc_info=None): |
|
if exc_info: |
|
if sr.length: |
|
try: |
|
raise exc_info[0], exc_info[1], exc_info[2] |
|
finally: |
|
exc_info = None |
|
elif sr.status is not None: |
|
raise AssertionError("Headers already set!") |
|
|
|
sr.status,sr.headers,sr.exc_info = status,Headers(headers),exc_info |
|
return write |
|
|
|
from wsgiref.headers import Headers |
|
sr.length = 0 |
|
sr.status, sr.headers, sr.exc_info = None, (), None |
|
old_sr = self.start_response |
|
self.start_response = sr |
|
self.environ['REQUEST_METHOD'] = 'GET' |
|
|
|
try: |
|
resp = self.invoke_method() # forward to 'GET' |
|
if 'Content-Length' not in sr.headers: |
|
for chunk in resp: |
|
sr.length += len(chunk) |
|
if 'Content-Length' in sr.headers: |
|
break |
|
if hasattr(resp,'close'): |
|
resp.close() |
|
if 'Content-Length' not in sr.headers: |
|
sr.headers['Content-Length'] = str(sr.length) |
|
|
|
old_sr(sr.status, sr.headers.items(), sr.exc_info) |
|
return [''] |
|
finally: |
|
sr.args = exc_info = None # clean up exc_info, if still present |
|
|
def go(self): |
def go(self): |
self.URL = application_uri(self.environ) |
self.URL = application_uri(self.environ) |
name = shift_path_info(self.environ) |
name = shift_path_info(self.environ) |
form_parsed = False |
form_parsed = False |
form_data = () |
form_data = () |
form_defaults = {} |
form_defaults = {} |
escape = cgi.escape |
escape = staticmethod(cgi.escape) |
|
|
def get_handlers(self): |
def get_handlers(self): |
handlers = [getattr(self,k) for k in self.form_handlers] |
handlers = [getattr(self,k) for k in self.form_handlers] |
|
|
|
|
|
|
|
def test(app, environ={}, form={}, **kw): |
|
"""Print the output of a WSGI app |
|
|
|
Runs `app` as a WSGI application and prints its output. If an untrapped |
|
error occurs in `app`, it drops into the ``pdb`` debugger's post-mortem |
|
debug shell (using ``sys.__stdout__`` if ``sys.stdout`` has been replaced). |
|
|
|
Any keyword arguments are added to the environment used to run `app`. If |
|
a keyword argument begins with ``wsgi_``, the ``_`` is replaced with a |
|
``.``, so that you can set e.g. ``wsgi.multithread`` using a |
|
``wsgi_multithread`` keyword argument. |
|
|
|
If a non-empty `form` dictionary is provided, it is treated as a collection |
|
of fields for a form ``POST``. The ``REQUEST_METHOD`` will default to |
|
``POST``, and the default ``CONTENT_LENGTH``, ``CONTENT_TYPE``, and |
|
``wsgi.input`` values will be appropriately set (but can still be |
|
overridden by explicit keyword arguments or the `environ` argument). |
|
|
|
Any `form` values that are not instances of ``basestring`` are assumed to |
|
be *sequences* of values, and will result in multiple name/value pairs |
|
being added to the encoded data sent to the application. |
|
|
|
Any WSGI-required variables that are not specified by `environ`, `form`, or |
|
keyword arguments, are initialized to default values using the |
|
``wsgiref.util.setup_testing_defaults()`` function. |
|
""" |
|
|
|
from wsgiref.util import setup_testing_defaults |
|
from wsgiref.handlers import SimpleHandler |
|
from StringIO import StringIO |
|
from urllib import quote_plus |
|
|
|
environ = environ.copy() |
|
for k, v in kw.items(): |
|
if k.startswith('wsgi_'): |
|
environ[k.replace('_','.',1)] = v |
|
else: |
|
environ[k] = v |
|
|
|
|
|
|
|
if form: |
|
encoded = [] |
|
for k, v in form.items(): |
|
if isinstance(v,basestring): |
|
v = [v] |
|
for v in v: |
|
encoded.append('%s=%s' % (quote_plus(k), quote_plus(v))) |
|
encoded = '&'.join(encoded) |
|
environ.setdefault('wsgi.input', StringIO(encoded)) |
|
environ.setdefault('CONTENT_LENGTH', str(len(encoded))) |
|
environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded') |
|
environ.setdefault('REQUEST_METHOD', 'POST') |
|
|
|
setup_testing_defaults(environ) |
|
stdout = StringIO() |
|
stderr = environ['wsgi.errors'] |
|
|
|
def wrapper(env, start): |
|
try: |
|
return app(env, start) |
|
except: |
|
stdout = sys.stdout |
|
try: |
|
if stdout is not sys.__stdout__: |
|
sys.stdout = sys.__stdout__ |
|
import pdb |
|
pdb.post_mortem(sys.exc_info()[2]) |
|
finally: |
|
sys.stdout = stdout |
|
raise |
|
|
|
SimpleHandler( |
|
environ['wsgi.input'], stdout, stderr, environ, |
|
environ['wsgi.multithread'], environ['wsgi.multiprocess'] |
|
).run(wrapper) |
|
print stdout.getvalue().replace('\r\n','\n') |
|
if stderr.getvalue(): |
|
print "--- Log Output ---" |
|
print stderr.getvalue().replace('\r\n','\n') |
|
|
|
|
class TestForm(Page): |
class TestForm(Page): |
"""A stupid example to test the framework""" |
"""A stupid example to test the framework""" |
|
|
TestContainer.c = TestContainer # allow some depth to the test... |
TestContainer.c = TestContainer # allow some depth to the test... |
|
|
|
|
|
def additional_tests(): |
|
import doctest |
|
return doctest.DocFileSuite( |
|
'README.txt', |
|
optionflags=doctest.ELLIPSIS|doctest.REPORT_ONLY_FIRST_FAILURE, |
|
) |
|
|
|
|
|
|