Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(434)

Unified Diff: third_party/twisted_8_1/twisted/test/test_process.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_process.py
diff --git a/third_party/twisted_8_1/twisted/test/test_process.py b/third_party/twisted_8_1/twisted/test/test_process.py
deleted file mode 100644
index 9bf05006b2f1ab1ed00ed74287c61752ee05a300..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_process.py
+++ /dev/null
@@ -1,2237 +0,0 @@
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Test running processes.
-"""
-
-import gzip
-import os
-import popen2
-import sys
-import signal
-import StringIO
-import errno
-import gc
-import warnings
-import socket
-try:
- import fcntl
-except ImportError:
- fcntl = None
-
-from zope.interface.verify import verifyObject
-
-from twisted.internet import reactor, protocol, error, interfaces, defer
-from twisted.internet import selectreactor
-from twisted.trial import unittest
-from twisted.python import util, runtime, procutils
-
-try:
- from twisted.internet import process
-except ImportError:
- process = None
-
-
-
-class StubProcessProtocol(protocol.ProcessProtocol):
- """
- ProcessProtocol counter-implementation: all methods on this class raise an
- exception, so instances of this may be used to verify that only certain
- methods are called.
- """
- def outReceived(self, data):
- raise NotImplementedError()
-
- def errReceived(self, data):
- raise NotImplementedError()
-
- def inConnectionLost(self):
- raise NotImplementedError()
-
- def outConnectionLost(self):
- raise NotImplementedError()
-
- def errConnectionLost(self):
- raise NotImplementedError()
-
-
-
-class ProcessProtocolTests(unittest.TestCase):
- """
- Tests for behavior provided by the process protocol base class,
- L{protocol.ProcessProtocol}.
- """
- def test_interface(self):
- """
- L{ProcessProtocol} implements L{IProcessProtocol}.
- """
- verifyObject(interfaces.IProcessProtocol, protocol.ProcessProtocol())
-
-
- def test_outReceived(self):
- """
- Verify that when stdout is delivered to
- L{ProcessProtocol.childDataReceived}, it is forwarded to
- L{ProcessProtocol.outReceived}.
- """
- received = []
- class OutProtocol(StubProcessProtocol):
- def outReceived(self, data):
- received.append(data)
-
- bytes = "bytes"
- p = OutProtocol()
- p.childDataReceived(1, bytes)
- self.assertEqual(received, [bytes])
-
-
- def test_errReceived(self):
- """
- Similar to L{test_outReceived}, but for stderr.
- """
- received = []
- class ErrProtocol(StubProcessProtocol):
- def errReceived(self, data):
- received.append(data)
-
- bytes = "bytes"
- p = ErrProtocol()
- p.childDataReceived(2, bytes)
- self.assertEqual(received, [bytes])
-
-
- def test_inConnectionLost(self):
- """
- Verify that when stdin close notification is delivered to
- L{ProcessProtocol.childConnectionLost}, it is forwarded to
- L{ProcessProtocol.inConnectionLost}.
- """
- lost = []
- class InLostProtocol(StubProcessProtocol):
- def inConnectionLost(self):
- lost.append(None)
-
- p = InLostProtocol()
- p.childConnectionLost(0)
- self.assertEqual(lost, [None])
-
-
- def test_outConnectionLost(self):
- """
- Similar to L{test_inConnectionLost}, but for stdout.
- """
- lost = []
- class OutLostProtocol(StubProcessProtocol):
- def outConnectionLost(self):
- lost.append(None)
-
- p = OutLostProtocol()
- p.childConnectionLost(1)
- self.assertEqual(lost, [None])
-
-
- def test_errConnectionLost(self):
- """
- Similar to L{test_inConnectionLost}, but for stderr.
- """
- lost = []
- class ErrLostProtocol(StubProcessProtocol):
- def errConnectionLost(self):
- lost.append(None)
-
- p = ErrLostProtocol()
- p.childConnectionLost(2)
- self.assertEqual(lost, [None])
-
-
-
-class TrivialProcessProtocol(protocol.ProcessProtocol):
- """
- Simple process protocol for tests purpose.
-
- @ivar outData: data received from stdin
- @ivar errData: data received from stderr
- """
-
- def __init__(self, d):
- """
- Create the deferred that will be fired at the end, and initialize
- data structures.
- """
- self.deferred = d
- self.outData = []
- self.errData = []
-
- def processEnded(self, reason):
- self.reason = reason
- self.deferred.callback(None)
-
- def outReceived(self, data):
- self.outData.append(data)
-
- def errReceived(self, data):
- self.errData.append(data)
-
-
-class TestProcessProtocol(protocol.ProcessProtocol):
-
- def connectionMade(self):
- self.stages = [1]
- self.data = ''
- self.err = ''
- self.transport.write("abcd")
-
- def childDataReceived(self, childFD, data):
- """
- Override and disable the dispatch provided by the base class to ensure
- that it is really this method which is being called, and the transport
- is not going directly to L{outReceived} or L{errReceived}.
- """
- if childFD == 1:
- self.data += data
- elif childFD == 2:
- self.err += data
-
-
- def childConnectionLost(self, childFD):
- """
- Similarly to L{childDataReceived}, disable the automatic dispatch
- provided by the base implementation to verify that the transport is
- calling this method directly.
- """
- if childFD == 1:
- self.stages.append(2)
- if self.data != "abcd":
- raise RuntimeError
- self.transport.write("1234")
- elif childFD == 2:
- self.stages.append(3)
- if self.err != "1234":
- print 'err != 1234: ' + repr(self.err)
- raise RuntimeError()
- self.transport.write("abcd")
- self.stages.append(4)
- elif childFD == 0:
- self.stages.append(5)
-
- def processEnded(self, reason):
- self.reason = reason
- self.deferred.callback(None)
-
-
-class EchoProtocol(protocol.ProcessProtocol):
-
- s = "1234567" * 1001
- n = 10
- finished = 0
-
- failure = None
-
- def __init__(self, onEnded):
- self.onEnded = onEnded
- self.count = 0
-
- def connectionMade(self):
- assert self.n > 2
- for i in range(self.n - 2):
- self.transport.write(self.s)
- # test writeSequence
- self.transport.writeSequence([self.s, self.s])
- self.buffer = self.s * self.n
-
- def outReceived(self, data):
- if buffer(self.buffer, self.count, len(data)) != buffer(data):
- self.failure = ("wrong bytes received", data, self.count)
- self.transport.closeStdin()
- else:
- self.count += len(data)
- if self.count == len(self.buffer):
- self.transport.closeStdin()
-
- def processEnded(self, reason):
- self.finished = 1
- if not reason.check(error.ProcessDone):
- self.failure = "process didn't terminate normally: " + str(reason)
- self.onEnded.callback(self)
-
-
-
-class SignalProtocol(protocol.ProcessProtocol):
- """
- A process protocol that sends a signal when data is first received.
-
- @ivar deferred: deferred firing on C{processEnded}.
- @type deferred: L{defer.Deferred}
-
- @ivar signal: the signal to send to the process.
- @type signal: C{str}
- """
-
- def __init__(self, deferred, sig):
- self.deferred = deferred
- self.signal = sig
-
-
- def outReceived(self, data):
- self.transport.signalProcess(self.signal)
-
-
- def processEnded(self, reason):
- """
- Callback C{self.deferred} with C{None} if C{reason} is a
- L{error.ProcessTerminated} failure with C{exitCode} set to C{None},
- C{signal} set to C{self.signal}, and C{status} holding the status code
- of the exited process. Otherwise, errback with a C{ValueError}
- describing the problem.
- """
- if not reason.check(error.ProcessTerminated):
- return self.deferred.errback(
- ValueError("wrong termination: %s" % (reason,)))
- v = reason.value
- signalValue = getattr(signal, 'SIG' + self.signal)
- if v.exitCode is not None:
- return self.deferred.errback(
- ValueError("SIG%s: exitCode is %s, not None" %
- (self.signal, v.exitCode)))
- if v.signal != signalValue:
- return self.deferred.errback(
- ValueError("SIG%s: .signal was %s, wanted %s" %
- (self.signal, v.signal, signalValue)))
- if os.WTERMSIG(v.status) != signalValue:
- return self.deferred.errback(
- ValueError('SIG%s: %s' % (self.signal, os.WTERMSIG(v.status))))
- self.deferred.callback(None)
-
-
-
-class TestManyProcessProtocol(TestProcessProtocol):
- def __init__(self):
- self.deferred = defer.Deferred()
-
- def processEnded(self, reason):
- self.reason = reason
- if reason.check(error.ProcessDone):
- self.deferred.callback(None)
- else:
- self.deferred.errback(reason)
-
-
-
-class UtilityProcessProtocol(protocol.ProcessProtocol):
- """
- Helper class for launching a Python process and getting a result from it.
-
- @ivar program: A string giving a Python program for the child process to
- run.
- """
- program = None
-
- def run(cls, reactor, argv, env):
- """
- Run a Python process connected to a new instance of this protocol
- class. Return the protocol instance.
-
- The Python process is given C{self.program} on the command line to
- execute, in addition to anything specified by C{argv}. C{env} is
- the complete environment.
- """
- exe = sys.executable
- self = cls()
- reactor.spawnProcess(
- self, exe, [exe, "-c", self.program] + argv, env=env)
- return self
- run = classmethod(run)
-
-
- def __init__(self):
- self.bytes = []
- self.requests = []
-
-
- def parseChunks(self, bytes):
- """
- Called with all bytes received on stdout when the process exits.
- """
- raise NotImplementedError()
-
-
- def getResult(self):
- """
- Return a Deferred which will fire with the result of L{parseChunks}
- when the child process exits.
- """
- d = defer.Deferred()
- self.requests.append(d)
- return d
-
-
- def _fireResultDeferreds(self, result):
- """
- Callback all Deferreds returned up until now by L{getResult}
- with the given result object.
- """
- requests = self.requests
- self.requests = None
- for d in requests:
- d.callback(result)
-
-
- def outReceived(self, bytes):
- """
- Accumulate output from the child process in a list.
- """
- self.bytes.append(bytes)
-
-
- def processEnded(self, reason):
- """
- Handle process termination by parsing all received output and firing
- any waiting Deferreds.
- """
- self._fireResultDeferreds(self.parseChunks(self.bytes))
-
-
-
-
-class GetArgumentVector(UtilityProcessProtocol):
- """
- Protocol which will read a serialized argv from a process and
- expose it to interested parties.
- """
- program = (
- "from sys import stdout, argv\n"
- "stdout.write(chr(0).join(argv))\n"
- "stdout.flush()\n")
-
- def parseChunks(self, chunks):
- """
- Parse the output from the process to which this protocol was
- connected, which is a single unterminated line of \\0-separated
- strings giving the argv of that process. Return this as a list of
- str objects.
- """
- return ''.join(chunks).split('\0')
-
-
-
-class GetEnvironmentDictionary(UtilityProcessProtocol):
- """
- Protocol which will read a serialized environment dict from a process
- and expose it to interested parties.
- """
- program = (
- "from sys import stdout\n"
- "from os import environ\n"
- "items = environ.iteritems()\n"
- "stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n"
- "stdout.flush()\n")
-
- def parseChunks(self, chunks):
- """
- Parse the output from the process to which this protocol was
- connected, which is a single unterminated line of \\0-separated
- strings giving key value pairs of the environment from that process.
- Return this as a dictionary.
- """
- environString = ''.join(chunks)
- if not environString:
- return {}
- environ = iter(environString.split('\0'))
- d = {}
- while 1:
- try:
- k = environ.next()
- except StopIteration:
- break
- else:
- v = environ.next()
- d[k] = v
- return d
-
-
-
-class ProcessTestCase(unittest.TestCase):
- """Test running a process."""
-
- usePTY = False
-
- def testStdio(self):
- """twisted.internet.stdio test."""
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_twisted.py")
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- env = {"PYTHONPATH": os.pathsep.join(sys.path)}
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
- path=None, usePTY=self.usePTY)
- p.transport.write("hello, world")
- p.transport.write("abc")
- p.transport.write("123")
- p.transport.closeStdin()
-
- def processEnded(ign):
- self.assertEquals(p.outF.getvalue(), "hello, worldabc123",
- "Output follows:\n"
- "%s\n"
- "Error message from process_twisted follows:\n"
- "%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
- return d.addCallback(processEnded)
-
-
- def test_unsetPid(self):
- """
- Test if pid is None/non-None before/after process termination. This
- reuses process_echoer.py to get a process that blocks on stdin.
- """
- finished = defer.Deferred()
- p = TrivialProcessProtocol(finished)
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_echoer.py")
- procTrans = reactor.spawnProcess(p, exe,
- [exe, "-u", scriptPath], env=None)
- self.failUnless(procTrans.pid)
-
- def afterProcessEnd(ignored):
- self.assertEqual(procTrans.pid, None)
-
- p.transport.closeStdin()
- return finished.addCallback(afterProcessEnd)
-
-
- def test_process(self):
- """
- Test running a process: check its output, it exitCode, some property of
- signalProcess.
- """
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_tester.py")
- d = defer.Deferred()
- p = TestProcessProtocol()
- p.deferred = d
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
- def check(ignored):
- self.assertEquals(p.stages, [1, 2, 3, 4, 5])
- f = p.reason
- f.trap(error.ProcessTerminated)
- self.assertEquals(f.value.exitCode, 23)
- # would .signal be available on non-posix?
- # self.assertEquals(f.value.signal, None)
- self.assertRaises(
- error.ProcessExitedAlready, p.transport.signalProcess, 'INT')
- try:
- import process_tester, glob
- for f in glob.glob(process_tester.test_file_match):
- os.remove(f)
- except:
- pass
- d.addCallback(check)
- return d
-
- def testManyProcesses(self):
-
- def _check(results, protocols):
- for p in protocols:
- self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
- # test status code
- f = p.reason
- f.trap(error.ProcessTerminated)
- self.assertEquals(f.value.exitCode, 23)
-
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_tester.py")
- args = [exe, "-u", scriptPath]
- protocols = []
- deferreds = []
-
- for i in xrange(50):
- p = TestManyProcessProtocol()
- protocols.append(p)
- reactor.spawnProcess(p, exe, args, env=None)
- deferreds.append(p.deferred)
-
- deferredList = defer.DeferredList(deferreds, consumeErrors=True)
- deferredList.addCallback(_check, protocols)
- return deferredList
-
- def testEcho(self):
- finished = defer.Deferred()
- p = EchoProtocol(finished)
-
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_echoer.py")
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
-
- def asserts(ignored):
- self.failIf(p.failure, p.failure)
- self.failUnless(hasattr(p, 'buffer'))
- self.assertEquals(len(''.join(p.buffer)), len(p.s * p.n))
-
- def takedownProcess(err):
- p.transport.closeStdin()
- return err
-
- return finished.addCallback(asserts).addErrback(takedownProcess)
- testEcho.timeout = 60 # XXX This should not be. There is already a
- # global timeout value. Why do you think this
- # test can complete more quickly?
-
-
- def testCommandLine(self):
- args = [r'a\"b ', r'a\b ', r' a\\"b', r' a\\b', r'"foo bar" "', '\tab', '"\\', 'a"b', "a'b"]
- pyExe = sys.executable
- scriptPath = util.sibpath(__file__, "process_cmdline.py")
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None,
- path=None)
-
- def processEnded(ign):
- self.assertEquals(p.errF.getvalue(), "")
- recvdArgs = p.outF.getvalue().splitlines()
- self.assertEquals(recvdArgs, args)
- return d.addCallback(processEnded)
-
-
- def test_wrongArguments(self):
- """
- Test invalid arguments to spawnProcess: arguments and environment
- must only contains string or unicode, and not null bytes.
- """
- exe = sys.executable
- p = protocol.ProcessProtocol()
-
- badEnvs = [
- {"foo": 2},
- {"foo": "egg\0a"},
- {3: "bar"},
- {"bar\0foo": "bar"}]
-
- badArgs = [
- [exe, 2],
- "spam",
- [exe, "foo\0bar"]]
-
- # Sanity check - this will fail for people who have mucked with
- # their site configuration in a stupid way, but there's nothing we
- # can do about that.
- badUnicode = u'\N{SNOWMAN}'
- try:
- badUnicode.encode(sys.getdefaultencoding())
- except UnicodeEncodeError:
- # Okay, that unicode doesn't encode, put it in as a bad environment
- # key.
- badEnvs.append({badUnicode: 'value for bad unicode key'})
- badEnvs.append({'key for bad unicode value': badUnicode})
- badArgs.append([exe, badUnicode])
- else:
- # It _did_ encode. Most likely, Gtk2 is being used and the
- # default system encoding is UTF-8, which can encode anything.
- # In any case, if implicit unicode -> str conversion works for
- # that string, we can't test that TypeError gets raised instead,
- # so just leave it off.
- pass
-
- for env in badEnvs:
- self.assertRaises(
- TypeError,
- reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env)
-
- for args in badArgs:
- self.assertRaises(
- TypeError,
- reactor.spawnProcess, p, exe, args, env=None)
-
-
- # Use upper-case so that the environment key test uses an upper case
- # name: some versions of Windows only support upper case environment
- # variable names, and I think Python (as of 2.5) doesn't use the right
- # syscall for lowercase or mixed case names to work anyway.
- okayUnicode = u"UNICODE"
- encodedValue = "UNICODE"
-
- def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}):
- """
- Check that a deprecation warning is emitted when passing unicode to
- spawnProcess for an argv value or an environment key or value.
- Check that the warning is of the right type, has the right message,
- and refers to the correct file. Unfortunately, don't check that the
- line number is correct, because that is too hard for me to figure
- out.
-
- @param processProtocolClass: A L{UtilityProcessProtocol} subclass
- which will be instantiated to communicate with the child process.
-
- @param argv: The argv argument to spawnProcess.
-
- @param env: The env argument to spawnProcess.
-
- @return: A Deferred which fires when the test is complete.
- """
- # Sanity to check to make sure we can actually encode this unicode
- # with the default system encoding. This may be excessively
- # paranoid. -exarkun
- self.assertEqual(
- self.okayUnicode.encode(sys.getdefaultencoding()),
- self.encodedValue)
-
- p = self.assertWarns(DeprecationWarning,
- "Argument strings and environment keys/values passed to "
- "reactor.spawnProcess should be str, not unicode.", __file__,
- processProtocolClass.run, reactor, argv, env)
- return p.getResult()
-
-
- def test_deprecatedUnicodeArgvSupport(self):
- """
- Test that a unicode string passed for an argument value is allowed
- if it can be encoded with the default system encoding, but that a
- deprecation warning is emitted.
- """
- d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode])
- def gotArgVector(argv):
- self.assertEqual(argv, ['-c', self.encodedValue])
- d.addCallback(gotArgVector)
- return d
-
-
- def test_deprecatedUnicodeEnvKeySupport(self):
- """
- Test that a unicode string passed for the key of the environment
- dictionary is allowed if it can be encoded with the default system
- encoding, but that a deprecation warning is emitted.
- """
- d = self._deprecatedUnicodeSupportTest(
- GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue})
- def gotEnvironment(environ):
- self.assertEqual(environ[self.encodedValue], self.encodedValue)
- d.addCallback(gotEnvironment)
- return d
-
-
- def test_deprecatedUnicodeEnvValueSupport(self):
- """
- Test that a unicode string passed for the value of the environment
- dictionary is allowed if it can be encoded with the default system
- encoding, but that a deprecation warning is emitted.
- """
- d = self._deprecatedUnicodeSupportTest(
- GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode})
- def gotEnvironment(environ):
- # On Windows, the environment contains more things than we
- # specified, so only make sure that at least the key we wanted
- # is there, rather than testing the dictionary for exact
- # equality.
- self.assertEqual(environ[self.encodedValue], self.encodedValue)
- d.addCallback(gotEnvironment)
- return d
-
-
-
-class TwoProcessProtocol(protocol.ProcessProtocol):
- num = -1
- finished = 0
- def __init__(self):
- self.deferred = defer.Deferred()
- def outReceived(self, data):
- pass
- def processEnded(self, reason):
- self.finished = 1
- self.deferred.callback(None)
-
-class TestTwoProcessesBase:
- def setUp(self):
- self.processes = [None, None]
- self.pp = [None, None]
- self.done = 0
- self.verbose = 0
-
- def createProcesses(self, usePTY=0):
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_reader.py")
- for num in (0,1):
- self.pp[num] = TwoProcessProtocol()
- self.pp[num].num = num
- p = reactor.spawnProcess(self.pp[num],
- exe, [exe, "-u", scriptPath], env=None,
- usePTY=usePTY)
- self.processes[num] = p
-
- def close(self, num):
- if self.verbose: print "closing stdin [%d]" % num
- p = self.processes[num]
- pp = self.pp[num]
- self.failIf(pp.finished, "Process finished too early")
- p.loseConnection()
- if self.verbose: print self.pp[0].finished, self.pp[1].finished
-
- def _onClose(self):
- return defer.gatherResults([ p.deferred for p in self.pp ])
-
- def testClose(self):
- if self.verbose: print "starting processes"
- self.createProcesses()
- reactor.callLater(1, self.close, 0)
- reactor.callLater(2, self.close, 1)
- return self._onClose()
-
-class TestTwoProcessesNonPosix(TestTwoProcessesBase, unittest.TestCase):
- pass
-
-class TestTwoProcessesPosix(TestTwoProcessesBase, unittest.TestCase):
- def tearDown(self):
- for pp, pr in zip(self.pp, self.processes):
- if not pp.finished:
- try:
- os.kill(pr.pid, signal.SIGTERM)
- except OSError:
- # If the test failed the process may already be dead
- # The error here is only noise
- pass
- return self._onClose()
-
- def kill(self, num):
- if self.verbose: print "kill [%d] with SIGTERM" % num
- p = self.processes[num]
- pp = self.pp[num]
- self.failIf(pp.finished, "Process finished too early")
- os.kill(p.pid, signal.SIGTERM)
- if self.verbose: print self.pp[0].finished, self.pp[1].finished
-
- def testKill(self):
- if self.verbose: print "starting processes"
- self.createProcesses(usePTY=0)
- reactor.callLater(1, self.kill, 0)
- reactor.callLater(2, self.kill, 1)
- return self._onClose()
-
- def testClosePty(self):
- if self.verbose: print "starting processes"
- self.createProcesses(usePTY=1)
- reactor.callLater(1, self.close, 0)
- reactor.callLater(2, self.close, 1)
- return self._onClose()
-
- def testKillPty(self):
- if self.verbose: print "starting processes"
- self.createProcesses(usePTY=1)
- reactor.callLater(1, self.kill, 0)
- reactor.callLater(2, self.kill, 1)
- return self._onClose()
-
-class FDChecker(protocol.ProcessProtocol):
- state = 0
- data = ""
- failed = None
-
- def __init__(self, d):
- self.deferred = d
-
- def fail(self, why):
- self.failed = why
- self.deferred.callback(None)
-
- def connectionMade(self):
- self.transport.writeToChild(0, "abcd")
- self.state = 1
-
- def childDataReceived(self, childFD, data):
- if self.state == 1:
- if childFD != 1:
- self.fail("read '%s' on fd %d (not 1) during state 1" \
- % (childFD, data))
- return
- self.data += data
- #print "len", len(self.data)
- if len(self.data) == 6:
- if self.data != "righto":
- self.fail("got '%s' on fd1, expected 'righto'" \
- % self.data)
- return
- self.data = ""
- self.state = 2
- #print "state2", self.state
- self.transport.writeToChild(3, "efgh")
- return
- if self.state == 2:
- self.fail("read '%s' on fd %s during state 2" % (childFD, data))
- return
- if self.state == 3:
- if childFD != 1:
- self.fail("read '%s' on fd %s (not 1) during state 3" \
- % (childFD, data))
- return
- self.data += data
- if len(self.data) == 6:
- if self.data != "closed":
- self.fail("got '%s' on fd1, expected 'closed'" \
- % self.data)
- return
- self.state = 4
- return
- if self.state == 4:
- self.fail("read '%s' on fd %s during state 4" % (childFD, data))
- return
-
- def childConnectionLost(self, childFD):
- if self.state == 1:
- self.fail("got connectionLost(%d) during state 1" % childFD)
- return
- if self.state == 2:
- if childFD != 4:
- self.fail("got connectionLost(%d) (not 4) during state 2" \
- % childFD)
- return
- self.state = 3
- self.transport.closeChildFD(5)
- return
-
- def processEnded(self, status):
- rc = status.value.exitCode
- if self.state != 4:
- self.fail("processEnded early, rc %d" % rc)
- return
- if status.value.signal != None:
- self.fail("processEnded with signal %s" % status.value.signal)
- return
- if rc != 0:
- self.fail("processEnded with rc %d" % rc)
- return
- self.deferred.callback(None)
-
-
-class FDTest(unittest.TestCase):
-
- def testFD(self):
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_fds.py")
- d = defer.Deferred()
- p = FDChecker(d)
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
- path=None,
- childFDs={0:"w", 1:"r", 2:2,
- 3:"w", 4:"r", 5:"w"})
- d.addCallback(lambda x : self.failIf(p.failed, p.failed))
- return d
-
- def testLinger(self):
- # See what happens when all the pipes close before the process
- # actually stops. This test *requires* SIGCHLD catching to work,
- # as there is no other way to find out the process is done.
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_linger.py")
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
- path=None,
- childFDs={1:"r", 2:2},
- )
- def processEnded(ign):
- self.failUnlessEqual(p.outF.getvalue(),
- "here is some text\ngoodbye\n")
- return d.addCallback(processEnded)
-
-
-
-class Accumulator(protocol.ProcessProtocol):
- """Accumulate data from a process."""
-
- closed = 0
- endedDeferred = None
-
- def connectionMade(self):
- self.outF = StringIO.StringIO()
- self.errF = StringIO.StringIO()
-
- def outReceived(self, d):
- self.outF.write(d)
-
- def errReceived(self, d):
- self.errF.write(d)
-
- def outConnectionLost(self):
- pass
-
- def errConnectionLost(self):
- pass
-
- def processEnded(self, reason):
- self.closed = 1
- if self.endedDeferred is not None:
- d, self.endedDeferred = self.endedDeferred, None
- d.callback(None)
-
-
-class PosixProcessBase:
- """
- Test running processes.
- """
- usePTY = False
-
- def getCommand(self, commandName):
- """
- Return the path of the shell command named C{commandName}, looking at
- common locations.
- """
- if os.path.exists('/bin/%s' % (commandName,)):
- cmd = '/bin/%s' % (commandName,)
- elif os.path.exists('/usr/bin/%s' % (commandName,)):
- cmd = '/usr/bin/%s' % (commandName,)
- else:
- raise RuntimeError(
- "%s not found in /bin or /usr/bin" % (commandName,))
- return cmd
-
- def testNormalTermination(self):
- cmd = self.getCommand('true')
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- reactor.spawnProcess(p, cmd, ['true'], env=None,
- usePTY=self.usePTY)
- def check(ignored):
- p.reason.trap(error.ProcessDone)
- self.assertEquals(p.reason.value.exitCode, 0)
- self.assertEquals(p.reason.value.signal, None)
- d.addCallback(check)
- return d
-
-
- def test_abnormalTermination(self):
- """
- When a process terminates with a system exit code set to 1,
- C{processEnded} is called with a L{error.ProcessTerminated} error,
- the C{exitCode} attribute reflecting the system exit code.
- """
- exe = sys.executable
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- reactor.spawnProcess(p, exe, [exe, '-c', 'import sys; sys.exit(1)'],
- env=None, usePTY=self.usePTY)
-
- def check(ignored):
- p.reason.trap(error.ProcessTerminated)
- self.assertEquals(p.reason.value.exitCode, 1)
- self.assertEquals(p.reason.value.signal, None)
- d.addCallback(check)
- return d
-
-
- def _testSignal(self, sig):
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_signal.py")
- d = defer.Deferred()
- p = SignalProtocol(d, sig)
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
- usePTY=self.usePTY)
- return d
-
-
- def test_signalHUP(self):
- """
- Sending the SIGHUP signal to a running process interrupts it, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} set to C{None} and the C{signal} attribute set to
- C{signal.SIGHUP}. C{os.WTERMSIG} can also be used on the C{status}
- attribute to extract the signal value.
- """
- return self._testSignal('HUP')
-
-
- def test_signalINT(self):
- """
- Sending the SIGINT signal to a running process interrupts it, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} set to C{None} and the C{signal} attribute set to
- C{signal.SIGINT}. C{os.WTERMSIG} can also be used on the C{status}
- attribute to extract the signal value.
- """
- return self._testSignal('INT')
-
-
- def test_signalKILL(self):
- """
- Sending the SIGKILL signal to a running process interrupts it, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} set to C{None} and the C{signal} attribute set to
- C{signal.SIGKILL}. C{os.WTERMSIG} can also be used on the C{status}
- attribute to extract the signal value.
- """
- return self._testSignal('KILL')
-
-
- def test_signalTERM(self):
- """
- Sending the SIGTERM signal to a running process interrupts it, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} set to C{None} and the C{signal} attribute set to
- C{signal.SIGTERM}. C{os.WTERMSIG} can also be used on the C{status}
- attribute to extract the signal value.
- """
- return self._testSignal('TERM')
-
-
- def test_executionError(self):
- """
- Raise an error during execvpe to check error management.
- """
- cmd = self.getCommand('false')
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- def buggyexecvpe(command, args, environment):
- raise RuntimeError("Ouch")
- oldexecvpe = os.execvpe
- os.execvpe = buggyexecvpe
- try:
- reactor.spawnProcess(p, cmd, ['false'], env=None,
- usePTY=self.usePTY)
-
- def check(ignored):
- errData = "".join(p.errData + p.outData)
- self.assertIn("Upon execvpe", errData)
- self.assertIn("Ouch", errData)
- d.addCallback(check)
- finally:
- os.execvpe = oldexecvpe
- return d
-
-
-
-class MockOS(object):
- """
- The mock OS: overwrite L{os}, L{fcntl} and {sys} functions with fake ones.
-
- @ivar exited: set to True when C{_exit} is called.
- @type exited: C{bool}
-
- @ivar O_RDWR: dumb value faking C{os.O_RDWR}.
- @type O_RDWR: C{int}
-
- @ivar O_NOCTTY: dumb value faking C{os.O_NOCTTY}.
- @type O_NOCTTY: C{int}
-
- @ivar WNOHANG: dumb value faking C{os.WNOHANG}.
- @type WNOHANG: C{int}
-
- @ivar raiseFork: if not C{None}, subsequent calls to fork will raise this
- object.
- @type raiseFork: C{NoneType} or C{Exception}
-
- @ivar raiseExec: if set, subsequent calls to execvpe will raise an error.
- @type raiseExec: C{bool}
-
- @ivar fdio: fake file object returned by calls to fdopen.
- @type fdio: C{StringIO.StringIO}
-
- @ivar actions: hold names of some actions executed by the object, in order
- of execution.
-
- @type actions: C{list} of C{str}
-
- @ivar closed: keep track of the file descriptor closed.
- @param closed: C{list} of C{int}
-
- @ivar child: whether fork return for the child or the parent.
- @type child: C{bool}
-
- @ivar pipeCount: count the number of time that C{os.pipe} has been called.
- @type pipeCount: C{int}
-
- @ivar raiseWaitPid: if set, subsequent calls to waitpid will raise an
- the error specified.
- @type raiseWaitPid: C{None} or a class
-
- @ivar waitChild: if set, subsequent calls to waitpid will return it.
- @type waitChild: C{None} or a tuple
- """
- exited = False
- O_RDWR = 1
- O_NOCTTY = 1
- WNOHANG = 1
- raiseExec = False
- fdio = None
- child = True
- raiseWaitPid = None
- raiseFork = None
- waitChild = None
-
- def __init__(self):
- """
- Initialiaze data structures.
- """
- self.actions = []
- self.closed = []
- self.pipeCount = 0
-
-
- def open(self, dev, flags):
- """
- Fake C{os.open}. Return a non fd number to be sure it's not used
- elsewhere.
- """
- return -3
-
-
- def fdopen(self, fd, flag):
- """
- Fake C{os.fdopen}. Return a StringIO object whose content can be tested
- later via C{self.fdio}.
- """
- self.fdio = StringIO.StringIO()
- return self.fdio
-
-
- def setsid(self):
- """
- Fake C{os.setsid}. Do nothing.
- """
-
-
- def fork(self):
- """
- Fake C{os.fork}. Save the action in C{self.actions}, and return 0 if
- C{self.child} is set, or a dumb number.
- """
- self.actions.append(('fork', gc.isenabled()))
- if self.raiseFork is not None:
- raise self.raiseFork
- elif self.child:
- # Child result is 0
- return 0
- else:
- return 21
-
-
- def close(self, fd):
- """
- Fake C{os.close}, saving the closed fd in C{self.closed}.
- """
- self.closed.append(fd)
-
-
- def dup2(self, fd1, fd2):
- """
- Fake C{os.dup2}. Do nothing.
- """
-
-
- def write(self, fd, data):
- """
- Fake C{os.write}. Do nothing.
- """
-
-
- def execvpe(self, command, args, env):
- """
- Fake C{os.execvpe}. Save the action, and raise an error if
- C{self.raiseExec} is set.
- """
- self.actions.append('exec')
- if self.raiseExec:
- raise RuntimeError("Bar")
-
-
- def pipe(self):
- """
- Fake C{os.pipe}. Return non fd numbers to be sure it's not used
- elsewhere, and increment C{self.pipeCount}. This is used to uniquify
- the result.
- """
- self.pipeCount += 1
- return - 2 * self.pipeCount + 1, - 2 * self.pipeCount
-
-
- def ttyname(self, fd):
- """
- Fake C{os.ttyname}. Return a dumb string.
- """
- return "foo"
-
-
- def _exit(self, code):
- """
- Fake C{os._exit}. Save the action, set the C{self.exited} flag, and
- raise C{SystemError}.
- """
- self.actions.append('exit')
- self.exited = True
- # Don't forget to raise an error, or you'll end up in parent
- # code path.
- raise SystemError()
-
-
- def ioctl(self, fd, flags, arg):
- """
- Override C{fcntl.ioctl}. Do nothing.
- """
-
-
- def setNonBlocking(self, fd):
- """
- Override C{fdesc.setNonBlocking}. Do nothing.
- """
-
-
- def waitpid(self, pid, options):
- """
- Override C{os.waitpid}. Return values meaning that the child process
- has exited, save executed action.
- """
- self.actions.append('waitpid')
- if self.raiseWaitPid is not None:
- raise self.raiseWaitPid
- if self.waitChild is not None:
- return self.waitChild
- return 1, 0
-
-
- def settrace(self, arg):
- """
- Override C{sys.settrace} to keep coverage working.
- """
-
-
- def getegid(self):
- """
- Override C{os.getegid}. Return a dumb number.
- """
- return 1234
-
-
- def getgid(self):
- """
- Override C{os.getgid}. Return a dumb number.
- """
- return 1235
-
-
- def geteuid(self):
- """
- Override C{os.geteuid}. Return a dumb number.
- """
- return 1236
-
-
- def getuid(self):
- """
- Override C{os.getuid}. Return a dumb number.
- """
- return 1237
-
-
- def setuid(self, val):
- """
- Override C{os.setuid}. Do nothing.
- """
- self.actions.append(('setuid', val))
-
-
- def setgid(self, val):
- """
- Override C{os.setgid}. Do nothing.
- """
- self.actions.append(('setgid', val))
-
-
- def setregid(self, val1, val2):
- """
- Override C{os.setregid}. Do nothing.
- """
- self.actions.append(('setregid', val1, val2))
-
-
- def setreuid(self, val1, val2):
- """
- Override C{os.setreuid}. Save the action.
- """
- self.actions.append(('setreuid', val1, val2))
-
-
- def switchUID(self, uid, gid):
- """
- Override C{util.switchuid}. Save the action.
- """
- self.actions.append(('switchuid', uid, gid))
-
-
-
-if process is not None:
- class DumbProcessWriter(process.ProcessWriter):
- """
- A fake L{process.ProcessWriter} used for tests.
- """
-
- def startReading(self):
- """
- Here's the faking: don't do anything here.
- """
-
-
-
- class DumbProcessReader(process.ProcessReader):
- """
- A fake L{process.ProcessReader} used for tests.
- """
-
- def startReading(self):
- """
- Here's the faking: don't do anything here.
- """
-
-
-
- class DumbPTYProcess(process.PTYProcess):
- """
- A fake L{process.PTYProcess} used for tests.
- """
-
- def startReading(self):
- """
- Here's the faking: don't do anything here.
- """
-
-
-
-class MockProcessTestCase(unittest.TestCase):
- """
- Mock a process runner to test forked child code path.
- """
-
- def setUp(self):
- """
- Replace L{process} os, fcntl, sys, switchUID modules with the mock
- class L{MockOS}.
- """
- if gc.isenabled():
- self.addCleanup(gc.enable)
- else:
- self.addCleanup(gc.disable)
- self.mockos = MockOS()
- self.oldos = os
- self.oldfcntl = fcntl
- self.oldsys = sys
- self.oldSwitchUID = util.switchUID
- self.oldFdesc = process.fdesc
- process.os = self.mockos
- process.fcntl = self.mockos
- process.sys = self.mockos
- process.switchUID = self.mockos.switchUID
- process.fdesc = self.mockos
- process.Process.processReaderFactory = DumbProcessReader
- process.Process.processWriterFactory = DumbProcessWriter
-
-
- def tearDown(self):
- """
- Restore L{process} modules, and reset processes registered for reap.
- """
- process.os = self.oldos
- process.fcntl = self.oldfcntl
- process.sys = self.oldsys
- process.switchUID = self.oldSwitchUID
- process.fdesc = self.oldFdesc
- process.Process.processReaderFactory = process.ProcessReader
- process.Process.processWriterFactory = process.ProcessWriter
- process.reapProcessHandlers = {}
-
-
- def test_mockFork(self):
- """
- Test a classic spawnProcess. Check the path of the client code:
- fork, exec, exit.
- """
- gc.enable()
-
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- try:
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False)
- except SystemError:
- self.assert_(self.mockos.exited)
- self.assertEquals(
- self.mockos.actions, [("fork", False), "exec", "exit"])
- else:
- self.fail("Should not be here")
-
- # It should leave the garbage collector disabled.
- self.assertFalse(gc.isenabled())
-
-
- def _mockForkInParentTest(self):
- """
- Assert that in the main process, spawnProcess disables the garbage
- collector, calls fork, closes the pipe file descriptors it created for
- the child process, and calls waitpid.
- """
- self.mockos.child = False
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False)
- # It should close the first read pipe, and the 2 last write pipes
- self.assertEqual(self.mockos.closed, [-1, -4, -6])
- self.assertEquals(self.mockos.actions, [("fork", False), "waitpid"])
-
-
- def test_mockForkInParentGarbageCollectorEnabled(self):
- """
- The garbage collector should be enabled when L{reactor.spawnProcess}
- returns if it was initially enabled.
-
- @see L{_mockForkInParentTest}
- """
- gc.enable()
- self._mockForkInParentTest()
- self.assertTrue(gc.isenabled())
-
-
- def test_mockForkInParentGarbageCollectorDisabled(self):
- """
- The garbage collector should be disabled when L{reactor.spawnProcess}
- returns if it was initially disabled.
-
- @see L{_mockForkInParentTest}
- """
- gc.disable()
- self._mockForkInParentTest()
- self.assertFalse(gc.isenabled())
-
-
- def test_mockForkTTY(self):
- """
- Test a TTY spawnProcess: check the path of the client code:
- fork, exec, exit.
- """
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- try:
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=True)
- except SystemError:
- self.assert_(self.mockos.exited)
- self.assertEquals(
- self.mockos.actions, [("fork", False), "exec", "exit"])
- else:
- self.fail("Should not be here")
-
-
- def _mockWithForkError(self):
- """
- Assert that if the fork call fails, no other process setup calls are
- made and that spawnProcess raises the exception fork raised.
- """
- self.mockos.raiseFork = OSError(errno.EAGAIN, None)
- protocol = TrivialProcessProtocol(None)
- self.assertRaises(OSError, reactor.spawnProcess, protocol, None)
- self.assertEqual(self.mockos.actions, [("fork", False)])
-
-
- def test_mockWithForkErrorGarbageCollectorEnabled(self):
- """
- The garbage collector should be enabled when L{reactor.spawnProcess}
- raises because L{os.fork} raised, if it was initially enabled.
- """
- gc.enable()
- self._mockWithForkError()
- self.assertTrue(gc.isenabled())
-
-
- def test_mockWithForkErrorGarbageCollectorDisabled(self):
- """
- The garbage collector should be disabled when
- L{reactor.spawnProcess} raises because L{os.fork} raised, if it was
- initially disabled.
- """
- gc.disable()
- self._mockWithForkError()
- self.assertFalse(gc.isenabled())
-
-
- def test_mockWithExecError(self):
- """
- Spawn a process but simulate an error during execution in the client
- path: C{os.execvpe} raises an error. It should close all the standard
- fds, try to print the error encountered, and exit cleanly.
- """
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- self.mockos.raiseExec = True
- try:
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False)
- except SystemError:
- self.assert_(self.mockos.exited)
- self.assertEquals(
- self.mockos.actions, [("fork", False), "exec", "exit"])
- # Check that fd have been closed
- self.assertIn(0, self.mockos.closed)
- self.assertIn(1, self.mockos.closed)
- self.assertIn(2, self.mockos.closed)
- # Check content of traceback
- self.assertIn("RuntimeError: Bar", self.mockos.fdio.getvalue())
- else:
- self.fail("Should not be here")
-
-
- def test_mockSetUid(self):
- """
- Try creating a process with setting its uid: it's almost the same path
- as the standard path, but with a C{switchUID} call before the exec.
- """
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- try:
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False, uid=8080)
- except SystemError:
- self.assert_(self.mockos.exited)
- self.assertEquals(self.mockos.actions,
- [('setuid', 0), ('setgid', 0), ('fork', False),
- ('switchuid', 8080, 1234), 'exec', 'exit'])
- else:
- self.fail("Should not be here")
-
-
- def test_mockSetUidInParent(self):
- """
- Try creating a process with setting its uid, in the parent path: it
- should switch to root before fork, then restore initial uid/gids.
- """
- self.mockos.child = False
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False, uid=8080)
- self.assertEquals(self.mockos.actions,
- [('setuid', 0), ('setgid', 0), ('fork', False),
- ('setregid', 1235, 1234), ('setreuid', 1237, 1236), 'waitpid'])
-
-
- def test_mockPTYSetUid(self):
- """
- Try creating a PTY process with setting its uid: it's almost the same
- path as the standard path, but with a C{switchUID} call before the
- exec.
- """
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- try:
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=True, uid=8081)
- except SystemError:
- self.assert_(self.mockos.exited)
- self.assertEquals(self.mockos.actions,
- [('setuid', 0), ('setgid', 0), ('fork', False),
- ('switchuid', 8081, 1234), 'exec', 'exit'])
- else:
- self.fail("Should not be here")
-
-
- def test_mockPTYSetUidInParent(self):
- """
- Try creating a PTY process with setting its uid, in the parent path: it
- should switch to root before fork, then restore initial uid/gids.
- """
- self.mockos.child = False
- cmd = '/mock/ouch'
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- oldPTYProcess = process.PTYProcess
- try:
- process.PTYProcess = DumbPTYProcess
- reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=True, uid=8080)
- finally:
- process.PTYProcess = oldPTYProcess
- self.assertEquals(self.mockos.actions,
- [('setuid', 0), ('setgid', 0), ('fork', False),
- ('setregid', 1235, 1234), ('setreuid', 1237, 1236), 'waitpid'])
-
-
- def test_mockWithWaitError(self):
- """
- Test that reapProcess logs errors raised.
- """
- self.mockos.child = False
- cmd = '/mock/ouch'
- self.mockos.waitChild = (0, 0)
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False)
- self.assertEquals(self.mockos.actions, [("fork", False), "waitpid"])
-
- self.mockos.raiseWaitPid = OSError()
- proc.reapProcess()
- errors = self.flushLoggedErrors()
- self.assertEquals(len(errors), 1)
- errors[0].trap(OSError)
-
-
- def test_mockErrorECHILDInReapProcess(self):
- """
- Test that reapProcess doesn't log anything when waitpid raises a
- C{OSError} with errno C{ECHILD}.
- """
- self.mockos.child = False
- cmd = '/mock/ouch'
- self.mockos.waitChild = (0, 0)
-
- d = defer.Deferred()
- p = TrivialProcessProtocol(d)
- proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None,
- usePTY=False)
- self.assertEquals(self.mockos.actions, [("fork", False), "waitpid"])
-
- self.mockos.raiseWaitPid = OSError()
- self.mockos.raiseWaitPid.errno = errno.ECHILD
- # This should not produce any errors
- proc.reapProcess()
-
-
-class PosixProcessTestCase(unittest.TestCase, PosixProcessBase):
- # add three non-pty test cases
-
- def testStderr(self):
- # we assume there is no file named ZZXXX..., both in . and in /tmp
- cmd = self.getCommand('ls')
-
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- reactor.spawnProcess(p, cmd,
- [cmd,
- "ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"],
- env=None, path="/tmp",
- usePTY=self.usePTY)
-
- def processEnded(ign):
- self.assertEquals(lsOut, p.errF.getvalue())
- return d.addCallback(processEnded)
-
- def testProcess(self):
- cmd = self.getCommand('gzip')
- s = "there's no place like home!\n" * 3
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
- usePTY=self.usePTY)
- p.transport.write(s)
- p.transport.closeStdin()
-
- def processEnded(ign):
- f = p.outF
- f.seek(0, 0)
- gf = gzip.GzipFile(fileobj=f)
- self.assertEquals(gf.read(), s)
- return d.addCallback(processEnded)
-
-
-
-class PosixProcessTestCasePTY(unittest.TestCase, PosixProcessBase):
- """
- Just like PosixProcessTestCase, but use ptys instead of pipes.
- """
- usePTY = True
- # PTYs only offer one input and one output. What still makes sense?
- # testNormalTermination
- # test_abnormalTermination
- # testSignal
- # testProcess, but not without p.transport.closeStdin
- # might be solveable: TODO: add test if so
-
- def testOpeningTTY(self):
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_tty.py")
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None,
- path=None, usePTY=self.usePTY)
- p.transport.write("hello world!\n")
-
- def processEnded(ign):
- self.assertRaises(
- error.ProcessExitedAlready, p.transport.signalProcess, 'HUP')
- self.assertEquals(
- p.outF.getvalue(),
- "hello world!\r\nhello world!\r\n",
- "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue())
- return d.addCallback(processEnded)
-
-
- def testBadArgs(self):
- pyExe = sys.executable
- pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
- p = Accumulator()
- self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs,
- usePTY=1, childFDs={1:'r'})
-
-
-
-class Win32SignalProtocol(SignalProtocol):
- """
- A win32-specific process protocol that handles C{processEnded}
- differently: processes should exit with exit code 1.
- """
-
- def processEnded(self, reason):
- """
- Callback C{self.deferred} with C{None} if C{reason} is a
- L{error.ProcessTerminated} failure with C{exitCode} set to 1.
- Otherwise, errback with a C{ValueError} describing the problem.
- """
- if not reason.check(error.ProcessTerminated):
- return self.deferred.errback(
- ValueError("wrong termination: %s" % (reason,)))
- v = reason.value
- if v.exitCode != 1:
- return self.deferred.errback(
- ValueError("Wrong exit code: %s" % (reason.exitCode,)))
- self.deferred.callback(None)
-
-
-
-class Win32ProcessTestCase(unittest.TestCase):
- """
- Test process programs that are packaged with twisted.
- """
-
- def testStdinReader(self):
- pyExe = sys.executable
- scriptPath = util.sibpath(__file__, "process_stdinreader.py")
- p = Accumulator()
- d = p.endedDeferred = defer.Deferred()
- reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath], env=None,
- path=None)
- p.transport.write("hello, world")
- p.transport.closeStdin()
-
- def processEnded(ign):
- self.assertEquals(p.errF.getvalue(), "err\nerr\n")
- self.assertEquals(p.outF.getvalue(), "out\nhello, world\nout\n")
- return d.addCallback(processEnded)
-
-
- def testBadArgs(self):
- pyExe = sys.executable
- pyArgs = [pyExe, "-u", "-c", "print 'hello'"]
- p = Accumulator()
- self.assertRaises(ValueError,
- reactor.spawnProcess, p, pyExe, pyArgs, uid=1)
- self.assertRaises(ValueError,
- reactor.spawnProcess, p, pyExe, pyArgs, gid=1)
- self.assertRaises(ValueError,
- reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1)
- self.assertRaises(ValueError,
- reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'})
-
-
- def _testSignal(self, sig):
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_signal.py")
- d = defer.Deferred()
- p = Win32SignalProtocol(d, sig)
- reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
- return d
-
-
- def test_signalTERM(self):
- """
- Sending the SIGTERM signal terminates a created process, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} attribute set to 1.
- """
- return self._testSignal('TERM')
-
-
- def test_signalINT(self):
- """
- Sending the SIGINT signal terminates a created process, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} attribute set to 1.
- """
- return self._testSignal('INT')
-
-
- def test_signalKILL(self):
- """
- Sending the SIGKILL signal terminates a created process, and
- C{processEnded} is called with a L{error.ProcessTerminated} instance
- with the C{exitCode} attribute set to 1.
- """
- return self._testSignal('KILL')
-
-
-
-class Dumbwin32procPidTest(unittest.TestCase):
- """
- Simple test for the pid attribute of Process on win32.
- """
-
- def test_pid(self):
- """
- Launch process with mock win32process. The only mock aspect of this
- module is that the pid of the process created will always be 42.
- """
- from twisted.internet import _dumbwin32proc
- from twisted.test import mock_win32process
- self.patch(_dumbwin32proc, "win32process", mock_win32process)
- exe = sys.executable
- scriptPath = util.sibpath(__file__, "process_cmdline.py")
-
- d = defer.Deferred()
- processProto = TrivialProcessProtocol(d)
- comspec = str(os.environ["COMSPEC"])
- cmd = [comspec, "/c", exe, scriptPath]
-
- p = _dumbwin32proc.Process(reactor,
- processProto,
- None,
- cmd,
- {},
- None)
- self.assertEquals(42, p.pid)
- self.assertEquals("<Process pid=42>", repr(p))
-
- def pidCompleteCb(result):
- self.assertEquals(None, p.pid)
- return d.addCallback(pidCompleteCb)
-
-
-
-class UtilTestCase(unittest.TestCase):
- """
- Tests for process-related helper functions (currently only
- L{procutils.which}.
- """
- def setUp(self):
- """
- Create several directories and files, some of which are executable
- and some of which are not. Save the current PATH setting.
- """
- j = os.path.join
-
- base = self.mktemp()
-
- self.foo = j(base, "foo")
- self.baz = j(base, "baz")
- self.foobar = j(self.foo, "bar")
- self.foobaz = j(self.foo, "baz")
- self.bazfoo = j(self.baz, "foo")
- self.bazbar = j(self.baz, "bar")
-
- for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar:
- os.makedirs(d)
-
- for name, mode in [(j(self.foobaz, "executable"), 0700),
- (j(self.foo, "executable"), 0700),
- (j(self.bazfoo, "executable"), 0700),
- (j(self.bazfoo, "executable.bin"), 0700),
- (j(self.bazbar, "executable"), 0)]:
- f = file(name, "w")
- f.close()
- os.chmod(name, mode)
-
- self.oldPath = os.environ.get('PATH', None)
- os.environ['PATH'] = os.pathsep.join((
- self.foobar, self.foobaz, self.bazfoo, self.bazbar))
-
-
- def tearDown(self):
- """
- Restore the saved PATH setting.
- """
- if self.oldPath is None:
- try:
- del os.environ['PATH']
- except KeyError:
- pass
- else:
- os.environ['PATH'] = self.oldPath
-
-
- def test_whichWithoutPATH(self):
- """
- Test that if C{os.environ} does not have a C{'PATH'} key,
- L{procutils.which} returns an empty list.
- """
- del os.environ['PATH']
- self.assertEqual(procutils.which("executable"), [])
-
-
- def testWhich(self):
- j = os.path.join
- paths = procutils.which("executable")
- expectedPaths = [j(self.foobaz, "executable"),
- j(self.bazfoo, "executable")]
- if runtime.platform.isWindows():
- expectedPaths.append(j(self.bazbar, "executable"))
- self.assertEquals(paths, expectedPaths)
-
-
- def testWhichPathExt(self):
- j = os.path.join
- old = os.environ.get('PATHEXT', None)
- os.environ['PATHEXT'] = os.pathsep.join(('.bin', '.exe', '.sh'))
- try:
- paths = procutils.which("executable")
- finally:
- if old is None:
- del os.environ['PATHEXT']
- else:
- os.environ['PATHEXT'] = old
- expectedPaths = [j(self.foobaz, "executable"),
- j(self.bazfoo, "executable"),
- j(self.bazfoo, "executable.bin")]
- if runtime.platform.isWindows():
- expectedPaths.append(j(self.bazbar, "executable"))
- self.assertEquals(paths, expectedPaths)
-
-
-
-class ClosingPipesProcessProtocol(protocol.ProcessProtocol):
- output = ''
- errput = ''
-
- def __init__(self, outOrErr):
- self.deferred = defer.Deferred()
- self.outOrErr = outOrErr
-
- def processEnded(self, reason):
- self.deferred.callback(reason)
-
- def outReceived(self, data):
- self.output += data
-
- def errReceived(self, data):
- self.errput += data
-
-
-class ClosingPipes(unittest.TestCase):
-
- def doit(self, fd):
- p = ClosingPipesProcessProtocol(True)
- p.deferred.addCallbacks(
- callback=lambda _: self.fail("I wanted an errback."),
- errback=self._endProcess, errbackArgs=(p,))
- reactor.spawnProcess(p, sys.executable,
- [sys.executable, '-u', '-c',
- r'raw_input(); import sys, os; os.write(%d, "foo\n"); sys.exit(42)' % fd],
- env=None)
- p.transport.write('go\n')
-
- if fd == 1:
- p.transport.closeStdout()
- elif fd == 2:
- p.transport.closeStderr()
- else:
- raise RuntimeError
-
- # make the buggy case not hang
- p.transport.closeStdin()
- return p.deferred
-
- def _endProcess(self, reason, p):
- self.failIf(reason.check(error.ProcessDone),
- 'Child should fail due to EPIPE.')
- reason.trap(error.ProcessTerminated)
- # child must not get past that write without raising
- self.failIfEqual(reason.value.exitCode, 42,
- 'process reason was %r' % reason)
- self.failUnlessEqual(p.output, '')
- return p.errput
-
- def test_stdout(self):
- """ProcessProtocol.transport.closeStdout actually closes the pipe."""
- d = self.doit(1)
- def _check(errput):
- self.failIfEqual(errput.find('OSError'), -1)
- if runtime.platform.getType() != 'win32':
- self.failIfEqual(errput.find('Broken pipe'), -1)
- d.addCallback(_check)
- return d
-
- def test_stderr(self):
- """ProcessProtocol.transport.closeStderr actually closes the pipe."""
- d = self.doit(2)
- def _check(errput):
- # there should be no stderr open, so nothing for it to
- # write the error to.
- self.failUnlessEqual(errput, '')
- d.addCallback(_check)
- return d
-
-
-class SystemEventOrderRegressionTests(unittest.TestCase):
- """
- Ordering and reentrancy tests for C{reactor.callWhenRunning} and reactor
- shutdown (see #3146 and #3168).
- """
- def setUp(self):
- """
- Clear the SIGCHLD handler, if there is one, to ensure an environment
- like the one which exists prior to a call to L{reactor.run}.
- """
- self.originalHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL)
- self.processTransports = []
-
-
- def tearDown(self):
- """
- Restore the original SIGCHLD handler and reap processes as long as
- there seem to be any remaining.
- """
- signal.signal(signal.SIGCHLD, signal.SIG_DFL)
- while self.processTransports:
- transport = self.processTransports.pop()
- if transport.pid is not None:
- os.waitpid(transport.pid, 0)
- signal.signal(signal.SIGCHLD, self.originalHandler)
-
-
- def unbuildReactor(self, reactor):
- """
- Clean up any resources which may have been allocated for the given
- reactor by its creation or by a test which used it.
- """
- # Chris says:
- #
- # XXX This explicit calls to clean up the waker should become obsolete
- # when bug #3063 is fixed. -radix, 2008-02-29. Fortunately it should
- # probably cause an error when bug #3063 is fixed, so it should be
- # removed in the same branch that fixes it.
- #
- # -exarkun
- reactor.removeReader(reactor.waker)
- reactor.waker.connectionLost(None)
-
- # Here's an extra thing unrelated to wakers but necessary for
- # cleaning up after the reactors we make. -exarkun
- reactor.disconnectAll()
-
-
- def buildReactor(self):
- """
- Create and return an instance of L{selectreactor.SelectReactor}.
- """
- reactor = selectreactor.SelectReactor()
- self.addCleanup(self.unbuildReactor, reactor)
- return reactor
-
-
- def spawnProcess(self, reactor):
- """
- Call C{reactor.spawnProcess} with some simple arguments. Do this here
- so that code object referenced by the stack frame has a C{co_filename}
- attribute set to this file so that L{TestCase.assertWarns} can be used.
- """
- self.processTransports.append(
- reactor.spawnProcess(
- protocol.ProcessProtocol(), sys.executable,
- [sys.executable, "-c", ""]))
-
-
- def test_spawnProcessTooEarlyWarns(self):
- """
- C{reactor.spawnProcess} emits a warning if it is called before
- C{reactor.run}.
-
- If you can figure out a way to make it safe to run
- C{reactor.spawnProcess} before C{reactor.run}, you may delete the
- warning and this test.
- """
- reactor = self.buildReactor()
- self.assertWarns(
- error.PotentialZombieWarning,
- error.PotentialZombieWarning.MESSAGE, __file__,
- self.spawnProcess, reactor)
-
-
- def test_callWhenRunningSpawnProcessWarningFree(self):
- """
- L{PotentialZombieWarning} is not emitted when the reactor is run after
- C{reactor.callWhenRunning(reactor.spawnProcess, ...)} has been called.
- """
- events = []
- self.patch(warnings, 'warn', lambda *a, **kw: events.append(a))
- reactor = self.buildReactor()
- reactor.callWhenRunning(self.spawnProcess, reactor)
- reactor.callWhenRunning(reactor.stop)
- reactor.run()
- self.assertFalse(events)
-
-
- def test_clientConnectionFailedStopsReactor(self):
- """
- The reactor can be stopped by a client factory's
- C{clientConnectionFailed} method.
-
- This isn't really a process test but it's here for simplicity of
- implementation and it won't be very long lived.
- """
- class Stop(protocol.ClientFactory):
- def clientConnectionFailed(self, connector, reason):
- reactor.stop()
- probe = socket.socket()
- probe.bind(('', 0))
- host, port = probe.getsockname()
- probe.close()
- reactor = self.buildReactor()
- reactor.connectTCP(host, port, Stop())
- reactor.run()
-
-
- def test_shutdownTriggersRun(self):
- """
- C{reactor.run()} does not return until shutdown triggers have all run.
- """
- events = []
- reactor = self.buildReactor()
- reactor.addSystemEventTrigger(
- 'after', 'shutdown', events.append, "done")
- reactor.callWhenRunning(reactor.stop)
- reactor.run()
- self.assertEqual(events, ["done"])
-
-
-
-skipMessage = "wrong platform or reactor doesn't support IReactorProcess"
-if (runtime.platform.getType() != 'posix') or (not interfaces.IReactorProcess(reactor, None)):
- PosixProcessTestCase.skip = skipMessage
- PosixProcessTestCasePTY.skip = skipMessage
- TestTwoProcessesPosix.skip = skipMessage
- FDTest.skip = skipMessage
-else:
- # do this before running the tests: it uses SIGCHLD and stuff internally
- lsOut = popen2.popen3("/bin/ls ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")[2].read()
-
-if (runtime.platform.getType() != 'win32') or (not interfaces.IReactorProcess(reactor, None)):
- Win32ProcessTestCase.skip = skipMessage
- TestTwoProcessesNonPosix.skip = skipMessage
- Dumbwin32procPidTest.skip = skipMessage
-
-if not interfaces.IReactorProcess(reactor, None):
- ProcessTestCase.skip = skipMessage
- ClosingPipes.skip = skipMessage
-
-if process is None:
- MockProcessTestCase.skip = skipMessage
- SystemEventOrderRegressionTests.skip = skipMessage
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_postfix.py ('k') | third_party/twisted_8_1/twisted/test/test_protocols.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698