| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 """ | |
| 6 Test running processes. | |
| 7 """ | |
| 8 | |
| 9 import warnings, os, sys, signal | |
| 10 | |
| 11 # Twisted Imports | |
| 12 from twisted.trial import unittest | |
| 13 | |
| 14 from twisted.internet import reactor, utils, interfaces | |
| 15 | |
| 16 | |
| 17 class UtilsTestCase(unittest.TestCase): | |
| 18 """ | |
| 19 Test running a process using L{utils.getProcessOutput}, | |
| 20 L{utils.getProcessValue}, and L{utils.getProcessOutputAndValue}. | |
| 21 """ | |
| 22 | |
| 23 output = None | |
| 24 value = None | |
| 25 | |
| 26 def makeSourceFile(self, sourceLines): | |
| 27 script = self.mktemp() | |
| 28 scriptFile = file(script, 'wt') | |
| 29 scriptFile.write(os.linesep.join(sourceLines) + os.linesep) | |
| 30 scriptFile.close() | |
| 31 return script | |
| 32 | |
| 33 def testOutput(self): | |
| 34 scriptFile = self.makeSourceFile([ | |
| 35 'print "hello world"' | |
| 36 ]) | |
| 37 d = utils.getProcessOutput(sys.executable, ['-u', scriptFile]) | |
| 38 return d.addCallback(self.assertEquals, "hello world\n") | |
| 39 | |
| 40 def testOutputWithErrorIgnored(self): | |
| 41 # make sure stderr raises an error normally | |
| 42 exe = sys.executable | |
| 43 scriptFile = self.makeSourceFile([ | |
| 44 'import sys', | |
| 45 'sys.stderr.write("hello world\\n")' | |
| 46 ]) | |
| 47 | |
| 48 d = utils.getProcessOutput(exe, ['-u', scriptFile], errortoo=0) | |
| 49 return self.assertFailure(d, IOError) | |
| 50 | |
| 51 def testOutputWithErrorCollected(self): | |
| 52 # make sure stderr raises an error normally | |
| 53 exe = sys.executable | |
| 54 scriptFile = self.makeSourceFile([ | |
| 55 'import sys', | |
| 56 'sys.stderr.write("hello world\\n")' | |
| 57 ]) | |
| 58 | |
| 59 d = utils.getProcessOutput(exe, ['-u', scriptFile], errortoo=1) | |
| 60 return d.addCallback(self.assertEquals, "hello world" + os.linesep) | |
| 61 | |
| 62 def testValue(self): | |
| 63 exe = sys.executable | |
| 64 scriptFile = self.makeSourceFile([ | |
| 65 "import sys", | |
| 66 "sys.exit(1)" | |
| 67 ]) | |
| 68 | |
| 69 d = utils.getProcessValue(exe, ['-u', scriptFile]) | |
| 70 return d.addCallback(self.assertEquals, 1) | |
| 71 | |
| 72 def testOutputAndValue(self): | |
| 73 exe = sys.executable | |
| 74 scriptFile = self.makeSourceFile([ | |
| 75 "import sys", | |
| 76 "sys.stdout.write('hello world!\\n')", | |
| 77 "sys.stderr.write('goodbye world!\\n')", | |
| 78 "sys.exit(1)" | |
| 79 ]) | |
| 80 | |
| 81 def gotOutputAndValue((out, err, code)): | |
| 82 self.assertEquals(out, "hello world!" + os.linesep) | |
| 83 self.assertEquals(err, "goodbye world!" + os.linesep) | |
| 84 self.assertEquals(code, 1) | |
| 85 d = utils.getProcessOutputAndValue(exe, [scriptFile]) | |
| 86 return d.addCallback(gotOutputAndValue) | |
| 87 | |
| 88 def testOutputSignal(self): | |
| 89 # Use SIGKILL here because it's guaranteed to be delivered. Using | |
| 90 # SIGHUP might not work in, e.g., a buildbot slave run under the | |
| 91 # 'nohup' command. | |
| 92 exe = sys.executable | |
| 93 scriptFile = self.makeSourceFile([ | |
| 94 "import sys, os, signal", | |
| 95 "sys.stdout.write('stdout bytes\\n')", | |
| 96 "sys.stderr.write('stderr bytes\\n')", | |
| 97 "sys.stdout.flush()", | |
| 98 "sys.stderr.flush()", | |
| 99 "os.kill(os.getpid(), signal.SIGKILL)" | |
| 100 ]) | |
| 101 | |
| 102 def gotOutputAndValue(err): | |
| 103 (out, err, sig) = err.value # XXX Sigh wtf | |
| 104 self.assertEquals(out, "stdout bytes" + os.linesep) | |
| 105 self.assertEquals(err, "stderr bytes" + os.linesep) | |
| 106 self.assertEquals(sig, signal.SIGKILL) | |
| 107 | |
| 108 d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile]) | |
| 109 return d.addErrback(gotOutputAndValue) | |
| 110 | |
| 111 | |
| 112 | |
| 113 class WarningSuppression(unittest.TestCase): | |
| 114 def setUp(self): | |
| 115 self.warnings = [] | |
| 116 self.originalshow = warnings.showwarning | |
| 117 warnings.showwarning = self.showwarning | |
| 118 | |
| 119 | |
| 120 def tearDown(self): | |
| 121 warnings.showwarning = self.originalshow | |
| 122 | |
| 123 | |
| 124 def showwarning(self, *a, **kw): | |
| 125 self.warnings.append((a, kw)) | |
| 126 | |
| 127 | |
| 128 def testSuppressWarnings(self): | |
| 129 def f(msg): | |
| 130 warnings.warn(msg) | |
| 131 g = utils.suppressWarnings(f, (('ignore',), dict(message="This is messag
e"))) | |
| 132 | |
| 133 # Start off with a sanity check - calling the original function | |
| 134 # should emit the warning. | |
| 135 f("Sanity check message") | |
| 136 self.assertEquals(len(self.warnings), 1) | |
| 137 | |
| 138 # Now that that's out of the way, call the wrapped function, and | |
| 139 # make sure no new warnings show up. | |
| 140 g("This is message") | |
| 141 self.assertEquals(len(self.warnings), 1) | |
| 142 | |
| 143 # Finally, emit another warning which should not be ignored, and | |
| 144 # make sure it is not. | |
| 145 g("Unignored message") | |
| 146 self.assertEquals(len(self.warnings), 2) | |
| 147 | |
| 148 | |
| 149 | |
| 150 if interfaces.IReactorProcess(reactor, None) is None: | |
| 151 UtilsTestCase.skip = "reactor doesn't implement IReactorProcess" | |
| OLD | NEW |