K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / twisted / test /
server ip : 104.21.89.46

your ip : 172.69.214.80

H O M E


Filename/usr/lib/python2.7/dist-packages/twisted/test/test_unix.py
Size14.22 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified15-Jul-2011 03:05
Last accessed07-Jul-2025 00:06
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
"""

import stat, os, sys, types
import socket

from twisted.internet import interfaces, reactor, protocol, error, address, defer, utils
from twisted.python import lockfile
from twisted.trial import unittest

from twisted.test.test_tcp import MyServerFactory, MyClientFactory


class FailedConnectionClientFactory(protocol.ClientFactory):
def __init__(self, onFail):
self.onFail = onFail

def clientConnectionFailed(self, connector, reason):
self.onFail.errback(reason)



class UnixSocketTestCase(unittest.TestCase):
"""
Test unix sockets.
"""
def test_peerBind(self):
"""
The address passed to the server factory's C{buildProtocol} method and
the address returned by the connected protocol's transport's C{getPeer}
method match the address the client socket is bound to.
"""
filename = self.mktemp()
peername = self.mktemp()
serverFactory = MyServerFactory()
connMade = serverFactory.protocolConnectionMade = defer.Deferred()
unixPort = reactor.listenUNIX(filename, serverFactory)
self.addCleanup(unixPort.stopListening)
unixSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.addCleanup(unixSocket.close)
unixSocket.bind(peername)
unixSocket.connect(filename)
def cbConnMade(proto):
expected = address.UNIXAddress(peername)
self.assertEqual(serverFactory.peerAddresses, [expected])
self.assertEqual(proto.transport.getPeer(), expected)
connMade.addCallback(cbConnMade)
return connMade


def test_dumber(self):
"""
L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
started with L{IReactorUNIX.listenUNIX}.
"""
filename = self.mktemp()
serverFactory = MyServerFactory()
serverConnMade = defer.Deferred()
serverFactory.protocolConnectionMade = serverConnMade
unixPort = reactor.listenUNIX(filename, serverFactory)
self.addCleanup(unixPort.stopListening)
clientFactory = MyClientFactory()
clientConnMade = defer.Deferred()
clientFactory.protocolConnectionMade = clientConnMade
c = reactor.connectUNIX(filename, clientFactory)
d = defer.gatherResults([serverConnMade, clientConnMade])
def allConnected((serverProtocol, clientProtocol)):

# Incidental assertion which may or may not be redundant with some
# other test. This probably deserves its own test method.
self.assertEqual(clientFactory.peerAddresses,
[address.UNIXAddress(filename)])

clientProtocol.transport.loseConnection()
serverProtocol.transport.loseConnection()
d.addCallback(allConnected)
return d


def test_pidFile(self):
"""
A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
called and released when the Deferred returned by the L{IListeningPort}
provider's C{stopListening} method is called back.
"""
filename = self.mktemp()
serverFactory = MyServerFactory()
serverConnMade = defer.Deferred()
serverFactory.protocolConnectionMade = serverConnMade
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
self.assertTrue(lockfile.isLocked(filename + ".lock"))

# XXX This part would test something about the checkPID parameter, but
# it doesn't actually. It should be rewritten to test the several
# different possible behaviors. -exarkun
clientFactory = MyClientFactory()
clientConnMade = defer.Deferred()
clientFactory.protocolConnectionMade = clientConnMade
c = reactor.connectUNIX(filename, clientFactory, checkPID=1)

d = defer.gatherResults([serverConnMade, clientConnMade])
def _portStuff((serverProtocol, clientProto)):

# Incidental assertion which may or may not be redundant with some
# other test. This probably deserves its own test method.
self.assertEqual(clientFactory.peerAddresses,
[address.UNIXAddress(filename)])

clientProto.transport.loseConnection()
serverProtocol.transport.loseConnection()
return unixPort.stopListening()
d.addCallback(_portStuff)

def _check(ignored):
self.failIf(lockfile.isLocked(filename + ".lock"), 'locked')
d.addCallback(_check)
return d


def test_socketLocking(self):
"""
L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
the name of a file on which a server is already listening.
"""
filename = self.mktemp()
serverFactory = MyServerFactory()
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)

self.assertRaises(
error.CannotListenError,
reactor.listenUNIX, filename, serverFactory, wantPID=True)

def stoppedListening(ign):
unixPort = reactor.listenUNIX(filename, serverFactory, wantPID=True)
return unixPort.stopListening()

return unixPort.stopListening().addCallback(stoppedListening)


def _uncleanSocketTest(self, callback):
self.filename = self.mktemp()
source = ("from twisted.internet import protocol, reactor\n"
"reactor.listenUNIX(%r, protocol.ServerFactory(), wantPID=True)\n") % (self.filename,)
env = {'PYTHONPATH': os.pathsep.join(sys.path)}

d = utils.getProcessValue(sys.executable, ("-u", "-c", source), env=env)
d.addCallback(callback)
return d


def test_uncleanServerSocketLocking(self):
"""
If passed C{True} for the C{wantPID} parameter, a server can be started
listening with L{IReactorUNIX.listenUNIX} when passed the name of a
file on which a previous server which has not exited cleanly has been
listening using the C{wantPID} option.
"""
def ranStupidChild(ign):
# If this next call succeeds, our lock handling is correct.
p = reactor.listenUNIX(self.filename, MyServerFactory(), wantPID=True)
return p.stopListening()
return self._uncleanSocketTest(ranStupidChild)


def test_connectToUncleanServer(self):
"""
If passed C{True} for the C{checkPID} parameter, a client connection
attempt made with L{IReactorUNIX.connectUNIX} fails with
L{error.BadFileError}.
"""
def ranStupidChild(ign):
d = defer.Deferred()
f = FailedConnectionClientFactory(d)
c = reactor.connectUNIX(self.filename, f, checkPID=True)
return self.assertFailure(d, error.BadFileError)
return self._uncleanSocketTest(ranStupidChild)


def _reprTest(self, serverFactory, factoryName):
"""
Test the C{__str__} and C{__repr__} implementations of a UNIX port when
used with the given factory.
"""
filename = self.mktemp()
unixPort = reactor.listenUNIX(filename, serverFactory)

connectedString = "<%s on %r>" % (factoryName, filename)
self.assertEqual(repr(unixPort), connectedString)
self.assertEqual(str(unixPort), connectedString)

d = defer.maybeDeferred(unixPort.stopListening)
def stoppedListening(ign):
unconnectedString = "<%s (not listening)>" % (factoryName,)
self.assertEqual(repr(unixPort), unconnectedString)
self.assertEqual(str(unixPort), unconnectedString)
d.addCallback(stoppedListening)
return d


def test_reprWithClassicFactory(self):
"""
The two string representations of the L{IListeningPort} returned by
L{IReactorUNIX.listenUNIX} contains the name of the classic factory
class being used and the filename on which the port is listening or
indicates that the port is not listening.
"""
class ClassicFactory:
def doStart(self):
pass

def doStop(self):
pass

# Sanity check
self.assertIsInstance(ClassicFactory, types.ClassType)

return self._reprTest(
ClassicFactory(), "twisted.test.test_unix.ClassicFactory")


def test_reprWithNewStyleFactory(self):
"""
The two string representations of the L{IListeningPort} returned by
L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
class being used and the filename on which the port is listening or
indicates that the port is not listening.
"""
class NewStyleFactory(object):
def doStart(self):
pass

def doStop(self):
pass

# Sanity check
self.assertIsInstance(NewStyleFactory, type)

return self._reprTest(
NewStyleFactory(), "twisted.test.test_unix.NewStyleFactory")



class ClientProto(protocol.ConnectedDatagramProtocol):
started = stopped = False
gotback = None

def __init__(self):
self.deferredStarted = defer.Deferred()
self.deferredGotBack = defer.Deferred()

def stopProtocol(self):
self.stopped = True

def startProtocol(self):
self.started = True
self.deferredStarted.callback(None)

def datagramReceived(self, data):
self.gotback = data
self.deferredGotBack.callback(None)

class ServerProto(protocol.DatagramProtocol):
started = stopped = False
gotwhat = gotfrom = None

def __init__(self):
self.deferredStarted = defer.Deferred()
self.deferredGotWhat = defer.Deferred()

def stopProtocol(self):
self.stopped = True

def startProtocol(self):
self.started = True
self.deferredStarted.callback(None)

def datagramReceived(self, data, addr):
self.gotfrom = addr
self.transport.write("hi back", addr)
self.gotwhat = data
self.deferredGotWhat.callback(None)



class DatagramUnixSocketTestCase(unittest.TestCase):
"""
Test datagram UNIX sockets.
"""
def test_exchange(self):
"""
Test that a datagram can be sent to and received by a server and vice
versa.
"""
clientaddr = self.mktemp()
serveraddr = self.mktemp()
sp = ServerProto()
cp = ClientProto()
s = reactor.listenUNIXDatagram(serveraddr, sp)
self.addCleanup(s.stopListening)
c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
self.addCleanup(c.stopListening)

d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
def write(ignored):
cp.transport.write("hi")
return defer.gatherResults([sp.deferredGotWhat,
cp.deferredGotBack])

def _cbTestExchange(ignored):
self.assertEqual("hi", sp.gotwhat)
self.assertEqual(clientaddr, sp.gotfrom)
self.assertEqual("hi back", cp.gotback)

d.addCallback(write)
d.addCallback(_cbTestExchange)
return d


def test_cannotListen(self):
"""
L{IReactorUNIXDatagram.listenUNIXDatagram} raises
L{error.CannotListenError} if the unix socket specified is already in
use.
"""
addr = self.mktemp()
p = ServerProto()
s = reactor.listenUNIXDatagram(addr, p)
self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
s.stopListening()
os.unlink(addr)

# test connecting to bound and connected (somewhere else) address

def _reprTest(self, serverProto, protocolName):
"""
Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
port when used with the given protocol.
"""
filename = self.mktemp()
unixPort = reactor.listenUNIXDatagram(filename, serverProto)

connectedString = "<%s on %r>" % (protocolName, filename)
self.assertEqual(repr(unixPort), connectedString)
self.assertEqual(str(unixPort), connectedString)

stopDeferred = defer.maybeDeferred(unixPort.stopListening)
def stoppedListening(ign):
unconnectedString = "<%s (not listening)>" % (protocolName,)
self.assertEqual(repr(unixPort), unconnectedString)
self.assertEqual(str(unixPort), unconnectedString)
stopDeferred.addCallback(stoppedListening)
return stopDeferred


def test_reprWithClassicProtocol(self):
"""
The two string representations of the L{IListeningPort} returned by
L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
classic protocol class being used and the filename on which the port is
listening or indicates that the port is not listening.
"""
class ClassicProtocol:
def makeConnection(self, transport):
pass

def doStop(self):
pass

# Sanity check
self.assertIsInstance(ClassicProtocol, types.ClassType)

return self._reprTest(
ClassicProtocol(), "twisted.test.test_unix.ClassicProtocol")


def test_reprWithNewStyleProtocol(self):
"""
The two string representations of the L{IListeningPort} returned by
L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
new-style protocol class being used and the filename on which the port
is listening or indicates that the port is not listening.
"""
class NewStyleProtocol(object):
def makeConnection(self, transport):
pass

def doStop(self):
pass

# Sanity check
self.assertIsInstance(NewStyleProtocol, type)

return self._reprTest(
NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol")



if not interfaces.IReactorUNIX(reactor, None):
UnixSocketTestCase.skip = "This reactor does not support UNIX domain sockets"
if not interfaces.IReactorUNIXDatagram(reactor, None):
DatagramUnixSocketTestCase.skip = "This reactor does not support UNIX datagram sockets"