| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # | |
| 3 # $Id: _windows.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
| 4 # | |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
| 6 # Use of this source code is governed by a BSD-style license that can be | |
| 7 # found in the LICENSE file. | |
| 8 | |
| 9 """Windows specific tests. These are implicitly run by test_psutil.py.""" | |
| 10 | |
| 11 import os | |
| 12 import unittest | |
| 13 import platform | |
| 14 import signal | |
| 15 import time | |
| 16 import warnings | |
| 17 import atexit | |
| 18 import sys | |
| 19 | |
| 20 import psutil | |
| 21 import _psutil_mswindows | |
| 22 from test_psutil import reap_children, get_test_subprocess, wait_for_pid | |
| 23 try: | |
| 24 import wmi | |
| 25 except ImportError: | |
| 26 err = sys.exc_info()[1] | |
| 27 atexit.register(warnings.warn, "Couldn't run wmi tests: %s" % str(err), | |
| 28 RuntimeWarning) | |
| 29 wmi = None | |
| 30 | |
| 31 WIN2000 = platform.win32_ver()[0] == '2000' | |
| 32 | |
| 33 | |
| 34 class WindowsSpecificTestCase(unittest.TestCase): | |
| 35 | |
| 36 def setUp(self): | |
| 37 sproc = get_test_subprocess() | |
| 38 wait_for_pid(sproc.pid) | |
| 39 self.pid = sproc.pid | |
| 40 | |
| 41 def tearDown(self): | |
| 42 reap_children() | |
| 43 | |
| 44 def test_issue_24(self): | |
| 45 p = psutil.Process(0) | |
| 46 self.assertRaises(psutil.AccessDenied, p.kill) | |
| 47 | |
| 48 def test_special_pid(self): | |
| 49 if not WIN2000: | |
| 50 p = psutil.Process(4) | |
| 51 else: | |
| 52 p = psutil.Process(8) | |
| 53 self.assertEqual(p.name, 'System') | |
| 54 # use __str__ to access all common Process properties to check | |
| 55 # that nothing strange happens | |
| 56 str(p) | |
| 57 p.username | |
| 58 self.assertTrue(p.create_time >= 0.0) | |
| 59 try: | |
| 60 rss, vms = p.get_memory_info() | |
| 61 except psutil.AccessDenied: | |
| 62 # expected on Windows Vista and Windows 7 | |
| 63 if not platform.uname()[1] in ('vista', 'win-7', 'win7'): | |
| 64 raise | |
| 65 else: | |
| 66 self.assertTrue(rss > 0) | |
| 67 if not WIN2000: | |
| 68 self.assertEqual(vms, 0) | |
| 69 | |
| 70 def test_signal(self): | |
| 71 p = psutil.Process(self.pid) | |
| 72 self.assertRaises(ValueError, p.send_signal, signal.SIGINT) | |
| 73 | |
| 74 if wmi is not None: | |
| 75 | |
| 76 # --- Process class tests | |
| 77 | |
| 78 def test_process_name(self): | |
| 79 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 80 p = psutil.Process(self.pid) | |
| 81 self.assertEqual(p.name, w.Caption) | |
| 82 | |
| 83 def test_process_exe(self): | |
| 84 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 85 p = psutil.Process(self.pid) | |
| 86 self.assertEqual(p.exe, w.ExecutablePath) | |
| 87 | |
| 88 if not WIN2000: | |
| 89 def test_process_cmdline(self): | |
| 90 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 91 p = psutil.Process(self.pid) | |
| 92 self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"',
'')) | |
| 93 | |
| 94 def test_process_username(self): | |
| 95 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 96 p = psutil.Process(self.pid) | |
| 97 domain, _, username = w.GetOwner() | |
| 98 username = "%s\\%s" %(domain, username) | |
| 99 self.assertEqual(p.username, username) | |
| 100 | |
| 101 def test_process_rss_memory(self): | |
| 102 time.sleep(0.1) | |
| 103 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 104 p = psutil.Process(self.pid) | |
| 105 rss = p.get_memory_info().rss | |
| 106 self.assertEqual(rss, int(w.WorkingSetSize)) | |
| 107 | |
| 108 def test_process_vms_memory(self): | |
| 109 time.sleep(0.1) | |
| 110 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 111 p = psutil.Process(self.pid) | |
| 112 vms = p.get_memory_info().vms | |
| 113 # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx | |
| 114 # ...claims that PageFileUsage is represented in Kilo | |
| 115 # bytes but funnily enough on certain platforms bytes are | |
| 116 # returned instead. | |
| 117 wmi_usage = int(w.PageFileUsage) | |
| 118 if (vms != wmi_usage) and (vms != wmi_usage * 1024): | |
| 119 self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) | |
| 120 | |
| 121 def test_process_create_time(self): | |
| 122 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 123 p = psutil.Process(self.pid) | |
| 124 wmic_create = str(w.CreationDate.split('.')[0]) | |
| 125 psutil_create = time.strftime("%Y%m%d%H%M%S", | |
| 126 time.localtime(p.create_time)) | |
| 127 self.assertEqual(wmic_create, psutil_create) | |
| 128 | |
| 129 | |
| 130 # --- psutil namespace functions and constants tests | |
| 131 | |
| 132 def test_NUM_CPUS(self): | |
| 133 num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) | |
| 134 self.assertEqual(num_cpus, psutil.NUM_CPUS) | |
| 135 | |
| 136 def test_TOTAL_PHYMEM(self): | |
| 137 w = wmi.WMI().Win32_ComputerSystem()[0] | |
| 138 self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM) | |
| 139 | |
| 140 def test__UPTIME(self): | |
| 141 # _UPTIME constant is not public but it is used internally | |
| 142 # as value to return for pid 0 creation time. | |
| 143 # WMI behaves the same. | |
| 144 w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] | |
| 145 p = psutil.Process(0) | |
| 146 wmic_create = str(w.CreationDate.split('.')[0]) | |
| 147 psutil_create = time.strftime("%Y%m%d%H%M%S", | |
| 148 time.localtime(p.create_time)) | |
| 149 # XXX - ? no actual test here | |
| 150 | |
| 151 def test_get_pids(self): | |
| 152 # Note: this test might fail if the OS is starting/killing | |
| 153 # other processes in the meantime | |
| 154 w = wmi.WMI().Win32_Process() | |
| 155 wmi_pids = [x.ProcessId for x in w] | |
| 156 wmi_pids.sort() | |
| 157 psutil_pids = psutil.get_pid_list() | |
| 158 psutil_pids.sort() | |
| 159 if wmi_pids != psutil_pids: | |
| 160 difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ | |
| 161 filter(lambda x:x not in psutil_pids, wmi_pids) | |
| 162 self.fail("difference: " + str(difference)) | |
| 163 | |
| 164 def test_disks(self): | |
| 165 ps_parts = psutil.disk_partitions(all=True) | |
| 166 wmi_parts = wmi.WMI().Win32_LogicalDisk() | |
| 167 for ps_part in ps_parts: | |
| 168 for wmi_part in wmi_parts: | |
| 169 if ps_part.device.replace('\\', '') == wmi_part.DeviceID: | |
| 170 if not ps_part.mountpoint: | |
| 171 # this is usually a CD-ROM with no disk inserted | |
| 172 break | |
| 173 usage = psutil.disk_usage(ps_part.mountpoint) | |
| 174 self.assertEqual(usage.total, int(wmi_part.Size)) | |
| 175 wmi_free = int(wmi_part.FreeSpace) | |
| 176 self.assertEqual(usage.free, wmi_free) | |
| 177 # 10 MB tollerance | |
| 178 if abs(usage.free - wmi_free) > 10 * 1024 * 1024: | |
| 179 self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free
) | |
| 180 break | |
| 181 else: | |
| 182 self.fail("can't find partition %r", ps_part) | |
| 183 | |
| 184 | |
| 185 if __name__ == '__main__': | |
| 186 test_suite = unittest.TestSuite() | |
| 187 test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) | |
| 188 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
| 189 | |
| OLD | NEW |