| Index: third_party/psutil/test/test_psutil.py
|
| diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..c067dee07a7bc5dcaa82f0e4738ce439ce3c8feb
|
| --- /dev/null
|
| +++ b/third_party/psutil/test/test_psutil.py
|
| @@ -0,0 +1,1062 @@
|
| +#!/usr/bin/env python
|
| +#
|
| +# $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $
|
| +#
|
| +
|
| +"""psutil test suite.
|
| +Note: this is targeted for python 2.x.
|
| +To run it under python 3.x you need to use 2to3 tool first:
|
| +
|
| +$ 2to3 -w test/*.py
|
| +"""
|
| +
|
| +import unittest
|
| +import os
|
| +import sys
|
| +import subprocess
|
| +import time
|
| +import signal
|
| +import types
|
| +import traceback
|
| +import socket
|
| +import warnings
|
| +import atexit
|
| +import errno
|
| +import threading
|
| +
|
| +import psutil
|
| +
|
| +
|
| +PYTHON = os.path.realpath(sys.executable)
|
| +DEVNULL = open(os.devnull, 'r+')
|
| +
|
| +POSIX = os.name == 'posix'
|
| +LINUX = sys.platform.lower().startswith("linux")
|
| +WINDOWS = sys.platform.lower().startswith("win32")
|
| +OSX = sys.platform.lower().startswith("darwin")
|
| +BSD = sys.platform.lower().startswith("freebsd")
|
| +
|
| +try:
|
| + psutil.Process(os.getpid()).get_connections()
|
| +except NotImplementedError, err:
|
| + SUPPORT_CONNECTIONS = False
|
| + atexit.register(warnings.warn, "get_connections() not supported on this platform",
|
| + RuntimeWarning)
|
| +else:
|
| + SUPPORT_CONNECTIONS = True
|
| +
|
| +
|
| +_subprocesses_started = set()
|
| +
|
| +def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
|
| + """Return a subprocess.Popen object to use in tests.
|
| + By default stdout and stderr are redirected to /dev/null and the
|
| + python interpreter is used as test process.
|
| + """
|
| + if cmd is None:
|
| + cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
|
| + sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin)
|
| + _subprocesses_started.add(sproc.pid)
|
| + return sproc
|
| +
|
| +def wait_for_pid(pid, timeout=1):
|
| + """Wait for pid to show up in the process list then return.
|
| + Used in the test suite to give time the sub process to initialize.
|
| + """
|
| + raise_at = time.time() + timeout
|
| + while 1:
|
| + if pid in psutil.get_pid_list():
|
| + # give it one more iteration to allow full initialization
|
| + time.sleep(0.01)
|
| + return
|
| + time.sleep(0.0001)
|
| + if time.time() >= raise_at:
|
| + raise RuntimeError("Timed out")
|
| +
|
| +def kill(pid):
|
| + """Kill a process given its PID."""
|
| + if hasattr(os, 'kill'):
|
| + os.kill(pid, signal.SIGKILL)
|
| + else:
|
| + psutil.Process(pid).kill()
|
| +
|
| +def reap_children(search_all=False):
|
| + """Kill any subprocess started by this test suite and ensure that
|
| + no zombies stick around to hog resources and create problems when
|
| + looking for refleaks.
|
| + """
|
| + if POSIX:
|
| + def waitpid(process):
|
| + # on posix we are free to wait for any pending process by
|
| + # passing -1 to os.waitpid()
|
| + while True:
|
| + try:
|
| + any_process = -1
|
| + pid, status = os.waitpid(any_process, os.WNOHANG)
|
| + if pid == 0 and not process.is_running():
|
| + break
|
| + except OSError:
|
| + if not process.is_running():
|
| + break
|
| + else:
|
| + def waitpid(process):
|
| + # on non-posix systems we just wait for the given process
|
| + # to go away
|
| + while process.is_running():
|
| + time.sleep(0.01)
|
| +
|
| + if search_all:
|
| + this_process = psutil.Process(os.getpid())
|
| + pids = [x.pid for x in this_process.get_children()]
|
| + else:
|
| + pids =_subprocesses_started
|
| + while pids:
|
| + pid = pids.pop()
|
| + try:
|
| + child = psutil.Process(pid)
|
| + child.kill()
|
| + except psutil.NoSuchProcess:
|
| + pass
|
| + else:
|
| + waitpid(child)
|
| +
|
| +# we want to search through all processes before exiting
|
| +atexit.register(reap_children, search_all=True)
|
| +
|
| +def skipIf(condition, reason="", warn=False):
|
| + """Decorator which skip a test under if condition is satisfied.
|
| + This is a substitute of unittest.skipIf which is available
|
| + only in python 2.7 and 3.2.
|
| + If 'reason' argument is provided this will be printed during
|
| + tests execution.
|
| + If 'warn' is provided a RuntimeWarning will be shown when all
|
| + tests are run.
|
| + """
|
| + def outer(fun, *args, **kwargs):
|
| + def inner(self):
|
| + if condition:
|
| + sys.stdout.write("skipped-")
|
| + sys.stdout.flush()
|
| + if warn:
|
| + objname = "%s.%s" % (self.__class__.__name__, fun.__name__)
|
| + msg = "%s was skipped" % objname
|
| + if reason:
|
| + msg += "; reason: " + repr(reason)
|
| + atexit.register(warnings.warn, msg, RuntimeWarning)
|
| + return
|
| + else:
|
| + return fun(self, *args, **kwargs)
|
| + return inner
|
| + return outer
|
| +
|
| +def skipUnless(condition, reason="", warn=False):
|
| + """Contrary of skipIf."""
|
| + if not condition:
|
| + return skipIf(True, reason, warn)
|
| + return skipIf(False)
|
| +
|
| +
|
| +class TestCase(unittest.TestCase):
|
| +
|
| + def tearDown(self):
|
| + reap_children()
|
| +
|
| + def test_get_process_list(self):
|
| + pids = [x.pid for x in psutil.get_process_list()]
|
| + self.assertTrue(os.getpid() in pids)
|
| + self.assertTrue(0 in pids)
|
| +
|
| + def test_process_iter(self):
|
| + pids = [x.pid for x in psutil.process_iter()]
|
| + self.assertTrue(os.getpid() in pids)
|
| + self.assertTrue(0 in pids)
|
| +
|
| + def test_kill(self):
|
| + sproc = get_test_subprocess()
|
| + test_pid = sproc.pid
|
| + wait_for_pid(test_pid)
|
| + p = psutil.Process(test_pid)
|
| + name = p.name
|
| + p.kill()
|
| + sproc.wait()
|
| + self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| +
|
| + def test_terminate(self):
|
| + sproc = get_test_subprocess()
|
| + test_pid = sproc.pid
|
| + wait_for_pid(test_pid)
|
| + p = psutil.Process(test_pid)
|
| + name = p.name
|
| + p.terminate()
|
| + sproc.wait()
|
| + self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| +
|
| + def test_send_signal(self):
|
| + if POSIX:
|
| + sig = signal.SIGKILL
|
| + else:
|
| + sig = signal.SIGTERM
|
| + sproc = get_test_subprocess()
|
| + test_pid = sproc.pid
|
| + p = psutil.Process(test_pid)
|
| + name = p.name
|
| + p.send_signal(sig)
|
| + sproc.wait()
|
| + self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
|
| +
|
| + def test_TOTAL_PHYMEM(self):
|
| + x = psutil.TOTAL_PHYMEM
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x > 0)
|
| +
|
| + def test_used_phymem(self):
|
| + x = psutil.used_phymem()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x > 0)
|
| +
|
| + def test_avail_phymem(self):
|
| + x = psutil.avail_phymem()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x > 0)
|
| +
|
| + def test_total_virtmem(self):
|
| + x = psutil.total_virtmem()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x >= 0)
|
| +
|
| + def test_used_virtmem(self):
|
| + x = psutil.used_virtmem()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x >= 0)
|
| +
|
| + def test_avail_virtmem(self):
|
| + x = psutil.avail_virtmem()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x >= 0)
|
| +
|
| + @skipUnless(LINUX)
|
| + def test_cached_phymem(self):
|
| + x = psutil.cached_phymem()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x >= 0)
|
| +
|
| + @skipUnless(LINUX)
|
| + def test_phymem_buffers(self):
|
| + x = psutil.phymem_buffers()
|
| + self.assertTrue(isinstance(x, (int, long)))
|
| + self.assertTrue(x >= 0)
|
| +
|
| + def test_system_cpu_times(self):
|
| + total = 0
|
| + times = psutil.cpu_times()
|
| + self.assertTrue(isinstance(times, psutil.CPUTimes))
|
| + sum(times)
|
| + for cp_time in times:
|
| + self.assertTrue(isinstance(cp_time, float))
|
| + self.assertTrue(cp_time >= 0.0)
|
| + total += cp_time
|
| + # test CPUTimes's __iter__ and __str__ implementation
|
| + self.assertEqual(total, sum(times))
|
| + str(times)
|
| +
|
| + def test_system_cpu_times2(self):
|
| + t1 = sum(psutil.cpu_times())
|
| + time.sleep(0.1)
|
| + t2 = sum(psutil.cpu_times())
|
| + difference = t2 - t1
|
| + if not difference >= 0.05:
|
| + self.fail("difference %s" % difference)
|
| +
|
| + def test_system_cpu_percent(self):
|
| + psutil.cpu_percent(interval=0.001)
|
| + psutil.cpu_percent(interval=0.001)
|
| + for x in xrange(1000):
|
| + percent = psutil.cpu_percent(interval=None)
|
| + self.assertTrue(isinstance(percent, float))
|
| + self.assertTrue(percent >= 0.0)
|
| + self.assertTrue(percent <= 100.0)
|
| +
|
| + def test_process_cpu_percent(self):
|
| + p = psutil.Process(os.getpid())
|
| + p.get_cpu_percent(interval=0.001)
|
| + p.get_cpu_percent(interval=0.001)
|
| + for x in xrange(100):
|
| + percent = p.get_cpu_percent(interval=None)
|
| + self.assertTrue(isinstance(percent, float))
|
| + self.assertTrue(percent >= 0.0)
|
| + self.assertTrue(percent <= 100.0)
|
| +
|
| + def test_get_process_cpu_times(self):
|
| + times = psutil.Process(os.getpid()).get_cpu_times()
|
| + self.assertTrue((times.user > 0.0) or (times.system > 0.0))
|
| + # make sure returned values can be pretty printed with strftime
|
| + time.strftime("%H:%M:%S", time.localtime(times.user))
|
| + time.strftime("%H:%M:%S", time.localtime(times.system))
|
| +
|
| + # Test Process.cpu_times() against os.times()
|
| + # os.times() is broken on Python 2.6
|
| + # http://bugs.python.org/issue1040026
|
| + # XXX fails on OSX: not sure if it's for os.times(). We should
|
| + # try this with Python 2.7 and re-enable the test.
|
| +
|
| + @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
|
| + def test_get_process_cpu_times2(self):
|
| + user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
|
| + utime, ktime = os.times()[:2]
|
| +
|
| + # Use os.times()[:2] as base values to compare our results
|
| + # using a tolerance of +/- 0.1 seconds.
|
| + # It will fail if the difference between the values is > 0.1s.
|
| + if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
|
| + self.fail("expected: %s, found: %s" %(utime, user_time))
|
| +
|
| + if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
|
| + self.fail("expected: %s, found: %s" %(ktime, kernel_time))
|
| +
|
| + def test_create_time(self):
|
| + sproc = get_test_subprocess()
|
| + now = time.time()
|
| + wait_for_pid(sproc.pid)
|
| + p = psutil.Process(sproc.pid)
|
| + create_time = p.create_time
|
| +
|
| + # Use time.time() as base value to compare our result using a
|
| + # tolerance of +/- 1 second.
|
| + # It will fail if the difference between the values is > 2s.
|
| + difference = abs(create_time - now)
|
| + if difference > 2:
|
| + self.fail("expected: %s, found: %s, difference: %s"
|
| + % (now, create_time, difference))
|
| +
|
| + # make sure returned value can be pretty printed with strftime
|
| + time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
|
| +
|
| + def test_get_num_threads(self):
|
| + p = psutil.Process(os.getpid())
|
| + numt1 = p.get_num_threads()
|
| + if not WINDOWS and not OSX:
|
| + # test is unreliable on Windows and OS X
|
| + # NOTE: sleep(1) is too long for OS X, works with sleep(.5)
|
| + self.assertEqual(numt1, 1)
|
| + t = threading.Thread(target=lambda:time.sleep(1))
|
| + t.start()
|
| + numt2 = p.get_num_threads()
|
| + if WINDOWS:
|
| + self.assertTrue(numt2 > numt1)
|
| + else:
|
| + self.assertEqual(numt2, 2)
|
| +
|
| + def test_get_memory_info(self):
|
| + p = psutil.Process(os.getpid())
|
| +
|
| + # step 1 - get a base value to compare our results
|
| + rss1, vms1 = p.get_memory_info()
|
| + percent1 = p.get_memory_percent()
|
| + self.assertTrue(rss1 > 0)
|
| + self.assertTrue(vms1 > 0)
|
| +
|
| + # step 2 - allocate some memory
|
| + memarr = [None] * 1500000
|
| +
|
| + rss2, vms2 = p.get_memory_info()
|
| + percent2 = p.get_memory_percent()
|
| + # make sure that the memory usage bumped up
|
| + self.assertTrue(rss2 > rss1)
|
| + self.assertTrue(vms2 >= vms1) # vms might be equal
|
| + self.assertTrue(percent2 > percent1)
|
| + del memarr
|
| +
|
| + def test_get_memory_percent(self):
|
| + p = psutil.Process(os.getpid())
|
| + self.assertTrue(p.get_memory_percent() > 0.0)
|
| +
|
| + def test_pid(self):
|
| + sproc = get_test_subprocess()
|
| + self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
|
| +
|
| + def test_eq(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid))
|
| +
|
| + def test_is_running(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertTrue(p.is_running())
|
| + psutil.Process(sproc.pid).kill()
|
| + sproc.wait()
|
| + self.assertFalse(p.is_running())
|
| +
|
| + def test_pid_exists(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + self.assertTrue(psutil.pid_exists(sproc.pid))
|
| + psutil.Process(sproc.pid).kill()
|
| + sproc.wait()
|
| + self.assertFalse(psutil.pid_exists(sproc.pid))
|
| + self.assertFalse(psutil.pid_exists(-1))
|
| +
|
| + def test_exe(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
|
| + for p in psutil.process_iter():
|
| + try:
|
| + exe = p.exe
|
| + except psutil.Error:
|
| + continue
|
| + if not exe:
|
| + continue
|
| + if not os.path.exists(exe):
|
| + self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \
|
| + % (repr(exe), p.pid, p.name, p.cmdline))
|
| + if hasattr(os, 'access') and hasattr(os, "X_OK"):
|
| + if not os.access(p.exe, os.X_OK):
|
| + self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s)" \
|
| + % (repr(p.exe), p.pid, p.name, p.cmdline))
|
| +
|
| + def test_path(self):
|
| + proc = psutil.Process(os.getpid())
|
| + warnings.filterwarnings("error")
|
| + try:
|
| + self.assertRaises(DeprecationWarning, getattr, proc, 'path')
|
| + finally:
|
| + warnings.resetwarnings()
|
| +
|
| + def test_cmdline(self):
|
| + sproc = get_test_subprocess([PYTHON, "-E"])
|
| + wait_for_pid(sproc.pid)
|
| + self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"])
|
| +
|
| + def test_name(self):
|
| + sproc = get_test_subprocess(PYTHON)
|
| + wait_for_pid(sproc.pid)
|
| + if OSX:
|
| + self.assertEqual(psutil.Process(sproc.pid).name, "Python")
|
| + else:
|
| + self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PYTHON))
|
| +
|
| + def test_uid(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + uid = psutil.Process(sproc.pid).uid
|
| + if hasattr(os, 'getuid'):
|
| + self.assertEqual(uid, os.getuid())
|
| + else:
|
| + # On those platforms where UID doesn't make sense (Windows)
|
| + # we expect it to be -1
|
| + self.assertEqual(uid, -1)
|
| +
|
| + def test_gid(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + gid = psutil.Process(sproc.pid).gid
|
| + if hasattr(os, 'getgid'):
|
| + self.assertEqual(gid, os.getgid())
|
| + else:
|
| + # On those platforms where GID doesn't make sense (Windows)
|
| + # we expect it to be -1
|
| + self.assertEqual(gid, -1)
|
| +
|
| + def test_username(self):
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + if POSIX:
|
| + import pwd
|
| + user = pwd.getpwuid(p.uid).pw_name
|
| + self.assertEqual(p.username, user)
|
| + elif WINDOWS:
|
| + expected_username = os.environ['USERNAME']
|
| + expected_domain = os.environ['USERDOMAIN']
|
| + domain, username = p.username.split('\\')
|
| + self.assertEqual(domain, expected_domain)
|
| + self.assertEqual(username, expected_username)
|
| + else:
|
| + p.username
|
| +
|
| + @skipUnless(WINDOWS or LINUX)
|
| + def test_getcwd(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertEqual(p.getcwd(), os.getcwd())
|
| +
|
| + @skipUnless(WINDOWS or LINUX)
|
| + def test_getcwd_2(self):
|
| + cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"]
|
| + sproc = get_test_subprocess(cmd)
|
| + wait_for_pid(sproc.pid)
|
| + p = psutil.Process(sproc.pid)
|
| + time.sleep(0.1)
|
| + expected_dir = os.path.dirname(os.getcwd())
|
| + self.assertEqual(p.getcwd(), expected_dir)
|
| +
|
| + def test_get_open_files(self):
|
| + thisfile = os.path.join(os.getcwd(), __file__)
|
| +
|
| + # current process
|
| + p = psutil.Process(os.getpid())
|
| + files = p.get_open_files()
|
| + self.assertFalse(thisfile in files)
|
| + f = open(thisfile, 'r')
|
| + filenames = [x.path for x in p.get_open_files()]
|
| + self.assertTrue(thisfile in filenames)
|
| + f.close()
|
| + for file in filenames:
|
| + self.assertTrue(os.path.isfile(file))
|
| +
|
| + # another process
|
| + cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfile
|
| + sproc = get_test_subprocess([PYTHON, "-c", cmdline])
|
| + wait_for_pid(sproc.pid)
|
| + time.sleep(0.1)
|
| + p = psutil.Process(sproc.pid)
|
| + filenames = [x.path for x in p.get_open_files()]
|
| + self.assertTrue(thisfile in filenames)
|
| + for file in filenames:
|
| + self.assertTrue(os.path.isfile(file))
|
| + # all processes
|
| + for proc in psutil.process_iter():
|
| + try:
|
| + files = proc.get_open_files()
|
| + except psutil.Error:
|
| + pass
|
| + else:
|
| + for file in filenames:
|
| + self.assertTrue(os.path.isfile(file))
|
| +
|
| + def test_get_open_files2(self):
|
| + # test fd and path fields
|
| + fileobj = open(os.path.join(os.getcwd(), __file__), 'r')
|
| + p = psutil.Process(os.getpid())
|
| + for path, fd in p.get_open_files():
|
| + if path == fileobj.name or fd == fileobj.fileno():
|
| + break
|
| + else:
|
| + self.fail("no file found; files=%s" % repr(p.get_open_files()))
|
| + self.assertEqual(path, fileobj.name)
|
| + if WINDOWS:
|
| + self.assertEqual(fd, -1)
|
| + else:
|
| + self.assertEqual(fd, fileobj.fileno())
|
| + # test positions
|
| + ntuple = p.get_open_files()[0]
|
| + self.assertEqual(ntuple[0], ntuple.path)
|
| + self.assertEqual(ntuple[1], ntuple.fd)
|
| + # test file is gone
|
| + fileobj.close()
|
| + self.assertTrue(fileobj.name not in p.get_open_files())
|
| +
|
| + @skipUnless(SUPPORT_CONNECTIONS, warn=1)
|
| + def test_get_connections(self):
|
| + arg = "import socket, time;" \
|
| + "s = socket.socket();" \
|
| + "s.bind(('127.0.0.1', 0));" \
|
| + "s.listen(1);" \
|
| + "conn, addr = s.accept();" \
|
| + "time.sleep(100);"
|
| + sproc = get_test_subprocess([PYTHON, "-c", arg])
|
| + time.sleep(0.1)
|
| + p = psutil.Process(sproc.pid)
|
| + cons = p.get_connections()
|
| + self.assertTrue(len(cons) == 1)
|
| + con = cons[0]
|
| + self.assertEqual(con.family, socket.AF_INET)
|
| + self.assertEqual(con.type, socket.SOCK_STREAM)
|
| + self.assertEqual(con.status, "LISTEN")
|
| + ip, port = con.local_address
|
| + self.assertEqual(ip, '127.0.0.1')
|
| + self.assertEqual(con.remote_address, ())
|
| + if WINDOWS:
|
| + self.assertEqual(con.fd, -1)
|
| + else:
|
| + self.assertTrue(con.fd > 0)
|
| + # test positions
|
| + self.assertEqual(con[0], con.fd)
|
| + self.assertEqual(con[1], con.family)
|
| + self.assertEqual(con[2], con.type)
|
| + self.assertEqual(con[3], con.local_address)
|
| + self.assertEqual(con[4], con.remote_address)
|
| + self.assertEqual(con[5], con.status)
|
| +
|
| + @skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
|
| + def test_connection_fromfd(self):
|
| + sock = socket.socket()
|
| + sock.bind(('127.0.0.1', 0))
|
| + sock.listen(1)
|
| + p = psutil.Process(os.getpid())
|
| + for conn in p.get_connections():
|
| + if conn.fd == sock.fileno():
|
| + break
|
| + else:
|
| + sock.close()
|
| + self.fail("couldn't find socket fd")
|
| + dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
|
| + try:
|
| + self.assertEqual(dupsock.getsockname(), conn.local_address)
|
| + self.assertNotEqual(sock.fileno(), dupsock.fileno())
|
| + finally:
|
| + sock.close()
|
| + dupsock.close()
|
| +
|
| + @skipUnless(SUPPORT_CONNECTIONS, warn=1)
|
| + def test_get_connections_all(self):
|
| +
|
| + def check_address(addr, family):
|
| + if not addr:
|
| + return
|
| + ip, port = addr
|
| + self.assertTrue(isinstance(port, int))
|
| + if family == socket.AF_INET:
|
| + ip = map(int, ip.split('.'))
|
| + self.assertTrue(len(ip) == 4)
|
| + for num in ip:
|
| + self.assertTrue(0 <= num <= 255)
|
| + self.assertTrue(0 <= port <= 65535)
|
| +
|
| + def supports_ipv6():
|
| + if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
|
| + return False
|
| + try:
|
| + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
| + sock.bind(("::1", 0))
|
| + except (socket.error, socket.gaierror):
|
| + return False
|
| + else:
|
| + sock.close()
|
| + return True
|
| +
|
| + # all values are supposed to match Linux's tcp_states.h states
|
| + # table across all platforms.
|
| + valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
|
| + "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
|
| + "LAST_ACK", "LISTEN", "CLOSING", ""]
|
| +
|
| + tcp_template = "import socket;" \
|
| + "s = socket.socket($family, socket.SOCK_STREAM);" \
|
| + "s.bind(('$addr', 0));" \
|
| + "s.listen(1);" \
|
| + "conn, addr = s.accept();"
|
| +
|
| + udp_template = "import socket, time;" \
|
| + "s = socket.socket($family, socket.SOCK_DGRAM);" \
|
| + "s.bind(('$addr', 0));" \
|
| + "time.sleep(100);"
|
| +
|
| + from string import Template
|
| + tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET,
|
| + addr="127.0.0.1")
|
| + udp4_template = Template(udp_template).substitute(family=socket.AF_INET,
|
| + addr="127.0.0.1")
|
| + tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6,
|
| + addr="::1")
|
| + udp6_template = Template(udp_template).substitute(family=socket.AF_INET6,
|
| + addr="::1")
|
| +
|
| + # launch various subprocess instantiating a socket of various
|
| + # families and tupes to enrich psutil results
|
| + tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
|
| + udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
|
| + if supports_ipv6():
|
| + tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
|
| + udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
|
| + else:
|
| + tcp6_proc = None
|
| + udp6_proc = None
|
| +
|
| + time.sleep(0.1)
|
| + this_proc = psutil.Process(os.getpid())
|
| + children_pids = [p.pid for p in this_proc.get_children()]
|
| + for p in psutil.process_iter():
|
| + try:
|
| + cons = p.get_connections()
|
| + except (psutil.NoSuchProcess, psutil.AccessDenied):
|
| + pass
|
| + else:
|
| + for conn in cons:
|
| + self.assertTrue(conn.type in (socket.SOCK_STREAM,
|
| + socket.SOCK_DGRAM))
|
| + self.assertTrue(conn.family in (socket.AF_INET,
|
| + socket.AF_INET6))
|
| + check_address(conn.local_address, conn.family)
|
| + check_address(conn.remote_address, conn.family)
|
| + if conn.status not in valid_states:
|
| + self.fail("%s is not a valid status" %conn.status)
|
| + # actually try to bind the local socket; ignore IPv6
|
| + # sockets as their address might be represented as
|
| + # an IPv4-mapped-address (e.g. "::127.0.0.1")
|
| + # and that's rejected by bind()
|
| + if conn.family == socket.AF_INET:
|
| + s = socket.socket(conn.family, conn.type)
|
| + s.bind((conn.local_address[0], 0))
|
| + s.close()
|
| +
|
| + if not WINDOWS and hasattr(socket, 'fromfd'):
|
| + try:
|
| + dupsock = socket.fromfd(conn.fd, conn.family,
|
| + conn.type)
|
| + except (socket.error, OSError), err:
|
| + if err.args[0] == errno.EBADF:
|
| + continue
|
| + else:
|
| + raise
|
| + self.assertEqual(dupsock.family, conn.family)
|
| + self.assertEqual(dupsock.type, conn.type)
|
| +
|
| + # check matches against subprocesses
|
| + if p.pid in children_pids:
|
| + self.assertTrue(len(cons) == 1)
|
| + conn = cons[0]
|
| + # TCP v4
|
| + if p.pid == tcp4_proc.pid:
|
| + self.assertEqual(conn.family, socket.AF_INET)
|
| + self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| + self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "LISTEN")
|
| + # UDP v4
|
| + elif p.pid == udp4_proc.pid:
|
| + self.assertEqual(conn.family, socket.AF_INET)
|
| + self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| + self.assertEqual(conn.local_address[0], "127.0.0.1")
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "")
|
| + # TCP v6
|
| + elif p.pid == getattr(tcp6_proc, "pid", None):
|
| + self.assertEqual(conn.family, socket.AF_INET6)
|
| + self.assertEqual(conn.type, socket.SOCK_STREAM)
|
| + self.assertEqual(conn.local_address[0], "::1")
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "LISTEN")
|
| + # UDP v6
|
| + elif p.pid == getattr(udp6_proc, "pid", None):
|
| + self.assertEqual(conn.family, socket.AF_INET6)
|
| + self.assertEqual(conn.type, socket.SOCK_DGRAM)
|
| + self.assertEqual(conn.local_address[0], "::1")
|
| + self.assertEqual(conn.remote_address, ())
|
| + self.assertEqual(conn.status, "")
|
| +
|
| + def test_parent_ppid(self):
|
| + this_parent = os.getpid()
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertEqual(p.ppid, this_parent)
|
| + self.assertEqual(p.parent.pid, this_parent)
|
| + # no other process is supposed to have us as parent
|
| + for p in psutil.process_iter():
|
| + if p.pid == sproc.pid:
|
| + continue
|
| + self.assertTrue(p.ppid != this_parent)
|
| +
|
| + def test_get_children(self):
|
| + p = psutil.Process(os.getpid())
|
| + self.assertEqual(p.get_children(), [])
|
| + sproc = get_test_subprocess()
|
| + children = p.get_children()
|
| + self.assertEqual(len(children), 1)
|
| + self.assertEqual(children[0].pid, sproc.pid)
|
| + self.assertEqual(children[0].ppid, os.getpid())
|
| +
|
| + def test_suspend_resume(self):
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + p.suspend()
|
| + p.resume()
|
| +
|
| + def test_get_pid_list(self):
|
| + plist = [x.pid for x in psutil.get_process_list()]
|
| + pidlist = psutil.get_pid_list()
|
| + self.assertEqual(plist.sort(), pidlist.sort())
|
| + # make sure every pid is unique
|
| + self.assertEqual(len(pidlist), len(set(pidlist)))
|
| +
|
| + def test_test(self):
|
| + # test for psutil.test() function
|
| + stdout = sys.stdout
|
| + sys.stdout = DEVNULL
|
| + try:
|
| + psutil.test()
|
| + finally:
|
| + sys.stdout = stdout
|
| +
|
| + def test_types(self):
|
| + sproc = get_test_subprocess()
|
| + wait_for_pid(sproc.pid)
|
| + p = psutil.Process(sproc.pid)
|
| + self.assert_(isinstance(p.pid, int))
|
| + self.assert_(isinstance(p.ppid, int))
|
| + self.assert_(isinstance(p.parent, psutil.Process))
|
| + self.assert_(isinstance(p.name, str))
|
| + if self.__class__.__name__ != "LimitedUserTestCase":
|
| + self.assert_(isinstance(p.exe, str))
|
| + self.assert_(isinstance(p.cmdline, list))
|
| + self.assert_(isinstance(p.uid, int))
|
| + self.assert_(isinstance(p.gid, int))
|
| + self.assert_(isinstance(p.create_time, float))
|
| + self.assert_(isinstance(p.username, (unicode, str)))
|
| + if hasattr(p, 'getcwd'):
|
| + if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
|
| + self.assert_(isinstance(p.getcwd(), str))
|
| + if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
|
| + self.assert_(isinstance(p.get_open_files(), list))
|
| + for path, fd in p.get_open_files():
|
| + self.assert_(isinstance(path, (unicode, str)))
|
| + self.assert_(isinstance(fd, int))
|
| + if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \
|
| + and SUPPORT_CONNECTIONS:
|
| + self.assert_(isinstance(p.get_connections(), list))
|
| + self.assert_(isinstance(p.is_running(), bool))
|
| + if not OSX or self.__class__.__name__ != "LimitedUserTestCase":
|
| + self.assert_(isinstance(p.get_cpu_times(), tuple))
|
| + self.assert_(isinstance(p.get_cpu_times()[0], float))
|
| + self.assert_(isinstance(p.get_cpu_times()[1], float))
|
| + self.assert_(isinstance(p.get_cpu_percent(0), float))
|
| + self.assert_(isinstance(p.get_memory_info(), tuple))
|
| + self.assert_(isinstance(p.get_memory_info()[0], int))
|
| + self.assert_(isinstance(p.get_memory_info()[1], int))
|
| + self.assert_(isinstance(p.get_memory_percent(), float))
|
| + self.assert_(isinstance(p.get_num_threads(), int))
|
| + self.assert_(isinstance(psutil.get_process_list(), list))
|
| + self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process))
|
| + self.assert_(isinstance(psutil.process_iter(), types.GeneratorType))
|
| + self.assert_(isinstance(psutil.process_iter().next(), psutil.Process))
|
| + self.assert_(isinstance(psutil.get_pid_list(), list))
|
| + self.assert_(isinstance(psutil.get_pid_list()[0], int))
|
| + self.assert_(isinstance(psutil.pid_exists(1), bool))
|
| +
|
| + def test_invalid_pid(self):
|
| + self.assertRaises(ValueError, psutil.Process, "1")
|
| + self.assertRaises(ValueError, psutil.Process, None)
|
| + # Refers to Issue #12
|
| + self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
|
| +
|
| + def test_zombie_process(self):
|
| + # Test that NoSuchProcess exception gets raised in the event the
|
| + # process dies after we create the Process object.
|
| + # Example:
|
| + # >>> proc = Process(1234)
|
| + # >>> time.sleep(5) # time-consuming task, process dies in meantime
|
| + # >>> proc.name
|
| + # Refers to Issue #15
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + p.kill()
|
| + sproc.wait()
|
| +
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "name")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time")
|
| + self.assertRaises(psutil.NoSuchProcess, getattr, p, "username")
|
| + if hasattr(p, 'getcwd'):
|
| + self.assertRaises(psutil.NoSuchProcess, p.getcwd)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_open_files)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_connections)
|
| + self.assertRaises(psutil.NoSuchProcess, p.suspend)
|
| + self.assertRaises(psutil.NoSuchProcess, p.resume)
|
| + self.assertRaises(psutil.NoSuchProcess, p.kill)
|
| + self.assertRaises(psutil.NoSuchProcess, p.terminate)
|
| + self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_memory_info)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_children)
|
| + self.assertRaises(psutil.NoSuchProcess, p.get_num_threads)
|
| + self.assertFalse(p.is_running())
|
| +
|
| + def test__str__(self):
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + self.assertTrue(str(sproc.pid) in str(p))
|
| + self.assertTrue(os.path.basename(PYTHON) in str(p))
|
| + sproc = get_test_subprocess()
|
| + p = psutil.Process(sproc.pid)
|
| + p.kill()
|
| + sproc.wait()
|
| + self.assertTrue(str(sproc.pid) in str(p))
|
| + self.assertTrue("terminated" in str(p))
|
| +
|
| + def test_fetch_all(self):
|
| + valid_procs = 0
|
| + attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times',
|
| + 'get_memory_info', 'get_memory_percent', 'get_open_files',
|
| + 'get_num_threads']
|
| + for p in psutil.process_iter():
|
| + for attr in attrs:
|
| + # skip slow Python implementation; we're reasonably sure
|
| + # it works anyway
|
| + if POSIX and attr == 'get_open_files':
|
| + continue
|
| + try:
|
| + attr = getattr(p, attr, None)
|
| + if attr is not None and callable(attr):
|
| + attr()
|
| + valid_procs += 1
|
| + except (psutil.NoSuchProcess, psutil.AccessDenied), err:
|
| + self.assertEqual(err.pid, p.pid)
|
| + if err.name:
|
| + self.assertEqual(err.name, p.name)
|
| + self.assertTrue(str(err))
|
| + self.assertTrue(err.msg)
|
| + except:
|
| + trace = traceback.format_exc()
|
| + self.fail('Exception raised for method %s, pid %s:\n%s'
|
| + %(attr, p.pid, trace))
|
| +
|
| + # we should always have a non-empty list, not including PID 0 etc.
|
| + # special cases.
|
| + self.assertTrue(valid_procs > 0)
|
| +
|
| + def test_pid_0(self):
|
| + # Process(0) is supposed to work on all platforms even if with
|
| + # some differences
|
| + p = psutil.Process(0)
|
| + if WINDOWS:
|
| + self.assertEqual(p.name, 'System Idle Process')
|
| + elif LINUX:
|
| + self.assertEqual(p.name, 'sched')
|
| + elif BSD:
|
| + self.assertEqual(p.name, 'swapper')
|
| + elif OSX:
|
| + self.assertEqual(p.name, 'kernel_task')
|
| +
|
| + if os.name == 'posix':
|
| + self.assertEqual(p.uid, 0)
|
| + self.assertEqual(p.gid, 0)
|
| + else:
|
| + self.assertEqual(p.uid, -1)
|
| + self.assertEqual(p.gid, -1)
|
| +
|
| + self.assertTrue(p.ppid in (0, 1))
|
| + self.assertEqual(p.exe, "")
|
| + self.assertEqual(p.cmdline, [])
|
| + # this can either raise AD (Win) or return 0 (UNIX)
|
| + try:
|
| + self.assertTrue(p.get_num_threads() in (0, 1))
|
| + except psutil.AccessDenied:
|
| + pass
|
| +
|
| + if OSX : #and os.geteuid() != 0:
|
| + self.assertRaises(psutil.AccessDenied, p.get_memory_info)
|
| + self.assertRaises(psutil.AccessDenied, p.get_cpu_times)
|
| + else:
|
| + p.get_memory_info()
|
| +
|
| + # username property
|
| + if POSIX:
|
| + self.assertEqual(p.username, 'root')
|
| + elif WINDOWS:
|
| + self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM')
|
| + else:
|
| + p.username
|
| +
|
| + # PID 0 is supposed to be available on all platforms
|
| + self.assertTrue(0 in psutil.get_pid_list())
|
| + self.assertTrue(psutil.pid_exists(0))
|
| +
|
| + # --- OS specific tests
|
| +
|
| +
|
| +if hasattr(os, 'getuid'):
|
| + class LimitedUserTestCase(TestCase):
|
| + """Repeat the previous tests by using a limited user.
|
| + Executed only on UNIX and only if the user who run the test script
|
| + is root.
|
| + """
|
| + # the uid/gid the test suite runs under
|
| + PROCESS_UID = os.getuid()
|
| + PROCESS_GID = os.getgid()
|
| +
|
| + def setUp(self):
|
| + os.setegid(1000)
|
| + os.seteuid(1000)
|
| + TestCase.setUp(self)
|
| +
|
| + def tearDown(self):
|
| + os.setegid(self.PROCESS_UID)
|
| + os.seteuid(self.PROCESS_GID)
|
| + TestCase.tearDown(self)
|
| +
|
| + def test_path(self):
|
| + # DeprecationWarning is only raised once
|
| + pass
|
| +
|
| + # overridden tests known to raise AccessDenied when run
|
| + # as limited user on different platforms
|
| +
|
| + if LINUX:
|
| +
|
| + def test_getcwd(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, self)
|
| +
|
| + def test_getcwd_2(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, self)
|
| +
|
| + def test_get_open_files(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
|
| +
|
| + def test_get_connections(self):
|
| + pass
|
| +
|
| + def test_exe(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self)
|
| +
|
| + if BSD:
|
| +
|
| + def test_get_open_files(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
|
| +
|
| + def test_get_open_files2(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self)
|
| +
|
| + def test_get_connections(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_get_connections, self)
|
| +
|
| + def test_connection_fromfd(self):
|
| + self.assertRaises(psutil.AccessDenied, TestCase.test_connection_fromfd, self)
|
| +
|
| +
|
| +def test_main():
|
| + tests = []
|
| + test_suite = unittest.TestSuite()
|
| +
|
| + tests.append(TestCase)
|
| +
|
| + if hasattr(os, 'getuid'):
|
| + if os.getuid() == 0:
|
| + tests.append(LimitedUserTestCase)
|
| + else:
|
| + atexit.register(warnings.warn, "Couldn't run limited user tests ("
|
| + "super-user privileges are required)", RuntimeWarning)
|
| +
|
| + if POSIX:
|
| + from _posix import PosixSpecificTestCase
|
| + tests.append(PosixSpecificTestCase)
|
| +
|
| + # import the specific platform test suite
|
| + if LINUX:
|
| + from _linux import LinuxSpecificTestCase as stc
|
| + elif WINDOWS:
|
| + from _windows import WindowsSpecificTestCase as stc
|
| + elif OSX:
|
| + from _osx import OSXSpecificTestCase as stc
|
| + elif BSD:
|
| + from _bsd import BSDSpecificTestCase as stc
|
| + tests.append(stc)
|
| +
|
| + for test_class in tests:
|
| + test_suite.addTest(unittest.makeSuite(test_class))
|
| +
|
| + unittest.TextTestRunner(verbosity=2).run(test_suite)
|
| + DEVNULL.close()
|
| +
|
| +if __name__ == '__main__':
|
| + test_main()
|
| +
|
| +
|
|
|