OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 # This Source Code Form is subject to the terms of the Mozilla Public |
| 4 # License, v. 2.0. If a copy of the MPL was not distributed with this file, |
| 5 # You can obtain one at http://mozilla.org/MPL/2.0/. |
| 6 |
| 7 import os |
| 8 import subprocess |
| 9 import sys |
| 10 import unittest |
| 11 from time import sleep |
| 12 |
| 13 from mozprocess import processhandler |
| 14 |
| 15 here = os.path.dirname(os.path.abspath(__file__)) |
| 16 |
| 17 def make_proclaunch(aDir): |
| 18 """ |
| 19 Makes the proclaunch executable. |
| 20 Params: |
| 21 aDir - the directory in which to issue the make commands |
| 22 Returns: |
| 23 the path to the proclaunch executable that is generated |
| 24 """ |
| 25 # Ideally make should take care of this, but since it doesn't - on windows, |
| 26 # anyway, let's just call out both targets explicitly. |
| 27 p = subprocess.call(["make", "-C", "iniparser"], stdout=subprocess.PIPE, std
err=subprocess.PIPE, cwd=aDir) |
| 28 p = subprocess.call(["make"],stdout=subprocess.PIPE, stderr=subprocess.PIPE
,cwd=aDir) |
| 29 if sys.platform == "win32": |
| 30 exepath = os.path.join(aDir, "proclaunch.exe") |
| 31 else: |
| 32 exepath = os.path.join(aDir, "proclaunch") |
| 33 return exepath |
| 34 |
| 35 def check_for_process(processName): |
| 36 """ |
| 37 Use to determine if process of the given name is still running. |
| 38 |
| 39 Returns: |
| 40 detected -- True if process is detected to exist, False otherwise |
| 41 output -- if process exists, stdout of the process, '' otherwise |
| 42 """ |
| 43 output = '' |
| 44 if sys.platform == "win32": |
| 45 # On windows we use tasklist |
| 46 p1 = subprocess.Popen(["tasklist"], stdout=subprocess.PIPE) |
| 47 output = p1.communicate()[0] |
| 48 detected = False |
| 49 for line in output.splitlines(): |
| 50 if processName in line: |
| 51 detected = True |
| 52 break |
| 53 else: |
| 54 p1 = subprocess.Popen(["ps", "-ef"], stdout=subprocess.PIPE) |
| 55 p2 = subprocess.Popen(["grep", processName], stdin=p1.stdout, stdout=sub
process.PIPE) |
| 56 p1.stdout.close() |
| 57 output = p2.communicate()[0] |
| 58 detected = False |
| 59 for line in output.splitlines(): |
| 60 if "grep %s" % processName in line: |
| 61 continue |
| 62 elif processName in line and not 'defunct' in line: |
| 63 detected = True |
| 64 break |
| 65 |
| 66 return detected, output |
| 67 |
| 68 |
| 69 class ProcTest1(unittest.TestCase): |
| 70 |
| 71 def __init__(self, *args, **kwargs): |
| 72 |
| 73 # Ideally, I'd use setUpClass but that only exists in 2.7. |
| 74 # So, we'll do this make step now. |
| 75 self.proclaunch = make_proclaunch(here) |
| 76 unittest.TestCase.__init__(self, *args, **kwargs) |
| 77 |
| 78 def test_process_normal_finish(self): |
| 79 """Process is started, runs to completion while we wait for it""" |
| 80 |
| 81 p = processhandler.ProcessHandler([self.proclaunch, "process_normal_fini
sh.ini"], |
| 82 cwd=here) |
| 83 p.run() |
| 84 p.wait() |
| 85 |
| 86 detected, output = check_for_process(self.proclaunch) |
| 87 self.determine_status(detected, |
| 88 output, |
| 89 p.proc.returncode, |
| 90 p.didTimeout) |
| 91 |
| 92 def test_process_waittimeout(self): |
| 93 """ Process is started, runs but we time out waiting on it |
| 94 to complete |
| 95 """ |
| 96 p = processhandler.ProcessHandler([self.proclaunch, "process_waittimeout
.ini"], |
| 97 cwd=here) |
| 98 p.run(timeout=10) |
| 99 p.wait() |
| 100 |
| 101 detected, output = check_for_process(self.proclaunch) |
| 102 self.determine_status(detected, |
| 103 output, |
| 104 p.proc.returncode, |
| 105 p.didTimeout, |
| 106 False, |
| 107 ['returncode', 'didtimeout']) |
| 108 |
| 109 def test_process_kill(self): |
| 110 """ Process is started, we kill it |
| 111 """ |
| 112 p = processhandler.ProcessHandler([self.proclaunch, "process_normal_fini
sh.ini"], |
| 113 cwd=here) |
| 114 p.run() |
| 115 p.kill() |
| 116 |
| 117 detected, output = check_for_process(self.proclaunch) |
| 118 self.determine_status(detected, |
| 119 output, |
| 120 p.proc.returncode, |
| 121 p.didTimeout) |
| 122 |
| 123 def determine_status(self, |
| 124 detected=False, |
| 125 output='', |
| 126 returncode=0, |
| 127 didtimeout=False, |
| 128 isalive=False, |
| 129 expectedfail=[]): |
| 130 """ |
| 131 Use to determine if the situation has failed. |
| 132 Parameters: |
| 133 detected -- value from check_for_process to determine if the process
is detected |
| 134 output -- string of data from detected process, can be '' |
| 135 returncode -- return code from process, defaults to 0 |
| 136 didtimeout -- True if process timed out, defaults to False |
| 137 isalive -- Use True to indicate we pass if the process exists; howev
er, by default |
| 138 the test will pass if the process does not exist (isalive
== False) |
| 139 expectedfail -- Defaults to [], used to indicate a list of fields th
at are expected to fail |
| 140 """ |
| 141 if 'returncode' in expectedfail: |
| 142 self.assertTrue(returncode, "Detected an unexpected return code of:
%s" % returncode) |
| 143 elif not isalive: |
| 144 self.assertTrue(returncode == 0, "Detected non-zero return code of:
%d" % returncode) |
| 145 |
| 146 if 'didtimeout' in expectedfail: |
| 147 self.assertTrue(didtimeout, "Detected that process didn't time out") |
| 148 else: |
| 149 self.assertTrue(not didtimeout, "Detected that process timed out") |
| 150 |
| 151 if isalive: |
| 152 self.assertTrue(detected, "Detected process is not running, process
output: %s" % output) |
| 153 else: |
| 154 self.assertTrue(not detected, "Detected process is still running, pr
ocess output: %s" % output) |
| 155 |
| 156 if __name__ == '__main__': |
| 157 unittest.main() |
OLD | NEW |