| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # | |
| 3 # $Id: _posix.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
| 4 # | |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
| 6 # Use of this source code is governed by a BSD-style license that can be | |
| 7 # found in the LICENSE file. | |
| 8 | |
| 9 """POSIX specific tests. These are implicitly run by test_psutil.py.""" | |
| 10 | |
| 11 import unittest | |
| 12 import subprocess | |
| 13 import time | |
| 14 import sys | |
| 15 import os | |
| 16 | |
| 17 import psutil | |
| 18 | |
| 19 from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX, | |
| 20 ignore_access_denied, sh) | |
| 21 | |
| 22 | |
| 23 def ps(cmd): | |
| 24 """Expects a ps command with a -o argument and parse the result | |
| 25 returning only the value of interest. | |
| 26 """ | |
| 27 if not LINUX: | |
| 28 cmd = cmd.replace(" --no-headers ", " ") | |
| 29 p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) | |
| 30 output = p.communicate()[0].strip() | |
| 31 if sys.version_info >= (3,): | |
| 32 output = str(output, sys.stdout.encoding) | |
| 33 if not LINUX: | |
| 34 output = output.split('\n')[1] | |
| 35 try: | |
| 36 return int(output) | |
| 37 except ValueError: | |
| 38 return output | |
| 39 | |
| 40 | |
| 41 class PosixSpecificTestCase(unittest.TestCase): | |
| 42 """Compare psutil results against 'ps' command line utility.""" | |
| 43 | |
| 44 # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps | |
| 45 | |
| 46 def setUp(self): | |
| 47 self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid | |
| 48 | |
| 49 def tearDown(self): | |
| 50 reap_children() | |
| 51 | |
| 52 def test_process_parent_pid(self): | |
| 53 ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) | |
| 54 ppid_psutil = psutil.Process(self.pid).ppid | |
| 55 self.assertEqual(ppid_ps, ppid_psutil) | |
| 56 | |
| 57 def test_process_uid(self): | |
| 58 uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) | |
| 59 uid_psutil = psutil.Process(self.pid).uids.real | |
| 60 self.assertEqual(uid_ps, uid_psutil) | |
| 61 | |
| 62 def test_process_gid(self): | |
| 63 gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) | |
| 64 gid_psutil = psutil.Process(self.pid).gids.real | |
| 65 self.assertEqual(gid_ps, gid_psutil) | |
| 66 | |
| 67 def test_process_username(self): | |
| 68 username_ps = ps("ps --no-headers -o user -p %s" %self.pid) | |
| 69 username_psutil = psutil.Process(self.pid).username | |
| 70 self.assertEqual(username_ps, username_psutil) | |
| 71 | |
| 72 @ignore_access_denied | |
| 73 def test_process_rss_memory(self): | |
| 74 # give python interpreter some time to properly initialize | |
| 75 # so that the results are the same | |
| 76 time.sleep(0.1) | |
| 77 rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) | |
| 78 rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 | |
| 79 self.assertEqual(rss_ps, rss_psutil) | |
| 80 | |
| 81 @ignore_access_denied | |
| 82 def test_process_vsz_memory(self): | |
| 83 # give python interpreter some time to properly initialize | |
| 84 # so that the results are the same | |
| 85 time.sleep(0.1) | |
| 86 vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) | |
| 87 vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 | |
| 88 self.assertEqual(vsz_ps, vsz_psutil) | |
| 89 | |
| 90 def test_process_name(self): | |
| 91 # use command + arg since "comm" keyword not supported on all platforms | |
| 92 name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] | |
| 93 # remove path if there is any, from the command | |
| 94 name_ps = os.path.basename(name_ps).lower() | |
| 95 name_psutil = psutil.Process(self.pid).name.lower() | |
| 96 self.assertEqual(name_ps, name_psutil) | |
| 97 | |
| 98 def test_process_exe(self): | |
| 99 ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' '
)[0] | |
| 100 psutil_pathname = psutil.Process(self.pid).exe | |
| 101 try: | |
| 102 self.assertEqual(ps_pathname, psutil_pathname) | |
| 103 except AssertionError: | |
| 104 # certain platforms such as BSD are more accurate returning: | |
| 105 # "/usr/local/bin/python2.7" | |
| 106 # ...instead of: | |
| 107 # "/usr/local/bin/python" | |
| 108 # We do not want to consider this difference in accuracy | |
| 109 # an error. | |
| 110 adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] | |
| 111 self.assertEqual(ps_pathname, adjusted_ps_pathname) | |
| 112 | |
| 113 def test_process_cmdline(self): | |
| 114 ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) | |
| 115 psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) | |
| 116 self.assertEqual(ps_cmdline, psutil_cmdline) | |
| 117 | |
| 118 def test_get_pids(self): | |
| 119 # Note: this test might fail if the OS is starting/killing | |
| 120 # other processes in the meantime | |
| 121 p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIP
E) | |
| 122 output = p.communicate()[0].strip() | |
| 123 if sys.version_info >= (3,): | |
| 124 output = str(output, sys.stdout.encoding) | |
| 125 output = output.replace('PID', '') | |
| 126 p.wait() | |
| 127 pids_ps = [] | |
| 128 for pid in output.split('\n'): | |
| 129 if pid: | |
| 130 pids_ps.append(int(pid.strip())) | |
| 131 # remove ps subprocess pid which is supposed to be dead in meantime | |
| 132 pids_ps.remove(p.pid) | |
| 133 pids_psutil = psutil.get_pid_list() | |
| 134 pids_ps.sort() | |
| 135 pids_psutil.sort() | |
| 136 | |
| 137 if pids_ps != pids_psutil: | |
| 138 difference = [x for x in pids_psutil if x not in pids_ps] + \ | |
| 139 [x for x in pids_ps if x not in pids_psutil] | |
| 140 self.fail("difference: " + str(difference)) | |
| 141 | |
| 142 | |
| 143 if __name__ == '__main__': | |
| 144 test_suite = unittest.TestSuite() | |
| 145 test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) | |
| 146 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
| 147 | |
| 148 | |
| OLD | NEW |