K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / twisted / trial / _dist / test /
server ip : 172.67.156.115

your ip : 172.69.7.43

H O M E


Filename/usr/lib/python2.7/dist-packages/twisted/trial/_dist/test/test_disttrial.py
Size12.1 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified30-May-2013 09:52
Last accessed07-Jul-2025 20:58
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.trial._dist.disttrial}.
"""

import os
import sys
from cStringIO import StringIO

from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import fail, succeed
from twisted.internet.task import Cooperator, deferLater
from twisted.internet.main import CONNECTION_DONE
from twisted.internet import reactor
from twisted.python.failure import Failure
from twisted.python.lockfile import FilesystemLock

from twisted.test.test_cooperator import FakeScheduler

from twisted.trial.unittest import TestCase
from twisted.trial.reporter import Reporter, TreeReporter
from twisted.trial.reporter import UncleanWarningsReporterWrapper
from twisted.trial.runner import TrialSuite, ErrorHolder

from twisted.trial._dist.disttrial import DistTrialRunner
from twisted.trial._dist.distreporter import DistReporter
from twisted.trial._dist.worker import LocalWorker



class FakeTransport(object):
"""
A simple fake process transport.
"""

def writeToChild(self, fd, data):
"""
Ignore write calls.
"""



class FakeReactor(object):
"""
A simple fake reactor for testing purposes.
"""
spawnCount = 0
stopCount = 0
runCount = 0

def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
self.spawnCount += 1


def stop(self):
self.stopCount += 1


def run(self):
self.runCount += 1


def addSystemEventTrigger(self, *args, **kw):
pass



class DistTrialRunnerTestCase(TestCase):
"""
Tests for L{DistTrialRunner}.
"""

def setUp(self):
"""
Create a runner for testing.
"""
self.runner = DistTrialRunner(TreeReporter, 4, [],
workingDirectory=self.mktemp())
self.runner._stream = StringIO()


def test_writeResults(self):
"""
L{DistTrialRunner.writeResults} writes to the stream specified in the
init.
"""
stringIO = StringIO()
result = DistReporter(Reporter(stringIO))
self.runner.writeResults(result)
self.assertTrue(stringIO.tell() > 0)


def test_createLocalWorkers(self):
"""
C{createLocalWorkers} iterates the list of protocols and create one
L{LocalWorker} for each.
"""
protocols = [object() for x in xrange(4)]
workers = self.runner.createLocalWorkers(protocols, "path")
for s in workers:
self.assertIsInstance(s, LocalWorker)
self.assertEqual(4, len(workers))


def test_launchWorkerProcesses(self):
"""
Given a C{spawnProcess} function, C{launchWorkerProcess} launches a
python process with a existing path as its argument.
"""
protocols = [ProcessProtocol() for i in range(4)]
arguments = []
environment = {}

def fakeSpawnProcess(processProtocol, executable, args=(), env={},
path=None, uid=None, gid=None, usePTY=0,
childFDs=None):
arguments.append(executable)
arguments.extend(args)
environment.update(env)

self.runner.launchWorkerProcesses(
fakeSpawnProcess, protocols, ["foo"])
self.assertEqual(arguments[0], arguments[1])
self.assertTrue(os.path.exists(arguments[2]))
self.assertEqual("foo", arguments[3])
self.assertEqual(os.pathsep.join(sys.path),
environment["TRIAL_PYTHONPATH"])


def test_run(self):
"""
C{run} starts the reactor exactly once and spawns each of the workers
exactly once.
"""
fakeReactor = FakeReactor()
suite = TrialSuite()
for i in xrange(10):
suite.addTest(TestCase())
self.runner.run(suite, fakeReactor)
self.assertEqual(fakeReactor.runCount, 1)
self.assertEqual(fakeReactor.spawnCount, self.runner._workerNumber)


def test_runUsedDirectory(self):
"""
L{DistTrialRunner} checks if the test directory is already locked, and
if it is generates a name based on it.
"""

class FakeReactorWithLock(FakeReactor):

def spawnProcess(oself, worker, *args, **kwargs):
self.assertEqual(os.path.abspath(worker._logDirectory),
os.path.abspath(
os.path.join(workingDirectory + "-1",
str(oself.spawnCount))))
localLock = FilesystemLock(workingDirectory + "-1.lock")
self.assertFalse(localLock.lock())
oself.spawnCount += 1
worker.makeConnection(FakeTransport())
worker._ampProtocol.run = lambda *args: succeed(None)

newDirectory = self.mktemp()
os.mkdir(newDirectory)
workingDirectory = os.path.join(newDirectory, "_trial_temp")
lock = FilesystemLock(workingDirectory + ".lock")
lock.lock()
self.addCleanup(lock.unlock)
self.runner._workingDirectory = workingDirectory

fakeReactor = FakeReactorWithLock()
suite = TrialSuite()
for i in xrange(10):
suite.addTest(TestCase())
self.runner.run(suite, fakeReactor)


def test_minimalWorker(self):
"""
L{DistTrialRunner} doesn't try to start more workers than the number of
tests.
"""
fakeReactor = FakeReactor()
self.runner.run(TestCase(), fakeReactor)
self.assertEqual(fakeReactor.runCount, 1)
self.assertEqual(fakeReactor.spawnCount, 1)


def test_runUncleanWarnings(self):
"""
Running with the C{unclean-warnings} option makes L{DistTrialRunner}
uses the L{UncleanWarningsReporterWrapper}.
"""
fakeReactor = FakeReactor()
self.runner._uncleanWarnings = True
result = self.runner.run(TestCase(), fakeReactor)
self.assertIsInstance(result, DistReporter)
self.assertIsInstance(result.original,
UncleanWarningsReporterWrapper)


def test_runWithoutTest(self):
"""
When the suite contains no test, L{DistTrialRunner} takes a shortcut
path without launching any process or starting the reactor.
"""
fakeReactor = object()
suite = TrialSuite()
result = self.runner.run(suite, fakeReactor)
self.assertIsInstance(result, DistReporter)
output = self.runner._stream.getvalue()
self.assertIn("Running 0 test", output)
self.assertIn("PASSED", output)


def test_runWithoutTestButWithAnError(self):
"""
Even if there is no test, the suite can contain an error (most likely,
an import error): this should make the run fail, and the error should
be printed.
"""
fakeReactor = object()
error = ErrorHolder("an error", Failure(RuntimeError("foo bar")))
result = self.runner.run(error, fakeReactor)
self.assertIsInstance(result, DistReporter)
output = self.runner._stream.getvalue()
self.assertIn("Running 0 test", output)
self.assertIn("foo bar", output)
self.assertIn("an error", output)
self.assertIn("errors=1", output)
self.assertIn("FAILED", output)


def test_runUnexpectedError(self):
"""
If for some reasons we can't connect to the worker process, the test
suite catches and fails.
"""

class FakeReactorWithFail(FakeReactor):

def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
self.spawnCount += 1
worker._ampProtocol.run = self.failingRun

def failingRun(self, case, result):
return fail(RuntimeError("oops"))

scheduler = FakeScheduler()
cooperator = Cooperator(scheduler=scheduler)

fakeReactor = FakeReactorWithFail()
result = self.runner.run(TestCase(), fakeReactor,
cooperator.cooperate)
self.assertEqual(fakeReactor.runCount, 1)
self.assertEqual(fakeReactor.spawnCount, 1)
scheduler.pump()
self.assertEqual(1, len(result.original.failures))


def test_runStopAfterTests(self):
"""
L{DistTrialRunner} calls C{reactor.stop} and unlocks the test directory
once the tests have run.
"""
functions = []

class FakeReactorWithSuccess(FakeReactor):

def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
self.spawnCount += 1
worker._ampProtocol.run = self.succeedingRun

def succeedingRun(self, case, result):
return succeed(None)

def addSystemEventTrigger(oself, phase, event, function):
self.assertEqual('before', phase)
self.assertEqual('shutdown', event)
functions.append(function)

workingDirectory = self.runner._workingDirectory

fakeReactor = FakeReactorWithSuccess()
self.runner.run(TestCase(), fakeReactor)

def check():
localLock = FilesystemLock(workingDirectory + ".lock")
self.assertTrue(localLock.lock())
self.assertEqual(1, fakeReactor.stopCount)
# We don't wait for the process deferreds here, so nothign is
# returned by the function before shutdown
self.assertIdentical(None, functions[0]())

return deferLater(reactor, 0, check)


def test_runWaitForProcessesDeferreds(self):
"""
L{DistTrialRunner} waits for the worker processes to stop when the
reactor is stopping, and then unlocks the test directory, not trying to
stop the reactor again.
"""
functions = []
workers = []

class FakeReactorWithEvent(FakeReactor):

def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
workers.append(worker)

def addSystemEventTrigger(oself, phase, event, function):
self.assertEqual('before', phase)
self.assertEqual('shutdown', event)
functions.append(function)

workingDirectory = self.runner._workingDirectory

fakeReactor = FakeReactorWithEvent()
self.runner.run(TestCase(), fakeReactor)

def check(ign):
# Let the AMP deferreds fire
return deferLater(reactor, 0, realCheck)

def realCheck():
localLock = FilesystemLock(workingDirectory + ".lock")
self.assertTrue(localLock.lock())
# Stop is not called, as it ought to have been called before
self.assertEqual(0, fakeReactor.stopCount)

workers[0].processEnded(Failure(CONNECTION_DONE))
return functions[0]().addCallback(check)


def test_runUntilFailure(self):
"""
L{DistTrialRunner} can run in C{untilFailure} mode where it will run
the given tests until they fail.
"""
called = []

class FakeReactorWithSuccess(FakeReactor):

def spawnProcess(self, worker, *args, **kwargs):
worker.makeConnection(FakeTransport())
self.spawnCount += 1
worker._ampProtocol.run = self.succeedingRun

def succeedingRun(self, case, result):
called.append(None)
if len(called) == 5:
return fail(RuntimeError("oops"))
return succeed(None)

fakeReactor = FakeReactorWithSuccess()

scheduler = FakeScheduler()
cooperator = Cooperator(scheduler=scheduler)

result = self.runner.run(
TestCase(), fakeReactor, cooperate=cooperator.cooperate,
untilFailure=True)
scheduler.pump()
self.assertEqual(5, len(called))
self.assertFalse(result.wasSuccessful())
output = self.runner._stream.getvalue()
self.assertIn("PASSED", output)
self.assertIn("FAIL", output)