| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # | 2 # |
| 3 # $Id: _windows.py 798 2010-11-12 20:52:57Z g.rodola $ | 3 # $Id: _windows.py 1142 2011-10-05 18:45:49Z g.rodola $ |
| 4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. |
| 5 | 8 |
| 9 """Windows specific tests. These are implicitly run by test_psutil.py.""" |
| 6 | 10 |
| 7 import os | 11 import os |
| 8 import unittest | 12 import unittest |
| 9 import platform | 13 import platform |
| 10 import subprocess | |
| 11 import signal | 14 import signal |
| 12 import time | 15 import time |
| 13 import warnings | 16 import warnings |
| 14 import atexit | 17 import atexit |
| 18 import sys |
| 15 | 19 |
| 16 import psutil | 20 import psutil |
| 21 import _psutil_mswindows |
| 17 from test_psutil import reap_children, get_test_subprocess, wait_for_pid | 22 from test_psutil import reap_children, get_test_subprocess, wait_for_pid |
| 18 try: | 23 try: |
| 19 import wmi | 24 import wmi |
| 20 except ImportError, err: | 25 except ImportError: |
| 26 err = sys.exc_info()[1] |
| 21 atexit.register(warnings.warn, "Couldn't run wmi tests: %s" % str(err), | 27 atexit.register(warnings.warn, "Couldn't run wmi tests: %s" % str(err), |
| 22 RuntimeWarning) | 28 RuntimeWarning) |
| 23 wmi = None | 29 wmi = None |
| 24 | 30 |
| 25 WIN2000 = platform.win32_ver()[0] == '2000' | 31 WIN2000 = platform.win32_ver()[0] == '2000' |
| 26 | 32 |
| 27 | 33 |
| 28 class WindowsSpecificTestCase(unittest.TestCase): | 34 class WindowsSpecificTestCase(unittest.TestCase): |
| 29 | 35 |
| 30 def setUp(self): | 36 def setUp(self): |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 67 | 73 |
| 68 if wmi is not None: | 74 if wmi is not None: |
| 69 | 75 |
| 70 # --- Process class tests | 76 # --- Process class tests |
| 71 | 77 |
| 72 def test_process_name(self): | 78 def test_process_name(self): |
| 73 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | 79 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] |
| 74 p = psutil.Process(self.pid) | 80 p = psutil.Process(self.pid) |
| 75 self.assertEqual(p.name, w.Caption) | 81 self.assertEqual(p.name, w.Caption) |
| 76 | 82 |
| 77 def test_process_path(self): | 83 def test_process_exe(self): |
| 78 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | 84 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] |
| 79 p = psutil.Process(self.pid) | 85 p = psutil.Process(self.pid) |
| 80 self.assertEqual(p.exe, w.ExecutablePath) | 86 self.assertEqual(p.exe, w.ExecutablePath) |
| 81 | 87 |
| 82 if not WIN2000: | 88 if not WIN2000: |
| 83 def test_process_cmdline(self): | 89 def test_process_cmdline(self): |
| 84 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | 90 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] |
| 85 p = psutil.Process(self.pid) | 91 p = psutil.Process(self.pid) |
| 86 self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"',
'')) | 92 self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"',
'')) |
| 87 | 93 |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 133 | 139 |
| 134 def test__UPTIME(self): | 140 def test__UPTIME(self): |
| 135 # _UPTIME constant is not public but it is used internally | 141 # _UPTIME constant is not public but it is used internally |
| 136 # as value to return for pid 0 creation time. | 142 # as value to return for pid 0 creation time. |
| 137 # WMI behaves the same. | 143 # WMI behaves the same. |
| 138 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | 144 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] |
| 139 p = psutil.Process(0) | 145 p = psutil.Process(0) |
| 140 wmic_create = str(w.CreationDate.split('.')[0]) | 146 wmic_create = str(w.CreationDate.split('.')[0]) |
| 141 psutil_create = time.strftime("%Y%m%d%H%M%S", | 147 psutil_create = time.strftime("%Y%m%d%H%M%S", |
| 142 time.localtime(p.create_time)) | 148 time.localtime(p.create_time)) |
| 149 # XXX - ? no actual test here |
| 143 | 150 |
| 144 def test_get_pids(self): | 151 def test_get_pids(self): |
| 145 # Note: this test might fail if the OS is starting/killing | 152 # Note: this test might fail if the OS is starting/killing |
| 146 # other processes in the meantime | 153 # other processes in the meantime |
| 147 w = wmi.WMI().Win32_Process() | 154 w = wmi.WMI().Win32_Process() |
| 148 wmi_pids = [x.ProcessId for x in w] | 155 wmi_pids = [x.ProcessId for x in w] |
| 149 wmi_pids.sort() | 156 wmi_pids.sort() |
| 150 psutil_pids = psutil.get_pid_list() | 157 psutil_pids = psutil.get_pid_list() |
| 151 psutil_pids.sort() | 158 psutil_pids.sort() |
| 152 if wmi_pids != psutil_pids: | 159 if wmi_pids != psutil_pids: |
| 153 difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ | 160 difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ |
| 154 filter(lambda x:x not in psutil_pids, wmi_pids) | 161 filter(lambda x:x not in psutil_pids, wmi_pids) |
| 155 self.fail("difference: " + str(difference)) | 162 self.fail("difference: " + str(difference)) |
| 156 | 163 |
| 164 def test_disks(self): |
| 165 ps_parts = psutil.disk_partitions(all=True) |
| 166 wmi_parts = wmi.WMI().Win32_LogicalDisk() |
| 167 for ps_part in ps_parts: |
| 168 for wmi_part in wmi_parts: |
| 169 if ps_part.device.replace('\\', '') == wmi_part.DeviceID: |
| 170 if not ps_part.mountpoint: |
| 171 # this is usually a CD-ROM with no disk inserted |
| 172 break |
| 173 usage = psutil.disk_usage(ps_part.mountpoint) |
| 174 self.assertEqual(usage.total, int(wmi_part.Size)) |
| 175 wmi_free = int(wmi_part.FreeSpace) |
| 176 self.assertEqual(usage.free, wmi_free) |
| 177 # 10 MB tollerance |
| 178 if abs(usage.free - wmi_free) > 10 * 1024 * 1024: |
| 179 self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free
) |
| 180 break |
| 181 else: |
| 182 self.fail("can't find partition %r", ps_part) |
| 183 |
| 157 | 184 |
| 158 if __name__ == '__main__': | 185 if __name__ == '__main__': |
| 159 test_suite = unittest.TestSuite() | 186 test_suite = unittest.TestSuite() |
| 160 test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) | 187 test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) |
| 161 unittest.TextTestRunner(verbosity=2).run(test_suite) | 188 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 162 | 189 |
| OLD | NEW |