K2LL33D SHELL

 Apache/2.4.7 (Ubuntu)
 Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64
 uid=33(www-data) gid=33(www-data) groups=33(www-data)
 safemode : OFF
 MySQL: ON | Perl: ON | cURL: OFF | WGet: ON
  >  / usr / lib / python2.7 / dist-packages / twisted / test /
server ip : 104.21.89.46

your ip : 172.70.127.59

H O M E


Filename/usr/lib/python2.7/dist-packages/twisted/test/test_stdio.py
Size12.48 kb
Permissionrw-r--r--
Ownerroot : root
Create time27-Apr-2025 09:56
Last modified05-May-2012 02:49
Last accessed07-Jul-2025 00:09
Actionsedit | rename | delete | download (gzip)
Viewtext | code | image
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.stdio}.
"""

import os, sys, itertools

from twisted.trial import unittest
from twisted.python import filepath, log
from twisted.python.runtime import platform
from twisted.internet import error, defer, protocol, stdio, reactor
from twisted.test.test_tcp import ConnectionLostNotifyingProtocol


# A short string which is intended to appear here and nowhere else,
# particularly not in any random garbage output CPython unavoidably
# generates (such as in warning text and so forth). This is searched
# for in the output from stdio_test_lastwrite.py and if it is found at
# the end, the functionality works.
# Marker text handed to stdio_test_lastwrite.py and then looked for at the
# very end of that child's output.
UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!'

# Skip reason for the tests below; stays None unless we are on Windows and
# win32process cannot be imported.
skipWindowsNopywin32 = None
if platform.isWindows():
    try:
        # Imported only to probe for its availability.
        import win32process
    except ImportError:
        skipWindowsNopywin32 = (
            "On windows, spawnProcess is not available in the absence of "
            "win32process.")


class StandardIOTestProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol used by these tests to accumulate everything a child
    process writes and to announce the interesting points in its lifetime.

    @ivar onConnection: A L{defer.Deferred} fired with C{None} from
        C{connectionMade}.

    @ivar onCompletion: A L{defer.Deferred} fired from C{processEnded} with
        the reason the child process ended.

    @ivar onDataReceived: Either C{None}, meaning nobody wants to be told
        about incoming data, or a L{defer.Deferred} which is fired with this
        instance the next time C{childDataReceived} is called.

    @ivar data: A C{dict} whose keys are the file descriptors reported to
        C{childDataReceived} and whose values are strings holding all the
        bytes received so far on that descriptor.
    """
    onDataReceived = None

    def __init__(self):
        self.data = {}
        self.onCompletion = defer.Deferred()
        self.onConnection = defer.Deferred()


    def connectionMade(self):
        """
        Announce that the child process is connected.
        """
        self.onConnection.callback(None)


    def childDataReceived(self, name, bytes):
        """
        Append C{bytes} to whatever has already been collected for the file
        descriptor C{name}, then fire C{onDataReceived} if one is set.
        """
        soFar = self.data.get(name, '')
        self.data[name] = soFar + bytes

        waiting = self.onDataReceived
        if waiting is None:
            return
        # Clear the attribute before firing so the Deferred is only ever
        # called back once.
        self.onDataReceived = None
        waiting.callback(self)


    def processEnded(self, reason):
        """
        Hand the termination reason to C{onCompletion}.
        """
        self.onCompletion.callback(reason)



