from wsgiref.util import shift_path_info, application_uri |
from wsgiref.util import shift_path_info, application_uri |
|
|
__all__ = [ |
__all__ = [ |
"Page", "form_handler", "HTML", "Text", "Template", "HTTP", |
"Page", "form_handler", "HTML", "Text", "Template", "HTTP", "expose", |
"EvalTemplate", "EvalMap", "Method", |
"test", "Redirector", "EvalTemplate", "EvalMap", "Method", |
] |
] |
|
|
class Method(object): |
class Method(object): |
def __delitem__(self, key): # needed for listcomp exprs! |
def __delitem__(self, key): # needed for listcomp exprs! |
del self.extra[key] |
del self.extra[key] |
|
|
|
def __contains__(self, key): |
|
try: self[key] |
|
except KeyError: return False |
|
return True |
|
|
def __getitem__(self, key): |
def __getitem__(self, key): |
if key.startswith('(?'): |
if key.startswith('(?'): |
return eval( |
return eval(key[2:].rstrip(')').rstrip('?').strip(), |
key[2:].rstrip(')').rstrip('?').strip(), |
sys.modules[self.module].__dict__, self) |
sys.modules[self.module].__dict__, self |
|
) |
|
elif key in self.extra: |
elif key in self.extra: |
return self.extra[key] |
return self.extra[key] |
else: |
else: |
return g[key] |
return g[key] |
if key=='self': |
if key=='self': |
return self |
return self |
raise KeyError |
raise KeyError(key) |
|
|
class EvalTemplate(string.Template): |
class EvalTemplate(string.Template): |
idpattern = r'[_a-z][_a-z0-9]*|\(\?[^?]*\?\)' |
idpattern = r'[_a-z][_a-z0-9]*|\(\?[^?]*\?\)' |
|
|
|
|
|
|
|
|
class Text(Method): |
class Text(Method): |
"""Text template w/string substitution that can be used as a method |
"""Text template w/string substitution that can be used as a method |
|
|
|
|
|
|
|
|
|
def expose(func): |
|
"""Wrapper/decorator that marks a method as a subpage""" |
|
class _Page(Page): |
|
def body(self): |
|
return func(self.parent) |
|
return _Page |
|
|
|
|
class Page(object): |
class Page(object): |
"""A generic web location""" |
"""A generic web location""" |
|
|
setattr(self,k,v) |
setattr(self,k,v) |
self.setup() # perform any dynamic initialization |
self.setup() # perform any dynamic initialization |
|
|
def HEAD(self): |
|
environ['REQUEST_METHOD'] = 'GET' |
|
resp = iter(self.invoke_method()) # forward to 'GET' |
|
list(resp) # questionable hack to exhaust the response |
|
return resp # ensure that .close() gets called, if any |
|
|
|
|
|
|
|
|
|
|
def HEAD(self): |
|
def write(txt): |
|
sr.length += len(txt) |
|
|
URL = property(lambda self: application_uri(self.environ)) |
def sr(status, headers, exc_info=None): |
|
if exc_info: |
|
if sr.length: |
|
try: |
|
raise exc_info[0], exc_info[1], exc_info[2] |
|
finally: |
|
exc_info = None |
|
elif sr.status is not None: |
|
raise AssertionError("Headers already set!") |
|
|
|
sr.status,sr.headers,sr.exc_info = status,Headers(headers),exc_info |
|
return write |
|
|
|
from wsgiref.headers import Headers |
|
sr.length = 0 |
|
sr.status, sr.headers, sr.exc_info = None, (), None |
|
old_sr = self.start_response |
|
self.start_response = sr |
|
self.environ['REQUEST_METHOD'] = 'GET' |
|
|
|
try: |
|
resp = self.invoke_method() # forward to 'GET' |
|
if 'Content-Length' not in sr.headers: |
|
for chunk in resp: |
|
sr.length += len(chunk) |
|
if 'Content-Length' in sr.headers: |
|
break |
|
if hasattr(resp,'close'): |
|
resp.close() |
|
if 'Content-Length' not in sr.headers: |
|
sr.headers['Content-Length'] = str(sr.length) |
|
|
|
old_sr(sr.status, sr.headers.items(), sr.exc_info) |
|
return [''] |
|
finally: |
|
sr.args = exc_info = None # clean up exc_info, if still present |
|
|
def go(self): |
def go(self): |
|
self.URL = application_uri(self.environ) |
name = shift_path_info(self.environ) |
name = shift_path_info(self.environ) |
if name: |
if name: |
return self.handle_child(name) |
return self.handle_child(name) |
if leaf: |
if leaf: |
return self.invoke_method() |
return self.invoke_method() |
url += '/' # add the trailing / |
url += '/' # add the trailing / |
|
|
|
if self.environ.get('QUERY_STRING'): |
|
url += '?' + self.environ['QUERY_STRING'] |
return self.redirect(url) |
return self.redirect(url) |
|
|
def handle_child(self, name): |
def handle_child(self, name): |
|
|
|
|
|
|
|
|
|
|
def invoke_method(self): |
def invoke_method(self): |
rm = self.environ['REQUEST_METHOD'] |
rm = self.environ['REQUEST_METHOD'] |
if rm=='HEAD' or rm in self.http_methods: |
if rm=='HEAD' or rm in self.http_methods: |
if self.form_handlers: |
if self.form_handlers: |
methods.add('POST') |
methods.add('POST') |
|
|
return self.METHOD_NOT_ALLOWED([('Allow', ', '.join(methods))]) |
return self.METHOD_NOT_ALLOWED([('Allow', ', '.join(sorted(methods)))]) |
|
|
METHOD_NOT_ALLOWED = Text( |
METHOD_NOT_ALLOWED = Text( |
"Excellent method!\n" |
"Excellent method!\n" |
form_parsed = False |
form_parsed = False |
form_data = () |
form_data = () |
form_defaults = {} |
form_defaults = {} |
|
escape = staticmethod(cgi.escape) |
|
|
def get_handlers(self): |
def get_handlers(self): |
handlers = [getattr(self,k) for k in self.form_handlers] |
handlers = [getattr(self,k) for k in self.form_handlers] |
|
|
def __getattr__(self, name): |
def __getattr__(self, name): |
"""Dynamic attributes from form_data and defaults""" |
"""Dynamic attributes from form_data and defaults""" |
if not name.startswith('_') and name in self.form_data: |
if name in self.form_defaults: # form vars must be explicitly listed |
|
if name in self.form_data: |
return self.form_data[name].value |
return self.form_data[name].value |
if name in self.form_defaults: |
|
return self.form_defaults[name] |
return self.form_defaults[name] |
raise AttributeError(name) |
raise AttributeError(name) |
|
|
return self.form_success() |
return self.form_success() |
|
|
|
|
|
|
# A miserably inadequate attempt at a decent UI... |
# A miserably inadequate attempt at a decent UI... |
|
|
errors_found = HTML.fragment( |
errors_found = HTML.fragment( |
"""Easy-access dict/object wrapper for DBAPI row tuples""" |
"""Easy-access dict/object wrapper for DBAPI row tuples""" |
|
|
def __init__(self, cursor, row): |
def __init__(self, cursor, row): |
self.__dict__ = dict(self, zip([d[0]for d in cursor.description], row)) |
self.__dict__ = dict(zip([d[0]for d in cursor.description], row)) |
|
|
|
|
|
|
|
|
|
def test(app, environ={}, form={}, **kw): |
|
"""Print the output of a WSGI app |
|
|
|
Runs `app` as a WSGI application and prints its output. If an untrapped |
|
error occurs in `app`, it drops into the ``pdb`` debugger's post-mortem |
|
debug shell (using ``sys.__stdout__`` if ``sys.stdout`` has been replaced). |
|
|
|
Any keyword arguments are added to the environment used to run `app`. If |
|
a keyword argument begins with ``wsgi_``, the ``_`` is replaced with a |
|
``.``, so that you can set e.g. ``wsgi.multithread`` using a |
|
``wsgi_multithread`` keyword argument. |
|
|
|
If a non-empty `form` dictionary is provided, it is treated as a collection |
|
of fields for a form ``POST``. The ``REQUEST_METHOD`` will default to |
|
``POST``, and the default ``CONTENT_LENGTH``, ``CONTENT_TYPE``, and |
|
``wsgi.input`` values will be appropriately set (but can still be |
|
overridden by explicit keyword arguments or the `environ` argument). |
|
|
|
Any `form` values that are not instances of ``basestring`` are assumed to |
|
be *sequences* of values, and will result in multiple name/value pairs |
|
being added to the encoded data sent to the application. |
|
|
|
Any WSGI-required variables that are not specified by `environ`, `form`, or |
|
keyword arguments, are initialized to default values using the |
|
``wsgiref.util.setup_testing_defaults()`` function. |
|
""" |
|
|
|
from wsgiref.util import setup_testing_defaults |
|
from wsgiref.handlers import SimpleHandler |
|
from StringIO import StringIO |
|
from urllib import quote_plus |
|
|
|
environ = environ.copy() |
|
for k, v in kw.items(): |
|
if k.startswith('wsgi_'): |
|
environ[k.replace('_','.',1)] = v |
|
else: |
|
environ[k] = v |
|
|
|
|
|
|
|
if form: |
|
encoded = [] |
|
for k, v in form.items(): |
|
if isinstance(v,basestring): |
|
v = [v] |
|
for v in v: |
|
encoded.append('%s=%s' % (quote_plus(k), quote_plus(v))) |
|
encoded = '&'.join(encoded) |
|
environ.setdefault('wsgi.input', StringIO(encoded)) |
|
environ.setdefault('CONTENT_LENGTH', str(len(encoded))) |
|
environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded') |
|
environ.setdefault('REQUEST_METHOD', 'POST') |
|
|
|
setup_testing_defaults(environ) |
|
stdout = StringIO() |
|
stderr = environ['wsgi.errors'] |
|
|
|
def wrapper(env, start): |
|
try: |
|
return app(env, start) |
|
except: |
|
stdout = sys.stdout |
|
try: |
|
if stdout is not sys.__stdout__: |
|
sys.stdout = sys.__stdout__ |
|
import pdb |
|
pdb.post_mortem(sys.exc_info()[2]) |
|
finally: |
|
sys.stdout = stdout |
|
raise |
|
|
|
SimpleHandler( |
|
environ['wsgi.input'], stdout, stderr, environ, |
|
environ['wsgi.multithread'], environ['wsgi.multiprocess'] |
|
).run(wrapper) |
|
print stdout.getvalue().replace('\r\n','\n') |
|
if stderr.getvalue(): |
|
print "--- Log Output ---" |
|
print stderr.getvalue().replace('\r\n','\n') |
|
|
|
|
class TestForm(Page): |
class TestForm(Page): |
<form method="post"> |
<form method="post"> |
<table> |
<table> |
<tr><td>What is your name ?</td> |
<tr><td>What is your name ?</td> |
<td><input type="text" name="name" value="$(?cgi.escape(name)?)"/></td></tr> |
<td><input type="text" name="name" value="$(?escape(name)?)"/></td></tr> |
<tr><td>What is your favorite animal ?</td> |
<tr><td>What is your favorite animal ?</td> |
<td><input type="text" name="animal" value="$(?cgi.escape(animal)?)"/></td></tr> |
<td><input type="text" name="animal" value="$(?escape(animal)?)"/></td></tr> |
<tr><td>What is your email address ?</td> |
<tr><td>What is your email address ?</td> |
<td><input type="text" name="email" value="$(?cgi.escape(email)?)"/></td></tr> |
<td><input type="text" name="email" value="$(?escape(email)?)"/></td></tr> |
<tr><td colspan="2"><input type="submit" /></td></tr> |
<tr><td colspan="2"><input type="submit" /></td></tr> |
</table> |
</table> |
</form> |
</form> |
TestContainer.c = TestContainer # allow some depth to the test... |
TestContainer.c = TestContainer # allow some depth to the test... |
|
|
|
|
|
def Redirector(url): |
|
"""Create a method that will go to a predefined URL (w/embedded vars)""" |
|
url = Text(url) |
|
def method(self): |
|
return self.redirect(url.render(self)) |
|
return method |
|
|
|
def additional_tests(): |
|
import doctest |
|
return doctest.DocFileSuite( |
|
'README.txt', |
|
optionflags=doctest.ELLIPSIS|doctest.REPORT_ONLY_FIRST_FAILURE, |
|
) |
|
|
|
|
|
|