| Index: third_party/psutil/test/test_psutil.py
|
| diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py
|
| deleted file mode 100644
|
| index 35ed7442888106df0de7511e210a60fa4e1895bf..0000000000000000000000000000000000000000
|
| --- a/third_party/psutil/test/test_psutil.py
|
| +++ /dev/null
|
| @@ -1,1493 +0,0 @@
|
| -#!/usr/bin/env python
|
| -#
|
| -# $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $
|
| -#
|
| -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""
|
| -psutil test suite.
|
| -
|
| -Note: this is targeted for both python 2.x and 3.x so there's no need
|
| -to use 2to3 tool first.
|
| -"""
|
| -
|
| -import unittest
|
| -import os
|
| -import sys
|
| -import subprocess
|
| -import time
|
| -import signal
|
| -import types
|
| -import traceback
|
| -import socket
|
| -import warnings
|
| -import atexit
|
| -import errno
|
| -import threading
|
| -import tempfile
|
| -import collections
|
| -
|
| -import psutil
|
| -
|
| -
|
| -PYTHON = os.path.realpath(sys.executable)
|
| -DEVNULL = open(os.devnull, 'r+')
|
| -TESTFN = os.path.join(os.getcwd(), "$testfile")
|
| -PY3 = sys.version_info >= (3,)
|
| -POSIX = os.name == 'posix'
|
| -LINUX = sys.platform.lower().startswith("linux")
|
| -WINDOWS = sys.platform.lower().startswith("win32")
|
| -OSX = sys.platform.lower().startswith("darwin")
|
| -BSD = sys.platform.lower().startswith("freebsd")
|
| -
|
| -try:
|
| - psutil.Process(os.getpid()).get_connections()
|
| -except NotImplementedError:
|
| - err = sys.exc_info()[1]
|
| - SUPPORT_CONNECTIONS = False
|
| - atexit.register(warnings.warn, "get_connections() not supported on this platform",
|
| - RuntimeWarning)
|
| -else:
|
| - SUPPORT_CONNECTIONS = True
|
| -
|
| -if PY3:
|
| - long = int
|
| -
|
| - def callable(attr):
|
| - return isinstance(attr, collections.Callable)
|
| -
|
| -
|
| -_subprocesses_started = set()
|
| -
|
| -def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
|
| - """Return a subprocess.Popen object to use in tests.
|
| - By default stdout and stderr are redirected to /dev/null and the
|
| - python interpreter is used as test process.
|
| - """
|
| - if cmd is None:
|
| - cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
|
| - sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin)
|
| - _subprocesses_started.add(sproc.pid)
|
| - return sproc
|
| -
|
| -def sh(cmdline):
|
| - """run cmd in a subprocess and return its output.
|
| - raises RuntimeError on error.
|
| - """
|
| - p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
|
| - stderr=subprocess.PIPE)
|
| - stdout, stderr = p.communicate()
|
| - if p.returncode != 0:
|
| - raise RuntimeError(stderr)
|
| - if stderr:
|
| - warnings.warn(stderr, RuntimeWarning)
|
| - if sys.version_info >= (3,):
|
| - stdout = str(stdout, sys.stdout.encoding)
|
| - return stdout.strip()
|
| -
|
| -def wait_for_pid(pid, timeout=1):
|
| - """Wait for pid to show up in the process list then return.
|
| - Used in the test suite to give time the sub process to initialize.
|
| - """
|
| - raise_at = time.time() + timeout
|
| - while 1:
|
| - if pid in psutil.get_pid_list():
|
| - # give it one more iteration to allow full initialization
|
| - time.sleep(0.01)
|
| - return
|
| - time.sleep(0.0001)
|
| - if time.time() >= raise_at:
|
| - raise RuntimeError("Timed out")
|
| -
|
| -def reap_children(search_all=False):
|
| - """Kill any subprocess started by this test suite and ensure that
|
| - no zombies stick around to hog resources and create problems when
|
| - looking for refleaks.
|
| - """
|
| - if search_all:
|
| - this_process = psutil.Process(os.getpid())
|
| - pids = [x.pid for x in this_process.get_children()]
|
| - else:
|
| - pids =_subprocesses_started
|
| - while pids:
|
| - pid = pids.pop()
|
| - try:
|
| - child = psutil.Process(pid)
|
| - child.kill()
|
| - except psutil.NoSuchProcess:
|
| - pass
|
| - else:
|
| - child.wait()
|
| -
|
| -
|
| -# we want to search through all processes before exiting
|
| -atexit.register(reap_children, search_all=True)
|
| -
|
| -def skipIf(condition, reason="", warn=False):
|
| - """Decorator which skip a test under if condition is satisfied.
|
| - This is a substitute of unittest.skipIf which is available
|
| - only in python 2.7 and 3.2.
|
| - If 'reason' argument is provided this will be printed during
|
| - tests execution.
|
| - If 'warn' is provided a RuntimeWarning will be shown when all
|
| - tests are run.
|
| - """
|
| - def outer(fun, *args, **kwargs):
|
| - def inner(self):
|
| - if condition:
|
| - sys.stdout.write("skipped-")
|
| - sys.stdout.flush()
|
| - if warn:
|
| - objname = "%s.%s" % (self.__class__.__name__, fun.__name__)
|
| - msg = "%s was skipped" % objname
|
| - if reason:
|
| - msg += "; reason: " + repr(reason)
|
| - atexit.register(warnings.warn, msg, RuntimeWarning)
|
| - return
|
| - else:
|
| - return fun(self, *args, **kwargs)
|
| - return inner
|
| - return outer
|
| -
|
| -def skipUnless(condition, reason="", warn=False):
|
| - """Contrary of skipIf."""
|
| - if not condition:
|
| - return skipIf(True, reason, warn)
|
| - return skipIf(False)
|
| -
|
| -def ignore_access_denied(fun):
|
| - """Decorator to Ignore AccessDenied exceptions."""
|
| - def outer(fun, *args, **kwargs):
|
| - def inner(self):
|
| - try:
|
| - return fun(self, *args, **kwargs)
|
| - except psutil.AccessDenied:
|
| - pass
|
| - return inner
|
| - return outer
|
| -
|
| -def supports_ipv6():
|
| - """Return True if IPv6 is supported on this platform."""
|
| - if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
|
| - return False
|
| - sock = None
|
| - try:
|
| - try:
|
| - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
| - sock.bind(("::1", 0))
|
| - except (socket.error, socket.gaierror):
|
| - return False
|
| - else:
|
| - return True
|
| - finally:
|
| - if sock is not None:
|
| - sock.close()
|
| -
|
| -
|
| -class ThreadTask(threading.Thread):
|
| - """A thread object used for running process thread tests."""
|
| -
|
| - def __init__(self):
|
| - threading.Thread.__init__(self)
|
| - self._running = False
|
| - self._interval = None
|
| - self._flag = threading.Event()
|
| -
|
| - def __repr__(self):
|
| - name = self.__class__.__name__
|
| - return '<%s running=%s at %#x>' % (name, self._running, id(self))
|
| -
|
| - def start(self, interval=0.001):
|
| - """Start thread and keep it running until an explicit
|
| - stop() request. Polls for shutdown every 'timeout' seconds.
|
| - """
|
| - if self._running:
|
| - raise ValueError("already started")
|
| - self._interval = interval
|
| - threading.Thread.start(self)
|
| - self._flag.wait()
|
| -
|
| - def run(self):
|
| - self._running = True
|
| - self._flag.set()
|
| - while self._running:
|
| - time.sleep(self._interval)
|
| -
|
| - def stop(self):
|
| - """Stop thread execution and and waits until it is stopped."""
|
| - if not self._running:
|
| - raise ValueError("already stopped")
|
| - self._running = False
|
| - self.join()
|
| -
|
| -
|
| -class TestCase(unittest.TestCase):
|
| -
|
| - def tearDown(self):
|
| - reap_children()
|
| -
|
| - # ============================
|
| - # tests for system-related API
|
| - # ============================
|
| -
|
| - def test_get_process_list(self):
|
| - pids = [x.pid for x in psutil.get_process_list()]
|
| - self.assertTrue(os.getpid() in pids)
|
| -
|
| - def test_process_iter(self):
|
| - pids = [x.pid for x in psutil.process_iter()]
|
| - self.assertTrue(os.getpid() in pids)
|
| -
|
| - def test_TOTAL_PHYMEM(self):
|
| - x = psutil.TOTAL_PHYMEM
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x > 0)
|
| -
|
| - def test_BOOT_TIME(self):
|
| - x = psutil.BOOT_TIME
|
| - self.assertTrue(isinstance(x, float))
|
| - self.assertTrue(x > 0)
|
| -
|
| - def test_deprecated_memory_functions(self):
|
| - warnings.filterwarnings("error")
|
| - try:
|
| - self.assertRaises(DeprecationWarning, psutil.used_phymem)
|
| - self.assertRaises(DeprecationWarning, psutil.avail_phymem)
|
| - self.assertRaises(DeprecationWarning, psutil.total_virtmem)
|
| - self.assertRaises(DeprecationWarning, psutil.used_virtmem)
|
| - self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
|
| - finally:
|
| - warnings.resetwarnings()
|
| -
|
| - def test_phymem_usage(self):
|
| - mem = psutil.phymem_usage()
|
| - self.assertTrue(mem.total > 0)
|
| - self.assertTrue(mem.used > 0)
|
| - self.assertTrue(mem.free > 0)
|
| - self.assertTrue(0 <= mem.percent <= 100)
|
| -
|
| - def test_virtmem_usage(self):
|
| - mem = psutil.virtmem_usage()
|
| - self.assertTrue(mem.total > 0)
|
| - self.assertTrue(mem.used >= 0)
|
| - self.assertTrue(mem.free > 0)
|
| - self.assertTrue(0 <= mem.percent <= 100)
|
| -
|
| - @skipUnless(LINUX)
|
| - def test_phymem_buffers(self):
|
| - x = psutil.phymem_buffers()
|
| - self.assertTrue(isinstance(x, (int, long)))
|
| - self.assertTrue(x >= 0)
|
| -
|
| - def test_pid_exists(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - self.assertTrue(psutil.pid_exists(sproc.pid))
|
| - p = psutil.Process(sproc.pid)
|
| - p.kill()
|
| - p.wait()
|
| - self.assertFalse(psutil.pid_exists(sproc.pid))
|
| - self.assertFalse(psutil.pid_exists(-1))
|
| -
|
| - def test_pid_exists_2(self):
|
| - reap_children()
|
| - pids = psutil.get_pid_list()
|
| - for pid in pids:
|
| - try:
|
| - self.assertTrue(psutil.pid_exists(pid))
|
| - except AssertionError:
|
| - # in case the process disappeared in meantime fail only
|
| - # if it is no longer in get_pid_list()
|
| - time.sleep(.1)
|
| - if pid in psutil.get_pid_list():
|
| - self.fail(pid)
|
| - pids = range(max(pids) + 5000, max(pids) + 6000)
|
| - for pid in pids:
|
| - self.assertFalse(psutil.pid_exists(pid))
|
| -
|
| - def test_get_pid_list(self):
|
| - plist = [x.pid for x in psutil.get_process_list()]
|
| - pidlist = psutil.get_pid_list()
|
| - self.assertEqual(plist.sort(), pidlist.sort())
|
| - # make sure every pid is unique
|
| - self.assertEqual(len(pidlist), len(set(pidlist)))
|
| -
|
| - def test_test(self):
|
| - # test for psutil.test() function
|
| - stdout = sys.stdout
|
| - sys.stdout = DEVNULL
|
| - try:
|
| - psutil.test()
|
| - finally:
|
| - sys.stdout = stdout
|
| -
|
| - def test_sys_cpu_times(self):
|
| - total = 0
|
| - times = psutil.cpu_times()
|
| - sum(times)
|
| - for cp_time in times:
|
| - self.assertTrue(isinstance(cp_time, float))
|
| - self.assertTrue(cp_time >= 0.0)
|
| - total += cp_time
|
| - self.assertEqual(total, sum(times))
|
| - str(times)
|
| -
|
| - def test_sys_cpu_times2(self):
|
| - t1 = sum(psutil.cpu_times())
|
| - time.sleep(0.1)
|
| - t2 = sum(psutil.cpu_times())
|
| - difference = t2 - t1
|
| - if not difference >= 0.05:
|
| - self.fail("difference %s" % difference)
|
| -
|
| - def test_sys_per_cpu_times(self):
|
| - for times in psutil.cpu_times(percpu=True):
|
| - total = 0
|
| - sum(times)
|
| - for cp_time in times:
|
| - self.assertTrue(isinstance(cp_time, float))
|
| - self.assertTrue(cp_time >= 0.0)
|
| - total += cp_time
|
| - self.assertEqual(total, sum(times))
|
| - str(times)
|
| -
|
| - def test_sys_per_cpu_times2(self):
|
| - tot1 = psutil.cpu_times(percpu=True)
|
| - stop_at = time.time() + 0.1
|
| - while 1:
|
| - if time.time() >= stop_at:
|
| - break
|
| - tot2 = psutil.cpu_times(percpu=True)
|
| - for t1, t2 in zip(tot1, tot2):
|
| - t1, t2 = sum(t1), sum(t2)
|
| - difference = t2 - t1
|
| - if difference >= 0.05:
|
| - return
|
| - self.fail()
|
| -
|
| - def test_sys_cpu_percent(self):
|
| - psutil.cpu_percent(interval=0.001)
|
| - psutil.cpu_percent(interval=0.001)
|
| - for x in range(1000):
|
| - percent = psutil.cpu_percent(interval=None)
|
| - self.assertTrue(isinstance(percent, float))
|
| - self.assertTrue(percent >= 0.0)
|
| - self.assertTrue(percent <= 100.0)
|
| -
|
| - def test_sys_per_cpu_percent(self):
|
| - psutil.cpu_percent(interval=0.001, percpu=True)
|
| - psutil.cpu_percent(interval=0.001, percpu=True)
|
| - for x in range(1000):
|
| - percents = psutil.cpu_percent(interval=None, percpu=True)
|
| - for percent in percents:
|
| - self.assertTrue(isinstance(percent, float))
|
| - self.assertTrue(percent >= 0.0)
|
| - self.assertTrue(percent <= 100.0)
|
| -
|
| - def test_sys_cpu_percent_compare(self):
|
| - psutil.cpu_percent(interval=0)
|
| - psutil.cpu_percent(interval=0, percpu=True)
|
| - time.sleep(.1)
|
| - t1 = psutil.cpu_percent(interval=0)
|
| - t2 = psutil.cpu_percent(interval=0, percpu=True)
|
| - # calculate total average
|
| - t2 = sum(t2) / len(t2)
|
| - if abs(t1 - t2) > 5:
|
| - self.assertEqual(t1, t2)
|
| -
|
| - def test_disk_usage(self):
|
| - usage = psutil.disk_usage(os.getcwd())
|
| - self.assertTrue(usage.total > 0)
|
| - self.assertTrue(usage.used > 0)
|
| - self.assertTrue(usage.free > 0)
|
| - self.assertTrue(usage.total > usage.used)
|
| - self.assertTrue(usage.total > usage.free)
|
| - self.assertTrue(0 <= usage.percent <= 100)
|
| -
|
| - # if path does not exist OSError ENOENT is expected across
|
| - # all platforms
|
| - fname = tempfile.mktemp()
|
| - try:
|
| - psutil.disk_usage(fname)
|
| - except OSError:
|
| - err = sys.exc_info()[1]
|
| - if err.args[0] != errno.ENOENT:
|
| - raise
|
| - else:
|
| - self.fail("OSError not raised")
|
| -
|
| - def test_disk_partitions(self):
|
| - for disk in psutil.disk_partitions(all=False):
|
| - self.assertTrue(os.path.exists(disk.device))
|
| - self.assertTrue(os.path.isdir(disk.mountpoint))
|
| - self.assertTrue(disk.fstype)
|
| - for disk in psutil.disk_partitions(all=True):
|
| - if not WINDOWS:
|
| - self.assertTrue(os.path.isdir(disk.mountpoint))
|
| - self.assertTrue(disk.fstype)
|
| -
|
| - def find_mount_point(path):
|
| - path = os.path.abspath(path)
|
| - while not os.path.ismount(path):
|
| - path = os.path.dirname(path)
|
| - return path
|
| -
|
| - mount = find_mount_point(__file__)
|
| - mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
|
| - self.assertTrue(mount in mounts)
|
| - psutil.disk_usage(mount)
|
| -
|
| - # XXX
|
| - @skipUnless(hasattr(psutil, "network_io_counters"))
|
| - def test_anetwork_io_counters(self):
|
| - def check_ntuple(nt):
|
| - self.assertEqual(nt[0], nt.bytes_sent)
|
| - self.assertEqual(nt[1], nt.bytes_recv)
|
| - self.assertEqual(nt[2], nt.packets_sent)
|
| - self.assertEqual(nt[3], nt.packets_recv)
|
| - self.assertTrue(nt.bytes_sent >= 0)
|
| - self.assertTrue(nt.bytes_recv >= 0)
|
| - self.assertTrue(nt.packets_sent >= 0)
|
| - self.assertTrue(nt.packets_recv >= 0)
|
| -
|
| - ret = psutil.network_io_counters(pernic=False)
|
| - check_ntuple(ret)
|
| - ret = psutil.network_io_counters(pernic=True)
|
| - for name, ntuple in ret.iteritems():
|
| - self.assertTrue(name)
|
| - check_ntuple(ntuple)
|
| - # XXX
|
| - @skipUnless(hasattr(psutil, "disk_io_counters"))
|
| - def test_disk_io_counters(self):
|
| - def check_ntuple(nt):
|
| - self.assertEqual(nt[0], nt.read_count)
|
| - self.assertEqual(nt[1], nt.write_count)
|
| - self.assertEqual(nt[2], nt.read_bytes)
|
| - self.assertEqual(nt[3], nt.write_bytes)
|
| - self.assertEqual(nt[4], nt.read_time)
|
| - self.assertEqual(nt[5], nt.write_time)
|
| - self.assertTrue(nt.read_count >= 0)
|
| - self.assertTrue(nt.write_count >= 0)
|
| - self.assertTrue(nt.read_bytes >= 0)
|
| - self.assertTrue(nt.write_bytes >= 0)
|
| - self.assertTrue(nt.read_time >= 0)
|
| - self.assertTrue(nt.write_time >= 0)
|
| -
|
| - ret = psutil.disk_io_counters(perdisk=False)
|
| - check_ntuple(ret)
|
| - ret = psutil.disk_io_counters(perdisk=True)
|
| - for name, ntuple in ret.iteritems():
|
| - self.assertTrue(name)
|
| - check_ntuple(ntuple)
|
| -
|
| - # ====================
|
| - # Process object tests
|
| - # ====================
|
| -
|
| - def test_kill(self):
|
| - sproc = get_test_subprocess()
|
| - test_pid = sproc.pid
|
| - wait_for_pid(test_pid)
|
| - p = psutil.Process(test_pid)
|
| - name = p.name
|
| - p.kill()
|
| - p.wait()
|
| - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| -
|
| - def test_terminate(self):
|
| - sproc = get_test_subprocess()
|
| - test_pid = sproc.pid
|
| - wait_for_pid(test_pid)
|
| - p = psutil.Process(test_pid)
|
| - name = p.name
|
| - p.terminate()
|
| - p.wait()
|
| - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| -
|
| - def test_send_signal(self):
|
| - if POSIX:
|
| - sig = signal.SIGKILL
|
| - else:
|
| - sig = signal.SIGTERM
|
| - sproc = get_test_subprocess()
|
| - test_pid = sproc.pid
|
| - p = psutil.Process(test_pid)
|
| - name = p.name
|
| - p.send_signal(sig)
|
| - p.wait()
|
| - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| -
|
| - def test_wait(self):
|
| - # check exit code signal
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - p.kill()
|
| - code = p.wait()
|
| - if os.name == 'posix':
|
| - self.assertEqual(code, signal.SIGKILL)
|
| - else:
|
| - self.assertEqual(code, 0)
|
| - self.assertFalse(p.is_running())
|
| -
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - p.terminate()
|
| - code = p.wait()
|
| - if os.name == 'posix':
|
| - self.assertEqual(code, signal.SIGTERM)
|
| - else:
|
| - self.assertEqual(code, 0)
|
| - self.assertFalse(p.is_running())
|
| -
|
| - # check sys.exit() code
|
| - code = "import time, sys; time.sleep(0.01); sys.exit(5);"
|
| - sproc = get_test_subprocess([PYTHON, "-c", code])
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertEqual(p.wait(), 5)
|
| - self.assertFalse(p.is_running())
|
| -
|
| - # Test wait() issued twice.
|
| - # It is not supposed to raise NSP when the process is gone.
|
| - # On UNIX this should return None, on Windows it should keep
|
| - # returning the exit code.
|
| - sproc = get_test_subprocess([PYTHON, "-c", code])
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertEqual(p.wait(), 5)
|
| - self.assertTrue(p.wait() in (5, None))
|
| -
|
| - # test timeout
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - p.name
|
| - self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
|
| -
|
| - # timeout < 0 not allowed
|
| - self.assertRaises(ValueError, p.wait, -1)
|
| -
|
| - @skipUnless(POSIX)
|
| - def test_wait_non_children(self):
|
| - # test wait() against processes which are not our children
|
| - code = "import sys;"
|
| - code += "from subprocess import Popen, PIPE;"
|
| - code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
|
| - code += "sp = Popen(cmd, stdout=PIPE);"
|
| - code += "sys.stdout.write(str(sp.pid));"
|
| - sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE)
|
| -
|
| - grandson_pid = int(sproc.stdout.read())
|
| - grandson_proc = psutil.Process(grandson_pid)
|
| - try:
|
| - self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
|
| - grandson_proc.kill()
|
| - ret = grandson_proc.wait()
|
| - self.assertEqual(ret, None)
|
| - finally:
|
| - if grandson_proc.is_running():
|
| - grandson_proc.kill()
|
| - grandson_proc.wait()
|
| -
|
| - def test_wait_timeout_0(self):
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
|
| - p.kill()
|
| - stop_at = time.time() + 2
|
| - while 1:
|
| - try:
|
| - code = p.wait(0)
|
| - except psutil.TimeoutExpired:
|
| - if time.time() >= stop_at:
|
| - raise
|
| - else:
|
| - break
|
| - if os.name == 'posix':
|
| - self.assertEqual(code, signal.SIGKILL)
|
| - else:
|
| - self.assertEqual(code, 0)
|
| - self.assertFalse(p.is_running())
|
| -
|
| - def test_cpu_percent(self):
|
| - p = psutil.Process(os.getpid())
|
| - p.get_cpu_percent(interval=0.001)
|
| - p.get_cpu_percent(interval=0.001)
|
| - for x in range(100):
|
| - percent = p.get_cpu_percent(interval=None)
|
| - self.assertTrue(isinstance(percent, float))
|
| - self.assertTrue(percent >= 0.0)
|
| - self.assertTrue(percent <= 100.0)
|
| -
|
| - def test_cpu_times(self):
|
| - times = psutil.Process(os.getpid()).get_cpu_times()
|
| - self.assertTrue((times.user > 0.0) or (times.system > 0.0))
|
| - # make sure returned values can be pretty printed with strftime
|
| - time.strftime("%H:%M:%S", time.localtime(times.user))
|
| - time.strftime("%H:%M:%S", time.localtime(times.system))
|
| -
|
| - # Test Process.cpu_times() against os.times()
|
| - # os.times() is broken on Python 2.6
|
| - # http://bugs.python.org/issue1040026
|
| - # XXX fails on OSX: not sure if it's for os.times(). We should
|
| - # try this with Python 2.7 and re-enable the test.
|
| -
|
| - @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
|
| - def test_cpu_times2(self):
|
| - user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
|
| - utime, ktime = os.times()[:2]
|
| -
|
| - # Use os.times()[:2] as base values to compare our results
|
| - # using a tolerance of +/- 0.1 seconds.
|
| - # It will fail if the difference between the values is > 0.1s.
|
| - if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
|
| - self.fail("expected: %s, found: %s" %(utime, user_time))
|
| -
|
| - if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
|
| - self.fail("expected: %s, found: %s" %(ktime, kernel_time))
|
| -
|
| - def test_create_time(self):
|
| - sproc = get_test_subprocess()
|
| - now = time.time()
|
| - wait_for_pid(sproc.pid)
|
| - p = psutil.Process(sproc.pid)
|
| - create_time = p.create_time
|
| -
|
| - # Use time.time() as base value to compare our result using a
|
| - # tolerance of +/- 1 second.
|
| - # It will fail if the difference between the values is > 2s.
|
| - difference = abs(create_time - now)
|
| - if difference > 2:
|
| - self.fail("expected: %s, found: %s, difference: %s"
|
| - % (now, create_time, difference))
|
| -
|
| - # make sure returned value can be pretty printed with strftime
|
| - time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
|
| -
|
| - @skipIf(WINDOWS)
|
| - def test_terminal(self):
|
| - tty = sh('tty')
|
| - p = psutil.Process(os.getpid())
|
| - self.assertEqual(p.terminal, tty)
|
| -
|
| - @skipIf(OSX, warn=False)
|
| - def test_get_io_counters(self):
|
| - p = psutil.Process(os.getpid())
|
| - # test reads
|
| - io1 = p.get_io_counters()
|
| - f = open(PYTHON, 'rb')
|
| - f.read()
|
| - f.close()
|
| - io2 = p.get_io_counters()
|
| - if not BSD:
|
| - self.assertTrue(io2.read_count > io1.read_count)
|
| - self.assertTrue(io2.write_count == io1.write_count)
|
| - self.assertTrue(io2.read_bytes >= io1.read_bytes)
|
| - self.assertTrue(io2.write_bytes >= io1.write_bytes)
|
| - # test writes
|
| - io1 = p.get_io_counters()
|
| - f = tempfile.TemporaryFile()
|
| - if sys.version_info >= (3,):
|
| - f.write(bytes("x" * 1000000, 'ascii'))
|
| - else:
|
| - f.write("x" * 1000000)
|
| - f.close()
|
| - io2 = p.get_io_counters()
|
| - if not BSD:
|
| - self.assertTrue(io2.write_count > io1.write_count)
|
| - self.assertTrue(io2.write_bytes > io1.write_bytes)
|
| - self.assertTrue(io2.read_count >= io1.read_count)
|
| - self.assertTrue(io2.read_bytes >= io1.read_bytes)
|
| -
|
| - @skipUnless(LINUX)
|
| - def test_get_set_ionice(self):
|
| - from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
|
| - IOPRIO_CLASS_IDLE)
|
| - self.assertEqual(IOPRIO_CLASS_NONE, 0)
|
| - self.assertEqual(IOPRIO_CLASS_RT, 1)
|
| - self.assertEqual(IOPRIO_CLASS_BE, 2)
|
| - self.assertEqual(IOPRIO_CLASS_IDLE, 3)
|
| - p = psutil.Process(os.getpid())
|
| - try:
|
| - p.set_ionice(2)
|
| - ioclass, value = p.get_ionice()
|
| - self.assertEqual(ioclass, 2)
|
| - self.assertEqual(value, 4)
|
| - #
|
| - p.set_ionice(3)
|
| - ioclass, value = p.get_ionice()
|
| - self.assertEqual(ioclass, 3)
|
| - self.assertEqual(value, 0)
|
| - #
|
| - p.set_ionice(2, 0)
|
| - ioclass, value = p.get_ionice()
|
| - self.assertEqual(ioclass, 2)
|
| - self.assertEqual(value, 0)
|
| - p.set_ionice(2, 7)
|
| - ioclass, value = p.get_ionice()
|
| - self.assertEqual(ioclass, 2)
|
| - self.assertEqual(value, 7)
|
| - self.assertRaises(ValueError, p.set_ionice, 2, 10)
|
| - finally:
|
| - p.set_ionice(IOPRIO_CLASS_NONE)
|
| -
|
| - def test_get_num_threads(self):
|
| - # on certain platforms such as Linux we might test for exact
|
| - # thread number, since we always have with 1 thread per process,
|
| - # but this does not apply across all platforms (OSX, Windows)
|
| - p = psutil.Process(os.getpid())
|
| - step1 = p.get_num_threads()
|
| -
|
| - thread = ThreadTask()
|
| - thread.start()
|
| - try:
|
| - step2 = p.get_num_threads()
|
| - self.assertEqual(step2, step1 + 1)
|
| - thread.stop()
|
| - finally:
|
| - if thread._running:
|
| - thread.stop()
|
| -
|
| - def test_get_threads(self):
|
| - p = psutil.Process(os.getpid())
|
| - step1 = p.get_threads()
|
| -
|
| - thread = ThreadTask()
|
| - thread.start()
|
| -
|
| - try:
|
| - step2 = p.get_threads()
|
| - self.assertEqual(len(step2), len(step1) + 1)
|
| - # on Linux, first thread id is supposed to be this process
|
| - if LINUX:
|
| - self.assertEqual(step2[0].id, os.getpid())
|
| - athread = step2[0]
|
| - # test named tuple
|
| - self.assertEqual(athread.id, athread[0])
|
| - self.assertEqual(athread.user_time, athread[1])
|
| - self.assertEqual(athread.system_time, athread[2])
|
| - # test num threads
|
| - thread.stop()
|
| - finally:
|
| - if thread._running:
|
| - thread.stop()
|
| -
|
| - def test_get_memory_info(self):
|
| - p = psutil.Process(os.getpid())
|
| -
|
| - # step 1 - get a base value to compare our results
|
| - rss1, vms1 = p.get_memory_info()
|
| - percent1 = p.get_memory_percent()
|
| - self.assertTrue(rss1 > 0)
|
| - self.assertTrue(vms1 > 0)
|
| -
|
| - # step 2 - allocate some memory
|
| - memarr = [None] * 1500000
|
| -
|
| - rss2, vms2 = p.get_memory_info()
|
| - percent2 = p.get_memory_percent()
|
| - # make sure that the memory usage bumped up
|
| - self.assertTrue(rss2 > rss1)
|
| - self.assertTrue(vms2 >= vms1) # vms might be equal
|
| - self.assertTrue(percent2 > percent1)
|
| - del memarr
|
| -
|
| - def test_get_memory_percent(self):
|
| - p = psutil.Process(os.getpid())
|
| - self.assertTrue(p.get_memory_percent() > 0.0)
|
| -
|
| - def test_pid(self):
|
| - sproc = get_test_subprocess()
|
| - self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
|
| -
|
| - def test_is_running(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertTrue(p.is_running())
|
| - p.kill()
|
| - p.wait()
|
| - self.assertFalse(p.is_running())
|
| -
|
| - def test_exe(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - try:
|
| - self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
|
| - except AssertionError:
|
| - # certain platforms such as BSD are more accurate returning:
|
| - # "/usr/local/bin/python2.7"
|
| - # ...instead of:
|
| - # "/usr/local/bin/python"
|
| - # We do not want to consider this difference in accuracy
|
| - # an error.
|
| - name = psutil.Process(sproc.pid).exe
|
| - adjusted_name = PYTHON[:len(name)]
|
| - self.assertEqual(name, adjusted_name)
|
| - for p in psutil.process_iter():
|
| - try:
|
| - exe = p.exe
|
| - except psutil.Error:
|
| - continue
|
| - if not exe:
|
| - continue
|
| - if not os.path.exists(exe):
|
| - self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \
|
| - % (repr(exe), p.pid, p.name, p.cmdline))
|
| - if hasattr(os, 'access') and hasattr(os, "X_OK"):
|
| - if not os.access(p.exe, os.X_OK):
|
| - self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s)" \
|
| - % (repr(p.exe), p.pid, p.name, p.cmdline))
|
| -
|
| - def test_cmdline(self):
|
| - sproc = get_test_subprocess([PYTHON, "-E"])
|
| - wait_for_pid(sproc.pid)
|
| - self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"])
|
| -
|
| - def test_name(self):
|
| - sproc = get_test_subprocess(PYTHON)
|
| - wait_for_pid(sproc.pid)
|
| - self.assertEqual(psutil.Process(sproc.pid).name.lower(),
|
| - os.path.basename(sys.executable).lower())
|
| -
|
| - if os.name == 'posix':
|
| -
|
| - def test_uids(self):
|
| - p = psutil.Process(os.getpid())
|
| - real, effective, saved = p.uids
|
| - # os.getuid() refers to "real" uid
|
| - self.assertEqual(real, os.getuid())
|
| - # os.geteuid() refers to "effective" uid
|
| - self.assertEqual(effective, os.geteuid())
|
| - # no such thing as os.getsuid() ("saved" uid), but starting
|
| - # from python 2.7 we have os.getresuid()[2]
|
| - if hasattr(os, "getresuid"):
|
| - self.assertEqual(saved, os.getresuid()[2])
|
| -
|
| - def test_gids(self):
|
| - p = psutil.Process(os.getpid())
|
| - real, effective, saved = p.gids
|
| - # os.getuid() refers to "real" uid
|
| - self.assertEqual(real, os.getgid())
|
| - # os.geteuid() refers to "effective" uid
|
| - self.assertEqual(effective, os.getegid())
|
| - # no such thing as os.getsuid() ("saved" uid), but starting
|
| - # from python 2.7 we have os.getresgid()[2]
|
| - if hasattr(os, "getresuid"):
|
| - self.assertEqual(saved, os.getresgid()[2])
|
| -
|
| - def test_nice(self):
|
| - p = psutil.Process(os.getpid())
|
| - self.assertRaises(TypeError, setattr, p, "nice", "str")
|
| - try:
|
| - try:
|
| - first_nice = p.nice
|
| - p.nice = 1
|
| - self.assertEqual(p.nice, 1)
|
| - # going back to previous nice value raises AccessDenied on OSX
|
| - if not OSX:
|
| - p.nice = 0
|
| - self.assertEqual(p.nice, 0)
|
| - except psutil.AccessDenied:
|
| - pass
|
| - finally:
|
| - # going back to previous nice value raises AccessDenied on OSX
|
| - if not OSX:
|
| - p.nice = first_nice
|
| -
|
| - if os.name == 'nt':
|
| -
|
| - def test_nice(self):
|
| - p = psutil.Process(os.getpid())
|
| - self.assertRaises(TypeError, setattr, p, "nice", "str")
|
| - try:
|
| - self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
|
| - p.nice = psutil.HIGH_PRIORITY_CLASS
|
| - self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS)
|
| - p.nice = psutil.NORMAL_PRIORITY_CLASS
|
| - self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
|
| - finally:
|
| - p.nice = psutil.NORMAL_PRIORITY_CLASS
|
| -
|
| - def test_status(self):
|
| - p = psutil.Process(os.getpid())
|
| - self.assertEqual(p.status, psutil.STATUS_RUNNING)
|
| - self.assertEqual(str(p.status), "running")
|
| - for p in psutil.process_iter():
|
| - if str(p.status) == '?':
|
| - self.fail("invalid status for pid %d" % p.pid)
|
| -
|
| - def test_username(self):
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - if POSIX:
|
| - import pwd
|
| - self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name)
|
| - elif WINDOWS:
|
| - expected_username = os.environ['USERNAME']
|
| - expected_domain = os.environ['USERDOMAIN']
|
| - domain, username = p.username.split('\\')
|
| - self.assertEqual(domain, expected_domain)
|
| - self.assertEqual(username, expected_username)
|
| - else:
|
| - p.username
|
| -
|
| - @skipUnless(WINDOWS or LINUX)
|
| - def test_getcwd(self):
|
| - sproc = get_test_subprocess()
|
| - wait_for_pid(sproc.pid)
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertEqual(p.getcwd(), os.getcwd())
|
| -
|
| - @skipUnless(WINDOWS or LINUX)
|
| - def test_getcwd_2(self):
|
| - cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"]
|
| - sproc = get_test_subprocess(cmd)
|
| - wait_for_pid(sproc.pid)
|
| - p = psutil.Process(sproc.pid)
|
| - time.sleep(0.1)
|
| - expected_dir = os.path.dirname(os.getcwd())
|
| - self.assertEqual(p.getcwd(), expected_dir)
|
| -
|
| - def test_get_open_files(self):
|
| - # current process
|
| - p = psutil.Process(os.getpid())
|
| - files = p.get_open_files()
|
| - self.assertFalse(TESTFN in files)
|
| - f = open(TESTFN, 'r')
|
| - time.sleep(.1)
|
| - filenames = [x.path for x in p.get_open_files()]
|
| - self.assertTrue(TESTFN in filenames)
|
| - f.close()
|
| - for file in filenames:
|
| - self.assertTrue(os.path.isfile(file))
|
| -
|
| - # another process
|
| - cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
|
| - sproc = get_test_subprocess([PYTHON, "-c", cmdline])
|
| - wait_for_pid(sproc.pid)
|
| - time.sleep(0.1)
|
| - p = psutil.Process(sproc.pid)
|
| - for x in range(100):
|
| - filenames = [x.path for x in p.get_open_files()]
|
| - if TESTFN in filenames:
|
| - break
|
| - time.sleep(.01)
|
| - else:
|
| - self.assertTrue(TESTFN in filenames)
|
| - for file in filenames:
|
| - self.assertTrue(os.path.isfile(file))
|
| - # all processes
|
| - for proc in psutil.process_iter():
|
| - try:
|
| - files = proc.get_open_files()
|
| - except psutil.Error:
|
| - pass
|
| - else:
|
| - for file in filenames:
|
| - self.assertTrue(os.path.isfile(file))
|
| -
|
| - def test_get_open_files2(self):
|
| - # test fd and path fields
|
| - fileobj = open(TESTFN, 'r')
|
| - p = psutil.Process(os.getpid())
|
| - for path, fd in p.get_open_files():
|
| - if path == fileobj.name or fd == fileobj.fileno():
|
| - break
|
| - else:
|
| - self.fail("no file found; files=%s" % repr(p.get_open_files()))
|
| - self.assertEqual(path, fileobj.name)
|
| - if WINDOWS:
|
| - self.assertEqual(fd, -1)
|
| - else:
|
| - self.assertEqual(fd, fileobj.fileno())
|
| - # test positions
|
| - ntuple = p.get_open_files()[0]
|
| - self.assertEqual(ntuple[0], ntuple.path)
|
| - self.assertEqual(ntuple[1], ntuple.fd)
|
| - # test file is gone
|
| - fileobj.close()
|
| - self.assertTrue(fileobj.name not in p.get_open_files())
|
| -
|
| - @skipUnless(SUPPORT_CONNECTIONS, warn=1)
|
| - def test_get_connections(self):
|
| - arg = "import socket, time;" \
|
| - "s = socket.socket();" \
|
| - "s.bind(('127.0.0.1', 0));" \
|
| - "s.listen(1);" \
|
| - "conn, addr = s.accept();" \
|
| - "time.sleep(100);"
|
| - sproc = get_test_subprocess([PYTHON, "-c", arg])
|
| - p = psutil.Process(sproc.pid)
|
| - for x in range(100):
|
| - cons = p.get_connections()
|
| - if cons:
|
| - break
|
| - time.sleep(.01)
|
| - self.assertEqual(len(cons), 1)
|
| - con = cons[0]
|
| - self.assertEqual(con.family, socket.AF_INET)
|
| - self.assertEqual(con.type, socket.SOCK_STREAM)
|
| - self.assertEqual(con.status, "LISTEN")
|
| - ip, port = con.local_address
|
| - self.assertEqual(ip, '127.0.0.1')
|
| - self.assertEqual(con.remote_address, ())
|
| - if WINDOWS:
|
| - self.assertEqual(con.fd, -1)
|
| - else:
|
| - self.assertTrue(con.fd > 0)
|
| - # test positions
|
| - self.assertEqual(con[0], con.fd)
|
| - self.assertEqual(con[1], con.family)
|
| - self.assertEqual(con[2], con.type)
|
| - self.assertEqual(con[3], con.local_address)
|
| - self.assertEqual(con[4], con.remote_address)
|
| - self.assertEqual(con[5], con.status)
|
| -
|
| - @skipUnless(supports_ipv6())
|
| - def test_get_connections_ipv6(self):
|
| - s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
| - s.bind(('::1', 0))
|
| - s.listen(1)
|
| - cons = psutil.Process(os.getpid()).get_connections()
|
| - s.close()
|
| - self.assertEqual(len(cons), 1)
|
| - self.assertEqual(cons[0].local_address[0], '::1')
|
| -
|
| - @skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
|
| - def test_connection_fromfd(self):
|
| - sock = socket.socket()
|
| - sock.bind(('localhost', 0))
|
| - sock.listen(1)
|
| - p = psutil.Process(os.getpid())
|
| - for conn in p.get_connections():
|
| - if conn.fd == sock.fileno():
|
| - break
|
| - else:
|
| - sock.close()
|
| - self.fail("couldn't find socket fd")
|
| - dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
|
| - try:
|
| - self.assertEqual(dupsock.getsockname(), conn.local_address)
|
| - self.assertNotEqual(sock.fileno(), dupsock.fileno())
|
| - finally:
|
| - sock.close()
|
| - dupsock.close()
|
| -
|
| - @skipUnless(SUPPORT_CONNECTIONS, warn=1)
|
| - def test_get_connections_all(self):
|
| -
|
| - def check_address(addr, family):
|
| - if not addr:
|
| - return
|
| - ip, port = addr
|
| - self.assertTrue(isinstance(port, int))
|
| - if family == socket.AF_INET:
|
| - ip = list(map(int, ip.split('.')))
|
| - self.assertTrue(len(ip) == 4)
|
| - for num in ip:
|
| - self.assertTrue(0 <= num <= 255)
|
| - self.assertTrue(0 <= port <= 65535)
|
| -
|
| - # all values are supposed to match Linux's tcp_states.h states
|
| - # table across all platforms.
|
| - valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
|
| - "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
|
| - "LAST_ACK", "LISTEN", "CLOSING", ""]
|
| -
|
| - tcp_template = "import socket;" \
|
| - "s = socket.socket($family, socket.SOCK_STREAM);" \
|
| - "s.bind(('$addr', 0));" \
|
| - "s.listen(1);" \
|
| - "conn, addr = s.accept();"
|
| -
|
| - udp_template = "import socket, time;" \
|
| - "s = socket.socket($family, socket.SOCK_DGRAM);" \
|
| - "s.bind(('$addr', 0));" \
|
| - "time.sleep(100);"
|
| -
|
| - from string import Template
|
| - tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET,
|
| - addr="127.0.0.1")
|
| - udp4_template = Template(udp_template).substitute(family=socket.AF_INET,
|
| - addr="127.0.0.1")
|
| - tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6,
|
| - addr="::1")
|
| - udp6_template = Template(udp_template).substitute(family=socket.AF_INET6,
|
| - addr="::1")
|
| -
|
| - # launch various subprocess instantiating a socket of various
|
| - # families and tupes to enrich psutil results
|
| - tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
|
| - udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
|
| - if supports_ipv6():
|
| - tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
|
| - udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
|
| - else:
|
| - tcp6_proc = None
|
| - udp6_proc = None
|
| -
|
| - # --- check connections of all processes
|
| -
|
| - time.sleep(0.1)
|
| - for p in psutil.process_iter():
|
| - try:
|
| - cons = p.get_connections()
|
| - except (psutil.NoSuchProcess, psutil.AccessDenied):
|
| - pass
|
| - else:
|
| - for conn in cons:
|
| - self.assertTrue(conn.type in (socket.SOCK_STREAM,
|
| - socket.SOCK_DGRAM))
|
| - self.assertTrue(conn.family in (socket.AF_INET,
|
| - socket.AF_INET6))
|
| - check_address(conn.local_address, conn.family)
|
| - check_address(conn.remote_address, conn.family)
|
| - if conn.status not in valid_states:
|
| - self.fail("%s is not a valid status" %conn.status)
|
| - # actually try to bind the local socket; ignore IPv6
|
| - # sockets as their address might be represented as
|
| - # an IPv4-mapped-address (e.g. "::127.0.0.1")
|
| - # and that's rejected by bind()
|
| - if conn.family == socket.AF_INET:
|
| - s = socket.socket(conn.family, conn.type)
|
| - s.bind((conn.local_address[0], 0))
|
| - s.close()
|
| -
|
| - if not WINDOWS and hasattr(socket, 'fromfd'):
|
| - dupsock = None
|
| - try:
|
| - try:
|
| - dupsock = socket.fromfd(conn.fd, conn.family,
|
| - conn.type)
|
| - except (socket.error, OSError):
|
| - err = sys.exc_info()[1]
|
| - if err.args[0] == errno.EBADF:
|
| - continue
|
| - else:
|
| - raise
|
| - # python >= 2.5
|
| - if hasattr(dupsock, "family"):
|
| - self.assertEqual(dupsock.family, conn.family)
|
| - self.assertEqual(dupsock.type, conn.type)
|
| - finally:
|
| - if dupsock is not None:
|
| - dupsock.close()
|
| -
|
| -
|
| - # --- check matches against subprocesses
|
| -
|
| - for p in psutil.Process(os.getpid()).get_children():
|
| - for conn in p.get_connections():
|
| - # TCP v4
|
| - if p.pid == tcp4_proc.pid:
|
| - self.assertEqual(conn.family, socket.AF_INET)
|
| - self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| - self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "LISTEN")
|
| - # UDP v4
|
| - elif p.pid == udp4_proc.pid:
|
| - self.assertEqual(conn.family, socket.AF_INET)
|
| - self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| - self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "")
|
| - # TCP v6
|
| - elif p.pid == getattr(tcp6_proc, "pid", None):
|
| - self.assertEqual(conn.family, socket.AF_INET6)
|
| - self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| - self.assertTrue(conn.local_address[0] in ("::", "::1"))
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "LISTEN")
|
| - # UDP v6
|
| - elif p.pid == getattr(udp6_proc, "pid", None):
|
| - self.assertEqual(conn.family, socket.AF_INET6)
|
| - self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| - self.assertTrue(conn.local_address[0] in ("::", "::1"))
|
| - self.assertEqual(conn.remote_address, ())
|
| - self.assertEqual(conn.status, "")
|
| -
|
| - def test_parent_ppid(self):
|
| - this_parent = os.getpid()
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertEqual(p.ppid, this_parent)
|
| - self.assertEqual(p.parent.pid, this_parent)
|
| - # no other process is supposed to have us as parent
|
| - for p in psutil.process_iter():
|
| - if p.pid == sproc.pid:
|
| - continue
|
| - self.assertTrue(p.ppid != this_parent)
|
| -
|
| - def test_get_children(self):
|
| - p = psutil.Process(os.getpid())
|
| - self.assertEqual(p.get_children(), [])
|
| - sproc = get_test_subprocess()
|
| - children = p.get_children()
|
| - self.assertEqual(len(children), 1)
|
| - self.assertEqual(children[0].pid, sproc.pid)
|
| - self.assertEqual(children[0].ppid, os.getpid())
|
| -
|
| - def test_suspend_resume(self):
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - p.suspend()
|
| - time.sleep(0.1)
|
| - self.assertEqual(p.status, psutil.STATUS_STOPPED)
|
| - self.assertEqual(str(p.status), "stopped")
|
| - p.resume()
|
| - self.assertTrue(p.status != psutil.STATUS_STOPPED)
|
| -
|
| - def test_invalid_pid(self):
|
| - self.assertRaises(ValueError, psutil.Process, "1")
|
| - self.assertRaises(ValueError, psutil.Process, None)
|
| - # Refers to Issue #12
|
| - self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
|
| -
|
| - def test_zombie_process(self):
|
| - # Test that NoSuchProcess exception gets raised in case the
|
| - # process dies after we create the Process object.
|
| - # Example:
|
| - # >>> proc = Process(1234)
|
| - # >>> time.sleep(5) # time-consuming task, process dies in meantime
|
| - # >>> proc.name
|
| - # Refers to Issue #15
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - p.kill()
|
| - p.wait()
|
| -
|
| - for name in dir(p):
|
| - if name.startswith('_')\
|
| - or name in ('pid', 'send_signal', 'is_running', 'set_ionice',
|
| - 'wait'):
|
| - continue
|
| - try:
|
| - meth = getattr(p, name)
|
| - if callable(meth):
|
| - meth()
|
| - except psutil.NoSuchProcess:
|
| - pass
|
| - else:
|
| - self.fail("NoSuchProcess exception not raised for %r" % name)
|
| -
|
| - # other methods
|
| - try:
|
| - if os.name == 'posix':
|
| - p.nice = 1
|
| - else:
|
| - p.nice = psutil.NORMAL_PRIORITY_CLASS
|
| - except psutil.NoSuchProcess:
|
| - pass
|
| - else:
|
| - self.fail("exception not raised")
|
| - if hasattr(p, 'set_ionice'):
|
| - self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2)
|
| - self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
|
| - self.assertFalse(p.is_running())
|
| -
|
| - def test__str__(self):
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - self.assertTrue(str(sproc.pid) in str(p))
|
| - # python shows up as 'Python' in cmdline on OS X so test fails on OS X
|
| - if not OSX:
|
| - self.assertTrue(os.path.basename(PYTHON) in str(p))
|
| - sproc = get_test_subprocess()
|
| - p = psutil.Process(sproc.pid)
|
| - p.kill()
|
| - p.wait()
|
| - self.assertTrue(str(sproc.pid) in str(p))
|
| - self.assertTrue("terminated" in str(p))
|
| -
|
| - def test_fetch_all(self):
|
| - valid_procs = 0
|
| - excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
|
| - 'kill', 'wait']
|
| - excluded_names += ['get_cpu_percent', 'get_children']
|
| - # XXX - skip slow lsof implementation;
|
| - if BSD:
|
| - excluded_names += ['get_open_files', 'get_connections']
|
| - if OSX:
|
| - excluded_names += ['get_connections']
|
| - attrs = []
|
| - for name in dir(psutil.Process):
|
| - if name.startswith("_"):
|
| - continue
|
| - if name.startswith("set_"):
|
| - continue
|
| - if name in excluded_names:
|
| - continue
|
| - attrs.append(name)
|
| -
|
| - for p in psutil.process_iter():
|
| - for name in attrs:
|
| - try:
|
| - try:
|
| - attr = getattr(p, name, None)
|
| - if attr is not None and callable(attr):
|
| - ret = attr()
|
| - else:
|
| - ret = attr
|
| - valid_procs += 1
|
| - except (psutil.NoSuchProcess, psutil.AccessDenied):
|
| - err = sys.exc_info()[1]
|
| - self.assertEqual(err.pid, p.pid)
|
| - if err.name:
|
| - self.assertEqual(err.name, p.name)
|
| - self.assertTrue(str(err))
|
| - self.assertTrue(err.msg)
|
| - else:
|
| - if name == 'parent' or ret in (0, 0.0, [], None):
|
| - continue
|
| - self.assertTrue(ret)
|
| - if name == "exe":
|
| - self.assertTrue(os.path.isfile(ret))
|
| - elif name == "getcwd":
|
| - # XXX - temporary fix; on my Linux box
|
| - # chrome process cws is errnously reported
|
| - # as /proc/4144/fdinfo whichd doesn't exist
|
| - if 'chrome' in p.name:
|
| - continue
|
| - self.assertTrue(os.path.isdir(ret))
|
| - except Exception:
|
| - err = sys.exc_info()[1]
|
| - trace = traceback.format_exc()
|
| - self.fail('%s\nmethod=%s, pid=%s, retvalue=%s'
|
| - %(trace, name, p.pid, repr(ret)))
|
| -
|
| - # we should always have a non-empty list, not including PID 0 etc.
|
| - # special cases.
|
| - self.assertTrue(valid_procs > 0)
|
| -
|
| - @skipIf(LINUX)
|
| - def test_pid_0(self):
|
| - # Process(0) is supposed to work on all platforms except Linux
|
| - p = psutil.Process(0)
|
| - self.assertTrue(p.name)
|
| -
|
| - if os.name == 'posix':
|
| - self.assertEqual(p.uids.real, 0)
|
| - self.assertEqual(p.gids.real, 0)
|
| -
|
| - self.assertTrue(p.ppid in (0, 1))
|
| - #self.assertEqual(p.exe, "")
|
| - self.assertEqual(p.cmdline, [])
|
| - try:
|
| - p.get_num_threads()
|
| - except psutil.AccessDenied:
|
| - pass
|
| -
|
| - if OSX : #and os.geteuid() != 0:
|
| - self.assertRaises(psutil.AccessDenied, p.get_memory_info)
|
| - self.assertRaises(psutil.AccessDenied, p.get_cpu_times)
|
| - else:
|
| - p.get_memory_info()
|
| -
|
| - # username property
|
| - if POSIX:
|
| - self.assertEqual(p.username, 'root')
|
| - elif WINDOWS:
|
| - self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM')
|
| - else:
|
| - p.username
|
| -
|
| - self.assertTrue(0 in psutil.get_pid_list())
|
| - self.assertTrue(psutil.pid_exists(0))
|
| -
|
| - def test_Popen(self):
|
| - # Popen class test
|
| - cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
|
| - proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
| - try:
|
| - proc.name
|
| - proc.stdin
|
| - self.assertTrue(hasattr(proc, 'name'))
|
| - self.assertTrue(hasattr(proc, 'stdin'))
|
| - self.assertRaises(AttributeError, getattr, proc, 'foo')
|
| - finally:
|
| - proc.kill()
|
| - proc.wait()
|
| -
|
| -
|
| -if hasattr(os, 'getuid'):
|
| - class LimitedUserTestCase(TestCase):
|
| - """Repeat the previous tests by using a limited user.
|
| - Executed only on UNIX and only if the user who run the test script
|
| - is root.
|
| - """
|
| - # the uid/gid the test suite runs under
|
| - PROCESS_UID = os.getuid()
|
| - PROCESS_GID = os.getgid()
|
| -
|
| - def __init__(self, *args, **kwargs):
|
| - TestCase.__init__(self, *args, **kwargs)
|
| - # re-define all existent test methods in order to
|
| - # ignore AccessDenied exceptions
|
| - for attr in [x for x in dir(self) if x.startswith('test')]:
|
| - meth = getattr(self, attr)
|
| - def test_(self):
|
| - try:
|
| - meth()
|
| - except psutil.AccessDenied:
|
| - pass
|
| - setattr(self, attr, types.MethodType(test_, self))
|
| -
|
| - def setUp(self):
|
| - os.setegid(1000)
|
| - os.seteuid(1000)
|
| - TestCase.setUp(self)
|
| -
|
| - def tearDown(self):
|
| - os.setegid(self.PROCESS_UID)
|
| - os.seteuid(self.PROCESS_GID)
|
| - TestCase.tearDown(self)
|
| -
|
| - def test_nice(self):
|
| - try:
|
| - psutil.Process(os.getpid()).nice = -1
|
| - except psutil.AccessDenied:
|
| - pass
|
| - else:
|
| - self.fail("exception not raised")
|
| -
|
| -
|
| -def test_main():
|
| - tests = []
|
| - test_suite = unittest.TestSuite()
|
| - tests.append(TestCase)
|
| -
|
| - if POSIX:
|
| - from _posix import PosixSpecificTestCase
|
| - tests.append(PosixSpecificTestCase)
|
| -
|
| - # import the specific platform test suite
|
| - if LINUX:
|
| - from _linux import LinuxSpecificTestCase as stc
|
| - elif WINDOWS:
|
| - from _windows import WindowsSpecificTestCase as stc
|
| - elif OSX:
|
| - from _osx import OSXSpecificTestCase as stc
|
| - elif BSD:
|
| - from _bsd import BSDSpecificTestCase as stc
|
| - tests.append(stc)
|
| -
|
| - if hasattr(os, 'getuid'):
|
| - if os.getuid() == 0:
|
| - tests.append(LimitedUserTestCase)
|
| - else:
|
| - atexit.register(warnings.warn, "Couldn't run limited user tests ("
|
| - "super-user privileges are required)", RuntimeWarning)
|
| -
|
| - for test_class in tests:
|
| - test_suite.addTest(unittest.makeSuite(test_class))
|
| -
|
| - f = open(TESTFN, 'w')
|
| - f.close()
|
| - atexit.register(lambda: os.remove(TESTFN))
|
| -
|
| - unittest.TextTestRunner(verbosity=2).run(test_suite)
|
| - DEVNULL.close()
|
| -
|
| -if __name__ == '__main__':
|
| - test_main()
|
|
|