OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # | |
3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
4 # | |
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
6 # Use of this source code is governed by a BSD-style license that can be | |
7 # found in the LICENSE file. | |
8 | |
9 """ | |
10 psutil test suite. | |
11 | |
12 Note: this is targeted for both python 2.x and 3.x so there's no need | |
13 to use 2to3 tool first. | |
14 """ | |
15 | |
16 import unittest | |
17 import os | |
18 import sys | |
19 import subprocess | |
20 import time | |
21 import signal | |
22 import types | |
23 import traceback | |
24 import socket | |
25 import warnings | |
26 import atexit | |
27 import errno | |
28 import threading | |
29 import tempfile | |
30 import collections | |
31 | |
32 import psutil | |
33 | |
34 | |
35 PYTHON = os.path.realpath(sys.executable) | |
36 DEVNULL = open(os.devnull, 'r+') | |
37 TESTFN = os.path.join(os.getcwd(), "$testfile") | |
38 PY3 = sys.version_info >= (3,) | |
39 POSIX = os.name == 'posix' | |
40 LINUX = sys.platform.lower().startswith("linux") | |
41 WINDOWS = sys.platform.lower().startswith("win32") | |
42 OSX = sys.platform.lower().startswith("darwin") | |
43 BSD = sys.platform.lower().startswith("freebsd") | |
44 | |
45 try: | |
46 psutil.Process(os.getpid()).get_connections() | |
47 except NotImplementedError: | |
48 err = sys.exc_info()[1] | |
49 SUPPORT_CONNECTIONS = False | |
50 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", | |
51 RuntimeWarning) | |
52 else: | |
53 SUPPORT_CONNECTIONS = True | |
54 | |
55 if PY3: | |
56 long = int | |
57 | |
58 def callable(attr): | |
59 return isinstance(attr, collections.Callable) | |
60 | |
61 | |
62 _subprocesses_started = set() | |
63 | |
64 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): | |
65 """Return a subprocess.Popen object to use in tests. | |
66 By default stdout and stderr are redirected to /dev/null and the | |
67 python interpreter is used as test process. | |
68 """ | |
69 if cmd is None: | |
70 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | |
71 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) | |
72 _subprocesses_started.add(sproc.pid) | |
73 return sproc | |
74 | |
75 def sh(cmdline): | |
76 """run cmd in a subprocess and return its output. | |
77 raises RuntimeError on error. | |
78 """ | |
79 p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, | |
80 stderr=subprocess.PIPE) | |
81 stdout, stderr = p.communicate() | |
82 if p.returncode != 0: | |
83 raise RuntimeError(stderr) | |
84 if stderr: | |
85 warnings.warn(stderr, RuntimeWarning) | |
86 if sys.version_info >= (3,): | |
87 stdout = str(stdout, sys.stdout.encoding) | |
88 return stdout.strip() | |
89 | |
90 def wait_for_pid(pid, timeout=1): | |
91 """Wait for pid to show up in the process list then return. | |
92 Used in the test suite to give time the sub process to initialize. | |
93 """ | |
94 raise_at = time.time() + timeout | |
95 while 1: | |
96 if pid in psutil.get_pid_list(): | |
97 # give it one more iteration to allow full initialization | |
98 time.sleep(0.01) | |
99 return | |
100 time.sleep(0.0001) | |
101 if time.time() >= raise_at: | |
102 raise RuntimeError("Timed out") | |
103 | |
104 def reap_children(search_all=False): | |
105 """Kill any subprocess started by this test suite and ensure that | |
106 no zombies stick around to hog resources and create problems when | |
107 looking for refleaks. | |
108 """ | |
109 if search_all: | |
110 this_process = psutil.Process(os.getpid()) | |
111 pids = [x.pid for x in this_process.get_children()] | |
112 else: | |
113 pids =_subprocesses_started | |
114 while pids: | |
115 pid = pids.pop() | |
116 try: | |
117 child = psutil.Process(pid) | |
118 child.kill() | |
119 except psutil.NoSuchProcess: | |
120 pass | |
121 else: | |
122 child.wait() | |
123 | |
124 | |
125 # we want to search through all processes before exiting | |
126 atexit.register(reap_children, search_all=True) | |
127 | |
128 def skipIf(condition, reason="", warn=False): | |
129 """Decorator which skip a test under if condition is satisfied. | |
130 This is a substitute of unittest.skipIf which is available | |
131 only in python 2.7 and 3.2. | |
132 If 'reason' argument is provided this will be printed during | |
133 tests execution. | |
134 If 'warn' is provided a RuntimeWarning will be shown when all | |
135 tests are run. | |
136 """ | |
137 def outer(fun, *args, **kwargs): | |
138 def inner(self): | |
139 if condition: | |
140 sys.stdout.write("skipped-") | |
141 sys.stdout.flush() | |
142 if warn: | |
143 objname = "%s.%s" % (self.__class__.__name__, fun.__name__) | |
144 msg = "%s was skipped" % objname | |
145 if reason: | |
146 msg += "; reason: " + repr(reason) | |
147 atexit.register(warnings.warn, msg, RuntimeWarning) | |
148 return | |
149 else: | |
150 return fun(self, *args, **kwargs) | |
151 return inner | |
152 return outer | |
153 | |
154 def skipUnless(condition, reason="", warn=False): | |
155 """Contrary of skipIf.""" | |
156 if not condition: | |
157 return skipIf(True, reason, warn) | |
158 return skipIf(False) | |
159 | |
160 def ignore_access_denied(fun): | |
161 """Decorator to Ignore AccessDenied exceptions.""" | |
162 def outer(fun, *args, **kwargs): | |
163 def inner(self): | |
164 try: | |
165 return fun(self, *args, **kwargs) | |
166 except psutil.AccessDenied: | |
167 pass | |
168 return inner | |
169 return outer | |
170 | |
171 def supports_ipv6(): | |
172 """Return True if IPv6 is supported on this platform.""" | |
173 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): | |
174 return False | |
175 sock = None | |
176 try: | |
177 try: | |
178 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
179 sock.bind(("::1", 0)) | |
180 except (socket.error, socket.gaierror): | |
181 return False | |
182 else: | |
183 return True | |
184 finally: | |
185 if sock is not None: | |
186 sock.close() | |
187 | |
188 | |
189 class ThreadTask(threading.Thread): | |
190 """A thread object used for running process thread tests.""" | |
191 | |
192 def __init__(self): | |
193 threading.Thread.__init__(self) | |
194 self._running = False | |
195 self._interval = None | |
196 self._flag = threading.Event() | |
197 | |
198 def __repr__(self): | |
199 name = self.__class__.__name__ | |
200 return '<%s running=%s at %#x>' % (name, self._running, id(self)) | |
201 | |
202 def start(self, interval=0.001): | |
203 """Start thread and keep it running until an explicit | |
204 stop() request. Polls for shutdown every 'timeout' seconds. | |
205 """ | |
206 if self._running: | |
207 raise ValueError("already started") | |
208 self._interval = interval | |
209 threading.Thread.start(self) | |
210 self._flag.wait() | |
211 | |
212 def run(self): | |
213 self._running = True | |
214 self._flag.set() | |
215 while self._running: | |
216 time.sleep(self._interval) | |
217 | |
218 def stop(self): | |
219 """Stop thread execution and and waits until it is stopped.""" | |
220 if not self._running: | |
221 raise ValueError("already stopped") | |
222 self._running = False | |
223 self.join() | |
224 | |
225 | |
226 class TestCase(unittest.TestCase): | |
227 | |
228 def tearDown(self): | |
229 reap_children() | |
230 | |
231 # ============================ | |
232 # tests for system-related API | |
233 # ============================ | |
234 | |
235 def test_get_process_list(self): | |
236 pids = [x.pid for x in psutil.get_process_list()] | |
237 self.assertTrue(os.getpid() in pids) | |
238 | |
239 def test_process_iter(self): | |
240 pids = [x.pid for x in psutil.process_iter()] | |
241 self.assertTrue(os.getpid() in pids) | |
242 | |
243 def test_TOTAL_PHYMEM(self): | |
244 x = psutil.TOTAL_PHYMEM | |
245 self.assertTrue(isinstance(x, (int, long))) | |
246 self.assertTrue(x > 0) | |
247 | |
248 def test_BOOT_TIME(self): | |
249 x = psutil.BOOT_TIME | |
250 self.assertTrue(isinstance(x, float)) | |
251 self.assertTrue(x > 0) | |
252 | |
253 def test_deprecated_memory_functions(self): | |
254 warnings.filterwarnings("error") | |
255 try: | |
256 self.assertRaises(DeprecationWarning, psutil.used_phymem) | |
257 self.assertRaises(DeprecationWarning, psutil.avail_phymem) | |
258 self.assertRaises(DeprecationWarning, psutil.total_virtmem) | |
259 self.assertRaises(DeprecationWarning, psutil.used_virtmem) | |
260 self.assertRaises(DeprecationWarning, psutil.avail_virtmem) | |
261 finally: | |
262 warnings.resetwarnings() | |
263 | |
264 def test_phymem_usage(self): | |
265 mem = psutil.phymem_usage() | |
266 self.assertTrue(mem.total > 0) | |
267 self.assertTrue(mem.used > 0) | |
268 self.assertTrue(mem.free > 0) | |
269 self.assertTrue(0 <= mem.percent <= 100) | |
270 | |
271 def test_virtmem_usage(self): | |
272 mem = psutil.virtmem_usage() | |
273 self.assertTrue(mem.total > 0) | |
274 self.assertTrue(mem.used >= 0) | |
275 self.assertTrue(mem.free > 0) | |
276 self.assertTrue(0 <= mem.percent <= 100) | |
277 | |
278 @skipUnless(LINUX) | |
279 def test_phymem_buffers(self): | |
280 x = psutil.phymem_buffers() | |
281 self.assertTrue(isinstance(x, (int, long))) | |
282 self.assertTrue(x >= 0) | |
283 | |
284 def test_pid_exists(self): | |
285 sproc = get_test_subprocess() | |
286 wait_for_pid(sproc.pid) | |
287 self.assertTrue(psutil.pid_exists(sproc.pid)) | |
288 p = psutil.Process(sproc.pid) | |
289 p.kill() | |
290 p.wait() | |
291 self.assertFalse(psutil.pid_exists(sproc.pid)) | |
292 self.assertFalse(psutil.pid_exists(-1)) | |
293 | |
294 def test_pid_exists_2(self): | |
295 reap_children() | |
296 pids = psutil.get_pid_list() | |
297 for pid in pids: | |
298 try: | |
299 self.assertTrue(psutil.pid_exists(pid)) | |
300 except AssertionError: | |
301 # in case the process disappeared in meantime fail only | |
302 # if it is no longer in get_pid_list() | |
303 time.sleep(.1) | |
304 if pid in psutil.get_pid_list(): | |
305 self.fail(pid) | |
306 pids = range(max(pids) + 5000, max(pids) + 6000) | |
307 for pid in pids: | |
308 self.assertFalse(psutil.pid_exists(pid)) | |
309 | |
310 def test_get_pid_list(self): | |
311 plist = [x.pid for x in psutil.get_process_list()] | |
312 pidlist = psutil.get_pid_list() | |
313 self.assertEqual(plist.sort(), pidlist.sort()) | |
314 # make sure every pid is unique | |
315 self.assertEqual(len(pidlist), len(set(pidlist))) | |
316 | |
317 def test_test(self): | |
318 # test for psutil.test() function | |
319 stdout = sys.stdout | |
320 sys.stdout = DEVNULL | |
321 try: | |
322 psutil.test() | |
323 finally: | |
324 sys.stdout = stdout | |
325 | |
326 def test_sys_cpu_times(self): | |
327 total = 0 | |
328 times = psutil.cpu_times() | |
329 sum(times) | |
330 for cp_time in times: | |
331 self.assertTrue(isinstance(cp_time, float)) | |
332 self.assertTrue(cp_time >= 0.0) | |
333 total += cp_time | |
334 self.assertEqual(total, sum(times)) | |
335 str(times) | |
336 | |
337 def test_sys_cpu_times2(self): | |
338 t1 = sum(psutil.cpu_times()) | |
339 time.sleep(0.1) | |
340 t2 = sum(psutil.cpu_times()) | |
341 difference = t2 - t1 | |
342 if not difference >= 0.05: | |
343 self.fail("difference %s" % difference) | |
344 | |
345 def test_sys_per_cpu_times(self): | |
346 for times in psutil.cpu_times(percpu=True): | |
347 total = 0 | |
348 sum(times) | |
349 for cp_time in times: | |
350 self.assertTrue(isinstance(cp_time, float)) | |
351 self.assertTrue(cp_time >= 0.0) | |
352 total += cp_time | |
353 self.assertEqual(total, sum(times)) | |
354 str(times) | |
355 | |
356 def test_sys_per_cpu_times2(self): | |
357 tot1 = psutil.cpu_times(percpu=True) | |
358 stop_at = time.time() + 0.1 | |
359 while 1: | |
360 if time.time() >= stop_at: | |
361 break | |
362 tot2 = psutil.cpu_times(percpu=True) | |
363 for t1, t2 in zip(tot1, tot2): | |
364 t1, t2 = sum(t1), sum(t2) | |
365 difference = t2 - t1 | |
366 if difference >= 0.05: | |
367 return | |
368 self.fail() | |
369 | |
370 def test_sys_cpu_percent(self): | |
371 psutil.cpu_percent(interval=0.001) | |
372 psutil.cpu_percent(interval=0.001) | |
373 for x in range(1000): | |
374 percent = psutil.cpu_percent(interval=None) | |
375 self.assertTrue(isinstance(percent, float)) | |
376 self.assertTrue(percent >= 0.0) | |
377 self.assertTrue(percent <= 100.0) | |
378 | |
379 def test_sys_per_cpu_percent(self): | |
380 psutil.cpu_percent(interval=0.001, percpu=True) | |
381 psutil.cpu_percent(interval=0.001, percpu=True) | |
382 for x in range(1000): | |
383 percents = psutil.cpu_percent(interval=None, percpu=True) | |
384 for percent in percents: | |
385 self.assertTrue(isinstance(percent, float)) | |
386 self.assertTrue(percent >= 0.0) | |
387 self.assertTrue(percent <= 100.0) | |
388 | |
389 def test_sys_cpu_percent_compare(self): | |
390 psutil.cpu_percent(interval=0) | |
391 psutil.cpu_percent(interval=0, percpu=True) | |
392 time.sleep(.1) | |
393 t1 = psutil.cpu_percent(interval=0) | |
394 t2 = psutil.cpu_percent(interval=0, percpu=True) | |
395 # calculate total average | |
396 t2 = sum(t2) / len(t2) | |
397 if abs(t1 - t2) > 5: | |
398 self.assertEqual(t1, t2) | |
399 | |
400 def test_disk_usage(self): | |
401 usage = psutil.disk_usage(os.getcwd()) | |
402 self.assertTrue(usage.total > 0) | |
403 self.assertTrue(usage.used > 0) | |
404 self.assertTrue(usage.free > 0) | |
405 self.assertTrue(usage.total > usage.used) | |
406 self.assertTrue(usage.total > usage.free) | |
407 self.assertTrue(0 <= usage.percent <= 100) | |
408 | |
409 # if path does not exist OSError ENOENT is expected across | |
410 # all platforms | |
411 fname = tempfile.mktemp() | |
412 try: | |
413 psutil.disk_usage(fname) | |
414 except OSError: | |
415 err = sys.exc_info()[1] | |
416 if err.args[0] != errno.ENOENT: | |
417 raise | |
418 else: | |
419 self.fail("OSError not raised") | |
420 | |
421 def test_disk_partitions(self): | |
422 for disk in psutil.disk_partitions(all=False): | |
423 self.assertTrue(os.path.exists(disk.device)) | |
424 self.assertTrue(os.path.isdir(disk.mountpoint)) | |
425 self.assertTrue(disk.fstype) | |
426 for disk in psutil.disk_partitions(all=True): | |
427 if not WINDOWS: | |
428 self.assertTrue(os.path.isdir(disk.mountpoint)) | |
429 self.assertTrue(disk.fstype) | |
430 | |
431 def find_mount_point(path): | |
432 path = os.path.abspath(path) | |
433 while not os.path.ismount(path): | |
434 path = os.path.dirname(path) | |
435 return path | |
436 | |
437 mount = find_mount_point(__file__) | |
438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] | |
439 self.assertTrue(mount in mounts) | |
440 psutil.disk_usage(mount) | |
441 | |
442 # XXX | |
443 @skipUnless(hasattr(psutil, "network_io_counters")) | |
444 def test_anetwork_io_counters(self): | |
445 def check_ntuple(nt): | |
446 self.assertEqual(nt[0], nt.bytes_sent) | |
447 self.assertEqual(nt[1], nt.bytes_recv) | |
448 self.assertEqual(nt[2], nt.packets_sent) | |
449 self.assertEqual(nt[3], nt.packets_recv) | |
450 self.assertTrue(nt.bytes_sent >= 0) | |
451 self.assertTrue(nt.bytes_recv >= 0) | |
452 self.assertTrue(nt.packets_sent >= 0) | |
453 self.assertTrue(nt.packets_recv >= 0) | |
454 | |
455 ret = psutil.network_io_counters(pernic=False) | |
456 check_ntuple(ret) | |
457 ret = psutil.network_io_counters(pernic=True) | |
458 for name, ntuple in ret.iteritems(): | |
459 self.assertTrue(name) | |
460 check_ntuple(ntuple) | |
461 # XXX | |
462 @skipUnless(hasattr(psutil, "disk_io_counters")) | |
463 def test_disk_io_counters(self): | |
464 def check_ntuple(nt): | |
465 self.assertEqual(nt[0], nt.read_count) | |
466 self.assertEqual(nt[1], nt.write_count) | |
467 self.assertEqual(nt[2], nt.read_bytes) | |
468 self.assertEqual(nt[3], nt.write_bytes) | |
469 self.assertEqual(nt[4], nt.read_time) | |
470 self.assertEqual(nt[5], nt.write_time) | |
471 self.assertTrue(nt.read_count >= 0) | |
472 self.assertTrue(nt.write_count >= 0) | |
473 self.assertTrue(nt.read_bytes >= 0) | |
474 self.assertTrue(nt.write_bytes >= 0) | |
475 self.assertTrue(nt.read_time >= 0) | |
476 self.assertTrue(nt.write_time >= 0) | |
477 | |
478 ret = psutil.disk_io_counters(perdisk=False) | |
479 check_ntuple(ret) | |
480 ret = psutil.disk_io_counters(perdisk=True) | |
481 for name, ntuple in ret.iteritems(): | |
482 self.assertTrue(name) | |
483 check_ntuple(ntuple) | |
484 | |
485 # ==================== | |
486 # Process object tests | |
487 # ==================== | |
488 | |
489 def test_kill(self): | |
490 sproc = get_test_subprocess() | |
491 test_pid = sproc.pid | |
492 wait_for_pid(test_pid) | |
493 p = psutil.Process(test_pid) | |
494 name = p.name | |
495 p.kill() | |
496 p.wait() | |
497 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
498 | |
499 def test_terminate(self): | |
500 sproc = get_test_subprocess() | |
501 test_pid = sproc.pid | |
502 wait_for_pid(test_pid) | |
503 p = psutil.Process(test_pid) | |
504 name = p.name | |
505 p.terminate() | |
506 p.wait() | |
507 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
508 | |
509 def test_send_signal(self): | |
510 if POSIX: | |
511 sig = signal.SIGKILL | |
512 else: | |
513 sig = signal.SIGTERM | |
514 sproc = get_test_subprocess() | |
515 test_pid = sproc.pid | |
516 p = psutil.Process(test_pid) | |
517 name = p.name | |
518 p.send_signal(sig) | |
519 p.wait() | |
520 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) | |
521 | |
522 def test_wait(self): | |
523 # check exit code signal | |
524 sproc = get_test_subprocess() | |
525 p = psutil.Process(sproc.pid) | |
526 p.kill() | |
527 code = p.wait() | |
528 if os.name == 'posix': | |
529 self.assertEqual(code, signal.SIGKILL) | |
530 else: | |
531 self.assertEqual(code, 0) | |
532 self.assertFalse(p.is_running()) | |
533 | |
534 sproc = get_test_subprocess() | |
535 p = psutil.Process(sproc.pid) | |
536 p.terminate() | |
537 code = p.wait() | |
538 if os.name == 'posix': | |
539 self.assertEqual(code, signal.SIGTERM) | |
540 else: | |
541 self.assertEqual(code, 0) | |
542 self.assertFalse(p.is_running()) | |
543 | |
544 # check sys.exit() code | |
545 code = "import time, sys; time.sleep(0.01); sys.exit(5);" | |
546 sproc = get_test_subprocess([PYTHON, "-c", code]) | |
547 p = psutil.Process(sproc.pid) | |
548 self.assertEqual(p.wait(), 5) | |
549 self.assertFalse(p.is_running()) | |
550 | |
551 # Test wait() issued twice. | |
552 # It is not supposed to raise NSP when the process is gone. | |
553 # On UNIX this should return None, on Windows it should keep | |
554 # returning the exit code. | |
555 sproc = get_test_subprocess([PYTHON, "-c", code]) | |
556 p = psutil.Process(sproc.pid) | |
557 self.assertEqual(p.wait(), 5) | |
558 self.assertTrue(p.wait() in (5, None)) | |
559 | |
560 # test timeout | |
561 sproc = get_test_subprocess() | |
562 p = psutil.Process(sproc.pid) | |
563 p.name | |
564 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) | |
565 | |
566 # timeout < 0 not allowed | |
567 self.assertRaises(ValueError, p.wait, -1) | |
568 | |
569 @skipUnless(POSIX) | |
570 def test_wait_non_children(self): | |
571 # test wait() against processes which are not our children | |
572 code = "import sys;" | |
573 code += "from subprocess import Popen, PIPE;" | |
574 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON | |
575 code += "sp = Popen(cmd, stdout=PIPE);" | |
576 code += "sys.stdout.write(str(sp.pid));" | |
577 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE
) | |
578 | |
579 grandson_pid = int(sproc.stdout.read()) | |
580 grandson_proc = psutil.Process(grandson_pid) | |
581 try: | |
582 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) | |
583 grandson_proc.kill() | |
584 ret = grandson_proc.wait() | |
585 self.assertEqual(ret, None) | |
586 finally: | |
587 if grandson_proc.is_running(): | |
588 grandson_proc.kill() | |
589 grandson_proc.wait() | |
590 | |
591 def test_wait_timeout_0(self): | |
592 sproc = get_test_subprocess() | |
593 p = psutil.Process(sproc.pid) | |
594 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) | |
595 p.kill() | |
596 stop_at = time.time() + 2 | |
597 while 1: | |
598 try: | |
599 code = p.wait(0) | |
600 except psutil.TimeoutExpired: | |
601 if time.time() >= stop_at: | |
602 raise | |
603 else: | |
604 break | |
605 if os.name == 'posix': | |
606 self.assertEqual(code, signal.SIGKILL) | |
607 else: | |
608 self.assertEqual(code, 0) | |
609 self.assertFalse(p.is_running()) | |
610 | |
611 def test_cpu_percent(self): | |
612 p = psutil.Process(os.getpid()) | |
613 p.get_cpu_percent(interval=0.001) | |
614 p.get_cpu_percent(interval=0.001) | |
615 for x in range(100): | |
616 percent = p.get_cpu_percent(interval=None) | |
617 self.assertTrue(isinstance(percent, float)) | |
618 self.assertTrue(percent >= 0.0) | |
619 self.assertTrue(percent <= 100.0) | |
620 | |
621 def test_cpu_times(self): | |
622 times = psutil.Process(os.getpid()).get_cpu_times() | |
623 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) | |
624 # make sure returned values can be pretty printed with strftime | |
625 time.strftime("%H:%M:%S", time.localtime(times.user)) | |
626 time.strftime("%H:%M:%S", time.localtime(times.system)) | |
627 | |
628 # Test Process.cpu_times() against os.times() | |
629 # os.times() is broken on Python 2.6 | |
630 # http://bugs.python.org/issue1040026 | |
631 # XXX fails on OSX: not sure if it's for os.times(). We should | |
632 # try this with Python 2.7 and re-enable the test. | |
633 | |
634 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) | |
635 def test_cpu_times2(self): | |
636 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() | |
637 utime, ktime = os.times()[:2] | |
638 | |
639 # Use os.times()[:2] as base values to compare our results | |
640 # using a tolerance of +/- 0.1 seconds. | |
641 # It will fail if the difference between the values is > 0.1s. | |
642 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: | |
643 self.fail("expected: %s, found: %s" %(utime, user_time)) | |
644 | |
645 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: | |
646 self.fail("expected: %s, found: %s" %(ktime, kernel_time)) | |
647 | |
648 def test_create_time(self): | |
649 sproc = get_test_subprocess() | |
650 now = time.time() | |
651 wait_for_pid(sproc.pid) | |
652 p = psutil.Process(sproc.pid) | |
653 create_time = p.create_time | |
654 | |
655 # Use time.time() as base value to compare our result using a | |
656 # tolerance of +/- 1 second. | |
657 # It will fail if the difference between the values is > 2s. | |
658 difference = abs(create_time - now) | |
659 if difference > 2: | |
660 self.fail("expected: %s, found: %s, difference: %s" | |
661 % (now, create_time, difference)) | |
662 | |
663 # make sure returned value can be pretty printed with strftime | |
664 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) | |
665 | |
666 @skipIf(WINDOWS) | |
667 def test_terminal(self): | |
668 tty = sh('tty') | |
669 p = psutil.Process(os.getpid()) | |
670 self.assertEqual(p.terminal, tty) | |
671 | |
672 @skipIf(OSX, warn=False) | |
673 def test_get_io_counters(self): | |
674 p = psutil.Process(os.getpid()) | |
675 # test reads | |
676 io1 = p.get_io_counters() | |
677 f = open(PYTHON, 'rb') | |
678 f.read() | |
679 f.close() | |
680 io2 = p.get_io_counters() | |
681 if not BSD: | |
682 self.assertTrue(io2.read_count > io1.read_count) | |
683 self.assertTrue(io2.write_count == io1.write_count) | |
684 self.assertTrue(io2.read_bytes >= io1.read_bytes) | |
685 self.assertTrue(io2.write_bytes >= io1.write_bytes) | |
686 # test writes | |
687 io1 = p.get_io_counters() | |
688 f = tempfile.TemporaryFile() | |
689 if sys.version_info >= (3,): | |
690 f.write(bytes("x" * 1000000, 'ascii')) | |
691 else: | |
692 f.write("x" * 1000000) | |
693 f.close() | |
694 io2 = p.get_io_counters() | |
695 if not BSD: | |
696 self.assertTrue(io2.write_count > io1.write_count) | |
697 self.assertTrue(io2.write_bytes > io1.write_bytes) | |
698 self.assertTrue(io2.read_count >= io1.read_count) | |
699 self.assertTrue(io2.read_bytes >= io1.read_bytes) | |
700 | |
701 @skipUnless(LINUX) | |
702 def test_get_set_ionice(self): | |
703 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, | |
704 IOPRIO_CLASS_IDLE) | |
705 self.assertEqual(IOPRIO_CLASS_NONE, 0) | |
706 self.assertEqual(IOPRIO_CLASS_RT, 1) | |
707 self.assertEqual(IOPRIO_CLASS_BE, 2) | |
708 self.assertEqual(IOPRIO_CLASS_IDLE, 3) | |
709 p = psutil.Process(os.getpid()) | |
710 try: | |
711 p.set_ionice(2) | |
712 ioclass, value = p.get_ionice() | |
713 self.assertEqual(ioclass, 2) | |
714 self.assertEqual(value, 4) | |
715 # | |
716 p.set_ionice(3) | |
717 ioclass, value = p.get_ionice() | |
718 self.assertEqual(ioclass, 3) | |
719 self.assertEqual(value, 0) | |
720 # | |
721 p.set_ionice(2, 0) | |
722 ioclass, value = p.get_ionice() | |
723 self.assertEqual(ioclass, 2) | |
724 self.assertEqual(value, 0) | |
725 p.set_ionice(2, 7) | |
726 ioclass, value = p.get_ionice() | |
727 self.assertEqual(ioclass, 2) | |
728 self.assertEqual(value, 7) | |
729 self.assertRaises(ValueError, p.set_ionice, 2, 10) | |
730 finally: | |
731 p.set_ionice(IOPRIO_CLASS_NONE) | |
732 | |
733 def test_get_num_threads(self): | |
734 # on certain platforms such as Linux we might test for exact | |
735 # thread number, since we always have with 1 thread per process, | |
736 # but this does not apply across all platforms (OSX, Windows) | |
737 p = psutil.Process(os.getpid()) | |
738 step1 = p.get_num_threads() | |
739 | |
740 thread = ThreadTask() | |
741 thread.start() | |
742 try: | |
743 step2 = p.get_num_threads() | |
744 self.assertEqual(step2, step1 + 1) | |
745 thread.stop() | |
746 finally: | |
747 if thread._running: | |
748 thread.stop() | |
749 | |
750 def test_get_threads(self): | |
751 p = psutil.Process(os.getpid()) | |
752 step1 = p.get_threads() | |
753 | |
754 thread = ThreadTask() | |
755 thread.start() | |
756 | |
757 try: | |
758 step2 = p.get_threads() | |
759 self.assertEqual(len(step2), len(step1) + 1) | |
760 # on Linux, first thread id is supposed to be this process | |
761 if LINUX: | |
762 self.assertEqual(step2[0].id, os.getpid()) | |
763 athread = step2[0] | |
764 # test named tuple | |
765 self.assertEqual(athread.id, athread[0]) | |
766 self.assertEqual(athread.user_time, athread[1]) | |
767 self.assertEqual(athread.system_time, athread[2]) | |
768 # test num threads | |
769 thread.stop() | |
770 finally: | |
771 if thread._running: | |
772 thread.stop() | |
773 | |
774 def test_get_memory_info(self): | |
775 p = psutil.Process(os.getpid()) | |
776 | |
777 # step 1 - get a base value to compare our results | |
778 rss1, vms1 = p.get_memory_info() | |
779 percent1 = p.get_memory_percent() | |
780 self.assertTrue(rss1 > 0) | |
781 self.assertTrue(vms1 > 0) | |
782 | |
783 # step 2 - allocate some memory | |
784 memarr = [None] * 1500000 | |
785 | |
786 rss2, vms2 = p.get_memory_info() | |
787 percent2 = p.get_memory_percent() | |
788 # make sure that the memory usage bumped up | |
789 self.assertTrue(rss2 > rss1) | |
790 self.assertTrue(vms2 >= vms1) # vms might be equal | |
791 self.assertTrue(percent2 > percent1) | |
792 del memarr | |
793 | |
794 def test_get_memory_percent(self): | |
795 p = psutil.Process(os.getpid()) | |
796 self.assertTrue(p.get_memory_percent() > 0.0) | |
797 | |
798 def test_pid(self): | |
799 sproc = get_test_subprocess() | |
800 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) | |
801 | |
802 def test_is_running(self): | |
803 sproc = get_test_subprocess() | |
804 wait_for_pid(sproc.pid) | |
805 p = psutil.Process(sproc.pid) | |
806 self.assertTrue(p.is_running()) | |
807 p.kill() | |
808 p.wait() | |
809 self.assertFalse(p.is_running()) | |
810 | |
811 def test_exe(self): | |
812 sproc = get_test_subprocess() | |
813 wait_for_pid(sproc.pid) | |
814 try: | |
815 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) | |
816 except AssertionError: | |
817 # certain platforms such as BSD are more accurate returning: | |
818 # "/usr/local/bin/python2.7" | |
819 # ...instead of: | |
820 # "/usr/local/bin/python" | |
821 # We do not want to consider this difference in accuracy | |
822 # an error. | |
823 name = psutil.Process(sproc.pid).exe | |
824 adjusted_name = PYTHON[:len(name)] | |
825 self.assertEqual(name, adjusted_name) | |
826 for p in psutil.process_iter(): | |
827 try: | |
828 exe = p.exe | |
829 except psutil.Error: | |
830 continue | |
831 if not exe: | |
832 continue | |
833 if not os.path.exists(exe): | |
834 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ | |
835 % (repr(exe), p.pid, p.name, p.cmdline)) | |
836 if hasattr(os, 'access') and hasattr(os, "X_OK"): | |
837 if not os.access(p.exe, os.X_OK): | |
838 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ | |
839 % (repr(p.exe), p.pid, p.name, p.cmdline)) | |
840 | |
841 def test_cmdline(self): | |
842 sproc = get_test_subprocess([PYTHON, "-E"]) | |
843 wait_for_pid(sproc.pid) | |
844 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) | |
845 | |
846 def test_name(self): | |
847 sproc = get_test_subprocess(PYTHON) | |
848 wait_for_pid(sproc.pid) | |
849 self.assertEqual(psutil.Process(sproc.pid).name.lower(), | |
850 os.path.basename(sys.executable).lower()) | |
851 | |
852 if os.name == 'posix': | |
853 | |
854 def test_uids(self): | |
855 p = psutil.Process(os.getpid()) | |
856 real, effective, saved = p.uids | |
857 # os.getuid() refers to "real" uid | |
858 self.assertEqual(real, os.getuid()) | |
859 # os.geteuid() refers to "effective" uid | |
860 self.assertEqual(effective, os.geteuid()) | |
861 # no such thing as os.getsuid() ("saved" uid), but starting | |
862 # from python 2.7 we have os.getresuid()[2] | |
863 if hasattr(os, "getresuid"): | |
864 self.assertEqual(saved, os.getresuid()[2]) | |
865 | |
866 def test_gids(self): | |
867 p = psutil.Process(os.getpid()) | |
868 real, effective, saved = p.gids | |
869 # os.getuid() refers to "real" uid | |
870 self.assertEqual(real, os.getgid()) | |
871 # os.geteuid() refers to "effective" uid | |
872 self.assertEqual(effective, os.getegid()) | |
873 # no such thing as os.getsuid() ("saved" uid), but starting | |
874 # from python 2.7 we have os.getresgid()[2] | |
875 if hasattr(os, "getresuid"): | |
876 self.assertEqual(saved, os.getresgid()[2]) | |
877 | |
878 def test_nice(self): | |
879 p = psutil.Process(os.getpid()) | |
880 self.assertRaises(TypeError, setattr, p, "nice", "str") | |
881 try: | |
882 try: | |
883 first_nice = p.nice | |
884 p.nice = 1 | |
885 self.assertEqual(p.nice, 1) | |
886 # going back to previous nice value raises AccessDenied on O
SX | |
887 if not OSX: | |
888 p.nice = 0 | |
889 self.assertEqual(p.nice, 0) | |
890 except psutil.AccessDenied: | |
891 pass | |
892 finally: | |
893 # going back to previous nice value raises AccessDenied on OSX | |
894 if not OSX: | |
895 p.nice = first_nice | |
896 | |
897 if os.name == 'nt': | |
898 | |
899 def test_nice(self): | |
900 p = psutil.Process(os.getpid()) | |
901 self.assertRaises(TypeError, setattr, p, "nice", "str") | |
902 try: | |
903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) | |
904 p.nice = psutil.HIGH_PRIORITY_CLASS | |
905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) | |
906 p.nice = psutil.NORMAL_PRIORITY_CLASS | |
907 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) | |
908 finally: | |
909 p.nice = psutil.NORMAL_PRIORITY_CLASS | |
910 | |
911 def test_status(self): | |
912 p = psutil.Process(os.getpid()) | |
913 self.assertEqual(p.status, psutil.STATUS_RUNNING) | |
914 self.assertEqual(str(p.status), "running") | |
915 for p in psutil.process_iter(): | |
916 if str(p.status) == '?': | |
917 self.fail("invalid status for pid %d" % p.pid) | |
918 | |
919 def test_username(self): | |
920 sproc = get_test_subprocess() | |
921 p = psutil.Process(sproc.pid) | |
922 if POSIX: | |
923 import pwd | |
924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) | |
925 elif WINDOWS: | |
926 expected_username = os.environ['USERNAME'] | |
927 expected_domain = os.environ['USERDOMAIN'] | |
928 domain, username = p.username.split('\\') | |
929 self.assertEqual(domain, expected_domain) | |
930 self.assertEqual(username, expected_username) | |
931 else: | |
932 p.username | |
933 | |
934 @skipUnless(WINDOWS or LINUX) | |
935 def test_getcwd(self): | |
936 sproc = get_test_subprocess() | |
937 wait_for_pid(sproc.pid) | |
938 p = psutil.Process(sproc.pid) | |
939 self.assertEqual(p.getcwd(), os.getcwd()) | |
940 | |
941 @skipUnless(WINDOWS or LINUX) | |
942 def test_getcwd_2(self): | |
943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] | |
944 sproc = get_test_subprocess(cmd) | |
945 wait_for_pid(sproc.pid) | |
946 p = psutil.Process(sproc.pid) | |
947 time.sleep(0.1) | |
948 expected_dir = os.path.dirname(os.getcwd()) | |
949 self.assertEqual(p.getcwd(), expected_dir) | |
950 | |
951 def test_get_open_files(self): | |
952 # current process | |
953 p = psutil.Process(os.getpid()) | |
954 files = p.get_open_files() | |
955 self.assertFalse(TESTFN in files) | |
956 f = open(TESTFN, 'r') | |
957 time.sleep(.1) | |
958 filenames = [x.path for x in p.get_open_files()] | |
959 self.assertTrue(TESTFN in filenames) | |
960 f.close() | |
961 for file in filenames: | |
962 self.assertTrue(os.path.isfile(file)) | |
963 | |
964 # another process | |
965 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN | |
966 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) | |
967 wait_for_pid(sproc.pid) | |
968 time.sleep(0.1) | |
969 p = psutil.Process(sproc.pid) | |
970 for x in range(100): | |
971 filenames = [x.path for x in p.get_open_files()] | |
972 if TESTFN in filenames: | |
973 break | |
974 time.sleep(.01) | |
975 else: | |
976 self.assertTrue(TESTFN in filenames) | |
977 for file in filenames: | |
978 self.assertTrue(os.path.isfile(file)) | |
979 # all processes | |
980 for proc in psutil.process_iter(): | |
981 try: | |
982 files = proc.get_open_files() | |
983 except psutil.Error: | |
984 pass | |
985 else: | |
986 for file in filenames: | |
987 self.assertTrue(os.path.isfile(file)) | |
988 | |
989 def test_get_open_files2(self): | |
990 # test fd and path fields | |
991 fileobj = open(TESTFN, 'r') | |
992 p = psutil.Process(os.getpid()) | |
993 for path, fd in p.get_open_files(): | |
994 if path == fileobj.name or fd == fileobj.fileno(): | |
995 break | |
996 else: | |
997 self.fail("no file found; files=%s" % repr(p.get_open_files())) | |
998 self.assertEqual(path, fileobj.name) | |
999 if WINDOWS: | |
1000 self.assertEqual(fd, -1) | |
1001 else: | |
1002 self.assertEqual(fd, fileobj.fileno()) | |
1003 # test positions | |
1004 ntuple = p.get_open_files()[0] | |
1005 self.assertEqual(ntuple[0], ntuple.path) | |
1006 self.assertEqual(ntuple[1], ntuple.fd) | |
1007 # test file is gone | |
1008 fileobj.close() | |
1009 self.assertTrue(fileobj.name not in p.get_open_files()) | |
1010 | |
1011 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | |
1012 def test_get_connections(self): | |
1013 arg = "import socket, time;" \ | |
1014 "s = socket.socket();" \ | |
1015 "s.bind(('127.0.0.1', 0));" \ | |
1016 "s.listen(1);" \ | |
1017 "conn, addr = s.accept();" \ | |
1018 "time.sleep(100);" | |
1019 sproc = get_test_subprocess([PYTHON, "-c", arg]) | |
1020 p = psutil.Process(sproc.pid) | |
1021 for x in range(100): | |
1022 cons = p.get_connections() | |
1023 if cons: | |
1024 break | |
1025 time.sleep(.01) | |
1026 self.assertEqual(len(cons), 1) | |
1027 con = cons[0] | |
1028 self.assertEqual(con.family, socket.AF_INET) | |
1029 self.assertEqual(con.type, socket.SOCK_STREAM) | |
1030 self.assertEqual(con.status, "LISTEN") | |
1031 ip, port = con.local_address | |
1032 self.assertEqual(ip, '127.0.0.1') | |
1033 self.assertEqual(con.remote_address, ()) | |
1034 if WINDOWS: | |
1035 self.assertEqual(con.fd, -1) | |
1036 else: | |
1037 self.assertTrue(con.fd > 0) | |
1038 # test positions | |
1039 self.assertEqual(con[0], con.fd) | |
1040 self.assertEqual(con[1], con.family) | |
1041 self.assertEqual(con[2], con.type) | |
1042 self.assertEqual(con[3], con.local_address) | |
1043 self.assertEqual(con[4], con.remote_address) | |
1044 self.assertEqual(con[5], con.status) | |
1045 | |
1046 @skipUnless(supports_ipv6()) | |
1047 def test_get_connections_ipv6(self): | |
1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | |
1049 s.bind(('::1', 0)) | |
1050 s.listen(1) | |
1051 cons = psutil.Process(os.getpid()).get_connections() | |
1052 s.close() | |
1053 self.assertEqual(len(cons), 1) | |
1054 self.assertEqual(cons[0].local_address[0], '::1') | |
1055 | |
1056 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) | |
1057 def test_connection_fromfd(self): | |
1058 sock = socket.socket() | |
1059 sock.bind(('localhost', 0)) | |
1060 sock.listen(1) | |
1061 p = psutil.Process(os.getpid()) | |
1062 for conn in p.get_connections(): | |
1063 if conn.fd == sock.fileno(): | |
1064 break | |
1065 else: | |
1066 sock.close() | |
1067 self.fail("couldn't find socket fd") | |
1068 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) | |
1069 try: | |
1070 self.assertEqual(dupsock.getsockname(), conn.local_address) | |
1071 self.assertNotEqual(sock.fileno(), dupsock.fileno()) | |
1072 finally: | |
1073 sock.close() | |
1074 dupsock.close() | |
1075 | |
1076 @skipUnless(SUPPORT_CONNECTIONS, warn=1) | |
1077 def test_get_connections_all(self): | |
1078 | |
1079 def check_address(addr, family): | |
1080 if not addr: | |
1081 return | |
1082 ip, port = addr | |
1083 self.assertTrue(isinstance(port, int)) | |
1084 if family == socket.AF_INET: | |
1085 ip = list(map(int, ip.split('.'))) | |
1086 self.assertTrue(len(ip) == 4) | |
1087 for num in ip: | |
1088 self.assertTrue(0 <= num <= 255) | |
1089 self.assertTrue(0 <= port <= 65535) | |
1090 | |
1091 # all values are supposed to match Linux's tcp_states.h states | |
1092 # table across all platforms. | |
1093 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", | |
1094 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", | |
1095 "LAST_ACK", "LISTEN", "CLOSING", ""] | |
1096 | |
1097 tcp_template = "import socket;" \ | |
1098 "s = socket.socket($family, socket.SOCK_STREAM);" \ | |
1099 "s.bind(('$addr', 0));" \ | |
1100 "s.listen(1);" \ | |
1101 "conn, addr = s.accept();" | |
1102 | |
1103 udp_template = "import socket, time;" \ | |
1104 "s = socket.socket($family, socket.SOCK_DGRAM);" \ | |
1105 "s.bind(('$addr', 0));" \ | |
1106 "time.sleep(100);" | |
1107 | |
1108 from string import Template | |
1109 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, | |
1110 addr="127.0.0.1") | |
1111 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, | |
1112 addr="127.0.0.1") | |
1113 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, | |
1114 addr="::1") | |
1115 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, | |
1116 addr="::1") | |
1117 | |
1118 # launch various subprocess instantiating a socket of various | |
1119 # families and tupes to enrich psutil results | |
1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) | |
1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) | |
1122 if supports_ipv6(): | |
1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) | |
1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) | |
1125 else: | |
1126 tcp6_proc = None | |
1127 udp6_proc = None | |
1128 | |
1129 # --- check connections of all processes | |
1130 | |
1131 time.sleep(0.1) | |
1132 for p in psutil.process_iter(): | |
1133 try: | |
1134 cons = p.get_connections() | |
1135 except (psutil.NoSuchProcess, psutil.AccessDenied): | |
1136 pass | |
1137 else: | |
1138 for conn in cons: | |
1139 self.assertTrue(conn.type in (socket.SOCK_STREAM, | |
1140 socket.SOCK_DGRAM)) | |
1141 self.assertTrue(conn.family in (socket.AF_INET, | |
1142 socket.AF_INET6)) | |
1143 check_address(conn.local_address, conn.family) | |
1144 check_address(conn.remote_address, conn.family) | |
1145 if conn.status not in valid_states: | |
1146 self.fail("%s is not a valid status" %conn.status) | |
1147 # actually try to bind the local socket; ignore IPv6 | |
1148 # sockets as their address might be represented as | |
1149 # an IPv4-mapped-address (e.g. "::127.0.0.1") | |
1150 # and that's rejected by bind() | |
1151 if conn.family == socket.AF_INET: | |
1152 s = socket.socket(conn.family, conn.type) | |
1153 s.bind((conn.local_address[0], 0)) | |
1154 s.close() | |
1155 | |
1156 if not WINDOWS and hasattr(socket, 'fromfd'): | |
1157 dupsock = None | |
1158 try: | |
1159 try: | |
1160 dupsock = socket.fromfd(conn.fd, conn.family, | |
1161 conn.type) | |
1162 except (socket.error, OSError): | |
1163 err = sys.exc_info()[1] | |
1164 if err.args[0] == errno.EBADF: | |
1165 continue | |
1166 else: | |
1167 raise | |
1168 # python >= 2.5 | |
1169 if hasattr(dupsock, "family"): | |
1170 self.assertEqual(dupsock.family, conn.family) | |
1171 self.assertEqual(dupsock.type, conn.type) | |
1172 finally: | |
1173 if dupsock is not None: | |
1174 dupsock.close() | |
1175 | |
1176 | |
1177 # --- check matches against subprocesses | |
1178 | |
1179 for p in psutil.Process(os.getpid()).get_children(): | |
1180 for conn in p.get_connections(): | |
1181 # TCP v4 | |
1182 if p.pid == tcp4_proc.pid: | |
1183 self.assertEqual(conn.family, socket.AF_INET) | |
1184 self.assertEqual(conn.type, socket.SOCK_STREAM) | |
1185 self.assertEqual(conn.local_address[0], "127.0.0.1") | |
1186 self.assertEqual(conn.remote_address, ()) | |
1187 self.assertEqual(conn.status, "LISTEN") | |
1188 # UDP v4 | |
1189 elif p.pid == udp4_proc.pid: | |
1190 self.assertEqual(conn.family, socket.AF_INET) | |
1191 self.assertEqual(conn.type, socket.SOCK_DGRAM) | |
1192 self.assertEqual(conn.local_address[0], "127.0.0.1") | |
1193 self.assertEqual(conn.remote_address, ()) | |
1194 self.assertEqual(conn.status, "") | |
1195 # TCP v6 | |
1196 elif p.pid == getattr(tcp6_proc, "pid", None): | |
1197 self.assertEqual(conn.family, socket.AF_INET6) | |
1198 self.assertEqual(conn.type, socket.SOCK_STREAM) | |
1199 self.assertTrue(conn.local_address[0] in ("::", "::1")) | |
1200 self.assertEqual(conn.remote_address, ()) | |
1201 self.assertEqual(conn.status, "LISTEN") | |
1202 # UDP v6 | |
1203 elif p.pid == getattr(udp6_proc, "pid", None): | |
1204 self.assertEqual(conn.family, socket.AF_INET6) | |
1205 self.assertEqual(conn.type, socket.SOCK_DGRAM) | |
1206 self.assertTrue(conn.local_address[0] in ("::", "::1")) | |
1207 self.assertEqual(conn.remote_address, ()) | |
1208 self.assertEqual(conn.status, "") | |
1209 | |
1210 def test_parent_ppid(self): | |
1211 this_parent = os.getpid() | |
1212 sproc = get_test_subprocess() | |
1213 p = psutil.Process(sproc.pid) | |
1214 self.assertEqual(p.ppid, this_parent) | |
1215 self.assertEqual(p.parent.pid, this_parent) | |
1216 # no other process is supposed to have us as parent | |
1217 for p in psutil.process_iter(): | |
1218 if p.pid == sproc.pid: | |
1219 continue | |
1220 self.assertTrue(p.ppid != this_parent) | |
1221 | |
1222 def test_get_children(self): | |
1223 p = psutil.Process(os.getpid()) | |
1224 self.assertEqual(p.get_children(), []) | |
1225 sproc = get_test_subprocess() | |
1226 children = p.get_children() | |
1227 self.assertEqual(len(children), 1) | |
1228 self.assertEqual(children[0].pid, sproc.pid) | |
1229 self.assertEqual(children[0].ppid, os.getpid()) | |
1230 | |
1231 def test_suspend_resume(self): | |
1232 sproc = get_test_subprocess() | |
1233 p = psutil.Process(sproc.pid) | |
1234 p.suspend() | |
1235 time.sleep(0.1) | |
1236 self.assertEqual(p.status, psutil.STATUS_STOPPED) | |
1237 self.assertEqual(str(p.status), "stopped") | |
1238 p.resume() | |
1239 self.assertTrue(p.status != psutil.STATUS_STOPPED) | |
1240 | |
1241 def test_invalid_pid(self): | |
1242 self.assertRaises(ValueError, psutil.Process, "1") | |
1243 self.assertRaises(ValueError, psutil.Process, None) | |
1244 # Refers to Issue #12 | |
1245 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) | |
1246 | |
1247 def test_zombie_process(self): | |
1248 # Test that NoSuchProcess exception gets raised in case the | |
1249 # process dies after we create the Process object. | |
1250 # Example: | |
1251 # >>> proc = Process(1234) | |
1252 # >>> time.sleep(5) # time-consuming task, process dies in meantime | |
1253 # >>> proc.name | |
1254 # Refers to Issue #15 | |
1255 sproc = get_test_subprocess() | |
1256 p = psutil.Process(sproc.pid) | |
1257 p.kill() | |
1258 p.wait() | |
1259 | |
1260 for name in dir(p): | |
1261 if name.startswith('_')\ | |
1262 or name in ('pid', 'send_signal', 'is_running', 'set_ionice', | |
1263 'wait'): | |
1264 continue | |
1265 try: | |
1266 meth = getattr(p, name) | |
1267 if callable(meth): | |
1268 meth() | |
1269 except psutil.NoSuchProcess: | |
1270 pass | |
1271 else: | |
1272 self.fail("NoSuchProcess exception not raised for %r" % name) | |
1273 | |
1274 # other methods | |
1275 try: | |
1276 if os.name == 'posix': | |
1277 p.nice = 1 | |
1278 else: | |
1279 p.nice = psutil.NORMAL_PRIORITY_CLASS | |
1280 except psutil.NoSuchProcess: | |
1281 pass | |
1282 else: | |
1283 self.fail("exception not raised") | |
1284 if hasattr(p, 'set_ionice'): | |
1285 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) | |
1286 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) | |
1287 self.assertFalse(p.is_running()) | |
1288 | |
1289 def test__str__(self): | |
1290 sproc = get_test_subprocess() | |
1291 p = psutil.Process(sproc.pid) | |
1292 self.assertTrue(str(sproc.pid) in str(p)) | |
1293 # python shows up as 'Python' in cmdline on OS X so test fails on OS X | |
1294 if not OSX: | |
1295 self.assertTrue(os.path.basename(PYTHON) in str(p)) | |
1296 sproc = get_test_subprocess() | |
1297 p = psutil.Process(sproc.pid) | |
1298 p.kill() | |
1299 p.wait() | |
1300 self.assertTrue(str(sproc.pid) in str(p)) | |
1301 self.assertTrue("terminated" in str(p)) | |
1302 | |
1303 def test_fetch_all(self): | |
1304 valid_procs = 0 | |
1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', | |
1306 'kill', 'wait'] | |
1307 excluded_names += ['get_cpu_percent', 'get_children'] | |
1308 # XXX - skip slow lsof implementation; | |
1309 if BSD: | |
1310 excluded_names += ['get_open_files', 'get_connections'] | |
1311 if OSX: | |
1312 excluded_names += ['get_connections'] | |
1313 attrs = [] | |
1314 for name in dir(psutil.Process): | |
1315 if name.startswith("_"): | |
1316 continue | |
1317 if name.startswith("set_"): | |
1318 continue | |
1319 if name in excluded_names: | |
1320 continue | |
1321 attrs.append(name) | |
1322 | |
1323 for p in psutil.process_iter(): | |
1324 for name in attrs: | |
1325 try: | |
1326 try: | |
1327 attr = getattr(p, name, None) | |
1328 if attr is not None and callable(attr): | |
1329 ret = attr() | |
1330 else: | |
1331 ret = attr | |
1332 valid_procs += 1 | |
1333 except (psutil.NoSuchProcess, psutil.AccessDenied): | |
1334 err = sys.exc_info()[1] | |
1335 self.assertEqual(err.pid, p.pid) | |
1336 if err.name: | |
1337 self.assertEqual(err.name, p.name) | |
1338 self.assertTrue(str(err)) | |
1339 self.assertTrue(err.msg) | |
1340 else: | |
1341 if name == 'parent' or ret in (0, 0.0, [], None): | |
1342 continue | |
1343 self.assertTrue(ret) | |
1344 if name == "exe": | |
1345 self.assertTrue(os.path.isfile(ret)) | |
1346 elif name == "getcwd": | |
1347 # XXX - temporary fix; on my Linux box | |
1348 # chrome process cws is errnously reported | |
1349 # as /proc/4144/fdinfo whichd doesn't exist | |
1350 if 'chrome' in p.name: | |
1351 continue | |
1352 self.assertTrue(os.path.isdir(ret)) | |
1353 except Exception: | |
1354 err = sys.exc_info()[1] | |
1355 trace = traceback.format_exc() | |
1356 self.fail('%s\nmethod=%s, pid=%s, retvalue=%s' | |
1357 %(trace, name, p.pid, repr(ret))) | |
1358 | |
1359 # we should always have a non-empty list, not including PID 0 etc. | |
1360 # special cases. | |
1361 self.assertTrue(valid_procs > 0) | |
1362 | |
1363 @skipIf(LINUX) | |
1364 def test_pid_0(self): | |
1365 # Process(0) is supposed to work on all platforms except Linux | |
1366 p = psutil.Process(0) | |
1367 self.assertTrue(p.name) | |
1368 | |
1369 if os.name == 'posix': | |
1370 self.assertEqual(p.uids.real, 0) | |
1371 self.assertEqual(p.gids.real, 0) | |
1372 | |
1373 self.assertTrue(p.ppid in (0, 1)) | |
1374 #self.assertEqual(p.exe, "") | |
1375 self.assertEqual(p.cmdline, []) | |
1376 try: | |
1377 p.get_num_threads() | |
1378 except psutil.AccessDenied: | |
1379 pass | |
1380 | |
1381 if OSX : #and os.geteuid() != 0: | |
1382 self.assertRaises(psutil.AccessDenied, p.get_memory_info) | |
1383 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) | |
1384 else: | |
1385 p.get_memory_info() | |
1386 | |
1387 # username property | |
1388 if POSIX: | |
1389 self.assertEqual(p.username, 'root') | |
1390 elif WINDOWS: | |
1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') | |
1392 else: | |
1393 p.username | |
1394 | |
1395 self.assertTrue(0 in psutil.get_pid_list()) | |
1396 self.assertTrue(psutil.pid_exists(0)) | |
1397 | |
1398 def test_Popen(self): | |
1399 # Popen class test | |
1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | |
1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
1402 try: | |
1403 proc.name | |
1404 proc.stdin | |
1405 self.assertTrue(hasattr(proc, 'name')) | |
1406 self.assertTrue(hasattr(proc, 'stdin')) | |
1407 self.assertRaises(AttributeError, getattr, proc, 'foo') | |
1408 finally: | |
1409 proc.kill() | |
1410 proc.wait() | |
1411 | |
1412 | |
1413 if hasattr(os, 'getuid'): | |
1414 class LimitedUserTestCase(TestCase): | |
1415 """Repeat the previous tests by using a limited user. | |
1416 Executed only on UNIX and only if the user who run the test script | |
1417 is root. | |
1418 """ | |
1419 # the uid/gid the test suite runs under | |
1420 PROCESS_UID = os.getuid() | |
1421 PROCESS_GID = os.getgid() | |
1422 | |
1423 def __init__(self, *args, **kwargs): | |
1424 TestCase.__init__(self, *args, **kwargs) | |
1425 # re-define all existent test methods in order to | |
1426 # ignore AccessDenied exceptions | |
1427 for attr in [x for x in dir(self) if x.startswith('test')]: | |
1428 meth = getattr(self, attr) | |
1429 def test_(self): | |
1430 try: | |
1431 meth() | |
1432 except psutil.AccessDenied: | |
1433 pass | |
1434 setattr(self, attr, types.MethodType(test_, self)) | |
1435 | |
1436 def setUp(self): | |
1437 os.setegid(1000) | |
1438 os.seteuid(1000) | |
1439 TestCase.setUp(self) | |
1440 | |
1441 def tearDown(self): | |
1442 os.setegid(self.PROCESS_UID) | |
1443 os.seteuid(self.PROCESS_GID) | |
1444 TestCase.tearDown(self) | |
1445 | |
1446 def test_nice(self): | |
1447 try: | |
1448 psutil.Process(os.getpid()).nice = -1 | |
1449 except psutil.AccessDenied: | |
1450 pass | |
1451 else: | |
1452 self.fail("exception not raised") | |
1453 | |
1454 | |
1455 def test_main(): | |
1456 tests = [] | |
1457 test_suite = unittest.TestSuite() | |
1458 tests.append(TestCase) | |
1459 | |
1460 if POSIX: | |
1461 from _posix import PosixSpecificTestCase | |
1462 tests.append(PosixSpecificTestCase) | |
1463 | |
1464 # import the specific platform test suite | |
1465 if LINUX: | |
1466 from _linux import LinuxSpecificTestCase as stc | |
1467 elif WINDOWS: | |
1468 from _windows import WindowsSpecificTestCase as stc | |
1469 elif OSX: | |
1470 from _osx import OSXSpecificTestCase as stc | |
1471 elif BSD: | |
1472 from _bsd import BSDSpecificTestCase as stc | |
1473 tests.append(stc) | |
1474 | |
1475 if hasattr(os, 'getuid'): | |
1476 if os.getuid() == 0: | |
1477 tests.append(LimitedUserTestCase) | |
1478 else: | |
1479 atexit.register(warnings.warn, "Couldn't run limited user tests (" | |
1480 "super-user privileges are required)", RuntimeWarning) | |
1481 | |
1482 for test_class in tests: | |
1483 test_suite.addTest(unittest.makeSuite(test_class)) | |
1484 | |
1485 f = open(TESTFN, 'w') | |
1486 f.close() | |
1487 atexit.register(lambda: os.remove(TESTFN)) | |
1488 | |
1489 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
1490 DEVNULL.close() | |
1491 | |
1492 if __name__ == '__main__': | |
1493 test_main() | |
OLD | NEW |