| Index: third_party/psutil/test/test_psutil.py
|
| diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py
|
| index c067dee07a7bc5dcaa82f0e4738ce439ce3c8feb..35ed7442888106df0de7511e210a60fa4e1895bf 100644
|
| --- a/third_party/psutil/test/test_psutil.py
|
| +++ b/third_party/psutil/test/test_psutil.py
|
| @@ -1,13 +1,16 @@
|
| #!/usr/bin/env python
|
| #
|
| -# $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $
|
| +# $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $
|
| #
|
| +# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
|
|
| -"""psutil test suite.
|
| -Note: this is targeted for python 2.x.
|
| -To run it under python 3.x you need to use 2to3 tool first:
|
| +"""
|
| +psutil test suite.
|
|
|
| -$ 2to3 -w test/*.py
|
| +Note: this is targeted for both python 2.x and 3.x so there's no need
|
| +to use 2to3 tool first.
|
| """
|
|
|
| import unittest
|
| @@ -23,13 +26,16 @@ import warnings
|
| import atexit
|
| import errno
|
| import threading
|
| +import tempfile
|
| +import collections
|
|
|
| import psutil
|
|
|
|
|
| PYTHON = os.path.realpath(sys.executable)
|
| DEVNULL = open(os.devnull, 'r+')
|
| -
|
| +TESTFN = os.path.join(os.getcwd(), "$testfile")
|
| +PY3 = sys.version_info >= (3,)
|
| POSIX = os.name == 'posix'
|
| LINUX = sys.platform.lower().startswith("linux")
|
| WINDOWS = sys.platform.lower().startswith("win32")
|
| @@ -38,13 +44,20 @@ BSD = sys.platform.lower().startswith("freebsd")
|
|
|
| try:
|
| psutil.Process(os.getpid()).get_connections()
|
| -except NotImplementedError, err:
|
| +except NotImplementedError:
|
| + err = sys.exc_info()[1]
|
| SUPPORT_CONNECTIONS = False
|
| atexit.register(warnings.warn, "get_connections() not supported on this platform",
|
| RuntimeWarning)
|
| else:
|
| SUPPORT_CONNECTIONS = True
|
|
|
| +if PY3:
|
| + long = int
|
| +
|
| + def callable(attr):
|
| + return isinstance(attr, collections.Callable)
|
| +
|
|
|
| _subprocesses_started = set()
|
|
|
| @@ -59,6 +72,21 @@ def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
|
| _subprocesses_started.add(sproc.pid)
|
| return sproc
|
|
|
| +def sh(cmdline):
|
| + """run cmd in a subprocess and return its output.
|
| + raises RuntimeError on error.
|
| + """
|
| + p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
|
| + stderr=subprocess.PIPE)
|
| + stdout, stderr = p.communicate()
|
| + if p.returncode != 0:
|
| + raise RuntimeError(stderr)
|
| + if stderr:
|
| + warnings.warn(stderr, RuntimeWarning)
|
| + if sys.version_info >= (3,):
|
| + stdout = str(stdout, sys.stdout.encoding)
|
| + return stdout.strip()
|
| +
|
| def wait_for_pid(pid, timeout=1):
|
| """Wait for pid to show up in the process list then return.
|
| Used in the test suite to give time the sub process to initialize.
|
| @@ -73,38 +101,11 @@ def wait_for_pid(pid, timeout=1):
|
| if time.time() >= raise_at:
|
| raise RuntimeError("Timed out")
|
|
|
| -def kill(pid):
|
| - """Kill a process given its PID."""
|
| - if hasattr(os, 'kill'):
|
| - os.kill(pid, signal.SIGKILL)
|
| - else:
|
| - psutil.Process(pid).kill()
|
| -
|
| def reap_children(search_all=False):
|
| """Kill any subprocess started by this test suite and ensure that
|
| no zombies stick around to hog resources and create problems when
|
| looking for refleaks.
|
| """
|
| - if POSIX:
|
| - def waitpid(process):
|
| - # on posix we are free to wait for any pending process by
|
| - # passing -1 to os.waitpid()
|
| - while True:
|
| - try:
|
| - any_process = -1
|
| - pid, status = os.waitpid(any_process, os.WNOHANG)
|
| - if pid == 0 and not process.is_running():
|
| - break
|
| - except OSError:
|
| - if not process.is_running():
|
| - break
|
| - else:
|
| - def waitpid(process):
|
| - # on non-posix systems we just wait for the given process
|
| - # to go away
|
| - while process.is_running():
|
| - time.sleep(0.01)
|
| -
|
| if search_all:
|
| this_process = psutil.Process(os.getpid())
|
| pids = [x.pid for x in this_process.get_children()]
|
| @@ -118,7 +119,8 @@ def reap_children(search_all=False):
|
| except psutil.NoSuchProcess:
|
| pass
|
| else:
|
| - waitpid(child)
|
| + child.wait()
|
| +
|
|
|
| # we want to search through all processes before exiting
|
| atexit.register(reap_children, search_all=True)
|
| @@ -155,90 +157,123 @@ def skipUnless(condition, reason="", warn=False):
|
| return skipIf(True, reason, warn)
|
| return skipIf(False)
|
|
|
| +def ignore_access_denied(fun):
|
| + """Decorator to Ignore AccessDenied exceptions."""
|
| + def outer(fun, *args, **kwargs):
|
| + def inner(self):
|
| + try:
|
| + return fun(self, *args, **kwargs)
|
| + except psutil.AccessDenied:
|
| + pass
|
| + return inner
|
| + return outer
|
| +
|
| +def supports_ipv6():
|
| + """Return True if IPv6 is supported on this platform."""
|
| + if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
|
| + return False
|
| + sock = None
|
| + try:
|
| + try:
|
| + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
| + sock.bind(("::1", 0))
|
| + except (socket.error, socket.gaierror):
|
| + return False
|
| + else:
|
| + return True
|
| + finally:
|
| + if sock is not None:
|
| + sock.close()
|
| +
|
| +
|
| +class ThreadTask(threading.Thread):
|
| + """A thread object used for running process thread tests."""
|
| +
|
| + def __init__(self):
|
| + threading.Thread.__init__(self)
|
| + self._running = False
|
| + self._interval = None
|
| + self._flag = threading.Event()
|
| +
|
| + def __repr__(self):
|
| + name = self.__class__.__name__
|
| + return '<%s running=%s at %#x>' % (name, self._running, id(self))
|
| +
|
| + def start(self, interval=0.001):
|
| + """Start thread and keep it running until an explicit
|
| + stop() request. Polls for shutdown every 'timeout' seconds.
|
| + """
|
| + if self._running:
|
| + raise ValueError("already started")
|
| + self._interval = interval
|
| + threading.Thread.start(self)
|
| + self._flag.wait()
|
| +
|
| + def run(self):
|
| + self._running = True
|
| + self._flag.set()
|
| + while self._running:
|
| + time.sleep(self._interval)
|
| +
|
| + def stop(self):
|
| + """Stop thread execution and and waits until it is stopped."""
|
| + if not self._running:
|
| + raise ValueError("already stopped")
|
| + self._running = False
|
| + self.join()
|
| +
|
|
|
| class TestCase(unittest.TestCase):
|
|
|
| def tearDown(self):
|
| reap_children()
|
|
|
| + # ============================
|
| + # tests for system-related API
|
| + # ============================
|
| +
|
| def test_get_process_list(self):
|
| pids = [x.pid for x in psutil.get_process_list()]
|
| self.assertTrue(os.getpid() in pids)
|
| - self.assertTrue(0 in pids)
|
|
|
| def test_process_iter(self):
|
| pids = [x.pid for x in psutil.process_iter()]
|
| self.assertTrue(os.getpid() in pids)
|
| - self.assertTrue(0 in pids)
|
| -
|
| - def test_kill(self):
|
| - sproc = get_test_subprocess()
|
| - test_pid = sproc.pid
|
| - wait_for_pid(test_pid)
|
| - p = psutil.Process(test_pid)
|
| - name = p.name
|
| - p.kill()
|
| - sproc.wait()
|
| - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| -
|
| - def test_terminate(self):
|
| - sproc = get_test_subprocess()
|
| - test_pid = sproc.pid
|
| - wait_for_pid(test_pid)
|
| - p = psutil.Process(test_pid)
|
| - name = p.name
|
| - p.terminate()
|
| - sproc.wait()
|
| - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| -
|
| - def test_send_signal(self):
|
| - if POSIX:
|
| - sig = signal.SIGKILL
|
| - else:
|
| - sig = signal.SIGTERM
|
| - sproc = get_test_subprocess()
|
| - test_pid = sproc.pid
|
| - p = psutil.Process(test_pid)
|
| - name = p.name
|
| - p.send_signal(sig)
|
| - sproc.wait()
|
| - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
|
|
| def test_TOTAL_PHYMEM(self):
|
| x = psutil.TOTAL_PHYMEM
|
| self.assertTrue(isinstance(x, (int, long)))
|
| self.assertTrue(x > 0)
|
|
|
| - def test_used_phymem(self):
|
| - x = psutil.used_phymem()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x > 0)
|
| -
|
| - def test_avail_phymem(self):
|
| - x = psutil.avail_phymem()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| + def test_BOOT_TIME(self):
|
| + x = psutil.BOOT_TIME
|
| + self.assertTrue(isinstance(x, float))
|
| self.assertTrue(x > 0)
|
|
|
| - def test_total_virtmem(self):
|
| - x = psutil.total_virtmem()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x >= 0)
|
| -
|
| - def test_used_virtmem(self):
|
| - x = psutil.used_virtmem()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x >= 0)
|
| + def test_deprecated_memory_functions(self):
|
| + warnings.filterwarnings("error")
|
| + try:
|
| + self.assertRaises(DeprecationWarning, psutil.used_phymem)
|
| + self.assertRaises(DeprecationWarning, psutil.avail_phymem)
|
| + self.assertRaises(DeprecationWarning, psutil.total_virtmem)
|
| + self.assertRaises(DeprecationWarning, psutil.used_virtmem)
|
| + self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
|
| + finally:
|
| + warnings.resetwarnings()
|
|
|
| - def test_avail_virtmem(self):
|
| - x = psutil.avail_virtmem()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x >= 0)
|
| + def test_phymem_usage(self):
|
| + mem = psutil.phymem_usage()
|
| + self.assertTrue(mem.total > 0)
|
| + self.assertTrue(mem.used > 0)
|
| + self.assertTrue(mem.free > 0)
|
| + self.assertTrue(0 <= mem.percent <= 100)
|
|
|
| - @skipUnless(LINUX)
|
| - def test_cached_phymem(self):
|
| - x = psutil.cached_phymem()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x >= 0)
|
| + def test_virtmem_usage(self):
|
| + mem = psutil.virtmem_usage()
|
| + self.assertTrue(mem.total > 0)
|
| + self.assertTrue(mem.used >= 0)
|
| + self.assertTrue(mem.free > 0)
|
| + self.assertTrue(0 <= mem.percent <= 100)
|
|
|
| @skipUnless(LINUX)
|
| def test_phymem_buffers(self):
|
| @@ -246,20 +281,60 @@ class TestCase(unittest.TestCase):
|
| self.assertTrue(isinstance(x, (int, long)))
|
| self.assertTrue(x >= 0)
|
|
|
| - def test_system_cpu_times(self):
|
| + def test_pid_exists(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + self.assertTrue(psutil.pid_exists(sproc.pid))
|
| + p = psutil.Process(sproc.pid)
|
| + p.kill()
|
| + p.wait()
|
| + self.assertFalse(psutil.pid_exists(sproc.pid))
|
| + self.assertFalse(psutil.pid_exists(-1))
|
| +
|
| + def test_pid_exists_2(self):
|
| + reap_children()
|
| + pids = psutil.get_pid_list()
|
| + for pid in pids:
|
| + try:
|
| + self.assertTrue(psutil.pid_exists(pid))
|
| + except AssertionError:
|
| + # in case the process disappeared in meantime fail only
|
| + # if it is no longer in get_pid_list()
|
| + time.sleep(.1)
|
| + if pid in psutil.get_pid_list():
|
| + self.fail(pid)
|
| + pids = range(max(pids) + 5000, max(pids) + 6000)
|
| + for pid in pids:
|
| + self.assertFalse(psutil.pid_exists(pid))
|
| +
|
| + def test_get_pid_list(self):
|
| + plist = [x.pid for x in psutil.get_process_list()]
|
| + pidlist = psutil.get_pid_list()
|
| + self.assertEqual(plist.sort(), pidlist.sort())
|
| + # make sure every pid is unique
|
| + self.assertEqual(len(pidlist), len(set(pidlist)))
|
| +
|
| + def test_test(self):
|
| + # test for psutil.test() function
|
| + stdout = sys.stdout
|
| + sys.stdout = DEVNULL
|
| + try:
|
| + psutil.test()
|
| + finally:
|
| + sys.stdout = stdout
|
| +
|
| + def test_sys_cpu_times(self):
|
| total = 0
|
| times = psutil.cpu_times()
|
| - self.assertTrue(isinstance(times, psutil.CPUTimes))
|
| sum(times)
|
| for cp_time in times:
|
| self.assertTrue(isinstance(cp_time, float))
|
| self.assertTrue(cp_time >= 0.0)
|
| total += cp_time
|
| - # test CPUTimes's __iter__ and __str__ implementation
|
| self.assertEqual(total, sum(times))
|
| str(times)
|
|
|
| - def test_system_cpu_times2(self):
|
| + def test_sys_cpu_times2(self):
|
| t1 = sum(psutil.cpu_times())
|
| time.sleep(0.1)
|
| t2 = sum(psutil.cpu_times())
|
| @@ -267,26 +342,283 @@ class TestCase(unittest.TestCase):
|
| if not difference >= 0.05:
|
| self.fail("difference %s" % difference)
|
|
|
| - def test_system_cpu_percent(self):
|
| + def test_sys_per_cpu_times(self):
|
| + for times in psutil.cpu_times(percpu=True):
|
| + total = 0
|
| + sum(times)
|
| + for cp_time in times:
|
| + self.assertTrue(isinstance(cp_time, float))
|
| + self.assertTrue(cp_time >= 0.0)
|
| + total += cp_time
|
| + self.assertEqual(total, sum(times))
|
| + str(times)
|
| +
|
| + def test_sys_per_cpu_times2(self):
|
| + tot1 = psutil.cpu_times(percpu=True)
|
| + stop_at = time.time() + 0.1
|
| + while 1:
|
| + if time.time() >= stop_at:
|
| + break
|
| + tot2 = psutil.cpu_times(percpu=True)
|
| + for t1, t2 in zip(tot1, tot2):
|
| + t1, t2 = sum(t1), sum(t2)
|
| + difference = t2 - t1
|
| + if difference >= 0.05:
|
| + return
|
| + self.fail()
|
| +
|
| + def test_sys_cpu_percent(self):
|
| psutil.cpu_percent(interval=0.001)
|
| psutil.cpu_percent(interval=0.001)
|
| - for x in xrange(1000):
|
| + for x in range(1000):
|
| percent = psutil.cpu_percent(interval=None)
|
| self.assertTrue(isinstance(percent, float))
|
| self.assertTrue(percent >= 0.0)
|
| self.assertTrue(percent <= 100.0)
|
|
|
| - def test_process_cpu_percent(self):
|
| + def test_sys_per_cpu_percent(self):
|
| + psutil.cpu_percent(interval=0.001, percpu=True)
|
| + psutil.cpu_percent(interval=0.001, percpu=True)
|
| + for x in range(1000):
|
| + percents = psutil.cpu_percent(interval=None, percpu=True)
|
| + for percent in percents:
|
| + self.assertTrue(isinstance(percent, float))
|
| + self.assertTrue(percent >= 0.0)
|
| + self.assertTrue(percent <= 100.0)
|
| +
|
| + def test_sys_cpu_percent_compare(self):
|
| + psutil.cpu_percent(interval=0)
|
| + psutil.cpu_percent(interval=0, percpu=True)
|
| + time.sleep(.1)
|
| + t1 = psutil.cpu_percent(interval=0)
|
| + t2 = psutil.cpu_percent(interval=0, percpu=True)
|
| + # calculate total average
|
| + t2 = sum(t2) / len(t2)
|
| + if abs(t1 - t2) > 5:
|
| + self.assertEqual(t1, t2)
|
| +
|
| + def test_disk_usage(self):
|
| + usage = psutil.disk_usage(os.getcwd())
|
| + self.assertTrue(usage.total > 0)
|
| + self.assertTrue(usage.used > 0)
|
| + self.assertTrue(usage.free > 0)
|
| + self.assertTrue(usage.total > usage.used)
|
| + self.assertTrue(usage.total > usage.free)
|
| + self.assertTrue(0 <= usage.percent <= 100)
|
| +
|
| + # if path does not exist OSError ENOENT is expected across
|
| + # all platforms
|
| + fname = tempfile.mktemp()
|
| + try:
|
| + psutil.disk_usage(fname)
|
| + except OSError:
|
| + err = sys.exc_info()[1]
|
| + if err.args[0] != errno.ENOENT:
|
| + raise
|
| + else:
|
| + self.fail("OSError not raised")
|
| +
|
| + def test_disk_partitions(self):
|
| + for disk in psutil.disk_partitions(all=False):
|
| + self.assertTrue(os.path.exists(disk.device))
|
| + self.assertTrue(os.path.isdir(disk.mountpoint))
|
| + self.assertTrue(disk.fstype)
|
| + for disk in psutil.disk_partitions(all=True):
|
| + if not WINDOWS:
|
| + self.assertTrue(os.path.isdir(disk.mountpoint))
|
| + self.assertTrue(disk.fstype)
|
| +
|
| + def find_mount_point(path):
|
| + path = os.path.abspath(path)
|
| + while not os.path.ismount(path):
|
| + path = os.path.dirname(path)
|
| + return path
|
| +
|
| + mount = find_mount_point(__file__)
|
| + mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
|
| + self.assertTrue(mount in mounts)
|
| + psutil.disk_usage(mount)
|
| +
|
| + # XXX
|
| + @skipUnless(hasattr(psutil, "network_io_counters"))
|
| + def test_anetwork_io_counters(self):
|
| + def check_ntuple(nt):
|
| + self.assertEqual(nt[0], nt.bytes_sent)
|
| + self.assertEqual(nt[1], nt.bytes_recv)
|
| + self.assertEqual(nt[2], nt.packets_sent)
|
| + self.assertEqual(nt[3], nt.packets_recv)
|
| + self.assertTrue(nt.bytes_sent >= 0)
|
| + self.assertTrue(nt.bytes_recv >= 0)
|
| + self.assertTrue(nt.packets_sent >= 0)
|
| + self.assertTrue(nt.packets_recv >= 0)
|
| +
|
| + ret = psutil.network_io_counters(pernic=False)
|
| + check_ntuple(ret)
|
| + ret = psutil.network_io_counters(pernic=True)
|
| + for name, ntuple in ret.iteritems():
|
| + self.assertTrue(name)
|
| + check_ntuple(ntuple)
|
| + # XXX
|
| + @skipUnless(hasattr(psutil, "disk_io_counters"))
|
| + def test_disk_io_counters(self):
|
| + def check_ntuple(nt):
|
| + self.assertEqual(nt[0], nt.read_count)
|
| + self.assertEqual(nt[1], nt.write_count)
|
| + self.assertEqual(nt[2], nt.read_bytes)
|
| + self.assertEqual(nt[3], nt.write_bytes)
|
| + self.assertEqual(nt[4], nt.read_time)
|
| + self.assertEqual(nt[5], nt.write_time)
|
| + self.assertTrue(nt.read_count >= 0)
|
| + self.assertTrue(nt.write_count >= 0)
|
| + self.assertTrue(nt.read_bytes >= 0)
|
| + self.assertTrue(nt.write_bytes >= 0)
|
| + self.assertTrue(nt.read_time >= 0)
|
| + self.assertTrue(nt.write_time >= 0)
|
| +
|
| + ret = psutil.disk_io_counters(perdisk=False)
|
| + check_ntuple(ret)
|
| + ret = psutil.disk_io_counters(perdisk=True)
|
| + for name, ntuple in ret.iteritems():
|
| + self.assertTrue(name)
|
| + check_ntuple(ntuple)
|
| +
|
| + # ====================
|
| + # Process object tests
|
| + # ====================
|
| +
|
| + def test_kill(self):
|
| + sproc = get_test_subprocess()
|
| + test_pid = sproc.pid
|
| + wait_for_pid(test_pid)
|
| + p = psutil.Process(test_pid)
|
| + name = p.name
|
| + p.kill()
|
| + p.wait()
|
| + self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| +
|
| + def test_terminate(self):
|
| + sproc = get_test_subprocess()
|
| + test_pid = sproc.pid
|
| + wait_for_pid(test_pid)
|
| + p = psutil.Process(test_pid)
|
| + name = p.name
|
| + p.terminate()
|
| + p.wait()
|
| + self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| +
|
| + def test_send_signal(self):
|
| + if POSIX:
|
| + sig = signal.SIGKILL
|
| + else:
|
| + sig = signal.SIGTERM
|
| + sproc = get_test_subprocess()
|
| + test_pid = sproc.pid
|
| + p = psutil.Process(test_pid)
|
| + name = p.name
|
| + p.send_signal(sig)
|
| + p.wait()
|
| + self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| +
|
| + def test_wait(self):
|
| + # check exit code signal
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + p.kill()
|
| + code = p.wait()
|
| + if os.name == 'posix':
|
| + self.assertEqual(code, signal.SIGKILL)
|
| + else:
|
| + self.assertEqual(code, 0)
|
| + self.assertFalse(p.is_running())
|
| +
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + p.terminate()
|
| + code = p.wait()
|
| + if os.name == 'posix':
|
| + self.assertEqual(code, signal.SIGTERM)
|
| + else:
|
| + self.assertEqual(code, 0)
|
| + self.assertFalse(p.is_running())
|
| +
|
| + # check sys.exit() code
|
| + code = "import time, sys; time.sleep(0.01); sys.exit(5);"
|
| + sproc = get_test_subprocess([PYTHON, "-c", code])
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertEqual(p.wait(), 5)
|
| + self.assertFalse(p.is_running())
|
| +
|
| + # Test wait() issued twice.
|
| + # It is not supposed to raise NSP when the process is gone.
|
| + # On UNIX this should return None, on Windows it should keep
|
| + # returning the exit code.
|
| + sproc = get_test_subprocess([PYTHON, "-c", code])
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertEqual(p.wait(), 5)
|
| + self.assertTrue(p.wait() in (5, None))
|
| +
|
| + # test timeout
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + p.name
|
| + self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
|
| +
|
| + # timeout < 0 not allowed
|
| + self.assertRaises(ValueError, p.wait, -1)
|
| +
|
| + @skipUnless(POSIX)
|
| + def test_wait_non_children(self):
|
| + # test wait() against processes which are not our children
|
| + code = "import sys;"
|
| + code += "from subprocess import Popen, PIPE;"
|
| + code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
|
| + code += "sp = Popen(cmd, stdout=PIPE);"
|
| + code += "sys.stdout.write(str(sp.pid));"
|
| + sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE)
|
| +
|
| + grandson_pid = int(sproc.stdout.read())
|
| + grandson_proc = psutil.Process(grandson_pid)
|
| + try:
|
| + self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
|
| + grandson_proc.kill()
|
| + ret = grandson_proc.wait()
|
| + self.assertEqual(ret, None)
|
| + finally:
|
| + if grandson_proc.is_running():
|
| + grandson_proc.kill()
|
| + grandson_proc.wait()
|
| +
|
| + def test_wait_timeout_0(self):
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
|
| + p.kill()
|
| + stop_at = time.time() + 2
|
| + while 1:
|
| + try:
|
| + code = p.wait(0)
|
| + except psutil.TimeoutExpired:
|
| + if time.time() >= stop_at:
|
| + raise
|
| + else:
|
| + break
|
| + if os.name == 'posix':
|
| + self.assertEqual(code, signal.SIGKILL)
|
| + else:
|
| + self.assertEqual(code, 0)
|
| + self.assertFalse(p.is_running())
|
| +
|
| + def test_cpu_percent(self):
|
| p = psutil.Process(os.getpid())
|
| p.get_cpu_percent(interval=0.001)
|
| p.get_cpu_percent(interval=0.001)
|
| - for x in xrange(100):
|
| + for x in range(100):
|
| percent = p.get_cpu_percent(interval=None)
|
| self.assertTrue(isinstance(percent, float))
|
| self.assertTrue(percent >= 0.0)
|
| self.assertTrue(percent <= 100.0)
|
|
|
| - def test_get_process_cpu_times(self):
|
| + def test_cpu_times(self):
|
| times = psutil.Process(os.getpid()).get_cpu_times()
|
| self.assertTrue((times.user > 0.0) or (times.system > 0.0))
|
| # make sure returned values can be pretty printed with strftime
|
| @@ -300,7 +632,7 @@ class TestCase(unittest.TestCase):
|
| # try this with Python 2.7 and re-enable the test.
|
|
|
| @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
|
| - def test_get_process_cpu_times2(self):
|
| + def test_cpu_times2(self):
|
| user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
|
| utime, ktime = os.times()[:2]
|
|
|
| @@ -331,20 +663,113 @@ class TestCase(unittest.TestCase):
|
| # make sure returned value can be pretty printed with strftime
|
| time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
|
|
|
| - def test_get_num_threads(self):
|
| + @skipIf(WINDOWS)
|
| + def test_terminal(self):
|
| + tty = sh('tty')
|
| p = psutil.Process(os.getpid())
|
| - numt1 = p.get_num_threads()
|
| - if not WINDOWS and not OSX:
|
| - # test is unreliable on Windows and OS X
|
| - # NOTE: sleep(1) is too long for OS X, works with sleep(.5)
|
| - self.assertEqual(numt1, 1)
|
| - t = threading.Thread(target=lambda:time.sleep(1))
|
| - t.start()
|
| - numt2 = p.get_num_threads()
|
| - if WINDOWS:
|
| - self.assertTrue(numt2 > numt1)
|
| + self.assertEqual(p.terminal, tty)
|
| +
|
| + @skipIf(OSX, warn=False)
|
| + def test_get_io_counters(self):
|
| + p = psutil.Process(os.getpid())
|
| + # test reads
|
| + io1 = p.get_io_counters()
|
| + f = open(PYTHON, 'rb')
|
| + f.read()
|
| + f.close()
|
| + io2 = p.get_io_counters()
|
| + if not BSD:
|
| + self.assertTrue(io2.read_count > io1.read_count)
|
| + self.assertTrue(io2.write_count == io1.write_count)
|
| + self.assertTrue(io2.read_bytes >= io1.read_bytes)
|
| + self.assertTrue(io2.write_bytes >= io1.write_bytes)
|
| + # test writes
|
| + io1 = p.get_io_counters()
|
| + f = tempfile.TemporaryFile()
|
| + if sys.version_info >= (3,):
|
| + f.write(bytes("x" * 1000000, 'ascii'))
|
| else:
|
| - self.assertEqual(numt2, 2)
|
| + f.write("x" * 1000000)
|
| + f.close()
|
| + io2 = p.get_io_counters()
|
| + if not BSD:
|
| + self.assertTrue(io2.write_count > io1.write_count)
|
| + self.assertTrue(io2.write_bytes > io1.write_bytes)
|
| + self.assertTrue(io2.read_count >= io1.read_count)
|
| + self.assertTrue(io2.read_bytes >= io1.read_bytes)
|
| +
|
| + @skipUnless(LINUX)
|
| + def test_get_set_ionice(self):
|
| + from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
|
| + IOPRIO_CLASS_IDLE)
|
| + self.assertEqual(IOPRIO_CLASS_NONE, 0)
|
| + self.assertEqual(IOPRIO_CLASS_RT, 1)
|
| + self.assertEqual(IOPRIO_CLASS_BE, 2)
|
| + self.assertEqual(IOPRIO_CLASS_IDLE, 3)
|
| + p = psutil.Process(os.getpid())
|
| + try:
|
| + p.set_ionice(2)
|
| + ioclass, value = p.get_ionice()
|
| + self.assertEqual(ioclass, 2)
|
| + self.assertEqual(value, 4)
|
| + #
|
| + p.set_ionice(3)
|
| + ioclass, value = p.get_ionice()
|
| + self.assertEqual(ioclass, 3)
|
| + self.assertEqual(value, 0)
|
| + #
|
| + p.set_ionice(2, 0)
|
| + ioclass, value = p.get_ionice()
|
| + self.assertEqual(ioclass, 2)
|
| + self.assertEqual(value, 0)
|
| + p.set_ionice(2, 7)
|
| + ioclass, value = p.get_ionice()
|
| + self.assertEqual(ioclass, 2)
|
| + self.assertEqual(value, 7)
|
| + self.assertRaises(ValueError, p.set_ionice, 2, 10)
|
| + finally:
|
| + p.set_ionice(IOPRIO_CLASS_NONE)
|
| +
|
| + def test_get_num_threads(self):
|
| + # on certain platforms such as Linux we might test for exact
|
| + # thread number, since we always have with 1 thread per process,
|
| + # but this does not apply across all platforms (OSX, Windows)
|
| + p = psutil.Process(os.getpid())
|
| + step1 = p.get_num_threads()
|
| +
|
| + thread = ThreadTask()
|
| + thread.start()
|
| + try:
|
| + step2 = p.get_num_threads()
|
| + self.assertEqual(step2, step1 + 1)
|
| + thread.stop()
|
| + finally:
|
| + if thread._running:
|
| + thread.stop()
|
| +
|
| + def test_get_threads(self):
|
| + p = psutil.Process(os.getpid())
|
| + step1 = p.get_threads()
|
| +
|
| + thread = ThreadTask()
|
| + thread.start()
|
| +
|
| + try:
|
| + step2 = p.get_threads()
|
| + self.assertEqual(len(step2), len(step1) + 1)
|
| + # on Linux, first thread id is supposed to be this process
|
| + if LINUX:
|
| + self.assertEqual(step2[0].id, os.getpid())
|
| + athread = step2[0]
|
| + # test named tuple
|
| + self.assertEqual(athread.id, athread[0])
|
| + self.assertEqual(athread.user_time, athread[1])
|
| + self.assertEqual(athread.system_time, athread[2])
|
| + # test num threads
|
| + thread.stop()
|
| + finally:
|
| + if thread._running:
|
| + thread.stop()
|
|
|
| def test_get_memory_info(self):
|
| p = psutil.Process(os.getpid())
|
| @@ -374,33 +799,30 @@ class TestCase(unittest.TestCase):
|
| sproc = get_test_subprocess()
|
| self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
|
|
|
| - def test_eq(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid))
|
| -
|
| def test_is_running(self):
|
| sproc = get_test_subprocess()
|
| wait_for_pid(sproc.pid)
|
| p = psutil.Process(sproc.pid)
|
| self.assertTrue(p.is_running())
|
| - psutil.Process(sproc.pid).kill()
|
| - sproc.wait()
|
| + p.kill()
|
| + p.wait()
|
| self.assertFalse(p.is_running())
|
|
|
| - def test_pid_exists(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - self.assertTrue(psutil.pid_exists(sproc.pid))
|
| - psutil.Process(sproc.pid).kill()
|
| - sproc.wait()
|
| - self.assertFalse(psutil.pid_exists(sproc.pid))
|
| - self.assertFalse(psutil.pid_exists(-1))
|
| -
|
| def test_exe(self):
|
| sproc = get_test_subprocess()
|
| wait_for_pid(sproc.pid)
|
| - self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
|
| + try:
|
| + self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
|
| + except AssertionError:
|
| + # certain platforms such as BSD are more accurate returning:
|
| + # "/usr/local/bin/python2.7"
|
| + # ...instead of:
|
| + # "/usr/local/bin/python"
|
| + # We do not want to consider this difference in accuracy
|
| + # an error.
|
| + name = psutil.Process(sproc.pid).exe
|
| + adjusted_name = PYTHON[:len(name)]
|
| + self.assertEqual(name, adjusted_name)
|
| for p in psutil.process_iter():
|
| try:
|
| exe = p.exe
|
| @@ -416,14 +838,6 @@ class TestCase(unittest.TestCase):
|
| self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s)" \
|
| % (repr(p.exe), p.pid, p.name, p.cmdline))
|
|
|
| - def test_path(self):
|
| - proc = psutil.Process(os.getpid())
|
| - warnings.filterwarnings("error")
|
| - try:
|
| - self.assertRaises(DeprecationWarning, getattr, proc, 'path')
|
| - finally:
|
| - warnings.resetwarnings()
|
| -
|
| def test_cmdline(self):
|
| sproc = get_test_subprocess([PYTHON, "-E"])
|
| wait_for_pid(sproc.pid)
|
| @@ -432,40 +846,82 @@ class TestCase(unittest.TestCase):
|
| def test_name(self):
|
| sproc = get_test_subprocess(PYTHON)
|
| wait_for_pid(sproc.pid)
|
| - if OSX:
|
| - self.assertEqual(psutil.Process(sproc.pid).name, "Python")
|
| - else:
|
| - self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PYTHON))
|
| -
|
| - def test_uid(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - uid = psutil.Process(sproc.pid).uid
|
| - if hasattr(os, 'getuid'):
|
| - self.assertEqual(uid, os.getuid())
|
| - else:
|
| - # On those platforms where UID doesn't make sense (Windows)
|
| - # we expect it to be -1
|
| - self.assertEqual(uid, -1)
|
| -
|
| - def test_gid(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - gid = psutil.Process(sproc.pid).gid
|
| - if hasattr(os, 'getgid'):
|
| - self.assertEqual(gid, os.getgid())
|
| - else:
|
| - # On those platforms where GID doesn't make sense (Windows)
|
| - # we expect it to be -1
|
| - self.assertEqual(gid, -1)
|
| + self.assertEqual(psutil.Process(sproc.pid).name.lower(),
|
| + os.path.basename(sys.executable).lower())
|
| +
|
| + if os.name == 'posix':
|
| +
|
| + def test_uids(self):
|
| + p = psutil.Process(os.getpid())
|
| + real, effective, saved = p.uids
|
| + # os.getuid() refers to "real" uid
|
| + self.assertEqual(real, os.getuid())
|
| + # os.geteuid() refers to "effective" uid
|
| + self.assertEqual(effective, os.geteuid())
|
| + # no such thing as os.getsuid() ("saved" uid), but starting
|
| + # from python 2.7 we have os.getresuid()[2]
|
| + if hasattr(os, "getresuid"):
|
| + self.assertEqual(saved, os.getresuid()[2])
|
| +
|
| + def test_gids(self):
|
| + p = psutil.Process(os.getpid())
|
| + real, effective, saved = p.gids
|
| + # os.getuid() refers to "real" uid
|
| + self.assertEqual(real, os.getgid())
|
| + # os.geteuid() refers to "effective" uid
|
| + self.assertEqual(effective, os.getegid())
|
| + # no such thing as os.getsuid() ("saved" uid), but starting
|
| + # from python 2.7 we have os.getresgid()[2]
|
| + if hasattr(os, "getresuid"):
|
| + self.assertEqual(saved, os.getresgid()[2])
|
| +
|
| + def test_nice(self):
|
| + p = psutil.Process(os.getpid())
|
| + self.assertRaises(TypeError, setattr, p, "nice", "str")
|
| + try:
|
| + try:
|
| + first_nice = p.nice
|
| + p.nice = 1
|
| + self.assertEqual(p.nice, 1)
|
| + # going back to previous nice value raises AccessDenied on OSX
|
| + if not OSX:
|
| + p.nice = 0
|
| + self.assertEqual(p.nice, 0)
|
| + except psutil.AccessDenied:
|
| + pass
|
| + finally:
|
| + # going back to previous nice value raises AccessDenied on OSX
|
| + if not OSX:
|
| + p.nice = first_nice
|
| +
|
| + if os.name == 'nt':
|
| +
|
| + def test_nice(self):
|
| + p = psutil.Process(os.getpid())
|
| + self.assertRaises(TypeError, setattr, p, "nice", "str")
|
| + try:
|
| + self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
|
| + p.nice = psutil.HIGH_PRIORITY_CLASS
|
| + self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS)
|
| + p.nice = psutil.NORMAL_PRIORITY_CLASS
|
| + self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
|
| + finally:
|
| + p.nice = psutil.NORMAL_PRIORITY_CLASS
|
| +
|
| + def test_status(self):
|
| + p = psutil.Process(os.getpid())
|
| + self.assertEqual(p.status, psutil.STATUS_RUNNING)
|
| + self.assertEqual(str(p.status), "running")
|
| + for p in psutil.process_iter():
|
| + if str(p.status) == '?':
|
| + self.fail("invalid status for pid %d" % p.pid)
|
|
|
| def test_username(self):
|
| sproc = get_test_subprocess()
|
| p = psutil.Process(sproc.pid)
|
| if POSIX:
|
| import pwd
|
| - user = pwd.getpwuid(p.uid).pw_name
|
| - self.assertEqual(p.username, user)
|
| + self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name)
|
| elif WINDOWS:
|
| expected_username = os.environ['USERNAME']
|
| expected_domain = os.environ['USERDOMAIN']
|
| @@ -493,27 +949,31 @@ class TestCase(unittest.TestCase):
|
| self.assertEqual(p.getcwd(), expected_dir)
|
|
|
| def test_get_open_files(self):
|
| - thisfile = os.path.join(os.getcwd(), __file__)
|
| -
|
| # current process
|
| p = psutil.Process(os.getpid())
|
| files = p.get_open_files()
|
| - self.assertFalse(thisfile in files)
|
| - f = open(thisfile, 'r')
|
| + self.assertFalse(TESTFN in files)
|
| + f = open(TESTFN, 'r')
|
| + time.sleep(.1)
|
| filenames = [x.path for x in p.get_open_files()]
|
| - self.assertTrue(thisfile in filenames)
|
| + self.assertTrue(TESTFN in filenames)
|
| f.close()
|
| for file in filenames:
|
| self.assertTrue(os.path.isfile(file))
|
|
|
| # another process
|
| - cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfile
|
| + cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
|
| sproc = get_test_subprocess([PYTHON, "-c", cmdline])
|
| wait_for_pid(sproc.pid)
|
| time.sleep(0.1)
|
| p = psutil.Process(sproc.pid)
|
| - filenames = [x.path for x in p.get_open_files()]
|
| - self.assertTrue(thisfile in filenames)
|
| + for x in range(100):
|
| + filenames = [x.path for x in p.get_open_files()]
|
| + if TESTFN in filenames:
|
| + break
|
| + time.sleep(.01)
|
| + else:
|
| + self.assertTrue(TESTFN in filenames)
|
| for file in filenames:
|
| self.assertTrue(os.path.isfile(file))
|
| # all processes
|
| @@ -528,7 +988,7 @@ class TestCase(unittest.TestCase):
|
|
|
| def test_get_open_files2(self):
|
| # test fd and path fields
|
| - fileobj = open(os.path.join(os.getcwd(), __file__), 'r')
|
| + fileobj = open(TESTFN, 'r')
|
| p = psutil.Process(os.getpid())
|
| for path, fd in p.get_open_files():
|
| if path == fileobj.name or fd == fileobj.fileno():
|
| @@ -557,10 +1017,13 @@ class TestCase(unittest.TestCase):
|
| "conn, addr = s.accept();" \
|
| "time.sleep(100);"
|
| sproc = get_test_subprocess([PYTHON, "-c", arg])
|
| - time.sleep(0.1)
|
| p = psutil.Process(sproc.pid)
|
| - cons = p.get_connections()
|
| - self.assertTrue(len(cons) == 1)
|
| + for x in range(100):
|
| + cons = p.get_connections()
|
| + if cons:
|
| + break
|
| + time.sleep(.01)
|
| + self.assertEqual(len(cons), 1)
|
| con = cons[0]
|
| self.assertEqual(con.family, socket.AF_INET)
|
| self.assertEqual(con.type, socket.SOCK_STREAM)
|
| @@ -580,10 +1043,20 @@ class TestCase(unittest.TestCase):
|
| self.assertEqual(con[4], con.remote_address)
|
| self.assertEqual(con[5], con.status)
|
|
|
| + @skipUnless(supports_ipv6())
|
| + def test_get_connections_ipv6(self):
|
| + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
| + s.bind(('::1', 0))
|
| + s.listen(1)
|
| + cons = psutil.Process(os.getpid()).get_connections()
|
| + s.close()
|
| + self.assertEqual(len(cons), 1)
|
| + self.assertEqual(cons[0].local_address[0], '::1')
|
| +
|
| @skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
|
| def test_connection_fromfd(self):
|
| sock = socket.socket()
|
| - sock.bind(('127.0.0.1', 0))
|
| + sock.bind(('localhost', 0))
|
| sock.listen(1)
|
| p = psutil.Process(os.getpid())
|
| for conn in p.get_connections():
|
| @@ -609,24 +1082,12 @@ class TestCase(unittest.TestCase):
|
| ip, port = addr
|
| self.assertTrue(isinstance(port, int))
|
| if family == socket.AF_INET:
|
| - ip = map(int, ip.split('.'))
|
| + ip = list(map(int, ip.split('.')))
|
| self.assertTrue(len(ip) == 4)
|
| for num in ip:
|
| self.assertTrue(0 <= num <= 255)
|
| self.assertTrue(0 <= port <= 65535)
|
|
|
| - def supports_ipv6():
|
| - if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
|
| - return False
|
| - try:
|
| - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
| - sock.bind(("::1", 0))
|
| - except (socket.error, socket.gaierror):
|
| - return False
|
| - else:
|
| - sock.close()
|
| - return True
|
| -
|
| # all values are supposed to match Linux's tcp_states.h states
|
| # table across all platforms.
|
| valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
|
| @@ -665,9 +1126,9 @@ class TestCase(unittest.TestCase):
|
| tcp6_proc = None
|
| udp6_proc = None
|
|
|
| + # --- check connections of all processes
|
| +
|
| time.sleep(0.1)
|
| - this_proc = psutil.Process(os.getpid())
|
| - children_pids = [p.pid for p in this_proc.get_children()]
|
| for p in psutil.process_iter():
|
| try:
|
| cons = p.get_connections()
|
| @@ -693,49 +1154,58 @@ class TestCase(unittest.TestCase):
|
| s.close()
|
|
|
| if not WINDOWS and hasattr(socket, 'fromfd'):
|
| + dupsock = None
|
| try:
|
| - dupsock = socket.fromfd(conn.fd, conn.family,
|
| - conn.type)
|
| - except (socket.error, OSError), err:
|
| - if err.args[0] == errno.EBADF:
|
| - continue
|
| - else:
|
| - raise
|
| - self.assertEqual(dupsock.family, conn.family)
|
| - self.assertEqual(dupsock.type, conn.type)
|
| -
|
| - # check matches against subprocesses
|
| - if p.pid in children_pids:
|
| - self.assertTrue(len(cons) == 1)
|
| - conn = cons[0]
|
| - # TCP v4
|
| - if p.pid == tcp4_proc.pid:
|
| - self.assertEqual(conn.family, socket.AF_INET)
|
| - self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| - self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "LISTEN")
|
| - # UDP v4
|
| - elif p.pid == udp4_proc.pid:
|
| - self.assertEqual(conn.family, socket.AF_INET)
|
| - self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| - self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "")
|
| - # TCP v6
|
| - elif p.pid == getattr(tcp6_proc, "pid", None):
|
| - self.assertEqual(conn.family, socket.AF_INET6)
|
| - self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| - self.assertEqual(conn.local_address[0], "::1")
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "LISTEN")
|
| - # UDP v6
|
| - elif p.pid == getattr(udp6_proc, "pid", None):
|
| - self.assertEqual(conn.family, socket.AF_INET6)
|
| - self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| - self.assertEqual(conn.local_address[0], "::1")
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "")
|
| + try:
|
| + dupsock = socket.fromfd(conn.fd, conn.family,
|
| + conn.type)
|
| + except (socket.error, OSError):
|
| + err = sys.exc_info()[1]
|
| + if err.args[0] == errno.EBADF:
|
| + continue
|
| + else:
|
| + raise
|
| + # python >= 2.5
|
| + if hasattr(dupsock, "family"):
|
| + self.assertEqual(dupsock.family, conn.family)
|
| + self.assertEqual(dupsock.type, conn.type)
|
| + finally:
|
| + if dupsock is not None:
|
| + dupsock.close()
|
| +
|
| +
|
| + # --- check matches against subprocesses
|
| +
|
| + for p in psutil.Process(os.getpid()).get_children():
|
| + for conn in p.get_connections():
|
| + # TCP v4
|
| + if p.pid == tcp4_proc.pid:
|
| + self.assertEqual(conn.family, socket.AF_INET)
|
| + self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| + self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "LISTEN")
|
| + # UDP v4
|
| + elif p.pid == udp4_proc.pid:
|
| + self.assertEqual(conn.family, socket.AF_INET)
|
| + self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| + self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "")
|
| + # TCP v6
|
| + elif p.pid == getattr(tcp6_proc, "pid", None):
|
| + self.assertEqual(conn.family, socket.AF_INET6)
|
| + self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| + self.assertTrue(conn.local_address[0] in ("::", "::1"))
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "LISTEN")
|
| + # UDP v6
|
| + elif p.pid == getattr(udp6_proc, "pid", None):
|
| + self.assertEqual(conn.family, socket.AF_INET6)
|
| + self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| + self.assertTrue(conn.local_address[0] in ("::", "::1"))
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "")
|
|
|
| def test_parent_ppid(self):
|
| this_parent = os.getpid()
|
| @@ -762,68 +1232,11 @@ class TestCase(unittest.TestCase):
|
| sproc = get_test_subprocess()
|
| p = psutil.Process(sproc.pid)
|
| p.suspend()
|
| + time.sleep(0.1)
|
| + self.assertEqual(p.status, psutil.STATUS_STOPPED)
|
| + self.assertEqual(str(p.status), "stopped")
|
| p.resume()
|
| -
|
| - def test_get_pid_list(self):
|
| - plist = [x.pid for x in psutil.get_process_list()]
|
| - pidlist = psutil.get_pid_list()
|
| - self.assertEqual(plist.sort(), pidlist.sort())
|
| - # make sure every pid is unique
|
| - self.assertEqual(len(pidlist), len(set(pidlist)))
|
| -
|
| - def test_test(self):
|
| - # test for psutil.test() function
|
| - stdout = sys.stdout
|
| - sys.stdout = DEVNULL
|
| - try:
|
| - psutil.test()
|
| - finally:
|
| - sys.stdout = stdout
|
| -
|
| - def test_types(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - p = psutil.Process(sproc.pid)
|
| - self.assert_(isinstance(p.pid, int))
|
| - self.assert_(isinstance(p.ppid, int))
|
| - self.assert_(isinstance(p.parent, psutil.Process))
|
| - self.assert_(isinstance(p.name, str))
|
| - if self.__class__.__name__ != "LimitedUserTestCase":
|
| - self.assert_(isinstance(p.exe, str))
|
| - self.assert_(isinstance(p.cmdline, list))
|
| - self.assert_(isinstance(p.uid, int))
|
| - self.assert_(isinstance(p.gid, int))
|
| - self.assert_(isinstance(p.create_time, float))
|
| - self.assert_(isinstance(p.username, (unicode, str)))
|
| - if hasattr(p, 'getcwd'):
|
| - if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
|
| - self.assert_(isinstance(p.getcwd(), str))
|
| - if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
|
| - self.assert_(isinstance(p.get_open_files(), list))
|
| - for path, fd in p.get_open_files():
|
| - self.assert_(isinstance(path, (unicode, str)))
|
| - self.assert_(isinstance(fd, int))
|
| - if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \
|
| - and SUPPORT_CONNECTIONS:
|
| - self.assert_(isinstance(p.get_connections(), list))
|
| - self.assert_(isinstance(p.is_running(), bool))
|
| - if not OSX or self.__class__.__name__ != "LimitedUserTestCase":
|
| - self.assert_(isinstance(p.get_cpu_times(), tuple))
|
| - self.assert_(isinstance(p.get_cpu_times()[0], float))
|
| - self.assert_(isinstance(p.get_cpu_times()[1], float))
|
| - self.assert_(isinstance(p.get_cpu_percent(0), float))
|
| - self.assert_(isinstance(p.get_memory_info(), tuple))
|
| - self.assert_(isinstance(p.get_memory_info()[0], int))
|
| - self.assert_(isinstance(p.get_memory_info()[1], int))
|
| - self.assert_(isinstance(p.get_memory_percent(), float))
|
| - self.assert_(isinstance(p.get_num_threads(), int))
|
| - self.assert_(isinstance(psutil.get_process_list(), list))
|
| - self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process))
|
| - self.assert_(isinstance(psutil.process_iter(), types.GeneratorType))
|
| - self.assert_(isinstance(psutil.process_iter().next(), psutil.Process))
|
| - self.assert_(isinstance(psutil.get_pid_list(), list))
|
| - self.assert_(isinstance(psutil.get_pid_list()[0], int))
|
| - self.assert_(isinstance(psutil.pid_exists(1), bool))
|
| + self.assertTrue(p.status != psutil.STATUS_STOPPED)
|
|
|
| def test_invalid_pid(self):
|
| self.assertRaises(ValueError, psutil.Process, "1")
|
| @@ -832,7 +1245,7 @@ class TestCase(unittest.TestCase):
|
| self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
|
|
|
| def test_zombie_process(self):
|
| - # Test that NoSuchProcess exception gets raised in the event the
|
| + # Test that NoSuchProcess exception gets raised in case the
|
| # process dies after we create the Process object.
|
| # Example:
|
| # >>> proc = Process(1234)
|
| @@ -842,103 +1255,126 @@ class TestCase(unittest.TestCase):
|
| sproc = get_test_subprocess()
|
| p = psutil.Process(sproc.pid)
|
| p.kill()
|
| - sproc.wait()
|
| -
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "name")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time")
|
| - self.assertRaises(psutil.NoSuchProcess, getattr, p, "username")
|
| - if hasattr(p, 'getcwd'):
|
| - self.assertRaises(psutil.NoSuchProcess, p.getcwd)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_open_files)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_connections)
|
| - self.assertRaises(psutil.NoSuchProcess, p.suspend)
|
| - self.assertRaises(psutil.NoSuchProcess, p.resume)
|
| - self.assertRaises(psutil.NoSuchProcess, p.kill)
|
| - self.assertRaises(psutil.NoSuchProcess, p.terminate)
|
| + p.wait()
|
| +
|
| + for name in dir(p):
|
| + if name.startswith('_')\
|
| + or name in ('pid', 'send_signal', 'is_running', 'set_ionice',
|
| + 'wait'):
|
| + continue
|
| + try:
|
| + meth = getattr(p, name)
|
| + if callable(meth):
|
| + meth()
|
| + except psutil.NoSuchProcess:
|
| + pass
|
| + else:
|
| + self.fail("NoSuchProcess exception not raised for %r" % name)
|
| +
|
| + # other methods
|
| + try:
|
| + if os.name == 'posix':
|
| + p.nice = 1
|
| + else:
|
| + p.nice = psutil.NORMAL_PRIORITY_CLASS
|
| + except psutil.NoSuchProcess:
|
| + pass
|
| + else:
|
| + self.fail("exception not raised")
|
| + if hasattr(p, 'set_ionice'):
|
| + self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2)
|
| self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_memory_info)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_children)
|
| - self.assertRaises(psutil.NoSuchProcess, p.get_num_threads)
|
| self.assertFalse(p.is_running())
|
|
|
| def test__str__(self):
|
| sproc = get_test_subprocess()
|
| p = psutil.Process(sproc.pid)
|
| self.assertTrue(str(sproc.pid) in str(p))
|
| - self.assertTrue(os.path.basename(PYTHON) in str(p))
|
| + # python shows up as 'Python' in cmdline on OS X so test fails on OS X
|
| + if not OSX:
|
| + self.assertTrue(os.path.basename(PYTHON) in str(p))
|
| sproc = get_test_subprocess()
|
| p = psutil.Process(sproc.pid)
|
| p.kill()
|
| - sproc.wait()
|
| + p.wait()
|
| self.assertTrue(str(sproc.pid) in str(p))
|
| self.assertTrue("terminated" in str(p))
|
|
|
| def test_fetch_all(self):
|
| valid_procs = 0
|
| - attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times',
|
| - 'get_memory_info', 'get_memory_percent', 'get_open_files',
|
| - 'get_num_threads']
|
| + excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
|
| + 'kill', 'wait']
|
| + excluded_names += ['get_cpu_percent', 'get_children']
|
| + # XXX - skip slow lsof implementation;
|
| + if BSD:
|
| + excluded_names += ['get_open_files', 'get_connections']
|
| + if OSX:
|
| + excluded_names += ['get_connections']
|
| + attrs = []
|
| + for name in dir(psutil.Process):
|
| + if name.startswith("_"):
|
| + continue
|
| + if name.startswith("set_"):
|
| + continue
|
| + if name in excluded_names:
|
| + continue
|
| + attrs.append(name)
|
| +
|
| for p in psutil.process_iter():
|
| - for attr in attrs:
|
| - # skip slow Python implementation; we're reasonably sure
|
| - # it works anyway
|
| - if POSIX and attr == 'get_open_files':
|
| - continue
|
| + for name in attrs:
|
| try:
|
| - attr = getattr(p, attr, None)
|
| - if attr is not None and callable(attr):
|
| - attr()
|
| - valid_procs += 1
|
| - except (psutil.NoSuchProcess, psutil.AccessDenied), err:
|
| - self.assertEqual(err.pid, p.pid)
|
| - if err.name:
|
| - self.assertEqual(err.name, p.name)
|
| - self.assertTrue(str(err))
|
| - self.assertTrue(err.msg)
|
| - except:
|
| + try:
|
| + attr = getattr(p, name, None)
|
| + if attr is not None and callable(attr):
|
| + ret = attr()
|
| + else:
|
| + ret = attr
|
| + valid_procs += 1
|
| + except (psutil.NoSuchProcess, psutil.AccessDenied):
|
| + err = sys.exc_info()[1]
|
| + self.assertEqual(err.pid, p.pid)
|
| + if err.name:
|
| + self.assertEqual(err.name, p.name)
|
| + self.assertTrue(str(err))
|
| + self.assertTrue(err.msg)
|
| + else:
|
| + if name == 'parent' or ret in (0, 0.0, [], None):
|
| + continue
|
| + self.assertTrue(ret)
|
| + if name == "exe":
|
| + self.assertTrue(os.path.isfile(ret))
|
| + elif name == "getcwd":
|
| + # XXX - temporary fix; on my Linux box
|
| + # chrome process cws is errnously reported
|
| + # as /proc/4144/fdinfo whichd doesn't exist
|
| + if 'chrome' in p.name:
|
| + continue
|
| + self.assertTrue(os.path.isdir(ret))
|
| + except Exception:
|
| + err = sys.exc_info()[1]
|
| trace = traceback.format_exc()
|
| - self.fail('Exception raised for method %s, pid %s:\n%s'
|
| - %(attr, p.pid, trace))
|
| + self.fail('%s\nmethod=%s, pid=%s, retvalue=%s'
|
| + %(trace, name, p.pid, repr(ret)))
|
|
|
| # we should always have a non-empty list, not including PID 0 etc.
|
| # special cases.
|
| self.assertTrue(valid_procs > 0)
|
|
|
| + @skipIf(LINUX)
|
| def test_pid_0(self):
|
| - # Process(0) is supposed to work on all platforms even if with
|
| - # some differences
|
| + # Process(0) is supposed to work on all platforms except Linux
|
| p = psutil.Process(0)
|
| - if WINDOWS:
|
| - self.assertEqual(p.name, 'System Idle Process')
|
| - elif LINUX:
|
| - self.assertEqual(p.name, 'sched')
|
| - elif BSD:
|
| - self.assertEqual(p.name, 'swapper')
|
| - elif OSX:
|
| - self.assertEqual(p.name, 'kernel_task')
|
| + self.assertTrue(p.name)
|
|
|
| if os.name == 'posix':
|
| - self.assertEqual(p.uid, 0)
|
| - self.assertEqual(p.gid, 0)
|
| - else:
|
| - self.assertEqual(p.uid, -1)
|
| - self.assertEqual(p.gid, -1)
|
| + self.assertEqual(p.uids.real, 0)
|
| + self.assertEqual(p.gids.real, 0)
|
|
|
| self.assertTrue(p.ppid in (0, 1))
|
| - self.assertEqual(p.exe, "")
|
| + #self.assertEqual(p.exe, "")
|
| self.assertEqual(p.cmdline, [])
|
| - # this can either raise AD (Win) or return 0 (UNIX)
|
| try:
|
| - self.assertTrue(p.get_num_threads() in (0, 1))
|
| + p.get_num_threads()
|
| except psutil.AccessDenied:
|
| pass
|
|
|
| @@ -956,11 +1392,22 @@ class TestCase(unittest.TestCase):
|
| else:
|
| p.username
|
|
|
| - # PID 0 is supposed to be available on all platforms
|
| self.assertTrue(0 in psutil.get_pid_list())
|
| self.assertTrue(psutil.pid_exists(0))
|
|
|
| - # --- OS specific tests
|
| + def test_Popen(self):
|
| + # Popen class test
|
| + cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
|
| + proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
| + try:
|
| + proc.name
|
| + proc.stdin
|
| + self.assertTrue(hasattr(proc, 'name'))
|
| + self.assertTrue(hasattr(proc, 'stdin'))
|
| + self.assertRaises(AttributeError, getattr, proc, 'foo')
|
| + finally:
|
| + proc.kill()
|
| + proc.wait()
|
|
|
|
|
| if hasattr(os, 'getuid'):
|
| @@ -973,6 +1420,19 @@ if hasattr(os, 'getuid'):
|
| PROCESS_UID = os.getuid()
|
| PROCESS_GID = os.getgid()
|
|
|
| + def __init__(self, *args, **kwargs):
|
| + TestCase.__init__(self, *args, **kwargs)
|
| + # re-define all existent test methods in order to
|
| + # ignore AccessDenied exceptions
|
| + for attr in [x for x in dir(self) if x.startswith('test')]:
|
| + meth = getattr(self, attr)
|
| + def test_(self):
|
| + try:
|
| + meth()
|
| + except psutil.AccessDenied:
|
| + pass
|
| + setattr(self, attr, types.MethodType(test_, self))
|
| +
|
| def setUp(self):
|
| os.setegid(1000)
|
| os.seteuid(1000)
|
| @@ -983,58 +1443,20 @@ if hasattr(os, 'getuid'):
|
| os.seteuid(self.PROCESS_GID)
|
| TestCase.tearDown(self)
|
|
|
| - def test_path(self):
|
| - # DeprecationWarning is only raised once
|
| - pass
|
| -
|
| - # overridden tests known to raise AccessDenied when run
|
| - # as limited user on different platforms
|
| -
|
| - if LINUX:
|
| -
|
| - def test_getcwd(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, self)
|
| -
|
| - def test_getcwd_2(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, self)
|
| -
|
| - def test_get_open_files(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
|
| -
|
| - def test_get_connections(self):
|
| + def test_nice(self):
|
| + try:
|
| + psutil.Process(os.getpid()).nice = -1
|
| + except psutil.AccessDenied:
|
| pass
|
| -
|
| - def test_exe(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self)
|
| -
|
| - if BSD:
|
| -
|
| - def test_get_open_files(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
|
| -
|
| - def test_get_open_files2(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
|
| -
|
| - def test_get_connections(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_get_connections, self)
|
| -
|
| - def test_connection_fromfd(self):
|
| - self.assertRaises(psutil.AccessDenied, TestCase.test_connection_fromfd, self)
|
| + else:
|
| + self.fail("exception not raised")
|
|
|
|
|
| def test_main():
|
| tests = []
|
| test_suite = unittest.TestSuite()
|
| -
|
| tests.append(TestCase)
|
|
|
| - if hasattr(os, 'getuid'):
|
| - if os.getuid() == 0:
|
| - tests.append(LimitedUserTestCase)
|
| - else:
|
| - atexit.register(warnings.warn, "Couldn't run limited user tests ("
|
| - "super-user privileges are required)", RuntimeWarning)
|
| -
|
| if POSIX:
|
| from _posix import PosixSpecificTestCase
|
| tests.append(PosixSpecificTestCase)
|
| @@ -1050,13 +1472,22 @@ def test_main():
|
| from _bsd import BSDSpecificTestCase as stc
|
| tests.append(stc)
|
|
|
| + if hasattr(os, 'getuid'):
|
| + if os.getuid() == 0:
|
| + tests.append(LimitedUserTestCase)
|
| + else:
|
| + atexit.register(warnings.warn, "Couldn't run limited user tests ("
|
| + "super-user privileges are required)", RuntimeWarning)
|
| +
|
| for test_class in tests:
|
| test_suite.addTest(unittest.makeSuite(test_class))
|
|
|
| + f = open(TESTFN, 'w')
|
| + f.close()
|
| + atexit.register(lambda: os.remove(TESTFN))
|
| +
|
| unittest.TextTestRunner(verbosity=2).run(test_suite)
|
| DEVNULL.close()
|
|
|
| if __name__ == '__main__':
|
| test_main()
|
| -
|
| -
|
|
|