[Subversion] / WebHaiku / web_haiku.py  

Diff of /WebHaiku/web_haiku.py

Parent Directory | Revision Log

version 2266, Sun Jan 14 21:00:12 2007 UTC version 2277, Wed Jan 24 01:45:35 2007 UTC
Line 4 
Line 4 
 from wsgiref.util import shift_path_info, application_uri  from wsgiref.util import shift_path_info, application_uri
   
 __all__ = [  __all__ = [
     "Page", "form_handler", "HTML", "Text", "Template", "HTTP",      "Page", "form_handler", "HTML", "Text", "Template", "HTTP", "expose",
     "EvalTemplate", "EvalMap", "Method",      "test", "Redirector", "EvalTemplate", "EvalMap", "Method",
 ]  ]
   
 class Method(object):  class Method(object):
Line 55 
Line 55 
     def __delitem__(self, key):         # needed for listcomp exprs!      def __delitem__(self, key):         # needed for listcomp exprs!
         del self.extra[key]          del self.extra[key]
   
       def __contains__(self, key):
           try: self[key]
           except KeyError: return False
           return True
   
     def __getitem__(self, key):      def __getitem__(self, key):
         if key.startswith('(?'):          if key.startswith('(?'):
             return eval(              return eval(key[2:].rstrip(')').rstrip('?').strip(),
                 key[2:].rstrip(')').rstrip('?').strip(),                          sys.modules[self.module].__dict__, self)
                 sys.modules[self.module].__dict__, self  
             )  
         elif key in self.extra:          elif key in self.extra:
             return self.extra[key]              return self.extra[key]
         else:          else:
Line 72 
Line 75 
                 return g[key]                  return g[key]
         if key=='self':          if key=='self':
             return self              return self
         raise KeyError          raise KeyError(key)
   
 class EvalTemplate(string.Template):  class EvalTemplate(string.Template):
     idpattern = r'[_a-z][_a-z0-9]*|\(\?[^?]*\?\)'      idpattern = r'[_a-z][_a-z0-9]*|\(\?[^?]*\?\)'
   
   
   
   
 class Text(Method):  class Text(Method):
     """Text template w/string substitution that can be used as a method      """Text template w/string substitution that can be used as a method
   
Line 203 
Line 203 
   
   
   
   def expose(func):
       """Wrapper/decorator that marks a method as a subpage"""
       class _Page(Page):
           def body(self):
               return func(self.parent)
       return _Page
   
   
 class Page(object):  class Page(object):
     """A generic web location"""      """A generic web location"""
   
Line 234 
Line 242 
             setattr(self,k,v)              setattr(self,k,v)
         self.setup()    # perform any dynamic initialization          self.setup()    # perform any dynamic initialization
   
     def HEAD(self):  
         environ['REQUEST_METHOD'] = 'GET'  
         resp = iter(self.invoke_method())   # forward to 'GET'  
         list(resp)      # questionable hack to exhaust the response  
         return resp     # ensure that .close() gets called, if any  
   
   
   
   
       def HEAD(self):
           def write(txt):
               sr.length += len(txt)
   
     URL = property(lambda self: application_uri(self.environ))          def sr(status, headers, exc_info=None):
               if exc_info:
                   if sr.length:
                       try:
                           raise exc_info[0], exc_info[1], exc_info[2]
                       finally:
                           exc_info = None
               elif sr.status is not None:
                   raise AssertionError("Headers already set!")
   
               sr.status,sr.headers,sr.exc_info = status,Headers(headers),exc_info
               return write
   
           from wsgiref.headers import Headers
           sr.length = 0
           sr.status, sr.headers, sr.exc_info = None, (), None
           old_sr = self.start_response
           self.start_response = sr
           self.environ['REQUEST_METHOD'] = 'GET'
   
           try:
               resp = self.invoke_method()   # forward to 'GET'
               if 'Content-Length' not in sr.headers:
                   for chunk in resp:
                       sr.length += len(chunk)
                       if 'Content-Length' in sr.headers:
                           break
               if hasattr(resp,'close'):
                   resp.close()
               if 'Content-Length' not in sr.headers:
                   sr.headers['Content-Length'] = str(sr.length)
   
               old_sr(sr.status, sr.headers.items(), sr.exc_info)
               return ['']
           finally:
               sr.args = exc_info = None  # clean up exc_info, if still present
   
     def go(self):      def go(self):
           self.URL = application_uri(self.environ)
         name = shift_path_info(self.environ)          name = shift_path_info(self.environ)
         if name:          if name:
             return self.handle_child(name)              return self.handle_child(name)
