OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # | |
3 # $Id: _posix.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
4 # | |
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
6 # Use of this source code is governed by a BSD-style license that can be | |
7 # found in the LICENSE file. | |
8 | |
9 """POSIX specific tests. These are implicitly run by test_psutil.py.""" | |
10 | |
11 import unittest | |
12 import subprocess | |
13 import time | |
14 import sys | |
15 import os | |
16 | |
17 import psutil | |
18 | |
19 from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX, | |
20 ignore_access_denied, sh) | |
21 | |
22 | |
23 def ps(cmd): | |
24 """Expects a ps command with a -o argument and parse the result | |
25 returning only the value of interest. | |
26 """ | |
27 if not LINUX: | |
28 cmd = cmd.replace(" --no-headers ", " ") | |
29 p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) | |
30 output = p.communicate()[0].strip() | |
31 if sys.version_info >= (3,): | |
32 output = str(output, sys.stdout.encoding) | |
33 if not LINUX: | |
34 output = output.split('\n')[1] | |
35 try: | |
36 return int(output) | |
37 except ValueError: | |
38 return output | |
39 | |
40 | |
41 class PosixSpecificTestCase(unittest.TestCase): | |
42 """Compare psutil results against 'ps' command line utility.""" | |
43 | |
44 # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps | |
45 | |
46 def setUp(self): | |
47 self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid | |
48 | |
49 def tearDown(self): | |
50 reap_children() | |
51 | |
52 def test_process_parent_pid(self): | |
53 ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) | |
54 ppid_psutil = psutil.Process(self.pid).ppid | |
55 self.assertEqual(ppid_ps, ppid_psutil) | |
56 | |
57 def test_process_uid(self): | |
58 uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) | |
59 uid_psutil = psutil.Process(self.pid).uids.real | |
60 self.assertEqual(uid_ps, uid_psutil) | |
61 | |
62 def test_process_gid(self): | |
63 gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) | |
64 gid_psutil = psutil.Process(self.pid).gids.real | |
65 self.assertEqual(gid_ps, gid_psutil) | |
66 | |
67 def test_process_username(self): | |
68 username_ps = ps("ps --no-headers -o user -p %s" %self.pid) | |
69 username_psutil = psutil.Process(self.pid).username | |
70 self.assertEqual(username_ps, username_psutil) | |
71 | |
72 @ignore_access_denied | |
73 def test_process_rss_memory(self): | |
74 # give python interpreter some time to properly initialize | |
75 # so that the results are the same | |
76 time.sleep(0.1) | |
77 rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) | |
78 rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 | |
79 self.assertEqual(rss_ps, rss_psutil) | |
80 | |
81 @ignore_access_denied | |
82 def test_process_vsz_memory(self): | |
83 # give python interpreter some time to properly initialize | |
84 # so that the results are the same | |
85 time.sleep(0.1) | |
86 vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) | |
87 vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 | |
88 self.assertEqual(vsz_ps, vsz_psutil) | |
89 | |
90 def test_process_name(self): | |
91 # use command + arg since "comm" keyword not supported on all platforms | |
92 name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] | |
93 # remove path if there is any, from the command | |
94 name_ps = os.path.basename(name_ps).lower() | |
95 name_psutil = psutil.Process(self.pid).name.lower() | |
96 self.assertEqual(name_ps, name_psutil) | |
97 | |
98 def test_process_exe(self): | |
99 ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' '
)[0] | |
100 psutil_pathname = psutil.Process(self.pid).exe | |
101 try: | |
102 self.assertEqual(ps_pathname, psutil_pathname) | |
103 except AssertionError: | |
104 # certain platforms such as BSD are more accurate returning: | |
105 # "/usr/local/bin/python2.7" | |
106 # ...instead of: | |
107 # "/usr/local/bin/python" | |
108 # We do not want to consider this difference in accuracy | |
109 # an error. | |
110 adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] | |
111 self.assertEqual(ps_pathname, adjusted_ps_pathname) | |
112 | |
113 def test_process_cmdline(self): | |
114 ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) | |
115 psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) | |
116 self.assertEqual(ps_cmdline, psutil_cmdline) | |
117 | |
118 def test_get_pids(self): | |
119 # Note: this test might fail if the OS is starting/killing | |
120 # other processes in the meantime | |
121 p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIP
E) | |
122 output = p.communicate()[0].strip() | |
123 if sys.version_info >= (3,): | |
124 output = str(output, sys.stdout.encoding) | |
125 output = output.replace('PID', '') | |
126 p.wait() | |
127 pids_ps = [] | |
128 for pid in output.split('\n'): | |
129 if pid: | |
130 pids_ps.append(int(pid.strip())) | |
131 # remove ps subprocess pid which is supposed to be dead in meantime | |
132 pids_ps.remove(p.pid) | |
133 pids_psutil = psutil.get_pid_list() | |
134 pids_ps.sort() | |
135 pids_psutil.sort() | |
136 | |
137 if pids_ps != pids_psutil: | |
138 difference = [x for x in pids_psutil if x not in pids_ps] + \ | |
139 [x for x in pids_ps if x not in pids_psutil] | |
140 self.fail("difference: " + str(difference)) | |
141 | |
142 | |
143 if __name__ == '__main__': | |
144 test_suite = unittest.TestSuite() | |
145 test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) | |
146 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
147 | |
148 | |
OLD | NEW |