# XXX This is VERY rough... :( |
|
|
|
from peak.api import * |
from peak.api import * |
|
from peak.naming import URL |
|
|
|
justPath = URL.Sequence( |
|
URL.MatchString(), ':', |
|
('//', URL.MatchString(pattern='[^/]*')), |
|
URL.Named('path'), |
|
('?', URL.MatchString()), ('#', URL.MatchString()), |
|
) |
|
|
|
def pathFromURL(url): |
|
return URL.parse(str(url),justPath)['path'] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GenericPathURL(URL.Base): |
|
|
|
nameAttr = 'path' |
|
supportedSchemes = 'http','https','ftp','file', |
|
|
|
class user(URL.Field): pass |
|
class password(URL.Field): pass |
|
|
class OpenableURL(naming.URL.Base): |
class hostname(URL.Field): |
|
defaultValue = '' |
|
syntax = URL.Conversion( |
|
URL.ExtractQuoted(URL.MatchString(pattern='[^/:]*')), |
|
canBeEmpty = True |
|
) |
|
|
|
class port(URL.IntField): pass |
|
|
|
class path(URL.NameField): |
|
referencedType = naming.CompositeName |
|
canBeEmpty = True |
|
|
|
class query(URL.Field): pass |
|
|
|
class fragment(URL.Field): pass |
|
|
|
# Make syntax usable w/subclasses that redefine individual fields |
|
|
|
syntax = binding.classAttr( |
|
binding.Once( |
|
lambda s,d,a: URL.Sequence( |
|
'//', |
|
( (s.user, (':',s.password) ,'@'), s.hostname, (':',s.port)), |
|
'/', s.path, ('?',s.query), ('#',s.fragment) |
|
) |
|
) |
|
) |
|
|
|
|
|
|
|
|
|
|
|
class OpenableURL(GenericPathURL): |
|
|
def retrieve(self, refInfo, name, context, attrs=None): |
def retrieve(self, refInfo, name, context, attrs=None): |
return URLStreamFactory(target = str(self)) |
return URLStreamFactory(target = str(self)) |
|
|
|
|
class FileURL(naming.URL.Base): |
class FileURL(GenericPathURL): |
|
|
|
supportedSchemes = 'file', |
|
|
|
# Make syntax usable w/subclasses that redefine individual fields |
|
syntax = binding.classAttr( |
|
binding.Once( |
|
lambda s,d,a: URL.Sequence( |
|
URL.Alternatives( |
|
URL.Sequence('//', s.hostname, s.path), |
|
s.path, |
|
), ('?', s.querySyntax), ('#', s.fragment), |
|
) |
|
) |
|
) |
|
|
|
querySyntax = GenericPathURL.query._syntax |
|
|
|
def getFilename(self): |
|
# We need to normalize ourself to match urllib's funky |
|
# conventions for file: conversion. :( |
|
from urllib import url2pathname |
|
return url2pathname(pathFromURL(self)) |
|
|
# XXX needs parser for //user:auth@host:port/path?query#frag |
|
# XXX could then be shared with URLs for http, ftp, https... |
|
# XXX Probably path portion needs to be manipulable as |
|
|
|
def retrieve(self, refInfo, name, context, attrs=None): |
def retrieve(self, refInfo, name, context, attrs=None): |
return FileFactory(filename = urlToLocalFile(self)) |
return FileFactory( filename = self.getFilename() ) |
|
|
|
|
|
|
|
|
|
|
def urlToLocalFile(url): |
|
|
|
path = url.body |
|
|
|
if path.startswith('//'): |
|
# XXX need to translate '/' into local 'os.sep'? |
|
return path[2:].split('/',1)[1] |
|
|
|
|
class PkgFileURL(URL.Base): |
|
|
|
nameAttr = 'body' |
|
|
|
supportedSchemes = 'pkgfile', |
|
|
|
class body(URL.NameField): |
|
referencedType = naming.CompositeName |
|
canBeEmpty = True |
|
|
|
def getFilename(self): |
|
if len(self.body)<2 or not self.body[0]: |
|
raise exceptions.InvalidName( |
|
"Missing package name in %s" % self |
|
) |
|
from os.path import join, dirname |
|
from peak.util.imports import importString |
|
path = dirname(importString(self.body[0]).__file__) |
|
for p in self.body[1:]: |
|
path = join(path,p) |
return path |
return path |
|
|
|
def retrieve(self, refInfo, name, context, attrs=None): |
|
return FileFactory( filename = self.getFilename() ) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
This is a pretty lame duck right now. It's mainly here so we have a |
This is a pretty lame duck right now. It's mainly here so we have a |
consistent interface across file-like URLs.""" |
consistent interface across file-like URLs.""" |
|
|
implements(naming.IStreamFactory) |
protocols.advise( |
|
instancesProvide=[naming.IStreamFactory] |
|
) |
|
|
target = binding.requireBinding("urllib2 URL or request", "target") |
target = binding.requireBinding("urllib2 URL or request", "target") |
|
|
raise TypeError("URL not seekable", self.target) |
raise TypeError("URL not seekable", self.target) |
|
|
from urllib2 import urlopen |
from urllib2 import urlopen |
return urlopen(self.target) |
return urlopen(str(self.target)) |
|
|
|
|
def create(self,mode,seek=False,readable=False,autocommit=False): |
def create(self,mode,seek=False,readable=False,autocommit=False): |
|
|
|
|
|
|
|
|
|
|
def exists(self): |
def exists(self): |
|
|
from urllib2 import urlopen, HTTPError |
from urllib2 import urlopen, HTTPError |
|
|
"""Stream factory for a local file object""" |
"""Stream factory for a local file object""" |
|
|
implements(naming.IStreamFactory) |
protocols.advise( |
|
instancesProvide=[naming.IStreamFactory] |
|
) |
|
|
filename = binding.requireBinding("Filename to open/modify", "filename") |
filename = binding.requireBinding("Filename to open/modify", "filename") |
|
|
|
|
|
|
|
|
|
|