from wsgiref.util import shift_path_info, application_uri |
from wsgiref.util import shift_path_info, application_uri |
|
|
__all__ = [ |
__all__ = [ |
"Page", "Form", "validator", "HTML", "Text", "Template", "HTTP", |
"Page", "form_handler", "HTML", "Text", "Template", "HTTP", |
"EvalTemplate", "EvalMap", "Method", |
"EvalTemplate", "EvalMap", "Method", |
] |
] |
|
|
|
|
cls_registry = "http_methods" |
cls_registry = "http_methods" |
|
|
|
def form_handler(arg): |
def validator(func): |
|
"""Decorator that marks an object as a registered validator""" |
"""Decorator that marks an object as a registered validator""" |
func.cls_registry = "registered_validators" |
def decorator(func): |
|
func.cls_registry = "form_handlers" |
|
func.priority = priority, func.__name__ |
return func |
return func |
|
if isinstance(arg, type(form_handler)): |
|
priority = 0 |
|
return decorator(arg) |
|
return decorator |
|
|
text_plain = ('Content-Type', 'text/plain') |
text_plain = ('Content-Type', 'text/plain') |
text_html = ('Content-Type', 'text/html') |
text_html = ('Content-Type', 'text/html') |
def get_module(): |
def get_module(): |
return sys._getframe(2).f_globals.get('__name__', __name__) |
return sys._getframe(2).f_globals.get('__name__', __name__) |
|
|
|
|
|
|
|
|
|
|
|
|
sentinel = object() |
sentinel = object() |
|
|
class EvalMap(object): |
class EvalMap(object): |
setattr(self,k,v) |
setattr(self,k,v) |
self.setup() # perform any dynamic initialization |
self.setup() # perform any dynamic initialization |
|
|
def invoke_method(self): |
|
rm = self.environ['REQUEST_METHOD'] |
|
if rm=='GET' and self.body is not None: |
|
return self.body() |
|
if rm=='HEAD' or rm in self.http_methods: |
|
return getattr(self, rm)() |
|
return self.METHOD_NOT_ALLOWED( |
|
[('Allow', ', '.join(self.http_methods))] |
|
) |
|
|
|
def HEAD(self): |
def HEAD(self): |
environ['REQUEST_METHOD'] = 'GET' |
environ['REQUEST_METHOD'] = 'GET' |
resp = iter(self.invoke_method()) # forward to 'GET' |
resp = iter(self.invoke_method()) # forward to 'GET' |
list(resp) # questionable hack to exhaust the response |
list(resp) # questionable hack to exhaust the response |
return resp # ensure that .close() gets called, if any |
return resp # ensure that .close() gets called, if any |
|
|
|
|
|
|
|
|
|
|
def go(self): |
def go(self): |
name = shift_path_info(self.environ) |
name = shift_path_info(self.environ) |
if name: |
if name: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def invoke_method(self): |
|
rm = self.environ['REQUEST_METHOD'] |
|
if rm=='HEAD' or rm in self.http_methods: |
|
return getattr(self, rm)() |
|
elif rm=='GET' and self.body is not None: |
|
return self.body() |
|
elif rm=='POST' and self.form_handlers: |
|
return self.POST() |
|
|
|
methods = set(self.http_methods) # Compute available methods |
|
if self.body is not None: |
|
methods.add('GET') |
|
if 'GET' in methods: |
|
methods.add('HEAD') |
|
if self.form_handlers: |
|
methods.add('POST') |
|
|
|
return self.METHOD_NOT_ALLOWED([('Allow', ', '.join(methods))]) |
|
|
METHOD_NOT_ALLOWED = Text( |
METHOD_NOT_ALLOWED = Text( |
"Excellent method!\n" |
"Excellent method!\n" |
"Alas, my response must be:\n" |
"Alas, my response must be:\n" |
) |
) |
|
|
|
|
|
form_handlers = [] |
|
form_parsed = False |
|
form_data = () |
|
form_defaults = {} |
|
|
|
def get_handlers(self): |
|
handlers = [getattr(self,k) for k in self.form_handlers] |
|
handlers.sort(key=lambda h: h.priority) |
|
return handlers |
|
|
|
def __getattr__(self, name): |
|
"""Dynamic attributes from form_data and defaults""" |
|
if not name.startswith('_') and name in self.form_data: |
|
return self.form_data[name].value |
|
if name in self.form_defaults: |
|
return self.form_defaults[name] |
|
raise AttributeError(name) |
|
|
|
def parse_form(self): |
|
"""Ensure that self.form_data contains a FieldStorage, and return it""" |
|
if not self.form_parsed: |
|
self.form_data = cgi.FieldStorage( |
|
self.environ['wsgi.input'], environ=self.environ |
|
) |
|
self.form_parsed = True |
|
return self.form_data |
|
|
|
def POST(self): |
|
self.parse_form() |
|
for handler in self.get_handlers(): |
|
response = handler() |
|
if response: |
|
return response |
|
|
|
if self.errors: |
|
return self.form_failure() |
|
else: |
|
return self.form_success() |
|
|
|
|
|
|
|
# A miserably inadequate attempt at a decent UI... |
|
|
|
errors_found = HTML.fragment( |
|
'<ul class="form_errors"><li class="form_error">' |
|
r'$(? "</li><li class=\"form_error\">".join(errors) ?)' |
|
'</li></ul>' |
|
) |
|
|
|
show_errors = property(lambda self: self.errors and self.errors_found or '') |
|
|
|
form_failure = HTML( |
|
"<html><head>" |
|
"<title>Sorry, we couldn't process your request</title></head>\n<body>" |
|
"<h2>Sorry, we couldn't process your request</h2>\n" |
|
"<p>We encountered some difficulties with your input:</p>" |
|
"$errors_found\n" |
|
"<p>If you would please use your browser's BACK button to go back and " |
|
"correct these problems, we'd appreciate it. Thanks for your help!" |
|
"</p></body>" |
|
) |
|
|
|
|
|
form_success = Text( |
|
"Oops. Someone forgot to create a template or method here." |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
class Form(Page): |
class TestForm(Page): |
"""A page with POST processing, form parsing, validation, etc.""" |
|
registered_validators = [] |
|
data = {} |
|
defaults = {} |
|
def get_validators(self): |
|
return self.registered_validators |
|
|
|
def validate(self): |
|
for k in self.get_validators(): |
|
response = getattr(self,k)() |
|
if response: |
|
return response |
|
|
|
def __getattr__(self, name): |
|
if not name.startswith('__'): |
|
try: |
|
return self.data[name].value |
|
except KeyError: |
|
if name in self.defaults: |
|
return self.defaults[name] |
|
raise AttributeError(name) |
|
|
|
succeed = Text("Oops. Someone forgot to create a form or method here.") |
|
|
|
body = succeed # you should replace this with the form's template |
|
|
|
def parse(self): |
|
self.data = cgi.FieldStorage( |
|
self.environ['wsgi.input'], environ=self.environ |
|
) |
|
|
|
errors_found = HTML.fragment( |
|
'<ul class="form_errors"><li class="form_error">' |
|
r'$(? "</li><li class=\"form_error\">".join(errors) ?)' |
|
'</li></ul>' |
|
) |
|
|
|
show_errors = property(lambda self: self.errors and self.errors_found or '') |
|
|
|
|
|
@HTTP |
|
def POST(self): |
|
self.parse() |
|
response = self.validate() |
|
if response: |
|
return response |
|
elif self.errors: |
|
return self.fail() |
|
else: |
|
return self.succeed() |
|
|
|
# A miserably inadequate attempt at a decent UI... |
|
fail = HTML( |
|
"<html><head>" |
|
"<title>Sorry, we couldn't process your request</title></head>\n<body>" |
|
"<h2>Sorry, we couldn't process your request</h2>\n" |
|
"<p>We encountered some difficulties with your input:</p>" |
|
"$errors_found\n" |
|
"<p>If you would please use your browser's BACK button to go back and " |
|
"correct these problems, we'd appreciate it. Thanks for your help!" |
|
"</p></body>" |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestForm(Form): |
|
"""A stupid example to test the framework""" |
"""A stupid example to test the framework""" |
|
|
defaults = dict(name='Joey', animal='Dog', email='joe@dog.com') |
form_defaults = dict(name='Joey', animal='Dog', email='joe@dog.com') |
|
|
body = fail = HTML("""<?xml version="1.0" encoding="iso-8859-1"?> |
body = form_failure = HTML("""<?xml version="1.0" encoding="iso-8859-1"?> |
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" |
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" |
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> |
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> |
<html xmlns="http://www.w3.org/1999/xhtml"> |
<html xmlns="http://www.w3.org/1999/xhtml"> |
</body> |
</body> |
</html> |
</html> |
""") |
""") |
succeed = Text("Hey Joe!") |
form_success = Text("Hey Joe!") |
|
|
@validator |
@form_handler |
def check_joe(self): |
def check_joe(self): |
if self.name!='Joe': self.errors.append("Hey, you're not Joe!") |
if self.name!='Joe': self.errors.append("Hey, you're not Joe!") |
|
|