[Subversion] / PEAK / src / peak / storage / interfaces.py  

View of /PEAK/src/peak/storage/interfaces.py

Parent Directory | Revision Log
Revision: 1436 - (download) (as text)
Thu Oct 30 19:47:14 2003 UTC (20 years, 6 months ago) by pje
File size: 18506 byte(s)
SQL connection objects now provide an 'appConfig' attribute that is a
driver-specific 'config.Namespace()'.  This allows you to easily set up
configuration properties that are driver-specific.  For example, you could
use properties to configure driver-specific SQL snippets, then access them
via the connection's 'appConfig' namespace.  The namespaces are of the form
'DRIVER.appConfig', where 'DRIVER' is the name of the DBAPI module for that
connection type (e.g. 'pgdb', 'cx_Oracle', etc.).
"""'Straw Man' Transaction Interfaces"""

from protocols import Interface, Attribute, advise
from peak.binding.interfaces import IComponent
from peak.api import NOT_GIVEN

__all__ = [
    'ITransactionService', 'ITransactionParticipant', 'ICache',
    'ITransactionErrorHandler', 'ICursor', 'IRow',
    'IDataManager', 'IDataManager_SPI', 'IWritableDM', 'IWritableDM_SPI',
    'IManagedConnection', 'IManagedConn_SPI', 'IKeyableDM',
    'ISQLConnection', 'ILDAPConnection', 'IDDEConnection',
    'ISQLIntrospector'
]



























class ITransactionService(Interface):

    """Manages transaction lifecycle, participants, and metadata.

    There is no predefined number of transactions that may exist, or
    what they are associated with.  Depending on the application
    model, there may be one per application, one per transaction, one
    per incoming connection (in server applications), or some other
    number.  The transaction package should, however, offer an API for
    managing per-thread (or per-app, if threads aren't being used)
    transactions, since this will probably be the most common usage
    scenario."""

    # The basic transaction lifecycle

    def begin(**info):
        """Begin a transaction.  Raise TransactionInProgress if
        already begun.  Any keyword arguments are passed on to the
        setInfo() method.  (See below.)"""

    def commit():
        """Commit the transaction, or raise OutsideTransaction if not in
        progress."""

    def abort():
        """Abort the transaction, or raise OutsideTransaction if not in
        progress."""

    def fail():
        """Force transaction to fail (i.e. no commits allowed, only aborts)"""











    # Managing participants

    def join(participant):
        """Add 'participant' to the set of objects that will receive
        transaction messages.  Note that no particular ordering of
        participants should be assumed.  If the transaction is already
        active, 'participant' will receive a 'begin_txn()' message. If
        a commit or savepoint is in progress, 'participant' may also
        receive other messages to "catch it up" to the other
        participants.  However, if the commit or savepoint has already
        progressed too far for the new participant to join in, a
        TransactionInProgress error will be raised."""

    def __contains__(participant):
        """Has 'participant' joined?"""

    def removeParticipant(participant):
        """Force participant to be removed; for error handler use only"""

    # Getting/setting information about a transaction

    def isActive():
        """Return True if transaction is in progress."""

    def getTimestamp():
        """Return the time that the transaction began, in time.time()
        format, or None if no transaction in progress."""

    def addInfo(**info):
        """Update the transaction's metadata dictionary with the
        supplied keyword arguments.  This can be used to record
        information such as a description of the transaction, the user
        who performed it, etc. Note that the transaction itself does
        nothing with this information. Transaction participants will
        need to retrieve the information with 'getInfo()' and record
        it at the appropriate point during the transaction."""

    def getInfo():
        """Return a copy of the transaction's metadata dictionary"""


