Index: third_party/psutil/test/test_psutil.py |
diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..c067dee07a7bc5dcaa82f0e4738ce439ce3c8feb |
--- /dev/null |
+++ b/third_party/psutil/test/test_psutil.py |
@@ -0,0 +1,1062 @@ |
+#!/usr/bin/env python |
+# |
+# $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $ |
+# |
+ |
+"""psutil test suite. |
+Note: this is targeted for python 2.x. |
+To run it under python 3.x you need to use 2to3 tool first: |
+ |
+$ 2to3 -w test/*.py |
+""" |
+ |
+import unittest |
+import os |
+import sys |
+import subprocess |
+import time |
+import signal |
+import types |
+import traceback |
+import socket |
+import warnings |
+import atexit |
+import errno |
+import threading |
+ |
+import psutil |
+ |
+ |
+PYTHON = os.path.realpath(sys.executable) |
+DEVNULL = open(os.devnull, 'r+') |
+ |
+POSIX = os.name == 'posix' |
+LINUX = sys.platform.lower().startswith("linux") |
+WINDOWS = sys.platform.lower().startswith("win32") |
+OSX = sys.platform.lower().startswith("darwin") |
+BSD = sys.platform.lower().startswith("freebsd") |
+ |
+try: |
+ psutil.Process(os.getpid()).get_connections() |
+except NotImplementedError, err: |
+ SUPPORT_CONNECTIONS = False |
+ atexit.register(warnings.warn, "get_connections() not supported on this platform", |
+ RuntimeWarning) |
+else: |
+ SUPPORT_CONNECTIONS = True |
+ |
+ |
+_subprocesses_started = set() |
+ |
+def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): |
+ """Return a subprocess.Popen object to use in tests. |
+ By default stdout and stderr are redirected to /dev/null and the |
+ python interpreter is used as test process. |
+ """ |
+ if cmd is None: |
+ cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
+ sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) |
+ _subprocesses_started.add(sproc.pid) |
+ return sproc |
+ |
+def wait_for_pid(pid, timeout=1): |
+ """Wait for pid to show up in the process list then return. |
+ Used in the test suite to give time the sub process to initialize. |
+ """ |
+ raise_at = time.time() + timeout |
+ while 1: |
+ if pid in psutil.get_pid_list(): |
+ # give it one more iteration to allow full initialization |
+ time.sleep(0.01) |
+ return |
+ time.sleep(0.0001) |
+ if time.time() >= raise_at: |
+ raise RuntimeError("Timed out") |
+ |
+def kill(pid): |
+ """Kill a process given its PID.""" |
+ if hasattr(os, 'kill'): |
+ os.kill(pid, signal.SIGKILL) |
+ else: |
+ psutil.Process(pid).kill() |
+ |
+def reap_children(search_all=False): |
+ """Kill any subprocess started by this test suite and ensure that |
+ no zombies stick around to hog resources and create problems when |
+ looking for refleaks. |
+ """ |
+ if POSIX: |
+ def waitpid(process): |
+ # on posix we are free to wait for any pending process by |
+ # passing -1 to os.waitpid() |
+ while True: |
+ try: |
+ any_process = -1 |
+ pid, status = os.waitpid(any_process, os.WNOHANG) |
+ if pid == 0 and not process.is_running(): |
+ break |
+ except OSError: |
+ if not process.is_running(): |
+ break |
+ else: |
+ def waitpid(process): |
+ # on non-posix systems we just wait for the given process |
+ # to go away |
+ while process.is_running(): |
+ time.sleep(0.01) |
+ |
+ if search_all: |
+ this_process = psutil.Process(os.getpid()) |
+ pids = [x.pid for x in this_process.get_children()] |
+ else: |
+ pids =_subprocesses_started |
+ while pids: |
+ pid = pids.pop() |
+ try: |
+ child = psutil.Process(pid) |
+ child.kill() |
+ except psutil.NoSuchProcess: |
+ pass |
+ else: |
+ waitpid(child) |
+ |
+# we want to search through all processes before exiting |
+atexit.register(reap_children, search_all=True) |
+ |
+def skipIf(condition, reason="", warn=False): |
+ """Decorator which skip a test under if condition is satisfied. |
+ This is a substitute of unittest.skipIf which is available |
+ only in python 2.7 and 3.2. |
+ If 'reason' argument is provided this will be printed during |
+ tests execution. |
+ If 'warn' is provided a RuntimeWarning will be shown when all |
+ tests are run. |
+ """ |
+ def outer(fun, *args, **kwargs): |
+ def inner(self): |
+ if condition: |
+ sys.stdout.write("skipped-") |
+ sys.stdout.flush() |
+ if warn: |
+ objname = "%s.%s" % (self.__class__.__name__, fun.__name__) |
+ msg = "%s was skipped" % objname |
+ if reason: |
+ msg += "; reason: " + repr(reason) |
+ atexit.register(warnings.warn, msg, RuntimeWarning) |
+ return |
+ else: |
+ return fun(self, *args, **kwargs) |
+ return inner |
+ return outer |
+ |
+def skipUnless(condition, reason="", warn=False): |
+ """Contrary of skipIf.""" |
+ if not condition: |
+ return skipIf(True, reason, warn) |
+ return skipIf(False) |
+ |
+ |
+class TestCase(unittest.TestCase): |
+ |
+ def tearDown(self): |
+ reap_children() |
+ |
+ def test_get_process_list(self): |
+ pids = [x.pid for x in psutil.get_process_list()] |
+ self.assertTrue(os.getpid() in pids) |
+ self.assertTrue(0 in pids) |
+ |
+ def test_process_iter(self): |
+ pids = [x.pid for x in psutil.process_iter()] |
+ self.assertTrue(os.getpid() in pids) |
+ self.assertTrue(0 in pids) |
+ |
+ def test_kill(self): |
+ sproc = get_test_subprocess() |
+ test_pid = sproc.pid |
+ wait_for_pid(test_pid) |
+ p = psutil.Process(test_pid) |
+ name = p.name |
+ p.kill() |
+ sproc.wait() |
+ self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
+ |
+ def test_terminate(self): |
+ sproc = get_test_subprocess() |
+ test_pid = sproc.pid |
+ wait_for_pid(test_pid) |
+ p = psutil.Process(test_pid) |
+ name = p.name |
+ p.terminate() |
+ sproc.wait() |
+ self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
+ |
+ def test_send_signal(self): |
+ if POSIX: |
+ sig = signal.SIGKILL |
+ else: |
+ sig = signal.SIGTERM |
+ sproc = get_test_subprocess() |
+ test_pid = sproc.pid |
+ p = psutil.Process(test_pid) |
+ name = p.name |
+ p.send_signal(sig) |
+ sproc.wait() |
+ self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
+ |
+ def test_TOTAL_PHYMEM(self): |
+ x = psutil.TOTAL_PHYMEM |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x > 0) |
+ |
+ def test_used_phymem(self): |
+ x = psutil.used_phymem() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x > 0) |
+ |
+ def test_avail_phymem(self): |
+ x = psutil.avail_phymem() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x > 0) |
+ |
+ def test_total_virtmem(self): |
+ x = psutil.total_virtmem() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x >= 0) |
+ |
+ def test_used_virtmem(self): |
+ x = psutil.used_virtmem() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x >= 0) |
+ |
+ def test_avail_virtmem(self): |
+ x = psutil.avail_virtmem() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x >= 0) |
+ |
+ @skipUnless(LINUX) |
+ def test_cached_phymem(self): |
+ x = psutil.cached_phymem() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x >= 0) |
+ |
+ @skipUnless(LINUX) |
+ def test_phymem_buffers(self): |
+ x = psutil.phymem_buffers() |
+ self.assertTrue(isinstance(x, (int, long))) |
+ self.assertTrue(x >= 0) |
+ |
+ def test_system_cpu_times(self): |
+ total = 0 |
+ times = psutil.cpu_times() |
+ self.assertTrue(isinstance(times, psutil.CPUTimes)) |
+ sum(times) |
+ for cp_time in times: |
+ self.assertTrue(isinstance(cp_time, float)) |
+ self.assertTrue(cp_time >= 0.0) |
+ total += cp_time |
+ # test CPUTimes's __iter__ and __str__ implementation |
+ self.assertEqual(total, sum(times)) |
+ str(times) |
+ |
+ def test_system_cpu_times2(self): |
+ t1 = sum(psutil.cpu_times()) |
+ time.sleep(0.1) |
+ t2 = sum(psutil.cpu_times()) |
+ difference = t2 - t1 |
+ if not difference >= 0.05: |
+ self.fail("difference %s" % difference) |
+ |
+ def test_system_cpu_percent(self): |
+ psutil.cpu_percent(interval=0.001) |
+ psutil.cpu_percent(interval=0.001) |
+ for x in xrange(1000): |
+ percent = psutil.cpu_percent(interval=None) |
+ self.assertTrue(isinstance(percent, float)) |
+ self.assertTrue(percent >= 0.0) |
+ self.assertTrue(percent <= 100.0) |
+ |
+ def test_process_cpu_percent(self): |
+ p = psutil.Process(os.getpid()) |
+ p.get_cpu_percent(interval=0.001) |
+ p.get_cpu_percent(interval=0.001) |
+ for x in xrange(100): |
+ percent = p.get_cpu_percent(interval=None) |
+ self.assertTrue(isinstance(percent, float)) |
+ self.assertTrue(percent >= 0.0) |
+ self.assertTrue(percent <= 100.0) |
+ |
+ def test_get_process_cpu_times(self): |
+ times = psutil.Process(os.getpid()).get_cpu_times() |
+ self.assertTrue((times.user > 0.0) or (times.system > 0.0)) |
+ # make sure returned values can be pretty printed with strftime |
+ time.strftime("%H:%M:%S", time.localtime(times.user)) |
+ time.strftime("%H:%M:%S", time.localtime(times.system)) |
+ |
+ # Test Process.cpu_times() against os.times() |
+ # os.times() is broken on Python 2.6 |
+ # http://bugs.python.org/issue1040026 |
+ # XXX fails on OSX: not sure if it's for os.times(). We should |
+ # try this with Python 2.7 and re-enable the test. |
+ |
+ @skipUnless(sys.version_info > (2, 6, 1) and not OSX) |
+ def test_get_process_cpu_times2(self): |
+ user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() |
+ utime, ktime = os.times()[:2] |
+ |
+ # Use os.times()[:2] as base values to compare our results |
+ # using a tolerance of +/- 0.1 seconds. |
+ # It will fail if the difference between the values is > 0.1s. |
+ if (max([user_time, utime]) - min([user_time, utime])) > 0.1: |
+ self.fail("expected: %s, found: %s" %(utime, user_time)) |
+ |
+ if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: |
+ self.fail("expected: %s, found: %s" %(ktime, kernel_time)) |
+ |
+ def test_create_time(self): |
+ sproc = get_test_subprocess() |
+ now = time.time() |
+ wait_for_pid(sproc.pid) |
+ p = psutil.Process(sproc.pid) |
+ create_time = p.create_time |
+ |
+ # Use time.time() as base value to compare our result using a |
+ # tolerance of +/- 1 second. |
+ # It will fail if the difference between the values is > 2s. |
+ difference = abs(create_time - now) |
+ if difference > 2: |
+ self.fail("expected: %s, found: %s, difference: %s" |
+ % (now, create_time, difference)) |
+ |
+ # make sure returned value can be pretty printed with strftime |
+ time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) |
+ |
+ def test_get_num_threads(self): |
+ p = psutil.Process(os.getpid()) |
+ numt1 = p.get_num_threads() |
+ if not WINDOWS and not OSX: |
+ # test is unreliable on Windows and OS X |
+ # NOTE: sleep(1) is too long for OS X, works with sleep(.5) |
+ self.assertEqual(numt1, 1) |
+ t = threading.Thread(target=lambda:time.sleep(1)) |
+ t.start() |
+ numt2 = p.get_num_threads() |
+ if WINDOWS: |
+ self.assertTrue(numt2 > numt1) |
+ else: |
+ self.assertEqual(numt2, 2) |
+ |
+ def test_get_memory_info(self): |
+ p = psutil.Process(os.getpid()) |
+ |
+ # step 1 - get a base value to compare our results |
+ rss1, vms1 = p.get_memory_info() |
+ percent1 = p.get_memory_percent() |
+ self.assertTrue(rss1 > 0) |
+ self.assertTrue(vms1 > 0) |
+ |
+ # step 2 - allocate some memory |
+ memarr = [None] * 1500000 |
+ |
+ rss2, vms2 = p.get_memory_info() |
+ percent2 = p.get_memory_percent() |
+ # make sure that the memory usage bumped up |
+ self.assertTrue(rss2 > rss1) |
+ self.assertTrue(vms2 >= vms1) # vms might be equal |
+ self.assertTrue(percent2 > percent1) |
+ del memarr |
+ |
+ def test_get_memory_percent(self): |
+ p = psutil.Process(os.getpid()) |
+ self.assertTrue(p.get_memory_percent() > 0.0) |
+ |
+ def test_pid(self): |
+ sproc = get_test_subprocess() |
+ self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) |
+ |
+ def test_eq(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid)) |
+ |
+ def test_is_running(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ p = psutil.Process(sproc.pid) |
+ self.assertTrue(p.is_running()) |
+ psutil.Process(sproc.pid).kill() |
+ sproc.wait() |
+ self.assertFalse(p.is_running()) |
+ |
+ def test_pid_exists(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ self.assertTrue(psutil.pid_exists(sproc.pid)) |
+ psutil.Process(sproc.pid).kill() |
+ sproc.wait() |
+ self.assertFalse(psutil.pid_exists(sproc.pid)) |
+ self.assertFalse(psutil.pid_exists(-1)) |
+ |
+ def test_exe(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) |
+ for p in psutil.process_iter(): |
+ try: |
+ exe = p.exe |
+ except psutil.Error: |
+ continue |
+ if not exe: |
+ continue |
+ if not os.path.exists(exe): |
+ self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ |
+ % (repr(exe), p.pid, p.name, p.cmdline)) |
+ if hasattr(os, 'access') and hasattr(os, "X_OK"): |
+ if not os.access(p.exe, os.X_OK): |
+ self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s)" \ |
+ % (repr(p.exe), p.pid, p.name, p.cmdline)) |
+ |
+ def test_path(self): |
+ proc = psutil.Process(os.getpid()) |
+ warnings.filterwarnings("error") |
+ try: |
+ self.assertRaises(DeprecationWarning, getattr, proc, 'path') |
+ finally: |
+ warnings.resetwarnings() |
+ |
+ def test_cmdline(self): |
+ sproc = get_test_subprocess([PYTHON, "-E"]) |
+ wait_for_pid(sproc.pid) |
+ self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) |
+ |
+ def test_name(self): |
+ sproc = get_test_subprocess(PYTHON) |
+ wait_for_pid(sproc.pid) |
+ if OSX: |
+ self.assertEqual(psutil.Process(sproc.pid).name, "Python") |
+ else: |
+ self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PYTHON)) |
+ |
+ def test_uid(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ uid = psutil.Process(sproc.pid).uid |
+ if hasattr(os, 'getuid'): |
+ self.assertEqual(uid, os.getuid()) |
+ else: |
+ # On those platforms where UID doesn't make sense (Windows) |
+ # we expect it to be -1 |
+ self.assertEqual(uid, -1) |
+ |
+ def test_gid(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ gid = psutil.Process(sproc.pid).gid |
+ if hasattr(os, 'getgid'): |
+ self.assertEqual(gid, os.getgid()) |
+ else: |
+ # On those platforms where GID doesn't make sense (Windows) |
+ # we expect it to be -1 |
+ self.assertEqual(gid, -1) |
+ |
+ def test_username(self): |
+ sproc = get_test_subprocess() |
+ p = psutil.Process(sproc.pid) |
+ if POSIX: |
+ import pwd |
+ user = pwd.getpwuid(p.uid).pw_name |
+ self.assertEqual(p.username, user) |
+ elif WINDOWS: |
+ expected_username = os.environ['USERNAME'] |
+ expected_domain = os.environ['USERDOMAIN'] |
+ domain, username = p.username.split('\\') |
+ self.assertEqual(domain, expected_domain) |
+ self.assertEqual(username, expected_username) |
+ else: |
+ p.username |
+ |
+ @skipUnless(WINDOWS or LINUX) |
+ def test_getcwd(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ p = psutil.Process(sproc.pid) |
+ self.assertEqual(p.getcwd(), os.getcwd()) |
+ |
+ @skipUnless(WINDOWS or LINUX) |
+ def test_getcwd_2(self): |
+ cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
+ sproc = get_test_subprocess(cmd) |
+ wait_for_pid(sproc.pid) |
+ p = psutil.Process(sproc.pid) |
+ time.sleep(0.1) |
+ expected_dir = os.path.dirname(os.getcwd()) |
+ self.assertEqual(p.getcwd(), expected_dir) |
+ |
+ def test_get_open_files(self): |
+ thisfile = os.path.join(os.getcwd(), __file__) |
+ |
+ # current process |
+ p = psutil.Process(os.getpid()) |
+ files = p.get_open_files() |
+ self.assertFalse(thisfile in files) |
+ f = open(thisfile, 'r') |
+ filenames = [x.path for x in p.get_open_files()] |
+ self.assertTrue(thisfile in filenames) |
+ f.close() |
+ for file in filenames: |
+ self.assertTrue(os.path.isfile(file)) |
+ |
+ # another process |
+ cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfile |
+ sproc = get_test_subprocess([PYTHON, "-c", cmdline]) |
+ wait_for_pid(sproc.pid) |
+ time.sleep(0.1) |
+ p = psutil.Process(sproc.pid) |
+ filenames = [x.path for x in p.get_open_files()] |
+ self.assertTrue(thisfile in filenames) |
+ for file in filenames: |
+ self.assertTrue(os.path.isfile(file)) |
+ # all processes |
+ for proc in psutil.process_iter(): |
+ try: |
+ files = proc.get_open_files() |
+ except psutil.Error: |
+ pass |
+ else: |
+ for file in filenames: |
+ self.assertTrue(os.path.isfile(file)) |
+ |
+ def test_get_open_files2(self): |
+ # test fd and path fields |
+ fileobj = open(os.path.join(os.getcwd(), __file__), 'r') |
+ p = psutil.Process(os.getpid()) |
+ for path, fd in p.get_open_files(): |
+ if path == fileobj.name or fd == fileobj.fileno(): |
+ break |
+ else: |
+ self.fail("no file found; files=%s" % repr(p.get_open_files())) |
+ self.assertEqual(path, fileobj.name) |
+ if WINDOWS: |
+ self.assertEqual(fd, -1) |
+ else: |
+ self.assertEqual(fd, fileobj.fileno()) |
+ # test positions |
+ ntuple = p.get_open_files()[0] |
+ self.assertEqual(ntuple[0], ntuple.path) |
+ self.assertEqual(ntuple[1], ntuple.fd) |
+ # test file is gone |
+ fileobj.close() |
+ self.assertTrue(fileobj.name not in p.get_open_files()) |
+ |
+ @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
+ def test_get_connections(self): |
+ arg = "import socket, time;" \ |
+ "s = socket.socket();" \ |
+ "s.bind(('127.0.0.1', 0));" \ |
+ "s.listen(1);" \ |
+ "conn, addr = s.accept();" \ |
+ "time.sleep(100);" |
+ sproc = get_test_subprocess([PYTHON, "-c", arg]) |
+ time.sleep(0.1) |
+ p = psutil.Process(sproc.pid) |
+ cons = p.get_connections() |
+ self.assertTrue(len(cons) == 1) |
+ con = cons[0] |
+ self.assertEqual(con.family, socket.AF_INET) |
+ self.assertEqual(con.type, socket.SOCK_STREAM) |
+ self.assertEqual(con.status, "LISTEN") |
+ ip, port = con.local_address |
+ self.assertEqual(ip, '127.0.0.1') |
+ self.assertEqual(con.remote_address, ()) |
+ if WINDOWS: |
+ self.assertEqual(con.fd, -1) |
+ else: |
+ self.assertTrue(con.fd > 0) |
+ # test positions |
+ self.assertEqual(con[0], con.fd) |
+ self.assertEqual(con[1], con.family) |
+ self.assertEqual(con[2], con.type) |
+ self.assertEqual(con[3], con.local_address) |
+ self.assertEqual(con[4], con.remote_address) |
+ self.assertEqual(con[5], con.status) |
+ |
+ @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) |
+ def test_connection_fromfd(self): |
+ sock = socket.socket() |
+ sock.bind(('127.0.0.1', 0)) |
+ sock.listen(1) |
+ p = psutil.Process(os.getpid()) |
+ for conn in p.get_connections(): |
+ if conn.fd == sock.fileno(): |
+ break |
+ else: |
+ sock.close() |
+ self.fail("couldn't find socket fd") |
+ dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
+ try: |
+ self.assertEqual(dupsock.getsockname(), conn.local_address) |
+ self.assertNotEqual(sock.fileno(), dupsock.fileno()) |
+ finally: |
+ sock.close() |
+ dupsock.close() |
+ |
+ @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
+ def test_get_connections_all(self): |
+ |
+ def check_address(addr, family): |
+ if not addr: |
+ return |
+ ip, port = addr |
+ self.assertTrue(isinstance(port, int)) |
+ if family == socket.AF_INET: |
+ ip = map(int, ip.split('.')) |
+ self.assertTrue(len(ip) == 4) |
+ for num in ip: |
+ self.assertTrue(0 <= num <= 255) |
+ self.assertTrue(0 <= port <= 65535) |
+ |
+ def supports_ipv6(): |
+ if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): |
+ return False |
+ try: |
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
+ sock.bind(("::1", 0)) |
+ except (socket.error, socket.gaierror): |
+ return False |
+ else: |
+ sock.close() |
+ return True |
+ |
+ # all values are supposed to match Linux's tcp_states.h states |
+ # table across all platforms. |
+ valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", |
+ "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", |
+ "LAST_ACK", "LISTEN", "CLOSING", ""] |
+ |
+ tcp_template = "import socket;" \ |
+ "s = socket.socket($family, socket.SOCK_STREAM);" \ |
+ "s.bind(('$addr', 0));" \ |
+ "s.listen(1);" \ |
+ "conn, addr = s.accept();" |
+ |
+ udp_template = "import socket, time;" \ |
+ "s = socket.socket($family, socket.SOCK_DGRAM);" \ |
+ "s.bind(('$addr', 0));" \ |
+ "time.sleep(100);" |
+ |
+ from string import Template |
+ tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, |
+ addr="127.0.0.1") |
+ udp4_template = Template(udp_template).substitute(family=socket.AF_INET, |
+ addr="127.0.0.1") |
+ tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6, |
+ addr="::1") |
+ udp6_template = Template(udp_template).substitute(family=socket.AF_INET6, |
+ addr="::1") |
+ |
+ # launch various subprocess instantiating a socket of various |
+ # families and tupes to enrich psutil results |
+ tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
+ udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
+ if supports_ipv6(): |
+ tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
+ udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
+ else: |
+ tcp6_proc = None |
+ udp6_proc = None |
+ |
+ time.sleep(0.1) |
+ this_proc = psutil.Process(os.getpid()) |
+ children_pids = [p.pid for p in this_proc.get_children()] |
+ for p in psutil.process_iter(): |
+ try: |
+ cons = p.get_connections() |
+ except (psutil.NoSuchProcess, psutil.AccessDenied): |
+ pass |
+ else: |
+ for conn in cons: |
+ self.assertTrue(conn.type in (socket.SOCK_STREAM, |
+ socket.SOCK_DGRAM)) |
+ self.assertTrue(conn.family in (socket.AF_INET, |
+ socket.AF_INET6)) |
+ check_address(conn.local_address, conn.family) |
+ check_address(conn.remote_address, conn.family) |
+ if conn.status not in valid_states: |
+ self.fail("%s is not a valid status" %conn.status) |
+ # actually try to bind the local socket; ignore IPv6 |
+ # sockets as their address might be represented as |
+ # an IPv4-mapped-address (e.g. "::127.0.0.1") |
+ # and that's rejected by bind() |
+ if conn.family == socket.AF_INET: |
+ s = socket.socket(conn.family, conn.type) |
+ s.bind((conn.local_address[0], 0)) |
+ s.close() |
+ |
+ if not WINDOWS and hasattr(socket, 'fromfd'): |
+ try: |
+ dupsock = socket.fromfd(conn.fd, conn.family, |
+ conn.type) |
+ except (socket.error, OSError), err: |
+ if err.args[0] == errno.EBADF: |
+ continue |
+ else: |
+ raise |
+ self.assertEqual(dupsock.family, conn.family) |
+ self.assertEqual(dupsock.type, conn.type) |
+ |
+ # check matches against subprocesses |
+ if p.pid in children_pids: |
+ self.assertTrue(len(cons) == 1) |
+ conn = cons[0] |
+ # TCP v4 |
+ if p.pid == tcp4_proc.pid: |
+ self.assertEqual(conn.family, socket.AF_INET) |
+ self.assertEqual(conn.type, socket.SOCK_STREAM) |
+ self.assertEqual(conn.local_address[0], "127.0.0.1") |
+ self.assertEqual(conn.remote_address, ()) |
+ self.assertEqual(conn.status, "LISTEN") |
+ # UDP v4 |
+ elif p.pid == udp4_proc.pid: |
+ self.assertEqual(conn.family, socket.AF_INET) |
+ self.assertEqual(conn.type, socket.SOCK_DGRAM) |
+ self.assertEqual(conn.local_address[0], "127.0.0.1") |
+ self.assertEqual(conn.remote_address, ()) |
+ self.assertEqual(conn.status, "") |
+ # TCP v6 |
+ elif p.pid == getattr(tcp6_proc, "pid", None): |
+ self.assertEqual(conn.family, socket.AF_INET6) |
+ self.assertEqual(conn.type, socket.SOCK_STREAM) |
+ self.assertEqual(conn.local_address[0], "::1") |
+ self.assertEqual(conn.remote_address, ()) |
+ self.assertEqual(conn.status, "LISTEN") |
+ # UDP v6 |
+ elif p.pid == getattr(udp6_proc, "pid", None): |
+ self.assertEqual(conn.family, socket.AF_INET6) |
+ self.assertEqual(conn.type, socket.SOCK_DGRAM) |
+ self.assertEqual(conn.local_address[0], "::1") |
+ self.assertEqual(conn.remote_address, ()) |
+ self.assertEqual(conn.status, "") |
+ |
+ def test_parent_ppid(self): |
+ this_parent = os.getpid() |
+ sproc = get_test_subprocess() |
+ p = psutil.Process(sproc.pid) |
+ self.assertEqual(p.ppid, this_parent) |
+ self.assertEqual(p.parent.pid, this_parent) |
+ # no other process is supposed to have us as parent |
+ for p in psutil.process_iter(): |
+ if p.pid == sproc.pid: |
+ continue |
+ self.assertTrue(p.ppid != this_parent) |
+ |
+ def test_get_children(self): |
+ p = psutil.Process(os.getpid()) |
+ self.assertEqual(p.get_children(), []) |
+ sproc = get_test_subprocess() |
+ children = p.get_children() |
+ self.assertEqual(len(children), 1) |
+ self.assertEqual(children[0].pid, sproc.pid) |
+ self.assertEqual(children[0].ppid, os.getpid()) |
+ |
+ def test_suspend_resume(self): |
+ sproc = get_test_subprocess() |
+ p = psutil.Process(sproc.pid) |
+ p.suspend() |
+ p.resume() |
+ |
+ def test_get_pid_list(self): |
+ plist = [x.pid for x in psutil.get_process_list()] |
+ pidlist = psutil.get_pid_list() |
+ self.assertEqual(plist.sort(), pidlist.sort()) |
+ # make sure every pid is unique |
+ self.assertEqual(len(pidlist), len(set(pidlist))) |
+ |
+ def test_test(self): |
+ # test for psutil.test() function |
+ stdout = sys.stdout |
+ sys.stdout = DEVNULL |
+ try: |
+ psutil.test() |
+ finally: |
+ sys.stdout = stdout |
+ |
+ def test_types(self): |
+ sproc = get_test_subprocess() |
+ wait_for_pid(sproc.pid) |
+ p = psutil.Process(sproc.pid) |
+ self.assert_(isinstance(p.pid, int)) |
+ self.assert_(isinstance(p.ppid, int)) |
+ self.assert_(isinstance(p.parent, psutil.Process)) |
+ self.assert_(isinstance(p.name, str)) |
+ if self.__class__.__name__ != "LimitedUserTestCase": |
+ self.assert_(isinstance(p.exe, str)) |
+ self.assert_(isinstance(p.cmdline, list)) |
+ self.assert_(isinstance(p.uid, int)) |
+ self.assert_(isinstance(p.gid, int)) |
+ self.assert_(isinstance(p.create_time, float)) |
+ self.assert_(isinstance(p.username, (unicode, str))) |
+ if hasattr(p, 'getcwd'): |
+ if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": |
+ self.assert_(isinstance(p.getcwd(), str)) |
+ if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": |
+ self.assert_(isinstance(p.get_open_files(), list)) |
+ for path, fd in p.get_open_files(): |
+ self.assert_(isinstance(path, (unicode, str))) |
+ self.assert_(isinstance(fd, int)) |
+ if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \ |
+ and SUPPORT_CONNECTIONS: |
+ self.assert_(isinstance(p.get_connections(), list)) |
+ self.assert_(isinstance(p.is_running(), bool)) |
+ if not OSX or self.__class__.__name__ != "LimitedUserTestCase": |
+ self.assert_(isinstance(p.get_cpu_times(), tuple)) |
+ self.assert_(isinstance(p.get_cpu_times()[0], float)) |
+ self.assert_(isinstance(p.get_cpu_times()[1], float)) |
+ self.assert_(isinstance(p.get_cpu_percent(0), float)) |
+ self.assert_(isinstance(p.get_memory_info(), tuple)) |
+ self.assert_(isinstance(p.get_memory_info()[0], int)) |
+ self.assert_(isinstance(p.get_memory_info()[1], int)) |
+ self.assert_(isinstance(p.get_memory_percent(), float)) |
+ self.assert_(isinstance(p.get_num_threads(), int)) |
+ self.assert_(isinstance(psutil.get_process_list(), list)) |
+ self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process)) |
+ self.assert_(isinstance(psutil.process_iter(), types.GeneratorType)) |
+ self.assert_(isinstance(psutil.process_iter().next(), psutil.Process)) |
+ self.assert_(isinstance(psutil.get_pid_list(), list)) |
+ self.assert_(isinstance(psutil.get_pid_list()[0], int)) |
+ self.assert_(isinstance(psutil.pid_exists(1), bool)) |
+ |
+ def test_invalid_pid(self): |
+ self.assertRaises(ValueError, psutil.Process, "1") |
+ self.assertRaises(ValueError, psutil.Process, None) |
+ # Refers to Issue #12 |
+ self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) |
+ |
+ def test_zombie_process(self): |
+ # Test that NoSuchProcess exception gets raised in the event the |
+ # process dies after we create the Process object. |
+ # Example: |
+ # >>> proc = Process(1234) |
+ # >>> time.sleep(5) # time-consuming task, process dies in meantime |
+ # >>> proc.name |
+ # Refers to Issue #15 |
+ sproc = get_test_subprocess() |
+ p = psutil.Process(sproc.pid) |
+ p.kill() |
+ sproc.wait() |
+ |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "name") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time") |
+ self.assertRaises(psutil.NoSuchProcess, getattr, p, "username") |
+ if hasattr(p, 'getcwd'): |
+ self.assertRaises(psutil.NoSuchProcess, p.getcwd) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_open_files) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_connections) |
+ self.assertRaises(psutil.NoSuchProcess, p.suspend) |
+ self.assertRaises(psutil.NoSuchProcess, p.resume) |
+ self.assertRaises(psutil.NoSuchProcess, p.kill) |
+ self.assertRaises(psutil.NoSuchProcess, p.terminate) |
+ self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_memory_info) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_children) |
+ self.assertRaises(psutil.NoSuchProcess, p.get_num_threads) |
+ self.assertFalse(p.is_running()) |
+ |
+ def test__str__(self): |
+ sproc = get_test_subprocess() |
+ p = psutil.Process(sproc.pid) |
+ self.assertTrue(str(sproc.pid) in str(p)) |
+ self.assertTrue(os.path.basename(PYTHON) in str(p)) |
+ sproc = get_test_subprocess() |
+ p = psutil.Process(sproc.pid) |
+ p.kill() |
+ sproc.wait() |
+ self.assertTrue(str(sproc.pid) in str(p)) |
+ self.assertTrue("terminated" in str(p)) |
+ |
+ def test_fetch_all(self): |
+ valid_procs = 0 |
+ attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times', |
+ 'get_memory_info', 'get_memory_percent', 'get_open_files', |
+ 'get_num_threads'] |
+ for p in psutil.process_iter(): |
+ for attr in attrs: |
+ # skip slow Python implementation; we're reasonably sure |
+ # it works anyway |
+ if POSIX and attr == 'get_open_files': |
+ continue |
+ try: |
+ attr = getattr(p, attr, None) |
+ if attr is not None and callable(attr): |
+ attr() |
+ valid_procs += 1 |
+ except (psutil.NoSuchProcess, psutil.AccessDenied), err: |
+ self.assertEqual(err.pid, p.pid) |
+ if err.name: |
+ self.assertEqual(err.name, p.name) |
+ self.assertTrue(str(err)) |
+ self.assertTrue(err.msg) |
+ except: |
+ trace = traceback.format_exc() |
+ self.fail('Exception raised for method %s, pid %s:\n%s' |
+ %(attr, p.pid, trace)) |
+ |
+ # we should always have a non-empty list, not including PID 0 etc. |
+ # special cases. |
+ self.assertTrue(valid_procs > 0) |
+ |
+ def test_pid_0(self): |
+ # Process(0) is supposed to work on all platforms even if with |
+ # some differences |
+ p = psutil.Process(0) |
+ if WINDOWS: |
+ self.assertEqual(p.name, 'System Idle Process') |
+ elif LINUX: |
+ self.assertEqual(p.name, 'sched') |
+ elif BSD: |
+ self.assertEqual(p.name, 'swapper') |
+ elif OSX: |
+ self.assertEqual(p.name, 'kernel_task') |
+ |
+ if os.name == 'posix': |
+ self.assertEqual(p.uid, 0) |
+ self.assertEqual(p.gid, 0) |
+ else: |
+ self.assertEqual(p.uid, -1) |
+ self.assertEqual(p.gid, -1) |
+ |
+ self.assertTrue(p.ppid in (0, 1)) |
+ self.assertEqual(p.exe, "") |
+ self.assertEqual(p.cmdline, []) |
+ # this can either raise AD (Win) or return 0 (UNIX) |
+ try: |
+ self.assertTrue(p.get_num_threads() in (0, 1)) |
+ except psutil.AccessDenied: |
+ pass |
+ |
+ if OSX : #and os.geteuid() != 0: |
+ self.assertRaises(psutil.AccessDenied, p.get_memory_info) |
+ self.assertRaises(psutil.AccessDenied, p.get_cpu_times) |
+ else: |
+ p.get_memory_info() |
+ |
+ # username property |
+ if POSIX: |
+ self.assertEqual(p.username, 'root') |
+ elif WINDOWS: |
+ self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
+ else: |
+ p.username |
+ |
+ # PID 0 is supposed to be available on all platforms |
+ self.assertTrue(0 in psutil.get_pid_list()) |
+ self.assertTrue(psutil.pid_exists(0)) |
+ |
+ # --- OS specific tests |
+ |
+ |
+if hasattr(os, 'getuid'): |
+ class LimitedUserTestCase(TestCase): |
+ """Repeat the previous tests by using a limited user. |
+ Executed only on UNIX and only if the user who run the test script |
+ is root. |
+ """ |
+ # the uid/gid the test suite runs under |
+ PROCESS_UID = os.getuid() |
+ PROCESS_GID = os.getgid() |
+ |
+ def setUp(self): |
+ os.setegid(1000) |
+ os.seteuid(1000) |
+ TestCase.setUp(self) |
+ |
+ def tearDown(self): |
+ os.setegid(self.PROCESS_UID) |
+ os.seteuid(self.PROCESS_GID) |
+ TestCase.tearDown(self) |
+ |
+ def test_path(self): |
+ # DeprecationWarning is only raised once |
+ pass |
+ |
+ # overridden tests known to raise AccessDenied when run |
+ # as limited user on different platforms |
+ |
+ if LINUX: |
+ |
+ def test_getcwd(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, self) |
+ |
+ def test_getcwd_2(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, self) |
+ |
+ def test_get_open_files(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self) |
+ |
+ def test_get_connections(self): |
+ pass |
+ |
+ def test_exe(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self) |
+ |
+ if BSD: |
+ |
+ def test_get_open_files(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self) |
+ |
+ def test_get_open_files2(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_files, self) |
+ |
+ def test_get_connections(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_get_connections, self) |
+ |
+ def test_connection_fromfd(self): |
+ self.assertRaises(psutil.AccessDenied, TestCase.test_connection_fromfd, self) |
+ |
+ |
+def test_main(): |
+ tests = [] |
+ test_suite = unittest.TestSuite() |
+ |
+ tests.append(TestCase) |
+ |
+ if hasattr(os, 'getuid'): |
+ if os.getuid() == 0: |
+ tests.append(LimitedUserTestCase) |
+ else: |
+ atexit.register(warnings.warn, "Couldn't run limited user tests (" |
+ "super-user privileges are required)", RuntimeWarning) |
+ |
+ if POSIX: |
+ from _posix import PosixSpecificTestCase |
+ tests.append(PosixSpecificTestCase) |
+ |
+ # import the specific platform test suite |
+ if LINUX: |
+ from _linux import LinuxSpecificTestCase as stc |
+ elif WINDOWS: |
+ from _windows import WindowsSpecificTestCase as stc |
+ elif OSX: |
+ from _osx import OSXSpecificTestCase as stc |
+ elif BSD: |
+ from _bsd import BSDSpecificTestCase as stc |
+ tests.append(stc) |
+ |
+ for test_class in tests: |
+ test_suite.addTest(unittest.makeSuite(test_class)) |
+ |
+ unittest.TextTestRunner(verbosity=2).run(test_suite) |
+ DEVNULL.close() |
+ |
+if __name__ == '__main__': |
+ test_main() |
+ |
+ |