Index: third_party/twisted_8_1/twisted/test/test_stdio.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_stdio.py b/third_party/twisted_8_1/twisted/test/test_stdio.py |
deleted file mode 100644 |
index b47b6cf293b2c72dd6951a6d1e772a7a73cfc68a..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_stdio.py |
+++ /dev/null |
@@ -1,265 +0,0 @@ |
-# Copyright (c) 2006-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-import os, sys |
- |
-from twisted.trial import unittest |
-from twisted.python import filepath |
-from twisted.internet import error, defer, protocol, reactor |
- |
- |
-# A short string which is intended to appear here and nowhere else, |
-# particularly not in any random garbage output CPython unavoidable |
-# generates (such as in warning text and so forth). This is searched |
-# for in the output from stdio_test_lastwrite.py and if it is found at |
-# the end, the functionality works. |
-UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!' |
- |
- |
-class StandardIOTestProcessProtocol(protocol.ProcessProtocol): |
- """ |
- Test helper for collecting output from a child process and notifying |
- something when it exits. |
- |
- @ivar onConnection: A L{defer.Deferred} which will be called back with |
- C{None} when the connection to the child process is established. |
- |
- @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the |
- failure associated with the child process exiting when it exits. |
- |
- @ivar onDataReceived: A L{defer.Deferred} which will be called back with |
- this instance whenever C{childDataReceived} is called, or C{None} to |
- suppress these callbacks. |
- |
- @ivar data: A C{dict} mapping file descriptors to strings containing all |
- bytes received from the child process on each file descriptor. |
- """ |
- onDataReceived = None |
- |
- def __init__(self): |
- self.onConnection = defer.Deferred() |
- self.onCompletion = defer.Deferred() |
- self.data = {} |
- |
- |
- def connectionMade(self): |
- self.onConnection.callback(None) |
- |
- |
- def childDataReceived(self, name, bytes): |
- """ |
- Record all bytes received from the child process in the C{data} |
- dictionary. Fire C{onDataReceived} if it is not C{None}. |
- """ |
- self.data[name] = self.data.get(name, '') + bytes |
- if self.onDataReceived is not None: |
- d, self.onDataReceived = self.onDataReceived, None |
- d.callback(self) |
- |
- |
- def processEnded(self, reason): |
- self.onCompletion.callback(reason) |
- |
- |
- |
-class StandardInputOutputTestCase(unittest.TestCase): |
- def _spawnProcess(self, proto, sibling, *args, **kw): |
- """ |
- Launch a child Python process and communicate with it using the |
- given ProcessProtocol. |
- |
- @param proto: A L{ProcessProtocol} instance which will be connected |
- to the child process. |
- |
- @param sibling: The basename of a file containing the Python program |
- to run in the child process. |
- |
- @param *args: strings which will be passed to the child process on |
- the command line as C{argv[2:]}. |
- |
- @param **kw: additional arguments to pass to L{reactor.spawnProcess}. |
- |
- @return: The L{IProcessTransport} provider for the spawned process. |
- """ |
- import twisted |
- subenv = dict(os.environ) |
- subenv['PYTHONPATH'] = os.pathsep.join( |
- [os.path.abspath( |
- os.path.dirname(os.path.dirname(twisted.__file__))), |
- subenv.get('PYTHONPATH', '') |
- ]) |
- args = [sys.executable, |
- filepath.FilePath(__file__).sibling(sibling).path, |
- reactor.__class__.__module__] + list(args) |
- return reactor.spawnProcess( |
- proto, |
- sys.executable, |
- args, |
- env=subenv, |
- **kw) |
- |
- |
- def _requireFailure(self, d, callback): |
- def cb(result): |
- self.fail("Process terminated with non-Failure: %r" % (result,)) |
- def eb(err): |
- return callback(err) |
- return d.addCallbacks(cb, eb) |
- |
- |
- def test_loseConnection(self): |
- """ |
- Verify that a protocol connected to L{StandardIO} can disconnect |
- itself using C{transport.loseConnection}. |
- """ |
- p = StandardIOTestProcessProtocol() |
- d = p.onCompletion |
- self._spawnProcess(p, 'stdio_test_loseconn.py') |
- |
- def processEnded(reason): |
- self.failIfIn(1, p.data) |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(d, processEnded) |
- |
- |
- def test_lastWriteReceived(self): |
- """ |
- Verify that a write made directly to stdout using L{os.write} |
- after StandardIO has finished is reliably received by the |
- process reading that stdout. |
- """ |
- p = StandardIOTestProcessProtocol() |
- |
- # Note: the OS X bug which prompted the addition of this test |
- # is an apparent race condition involving non-blocking PTYs. |
- # Delaying the parent process significantly increases the |
- # likelihood of the race going the wrong way. If you need to |
- # fiddle with this code at all, uncommenting the next line |
- # will likely make your life much easier. It is commented out |
- # because it makes the test quite slow. |
- |
- # p.onConnection.addCallback(lambda ign: __import__('time').sleep(5)) |
- |
- try: |
- self._spawnProcess( |
- p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING, |
- usePTY=True) |
- except ValueError, e: |
- # Some platforms don't work with usePTY=True |
- raise unittest.SkipTest(str(e)) |
- |
- def processEnded(reason): |
- """ |
- Asserts that the parent received the bytes written by the child |
- immediately after the child starts. |
- """ |
- self.assertTrue( |
- p.data[1].endswith(UNIQUE_LAST_WRITE_STRING), |
- "Received %r from child, did not find expected bytes." % ( |
- p.data,)) |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(p.onCompletion, processEnded) |
- |
- |
- def test_hostAndPeer(self): |
- """ |
- Verify that the transport of a protocol connected to L{StandardIO} |
- has C{getHost} and C{getPeer} methods. |
- """ |
- p = StandardIOTestProcessProtocol() |
- d = p.onCompletion |
- self._spawnProcess(p, 'stdio_test_hostpeer.py') |
- |
- def processEnded(reason): |
- host, peer = p.data[1].splitlines() |
- self.failUnless(host) |
- self.failUnless(peer) |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(d, processEnded) |
- |
- |
- def test_write(self): |
- """ |
- Verify that the C{write} method of the transport of a protocol |
- connected to L{StandardIO} sends bytes to standard out. |
- """ |
- p = StandardIOTestProcessProtocol() |
- d = p.onCompletion |
- |
- self._spawnProcess(p, 'stdio_test_write.py') |
- |
- def processEnded(reason): |
- self.assertEquals(p.data[1], 'ok!') |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(d, processEnded) |
- |
- |
- def test_writeSequence(self): |
- """ |
- Verify that the C{writeSequence} method of the transport of a |
- protocol connected to L{StandardIO} sends bytes to standard out. |
- """ |
- p = StandardIOTestProcessProtocol() |
- d = p.onCompletion |
- |
- self._spawnProcess(p, 'stdio_test_writeseq.py') |
- |
- def processEnded(reason): |
- self.assertEquals(p.data[1], 'ok!') |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(d, processEnded) |
- |
- |
- def _junkPath(self): |
- junkPath = self.mktemp() |
- junkFile = file(junkPath, 'w') |
- for i in xrange(1024): |
- junkFile.write(str(i) + '\n') |
- junkFile.close() |
- return junkPath |
- |
- |
- def test_producer(self): |
- """ |
- Verify that the transport of a protocol connected to L{StandardIO} |
- is a working L{IProducer} provider. |
- """ |
- p = StandardIOTestProcessProtocol() |
- d = p.onCompletion |
- |
- written = [] |
- toWrite = range(100) |
- |
- def connectionMade(ign): |
- if toWrite: |
- written.append(str(toWrite.pop()) + "\n") |
- proc.write(written[-1]) |
- reactor.callLater(0.01, connectionMade, None) |
- |
- proc = self._spawnProcess(p, 'stdio_test_producer.py') |
- |
- p.onConnection.addCallback(connectionMade) |
- |
- def processEnded(reason): |
- self.assertEquals(p.data[1], ''.join(written)) |
- self.failIf(toWrite, "Connection lost with %d writes left to go." % (len(toWrite),)) |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(d, processEnded) |
- |
- |
- def test_consumer(self): |
- """ |
- Verify that the transport of a protocol connected to L{StandardIO} |
- is a working L{IConsumer} provider. |
- """ |
- p = StandardIOTestProcessProtocol() |
- d = p.onCompletion |
- |
- junkPath = self._junkPath() |
- |
- self._spawnProcess(p, 'stdio_test_consumer.py', junkPath) |
- |
- def processEnded(reason): |
- self.assertEquals(p.data[1], file(junkPath).read()) |
- reason.trap(error.ProcessDone) |
- return self._requireFailure(d, processEnded) |