[Subversion] / PEAK / src / peak / tools / supervisor / process.py  

View of /PEAK/src/peak/tools/supervisor/process.py

Parent Directory | Revision Log
Revision: 1521 - (download) (as text)
Thu Dec 4 17:46:53 2003 UTC (20 years, 5 months ago) by pje
File size: 7085 byte(s)
Promote peak.running.supervisor to peak.tools.supervisor.
"""Pre-forking process supervisor for FastCGI and other server-like apps"""

from peak.api import *
from interfaces import *
from peak.running.commands import EventDriven, Bootstrap
from peak.running.process import signals, signal_names
from shlex import shlex
from cStringIO import StringIO

def tokenize(s):
    return list(iter(shlex(StringIO(s)).get_token,''))

def unquote(s):
    if s.startswith('"') or s.startswith("'"):
        s = s[1:-1]
    return s

























class ProcessSupervisor(EventDriven):

    log           = binding.Obtain('logging.logger:supervisor')

    pidFile       = binding.Require("Filename where process ID is kept")
    minChildren   = 1
    maxChildren   = 4
    startInterval = 15  # seconds between forks
    importModules = ()  # Placeholder for ZConfig pre-import hook; see schema

    cmdText = ""        # String form of subprocess command line

    cmdLine = binding.Make(
        lambda self: [
            unquote(x) for x in tokenize(self.cmdText)
        ]+self.argv[1:]
    )

    template = binding.Make(
        lambda self: self.getSubcommand(
            Bootstrap,
            argv = ['supervise'] + self.cmdLine,
            parentComponent = config.makeRoot(),
        ),
        adaptTo=running.IProcessTemplate,
        offerAs=[running.IProcessTemplate],
        suggestParent=False
    )

    startLockURL = binding.Make(
        lambda self: "flockfile:%s.start" % self.pidFile
    )

    pidLockURL = binding.Make(
        lambda self: "flockfile:%s.lock" % self.pidFile
    )





    startLock = binding.Make(
        lambda self: self.lookupComponent(self.startLockURL),
        adaptTo = running.ILock
    )

    pidLock = binding.Make(
        lambda self: self.lookupComponent(self.pidLockURL),
        adaptTo = running.ILock
    )

    lastStart = None    # last time a fork occurred
    nextStart = None    # next scheduled fork

    processes = binding.Make(dict)
    plugins   = binding.Make(list)

    reactor = binding.Make(
        # Can't use Make(IBasicReactor), because 'getReactor()' is a singleton
        'peak.running.scheduler:UntwistedReactor',
        offerAs=[running.IBasicReactor]
    )

    mainLoop = binding.Make(running.IMainLoop, offerAs=[running.IMainLoop])

    taskQueue = binding.Make(running.ITaskQueue, offerAs=[running.ITaskQueue])

    _no_twisted = binding.Require(
        "ProcessSupervisor subcomponents may not depend on Twisted",
        offerAs = [running.ITwistedReactor]
    )

    import os

    from time import time

    def setup(self):
        self.log.debug("Beginning setup")
        template = adapt(self.template, ISupervisorPluginProvider, None)
        if template is not None:
            self.plugins.extend(template.getSupervisorPlugins(self))

    def _run(self):

        if not self.startLock.attempt():
            self.log.warning("Another process is in startup; exiting")
            return 1        # exit with errorlevel 1

        try:
            self.setup()
            self.killPredecessor()
            self.writePidFile()
        finally:
            self.startLock.release()

        self.startIfTooFew()

        retcode = super(ProcessSupervisor,self)._run()

        if adapt(retcode,running.IExecutable,None) is not None:
            # child process, drop out to the trampoline
            return retcode

        self.killProcesses()
        self.removePidFile()

        return retcode


    def requestStart(self):
        if self.nextStart is None:
            if self.lastStart is None:
                self.nextStart = self.time()
            else:
                self.nextStart = max(
                    self.lastStart + self.startInterval, self.time()
                )
            delay = self.nextStart-self.time()
            self.log.debug("Scheduling child start in %.1d seconds",delay)
            self.mainLoop.activityOccurred()
            self.reactor.callLater(delay, self._doStart)


    def _doStart(self):

        if len(self.processes)>=self.maxChildren:
            return

        proxy, stub = self.template.spawn(self)

        if proxy is None:
            self.mainLoop.childForked(stub)
            return

        self.mainLoop.activityOccurred()
        self.log.debug("Spawned new child process (%d)", proxy.pid)
        proxy.addListener(self._childChange)
        self.processes[proxy.pid] = proxy

        for plugin in self.plugins:
            plugin.processStarted(proxy)

        self.lastStart = self.time()
        self.nextStart = None

        # We might not be up to our minimum yet
        self.startIfTooFew()


    def killProcesses(self):
        self.log.debug("Killing child processes")
        for pid,proxy in self.processes.items():
            proxy.sendSignal('SIGTERM')


    def startIfTooFew(self):
        if len(self.processes)<self.minChildren:
            self.requestStart()






    def _childChange(self,proxy):

        self.mainLoop.activityOccurred()

        if proxy.isFinished:

            if proxy.exitedBecause:
                self.log.warning(
                    "Child process %d exited due to signal %d (%s)",
                    proxy.pid, proxy.exitedBecause,
                    signal_names.setdefault(proxy.exitedBecause,('?',))[0]
                )
            elif proxy.exitStatus:
                self.log.warning(
                    "Child process %d exited with errorlevel %d",
                    proxy.pid, proxy.exitStatus
                )
            else:
                self.log.debug("Child process %d has finished", proxy.pid)

            del self.processes[proxy.pid]

            self.startIfTooFew()

        elif proxy.stoppedBecause:
            self.log.error("Child process %d stopped due to signal %d (%s)",
                proxy.pid, proxy.stoppedBecause,
                signal_names.getdefault(proxy.stoppedBecause,('?',))[0]
            )

        elif proxy.isStopped:
            self.log.error("Child process %d has stopped", proxy.pid)









    def writePidFile(self):
        self.pidLock.obtain()
        try:
            pf = open(self.pidFile,'w')
            pf.write('%d\n' % self.os.getpid())
            pf.close()
        finally:
            self.pidLock.release()


    def readPidFile(self, func):
        self.pidLock.obtain()
        try:
            if self.os.path.exists(self.pidFile):
                pf = open(self.pidFile,'r')
                func(int(pf.readline().strip()))
                pf.close()
        finally:
            self.pidLock.release()


    def removePidFile(self):

        def removeIfMe(pid):
            if pid==self.os.getpid():
                self.log.debug("Unlinking %s", self.pidFile)
                self.os.unlink(self.pidFile)

        self.readPidFile(removeIfMe)

    def killPredecessor(self):

        def doKill(pid):
            try:
                self.log.debug("Killing predecessor (process #%d)", pid)
                self.os.kill(pid,signals['SIGTERM'])
            except:
                pass # XXX

        self.readPidFile(doKill)


cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help