Apache/2.4.7 (Ubuntu) Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64 uid=33(www-data) gid=33(www-data) groups=33(www-data) safemode : OFF MySQL: ON | Perl: ON | cURL: OFF | WGet: ON > / usr / lib / python2.7 / dist-packages / twisted / python / test / | server ip : 104.21.89.46 your ip : 172.69.130.102 H O M E |
Filename | /usr/lib/python2.7/dist-packages/twisted/python/test/test_util.py |
Size | 36.09 kb |
Permission | rw-r--r-- |
Owner | root : root |
Create time | 27-Apr-2025 09:56 |
Last modified | 04-Mar-2013 03:16 |
Last accessed | 07-Jul-2025 12:24 |
Actions | edit | rename | delete | download (gzip) |
View | text | code | image |
# -*- test-case-name: twisted.python.test.test_util
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.util}.
"""
from __future__ import division, absolute_import
import os.path, sys
import shutil, errno, warnings
try:
import pwd, grp
except ImportError:
pwd = grp = None
from twisted.trial import unittest
from twisted.trial.util import suppress as SUPPRESS
from twisted.python.compat import _PY3
from twisted.python import util
from twisted.python.versions import Version
from twisted.internet import reactor
from twisted.internet.interfaces import IReactorProcess
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet.error import ProcessDone
if _PY3:
MockOS = None
from twisted.python.deprecate import _fullyQualifiedName as fullyQualifiedName
else:
from twisted.test.test_process import MockOS
from twisted.python.reflect import fullyQualifiedName
class UtilTestCase(unittest.TestCase):
def testUniq(self):
l = ["a", 1, "ab", "a", 3, 4, 1, 2, 2, 4, 6]
self.assertEqual(util.uniquify(l), ["a", 1, "ab", 3, 4, 2, 6])
def testRaises(self):
self.failUnless(util.raises(ZeroDivisionError, divmod, 1, 0))
self.failIf(util.raises(ZeroDivisionError, divmod, 0, 1))
try:
util.raises(TypeError, divmod, 1, 0)
except ZeroDivisionError:
pass
else:
raise unittest.FailTest("util.raises didn't raise when it should have")
def test_uidFromNumericString(self):
"""
When L{uidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
self.assertEqual(util.uidFromString("100"), 100)
def test_uidFromUsernameString(self):
"""
When L{uidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
pwent = pwd.getpwuid(os.getuid())
self.assertEqual(util.uidFromString(pwent.pw_name), pwent.pw_uid)
if pwd is None:
test_uidFromUsernameString.skip = (
"Username/UID conversion requires the pwd module.")
def test_gidFromNumericString(self):
"""
When L{gidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
self.assertEqual(util.gidFromString("100"), 100)
def test_gidFromGroupnameString(self):
"""
When L{gidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
grent = grp.getgrgid(os.getgid())
self.assertEqual(util.gidFromString(grent.gr_name), grent.gr_gid)
if grp is None:
test_gidFromGroupnameString.skip = (
"Group Name/GID conversion requires the grp module.")
class NameToLabelTests(unittest.TestCase):
"""
Tests for L{nameToLabel}.
"""
def test_nameToLabel(self):
"""
Test the various kinds of inputs L{nameToLabel} supports.
"""
nameData = [
('f', 'F'),
('fo', 'Fo'),
('foo', 'Foo'),
('fooBar', 'Foo Bar'),
('fooBarBaz', 'Foo Bar Baz'),
]
for inp, out in nameData:
got = util.nameToLabel(inp)
self.assertEqual(
got, out,
"nameToLabel(%r) == %r != %r" % (inp, got, out))
class UntilConcludesTests(unittest.TestCase):
"""
Tests for L{untilConcludes}, an C{EINTR} helper.
"""
def test_uninterruptably(self):
"""
L{untilConcludes} calls the function passed to it until the function
does not raise either L{OSError} or L{IOError} with C{errno} of
C{EINTR}. It otherwise completes with the same result as the function
passed to it.
"""
def f(a, b):
self.calls += 1
exc = self.exceptions.pop()
if exc is not None:
raise exc(errno.EINTR, "Interrupted system call!")
return a + b
self.exceptions = [None]
self.calls = 0
self.assertEqual(util.untilConcludes(f, 1, 2), 3)
self.assertEqual(self.calls, 1)
self.exceptions = [None, OSError, IOError]
self.calls = 0
self.assertEqual(util.untilConcludes(f, 2, 3), 5)
self.assertEqual(self.calls, 3)
class SwitchUIDTest(unittest.TestCase):
"""
Tests for L{util.switchUID}.
"""
if getattr(os, "getuid", None) is None:
skip = "getuid/setuid not available"
def setUp(self):
self.mockos = MockOS()
self.patch(util, "os", self.mockos)
self.patch(util, "initgroups", self.initgroups)
self.initgroupsCalls = []
def initgroups(self, uid, gid):
"""
Save L{util.initgroups} calls in C{self.initgroupsCalls}.
"""
self.initgroupsCalls.append((uid, gid))
def test_uid(self):
"""
L{util.switchUID} calls L{util.initgroups} and then C{os.setuid} with
the given uid.
"""
util.switchUID(12000, None)
self.assertEqual(self.initgroupsCalls, [(12000, None)])
self.assertEqual(self.mockos.actions, [("setuid", 12000)])
def test_euid(self):
"""
L{util.switchUID} calls L{util.initgroups} and then C{os.seteuid} with
the given uid if the C{euid} parameter is set to C{True}.
"""
util.switchUID(12000, None, True)
self.assertEqual(self.initgroupsCalls, [(12000, None)])
self.assertEqual(self.mockos.seteuidCalls, [12000])
def test_currentUID(self):
"""
If the current uid is the same as the uid passed to L{util.switchUID},
then initgroups does not get called, but a warning is issued.
"""
uid = self.mockos.getuid()
util.switchUID(uid, None)
self.assertEqual(self.initgroupsCalls, [])
self.assertEqual(self.mockos.actions, [])
warnings = self.flushWarnings([util.switchUID])
self.assertEqual(len(warnings), 1)
self.assertIn('tried to drop privileges and setuid %i' % uid,
warnings[0]['message'])
self.assertIn('but uid is already %i' % uid, warnings[0]['message'])
def test_currentEUID(self):
"""
If the current euid is the same as the euid passed to L{util.switchUID},
then initgroups does not get called, but a warning is issued.
"""
euid = self.mockos.geteuid()
util.switchUID(euid, None, True)
self.assertEqual(self.initgroupsCalls, [])
self.assertEqual(self.mockos.seteuidCalls, [])
warnings = self.flushWarnings([util.switchUID])
self.assertEqual(len(warnings), 1)
self.assertIn('tried to drop privileges and seteuid %i' % euid,
warnings[0]['message'])
self.assertIn('but euid is already %i' % euid, warnings[0]['message'])
class TestMergeFunctionMetadata(unittest.TestCase):
"""
Tests for L{mergeFunctionMetadata}.
"""
def test_mergedFunctionBehavesLikeMergeTarget(self):
"""
After merging C{foo}'s data into C{bar}, the returned function behaves
as if it is C{bar}.
"""
foo_object = object()
bar_object = object()
def foo():
return foo_object
def bar(x, y, ab, c=10, *d, **e):
(a, b) = ab
return bar_object
baz = util.mergeFunctionMetadata(foo, bar)
self.assertIdentical(baz(1, 2, (3, 4), quux=10), bar_object)
def test_moduleIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{foo}'s
C{__module__}.
"""
def foo():
pass
def bar():
pass
bar.__module__ = 'somewhere.else'
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(baz.__module__, foo.__module__)
def test_docstringIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{foo}'s docstring.
"""
def foo():
"""
This is foo.
"""
def bar():
"""
This is bar.
"""
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(baz.__doc__, foo.__doc__)
def test_nameIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{foo}'s name.
"""
def foo():
pass
def bar():
pass
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(baz.__name__, foo.__name__)
def test_instanceDictionaryIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{bar}'s
dictionary, updated by C{foo}'s.
"""
def foo():
pass
foo.a = 1
foo.b = 2
def bar():
pass
bar.b = 3
bar.c = 4
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(foo.a, baz.a)
self.assertEqual(foo.b, baz.b)
self.assertEqual(bar.c, baz.c)
class OrderedDictTest(unittest.TestCase):
def testOrderedDict(self):
d = util.OrderedDict()
d['a'] = 'b'
d['b'] = 'a'
d[3] = 12
d[1234] = 4321
self.assertEqual(repr(d), "{'a': 'b', 'b': 'a', 3: 12, 1234: 4321}")
self.assertEqual(d.values(), ['b', 'a', 12, 4321])
del d[3]
self.assertEqual(repr(d), "{'a': 'b', 'b': 'a', 1234: 4321}")
self.assertEqual(d, {'a': 'b', 'b': 'a', 1234:4321})
self.assertEqual(d.keys(), ['a', 'b', 1234])
self.assertEqual(list(d.iteritems()),
[('a', 'b'), ('b','a'), (1234, 4321)])
item = d.popitem()
self.assertEqual(item, (1234, 4321))
def testInitialization(self):
d = util.OrderedDict({'monkey': 'ook',
'apple': 'red'})
self.failUnless(d._order)
d = util.OrderedDict(((1,1),(3,3),(2,2),(0,0)))
self.assertEqual(repr(d), "{1: 1, 3: 3, 2: 2, 0: 0}")
class InsensitiveDictTest(unittest.TestCase):
"""
Tests for L{util.InsensitiveDict}.
"""
def test_preserve(self):
"""
L{util.InsensitiveDict} preserves the case of keys if constructed with
C{preserve=True}.
"""
dct = util.InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=1)
self.assertEqual(dct['fnz'], {1:2})
self.assertEqual(dct['foo'], 'bar')
self.assertEqual(dct.copy(), dct)
self.assertEqual(dct['foo'], dct.get('Foo'))
self.assertIn(1, dct)
self.assertIn('foo', dct)
# Make eval() work, urrrrgh:
InsensitiveDict = util.InsensitiveDict
self.assertEqual(eval(repr(dct)), dct)
keys=['Foo', 'fnz', 1]
for x in keys:
self.assertIn(x, dct.keys())
self.assertIn((x, dct[x]), dct.items())
self.assertEqual(len(keys), len(dct))
del dct[1]
del dct['foo']
self.assertEqual(dct.keys(), ['fnz'])
def test_noPreserve(self):
"""
L{util.InsensitiveDict} does not preserves the case of keys if
constructed with C{preserve=False}.
"""
dct = util.InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=0)
keys=['foo', 'fnz', 1]
for x in keys:
self.assertIn(x, dct.keys())
self.assertIn((x, dct[x]), dct.items())
self.assertEqual(len(keys), len(dct))
del dct[1]
del dct['foo']
self.assertEqual(dct.keys(), ['fnz'])
def test_unicode(self):
"""
Unicode keys are case insensitive.
"""
d = util.InsensitiveDict(preserve=False)
d[u"Foo"] = 1
self.assertEqual(d[u"FOO"], 1)
self.assertEqual(d.keys(), [u"foo"])
def test_bytes(self):
"""
Bytes keys are case insensitive.
"""
d = util.InsensitiveDict(preserve=False)
d[b"Foo"] = 1
self.assertEqual(d[b"FOO"], 1)
self.assertEqual(d.keys(), [b"foo"])
class PasswordTestingProcessProtocol(ProcessProtocol):
"""
Write the string C{"secret\n"} to a subprocess and then collect all of
its output and fire a Deferred with it when the process ends.
"""
def connectionMade(self):
self.output = []
self.transport.write('secret\n')
def childDataReceived(self, fd, output):
self.output.append((fd, output))
def processEnded(self, reason):
self.finished.callback((reason, self.output))
class GetPasswordTest(unittest.TestCase):
if not IReactorProcess.providedBy(reactor):
skip = "Process support required to test getPassword"
def test_stdin(self):
"""
Making sure getPassword accepts a password from standard input by
running a child process which uses getPassword to read in a string
which it then writes it out again. Write a string to the child
process and then read one and make sure it is the right string.
"""
p = PasswordTestingProcessProtocol()
p.finished = Deferred()
reactor.spawnProcess(
p,
sys.executable,
[sys.executable,
'-c',
('import sys\n'
'from twisted.python.util import getPassword\n'
'sys.stdout.write(getPassword())\n'
'sys.stdout.flush()\n')],
env={'PYTHONPATH': os.pathsep.join(sys.path)})
def processFinished(result):
(reason, output) = result
reason.trap(ProcessDone)
self.assertIn((1, 'secret'), output)
return p.finished.addCallback(processFinished)
class SearchUpwardsTest(unittest.TestCase):
def testSearchupwards(self):
os.makedirs('searchupwards/a/b/c')
file('searchupwards/foo.txt', 'w').close()
file('searchupwards/a/foo.txt', 'w').close()
file('searchupwards/a/b/c/foo.txt', 'w').close()
os.mkdir('searchupwards/bar')
os.mkdir('searchupwards/bam')
os.mkdir('searchupwards/a/bar')
os.mkdir('searchupwards/a/b/bam')
actual=util.searchupwards('searchupwards/a/b/c',
files=['foo.txt'],
dirs=['bar', 'bam'])
expected=os.path.abspath('searchupwards') + os.sep
self.assertEqual(actual, expected)
shutil.rmtree('searchupwards')
actual=util.searchupwards('searchupwards/a/b/c',
files=['foo.txt'],
dirs=['bar', 'bam'])
expected=None
self.assertEqual(actual, expected)
class IntervalDifferentialTestCase(unittest.TestCase):
def testDefault(self):
d = iter(util.IntervalDifferential([], 10))
for i in range(100):
self.assertEqual(d.next(), (10, None))
def testSingle(self):
d = iter(util.IntervalDifferential([5], 10))
for i in range(100):
self.assertEqual(d.next(), (5, 0))
def testPair(self):
d = iter(util.IntervalDifferential([5, 7], 10))
for i in range(100):
self.assertEqual(d.next(), (5, 0))
self.assertEqual(d.next(), (2, 1))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (4, 1))
self.assertEqual(d.next(), (1, 0))
self.assertEqual(d.next(), (5, 0))
self.assertEqual(d.next(), (1, 1))
self.assertEqual(d.next(), (4, 0))
self.assertEqual(d.next(), (3, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (5, 0))
self.assertEqual(d.next(), (0, 1))
def testTriple(self):
d = iter(util.IntervalDifferential([2, 4, 5], 10))
for i in range(100):
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (1, 2))
self.assertEqual(d.next(), (1, 0))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 2))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (1, 2))
self.assertEqual(d.next(), (1, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (0, 2))
def testInsert(self):
d = iter(util.IntervalDifferential([], 10))
self.assertEqual(d.next(), (10, None))
d.addInterval(3)
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (3, 0))
d.addInterval(6)
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (0, 1))
def testRemove(self):
d = iter(util.IntervalDifferential([3, 5], 10))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (2, 1))
self.assertEqual(d.next(), (1, 0))
d.removeInterval(3)
self.assertEqual(d.next(), (4, 0))
self.assertEqual(d.next(), (5, 0))
d.removeInterval(5)
self.assertEqual(d.next(), (10, None))
self.assertRaises(ValueError, d.removeInterval, 10)
class Record(util.FancyEqMixin):
"""
Trivial user of L{FancyEqMixin} used by tests.
"""
compareAttributes = ('a', 'b')
def __init__(self, a, b):
self.a = a
self.b = b
class DifferentRecord(util.FancyEqMixin):
"""
Trivial user of L{FancyEqMixin} which is not related to L{Record}.
"""
compareAttributes = ('a', 'b')
def __init__(self, a, b):
self.a = a
self.b = b
class DerivedRecord(Record):
"""
A class with an inheritance relationship to L{Record}.
"""
class EqualToEverything(object):
"""
A class the instances of which consider themselves equal to everything.
"""
def __eq__(self, other):
return True
def __ne__(self, other):
return False
class EqualToNothing(object):
"""
A class the instances of which consider themselves equal to nothing.
"""
def __eq__(self, other):
return False
def __ne__(self, other):
return True
class EqualityTests(unittest.TestCase):
"""
Tests for L{FancyEqMixin}.
"""
def test_identity(self):
"""
Instances of a class which mixes in L{FancyEqMixin} but which
defines no comparison attributes compare by identity.
"""
class Empty(util.FancyEqMixin):
pass
self.assertFalse(Empty() == Empty())
self.assertTrue(Empty() != Empty())
empty = Empty()
self.assertTrue(empty == empty)
self.assertFalse(empty != empty)
def test_equality(self):
"""
Instances of a class which mixes in L{FancyEqMixin} should compare
equal if all of their attributes compare equal. They should not
compare equal if any of their attributes do not compare equal.
"""
self.assertTrue(Record(1, 2) == Record(1, 2))
self.assertFalse(Record(1, 2) == Record(1, 3))
self.assertFalse(Record(1, 2) == Record(2, 2))
self.assertFalse(Record(1, 2) == Record(3, 4))
def test_unequality(self):
"""
Unequality between instances of a particular L{record} should be
defined as the negation of equality.
"""
self.assertFalse(Record(1, 2) != Record(1, 2))
self.assertTrue(Record(1, 2) != Record(1, 3))
self.assertTrue(Record(1, 2) != Record(2, 2))
self.assertTrue(Record(1, 2) != Record(3, 4))
def test_differentClassesEquality(self):
"""
Instances of different classes which mix in L{FancyEqMixin} should not
compare equal.
"""
self.assertFalse(Record(1, 2) == DifferentRecord(1, 2))
def test_differentClassesInequality(self):
"""
Instances of different classes which mix in L{FancyEqMixin} should
compare unequal.
"""
self.assertTrue(Record(1, 2) != DifferentRecord(1, 2))
def test_inheritedClassesEquality(self):
"""
An instance of a class which derives from a class which mixes in
L{FancyEqMixin} should compare equal to an instance of the base class
if and only if all of their attributes compare equal.
"""
self.assertTrue(Record(1, 2) == DerivedRecord(1, 2))
self.assertFalse(Record(1, 2) == DerivedRecord(1, 3))
self.assertFalse(Record(1, 2) == DerivedRecord(2, 2))
self.assertFalse(Record(1, 2) == DerivedRecord(3, 4))
def test_inheritedClassesInequality(self):
"""
An instance of a class which derives from a class which mixes in
L{FancyEqMixin} should compare unequal to an instance of the base
class if any of their attributes compare unequal.
"""
self.assertFalse(Record(1, 2) != DerivedRecord(1, 2))
self.assertTrue(Record(1, 2) != DerivedRecord(1, 3))
self.assertTrue(Record(1, 2) != DerivedRecord(2, 2))
self.assertTrue(Record(1, 2) != DerivedRecord(3, 4))
def test_rightHandArgumentImplementsEquality(self):
"""
The right-hand argument to the equality operator is given a chance
to determine the result of the operation if it is of a type
unrelated to the L{FancyEqMixin}-based instance on the left-hand
side.
"""
self.assertTrue(Record(1, 2) == EqualToEverything())
self.assertFalse(Record(1, 2) == EqualToNothing())
def test_rightHandArgumentImplementsUnequality(self):
"""
The right-hand argument to the non-equality operator is given a
chance to determine the result of the operation if it is of a type
unrelated to the L{FancyEqMixin}-based instance on the left-hand
side.
"""
self.assertFalse(Record(1, 2) != EqualToEverything())
self.assertTrue(Record(1, 2) != EqualToNothing())
class RunAsEffectiveUserTests(unittest.TestCase):
"""
Test for the L{util.runAsEffectiveUser} function.
"""
if getattr(os, "geteuid", None) is None:
skip = "geteuid/seteuid not available"
def setUp(self):
self.mockos = MockOS()
self.patch(os, "geteuid", self.mockos.geteuid)
self.patch(os, "getegid", self.mockos.getegid)
self.patch(os, "seteuid", self.mockos.seteuid)
self.patch(os, "setegid", self.mockos.setegid)
def _securedFunction(self, startUID, startGID, wantUID, wantGID):
"""
Check if wanted UID/GID matched start or saved ones.
"""
self.assertTrue(wantUID == startUID or
wantUID == self.mockos.seteuidCalls[-1])
self.assertTrue(wantGID == startGID or
wantGID == self.mockos.setegidCalls[-1])
def test_forwardResult(self):
"""
L{util.runAsEffectiveUser} forwards the result obtained by calling the
given function
"""
result = util.runAsEffectiveUser(0, 0, lambda: 1)
self.assertEqual(result, 1)
def test_takeParameters(self):
"""
L{util.runAsEffectiveUser} pass the given parameters to the given
function.
"""
result = util.runAsEffectiveUser(0, 0, lambda x: 2*x, 3)
self.assertEqual(result, 6)
def test_takesKeyworkArguments(self):
"""
L{util.runAsEffectiveUser} pass the keyword parameters to the given
function.
"""
result = util.runAsEffectiveUser(0, 0, lambda x, y=1, z=1: x*y*z, 2, z=3)
self.assertEqual(result, 6)
def _testUIDGIDSwitch(self, startUID, startGID, wantUID, wantGID,
expectedUIDSwitches, expectedGIDSwitches):
"""
Helper method checking the calls to C{os.seteuid} and C{os.setegid}
made by L{util.runAsEffectiveUser}, when switching from startUID to
wantUID and from startGID to wantGID.
"""
self.mockos.euid = startUID
self.mockos.egid = startGID
util.runAsEffectiveUser(
wantUID, wantGID,
self._securedFunction, startUID, startGID, wantUID, wantGID)
self.assertEqual(self.mockos.seteuidCalls, expectedUIDSwitches)
self.assertEqual(self.mockos.setegidCalls, expectedGIDSwitches)
self.mockos.seteuidCalls = []
self.mockos.setegidCalls = []
def test_root(self):
"""
Check UID/GID switches when current effective UID is root.
"""
self._testUIDGIDSwitch(0, 0, 0, 0, [], [])
self._testUIDGIDSwitch(0, 0, 1, 0, [1, 0], [])
self._testUIDGIDSwitch(0, 0, 0, 1, [], [1, 0])
self._testUIDGIDSwitch(0, 0, 1, 1, [1, 0], [1, 0])
def test_UID(self):
"""
Check UID/GID switches when current effective UID is non-root.
"""
self._testUIDGIDSwitch(1, 0, 0, 0, [0, 1], [])
self._testUIDGIDSwitch(1, 0, 1, 0, [], [])
self._testUIDGIDSwitch(1, 0, 1, 1, [0, 1, 0, 1], [1, 0])
self._testUIDGIDSwitch(1, 0, 2, 1, [0, 2, 0, 1], [1, 0])
def test_GID(self):
"""
Check UID/GID switches when current effective GID is non-root.
"""
self._testUIDGIDSwitch(0, 1, 0, 0, [], [0, 1])
self._testUIDGIDSwitch(0, 1, 0, 1, [], [])
self._testUIDGIDSwitch(0, 1, 1, 1, [1, 0], [])
self._testUIDGIDSwitch(0, 1, 1, 2, [1, 0], [2, 1])
def test_UIDGID(self):
"""
Check UID/GID switches when current effective UID/GID is non-root.
"""
self._testUIDGIDSwitch(1, 1, 0, 0, [0, 1], [0, 1])
self._testUIDGIDSwitch(1, 1, 0, 1, [0, 1], [])
self._testUIDGIDSwitch(1, 1, 1, 0, [0, 1, 0, 1], [0, 1])
self._testUIDGIDSwitch(1, 1, 1, 1, [], [])
self._testUIDGIDSwitch(1, 1, 2, 1, [0, 2, 0, 1], [])
self._testUIDGIDSwitch(1, 1, 1, 2, [0, 1, 0, 1], [2, 1])
self._testUIDGIDSwitch(1, 1, 2, 2, [0, 2, 0, 1], [2, 1])
def _getDeprecationSuppression(f):
"""
Returns a tuple of arguments needed to suppress deprecation warnings from
a specified function.
@param f: function to suppress dperecation warnings for
@type f: L{callable}
@return: tuple to add to C{suppress} attribute
"""
return SUPPRESS(
category=DeprecationWarning,
message='%s was deprecated' % (fullyQualifiedName(f),))
class UnsignedIDTests(unittest.TestCase):
"""
Tests for L{util.unsignedID} and L{util.setIDFunction}.
"""
suppress = [
_getDeprecationSuppression(util.unsignedID),
_getDeprecationSuppression(util.setIDFunction),
]
def setUp(self):
"""
Save the value of L{util._idFunction} and arrange for it to be restored
after the test runs.
"""
self.addCleanup(setattr, util, '_idFunction', util._idFunction)
def test_setIDFunction(self):
"""
L{util.setIDFunction} returns the last value passed to it.
"""
value = object()
previous = util.setIDFunction(value)
result = util.setIDFunction(previous)
self.assertIdentical(value, result)
def test_setIDFunctionDeprecated(self):
"""
L{util.setIDFunction} is deprecated.
"""
self.callDeprecated(
(Version("Twisted", 13, 0, 0)),
util.setIDFunction, UnsignedIDTests)
test_setIDFunctionDeprecated.suppress = []
def test_unsignedID(self):
"""
L{util.unsignedID} uses the function passed to L{util.setIDFunction} to
determine the unique integer id of an object and then adjusts it to be
positive if necessary.
"""
foo = object()
bar = object()
# A fake object identity mapping
objects = {foo: 17, bar: -73}
def fakeId(obj):
return objects[obj]
util.setIDFunction(fakeId)
self.assertEqual(util.unsignedID(foo), 17)
self.assertEqual(util.unsignedID(bar), (sys.maxsize + 1) * 2 - 73)
def test_unsignedIDDeprecated(self):
"""
L{util.unsignedID} is deprecated, use the builtin C{id} instead.
"""
self.callDeprecated(
(Version("Twisted", 13, 0, 0), "builtin id"),
util.unsignedID, UnsignedIDTests)
test_unsignedIDDeprecated.suppress = []
def test_defaultIDFunction(self):
"""
L{util.unsignedID} uses the built in L{id} by default.
"""
obj = object()
idValue = id(obj)
if idValue < 0:
idValue += (sys.maxsize + 1) * 2
self.assertEqual(util.unsignedID(obj), idValue)
class InitGroupsTests(unittest.TestCase):
"""
Tests for L{util.initgroups}.
"""
if pwd is None:
skip = "pwd not available"
def setUp(self):
self.addCleanup(setattr, util, "_c_initgroups", util._c_initgroups)
self.addCleanup(setattr, util, "setgroups", util.setgroups)
def test_initgroupsForceC(self):
"""
If we fake the presence of the C extension, it's called instead of the
Python implementation.
"""
calls = []
util._c_initgroups = lambda x, y: calls.append((x, y))
setgroupsCalls = []
util.setgroups = calls.append
util.initgroups(os.getuid(), 4)
self.assertEqual(calls, [(pwd.getpwuid(os.getuid())[0], 4)])
self.assertFalse(setgroupsCalls)
def test_initgroupsForcePython(self):
"""
If we fake the absence of the C extension, the Python implementation is
called instead, calling C{os.setgroups}.
"""
util._c_initgroups = None
calls = []
util.setgroups = calls.append
util.initgroups(os.getuid(), os.getgid())
# Something should be in the calls, we don't really care what
self.assertTrue(calls)
def test_initgroupsInC(self):
"""
If the C extension is present, it's called instead of the Python
version. We check that by making sure C{os.setgroups} is not called.
"""
calls = []
util.setgroups = calls.append
try:
util.initgroups(os.getuid(), os.getgid())
except OSError:
pass
self.assertFalse(calls)
if util._c_initgroups is None:
test_initgroupsInC.skip = "C initgroups not available"
class DeprecationTests(unittest.TestCase):
"""
Tests for deprecations in C{twisted.python.util}.
"""
def test_getPluginDirs(self):
"""
L{util.getPluginDirs} is deprecated.
"""
util.getPluginDirs()
warnings = self.flushWarnings(offendingFunctions=[
self.test_getPluginDirs])
self.assertEqual(
warnings[0]['message'],
"twisted.python.util.getPluginDirs is deprecated since Twisted "
"12.2.")
self.assertEqual(warnings[0]['category'], DeprecationWarning)
self.assertEqual(len(warnings), 1)
def test_addPluginDir(self):
"""
L{util.addPluginDir} is deprecated.
"""
util.addPluginDir()
warnings = self.flushWarnings(offendingFunctions=[
self.test_addPluginDir])
self.assertEqual(
warnings[0]['message'],
"twisted.python.util.addPluginDir is deprecated since Twisted "
"12.2.")
self.assertEqual(warnings[0]['category'], DeprecationWarning)
self.assertEqual(len(warnings), 1)
test_addPluginDir.suppress = [
SUPPRESS(category=DeprecationWarning,
message="twisted.python.util.getPluginDirs is deprecated")
]
class SuppressedWarningsTests(unittest.TestCase):
"""
Tests for L{util.runWithWarningsSuppressed}.
"""
runWithWarningsSuppressed = staticmethod(util.runWithWarningsSuppressed)
def test_runWithWarningsSuppressedFiltered(self):
"""
Warnings from the function called by C{runWithWarningsSuppressed} are
suppressed if they match the passed in filter.
"""
filters = [(("ignore", ".*foo.*"), {}),
(("ignore", ".*bar.*"), {})]
self.runWithWarningsSuppressed(filters, warnings.warn, "ignore foo")
self.runWithWarningsSuppressed(filters, warnings.warn, "ignore bar")
self.assertEqual([], self.flushWarnings())
def test_runWithWarningsSuppressedUnfiltered(self):
"""
Warnings from the function called by C{runWithWarningsSuppressed} are
not suppressed if they do not match the passed in filter.
"""
filters = [(("ignore", ".*foo.*"), {}),
(("ignore", ".*bar.*"), {})]
self.runWithWarningsSuppressed(filters, warnings.warn, "don't ignore")
self.assertEqual(
["don't ignore"], [w['message'] for w in self.flushWarnings()])
def test_passThrough(self):
"""
C{runWithWarningsSuppressed} returns the result of the function it
called.
"""
self.assertEqual(self.runWithWarningsSuppressed([], lambda: 4), 4)
def test_noSideEffects(self):
"""
Once C{runWithWarningsSuppressed} has returned, it no longer
suppresses warnings.
"""
filters = [(("ignore", ".*foo.*"), {}),
(("ignore", ".*bar.*"), {})]
self.runWithWarningsSuppressed(filters, lambda: None)
warnings.warn("ignore foo")
self.assertEqual(
["ignore foo"], [w['message'] for w in self.flushWarnings()])
class FancyStrMixinTests(unittest.TestCase):
"""
Tests for L{util.FancyStrMixin}.
"""
def test_sequenceOfStrings(self):
"""
If C{showAttributes} is set to a sequence of strings, C{__str__}
renders using those by looking them up as attributes on the object.
"""
class Foo(util.FancyStrMixin):
showAttributes = ("first", "second")
first = 1
second = "hello"
self.assertEqual(str(Foo()), "<Foo first=1 second='hello'>")
def test_formatter(self):
"""
If C{showAttributes} has an item that is a 2-tuple, C{__str__} renders
the first item in the tuple as a key and the result of calling the
second item with the value of the attribute named by the first item as
the value.
"""
class Foo(util.FancyStrMixin):
showAttributes = (
"first",
("second", lambda value: repr(value[::-1])))
first = "hello"
second = "world"
self.assertEqual("<Foo first='hello' second='dlrow'>", str(Foo()))
def test_override(self):
"""
If C{showAttributes} has an item that is a 3-tuple, C{__str__} renders
the second item in the tuple as a key, and the contents of the
attribute named in the first item are rendered as the value. The value
is formatted using the third item in the tuple.
"""
class Foo(util.FancyStrMixin):
showAttributes = ("first", ("second", "2nd", "%.1f"))
first = 1
second = 2.111
self.assertEqual(str(Foo()), "<Foo first=1 2nd=2.1>")
def test_fancybasename(self):
"""
If C{fancybasename} is present, C{__str__} uses it instead of the class name.
"""
class Foo(util.FancyStrMixin):
fancybasename = "Bar"
self.assertEqual(str(Foo()), "<Bar>")
def test_repr(self):
"""
C{__repr__} outputs the same content as C{__str__}.
"""
class Foo(util.FancyStrMixin):
showAttributes = ("first", "second")
first = 1
second = "hello"
obj = Foo()
self.assertEqual(str(obj), repr(obj))
if _PY3:
del (SwitchUIDTest, SearchUpwardsTest, RunAsEffectiveUserTests,
OrderedDictTest, IntervalDifferentialTestCase, UtilTestCase,
TestMergeFunctionMetadata, DeprecationTests, InitGroupsTests,
GetPasswordTest,
)
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.python.util}.
"""
from __future__ import division, absolute_import
import os.path, sys
import shutil, errno, warnings
try:
import pwd, grp
except ImportError:
pwd = grp = None
from twisted.trial import unittest
from twisted.trial.util import suppress as SUPPRESS
from twisted.python.compat import _PY3
from twisted.python import util
from twisted.python.versions import Version
from twisted.internet import reactor
from twisted.internet.interfaces import IReactorProcess
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet.error import ProcessDone
if _PY3:
MockOS = None
from twisted.python.deprecate import _fullyQualifiedName as fullyQualifiedName
else:
from twisted.test.test_process import MockOS
from twisted.python.reflect import fullyQualifiedName
class UtilTestCase(unittest.TestCase):
def testUniq(self):
l = ["a", 1, "ab", "a", 3, 4, 1, 2, 2, 4, 6]
self.assertEqual(util.uniquify(l), ["a", 1, "ab", 3, 4, 2, 6])
def testRaises(self):
self.failUnless(util.raises(ZeroDivisionError, divmod, 1, 0))
self.failIf(util.raises(ZeroDivisionError, divmod, 0, 1))
try:
util.raises(TypeError, divmod, 1, 0)
except ZeroDivisionError:
pass
else:
raise unittest.FailTest("util.raises didn't raise when it should have")
def test_uidFromNumericString(self):
"""
When L{uidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
self.assertEqual(util.uidFromString("100"), 100)
def test_uidFromUsernameString(self):
"""
When L{uidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
pwent = pwd.getpwuid(os.getuid())
self.assertEqual(util.uidFromString(pwent.pw_name), pwent.pw_uid)
if pwd is None:
test_uidFromUsernameString.skip = (
"Username/UID conversion requires the pwd module.")
def test_gidFromNumericString(self):
"""
When L{gidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
self.assertEqual(util.gidFromString("100"), 100)
def test_gidFromGroupnameString(self):
"""
When L{gidFromString} is called with a base-ten string representation
of an integer, it returns the integer.
"""
grent = grp.getgrgid(os.getgid())
self.assertEqual(util.gidFromString(grent.gr_name), grent.gr_gid)
if grp is None:
test_gidFromGroupnameString.skip = (
"Group Name/GID conversion requires the grp module.")
class NameToLabelTests(unittest.TestCase):
"""
Tests for L{nameToLabel}.
"""
def test_nameToLabel(self):
"""
Test the various kinds of inputs L{nameToLabel} supports.
"""
nameData = [
('f', 'F'),
('fo', 'Fo'),
('foo', 'Foo'),
('fooBar', 'Foo Bar'),
('fooBarBaz', 'Foo Bar Baz'),
]
for inp, out in nameData:
got = util.nameToLabel(inp)
self.assertEqual(
got, out,
"nameToLabel(%r) == %r != %r" % (inp, got, out))
class UntilConcludesTests(unittest.TestCase):
"""
Tests for L{untilConcludes}, an C{EINTR} helper.
"""
def test_uninterruptably(self):
"""
L{untilConcludes} calls the function passed to it until the function
does not raise either L{OSError} or L{IOError} with C{errno} of
C{EINTR}. It otherwise completes with the same result as the function
passed to it.
"""
def f(a, b):
self.calls += 1
exc = self.exceptions.pop()
if exc is not None:
raise exc(errno.EINTR, "Interrupted system call!")
return a + b
self.exceptions = [None]
self.calls = 0
self.assertEqual(util.untilConcludes(f, 1, 2), 3)
self.assertEqual(self.calls, 1)
self.exceptions = [None, OSError, IOError]
self.calls = 0
self.assertEqual(util.untilConcludes(f, 2, 3), 5)
self.assertEqual(self.calls, 3)
class SwitchUIDTest(unittest.TestCase):
"""
Tests for L{util.switchUID}.
"""
if getattr(os, "getuid", None) is None:
skip = "getuid/setuid not available"
def setUp(self):
self.mockos = MockOS()
self.patch(util, "os", self.mockos)
self.patch(util, "initgroups", self.initgroups)
self.initgroupsCalls = []
def initgroups(self, uid, gid):
"""
Save L{util.initgroups} calls in C{self.initgroupsCalls}.
"""
self.initgroupsCalls.append((uid, gid))
def test_uid(self):
"""
L{util.switchUID} calls L{util.initgroups} and then C{os.setuid} with
the given uid.
"""
util.switchUID(12000, None)
self.assertEqual(self.initgroupsCalls, [(12000, None)])
self.assertEqual(self.mockos.actions, [("setuid", 12000)])
def test_euid(self):
"""
L{util.switchUID} calls L{util.initgroups} and then C{os.seteuid} with
the given uid if the C{euid} parameter is set to C{True}.
"""
util.switchUID(12000, None, True)
self.assertEqual(self.initgroupsCalls, [(12000, None)])
self.assertEqual(self.mockos.seteuidCalls, [12000])
def test_currentUID(self):
"""
If the current uid is the same as the uid passed to L{util.switchUID},
then initgroups does not get called, but a warning is issued.
"""
uid = self.mockos.getuid()
util.switchUID(uid, None)
self.assertEqual(self.initgroupsCalls, [])
self.assertEqual(self.mockos.actions, [])
warnings = self.flushWarnings([util.switchUID])
self.assertEqual(len(warnings), 1)
self.assertIn('tried to drop privileges and setuid %i' % uid,
warnings[0]['message'])
self.assertIn('but uid is already %i' % uid, warnings[0]['message'])
def test_currentEUID(self):
"""
If the current euid is the same as the euid passed to L{util.switchUID},
then initgroups does not get called, but a warning is issued.
"""
euid = self.mockos.geteuid()
util.switchUID(euid, None, True)
self.assertEqual(self.initgroupsCalls, [])
self.assertEqual(self.mockos.seteuidCalls, [])
warnings = self.flushWarnings([util.switchUID])
self.assertEqual(len(warnings), 1)
self.assertIn('tried to drop privileges and seteuid %i' % euid,
warnings[0]['message'])
self.assertIn('but euid is already %i' % euid, warnings[0]['message'])
class TestMergeFunctionMetadata(unittest.TestCase):
"""
Tests for L{mergeFunctionMetadata}.
"""
def test_mergedFunctionBehavesLikeMergeTarget(self):
"""
After merging C{foo}'s data into C{bar}, the returned function behaves
as if it is C{bar}.
"""
foo_object = object()
bar_object = object()
def foo():
return foo_object
def bar(x, y, ab, c=10, *d, **e):
(a, b) = ab
return bar_object
baz = util.mergeFunctionMetadata(foo, bar)
self.assertIdentical(baz(1, 2, (3, 4), quux=10), bar_object)
def test_moduleIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{foo}'s
C{__module__}.
"""
def foo():
pass
def bar():
pass
bar.__module__ = 'somewhere.else'
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(baz.__module__, foo.__module__)
def test_docstringIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{foo}'s docstring.
"""
def foo():
"""
This is foo.
"""
def bar():
"""
This is bar.
"""
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(baz.__doc__, foo.__doc__)
def test_nameIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{foo}'s name.
"""
def foo():
pass
def bar():
pass
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(baz.__name__, foo.__name__)
def test_instanceDictionaryIsMerged(self):
"""
Merging C{foo} into C{bar} returns a function with C{bar}'s
dictionary, updated by C{foo}'s.
"""
def foo():
pass
foo.a = 1
foo.b = 2
def bar():
pass
bar.b = 3
bar.c = 4
baz = util.mergeFunctionMetadata(foo, bar)
self.assertEqual(foo.a, baz.a)
self.assertEqual(foo.b, baz.b)
self.assertEqual(bar.c, baz.c)
class OrderedDictTest(unittest.TestCase):
def testOrderedDict(self):
d = util.OrderedDict()
d['a'] = 'b'
d['b'] = 'a'
d[3] = 12
d[1234] = 4321
self.assertEqual(repr(d), "{'a': 'b', 'b': 'a', 3: 12, 1234: 4321}")
self.assertEqual(d.values(), ['b', 'a', 12, 4321])
del d[3]
self.assertEqual(repr(d), "{'a': 'b', 'b': 'a', 1234: 4321}")
self.assertEqual(d, {'a': 'b', 'b': 'a', 1234:4321})
self.assertEqual(d.keys(), ['a', 'b', 1234])
self.assertEqual(list(d.iteritems()),
[('a', 'b'), ('b','a'), (1234, 4321)])
item = d.popitem()
self.assertEqual(item, (1234, 4321))
def testInitialization(self):
d = util.OrderedDict({'monkey': 'ook',
'apple': 'red'})
self.failUnless(d._order)
d = util.OrderedDict(((1,1),(3,3),(2,2),(0,0)))
self.assertEqual(repr(d), "{1: 1, 3: 3, 2: 2, 0: 0}")
class InsensitiveDictTest(unittest.TestCase):
"""
Tests for L{util.InsensitiveDict}.
"""
def test_preserve(self):
"""
L{util.InsensitiveDict} preserves the case of keys if constructed with
C{preserve=True}.
"""
dct = util.InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=1)
self.assertEqual(dct['fnz'], {1:2})
self.assertEqual(dct['foo'], 'bar')
self.assertEqual(dct.copy(), dct)
self.assertEqual(dct['foo'], dct.get('Foo'))
self.assertIn(1, dct)
self.assertIn('foo', dct)
# Make eval() work, urrrrgh:
InsensitiveDict = util.InsensitiveDict
self.assertEqual(eval(repr(dct)), dct)
keys=['Foo', 'fnz', 1]
for x in keys:
self.assertIn(x, dct.keys())
self.assertIn((x, dct[x]), dct.items())
self.assertEqual(len(keys), len(dct))
del dct[1]
del dct['foo']
self.assertEqual(dct.keys(), ['fnz'])
def test_noPreserve(self):
"""
L{util.InsensitiveDict} does not preserves the case of keys if
constructed with C{preserve=False}.
"""
dct = util.InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=0)
keys=['foo', 'fnz', 1]
for x in keys:
self.assertIn(x, dct.keys())
self.assertIn((x, dct[x]), dct.items())
self.assertEqual(len(keys), len(dct))
del dct[1]
del dct['foo']
self.assertEqual(dct.keys(), ['fnz'])
def test_unicode(self):
"""
Unicode keys are case insensitive.
"""
d = util.InsensitiveDict(preserve=False)
d[u"Foo"] = 1
self.assertEqual(d[u"FOO"], 1)
self.assertEqual(d.keys(), [u"foo"])
def test_bytes(self):
"""
Bytes keys are case insensitive.
"""
d = util.InsensitiveDict(preserve=False)
d[b"Foo"] = 1
self.assertEqual(d[b"FOO"], 1)
self.assertEqual(d.keys(), [b"foo"])
class PasswordTestingProcessProtocol(ProcessProtocol):
"""
Write the string C{"secret\n"} to a subprocess and then collect all of
its output and fire a Deferred with it when the process ends.
"""
def connectionMade(self):
self.output = []
self.transport.write('secret\n')
def childDataReceived(self, fd, output):
self.output.append((fd, output))
def processEnded(self, reason):
self.finished.callback((reason, self.output))
class GetPasswordTest(unittest.TestCase):
if not IReactorProcess.providedBy(reactor):
skip = "Process support required to test getPassword"
def test_stdin(self):
"""
Making sure getPassword accepts a password from standard input by
running a child process which uses getPassword to read in a string
which it then writes it out again. Write a string to the child
process and then read one and make sure it is the right string.
"""
p = PasswordTestingProcessProtocol()
p.finished = Deferred()
reactor.spawnProcess(
p,
sys.executable,
[sys.executable,
'-c',
('import sys\n'
'from twisted.python.util import getPassword\n'
'sys.stdout.write(getPassword())\n'
'sys.stdout.flush()\n')],
env={'PYTHONPATH': os.pathsep.join(sys.path)})
def processFinished(result):
(reason, output) = result
reason.trap(ProcessDone)
self.assertIn((1, 'secret'), output)
return p.finished.addCallback(processFinished)
class SearchUpwardsTest(unittest.TestCase):
def testSearchupwards(self):
os.makedirs('searchupwards/a/b/c')
file('searchupwards/foo.txt', 'w').close()
file('searchupwards/a/foo.txt', 'w').close()
file('searchupwards/a/b/c/foo.txt', 'w').close()
os.mkdir('searchupwards/bar')
os.mkdir('searchupwards/bam')
os.mkdir('searchupwards/a/bar')
os.mkdir('searchupwards/a/b/bam')
actual=util.searchupwards('searchupwards/a/b/c',
files=['foo.txt'],
dirs=['bar', 'bam'])
expected=os.path.abspath('searchupwards') + os.sep
self.assertEqual(actual, expected)
shutil.rmtree('searchupwards')
actual=util.searchupwards('searchupwards/a/b/c',
files=['foo.txt'],
dirs=['bar', 'bam'])
expected=None
self.assertEqual(actual, expected)
class IntervalDifferentialTestCase(unittest.TestCase):
def testDefault(self):
d = iter(util.IntervalDifferential([], 10))
for i in range(100):
self.assertEqual(d.next(), (10, None))
def testSingle(self):
d = iter(util.IntervalDifferential([5], 10))
for i in range(100):
self.assertEqual(d.next(), (5, 0))
def testPair(self):
d = iter(util.IntervalDifferential([5, 7], 10))
for i in range(100):
self.assertEqual(d.next(), (5, 0))
self.assertEqual(d.next(), (2, 1))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (4, 1))
self.assertEqual(d.next(), (1, 0))
self.assertEqual(d.next(), (5, 0))
self.assertEqual(d.next(), (1, 1))
self.assertEqual(d.next(), (4, 0))
self.assertEqual(d.next(), (3, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (5, 0))
self.assertEqual(d.next(), (0, 1))
def testTriple(self):
d = iter(util.IntervalDifferential([2, 4, 5], 10))
for i in range(100):
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (1, 2))
self.assertEqual(d.next(), (1, 0))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 2))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (1, 2))
self.assertEqual(d.next(), (1, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (2, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (0, 2))
def testInsert(self):
d = iter(util.IntervalDifferential([], 10))
self.assertEqual(d.next(), (10, None))
d.addInterval(3)
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (3, 0))
d.addInterval(6)
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (0, 1))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (0, 1))
def testRemove(self):
d = iter(util.IntervalDifferential([3, 5], 10))
self.assertEqual(d.next(), (3, 0))
self.assertEqual(d.next(), (2, 1))
self.assertEqual(d.next(), (1, 0))
d.removeInterval(3)
self.assertEqual(d.next(), (4, 0))
self.assertEqual(d.next(), (5, 0))
d.removeInterval(5)
self.assertEqual(d.next(), (10, None))
self.assertRaises(ValueError, d.removeInterval, 10)
class Record(util.FancyEqMixin):
"""
Trivial user of L{FancyEqMixin} used by tests.
"""
compareAttributes = ('a', 'b')
def __init__(self, a, b):
self.a = a
self.b = b
class DifferentRecord(util.FancyEqMixin):
"""
Trivial user of L{FancyEqMixin} which is not related to L{Record}.
"""
compareAttributes = ('a', 'b')
def __init__(self, a, b):
self.a = a
self.b = b
class DerivedRecord(Record):
"""
A class with an inheritance relationship to L{Record}.
"""
class EqualToEverything(object):
"""
A class the instances of which consider themselves equal to everything.
"""
def __eq__(self, other):
return True
def __ne__(self, other):
return False
class EqualToNothing(object):
"""
A class the instances of which consider themselves equal to nothing.
"""
def __eq__(self, other):
return False
def __ne__(self, other):
return True
class EqualityTests(unittest.TestCase):
"""
Tests for L{FancyEqMixin}.
"""
def test_identity(self):
"""
Instances of a class which mixes in L{FancyEqMixin} but which
defines no comparison attributes compare by identity.
"""
class Empty(util.FancyEqMixin):
pass
self.assertFalse(Empty() == Empty())
self.assertTrue(Empty() != Empty())
empty = Empty()
self.assertTrue(empty == empty)
self.assertFalse(empty != empty)
def test_equality(self):
"""
Instances of a class which mixes in L{FancyEqMixin} should compare
equal if all of their attributes compare equal. They should not
compare equal if any of their attributes do not compare equal.
"""
self.assertTrue(Record(1, 2) == Record(1, 2))
self.assertFalse(Record(1, 2) == Record(1, 3))
self.assertFalse(Record(1, 2) == Record(2, 2))
self.assertFalse(Record(1, 2) == Record(3, 4))
def test_unequality(self):
"""
Unequality between instances of a particular L{record} should be
defined as the negation of equality.
"""
self.assertFalse(Record(1, 2) != Record(1, 2))
self.assertTrue(Record(1, 2) != Record(1, 3))
self.assertTrue(Record(1, 2) != Record(2, 2))
self.assertTrue(Record(1, 2) != Record(3, 4))
def test_differentClassesEquality(self):
"""
Instances of different classes which mix in L{FancyEqMixin} should not
compare equal.
"""
self.assertFalse(Record(1, 2) == DifferentRecord(1, 2))
def test_differentClassesInequality(self):
"""
Instances of different classes which mix in L{FancyEqMixin} should
compare unequal.
"""
self.assertTrue(Record(1, 2) != DifferentRecord(1, 2))
def test_inheritedClassesEquality(self):
"""
An instance of a class which derives from a class which mixes in
L{FancyEqMixin} should compare equal to an instance of the base class
if and only if all of their attributes compare equal.
"""
self.assertTrue(Record(1, 2) == DerivedRecord(1, 2))
self.assertFalse(Record(1, 2) == DerivedRecord(1, 3))
self.assertFalse(Record(1, 2) == DerivedRecord(2, 2))
self.assertFalse(Record(1, 2) == DerivedRecord(3, 4))
def test_inheritedClassesInequality(self):
"""
An instance of a class which derives from a class which mixes in
L{FancyEqMixin} should compare unequal to an instance of the base
class if any of their attributes compare unequal.
"""
self.assertFalse(Record(1, 2) != DerivedRecord(1, 2))
self.assertTrue(Record(1, 2) != DerivedRecord(1, 3))
self.assertTrue(Record(1, 2) != DerivedRecord(2, 2))
self.assertTrue(Record(1, 2) != DerivedRecord(3, 4))
def test_rightHandArgumentImplementsEquality(self):
"""
The right-hand argument to the equality operator is given a chance
to determine the result of the operation if it is of a type
unrelated to the L{FancyEqMixin}-based instance on the left-hand
side.
"""
self.assertTrue(Record(1, 2) == EqualToEverything())
self.assertFalse(Record(1, 2) == EqualToNothing())
def test_rightHandArgumentImplementsUnequality(self):
"""
The right-hand argument to the non-equality operator is given a
chance to determine the result of the operation if it is of a type
unrelated to the L{FancyEqMixin}-based instance on the left-hand
side.
"""
self.assertFalse(Record(1, 2) != EqualToEverything())
self.assertTrue(Record(1, 2) != EqualToNothing())
class RunAsEffectiveUserTests(unittest.TestCase):
"""
Test for the L{util.runAsEffectiveUser} function.
"""
if getattr(os, "geteuid", None) is None:
skip = "geteuid/seteuid not available"
def setUp(self):
self.mockos = MockOS()
self.patch(os, "geteuid", self.mockos.geteuid)
self.patch(os, "getegid", self.mockos.getegid)
self.patch(os, "seteuid", self.mockos.seteuid)
self.patch(os, "setegid", self.mockos.setegid)
def _securedFunction(self, startUID, startGID, wantUID, wantGID):
"""
Check if wanted UID/GID matched start or saved ones.
"""
self.assertTrue(wantUID == startUID or
wantUID == self.mockos.seteuidCalls[-1])
self.assertTrue(wantGID == startGID or
wantGID == self.mockos.setegidCalls[-1])
def test_forwardResult(self):
"""
L{util.runAsEffectiveUser} forwards the result obtained by calling the
given function
"""
result = util.runAsEffectiveUser(0, 0, lambda: 1)
self.assertEqual(result, 1)
def test_takeParameters(self):
"""
L{util.runAsEffectiveUser} pass the given parameters to the given
function.
"""
result = util.runAsEffectiveUser(0, 0, lambda x: 2*x, 3)
self.assertEqual(result, 6)
def test_takesKeyworkArguments(self):
"""
L{util.runAsEffectiveUser} pass the keyword parameters to the given
function.
"""
result = util.runAsEffectiveUser(0, 0, lambda x, y=1, z=1: x*y*z, 2, z=3)
self.assertEqual(result, 6)
def _testUIDGIDSwitch(self, startUID, startGID, wantUID, wantGID,
expectedUIDSwitches, expectedGIDSwitches):
"""
Helper method checking the calls to C{os.seteuid} and C{os.setegid}
made by L{util.runAsEffectiveUser}, when switching from startUID to
wantUID and from startGID to wantGID.
"""
self.mockos.euid = startUID
self.mockos.egid = startGID
util.runAsEffectiveUser(
wantUID, wantGID,
self._securedFunction, startUID, startGID, wantUID, wantGID)
self.assertEqual(self.mockos.seteuidCalls, expectedUIDSwitches)
self.assertEqual(self.mockos.setegidCalls, expectedGIDSwitches)
self.mockos.seteuidCalls = []
self.mockos.setegidCalls = []
def test_root(self):
"""
Check UID/GID switches when current effective UID is root.
"""
self._testUIDGIDSwitch(0, 0, 0, 0, [], [])
self._testUIDGIDSwitch(0, 0, 1, 0, [1, 0], [])
self._testUIDGIDSwitch(0, 0, 0, 1, [], [1, 0])
self._testUIDGIDSwitch(0, 0, 1, 1, [1, 0], [1, 0])
def test_UID(self):
"""
Check UID/GID switches when current effective UID is non-root.
"""
self._testUIDGIDSwitch(1, 0, 0, 0, [0, 1], [])
self._testUIDGIDSwitch(1, 0, 1, 0, [], [])
self._testUIDGIDSwitch(1, 0, 1, 1, [0, 1, 0, 1], [1, 0])
self._testUIDGIDSwitch(1, 0, 2, 1, [0, 2, 0, 1], [1, 0])
def test_GID(self):
"""
Check UID/GID switches when current effective GID is non-root.
"""
self._testUIDGIDSwitch(0, 1, 0, 0, [], [0, 1])
self._testUIDGIDSwitch(0, 1, 0, 1, [], [])
self._testUIDGIDSwitch(0, 1, 1, 1, [1, 0], [])
self._testUIDGIDSwitch(0, 1, 1, 2, [1, 0], [2, 1])
def test_UIDGID(self):
"""
Check UID/GID switches when current effective UID/GID is non-root.
"""
self._testUIDGIDSwitch(1, 1, 0, 0, [0, 1], [0, 1])
self._testUIDGIDSwitch(1, 1, 0, 1, [0, 1], [])
self._testUIDGIDSwitch(1, 1, 1, 0, [0, 1, 0, 1], [0, 1])
self._testUIDGIDSwitch(1, 1, 1, 1, [], [])
self._testUIDGIDSwitch(1, 1, 2, 1, [0, 2, 0, 1], [])
self._testUIDGIDSwitch(1, 1, 1, 2, [0, 1, 0, 1], [2, 1])
self._testUIDGIDSwitch(1, 1, 2, 2, [0, 2, 0, 1], [2, 1])
def _getDeprecationSuppression(f):
"""
Returns a tuple of arguments needed to suppress deprecation warnings from
a specified function.
@param f: function to suppress dperecation warnings for
@type f: L{callable}
@return: tuple to add to C{suppress} attribute
"""
return SUPPRESS(
category=DeprecationWarning,
message='%s was deprecated' % (fullyQualifiedName(f),))
class UnsignedIDTests(unittest.TestCase):
"""
Tests for L{util.unsignedID} and L{util.setIDFunction}.
"""
suppress = [
_getDeprecationSuppression(util.unsignedID),
_getDeprecationSuppression(util.setIDFunction),
]
def setUp(self):
"""
Save the value of L{util._idFunction} and arrange for it to be restored
after the test runs.
"""
self.addCleanup(setattr, util, '_idFunction', util._idFunction)
def test_setIDFunction(self):
"""
L{util.setIDFunction} returns the last value passed to it.
"""
value = object()
previous = util.setIDFunction(value)
result = util.setIDFunction(previous)
self.assertIdentical(value, result)
def test_setIDFunctionDeprecated(self):
"""
L{util.setIDFunction} is deprecated.
"""
self.callDeprecated(
(Version("Twisted", 13, 0, 0)),
util.setIDFunction, UnsignedIDTests)
test_setIDFunctionDeprecated.suppress = []
def test_unsignedID(self):
"""
L{util.unsignedID} uses the function passed to L{util.setIDFunction} to
determine the unique integer id of an object and then adjusts it to be
positive if necessary.
"""
foo = object()
bar = object()
# A fake object identity mapping
objects = {foo: 17, bar: -73}
def fakeId(obj):
return objects[obj]
util.setIDFunction(fakeId)
self.assertEqual(util.unsignedID(foo), 17)
self.assertEqual(util.unsignedID(bar), (sys.maxsize + 1) * 2 - 73)
def test_unsignedIDDeprecated(self):
"""
L{util.unsignedID} is deprecated, use the builtin C{id} instead.
"""
self.callDeprecated(
(Version("Twisted", 13, 0, 0), "builtin id"),
util.unsignedID, UnsignedIDTests)
test_unsignedIDDeprecated.suppress = []
def test_defaultIDFunction(self):
"""
L{util.unsignedID} uses the built in L{id} by default.
"""
obj = object()
idValue = id(obj)
if idValue < 0:
idValue += (sys.maxsize + 1) * 2
self.assertEqual(util.unsignedID(obj), idValue)
class InitGroupsTests(unittest.TestCase):
"""
Tests for L{util.initgroups}.
"""
if pwd is None:
skip = "pwd not available"
def setUp(self):
self.addCleanup(setattr, util, "_c_initgroups", util._c_initgroups)
self.addCleanup(setattr, util, "setgroups", util.setgroups)
def test_initgroupsForceC(self):
"""
If we fake the presence of the C extension, it's called instead of the
Python implementation.
"""
calls = []
util._c_initgroups = lambda x, y: calls.append((x, y))
setgroupsCalls = []
util.setgroups = calls.append
util.initgroups(os.getuid(), 4)
self.assertEqual(calls, [(pwd.getpwuid(os.getuid())[0], 4)])
self.assertFalse(setgroupsCalls)
def test_initgroupsForcePython(self):
"""
If we fake the absence of the C extension, the Python implementation is
called instead, calling C{os.setgroups}.
"""
util._c_initgroups = None
calls = []
util.setgroups = calls.append
util.initgroups(os.getuid(), os.getgid())
# Something should be in the calls, we don't really care what
self.assertTrue(calls)
def test_initgroupsInC(self):
"""
If the C extension is present, it's called instead of the Python
version. We check that by making sure C{os.setgroups} is not called.
"""
calls = []
util.setgroups = calls.append
try:
util.initgroups(os.getuid(), os.getgid())
except OSError:
pass
self.assertFalse(calls)
if util._c_initgroups is None:
test_initgroupsInC.skip = "C initgroups not available"
class DeprecationTests(unittest.TestCase):
"""
Tests for deprecations in C{twisted.python.util}.
"""
def test_getPluginDirs(self):
"""
L{util.getPluginDirs} is deprecated.
"""
util.getPluginDirs()
warnings = self.flushWarnings(offendingFunctions=[
self.test_getPluginDirs])
self.assertEqual(
warnings[0]['message'],
"twisted.python.util.getPluginDirs is deprecated since Twisted "
"12.2.")
self.assertEqual(warnings[0]['category'], DeprecationWarning)
self.assertEqual(len(warnings), 1)
def test_addPluginDir(self):
"""
L{util.addPluginDir} is deprecated.
"""
util.addPluginDir()
warnings = self.flushWarnings(offendingFunctions=[
self.test_addPluginDir])
self.assertEqual(
warnings[0]['message'],
"twisted.python.util.addPluginDir is deprecated since Twisted "
"12.2.")
self.assertEqual(warnings[0]['category'], DeprecationWarning)
self.assertEqual(len(warnings), 1)
test_addPluginDir.suppress = [
SUPPRESS(category=DeprecationWarning,
message="twisted.python.util.getPluginDirs is deprecated")
]
class SuppressedWarningsTests(unittest.TestCase):
"""
Tests for L{util.runWithWarningsSuppressed}.
"""
runWithWarningsSuppressed = staticmethod(util.runWithWarningsSuppressed)
def test_runWithWarningsSuppressedFiltered(self):
"""
Warnings from the function called by C{runWithWarningsSuppressed} are
suppressed if they match the passed in filter.
"""
filters = [(("ignore", ".*foo.*"), {}),
(("ignore", ".*bar.*"), {})]
self.runWithWarningsSuppressed(filters, warnings.warn, "ignore foo")
self.runWithWarningsSuppressed(filters, warnings.warn, "ignore bar")
self.assertEqual([], self.flushWarnings())
def test_runWithWarningsSuppressedUnfiltered(self):
"""
Warnings from the function called by C{runWithWarningsSuppressed} are
not suppressed if they do not match the passed in filter.
"""
filters = [(("ignore", ".*foo.*"), {}),
(("ignore", ".*bar.*"), {})]
self.runWithWarningsSuppressed(filters, warnings.warn, "don't ignore")
self.assertEqual(
["don't ignore"], [w['message'] for w in self.flushWarnings()])
def test_passThrough(self):
"""
C{runWithWarningsSuppressed} returns the result of the function it
called.
"""
self.assertEqual(self.runWithWarningsSuppressed([], lambda: 4), 4)
def test_noSideEffects(self):
"""
Once C{runWithWarningsSuppressed} has returned, it no longer
suppresses warnings.
"""
filters = [(("ignore", ".*foo.*"), {}),
(("ignore", ".*bar.*"), {})]
self.runWithWarningsSuppressed(filters, lambda: None)
warnings.warn("ignore foo")
self.assertEqual(
["ignore foo"], [w['message'] for w in self.flushWarnings()])
class FancyStrMixinTests(unittest.TestCase):
"""
Tests for L{util.FancyStrMixin}.
"""
def test_sequenceOfStrings(self):
"""
If C{showAttributes} is set to a sequence of strings, C{__str__}
renders using those by looking them up as attributes on the object.
"""
class Foo(util.FancyStrMixin):
showAttributes = ("first", "second")
first = 1
second = "hello"
self.assertEqual(str(Foo()), "<Foo first=1 second='hello'>")
def test_formatter(self):
"""
If C{showAttributes} has an item that is a 2-tuple, C{__str__} renders
the first item in the tuple as a key and the result of calling the
second item with the value of the attribute named by the first item as
the value.
"""
class Foo(util.FancyStrMixin):
showAttributes = (
"first",
("second", lambda value: repr(value[::-1])))
first = "hello"
second = "world"
self.assertEqual("<Foo first='hello' second='dlrow'>", str(Foo()))
def test_override(self):
"""
If C{showAttributes} has an item that is a 3-tuple, C{__str__} renders
the second item in the tuple as a key, and the contents of the
attribute named in the first item are rendered as the value. The value
is formatted using the third item in the tuple.
"""
class Foo(util.FancyStrMixin):
showAttributes = ("first", ("second", "2nd", "%.1f"))
first = 1
second = 2.111
self.assertEqual(str(Foo()), "<Foo first=1 2nd=2.1>")
def test_fancybasename(self):
"""
If C{fancybasename} is present, C{__str__} uses it instead of the class name.
"""
class Foo(util.FancyStrMixin):
fancybasename = "Bar"
self.assertEqual(str(Foo()), "<Bar>")
def test_repr(self):
"""
C{__repr__} outputs the same content as C{__str__}.
"""
class Foo(util.FancyStrMixin):
showAttributes = ("first", "second")
first = 1
second = "hello"
obj = Foo()
self.assertEqual(str(obj), repr(obj))
if _PY3:
del (SwitchUIDTest, SearchUpwardsTest, RunAsEffectiveUserTests,
OrderedDictTest, IntervalDifferentialTestCase, UtilTestCase,
TestMergeFunctionMetadata, DeprecationTests, InitGroupsTests,
GetPasswordTest,
)