[Subversion] / PEAK / src / peak / storage / transactions.py  

View of /PEAK/src/peak/storage/transactions.py

Parent Directory | Revision Log
Revision: 1060 - (download) (as text)
Thu May 1 16:32:39 2003 UTC (21 years ago) by pje
File size: 7815 byte(s)
Paramegeddon!  Adjusted API signatures so that all calls that have a
context component, have it as the first parameter.  Changed functions,
methods, and classes are:

* binding.acquireComponent()

* binding.lookupComponent()

* config.getProperty()

* config.findUtility()

* config.findUtilities()

* config.PropertyMap.getValueFor()

* config.IConfigSource._getConfigData()

* config.PropertySet()

* naming.lookup()

* naming.parseURL()

Also, renamed 'config.LazyLoader' -> 'config.LazyRule' to reduce confusion
with 'storage.LazyLoader', which has a very different purpose/function.
from peak.api import *
from interfaces import *
from time import time
import sys

__all__ = [
    'TransactionService', 'AbstractParticipant', 'TransactionComponent',
    'BasicTxnErrorHandler', 'getTransaction', 'beginTransaction',
    'commitTransaction', 'abortTransaction', 'begin', 'commit', 'abort',
]


def getTransaction(subject):
    return config.findUtility(subject,ITransactionService)

def beginTransaction(subject, **info):
    getTransaction(subject).begin(**info)

def commitTransaction(subject):
    getTransaction(subject).commit()

def abortTransaction(subject):
    getTransaction(subject).abort()


begin  = beginTransaction
commit = commitTransaction
abort  = abortTransaction













class BasicTxnErrorHandler(object):

    """Simple error handling policy, w/simple logging, no retries"""

    __implements__ = ITransactionErrorHandler


    def voteFailed(self, txnService, participant):

        t,v,tb = sys.exc_info()

        try:
            LOG_WARNING(
                "Error during participant vote", txnService, exc_info=(t,v,tb)
            )

            # Force txn to abort
            txnService.fail()
            raise t,v,tb

        finally:
            del t,v,tb


    def commitFailed(self, txnService, participant):

        t,v,tb = sys.exc_info()

        try:
            LOG_CRITICAL(
                "Unrecoverable transaction failure", txnService,
                exc_info=(t,v,tb)
            )
            txnService.fail()
            raise t,v,tb

        finally:
            del t,v,tb



    def abortFailed(self, txnService, participant):

        t,v,tb = sys.exc_info()

        try:
            LOG_WARNING(
                "Error during participant abort", txnService, exc_info=(t,v,tb)
            )

            # remove and retry after fail
            txnService.removeParticipant(participant)
            raise t,v,tb

        finally:
            del t,v,tb


    def finishFailed(self, txnService, participant, committed):

        LOG_WARNING(
            "Error during participant finishTransaction", txnService,
            exc_info=True
        )

        # ignore the error
        txnService.removeParticipant(participant)















class TransactionState(binding.Component):

    """Helper object representing a single transaction's state"""

    participants = binding.New(list)
    info         = binding.New(dict)
    timestamp    = None
    safeToJoin   = True
    cantCommit   = False



class TransactionService(binding.Component):

    """Basic transaction service component"""

    __implements__ = ITransactionService

    state          = binding.New(TransactionState)
    errorHandler   = binding.New(BasicTxnErrorHandler)

    def join(self, participant):

        if self.state.cantCommit:
            raise exceptions.BrokenTransaction

        elif not self.isActive():
            raise exceptions.OutsideTransaction

        elif self.state.safeToJoin:

            if participant not in self.state.participants:
                self.state.participants.append(participant)

        else:
            raise exceptions.TransactionInProgress





    def _prepare(self):

        """Get votes from all participants

        Ask all participants if they're ready to vote, up to N+1 times (where
        N is the number of participants), until all agree they are ready, or
        an exception occurs.  N+1 iterations is sufficient for any acyclic
        structure of cascading data managers.  Any more than that, and either
        there's a cascade cycle or a broken participant is always returning a
        false value from its readyToVote() method.

        Once all participants are ready, ask them all to vote."""

        tries = 0
        unready = True
        state = self.state

        while unready and tries <= len(state.participants):
            unready = [p for p in state.participants if not p.readyToVote(self)]
            tries += 1

        if unready:
            raise exceptions.NotReadyError(unready)


        self.state.safeToJoin = False

        for p in state.participants:
            try:
                p.voteForCommit(self)
            except:
                self.errorHandler.voteFailed(self,p)

        return True







    def begin(self, **info):

        if self.isActive():
            raise exceptions.TransactionInProgress

        self.state.timestamp = time()
        self.addInfo(**info)


    def commit(self):

        if not self.isActive():
            raise exceptions.OutsideTransaction

        if self.state.cantCommit:
            raise exceptions.BrokenTransaction

        self._prepare()

        for p in self.state.participants:
            try:
                p.commitTransaction(self)
            except:
                self.errorHandler.commitFailed(self,p)

        self._cleanup(True)


    def fail(self):

        if not self.isActive():
            raise exceptions.OutsideTransaction

        self.state.cantCommit = True
        self.state.safeToJoin = False


    def removeParticipant(self,participant):
        self.state.participants.remove(participant)


    def abort(self):

        if not self.isActive():
            raise exceptions.OutsideTransaction

        self.fail()

        for p in self.state.participants[:]:
            try:
                p.abortTransaction(self)
            except:
                self.errorHandler.abortFailed(self,p)

        self._cleanup(False)


    def getTimestamp(self):

        """Return the time that the transaction began, in time.time()
        format, or None if no transaction in progress."""

        return self.state.timestamp


    def addInfo(self, **info):

        if self.state.cantCommit:
            raise exceptions.BrokenTransaction

        elif self.state.safeToJoin:
            self.state.info.update(info)

        else:
            raise exceptions.TransactionInProgress



    def getInfo(self):
        return self.state.info


    def _cleanup(self, committed):

        for p in self.state.participants[:]:
            try:
                p.finishTransaction(self,committed)
            except:
                self.errorHandler.finishFailed(self,p,committed)

        del self.state


    def isActive(self):
        return self.state.timestamp is not None


    def __contains__(self,ob):
        return ob in self.state.participants
























class AbstractParticipant(object):

    __implements__ = ITransactionParticipant

    def readyToVote(self, txnService):
        return True

    def voteForCommit(self, txnService):
        pass

    def commitTransaction(self, txnService):
        pass

    def abortTransaction(self, txnService):
        pass

    def finishTransaction(self, txnService, committed):
        pass























class TransactionComponent(binding.Component, AbstractParticipant):

    """Object that has a 'txnSvc' and auto-joins transactions"""

    __implements__ = (
        binding.Component.__implements__, AbstractParticipant.__implements__
    )

    inTransaction = False

    txnAttrs = 'joinedTxn', 'inTransaction'

    def joinedTxn(self,d,a):

        """Our TransactionService (auto-joined when attribute is accessed)"""

        ts = self.txnSvc
        ts.join(self)
        self.inTransaction = True
        d[a] = ts
        self.onJoinTxn(ts)
        return ts

    joinedTxn = binding.Once(joinedTxn)

    txnSvc = binding.Acquire(ITransactionService)

    def onJoinTxn(self, txnService):
        pass

    def finishTransaction(self, txnService, committed):

        """Ensure that subsequent transactions will require re-registering"""

        d = self.__dict__
        have = d.has_key

        for attr in self.txnAttrs:
            if have(attr):
                del d[attr]


cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help