Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(180)

Unified Diff: third_party/psutil/test/test_psutil.py

Issue 8159001: Update third_party/psutil and fix the licence issue with it. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: remove the suppression and unnecessary files. Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | tools/checklicenses/checklicenses.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/psutil/test/test_psutil.py
diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py
index c067dee07a7bc5dcaa82f0e4738ce439ce3c8feb..35ed7442888106df0de7511e210a60fa4e1895bf 100644
--- a/third_party/psutil/test/test_psutil.py
+++ b/third_party/psutil/test/test_psutil.py
@@ -1,13 +1,16 @@
#!/usr/bin/env python
#
-# $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $
+# $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $
#
+# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
-"""psutil test suite.
-Note: this is targeted for python 2.x.
-To run it under python 3.x you need to use 2to3 tool first:
+"""
+psutil test suite.
-$ 2to3 -w test/*.py
+Note: this is targeted for both python 2.x and 3.x so there's no need
+to use 2to3 tool first.
"""
import unittest
@@ -23,13 +26,16 @@ import warnings
import atexit
import errno
import threading
+import tempfile
+import collections
import psutil
PYTHON = os.path.realpath(sys.executable)
DEVNULL = open(os.devnull, 'r+')
-
+TESTFN = os.path.join(os.getcwd(), "$testfile")
+PY3 = sys.version_info >= (3,)
POSIX = os.name == 'posix'
LINUX = sys.platform.lower().startswith("linux")
WINDOWS = sys.platform.lower().startswith("win32")
@@ -38,13 +44,20 @@ BSD = sys.platform.lower().startswith("freebsd")
try:
psutil.Process(os.getpid()).get_connections()
-except NotImplementedError, err:
+except NotImplementedError:
+ err = sys.exc_info()[1]
SUPPORT_CONNECTIONS = False
atexit.register(warnings.warn, "get_connections() not supported on this platform",
RuntimeWarning)
else:
SUPPORT_CONNECTIONS = True
+if PY3:
+ long = int
+
+ def callable(attr):
+ return isinstance(attr, collections.Callable)
+
_subprocesses_started = set()
@@ -59,6 +72,21 @@ def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
_subprocesses_started.add(sproc.pid)
return sproc
+def sh(cmdline):
+ """run cmd in a subprocess and return its output.
+ raises RuntimeError on error.
+ """
+ p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if p.returncode != 0:
+ raise RuntimeError(stderr)
+ if stderr:
+ warnings.warn(stderr, RuntimeWarning)
+ if sys.version_info >= (3,):
+ stdout = str(stdout, sys.stdout.encoding)
+ return stdout.strip()
+
def wait_for_pid(pid, timeout=1):
"""Wait for pid to show up in the process list then return.
Used in the test suite to give time the sub process to initialize.
@@ -73,38 +101,11 @@ def wait_for_pid(pid, timeout=1):
if time.time() >= raise_at:
raise RuntimeError("Timed out")
-def kill(pid):
- """Kill a process given its PID."""
- if hasattr(os, 'kill'):
- os.kill(pid, signal.SIGKILL)
- else:
- psutil.Process(pid).kill()
-
def reap_children(search_all=False):
"""Kill any subprocess started by this test suite and ensure that
no zombies stick around to hog resources and create problems when
looking for refleaks.
"""
- if POSIX:
- def waitpid(process):
- # on posix we are free to wait for any pending process by
- # passing -1 to os.waitpid()
- while True:
- try:
- any_process = -1
- pid, status = os.waitpid(any_process, os.WNOHANG)
- if pid == 0 and not process.is_running():
- break
- except OSError:
- if not process.is_running():
- break
- else:
- def waitpid(process):
- # on non-posix systems we just wait for the given process
- # to go away
- while process.is_running():
- time.sleep(0.01)
-
if search_all:
this_process = psutil.Process(os.getpid())
pids = [x.pid for x in this_process.get_children()]
@@ -118,7 +119,8 @@ def reap_children(search_all=False):
except psutil.NoSuchProcess:
pass
else:
- waitpid(child)
+ child.wait()
+
# we want to search through all processes before exiting
atexit.register(reap_children, search_all=True)
@@ -155,90 +157,123 @@ def skipUnless(condition, reason="", warn=False):
return skipIf(True, reason, warn)
return skipIf(False)
+def ignore_access_denied(fun):
+ """Decorator to Ignore AccessDenied exceptions."""
+ def outer(fun, *args, **kwargs):
+ def inner(self):
+ try:
+ return fun(self, *args, **kwargs)
+ except psutil.AccessDenied:
+ pass
+ return inner
+ return outer
+
+def supports_ipv6():
+ """Return True if IPv6 is supported on this platform."""
+ if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
+ return False
+ sock = None
+ try:
+ try:
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.bind(("::1", 0))
+ except (socket.error, socket.gaierror):
+ return False
+ else:
+ return True
+ finally:
+ if sock is not None:
+ sock.close()
+
+
+class ThreadTask(threading.Thread):
+ """A thread object used for running process thread tests."""
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self._running = False
+ self._interval = None
+ self._flag = threading.Event()
+
+ def __repr__(self):
+ name = self.__class__.__name__
+ return '<%s running=%s at %#x>' % (name, self._running, id(self))
+
+ def start(self, interval=0.001):
+ """Start thread and keep it running until an explicit
+ stop() request. Polls for shutdown every 'timeout' seconds.
+ """
+ if self._running:
+ raise ValueError("already started")
+ self._interval = interval
+ threading.Thread.start(self)
+ self._flag.wait()
+
+ def run(self):
+ self._running = True
+ self._flag.set()
+ while self._running:
+ time.sleep(self._interval)
+
+ def stop(self):
+ """Stop thread execution and and waits until it is stopped."""
+ if not self._running:
+ raise ValueError("already stopped")
+ self._running = False
+ self.join()
+
class TestCase(unittest.TestCase):
def tearDown(self):
reap_children()
+ # ============================
+ # tests for system-related API
+ # ============================
+
def test_get_process_list(self):
pids = [x.pid for x in psutil.get_process_list()]
self.assertTrue(os.getpid() in pids)
- self.assertTrue(0 in pids)
def test_process_iter(self):
pids = [x.pid for x in psutil.process_iter()]
self.assertTrue(os.getpid() in pids)
- self.assertTrue(0 in pids)
-
- def test_kill(self):
- sproc = get_test_subprocess()
- test_pid = sproc.pid
- wait_for_pid(test_pid)
- p = psutil.Process(test_pid)
- name = p.name
- p.kill()
- sproc.wait()
- self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
-
- def test_terminate(self):
- sproc = get_test_subprocess()
- test_pid = sproc.pid
- wait_for_pid(test_pid)
- p = psutil.Process(test_pid)
- name = p.name
- p.terminate()
- sproc.wait()
- self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
-
- def test_send_signal(self):
- if POSIX:
- sig = signal.SIGKILL
- else:
- sig = signal.SIGTERM
- sproc = get_test_subprocess()
- test_pid = sproc.pid
- p = psutil.Process(test_pid)
- name = p.name
- p.send_signal(sig)
- sproc.wait()
- self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
def test_TOTAL_PHYMEM(self):
x = psutil.TOTAL_PHYMEM
self.assertTrue(isinstance(x, (int, long)))
self.assertTrue(x > 0)
- def test_used_phymem(self):
- x = psutil.used_phymem()
- self.assertTrue(isinstance(x, (int, long)))
- self.assertTrue(x > 0)
-
- def test_avail_phymem(self):
- x = psutil.avail_phymem()
- self.assertTrue(isinstance(x, (int, long)))
+ def test_BOOT_TIME(self):
+ x = psutil.BOOT_TIME
+ self.assertTrue(isinstance(x, float))
self.assertTrue(x > 0)
- def test_total_virtmem(self):
- x = psutil.total_virtmem()
- self.assertTrue(isinstance(x, (int, long)))
- self.assertTrue(x >= 0)
-
- def test_used_virtmem(self):
- x = psutil.used_virtmem()
- self.assertTrue(isinstance(x, (int, long)))
- self.assertTrue(x >= 0)
+ def test_deprecated_memory_functions(self):
+ warnings.filterwarnings("error")
+ try:
+ self.assertRaises(DeprecationWarning, psutil.used_phymem)
+ self.assertRaises(DeprecationWarning, psutil.avail_phymem)
+ self.assertRaises(DeprecationWarning, psutil.total_virtmem)
+ self.assertRaises(DeprecationWarning, psutil.used_virtmem)
+ self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
+ finally:
+ warnings.resetwarnings()
- def test_avail_virtmem(self):
- x = psutil.avail_virtmem()
- self.assertTrue(isinstance(x, (int, long)))
- self.assertTrue(x >= 0)
+ def test_phymem_usage(self):
+ mem = psutil.phymem_usage()
+ self.assertTrue(mem.total > 0)
+ self.assertTrue(mem.used > 0)
+ self.assertTrue(mem.free > 0)
+ self.assertTrue(0 <= mem.percent <= 100)
- @skipUnless(LINUX)
- def test_cached_phymem(self):
- x = psutil.cached_phymem()
- self.assertTrue(isinstance(x, (int, long)))
- self.assertTrue(x >= 0)
+ def test_virtmem_usage(self):
+ mem = psutil.virtmem_usage()
+ self.assertTrue(mem.total > 0)
+ self.assertTrue(mem.used >= 0)
+ self.assertTrue(mem.free > 0)
+ self.assertTrue(0 <= mem.percent <= 100)
@skipUnless(LINUX)
def test_phymem_buffers(self):
@@ -246,20 +281,60 @@ class TestCase(unittest.TestCase):
self.assertTrue(isinstance(x, (int, long)))
self.assertTrue(x >= 0)
- def test_system_cpu_times(self):
+ def test_pid_exists(self):
+ sproc = get_test_subprocess()
+ wait_for_pid(sproc.pid)
+ self.assertTrue(psutil.pid_exists(sproc.pid))
+ p = psutil.Process(sproc.pid)
+ p.kill()
+ p.wait()
+ self.assertFalse(psutil.pid_exists(sproc.pid))
+ self.assertFalse(psutil.pid_exists(-1))
+
+ def test_pid_exists_2(self):
+ reap_children()
+ pids = psutil.get_pid_list()
+ for pid in pids:
+ try:
+ self.assertTrue(psutil.pid_exists(pid))
+ except AssertionError:
+ # in case the process disappeared in meantime fail only
+ # if it is no longer in get_pid_list()
+ time.sleep(.1)
+ if pid in psutil.get_pid_list():
+ self.fail(pid)
+ pids = range(max(pids) + 5000, max(pids) + 6000)
+ for pid in pids:
+ self.assertFalse(psutil.pid_exists(pid))
+
+ def test_get_pid_list(self):
+ plist = [x.pid for x in psutil.get_process_list()]
+ pidlist = psutil.get_pid_list()
+ self.assertEqual(plist.sort(), pidlist.sort())
+ # make sure every pid is unique
+ self.assertEqual(len(pidlist), len(set(pidlist)))
+
+ def test_test(self):
+ # test for psutil.test() function
+ stdout = sys.stdout
+ sys.stdout = DEVNULL
+ try:
+ psutil.test()
+ finally:
+ sys.stdout = stdout
+
+ def test_sys_cpu_times(self):
total = 0
times = psutil.cpu_times()
- self.assertTrue(isinstance(times, psutil.CPUTimes))
sum(times)
for cp_time in times:
self.assertTrue(isinstance(cp_time, float))
self.assertTrue(cp_time >= 0.0)
total += cp_time
- # test CPUTimes's __iter__ and __str__ implementation
self.assertEqual(total, sum(times))
str(times)
- def test_system_cpu_times2(self):
+ def test_sys_cpu_times2(self):
t1 = sum(psutil.cpu_times())
time.sleep(0.1)
t2 = sum(psutil.cpu_times())
@@ -267,26 +342,283 @@ class TestCase(unittest.TestCase):
if not difference >= 0.05:
self.fail("difference %s" % difference)
- def test_system_cpu_percent(self):
+ def test_sys_per_cpu_times(self):
+ for times in psutil.cpu_times(percpu=True):
+ total = 0
+ sum(times)
+ for cp_time in times:
+ self.assertTrue(isinstance(cp_time, float))
+ self.assertTrue(cp_time >= 0.0)
+ total += cp_time
+ self.assertEqual(total, sum(times))
+ str(times)
+
+ def test_sys_per_cpu_times2(self):
+ tot1 = psutil.cpu_times(percpu=True)
+ stop_at = time.time() + 0.1
+ while 1:
+ if time.time() >= stop_at:
+ break
+ tot2 = psutil.cpu_times(percpu=True)
+ for t1, t2 in zip(tot1, tot2):
+ t1, t2 = sum(t1), sum(t2)
+ difference = t2 - t1
+ if difference >= 0.05:
+ return
+ self.fail()
+
+ def test_sys_cpu_percent(self):
psutil.cpu_percent(interval=0.001)
psutil.cpu_percent(interval=0.001)
- for x in xrange(1000):
+ for x in range(1000):
percent = psutil.cpu_percent(interval=None)
self.assertTrue(isinstance(percent, float))
self.assertTrue(percent >= 0.0)
self.assertTrue(percent <= 100.0)
- def test_process_cpu_percent(self):
+ def test_sys_per_cpu_percent(self):
+ psutil.cpu_percent(interval=0.001, percpu=True)
+ psutil.cpu_percent(interval=0.001, percpu=True)
+ for x in range(1000):
+ percents = psutil.cpu_percent(interval=None, percpu=True)
+ for percent in percents:
+ self.assertTrue(isinstance(percent, float))
+ self.assertTrue(percent >= 0.0)
+ self.assertTrue(percent <= 100.0)
+
+ def test_sys_cpu_percent_compare(self):
+ psutil.cpu_percent(interval=0)
+ psutil.cpu_percent(interval=0, percpu=True)
+ time.sleep(.1)
+ t1 = psutil.cpu_percent(interval=0)
+ t2 = psutil.cpu_percent(interval=0, percpu=True)
+ # calculate total average
+ t2 = sum(t2) / len(t2)
+ if abs(t1 - t2) > 5:
+ self.assertEqual(t1, t2)
+
+ def test_disk_usage(self):
+ usage = psutil.disk_usage(os.getcwd())
+ self.assertTrue(usage.total > 0)
+ self.assertTrue(usage.used > 0)
+ self.assertTrue(usage.free > 0)
+ self.assertTrue(usage.total > usage.used)
+ self.assertTrue(usage.total > usage.free)
+ self.assertTrue(0 <= usage.percent <= 100)
+
+ # if path does not exist OSError ENOENT is expected across
+ # all platforms
+ fname = tempfile.mktemp()
+ try:
+ psutil.disk_usage(fname)
+ except OSError:
+ err = sys.exc_info()[1]
+ if err.args[0] != errno.ENOENT:
+ raise
+ else:
+ self.fail("OSError not raised")
+
+ def test_disk_partitions(self):
+ for disk in psutil.disk_partitions(all=False):
+ self.assertTrue(os.path.exists(disk.device))
+ self.assertTrue(os.path.isdir(disk.mountpoint))
+ self.assertTrue(disk.fstype)
+ for disk in psutil.disk_partitions(all=True):
+ if not WINDOWS:
+ self.assertTrue(os.path.isdir(disk.mountpoint))
+ self.assertTrue(disk.fstype)
+
+ def find_mount_point(path):
+ path = os.path.abspath(path)
+ while not os.path.ismount(path):
+ path = os.path.dirname(path)
+ return path
+
+ mount = find_mount_point(__file__)
+ mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
+ self.assertTrue(mount in mounts)
+ psutil.disk_usage(mount)
+
+ # XXX
+ @skipUnless(hasattr(psutil, "network_io_counters"))
+ def test_anetwork_io_counters(self):
+ def check_ntuple(nt):
+ self.assertEqual(nt[0], nt.bytes_sent)
+ self.assertEqual(nt[1], nt.bytes_recv)
+ self.assertEqual(nt[2], nt.packets_sent)
+ self.assertEqual(nt[3], nt.packets_recv)
+ self.assertTrue(nt.bytes_sent >= 0)
+ self.assertTrue(nt.bytes_recv >= 0)
+ self.assertTrue(nt.packets_sent >= 0)
+ self.assertTrue(nt.packets_recv >= 0)
+
+ ret = psutil.network_io_counters(pernic=False)
+ check_ntuple(ret)
+ ret = psutil.network_io_counters(pernic=True)
+ for name, ntuple in ret.iteritems():
+ self.assertTrue(name)
+ check_ntuple(ntuple)
+ # XXX
+ @skipUnless(hasattr(psutil, "disk_io_counters"))
+ def test_disk_io_counters(self):
+ def check_ntuple(nt):
+ self.assertEqual(nt[0], nt.read_count)
+ self.assertEqual(nt[1], nt.write_count)
+ self.assertEqual(nt[2], nt.read_bytes)
+ self.assertEqual(nt[3], nt.write_bytes)
+ self.assertEqual(nt[4], nt.read_time)
+ self.assertEqual(nt[5], nt.write_time)
+ self.assertTrue(nt.read_count >= 0)
+ self.assertTrue(nt.write_count >= 0)
+ self.assertTrue(nt.read_bytes >= 0)
+ self.assertTrue(nt.write_bytes >= 0)
+ self.assertTrue(nt.read_time >= 0)
+ self.assertTrue(nt.write_time >= 0)
+
+ ret = psutil.disk_io_counters(perdisk=False)
+ check_ntuple(ret)
+ ret = psutil.disk_io_counters(perdisk=True)
+ for name, ntuple in ret.iteritems():
+ self.assertTrue(name)
+ check_ntuple(ntuple)
+
+ # ====================
+ # Process object tests
+ # ====================
+
+ def test_kill(self):
+ sproc = get_test_subprocess()
+ test_pid = sproc.pid
+ wait_for_pid(test_pid)
+ p = psutil.Process(test_pid)
+ name = p.name
+ p.kill()
+ p.wait()
+ self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
+
+ def test_terminate(self):
+ sproc = get_test_subprocess()
+ test_pid = sproc.pid
+ wait_for_pid(test_pid)
+ p = psutil.Process(test_pid)
+ name = p.name
+ p.terminate()
+ p.wait()
+ self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
+
+ def test_send_signal(self):
+ if POSIX:
+ sig = signal.SIGKILL
+ else:
+ sig = signal.SIGTERM
+ sproc = get_test_subprocess()
+ test_pid = sproc.pid
+ p = psutil.Process(test_pid)
+ name = p.name
+ p.send_signal(sig)
+ p.wait()
+ self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
+
+ def test_wait(self):
+ # check exit code signal
+ sproc = get_test_subprocess()
+ p = psutil.Process(sproc.pid)
+ p.kill()
+ code = p.wait()
+ if os.name == 'posix':
+ self.assertEqual(code, signal.SIGKILL)
+ else:
+ self.assertEqual(code, 0)
+ self.assertFalse(p.is_running())
+
+ sproc = get_test_subprocess()
+ p = psutil.Process(sproc.pid)
+ p.terminate()
+ code = p.wait()
+ if os.name == 'posix':
+ self.assertEqual(code, signal.SIGTERM)
+ else:
+ self.assertEqual(code, 0)
+ self.assertFalse(p.is_running())
+
+ # check sys.exit() code
+ code = "import time, sys; time.sleep(0.01); sys.exit(5);"
+ sproc = get_test_subprocess([PYTHON, "-c", code])
+ p = psutil.Process(sproc.pid)
+ self.assertEqual(p.wait(), 5)
+ self.assertFalse(p.is_running())
+
+ # Test wait() issued twice.
+ # It is not supposed to raise NSP when the process is gone.
+ # On UNIX this should return None, on Windows it should keep
+ # returning the exit code.
+ sproc = get_test_subprocess([PYTHON, "-c", code])
+ p = psutil.Process(sproc.pid)
+ self.assertEqual(p.wait(), 5)
+ self.assertTrue(p.wait() in (5, None))
+
+ # test timeout
+ sproc = get_test_subprocess()
+ p = psutil.Process(sproc.pid)
+ p.name
+ self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
+
+ # timeout < 0 not allowed
+ self.assertRaises(ValueError, p.wait, -1)
+
+ @skipUnless(POSIX)
+ def test_wait_non_children(self):
+ # test wait() against processes which are not our children
+ code = "import sys;"
+ code += "from subprocess import Popen, PIPE;"
+ code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
+ code += "sp = Popen(cmd, stdout=PIPE);"
+ code += "sys.stdout.write(str(sp.pid));"
+ sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE)
+
+ grandson_pid = int(sproc.stdout.read())
+ grandson_proc = psutil.Process(grandson_pid)
+ try:
+ self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
+ grandson_proc.kill()
+ ret = grandson_proc.wait()
+ self.assertEqual(ret, None)
+ finally:
+ if grandson_proc.is_running():
+ grandson_proc.kill()
+ grandson_proc.wait()
+
+ def test_wait_timeout_0(self):
+ sproc = get_test_subprocess()
+ p = psutil.Process(sproc.pid)
+ self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
+ p.kill()
+ stop_at = time.time() + 2
+ while 1:
+ try:
+ code = p.wait(0)
+ except psutil.TimeoutExpired:
+ if time.time() >= stop_at:
+ raise
+ else:
+ break
+ if os.name == 'posix':
+ self.assertEqual(code, signal.SIGKILL)
+ else:
+ self.assertEqual(code, 0)
+ self.assertFalse(p.is_running())
+
+ def test_cpu_percent(self):
p = psutil.Process(os.getpid())
p.get_cpu_percent(interval=0.001)
p.get_cpu_percent(interval=0.001)
- for x in xrange(100):
+ for x in range(100):
percent = p.get_cpu_percent(interval=None)
self.assertTrue(isinstance(percent, float))
self.assertTrue(percent >= 0.0)
self.assertTrue(percent <= 100.0)
- def test_get_process_cpu_times(self):
+ def test_cpu_times(self):
times = psutil.Process(os.getpid()).get_cpu_times()
self.assertTrue((times.user > 0.0) or (times.system > 0.0))
# make sure returned values can be pretty printed with strftime
@@ -300,7 +632,7 @@ class TestCase(unittest.TestCase):
# try this with Python 2.7 and re-enable the test.
@skipUnless(sys.version_info > (2, 6, 1) and not OSX)
- def test_get_process_cpu_times2(self):
+ def test_cpu_times2(self):
user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
utime, ktime = os.times()[:2]
@@ -331,20 +663,113 @@ class TestCase(unittest.TestCase):
# make sure returned value can be pretty printed with strftime
time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
- def test_get_num_threads(self):
+ @skipIf(WINDOWS)
+ def test_terminal(self):
+ tty = sh('tty')
p = psutil.Process(os.getpid())
- numt1 = p.get_num_threads()
- if not WINDOWS and not OSX:
- # test is unreliable on Windows and OS X
- # NOTE: sleep(1) is too long for OS X, works with sleep(.5)
- self.assertEqual(numt1, 1)
- t = threading.Thread(target=lambda:time.sleep(1))
- t.start()
- numt2 = p.get_num_threads()
- if WINDOWS:
- self.assertTrue(numt2 > numt1)
+ self.assertEqual(p.terminal, tty)
+
+ @skipIf(OSX, warn=False)
+ def test_get_io_counters(self):
+ p = psutil.Process(os.getpid())
+ # test reads
+ io1 = p.get_io_counters()
+ f = open(PYTHON, 'rb')
+ f.read()
+ f.close()
+ io2 = p.get_io_counters()
+ if not BSD:
+ self.assertTrue(io2.read_count > io1.read_count)
+ self.assertTrue(io2.write_count == io1.write_count)
+ self.assertTrue(io2.read_bytes >= io1.read_bytes)
+ self.assertTrue(io2.write_bytes >= io1.write_bytes)
+ # test writes
+ io1 = p.get_io_counters()
+ f = tempfile.TemporaryFile()
+ if sys.version_info >= (3,):
+ f.write(bytes("x" * 1000000, 'ascii'))
else:
- self.assertEqual(numt2, 2)
+ f.write("x" * 1000000)
+ f.close()
+ io2 = p.get_io_counters()
+ if not BSD:
+ self.assertTrue(io2.write_count > io1.write_count)
+ self.assertTrue(io2.write_bytes > io1.write_bytes)
+ self.assertTrue(io2.read_count >= io1.read_count)
+ self.assertTrue(io2.read_bytes >= io1.read_bytes)
+
+ @skipUnless(LINUX)
+ def test_get_set_ionice(self):
+ from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
+ IOPRIO_CLASS_IDLE)
+ self.assertEqual(IOPRIO_CLASS_NONE, 0)
+ self.assertEqual(IOPRIO_CLASS_RT, 1)
+ self.assertEqual(IOPRIO_CLASS_BE, 2)
+ self.assertEqual(IOPRIO_CLASS_IDLE, 3)
+ p = psutil.Process(os.getpid())
+ try:
+ p.set_ionice(2)
+ ioclass, value = p.get_ionice()
+ self.assertEqual(ioclass, 2)
+ self.assertEqual(value, 4)
+ #
+ p.set_ionice(3)
+ ioclass, value = p.get_ionice()
+ self.assertEqual(ioclass, 3)
+ self.assertEqual(value, 0)
+ #
+ p.set_ionice(2, 0)
+ ioclass, value = p.get_ionice()
+ self.assertEqual(ioclass, 2)
+ self.assertEqual(value, 0)
+ p.set_ionice(2, 7)
+ ioclass, value = p.get_ionice()
+ self.assertEqual(ioclass, 2)
+ self.assertEqual(value, 7)
+ self.assertRaises(ValueError, p.set_ionice, 2, 10)
+ finally:
+ p.set_ionice(IOPRIO_CLASS_NONE)
+
+ def test_get_num_threads(self):
+ # on certain platforms such as Linux we might test for exact
+ # thread number, since we always have with 1 thread per process,
+ # but this does not apply across all platforms (OSX, Windows)
+ p = psutil.Process(os.getpid())
+ step1 = p.get_num_threads()
+
+ thread = ThreadTask()
+ thread.start()
+ try:
+ step2 = p.get_num_threads()
+ self.assertEqual(step2, step1 + 1)
+ thread.stop()
+ finally:
+ if thread._running:
+ thread.stop()
+
+ def test_get_threads(self):
+ p = psutil.Process(os.getpid())
+ step1 = p.get_threads()
+
+ thread = ThreadTask()
+ thread.start()
+
+ try:
+ step2 = p.get_threads()
+ self.assertEqual(len(step2), len(step1) + 1)
+ # on Linux, first thread id is supposed to be this process
+ if LINUX:
+ self.assertEqual(step2[0].id, os.getpid())
+ athread = step2[0]
+ # test named tuple
+ self.assertEqual(athread.id, athread[0])
+ self.assertEqual(athread.user_time, athread[1])
+ self.assertEqual(athread.system_time, athread[2])
+ # test num threads
+ thread.stop()
+ finally:
+ if thread._running:
+ thread.stop()
def test_get_memory_info(self):
p = psutil.Process(os.getpid())
@@ -374,33 +799,30 @@ class TestCase(unittest.TestCase):
sproc = get_test_subprocess()
self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
- def test_eq(self):
- sproc = get_test_subprocess()
- wait_for_pid(sproc.pid)
- self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid))
-
def test_is_running(self):
sproc = get_test_subprocess()
wait_for_pid(sproc.pid)
p = psutil.Process(sproc.pid)
self.assertTrue(p.is_running())
- psutil.Process(sproc.pid).kill()
- sproc.wait()
+ p.kill()
+ p.wait()
self.assertFalse(p.is_running())
- def test_pid_exists(self):
- sproc = get_test_subprocess()
- wait_for_pid(sproc.pid)
- self.assertTrue(psutil.pid_exists(sproc.pid))
- psutil.Process(sproc.pid).kill()
- sproc.wait()
- self.assertFalse(psutil.pid_exists(sproc.pid))
- self.assertFalse(psutil.pid_exists(-1))
-
def test_exe(self):
sproc = get_test_subprocess()
wait_for_pid(sproc.pid)
- self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
+ try:
+ self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
+ except AssertionError:
+ # certain platforms such as BSD are more accurate returning:
+ # "/usr/local/bin/python2.7"
+ # ...instead of:
+ # "/usr/local/bin/python"
+ # We do not want to consider this difference in accuracy
+ # an error.
+ name = psutil.Process(sproc.pid).exe
+ adjusted_name = PYTHON[:len(name)]
+ self.assertEqual(name, adjusted_name)
for p in psutil.process_iter():
try:
exe = p.exe
@@ -416,14 +838,6 @@ class TestCase(unittest.TestCase):
self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s)" \
% (repr(p.exe), p.pid, p.name, p.cmdline))
- def test_path(self):
- proc = psutil.Process(os.getpid())
- warnings.filterwarnings("error")
- try:
- self.assertRaises(DeprecationWarning, getattr, proc, 'path')
- finally:
- warnings.resetwarnings()
-
def test_cmdline(self):
sproc = get_test_subprocess([PYTHON, "-E"])
wait_for_pid(sproc.pid)
@@ -432,40 +846,82 @@ class TestCase(unittest.TestCase):
def test_name(self):
sproc = get_test_subprocess(PYTHON)
wait_for_pid(sproc.pid)
- if OSX:
- self.assertEqual(psutil.Process(sproc.pid).name, "Python")
- else:
- self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PYTHON))
-
- def test_uid(self):
- sproc = get_test_subprocess()
- wait_for_pid(sproc.pid)
- uid = psutil.Process(sproc.pid).uid
- if hasattr(os, 'getuid'):
- self.assertEqual(uid, os.getuid())
- else:
- # On those platforms where UID doesn't make sense (Windows)
- # we expect it to be -1
- self.assertEqual(uid, -1)
-
- def test_gid(self):
- sproc = get_test_subprocess()
- wait_for_pid(sproc.pid)
- gid = psutil.Process(sproc.pid).gid
- if hasattr(os, 'getgid'):
- self.assertEqual(gid, os.getgid())
- else:
- # On those platforms where GID doesn't make sense (Windows)
- # we expect it to be -1
- self.assertEqual(gid, -1)
+ self.assertEqual(psutil.Process(sproc.pid).name.lower(),
+ os.path.basename(sys.executable).lower())
+
+ if os.name == 'posix':
+
+ def test_uids(self):
+ p = psutil.Process(os.getpid())
+ real, effective, saved = p.uids
+ # os.getuid() refers to "real" uid
+ self.assertEqual(real, os.getuid())
+ # os.geteuid() refers to "effective" uid
+ self.assertEqual(effective, os.geteuid())
+ # no such thing as os.getsuid() ("saved" uid), but starting
+ # from python 2.7 we have os.getresuid()[2]
+ if hasattr(os, "getresuid"):
+ self.assertEqual(saved, os.getresuid()[2])
+
+ def test_gids(self):
+ p = psutil.Process(os.getpid())
+ real, effective, saved = p.gids
+ # os.getuid() refers to "real" uid
+ self.assertEqual(real, os.getgid())
+ # os.geteuid() refers to "effective" uid
+ self.assertEqual(effective, os.getegid())
+ # no such thing as os.getsuid() ("saved" uid), but starting
+ # from python 2.7 we have os.getresgid()[2]
+ if hasattr(os, "getresuid"):
+ self.assertEqual(saved, os.getresgid()[2])
+
+ def test_nice(self):
+ p = psutil.Process(os.getpid())
+ self.assertRaises(TypeError, setattr, p, "nice", "str")
+ try:
+ try:
+ first_nice = p.nice
+ p.nice = 1
+ self.assertEqual(p.nice, 1)
+ # going back to previous nice value raises AccessDenied on OSX
+ if not OSX:
+ p.nice = 0
+ self.assertEqual(p.nice, 0)
+ except psutil.AccessDenied:
+ pass
+ finally:
+ # going back to previous nice value raises AccessDenied on OSX
+ if not OSX:
+ p.nice = first_nice
+
+ if os.name == 'nt':
+
+ def test_nice(self):
+ p = psutil.Process(os.getpid())
+ self.assertRaises(TypeError, setattr, p, "nice", "str")
+ try:
+ self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
+ p.nice = psutil.HIGH_PRIORITY_CLASS
+ self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS)
+ p.nice = psutil.NORMAL_PRIORITY_CLASS
+ self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
+ finally:
+ p.nice = psutil.NORMAL_PRIORITY_CLASS
+
+ def test_status(self):
+ p = psutil.Process(os.getpid())
+ self.assertEqual(p.status, psutil.STATUS_RUNNING)
+ self.assertEqual(str(p.status), "running")
+ for p in psutil.process_iter():
+ if str(p.status) == '?':
+ self.fail("invalid status for pid %d" % p.pid)
def test_username(self):
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
if POSIX:
import pwd
- user = pwd.getpwuid(p.uid).pw_name
- self.assertEqual(p.username, user)
+ self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name)
elif WINDOWS:
expected_username = os.environ['USERNAME']
expected_domain = os.environ['USERDOMAIN']
@@ -493,27 +949,31 @@ class TestCase(unittest.TestCase):
self.assertEqual(p.getcwd(), expected_dir)
def test_get_open_files(self):
- thisfile = os.path.join(os.getcwd(), __file__)
-
# current process
p = psutil.Process(os.getpid())
files = p.get_open_files()
- self.assertFalse(thisfile in files)
- f = open(thisfile, 'r')
+ self.assertFalse(TESTFN in files)
+ f = open(TESTFN, 'r')
+ time.sleep(.1)
filenames = [x.path for x in p.get_open_files()]
- self.assertTrue(thisfile in filenames)
+ self.assertTrue(TESTFN in filenames)
f.close()
for file in filenames:
self.assertTrue(os.path.isfile(file))
# another process
- cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfile
+ cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
sproc = get_test_subprocess([PYTHON, "-c", cmdline])
wait_for_pid(sproc.pid)
time.sleep(0.1)
p = psutil.Process(sproc.pid)
- filenames = [x.path for x in p.get_open_files()]
- self.assertTrue(thisfile in filenames)
+ for x in range(100):
+ filenames = [x.path for x in p.get_open_files()]
+ if TESTFN in filenames:
+ break
+ time.sleep(.01)
+ else:
+ self.assertTrue(TESTFN in filenames)
for file in filenames:
self.assertTrue(os.path.isfile(file))
# all processes
@@ -528,7 +988,7 @@ class TestCase(unittest.TestCase):
def test_get_open_files2(self):
# test fd and path fields
- fileobj = open(os.path.join(os.getcwd(), __file__), 'r')
+ fileobj = open(TESTFN, 'r')
p = psutil.Process(os.getpid())
for path, fd in p.get_open_files():
if path == fileobj.name or fd == fileobj.fileno():
@@ -557,10 +1017,13 @@ class TestCase(unittest.TestCase):
"conn, addr = s.accept();" \
"time.sleep(100);"
sproc = get_test_subprocess([PYTHON, "-c", arg])
- time.sleep(0.1)
p = psutil.Process(sproc.pid)
- cons = p.get_connections()
- self.assertTrue(len(cons) == 1)
+ for x in range(100):
+ cons = p.get_connections()
+ if cons:
+ break
+ time.sleep(.01)
+ self.assertEqual(len(cons), 1)
con = cons[0]
self.assertEqual(con.family, socket.AF_INET)
self.assertEqual(con.type, socket.SOCK_STREAM)
@@ -580,10 +1043,20 @@ class TestCase(unittest.TestCase):
self.assertEqual(con[4], con.remote_address)
self.assertEqual(con[5], con.status)
+ @skipUnless(supports_ipv6())
+ def test_get_connections_ipv6(self):
+ s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ s.bind(('::1', 0))
+ s.listen(1)
+ cons = psutil.Process(os.getpid()).get_connections()
+ s.close()
+ self.assertEqual(len(cons), 1)
+ self.assertEqual(cons[0].local_address[0], '::1')
+
@skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
def test_connection_fromfd(self):
sock = socket.socket()
- sock.bind(('127.0.0.1', 0))
+ sock.bind(('localhost', 0))
sock.listen(1)
p = psutil.Process(os.getpid())
for conn in p.get_connections():
@@ -609,24 +1082,12 @@ class TestCase(unittest.TestCase):
ip, port = addr
self.assertTrue(isinstance(port, int))
if family == socket.AF_INET:
- ip = map(int, ip.split('.'))
+ ip = list(map(int, ip.split('.')))
self.assertTrue(len(ip) == 4)
for num in ip:
self.assertTrue(0 <= num <= 255)
self.assertTrue(0 <= port <= 65535)
- def supports_ipv6():
- if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
- return False
- try:
- sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- sock.bind(("::1", 0))
- except (socket.error, socket.gaierror):
- return False
- else:
- sock.close()
- return True
-
# all values are supposed to match Linux's tcp_states.h states
# table across all platforms.
valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
@@ -665,9 +1126,9 @@ class TestCase(unittest.TestCase):
tcp6_proc = None
udp6_proc = None
+ # --- check connections of all processes
+
time.sleep(0.1)
- this_proc = psutil.Process(os.getpid())
- children_pids = [p.pid for p in this_proc.get_children()]
for p in psutil.process_iter():
try:
cons = p.get_connections()
@@ -693,49 +1154,58 @@ class TestCase(unittest.TestCase):
s.close()
if not WINDOWS and hasattr(socket, 'fromfd'):
+ dupsock = None
try:
- dupsock = socket.fromfd(conn.fd, conn.family,
- conn.type)
- except (socket.error, OSError), err:
- if err.args[0] == errno.EBADF:
- continue
- else:
- raise
- self.assertEqual(dupsock.family, conn.family)
- self.assertEqual(dupsock.type, conn.type)
-
- # check matches against subprocesses
- if p.pid in children_pids:
- self.assertTrue(len(cons) == 1)
- conn = cons[0]
- # TCP v4
- if p.pid == tcp4_proc.pid:
- self.assertEqual(conn.family, socket.AF_INET)
- self.assertEqual(conn.type, socket.SOCK_STREAM)
- self.assertEqual(conn.local_address[0], "127.0.0.1")
- self.assertEqual(conn.remote_address, ())
- self.assertEqual(conn.status, "LISTEN")
- # UDP v4
- elif p.pid == udp4_proc.pid:
- self.assertEqual(conn.family, socket.AF_INET)
- self.assertEqual(conn.type, socket.SOCK_DGRAM)
- self.assertEqual(conn.local_address[0], "127.0.0.1")
- self.assertEqual(conn.remote_address, ())
- self.assertEqual(conn.status, "")
- # TCP v6
- elif p.pid == getattr(tcp6_proc, "pid", None):
- self.assertEqual(conn.family, socket.AF_INET6)
- self.assertEqual(conn.type, socket.SOCK_STREAM)
- self.assertEqual(conn.local_address[0], "::1")
- self.assertEqual(conn.remote_address, ())
- self.assertEqual(conn.status, "LISTEN")
- # UDP v6
- elif p.pid == getattr(udp6_proc, "pid", None):
- self.assertEqual(conn.family, socket.AF_INET6)
- self.assertEqual(conn.type, socket.SOCK_DGRAM)
- self.assertEqual(conn.local_address[0], "::1")
- self.assertEqual(conn.remote_address, ())
- self.assertEqual(conn.status, "")
+ try:
+ dupsock = socket.fromfd(conn.fd, conn.family,
+ conn.type)
+ except (socket.error, OSError):
+ err = sys.exc_info()[1]
+ if err.args[0] == errno.EBADF:
+ continue
+ else:
+ raise
+ # python >= 2.5
+ if hasattr(dupsock, "family"):
+ self.assertEqual(dupsock.family, conn.family)
+ self.assertEqual(dupsock.type, conn.type)
+ finally:
+ if dupsock is not None:
+ dupsock.close()
+
+
+ # --- check matches against subprocesses
+
+ for p in psutil.Process(os.getpid()).get_children():
+ for conn in p.get_connections():
+ # TCP v4
+ if p.pid == tcp4_proc.pid:
+ self.assertEqual(conn.family, socket.AF_INET)
+ self.assertEqual(conn.type, socket.SOCK_STREAM)
+ self.assertEqual(conn.local_address[0], "127.0.0.1")
+ self.assertEqual(conn.remote_address, ())
+ self.assertEqual(conn.status, "LISTEN")
+ # UDP v4
+ elif p.pid == udp4_proc.pid:
+ self.assertEqual(conn.family, socket.AF_INET)
+ self.assertEqual(conn.type, socket.SOCK_DGRAM)
+ self.assertEqual(conn.local_address[0], "127.0.0.1")
+ self.assertEqual(conn.remote_address, ())
+ self.assertEqual(conn.status, "")
+ # TCP v6
+ elif p.pid == getattr(tcp6_proc, "pid", None):
+ self.assertEqual(conn.family, socket.AF_INET6)
+ self.assertEqual(conn.type, socket.SOCK_STREAM)
+ self.assertTrue(conn.local_address[0] in ("::", "::1"))
+ self.assertEqual(conn.remote_address, ())
+ self.assertEqual(conn.status, "LISTEN")
+ # UDP v6
+ elif p.pid == getattr(udp6_proc, "pid", None):
+ self.assertEqual(conn.family, socket.AF_INET6)
+ self.assertEqual(conn.type, socket.SOCK_DGRAM)
+ self.assertTrue(conn.local_address[0] in ("::", "::1"))
+ self.assertEqual(conn.remote_address, ())
+ self.assertEqual(conn.status, "")
def test_parent_ppid(self):
this_parent = os.getpid()
@@ -762,68 +1232,11 @@ class TestCase(unittest.TestCase):
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.suspend()
+ time.sleep(0.1)
+ self.assertEqual(p.status, psutil.STATUS_STOPPED)
+ self.assertEqual(str(p.status), "stopped")
p.resume()
-
- def test_get_pid_list(self):
- plist = [x.pid for x in psutil.get_process_list()]
- pidlist = psutil.get_pid_list()
- self.assertEqual(plist.sort(), pidlist.sort())
- # make sure every pid is unique
- self.assertEqual(len(pidlist), len(set(pidlist)))
-
- def test_test(self):
- # test for psutil.test() function
- stdout = sys.stdout
- sys.stdout = DEVNULL
- try:
- psutil.test()
- finally:
- sys.stdout = stdout
-
- def test_types(self):
- sproc = get_test_subprocess()
- wait_for_pid(sproc.pid)
- p = psutil.Process(sproc.pid)
- self.assert_(isinstance(p.pid, int))
- self.assert_(isinstance(p.ppid, int))
- self.assert_(isinstance(p.parent, psutil.Process))
- self.assert_(isinstance(p.name, str))
- if self.__class__.__name__ != "LimitedUserTestCase":
- self.assert_(isinstance(p.exe, str))
- self.assert_(isinstance(p.cmdline, list))
- self.assert_(isinstance(p.uid, int))
- self.assert_(isinstance(p.gid, int))
- self.assert_(isinstance(p.create_time, float))
- self.assert_(isinstance(p.username, (unicode, str)))
- if hasattr(p, 'getcwd'):
- if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
- self.assert_(isinstance(p.getcwd(), str))
- if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
- self.assert_(isinstance(p.get_open_files(), list))
- for path, fd in p.get_open_files():
- self.assert_(isinstance(path, (unicode, str)))
- self.assert_(isinstance(fd, int))
- if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \
- and SUPPORT_CONNECTIONS:
- self.assert_(isinstance(p.get_connections(), list))
- self.assert_(isinstance(p.is_running(), bool))
- if not OSX or self.__class__.__name__ != "LimitedUserTestCase":
- self.assert_(isinstance(p.get_cpu_times(), tuple))
- self.assert_(isinstance(p.get_cpu_times()[0], float))
- self.assert_(isinstance(p.get_cpu_times()[1], float))
- self.assert_(isinstance(p.get_cpu_percent(0), float))
- self.assert_(isinstance(p.get_memory_info(), tuple))
- self.assert_(isinstance(p.get_memory_info()[0], int))
- self.assert_(isinstance(p.get_memory_info()[1], int))
- self.assert_(isinstance(p.get_memory_percent(), float))
- self.assert_(isinstance(p.get_num_threads(), int))
- self.assert_(isinstance(psutil.get_process_list(), list))
- self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process))
- self.assert_(isinstance(psutil.process_iter(), types.GeneratorType))
- self.assert_(isinstance(psutil.process_iter().next(), psutil.Process))
- self.assert_(isinstance(psutil.get_pid_list(), list))
- self.assert_(isinstance(psutil.get_pid_list()[0], int))
- self.assert_(isinstance(psutil.pid_exists(1), bool))
+ self.assertTrue(p.status != psutil.STATUS_STOPPED)
def test_invalid_pid(self):
self.assertRaises(ValueError, psutil.Process, "1")
@@ -832,7 +1245,7 @@ class TestCase(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
def test_zombie_process(self):
- # Test that NoSuchProcess exception gets raised in the event the
+ # Test that NoSuchProcess exception gets raised in case the
# process dies after we create the Process object.
# Example:
# >>> proc = Process(1234)
@@ -842,103 +1255,126 @@ class TestCase(unittest.TestCase):
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.kill()
- sproc.wait()
-
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "name")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time")
- self.assertRaises(psutil.NoSuchProcess, getattr, p, "username")
- if hasattr(p, 'getcwd'):
- self.assertRaises(psutil.NoSuchProcess, p.getcwd)
- self.assertRaises(psutil.NoSuchProcess, p.get_open_files)
- self.assertRaises(psutil.NoSuchProcess, p.get_connections)
- self.assertRaises(psutil.NoSuchProcess, p.suspend)
- self.assertRaises(psutil.NoSuchProcess, p.resume)
- self.assertRaises(psutil.NoSuchProcess, p.kill)
- self.assertRaises(psutil.NoSuchProcess, p.terminate)
+ p.wait()
+
+ for name in dir(p):
+ if name.startswith('_')\
+ or name in ('pid', 'send_signal', 'is_running', 'set_ionice',
+ 'wait'):
+ continue
+ try:
+ meth = getattr(p, name)
+ if callable(meth):
+ meth()
+ except psutil.NoSuchProcess:
+ pass
+ else:
+ self.fail("NoSuchProcess exception not raised for %r" % name)
+
+ # other methods
+ try:
+ if os.name == 'posix':
+ p.nice = 1
+ else:
+ p.nice = psutil.NORMAL_PRIORITY_CLASS
+ except psutil.NoSuchProcess:
+ pass
+ else:
+ self.fail("exception not raised")
+ if hasattr(p, 'set_ionice'):
+ self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2)
self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
- self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times)
- self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0)
- self.assertRaises(psutil.NoSuchProcess, p.get_memory_info)
- self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent)
- self.assertRaises(psutil.NoSuchProcess, p.get_children)
- self.assertRaises(psutil.NoSuchProcess, p.get_num_threads)
self.assertFalse(p.is_running())
def test__str__(self):
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
self.assertTrue(str(sproc.pid) in str(p))
- self.assertTrue(os.path.basename(PYTHON) in str(p))
+ # python shows up as 'Python' in cmdline on OS X so test fails on OS X
+ if not OSX:
+ self.assertTrue(os.path.basename(PYTHON) in str(p))
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.kill()
- sproc.wait()
+ p.wait()
self.assertTrue(str(sproc.pid) in str(p))
self.assertTrue("terminated" in str(p))
def test_fetch_all(self):
valid_procs = 0
- attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times',
- 'get_memory_info', 'get_memory_percent', 'get_open_files',
- 'get_num_threads']
+ excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
+ 'kill', 'wait']
+ excluded_names += ['get_cpu_percent', 'get_children']
+ # XXX - skip slow lsof implementation;
+ if BSD:
+ excluded_names += ['get_open_files', 'get_connections']
+ if OSX:
+ excluded_names += ['get_connections']
+ attrs = []
+ for name in dir(psutil.Process):
+ if name.startswith("_"):
+ continue
+ if name.startswith("set_"):
+ continue
+ if name in excluded_names:
+ continue
+ attrs.append(name)
+
for p in psutil.process_iter():
- for attr in attrs:
- # skip slow Python implementation; we're reasonably sure
- # it works anyway
- if POSIX and attr == 'get_open_files':
- continue
+ for name in attrs:
try:
- attr = getattr(p, attr, None)
- if attr is not None and callable(attr):
- attr()
- valid_procs += 1
- except (psutil.NoSuchProcess, psutil.AccessDenied), err:
- self.assertEqual(err.pid, p.pid)
- if err.name:
- self.assertEqual(err.name, p.name)
- self.assertTrue(str(err))
- self.assertTrue(err.msg)
- except:
+ try:
+ attr = getattr(p, name, None)
+ if attr is not None and callable(attr):
+ ret = attr()
+ else:
+ ret = attr
+ valid_procs += 1
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ err = sys.exc_info()[1]
+ self.assertEqual(err.pid, p.pid)
+ if err.name:
+ self.assertEqual(err.name, p.name)
+ self.assertTrue(str(err))
+ self.assertTrue(err.msg)
+ else:
+ if name == 'parent' or ret in (0, 0.0, [], None):
+ continue
+ self.assertTrue(ret)
+ if name == "exe":
+ self.assertTrue(os.path.isfile(ret))
+ elif name == "getcwd":
+ # XXX - temporary fix; on my Linux box
+ # chrome process cws is errnously reported
+ # as /proc/4144/fdinfo whichd doesn't exist
+ if 'chrome' in p.name:
+ continue
+ self.assertTrue(os.path.isdir(ret))
+ except Exception:
+ err = sys.exc_info()[1]
trace = traceback.format_exc()
- self.fail('Exception raised for method %s, pid %s:\n%s'
- %(attr, p.pid, trace))
+ self.fail('%s\nmethod=%s, pid=%s, retvalue=%s'
+ %(trace, name, p.pid, repr(ret)))
# we should always have a non-empty list, not including PID 0 etc.
# special cases.
self.assertTrue(valid_procs > 0)
+ @skipIf(LINUX)
def test_pid_0(self):
- # Process(0) is supposed to work on all platforms even if with
- # some differences
+ # Process(0) is supposed to work on all platforms except Linux
p = psutil.Process(0)
- if WINDOWS:
- self.assertEqual(p.name, 'System Idle Process')
- elif LINUX:
- self.assertEqual(p.name, 'sched')
- elif BSD:
- self.assertEqual(p.name, 'swapper')
- elif OSX:
- self.assertEqual(p.name, 'kernel_task')
+ self.assertTrue(p.name)
if os.name == 'posix':
- self.assertEqual(p.uid, 0)
- self.assertEqual(p.gid, 0)
- else:
- self.assertEqual(p.uid, -1)
- self.assertEqual(p.gid, -1)
+ self.assertEqual(p.uids.real, 0)
+ self.assertEqual(p.gids.real, 0)
self.assertTrue(p.ppid in (0, 1))
- self.assertEqual(p.exe, "")
+ #self.assertEqual(p.exe, "")
self.assertEqual(p.cmdline, [])
- # this can either raise AD (Win) or return 0 (UNIX)
try:
- self.assertTrue(p.get_num_threads() in (0, 1))
+ p.get_num_threads()
except psutil.AccessDenied:
pass
@@ -956,11 +1392,22 @@ class TestCase(unittest.TestCase):
else:
p.username
- # PID 0 is supposed to be available on all platforms
self.assertTrue(0 in psutil.get_pid_list())
self.assertTrue(psutil.pid_exists(0))
- # --- OS specific tests
+ def test_Popen(self):
+ # Popen class test
+ cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
+ proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ try:
+ proc.name
+ proc.stdin
+ self.assertTrue(hasattr(proc, 'name'))
+ self.assertTrue(hasattr(proc, 'stdin'))
+ self.assertRaises(AttributeError, getattr, proc, 'foo')
+ finally:
+ proc.kill()
+ proc.wait()
if hasattr(os, 'getuid'):
@@ -973,6 +1420,19 @@ if hasattr(os, 'getuid'):
PROCESS_UID = os.getuid()
PROCESS_GID = os.getgid()
+ def __init__(self, *args, **kwargs):
+ TestCase.__init__(self, *args, **kwargs)
+ # re-define all existent test methods in order to
+ # ignore AccessDenied exceptions
+ for attr in [x for x in dir(self) if x.startswith('test')]:
+ meth = getattr(self, attr)
+ def test_(self):
+ try:
+ meth()
+ except psutil.AccessDenied:
+ pass
+ setattr(self, attr, types.MethodType(test_, self))
+
def setUp(self):
os.setegid(1000)
os.seteuid(1000)
@@ -983,58 +1443,20 @@ if hasattr(os, 'getuid'):
os.seteuid(self.PROCESS_GID)
TestCase.tearDown(self)
- def test_path(self):
- # DeprecationWarning is only raised once
- pass
-
- # overridden tests known to raise AccessDenied when run
- # as limited user on different platforms
-
- if LINUX:
-
- def test_getcwd(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, self)
-
- def test_getcwd_2(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, self)
-
- def test_get_open_files(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
-
- def test_get_connections(self):
+ def test_nice(self):
+ try:
+ psutil.Process(os.getpid()).nice = -1
+ except psutil.AccessDenied:
pass
-
- def test_exe(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self)
-
- if BSD:
-
- def test_get_open_files(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
-
- def test_get_open_files2(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
-
- def test_get_connections(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_get_connections, self)
-
- def test_connection_fromfd(self):
- self.assertRaises(psutil.AccessDenied, TestCase.test_connection_fromfd, self)
+ else:
+ self.fail("exception not raised")
def test_main():
tests = []
test_suite = unittest.TestSuite()
-
tests.append(TestCase)
- if hasattr(os, 'getuid'):
- if os.getuid() == 0:
- tests.append(LimitedUserTestCase)
- else:
- atexit.register(warnings.warn, "Couldn't run limited user tests ("
- "super-user privileges are required)", RuntimeWarning)
-
if POSIX:
from _posix import PosixSpecificTestCase
tests.append(PosixSpecificTestCase)
@@ -1050,13 +1472,22 @@ def test_main():
from _bsd import BSDSpecificTestCase as stc
tests.append(stc)
+ if hasattr(os, 'getuid'):
+ if os.getuid() == 0:
+ tests.append(LimitedUserTestCase)
+ else:
+ atexit.register(warnings.warn, "Couldn't run limited user tests ("
+ "super-user privileges are required)", RuntimeWarning)
+
for test_class in tests:
test_suite.addTest(unittest.makeSuite(test_class))
+ f = open(TESTFN, 'w')
+ f.close()
+ atexit.register(lambda: os.remove(TESTFN))
+
unittest.TextTestRunner(verbosity=2).run(test_suite)
DEVNULL.close()
if __name__ == '__main__':
test_main()
-
-
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | tools/checklicenses/checklicenses.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698