class StandardInputOutputTestCase(unittest.TestCase):
    """
    Tests for L{stdio.StandardIO}, most of which launch a sibling
    C{stdio_test_*.py} script in a child Python process and then inspect
    the bytes it produced and the way it exited.
    """

    skip = skipWindowsNopywin32

    def _spawnProcess(self, proto, sibling, *args, **kw):
        """
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
            to the child process.

        @param sibling: The basename of a file containing the Python program
            to run in the child process.

        @param *args: strings which will be passed to the child process on
            the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        """
        import twisted
        # Put the directory containing the twisted package first on the
        # child's PYTHONPATH, followed by whatever the parent already had.
        subenv = dict(os.environ)
        subenv['PYTHONPATH'] = os.pathsep.join(
            [os.path.abspath(
                os.path.dirname(os.path.dirname(twisted.__file__))),
             subenv.get('PYTHONPATH', '')
             ])
        # argv[1] of the child is the module name of the reactor in use
        # here; the caller's own arguments follow it.
        args = [sys.executable,
                filepath.FilePath(__file__).sibling(sibling).path,
                reactor.__class__.__module__] + list(args)
        return reactor.spawnProcess(
            proto,
            sys.executable,
            args,
            env=subenv,
            **kw)


    def _requireFailure(self, d, callback):
        """
        Require that C{d} ends with a failure.

        @param d: The L{defer.Deferred} to watch.

        @param callback: A one-argument callable which is invoked with the
            failure if C{d} fails; its result becomes the result of C{d}.

        @return: C{d}, with a callback which fails the test if a
            non-failure result arrives and an errback which delegates to
            C{callback}.
        """
        def cb(result):
            self.fail("Process terminated with non-Failure: %r" % (result,))
        def eb(err):
            return callback(err)
        return d.addCallbacks(cb, eb)


    def test_loseConnection(self):
        """
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, 'stdio_test_loseconn.py', errorLogFile)

        def processEnded(reason):
            # Copy the child's log to ours so it's more visible.  The file
            # is closed deterministically instead of being left to the
            # garbage collector.
            with open(errorLogFile) as childLog:
                for line in childLog:
                    log.msg("Child logged: " + line.rstrip())

            self.assertNotIn(1, p.data)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_readConnectionLost(self):
        """
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        """
        errorLogFile = self.mktemp()
        log.msg("Child process logging to " + errorLogFile)
        p = StandardIOTestProcessProtocol()
        p.onDataReceived = defer.Deferred()

        def cbBytes(ignored):
            # Once the child has written something, close its stdin and
            # wait for it to exit.
            d = p.onCompletion
            p.transport.closeStdin()
            return d
        p.onDataReceived.addCallback(cbBytes)

        def processEnded(reason):
            reason.trap(error.ProcessDone)
        d = self._requireFailure(p.onDataReceived, processEnded)

        self._spawnProcess(
            p, 'stdio_test_halfclose.py', errorLogFile)
        return d


    def test_lastWriteReceived(self):
        """
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        """
        p = StandardIOTestProcessProtocol()

        # Note: the OS X bug which prompted the addition of this test
        # is an apparent race condition involving non-blocking PTYs.
        # Delaying the parent process significantly increases the
        # likelihood of the race going the wrong way.  If you need to
        # fiddle with this code at all, uncommenting the next line
        # will likely make your life much easier.  It is commented out
        # because it makes the test quite slow.

        # p.onConnection.addCallback(lambda ign: __import__('time').sleep(5))

        try:
            self._spawnProcess(
                p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING,
                usePTY=True)
        except ValueError as e:
            # Some platforms don't work with usePTY=True
            raise unittest.SkipTest(str(e))

        def processEnded(reason):
            """
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            """
            self.assertTrue(
                p.data[1].endswith(UNIQUE_LAST_WRITE_STRING),
                "Received %r from child, did not find expected bytes." % (
                    p.data,))
            reason.trap(error.ProcessDone)
        return self._requireFailure(p.onCompletion, processEnded)


    def test_hostAndPeer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion
        self._spawnProcess(p, 'stdio_test_hostpeer.py')

        def processEnded(reason):
            # Exactly two lines are expected from the child; unpacking
            # fails otherwise.
            host, peer = p.data[1].splitlines()
            self.assertTrue(host)
            self.assertTrue(peer)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_write(self):
        """
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, 'stdio_test_write.py')

        def processEnded(reason):
            self.assertEqual(p.data[1], 'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_writeSequence(self):
        """
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        self._spawnProcess(p, 'stdio_test_writeseq.py')

        def processEnded(reason):
            self.assertEqual(p.data[1], 'ok!')
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def _junkPath(self):
        """
        Create a temporary file containing the decimal integers 0 through
        1023, one per line.

        @return: The path of the file which was written.
        """
        junkPath = self.mktemp()
        # The with block closes the file even if a write raises.
        with open(junkPath, 'w') as junkFile:
            for i in xrange(1024):
                junkFile.write(str(i) + '\n')
        return junkPath


    def test_producer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        written = []
        # An explicit list, because items are popped off it below.
        toWrite = list(range(100))

        def connectionMade(ign):
            # Write one item, then reschedule until nothing is left.
            if toWrite:
                written.append(str(toWrite.pop()) + "\n")
                proc.write(written[-1])
                reactor.callLater(0.01, connectionMade, None)

        proc = self._spawnProcess(p, 'stdio_test_producer.py')

        p.onConnection.addCallback(connectionMade)

        def processEnded(reason):
            self.assertEqual(p.data[1], ''.join(written))
            self.assertFalse(
                toWrite,
                "Connection lost with %d writes left to go." % (
                    len(toWrite),))
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_consumer(self):
        """
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        """
        p = StandardIOTestProcessProtocol()
        d = p.onCompletion

        junkPath = self._junkPath()

        self._spawnProcess(p, 'stdio_test_consumer.py', junkPath)

        def processEnded(reason):
            # Read the expected content with an explicitly closed handle.
            with open(junkPath) as junkFile:
                expected = junkFile.read()
            self.assertEqual(p.data[1], expected)
            reason.trap(error.ProcessDone)
        return self._requireFailure(d, processEnded)


    def test_normalFileStandardOut(self):
        """
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        """
        onConnLost = defer.Deferred()
        proto = ConnectionLostNotifyingProtocol(onConnLost)
        path = filepath.FilePath(self.mktemp())
        self.normal = normal = path.open('w')
        self.addCleanup(normal.close)

        kwargs = dict(stdout=normal.fileno())
        if not platform.isWindows():
            # Make a fake stdin so that StandardIO doesn't mess with the
            # *real* stdin.
            r, w = os.pipe()
            self.addCleanup(os.close, r)
            self.addCleanup(os.close, w)
            kwargs['stdin'] = r
        connection = stdio.StandardIO(proto, **kwargs)

        # The reactor needs to spin a bit before it might have incorrectly
        # decided stdout is closed.  Use this counter to keep track of how
        # much we've let it spin.  If it closes before we expected, this
        # counter will have a value that's too small and we'll know.
        howMany = 5
        count = itertools.count()

        def spin():
            # Consume exactly one counter value per reactor iteration:
            # either write it and reschedule, or stop at howMany.
            for value in count:
                if value == howMany:
                    connection.loseConnection()
                    return
                connection.write(str(value))
                break
            reactor.callLater(0, spin)
        reactor.callLater(0, spin)

        # Once the connection is lost, make sure the counter is at the
        # appropriate value.
        def cbLost(reason):
            self.assertEqual(next(count), howMany + 1)
            self.assertEqual(
                path.getContent(),
                ''.join(map(str, range(howMany))))
        onConnLost.addCallback(cbLost)
        return onConnLost

    if platform.isWindows():
        test_normalFileStandardOut.skip = (
            "StandardIO does not accept stdout as an argument to Windows. "
            "Testing redirection to a file is therefore harder.")