OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # | 2 # |
3 # $Id: _posix.py 744 2010-10-27 22:42:42Z jloden $ | 3 # $Id: _posix.py 1142 2011-10-05 18:45:49Z g.rodola $ |
4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. |
| 8 |
| 9 """POSIX specific tests. These are implicitly run by test_psutil.py.""" |
5 | 10 |
6 import unittest | 11 import unittest |
7 import subprocess | 12 import subprocess |
8 import time | 13 import time |
9 import sys | 14 import sys |
10 import os | 15 import os |
11 | 16 |
12 import psutil | 17 import psutil |
13 | 18 |
14 from test_psutil import kill, get_test_subprocess, PYTHON, LINUX, OSX | 19 from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX, |
| 20 ignore_access_denied, sh) |
15 | 21 |
16 | 22 |
17 def ps(cmd): | 23 def ps(cmd): |
18 """Expects a ps command with a -o argument and parse the result | 24 """Expects a ps command with a -o argument and parse the result |
19 returning only the value of interest. | 25 returning only the value of interest. |
20 """ | 26 """ |
21 if not LINUX: | 27 if not LINUX: |
22 cmd = cmd.replace(" --no-headers ", " ") | 28 cmd = cmd.replace(" --no-headers ", " ") |
23 p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) | 29 p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) |
24 output = p.communicate()[0].strip() | 30 output = p.communicate()[0].strip() |
25 if sys.version_info >= (3,): | 31 if sys.version_info >= (3,): |
26 output = str(output, sys.stdout.encoding) | 32 output = str(output, sys.stdout.encoding) |
27 if not LINUX: | 33 if not LINUX: |
28 output = output.split('\n')[1] | 34 output = output.split('\n')[1] |
29 try: | 35 try: |
30 return int(output) | 36 return int(output) |
31 except ValueError: | 37 except ValueError: |
32 return output | 38 return output |
33 | 39 |
34 | 40 |
35 class PosixSpecificTestCase(unittest.TestCase): | 41 class PosixSpecificTestCase(unittest.TestCase): |
36 """Compare psutil results against 'ps' command line utility.""" | 42 """Compare psutil results against 'ps' command line utility.""" |
37 | 43 |
38 # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps | 44 # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps |
39 | 45 |
40 def setUp(self): | 46 def setUp(self): |
41 self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid | 47 self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid |
42 | 48 |
43 def tearDown(self): | 49 def tearDown(self): |
44 kill(self.pid) | 50 reap_children() |
45 | 51 |
46 def test_process_parent_pid(self): | 52 def test_process_parent_pid(self): |
47 ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) | 53 ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) |
48 ppid_psutil = psutil.Process(self.pid).ppid | 54 ppid_psutil = psutil.Process(self.pid).ppid |
49 self.assertEqual(ppid_ps, ppid_psutil) | 55 self.assertEqual(ppid_ps, ppid_psutil) |
50 | 56 |
51 def test_process_uid(self): | 57 def test_process_uid(self): |
52 uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) | 58 uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) |
53 uid_psutil = psutil.Process(self.pid).uid | 59 uid_psutil = psutil.Process(self.pid).uids.real |
54 self.assertEqual(uid_ps, uid_psutil) | 60 self.assertEqual(uid_ps, uid_psutil) |
55 | 61 |
56 def test_process_gid(self): | 62 def test_process_gid(self): |
57 gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) | 63 gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) |
58 gid_psutil = psutil.Process(self.pid).gid | 64 gid_psutil = psutil.Process(self.pid).gids.real |
59 self.assertEqual(gid_ps, gid_psutil) | 65 self.assertEqual(gid_ps, gid_psutil) |
60 | 66 |
61 def test_process_username(self): | 67 def test_process_username(self): |
62 username_ps = ps("ps --no-headers -o user -p %s" %self.pid) | 68 username_ps = ps("ps --no-headers -o user -p %s" %self.pid) |
63 username_psutil = psutil.Process(self.pid).username | 69 username_psutil = psutil.Process(self.pid).username |
64 self.assertEqual(username_ps, username_psutil) | 70 self.assertEqual(username_ps, username_psutil) |
65 | 71 |
| 72 @ignore_access_denied |
66 def test_process_rss_memory(self): | 73 def test_process_rss_memory(self): |
67 # give python interpreter some time to properly initialize | 74 # give python interpreter some time to properly initialize |
68 # so that the results are the same | 75 # so that the results are the same |
69 time.sleep(0.1) | 76 time.sleep(0.1) |
70 rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) | 77 rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) |
71 rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 | 78 rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 |
72 self.assertEqual(rss_ps, rss_psutil) | 79 self.assertEqual(rss_ps, rss_psutil) |
73 | 80 |
| 81 @ignore_access_denied |
74 def test_process_vsz_memory(self): | 82 def test_process_vsz_memory(self): |
75 # give python interpreter some time to properly initialize | 83 # give python interpreter some time to properly initialize |
76 # so that the results are the same | 84 # so that the results are the same |
77 time.sleep(0.1) | 85 time.sleep(0.1) |
78 vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) | 86 vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) |
79 vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 | 87 vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 |
80 self.assertEqual(vsz_ps, vsz_psutil) | 88 self.assertEqual(vsz_ps, vsz_psutil) |
81 | 89 |
82 def test_process_name(self): | 90 def test_process_name(self): |
83 # use command + arg since "comm" keyword not supported on all platforms | 91 # use command + arg since "comm" keyword not supported on all platforms |
84 name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] | 92 name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] |
85 # remove path if there is any, from the command | 93 # remove path if there is any, from the command |
86 name_ps = os.path.basename(name_ps) | 94 name_ps = os.path.basename(name_ps).lower() |
87 name_psutil = psutil.Process(self.pid).name | 95 name_psutil = psutil.Process(self.pid).name.lower() |
88 if OSX: | 96 self.assertEqual(name_ps, name_psutil) |
89 self.assertEqual(name_psutil, "Python") | |
90 else: | |
91 self.assertEqual(name_ps, name_psutil) | |
92 | |
93 | 97 |
94 def test_process_exe(self): | 98 def test_process_exe(self): |
95 ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' '
)[0] | 99 ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' '
)[0] |
96 psutil_pathname = psutil.Process(self.pid).exe | 100 psutil_pathname = psutil.Process(self.pid).exe |
97 self.assertEqual(ps_pathname, psutil_pathname) | 101 try: |
| 102 self.assertEqual(ps_pathname, psutil_pathname) |
| 103 except AssertionError: |
| 104 # certain platforms such as BSD are more accurate returning: |
| 105 # "/usr/local/bin/python2.7" |
| 106 # ...instead of: |
| 107 # "/usr/local/bin/python" |
| 108 # We do not want to consider this difference in accuracy |
| 109 # an error. |
| 110 adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] |
| 111 self.assertEqual(ps_pathname, adjusted_ps_pathname) |
98 | 112 |
99 def test_process_cmdline(self): | 113 def test_process_cmdline(self): |
100 ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) | 114 ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) |
101 psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) | 115 psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) |
102 self.assertEqual(ps_cmdline, psutil_cmdline) | 116 self.assertEqual(ps_cmdline, psutil_cmdline) |
103 | 117 |
104 def test_get_pids(self): | 118 def test_get_pids(self): |
105 # Note: this test might fail if the OS is starting/killing | 119 # Note: this test might fail if the OS is starting/killing |
106 # other processes in the meantime | 120 # other processes in the meantime |
107 p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIP
E) | 121 p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIP
E) |
108 output = p.communicate()[0].strip() | 122 output = p.communicate()[0].strip() |
109 if sys.version_info >= (3,): | 123 if sys.version_info >= (3,): |
110 output = str(output, sys.stdout.encoding) | 124 output = str(output, sys.stdout.encoding) |
111 output = output.replace('PID', '') | 125 output = output.replace('PID', '') |
112 p.wait() | 126 p.wait() |
113 pids_ps = [] | 127 pids_ps = [] |
114 for pid in output.split('\n'): | 128 for pid in output.split('\n'): |
115 if pid: | 129 if pid: |
116 pids_ps.append(int(pid.strip())) | 130 pids_ps.append(int(pid.strip())) |
117 # remove ps subprocess pid which is supposed to be dead in meantime | 131 # remove ps subprocess pid which is supposed to be dead in meantime |
118 pids_ps.remove(p.pid) | 132 pids_ps.remove(p.pid) |
119 # not all systems include pid 0 in their list but psutil does so | |
120 # we force it | |
121 if 0 not in pids_ps: | |
122 pids_ps.append(0) | |
123 | |
124 pids_psutil = psutil.get_pid_list() | 133 pids_psutil = psutil.get_pid_list() |
125 pids_ps.sort() | 134 pids_ps.sort() |
126 pids_psutil.sort() | 135 pids_psutil.sort() |
127 | 136 |
128 if pids_ps != pids_psutil: | 137 if pids_ps != pids_psutil: |
129 difference = filter(lambda x:x not in pids_ps, pids_psutil) + \ | 138 difference = [x for x in pids_psutil if x not in pids_ps] + \ |
130 filter(lambda x:x not in pids_psutil, pids_ps) | 139 [x for x in pids_ps if x not in pids_psutil] |
131 self.fail("difference: " + str(difference)) | 140 self.fail("difference: " + str(difference)) |
132 | 141 |
133 | 142 |
134 if __name__ == '__main__': | 143 if __name__ == '__main__': |
135 test_suite = unittest.TestSuite() | 144 test_suite = unittest.TestSuite() |
136 test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) | 145 test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) |
137 unittest.TextTestRunner(verbosity=2).run(test_suite) | 146 unittest.TextTestRunner(verbosity=2).run(test_suite) |
138 | 147 |
139 | 148 |
OLD | NEW |