OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # | 2 # |
3 # $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $ | 3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ |
4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. |
5 | 8 |
6 """psutil test suite. | 9 """ |
7 Note: this is targeted for python 2.x. | 10 psutil test suite. |
8 To run it under python 3.x you need to use 2to3 tool first: | |
9 | 11 |
10 $ 2to3 -w test/*.py | 12 Note: this is targeted for both python 2.x and 3.x so there's no need |
| 13 to use 2to3 tool first. |
11 """ | 14 """ |
12 | 15 |
13 import unittest | 16 import unittest |
14 import os | 17 import os |
15 import sys | 18 import sys |
16 import subprocess | 19 import subprocess |
17 import time | 20 import time |
18 import signal | 21 import signal |
19 import types | 22 import types |
20 import traceback | 23 import traceback |
21 import socket | 24 import socket |
22 import warnings | 25 import warnings |
23 import atexit | 26 import atexit |
24 import errno | 27 import errno |
25 import threading | 28 import threading |
| 29 import tempfile |
| 30 import collections |
26 | 31 |
27 import psutil | 32 import psutil |
28 | 33 |
29 | 34 |
30 PYTHON = os.path.realpath(sys.executable) | 35 PYTHON = os.path.realpath(sys.executable) |
31 DEVNULL = open(os.devnull, 'r+') | 36 DEVNULL = open(os.devnull, 'r+') |
32 | 37 TESTFN = os.path.join(os.getcwd(), "$testfile") |
| 38 PY3 = sys.version_info >= (3,) |
33 POSIX = os.name == 'posix' | 39 POSIX = os.name == 'posix' |
34 LINUX = sys.platform.lower().startswith("linux") | 40 LINUX = sys.platform.lower().startswith("linux") |
35 WINDOWS = sys.platform.lower().startswith("win32") | 41 WINDOWS = sys.platform.lower().startswith("win32") |
36 OSX = sys.platform.lower().startswith("darwin") | 42 OSX = sys.platform.lower().startswith("darwin") |
37 BSD = sys.platform.lower().startswith("freebsd") | 43 BSD = sys.platform.lower().startswith("freebsd") |
38 | 44 |
39 try: | 45 try: |
40 psutil.Process(os.getpid()).get_connections() | 46 psutil.Process(os.getpid()).get_connections() |
41 except NotImplementedError, err: | 47 except NotImplementedError: |
| 48 err = sys.exc_info()[1] |
42 SUPPORT_CONNECTIONS = False | 49 SUPPORT_CONNECTIONS = False |
43 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", | 50 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", |
44 RuntimeWarning) | 51 RuntimeWarning) |
45 else: | 52 else: |
46 SUPPORT_CONNECTIONS = True | 53 SUPPORT_CONNECTIONS = True |
47 | 54 |
| 55 if PY3: |
| 56 long = int |
| 57 |
| 58 def callable(attr): |
| 59 return isinstance(attr, collections.Callable) |
| 60 |
48 | 61 |
49 _subprocesses_started = set() | 62 _subprocesses_started = set() |
50 | 63 |
51 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): | 64 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): |
52 """Return a subprocess.Popen object to use in tests. | 65 """Return a subprocess.Popen object to use in tests. |
53 By default stdout and stderr are redirected to /dev/null and the | 66 By default stdout and stderr are redirected to /dev/null and the |
54 python interpreter is used as test process. | 67 python interpreter is used as test process. |
55 """ | 68 """ |
56 if cmd is None: | 69 if cmd is None: |
57 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | 70 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
58 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) | 71 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) |
59 _subprocesses_started.add(sproc.pid) | 72 _subprocesses_started.add(sproc.pid) |
60 return sproc | 73 return sproc |
61 | 74 |
| 75 def sh(cmdline): |
| 76 """run cmd in a subprocess and return its output. |
| 77 raises RuntimeError on error. |
| 78 """ |
| 79 p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, |
| 80 stderr=subprocess.PIPE) |
| 81 stdout, stderr = p.communicate() |
| 82 if p.returncode != 0: |
| 83 raise RuntimeError(stderr) |
| 84 if stderr: |
| 85 warnings.warn(stderr, RuntimeWarning) |
| 86 if sys.version_info >= (3,): |
| 87 stdout = str(stdout, sys.stdout.encoding) |
| 88 return stdout.strip() |
| 89 |
62 def wait_for_pid(pid, timeout=1): | 90 def wait_for_pid(pid, timeout=1): |
63 """Wait for pid to show up in the process list then return. | 91 """Wait for pid to show up in the process list then return. |
64 Used in the test suite to give time the sub process to initialize. | 92 Used in the test suite to give time the sub process to initialize. |
65 """ | 93 """ |
66 raise_at = time.time() + timeout | 94 raise_at = time.time() + timeout |
67 while 1: | 95 while 1: |
68 if pid in psutil.get_pid_list(): | 96 if pid in psutil.get_pid_list(): |
69 # give it one more iteration to allow full initialization | 97 # give it one more iteration to allow full initialization |
70 time.sleep(0.01) | 98 time.sleep(0.01) |
71 return | 99 return |
72 time.sleep(0.0001) | 100 time.sleep(0.0001) |
73 if time.time() >= raise_at: | 101 if time.time() >= raise_at: |
74 raise RuntimeError("Timed out") | 102 raise RuntimeError("Timed out") |
75 | 103 |
76 def kill(pid): | |
77 """Kill a process given its PID.""" | |
78 if hasattr(os, 'kill'): | |
79 os.kill(pid, signal.SIGKILL) | |
80 else: | |
81 psutil.Process(pid).kill() | |
82 | |
83 def reap_children(search_all=False): | 104 def reap_children(search_all=False): |
84 """Kill any subprocess started by this test suite and ensure that | 105 """Kill any subprocess started by this test suite and ensure that |
85 no zombies stick around to hog resources and create problems when | 106 no zombies stick around to hog resources and create problems when |
86 looking for refleaks. | 107 looking for refleaks. |
87 """ | 108 """ |
88 if POSIX: | |
89 def waitpid(process): | |
90 # on posix we are free to wait for any pending process by | |
91 # passing -1 to os.waitpid() | |
92 while True: | |
93 try: | |
94 any_process = -1 | |
95 pid, status = os.waitpid(any_process, os.WNOHANG) | |
96 if pid == 0 and not process.is_running(): | |
97 break | |
98 except OSError: | |
99 if not process.is_running(): | |
100 break | |
101 else: | |
102 def waitpid(process): | |
103 # on non-posix systems we just wait for the given process | |
104 # to go away | |
105 while process.is_running(): | |
106 time.sleep(0.01) | |
107 | |
108 if search_all: | 109 if search_all: |
109 this_process = psutil.Process(os.getpid()) | 110 this_process = psutil.Process(os.getpid()) |
110 pids = [x.pid for x in this_process.get_children()] | 111 pids = [x.pid for x in this_process.get_children()] |
111 else: | 112 else: |
112 pids =_subprocesses_started | 113 pids =_subprocesses_started |
113 while pids: | 114 while pids: |
114 pid = pids.pop() | 115 pid = pids.pop() |
115 try: | 116 try: |
116 child = psutil.Process(pid) | 117 child = psutil.Process(pid) |
117 child.kill() | 118 child.kill() |
118 except psutil.NoSuchProcess: | 119 except psutil.NoSuchProcess: |
119 pass | 120 pass |
120 else: | 121 else: |
121 waitpid(child) | 122 child.wait() |
| 123 |
122 | 124 |
123 # we want to search through all processes before exiting | 125 # we want to search through all processes before exiting |
124 atexit.register(reap_children, search_all=True) | 126 atexit.register(reap_children, search_all=True) |
125 | 127 |
126 def skipIf(condition, reason="", warn=False): | 128 def skipIf(condition, reason="", warn=False): |
127 """Decorator which skip a test under if condition is satisfied. | 129 """Decorator which skip a test under if condition is satisfied. |
128 This is a substitute of unittest.skipIf which is available | 130 This is a substitute of unittest.skipIf which is available |
129 only in python 2.7 and 3.2. | 131 only in python 2.7 and 3.2. |
130 If 'reason' argument is provided this will be printed during | 132 If 'reason' argument is provided this will be printed during |
131 tests execution. | 133 tests execution. |
(...skipping 16 matching lines...) Expand all Loading... |
148 return fun(self, *args, **kwargs) | 150 return fun(self, *args, **kwargs) |
149 return inner | 151 return inner |
150 return outer | 152 return outer |
151 | 153 |
152 def skipUnless(condition, reason="", warn=False): | 154 def skipUnless(condition, reason="", warn=False): |
153 """Contrary of skipIf.""" | 155 """Contrary of skipIf.""" |
154 if not condition: | 156 if not condition: |
155 return skipIf(True, reason, warn) | 157 return skipIf(True, reason, warn) |
156 return skipIf(False) | 158 return skipIf(False) |
157 | 159 |
| 160 def ignore_access_denied(fun): |
| 161 """Decorator to Ignore AccessDenied exceptions.""" |
| 162 def outer(fun, *args, **kwargs): |
| 163 def inner(self): |
| 164 try: |
| 165 return fun(self, *args, **kwargs) |
| 166 except psutil.AccessDenied: |
| 167 pass |
| 168 return inner |
| 169 return outer |
| 170 |
| 171 def supports_ipv6(): |
| 172 """Return True if IPv6 is supported on this platform.""" |
| 173 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): |
| 174 return False |
| 175 sock = None |
| 176 try: |
| 177 try: |
| 178 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
| 179 sock.bind(("::1", 0)) |
| 180 except (socket.error, socket.gaierror): |
| 181 return False |
| 182 else: |
| 183 return True |
| 184 finally: |
| 185 if sock is not None: |
| 186 sock.close() |
| 187 |
| 188 |
| 189 class ThreadTask(threading.Thread): |
| 190 """A thread object used for running process thread tests.""" |
| 191 |
| 192 def __init__(self): |
| 193 threading.Thread.__init__(self) |
| 194 self._running = False |
| 195 self._interval = None |
| 196 self._flag = threading.Event() |
| 197 |
| 198 def __repr__(self): |
| 199 name = self.__class__.__name__ |
| 200 return '<%s running=%s at %#x>' % (name, self._running, id(self)) |
| 201 |
| 202 def start(self, interval=0.001): |
| 203 """Start thread and keep it running until an explicit |
| 204 stop() request. Polls for shutdown every 'timeout' seconds. |
| 205 """ |
| 206 if self._running: |
| 207 raise ValueError("already started") |
| 208 self._interval = interval |
| 209 threading.Thread.start(self) |
| 210 self._flag.wait() |
| 211 |
| 212 def run(self): |
| 213 self._running = True |
| 214 self._flag.set() |
| 215 while self._running: |
| 216 time.sleep(self._interval) |
| 217 |
| 218 def stop(self): |
| 219 """Stop thread execution and and waits until it is stopped.""" |
| 220 if not self._running: |
| 221 raise ValueError("already stopped") |
| 222 self._running = False |
| 223 self.join() |
| 224 |
158 | 225 |
159 class TestCase(unittest.TestCase): | 226 class TestCase(unittest.TestCase): |
160 | 227 |
161 def tearDown(self): | 228 def tearDown(self): |
162 reap_children() | 229 reap_children() |
163 | 230 |
| 231 # ============================ |
| 232 # tests for system-related API |
| 233 # ============================ |
| 234 |
164 def test_get_process_list(self): | 235 def test_get_process_list(self): |
165 pids = [x.pid for x in psutil.get_process_list()] | 236 pids = [x.pid for x in psutil.get_process_list()] |
166 self.assertTrue(os.getpid() in pids) | 237 self.assertTrue(os.getpid() in pids) |
167 self.assertTrue(0 in pids) | |
168 | 238 |
169 def test_process_iter(self): | 239 def test_process_iter(self): |
170 pids = [x.pid for x in psutil.process_iter()] | 240 pids = [x.pid for x in psutil.process_iter()] |
171 self.assertTrue(os.getpid() in pids) | 241 self.assertTrue(os.getpid() in pids) |
172 self.assertTrue(0 in pids) | |
173 | |
174 def test_kill(self): | |
175 sproc = get_test_subprocess() | |
176 test_pid = sproc.pid | |
177 wait_for_pid(test_pid) | |
178 p = psutil.Process(test_pid) | |
179 name = p.name | |
180 p.kill() | |
181 sproc.wait() | |
182 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
183 | |
184 def test_terminate(self): | |
185 sproc = get_test_subprocess() | |
186 test_pid = sproc.pid | |
187 wait_for_pid(test_pid) | |
188 p = psutil.Process(test_pid) | |
189 name = p.name | |
190 p.terminate() | |
191 sproc.wait() | |
192 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
193 | |
194 def test_send_signal(self): | |
195 if POSIX: | |
196 sig = signal.SIGKILL | |
197 else: | |
198 sig = signal.SIGTERM | |
199 sproc = get_test_subprocess() | |
200 test_pid = sproc.pid | |
201 p = psutil.Process(test_pid) | |
202 name = p.name | |
203 p.send_signal(sig) | |
204 sproc.wait() | |
205 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
206 | 242 |
207 def test_TOTAL_PHYMEM(self): | 243 def test_TOTAL_PHYMEM(self): |
208 x = psutil.TOTAL_PHYMEM | 244 x = psutil.TOTAL_PHYMEM |
209 self.assertTrue(isinstance(x, (int, long))) | 245 self.assertTrue(isinstance(x, (int, long))) |
210 self.assertTrue(x > 0) | 246 self.assertTrue(x > 0) |
211 | 247 |
212 def test_used_phymem(self): | 248 def test_BOOT_TIME(self): |
213 x = psutil.used_phymem() | 249 x = psutil.BOOT_TIME |
214 self.assertTrue(isinstance(x, (int, long))) | 250 self.assertTrue(isinstance(x, float)) |
215 self.assertTrue(x > 0) | 251 self.assertTrue(x > 0) |
216 | 252 |
217 def test_avail_phymem(self): | 253 def test_deprecated_memory_functions(self): |
218 x = psutil.avail_phymem() | 254 warnings.filterwarnings("error") |
219 self.assertTrue(isinstance(x, (int, long))) | 255 try: |
220 self.assertTrue(x > 0) | 256 self.assertRaises(DeprecationWarning, psutil.used_phymem) |
| 257 self.assertRaises(DeprecationWarning, psutil.avail_phymem) |
| 258 self.assertRaises(DeprecationWarning, psutil.total_virtmem) |
| 259 self.assertRaises(DeprecationWarning, psutil.used_virtmem) |
| 260 self.assertRaises(DeprecationWarning, psutil.avail_virtmem) |
| 261 finally: |
| 262 warnings.resetwarnings() |
221 | 263 |
222 def test_total_virtmem(self): | 264 def test_phymem_usage(self): |
223 x = psutil.total_virtmem() | 265 mem = psutil.phymem_usage() |
224 self.assertTrue(isinstance(x, (int, long))) | 266 self.assertTrue(mem.total > 0) |
225 self.assertTrue(x >= 0) | 267 self.assertTrue(mem.used > 0) |
| 268 self.assertTrue(mem.free > 0) |
| 269 self.assertTrue(0 <= mem.percent <= 100) |
226 | 270 |
227 def test_used_virtmem(self): | 271 def test_virtmem_usage(self): |
228 x = psutil.used_virtmem() | 272 mem = psutil.virtmem_usage() |
229 self.assertTrue(isinstance(x, (int, long))) | 273 self.assertTrue(mem.total > 0) |
230 self.assertTrue(x >= 0) | 274 self.assertTrue(mem.used >= 0) |
231 | 275 self.assertTrue(mem.free > 0) |
232 def test_avail_virtmem(self): | 276 self.assertTrue(0 <= mem.percent <= 100) |
233 x = psutil.avail_virtmem() | |
234 self.assertTrue(isinstance(x, (int, long))) | |
235 self.assertTrue(x >= 0) | |
236 | |
237 @skipUnless(LINUX) | |
238 def test_cached_phymem(self): | |
239 x = psutil.cached_phymem() | |
240 self.assertTrue(isinstance(x, (int, long))) | |
241 self.assertTrue(x >= 0) | |
242 | 277 |
243 @skipUnless(LINUX) | 278 @skipUnless(LINUX) |
244 def test_phymem_buffers(self): | 279 def test_phymem_buffers(self): |
245 x = psutil.phymem_buffers() | 280 x = psutil.phymem_buffers() |
246 self.assertTrue(isinstance(x, (int, long))) | 281 self.assertTrue(isinstance(x, (int, long))) |
247 self.assertTrue(x >= 0) | 282 self.assertTrue(x >= 0) |
248 | 283 |
249 def test_system_cpu_times(self): | 284 def test_pid_exists(self): |
| 285 sproc = get_test_subprocess() |
| 286 wait_for_pid(sproc.pid) |
| 287 self.assertTrue(psutil.pid_exists(sproc.pid)) |
| 288 p = psutil.Process(sproc.pid) |
| 289 p.kill() |
| 290 p.wait() |
| 291 self.assertFalse(psutil.pid_exists(sproc.pid)) |
| 292 self.assertFalse(psutil.pid_exists(-1)) |
| 293 |
| 294 def test_pid_exists_2(self): |
| 295 reap_children() |
| 296 pids = psutil.get_pid_list() |
| 297 for pid in pids: |
| 298 try: |
| 299 self.assertTrue(psutil.pid_exists(pid)) |
| 300 except AssertionError: |
| 301 # in case the process disappeared in meantime fail only |
| 302 # if it is no longer in get_pid_list() |
| 303 time.sleep(.1) |
| 304 if pid in psutil.get_pid_list(): |
| 305 self.fail(pid) |
| 306 pids = range(max(pids) + 5000, max(pids) + 6000) |
| 307 for pid in pids: |
| 308 self.assertFalse(psutil.pid_exists(pid)) |
| 309 |
| 310 def test_get_pid_list(self): |
| 311 plist = [x.pid for x in psutil.get_process_list()] |
| 312 pidlist = psutil.get_pid_list() |
| 313 self.assertEqual(plist.sort(), pidlist.sort()) |
| 314 # make sure every pid is unique |
| 315 self.assertEqual(len(pidlist), len(set(pidlist))) |
| 316 |
| 317 def test_test(self): |
| 318 # test for psutil.test() function |
| 319 stdout = sys.stdout |
| 320 sys.stdout = DEVNULL |
| 321 try: |
| 322 psutil.test() |
| 323 finally: |
| 324 sys.stdout = stdout |
| 325 |
| 326 def test_sys_cpu_times(self): |
250 total = 0 | 327 total = 0 |
251 times = psutil.cpu_times() | 328 times = psutil.cpu_times() |
252 self.assertTrue(isinstance(times, psutil.CPUTimes)) | |
253 sum(times) | 329 sum(times) |
254 for cp_time in times: | 330 for cp_time in times: |
255 self.assertTrue(isinstance(cp_time, float)) | 331 self.assertTrue(isinstance(cp_time, float)) |
256 self.assertTrue(cp_time >= 0.0) | 332 self.assertTrue(cp_time >= 0.0) |
257 total += cp_time | 333 total += cp_time |
258 # test CPUTimes's __iter__ and __str__ implementation | |
259 self.assertEqual(total, sum(times)) | 334 self.assertEqual(total, sum(times)) |
260 str(times) | 335 str(times) |
261 | 336 |
262 def test_system_cpu_times2(self): | 337 def test_sys_cpu_times2(self): |
263 t1 = sum(psutil.cpu_times()) | 338 t1 = sum(psutil.cpu_times()) |
264 time.sleep(0.1) | 339 time.sleep(0.1) |
265 t2 = sum(psutil.cpu_times()) | 340 t2 = sum(psutil.cpu_times()) |
266 difference = t2 - t1 | 341 difference = t2 - t1 |
267 if not difference >= 0.05: | 342 if not difference >= 0.05: |
268 self.fail("difference %s" % difference) | 343 self.fail("difference %s" % difference) |
269 | 344 |
270 def test_system_cpu_percent(self): | 345 def test_sys_per_cpu_times(self): |
| 346 for times in psutil.cpu_times(percpu=True): |
| 347 total = 0 |
| 348 sum(times) |
| 349 for cp_time in times: |
| 350 self.assertTrue(isinstance(cp_time, float)) |
| 351 self.assertTrue(cp_time >= 0.0) |
| 352 total += cp_time |
| 353 self.assertEqual(total, sum(times)) |
| 354 str(times) |
| 355 |
| 356 def test_sys_per_cpu_times2(self): |
| 357 tot1 = psutil.cpu_times(percpu=True) |
| 358 stop_at = time.time() + 0.1 |
| 359 while 1: |
| 360 if time.time() >= stop_at: |
| 361 break |
| 362 tot2 = psutil.cpu_times(percpu=True) |
| 363 for t1, t2 in zip(tot1, tot2): |
| 364 t1, t2 = sum(t1), sum(t2) |
| 365 difference = t2 - t1 |
| 366 if difference >= 0.05: |
| 367 return |
| 368 self.fail() |
| 369 |
| 370 def test_sys_cpu_percent(self): |
271 psutil.cpu_percent(interval=0.001) | 371 psutil.cpu_percent(interval=0.001) |
272 psutil.cpu_percent(interval=0.001) | 372 psutil.cpu_percent(interval=0.001) |
273 for x in xrange(1000): | 373 for x in range(1000): |
274 percent = psutil.cpu_percent(interval=None) | 374 percent = psutil.cpu_percent(interval=None) |
275 self.assertTrue(isinstance(percent, float)) | 375 self.assertTrue(isinstance(percent, float)) |
276 self.assertTrue(percent >= 0.0) | 376 self.assertTrue(percent >= 0.0) |
277 self.assertTrue(percent <= 100.0) | 377 self.assertTrue(percent <= 100.0) |
278 | 378 |
279 def test_process_cpu_percent(self): | 379 def test_sys_per_cpu_percent(self): |
| 380 psutil.cpu_percent(interval=0.001, percpu=True) |
| 381 psutil.cpu_percent(interval=0.001, percpu=True) |
| 382 for x in range(1000): |
| 383 percents = psutil.cpu_percent(interval=None, percpu=True) |
| 384 for percent in percents: |
| 385 self.assertTrue(isinstance(percent, float)) |
| 386 self.assertTrue(percent >= 0.0) |
| 387 self.assertTrue(percent <= 100.0) |
| 388 |
| 389 def test_sys_cpu_percent_compare(self): |
| 390 psutil.cpu_percent(interval=0) |
| 391 psutil.cpu_percent(interval=0, percpu=True) |
| 392 time.sleep(.1) |
| 393 t1 = psutil.cpu_percent(interval=0) |
| 394 t2 = psutil.cpu_percent(interval=0, percpu=True) |
| 395 # calculate total average |
| 396 t2 = sum(t2) / len(t2) |
| 397 if abs(t1 - t2) > 5: |
| 398 self.assertEqual(t1, t2) |
| 399 |
| 400 def test_disk_usage(self): |
| 401 usage = psutil.disk_usage(os.getcwd()) |
| 402 self.assertTrue(usage.total > 0) |
| 403 self.assertTrue(usage.used > 0) |
| 404 self.assertTrue(usage.free > 0) |
| 405 self.assertTrue(usage.total > usage.used) |
| 406 self.assertTrue(usage.total > usage.free) |
| 407 self.assertTrue(0 <= usage.percent <= 100) |
| 408 |
| 409 # if path does not exist OSError ENOENT is expected across |
| 410 # all platforms |
| 411 fname = tempfile.mktemp() |
| 412 try: |
| 413 psutil.disk_usage(fname) |
| 414 except OSError: |
| 415 err = sys.exc_info()[1] |
| 416 if err.args[0] != errno.ENOENT: |
| 417 raise |
| 418 else: |
| 419 self.fail("OSError not raised") |
| 420 |
| 421 def test_disk_partitions(self): |
| 422 for disk in psutil.disk_partitions(all=False): |
| 423 self.assertTrue(os.path.exists(disk.device)) |
| 424 self.assertTrue(os.path.isdir(disk.mountpoint)) |
| 425 self.assertTrue(disk.fstype) |
| 426 for disk in psutil.disk_partitions(all=True): |
| 427 if not WINDOWS: |
| 428 self.assertTrue(os.path.isdir(disk.mountpoint)) |
| 429 self.assertTrue(disk.fstype) |
| 430 |
| 431 def find_mount_point(path): |
| 432 path = os.path.abspath(path) |
| 433 while not os.path.ismount(path): |
| 434 path = os.path.dirname(path) |
| 435 return path |
| 436 |
| 437 mount = find_mount_point(__file__) |
| 438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] |
| 439 self.assertTrue(mount in mounts) |
| 440 psutil.disk_usage(mount) |
| 441 |
| 442 # XXX |
| 443 @skipUnless(hasattr(psutil, "network_io_counters")) |
| 444 def test_anetwork_io_counters(self): |
| 445 def check_ntuple(nt): |
| 446 self.assertEqual(nt[0], nt.bytes_sent) |
| 447 self.assertEqual(nt[1], nt.bytes_recv) |
| 448 self.assertEqual(nt[2], nt.packets_sent) |
| 449 self.assertEqual(nt[3], nt.packets_recv) |
| 450 self.assertTrue(nt.bytes_sent >= 0) |
| 451 self.assertTrue(nt.bytes_recv >= 0) |
| 452 self.assertTrue(nt.packets_sent >= 0) |
| 453 self.assertTrue(nt.packets_recv >= 0) |
| 454 |
| 455 ret = psutil.network_io_counters(pernic=False) |
| 456 check_ntuple(ret) |
| 457 ret = psutil.network_io_counters(pernic=True) |
| 458 for name, ntuple in ret.iteritems(): |
| 459 self.assertTrue(name) |
| 460 check_ntuple(ntuple) |
| 461 # XXX |
| 462 @skipUnless(hasattr(psutil, "disk_io_counters")) |
| 463 def test_disk_io_counters(self): |
| 464 def check_ntuple(nt): |
| 465 self.assertEqual(nt[0], nt.read_count) |
| 466 self.assertEqual(nt[1], nt.write_count) |
| 467 self.assertEqual(nt[2], nt.read_bytes) |
| 468 self.assertEqual(nt[3], nt.write_bytes) |
| 469 self.assertEqual(nt[4], nt.read_time) |
| 470 self.assertEqual(nt[5], nt.write_time) |
| 471 self.assertTrue(nt.read_count >= 0) |
| 472 self.assertTrue(nt.write_count >= 0) |
| 473 self.assertTrue(nt.read_bytes >= 0) |
| 474 self.assertTrue(nt.write_bytes >= 0) |
| 475 self.assertTrue(nt.read_time >= 0) |
| 476 self.assertTrue(nt.write_time >= 0) |
| 477 |
| 478 ret = psutil.disk_io_counters(perdisk=False) |
| 479 check_ntuple(ret) |
| 480 ret = psutil.disk_io_counters(perdisk=True) |
| 481 for name, ntuple in ret.iteritems(): |
| 482 self.assertTrue(name) |
| 483 check_ntuple(ntuple) |
| 484 |
| 485 # ==================== |
| 486 # Process object tests |
| 487 # ==================== |
| 488 |
| 489 def test_kill(self): |
| 490 sproc = get_test_subprocess() |
| 491 test_pid = sproc.pid |
| 492 wait_for_pid(test_pid) |
| 493 p = psutil.Process(test_pid) |
| 494 name = p.name |
| 495 p.kill() |
| 496 p.wait() |
| 497 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 498 |
| 499 def test_terminate(self): |
| 500 sproc = get_test_subprocess() |
| 501 test_pid = sproc.pid |
| 502 wait_for_pid(test_pid) |
| 503 p = psutil.Process(test_pid) |
| 504 name = p.name |
| 505 p.terminate() |
| 506 p.wait() |
| 507 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 508 |
| 509 def test_send_signal(self): |
| 510 if POSIX: |
| 511 sig = signal.SIGKILL |
| 512 else: |
| 513 sig = signal.SIGTERM |
| 514 sproc = get_test_subprocess() |
| 515 test_pid = sproc.pid |
| 516 p = psutil.Process(test_pid) |
| 517 name = p.name |
| 518 p.send_signal(sig) |
| 519 p.wait() |
| 520 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 521 |
| 522 def test_wait(self): |
| 523 # check exit code signal |
| 524 sproc = get_test_subprocess() |
| 525 p = psutil.Process(sproc.pid) |
| 526 p.kill() |
| 527 code = p.wait() |
| 528 if os.name == 'posix': |
| 529 self.assertEqual(code, signal.SIGKILL) |
| 530 else: |
| 531 self.assertEqual(code, 0) |
| 532 self.assertFalse(p.is_running()) |
| 533 |
| 534 sproc = get_test_subprocess() |
| 535 p = psutil.Process(sproc.pid) |
| 536 p.terminate() |
| 537 code = p.wait() |
| 538 if os.name == 'posix': |
| 539 self.assertEqual(code, signal.SIGTERM) |
| 540 else: |
| 541 self.assertEqual(code, 0) |
| 542 self.assertFalse(p.is_running()) |
| 543 |
| 544 # check sys.exit() code |
| 545 code = "import time, sys; time.sleep(0.01); sys.exit(5);" |
| 546 sproc = get_test_subprocess([PYTHON, "-c", code]) |
| 547 p = psutil.Process(sproc.pid) |
| 548 self.assertEqual(p.wait(), 5) |
| 549 self.assertFalse(p.is_running()) |
| 550 |
| 551 # Test wait() issued twice. |
| 552 # It is not supposed to raise NSP when the process is gone. |
| 553 # On UNIX this should return None, on Windows it should keep |
| 554 # returning the exit code. |
| 555 sproc = get_test_subprocess([PYTHON, "-c", code]) |
| 556 p = psutil.Process(sproc.pid) |
| 557 self.assertEqual(p.wait(), 5) |
| 558 self.assertTrue(p.wait() in (5, None)) |
| 559 |
| 560 # test timeout |
| 561 sproc = get_test_subprocess() |
| 562 p = psutil.Process(sproc.pid) |
| 563 p.name |
| 564 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) |
| 565 |
| 566 # timeout < 0 not allowed |
| 567 self.assertRaises(ValueError, p.wait, -1) |
| 568 |
| 569 @skipUnless(POSIX) |
| 570 def test_wait_non_children(self): |
| 571 # test wait() against processes which are not our children |
| 572 code = "import sys;" |
| 573 code += "from subprocess import Popen, PIPE;" |
| 574 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON |
| 575 code += "sp = Popen(cmd, stdout=PIPE);" |
| 576 code += "sys.stdout.write(str(sp.pid));" |
| 577 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE
) |
| 578 |
| 579 grandson_pid = int(sproc.stdout.read()) |
| 580 grandson_proc = psutil.Process(grandson_pid) |
| 581 try: |
| 582 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) |
| 583 grandson_proc.kill() |
| 584 ret = grandson_proc.wait() |
| 585 self.assertEqual(ret, None) |
| 586 finally: |
| 587 if grandson_proc.is_running(): |
| 588 grandson_proc.kill() |
| 589 grandson_proc.wait() |
| 590 |
| 591 def test_wait_timeout_0(self): |
| 592 sproc = get_test_subprocess() |
| 593 p = psutil.Process(sproc.pid) |
| 594 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) |
| 595 p.kill() |
| 596 stop_at = time.time() + 2 |
| 597 while 1: |
| 598 try: |
| 599 code = p.wait(0) |
| 600 except psutil.TimeoutExpired: |
| 601 if time.time() >= stop_at: |
| 602 raise |
| 603 else: |
| 604 break |
| 605 if os.name == 'posix': |
| 606 self.assertEqual(code, signal.SIGKILL) |
| 607 else: |
| 608 self.assertEqual(code, 0) |
| 609 self.assertFalse(p.is_running()) |
| 610 |
| 611 def test_cpu_percent(self): |
280 p = psutil.Process(os.getpid()) | 612 p = psutil.Process(os.getpid()) |
281 p.get_cpu_percent(interval=0.001) | 613 p.get_cpu_percent(interval=0.001) |
282 p.get_cpu_percent(interval=0.001) | 614 p.get_cpu_percent(interval=0.001) |
283 for x in xrange(100): | 615 for x in range(100): |
284 percent = p.get_cpu_percent(interval=None) | 616 percent = p.get_cpu_percent(interval=None) |
285 self.assertTrue(isinstance(percent, float)) | 617 self.assertTrue(isinstance(percent, float)) |
286 self.assertTrue(percent >= 0.0) | 618 self.assertTrue(percent >= 0.0) |
287 self.assertTrue(percent <= 100.0) | 619 self.assertTrue(percent <= 100.0) |
288 | 620 |
289 def test_get_process_cpu_times(self): | 621 def test_cpu_times(self): |
290 times = psutil.Process(os.getpid()).get_cpu_times() | 622 times = psutil.Process(os.getpid()).get_cpu_times() |
291 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) | 623 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) |
292 # make sure returned values can be pretty printed with strftime | 624 # make sure returned values can be pretty printed with strftime |
293 time.strftime("%H:%M:%S", time.localtime(times.user)) | 625 time.strftime("%H:%M:%S", time.localtime(times.user)) |
294 time.strftime("%H:%M:%S", time.localtime(times.system)) | 626 time.strftime("%H:%M:%S", time.localtime(times.system)) |
295 | 627 |
296 # Test Process.cpu_times() against os.times() | 628 # Test Process.cpu_times() against os.times() |
297 # os.times() is broken on Python 2.6 | 629 # os.times() is broken on Python 2.6 |
298 # http://bugs.python.org/issue1040026 | 630 # http://bugs.python.org/issue1040026 |
299 # XXX fails on OSX: not sure if it's for os.times(). We should | 631 # XXX fails on OSX: not sure if it's for os.times(). We should |
300 # try this with Python 2.7 and re-enable the test. | 632 # try this with Python 2.7 and re-enable the test. |
301 | 633 |
302 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) | 634 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) |
303 def test_get_process_cpu_times2(self): | 635 def test_cpu_times2(self): |
304 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() | 636 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() |
305 utime, ktime = os.times()[:2] | 637 utime, ktime = os.times()[:2] |
306 | 638 |
307 # Use os.times()[:2] as base values to compare our results | 639 # Use os.times()[:2] as base values to compare our results |
308 # using a tolerance of +/- 0.1 seconds. | 640 # using a tolerance of +/- 0.1 seconds. |
309 # It will fail if the difference between the values is > 0.1s. | 641 # It will fail if the difference between the values is > 0.1s. |
310 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: | 642 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: |
311 self.fail("expected: %s, found: %s" %(utime, user_time)) | 643 self.fail("expected: %s, found: %s" %(utime, user_time)) |
312 | 644 |
313 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: | 645 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: |
(...skipping 10 matching lines...) Expand all Loading... |
324 # tolerance of +/- 1 second. | 656 # tolerance of +/- 1 second. |
325 # It will fail if the difference between the values is > 2s. | 657 # It will fail if the difference between the values is > 2s. |
326 difference = abs(create_time - now) | 658 difference = abs(create_time - now) |
327 if difference > 2: | 659 if difference > 2: |
328 self.fail("expected: %s, found: %s, difference: %s" | 660 self.fail("expected: %s, found: %s, difference: %s" |
329 % (now, create_time, difference)) | 661 % (now, create_time, difference)) |
330 | 662 |
331 # make sure returned value can be pretty printed with strftime | 663 # make sure returned value can be pretty printed with strftime |
332 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) | 664 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) |
333 | 665 |
| 666 @skipIf(WINDOWS) |
| 667 def test_terminal(self): |
| 668 tty = sh('tty') |
| 669 p = psutil.Process(os.getpid()) |
| 670 self.assertEqual(p.terminal, tty) |
| 671 |
| 672 @skipIf(OSX, warn=False) |
| 673 def test_get_io_counters(self): |
| 674 p = psutil.Process(os.getpid()) |
| 675 # test reads |
| 676 io1 = p.get_io_counters() |
| 677 f = open(PYTHON, 'rb') |
| 678 f.read() |
| 679 f.close() |
| 680 io2 = p.get_io_counters() |
| 681 if not BSD: |
| 682 self.assertTrue(io2.read_count > io1.read_count) |
| 683 self.assertTrue(io2.write_count == io1.write_count) |
| 684 self.assertTrue(io2.read_bytes >= io1.read_bytes) |
| 685 self.assertTrue(io2.write_bytes >= io1.write_bytes) |
| 686 # test writes |
| 687 io1 = p.get_io_counters() |
| 688 f = tempfile.TemporaryFile() |
| 689 if sys.version_info >= (3,): |
| 690 f.write(bytes("x" * 1000000, 'ascii')) |
| 691 else: |
| 692 f.write("x" * 1000000) |
| 693 f.close() |
| 694 io2 = p.get_io_counters() |
| 695 if not BSD: |
| 696 self.assertTrue(io2.write_count > io1.write_count) |
| 697 self.assertTrue(io2.write_bytes > io1.write_bytes) |
| 698 self.assertTrue(io2.read_count >= io1.read_count) |
| 699 self.assertTrue(io2.read_bytes >= io1.read_bytes) |
| 700 |
| 701 @skipUnless(LINUX) |
| 702 def test_get_set_ionice(self): |
| 703 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, |
| 704 IOPRIO_CLASS_IDLE) |
| 705 self.assertEqual(IOPRIO_CLASS_NONE, 0) |
| 706 self.assertEqual(IOPRIO_CLASS_RT, 1) |
| 707 self.assertEqual(IOPRIO_CLASS_BE, 2) |
| 708 self.assertEqual(IOPRIO_CLASS_IDLE, 3) |
| 709 p = psutil.Process(os.getpid()) |
| 710 try: |
| 711 p.set_ionice(2) |
| 712 ioclass, value = p.get_ionice() |
| 713 self.assertEqual(ioclass, 2) |
| 714 self.assertEqual(value, 4) |
| 715 # |
| 716 p.set_ionice(3) |
| 717 ioclass, value = p.get_ionice() |
| 718 self.assertEqual(ioclass, 3) |
| 719 self.assertEqual(value, 0) |
| 720 # |
| 721 p.set_ionice(2, 0) |
| 722 ioclass, value = p.get_ionice() |
| 723 self.assertEqual(ioclass, 2) |
| 724 self.assertEqual(value, 0) |
| 725 p.set_ionice(2, 7) |
| 726 ioclass, value = p.get_ionice() |
| 727 self.assertEqual(ioclass, 2) |
| 728 self.assertEqual(value, 7) |
| 729 self.assertRaises(ValueError, p.set_ionice, 2, 10) |
| 730 finally: |
| 731 p.set_ionice(IOPRIO_CLASS_NONE) |
| 732 |
334 def test_get_num_threads(self): | 733 def test_get_num_threads(self): |
| 734 # on certain platforms such as Linux we might test for exact |
| 735 # thread number, since we always have with 1 thread per process, |
| 736 # but this does not apply across all platforms (OSX, Windows) |
335 p = psutil.Process(os.getpid()) | 737 p = psutil.Process(os.getpid()) |
336 numt1 = p.get_num_threads() | 738 step1 = p.get_num_threads() |
337 if not WINDOWS and not OSX: | 739 |
338 # test is unreliable on Windows and OS X | 740 thread = ThreadTask() |
339 # NOTE: sleep(1) is too long for OS X, works with sleep(.5) | 741 thread.start() |
340 self.assertEqual(numt1, 1) | 742 try: |
341 t = threading.Thread(target=lambda:time.sleep(1)) | 743 step2 = p.get_num_threads() |
342 t.start() | 744 self.assertEqual(step2, step1 + 1) |
343 numt2 = p.get_num_threads() | 745 thread.stop() |
344 if WINDOWS: | 746 finally: |
345 self.assertTrue(numt2 > numt1) | 747 if thread._running: |
346 else: | 748 thread.stop() |
347 self.assertEqual(numt2, 2) | 749 |
| 750 def test_get_threads(self): |
| 751 p = psutil.Process(os.getpid()) |
| 752 step1 = p.get_threads() |
| 753 |
| 754 thread = ThreadTask() |
| 755 thread.start() |
| 756 |
| 757 try: |
| 758 step2 = p.get_threads() |
| 759 self.assertEqual(len(step2), len(step1) + 1) |
| 760 # on Linux, first thread id is supposed to be this process |
| 761 if LINUX: |
| 762 self.assertEqual(step2[0].id, os.getpid()) |
| 763 athread = step2[0] |
| 764 # test named tuple |
| 765 self.assertEqual(athread.id, athread[0]) |
| 766 self.assertEqual(athread.user_time, athread[1]) |
| 767 self.assertEqual(athread.system_time, athread[2]) |
| 768 # test num threads |
| 769 thread.stop() |
| 770 finally: |
| 771 if thread._running: |
| 772 thread.stop() |
348 | 773 |
349 def test_get_memory_info(self): | 774 def test_get_memory_info(self): |
350 p = psutil.Process(os.getpid()) | 775 p = psutil.Process(os.getpid()) |
351 | 776 |
352 # step 1 - get a base value to compare our results | 777 # step 1 - get a base value to compare our results |
353 rss1, vms1 = p.get_memory_info() | 778 rss1, vms1 = p.get_memory_info() |
354 percent1 = p.get_memory_percent() | 779 percent1 = p.get_memory_percent() |
355 self.assertTrue(rss1 > 0) | 780 self.assertTrue(rss1 > 0) |
356 self.assertTrue(vms1 > 0) | 781 self.assertTrue(vms1 > 0) |
357 | 782 |
358 # step 2 - allocate some memory | 783 # step 2 - allocate some memory |
359 memarr = [None] * 1500000 | 784 memarr = [None] * 1500000 |
360 | 785 |
361 rss2, vms2 = p.get_memory_info() | 786 rss2, vms2 = p.get_memory_info() |
362 percent2 = p.get_memory_percent() | 787 percent2 = p.get_memory_percent() |
363 # make sure that the memory usage bumped up | 788 # make sure that the memory usage bumped up |
364 self.assertTrue(rss2 > rss1) | 789 self.assertTrue(rss2 > rss1) |
365 self.assertTrue(vms2 >= vms1) # vms might be equal | 790 self.assertTrue(vms2 >= vms1) # vms might be equal |
366 self.assertTrue(percent2 > percent1) | 791 self.assertTrue(percent2 > percent1) |
367 del memarr | 792 del memarr |
368 | 793 |
369 def test_get_memory_percent(self): | 794 def test_get_memory_percent(self): |
370 p = psutil.Process(os.getpid()) | 795 p = psutil.Process(os.getpid()) |
371 self.assertTrue(p.get_memory_percent() > 0.0) | 796 self.assertTrue(p.get_memory_percent() > 0.0) |
372 | 797 |
373 def test_pid(self): | 798 def test_pid(self): |
374 sproc = get_test_subprocess() | 799 sproc = get_test_subprocess() |
375 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) | 800 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) |
376 | 801 |
377 def test_eq(self): | |
378 sproc = get_test_subprocess() | |
379 wait_for_pid(sproc.pid) | |
380 self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid)) | |
381 | |
382 def test_is_running(self): | 802 def test_is_running(self): |
383 sproc = get_test_subprocess() | 803 sproc = get_test_subprocess() |
384 wait_for_pid(sproc.pid) | 804 wait_for_pid(sproc.pid) |
385 p = psutil.Process(sproc.pid) | 805 p = psutil.Process(sproc.pid) |
386 self.assertTrue(p.is_running()) | 806 self.assertTrue(p.is_running()) |
387 psutil.Process(sproc.pid).kill() | 807 p.kill() |
388 sproc.wait() | 808 p.wait() |
389 self.assertFalse(p.is_running()) | 809 self.assertFalse(p.is_running()) |
390 | 810 |
391 def test_pid_exists(self): | |
392 sproc = get_test_subprocess() | |
393 wait_for_pid(sproc.pid) | |
394 self.assertTrue(psutil.pid_exists(sproc.pid)) | |
395 psutil.Process(sproc.pid).kill() | |
396 sproc.wait() | |
397 self.assertFalse(psutil.pid_exists(sproc.pid)) | |
398 self.assertFalse(psutil.pid_exists(-1)) | |
399 | |
400 def test_exe(self): | 811 def test_exe(self): |
401 sproc = get_test_subprocess() | 812 sproc = get_test_subprocess() |
402 wait_for_pid(sproc.pid) | 813 wait_for_pid(sproc.pid) |
403 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) | 814 try: |
| 815 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) |
| 816 except AssertionError: |
| 817 # certain platforms such as BSD are more accurate returning: |
| 818 # "/usr/local/bin/python2.7" |
| 819 # ...instead of: |
| 820 # "/usr/local/bin/python" |
| 821 # We do not want to consider this difference in accuracy |
| 822 # an error. |
| 823 name = psutil.Process(sproc.pid).exe |
| 824 adjusted_name = PYTHON[:len(name)] |
| 825 self.assertEqual(name, adjusted_name) |
404 for p in psutil.process_iter(): | 826 for p in psutil.process_iter(): |
405 try: | 827 try: |
406 exe = p.exe | 828 exe = p.exe |
407 except psutil.Error: | 829 except psutil.Error: |
408 continue | 830 continue |
409 if not exe: | 831 if not exe: |
410 continue | 832 continue |
411 if not os.path.exists(exe): | 833 if not os.path.exists(exe): |
412 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ | 834 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ |
413 % (repr(exe), p.pid, p.name, p.cmdline)) | 835 % (repr(exe), p.pid, p.name, p.cmdline)) |
414 if hasattr(os, 'access') and hasattr(os, "X_OK"): | 836 if hasattr(os, 'access') and hasattr(os, "X_OK"): |
415 if not os.access(p.exe, os.X_OK): | 837 if not os.access(p.exe, os.X_OK): |
416 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ | 838 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ |
417 % (repr(p.exe), p.pid, p.name, p.cmdline)) | 839 % (repr(p.exe), p.pid, p.name, p.cmdline)) |
418 | 840 |
419 def test_path(self): | |
420 proc = psutil.Process(os.getpid()) | |
421 warnings.filterwarnings("error") | |
422 try: | |
423 self.assertRaises(DeprecationWarning, getattr, proc, 'path') | |
424 finally: | |
425 warnings.resetwarnings() | |
426 | |
427 def test_cmdline(self): | 841 def test_cmdline(self): |
428 sproc = get_test_subprocess([PYTHON, "-E"]) | 842 sproc = get_test_subprocess([PYTHON, "-E"]) |
429 wait_for_pid(sproc.pid) | 843 wait_for_pid(sproc.pid) |
430 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) | 844 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) |
431 | 845 |
432 def test_name(self): | 846 def test_name(self): |
433 sproc = get_test_subprocess(PYTHON) | 847 sproc = get_test_subprocess(PYTHON) |
434 wait_for_pid(sproc.pid) | 848 wait_for_pid(sproc.pid) |
435 if OSX: | 849 self.assertEqual(psutil.Process(sproc.pid).name.lower(), |
436 self.assertEqual(psutil.Process(sproc.pid).name, "Python") | 850 os.path.basename(sys.executable).lower()) |
437 else: | |
438 self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PY
THON)) | |
439 | 851 |
440 def test_uid(self): | 852 if os.name == 'posix': |
441 sproc = get_test_subprocess() | |
442 wait_for_pid(sproc.pid) | |
443 uid = psutil.Process(sproc.pid).uid | |
444 if hasattr(os, 'getuid'): | |
445 self.assertEqual(uid, os.getuid()) | |
446 else: | |
447 # On those platforms where UID doesn't make sense (Windows) | |
448 # we expect it to be -1 | |
449 self.assertEqual(uid, -1) | |
450 | 853 |
451 def test_gid(self): | 854 def test_uids(self): |
452 sproc = get_test_subprocess() | 855 p = psutil.Process(os.getpid()) |
453 wait_for_pid(sproc.pid) | 856 real, effective, saved = p.uids |
454 gid = psutil.Process(sproc.pid).gid | 857 # os.getuid() refers to "real" uid |
455 if hasattr(os, 'getgid'): | 858 self.assertEqual(real, os.getuid()) |
456 self.assertEqual(gid, os.getgid()) | 859 # os.geteuid() refers to "effective" uid |
457 else: | 860 self.assertEqual(effective, os.geteuid()) |
458 # On those platforms where GID doesn't make sense (Windows) | 861 # no such thing as os.getsuid() ("saved" uid), but starting |
459 # we expect it to be -1 | 862 # from python 2.7 we have os.getresuid()[2] |
460 self.assertEqual(gid, -1) | 863 if hasattr(os, "getresuid"): |
| 864 self.assertEqual(saved, os.getresuid()[2]) |
| 865 |
| 866 def test_gids(self): |
| 867 p = psutil.Process(os.getpid()) |
| 868 real, effective, saved = p.gids |
| 869 # os.getuid() refers to "real" uid |
| 870 self.assertEqual(real, os.getgid()) |
| 871 # os.geteuid() refers to "effective" uid |
| 872 self.assertEqual(effective, os.getegid()) |
| 873 # no such thing as os.getsuid() ("saved" uid), but starting |
| 874 # from python 2.7 we have os.getresgid()[2] |
| 875 if hasattr(os, "getresuid"): |
| 876 self.assertEqual(saved, os.getresgid()[2]) |
| 877 |
| 878 def test_nice(self): |
| 879 p = psutil.Process(os.getpid()) |
| 880 self.assertRaises(TypeError, setattr, p, "nice", "str") |
| 881 try: |
| 882 try: |
| 883 first_nice = p.nice |
| 884 p.nice = 1 |
| 885 self.assertEqual(p.nice, 1) |
| 886 # going back to previous nice value raises AccessDenied on O
SX |
| 887 if not OSX: |
| 888 p.nice = 0 |
| 889 self.assertEqual(p.nice, 0) |
| 890 except psutil.AccessDenied: |
| 891 pass |
| 892 finally: |
| 893 # going back to previous nice value raises AccessDenied on OSX |
| 894 if not OSX: |
| 895 p.nice = first_nice |
| 896 |
| 897 if os.name == 'nt': |
| 898 |
| 899 def test_nice(self): |
| 900 p = psutil.Process(os.getpid()) |
| 901 self.assertRaises(TypeError, setattr, p, "nice", "str") |
| 902 try: |
| 903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) |
| 904 p.nice = psutil.HIGH_PRIORITY_CLASS |
| 905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) |
| 906 p.nice = psutil.NORMAL_PRIORITY_CLASS |
| 907 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) |
| 908 finally: |
| 909 p.nice = psutil.NORMAL_PRIORITY_CLASS |
| 910 |
| 911 def test_status(self): |
| 912 p = psutil.Process(os.getpid()) |
| 913 self.assertEqual(p.status, psutil.STATUS_RUNNING) |
| 914 self.assertEqual(str(p.status), "running") |
| 915 for p in psutil.process_iter(): |
| 916 if str(p.status) == '?': |
| 917 self.fail("invalid status for pid %d" % p.pid) |
461 | 918 |
462 def test_username(self): | 919 def test_username(self): |
463 sproc = get_test_subprocess() | 920 sproc = get_test_subprocess() |
464 p = psutil.Process(sproc.pid) | 921 p = psutil.Process(sproc.pid) |
465 if POSIX: | 922 if POSIX: |
466 import pwd | 923 import pwd |
467 user = pwd.getpwuid(p.uid).pw_name | 924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) |
468 self.assertEqual(p.username, user) | |
469 elif WINDOWS: | 925 elif WINDOWS: |
470 expected_username = os.environ['USERNAME'] | 926 expected_username = os.environ['USERNAME'] |
471 expected_domain = os.environ['USERDOMAIN'] | 927 expected_domain = os.environ['USERDOMAIN'] |
472 domain, username = p.username.split('\\') | 928 domain, username = p.username.split('\\') |
473 self.assertEqual(domain, expected_domain) | 929 self.assertEqual(domain, expected_domain) |
474 self.assertEqual(username, expected_username) | 930 self.assertEqual(username, expected_username) |
475 else: | 931 else: |
476 p.username | 932 p.username |
477 | 933 |
478 @skipUnless(WINDOWS or LINUX) | 934 @skipUnless(WINDOWS or LINUX) |
479 def test_getcwd(self): | 935 def test_getcwd(self): |
480 sproc = get_test_subprocess() | 936 sproc = get_test_subprocess() |
481 wait_for_pid(sproc.pid) | 937 wait_for_pid(sproc.pid) |
482 p = psutil.Process(sproc.pid) | 938 p = psutil.Process(sproc.pid) |
483 self.assertEqual(p.getcwd(), os.getcwd()) | 939 self.assertEqual(p.getcwd(), os.getcwd()) |
484 | 940 |
485 @skipUnless(WINDOWS or LINUX) | 941 @skipUnless(WINDOWS or LINUX) |
486 def test_getcwd_2(self): | 942 def test_getcwd_2(self): |
487 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] | 943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
488 sproc = get_test_subprocess(cmd) | 944 sproc = get_test_subprocess(cmd) |
489 wait_for_pid(sproc.pid) | 945 wait_for_pid(sproc.pid) |
490 p = psutil.Process(sproc.pid) | 946 p = psutil.Process(sproc.pid) |
491 time.sleep(0.1) | 947 time.sleep(0.1) |
492 expected_dir = os.path.dirname(os.getcwd()) | 948 expected_dir = os.path.dirname(os.getcwd()) |
493 self.assertEqual(p.getcwd(), expected_dir) | 949 self.assertEqual(p.getcwd(), expected_dir) |
494 | 950 |
495 def test_get_open_files(self): | 951 def test_get_open_files(self): |
496 thisfile = os.path.join(os.getcwd(), __file__) | |
497 | |
498 # current process | 952 # current process |
499 p = psutil.Process(os.getpid()) | 953 p = psutil.Process(os.getpid()) |
500 files = p.get_open_files() | 954 files = p.get_open_files() |
501 self.assertFalse(thisfile in files) | 955 self.assertFalse(TESTFN in files) |
502 f = open(thisfile, 'r') | 956 f = open(TESTFN, 'r') |
| 957 time.sleep(.1) |
503 filenames = [x.path for x in p.get_open_files()] | 958 filenames = [x.path for x in p.get_open_files()] |
504 self.assertTrue(thisfile in filenames) | 959 self.assertTrue(TESTFN in filenames) |
505 f.close() | 960 f.close() |
506 for file in filenames: | 961 for file in filenames: |
507 self.assertTrue(os.path.isfile(file)) | 962 self.assertTrue(os.path.isfile(file)) |
508 | 963 |
509 # another process | 964 # another process |
510 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfi
le | 965 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN |
511 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) | 966 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) |
512 wait_for_pid(sproc.pid) | 967 wait_for_pid(sproc.pid) |
513 time.sleep(0.1) | 968 time.sleep(0.1) |
514 p = psutil.Process(sproc.pid) | 969 p = psutil.Process(sproc.pid) |
515 filenames = [x.path for x in p.get_open_files()] | 970 for x in range(100): |
516 self.assertTrue(thisfile in filenames) | 971 filenames = [x.path for x in p.get_open_files()] |
| 972 if TESTFN in filenames: |
| 973 break |
| 974 time.sleep(.01) |
| 975 else: |
| 976 self.assertTrue(TESTFN in filenames) |
517 for file in filenames: | 977 for file in filenames: |
518 self.assertTrue(os.path.isfile(file)) | 978 self.assertTrue(os.path.isfile(file)) |
519 # all processes | 979 # all processes |
520 for proc in psutil.process_iter(): | 980 for proc in psutil.process_iter(): |
521 try: | 981 try: |
522 files = proc.get_open_files() | 982 files = proc.get_open_files() |
523 except psutil.Error: | 983 except psutil.Error: |
524 pass | 984 pass |
525 else: | 985 else: |
526 for file in filenames: | 986 for file in filenames: |
527 self.assertTrue(os.path.isfile(file)) | 987 self.assertTrue(os.path.isfile(file)) |
528 | 988 |
529 def test_get_open_files2(self): | 989 def test_get_open_files2(self): |
530 # test fd and path fields | 990 # test fd and path fields |
531 fileobj = open(os.path.join(os.getcwd(), __file__), 'r') | 991 fileobj = open(TESTFN, 'r') |
532 p = psutil.Process(os.getpid()) | 992 p = psutil.Process(os.getpid()) |
533 for path, fd in p.get_open_files(): | 993 for path, fd in p.get_open_files(): |
534 if path == fileobj.name or fd == fileobj.fileno(): | 994 if path == fileobj.name or fd == fileobj.fileno(): |
535 break | 995 break |
536 else: | 996 else: |
537 self.fail("no file found; files=%s" % repr(p.get_open_files())) | 997 self.fail("no file found; files=%s" % repr(p.get_open_files())) |
538 self.assertEqual(path, fileobj.name) | 998 self.assertEqual(path, fileobj.name) |
539 if WINDOWS: | 999 if WINDOWS: |
540 self.assertEqual(fd, -1) | 1000 self.assertEqual(fd, -1) |
541 else: | 1001 else: |
542 self.assertEqual(fd, fileobj.fileno()) | 1002 self.assertEqual(fd, fileobj.fileno()) |
543 # test positions | 1003 # test positions |
544 ntuple = p.get_open_files()[0] | 1004 ntuple = p.get_open_files()[0] |
545 self.assertEqual(ntuple[0], ntuple.path) | 1005 self.assertEqual(ntuple[0], ntuple.path) |
546 self.assertEqual(ntuple[1], ntuple.fd) | 1006 self.assertEqual(ntuple[1], ntuple.fd) |
547 # test file is gone | 1007 # test file is gone |
548 fileobj.close() | 1008 fileobj.close() |
549 self.assertTrue(fileobj.name not in p.get_open_files()) | 1009 self.assertTrue(fileobj.name not in p.get_open_files()) |
550 | 1010 |
551 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | 1011 @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
552 def test_get_connections(self): | 1012 def test_get_connections(self): |
553 arg = "import socket, time;" \ | 1013 arg = "import socket, time;" \ |
554 "s = socket.socket();" \ | 1014 "s = socket.socket();" \ |
555 "s.bind(('127.0.0.1', 0));" \ | 1015 "s.bind(('127.0.0.1', 0));" \ |
556 "s.listen(1);" \ | 1016 "s.listen(1);" \ |
557 "conn, addr = s.accept();" \ | 1017 "conn, addr = s.accept();" \ |
558 "time.sleep(100);" | 1018 "time.sleep(100);" |
559 sproc = get_test_subprocess([PYTHON, "-c", arg]) | 1019 sproc = get_test_subprocess([PYTHON, "-c", arg]) |
560 time.sleep(0.1) | |
561 p = psutil.Process(sproc.pid) | 1020 p = psutil.Process(sproc.pid) |
562 cons = p.get_connections() | 1021 for x in range(100): |
563 self.assertTrue(len(cons) == 1) | 1022 cons = p.get_connections() |
| 1023 if cons: |
| 1024 break |
| 1025 time.sleep(.01) |
| 1026 self.assertEqual(len(cons), 1) |
564 con = cons[0] | 1027 con = cons[0] |
565 self.assertEqual(con.family, socket.AF_INET) | 1028 self.assertEqual(con.family, socket.AF_INET) |
566 self.assertEqual(con.type, socket.SOCK_STREAM) | 1029 self.assertEqual(con.type, socket.SOCK_STREAM) |
567 self.assertEqual(con.status, "LISTEN") | 1030 self.assertEqual(con.status, "LISTEN") |
568 ip, port = con.local_address | 1031 ip, port = con.local_address |
569 self.assertEqual(ip, '127.0.0.1') | 1032 self.assertEqual(ip, '127.0.0.1') |
570 self.assertEqual(con.remote_address, ()) | 1033 self.assertEqual(con.remote_address, ()) |
571 if WINDOWS: | 1034 if WINDOWS: |
572 self.assertEqual(con.fd, -1) | 1035 self.assertEqual(con.fd, -1) |
573 else: | 1036 else: |
574 self.assertTrue(con.fd > 0) | 1037 self.assertTrue(con.fd > 0) |
575 # test positions | 1038 # test positions |
576 self.assertEqual(con[0], con.fd) | 1039 self.assertEqual(con[0], con.fd) |
577 self.assertEqual(con[1], con.family) | 1040 self.assertEqual(con[1], con.family) |
578 self.assertEqual(con[2], con.type) | 1041 self.assertEqual(con[2], con.type) |
579 self.assertEqual(con[3], con.local_address) | 1042 self.assertEqual(con[3], con.local_address) |
580 self.assertEqual(con[4], con.remote_address) | 1043 self.assertEqual(con[4], con.remote_address) |
581 self.assertEqual(con[5], con.status) | 1044 self.assertEqual(con[5], con.status) |
582 | 1045 |
| 1046 @skipUnless(supports_ipv6()) |
| 1047 def test_get_connections_ipv6(self): |
| 1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
| 1049 s.bind(('::1', 0)) |
| 1050 s.listen(1) |
| 1051 cons = psutil.Process(os.getpid()).get_connections() |
| 1052 s.close() |
| 1053 self.assertEqual(len(cons), 1) |
| 1054 self.assertEqual(cons[0].local_address[0], '::1') |
| 1055 |
583 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) | 1056 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) |
584 def test_connection_fromfd(self): | 1057 def test_connection_fromfd(self): |
585 sock = socket.socket() | 1058 sock = socket.socket() |
586 sock.bind(('127.0.0.1', 0)) | 1059 sock.bind(('localhost', 0)) |
587 sock.listen(1) | 1060 sock.listen(1) |
588 p = psutil.Process(os.getpid()) | 1061 p = psutil.Process(os.getpid()) |
589 for conn in p.get_connections(): | 1062 for conn in p.get_connections(): |
590 if conn.fd == sock.fileno(): | 1063 if conn.fd == sock.fileno(): |
591 break | 1064 break |
592 else: | 1065 else: |
593 sock.close() | 1066 sock.close() |
594 self.fail("couldn't find socket fd") | 1067 self.fail("couldn't find socket fd") |
595 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) | 1068 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
596 try: | 1069 try: |
597 self.assertEqual(dupsock.getsockname(), conn.local_address) | 1070 self.assertEqual(dupsock.getsockname(), conn.local_address) |
598 self.assertNotEqual(sock.fileno(), dupsock.fileno()) | 1071 self.assertNotEqual(sock.fileno(), dupsock.fileno()) |
599 finally: | 1072 finally: |
600 sock.close() | 1073 sock.close() |
601 dupsock.close() | 1074 dupsock.close() |
602 | 1075 |
603 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | 1076 @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
604 def test_get_connections_all(self): | 1077 def test_get_connections_all(self): |
605 | 1078 |
606 def check_address(addr, family): | 1079 def check_address(addr, family): |
607 if not addr: | 1080 if not addr: |
608 return | 1081 return |
609 ip, port = addr | 1082 ip, port = addr |
610 self.assertTrue(isinstance(port, int)) | 1083 self.assertTrue(isinstance(port, int)) |
611 if family == socket.AF_INET: | 1084 if family == socket.AF_INET: |
612 ip = map(int, ip.split('.')) | 1085 ip = list(map(int, ip.split('.'))) |
613 self.assertTrue(len(ip) == 4) | 1086 self.assertTrue(len(ip) == 4) |
614 for num in ip: | 1087 for num in ip: |
615 self.assertTrue(0 <= num <= 255) | 1088 self.assertTrue(0 <= num <= 255) |
616 self.assertTrue(0 <= port <= 65535) | 1089 self.assertTrue(0 <= port <= 65535) |
617 | 1090 |
618 def supports_ipv6(): | |
619 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): | |
620 return False | |
621 try: | |
622 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
623 sock.bind(("::1", 0)) | |
624 except (socket.error, socket.gaierror): | |
625 return False | |
626 else: | |
627 sock.close() | |
628 return True | |
629 | |
630 # all values are supposed to match Linux's tcp_states.h states | 1091 # all values are supposed to match Linux's tcp_states.h states |
631 # table across all platforms. | 1092 # table across all platforms. |
632 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", | 1093 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", |
633 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", | 1094 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", |
634 "LAST_ACK", "LISTEN", "CLOSING", ""] | 1095 "LAST_ACK", "LISTEN", "CLOSING", ""] |
635 | 1096 |
636 tcp_template = "import socket;" \ | 1097 tcp_template = "import socket;" \ |
637 "s = socket.socket($family, socket.SOCK_STREAM);" \ | 1098 "s = socket.socket($family, socket.SOCK_STREAM);" \ |
638 "s.bind(('$addr', 0));" \ | 1099 "s.bind(('$addr', 0));" \ |
639 "s.listen(1);" \ | 1100 "s.listen(1);" \ |
(...skipping 18 matching lines...) Expand all Loading... |
658 # families and tupes to enrich psutil results | 1119 # families and tupes to enrich psutil results |
659 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) | 1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
660 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) | 1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
661 if supports_ipv6(): | 1122 if supports_ipv6(): |
662 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) | 1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
663 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) | 1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
664 else: | 1125 else: |
665 tcp6_proc = None | 1126 tcp6_proc = None |
666 udp6_proc = None | 1127 udp6_proc = None |
667 | 1128 |
| 1129 # --- check connections of all processes |
| 1130 |
668 time.sleep(0.1) | 1131 time.sleep(0.1) |
669 this_proc = psutil.Process(os.getpid()) | |
670 children_pids = [p.pid for p in this_proc.get_children()] | |
671 for p in psutil.process_iter(): | 1132 for p in psutil.process_iter(): |
672 try: | 1133 try: |
673 cons = p.get_connections() | 1134 cons = p.get_connections() |
674 except (psutil.NoSuchProcess, psutil.AccessDenied): | 1135 except (psutil.NoSuchProcess, psutil.AccessDenied): |
675 pass | 1136 pass |
676 else: | 1137 else: |
677 for conn in cons: | 1138 for conn in cons: |
678 self.assertTrue(conn.type in (socket.SOCK_STREAM, | 1139 self.assertTrue(conn.type in (socket.SOCK_STREAM, |
679 socket.SOCK_DGRAM)) | 1140 socket.SOCK_DGRAM)) |
680 self.assertTrue(conn.family in (socket.AF_INET, | 1141 self.assertTrue(conn.family in (socket.AF_INET, |
681 socket.AF_INET6)) | 1142 socket.AF_INET6)) |
682 check_address(conn.local_address, conn.family) | 1143 check_address(conn.local_address, conn.family) |
683 check_address(conn.remote_address, conn.family) | 1144 check_address(conn.remote_address, conn.family) |
684 if conn.status not in valid_states: | 1145 if conn.status not in valid_states: |
685 self.fail("%s is not a valid status" %conn.status) | 1146 self.fail("%s is not a valid status" %conn.status) |
686 # actually try to bind the local socket; ignore IPv6 | 1147 # actually try to bind the local socket; ignore IPv6 |
687 # sockets as their address might be represented as | 1148 # sockets as their address might be represented as |
688 # an IPv4-mapped-address (e.g. "::127.0.0.1") | 1149 # an IPv4-mapped-address (e.g. "::127.0.0.1") |
689 # and that's rejected by bind() | 1150 # and that's rejected by bind() |
690 if conn.family == socket.AF_INET: | 1151 if conn.family == socket.AF_INET: |
691 s = socket.socket(conn.family, conn.type) | 1152 s = socket.socket(conn.family, conn.type) |
692 s.bind((conn.local_address[0], 0)) | 1153 s.bind((conn.local_address[0], 0)) |
693 s.close() | 1154 s.close() |
694 | 1155 |
695 if not WINDOWS and hasattr(socket, 'fromfd'): | 1156 if not WINDOWS and hasattr(socket, 'fromfd'): |
| 1157 dupsock = None |
696 try: | 1158 try: |
697 dupsock = socket.fromfd(conn.fd, conn.family, | 1159 try: |
698 conn.type) | 1160 dupsock = socket.fromfd(conn.fd, conn.family, |
699 except (socket.error, OSError), err: | 1161 conn.type) |
700 if err.args[0] == errno.EBADF: | 1162 except (socket.error, OSError): |
701 continue | 1163 err = sys.exc_info()[1] |
702 else: | 1164 if err.args[0] == errno.EBADF: |
703 raise | 1165 continue |
704 self.assertEqual(dupsock.family, conn.family) | 1166 else: |
705 self.assertEqual(dupsock.type, conn.type) | 1167 raise |
| 1168 # python >= 2.5 |
| 1169 if hasattr(dupsock, "family"): |
| 1170 self.assertEqual(dupsock.family, conn.family) |
| 1171 self.assertEqual(dupsock.type, conn.type) |
| 1172 finally: |
| 1173 if dupsock is not None: |
| 1174 dupsock.close() |
706 | 1175 |
707 # check matches against subprocesses | 1176 |
708 if p.pid in children_pids: | 1177 # --- check matches against subprocesses |
709 self.assertTrue(len(cons) == 1) | 1178 |
710 conn = cons[0] | 1179 for p in psutil.Process(os.getpid()).get_children(): |
711 # TCP v4 | 1180 for conn in p.get_connections(): |
712 if p.pid == tcp4_proc.pid: | 1181 # TCP v4 |
713 self.assertEqual(conn.family, socket.AF_INET) | 1182 if p.pid == tcp4_proc.pid: |
714 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1183 self.assertEqual(conn.family, socket.AF_INET) |
715 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1184 self.assertEqual(conn.type, socket.SOCK_STREAM) |
716 self.assertEqual(conn.remote_address, ()) | 1185 self.assertEqual(conn.local_address[0], "127.0.0.1") |
717 self.assertEqual(conn.status, "LISTEN") | 1186 self.assertEqual(conn.remote_address, ()) |
718 # UDP v4 | 1187 self.assertEqual(conn.status, "LISTEN") |
719 elif p.pid == udp4_proc.pid: | 1188 # UDP v4 |
720 self.assertEqual(conn.family, socket.AF_INET) | 1189 elif p.pid == udp4_proc.pid: |
721 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1190 self.assertEqual(conn.family, socket.AF_INET) |
722 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1191 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
723 self.assertEqual(conn.remote_address, ()) | 1192 self.assertEqual(conn.local_address[0], "127.0.0.1") |
724 self.assertEqual(conn.status, "") | 1193 self.assertEqual(conn.remote_address, ()) |
725 # TCP v6 | 1194 self.assertEqual(conn.status, "") |
726 elif p.pid == getattr(tcp6_proc, "pid", None): | 1195 # TCP v6 |
727 self.assertEqual(conn.family, socket.AF_INET6) | 1196 elif p.pid == getattr(tcp6_proc, "pid", None): |
728 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1197 self.assertEqual(conn.family, socket.AF_INET6) |
729 self.assertEqual(conn.local_address[0], "::1") | 1198 self.assertEqual(conn.type, socket.SOCK_STREAM) |
730 self.assertEqual(conn.remote_address, ()) | 1199 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
731 self.assertEqual(conn.status, "LISTEN") | 1200 self.assertEqual(conn.remote_address, ()) |
732 # UDP v6 | 1201 self.assertEqual(conn.status, "LISTEN") |
733 elif p.pid == getattr(udp6_proc, "pid", None): | 1202 # UDP v6 |
734 self.assertEqual(conn.family, socket.AF_INET6) | 1203 elif p.pid == getattr(udp6_proc, "pid", None): |
735 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1204 self.assertEqual(conn.family, socket.AF_INET6) |
736 self.assertEqual(conn.local_address[0], "::1") | 1205 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
737 self.assertEqual(conn.remote_address, ()) | 1206 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
738 self.assertEqual(conn.status, "") | 1207 self.assertEqual(conn.remote_address, ()) |
| 1208 self.assertEqual(conn.status, "") |
739 | 1209 |
740 def test_parent_ppid(self): | 1210 def test_parent_ppid(self): |
741 this_parent = os.getpid() | 1211 this_parent = os.getpid() |
742 sproc = get_test_subprocess() | 1212 sproc = get_test_subprocess() |
743 p = psutil.Process(sproc.pid) | 1213 p = psutil.Process(sproc.pid) |
744 self.assertEqual(p.ppid, this_parent) | 1214 self.assertEqual(p.ppid, this_parent) |
745 self.assertEqual(p.parent.pid, this_parent) | 1215 self.assertEqual(p.parent.pid, this_parent) |
746 # no other process is supposed to have us as parent | 1216 # no other process is supposed to have us as parent |
747 for p in psutil.process_iter(): | 1217 for p in psutil.process_iter(): |
748 if p.pid == sproc.pid: | 1218 if p.pid == sproc.pid: |
749 continue | 1219 continue |
750 self.assertTrue(p.ppid != this_parent) | 1220 self.assertTrue(p.ppid != this_parent) |
751 | 1221 |
752 def test_get_children(self): | 1222 def test_get_children(self): |
753 p = psutil.Process(os.getpid()) | 1223 p = psutil.Process(os.getpid()) |
754 self.assertEqual(p.get_children(), []) | 1224 self.assertEqual(p.get_children(), []) |
755 sproc = get_test_subprocess() | 1225 sproc = get_test_subprocess() |
756 children = p.get_children() | 1226 children = p.get_children() |
757 self.assertEqual(len(children), 1) | 1227 self.assertEqual(len(children), 1) |
758 self.assertEqual(children[0].pid, sproc.pid) | 1228 self.assertEqual(children[0].pid, sproc.pid) |
759 self.assertEqual(children[0].ppid, os.getpid()) | 1229 self.assertEqual(children[0].ppid, os.getpid()) |
760 | 1230 |
761 def test_suspend_resume(self): | 1231 def test_suspend_resume(self): |
762 sproc = get_test_subprocess() | 1232 sproc = get_test_subprocess() |
763 p = psutil.Process(sproc.pid) | 1233 p = psutil.Process(sproc.pid) |
764 p.suspend() | 1234 p.suspend() |
| 1235 time.sleep(0.1) |
| 1236 self.assertEqual(p.status, psutil.STATUS_STOPPED) |
| 1237 self.assertEqual(str(p.status), "stopped") |
765 p.resume() | 1238 p.resume() |
766 | 1239 self.assertTrue(p.status != psutil.STATUS_STOPPED) |
767 def test_get_pid_list(self): | |
768 plist = [x.pid for x in psutil.get_process_list()] | |
769 pidlist = psutil.get_pid_list() | |
770 self.assertEqual(plist.sort(), pidlist.sort()) | |
771 # make sure every pid is unique | |
772 self.assertEqual(len(pidlist), len(set(pidlist))) | |
773 | |
774 def test_test(self): | |
775 # test for psutil.test() function | |
776 stdout = sys.stdout | |
777 sys.stdout = DEVNULL | |
778 try: | |
779 psutil.test() | |
780 finally: | |
781 sys.stdout = stdout | |
782 | |
783 def test_types(self): | |
784 sproc = get_test_subprocess() | |
785 wait_for_pid(sproc.pid) | |
786 p = psutil.Process(sproc.pid) | |
787 self.assert_(isinstance(p.pid, int)) | |
788 self.assert_(isinstance(p.ppid, int)) | |
789 self.assert_(isinstance(p.parent, psutil.Process)) | |
790 self.assert_(isinstance(p.name, str)) | |
791 if self.__class__.__name__ != "LimitedUserTestCase": | |
792 self.assert_(isinstance(p.exe, str)) | |
793 self.assert_(isinstance(p.cmdline, list)) | |
794 self.assert_(isinstance(p.uid, int)) | |
795 self.assert_(isinstance(p.gid, int)) | |
796 self.assert_(isinstance(p.create_time, float)) | |
797 self.assert_(isinstance(p.username, (unicode, str))) | |
798 if hasattr(p, 'getcwd'): | |
799 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": | |
800 self.assert_(isinstance(p.getcwd(), str)) | |
801 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": | |
802 self.assert_(isinstance(p.get_open_files(), list)) | |
803 for path, fd in p.get_open_files(): | |
804 self.assert_(isinstance(path, (unicode, str))) | |
805 self.assert_(isinstance(fd, int)) | |
806 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \ | |
807 and SUPPORT_CONNECTIONS: | |
808 self.assert_(isinstance(p.get_connections(), list)) | |
809 self.assert_(isinstance(p.is_running(), bool)) | |
810 if not OSX or self.__class__.__name__ != "LimitedUserTestCase": | |
811 self.assert_(isinstance(p.get_cpu_times(), tuple)) | |
812 self.assert_(isinstance(p.get_cpu_times()[0], float)) | |
813 self.assert_(isinstance(p.get_cpu_times()[1], float)) | |
814 self.assert_(isinstance(p.get_cpu_percent(0), float)) | |
815 self.assert_(isinstance(p.get_memory_info(), tuple)) | |
816 self.assert_(isinstance(p.get_memory_info()[0], int)) | |
817 self.assert_(isinstance(p.get_memory_info()[1], int)) | |
818 self.assert_(isinstance(p.get_memory_percent(), float)) | |
819 self.assert_(isinstance(p.get_num_threads(), int)) | |
820 self.assert_(isinstance(psutil.get_process_list(), list)) | |
821 self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process)) | |
822 self.assert_(isinstance(psutil.process_iter(), types.GeneratorType)) | |
823 self.assert_(isinstance(psutil.process_iter().next(), psutil.Process)) | |
824 self.assert_(isinstance(psutil.get_pid_list(), list)) | |
825 self.assert_(isinstance(psutil.get_pid_list()[0], int)) | |
826 self.assert_(isinstance(psutil.pid_exists(1), bool)) | |
827 | 1240 |
828 def test_invalid_pid(self): | 1241 def test_invalid_pid(self): |
829 self.assertRaises(ValueError, psutil.Process, "1") | 1242 self.assertRaises(ValueError, psutil.Process, "1") |
830 self.assertRaises(ValueError, psutil.Process, None) | 1243 self.assertRaises(ValueError, psutil.Process, None) |
831 # Refers to Issue #12 | 1244 # Refers to Issue #12 |
832 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) | 1245 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) |
833 | 1246 |
834 def test_zombie_process(self): | 1247 def test_zombie_process(self): |
835 # Test that NoSuchProcess exception gets raised in the event the | 1248 # Test that NoSuchProcess exception gets raised in case the |
836 # process dies after we create the Process object. | 1249 # process dies after we create the Process object. |
837 # Example: | 1250 # Example: |
838 # >>> proc = Process(1234) | 1251 # >>> proc = Process(1234) |
839 # >>> time.sleep(5) # time-consuming task, process dies in meantime | 1252 # >>> time.sleep(5) # time-consuming task, process dies in meantime |
840 # >>> proc.name | 1253 # >>> proc.name |
841 # Refers to Issue #15 | 1254 # Refers to Issue #15 |
842 sproc = get_test_subprocess() | 1255 sproc = get_test_subprocess() |
843 p = psutil.Process(sproc.pid) | 1256 p = psutil.Process(sproc.pid) |
844 p.kill() | 1257 p.kill() |
845 sproc.wait() | 1258 p.wait() |
846 | 1259 |
847 self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid") | 1260 for name in dir(p): |
848 self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent") | 1261 if name.startswith('_')\ |
849 self.assertRaises(psutil.NoSuchProcess, getattr, p, "name") | 1262 or name in ('pid', 'send_signal', 'is_running', 'set_ionice', |
850 self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe") | 1263 'wait'): |
851 self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline") | 1264 continue |
852 self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid") | 1265 try: |
853 self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid") | 1266 meth = getattr(p, name) |
854 self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time") | 1267 if callable(meth): |
855 self.assertRaises(psutil.NoSuchProcess, getattr, p, "username") | 1268 meth() |
856 if hasattr(p, 'getcwd'): | 1269 except psutil.NoSuchProcess: |
857 self.assertRaises(psutil.NoSuchProcess, p.getcwd) | 1270 pass |
858 self.assertRaises(psutil.NoSuchProcess, p.get_open_files) | 1271 else: |
859 self.assertRaises(psutil.NoSuchProcess, p.get_connections) | 1272 self.fail("NoSuchProcess exception not raised for %r" % name) |
860 self.assertRaises(psutil.NoSuchProcess, p.suspend) | 1273 |
861 self.assertRaises(psutil.NoSuchProcess, p.resume) | 1274 # other methods |
862 self.assertRaises(psutil.NoSuchProcess, p.kill) | 1275 try: |
863 self.assertRaises(psutil.NoSuchProcess, p.terminate) | 1276 if os.name == 'posix': |
| 1277 p.nice = 1 |
| 1278 else: |
| 1279 p.nice = psutil.NORMAL_PRIORITY_CLASS |
| 1280 except psutil.NoSuchProcess: |
| 1281 pass |
| 1282 else: |
| 1283 self.fail("exception not raised") |
| 1284 if hasattr(p, 'set_ionice'): |
| 1285 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) |
864 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) | 1286 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) |
865 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times) | |
866 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0) | |
867 self.assertRaises(psutil.NoSuchProcess, p.get_memory_info) | |
868 self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent) | |
869 self.assertRaises(psutil.NoSuchProcess, p.get_children) | |
870 self.assertRaises(psutil.NoSuchProcess, p.get_num_threads) | |
871 self.assertFalse(p.is_running()) | 1287 self.assertFalse(p.is_running()) |
872 | 1288 |
873 def test__str__(self): | 1289 def test__str__(self): |
874 sproc = get_test_subprocess() | 1290 sproc = get_test_subprocess() |
875 p = psutil.Process(sproc.pid) | 1291 p = psutil.Process(sproc.pid) |
876 self.assertTrue(str(sproc.pid) in str(p)) | 1292 self.assertTrue(str(sproc.pid) in str(p)) |
877 self.assertTrue(os.path.basename(PYTHON) in str(p)) | 1293 # python shows up as 'Python' in cmdline on OS X so test fails on OS X |
| 1294 if not OSX: |
| 1295 self.assertTrue(os.path.basename(PYTHON) in str(p)) |
878 sproc = get_test_subprocess() | 1296 sproc = get_test_subprocess() |
879 p = psutil.Process(sproc.pid) | 1297 p = psutil.Process(sproc.pid) |
880 p.kill() | 1298 p.kill() |
881 sproc.wait() | 1299 p.wait() |
882 self.assertTrue(str(sproc.pid) in str(p)) | 1300 self.assertTrue(str(sproc.pid) in str(p)) |
883 self.assertTrue("terminated" in str(p)) | 1301 self.assertTrue("terminated" in str(p)) |
884 | 1302 |
885 def test_fetch_all(self): | 1303 def test_fetch_all(self): |
886 valid_procs = 0 | 1304 valid_procs = 0 |
887 attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times'
, | 1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', |
888 'get_memory_info', 'get_memory_percent', 'get_open_files', | 1306 'kill', 'wait'] |
889 'get_num_threads'] | 1307 excluded_names += ['get_cpu_percent', 'get_children'] |
| 1308 # XXX - skip slow lsof implementation; |
| 1309 if BSD: |
| 1310 excluded_names += ['get_open_files', 'get_connections'] |
| 1311 if OSX: |
| 1312 excluded_names += ['get_connections'] |
| 1313 attrs = [] |
| 1314 for name in dir(psutil.Process): |
| 1315 if name.startswith("_"): |
| 1316 continue |
| 1317 if name.startswith("set_"): |
| 1318 continue |
| 1319 if name in excluded_names: |
| 1320 continue |
| 1321 attrs.append(name) |
| 1322 |
890 for p in psutil.process_iter(): | 1323 for p in psutil.process_iter(): |
891 for attr in attrs: | 1324 for name in attrs: |
892 # skip slow Python implementation; we're reasonably sure | |
893 # it works anyway | |
894 if POSIX and attr == 'get_open_files': | |
895 continue | |
896 try: | 1325 try: |
897 attr = getattr(p, attr, None) | 1326 try: |
898 if attr is not None and callable(attr): | 1327 attr = getattr(p, name, None) |
899 attr() | 1328 if attr is not None and callable(attr): |
900 valid_procs += 1 | 1329 ret = attr() |
901 except (psutil.NoSuchProcess, psutil.AccessDenied), err: | 1330 else: |
902 self.assertEqual(err.pid, p.pid) | 1331 ret = attr |
903 if err.name: | 1332 valid_procs += 1 |
904 self.assertEqual(err.name, p.name) | 1333 except (psutil.NoSuchProcess, psutil.AccessDenied): |
905 self.assertTrue(str(err)) | 1334 err = sys.exc_info()[1] |
906 self.assertTrue(err.msg) | 1335 self.assertEqual(err.pid, p.pid) |
907 except: | 1336 if err.name: |
| 1337 self.assertEqual(err.name, p.name) |
| 1338 self.assertTrue(str(err)) |
| 1339 self.assertTrue(err.msg) |
| 1340 else: |
| 1341 if name == 'parent' or ret in (0, 0.0, [], None): |
| 1342 continue |
| 1343 self.assertTrue(ret) |
| 1344 if name == "exe": |
| 1345 self.assertTrue(os.path.isfile(ret)) |
| 1346 elif name == "getcwd": |
| 1347 # XXX - temporary fix; on my Linux box |
| 1348 # chrome process cws is errnously reported |
| 1349 # as /proc/4144/fdinfo whichd doesn't exist |
| 1350 if 'chrome' in p.name: |
| 1351 continue |
| 1352 self.assertTrue(os.path.isdir(ret)) |
| 1353 except Exception: |
| 1354 err = sys.exc_info()[1] |
908 trace = traceback.format_exc() | 1355 trace = traceback.format_exc() |
909 self.fail('Exception raised for method %s, pid %s:\n%s' | 1356 self.fail('%s\nmethod=%s, pid=%s, retvalue=%s' |
910 %(attr, p.pid, trace)) | 1357 %(trace, name, p.pid, repr(ret))) |
911 | 1358 |
912 # we should always have a non-empty list, not including PID 0 etc. | 1359 # we should always have a non-empty list, not including PID 0 etc. |
913 # special cases. | 1360 # special cases. |
914 self.assertTrue(valid_procs > 0) | 1361 self.assertTrue(valid_procs > 0) |
915 | 1362 |
| 1363 @skipIf(LINUX) |
916 def test_pid_0(self): | 1364 def test_pid_0(self): |
917 # Process(0) is supposed to work on all platforms even if with | 1365 # Process(0) is supposed to work on all platforms except Linux |
918 # some differences | |
919 p = psutil.Process(0) | 1366 p = psutil.Process(0) |
920 if WINDOWS: | 1367 self.assertTrue(p.name) |
921 self.assertEqual(p.name, 'System Idle Process') | |
922 elif LINUX: | |
923 self.assertEqual(p.name, 'sched') | |
924 elif BSD: | |
925 self.assertEqual(p.name, 'swapper') | |
926 elif OSX: | |
927 self.assertEqual(p.name, 'kernel_task') | |
928 | 1368 |
929 if os.name == 'posix': | 1369 if os.name == 'posix': |
930 self.assertEqual(p.uid, 0) | 1370 self.assertEqual(p.uids.real, 0) |
931 self.assertEqual(p.gid, 0) | 1371 self.assertEqual(p.gids.real, 0) |
932 else: | |
933 self.assertEqual(p.uid, -1) | |
934 self.assertEqual(p.gid, -1) | |
935 | 1372 |
936 self.assertTrue(p.ppid in (0, 1)) | 1373 self.assertTrue(p.ppid in (0, 1)) |
937 self.assertEqual(p.exe, "") | 1374 #self.assertEqual(p.exe, "") |
938 self.assertEqual(p.cmdline, []) | 1375 self.assertEqual(p.cmdline, []) |
939 # this can either raise AD (Win) or return 0 (UNIX) | |
940 try: | 1376 try: |
941 self.assertTrue(p.get_num_threads() in (0, 1)) | 1377 p.get_num_threads() |
942 except psutil.AccessDenied: | 1378 except psutil.AccessDenied: |
943 pass | 1379 pass |
944 | 1380 |
945 if OSX : #and os.geteuid() != 0: | 1381 if OSX : #and os.geteuid() != 0: |
946 self.assertRaises(psutil.AccessDenied, p.get_memory_info) | 1382 self.assertRaises(psutil.AccessDenied, p.get_memory_info) |
947 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) | 1383 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) |
948 else: | 1384 else: |
949 p.get_memory_info() | 1385 p.get_memory_info() |
950 | 1386 |
951 # username property | 1387 # username property |
952 if POSIX: | 1388 if POSIX: |
953 self.assertEqual(p.username, 'root') | 1389 self.assertEqual(p.username, 'root') |
954 elif WINDOWS: | 1390 elif WINDOWS: |
955 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') | 1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
956 else: | 1392 else: |
957 p.username | 1393 p.username |
958 | 1394 |
959 # PID 0 is supposed to be available on all platforms | |
960 self.assertTrue(0 in psutil.get_pid_list()) | 1395 self.assertTrue(0 in psutil.get_pid_list()) |
961 self.assertTrue(psutil.pid_exists(0)) | 1396 self.assertTrue(psutil.pid_exists(0)) |
962 | 1397 |
963 # --- OS specific tests | 1398 def test_Popen(self): |
| 1399 # Popen class test |
| 1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
| 1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 1402 try: |
| 1403 proc.name |
| 1404 proc.stdin |
| 1405 self.assertTrue(hasattr(proc, 'name')) |
| 1406 self.assertTrue(hasattr(proc, 'stdin')) |
| 1407 self.assertRaises(AttributeError, getattr, proc, 'foo') |
| 1408 finally: |
| 1409 proc.kill() |
| 1410 proc.wait() |
964 | 1411 |
965 | 1412 |
966 if hasattr(os, 'getuid'): | 1413 if hasattr(os, 'getuid'): |
967 class LimitedUserTestCase(TestCase): | 1414 class LimitedUserTestCase(TestCase): |
968 """Repeat the previous tests by using a limited user. | 1415 """Repeat the previous tests by using a limited user. |
969 Executed only on UNIX and only if the user who run the test script | 1416 Executed only on UNIX and only if the user who run the test script |
970 is root. | 1417 is root. |
971 """ | 1418 """ |
972 # the uid/gid the test suite runs under | 1419 # the uid/gid the test suite runs under |
973 PROCESS_UID = os.getuid() | 1420 PROCESS_UID = os.getuid() |
974 PROCESS_GID = os.getgid() | 1421 PROCESS_GID = os.getgid() |
975 | 1422 |
| 1423 def __init__(self, *args, **kwargs): |
| 1424 TestCase.__init__(self, *args, **kwargs) |
| 1425 # re-define all existent test methods in order to |
| 1426 # ignore AccessDenied exceptions |
| 1427 for attr in [x for x in dir(self) if x.startswith('test')]: |
| 1428 meth = getattr(self, attr) |
| 1429 def test_(self): |
| 1430 try: |
| 1431 meth() |
| 1432 except psutil.AccessDenied: |
| 1433 pass |
| 1434 setattr(self, attr, types.MethodType(test_, self)) |
| 1435 |
976 def setUp(self): | 1436 def setUp(self): |
977 os.setegid(1000) | 1437 os.setegid(1000) |
978 os.seteuid(1000) | 1438 os.seteuid(1000) |
979 TestCase.setUp(self) | 1439 TestCase.setUp(self) |
980 | 1440 |
981 def tearDown(self): | 1441 def tearDown(self): |
982 os.setegid(self.PROCESS_UID) | 1442 os.setegid(self.PROCESS_UID) |
983 os.seteuid(self.PROCESS_GID) | 1443 os.seteuid(self.PROCESS_GID) |
984 TestCase.tearDown(self) | 1444 TestCase.tearDown(self) |
985 | 1445 |
986 def test_path(self): | 1446 def test_nice(self): |
987 # DeprecationWarning is only raised once | 1447 try: |
988 pass | 1448 psutil.Process(os.getpid()).nice = -1 |
989 | 1449 except psutil.AccessDenied: |
990 # overridden tests known to raise AccessDenied when run | |
991 # as limited user on different platforms | |
992 | |
993 if LINUX: | |
994 | |
995 def test_getcwd(self): | |
996 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, sel
f) | |
997 | |
998 def test_getcwd_2(self): | |
999 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, s
elf) | |
1000 | |
1001 def test_get_open_files(self): | |
1002 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) | |
1003 | |
1004 def test_get_connections(self): | |
1005 pass | 1450 pass |
1006 | 1451 else: |
1007 def test_exe(self): | 1452 self.fail("exception not raised") |
1008 self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self) | |
1009 | |
1010 if BSD: | |
1011 | |
1012 def test_get_open_files(self): | |
1013 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) | |
1014 | |
1015 def test_get_open_files2(self): | |
1016 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) | |
1017 | |
1018 def test_get_connections(self): | |
1019 self.assertRaises(psutil.AccessDenied, TestCase.test_get_connect
ions, self) | |
1020 | |
1021 def test_connection_fromfd(self): | |
1022 self.assertRaises(psutil.AccessDenied, TestCase.test_connection_
fromfd, self) | |
1023 | 1453 |
1024 | 1454 |
1025 def test_main(): | 1455 def test_main(): |
1026 tests = [] | 1456 tests = [] |
1027 test_suite = unittest.TestSuite() | 1457 test_suite = unittest.TestSuite() |
1028 | |
1029 tests.append(TestCase) | 1458 tests.append(TestCase) |
1030 | 1459 |
1031 if hasattr(os, 'getuid'): | |
1032 if os.getuid() == 0: | |
1033 tests.append(LimitedUserTestCase) | |
1034 else: | |
1035 atexit.register(warnings.warn, "Couldn't run limited user tests (" | |
1036 "super-user privileges are required)", RuntimeWarning) | |
1037 | |
1038 if POSIX: | 1460 if POSIX: |
1039 from _posix import PosixSpecificTestCase | 1461 from _posix import PosixSpecificTestCase |
1040 tests.append(PosixSpecificTestCase) | 1462 tests.append(PosixSpecificTestCase) |
1041 | 1463 |
1042 # import the specific platform test suite | 1464 # import the specific platform test suite |
1043 if LINUX: | 1465 if LINUX: |
1044 from _linux import LinuxSpecificTestCase as stc | 1466 from _linux import LinuxSpecificTestCase as stc |
1045 elif WINDOWS: | 1467 elif WINDOWS: |
1046 from _windows import WindowsSpecificTestCase as stc | 1468 from _windows import WindowsSpecificTestCase as stc |
1047 elif OSX: | 1469 elif OSX: |
1048 from _osx import OSXSpecificTestCase as stc | 1470 from _osx import OSXSpecificTestCase as stc |
1049 elif BSD: | 1471 elif BSD: |
1050 from _bsd import BSDSpecificTestCase as stc | 1472 from _bsd import BSDSpecificTestCase as stc |
1051 tests.append(stc) | 1473 tests.append(stc) |
1052 | 1474 |
| 1475 if hasattr(os, 'getuid'): |
| 1476 if os.getuid() == 0: |
| 1477 tests.append(LimitedUserTestCase) |
| 1478 else: |
| 1479 atexit.register(warnings.warn, "Couldn't run limited user tests (" |
| 1480 "super-user privileges are required)", RuntimeWarning) |
| 1481 |
1053 for test_class in tests: | 1482 for test_class in tests: |
1054 test_suite.addTest(unittest.makeSuite(test_class)) | 1483 test_suite.addTest(unittest.makeSuite(test_class)) |
1055 | 1484 |
| 1485 f = open(TESTFN, 'w') |
| 1486 f.close() |
| 1487 atexit.register(lambda: os.remove(TESTFN)) |
| 1488 |
1056 unittest.TextTestRunner(verbosity=2).run(test_suite) | 1489 unittest.TextTestRunner(verbosity=2).run(test_suite) |
1057 DEVNULL.close() | 1490 DEVNULL.close() |
1058 | 1491 |
1059 if __name__ == '__main__': | 1492 if __name__ == '__main__': |
1060 test_main() | 1493 test_main() |
1061 | |
1062 | |
OLD | NEW |