Line 261 
Line 301 
             if leaf:              if leaf:
                 return self.invoke_method()                  return self.invoke_method()
             url += '/'  # add the trailing /              url += '/'  # add the trailing /
   
           if self.environ.get('QUERY_STRING'):
               url += '?' + self.environ['QUERY_STRING']
         return self.redirect(url)          return self.redirect(url)
   
     def handle_child(self, name):      def handle_child(self, name):
Line 283 
Line 326 
   
   
   
   
   
     def invoke_method(self):      def invoke_method(self):
         rm = self.environ['REQUEST_METHOD']          rm = self.environ['REQUEST_METHOD']
         if rm=='HEAD' or rm in self.http_methods:          if rm=='HEAD' or rm in self.http_methods:
Line 302 
Line 343 
         if self.form_handlers:          if self.form_handlers:
             methods.add('POST')              methods.add('POST')
   
         return self.METHOD_NOT_ALLOWED([('Allow', ', '.join(methods))])          return self.METHOD_NOT_ALLOWED([('Allow', ', '.join(sorted(methods)))])
   
     METHOD_NOT_ALLOWED = Text(      METHOD_NOT_ALLOWED = Text(
         "Excellent method!\n"          "Excellent method!\n"
Line 330 
Line 371 
     form_parsed = False      form_parsed = False
     form_data = ()      form_data = ()
     form_defaults = {}      form_defaults = {}
       escape = staticmethod(cgi.escape)
   
     def get_handlers(self):      def get_handlers(self):
         handlers = [getattr(self,k) for k in self.form_handlers]          handlers = [getattr(self,k) for k in self.form_handlers]
Line 338 
Line 380 
   
     def __getattr__(self, name):      def __getattr__(self, name):
         """Dynamic attributes from form_data and defaults"""          """Dynamic attributes from form_data and defaults"""
         if not name.startswith('_') and name in self.form_data:          if name in self.form_defaults:  # form vars must be explicitly listed
               if name in self.form_data:
             return self.form_data[name].value              return self.form_data[name].value
         if name in self.form_defaults:  
             return self.form_defaults[name]              return self.form_defaults[name]
         raise AttributeError(name)          raise AttributeError(name)
   
Line 366 
Line 408 
             return self.form_success()              return self.form_success()
   
   
   
     # A miserably inadequate attempt at a decent UI...      # A miserably inadequate attempt at a decent UI...
   
     errors_found = HTML.fragment(      errors_found = HTML.fragment(
Line 444 
Line 485 
     """Easy-access dict/object wrapper for DBAPI row tuples"""      """Easy-access dict/object wrapper for DBAPI row tuples"""
   
     def __init__(self, cursor, row):      def __init__(self, cursor, row):
         self.__dict__ = dict(self, zip([d[0]for d in cursor.description], row))          self.__dict__ = dict(zip([d[0]for d in cursor.description], row))
   
   
   
   
   def test(app, environ={}, form={}, **kw):
       """Print the output of a WSGI app
   
       Runs `app` as a WSGI application and prints its output.  If an untrapped
       error occurs in `app`, it drops into the ``pdb`` debugger's post-mortem
       debug shell (using ``sys.__stdout__`` if ``sys.stdout`` has been replaced).
   
       Any keyword arguments are added to the environment used to run `app`.  If
       a keyword argument begins with ``wsgi_``, the ``_`` is replaced with a
       ``.``, so that you can set e.g. ``wsgi.multithread`` using a
       ``wsgi_multithread`` keyword argument.
   
       If a non-empty `form` dictionary is provided, it is treated as a collection
       of fields for a form ``POST``. The ``REQUEST_METHOD`` will default to
       ``POST``, and the default ``CONTENT_LENGTH``, ``CONTENT_TYPE``, and
       ``wsgi.input`` values will be appropriately set (but can still be
       overridden by explicit keyword arguments or the `environ` argument).
   
       Any `form` values that are not instances of ``basestring`` are assumed to
       be *sequences* of values, and will result in multiple name/value pairs
       being added to the encoded data sent to the application.
   
       Any WSGI-required variables that are not specified by `environ`, `form`, or
       keyword arguments, are initialized to default values using the
       ``wsgiref.util.setup_testing_defaults()`` function.
       """
   
       from wsgiref.util import setup_testing_defaults
       from wsgiref.handlers import SimpleHandler
       from StringIO import StringIO
       from urllib import quote_plus
   
       environ = environ.copy()
       for k, v in kw.items():
           if k.startswith('wsgi_'):
               environ[k.replace('_','.',1)] = v
           else:
               environ[k] = v
   
   
   
       if form:
           encoded = []
           for k, v in form.items():
               if isinstance(v,basestring):
                   v = [v]
               for v in v:
                   encoded.append('%s=%s' % (quote_plus(k), quote_plus(v)))
           encoded = '&'.join(encoded)
           environ.setdefault('wsgi.input', StringIO(encoded))
           environ.setdefault('CONTENT_LENGTH', str(len(encoded)))
           environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded')
           environ.setdefault('REQUEST_METHOD', 'POST')
   
       setup_testing_defaults(environ)
       stdout = StringIO()
       stderr = environ['wsgi.errors']
   
       def wrapper(env, start):
           try:
               return app(env, start)
           except:
               stdout = sys.stdout
               try:
                   if stdout is not sys.__stdout__:
                       sys.stdout = sys.__stdout__
                   import pdb
                   pdb.post_mortem(sys.exc_info()[2])
               finally:
                   sys.stdout = stdout
               raise
   
       SimpleHandler(
           environ['wsgi.input'], stdout, stderr, environ,
           environ['wsgi.multithread'], environ['wsgi.multiprocess']
       ).run(wrapper)
       print stdout.getvalue().replace('\r\n','\n')
       if stderr.getvalue():
           print "--- Log Output ---"
           print stderr.getvalue().replace('\r\n','\n')
   
   
 class TestForm(Page):  class TestForm(Page):
Line 464 
Line 587 
  <form method="post">   <form method="post">
   <table>    <table>
    <tr><td>What is your name ?</td>     <tr><td>What is your name ?</td>
        <td><input type="text" name="name" value="$(?cgi.escape(name)?)"/></td></tr>         <td><input type="text" name="name" value="$(?escape(name)?)"/></td></tr>
    <tr><td>What is your favorite animal ?</td>     <tr><td>What is your favorite animal ?</td>
        <td><input type="text" name="animal" value="$(?cgi.escape(animal)?)"/></td></tr>         <td><input type="text" name="animal" value="$(?escape(animal)?)"/></td></tr>
    <tr><td>What is your email address ?</td>     <tr><td>What is your email address ?</td>
        <td><input type="text" name="email" value="$(?cgi.escape(email)?)"/></td></tr>         <td><input type="text" name="email" value="$(?escape(email)?)"/></td></tr>
    <tr><td colspan="2"><input type="submit" /></td></tr>     <tr><td colspan="2"><input type="submit" /></td></tr>
   </table>    </table>
  </form>   </form>
Line 510 
Line 633 
 TestContainer.c = TestContainer     # allow some depth to the test...  TestContainer.c = TestContainer     # allow some depth to the test...
   
   
   def Redirector(url):
       """Create a method that will go to a predefined URL (w/embedded vars)"""
       url = Text(url)
       def method(self):
           return self.redirect(url.render(self))
       return method
   
   def additional_tests():
       import doctest
       return doctest.DocFileSuite(
           'README.txt',
           optionflags=doctest.ELLIPSIS|doctest.REPORT_ONLY_FIRST_FAILURE,
       )
   
   
   


Generate output suitable for use with a patch program
Legend:
Removed from v.2266  
changed lines
  Added in v.2277

cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help