| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # | |
| 3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
| 4 # | |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
| 6 # Use of this source code is governed by a BSD-style license that can be | |
| 7 # found in the LICENSE file. | |
| 8 | |
| 9 """ | |
| 10 psutil test suite. | |
| 11 | |
| 12 Note: this is targeted for both python 2.x and 3.x so there's no need | |
| 13 to use 2to3 tool first. | |
| 14 """ | |
| 15 | |
| 16 import unittest | |
| 17 import os | |
| 18 import sys | |
| 19 import subprocess | |
| 20 import time | |
| 21 import signal | |
| 22 import types | |
| 23 import traceback | |
| 24 import socket | |
| 25 import warnings | |
| 26 import atexit | |
| 27 import errno | |
| 28 import threading | |
| 29 import tempfile | |
| 30 import collections | |
| 31 | |
| 32 import psutil | |
| 33 | |
| 34 | |
| 35 PYTHON = os.path.realpath(sys.executable) | |
| 36 DEVNULL = open(os.devnull, 'r+') | |
| 37 TESTFN = os.path.join(os.getcwd(), "$testfile") | |
| 38 PY3 = sys.version_info >= (3,) | |
| 39 POSIX = os.name == 'posix' | |
| 40 LINUX = sys.platform.lower().startswith("linux") | |
| 41 WINDOWS = sys.platform.lower().startswith("win32") | |
| 42 OSX = sys.platform.lower().startswith("darwin") | |
| 43 BSD = sys.platform.lower().startswith("freebsd") | |
| 44 | |
| 45 try: | |
| 46 psutil.Process(os.getpid()).get_connections() | |
| 47 except NotImplementedError: | |
| 48 err = sys.exc_info()[1] | |
| 49 SUPPORT_CONNECTIONS = False | |
| 50 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", | |
| 51 RuntimeWarning) | |
| 52 else: | |
| 53 SUPPORT_CONNECTIONS = True | |
| 54 | |
| 55 if PY3: | |
| 56 long = int | |
| 57 | |
| 58 def callable(attr): | |
| 59 return isinstance(attr, collections.Callable) | |
| 60 | |
| 61 | |
| 62 _subprocesses_started = set() | |
| 63 | |
| 64 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): | |
| 65 """Return a subprocess.Popen object to use in tests. | |
| 66 By default stdout and stderr are redirected to /dev/null and the | |
| 67 python interpreter is used as test process. | |
| 68 """ | |
| 69 if cmd is None: | |
| 70 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | |
| 71 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) | |
| 72 _subprocesses_started.add(sproc.pid) | |
| 73 return sproc | |
| 74 | |
| 75 def sh(cmdline): | |
| 76 """run cmd in a subprocess and return its output. | |
| 77 raises RuntimeError on error. | |
| 78 """ | |
| 79 p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, | |
| 80 stderr=subprocess.PIPE) | |
| 81 stdout, stderr = p.communicate() | |
| 82 if p.returncode != 0: | |
| 83 raise RuntimeError(stderr) | |
| 84 if stderr: | |
| 85 warnings.warn(stderr, RuntimeWarning) | |
| 86 if sys.version_info >= (3,): | |
| 87 stdout = str(stdout, sys.stdout.encoding) | |
| 88 return stdout.strip() | |
| 89 | |
| 90 def wait_for_pid(pid, timeout=1): | |
| 91 """Wait for pid to show up in the process list then return. | |
| 92 Used in the test suite to give time the sub process to initialize. | |
| 93 """ | |
| 94 raise_at = time.time() + timeout | |
| 95 while 1: | |
| 96 if pid in psutil.get_pid_list(): | |
| 97 # give it one more iteration to allow full initialization | |
| 98 time.sleep(0.01) | |
| 99 return | |
| 100 time.sleep(0.0001) | |
| 101 if time.time() >= raise_at: | |
| 102 raise RuntimeError("Timed out") | |
| 103 | |
| 104 def reap_children(search_all=False): | |
| 105 """Kill any subprocess started by this test suite and ensure that | |
| 106 no zombies stick around to hog resources and create problems when | |
| 107 looking for refleaks. | |
| 108 """ | |
| 109 if search_all: | |
| 110 this_process = psutil.Process(os.getpid()) | |
| 111 pids = [x.pid for x in this_process.get_children()] | |
| 112 else: | |
| 113 pids =_subprocesses_started | |
| 114 while pids: | |
| 115 pid = pids.pop() | |
| 116 try: | |
| 117 child = psutil.Process(pid) | |
| 118 child.kill() | |
| 119 except psutil.NoSuchProcess: | |
| 120 pass | |
| 121 else: | |
| 122 child.wait() | |
| 123 | |
| 124 | |
| 125 # we want to search through all processes before exiting | |
| 126 atexit.register(reap_children, search_all=True) | |
| 127 | |
| 128 def skipIf(condition, reason="", warn=False): | |
| 129 """Decorator which skip a test under if condition is satisfied. | |
| 130 This is a substitute of unittest.skipIf which is available | |
| 131 only in python 2.7 and 3.2. | |
| 132 If 'reason' argument is provided this will be printed during | |
| 133 tests execution. | |
| 134 If 'warn' is provided a RuntimeWarning will be shown when all | |
| 135 tests are run. | |
| 136 """ | |
| 137 def outer(fun, *args, **kwargs): | |
| 138 def inner(self): | |
| 139 if condition: | |
| 140 sys.stdout.write("skipped-") | |
| 141 sys.stdout.flush() | |
| 142 if warn: | |
| 143 objname = "%s.%s" % (self.__class__.__name__, fun.__name__) | |
| 144 msg = "%s was skipped" % objname | |
| 145 if reason: | |
| 146 msg += "; reason: " + repr(reason) | |
| 147 atexit.register(warnings.warn, msg, RuntimeWarning) | |
| 148 return | |
| 149 else: | |
| 150 return fun(self, *args, **kwargs) | |
| 151 return inner | |
| 152 return outer | |
| 153 | |
| 154 def skipUnless(condition, reason="", warn=False): | |
| 155 """Contrary of skipIf.""" | |
| 156 if not condition: | |
| 157 return skipIf(True, reason, warn) | |
| 158 return skipIf(False) | |
| 159 | |
| 160 def ignore_access_denied(fun): | |
| 161 """Decorator to Ignore AccessDenied exceptions.""" | |
| 162 def outer(fun, *args, **kwargs): | |
| 163 def inner(self): | |
| 164 try: | |
| 165 return fun(self, *args, **kwargs) | |
| 166 except psutil.AccessDenied: | |
| 167 pass | |
| 168 return inner | |
| 169 return outer | |
| 170 | |
| 171 def supports_ipv6(): | |
| 172 """Return True if IPv6 is supported on this platform.""" | |
| 173 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): | |
| 174 return False | |
| 175 sock = None | |
| 176 try: | |
| 177 try: | |
| 178 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
| 179 sock.bind(("::1", 0)) | |
| 180 except (socket.error, socket.gaierror): | |
| 181 return False | |
| 182 else: | |
| 183 return True | |
| 184 finally: | |
| 185 if sock is not None: | |
| 186 sock.close() | |
| 187 | |
| 188 | |
| 189 class ThreadTask(threading.Thread): | |
| 190 """A thread object used for running process thread tests.""" | |
| 191 | |
| 192 def __init__(self): | |
| 193 threading.Thread.__init__(self) | |
| 194 self._running = False | |
| 195 self._interval = None | |
| 196 self._flag = threading.Event() | |
| 197 | |
| 198 def __repr__(self): | |
| 199 name = self.__class__.__name__ | |
| 200 return '<%s running=%s at %#x>' % (name, self._running, id(self)) | |
| 201 | |
| 202 def start(self, interval=0.001): | |
| 203 """Start thread and keep it running until an explicit | |
| 204 stop() request. Polls for shutdown every 'timeout' seconds. | |
| 205 """ | |
| 206 if self._running: | |
| 207 raise ValueError("already started") | |
| 208 self._interval = interval | |
| 209 threading.Thread.start(self) | |
| 210 self._flag.wait() | |
| 211 | |
| 212 def run(self): | |
| 213 self._running = True | |
| 214 self._flag.set() | |
| 215 while self._running: | |
| 216 time.sleep(self._interval) | |
| 217 | |
| 218 def stop(self): | |
| 219 """Stop thread execution and and waits until it is stopped.""" | |
| 220 if not self._running: | |
| 221 raise ValueError("already stopped") | |
| 222 self._running = False | |
| 223 self.join() | |
| 224 | |
| 225 | |
| 226 class TestCase(unittest.TestCase): | |
| 227 | |
| 228 def tearDown(self): | |
| 229 reap_children() | |
| 230 | |
| 231 # ============================ | |
| 232 # tests for system-related API | |
| 233 # ============================ | |
| 234 | |
| 235 def test_get_process_list(self): | |
| 236 pids = [x.pid for x in psutil.get_process_list()] | |
| 237 self.assertTrue(os.getpid() in pids) | |
| 238 | |
| 239 def test_process_iter(self): | |
| 240 pids = [x.pid for x in psutil.process_iter()] | |
| 241 self.assertTrue(os.getpid() in pids) | |
| 242 | |
| 243 def test_TOTAL_PHYMEM(self): | |
| 244 x = psutil.TOTAL_PHYMEM | |
| 245 self.assertTrue(isinstance(x, (int, long))) | |
| 246 self.assertTrue(x > 0) | |
| 247 | |
| 248 def test_BOOT_TIME(self): | |
| 249 x = psutil.BOOT_TIME | |
| 250 self.assertTrue(isinstance(x, float)) | |
| 251 self.assertTrue(x > 0) | |
| 252 | |
| 253 def test_deprecated_memory_functions(self): | |
| 254 warnings.filterwarnings("error") | |
| 255 try: | |
| 256 self.assertRaises(DeprecationWarning, psutil.used_phymem) | |
| 257 self.assertRaises(DeprecationWarning, psutil.avail_phymem) | |
| 258 self.assertRaises(DeprecationWarning, psutil.total_virtmem) | |
| 259 self.assertRaises(DeprecationWarning, psutil.used_virtmem) | |
| 260 self.assertRaises(DeprecationWarning, psutil.avail_virtmem) | |
| 261 finally: | |
| 262 warnings.resetwarnings() | |
| 263 | |
| 264 def test_phymem_usage(self): | |
| 265 mem = psutil.phymem_usage() | |
| 266 self.assertTrue(mem.total > 0) | |
| 267 self.assertTrue(mem.used > 0) | |
| 268 self.assertTrue(mem.free > 0) | |
| 269 self.assertTrue(0 <= mem.percent <= 100) | |
| 270 | |
| 271 def test_virtmem_usage(self): | |
| 272 mem = psutil.virtmem_usage() | |
| 273 self.assertTrue(mem.total > 0) | |
| 274 self.assertTrue(mem.used >= 0) | |
| 275 self.assertTrue(mem.free > 0) | |
| 276 self.assertTrue(0 <= mem.percent <= 100) | |
| 277 | |
| 278 @skipUnless(LINUX) | |
| 279 def test_phymem_buffers(self): | |
| 280 x = psutil.phymem_buffers() | |
| 281 self.assertTrue(isinstance(x, (int, long))) | |
| 282 self.assertTrue(x >= 0) | |
| 283 | |
| 284 def test_pid_exists(self): | |
| 285 sproc = get_test_subprocess() | |
| 286 wait_for_pid(sproc.pid) | |
| 287 self.assertTrue(psutil.pid_exists(sproc.pid)) | |
| 288 p = psutil.Process(sproc.pid) | |
| 289 p.kill() | |
| 290 p.wait() | |
| 291 self.assertFalse(psutil.pid_exists(sproc.pid)) | |
| 292 self.assertFalse(psutil.pid_exists(-1)) | |
| 293 | |
| 294 def test_pid_exists_2(self): | |
| 295 reap_children() | |
| 296 pids = psutil.get_pid_list() | |
| 297 for pid in pids: | |
| 298 try: | |
| 299 self.assertTrue(psutil.pid_exists(pid)) | |
| 300 except AssertionError: | |
| 301 # in case the process disappeared in meantime fail only | |
| 302 # if it is no longer in get_pid_list() | |
| 303 time.sleep(.1) | |
| 304 if pid in psutil.get_pid_list(): | |
| 305 self.fail(pid) | |
| 306 pids = range(max(pids) + 5000, max(pids) + 6000) | |
| 307 for pid in pids: | |
| 308 self.assertFalse(psutil.pid_exists(pid)) | |
| 309 | |
| 310 def test_get_pid_list(self): | |
| 311 plist = [x.pid for x in psutil.get_process_list()] | |
| 312 pidlist = psutil.get_pid_list() | |
| 313 self.assertEqual(plist.sort(), pidlist.sort()) | |
| 314 # make sure every pid is unique | |
| 315 self.assertEqual(len(pidlist), len(set(pidlist))) | |
| 316 | |
| 317 def test_test(self): | |
| 318 # test for psutil.test() function | |
| 319 stdout = sys.stdout | |
| 320 sys.stdout = DEVNULL | |
| 321 try: | |
| 322 psutil.test() | |
| 323 finally: | |
| 324 sys.stdout = stdout | |
| 325 | |
| 326 def test_sys_cpu_times(self): | |
| 327 total = 0 | |
| 328 times = psutil.cpu_times() | |
| 329 sum(times) | |
| 330 for cp_time in times: | |
| 331 self.assertTrue(isinstance(cp_time, float)) | |
| 332 self.assertTrue(cp_time >= 0.0) | |
| 333 total += cp_time | |
| 334 self.assertEqual(total, sum(times)) | |
| 335 str(times) | |
| 336 | |
| 337 def test_sys_cpu_times2(self): | |
| 338 t1 = sum(psutil.cpu_times()) | |
| 339 time.sleep(0.1) | |
| 340 t2 = sum(psutil.cpu_times()) | |
| 341 difference = t2 - t1 | |
| 342 if not difference >= 0.05: | |
| 343 self.fail("difference %s" % difference) | |
| 344 | |
| 345 def test_sys_per_cpu_times(self): | |
| 346 for times in psutil.cpu_times(percpu=True): | |
| 347 total = 0 | |
| 348 sum(times) | |
| 349 for cp_time in times: | |
| 350 self.assertTrue(isinstance(cp_time, float)) | |
| 351 self.assertTrue(cp_time >= 0.0) | |
| 352 total += cp_time | |
| 353 self.assertEqual(total, sum(times)) | |
| 354 str(times) | |
| 355 | |
| 356 def test_sys_per_cpu_times2(self): | |
| 357 tot1 = psutil.cpu_times(percpu=True) | |
| 358 stop_at = time.time() + 0.1 | |
| 359 while 1: | |
| 360 if time.time() >= stop_at: | |
| 361 break | |
| 362 tot2 = psutil.cpu_times(percpu=True) | |
| 363 for t1, t2 in zip(tot1, tot2): | |
| 364 t1, t2 = sum(t1), sum(t2) | |
| 365 difference = t2 - t1 | |
| 366 if difference >= 0.05: | |
| 367 return | |
| 368 self.fail() | |
| 369 | |
| 370 def test_sys_cpu_percent(self): | |
| 371 psutil.cpu_percent(interval=0.001) | |
| 372 psutil.cpu_percent(interval=0.001) | |
| 373 for x in range(1000): | |
| 374 percent = psutil.cpu_percent(interval=None) | |
| 375 self.assertTrue(isinstance(percent, float)) | |
| 376 self.assertTrue(percent >= 0.0) | |
| 377 self.assertTrue(percent <= 100.0) | |
| 378 | |
| 379 def test_sys_per_cpu_percent(self): | |
| 380 psutil.cpu_percent(interval=0.001, percpu=True) | |
| 381 psutil.cpu_percent(interval=0.001, percpu=True) | |
| 382 for x in range(1000): | |
| 383 percents = psutil.cpu_percent(interval=None, percpu=True) | |
| 384 for percent in percents: | |
| 385 self.assertTrue(isinstance(percent, float)) | |
| 386 self.assertTrue(percent >= 0.0) | |
| 387 self.assertTrue(percent <= 100.0) | |
| 388 | |
| 389 def test_sys_cpu_percent_compare(self): | |
| 390 psutil.cpu_percent(interval=0) | |
| 391 psutil.cpu_percent(interval=0, percpu=True) | |
| 392 time.sleep(.1) | |
| 393 t1 = psutil.cpu_percent(interval=0) | |
| 394 t2 = psutil.cpu_percent(interval=0, percpu=True) | |
| 395 # calculate total average | |
| 396 t2 = sum(t2) / len(t2) | |
| 397 if abs(t1 - t2) > 5: | |
| 398 self.assertEqual(t1, t2) | |
| 399 | |
| 400 def test_disk_usage(self): | |
| 401 usage = psutil.disk_usage(os.getcwd()) | |
| 402 self.assertTrue(usage.total > 0) | |
| 403 self.assertTrue(usage.used > 0) | |
| 404 self.assertTrue(usage.free > 0) | |
| 405 self.assertTrue(usage.total > usage.used) | |
| 406 self.assertTrue(usage.total > usage.free) | |
| 407 self.assertTrue(0 <= usage.percent <= 100) | |
| 408 | |
| 409 # if path does not exist OSError ENOENT is expected across | |
| 410 # all platforms | |
| 411 fname = tempfile.mktemp() | |
| 412 try: | |
| 413 psutil.disk_usage(fname) | |
| 414 except OSError: | |
| 415 err = sys.exc_info()[1] | |
| 416 if err.args[0] != errno.ENOENT: | |
| 417 raise | |
| 418 else: | |
| 419 self.fail("OSError not raised") | |
| 420 | |
| 421 def test_disk_partitions(self): | |
| 422 for disk in psutil.disk_partitions(all=False): | |
| 423 self.assertTrue(os.path.exists(disk.device)) | |
| 424 self.assertTrue(os.path.isdir(disk.mountpoint)) | |
| 425 self.assertTrue(disk.fstype) | |
| 426 for disk in psutil.disk_partitions(all=True): | |
| 427 if not WINDOWS: | |
| 428 self.assertTrue(os.path.isdir(disk.mountpoint)) | |
| 429 self.assertTrue(disk.fstype) | |
| 430 | |
| 431 def find_mount_point(path): | |
| 432 path = os.path.abspath(path) | |
| 433 while not os.path.ismount(path): | |
| 434 path = os.path.dirname(path) | |
| 435 return path | |
| 436 | |
| 437 mount = find_mount_point(__file__) | |
| 438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] | |
| 439 self.assertTrue(mount in mounts) | |
| 440 psutil.disk_usage(mount) | |
| 441 | |
| 442 # XXX | |
| 443 @skipUnless(hasattr(psutil, "network_io_counters")) | |
| 444 def test_anetwork_io_counters(self): | |
| 445 def check_ntuple(nt): | |
| 446 self.assertEqual(nt[0], nt.bytes_sent) | |
| 447 self.assertEqual(nt[1], nt.bytes_recv) | |
| 448 self.assertEqual(nt[2], nt.packets_sent) | |
| 449 self.assertEqual(nt[3], nt.packets_recv) | |
| 450 self.assertTrue(nt.bytes_sent >= 0) | |
| 451 self.assertTrue(nt.bytes_recv >= 0) | |
| 452 self.assertTrue(nt.packets_sent >= 0) | |
| 453 self.assertTrue(nt.packets_recv >= 0) | |
| 454 | |
| 455 ret = psutil.network_io_counters(pernic=False) | |
| 456 check_ntuple(ret) | |
| 457 ret = psutil.network_io_counters(pernic=True) | |
| 458 for name, ntuple in ret.iteritems(): | |
| 459 self.assertTrue(name) | |
| 460 check_ntuple(ntuple) | |
| 461 # XXX | |
| 462 @skipUnless(hasattr(psutil, "disk_io_counters")) | |
| 463 def test_disk_io_counters(self): | |
| 464 def check_ntuple(nt): | |
| 465 self.assertEqual(nt[0], nt.read_count) | |
| 466 self.assertEqual(nt[1], nt.write_count) | |
| 467 self.assertEqual(nt[2], nt.read_bytes) | |
| 468 self.assertEqual(nt[3], nt.write_bytes) | |
| 469 self.assertEqual(nt[4], nt.read_time) | |
| 470 self.assertEqual(nt[5], nt.write_time) | |
| 471 self.assertTrue(nt.read_count >= 0) | |
| 472 self.assertTrue(nt.write_count >= 0) | |
| 473 self.assertTrue(nt.read_bytes >= 0) | |
| 474 self.assertTrue(nt.write_bytes >= 0) | |
| 475 self.assertTrue(nt.read_time >= 0) | |
| 476 self.assertTrue(nt.write_time >= 0) | |
| 477 | |
| 478 ret = psutil.disk_io_counters(perdisk=False) | |
| 479 check_ntuple(ret) | |
| 480 ret = psutil.disk_io_counters(perdisk=True) | |
| 481 for name, ntuple in ret.iteritems(): | |
| 482 self.assertTrue(name) | |
| 483 check_ntuple(ntuple) | |
| 484 | |
| 485 # ==================== | |
| 486 # Process object tests | |
| 487 # ==================== | |
| 488 | |
| 489 def test_kill(self): | |
| 490 sproc = get_test_subprocess() | |
| 491 test_pid = sproc.pid | |
| 492 wait_for_pid(test_pid) | |
| 493 p = psutil.Process(test_pid) | |
| 494 name = p.name | |
| 495 p.kill() | |
| 496 p.wait() | |
| 497 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
| 498 | |
| 499 def test_terminate(self): | |
| 500 sproc = get_test_subprocess() | |
| 501 test_pid = sproc.pid | |
| 502 wait_for_pid(test_pid) | |
| 503 p = psutil.Process(test_pid) | |
| 504 name = p.name | |
| 505 p.terminate() | |
| 506 p.wait() | |
| 507 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
| 508 | |
| 509 def test_send_signal(self): | |
| 510 if POSIX: | |
| 511 sig = signal.SIGKILL | |
| 512 else: | |
| 513 sig = signal.SIGTERM | |
| 514 sproc = get_test_subprocess() | |
| 515 test_pid = sproc.pid | |
| 516 p = psutil.Process(test_pid) | |
| 517 name = p.name | |
| 518 p.send_signal(sig) | |
| 519 p.wait() | |
| 520 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
| 521 | |
| 522 def test_wait(self): | |
| 523 # check exit code signal | |
| 524 sproc = get_test_subprocess() | |
| 525 p = psutil.Process(sproc.pid) | |
| 526 p.kill() | |
| 527 code = p.wait() | |
| 528 if os.name == 'posix': | |
| 529 self.assertEqual(code, signal.SIGKILL) | |
| 530 else: | |
| 531 self.assertEqual(code, 0) | |
| 532 self.assertFalse(p.is_running()) | |
| 533 | |
| 534 sproc = get_test_subprocess() | |
| 535 p = psutil.Process(sproc.pid) | |
| 536 p.terminate() | |
| 537 code = p.wait() | |
| 538 if os.name == 'posix': | |
| 539 self.assertEqual(code, signal.SIGTERM) | |
| 540 else: | |
| 541 self.assertEqual(code, 0) | |
| 542 self.assertFalse(p.is_running()) | |
| 543 | |
| 544 # check sys.exit() code | |
| 545 code = "import time, sys; time.sleep(0.01); sys.exit(5);" | |
| 546 sproc = get_test_subprocess([PYTHON, "-c", code]) | |
| 547 p = psutil.Process(sproc.pid) | |
| 548 self.assertEqual(p.wait(), 5) | |
| 549 self.assertFalse(p.is_running()) | |
| 550 | |
| 551 # Test wait() issued twice. | |
| 552 # It is not supposed to raise NSP when the process is gone. | |
| 553 # On UNIX this should return None, on Windows it should keep | |
| 554 # returning the exit code. | |
| 555 sproc = get_test_subprocess([PYTHON, "-c", code]) | |
| 556 p = psutil.Process(sproc.pid) | |
| 557 self.assertEqual(p.wait(), 5) | |
| 558 self.assertTrue(p.wait() in (5, None)) | |
| 559 | |
| 560 # test timeout | |
| 561 sproc = get_test_subprocess() | |
| 562 p = psutil.Process(sproc.pid) | |
| 563 p.name | |
| 564 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) | |
| 565 | |
| 566 # timeout < 0 not allowed | |
| 567 self.assertRaises(ValueError, p.wait, -1) | |
| 568 | |
| 569 @skipUnless(POSIX) | |
| 570 def test_wait_non_children(self): | |
| 571 # test wait() against processes which are not our children | |
| 572 code = "import sys;" | |
| 573 code += "from subprocess import Popen, PIPE;" | |
| 574 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON | |
| 575 code += "sp = Popen(cmd, stdout=PIPE);" | |
| 576 code += "sys.stdout.write(str(sp.pid));" | |
| 577 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE
) | |
| 578 | |
| 579 grandson_pid = int(sproc.stdout.read()) | |
| 580 grandson_proc = psutil.Process(grandson_pid) | |
| 581 try: | |
| 582 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) | |
| 583 grandson_proc.kill() | |
| 584 ret = grandson_proc.wait() | |
| 585 self.assertEqual(ret, None) | |
| 586 finally: | |
| 587 if grandson_proc.is_running(): | |
| 588 grandson_proc.kill() | |
| 589 grandson_proc.wait() | |
| 590 | |
| 591 def test_wait_timeout_0(self): | |
| 592 sproc = get_test_subprocess() | |
| 593 p = psutil.Process(sproc.pid) | |
| 594 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) | |
| 595 p.kill() | |
| 596 stop_at = time.time() + 2 | |
| 597 while 1: | |
| 598 try: | |
| 599 code = p.wait(0) | |
| 600 except psutil.TimeoutExpired: | |
| 601 if time.time() >= stop_at: | |
| 602 raise | |
| 603 else: | |
| 604 break | |
| 605 if os.name == 'posix': | |
| 606 self.assertEqual(code, signal.SIGKILL) | |
| 607 else: | |
| 608 self.assertEqual(code, 0) | |
| 609 self.assertFalse(p.is_running()) | |
| 610 | |
| 611 def test_cpu_percent(self): | |
| 612 p = psutil.Process(os.getpid()) | |
| 613 p.get_cpu_percent(interval=0.001) | |
| 614 p.get_cpu_percent(interval=0.001) | |
| 615 for x in range(100): | |
| 616 percent = p.get_cpu_percent(interval=None) | |
| 617 self.assertTrue(isinstance(percent, float)) | |
| 618 self.assertTrue(percent >= 0.0) | |
| 619 self.assertTrue(percent <= 100.0) | |
| 620 | |
| 621 def test_cpu_times(self): | |
| 622 times = psutil.Process(os.getpid()).get_cpu_times() | |
| 623 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) | |
| 624 # make sure returned values can be pretty printed with strftime | |
| 625 time.strftime("%H:%M:%S", time.localtime(times.user)) | |
| 626 time.strftime("%H:%M:%S", time.localtime(times.system)) | |
| 627 | |
| 628 # Test Process.cpu_times() against os.times() | |
| 629 # os.times() is broken on Python 2.6 | |
| 630 # http://bugs.python.org/issue1040026 | |
| 631 # XXX fails on OSX: not sure if it's for os.times(). We should | |
| 632 # try this with Python 2.7 and re-enable the test. | |
| 633 | |
| 634 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) | |
| 635 def test_cpu_times2(self): | |
| 636 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() | |
| 637 utime, ktime = os.times()[:2] | |
| 638 | |
| 639 # Use os.times()[:2] as base values to compare our results | |
| 640 # using a tolerance of +/- 0.1 seconds. | |
| 641 # It will fail if the difference between the values is > 0.1s. | |
| 642 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: | |
| 643 self.fail("expected: %s, found: %s" %(utime, user_time)) | |
| 644 | |
| 645 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: | |
| 646 self.fail("expected: %s, found: %s" %(ktime, kernel_time)) | |
| 647 | |
| 648 def test_create_time(self): | |
| 649 sproc = get_test_subprocess() | |
| 650 now = time.time() | |
| 651 wait_for_pid(sproc.pid) | |
| 652 p = psutil.Process(sproc.pid) | |
| 653 create_time = p.create_time | |
| 654 | |
| 655 # Use time.time() as base value to compare our result using a | |
| 656 # tolerance of +/- 1 second. | |
| 657 # It will fail if the difference between the values is > 2s. | |
| 658 difference = abs(create_time - now) | |
| 659 if difference > 2: | |
| 660 self.fail("expected: %s, found: %s, difference: %s" | |
| 661 % (now, create_time, difference)) | |
| 662 | |
| 663 # make sure returned value can be pretty printed with strftime | |
| 664 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) | |
| 665 | |
| 666 @skipIf(WINDOWS) | |
| 667 def test_terminal(self): | |
| 668 tty = sh('tty') | |
| 669 p = psutil.Process(os.getpid()) | |
| 670 self.assertEqual(p.terminal, tty) | |
| 671 | |
| 672 @skipIf(OSX, warn=False) | |
| 673 def test_get_io_counters(self): | |
| 674 p = psutil.Process(os.getpid()) | |
| 675 # test reads | |
| 676 io1 = p.get_io_counters() | |
| 677 f = open(PYTHON, 'rb') | |
| 678 f.read() | |
| 679 f.close() | |
| 680 io2 = p.get_io_counters() | |
| 681 if not BSD: | |
| 682 self.assertTrue(io2.read_count > io1.read_count) | |
| 683 self.assertTrue(io2.write_count == io1.write_count) | |
| 684 self.assertTrue(io2.read_bytes >= io1.read_bytes) | |
| 685 self.assertTrue(io2.write_bytes >= io1.write_bytes) | |
| 686 # test writes | |
| 687 io1 = p.get_io_counters() | |
| 688 f = tempfile.TemporaryFile() | |
| 689 if sys.version_info >= (3,): | |
| 690 f.write(bytes("x" * 1000000, 'ascii')) | |
| 691 else: | |
| 692 f.write("x" * 1000000) | |
| 693 f.close() | |
| 694 io2 = p.get_io_counters() | |
| 695 if not BSD: | |
| 696 self.assertTrue(io2.write_count > io1.write_count) | |
| 697 self.assertTrue(io2.write_bytes > io1.write_bytes) | |
| 698 self.assertTrue(io2.read_count >= io1.read_count) | |
| 699 self.assertTrue(io2.read_bytes >= io1.read_bytes) | |
| 700 | |
| 701 @skipUnless(LINUX) | |
| 702 def test_get_set_ionice(self): | |
| 703 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, | |
| 704 IOPRIO_CLASS_IDLE) | |
| 705 self.assertEqual(IOPRIO_CLASS_NONE, 0) | |
| 706 self.assertEqual(IOPRIO_CLASS_RT, 1) | |
| 707 self.assertEqual(IOPRIO_CLASS_BE, 2) | |
| 708 self.assertEqual(IOPRIO_CLASS_IDLE, 3) | |
| 709 p = psutil.Process(os.getpid()) | |
| 710 try: | |
| 711 p.set_ionice(2) | |
| 712 ioclass, value = p.get_ionice() | |
| 713 self.assertEqual(ioclass, 2) | |
| 714 self.assertEqual(value, 4) | |
| 715 # | |
| 716 p.set_ionice(3) | |
| 717 ioclass, value = p.get_ionice() | |
| 718 self.assertEqual(ioclass, 3) | |
| 719 self.assertEqual(value, 0) | |
| 720 # | |
| 721 p.set_ionice(2, 0) | |
| 722 ioclass, value = p.get_ionice() | |
| 723 self.assertEqual(ioclass, 2) | |
| 724 self.assertEqual(value, 0) | |
| 725 p.set_ionice(2, 7) | |
| 726 ioclass, value = p.get_ionice() | |
| 727 self.assertEqual(ioclass, 2) | |
| 728 self.assertEqual(value, 7) | |
| 729 self.assertRaises(ValueError, p.set_ionice, 2, 10) | |
| 730 finally: | |
| 731 p.set_ionice(IOPRIO_CLASS_NONE) | |
| 732 | |
| 733 def test_get_num_threads(self): | |
| 734 # on certain platforms such as Linux we might test for exact | |
| 735 # thread number, since we always have with 1 thread per process, | |
| 736 # but this does not apply across all platforms (OSX, Windows) | |
| 737 p = psutil.Process(os.getpid()) | |
| 738 step1 = p.get_num_threads() | |
| 739 | |
| 740 thread = ThreadTask() | |
| 741 thread.start() | |
| 742 try: | |
| 743 step2 = p.get_num_threads() | |
| 744 self.assertEqual(step2, step1 + 1) | |
| 745 thread.stop() | |
| 746 finally: | |
| 747 if thread._running: | |
| 748 thread.stop() | |
| 749 | |
| 750 def test_get_threads(self): | |
| 751 p = psutil.Process(os.getpid()) | |
| 752 step1 = p.get_threads() | |
| 753 | |
| 754 thread = ThreadTask() | |
| 755 thread.start() | |
| 756 | |
| 757 try: | |
| 758 step2 = p.get_threads() | |
| 759 self.assertEqual(len(step2), len(step1) + 1) | |
| 760 # on Linux, first thread id is supposed to be this process | |
| 761 if LINUX: | |
| 762 self.assertEqual(step2[0].id, os.getpid()) | |
| 763 athread = step2[0] | |
| 764 # test named tuple | |
| 765 self.assertEqual(athread.id, athread[0]) | |
| 766 self.assertEqual(athread.user_time, athread[1]) | |
| 767 self.assertEqual(athread.system_time, athread[2]) | |
| 768 # test num threads | |
| 769 thread.stop() | |
| 770 finally: | |
| 771 if thread._running: | |
| 772 thread.stop() | |
| 773 | |
| 774 def test_get_memory_info(self): | |
| 775 p = psutil.Process(os.getpid()) | |
| 776 | |
| 777 # step 1 - get a base value to compare our results | |
| 778 rss1, vms1 = p.get_memory_info() | |
| 779 percent1 = p.get_memory_percent() | |
| 780 self.assertTrue(rss1 > 0) | |
| 781 self.assertTrue(vms1 > 0) | |
| 782 | |
| 783 # step 2 - allocate some memory | |
| 784 memarr = [None] * 1500000 | |
| 785 | |
| 786 rss2, vms2 = p.get_memory_info() | |
| 787 percent2 = p.get_memory_percent() | |
| 788 # make sure that the memory usage bumped up | |
| 789 self.assertTrue(rss2 > rss1) | |
| 790 self.assertTrue(vms2 >= vms1) # vms might be equal | |
| 791 self.assertTrue(percent2 > percent1) | |
| 792 del memarr | |
| 793 | |
| 794 def test_get_memory_percent(self): | |
| 795 p = psutil.Process(os.getpid()) | |
| 796 self.assertTrue(p.get_memory_percent() > 0.0) | |
| 797 | |
| 798 def test_pid(self): | |
| 799 sproc = get_test_subprocess() | |
| 800 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) | |
| 801 | |
| 802 def test_is_running(self): | |
| 803 sproc = get_test_subprocess() | |
| 804 wait_for_pid(sproc.pid) | |
| 805 p = psutil.Process(sproc.pid) | |
| 806 self.assertTrue(p.is_running()) | |
| 807 p.kill() | |
| 808 p.wait() | |
| 809 self.assertFalse(p.is_running()) | |
| 810 | |
| 811 def test_exe(self): | |
| 812 sproc = get_test_subprocess() | |
| 813 wait_for_pid(sproc.pid) | |
| 814 try: | |
| 815 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) | |
| 816 except AssertionError: | |
| 817 # certain platforms such as BSD are more accurate returning: | |
| 818 # "/usr/local/bin/python2.7" | |
| 819 # ...instead of: | |
| 820 # "/usr/local/bin/python" | |
| 821 # We do not want to consider this difference in accuracy | |
| 822 # an error. | |
| 823 name = psutil.Process(sproc.pid).exe | |
| 824 adjusted_name = PYTHON[:len(name)] | |
| 825 self.assertEqual(name, adjusted_name) | |
| 826 for p in psutil.process_iter(): | |
| 827 try: | |
| 828 exe = p.exe | |
| 829 except psutil.Error: | |
| 830 continue | |
| 831 if not exe: | |
| 832 continue | |
| 833 if not os.path.exists(exe): | |
| 834 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ | |
| 835 % (repr(exe), p.pid, p.name, p.cmdline)) | |
| 836 if hasattr(os, 'access') and hasattr(os, "X_OK"): | |
| 837 if not os.access(p.exe, os.X_OK): | |
| 838 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ | |
| 839 % (repr(p.exe), p.pid, p.name, p.cmdline)) | |
| 840 | |
| 841 def test_cmdline(self): | |
| 842 sproc = get_test_subprocess([PYTHON, "-E"]) | |
| 843 wait_for_pid(sproc.pid) | |
| 844 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) | |
| 845 | |
| 846 def test_name(self): | |
| 847 sproc = get_test_subprocess(PYTHON) | |
| 848 wait_for_pid(sproc.pid) | |
| 849 self.assertEqual(psutil.Process(sproc.pid).name.lower(), | |
| 850 os.path.basename(sys.executable).lower()) | |
| 851 | |
| 852 if os.name == 'posix': | |
| 853 | |
| 854 def test_uids(self): | |
| 855 p = psutil.Process(os.getpid()) | |
| 856 real, effective, saved = p.uids | |
| 857 # os.getuid() refers to "real" uid | |
| 858 self.assertEqual(real, os.getuid()) | |
| 859 # os.geteuid() refers to "effective" uid | |
| 860 self.assertEqual(effective, os.geteuid()) | |
| 861 # no such thing as os.getsuid() ("saved" uid), but starting | |
| 862 # from python 2.7 we have os.getresuid()[2] | |
| 863 if hasattr(os, "getresuid"): | |
| 864 self.assertEqual(saved, os.getresuid()[2]) | |
| 865 | |
| 866 def test_gids(self): | |
| 867 p = psutil.Process(os.getpid()) | |
| 868 real, effective, saved = p.gids | |
| 869 # os.getuid() refers to "real" uid | |
| 870 self.assertEqual(real, os.getgid()) | |
| 871 # os.geteuid() refers to "effective" uid | |
| 872 self.assertEqual(effective, os.getegid()) | |
| 873 # no such thing as os.getsuid() ("saved" uid), but starting | |
| 874 # from python 2.7 we have os.getresgid()[2] | |
| 875 if hasattr(os, "getresuid"): | |
| 876 self.assertEqual(saved, os.getresgid()[2]) | |
| 877 | |
| 878 def test_nice(self): | |
| 879 p = psutil.Process(os.getpid()) | |
| 880 self.assertRaises(TypeError, setattr, p, "nice", "str") | |
| 881 try: | |
| 882 try: | |
| 883 first_nice = p.nice | |
| 884 p.nice = 1 | |
| 885 self.assertEqual(p.nice, 1) | |
| 886 # going back to previous nice value raises AccessDenied on O
SX | |
| 887 if not OSX: | |
| 888 p.nice = 0 | |
| 889 self.assertEqual(p.nice, 0) | |
| 890 except psutil.AccessDenied: | |
| 891 pass | |
| 892 finally: | |
| 893 # going back to previous nice value raises AccessDenied on OSX | |
| 894 if not OSX: | |
| 895 p.nice = first_nice | |
| 896 | |
| 897 if os.name == 'nt': | |
| 898 | |
| 899 def test_nice(self): | |
| 900 p = psutil.Process(os.getpid()) | |
| 901 self.assertRaises(TypeError, setattr, p, "nice", "str") | |
| 902 try: | |
| 903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) | |
| 904 p.nice = psutil.HIGH_PRIORITY_CLASS | |
| 905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) | |
| 906 p.nice = psutil.NORMAL_PRIORITY_CLASS | |
| 907 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) | |
| 908 finally: | |
| 909 p.nice = psutil.NORMAL_PRIORITY_CLASS | |
| 910 | |
| 911 def test_status(self): | |
| 912 p = psutil.Process(os.getpid()) | |
| 913 self.assertEqual(p.status, psutil.STATUS_RUNNING) | |
| 914 self.assertEqual(str(p.status), "running") | |
| 915 for p in psutil.process_iter(): | |
| 916 if str(p.status) == '?': | |
| 917 self.fail("invalid status for pid %d" % p.pid) | |
| 918 | |
| 919 def test_username(self): | |
| 920 sproc = get_test_subprocess() | |
| 921 p = psutil.Process(sproc.pid) | |
| 922 if POSIX: | |
| 923 import pwd | |
| 924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) | |
| 925 elif WINDOWS: | |
| 926 expected_username = os.environ['USERNAME'] | |
| 927 expected_domain = os.environ['USERDOMAIN'] | |
| 928 domain, username = p.username.split('\\') | |
| 929 self.assertEqual(domain, expected_domain) | |
| 930 self.assertEqual(username, expected_username) | |
| 931 else: | |
| 932 p.username | |
| 933 | |
| 934 @skipUnless(WINDOWS or LINUX) | |
| 935 def test_getcwd(self): | |
| 936 sproc = get_test_subprocess() | |
| 937 wait_for_pid(sproc.pid) | |
| 938 p = psutil.Process(sproc.pid) | |
| 939 self.assertEqual(p.getcwd(), os.getcwd()) | |
| 940 | |
| 941 @skipUnless(WINDOWS or LINUX) | |
| 942 def test_getcwd_2(self): | |
| 943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] | |
| 944 sproc = get_test_subprocess(cmd) | |
| 945 wait_for_pid(sproc.pid) | |
| 946 p = psutil.Process(sproc.pid) | |
| 947 time.sleep(0.1) | |
| 948 expected_dir = os.path.dirname(os.getcwd()) | |
| 949 self.assertEqual(p.getcwd(), expected_dir) | |
| 950 | |
| 951 def test_get_open_files(self): | |
| 952 # current process | |
| 953 p = psutil.Process(os.getpid()) | |
| 954 files = p.get_open_files() | |
| 955 self.assertFalse(TESTFN in files) | |
| 956 f = open(TESTFN, 'r') | |
| 957 time.sleep(.1) | |
| 958 filenames = [x.path for x in p.get_open_files()] | |
| 959 self.assertTrue(TESTFN in filenames) | |
| 960 f.close() | |
| 961 for file in filenames: | |
| 962 self.assertTrue(os.path.isfile(file)) | |
| 963 | |
| 964 # another process | |
| 965 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN | |
| 966 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) | |
| 967 wait_for_pid(sproc.pid) | |
| 968 time.sleep(0.1) | |
| 969 p = psutil.Process(sproc.pid) | |
| 970 for x in range(100): | |
| 971 filenames = [x.path for x in p.get_open_files()] | |
| 972 if TESTFN in filenames: | |
| 973 break | |
| 974 time.sleep(.01) | |
| 975 else: | |
| 976 self.assertTrue(TESTFN in filenames) | |
| 977 for file in filenames: | |
| 978 self.assertTrue(os.path.isfile(file)) | |
| 979 # all processes | |
| 980 for proc in psutil.process_iter(): | |
| 981 try: | |
| 982 files = proc.get_open_files() | |
| 983 except psutil.Error: | |
| 984 pass | |
| 985 else: | |
| 986 for file in filenames: | |
| 987 self.assertTrue(os.path.isfile(file)) | |
| 988 | |
| 989 def test_get_open_files2(self): | |
| 990 # test fd and path fields | |
| 991 fileobj = open(TESTFN, 'r') | |
| 992 p = psutil.Process(os.getpid()) | |
| 993 for path, fd in p.get_open_files(): | |
| 994 if path == fileobj.name or fd == fileobj.fileno(): | |
| 995 break | |
| 996 else: | |
| 997 self.fail("no file found; files=%s" % repr(p.get_open_files())) | |
| 998 self.assertEqual(path, fileobj.name) | |
| 999 if WINDOWS: | |
| 1000 self.assertEqual(fd, -1) | |
| 1001 else: | |
| 1002 self.assertEqual(fd, fileobj.fileno()) | |
| 1003 # test positions | |
| 1004 ntuple = p.get_open_files()[0] | |
| 1005 self.assertEqual(ntuple[0], ntuple.path) | |
| 1006 self.assertEqual(ntuple[1], ntuple.fd) | |
| 1007 # test file is gone | |
| 1008 fileobj.close() | |
| 1009 self.assertTrue(fileobj.name not in p.get_open_files()) | |
| 1010 | |
| 1011 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | |
| 1012 def test_get_connections(self): | |
| 1013 arg = "import socket, time;" \ | |
| 1014 "s = socket.socket();" \ | |
| 1015 "s.bind(('127.0.0.1', 0));" \ | |
| 1016 "s.listen(1);" \ | |
| 1017 "conn, addr = s.accept();" \ | |
| 1018 "time.sleep(100);" | |
| 1019 sproc = get_test_subprocess([PYTHON, "-c", arg]) | |
| 1020 p = psutil.Process(sproc.pid) | |
| 1021 for x in range(100): | |
| 1022 cons = p.get_connections() | |
| 1023 if cons: | |
| 1024 break | |
| 1025 time.sleep(.01) | |
| 1026 self.assertEqual(len(cons), 1) | |
| 1027 con = cons[0] | |
| 1028 self.assertEqual(con.family, socket.AF_INET) | |
| 1029 self.assertEqual(con.type, socket.SOCK_STREAM) | |
| 1030 self.assertEqual(con.status, "LISTEN") | |
| 1031 ip, port = con.local_address | |
| 1032 self.assertEqual(ip, '127.0.0.1') | |
| 1033 self.assertEqual(con.remote_address, ()) | |
| 1034 if WINDOWS: | |
| 1035 self.assertEqual(con.fd, -1) | |
| 1036 else: | |
| 1037 self.assertTrue(con.fd > 0) | |
| 1038 # test positions | |
| 1039 self.assertEqual(con[0], con.fd) | |
| 1040 self.assertEqual(con[1], con.family) | |
| 1041 self.assertEqual(con[2], con.type) | |
| 1042 self.assertEqual(con[3], con.local_address) | |
| 1043 self.assertEqual(con[4], con.remote_address) | |
| 1044 self.assertEqual(con[5], con.status) | |
| 1045 | |
| 1046 @skipUnless(supports_ipv6()) | |
| 1047 def test_get_connections_ipv6(self): | |
| 1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
| 1049 s.bind(('::1', 0)) | |
| 1050 s.listen(1) | |
| 1051 cons = psutil.Process(os.getpid()).get_connections() | |
| 1052 s.close() | |
| 1053 self.assertEqual(len(cons), 1) | |
| 1054 self.assertEqual(cons[0].local_address[0], '::1') | |
| 1055 | |
| 1056 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) | |
| 1057 def test_connection_fromfd(self): | |
| 1058 sock = socket.socket() | |
| 1059 sock.bind(('localhost', 0)) | |
| 1060 sock.listen(1) | |
| 1061 p = psutil.Process(os.getpid()) | |
| 1062 for conn in p.get_connections(): | |
| 1063 if conn.fd == sock.fileno(): | |
| 1064 break | |
| 1065 else: | |
| 1066 sock.close() | |
| 1067 self.fail("couldn't find socket fd") | |
| 1068 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) | |
| 1069 try: | |
| 1070 self.assertEqual(dupsock.getsockname(), conn.local_address) | |
| 1071 self.assertNotEqual(sock.fileno(), dupsock.fileno()) | |
| 1072 finally: | |
| 1073 sock.close() | |
| 1074 dupsock.close() | |
| 1075 | |
| 1076 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | |
| 1077 def test_get_connections_all(self): | |
| 1078 | |
| 1079 def check_address(addr, family): | |
| 1080 if not addr: | |
| 1081 return | |
| 1082 ip, port = addr | |
| 1083 self.assertTrue(isinstance(port, int)) | |
| 1084 if family == socket.AF_INET: | |
| 1085 ip = list(map(int, ip.split('.'))) | |
| 1086 self.assertTrue(len(ip) == 4) | |
| 1087 for num in ip: | |
| 1088 self.assertTrue(0 <= num <= 255) | |
| 1089 self.assertTrue(0 <= port <= 65535) | |
| 1090 | |
| 1091 # all values are supposed to match Linux's tcp_states.h states | |
| 1092 # table across all platforms. | |
| 1093 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", | |
| 1094 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", | |
| 1095 "LAST_ACK", "LISTEN", "CLOSING", ""] | |
| 1096 | |
| 1097 tcp_template = "import socket;" \ | |
| 1098 "s = socket.socket($family, socket.SOCK_STREAM);" \ | |
| 1099 "s.bind(('$addr', 0));" \ | |
| 1100 "s.listen(1);" \ | |
| 1101 "conn, addr = s.accept();" | |
| 1102 | |
| 1103 udp_template = "import socket, time;" \ | |
| 1104 "s = socket.socket($family, socket.SOCK_DGRAM);" \ | |
| 1105 "s.bind(('$addr', 0));" \ | |
| 1106 "time.sleep(100);" | |
| 1107 | |
| 1108 from string import Template | |
| 1109 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, | |
| 1110 addr="127.0.0.1") | |
| 1111 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, | |
| 1112 addr="127.0.0.1") | |
| 1113 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, | |
| 1114 addr="::1") | |
| 1115 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, | |
| 1116 addr="::1") | |
| 1117 | |
| 1118 # launch various subprocess instantiating a socket of various | |
| 1119 # families and tupes to enrich psutil results | |
| 1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) | |
| 1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) | |
| 1122 if supports_ipv6(): | |
| 1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) | |
| 1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) | |
| 1125 else: | |
| 1126 tcp6_proc = None | |
| 1127 udp6_proc = None | |
| 1128 | |
| 1129 # --- check connections of all processes | |
| 1130 | |
| 1131 time.sleep(0.1) | |
| 1132 for p in psutil.process_iter(): | |
| 1133 try: | |
| 1134 cons = p.get_connections() | |
| 1135 except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| 1136 pass | |
| 1137 else: | |
| 1138 for conn in cons: | |
| 1139 self.assertTrue(conn.type in (socket.SOCK_STREAM, | |
| 1140 socket.SOCK_DGRAM)) | |
| 1141 self.assertTrue(conn.family in (socket.AF_INET, | |
| 1142 socket.AF_INET6)) | |
| 1143 check_address(conn.local_address, conn.family) | |
| 1144 check_address(conn.remote_address, conn.family) | |
| 1145 if conn.status not in valid_states: | |
| 1146 self.fail("%s is not a valid status" %conn.status) | |
| 1147 # actually try to bind the local socket; ignore IPv6 | |
| 1148 # sockets as their address might be represented as | |
| 1149 # an IPv4-mapped-address (e.g. "::127.0.0.1") | |
| 1150 # and that's rejected by bind() | |
| 1151 if conn.family == socket.AF_INET: | |
| 1152 s = socket.socket(conn.family, conn.type) | |
| 1153 s.bind((conn.local_address[0], 0)) | |
| 1154 s.close() | |
| 1155 | |
| 1156 if not WINDOWS and hasattr(socket, 'fromfd'): | |
| 1157 dupsock = None | |
| 1158 try: | |
| 1159 try: | |
| 1160 dupsock = socket.fromfd(conn.fd, conn.family, | |
| 1161 conn.type) | |
| 1162 except (socket.error, OSError): | |
| 1163 err = sys.exc_info()[1] | |
| 1164 if err.args[0] == errno.EBADF: | |
| 1165 continue | |
| 1166 else: | |
| 1167 raise | |
| 1168 # python >= 2.5 | |
| 1169 if hasattr(dupsock, "family"): | |
| 1170 self.assertEqual(dupsock.family, conn.family) | |
| 1171 self.assertEqual(dupsock.type, conn.type) | |
| 1172 finally: | |
| 1173 if dupsock is not None: | |
| 1174 dupsock.close() | |
| 1175 | |
| 1176 | |
| 1177 # --- check matches against subprocesses | |
| 1178 | |
| 1179 for p in psutil.Process(os.getpid()).get_children(): | |
| 1180 for conn in p.get_connections(): | |
| 1181 # TCP v4 | |
| 1182 if p.pid == tcp4_proc.pid: | |
| 1183 self.assertEqual(conn.family, socket.AF_INET) | |
| 1184 self.assertEqual(conn.type, socket.SOCK_STREAM) | |
| 1185 self.assertEqual(conn.local_address[0], "127.0.0.1") | |
| 1186 self.assertEqual(conn.remote_address, ()) | |
| 1187 self.assertEqual(conn.status, "LISTEN") | |
| 1188 # UDP v4 | |
| 1189 elif p.pid == udp4_proc.pid: | |
| 1190 self.assertEqual(conn.family, socket.AF_INET) | |
| 1191 self.assertEqual(conn.type, socket.SOCK_DGRAM) | |
| 1192 self.assertEqual(conn.local_address[0], "127.0.0.1") | |
| 1193 self.assertEqual(conn.remote_address, ()) | |
| 1194 self.assertEqual(conn.status, "") | |
| 1195 # TCP v6 | |
| 1196 elif p.pid == getattr(tcp6_proc, "pid", None): | |
| 1197 self.assertEqual(conn.family, socket.AF_INET6) | |
| 1198 self.assertEqual(conn.type, socket.SOCK_STREAM) | |
| 1199 self.assertTrue(conn.local_address[0] in ("::", "::1")) | |
| 1200 self.assertEqual(conn.remote_address, ()) | |
| 1201 self.assertEqual(conn.status, "LISTEN") | |
| 1202 # UDP v6 | |
| 1203 elif p.pid == getattr(udp6_proc, "pid", None): | |
| 1204 self.assertEqual(conn.family, socket.AF_INET6) | |
| 1205 self.assertEqual(conn.type, socket.SOCK_DGRAM) | |
| 1206 self.assertTrue(conn.local_address[0] in ("::", "::1")) | |
| 1207 self.assertEqual(conn.remote_address, ()) | |
| 1208 self.assertEqual(conn.status, "") | |
| 1209 | |
| 1210 def test_parent_ppid(self): | |
| 1211 this_parent = os.getpid() | |
| 1212 sproc = get_test_subprocess() | |
| 1213 p = psutil.Process(sproc.pid) | |
| 1214 self.assertEqual(p.ppid, this_parent) | |
| 1215 self.assertEqual(p.parent.pid, this_parent) | |
| 1216 # no other process is supposed to have us as parent | |
| 1217 for p in psutil.process_iter(): | |
| 1218 if p.pid == sproc.pid: | |
| 1219 continue | |
| 1220 self.assertTrue(p.ppid != this_parent) | |
| 1221 | |
| 1222 def test_get_children(self): | |
| 1223 p = psutil.Process(os.getpid()) | |
| 1224 self.assertEqual(p.get_children(), []) | |
| 1225 sproc = get_test_subprocess() | |
| 1226 children = p.get_children() | |
| 1227 self.assertEqual(len(children), 1) | |
| 1228 self.assertEqual(children[0].pid, sproc.pid) | |
| 1229 self.assertEqual(children[0].ppid, os.getpid()) | |
| 1230 | |
| 1231 def test_suspend_resume(self): | |
| 1232 sproc = get_test_subprocess() | |
| 1233 p = psutil.Process(sproc.pid) | |
| 1234 p.suspend() | |
| 1235 time.sleep(0.1) | |
| 1236 self.assertEqual(p.status, psutil.STATUS_STOPPED) | |
| 1237 self.assertEqual(str(p.status), "stopped") | |
| 1238 p.resume() | |
| 1239 self.assertTrue(p.status != psutil.STATUS_STOPPED) | |
| 1240 | |
| 1241 def test_invalid_pid(self): | |
| 1242 self.assertRaises(ValueError, psutil.Process, "1") | |
| 1243 self.assertRaises(ValueError, psutil.Process, None) | |
| 1244 # Refers to Issue #12 | |
| 1245 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) | |
| 1246 | |
| 1247 def test_zombie_process(self): | |
| 1248 # Test that NoSuchProcess exception gets raised in case the | |
| 1249 # process dies after we create the Process object. | |
| 1250 # Example: | |
| 1251 # >>> proc = Process(1234) | |
| 1252 # >>> time.sleep(5) # time-consuming task, process dies in meantime | |
| 1253 # >>> proc.name | |
| 1254 # Refers to Issue #15 | |
| 1255 sproc = get_test_subprocess() | |
| 1256 p = psutil.Process(sproc.pid) | |
| 1257 p.kill() | |
| 1258 p.wait() | |
| 1259 | |
| 1260 for name in dir(p): | |
| 1261 if name.startswith('_')\ | |
| 1262 or name in ('pid', 'send_signal', 'is_running', 'set_ionice', | |
| 1263 'wait'): | |
| 1264 continue | |
| 1265 try: | |
| 1266 meth = getattr(p, name) | |
| 1267 if callable(meth): | |
| 1268 meth() | |
| 1269 except psutil.NoSuchProcess: | |
| 1270 pass | |
| 1271 else: | |
| 1272 self.fail("NoSuchProcess exception not raised for %r" % name) | |
| 1273 | |
| 1274 # other methods | |
| 1275 try: | |
| 1276 if os.name == 'posix': | |
| 1277 p.nice = 1 | |
| 1278 else: | |
| 1279 p.nice = psutil.NORMAL_PRIORITY_CLASS | |
| 1280 except psutil.NoSuchProcess: | |
| 1281 pass | |
| 1282 else: | |
| 1283 self.fail("exception not raised") | |
| 1284 if hasattr(p, 'set_ionice'): | |
| 1285 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) | |
| 1286 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) | |
| 1287 self.assertFalse(p.is_running()) | |
| 1288 | |
| 1289 def test__str__(self): | |
| 1290 sproc = get_test_subprocess() | |
| 1291 p = psutil.Process(sproc.pid) | |
| 1292 self.assertTrue(str(sproc.pid) in str(p)) | |
| 1293 # python shows up as 'Python' in cmdline on OS X so test fails on OS X | |
| 1294 if not OSX: | |
| 1295 self.assertTrue(os.path.basename(PYTHON) in str(p)) | |
| 1296 sproc = get_test_subprocess() | |
| 1297 p = psutil.Process(sproc.pid) | |
| 1298 p.kill() | |
| 1299 p.wait() | |
| 1300 self.assertTrue(str(sproc.pid) in str(p)) | |
| 1301 self.assertTrue("terminated" in str(p)) | |
| 1302 | |
| 1303 def test_fetch_all(self): | |
| 1304 valid_procs = 0 | |
| 1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', | |
| 1306 'kill', 'wait'] | |
| 1307 excluded_names += ['get_cpu_percent', 'get_children'] | |
| 1308 # XXX - skip slow lsof implementation; | |
| 1309 if BSD: | |
| 1310 excluded_names += ['get_open_files', 'get_connections'] | |
| 1311 if OSX: | |
| 1312 excluded_names += ['get_connections'] | |
| 1313 attrs = [] | |
| 1314 for name in dir(psutil.Process): | |
| 1315 if name.startswith("_"): | |
| 1316 continue | |
| 1317 if name.startswith("set_"): | |
| 1318 continue | |
| 1319 if name in excluded_names: | |
| 1320 continue | |
| 1321 attrs.append(name) | |
| 1322 | |
| 1323 for p in psutil.process_iter(): | |
| 1324 for name in attrs: | |
| 1325 try: | |
| 1326 try: | |
| 1327 attr = getattr(p, name, None) | |
| 1328 if attr is not None and callable(attr): | |
| 1329 ret = attr() | |
| 1330 else: | |
| 1331 ret = attr | |
| 1332 valid_procs += 1 | |
| 1333 except (psutil.NoSuchProcess, psutil.AccessDenied): | |
| 1334 err = sys.exc_info()[1] | |
| 1335 self.assertEqual(err.pid, p.pid) | |
| 1336 if err.name: | |
| 1337 self.assertEqual(err.name, p.name) | |
| 1338 self.assertTrue(str(err)) | |
| 1339 self.assertTrue(err.msg) | |
| 1340 else: | |
| 1341 if name == 'parent' or ret in (0, 0.0, [], None): | |
| 1342 continue | |
| 1343 self.assertTrue(ret) | |
| 1344 if name == "exe": | |
| 1345 self.assertTrue(os.path.isfile(ret)) | |
| 1346 elif name == "getcwd": | |
| 1347 # XXX - temporary fix; on my Linux box | |
| 1348 # chrome process cws is errnously reported | |
| 1349 # as /proc/4144/fdinfo whichd doesn't exist | |
| 1350 if 'chrome' in p.name: | |
| 1351 continue | |
| 1352 self.assertTrue(os.path.isdir(ret)) | |
| 1353 except Exception: | |
| 1354 err = sys.exc_info()[1] | |
| 1355 trace = traceback.format_exc() | |
| 1356 self.fail('%s\nmethod=%s, pid=%s, retvalue=%s' | |
| 1357 %(trace, name, p.pid, repr(ret))) | |
| 1358 | |
| 1359 # we should always have a non-empty list, not including PID 0 etc. | |
| 1360 # special cases. | |
| 1361 self.assertTrue(valid_procs > 0) | |
| 1362 | |
| 1363 @skipIf(LINUX) | |
| 1364 def test_pid_0(self): | |
| 1365 # Process(0) is supposed to work on all platforms except Linux | |
| 1366 p = psutil.Process(0) | |
| 1367 self.assertTrue(p.name) | |
| 1368 | |
| 1369 if os.name == 'posix': | |
| 1370 self.assertEqual(p.uids.real, 0) | |
| 1371 self.assertEqual(p.gids.real, 0) | |
| 1372 | |
| 1373 self.assertTrue(p.ppid in (0, 1)) | |
| 1374 #self.assertEqual(p.exe, "") | |
| 1375 self.assertEqual(p.cmdline, []) | |
| 1376 try: | |
| 1377 p.get_num_threads() | |
| 1378 except psutil.AccessDenied: | |
| 1379 pass | |
| 1380 | |
| 1381 if OSX : #and os.geteuid() != 0: | |
| 1382 self.assertRaises(psutil.AccessDenied, p.get_memory_info) | |
| 1383 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) | |
| 1384 else: | |
| 1385 p.get_memory_info() | |
| 1386 | |
| 1387 # username property | |
| 1388 if POSIX: | |
| 1389 self.assertEqual(p.username, 'root') | |
| 1390 elif WINDOWS: | |
| 1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') | |
| 1392 else: | |
| 1393 p.username | |
| 1394 | |
| 1395 self.assertTrue(0 in psutil.get_pid_list()) | |
| 1396 self.assertTrue(psutil.pid_exists(0)) | |
| 1397 | |
| 1398 def test_Popen(self): | |
| 1399 # Popen class test | |
| 1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | |
| 1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| 1402 try: | |
| 1403 proc.name | |
| 1404 proc.stdin | |
| 1405 self.assertTrue(hasattr(proc, 'name')) | |
| 1406 self.assertTrue(hasattr(proc, 'stdin')) | |
| 1407 self.assertRaises(AttributeError, getattr, proc, 'foo') | |
| 1408 finally: | |
| 1409 proc.kill() | |
| 1410 proc.wait() | |
| 1411 | |
| 1412 | |
| 1413 if hasattr(os, 'getuid'): | |
| 1414 class LimitedUserTestCase(TestCase): | |
| 1415 """Repeat the previous tests by using a limited user. | |
| 1416 Executed only on UNIX and only if the user who run the test script | |
| 1417 is root. | |
| 1418 """ | |
| 1419 # the uid/gid the test suite runs under | |
| 1420 PROCESS_UID = os.getuid() | |
| 1421 PROCESS_GID = os.getgid() | |
| 1422 | |
| 1423 def __init__(self, *args, **kwargs): | |
| 1424 TestCase.__init__(self, *args, **kwargs) | |
| 1425 # re-define all existent test methods in order to | |
| 1426 # ignore AccessDenied exceptions | |
| 1427 for attr in [x for x in dir(self) if x.startswith('test')]: | |
| 1428 meth = getattr(self, attr) | |
| 1429 def test_(self): | |
| 1430 try: | |
| 1431 meth() | |
| 1432 except psutil.AccessDenied: | |
| 1433 pass | |
| 1434 setattr(self, attr, types.MethodType(test_, self)) | |
| 1435 | |
| 1436 def setUp(self): | |
| 1437 os.setegid(1000) | |
| 1438 os.seteuid(1000) | |
| 1439 TestCase.setUp(self) | |
| 1440 | |
| 1441 def tearDown(self): | |
| 1442 os.setegid(self.PROCESS_UID) | |
| 1443 os.seteuid(self.PROCESS_GID) | |
| 1444 TestCase.tearDown(self) | |
| 1445 | |
| 1446 def test_nice(self): | |
| 1447 try: | |
| 1448 psutil.Process(os.getpid()).nice = -1 | |
| 1449 except psutil.AccessDenied: | |
| 1450 pass | |
| 1451 else: | |
| 1452 self.fail("exception not raised") | |
| 1453 | |
| 1454 | |
| 1455 def test_main(): | |
| 1456 tests = [] | |
| 1457 test_suite = unittest.TestSuite() | |
| 1458 tests.append(TestCase) | |
| 1459 | |
| 1460 if POSIX: | |
| 1461 from _posix import PosixSpecificTestCase | |
| 1462 tests.append(PosixSpecificTestCase) | |
| 1463 | |
| 1464 # import the specific platform test suite | |
| 1465 if LINUX: | |
| 1466 from _linux import LinuxSpecificTestCase as stc | |
| 1467 elif WINDOWS: | |
| 1468 from _windows import WindowsSpecificTestCase as stc | |
| 1469 elif OSX: | |
| 1470 from _osx import OSXSpecificTestCase as stc | |
| 1471 elif BSD: | |
| 1472 from _bsd import BSDSpecificTestCase as stc | |
| 1473 tests.append(stc) | |
| 1474 | |
| 1475 if hasattr(os, 'getuid'): | |
| 1476 if os.getuid() == 0: | |
| 1477 tests.append(LimitedUserTestCase) | |
| 1478 else: | |
| 1479 atexit.register(warnings.warn, "Couldn't run limited user tests (" | |
| 1480 "super-user privileges are required)", RuntimeWarning) | |
| 1481 | |
| 1482 for test_class in tests: | |
| 1483 test_suite.addTest(unittest.makeSuite(test_class)) | |
| 1484 | |
| 1485 f = open(TESTFN, 'w') | |
| 1486 f.close() | |
| 1487 atexit.register(lambda: os.remove(TESTFN)) | |
| 1488 | |
| 1489 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
| 1490 DEVNULL.close() | |
| 1491 | |
| 1492 if __name__ == '__main__': | |
| 1493 test_main() | |
| OLD | NEW |