[Subversion] / PEAK / src / peak / running / tests / __init__.py  

View of /PEAK/src/peak/running/tests/__init__.py

Parent Directory | Revision Log
Revision: 1791 - (download) (as text)
Sun Jul 25 04:24:12 2004 UTC (19 years, 9 months ago) by pje
File size: 8439 byte(s)
Removed 'peak.running.timers' and 'peak.util.dispatch'.  Neither was in
active use, and both are being replaced by the new generic functions
package in PyProtocols.
"""Running system tests"""

from unittest import TestCase, makeSuite, TestSuite
from peak.api import *
from peak.tests import testRoot
from peak.running.clusters import loadCluster
from peak.running.daemons import AdaptiveTask, TaskQueue
from peak.running.scheduler import UntwistedReactor, MainLoop

test = binding.Component()

loadCluster(test.__instance_offers__,
    config.fileNearModule(__name__,'test_cluster.txt')
)

pm = running.CLUSTER.of(test)


groupToHosts = Items(

    __all__= (
            'foo.baz.com', 'bar.baz.com', 'one.baz.com', 'three.baz.com',
            'five.baz.com', 'two.baz.com','four.baz.com','six.baz.com',
            'frob.baz.com'
    ),

    __orphans__ = ('foo.baz.com', 'bar.baz.com'),

    odd     = ('one.baz.com','three.baz.com','five.baz.com'),
    even    = ('two.baz.com','four.baz.com','six.baz.com'),
    prime   = ('one.baz.com','three.baz.com','five.baz.com','two.baz.com'),
    qux     = ('one.baz.com','frob.baz.com'),

    weird   = (
        'one.baz.com','three.baz.com','five.baz.com','two.baz.com',
        'frob.baz.com'
    ),
)



class ClusterTests(TestCase):

    def checkHosts(self):
        assert pm.hosts==pm['groups.__all__']

    def checkGroups(self):
        assert pm.groups==('odd','even','prime','qux','weird')

    def checkMembers(self):
        groups = pm('groups')
        for group, members in groupToHosts:
            assert groups[group] == members, (group,members,groups[group])

    # XXX need host->group tests, plus ???



























class TestClock(binding.Component):

    now = 0
    cpu = 0
    log = binding.Require("function to call with event data")

    def time(self):
        return self.now

    def clock(self):
        return self.cpu

    def run(self,secs):
        self.log(("Running for",secs))
        self.now += secs
        self.cpu += secs

    def sleep(self,secs):
        self.log(("Sleeping for",secs))
        self.now += secs

    def select(self,i,o,e,timeout):
        self.log(("Select:",i,o,e,timeout))
        self.sleep(timeout)

















class ScheduleTestTask(AdaptiveTask):

    job = binding.Require("Simulated job, if any")
    log = binding.Require("function to call with event data")

    def getWork(self):
        self.log(("getting work", self.job))
        return self.job

    def doWork(self,job):
        self.log(("doing work", job))
        return job


class QuietTask(ScheduleTestTask):

    def getWork(self):
        return self.job























class TestApp(config.ServiceArea):

    clock = binding.Make(lambda self: TestClock(log=self.append))

    reactor = binding.Make(
        lambda self: UntwistedReactor(sleep = self.clock.sleep),
        offerAs=[running.IBasicReactor]
    )

    scheduler = binding.Make(
        lambda self: events.Scheduler(self.clock.time),
        offerAs=[events.IScheduler]
    )

    mainLoop = binding.Make(
        lambda self: MainLoop(sleep = self.clock.sleep),
        offerAs=[running.IMainLoop]
    )

    log = binding.Make(list)

    append = binding.Obtain('log/append')



ping = ('doing work','ping')
pong = ('doing work','pong')
spam = ('doing work','spam')
foo = ('doing work','foo')
bar = ('doing work','bar')
sleep = lambda n: ('Sleeping for',n)

notWork = ('getting work',False)
gotWork = ('getting work',True)
didWork = ('doing work',True)






