| Index: third_party/psutil/test/_posix.py
|
| diff --git a/third_party/psutil/test/_posix.py b/third_party/psutil/test/_posix.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..1879dc7294f2c9131226952198fe6757f5b22798
|
| --- /dev/null
|
| +++ b/third_party/psutil/test/_posix.py
|
| @@ -0,0 +1,139 @@
|
| +#!/usr/bin/env python
|
| +#
|
| +# $Id: _posix.py 744 2010-10-27 22:42:42Z jloden $
|
| +#
|
| +
|
| +import unittest
|
| +import subprocess
|
| +import time
|
| +import sys
|
| +import os
|
| +
|
| +import psutil
|
| +
|
| +from test_psutil import kill, get_test_subprocess, PYTHON, LINUX, OSX
|
| +
|
| +
|
| +def ps(cmd):
|
| + """Expects a ps command with a -o argument and parse the result
|
| + returning only the value of interest.
|
| + """
|
| + if not LINUX:
|
| + cmd = cmd.replace(" --no-headers ", " ")
|
| + p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
|
| + output = p.communicate()[0].strip()
|
| + if sys.version_info >= (3,):
|
| + output = str(output, sys.stdout.encoding)
|
| + if not LINUX:
|
| + output = output.split('\n')[1]
|
| + try:
|
| + return int(output)
|
| + except ValueError:
|
| + return output
|
| +
|
| +
|
| +class PosixSpecificTestCase(unittest.TestCase):
|
| + """Compare psutil results against 'ps' command line utility."""
|
| +
|
| + # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
|
| +
|
| + def setUp(self):
|
| + self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid
|
| +
|
| + def tearDown(self):
|
| + kill(self.pid)
|
| +
|
| + def test_process_parent_pid(self):
|
| + ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid)
|
| + ppid_psutil = psutil.Process(self.pid).ppid
|
| + self.assertEqual(ppid_ps, ppid_psutil)
|
| +
|
| + def test_process_uid(self):
|
| + uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid)
|
| + uid_psutil = psutil.Process(self.pid).uid
|
| + self.assertEqual(uid_ps, uid_psutil)
|
| +
|
| + def test_process_gid(self):
|
| + gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid)
|
| + gid_psutil = psutil.Process(self.pid).gid
|
| + self.assertEqual(gid_ps, gid_psutil)
|
| +
|
| + def test_process_username(self):
|
| + username_ps = ps("ps --no-headers -o user -p %s" %self.pid)
|
| + username_psutil = psutil.Process(self.pid).username
|
| + self.assertEqual(username_ps, username_psutil)
|
| +
|
| + def test_process_rss_memory(self):
|
| + # give python interpreter some time to properly initialize
|
| + # so that the results are the same
|
| + time.sleep(0.1)
|
| + rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid)
|
| + rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
|
| + self.assertEqual(rss_ps, rss_psutil)
|
| +
|
| + def test_process_vsz_memory(self):
|
| + # give python interpreter some time to properly initialize
|
| + # so that the results are the same
|
| + time.sleep(0.1)
|
| + vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid)
|
| + vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
|
| + self.assertEqual(vsz_ps, vsz_psutil)
|
| +
|
| + def test_process_name(self):
|
| + # use command + arg since "comm" keyword not supported on all platforms
|
| + name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
| + # remove path if there is any, from the command
|
| + name_ps = os.path.basename(name_ps)
|
| + name_psutil = psutil.Process(self.pid).name
|
| + if OSX:
|
| + self.assertEqual(name_psutil, "Python")
|
| + else:
|
| + self.assertEqual(name_ps, name_psutil)
|
| +
|
| +
|
| + def test_process_exe(self):
|
| + ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0]
|
| + psutil_pathname = psutil.Process(self.pid).exe
|
| + self.assertEqual(ps_pathname, psutil_pathname)
|
| +
|
| + def test_process_cmdline(self):
|
| + ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid)
|
| + psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
|
| + self.assertEqual(ps_cmdline, psutil_cmdline)
|
| +
|
| + def test_get_pids(self):
|
| + # Note: this test might fail if the OS is starting/killing
|
| + # other processes in the meantime
|
| + p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIPE)
|
| + output = p.communicate()[0].strip()
|
| + if sys.version_info >= (3,):
|
| + output = str(output, sys.stdout.encoding)
|
| + output = output.replace('PID', '')
|
| + p.wait()
|
| + pids_ps = []
|
| + for pid in output.split('\n'):
|
| + if pid:
|
| + pids_ps.append(int(pid.strip()))
|
| + # remove ps subprocess pid which is supposed to be dead in meantime
|
| + pids_ps.remove(p.pid)
|
| + # not all systems include pid 0 in their list but psutil does so
|
| + # we force it
|
| + if 0 not in pids_ps:
|
| + pids_ps.append(0)
|
| +
|
| + pids_psutil = psutil.get_pid_list()
|
| + pids_ps.sort()
|
| + pids_psutil.sort()
|
| +
|
| + if pids_ps != pids_psutil:
|
| + difference = filter(lambda x:x not in pids_ps, pids_psutil) + \
|
| + filter(lambda x:x not in pids_psutil, pids_ps)
|
| + self.fail("difference: " + str(difference))
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + test_suite = unittest.TestSuite()
|
| + test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
|
| + unittest.TextTestRunner(verbosity=2).run(test_suite)
|
| +
|
| +
|
|
|