[Subversion] / PEAK / src / peak / util / threads.py  

View of /PEAK/src/peak/util/threads.py

Parent Directory | Revision Log
Revision: 1076 - (download) (as text)
Sat May 3 13:21:20 2003 UTC (20 years, 11 months ago) by pje
File size: 5211 byte(s)
Normalized whitespace.
"""Threading-related utilities

    This module provides a degree of platform independence for thread support.
    At the moment, it only really provides conditional 'allocate_lock()' and
    'get_ident()' functions, but other API's may be added in the future.

    By default, this module will use thread support if it is available.  You
    can override this, however, by using the 'disableThreading()',
    'allowThreading()', or 'requireThreading()' API function, preferably at the
    start of your program before any other modules have a chance to use any
    of this module's APIs.
"""

__all__ = [
    'allocate_lock', 'get_ident', 'DummyLock', 'RLock', 'ResourceManager',
    'allowThreading', 'disableThreading', 'requireThreading',
    'HAVE_THREADS', 'globalRM',
]

try:
    import thread
except ImportError:
    HAVE_THREADS = False
else:
    HAVE_THREADS = True


# default is to 'allowThreading'

USE_THREADS = HAVE_THREADS


if HAVE_THREADS:
    from thread import get_ident
else:
    def get_ident(): return 1





def allowThreading():

    """Enable the use of real thread locks, if possible"""

    global USE_THREADS
    USE_THREADS = HAVE_THREADS


def disableThreading():

    """Don't use threads, even if we have them.

    Note that this must be called *before* any locks have been allocated, as it
    only affects subsequent allocations."""

    global USE_THREADS
    USE_THREADS = False


def requireThreading():

    """Raise RuntimeError if threads aren't available; otherwise enable them"""

    if not HAVE_THREADS:
        raise RuntimeError, "Threads required but not available"

    self.allowThreading()


def allocate_lock():

    """Returns either a real or dummy thread lock, depending on availability"""

    if USE_THREADS:
        return thread.allocate_lock()

    else:
        return DummyLock()



class DummyLock(object):

    """Dummy lock type used when threads are inactive or unavailable"""

    __slots__ = ['_lockCount']

    def __init__(self):
        self._lockCount = 0


    def acquire(self, *waitflag):

        lc = self._lockCount = self._lockCount + 1

        if lc==1:
            if waitflag:
                # an argument was supplied, return success
                return 1
            else:
                # no argument, just return None
                return None

        else:
            self._lockCount -= 1

            if waitflag and not waitflag[0]:
                # no-wait; unlock/return failure
                return 0

            raise RunTimeError(
                "Attempt to double-lock a lock in a single-threaded program"
            )


    def release(self):
        self._lockCount -= 1

    def locked(self):
        return self._lockCount and True or False


class RLock(object):

    """Re-entrant lock; can be locked multiple times by same thread"""

    __slots__ = 'lock','owner','count'

    def __init__(self):
        self.lock  = allocate_lock()
        self.owner = 0
        self.count = 0

    def attempt(self):
        """Nonblocking, nestable attempt to acquire; returns success flag"""
        return self._doAcquire(False)

    def obtain(self):
        """Blocking, nestable wait to acquire; success unless error"""
        return self._doAcquire(True)

    def release(self):
        """Release one level of lock nesting; must be 'locked()' when called"""

        me = get_ident()

        if self.owner <> me:
            raise RuntimeError, "release() of un-acquire()d lock"

        count = self.count = self.count - 1

        if count==0:
            self.owner = 0
            self.lock.release()

    def locked(self):
        """Is this lock owned by the current thread?"""
        return self.owner==get_ident() and self.count>0





    def _doAcquire(self, blocking):

        me = get_ident()

        if self.owner == me:
            self.count += 1
            return True

        if blocking:
            self.lock.acquire(); rc = True
        else:
            rc = self.lock.acquire(blocking)

        if rc:
            self.owner = me
            self.count = 1

        return rc























class ResourceManager(object):

    """Hold/manage thread locks for arbitrary resource keys"""

    __slots__ = 'locks', 'lock'

    def __init__(self):
        self.locks = {}
        self.lock  = allocate_lock()

    def attempt(self, key):
        """Nonblocking, nestable attempt to acquire; returns success flag"""
        return self[key].attempt()

    def obtain(self, key):
        """Blocking, nestable wait to acquire; success unless error"""
        return self[key].obtain()

    def release(self, key):
        """Release one level of locking; key must be 'locked()' when called"""
        return self[key].release()

    def locked(self, key):
        """Is this key owned by the current thread?"""
        return self[key].locked()

    def __getitem__(self, key):
        self.lock.acquire()
        try:
            try:
                return self.locks[key]
            except KeyError:
                rlock = self.locks[key] = RLock()
                return rlock
        finally:
            self.lock.release()


globalRM = ResourceManager()



cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help