class ReactiveTests(TestCase):

    verbose = False

    def setUp(self):
        self.app = TestApp(testRoot())
        self.log = self.app.log
        self.append = self.app.append


    def tearDown(self):
        if self.verbose:
            print "Event log for %s:" % (self,)
            print
            from pprint import pprint
            pprint(self.log)
            print


    def checkActivationAndPriorityQueueing(self):

        app = self.app
        tq = app.lookupComponent(running.ITaskQueue)

        tasks = [
            ScheduleTestTask(app, runEvery = 5, priority = 5-priority)
                for priority in range(5)
        ]

        assert len(tq.activeTasks)==5, (tq.activeTasks, tasks)

        for priority in range(5):
            assert tq.activeTasks[priority] is tasks[4-priority]








    def checkPrioritized1(self):

        log = self.append
        app = self.app

        t1 = QuietTask(app, runEvery = 1, job = "ping", log = log, priority=1)
        t2 = QuietTask(app, runEvery = 2, job = "pong", log = log, priority=2)

        app.mainLoop.run(4)

        self.assertEqual(self.log, [
            pong, ping, sleep(1), ping, sleep(1), pong, ping, sleep(1), ping,
            sleep(1), #pong, ping
        ])


    def checkPrioritized2(self):

        log = self.append
        app = self.app

        t1 = QuietTask(app, runEvery = 1, job = "ping", log = log, priority=2)
        t2 = QuietTask(app, runEvery = 2, job = "pong", log = log, priority=1)

        app.mainLoop.run(4)

        self.assertEqual(self.log, [
            ping, pong, sleep(1), ping, sleep(1), ping, pong, sleep(1), ping,
            sleep(1), #ping, pong
        ])











    def checkIntervalsAndMainLoop(self):

        log = self.append
        app = self.app

        t5 = QuietTask(app, runEvery = 5, job = "ping", log = log)
        t7 = QuietTask(app, runEvery = 7, job = "pong", log = log)

        app.mainLoop.run(60, 5, 20.5)

        # Note: this does not match exactly what would happen with a "real"
        # clock; execution times will make a difference.  But *because* this
        # isn't real, the test is therefore stable (i.e. deterministic), as
        # well as faster-than-real-time.

        self.assertEqual(self.log, [
            ping, pong, sleep(5), ping, sleep(2), pong, sleep(3), # 10 seconds
            ping, sleep(4), pong, sleep(1), ping, sleep(5), ping, # 20 seconds
            sleep(.5), # at 20.5 seconds idle checking begins
            sleep(.5), # but not idle enough yet, so next is pong at 21 seconds
            pong, sleep(4.0), ping, # 25 seconds

            sleep(3.0),
            #sleep(0.5), # wake up and make sure we haven't been idle for 5 secs
            #sleep(0.5), # our first timeout set at 20.5 goes off, but no effect
            #sleep(2.0), # false alarms, so finish waiting to 28 secs
            
            pong, sleep(2.0), ping,    # 30 seconds now

            sleep(5.0),

            #sleep(3.0), # idle checker looking for timeout, false alarm again
            #sleep(2.0), # idle timeout, so stop! 
            #pong, ping  # finit at 35 seconds
        ])






    def checkAdaptativeScheduling(self):

        log = self.append

        task = ScheduleTestTask(
            runEvery = 5,
            minimumIdle = 1,
            maximumIdle = 150,
            multiplyIdleBy = 2,
            increaseIdleBy = 7,
            job = False,
            log = log,
        )


        for i in range(10):
            if i==6: task.job = True
            if i==8: task.job = False
            log((i,task(),task.pollInterval))

        self.assertEqual(self.log, [
            notWork, (0,False,5),
            notWork, (1,False,17),
            notWork, (2,False,41),
            notWork, (3,False,89),
            notWork, (4,False,150),
            notWork, (5,False,150),
            gotWork, didWork, (6,True,1),
            gotWork, didWork, (7,True,1),
            notWork, (8,False,5),
            notWork, (9,False,17),
        ])









    def checkRoundRobin(self):

        log = self.append
        app = self.app

        t1 = QuietTask(app, runEvery = 1, job = "ping", log = log, priority=1)
        t2 = QuietTask(app, runEvery = 1, job = "pong", log = log, priority=1)
        t3 = QuietTask(app, runEvery = 1, job = "spam", log = log, priority=1)
        t4 = QuietTask(app, runEvery = 1, job = "foo", log = log, priority=1)
        t5 = QuietTask(app, runEvery = 1, job = "bar", log = log, priority=1)

        app.mainLoop.run(5)

        all = [ping,pong,spam,foo,bar]

        self.assertEqual(self.log, (all+[sleep(1)]) * 5) # + all

























TestClasses = (
    ClusterTests, ReactiveTests
)

def _suite():
    s = []
    for t in TestClasses:
        s.append(makeSuite(t,'check'))

    return TestSuite(s)


allSuites = [
    'peak.running.tests:_suite',
    'test_logs:test_suite',
]


def test_suite():
    from peak.util.imports import importSuite
    return importSuite(allSuites, globals())




















cvs-admin@eby-sarna.com

Powered by ViewCVS 1.0-dev

ViewCVS and CVS Help