class ITransactionParticipant(Interface):

    """Participant in a transaction; may be a resource manager, a transactional
    cache, or just a logging/monitoring object.

    Event sequence is approximately as follows:

        join(participant)
            ( readyToVote voteForCommit commitTransaction ) | abortTransaction

    An abortTransaction may occur at *any* point following join(), and
    aborts the transaction.

    Generally speaking, participants fall into a few broad categories:

    * Database connections

    * Resource managers that write data to another participant, e.g. a
      storage manager writing to a database connection

    * Resource managers that manage their own storage transactions,
      e.g. ZODB Database/Storage objects, a filesystem-based queue, etc.

    * Objects which don't manage any transactional resources, but need to
      know what's happening with a transaction, in order to log it.

    Each kind of participant will typically use different messages to
    achieve their goals.  Resource managers that use other
    participants for storage, for example, won't care much about
    'voteForCommit()', while a resource manager that manages direct storage
    will care about 'voteForCommit()' very deeply!

    Resource managers that use other participants for storage, but
    buffer writes to the other participant, will need to pay close
    attention to the 'readyToVote()' message.  Specifically, they must
    flush all pending writes to the participant that handles their
    storage, and return 'False' if there was anything to flush.
    'readyToVote()' will be called repeatedly on *all* participants until
    they *all* return 'True', at which point the transaction will initiate
    the 'voteForCommit()' phase.

    By following this algorithm, any number of participants may be
    chained together, such as a persistence manager that writes to an
    XML document, which is persisted in a database table, which is
    persisted in a disk file.  The persistence manager, the XML
    document, the database table, and the disk file would all be
    participants, but only the disk file would actually use
    'voteForCommit()' and 'commitTransaction()' to handle a commit.
    All of the other participants would flush pending updates during the
    'readyToVote()' loop, guaranteeing that the disk file participant
    would know about all the updates by the time 'voteForCommit()' was
    issued, regardless of the order in which the participants received
    the messages."""

    def readyToVote(txnService):
        """Transaction commit is beginning; flush dirty objects and
        enter write-through mode, if applicable.  Return a true
        value if nothing needed to be done, or a false value if
        work needed to be done.  DB connections will probably never
        do anything here, and thus will just return a true value.
        Object managers like Entity DMs will write their objects and
        return false, or return true if they have nothing to write.
        Note: participants *must* continue to accept writes until
        'voteForCommit()' occurs, and *must* accept repeated writes
        of the same objects!"""

    def voteForCommit(txnService):
        """Raise an exception if commit isn't possible.  This will
        mostly be used by resource managers that handle their own
        storage, or the few DB connections that are capable of
        multi-phase commit."""

    def commitTransaction(txnService):
        """This message follows vote_commit, if no participants vetoed
        the commit.  DB connections will probably issue COMMIT TRAN
        here. Transactional caches might use this message to reset
        themselves."""





    def abortTransaction(txnService):
        """This message can be received at any time, and means the
        entire transaction must be rolled back.  Transactional caches
        might use this message to reset themselves."""

    def finishTransaction(txnService, committed):
        """The transaction is over, whether it aborted or committed."""


class ITransactionErrorHandler(Interface):

    """Policy object to handle exceptions issued by participants"""

    def voteFailed(txnService, participant):
        """'participant' raised exception during 'voteForCommit()'"""

    def abortFailed(txnService, participant):
        """'participant' raised exception during 'abortTransaction'"""

    def finishFailed(txnService, participant):
        """'participant' raised exception during 'finishTransaction()'"""

    def commitFailed(txnService, participant):
        """'participant' raised exception during 'commitTransaction()'"""

















# DM interfaces
from peak.persistence import IPersistentDataManager

class IDataManager(IComponent,ITransactionParticipant):

    """Data manager for persistent objects or queries"""

    resetStatesAfterTxn = Attribute(
        """Set to false to disable auto-deactivation of objects from cache"""
    )

    def __getitem__(oid):
        """Retrieve the persistent object designated by 'oid'"""

    def preloadState(oid, state):
        """Pre-load 'state' for object designated by 'oid' and return it"""


