Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(123)

Side by Side Diff: third_party/psutil/test/test_psutil.py

Issue 8919026: Remove psutil from tree, install via install-build-deps.sh (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Sort package list. Created 9 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 #
3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $
4 #
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
6 # Use of this source code is governed by a BSD-style license that can be
7 # found in the LICENSE file.
8
9 """
10 psutil test suite.
11
12 Note: this is targeted for both python 2.x and 3.x so there's no need
13 to use 2to3 tool first.
14 """
15
16 import unittest
17 import os
18 import sys
19 import subprocess
20 import time
21 import signal
22 import types
23 import traceback
24 import socket
25 import warnings
26 import atexit
27 import errno
28 import threading
29 import tempfile
30 import collections
31
32 import psutil
33
34
35 PYTHON = os.path.realpath(sys.executable)
36 DEVNULL = open(os.devnull, 'r+')
37 TESTFN = os.path.join(os.getcwd(), "$testfile")
38 PY3 = sys.version_info >= (3,)
39 POSIX = os.name == 'posix'
40 LINUX = sys.platform.lower().startswith("linux")
41 WINDOWS = sys.platform.lower().startswith("win32")
42 OSX = sys.platform.lower().startswith("darwin")
43 BSD = sys.platform.lower().startswith("freebsd")
44
45 try:
46 psutil.Process(os.getpid()).get_connections()
47 except NotImplementedError:
48 err = sys.exc_info()[1]
49 SUPPORT_CONNECTIONS = False
50 atexit.register(warnings.warn, "get_connections() not supported on this plat form",
51 RuntimeWarning)
52 else:
53 SUPPORT_CONNECTIONS = True
54
55 if PY3:
56 long = int
57
58 def callable(attr):
59 return isinstance(attr, collections.Callable)
60
61
62 _subprocesses_started = set()
63
64 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
65 """Return a subprocess.Popen object to use in tests.
66 By default stdout and stderr are redirected to /dev/null and the
67 python interpreter is used as test process.
68 """
69 if cmd is None:
70 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
71 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin)
72 _subprocesses_started.add(sproc.pid)
73 return sproc
74
75 def sh(cmdline):
76 """run cmd in a subprocess and return its output.
77 raises RuntimeError on error.
78 """
79 p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
80 stderr=subprocess.PIPE)
81 stdout, stderr = p.communicate()
82 if p.returncode != 0:
83 raise RuntimeError(stderr)
84 if stderr:
85 warnings.warn(stderr, RuntimeWarning)
86 if sys.version_info >= (3,):
87 stdout = str(stdout, sys.stdout.encoding)
88 return stdout.strip()
89
90 def wait_for_pid(pid, timeout=1):
91 """Wait for pid to show up in the process list then return.
92 Used in the test suite to give time the sub process to initialize.
93 """
94 raise_at = time.time() + timeout
95 while 1:
96 if pid in psutil.get_pid_list():
97 # give it one more iteration to allow full initialization
98 time.sleep(0.01)
99 return
100 time.sleep(0.0001)
101 if time.time() >= raise_at:
102 raise RuntimeError("Timed out")
103
104 def reap_children(search_all=False):
105 """Kill any subprocess started by this test suite and ensure that
106 no zombies stick around to hog resources and create problems when
107 looking for refleaks.
108 """
109 if search_all:
110 this_process = psutil.Process(os.getpid())
111 pids = [x.pid for x in this_process.get_children()]
112 else:
113 pids =_subprocesses_started
114 while pids:
115 pid = pids.pop()
116 try:
117 child = psutil.Process(pid)
118 child.kill()
119 except psutil.NoSuchProcess:
120 pass
121 else:
122 child.wait()
123
124
125 # we want to search through all processes before exiting
126 atexit.register(reap_children, search_all=True)
127
128 def skipIf(condition, reason="", warn=False):
129 """Decorator which skip a test under if condition is satisfied.
130 This is a substitute of unittest.skipIf which is available
131 only in python 2.7 and 3.2.
132 If 'reason' argument is provided this will be printed during
133 tests execution.
134 If 'warn' is provided a RuntimeWarning will be shown when all
135 tests are run.
136 """
137 def outer(fun, *args, **kwargs):
138 def inner(self):
139 if condition:
140 sys.stdout.write("skipped-")
141 sys.stdout.flush()
142 if warn:
143 objname = "%s.%s" % (self.__class__.__name__, fun.__name__)
144 msg = "%s was skipped" % objname
145 if reason:
146 msg += "; reason: " + repr(reason)
147 atexit.register(warnings.warn, msg, RuntimeWarning)
148 return
149 else:
150 return fun(self, *args, **kwargs)
151 return inner
152 return outer
153
154 def skipUnless(condition, reason="", warn=False):
155 """Contrary of skipIf."""
156 if not condition:
157 return skipIf(True, reason, warn)
158 return skipIf(False)
159
160 def ignore_access_denied(fun):
161 """Decorator to Ignore AccessDenied exceptions."""
162 def outer(fun, *args, **kwargs):
163 def inner(self):
164 try:
165 return fun(self, *args, **kwargs)
166 except psutil.AccessDenied:
167 pass
168 return inner
169 return outer
170
171 def supports_ipv6():
172 """Return True if IPv6 is supported on this platform."""
173 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
174 return False
175 sock = None
176 try:
177 try:
178 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
179 sock.bind(("::1", 0))
180 except (socket.error, socket.gaierror):
181 return False
182 else:
183 return True
184 finally:
185 if sock is not None:
186 sock.close()
187
188
189 class ThreadTask(threading.Thread):
190 """A thread object used for running process thread tests."""
191
192 def __init__(self):
193 threading.Thread.__init__(self)
194 self._running = False
195 self._interval = None
196 self._flag = threading.Event()
197
198 def __repr__(self):
199 name = self.__class__.__name__
200 return '<%s running=%s at %#x>' % (name, self._running, id(self))
201
202 def start(self, interval=0.001):
203 """Start thread and keep it running until an explicit
204 stop() request. Polls for shutdown every 'timeout' seconds.
205 """
206 if self._running:
207 raise ValueError("already started")
208 self._interval = interval
209 threading.Thread.start(self)
210 self._flag.wait()
211
212 def run(self):
213 self._running = True
214 self._flag.set()
215 while self._running:
216 time.sleep(self._interval)
217
218 def stop(self):
219 """Stop thread execution and and waits until it is stopped."""
220 if not self._running:
221 raise ValueError("already stopped")
222 self._running = False
223 self.join()
224
225
226 class TestCase(unittest.TestCase):
227
228 def tearDown(self):
229 reap_children()
230
231 # ============================
232 # tests for system-related API
233 # ============================
234
235 def test_get_process_list(self):
236 pids = [x.pid for x in psutil.get_process_list()]
237 self.assertTrue(os.getpid() in pids)
238
239 def test_process_iter(self):
240 pids = [x.pid for x in psutil.process_iter()]
241 self.assertTrue(os.getpid() in pids)
242
243 def test_TOTAL_PHYMEM(self):
244 x = psutil.TOTAL_PHYMEM
245 self.assertTrue(isinstance(x, (int, long)))
246 self.assertTrue(x > 0)
247
248 def test_BOOT_TIME(self):
249 x = psutil.BOOT_TIME
250 self.assertTrue(isinstance(x, float))
251 self.assertTrue(x > 0)
252
253 def test_deprecated_memory_functions(self):
254 warnings.filterwarnings("error")
255 try:
256 self.assertRaises(DeprecationWarning, psutil.used_phymem)
257 self.assertRaises(DeprecationWarning, psutil.avail_phymem)
258 self.assertRaises(DeprecationWarning, psutil.total_virtmem)
259 self.assertRaises(DeprecationWarning, psutil.used_virtmem)
260 self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
261 finally:
262 warnings.resetwarnings()
263
264 def test_phymem_usage(self):
265 mem = psutil.phymem_usage()
266 self.assertTrue(mem.total > 0)
267 self.assertTrue(mem.used > 0)
268 self.assertTrue(mem.free > 0)
269 self.assertTrue(0 <= mem.percent <= 100)
270
271 def test_virtmem_usage(self):
272 mem = psutil.virtmem_usage()
273 self.assertTrue(mem.total > 0)
274 self.assertTrue(mem.used >= 0)
275 self.assertTrue(mem.free > 0)
276 self.assertTrue(0 <= mem.percent <= 100)
277
278 @skipUnless(LINUX)
279 def test_phymem_buffers(self):
280 x = psutil.phymem_buffers()
281 self.assertTrue(isinstance(x, (int, long)))
282 self.assertTrue(x >= 0)
283
284 def test_pid_exists(self):
285 sproc = get_test_subprocess()
286 wait_for_pid(sproc.pid)
287 self.assertTrue(psutil.pid_exists(sproc.pid))
288 p = psutil.Process(sproc.pid)
289 p.kill()
290 p.wait()
291 self.assertFalse(psutil.pid_exists(sproc.pid))
292 self.assertFalse(psutil.pid_exists(-1))
293
294 def test_pid_exists_2(self):
295 reap_children()
296 pids = psutil.get_pid_list()
297 for pid in pids:
298 try:
299 self.assertTrue(psutil.pid_exists(pid))
300 except AssertionError:
301 # in case the process disappeared in meantime fail only
302 # if it is no longer in get_pid_list()
303 time.sleep(.1)
304 if pid in psutil.get_pid_list():
305 self.fail(pid)
306 pids = range(max(pids) + 5000, max(pids) + 6000)
307 for pid in pids:
308 self.assertFalse(psutil.pid_exists(pid))
309
310 def test_get_pid_list(self):
311 plist = [x.pid for x in psutil.get_process_list()]
312 pidlist = psutil.get_pid_list()
313 self.assertEqual(plist.sort(), pidlist.sort())
314 # make sure every pid is unique
315 self.assertEqual(len(pidlist), len(set(pidlist)))
316
317 def test_test(self):
318 # test for psutil.test() function
319 stdout = sys.stdout
320 sys.stdout = DEVNULL
321 try:
322 psutil.test()
323 finally:
324 sys.stdout = stdout
325
326 def test_sys_cpu_times(self):
327 total = 0
328 times = psutil.cpu_times()
329 sum(times)
330 for cp_time in times:
331 self.assertTrue(isinstance(cp_time, float))
332 self.assertTrue(cp_time >= 0.0)
333 total += cp_time
334 self.assertEqual(total, sum(times))
335 str(times)
336
337 def test_sys_cpu_times2(self):
338 t1 = sum(psutil.cpu_times())
339 time.sleep(0.1)
340 t2 = sum(psutil.cpu_times())
341 difference = t2 - t1
342 if not difference >= 0.05:
343 self.fail("difference %s" % difference)
344
345 def test_sys_per_cpu_times(self):
346 for times in psutil.cpu_times(percpu=True):
347 total = 0
348 sum(times)
349 for cp_time in times:
350 self.assertTrue(isinstance(cp_time, float))
351 self.assertTrue(cp_time >= 0.0)
352 total += cp_time
353 self.assertEqual(total, sum(times))
354 str(times)
355
356 def test_sys_per_cpu_times2(self):
357 tot1 = psutil.cpu_times(percpu=True)
358 stop_at = time.time() + 0.1
359 while 1:
360 if time.time() >= stop_at:
361 break
362 tot2 = psutil.cpu_times(percpu=True)
363 for t1, t2 in zip(tot1, tot2):
364 t1, t2 = sum(t1), sum(t2)
365 difference = t2 - t1
366 if difference >= 0.05:
367 return
368 self.fail()
369
370 def test_sys_cpu_percent(self):
371 psutil.cpu_percent(interval=0.001)
372 psutil.cpu_percent(interval=0.001)
373 for x in range(1000):
374 percent = psutil.cpu_percent(interval=None)
375 self.assertTrue(isinstance(percent, float))
376 self.assertTrue(percent >= 0.0)
377 self.assertTrue(percent <= 100.0)
378
379 def test_sys_per_cpu_percent(self):
380 psutil.cpu_percent(interval=0.001, percpu=True)
381 psutil.cpu_percent(interval=0.001, percpu=True)
382 for x in range(1000):
383 percents = psutil.cpu_percent(interval=None, percpu=True)
384 for percent in percents:
385 self.assertTrue(isinstance(percent, float))
386 self.assertTrue(percent >= 0.0)
387 self.assertTrue(percent <= 100.0)
388
389 def test_sys_cpu_percent_compare(self):
390 psutil.cpu_percent(interval=0)
391 psutil.cpu_percent(interval=0, percpu=True)
392 time.sleep(.1)
393 t1 = psutil.cpu_percent(interval=0)
394 t2 = psutil.cpu_percent(interval=0, percpu=True)
395 # calculate total average
396 t2 = sum(t2) / len(t2)
397 if abs(t1 - t2) > 5:
398 self.assertEqual(t1, t2)
399
400 def test_disk_usage(self):
401 usage = psutil.disk_usage(os.getcwd())
402 self.assertTrue(usage.total > 0)
403 self.assertTrue(usage.used > 0)
404 self.assertTrue(usage.free > 0)
405 self.assertTrue(usage.total > usage.used)
406 self.assertTrue(usage.total > usage.free)
407 self.assertTrue(0 <= usage.percent <= 100)
408
409 # if path does not exist OSError ENOENT is expected across
410 # all platforms
411 fname = tempfile.mktemp()
412 try:
413 psutil.disk_usage(fname)
414 except OSError:
415 err = sys.exc_info()[1]
416 if err.args[0] != errno.ENOENT:
417 raise
418 else:
419 self.fail("OSError not raised")
420
421 def test_disk_partitions(self):
422 for disk in psutil.disk_partitions(all=False):
423 self.assertTrue(os.path.exists(disk.device))
424 self.assertTrue(os.path.isdir(disk.mountpoint))
425 self.assertTrue(disk.fstype)
426 for disk in psutil.disk_partitions(all=True):
427 if not WINDOWS:
428 self.assertTrue(os.path.isdir(disk.mountpoint))
429 self.assertTrue(disk.fstype)
430
431 def find_mount_point(path):
432 path = os.path.abspath(path)
433 while not os.path.ismount(path):
434 path = os.path.dirname(path)
435 return path
436
437 mount = find_mount_point(__file__)
438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
439 self.assertTrue(mount in mounts)
440 psutil.disk_usage(mount)
441
442 # XXX
443 @skipUnless(hasattr(psutil, "network_io_counters"))
444 def test_anetwork_io_counters(self):
445 def check_ntuple(nt):
446 self.assertEqual(nt[0], nt.bytes_sent)
447 self.assertEqual(nt[1], nt.bytes_recv)
448 self.assertEqual(nt[2], nt.packets_sent)
449 self.assertEqual(nt[3], nt.packets_recv)
450 self.assertTrue(nt.bytes_sent >= 0)
451 self.assertTrue(nt.bytes_recv >= 0)
452 self.assertTrue(nt.packets_sent >= 0)
453 self.assertTrue(nt.packets_recv >= 0)
454
455 ret = psutil.network_io_counters(pernic=False)
456 check_ntuple(ret)
457 ret = psutil.network_io_counters(pernic=True)
458 for name, ntuple in ret.iteritems():
459 self.assertTrue(name)
460 check_ntuple(ntuple)
461 # XXX
462 @skipUnless(hasattr(psutil, "disk_io_counters"))
463 def test_disk_io_counters(self):
464 def check_ntuple(nt):
465 self.assertEqual(nt[0], nt.read_count)
466 self.assertEqual(nt[1], nt.write_count)
467 self.assertEqual(nt[2], nt.read_bytes)
468 self.assertEqual(nt[3], nt.write_bytes)
469 self.assertEqual(nt[4], nt.read_time)
470 self.assertEqual(nt[5], nt.write_time)
471 self.assertTrue(nt.read_count >= 0)
472 self.assertTrue(nt.write_count >= 0)
473 self.assertTrue(nt.read_bytes >= 0)
474 self.assertTrue(nt.write_bytes >= 0)
475 self.assertTrue(nt.read_time >= 0)
476 self.assertTrue(nt.write_time >= 0)
477
478 ret = psutil.disk_io_counters(perdisk=False)
479 check_ntuple(ret)
480 ret = psutil.disk_io_counters(perdisk=True)
481 for name, ntuple in ret.iteritems():
482 self.assertTrue(name)
483 check_ntuple(ntuple)
484
485 # ====================
486 # Process object tests
487 # ====================
488
489 def test_kill(self):
490 sproc = get_test_subprocess()
491 test_pid = sproc.pid
492 wait_for_pid(test_pid)
493 p = psutil.Process(test_pid)
494 name = p.name
495 p.kill()
496 p.wait()
497 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
498
499 def test_terminate(self):
500 sproc = get_test_subprocess()
501 test_pid = sproc.pid
502 wait_for_pid(test_pid)
503 p = psutil.Process(test_pid)
504 name = p.name
505 p.terminate()
506 p.wait()
507 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
508
509 def test_send_signal(self):
510 if POSIX:
511 sig = signal.SIGKILL
512 else:
513 sig = signal.SIGTERM
514 sproc = get_test_subprocess()
515 test_pid = sproc.pid
516 p = psutil.Process(test_pid)
517 name = p.name
518 p.send_signal(sig)
519 p.wait()
520 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
521
522 def test_wait(self):
523 # check exit code signal
524 sproc = get_test_subprocess()
525 p = psutil.Process(sproc.pid)
526 p.kill()
527 code = p.wait()
528 if os.name == 'posix':
529 self.assertEqual(code, signal.SIGKILL)
530 else:
531 self.assertEqual(code, 0)
532 self.assertFalse(p.is_running())
533
534 sproc = get_test_subprocess()
535 p = psutil.Process(sproc.pid)
536 p.terminate()
537 code = p.wait()
538 if os.name == 'posix':
539 self.assertEqual(code, signal.SIGTERM)
540 else:
541 self.assertEqual(code, 0)
542 self.assertFalse(p.is_running())
543
544 # check sys.exit() code
545 code = "import time, sys; time.sleep(0.01); sys.exit(5);"
546 sproc = get_test_subprocess([PYTHON, "-c", code])
547 p = psutil.Process(sproc.pid)
548 self.assertEqual(p.wait(), 5)
549 self.assertFalse(p.is_running())
550
551 # Test wait() issued twice.
552 # It is not supposed to raise NSP when the process is gone.
553 # On UNIX this should return None, on Windows it should keep
554 # returning the exit code.
555 sproc = get_test_subprocess([PYTHON, "-c", code])
556 p = psutil.Process(sproc.pid)
557 self.assertEqual(p.wait(), 5)
558 self.assertTrue(p.wait() in (5, None))
559
560 # test timeout
561 sproc = get_test_subprocess()
562 p = psutil.Process(sproc.pid)
563 p.name
564 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
565
566 # timeout < 0 not allowed
567 self.assertRaises(ValueError, p.wait, -1)
568
569 @skipUnless(POSIX)
570 def test_wait_non_children(self):
571 # test wait() against processes which are not our children
572 code = "import sys;"
573 code += "from subprocess import Popen, PIPE;"
574 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
575 code += "sp = Popen(cmd, stdout=PIPE);"
576 code += "sys.stdout.write(str(sp.pid));"
577 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE )
578
579 grandson_pid = int(sproc.stdout.read())
580 grandson_proc = psutil.Process(grandson_pid)
581 try:
582 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
583 grandson_proc.kill()
584 ret = grandson_proc.wait()
585 self.assertEqual(ret, None)
586 finally:
587 if grandson_proc.is_running():
588 grandson_proc.kill()
589 grandson_proc.wait()
590
591 def test_wait_timeout_0(self):
592 sproc = get_test_subprocess()
593 p = psutil.Process(sproc.pid)
594 self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
595 p.kill()
596 stop_at = time.time() + 2
597 while 1:
598 try:
599 code = p.wait(0)
600 except psutil.TimeoutExpired:
601 if time.time() >= stop_at:
602 raise
603 else:
604 break
605 if os.name == 'posix':
606 self.assertEqual(code, signal.SIGKILL)
607 else:
608 self.assertEqual(code, 0)
609 self.assertFalse(p.is_running())
610
611 def test_cpu_percent(self):
612 p = psutil.Process(os.getpid())
613 p.get_cpu_percent(interval=0.001)
614 p.get_cpu_percent(interval=0.001)
615 for x in range(100):
616 percent = p.get_cpu_percent(interval=None)
617 self.assertTrue(isinstance(percent, float))
618 self.assertTrue(percent >= 0.0)
619 self.assertTrue(percent <= 100.0)
620
621 def test_cpu_times(self):
622 times = psutil.Process(os.getpid()).get_cpu_times()
623 self.assertTrue((times.user > 0.0) or (times.system > 0.0))
624 # make sure returned values can be pretty printed with strftime
625 time.strftime("%H:%M:%S", time.localtime(times.user))
626 time.strftime("%H:%M:%S", time.localtime(times.system))
627
628 # Test Process.cpu_times() against os.times()
629 # os.times() is broken on Python 2.6
630 # http://bugs.python.org/issue1040026
631 # XXX fails on OSX: not sure if it's for os.times(). We should
632 # try this with Python 2.7 and re-enable the test.
633
634 @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
635 def test_cpu_times2(self):
636 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
637 utime, ktime = os.times()[:2]
638
639 # Use os.times()[:2] as base values to compare our results
640 # using a tolerance of +/- 0.1 seconds.
641 # It will fail if the difference between the values is > 0.1s.
642 if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
643 self.fail("expected: %s, found: %s" %(utime, user_time))
644
645 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
646 self.fail("expected: %s, found: %s" %(ktime, kernel_time))
647
648 def test_create_time(self):
649 sproc = get_test_subprocess()
650 now = time.time()
651 wait_for_pid(sproc.pid)
652 p = psutil.Process(sproc.pid)
653 create_time = p.create_time
654
655 # Use time.time() as base value to compare our result using a
656 # tolerance of +/- 1 second.
657 # It will fail if the difference between the values is > 2s.
658 difference = abs(create_time - now)
659 if difference > 2:
660 self.fail("expected: %s, found: %s, difference: %s"
661 % (now, create_time, difference))
662
663 # make sure returned value can be pretty printed with strftime
664 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
665
666 @skipIf(WINDOWS)
667 def test_terminal(self):
668 tty = sh('tty')
669 p = psutil.Process(os.getpid())
670 self.assertEqual(p.terminal, tty)
671
672 @skipIf(OSX, warn=False)
673 def test_get_io_counters(self):
674 p = psutil.Process(os.getpid())
675 # test reads
676 io1 = p.get_io_counters()
677 f = open(PYTHON, 'rb')
678 f.read()
679 f.close()
680 io2 = p.get_io_counters()
681 if not BSD:
682 self.assertTrue(io2.read_count > io1.read_count)
683 self.assertTrue(io2.write_count == io1.write_count)
684 self.assertTrue(io2.read_bytes >= io1.read_bytes)
685 self.assertTrue(io2.write_bytes >= io1.write_bytes)
686 # test writes
687 io1 = p.get_io_counters()
688 f = tempfile.TemporaryFile()
689 if sys.version_info >= (3,):
690 f.write(bytes("x" * 1000000, 'ascii'))
691 else:
692 f.write("x" * 1000000)
693 f.close()
694 io2 = p.get_io_counters()
695 if not BSD:
696 self.assertTrue(io2.write_count > io1.write_count)
697 self.assertTrue(io2.write_bytes > io1.write_bytes)
698 self.assertTrue(io2.read_count >= io1.read_count)
699 self.assertTrue(io2.read_bytes >= io1.read_bytes)
700
701 @skipUnless(LINUX)
702 def test_get_set_ionice(self):
703 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
704 IOPRIO_CLASS_IDLE)
705 self.assertEqual(IOPRIO_CLASS_NONE, 0)
706 self.assertEqual(IOPRIO_CLASS_RT, 1)
707 self.assertEqual(IOPRIO_CLASS_BE, 2)
708 self.assertEqual(IOPRIO_CLASS_IDLE, 3)
709 p = psutil.Process(os.getpid())
710 try:
711 p.set_ionice(2)
712 ioclass, value = p.get_ionice()
713 self.assertEqual(ioclass, 2)
714 self.assertEqual(value, 4)
715 #
716 p.set_ionice(3)
717 ioclass, value = p.get_ionice()
718 self.assertEqual(ioclass, 3)
719 self.assertEqual(value, 0)
720 #
721 p.set_ionice(2, 0)
722 ioclass, value = p.get_ionice()
723 self.assertEqual(ioclass, 2)
724 self.assertEqual(value, 0)
725 p.set_ionice(2, 7)
726 ioclass, value = p.get_ionice()
727 self.assertEqual(ioclass, 2)
728 self.assertEqual(value, 7)
729 self.assertRaises(ValueError, p.set_ionice, 2, 10)
730 finally:
731 p.set_ionice(IOPRIO_CLASS_NONE)
732
733 def test_get_num_threads(self):
734 # on certain platforms such as Linux we might test for exact
735 # thread number, since we always have with 1 thread per process,
736 # but this does not apply across all platforms (OSX, Windows)
737 p = psutil.Process(os.getpid())
738 step1 = p.get_num_threads()
739
740 thread = ThreadTask()
741 thread.start()
742 try:
743 step2 = p.get_num_threads()
744 self.assertEqual(step2, step1 + 1)
745 thread.stop()
746 finally:
747 if thread._running:
748 thread.stop()
749
750 def test_get_threads(self):
751 p = psutil.Process(os.getpid())
752 step1 = p.get_threads()
753
754 thread = ThreadTask()
755 thread.start()
756
757 try:
758 step2 = p.get_threads()
759 self.assertEqual(len(step2), len(step1) + 1)
760 # on Linux, first thread id is supposed to be this process
761 if LINUX:
762 self.assertEqual(step2[0].id, os.getpid())
763 athread = step2[0]
764 # test named tuple
765 self.assertEqual(athread.id, athread[0])
766 self.assertEqual(athread.user_time, athread[1])
767 self.assertEqual(athread.system_time, athread[2])
768 # test num threads
769 thread.stop()
770 finally:
771 if thread._running:
772 thread.stop()
773
774 def test_get_memory_info(self):
775 p = psutil.Process(os.getpid())
776
777 # step 1 - get a base value to compare our results
778 rss1, vms1 = p.get_memory_info()
779 percent1 = p.get_memory_percent()
780 self.assertTrue(rss1 > 0)
781 self.assertTrue(vms1 > 0)
782
783 # step 2 - allocate some memory
784 memarr = [None] * 1500000
785
786 rss2, vms2 = p.get_memory_info()
787 percent2 = p.get_memory_percent()
788 # make sure that the memory usage bumped up
789 self.assertTrue(rss2 > rss1)
790 self.assertTrue(vms2 >= vms1) # vms might be equal
791 self.assertTrue(percent2 > percent1)
792 del memarr
793
794 def test_get_memory_percent(self):
795 p = psutil.Process(os.getpid())
796 self.assertTrue(p.get_memory_percent() > 0.0)
797
798 def test_pid(self):
799 sproc = get_test_subprocess()
800 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
801
802 def test_is_running(self):
803 sproc = get_test_subprocess()
804 wait_for_pid(sproc.pid)
805 p = psutil.Process(sproc.pid)
806 self.assertTrue(p.is_running())
807 p.kill()
808 p.wait()
809 self.assertFalse(p.is_running())
810
811 def test_exe(self):
812 sproc = get_test_subprocess()
813 wait_for_pid(sproc.pid)
814 try:
815 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
816 except AssertionError:
817 # certain platforms such as BSD are more accurate returning:
818 # "/usr/local/bin/python2.7"
819 # ...instead of:
820 # "/usr/local/bin/python"
821 # We do not want to consider this difference in accuracy
822 # an error.
823 name = psutil.Process(sproc.pid).exe
824 adjusted_name = PYTHON[:len(name)]
825 self.assertEqual(name, adjusted_name)
826 for p in psutil.process_iter():
827 try:
828 exe = p.exe
829 except psutil.Error:
830 continue
831 if not exe:
832 continue
833 if not os.path.exists(exe):
834 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \
835 % (repr(exe), p.pid, p.name, p.cmdline))
836 if hasattr(os, 'access') and hasattr(os, "X_OK"):
837 if not os.access(p.exe, os.X_OK):
838 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s )" \
839 % (repr(p.exe), p.pid, p.name, p.cmdline))
840
841 def test_cmdline(self):
842 sproc = get_test_subprocess([PYTHON, "-E"])
843 wait_for_pid(sproc.pid)
844 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"])
845
846 def test_name(self):
847 sproc = get_test_subprocess(PYTHON)
848 wait_for_pid(sproc.pid)
849 self.assertEqual(psutil.Process(sproc.pid).name.lower(),
850 os.path.basename(sys.executable).lower())
851
852 if os.name == 'posix':
853
854 def test_uids(self):
855 p = psutil.Process(os.getpid())
856 real, effective, saved = p.uids
857 # os.getuid() refers to "real" uid
858 self.assertEqual(real, os.getuid())
859 # os.geteuid() refers to "effective" uid
860 self.assertEqual(effective, os.geteuid())
861 # no such thing as os.getsuid() ("saved" uid), but starting
862 # from python 2.7 we have os.getresuid()[2]
863 if hasattr(os, "getresuid"):
864 self.assertEqual(saved, os.getresuid()[2])
865
866 def test_gids(self):
867 p = psutil.Process(os.getpid())
868 real, effective, saved = p.gids
869 # os.getuid() refers to "real" uid
870 self.assertEqual(real, os.getgid())
871 # os.geteuid() refers to "effective" uid
872 self.assertEqual(effective, os.getegid())
873 # no such thing as os.getsuid() ("saved" uid), but starting
874 # from python 2.7 we have os.getresgid()[2]
875 if hasattr(os, "getresuid"):
876 self.assertEqual(saved, os.getresgid()[2])
877
878 def test_nice(self):
879 p = psutil.Process(os.getpid())
880 self.assertRaises(TypeError, setattr, p, "nice", "str")
881 try:
882 try:
883 first_nice = p.nice
884 p.nice = 1
885 self.assertEqual(p.nice, 1)
886 # going back to previous nice value raises AccessDenied on O SX
887 if not OSX:
888 p.nice = 0
889 self.assertEqual(p.nice, 0)
890 except psutil.AccessDenied:
891 pass
892 finally:
893 # going back to previous nice value raises AccessDenied on OSX
894 if not OSX:
895 p.nice = first_nice
896
897 if os.name == 'nt':
898
899 def test_nice(self):
900 p = psutil.Process(os.getpid())
901 self.assertRaises(TypeError, setattr, p, "nice", "str")
902 try:
903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
904 p.nice = psutil.HIGH_PRIORITY_CLASS
905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS)
906 p.nice = psutil.NORMAL_PRIORITY_CLASS
907 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
908 finally:
909 p.nice = psutil.NORMAL_PRIORITY_CLASS
910
911 def test_status(self):
912 p = psutil.Process(os.getpid())
913 self.assertEqual(p.status, psutil.STATUS_RUNNING)
914 self.assertEqual(str(p.status), "running")
915 for p in psutil.process_iter():
916 if str(p.status) == '?':
917 self.fail("invalid status for pid %d" % p.pid)
918
919 def test_username(self):
920 sproc = get_test_subprocess()
921 p = psutil.Process(sproc.pid)
922 if POSIX:
923 import pwd
924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name)
925 elif WINDOWS:
926 expected_username = os.environ['USERNAME']
927 expected_domain = os.environ['USERDOMAIN']
928 domain, username = p.username.split('\\')
929 self.assertEqual(domain, expected_domain)
930 self.assertEqual(username, expected_username)
931 else:
932 p.username
933
934 @skipUnless(WINDOWS or LINUX)
935 def test_getcwd(self):
936 sproc = get_test_subprocess()
937 wait_for_pid(sproc.pid)
938 p = psutil.Process(sproc.pid)
939 self.assertEqual(p.getcwd(), os.getcwd())
940
941 @skipUnless(WINDOWS or LINUX)
942 def test_getcwd_2(self):
943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"]
944 sproc = get_test_subprocess(cmd)
945 wait_for_pid(sproc.pid)
946 p = psutil.Process(sproc.pid)
947 time.sleep(0.1)
948 expected_dir = os.path.dirname(os.getcwd())
949 self.assertEqual(p.getcwd(), expected_dir)
950
951 def test_get_open_files(self):
952 # current process
953 p = psutil.Process(os.getpid())
954 files = p.get_open_files()
955 self.assertFalse(TESTFN in files)
956 f = open(TESTFN, 'r')
957 time.sleep(.1)
958 filenames = [x.path for x in p.get_open_files()]
959 self.assertTrue(TESTFN in filenames)
960 f.close()
961 for file in filenames:
962 self.assertTrue(os.path.isfile(file))
963
964 # another process
965 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
966 sproc = get_test_subprocess([PYTHON, "-c", cmdline])
967 wait_for_pid(sproc.pid)
968 time.sleep(0.1)
969 p = psutil.Process(sproc.pid)
970 for x in range(100):
971 filenames = [x.path for x in p.get_open_files()]
972 if TESTFN in filenames:
973 break
974 time.sleep(.01)
975 else:
976 self.assertTrue(TESTFN in filenames)
977 for file in filenames:
978 self.assertTrue(os.path.isfile(file))
979 # all processes
980 for proc in psutil.process_iter():
981 try:
982 files = proc.get_open_files()
983 except psutil.Error:
984 pass
985 else:
986 for file in filenames:
987 self.assertTrue(os.path.isfile(file))
988
989 def test_get_open_files2(self):
990 # test fd and path fields
991 fileobj = open(TESTFN, 'r')
992 p = psutil.Process(os.getpid())
993 for path, fd in p.get_open_files():
994 if path == fileobj.name or fd == fileobj.fileno():
995 break
996 else:
997 self.fail("no file found; files=%s" % repr(p.get_open_files()))
998 self.assertEqual(path, fileobj.name)
999 if WINDOWS:
1000 self.assertEqual(fd, -1)
1001 else:
1002 self.assertEqual(fd, fileobj.fileno())
1003 # test positions
1004 ntuple = p.get_open_files()[0]
1005 self.assertEqual(ntuple[0], ntuple.path)
1006 self.assertEqual(ntuple[1], ntuple.fd)
1007 # test file is gone
1008 fileobj.close()
1009 self.assertTrue(fileobj.name not in p.get_open_files())
1010
1011 @skipUnless(SUPPORT_CONNECTIONS, warn=1)
1012 def test_get_connections(self):
1013 arg = "import socket, time;" \
1014 "s = socket.socket();" \
1015 "s.bind(('127.0.0.1', 0));" \
1016 "s.listen(1);" \
1017 "conn, addr = s.accept();" \
1018 "time.sleep(100);"
1019 sproc = get_test_subprocess([PYTHON, "-c", arg])
1020 p = psutil.Process(sproc.pid)
1021 for x in range(100):
1022 cons = p.get_connections()
1023 if cons:
1024 break
1025 time.sleep(.01)
1026 self.assertEqual(len(cons), 1)
1027 con = cons[0]
1028 self.assertEqual(con.family, socket.AF_INET)
1029 self.assertEqual(con.type, socket.SOCK_STREAM)
1030 self.assertEqual(con.status, "LISTEN")
1031 ip, port = con.local_address
1032 self.assertEqual(ip, '127.0.0.1')
1033 self.assertEqual(con.remote_address, ())
1034 if WINDOWS:
1035 self.assertEqual(con.fd, -1)
1036 else:
1037 self.assertTrue(con.fd > 0)
1038 # test positions
1039 self.assertEqual(con[0], con.fd)
1040 self.assertEqual(con[1], con.family)
1041 self.assertEqual(con[2], con.type)
1042 self.assertEqual(con[3], con.local_address)
1043 self.assertEqual(con[4], con.remote_address)
1044 self.assertEqual(con[5], con.status)
1045
1046 @skipUnless(supports_ipv6())
1047 def test_get_connections_ipv6(self):
1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1049 s.bind(('::1', 0))
1050 s.listen(1)
1051 cons = psutil.Process(os.getpid()).get_connections()
1052 s.close()
1053 self.assertEqual(len(cons), 1)
1054 self.assertEqual(cons[0].local_address[0], '::1')
1055
1056 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
1057 def test_connection_fromfd(self):
1058 sock = socket.socket()
1059 sock.bind(('localhost', 0))
1060 sock.listen(1)
1061 p = psutil.Process(os.getpid())
1062 for conn in p.get_connections():
1063 if conn.fd == sock.fileno():
1064 break
1065 else:
1066 sock.close()
1067 self.fail("couldn't find socket fd")
1068 dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
1069 try:
1070 self.assertEqual(dupsock.getsockname(), conn.local_address)
1071 self.assertNotEqual(sock.fileno(), dupsock.fileno())
1072 finally:
1073 sock.close()
1074 dupsock.close()
1075
1076 @skipUnless(SUPPORT_CONNECTIONS, warn=1)
1077 def test_get_connections_all(self):
1078
1079 def check_address(addr, family):
1080 if not addr:
1081 return
1082 ip, port = addr
1083 self.assertTrue(isinstance(port, int))
1084 if family == socket.AF_INET:
1085 ip = list(map(int, ip.split('.')))
1086 self.assertTrue(len(ip) == 4)
1087 for num in ip:
1088 self.assertTrue(0 <= num <= 255)
1089 self.assertTrue(0 <= port <= 65535)
1090
1091 # all values are supposed to match Linux's tcp_states.h states
1092 # table across all platforms.
1093 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
1094 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
1095 "LAST_ACK", "LISTEN", "CLOSING", ""]
1096
1097 tcp_template = "import socket;" \
1098 "s = socket.socket($family, socket.SOCK_STREAM);" \
1099 "s.bind(('$addr', 0));" \
1100 "s.listen(1);" \
1101 "conn, addr = s.accept();"
1102
1103 udp_template = "import socket, time;" \
1104 "s = socket.socket($family, socket.SOCK_DGRAM);" \
1105 "s.bind(('$addr', 0));" \
1106 "time.sleep(100);"
1107
1108 from string import Template
1109 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET,
1110 addr="127.0.0.1")
1111 udp4_template = Template(udp_template).substitute(family=socket.AF_INET,
1112 addr="127.0.0.1")
1113 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6 ,
1114 addr="::1")
1115 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6 ,
1116 addr="::1")
1117
1118 # launch various subprocess instantiating a socket of various
1119 # families and tupes to enrich psutil results
1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
1122 if supports_ipv6():
1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
1125 else:
1126 tcp6_proc = None
1127 udp6_proc = None
1128
1129 # --- check connections of all processes
1130
1131 time.sleep(0.1)
1132 for p in psutil.process_iter():
1133 try:
1134 cons = p.get_connections()
1135 except (psutil.NoSuchProcess, psutil.AccessDenied):
1136 pass
1137 else:
1138 for conn in cons:
1139 self.assertTrue(conn.type in (socket.SOCK_STREAM,
1140 socket.SOCK_DGRAM))
1141 self.assertTrue(conn.family in (socket.AF_INET,
1142 socket.AF_INET6))
1143 check_address(conn.local_address, conn.family)
1144 check_address(conn.remote_address, conn.family)
1145 if conn.status not in valid_states:
1146 self.fail("%s is not a valid status" %conn.status)
1147 # actually try to bind the local socket; ignore IPv6
1148 # sockets as their address might be represented as
1149 # an IPv4-mapped-address (e.g. "::127.0.0.1")
1150 # and that's rejected by bind()
1151 if conn.family == socket.AF_INET:
1152 s = socket.socket(conn.family, conn.type)
1153 s.bind((conn.local_address[0], 0))
1154 s.close()
1155
1156 if not WINDOWS and hasattr(socket, 'fromfd'):
1157 dupsock = None
1158 try:
1159 try:
1160 dupsock = socket.fromfd(conn.fd, conn.family,
1161 conn.type)
1162 except (socket.error, OSError):
1163 err = sys.exc_info()[1]
1164 if err.args[0] == errno.EBADF:
1165 continue
1166 else:
1167 raise
1168 # python >= 2.5
1169 if hasattr(dupsock, "family"):
1170 self.assertEqual(dupsock.family, conn.family)
1171 self.assertEqual(dupsock.type, conn.type)
1172 finally:
1173 if dupsock is not None:
1174 dupsock.close()
1175
1176
1177 # --- check matches against subprocesses
1178
1179 for p in psutil.Process(os.getpid()).get_children():
1180 for conn in p.get_connections():
1181 # TCP v4
1182 if p.pid == tcp4_proc.pid:
1183 self.assertEqual(conn.family, socket.AF_INET)
1184 self.assertEqual(conn.type, socket.SOCK_STREAM)
1185 self.assertEqual(conn.local_address[0], "127.0.0.1")
1186 self.assertEqual(conn.remote_address, ())
1187 self.assertEqual(conn.status, "LISTEN")
1188 # UDP v4
1189 elif p.pid == udp4_proc.pid:
1190 self.assertEqual(conn.family, socket.AF_INET)
1191 self.assertEqual(conn.type, socket.SOCK_DGRAM)
1192 self.assertEqual(conn.local_address[0], "127.0.0.1")
1193 self.assertEqual(conn.remote_address, ())
1194 self.assertEqual(conn.status, "")
1195 # TCP v6
1196 elif p.pid == getattr(tcp6_proc, "pid", None):
1197 self.assertEqual(conn.family, socket.AF_INET6)
1198 self.assertEqual(conn.type, socket.SOCK_STREAM)
1199 self.assertTrue(conn.local_address[0] in ("::", "::1"))
1200 self.assertEqual(conn.remote_address, ())
1201 self.assertEqual(conn.status, "LISTEN")
1202 # UDP v6
1203 elif p.pid == getattr(udp6_proc, "pid", None):
1204 self.assertEqual(conn.family, socket.AF_INET6)
1205 self.assertEqual(conn.type, socket.SOCK_DGRAM)
1206 self.assertTrue(conn.local_address[0] in ("::", "::1"))
1207 self.assertEqual(conn.remote_address, ())
1208 self.assertEqual(conn.status, "")
1209
1210 def test_parent_ppid(self):
1211 this_parent = os.getpid()
1212 sproc = get_test_subprocess()
1213 p = psutil.Process(sproc.pid)
1214 self.assertEqual(p.ppid, this_parent)
1215 self.assertEqual(p.parent.pid, this_parent)
1216 # no other process is supposed to have us as parent
1217 for p in psutil.process_iter():
1218 if p.pid == sproc.pid:
1219 continue
1220 self.assertTrue(p.ppid != this_parent)
1221
1222 def test_get_children(self):
1223 p = psutil.Process(os.getpid())
1224 self.assertEqual(p.get_children(), [])
1225 sproc = get_test_subprocess()
1226 children = p.get_children()
1227 self.assertEqual(len(children), 1)
1228 self.assertEqual(children[0].pid, sproc.pid)
1229 self.assertEqual(children[0].ppid, os.getpid())
1230
1231 def test_suspend_resume(self):
1232 sproc = get_test_subprocess()
1233 p = psutil.Process(sproc.pid)
1234 p.suspend()
1235 time.sleep(0.1)
1236 self.assertEqual(p.status, psutil.STATUS_STOPPED)
1237 self.assertEqual(str(p.status), "stopped")
1238 p.resume()
1239 self.assertTrue(p.status != psutil.STATUS_STOPPED)
1240
1241 def test_invalid_pid(self):
1242 self.assertRaises(ValueError, psutil.Process, "1")
1243 self.assertRaises(ValueError, psutil.Process, None)
1244 # Refers to Issue #12
1245 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
1246
1247 def test_zombie_process(self):
1248 # Test that NoSuchProcess exception gets raised in case the
1249 # process dies after we create the Process object.
1250 # Example:
1251 # >>> proc = Process(1234)
1252 # >>> time.sleep(5) # time-consuming task, process dies in meantime
1253 # >>> proc.name
1254 # Refers to Issue #15
1255 sproc = get_test_subprocess()
1256 p = psutil.Process(sproc.pid)
1257 p.kill()
1258 p.wait()
1259
1260 for name in dir(p):
1261 if name.startswith('_')\
1262 or name in ('pid', 'send_signal', 'is_running', 'set_ionice',
1263 'wait'):
1264 continue
1265 try:
1266 meth = getattr(p, name)
1267 if callable(meth):
1268 meth()
1269 except psutil.NoSuchProcess:
1270 pass
1271 else:
1272 self.fail("NoSuchProcess exception not raised for %r" % name)
1273
1274 # other methods
1275 try:
1276 if os.name == 'posix':
1277 p.nice = 1
1278 else:
1279 p.nice = psutil.NORMAL_PRIORITY_CLASS
1280 except psutil.NoSuchProcess:
1281 pass
1282 else:
1283 self.fail("exception not raised")
1284 if hasattr(p, 'set_ionice'):
1285 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2)
1286 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
1287 self.assertFalse(p.is_running())
1288
1289 def test__str__(self):
1290 sproc = get_test_subprocess()
1291 p = psutil.Process(sproc.pid)
1292 self.assertTrue(str(sproc.pid) in str(p))
1293 # python shows up as 'Python' in cmdline on OS X so test fails on OS X
1294 if not OSX:
1295 self.assertTrue(os.path.basename(PYTHON) in str(p))
1296 sproc = get_test_subprocess()
1297 p = psutil.Process(sproc.pid)
1298 p.kill()
1299 p.wait()
1300 self.assertTrue(str(sproc.pid) in str(p))
1301 self.assertTrue("terminated" in str(p))
1302
1303 def test_fetch_all(self):
1304 valid_procs = 0
1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
1306 'kill', 'wait']
1307 excluded_names += ['get_cpu_percent', 'get_children']
1308 # XXX - skip slow lsof implementation;
1309 if BSD:
1310 excluded_names += ['get_open_files', 'get_connections']
1311 if OSX:
1312 excluded_names += ['get_connections']
1313 attrs = []
1314 for name in dir(psutil.Process):
1315 if name.startswith("_"):
1316 continue
1317 if name.startswith("set_"):
1318 continue
1319 if name in excluded_names:
1320 continue
1321 attrs.append(name)
1322
1323 for p in psutil.process_iter():
1324 for name in attrs:
1325 try:
1326 try:
1327 attr = getattr(p, name, None)
1328 if attr is not None and callable(attr):
1329 ret = attr()
1330 else:
1331 ret = attr
1332 valid_procs += 1
1333 except (psutil.NoSuchProcess, psutil.AccessDenied):
1334 err = sys.exc_info()[1]
1335 self.assertEqual(err.pid, p.pid)
1336 if err.name:
1337 self.assertEqual(err.name, p.name)
1338 self.assertTrue(str(err))
1339 self.assertTrue(err.msg)
1340 else:
1341 if name == 'parent' or ret in (0, 0.0, [], None):
1342 continue
1343 self.assertTrue(ret)
1344 if name == "exe":
1345 self.assertTrue(os.path.isfile(ret))
1346 elif name == "getcwd":
1347 # XXX - temporary fix; on my Linux box
1348 # chrome process cws is errnously reported
1349 # as /proc/4144/fdinfo whichd doesn't exist
1350 if 'chrome' in p.name:
1351 continue
1352 self.assertTrue(os.path.isdir(ret))
1353 except Exception:
1354 err = sys.exc_info()[1]
1355 trace = traceback.format_exc()
1356 self.fail('%s\nmethod=%s, pid=%s, retvalue=%s'
1357 %(trace, name, p.pid, repr(ret)))
1358
1359 # we should always have a non-empty list, not including PID 0 etc.
1360 # special cases.
1361 self.assertTrue(valid_procs > 0)
1362
1363 @skipIf(LINUX)
1364 def test_pid_0(self):
1365 # Process(0) is supposed to work on all platforms except Linux
1366 p = psutil.Process(0)
1367 self.assertTrue(p.name)
1368
1369 if os.name == 'posix':
1370 self.assertEqual(p.uids.real, 0)
1371 self.assertEqual(p.gids.real, 0)
1372
1373 self.assertTrue(p.ppid in (0, 1))
1374 #self.assertEqual(p.exe, "")
1375 self.assertEqual(p.cmdline, [])
1376 try:
1377 p.get_num_threads()
1378 except psutil.AccessDenied:
1379 pass
1380
1381 if OSX : #and os.geteuid() != 0:
1382 self.assertRaises(psutil.AccessDenied, p.get_memory_info)
1383 self.assertRaises(psutil.AccessDenied, p.get_cpu_times)
1384 else:
1385 p.get_memory_info()
1386
1387 # username property
1388 if POSIX:
1389 self.assertEqual(p.username, 'root')
1390 elif WINDOWS:
1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM')
1392 else:
1393 p.username
1394
1395 self.assertTrue(0 in psutil.get_pid_list())
1396 self.assertTrue(psutil.pid_exists(0))
1397
1398 def test_Popen(self):
1399 # Popen class test
1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1402 try:
1403 proc.name
1404 proc.stdin
1405 self.assertTrue(hasattr(proc, 'name'))
1406 self.assertTrue(hasattr(proc, 'stdin'))
1407 self.assertRaises(AttributeError, getattr, proc, 'foo')
1408 finally:
1409 proc.kill()
1410 proc.wait()
1411
1412
1413 if hasattr(os, 'getuid'):
1414 class LimitedUserTestCase(TestCase):
1415 """Repeat the previous tests by using a limited user.
1416 Executed only on UNIX and only if the user who run the test script
1417 is root.
1418 """
1419 # the uid/gid the test suite runs under
1420 PROCESS_UID = os.getuid()
1421 PROCESS_GID = os.getgid()
1422
1423 def __init__(self, *args, **kwargs):
1424 TestCase.__init__(self, *args, **kwargs)
1425 # re-define all existent test methods in order to
1426 # ignore AccessDenied exceptions
1427 for attr in [x for x in dir(self) if x.startswith('test')]:
1428 meth = getattr(self, attr)
1429 def test_(self):
1430 try:
1431 meth()
1432 except psutil.AccessDenied:
1433 pass
1434 setattr(self, attr, types.MethodType(test_, self))
1435
1436 def setUp(self):
1437 os.setegid(1000)
1438 os.seteuid(1000)
1439 TestCase.setUp(self)
1440
1441 def tearDown(self):
1442 os.setegid(self.PROCESS_UID)
1443 os.seteuid(self.PROCESS_GID)
1444 TestCase.tearDown(self)
1445
1446 def test_nice(self):
1447 try:
1448 psutil.Process(os.getpid()).nice = -1
1449 except psutil.AccessDenied:
1450 pass
1451 else:
1452 self.fail("exception not raised")
1453
1454
1455 def test_main():
1456 tests = []
1457 test_suite = unittest.TestSuite()
1458 tests.append(TestCase)
1459
1460 if POSIX:
1461 from _posix import PosixSpecificTestCase
1462 tests.append(PosixSpecificTestCase)
1463
1464 # import the specific platform test suite
1465 if LINUX:
1466 from _linux import LinuxSpecificTestCase as stc
1467 elif WINDOWS:
1468 from _windows import WindowsSpecificTestCase as stc
1469 elif OSX:
1470 from _osx import OSXSpecificTestCase as stc
1471 elif BSD:
1472 from _bsd import BSDSpecificTestCase as stc
1473 tests.append(stc)
1474
1475 if hasattr(os, 'getuid'):
1476 if os.getuid() == 0:
1477 tests.append(LimitedUserTestCase)
1478 else:
1479 atexit.register(warnings.warn, "Couldn't run limited user tests ("
1480 "super-user privileges are required)", RuntimeWarning)
1481
1482 for test_class in tests:
1483 test_suite.addTest(unittest.makeSuite(test_class))
1484
1485 f = open(TESTFN, 'w')
1486 f.close()
1487 atexit.register(lambda: os.remove(TESTFN))
1488
1489 unittest.TextTestRunner(verbosity=2).run(test_suite)
1490 DEVNULL.close()
1491
1492 if __name__ == '__main__':
1493 test_main()
OLDNEW
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698