| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # | 2 # |
| 3 # $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $ | 3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ |
| 4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. |
| 5 | 8 |
| 6 """psutil test suite. | 9 """ |
| 7 Note: this is targeted for python 2.x. | 10 psutil test suite. |
| 8 To run it under python 3.x you need to use 2to3 tool first: | |
| 9 | 11 |
| 10 $ 2to3 -w test/*.py | 12 Note: this is targeted for both python 2.x and 3.x so there's no need |
| 13 to use 2to3 tool first. |
| 11 """ | 14 """ |
| 12 | 15 |
| 13 import unittest | 16 import unittest |
| 14 import os | 17 import os |
| 15 import sys | 18 import sys |
| 16 import subprocess | 19 import subprocess |
| 17 import time | 20 import time |
| 18 import signal | 21 import signal |
| 19 import types | 22 import types |
| 20 import traceback | 23 import traceback |
| 21 import socket | 24 import socket |
| 22 import warnings | 25 import warnings |
| 23 import atexit | 26 import atexit |
| 24 import errno | 27 import errno |
| 25 import threading | 28 import threading |
| 29 import tempfile |
| 30 import collections |
| 26 | 31 |
| 27 import psutil | 32 import psutil |
| 28 | 33 |
| 29 | 34 |
| 30 PYTHON = os.path.realpath(sys.executable) | 35 PYTHON = os.path.realpath(sys.executable) |
| 31 DEVNULL = open(os.devnull, 'r+') | 36 DEVNULL = open(os.devnull, 'r+') |
| 32 | 37 TESTFN = os.path.join(os.getcwd(), "$testfile") |
| 38 PY3 = sys.version_info >= (3,) |
| 33 POSIX = os.name == 'posix' | 39 POSIX = os.name == 'posix' |
| 34 LINUX = sys.platform.lower().startswith("linux") | 40 LINUX = sys.platform.lower().startswith("linux") |
| 35 WINDOWS = sys.platform.lower().startswith("win32") | 41 WINDOWS = sys.platform.lower().startswith("win32") |
| 36 OSX = sys.platform.lower().startswith("darwin") | 42 OSX = sys.platform.lower().startswith("darwin") |
| 37 BSD = sys.platform.lower().startswith("freebsd") | 43 BSD = sys.platform.lower().startswith("freebsd") |
| 38 | 44 |
| 39 try: | 45 try: |
| 40 psutil.Process(os.getpid()).get_connections() | 46 psutil.Process(os.getpid()).get_connections() |
| 41 except NotImplementedError, err: | 47 except NotImplementedError: |
| 48 err = sys.exc_info()[1] |
| 42 SUPPORT_CONNECTIONS = False | 49 SUPPORT_CONNECTIONS = False |
| 43 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", | 50 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", |
| 44 RuntimeWarning) | 51 RuntimeWarning) |
| 45 else: | 52 else: |
| 46 SUPPORT_CONNECTIONS = True | 53 SUPPORT_CONNECTIONS = True |
| 47 | 54 |
| 55 if PY3: |
| 56 long = int |
| 57 |
| 58 def callable(attr): |
| 59 return isinstance(attr, collections.Callable) |
| 60 |
| 48 | 61 |
| 49 _subprocesses_started = set() | 62 _subprocesses_started = set() |
| 50 | 63 |
| 51 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): | 64 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): |
| 52 """Return a subprocess.Popen object to use in tests. | 65 """Return a subprocess.Popen object to use in tests. |
| 53 By default stdout and stderr are redirected to /dev/null and the | 66 By default stdout and stderr are redirected to /dev/null and the |
| 54 python interpreter is used as test process. | 67 python interpreter is used as test process. |
| 55 """ | 68 """ |
| 56 if cmd is None: | 69 if cmd is None: |
| 57 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | 70 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
| 58 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) | 71 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) |
| 59 _subprocesses_started.add(sproc.pid) | 72 _subprocesses_started.add(sproc.pid) |
| 60 return sproc | 73 return sproc |
| 61 | 74 |
| 75 def sh(cmdline): |
| 76 """run cmd in a subprocess and return its output. |
| 77 raises RuntimeError on error. |
| 78 """ |
| 79 p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, |
| 80 stderr=subprocess.PIPE) |
| 81 stdout, stderr = p.communicate() |
| 82 if p.returncode != 0: |
| 83 raise RuntimeError(stderr) |
| 84 if stderr: |
| 85 warnings.warn(stderr, RuntimeWarning) |
| 86 if sys.version_info >= (3,): |
| 87 stdout = str(stdout, sys.stdout.encoding) |
| 88 return stdout.strip() |
| 89 |
| 62 def wait_for_pid(pid, timeout=1): | 90 def wait_for_pid(pid, timeout=1): |
| 63 """Wait for pid to show up in the process list then return. | 91 """Wait for pid to show up in the process list then return. |
| 64 Used in the test suite to give time the sub process to initialize. | 92 Used in the test suite to give time the sub process to initialize. |
| 65 """ | 93 """ |
| 66 raise_at = time.time() + timeout | 94 raise_at = time.time() + timeout |
| 67 while 1: | 95 while 1: |
| 68 if pid in psutil.get_pid_list(): | 96 if pid in psutil.get_pid_list(): |
| 69 # give it one more iteration to allow full initialization | 97 # give it one more iteration to allow full initialization |
| 70 time.sleep(0.01) | 98 time.sleep(0.01) |
| 71 return | 99 return |
| 72 time.sleep(0.0001) | 100 time.sleep(0.0001) |
| 73 if time.time() >= raise_at: | 101 if time.time() >= raise_at: |
| 74 raise RuntimeError("Timed out") | 102 raise RuntimeError("Timed out") |
| 75 | 103 |
| 76 def kill(pid): | |
| 77 """Kill a process given its PID.""" | |
| 78 if hasattr(os, 'kill'): | |
| 79 os.kill(pid, signal.SIGKILL) | |
| 80 else: | |
| 81 psutil.Process(pid).kill() | |
| 82 | |
| 83 def reap_children(search_all=False): | 104 def reap_children(search_all=False): |
| 84 """Kill any subprocess started by this test suite and ensure that | 105 """Kill any subprocess started by this test suite and ensure that |
| 85 no zombies stick around to hog resources and create problems when | 106 no zombies stick around to hog resources and create problems when |
| 86 looking for refleaks. | 107 looking for refleaks. |
| 87 """ | 108 """ |
| 88 if POSIX: | |
| 89 def waitpid(process): | |
| 90 # on posix we are free to wait for any pending process by | |
| 91 # passing -1 to os.waitpid() | |
| 92 while True: | |
| 93 try: | |
| 94 any_process = -1 | |
| 95 pid, status = os.waitpid(any_process, os.WNOHANG) | |
| 96 if pid == 0 and not process.is_running(): | |
| 97 break | |
| 98 except OSError: | |
| 99 if not process.is_running(): | |
| 100 break | |
| 101 else: | |
| 102 def waitpid(process): | |
| 103 # on non-posix systems we just wait for the given process | |
| 104 # to go away | |
| 105 while process.is_running(): | |
| 106 time.sleep(0.01) | |
| 107 | |
| 108 if search_all: | 109 if search_all: |
| 109 this_process = psutil.Process(os.getpid()) | 110 this_process = psutil.Process(os.getpid()) |
| 110 pids = [x.pid for x in this_process.get_children()] | 111 pids = [x.pid for x in this_process.get_children()] |
| 111 else: | 112 else: |
| 112 pids =_subprocesses_started | 113 pids =_subprocesses_started |
| 113 while pids: | 114 while pids: |
| 114 pid = pids.pop() | 115 pid = pids.pop() |
| 115 try: | 116 try: |
| 116 child = psutil.Process(pid) | 117 child = psutil.Process(pid) |
| 117 child.kill() | 118 child.kill() |
| 118 except psutil.NoSuchProcess: | 119 except psutil.NoSuchProcess: |
| 119 pass | 120 pass |
| 120 else: | 121 else: |
| 121 waitpid(child) | 122 child.wait() |
| 123 |
| 122 | 124 |
| 123 # we want to search through all processes before exiting | 125 # we want to search through all processes before exiting |
| 124 atexit.register(reap_children, search_all=True) | 126 atexit.register(reap_children, search_all=True) |
| 125 | 127 |
| 126 def skipIf(condition, reason="", warn=False): | 128 def skipIf(condition, reason="", warn=False): |
| 127 """Decorator which skip a test under if condition is satisfied. | 129 """Decorator which skip a test under if condition is satisfied. |
| 128 This is a substitute of unittest.skipIf which is available | 130 This is a substitute of unittest.skipIf which is available |
| 129 only in python 2.7 and 3.2. | 131 only in python 2.7 and 3.2. |
| 130 If 'reason' argument is provided this will be printed during | 132 If 'reason' argument is provided this will be printed during |
| 131 tests execution. | 133 tests execution. |
| (...skipping 16 matching lines...) Expand all Loading... |
| 148 return fun(self, *args, **kwargs) | 150 return fun(self, *args, **kwargs) |
| 149 return inner | 151 return inner |
| 150 return outer | 152 return outer |
| 151 | 153 |
| 152 def skipUnless(condition, reason="", warn=False): | 154 def skipUnless(condition, reason="", warn=False): |
| 153 """Contrary of skipIf.""" | 155 """Contrary of skipIf.""" |
| 154 if not condition: | 156 if not condition: |
| 155 return skipIf(True, reason, warn) | 157 return skipIf(True, reason, warn) |
| 156 return skipIf(False) | 158 return skipIf(False) |
| 157 | 159 |
| 160 def ignore_access_denied(fun): |
| 161 """Decorator to Ignore AccessDenied exceptions.""" |
| 162 def outer(fun, *args, **kwargs): |
| 163 def inner(self): |
| 164 try: |
| 165 return fun(self, *args, **kwargs) |
| 166 except psutil.AccessDenied: |
| 167 pass |
| 168 return inner |
| 169 return outer |
| 170 |
| 171 def supports_ipv6(): |
| 172 """Return True if IPv6 is supported on this platform.""" |
| 173 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): |
| 174 return False |
| 175 sock = None |
| 176 try: |
| 177 try: |
| 178 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
| 179 sock.bind(("::1", 0)) |
| 180 except (socket.error, socket.gaierror): |
| 181 return False |
| 182 else: |
| 183 return True |
| 184 finally: |
| 185 if sock is not None: |
| 186 sock.close() |
| 187 |
| 188 |
| 189 class ThreadTask(threading.Thread): |
| 190 """A thread object used for running process thread tests.""" |
| 191 |
| 192 def __init__(self): |
| 193 threading.Thread.__init__(self) |
| 194 self._running = False |
| 195 self._interval = None |
| 196 self._flag = threading.Event() |
| 197 |
| 198 def __repr__(self): |
| 199 name = self.__class__.__name__ |
| 200 return '<%s running=%s at %#x>' % (name, self._running, id(self)) |
| 201 |
| 202 def start(self, interval=0.001): |
| 203 """Start thread and keep it running until an explicit |
| 204 stop() request. Polls for shutdown every 'timeout' seconds. |
| 205 """ |
| 206 if self._running: |
| 207 raise ValueError("already started") |
| 208 self._interval = interval |
| 209 threading.Thread.start(self) |
| 210 self._flag.wait() |
| 211 |
| 212 def run(self): |
| 213 self._running = True |
| 214 self._flag.set() |
| 215 while self._running: |
| 216 time.sleep(self._interval) |
| 217 |
| 218 def stop(self): |
| 219 """Stop thread execution and and waits until it is stopped.""" |
| 220 if not self._running: |
| 221 raise ValueError("already stopped") |
| 222 self._running = False |
| 223 self.join() |
| 224 |
| 158 | 225 |
| 159 class TestCase(unittest.TestCase): | 226 class TestCase(unittest.TestCase): |
| 160 | 227 |
| 161 def tearDown(self): | 228 def tearDown(self): |
| 162 reap_children() | 229 reap_children() |
| 163 | 230 |
| 231 # ============================ |
| 232 # tests for system-related API |
| 233 # ============================ |
| 234 |
| 164 def test_get_process_list(self): | 235 def test_get_process_list(self): |
| 165 pids = [x.pid for x in psutil.get_process_list()] | 236 pids = [x.pid for x in psutil.get_process_list()] |
| 166 self.assertTrue(os.getpid() in pids) | 237 self.assertTrue(os.getpid() in pids) |
| 167 self.assertTrue(0 in pids) | |
| 168 | 238 |
| 169 def test_process_iter(self): | 239 def test_process_iter(self): |
| 170 pids = [x.pid for x in psutil.process_iter()] | 240 pids = [x.pid for x in psutil.process_iter()] |
| 171 self.assertTrue(os.getpid() in pids) | 241 self.assertTrue(os.getpid() in pids) |
| 172 self.assertTrue(0 in pids) | |
| 173 | |
| 174 def test_kill(self): | |
| 175 sproc = get_test_subprocess() | |
| 176 test_pid = sproc.pid | |
| 177 wait_for_pid(test_pid) | |
| 178 p = psutil.Process(test_pid) | |
| 179 name = p.name | |
| 180 p.kill() | |
| 181 sproc.wait() | |
| 182 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
| 183 | |
| 184 def test_terminate(self): | |
| 185 sproc = get_test_subprocess() | |
| 186 test_pid = sproc.pid | |
| 187 wait_for_pid(test_pid) | |
| 188 p = psutil.Process(test_pid) | |
| 189 name = p.name | |
| 190 p.terminate() | |
| 191 sproc.wait() | |
| 192 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
| 193 | |
| 194 def test_send_signal(self): | |
| 195 if POSIX: | |
| 196 sig = signal.SIGKILL | |
| 197 else: | |
| 198 sig = signal.SIGTERM | |
| 199 sproc = get_test_subprocess() | |
| 200 test_pid = sproc.pid | |
| 201 p = psutil.Process(test_pid) | |
| 202 name = p.name | |
| 203 p.send_signal(sig) | |
| 204 sproc.wait() | |
| 205 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
| 206 | 242 |
| 207 def test_TOTAL_PHYMEM(self): | 243 def test_TOTAL_PHYMEM(self): |
| 208 x = psutil.TOTAL_PHYMEM | 244 x = psutil.TOTAL_PHYMEM |
| 209 self.assertTrue(isinstance(x, (int, long))) | 245 self.assertTrue(isinstance(x, (int, long))) |
| 210 self.assertTrue(x > 0) | 246 self.assertTrue(x > 0) |
| 211 | 247 |
| 212 def test_used_phymem(self): | 248 def test_BOOT_TIME(self): |
| 213 x = psutil.used_phymem() | 249 x = psutil.BOOT_TIME |
| 214 self.assertTrue(isinstance(x, (int, long))) | 250 self.assertTrue(isinstance(x, float)) |
| 215 self.assertTrue(x > 0) | 251 self.assertTrue(x > 0) |
| 216 | 252 |
| 217 def test_avail_phymem(self): | 253 def test_deprecated_memory_functions(self): |
| 218 x = psutil.avail_phymem() | 254 warnings.filterwarnings("error") |
| 219 self.assertTrue(isinstance(x, (int, long))) | 255 try: |
| 220 self.assertTrue(x > 0) | 256 self.assertRaises(DeprecationWarning, psutil.used_phymem) |
| 257 self.assertRaises(DeprecationWarning, psutil.avail_phymem) |
| 258 self.assertRaises(DeprecationWarning, psutil.total_virtmem) |
| 259 self.assertRaises(DeprecationWarning, psutil.used_virtmem) |
| 260 self.assertRaises(DeprecationWarning, psutil.avail_virtmem) |
| 261 finally: |
| 262 warnings.resetwarnings() |
| 221 | 263 |
| 222 def test_total_virtmem(self): | 264 def test_phymem_usage(self): |
| 223 x = psutil.total_virtmem() | 265 mem = psutil.phymem_usage() |
| 224 self.assertTrue(isinstance(x, (int, long))) | 266 self.assertTrue(mem.total > 0) |
| 225 self.assertTrue(x >= 0) | 267 self.assertTrue(mem.used > 0) |
| 268 self.assertTrue(mem.free > 0) |
| 269 self.assertTrue(0 <= mem.percent <= 100) |
| 226 | 270 |
| 227 def test_used_virtmem(self): | 271 def test_virtmem_usage(self): |
| 228 x = psutil.used_virtmem() | 272 mem = psutil.virtmem_usage() |
| 229 self.assertTrue(isinstance(x, (int, long))) | 273 self.assertTrue(mem.total > 0) |
| 230 self.assertTrue(x >= 0) | 274 self.assertTrue(mem.used >= 0) |
| 231 | 275 self.assertTrue(mem.free > 0) |
| 232 def test_avail_virtmem(self): | 276 self.assertTrue(0 <= mem.percent <= 100) |
| 233 x = psutil.avail_virtmem() | |
| 234 self.assertTrue(isinstance(x, (int, long))) | |
| 235 self.assertTrue(x >= 0) | |
| 236 | |
| 237 @skipUnless(LINUX) | |
| 238 def test_cached_phymem(self): | |
| 239 x = psutil.cached_phymem() | |
| 240 self.assertTrue(isinstance(x, (int, long))) | |
| 241 self.assertTrue(x >= 0) | |
| 242 | 277 |
| 243 @skipUnless(LINUX) | 278 @skipUnless(LINUX) |
| 244 def test_phymem_buffers(self): | 279 def test_phymem_buffers(self): |
| 245 x = psutil.phymem_buffers() | 280 x = psutil.phymem_buffers() |
| 246 self.assertTrue(isinstance(x, (int, long))) | 281 self.assertTrue(isinstance(x, (int, long))) |
| 247 self.assertTrue(x >= 0) | 282 self.assertTrue(x >= 0) |
| 248 | 283 |
| 249 def test_system_cpu_times(self): | 284 def test_pid_exists(self): |
| 285 sproc = get_test_subprocess() |
| 286 wait_for_pid(sproc.pid) |
| 287 self.assertTrue(psutil.pid_exists(sproc.pid)) |
| 288 p = psutil.Process(sproc.pid) |
| 289 p.kill() |
| 290 p.wait() |
| 291 self.assertFalse(psutil.pid_exists(sproc.pid)) |
| 292 self.assertFalse(psutil.pid_exists(-1)) |
| 293 |
| 294 def test_pid_exists_2(self): |
| 295 reap_children() |
| 296 pids = psutil.get_pid_list() |
| 297 for pid in pids: |
| 298 try: |
| 299 self.assertTrue(psutil.pid_exists(pid)) |
| 300 except AssertionError: |
| 301 # in case the process disappeared in meantime fail only |
| 302 # if it is no longer in get_pid_list() |
| 303 time.sleep(.1) |
| 304 if pid in psutil.get_pid_list(): |
| 305 self.fail(pid) |
| 306 pids = range(max(pids) + 5000, max(pids) + 6000) |
| 307 for pid in pids: |
| 308 self.assertFalse(psutil.pid_exists(pid)) |
| 309 |
| 310 def test_get_pid_list(self): |
| 311 plist = [x.pid for x in psutil.get_process_list()] |
| 312 pidlist = psutil.get_pid_list() |
| 313 self.assertEqual(plist.sort(), pidlist.sort()) |
| 314 # make sure every pid is unique |
| 315 self.assertEqual(len(pidlist), len(set(pidlist))) |
| 316 |
| 317 def test_test(self): |
| 318 # test for psutil.test() function |
| 319 stdout = sys.stdout |
| 320 sys.stdout = DEVNULL |
| 321 try: |
| 322 psutil.test() |
| 323 finally: |
| 324 sys.stdout = stdout |
| 325 |
| 326 def test_sys_cpu_times(self): |
| 250 total = 0 | 327 total = 0 |
| 251 times = psutil.cpu_times() | 328 times = psutil.cpu_times() |
| 252 self.assertTrue(isinstance(times, psutil.CPUTimes)) | |
| 253 sum(times) | 329 sum(times) |
| 254 for cp_time in times: | 330 for cp_time in times: |
| 255 self.assertTrue(isinstance(cp_time, float)) | 331 self.assertTrue(isinstance(cp_time, float)) |
| 256 self.assertTrue(cp_time >= 0.0) | 332 self.assertTrue(cp_time >= 0.0) |
| 257 total += cp_time | 333 total += cp_time |
| 258 # test CPUTimes's __iter__ and __str__ implementation | |
| 259 self.assertEqual(total, sum(times)) | 334 self.assertEqual(total, sum(times)) |
| 260 str(times) | 335 str(times) |
| 261 | 336 |
| 262 def test_system_cpu_times2(self): | 337 def test_sys_cpu_times2(self): |
| 263 t1 = sum(psutil.cpu_times()) | 338 t1 = sum(psutil.cpu_times()) |
| 264 time.sleep(0.1) | 339 time.sleep(0.1) |
| 265 t2 = sum(psutil.cpu_times()) | 340 t2 = sum(psutil.cpu_times()) |
| 266 difference = t2 - t1 | 341 difference = t2 - t1 |
| 267 if not difference >= 0.05: | 342 if not difference >= 0.05: |
| 268 self.fail("difference %s" % difference) | 343 self.fail("difference %s" % difference) |
| 269 | 344 |
| 270 def test_system_cpu_percent(self): | 345 def test_sys_per_cpu_times(self): |
| 346 for times in psutil.cpu_times(percpu=True): |
| 347 total = 0 |
| 348 sum(times) |
| 349 for cp_time in times: |
| 350 self.assertTrue(isinstance(cp_time, float)) |
| 351 self.assertTrue(cp_time >= 0.0) |
| 352 total += cp_time |
| 353 self.assertEqual(total, sum(times)) |
| 354 str(times) |
| 355 |
| 356 def test_sys_per_cpu_times2(self): |
| 357 tot1 = psutil.cpu_times(percpu=True) |
| 358 stop_at = time.time() + 0.1 |
| 359 while 1: |
| 360 if time.time() >= stop_at: |
| 361 break |
| 362 tot2 = psutil.cpu_times(percpu=True) |
| 363 for t1, t2 in zip(tot1, tot2): |
| 364 t1, t2 = sum(t1), sum(t2) |
| 365 difference = t2 - t1 |
| 366 if difference >= 0.05: |
| 367 return |
| 368 self.fail() |
| 369 |
| 370 def test_sys_cpu_percent(self): |
| 271 psutil.cpu_percent(interval=0.001) | 371 psutil.cpu_percent(interval=0.001) |
| 272 psutil.cpu_percent(interval=0.001) | 372 psutil.cpu_percent(interval=0.001) |
| 273 for x in xrange(1000): | 373 for x in range(1000): |
| 274 percent = psutil.cpu_percent(interval=None) | 374 percent = psutil.cpu_percent(interval=None) |
| 275 self.assertTrue(isinstance(percent, float)) | 375 self.assertTrue(isinstance(percent, float)) |
| 276 self.assertTrue(percent >= 0.0) | 376 self.assertTrue(percent >= 0.0) |
| 277 self.assertTrue(percent <= 100.0) | 377 self.assertTrue(percent <= 100.0) |
| 278 | 378 |
| 279 def test_process_cpu_percent(self): | 379 def test_sys_per_cpu_percent(self): |
| 380 psutil.cpu_percent(interval=0.001, percpu=True) |
| 381 psutil.cpu_percent(interval=0.001, percpu=True) |
| 382 for x in range(1000): |
| 383 percents = psutil.cpu_percent(interval=None, percpu=True) |
| 384 for percent in percents: |
| 385 self.assertTrue(isinstance(percent, float)) |
| 386 self.assertTrue(percent >= 0.0) |
| 387 self.assertTrue(percent <= 100.0) |
| 388 |
| 389 def test_sys_cpu_percent_compare(self): |
| 390 psutil.cpu_percent(interval=0) |
| 391 psutil.cpu_percent(interval=0, percpu=True) |
| 392 time.sleep(.1) |
| 393 t1 = psutil.cpu_percent(interval=0) |
| 394 t2 = psutil.cpu_percent(interval=0, percpu=True) |
| 395 # calculate total average |
| 396 t2 = sum(t2) / len(t2) |
| 397 if abs(t1 - t2) > 5: |
| 398 self.assertEqual(t1, t2) |
| 399 |
| 400 def test_disk_usage(self): |
| 401 usage = psutil.disk_usage(os.getcwd()) |
| 402 self.assertTrue(usage.total > 0) |
| 403 self.assertTrue(usage.used > 0) |
| 404 self.assertTrue(usage.free > 0) |
| 405 self.assertTrue(usage.total > usage.used) |
| 406 self.assertTrue(usage.total > usage.free) |
| 407 self.assertTrue(0 <= usage.percent <= 100) |
| 408 |
| 409 # if path does not exist OSError ENOENT is expected across |
| 410 # all platforms |
| 411 fname = tempfile.mktemp() |
| 412 try: |
| 413 psutil.disk_usage(fname) |
| 414 except OSError: |
| 415 err = sys.exc_info()[1] |
| 416 if err.args[0] != errno.ENOENT: |
| 417 raise |
| 418 else: |
| 419 self.fail("OSError not raised") |
| 420 |
| 421 def test_disk_partitions(self): |
| 422 for disk in psutil.disk_partitions(all=False): |
| 423 self.assertTrue(os.path.exists(disk.device)) |
| 424 self.assertTrue(os.path.isdir(disk.mountpoint)) |
| 425 self.assertTrue(disk.fstype) |
| 426 for disk in psutil.disk_partitions(all=True): |
| 427 if not WINDOWS: |
| 428 self.assertTrue(os.path.isdir(disk.mountpoint)) |
| 429 self.assertTrue(disk.fstype) |
| 430 |
| 431 def find_mount_point(path): |
| 432 path = os.path.abspath(path) |
| 433 while not os.path.ismount(path): |
| 434 path = os.path.dirname(path) |
| 435 return path |
| 436 |
| 437 mount = find_mount_point(__file__) |
| 438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] |
| 439 self.assertTrue(mount in mounts) |
| 440 psutil.disk_usage(mount) |
| 441 |
| 442 # XXX |
| 443 @skipUnless(hasattr(psutil, "network_io_counters")) |
| 444 def test_anetwork_io_counters(self): |
| 445 def check_ntuple(nt): |
| 446 self.assertEqual(nt[0], nt.bytes_sent) |
| 447 self.assertEqual(nt[1], nt.bytes_recv) |
| 448 self.assertEqual(nt[2], nt.packets_sent) |
| 449 self.assertEqual(nt[3], nt.packets_recv) |
| 450 self.assertTrue(nt.bytes_sent >= 0) |
| 451 self.assertTrue(nt.bytes_recv >= 0) |
| 452 self.assertTrue(nt.packets_sent >= 0) |
| 453 self.assertTrue(nt.packets_recv >= 0) |
| 454 |
| 455 ret = psutil.network_io_counters(pernic=False) |
| 456 check_ntuple(ret) |
| 457 ret = psutil.network_io_counters(pernic=True) |
| 458 for name, ntuple in ret.iteritems(): |
| 459 self.assertTrue(name) |
| 460 check_ntuple(ntuple) |
| 461 # XXX |
| 462 @skipUnless(hasattr(psutil, "disk_io_counters")) |
| 463 def test_disk_io_counters(self): |
| 464 def check_ntuple(nt): |
| 465 self.assertEqual(nt[0], nt.read_count) |
| 466 self.assertEqual(nt[1], nt.write_count) |
| 467 self.assertEqual(nt[2], nt.read_bytes) |
| 468 self.assertEqual(nt[3], nt.write_bytes) |
| 469 self.assertEqual(nt[4], nt.read_time) |
| 470 self.assertEqual(nt[5], nt.write_time) |
| 471 self.assertTrue(nt.read_count >= 0) |
| 472 self.assertTrue(nt.write_count >= 0) |
| 473 self.assertTrue(nt.read_bytes >= 0) |
| 474 self.assertTrue(nt.write_bytes >= 0) |
| 475 self.assertTrue(nt.read_time >= 0) |
| 476 self.assertTrue(nt.write_time >= 0) |
| 477 |
| 478 ret = psutil.disk_io_counters(perdisk=False) |
| 479 check_ntuple(ret) |
| 480 ret = psutil.disk_io_counters(perdisk=True) |
| 481 for name, ntuple in ret.iteritems(): |
| 482 self.assertTrue(name) |
| 483 check_ntuple(ntuple) |
| 484 |
| 485 # ==================== |
| 486 # Process object tests |
| 487 # ==================== |
| 488 |
| 489 def test_kill(self): |
| 490 sproc = get_test_subprocess() |
| 491 test_pid = sproc.pid |
| 492 wait_for_pid(test_pid) |
| 493 p = psutil.Process(test_pid) |
| 494 name = p.name |
| 495 p.kill() |
| 496 p.wait() |
| 497 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 498 |
| 499 def test_terminate(self): |
| 500 sproc = get_test_subprocess() |
| 501 test_pid = sproc.pid |
| 502 wait_for_pid(test_pid) |
| 503 p = psutil.Process(test_pid) |
| 504 name = p.name |
| 505 p.terminate() |
| 506 p.wait() |
| 507 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 508 |
| 509 def test_send_signal(self): |
| 510 if POSIX: |
| 511 sig = signal.SIGKILL |
| 512 else: |
| 513 sig = signal.SIGTERM |
| 514 sproc = get_test_subprocess() |
| 515 test_pid = sproc.pid |
| 516 p = psutil.Process(test_pid) |
| 517 name = p.name |
| 518 p.send_signal(sig) |
| 519 p.wait() |
| 520 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 521 |
| 522 def test_wait(self): |
| 523 # check exit code signal |
| 524 sproc = get_test_subprocess() |
| 525 p = psutil.Process(sproc.pid) |
| 526 p.kill() |
| 527 code = p.wait() |
| 528 if os.name == 'posix': |
| 529 self.assertEqual(code, signal.SIGKILL) |
| 530 else: |
| 531 self.assertEqual(code, 0) |
| 532 self.assertFalse(p.is_running()) |
| 533 |
| 534 sproc = get_test_subprocess() |
| 535 p = psutil.Process(sproc.pid) |
| 536 p.terminate() |
| 537 code = p.wait() |
| 538 if os.name == 'posix': |
| 539 self.assertEqual(code, signal.SIGTERM) |
| 540 else: |
| 541 self.assertEqual(code, 0) |
| 542 self.assertFalse(p.is_running()) |
| 543 |
| 544 # check sys.exit() code |
| 545 code = "import time, sys; time.sleep(0.01); sys.exit(5);" |
| 546 sproc = get_test_subprocess([PYTHON, "-c", code]) |
| 547 p = psutil.Process(sproc.pid) |
| 548 self.assertEqual(p.wait(), 5) |
| 549 self.assertFalse(p.is_running()) |
| 550 |
| 551 # Test wait() issued twice. |
| 552 # It is not supposed to raise NSP when the process is gone. |
| 553 # On UNIX this should return None, on Windows it should keep |
| 554 # returning the exit code. |
| 555 sproc = get_test_subprocess([PYTHON, "-c", code]) |
| 556 p = psutil.Process(sproc.pid) |
| 557 self.assertEqual(p.wait(), 5) |
| 558 self.assertTrue(p.wait() in (5, None)) |
| 559 |
| 560 # test timeout |
| 561 sproc = get_test_subprocess() |
| 562 p = psutil.Process(sproc.pid) |
| 563 p.name |
| 564 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) |
| 565 |
| 566 # timeout < 0 not allowed |
| 567 self.assertRaises(ValueError, p.wait, -1) |
| 568 |
| 569 @skipUnless(POSIX) |
| 570 def test_wait_non_children(self): |
| 571 # test wait() against processes which are not our children |
| 572 code = "import sys;" |
| 573 code += "from subprocess import Popen, PIPE;" |
| 574 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON |
| 575 code += "sp = Popen(cmd, stdout=PIPE);" |
| 576 code += "sys.stdout.write(str(sp.pid));" |
| 577 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE
) |
| 578 |
| 579 grandson_pid = int(sproc.stdout.read()) |
| 580 grandson_proc = psutil.Process(grandson_pid) |
| 581 try: |
| 582 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) |
| 583 grandson_proc.kill() |
| 584 ret = grandson_proc.wait() |
| 585 self.assertEqual(ret, None) |
| 586 finally: |
| 587 if grandson_proc.is_running(): |
| 588 grandson_proc.kill() |
| 589 grandson_proc.wait() |
| 590 |
| 591 def test_wait_timeout_0(self): |
| 592 sproc = get_test_subprocess() |
| 593 p = psutil.Process(sproc.pid) |
| 594 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) |
| 595 p.kill() |
| 596 stop_at = time.time() + 2 |
| 597 while 1: |
| 598 try: |
| 599 code = p.wait(0) |
| 600 except psutil.TimeoutExpired: |
| 601 if time.time() >= stop_at: |
| 602 raise |
| 603 else: |
| 604 break |
| 605 if os.name == 'posix': |
| 606 self.assertEqual(code, signal.SIGKILL) |
| 607 else: |
| 608 self.assertEqual(code, 0) |
| 609 self.assertFalse(p.is_running()) |
| 610 |
| 611 def test_cpu_percent(self): |
| 280 p = psutil.Process(os.getpid()) | 612 p = psutil.Process(os.getpid()) |
| 281 p.get_cpu_percent(interval=0.001) | 613 p.get_cpu_percent(interval=0.001) |
| 282 p.get_cpu_percent(interval=0.001) | 614 p.get_cpu_percent(interval=0.001) |
| 283 for x in xrange(100): | 615 for x in range(100): |
| 284 percent = p.get_cpu_percent(interval=None) | 616 percent = p.get_cpu_percent(interval=None) |
| 285 self.assertTrue(isinstance(percent, float)) | 617 self.assertTrue(isinstance(percent, float)) |
| 286 self.assertTrue(percent >= 0.0) | 618 self.assertTrue(percent >= 0.0) |
| 287 self.assertTrue(percent <= 100.0) | 619 self.assertTrue(percent <= 100.0) |
| 288 | 620 |
| 289 def test_get_process_cpu_times(self): | 621 def test_cpu_times(self): |
| 290 times = psutil.Process(os.getpid()).get_cpu_times() | 622 times = psutil.Process(os.getpid()).get_cpu_times() |
| 291 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) | 623 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) |
| 292 # make sure returned values can be pretty printed with strftime | 624 # make sure returned values can be pretty printed with strftime |
| 293 time.strftime("%H:%M:%S", time.localtime(times.user)) | 625 time.strftime("%H:%M:%S", time.localtime(times.user)) |
| 294 time.strftime("%H:%M:%S", time.localtime(times.system)) | 626 time.strftime("%H:%M:%S", time.localtime(times.system)) |
| 295 | 627 |
| 296 # Test Process.cpu_times() against os.times() | 628 # Test Process.cpu_times() against os.times() |
| 297 # os.times() is broken on Python 2.6 | 629 # os.times() is broken on Python 2.6 |
| 298 # http://bugs.python.org/issue1040026 | 630 # http://bugs.python.org/issue1040026 |
| 299 # XXX fails on OSX: not sure if it's for os.times(). We should | 631 # XXX fails on OSX: not sure if it's for os.times(). We should |
| 300 # try this with Python 2.7 and re-enable the test. | 632 # try this with Python 2.7 and re-enable the test. |
| 301 | 633 |
| 302 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) | 634 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) |
| 303 def test_get_process_cpu_times2(self): | 635 def test_cpu_times2(self): |
| 304 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() | 636 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() |
| 305 utime, ktime = os.times()[:2] | 637 utime, ktime = os.times()[:2] |
| 306 | 638 |
| 307 # Use os.times()[:2] as base values to compare our results | 639 # Use os.times()[:2] as base values to compare our results |
| 308 # using a tolerance of +/- 0.1 seconds. | 640 # using a tolerance of +/- 0.1 seconds. |
| 309 # It will fail if the difference between the values is > 0.1s. | 641 # It will fail if the difference between the values is > 0.1s. |
| 310 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: | 642 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: |
| 311 self.fail("expected: %s, found: %s" %(utime, user_time)) | 643 self.fail("expected: %s, found: %s" %(utime, user_time)) |
| 312 | 644 |
| 313 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: | 645 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: |
| (...skipping 10 matching lines...) Expand all Loading... |
| 324 # tolerance of +/- 1 second. | 656 # tolerance of +/- 1 second. |
| 325 # It will fail if the difference between the values is > 2s. | 657 # It will fail if the difference between the values is > 2s. |
| 326 difference = abs(create_time - now) | 658 difference = abs(create_time - now) |
| 327 if difference > 2: | 659 if difference > 2: |
| 328 self.fail("expected: %s, found: %s, difference: %s" | 660 self.fail("expected: %s, found: %s, difference: %s" |
| 329 % (now, create_time, difference)) | 661 % (now, create_time, difference)) |
| 330 | 662 |
| 331 # make sure returned value can be pretty printed with strftime | 663 # make sure returned value can be pretty printed with strftime |
| 332 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) | 664 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) |
| 333 | 665 |
| 666 @skipIf(WINDOWS) |
| 667 def test_terminal(self): |
| 668 tty = sh('tty') |
| 669 p = psutil.Process(os.getpid()) |
| 670 self.assertEqual(p.terminal, tty) |
| 671 |
| 672 @skipIf(OSX, warn=False) |
| 673 def test_get_io_counters(self): |
| 674 p = psutil.Process(os.getpid()) |
| 675 # test reads |
| 676 io1 = p.get_io_counters() |
| 677 f = open(PYTHON, 'rb') |
| 678 f.read() |
| 679 f.close() |
| 680 io2 = p.get_io_counters() |
| 681 if not BSD: |
| 682 self.assertTrue(io2.read_count > io1.read_count) |
| 683 self.assertTrue(io2.write_count == io1.write_count) |
| 684 self.assertTrue(io2.read_bytes >= io1.read_bytes) |
| 685 self.assertTrue(io2.write_bytes >= io1.write_bytes) |
| 686 # test writes |
| 687 io1 = p.get_io_counters() |
| 688 f = tempfile.TemporaryFile() |
| 689 if sys.version_info >= (3,): |
| 690 f.write(bytes("x" * 1000000, 'ascii')) |
| 691 else: |
| 692 f.write("x" * 1000000) |
| 693 f.close() |
| 694 io2 = p.get_io_counters() |
| 695 if not BSD: |
| 696 self.assertTrue(io2.write_count > io1.write_count) |
| 697 self.assertTrue(io2.write_bytes > io1.write_bytes) |
| 698 self.assertTrue(io2.read_count >= io1.read_count) |
| 699 self.assertTrue(io2.read_bytes >= io1.read_bytes) |
| 700 |
| 701 @skipUnless(LINUX) |
| 702 def test_get_set_ionice(self): |
| 703 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, |
| 704 IOPRIO_CLASS_IDLE) |
| 705 self.assertEqual(IOPRIO_CLASS_NONE, 0) |
| 706 self.assertEqual(IOPRIO_CLASS_RT, 1) |
| 707 self.assertEqual(IOPRIO_CLASS_BE, 2) |
| 708 self.assertEqual(IOPRIO_CLASS_IDLE, 3) |
| 709 p = psutil.Process(os.getpid()) |
| 710 try: |
| 711 p.set_ionice(2) |
| 712 ioclass, value = p.get_ionice() |
| 713 self.assertEqual(ioclass, 2) |
| 714 self.assertEqual(value, 4) |
| 715 # |
| 716 p.set_ionice(3) |
| 717 ioclass, value = p.get_ionice() |
| 718 self.assertEqual(ioclass, 3) |
| 719 self.assertEqual(value, 0) |
| 720 # |
| 721 p.set_ionice(2, 0) |
| 722 ioclass, value = p.get_ionice() |
| 723 self.assertEqual(ioclass, 2) |
| 724 self.assertEqual(value, 0) |
| 725 p.set_ionice(2, 7) |
| 726 ioclass, value = p.get_ionice() |
| 727 self.assertEqual(ioclass, 2) |
| 728 self.assertEqual(value, 7) |
| 729 self.assertRaises(ValueError, p.set_ionice, 2, 10) |
| 730 finally: |
| 731 p.set_ionice(IOPRIO_CLASS_NONE) |
| 732 |
| 334 def test_get_num_threads(self): | 733 def test_get_num_threads(self): |
| 734 # on certain platforms such as Linux we might test for exact |
| 735 # thread number, since we always have with 1 thread per process, |
| 736 # but this does not apply across all platforms (OSX, Windows) |
| 335 p = psutil.Process(os.getpid()) | 737 p = psutil.Process(os.getpid()) |
| 336 numt1 = p.get_num_threads() | 738 step1 = p.get_num_threads() |
| 337 if not WINDOWS and not OSX: | 739 |
| 338 # test is unreliable on Windows and OS X | 740 thread = ThreadTask() |
| 339 # NOTE: sleep(1) is too long for OS X, works with sleep(.5) | 741 thread.start() |
| 340 self.assertEqual(numt1, 1) | 742 try: |
| 341 t = threading.Thread(target=lambda:time.sleep(1)) | 743 step2 = p.get_num_threads() |
| 342 t.start() | 744 self.assertEqual(step2, step1 + 1) |
| 343 numt2 = p.get_num_threads() | 745 thread.stop() |
| 344 if WINDOWS: | 746 finally: |
| 345 self.assertTrue(numt2 > numt1) | 747 if thread._running: |
| 346 else: | 748 thread.stop() |
| 347 self.assertEqual(numt2, 2) | 749 |
| 750 def test_get_threads(self): |
| 751 p = psutil.Process(os.getpid()) |
| 752 step1 = p.get_threads() |
| 753 |
| 754 thread = ThreadTask() |
| 755 thread.start() |
| 756 |
| 757 try: |
| 758 step2 = p.get_threads() |
| 759 self.assertEqual(len(step2), len(step1) + 1) |
| 760 # on Linux, first thread id is supposed to be this process |
| 761 if LINUX: |
| 762 self.assertEqual(step2[0].id, os.getpid()) |
| 763 athread = step2[0] |
| 764 # test named tuple |
| 765 self.assertEqual(athread.id, athread[0]) |
| 766 self.assertEqual(athread.user_time, athread[1]) |
| 767 self.assertEqual(athread.system_time, athread[2]) |
| 768 # test num threads |
| 769 thread.stop() |
| 770 finally: |
| 771 if thread._running: |
| 772 thread.stop() |
| 348 | 773 |
| 349 def test_get_memory_info(self): | 774 def test_get_memory_info(self): |
| 350 p = psutil.Process(os.getpid()) | 775 p = psutil.Process(os.getpid()) |
| 351 | 776 |
| 352 # step 1 - get a base value to compare our results | 777 # step 1 - get a base value to compare our results |
| 353 rss1, vms1 = p.get_memory_info() | 778 rss1, vms1 = p.get_memory_info() |
| 354 percent1 = p.get_memory_percent() | 779 percent1 = p.get_memory_percent() |
| 355 self.assertTrue(rss1 > 0) | 780 self.assertTrue(rss1 > 0) |
| 356 self.assertTrue(vms1 > 0) | 781 self.assertTrue(vms1 > 0) |
| 357 | 782 |
| 358 # step 2 - allocate some memory | 783 # step 2 - allocate some memory |
| 359 memarr = [None] * 1500000 | 784 memarr = [None] * 1500000 |
| 360 | 785 |
| 361 rss2, vms2 = p.get_memory_info() | 786 rss2, vms2 = p.get_memory_info() |
| 362 percent2 = p.get_memory_percent() | 787 percent2 = p.get_memory_percent() |
| 363 # make sure that the memory usage bumped up | 788 # make sure that the memory usage bumped up |
| 364 self.assertTrue(rss2 > rss1) | 789 self.assertTrue(rss2 > rss1) |
| 365 self.assertTrue(vms2 >= vms1) # vms might be equal | 790 self.assertTrue(vms2 >= vms1) # vms might be equal |
| 366 self.assertTrue(percent2 > percent1) | 791 self.assertTrue(percent2 > percent1) |
| 367 del memarr | 792 del memarr |
| 368 | 793 |
| 369 def test_get_memory_percent(self): | 794 def test_get_memory_percent(self): |
| 370 p = psutil.Process(os.getpid()) | 795 p = psutil.Process(os.getpid()) |
| 371 self.assertTrue(p.get_memory_percent() > 0.0) | 796 self.assertTrue(p.get_memory_percent() > 0.0) |
| 372 | 797 |
| 373 def test_pid(self): | 798 def test_pid(self): |
| 374 sproc = get_test_subprocess() | 799 sproc = get_test_subprocess() |
| 375 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) | 800 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) |
| 376 | 801 |
| 377 def test_eq(self): | |
| 378 sproc = get_test_subprocess() | |
| 379 wait_for_pid(sproc.pid) | |
| 380 self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid)) | |
| 381 | |
| 382 def test_is_running(self): | 802 def test_is_running(self): |
| 383 sproc = get_test_subprocess() | 803 sproc = get_test_subprocess() |
| 384 wait_for_pid(sproc.pid) | 804 wait_for_pid(sproc.pid) |
| 385 p = psutil.Process(sproc.pid) | 805 p = psutil.Process(sproc.pid) |
| 386 self.assertTrue(p.is_running()) | 806 self.assertTrue(p.is_running()) |
| 387 psutil.Process(sproc.pid).kill() | 807 p.kill() |
| 388 sproc.wait() | 808 p.wait() |
| 389 self.assertFalse(p.is_running()) | 809 self.assertFalse(p.is_running()) |
| 390 | 810 |
| 391 def test_pid_exists(self): | |
| 392 sproc = get_test_subprocess() | |
| 393 wait_for_pid(sproc.pid) | |
| 394 self.assertTrue(psutil.pid_exists(sproc.pid)) | |
| 395 psutil.Process(sproc.pid).kill() | |
| 396 sproc.wait() | |
| 397 self.assertFalse(psutil.pid_exists(sproc.pid)) | |
| 398 self.assertFalse(psutil.pid_exists(-1)) | |
| 399 | |
| 400 def test_exe(self): | 811 def test_exe(self): |
| 401 sproc = get_test_subprocess() | 812 sproc = get_test_subprocess() |
| 402 wait_for_pid(sproc.pid) | 813 wait_for_pid(sproc.pid) |
| 403 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) | 814 try: |
| 815 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) |
| 816 except AssertionError: |
| 817 # certain platforms such as BSD are more accurate returning: |
| 818 # "/usr/local/bin/python2.7" |
| 819 # ...instead of: |
| 820 # "/usr/local/bin/python" |
| 821 # We do not want to consider this difference in accuracy |
| 822 # an error. |
| 823 name = psutil.Process(sproc.pid).exe |
| 824 adjusted_name = PYTHON[:len(name)] |
| 825 self.assertEqual(name, adjusted_name) |
| 404 for p in psutil.process_iter(): | 826 for p in psutil.process_iter(): |
| 405 try: | 827 try: |
| 406 exe = p.exe | 828 exe = p.exe |
| 407 except psutil.Error: | 829 except psutil.Error: |
| 408 continue | 830 continue |
| 409 if not exe: | 831 if not exe: |
| 410 continue | 832 continue |
| 411 if not os.path.exists(exe): | 833 if not os.path.exists(exe): |
| 412 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ | 834 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ |
| 413 % (repr(exe), p.pid, p.name, p.cmdline)) | 835 % (repr(exe), p.pid, p.name, p.cmdline)) |
| 414 if hasattr(os, 'access') and hasattr(os, "X_OK"): | 836 if hasattr(os, 'access') and hasattr(os, "X_OK"): |
| 415 if not os.access(p.exe, os.X_OK): | 837 if not os.access(p.exe, os.X_OK): |
| 416 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ | 838 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ |
| 417 % (repr(p.exe), p.pid, p.name, p.cmdline)) | 839 % (repr(p.exe), p.pid, p.name, p.cmdline)) |
| 418 | 840 |
| 419 def test_path(self): | |
| 420 proc = psutil.Process(os.getpid()) | |
| 421 warnings.filterwarnings("error") | |
| 422 try: | |
| 423 self.assertRaises(DeprecationWarning, getattr, proc, 'path') | |
| 424 finally: | |
| 425 warnings.resetwarnings() | |
| 426 | |
| 427 def test_cmdline(self): | 841 def test_cmdline(self): |
| 428 sproc = get_test_subprocess([PYTHON, "-E"]) | 842 sproc = get_test_subprocess([PYTHON, "-E"]) |
| 429 wait_for_pid(sproc.pid) | 843 wait_for_pid(sproc.pid) |
| 430 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) | 844 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) |
| 431 | 845 |
| 432 def test_name(self): | 846 def test_name(self): |
| 433 sproc = get_test_subprocess(PYTHON) | 847 sproc = get_test_subprocess(PYTHON) |
| 434 wait_for_pid(sproc.pid) | 848 wait_for_pid(sproc.pid) |
| 435 if OSX: | 849 self.assertEqual(psutil.Process(sproc.pid).name.lower(), |
| 436 self.assertEqual(psutil.Process(sproc.pid).name, "Python") | 850 os.path.basename(sys.executable).lower()) |
| 437 else: | |
| 438 self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PY
THON)) | |
| 439 | 851 |
| 440 def test_uid(self): | 852 if os.name == 'posix': |
| 441 sproc = get_test_subprocess() | |
| 442 wait_for_pid(sproc.pid) | |
| 443 uid = psutil.Process(sproc.pid).uid | |
| 444 if hasattr(os, 'getuid'): | |
| 445 self.assertEqual(uid, os.getuid()) | |
| 446 else: | |
| 447 # On those platforms where UID doesn't make sense (Windows) | |
| 448 # we expect it to be -1 | |
| 449 self.assertEqual(uid, -1) | |
| 450 | 853 |
| 451 def test_gid(self): | 854 def test_uids(self): |
| 452 sproc = get_test_subprocess() | 855 p = psutil.Process(os.getpid()) |
| 453 wait_for_pid(sproc.pid) | 856 real, effective, saved = p.uids |
| 454 gid = psutil.Process(sproc.pid).gid | 857 # os.getuid() refers to "real" uid |
| 455 if hasattr(os, 'getgid'): | 858 self.assertEqual(real, os.getuid()) |
| 456 self.assertEqual(gid, os.getgid()) | 859 # os.geteuid() refers to "effective" uid |
| 457 else: | 860 self.assertEqual(effective, os.geteuid()) |
| 458 # On those platforms where GID doesn't make sense (Windows) | 861 # no such thing as os.getsuid() ("saved" uid), but starting |
| 459 # we expect it to be -1 | 862 # from python 2.7 we have os.getresuid()[2] |
| 460 self.assertEqual(gid, -1) | 863 if hasattr(os, "getresuid"): |
| 864 self.assertEqual(saved, os.getresuid()[2]) |
| 865 |
| 866 def test_gids(self): |
| 867 p = psutil.Process(os.getpid()) |
| 868 real, effective, saved = p.gids |
| 869 # os.getuid() refers to "real" uid |
| 870 self.assertEqual(real, os.getgid()) |
| 871 # os.geteuid() refers to "effective" uid |
| 872 self.assertEqual(effective, os.getegid()) |
| 873 # no such thing as os.getsuid() ("saved" uid), but starting |
| 874 # from python 2.7 we have os.getresgid()[2] |
| 875 if hasattr(os, "getresuid"): |
| 876 self.assertEqual(saved, os.getresgid()[2]) |
| 877 |
| 878 def test_nice(self): |
| 879 p = psutil.Process(os.getpid()) |
| 880 self.assertRaises(TypeError, setattr, p, "nice", "str") |
| 881 try: |
| 882 try: |
| 883 first_nice = p.nice |
| 884 p.nice = 1 |
| 885 self.assertEqual(p.nice, 1) |
| 886 # going back to previous nice value raises AccessDenied on O
SX |
| 887 if not OSX: |
| 888 p.nice = 0 |
| 889 self.assertEqual(p.nice, 0) |
| 890 except psutil.AccessDenied: |
| 891 pass |
| 892 finally: |
| 893 # going back to previous nice value raises AccessDenied on OSX |
| 894 if not OSX: |
| 895 p.nice = first_nice |
| 896 |
| 897 if os.name == 'nt': |
| 898 |
| 899 def test_nice(self): |
| 900 p = psutil.Process(os.getpid()) |
| 901 self.assertRaises(TypeError, setattr, p, "nice", "str") |
| 902 try: |
| 903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) |
| 904 p.nice = psutil.HIGH_PRIORITY_CLASS |
| 905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) |
| 906 p.nice = psutil.NORMAL_PRIORITY_CLASS |
| 907 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) |
| 908 finally: |
| 909 p.nice = psutil.NORMAL_PRIORITY_CLASS |
| 910 |
| 911 def test_status(self): |
| 912 p = psutil.Process(os.getpid()) |
| 913 self.assertEqual(p.status, psutil.STATUS_RUNNING) |
| 914 self.assertEqual(str(p.status), "running") |
| 915 for p in psutil.process_iter(): |
| 916 if str(p.status) == '?': |
| 917 self.fail("invalid status for pid %d" % p.pid) |
| 461 | 918 |
| 462 def test_username(self): | 919 def test_username(self): |
| 463 sproc = get_test_subprocess() | 920 sproc = get_test_subprocess() |
| 464 p = psutil.Process(sproc.pid) | 921 p = psutil.Process(sproc.pid) |
| 465 if POSIX: | 922 if POSIX: |
| 466 import pwd | 923 import pwd |
| 467 user = pwd.getpwuid(p.uid).pw_name | 924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) |
| 468 self.assertEqual(p.username, user) | |
| 469 elif WINDOWS: | 925 elif WINDOWS: |
| 470 expected_username = os.environ['USERNAME'] | 926 expected_username = os.environ['USERNAME'] |
| 471 expected_domain = os.environ['USERDOMAIN'] | 927 expected_domain = os.environ['USERDOMAIN'] |
| 472 domain, username = p.username.split('\\') | 928 domain, username = p.username.split('\\') |
| 473 self.assertEqual(domain, expected_domain) | 929 self.assertEqual(domain, expected_domain) |
| 474 self.assertEqual(username, expected_username) | 930 self.assertEqual(username, expected_username) |
| 475 else: | 931 else: |
| 476 p.username | 932 p.username |
| 477 | 933 |
| 478 @skipUnless(WINDOWS or LINUX) | 934 @skipUnless(WINDOWS or LINUX) |
| 479 def test_getcwd(self): | 935 def test_getcwd(self): |
| 480 sproc = get_test_subprocess() | 936 sproc = get_test_subprocess() |
| 481 wait_for_pid(sproc.pid) | 937 wait_for_pid(sproc.pid) |
| 482 p = psutil.Process(sproc.pid) | 938 p = psutil.Process(sproc.pid) |
| 483 self.assertEqual(p.getcwd(), os.getcwd()) | 939 self.assertEqual(p.getcwd(), os.getcwd()) |
| 484 | 940 |
| 485 @skipUnless(WINDOWS or LINUX) | 941 @skipUnless(WINDOWS or LINUX) |
| 486 def test_getcwd_2(self): | 942 def test_getcwd_2(self): |
| 487 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] | 943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
| 488 sproc = get_test_subprocess(cmd) | 944 sproc = get_test_subprocess(cmd) |
| 489 wait_for_pid(sproc.pid) | 945 wait_for_pid(sproc.pid) |
| 490 p = psutil.Process(sproc.pid) | 946 p = psutil.Process(sproc.pid) |
| 491 time.sleep(0.1) | 947 time.sleep(0.1) |
| 492 expected_dir = os.path.dirname(os.getcwd()) | 948 expected_dir = os.path.dirname(os.getcwd()) |
| 493 self.assertEqual(p.getcwd(), expected_dir) | 949 self.assertEqual(p.getcwd(), expected_dir) |
| 494 | 950 |
| 495 def test_get_open_files(self): | 951 def test_get_open_files(self): |
| 496 thisfile = os.path.join(os.getcwd(), __file__) | |
| 497 | |
| 498 # current process | 952 # current process |
| 499 p = psutil.Process(os.getpid()) | 953 p = psutil.Process(os.getpid()) |
| 500 files = p.get_open_files() | 954 files = p.get_open_files() |
| 501 self.assertFalse(thisfile in files) | 955 self.assertFalse(TESTFN in files) |
| 502 f = open(thisfile, 'r') | 956 f = open(TESTFN, 'r') |
| 957 time.sleep(.1) |
| 503 filenames = [x.path for x in p.get_open_files()] | 958 filenames = [x.path for x in p.get_open_files()] |
| 504 self.assertTrue(thisfile in filenames) | 959 self.assertTrue(TESTFN in filenames) |
| 505 f.close() | 960 f.close() |
| 506 for file in filenames: | 961 for file in filenames: |
| 507 self.assertTrue(os.path.isfile(file)) | 962 self.assertTrue(os.path.isfile(file)) |
| 508 | 963 |
| 509 # another process | 964 # another process |
| 510 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfi
le | 965 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN |
| 511 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) | 966 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) |
| 512 wait_for_pid(sproc.pid) | 967 wait_for_pid(sproc.pid) |
| 513 time.sleep(0.1) | 968 time.sleep(0.1) |
| 514 p = psutil.Process(sproc.pid) | 969 p = psutil.Process(sproc.pid) |
| 515 filenames = [x.path for x in p.get_open_files()] | 970 for x in range(100): |
| 516 self.assertTrue(thisfile in filenames) | 971 filenames = [x.path for x in p.get_open_files()] |
| 972 if TESTFN in filenames: |
| 973 break |
| 974 time.sleep(.01) |
| 975 else: |
| 976 self.assertTrue(TESTFN in filenames) |
| 517 for file in filenames: | 977 for file in filenames: |
| 518 self.assertTrue(os.path.isfile(file)) | 978 self.assertTrue(os.path.isfile(file)) |
| 519 # all processes | 979 # all processes |
| 520 for proc in psutil.process_iter(): | 980 for proc in psutil.process_iter(): |
| 521 try: | 981 try: |
| 522 files = proc.get_open_files() | 982 files = proc.get_open_files() |
| 523 except psutil.Error: | 983 except psutil.Error: |
| 524 pass | 984 pass |
| 525 else: | 985 else: |
| 526 for file in filenames: | 986 for file in filenames: |
| 527 self.assertTrue(os.path.isfile(file)) | 987 self.assertTrue(os.path.isfile(file)) |
| 528 | 988 |
| 529 def test_get_open_files2(self): | 989 def test_get_open_files2(self): |
| 530 # test fd and path fields | 990 # test fd and path fields |
| 531 fileobj = open(os.path.join(os.getcwd(), __file__), 'r') | 991 fileobj = open(TESTFN, 'r') |
| 532 p = psutil.Process(os.getpid()) | 992 p = psutil.Process(os.getpid()) |
| 533 for path, fd in p.get_open_files(): | 993 for path, fd in p.get_open_files(): |
| 534 if path == fileobj.name or fd == fileobj.fileno(): | 994 if path == fileobj.name or fd == fileobj.fileno(): |
| 535 break | 995 break |
| 536 else: | 996 else: |
| 537 self.fail("no file found; files=%s" % repr(p.get_open_files())) | 997 self.fail("no file found; files=%s" % repr(p.get_open_files())) |
| 538 self.assertEqual(path, fileobj.name) | 998 self.assertEqual(path, fileobj.name) |
| 539 if WINDOWS: | 999 if WINDOWS: |
| 540 self.assertEqual(fd, -1) | 1000 self.assertEqual(fd, -1) |
| 541 else: | 1001 else: |
| 542 self.assertEqual(fd, fileobj.fileno()) | 1002 self.assertEqual(fd, fileobj.fileno()) |
| 543 # test positions | 1003 # test positions |
| 544 ntuple = p.get_open_files()[0] | 1004 ntuple = p.get_open_files()[0] |
| 545 self.assertEqual(ntuple[0], ntuple.path) | 1005 self.assertEqual(ntuple[0], ntuple.path) |
| 546 self.assertEqual(ntuple[1], ntuple.fd) | 1006 self.assertEqual(ntuple[1], ntuple.fd) |
| 547 # test file is gone | 1007 # test file is gone |
| 548 fileobj.close() | 1008 fileobj.close() |
| 549 self.assertTrue(fileobj.name not in p.get_open_files()) | 1009 self.assertTrue(fileobj.name not in p.get_open_files()) |
| 550 | 1010 |
| 551 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | 1011 @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
| 552 def test_get_connections(self): | 1012 def test_get_connections(self): |
| 553 arg = "import socket, time;" \ | 1013 arg = "import socket, time;" \ |
| 554 "s = socket.socket();" \ | 1014 "s = socket.socket();" \ |
| 555 "s.bind(('127.0.0.1', 0));" \ | 1015 "s.bind(('127.0.0.1', 0));" \ |
| 556 "s.listen(1);" \ | 1016 "s.listen(1);" \ |
| 557 "conn, addr = s.accept();" \ | 1017 "conn, addr = s.accept();" \ |
| 558 "time.sleep(100);" | 1018 "time.sleep(100);" |
| 559 sproc = get_test_subprocess([PYTHON, "-c", arg]) | 1019 sproc = get_test_subprocess([PYTHON, "-c", arg]) |
| 560 time.sleep(0.1) | |
| 561 p = psutil.Process(sproc.pid) | 1020 p = psutil.Process(sproc.pid) |
| 562 cons = p.get_connections() | 1021 for x in range(100): |
| 563 self.assertTrue(len(cons) == 1) | 1022 cons = p.get_connections() |
| 1023 if cons: |
| 1024 break |
| 1025 time.sleep(.01) |
| 1026 self.assertEqual(len(cons), 1) |
| 564 con = cons[0] | 1027 con = cons[0] |
| 565 self.assertEqual(con.family, socket.AF_INET) | 1028 self.assertEqual(con.family, socket.AF_INET) |
| 566 self.assertEqual(con.type, socket.SOCK_STREAM) | 1029 self.assertEqual(con.type, socket.SOCK_STREAM) |
| 567 self.assertEqual(con.status, "LISTEN") | 1030 self.assertEqual(con.status, "LISTEN") |
| 568 ip, port = con.local_address | 1031 ip, port = con.local_address |
| 569 self.assertEqual(ip, '127.0.0.1') | 1032 self.assertEqual(ip, '127.0.0.1') |
| 570 self.assertEqual(con.remote_address, ()) | 1033 self.assertEqual(con.remote_address, ()) |
| 571 if WINDOWS: | 1034 if WINDOWS: |
| 572 self.assertEqual(con.fd, -1) | 1035 self.assertEqual(con.fd, -1) |
| 573 else: | 1036 else: |
| 574 self.assertTrue(con.fd > 0) | 1037 self.assertTrue(con.fd > 0) |
| 575 # test positions | 1038 # test positions |
| 576 self.assertEqual(con[0], con.fd) | 1039 self.assertEqual(con[0], con.fd) |
| 577 self.assertEqual(con[1], con.family) | 1040 self.assertEqual(con[1], con.family) |
| 578 self.assertEqual(con[2], con.type) | 1041 self.assertEqual(con[2], con.type) |
| 579 self.assertEqual(con[3], con.local_address) | 1042 self.assertEqual(con[3], con.local_address) |
| 580 self.assertEqual(con[4], con.remote_address) | 1043 self.assertEqual(con[4], con.remote_address) |
| 581 self.assertEqual(con[5], con.status) | 1044 self.assertEqual(con[5], con.status) |
| 582 | 1045 |
| 1046 @skipUnless(supports_ipv6()) |
| 1047 def test_get_connections_ipv6(self): |
| 1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
| 1049 s.bind(('::1', 0)) |
| 1050 s.listen(1) |
| 1051 cons = psutil.Process(os.getpid()).get_connections() |
| 1052 s.close() |
| 1053 self.assertEqual(len(cons), 1) |
| 1054 self.assertEqual(cons[0].local_address[0], '::1') |
| 1055 |
| 583 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) | 1056 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) |
| 584 def test_connection_fromfd(self): | 1057 def test_connection_fromfd(self): |
| 585 sock = socket.socket() | 1058 sock = socket.socket() |
| 586 sock.bind(('127.0.0.1', 0)) | 1059 sock.bind(('localhost', 0)) |
| 587 sock.listen(1) | 1060 sock.listen(1) |
| 588 p = psutil.Process(os.getpid()) | 1061 p = psutil.Process(os.getpid()) |
| 589 for conn in p.get_connections(): | 1062 for conn in p.get_connections(): |
| 590 if conn.fd == sock.fileno(): | 1063 if conn.fd == sock.fileno(): |
| 591 break | 1064 break |
| 592 else: | 1065 else: |
| 593 sock.close() | 1066 sock.close() |
| 594 self.fail("couldn't find socket fd") | 1067 self.fail("couldn't find socket fd") |
| 595 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) | 1068 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
| 596 try: | 1069 try: |
| 597 self.assertEqual(dupsock.getsockname(), conn.local_address) | 1070 self.assertEqual(dupsock.getsockname(), conn.local_address) |
| 598 self.assertNotEqual(sock.fileno(), dupsock.fileno()) | 1071 self.assertNotEqual(sock.fileno(), dupsock.fileno()) |
| 599 finally: | 1072 finally: |
| 600 sock.close() | 1073 sock.close() |
| 601 dupsock.close() | 1074 dupsock.close() |
| 602 | 1075 |
| 603 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | 1076 @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
| 604 def test_get_connections_all(self): | 1077 def test_get_connections_all(self): |
| 605 | 1078 |
| 606 def check_address(addr, family): | 1079 def check_address(addr, family): |
| 607 if not addr: | 1080 if not addr: |
| 608 return | 1081 return |
| 609 ip, port = addr | 1082 ip, port = addr |
| 610 self.assertTrue(isinstance(port, int)) | 1083 self.assertTrue(isinstance(port, int)) |
| 611 if family == socket.AF_INET: | 1084 if family == socket.AF_INET: |
| 612 ip = map(int, ip.split('.')) | 1085 ip = list(map(int, ip.split('.'))) |
| 613 self.assertTrue(len(ip) == 4) | 1086 self.assertTrue(len(ip) == 4) |
| 614 for num in ip: | 1087 for num in ip: |
| 615 self.assertTrue(0 <= num <= 255) | 1088 self.assertTrue(0 <= num <= 255) |
| 616 self.assertTrue(0 <= port <= 65535) | 1089 self.assertTrue(0 <= port <= 65535) |
| 617 | 1090 |
| 618 def supports_ipv6(): | |
| 619 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): | |
| 620 return False | |
| 621 try: | |
| 622 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
| 623 sock.bind(("::1", 0)) | |
| 624 except (socket.error, socket.gaierror): | |
| 625 return False | |
| 626 else: | |
| 627 sock.close() | |
| 628 return True | |
| 629 | |
| 630 # all values are supposed to match Linux's tcp_states.h states | 1091 # all values are supposed to match Linux's tcp_states.h states |
| 631 # table across all platforms. | 1092 # table across all platforms. |
| 632 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", | 1093 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", |
| 633 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", | 1094 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", |
| 634 "LAST_ACK", "LISTEN", "CLOSING", ""] | 1095 "LAST_ACK", "LISTEN", "CLOSING", ""] |
| 635 | 1096 |
| 636 tcp_template = "import socket;" \ | 1097 tcp_template = "import socket;" \ |
| 637 "s = socket.socket($family, socket.SOCK_STREAM);" \ | 1098 "s = socket.socket($family, socket.SOCK_STREAM);" \ |
| 638 "s.bind(('$addr', 0));" \ | 1099 "s.bind(('$addr', 0));" \ |
| 639 "s.listen(1);" \ | 1100 "s.listen(1);" \ |
| (...skipping 18 matching lines...) Expand all Loading... |
| 658 # families and tupes to enrich psutil results | 1119 # families and tupes to enrich psutil results |
| 659 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) | 1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
| 660 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) | 1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
| 661 if supports_ipv6(): | 1122 if supports_ipv6(): |
| 662 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) | 1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
| 663 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) | 1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
| 664 else: | 1125 else: |
| 665 tcp6_proc = None | 1126 tcp6_proc = None |
| 666 udp6_proc = None | 1127 udp6_proc = None |
| 667 | 1128 |
| 1129 # --- check connections of all processes |
| 1130 |
| 668 time.sleep(0.1) | 1131 time.sleep(0.1) |
| 669 this_proc = psutil.Process(os.getpid()) | |
| 670 children_pids = [p.pid for p in this_proc.get_children()] | |
| 671 for p in psutil.process_iter(): | 1132 for p in psutil.process_iter(): |
| 672 try: | 1133 try: |
| 673 cons = p.get_connections() | 1134 cons = p.get_connections() |
| 674 except (psutil.NoSuchProcess, psutil.AccessDenied): | 1135 except (psutil.NoSuchProcess, psutil.AccessDenied): |
| 675 pass | 1136 pass |
| 676 else: | 1137 else: |
| 677 for conn in cons: | 1138 for conn in cons: |
| 678 self.assertTrue(conn.type in (socket.SOCK_STREAM, | 1139 self.assertTrue(conn.type in (socket.SOCK_STREAM, |
| 679 socket.SOCK_DGRAM)) | 1140 socket.SOCK_DGRAM)) |
| 680 self.assertTrue(conn.family in (socket.AF_INET, | 1141 self.assertTrue(conn.family in (socket.AF_INET, |
| 681 socket.AF_INET6)) | 1142 socket.AF_INET6)) |
| 682 check_address(conn.local_address, conn.family) | 1143 check_address(conn.local_address, conn.family) |
| 683 check_address(conn.remote_address, conn.family) | 1144 check_address(conn.remote_address, conn.family) |
| 684 if conn.status not in valid_states: | 1145 if conn.status not in valid_states: |
| 685 self.fail("%s is not a valid status" %conn.status) | 1146 self.fail("%s is not a valid status" %conn.status) |
| 686 # actually try to bind the local socket; ignore IPv6 | 1147 # actually try to bind the local socket; ignore IPv6 |
| 687 # sockets as their address might be represented as | 1148 # sockets as their address might be represented as |
| 688 # an IPv4-mapped-address (e.g. "::127.0.0.1") | 1149 # an IPv4-mapped-address (e.g. "::127.0.0.1") |
| 689 # and that's rejected by bind() | 1150 # and that's rejected by bind() |
| 690 if conn.family == socket.AF_INET: | 1151 if conn.family == socket.AF_INET: |
| 691 s = socket.socket(conn.family, conn.type) | 1152 s = socket.socket(conn.family, conn.type) |
| 692 s.bind((conn.local_address[0], 0)) | 1153 s.bind((conn.local_address[0], 0)) |
| 693 s.close() | 1154 s.close() |
| 694 | 1155 |
| 695 if not WINDOWS and hasattr(socket, 'fromfd'): | 1156 if not WINDOWS and hasattr(socket, 'fromfd'): |
| 1157 dupsock = None |
| 696 try: | 1158 try: |
| 697 dupsock = socket.fromfd(conn.fd, conn.family, | 1159 try: |
| 698 conn.type) | 1160 dupsock = socket.fromfd(conn.fd, conn.family, |
| 699 except (socket.error, OSError), err: | 1161 conn.type) |
| 700 if err.args[0] == errno.EBADF: | 1162 except (socket.error, OSError): |
| 701 continue | 1163 err = sys.exc_info()[1] |
| 702 else: | 1164 if err.args[0] == errno.EBADF: |
| 703 raise | 1165 continue |
| 704 self.assertEqual(dupsock.family, conn.family) | 1166 else: |
| 705 self.assertEqual(dupsock.type, conn.type) | 1167 raise |
| 1168 # python >= 2.5 |
| 1169 if hasattr(dupsock, "family"): |
| 1170 self.assertEqual(dupsock.family, conn.family) |
| 1171 self.assertEqual(dupsock.type, conn.type) |
| 1172 finally: |
| 1173 if dupsock is not None: |
| 1174 dupsock.close() |
| 706 | 1175 |
| 707 # check matches against subprocesses | 1176 |
| 708 if p.pid in children_pids: | 1177 # --- check matches against subprocesses |
| 709 self.assertTrue(len(cons) == 1) | 1178 |
| 710 conn = cons[0] | 1179 for p in psutil.Process(os.getpid()).get_children(): |
| 711 # TCP v4 | 1180 for conn in p.get_connections(): |
| 712 if p.pid == tcp4_proc.pid: | 1181 # TCP v4 |
| 713 self.assertEqual(conn.family, socket.AF_INET) | 1182 if p.pid == tcp4_proc.pid: |
| 714 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1183 self.assertEqual(conn.family, socket.AF_INET) |
| 715 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1184 self.assertEqual(conn.type, socket.SOCK_STREAM) |
| 716 self.assertEqual(conn.remote_address, ()) | 1185 self.assertEqual(conn.local_address[0], "127.0.0.1") |
| 717 self.assertEqual(conn.status, "LISTEN") | 1186 self.assertEqual(conn.remote_address, ()) |
| 718 # UDP v4 | 1187 self.assertEqual(conn.status, "LISTEN") |
| 719 elif p.pid == udp4_proc.pid: | 1188 # UDP v4 |
| 720 self.assertEqual(conn.family, socket.AF_INET) | 1189 elif p.pid == udp4_proc.pid: |
| 721 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1190 self.assertEqual(conn.family, socket.AF_INET) |
| 722 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1191 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
| 723 self.assertEqual(conn.remote_address, ()) | 1192 self.assertEqual(conn.local_address[0], "127.0.0.1") |
| 724 self.assertEqual(conn.status, "") | 1193 self.assertEqual(conn.remote_address, ()) |
| 725 # TCP v6 | 1194 self.assertEqual(conn.status, "") |
| 726 elif p.pid == getattr(tcp6_proc, "pid", None): | 1195 # TCP v6 |
| 727 self.assertEqual(conn.family, socket.AF_INET6) | 1196 elif p.pid == getattr(tcp6_proc, "pid", None): |
| 728 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1197 self.assertEqual(conn.family, socket.AF_INET6) |
| 729 self.assertEqual(conn.local_address[0], "::1") | 1198 self.assertEqual(conn.type, socket.SOCK_STREAM) |
| 730 self.assertEqual(conn.remote_address, ()) | 1199 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
| 731 self.assertEqual(conn.status, "LISTEN") | 1200 self.assertEqual(conn.remote_address, ()) |
| 732 # UDP v6 | 1201 self.assertEqual(conn.status, "LISTEN") |
| 733 elif p.pid == getattr(udp6_proc, "pid", None): | 1202 # UDP v6 |
| 734 self.assertEqual(conn.family, socket.AF_INET6) | 1203 elif p.pid == getattr(udp6_proc, "pid", None): |
| 735 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1204 self.assertEqual(conn.family, socket.AF_INET6) |
| 736 self.assertEqual(conn.local_address[0], "::1") | 1205 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
| 737 self.assertEqual(conn.remote_address, ()) | 1206 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
| 738 self.assertEqual(conn.status, "") | 1207 self.assertEqual(conn.remote_address, ()) |
| 1208 self.assertEqual(conn.status, "") |
| 739 | 1209 |
| 740 def test_parent_ppid(self): | 1210 def test_parent_ppid(self): |
| 741 this_parent = os.getpid() | 1211 this_parent = os.getpid() |
| 742 sproc = get_test_subprocess() | 1212 sproc = get_test_subprocess() |
| 743 p = psutil.Process(sproc.pid) | 1213 p = psutil.Process(sproc.pid) |
| 744 self.assertEqual(p.ppid, this_parent) | 1214 self.assertEqual(p.ppid, this_parent) |
| 745 self.assertEqual(p.parent.pid, this_parent) | 1215 self.assertEqual(p.parent.pid, this_parent) |
| 746 # no other process is supposed to have us as parent | 1216 # no other process is supposed to have us as parent |
| 747 for p in psutil.process_iter(): | 1217 for p in psutil.process_iter(): |
| 748 if p.pid == sproc.pid: | 1218 if p.pid == sproc.pid: |
| 749 continue | 1219 continue |
| 750 self.assertTrue(p.ppid != this_parent) | 1220 self.assertTrue(p.ppid != this_parent) |
| 751 | 1221 |
| 752 def test_get_children(self): | 1222 def test_get_children(self): |
| 753 p = psutil.Process(os.getpid()) | 1223 p = psutil.Process(os.getpid()) |
| 754 self.assertEqual(p.get_children(), []) | 1224 self.assertEqual(p.get_children(), []) |
| 755 sproc = get_test_subprocess() | 1225 sproc = get_test_subprocess() |
| 756 children = p.get_children() | 1226 children = p.get_children() |
| 757 self.assertEqual(len(children), 1) | 1227 self.assertEqual(len(children), 1) |
| 758 self.assertEqual(children[0].pid, sproc.pid) | 1228 self.assertEqual(children[0].pid, sproc.pid) |
| 759 self.assertEqual(children[0].ppid, os.getpid()) | 1229 self.assertEqual(children[0].ppid, os.getpid()) |
| 760 | 1230 |
| 761 def test_suspend_resume(self): | 1231 def test_suspend_resume(self): |
| 762 sproc = get_test_subprocess() | 1232 sproc = get_test_subprocess() |
| 763 p = psutil.Process(sproc.pid) | 1233 p = psutil.Process(sproc.pid) |
| 764 p.suspend() | 1234 p.suspend() |
| 1235 time.sleep(0.1) |
| 1236 self.assertEqual(p.status, psutil.STATUS_STOPPED) |
| 1237 self.assertEqual(str(p.status), "stopped") |
| 765 p.resume() | 1238 p.resume() |
| 766 | 1239 self.assertTrue(p.status != psutil.STATUS_STOPPED) |
| 767 def test_get_pid_list(self): | |
| 768 plist = [x.pid for x in psutil.get_process_list()] | |
| 769 pidlist = psutil.get_pid_list() | |
| 770 self.assertEqual(plist.sort(), pidlist.sort()) | |
| 771 # make sure every pid is unique | |
| 772 self.assertEqual(len(pidlist), len(set(pidlist))) | |
| 773 | |
| 774 def test_test(self): | |
| 775 # test for psutil.test() function | |
| 776 stdout = sys.stdout | |
| 777 sys.stdout = DEVNULL | |
| 778 try: | |
| 779 psutil.test() | |
| 780 finally: | |
| 781 sys.stdout = stdout | |
| 782 | |
| 783 def test_types(self): | |
| 784 sproc = get_test_subprocess() | |
| 785 wait_for_pid(sproc.pid) | |
| 786 p = psutil.Process(sproc.pid) | |
| 787 self.assert_(isinstance(p.pid, int)) | |
| 788 self.assert_(isinstance(p.ppid, int)) | |
| 789 self.assert_(isinstance(p.parent, psutil.Process)) | |
| 790 self.assert_(isinstance(p.name, str)) | |
| 791 if self.__class__.__name__ != "LimitedUserTestCase": | |
| 792 self.assert_(isinstance(p.exe, str)) | |
| 793 self.assert_(isinstance(p.cmdline, list)) | |
| 794 self.assert_(isinstance(p.uid, int)) | |
| 795 self.assert_(isinstance(p.gid, int)) | |
| 796 self.assert_(isinstance(p.create_time, float)) | |
| 797 self.assert_(isinstance(p.username, (unicode, str))) | |
| 798 if hasattr(p, 'getcwd'): | |
| 799 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": | |
| 800 self.assert_(isinstance(p.getcwd(), str)) | |
| 801 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": | |
| 802 self.assert_(isinstance(p.get_open_files(), list)) | |
| 803 for path, fd in p.get_open_files(): | |
| 804 self.assert_(isinstance(path, (unicode, str))) | |
| 805 self.assert_(isinstance(fd, int)) | |
| 806 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \ | |
| 807 and SUPPORT_CONNECTIONS: | |
| 808 self.assert_(isinstance(p.get_connections(), list)) | |
| 809 self.assert_(isinstance(p.is_running(), bool)) | |
| 810 if not OSX or self.__class__.__name__ != "LimitedUserTestCase": | |
| 811 self.assert_(isinstance(p.get_cpu_times(), tuple)) | |
| 812 self.assert_(isinstance(p.get_cpu_times()[0], float)) | |
| 813 self.assert_(isinstance(p.get_cpu_times()[1], float)) | |
| 814 self.assert_(isinstance(p.get_cpu_percent(0), float)) | |
| 815 self.assert_(isinstance(p.get_memory_info(), tuple)) | |
| 816 self.assert_(isinstance(p.get_memory_info()[0], int)) | |
| 817 self.assert_(isinstance(p.get_memory_info()[1], int)) | |
| 818 self.assert_(isinstance(p.get_memory_percent(), float)) | |
| 819 self.assert_(isinstance(p.get_num_threads(), int)) | |
| 820 self.assert_(isinstance(psutil.get_process_list(), list)) | |
| 821 self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process)) | |
| 822 self.assert_(isinstance(psutil.process_iter(), types.GeneratorType)) | |
| 823 self.assert_(isinstance(psutil.process_iter().next(), psutil.Process)) | |
| 824 self.assert_(isinstance(psutil.get_pid_list(), list)) | |
| 825 self.assert_(isinstance(psutil.get_pid_list()[0], int)) | |
| 826 self.assert_(isinstance(psutil.pid_exists(1), bool)) | |
| 827 | 1240 |
| 828 def test_invalid_pid(self): | 1241 def test_invalid_pid(self): |
| 829 self.assertRaises(ValueError, psutil.Process, "1") | 1242 self.assertRaises(ValueError, psutil.Process, "1") |
| 830 self.assertRaises(ValueError, psutil.Process, None) | 1243 self.assertRaises(ValueError, psutil.Process, None) |
| 831 # Refers to Issue #12 | 1244 # Refers to Issue #12 |
| 832 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) | 1245 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) |
| 833 | 1246 |
| 834 def test_zombie_process(self): | 1247 def test_zombie_process(self): |
| 835 # Test that NoSuchProcess exception gets raised in the event the | 1248 # Test that NoSuchProcess exception gets raised in case the |
| 836 # process dies after we create the Process object. | 1249 # process dies after we create the Process object. |
| 837 # Example: | 1250 # Example: |
| 838 # >>> proc = Process(1234) | 1251 # >>> proc = Process(1234) |
| 839 # >>> time.sleep(5) # time-consuming task, process dies in meantime | 1252 # >>> time.sleep(5) # time-consuming task, process dies in meantime |
| 840 # >>> proc.name | 1253 # >>> proc.name |
| 841 # Refers to Issue #15 | 1254 # Refers to Issue #15 |
| 842 sproc = get_test_subprocess() | 1255 sproc = get_test_subprocess() |
| 843 p = psutil.Process(sproc.pid) | 1256 p = psutil.Process(sproc.pid) |
| 844 p.kill() | 1257 p.kill() |
| 845 sproc.wait() | 1258 p.wait() |
| 846 | 1259 |
| 847 self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid") | 1260 for name in dir(p): |
| 848 self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent") | 1261 if name.startswith('_')\ |
| 849 self.assertRaises(psutil.NoSuchProcess, getattr, p, "name") | 1262 or name in ('pid', 'send_signal', 'is_running', 'set_ionice', |
| 850 self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe") | 1263 'wait'): |
| 851 self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline") | 1264 continue |
| 852 self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid") | 1265 try: |
| 853 self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid") | 1266 meth = getattr(p, name) |
| 854 self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time") | 1267 if callable(meth): |
| 855 self.assertRaises(psutil.NoSuchProcess, getattr, p, "username") | 1268 meth() |
| 856 if hasattr(p, 'getcwd'): | 1269 except psutil.NoSuchProcess: |
| 857 self.assertRaises(psutil.NoSuchProcess, p.getcwd) | 1270 pass |
| 858 self.assertRaises(psutil.NoSuchProcess, p.get_open_files) | 1271 else: |
| 859 self.assertRaises(psutil.NoSuchProcess, p.get_connections) | 1272 self.fail("NoSuchProcess exception not raised for %r" % name) |
| 860 self.assertRaises(psutil.NoSuchProcess, p.suspend) | 1273 |
| 861 self.assertRaises(psutil.NoSuchProcess, p.resume) | 1274 # other methods |
| 862 self.assertRaises(psutil.NoSuchProcess, p.kill) | 1275 try: |
| 863 self.assertRaises(psutil.NoSuchProcess, p.terminate) | 1276 if os.name == 'posix': |
| 1277 p.nice = 1 |
| 1278 else: |
| 1279 p.nice = psutil.NORMAL_PRIORITY_CLASS |
| 1280 except psutil.NoSuchProcess: |
| 1281 pass |
| 1282 else: |
| 1283 self.fail("exception not raised") |
| 1284 if hasattr(p, 'set_ionice'): |
| 1285 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) |
| 864 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) | 1286 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) |
| 865 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times) | |
| 866 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0) | |
| 867 self.assertRaises(psutil.NoSuchProcess, p.get_memory_info) | |
| 868 self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent) | |
| 869 self.assertRaises(psutil.NoSuchProcess, p.get_children) | |
| 870 self.assertRaises(psutil.NoSuchProcess, p.get_num_threads) | |
| 871 self.assertFalse(p.is_running()) | 1287 self.assertFalse(p.is_running()) |
| 872 | 1288 |
| 873 def test__str__(self): | 1289 def test__str__(self): |
| 874 sproc = get_test_subprocess() | 1290 sproc = get_test_subprocess() |
| 875 p = psutil.Process(sproc.pid) | 1291 p = psutil.Process(sproc.pid) |
| 876 self.assertTrue(str(sproc.pid) in str(p)) | 1292 self.assertTrue(str(sproc.pid) in str(p)) |
| 877 self.assertTrue(os.path.basename(PYTHON) in str(p)) | 1293 # python shows up as 'Python' in cmdline on OS X so test fails on OS X |
| 1294 if not OSX: |
| 1295 self.assertTrue(os.path.basename(PYTHON) in str(p)) |
| 878 sproc = get_test_subprocess() | 1296 sproc = get_test_subprocess() |
| 879 p = psutil.Process(sproc.pid) | 1297 p = psutil.Process(sproc.pid) |
| 880 p.kill() | 1298 p.kill() |
| 881 sproc.wait() | 1299 p.wait() |
| 882 self.assertTrue(str(sproc.pid) in str(p)) | 1300 self.assertTrue(str(sproc.pid) in str(p)) |
| 883 self.assertTrue("terminated" in str(p)) | 1301 self.assertTrue("terminated" in str(p)) |
| 884 | 1302 |
| 885 def test_fetch_all(self): | 1303 def test_fetch_all(self): |
| 886 valid_procs = 0 | 1304 valid_procs = 0 |
| 887 attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times'
, | 1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', |
| 888 'get_memory_info', 'get_memory_percent', 'get_open_files', | 1306 'kill', 'wait'] |
| 889 'get_num_threads'] | 1307 excluded_names += ['get_cpu_percent', 'get_children'] |
| 1308 # XXX - skip slow lsof implementation; |
| 1309 if BSD: |
| 1310 excluded_names += ['get_open_files', 'get_connections'] |
| 1311 if OSX: |
| 1312 excluded_names += ['get_connections'] |
| 1313 attrs = [] |
| 1314 for name in dir(psutil.Process): |
| 1315 if name.startswith("_"): |
| 1316 continue |
| 1317 if name.startswith("set_"): |
| 1318 continue |
| 1319 if name in excluded_names: |
| 1320 continue |
| 1321 attrs.append(name) |
| 1322 |
| 890 for p in psutil.process_iter(): | 1323 for p in psutil.process_iter(): |
| 891 for attr in attrs: | 1324 for name in attrs: |
| 892 # skip slow Python implementation; we're reasonably sure | |
| 893 # it works anyway | |
| 894 if POSIX and attr == 'get_open_files': | |
| 895 continue | |
| 896 try: | 1325 try: |
| 897 attr = getattr(p, attr, None) | 1326 try: |
| 898 if attr is not None and callable(attr): | 1327 attr = getattr(p, name, None) |
| 899 attr() | 1328 if attr is not None and callable(attr): |
| 900 valid_procs += 1 | 1329 ret = attr() |
| 901 except (psutil.NoSuchProcess, psutil.AccessDenied), err: | 1330 else: |
| 902 self.assertEqual(err.pid, p.pid) | 1331 ret = attr |
| 903 if err.name: | 1332 valid_procs += 1 |
| 904 self.assertEqual(err.name, p.name) | 1333 except (psutil.NoSuchProcess, psutil.AccessDenied): |
| 905 self.assertTrue(str(err)) | 1334 err = sys.exc_info()[1] |
| 906 self.assertTrue(err.msg) | 1335 self.assertEqual(err.pid, p.pid) |
| 907 except: | 1336 if err.name: |
| 1337 self.assertEqual(err.name, p.name) |
| 1338 self.assertTrue(str(err)) |
| 1339 self.assertTrue(err.msg) |
| 1340 else: |
| 1341 if name == 'parent' or ret in (0, 0.0, [], None): |
| 1342 continue |
| 1343 self.assertTrue(ret) |
| 1344 if name == "exe": |
| 1345 self.assertTrue(os.path.isfile(ret)) |
| 1346 elif name == "getcwd": |
| 1347 # XXX - temporary fix; on my Linux box |
| 1348 # chrome process cws is errnously reported |
| 1349 # as /proc/4144/fdinfo whichd doesn't exist |
| 1350 if 'chrome' in p.name: |
| 1351 continue |
| 1352 self.assertTrue(os.path.isdir(ret)) |
| 1353 except Exception: |
| 1354 err = sys.exc_info()[1] |
| 908 trace = traceback.format_exc() | 1355 trace = traceback.format_exc() |
| 909 self.fail('Exception raised for method %s, pid %s:\n%s' | 1356 self.fail('%s\nmethod=%s, pid=%s, retvalue=%s' |
| 910 %(attr, p.pid, trace)) | 1357 %(trace, name, p.pid, repr(ret))) |
| 911 | 1358 |
| 912 # we should always have a non-empty list, not including PID 0 etc. | 1359 # we should always have a non-empty list, not including PID 0 etc. |
| 913 # special cases. | 1360 # special cases. |
| 914 self.assertTrue(valid_procs > 0) | 1361 self.assertTrue(valid_procs > 0) |
| 915 | 1362 |
| 1363 @skipIf(LINUX) |
| 916 def test_pid_0(self): | 1364 def test_pid_0(self): |
| 917 # Process(0) is supposed to work on all platforms even if with | 1365 # Process(0) is supposed to work on all platforms except Linux |
| 918 # some differences | |
| 919 p = psutil.Process(0) | 1366 p = psutil.Process(0) |
| 920 if WINDOWS: | 1367 self.assertTrue(p.name) |
| 921 self.assertEqual(p.name, 'System Idle Process') | |
| 922 elif LINUX: | |
| 923 self.assertEqual(p.name, 'sched') | |
| 924 elif BSD: | |
| 925 self.assertEqual(p.name, 'swapper') | |
| 926 elif OSX: | |
| 927 self.assertEqual(p.name, 'kernel_task') | |
| 928 | 1368 |
| 929 if os.name == 'posix': | 1369 if os.name == 'posix': |
| 930 self.assertEqual(p.uid, 0) | 1370 self.assertEqual(p.uids.real, 0) |
| 931 self.assertEqual(p.gid, 0) | 1371 self.assertEqual(p.gids.real, 0) |
| 932 else: | |
| 933 self.assertEqual(p.uid, -1) | |
| 934 self.assertEqual(p.gid, -1) | |
| 935 | 1372 |
| 936 self.assertTrue(p.ppid in (0, 1)) | 1373 self.assertTrue(p.ppid in (0, 1)) |
| 937 self.assertEqual(p.exe, "") | 1374 #self.assertEqual(p.exe, "") |
| 938 self.assertEqual(p.cmdline, []) | 1375 self.assertEqual(p.cmdline, []) |
| 939 # this can either raise AD (Win) or return 0 (UNIX) | |
| 940 try: | 1376 try: |
| 941 self.assertTrue(p.get_num_threads() in (0, 1)) | 1377 p.get_num_threads() |
| 942 except psutil.AccessDenied: | 1378 except psutil.AccessDenied: |
| 943 pass | 1379 pass |
| 944 | 1380 |
| 945 if OSX : #and os.geteuid() != 0: | 1381 if OSX : #and os.geteuid() != 0: |
| 946 self.assertRaises(psutil.AccessDenied, p.get_memory_info) | 1382 self.assertRaises(psutil.AccessDenied, p.get_memory_info) |
| 947 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) | 1383 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) |
| 948 else: | 1384 else: |
| 949 p.get_memory_info() | 1385 p.get_memory_info() |
| 950 | 1386 |
| 951 # username property | 1387 # username property |
| 952 if POSIX: | 1388 if POSIX: |
| 953 self.assertEqual(p.username, 'root') | 1389 self.assertEqual(p.username, 'root') |
| 954 elif WINDOWS: | 1390 elif WINDOWS: |
| 955 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') | 1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
| 956 else: | 1392 else: |
| 957 p.username | 1393 p.username |
| 958 | 1394 |
| 959 # PID 0 is supposed to be available on all platforms | |
| 960 self.assertTrue(0 in psutil.get_pid_list()) | 1395 self.assertTrue(0 in psutil.get_pid_list()) |
| 961 self.assertTrue(psutil.pid_exists(0)) | 1396 self.assertTrue(psutil.pid_exists(0)) |
| 962 | 1397 |
| 963 # --- OS specific tests | 1398 def test_Popen(self): |
| 1399 # Popen class test |
| 1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
| 1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 1402 try: |
| 1403 proc.name |
| 1404 proc.stdin |
| 1405 self.assertTrue(hasattr(proc, 'name')) |
| 1406 self.assertTrue(hasattr(proc, 'stdin')) |
| 1407 self.assertRaises(AttributeError, getattr, proc, 'foo') |
| 1408 finally: |
| 1409 proc.kill() |
| 1410 proc.wait() |
| 964 | 1411 |
| 965 | 1412 |
| 966 if hasattr(os, 'getuid'): | 1413 if hasattr(os, 'getuid'): |
| 967 class LimitedUserTestCase(TestCase): | 1414 class LimitedUserTestCase(TestCase): |
| 968 """Repeat the previous tests by using a limited user. | 1415 """Repeat the previous tests by using a limited user. |
| 969 Executed only on UNIX and only if the user who run the test script | 1416 Executed only on UNIX and only if the user who run the test script |
| 970 is root. | 1417 is root. |
| 971 """ | 1418 """ |
| 972 # the uid/gid the test suite runs under | 1419 # the uid/gid the test suite runs under |
| 973 PROCESS_UID = os.getuid() | 1420 PROCESS_UID = os.getuid() |
| 974 PROCESS_GID = os.getgid() | 1421 PROCESS_GID = os.getgid() |
| 975 | 1422 |
| 1423 def __init__(self, *args, **kwargs): |
| 1424 TestCase.__init__(self, *args, **kwargs) |
| 1425 # re-define all existent test methods in order to |
| 1426 # ignore AccessDenied exceptions |
| 1427 for attr in [x for x in dir(self) if x.startswith('test')]: |
| 1428 meth = getattr(self, attr) |
| 1429 def test_(self): |
| 1430 try: |
| 1431 meth() |
| 1432 except psutil.AccessDenied: |
| 1433 pass |
| 1434 setattr(self, attr, types.MethodType(test_, self)) |
| 1435 |
| 976 def setUp(self): | 1436 def setUp(self): |
| 977 os.setegid(1000) | 1437 os.setegid(1000) |
| 978 os.seteuid(1000) | 1438 os.seteuid(1000) |
| 979 TestCase.setUp(self) | 1439 TestCase.setUp(self) |
| 980 | 1440 |
| 981 def tearDown(self): | 1441 def tearDown(self): |
| 982 os.setegid(self.PROCESS_UID) | 1442 os.setegid(self.PROCESS_UID) |
| 983 os.seteuid(self.PROCESS_GID) | 1443 os.seteuid(self.PROCESS_GID) |
| 984 TestCase.tearDown(self) | 1444 TestCase.tearDown(self) |
| 985 | 1445 |
| 986 def test_path(self): | 1446 def test_nice(self): |
| 987 # DeprecationWarning is only raised once | 1447 try: |
| 988 pass | 1448 psutil.Process(os.getpid()).nice = -1 |
| 989 | 1449 except psutil.AccessDenied: |
| 990 # overridden tests known to raise AccessDenied when run | |
| 991 # as limited user on different platforms | |
| 992 | |
| 993 if LINUX: | |
| 994 | |
| 995 def test_getcwd(self): | |
| 996 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, sel
f) | |
| 997 | |
| 998 def test_getcwd_2(self): | |
| 999 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, s
elf) | |
| 1000 | |
| 1001 def test_get_open_files(self): | |
| 1002 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) | |
| 1003 | |
| 1004 def test_get_connections(self): | |
| 1005 pass | 1450 pass |
| 1006 | 1451 else: |
| 1007 def test_exe(self): | 1452 self.fail("exception not raised") |
| 1008 self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self) | |
| 1009 | |
| 1010 if BSD: | |
| 1011 | |
| 1012 def test_get_open_files(self): | |
| 1013 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) | |
| 1014 | |
| 1015 def test_get_open_files2(self): | |
| 1016 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) | |
| 1017 | |
| 1018 def test_get_connections(self): | |
| 1019 self.assertRaises(psutil.AccessDenied, TestCase.test_get_connect
ions, self) | |
| 1020 | |
| 1021 def test_connection_fromfd(self): | |
| 1022 self.assertRaises(psutil.AccessDenied, TestCase.test_connection_
fromfd, self) | |
| 1023 | 1453 |
| 1024 | 1454 |
| 1025 def test_main(): | 1455 def test_main(): |
| 1026 tests = [] | 1456 tests = [] |
| 1027 test_suite = unittest.TestSuite() | 1457 test_suite = unittest.TestSuite() |
| 1028 | |
| 1029 tests.append(TestCase) | 1458 tests.append(TestCase) |
| 1030 | 1459 |
| 1031 if hasattr(os, 'getuid'): | |
| 1032 if os.getuid() == 0: | |
| 1033 tests.append(LimitedUserTestCase) | |
| 1034 else: | |
| 1035 atexit.register(warnings.warn, "Couldn't run limited user tests (" | |
| 1036 "super-user privileges are required)", RuntimeWarning) | |
| 1037 | |
| 1038 if POSIX: | 1460 if POSIX: |
| 1039 from _posix import PosixSpecificTestCase | 1461 from _posix import PosixSpecificTestCase |
| 1040 tests.append(PosixSpecificTestCase) | 1462 tests.append(PosixSpecificTestCase) |
| 1041 | 1463 |
| 1042 # import the specific platform test suite | 1464 # import the specific platform test suite |
| 1043 if LINUX: | 1465 if LINUX: |
| 1044 from _linux import LinuxSpecificTestCase as stc | 1466 from _linux import LinuxSpecificTestCase as stc |
| 1045 elif WINDOWS: | 1467 elif WINDOWS: |
| 1046 from _windows import WindowsSpecificTestCase as stc | 1468 from _windows import WindowsSpecificTestCase as stc |
| 1047 elif OSX: | 1469 elif OSX: |
| 1048 from _osx import OSXSpecificTestCase as stc | 1470 from _osx import OSXSpecificTestCase as stc |
| 1049 elif BSD: | 1471 elif BSD: |
| 1050 from _bsd import BSDSpecificTestCase as stc | 1472 from _bsd import BSDSpecificTestCase as stc |
| 1051 tests.append(stc) | 1473 tests.append(stc) |
| 1052 | 1474 |
| 1475 if hasattr(os, 'getuid'): |
| 1476 if os.getuid() == 0: |
| 1477 tests.append(LimitedUserTestCase) |
| 1478 else: |
| 1479 atexit.register(warnings.warn, "Couldn't run limited user tests (" |
| 1480 "super-user privileges are required)", RuntimeWarning) |
| 1481 |
| 1053 for test_class in tests: | 1482 for test_class in tests: |
| 1054 test_suite.addTest(unittest.makeSuite(test_class)) | 1483 test_suite.addTest(unittest.makeSuite(test_class)) |
| 1055 | 1484 |
| 1485 f = open(TESTFN, 'w') |
| 1486 f.close() |
| 1487 atexit.register(lambda: os.remove(TESTFN)) |
| 1488 |
| 1056 unittest.TextTestRunner(verbosity=2).run(test_suite) | 1489 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 1057 DEVNULL.close() | 1490 DEVNULL.close() |
| 1058 | 1491 |
| 1059 if __name__ == '__main__': | 1492 if __name__ == '__main__': |
| 1060 test_main() | 1493 test_main() |
| 1061 | |
| 1062 | |
| OLD | NEW |