class IKeyableDM(IDataManager):

    """Data manager that supports "foreign key" references"""

    def oidFor(ob):
        """Return an 'oid' suitable for retrieving 'ob' from this DM"""


class IWritableDM(IKeyableDM):

    """Data manager that possibly supports adding/modifying objects"""

    advise(
        # Can't subclass this if it's a Zope Interface, but we can extend it:
        protocolExtends = [IPersistentDataManager]
    )

    def newItem(klass=None):
        """Create and return a new persistent object of class 'klass'"""

    def flush(ob=None):
        """Sync stored state to in-memory state of 'ob' or all objects"""

class IDataManager_SPI(Interface):

    """Methods/attrs that must/may be redefined in a QueryDM subclass"""

    cache = Attribute("a cache for ghosts and loaded objects")

    defaultClass = Attribute("Default class used for 'newItem()' method")

    def _ghost(oid, state=None):
        """Return a ghost of appropriate class, based on 'oid' and 'state'

        Note that 'state' will be loaded into the returned object via its
        '__setstate__()' method.  If 'state' is 'None', the returned object's
        '_p_deactivate()' method will be called instead."""


    def _load(oid, ob):
        """Load & return the state for 'oid', suitable for '__setstate__()'"""


class IWritableDM_SPI(IDataManager_SPI):

    """Additional methods needed for writing objects in an EntityDM"""

    def _save(ob):
        """Save 'ob' to underlying storage"""

    def _new(ob):
        """Create 'ob' in underlying storage and return its new 'oid'"""

    def _defaultState(ob):
        """Return a default '__setstate__()' state for a new 'ob'"""

    def _thunk(ob):
        """Hook for implementing cross-database "thunk" references"""






# Connection interfaces

class IManagedConnection(IComponent,ITransactionParticipant):

    """Transactable "Connection" object that appears to always be open"""

    connection = Attribute(

        """The actual underlying (LDAP, SQL, etc.) connection object

        This attribute is primarily for use by subclasses of ManagedConnection.
        It is a 'binding.Make()' link to the '_open()' method (see
        IManagedConnImpl interface for details)."""
    )

    txnTime = Attribute(
        """This connection's view of the Unix-format time of this transaction

        This attribute should be computed once and stored for the transaction
        duration, by something like 'SELECT GETDATE()' in the case of an SQL
        connection.  If the underlying connection has no notion of the current
        time, this can be computed by calling getTimestamp() on the transaction
        object.  In any event, accessing this attribute should ensure that the
        connection joins the current transaction, if it hasn't already"""
    )

    def joinTxn():
        """Join the current transaction, if not already joined"""

    def assertUntransacted():
        """Raise 'TransactionInProgress' if transaction already joined"""

    def closeASAP():
        """Close the connection as soon as it's not in a transaction"""

    def close():
        """Close the connection immediately"""




    def __call__(*args, **kw):
        """Return a (possibly initialized) ICursor

            Creates a new ICursor instance initialized with the passed
            keyword arguments.  If positional arguments are supplied,
            they are passed to the new cursor's 'execute()' method before
            it is returned.

            This method is the primary way of interacting with a connection;
            either you'll pass positional arguments and receive an
            initialized and iterable cursor, or you'll call with no arguments
            or keywords only to receive a cursor that you can use to perform
            more "low-level" interactions with the database.
        """


    def registerCursor(ob):
        """Register an object whose 'close()' method must be called"""


    def closeCursor():
        """Close all registered cursors which are still active"""



















# XXX These need more fleshing out w/API, exceptions, etc

