OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # |
| 3 # $Id: _posix.py 744 2010-10-27 22:42:42Z jloden $ |
| 4 # |
| 5 |
| 6 import unittest |
| 7 import subprocess |
| 8 import time |
| 9 import sys |
| 10 import os |
| 11 |
| 12 import psutil |
| 13 |
| 14 from test_psutil import kill, get_test_subprocess, PYTHON, LINUX, OSX |
| 15 |
| 16 |
| 17 def ps(cmd): |
| 18 """Expects a ps command with a -o argument and parse the result |
| 19 returning only the value of interest. |
| 20 """ |
| 21 if not LINUX: |
| 22 cmd = cmd.replace(" --no-headers ", " ") |
| 23 p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) |
| 24 output = p.communicate()[0].strip() |
| 25 if sys.version_info >= (3,): |
| 26 output = str(output, sys.stdout.encoding) |
| 27 if not LINUX: |
| 28 output = output.split('\n')[1] |
| 29 try: |
| 30 return int(output) |
| 31 except ValueError: |
| 32 return output |
| 33 |
| 34 |
| 35 class PosixSpecificTestCase(unittest.TestCase): |
| 36 """Compare psutil results against 'ps' command line utility.""" |
| 37 |
| 38 # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps |
| 39 |
| 40 def setUp(self): |
| 41 self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid |
| 42 |
| 43 def tearDown(self): |
| 44 kill(self.pid) |
| 45 |
| 46 def test_process_parent_pid(self): |
| 47 ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) |
| 48 ppid_psutil = psutil.Process(self.pid).ppid |
| 49 self.assertEqual(ppid_ps, ppid_psutil) |
| 50 |
| 51 def test_process_uid(self): |
| 52 uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) |
| 53 uid_psutil = psutil.Process(self.pid).uid |
| 54 self.assertEqual(uid_ps, uid_psutil) |
| 55 |
| 56 def test_process_gid(self): |
| 57 gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) |
| 58 gid_psutil = psutil.Process(self.pid).gid |
| 59 self.assertEqual(gid_ps, gid_psutil) |
| 60 |
| 61 def test_process_username(self): |
| 62 username_ps = ps("ps --no-headers -o user -p %s" %self.pid) |
| 63 username_psutil = psutil.Process(self.pid).username |
| 64 self.assertEqual(username_ps, username_psutil) |
| 65 |
| 66 def test_process_rss_memory(self): |
| 67 # give python interpreter some time to properly initialize |
| 68 # so that the results are the same |
| 69 time.sleep(0.1) |
| 70 rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) |
| 71 rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 |
| 72 self.assertEqual(rss_ps, rss_psutil) |
| 73 |
| 74 def test_process_vsz_memory(self): |
| 75 # give python interpreter some time to properly initialize |
| 76 # so that the results are the same |
| 77 time.sleep(0.1) |
| 78 vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) |
| 79 vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 |
| 80 self.assertEqual(vsz_ps, vsz_psutil) |
| 81 |
| 82 def test_process_name(self): |
| 83 # use command + arg since "comm" keyword not supported on all platforms |
| 84 name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] |
| 85 # remove path if there is any, from the command |
| 86 name_ps = os.path.basename(name_ps) |
| 87 name_psutil = psutil.Process(self.pid).name |
| 88 if OSX: |
| 89 self.assertEqual(name_psutil, "Python") |
| 90 else: |
| 91 self.assertEqual(name_ps, name_psutil) |
| 92 |
| 93 |
| 94 def test_process_exe(self): |
| 95 ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' '
)[0] |
| 96 psutil_pathname = psutil.Process(self.pid).exe |
| 97 self.assertEqual(ps_pathname, psutil_pathname) |
| 98 |
| 99 def test_process_cmdline(self): |
| 100 ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) |
| 101 psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) |
| 102 self.assertEqual(ps_cmdline, psutil_cmdline) |
| 103 |
| 104 def test_get_pids(self): |
| 105 # Note: this test might fail if the OS is starting/killing |
| 106 # other processes in the meantime |
| 107 p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIP
E) |
| 108 output = p.communicate()[0].strip() |
| 109 if sys.version_info >= (3,): |
| 110 output = str(output, sys.stdout.encoding) |
| 111 output = output.replace('PID', '') |
| 112 p.wait() |
| 113 pids_ps = [] |
| 114 for pid in output.split('\n'): |
| 115 if pid: |
| 116 pids_ps.append(int(pid.strip())) |
| 117 # remove ps subprocess pid which is supposed to be dead in meantime |
| 118 pids_ps.remove(p.pid) |
| 119 # not all systems include pid 0 in their list but psutil does so |
| 120 # we force it |
| 121 if 0 not in pids_ps: |
| 122 pids_ps.append(0) |
| 123 |
| 124 pids_psutil = psutil.get_pid_list() |
| 125 pids_ps.sort() |
| 126 pids_psutil.sort() |
| 127 |
| 128 if pids_ps != pids_psutil: |
| 129 difference = filter(lambda x:x not in pids_ps, pids_psutil) + \ |
| 130 filter(lambda x:x not in pids_psutil, pids_ps) |
| 131 self.fail("difference: " + str(difference)) |
| 132 |
| 133 |
| 134 if __name__ == '__main__': |
| 135 test_suite = unittest.TestSuite() |
| 136 test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) |
| 137 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 138 |
| 139 |
OLD | NEW |