class ISQLConnection(IManagedConnection):

    """A ManagedConnection that talks SQL"""

    def getRowConverter(description,post=None):
        """Get row-convert function for a given DBAPI description (or None)

        For the given 'description', which is a DBAPI cursor result description
        (i.e. sequence of tuples describing result columns), return a conversion
        function which will map the values in a row, to application-specific
        datatypes.  (The database-type to application-type mapping should be
        controlled via the connection's configuration properties.)

        The created conversion function will accept a single database result
        row, and return a sequence of values.  'post' is an optional
        postprocessing function which will be passed the entire sequence of
        converted values as a single argument.  The return value of 'post' will
        then be returned from the conversion function.

        Note that if no postprocessing function is supplied, and no columns in
        the given description require type conversion, this method may
        return 'None', indicating that no conversion of any kind is required.
        """

    DRIVER = Attribute(
        """The name of the DBAPI driver module for this connection"""
    )

    appConfig = Attribute(
        """A config.Namespace() pointing to 'DRIVER.appConfig'"""
    )

class ILDAPConnection(IManagedConnection):
    """A ManagedConnection that talks LDAP"""

class IDDEConnection(IManagedConnection):
    """A ManagedConnection that talks DDE"""


class IManagedConn_SPI(Interface):

    """Methods that must/may be defined in a ManagedConnection subclass"""

    def _open():
        """Return new "real" connection to be saved as 'self.connection'

            This method will be called whenever a new connection needs to
            be opened.  It should return an opened connection of the
            appropriate type, using whatever configuration data is
            available.  The result will be saved as 'self.connection' for
            use by other methods.  (Note: your subclass code shouldn't call
            'self._open()', since it'll be called automatically if it's
            needed, when you attempt to use 'self.connection'.

            Overriding this method is required.
        """

    def _close(self):
        """Actions to take before 'del self.connection', if needed.

            This method is automatically called when 'self.connection'
            exists and needs to be closed.  If your subclass needs to
            do anything special at this time (e.g. calling a close
            method on 'self.connection', you can override this method
            to do so.
        """














class ICache(Interface):

    """Cache - a restricted subset of the standard dictionary interface"""

    def get(key, default=None):
        """Retrieve object denoted by 'key', or 'default' if not found

            Note that cache implementations do not have to guarantee that
            'get()' will return items placed in the cache, or indeed
            ever return anything other than 'default'.  For example, the
            'NoCache' type always returns 'default'.
        """

    def __setitem__(key,value):
        """Save 'value' in the cache under 'key'

            Note that no particular lifetime for 'value' remaining in the
            cache is required.  For example, the 'NoCache' type implements
            this method as a no-op.
        """

    def clear():
        """Clear cache contents, if any"""


    def values():
        """Return a sequence of the cache's contents"""














class ICursor(Interface):

    """Iterable database cursor"""

    def execute(*args):

        """Execute a command

            Note that the types and semantics of this method's arguments
            are database-specific.  DBAPI cursors expect an SQL 'command'
            and an optional 'params' object, while LDAP cursors expect
            the arguments for an LDAP 'search' operation.

            Following 'execute()', a cursor should be ready for iteration
            over its result rows.
        """

    def __iter__():
        """Return an iterator returning the IRows of the result"""

    def allSets():
        """Return an iterator returning a list of IRows for each result set"""

    def justOne():
        """Assert that result contains only one IRow, and return it"""

    def __invert__():
        """The same as justOne(); i.e. '~cursor == cursor.justOne()'"""

    def nextset():
        """Result of calling the true DB cursor's 'nextset()', or None"""

    def close():
        """Close/reset the true DB cursor; the proxy can still be reused"""


class IRow(Interface):
    """Row that smells like a tuple, dict, or instance attr"""



class ISQLIntrospector(Interface):
    """Adapt a managed connection to this to obtain information on
       objects in the database"""

    def listObjects(full=False, obtypes=NOT_GIVEN):
        """Returns an active cursor with information on objects in the DB

            with full=True, includes all available information, otherwise
            only includes the information likely to be most elevant to the
            user.

            The returned cursor shall have a column 'obname' first, with
            the object name, and a column 'obtype' second, with one of
            the following values standardized:

                table, systable, view, proc, index, synonym

            if obtypes is given, it shall be a sequence of the above types,
            and rows shall only be returned for the given types.
        """






















cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help