Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(114)

Side by Side Diff: third_party/psutil/test/test_psutil.py

Issue 8159001: Update third_party/psutil and fix the licence issue with it. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: remove the suppression and unnecessary files. Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | tools/checklicenses/checklicenses.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # 2 #
3 # $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $ 3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $
4 # 4 #
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
6 # Use of this source code is governed by a BSD-style license that can be
7 # found in the LICENSE file.
5 8
6 """psutil test suite. 9 """
7 Note: this is targeted for python 2.x. 10 psutil test suite.
8 To run it under python 3.x you need to use 2to3 tool first:
9 11
10 $ 2to3 -w test/*.py 12 Note: this is targeted for both python 2.x and 3.x so there's no need
13 to use 2to3 tool first.
11 """ 14 """
12 15
13 import unittest 16 import unittest
14 import os 17 import os
15 import sys 18 import sys
16 import subprocess 19 import subprocess
17 import time 20 import time
18 import signal 21 import signal
19 import types 22 import types
20 import traceback 23 import traceback
21 import socket 24 import socket
22 import warnings 25 import warnings
23 import atexit 26 import atexit
24 import errno 27 import errno
25 import threading 28 import threading
29 import tempfile
30 import collections
26 31
27 import psutil 32 import psutil
28 33
29 34
30 PYTHON = os.path.realpath(sys.executable) 35 PYTHON = os.path.realpath(sys.executable)
31 DEVNULL = open(os.devnull, 'r+') 36 DEVNULL = open(os.devnull, 'r+')
32 37 TESTFN = os.path.join(os.getcwd(), "$testfile")
38 PY3 = sys.version_info >= (3,)
33 POSIX = os.name == 'posix' 39 POSIX = os.name == 'posix'
34 LINUX = sys.platform.lower().startswith("linux") 40 LINUX = sys.platform.lower().startswith("linux")
35 WINDOWS = sys.platform.lower().startswith("win32") 41 WINDOWS = sys.platform.lower().startswith("win32")
36 OSX = sys.platform.lower().startswith("darwin") 42 OSX = sys.platform.lower().startswith("darwin")
37 BSD = sys.platform.lower().startswith("freebsd") 43 BSD = sys.platform.lower().startswith("freebsd")
38 44
39 try: 45 try:
40 psutil.Process(os.getpid()).get_connections() 46 psutil.Process(os.getpid()).get_connections()
41 except NotImplementedError, err: 47 except NotImplementedError:
48 err = sys.exc_info()[1]
42 SUPPORT_CONNECTIONS = False 49 SUPPORT_CONNECTIONS = False
43 atexit.register(warnings.warn, "get_connections() not supported on this plat form", 50 atexit.register(warnings.warn, "get_connections() not supported on this plat form",
44 RuntimeWarning) 51 RuntimeWarning)
45 else: 52 else:
46 SUPPORT_CONNECTIONS = True 53 SUPPORT_CONNECTIONS = True
47 54
55 if PY3:
56 long = int
57
58 def callable(attr):
59 return isinstance(attr, collections.Callable)
60
48 61
49 _subprocesses_started = set() 62 _subprocesses_started = set()
50 63
51 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): 64 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
52 """Return a subprocess.Popen object to use in tests. 65 """Return a subprocess.Popen object to use in tests.
53 By default stdout and stderr are redirected to /dev/null and the 66 By default stdout and stderr are redirected to /dev/null and the
54 python interpreter is used as test process. 67 python interpreter is used as test process.
55 """ 68 """
56 if cmd is None: 69 if cmd is None:
57 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] 70 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
58 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) 71 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin)
59 _subprocesses_started.add(sproc.pid) 72 _subprocesses_started.add(sproc.pid)
60 return sproc 73 return sproc
61 74
75 def sh(cmdline):
76 """run cmd in a subprocess and return its output.
77 raises RuntimeError on error.
78 """
79 p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE,
80 stderr=subprocess.PIPE)
81 stdout, stderr = p.communicate()
82 if p.returncode != 0:
83 raise RuntimeError(stderr)
84 if stderr:
85 warnings.warn(stderr, RuntimeWarning)
86 if sys.version_info >= (3,):
87 stdout = str(stdout, sys.stdout.encoding)
88 return stdout.strip()
89
62 def wait_for_pid(pid, timeout=1): 90 def wait_for_pid(pid, timeout=1):
63 """Wait for pid to show up in the process list then return. 91 """Wait for pid to show up in the process list then return.
64 Used in the test suite to give time the sub process to initialize. 92 Used in the test suite to give time the sub process to initialize.
65 """ 93 """
66 raise_at = time.time() + timeout 94 raise_at = time.time() + timeout
67 while 1: 95 while 1:
68 if pid in psutil.get_pid_list(): 96 if pid in psutil.get_pid_list():
69 # give it one more iteration to allow full initialization 97 # give it one more iteration to allow full initialization
70 time.sleep(0.01) 98 time.sleep(0.01)
71 return 99 return
72 time.sleep(0.0001) 100 time.sleep(0.0001)
73 if time.time() >= raise_at: 101 if time.time() >= raise_at:
74 raise RuntimeError("Timed out") 102 raise RuntimeError("Timed out")
75 103
76 def kill(pid):
77 """Kill a process given its PID."""
78 if hasattr(os, 'kill'):
79 os.kill(pid, signal.SIGKILL)
80 else:
81 psutil.Process(pid).kill()
82
83 def reap_children(search_all=False): 104 def reap_children(search_all=False):
84 """Kill any subprocess started by this test suite and ensure that 105 """Kill any subprocess started by this test suite and ensure that
85 no zombies stick around to hog resources and create problems when 106 no zombies stick around to hog resources and create problems when
86 looking for refleaks. 107 looking for refleaks.
87 """ 108 """
88 if POSIX:
89 def waitpid(process):
90 # on posix we are free to wait for any pending process by
91 # passing -1 to os.waitpid()
92 while True:
93 try:
94 any_process = -1
95 pid, status = os.waitpid(any_process, os.WNOHANG)
96 if pid == 0 and not process.is_running():
97 break
98 except OSError:
99 if not process.is_running():
100 break
101 else:
102 def waitpid(process):
103 # on non-posix systems we just wait for the given process
104 # to go away
105 while process.is_running():
106 time.sleep(0.01)
107
108 if search_all: 109 if search_all:
109 this_process = psutil.Process(os.getpid()) 110 this_process = psutil.Process(os.getpid())
110 pids = [x.pid for x in this_process.get_children()] 111 pids = [x.pid for x in this_process.get_children()]
111 else: 112 else:
112 pids =_subprocesses_started 113 pids =_subprocesses_started
113 while pids: 114 while pids:
114 pid = pids.pop() 115 pid = pids.pop()
115 try: 116 try:
116 child = psutil.Process(pid) 117 child = psutil.Process(pid)
117 child.kill() 118 child.kill()
118 except psutil.NoSuchProcess: 119 except psutil.NoSuchProcess:
119 pass 120 pass
120 else: 121 else:
121 waitpid(child) 122 child.wait()
123
122 124
123 # we want to search through all processes before exiting 125 # we want to search through all processes before exiting
124 atexit.register(reap_children, search_all=True) 126 atexit.register(reap_children, search_all=True)
125 127
126 def skipIf(condition, reason="", warn=False): 128 def skipIf(condition, reason="", warn=False):
127 """Decorator which skip a test under if condition is satisfied. 129 """Decorator which skip a test under if condition is satisfied.
128 This is a substitute of unittest.skipIf which is available 130 This is a substitute of unittest.skipIf which is available
129 only in python 2.7 and 3.2. 131 only in python 2.7 and 3.2.
130 If 'reason' argument is provided this will be printed during 132 If 'reason' argument is provided this will be printed during
131 tests execution. 133 tests execution.
(...skipping 16 matching lines...) Expand all
148 return fun(self, *args, **kwargs) 150 return fun(self, *args, **kwargs)
149 return inner 151 return inner
150 return outer 152 return outer
151 153
152 def skipUnless(condition, reason="", warn=False): 154 def skipUnless(condition, reason="", warn=False):
153 """Contrary of skipIf.""" 155 """Contrary of skipIf."""
154 if not condition: 156 if not condition:
155 return skipIf(True, reason, warn) 157 return skipIf(True, reason, warn)
156 return skipIf(False) 158 return skipIf(False)
157 159
160 def ignore_access_denied(fun):
161 """Decorator to Ignore AccessDenied exceptions."""
162 def outer(fun, *args, **kwargs):
163 def inner(self):
164 try:
165 return fun(self, *args, **kwargs)
166 except psutil.AccessDenied:
167 pass
168 return inner
169 return outer
170
171 def supports_ipv6():
172 """Return True if IPv6 is supported on this platform."""
173 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
174 return False
175 sock = None
176 try:
177 try:
178 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
179 sock.bind(("::1", 0))
180 except (socket.error, socket.gaierror):
181 return False
182 else:
183 return True
184 finally:
185 if sock is not None:
186 sock.close()
187
188
189 class ThreadTask(threading.Thread):
190 """A thread object used for running process thread tests."""
191
192 def __init__(self):
193 threading.Thread.__init__(self)
194 self._running = False
195 self._interval = None
196 self._flag = threading.Event()
197
198 def __repr__(self):
199 name = self.__class__.__name__
200 return '<%s running=%s at %#x>' % (name, self._running, id(self))
201
202 def start(self, interval=0.001):
203 """Start thread and keep it running until an explicit
204 stop() request. Polls for shutdown every 'timeout' seconds.
205 """
206 if self._running:
207 raise ValueError("already started")
208 self._interval = interval
209 threading.Thread.start(self)
210 self._flag.wait()
211
212 def run(self):
213 self._running = True
214 self._flag.set()
215 while self._running:
216 time.sleep(self._interval)
217
218 def stop(self):
219 """Stop thread execution and and waits until it is stopped."""
220 if not self._running:
221 raise ValueError("already stopped")
222 self._running = False
223 self.join()
224
158 225
159 class TestCase(unittest.TestCase): 226 class TestCase(unittest.TestCase):
160 227
161 def tearDown(self): 228 def tearDown(self):
162 reap_children() 229 reap_children()
163 230
231 # ============================
232 # tests for system-related API
233 # ============================
234
164 def test_get_process_list(self): 235 def test_get_process_list(self):
165 pids = [x.pid for x in psutil.get_process_list()] 236 pids = [x.pid for x in psutil.get_process_list()]
166 self.assertTrue(os.getpid() in pids) 237 self.assertTrue(os.getpid() in pids)
167 self.assertTrue(0 in pids)
168 238
169 def test_process_iter(self): 239 def test_process_iter(self):
170 pids = [x.pid for x in psutil.process_iter()] 240 pids = [x.pid for x in psutil.process_iter()]
171 self.assertTrue(os.getpid() in pids) 241 self.assertTrue(os.getpid() in pids)
172 self.assertTrue(0 in pids)
173
174 def test_kill(self):
175 sproc = get_test_subprocess()
176 test_pid = sproc.pid
177 wait_for_pid(test_pid)
178 p = psutil.Process(test_pid)
179 name = p.name
180 p.kill()
181 sproc.wait()
182 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
183
184 def test_terminate(self):
185 sproc = get_test_subprocess()
186 test_pid = sproc.pid
187 wait_for_pid(test_pid)
188 p = psutil.Process(test_pid)
189 name = p.name
190 p.terminate()
191 sproc.wait()
192 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
193
194 def test_send_signal(self):
195 if POSIX:
196 sig = signal.SIGKILL
197 else:
198 sig = signal.SIGTERM
199 sproc = get_test_subprocess()
200 test_pid = sproc.pid
201 p = psutil.Process(test_pid)
202 name = p.name
203 p.send_signal(sig)
204 sproc.wait()
205 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
206 242
207 def test_TOTAL_PHYMEM(self): 243 def test_TOTAL_PHYMEM(self):
208 x = psutil.TOTAL_PHYMEM 244 x = psutil.TOTAL_PHYMEM
209 self.assertTrue(isinstance(x, (int, long))) 245 self.assertTrue(isinstance(x, (int, long)))
210 self.assertTrue(x > 0) 246 self.assertTrue(x > 0)
211 247
212 def test_used_phymem(self): 248 def test_BOOT_TIME(self):
213 x = psutil.used_phymem() 249 x = psutil.BOOT_TIME
214 self.assertTrue(isinstance(x, (int, long))) 250 self.assertTrue(isinstance(x, float))
215 self.assertTrue(x > 0) 251 self.assertTrue(x > 0)
216 252
217 def test_avail_phymem(self): 253 def test_deprecated_memory_functions(self):
218 x = psutil.avail_phymem() 254 warnings.filterwarnings("error")
219 self.assertTrue(isinstance(x, (int, long))) 255 try:
220 self.assertTrue(x > 0) 256 self.assertRaises(DeprecationWarning, psutil.used_phymem)
257 self.assertRaises(DeprecationWarning, psutil.avail_phymem)
258 self.assertRaises(DeprecationWarning, psutil.total_virtmem)
259 self.assertRaises(DeprecationWarning, psutil.used_virtmem)
260 self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
261 finally:
262 warnings.resetwarnings()
221 263
222 def test_total_virtmem(self): 264 def test_phymem_usage(self):
223 x = psutil.total_virtmem() 265 mem = psutil.phymem_usage()
224 self.assertTrue(isinstance(x, (int, long))) 266 self.assertTrue(mem.total > 0)
225 self.assertTrue(x >= 0) 267 self.assertTrue(mem.used > 0)
268 self.assertTrue(mem.free > 0)
269 self.assertTrue(0 <= mem.percent <= 100)
226 270
227 def test_used_virtmem(self): 271 def test_virtmem_usage(self):
228 x = psutil.used_virtmem() 272 mem = psutil.virtmem_usage()
229 self.assertTrue(isinstance(x, (int, long))) 273 self.assertTrue(mem.total > 0)
230 self.assertTrue(x >= 0) 274 self.assertTrue(mem.used >= 0)
231 275 self.assertTrue(mem.free > 0)
232 def test_avail_virtmem(self): 276 self.assertTrue(0 <= mem.percent <= 100)
233 x = psutil.avail_virtmem()
234 self.assertTrue(isinstance(x, (int, long)))
235 self.assertTrue(x >= 0)
236
237 @skipUnless(LINUX)
238 def test_cached_phymem(self):
239 x = psutil.cached_phymem()
240 self.assertTrue(isinstance(x, (int, long)))
241 self.assertTrue(x >= 0)
242 277
243 @skipUnless(LINUX) 278 @skipUnless(LINUX)
244 def test_phymem_buffers(self): 279 def test_phymem_buffers(self):
245 x = psutil.phymem_buffers() 280 x = psutil.phymem_buffers()
246 self.assertTrue(isinstance(x, (int, long))) 281 self.assertTrue(isinstance(x, (int, long)))
247 self.assertTrue(x >= 0) 282 self.assertTrue(x >= 0)
248 283
249 def test_system_cpu_times(self): 284 def test_pid_exists(self):
285 sproc = get_test_subprocess()
286 wait_for_pid(sproc.pid)
287 self.assertTrue(psutil.pid_exists(sproc.pid))
288 p = psutil.Process(sproc.pid)
289 p.kill()
290 p.wait()
291 self.assertFalse(psutil.pid_exists(sproc.pid))
292 self.assertFalse(psutil.pid_exists(-1))
293
294 def test_pid_exists_2(self):
295 reap_children()
296 pids = psutil.get_pid_list()
297 for pid in pids:
298 try:
299 self.assertTrue(psutil.pid_exists(pid))
300 except AssertionError:
301 # in case the process disappeared in meantime fail only
302 # if it is no longer in get_pid_list()
303 time.sleep(.1)
304 if pid in psutil.get_pid_list():
305 self.fail(pid)
306 pids = range(max(pids) + 5000, max(pids) + 6000)
307 for pid in pids:
308 self.assertFalse(psutil.pid_exists(pid))
309
310 def test_get_pid_list(self):
311 plist = [x.pid for x in psutil.get_process_list()]
312 pidlist = psutil.get_pid_list()
313 self.assertEqual(plist.sort(), pidlist.sort())
314 # make sure every pid is unique
315 self.assertEqual(len(pidlist), len(set(pidlist)))
316
317 def test_test(self):
318 # test for psutil.test() function
319 stdout = sys.stdout
320 sys.stdout = DEVNULL
321 try:
322 psutil.test()
323 finally:
324 sys.stdout = stdout
325
326 def test_sys_cpu_times(self):
250 total = 0 327 total = 0
251 times = psutil.cpu_times() 328 times = psutil.cpu_times()
252 self.assertTrue(isinstance(times, psutil.CPUTimes))
253 sum(times) 329 sum(times)
254 for cp_time in times: 330 for cp_time in times:
255 self.assertTrue(isinstance(cp_time, float)) 331 self.assertTrue(isinstance(cp_time, float))
256 self.assertTrue(cp_time >= 0.0) 332 self.assertTrue(cp_time >= 0.0)
257 total += cp_time 333 total += cp_time
258 # test CPUTimes's __iter__ and __str__ implementation
259 self.assertEqual(total, sum(times)) 334 self.assertEqual(total, sum(times))
260 str(times) 335 str(times)
261 336
262 def test_system_cpu_times2(self): 337 def test_sys_cpu_times2(self):
263 t1 = sum(psutil.cpu_times()) 338 t1 = sum(psutil.cpu_times())
264 time.sleep(0.1) 339 time.sleep(0.1)
265 t2 = sum(psutil.cpu_times()) 340 t2 = sum(psutil.cpu_times())
266 difference = t2 - t1 341 difference = t2 - t1
267 if not difference >= 0.05: 342 if not difference >= 0.05:
268 self.fail("difference %s" % difference) 343 self.fail("difference %s" % difference)
269 344
270 def test_system_cpu_percent(self): 345 def test_sys_per_cpu_times(self):
346 for times in psutil.cpu_times(percpu=True):
347 total = 0
348 sum(times)
349 for cp_time in times:
350 self.assertTrue(isinstance(cp_time, float))
351 self.assertTrue(cp_time >= 0.0)
352 total += cp_time
353 self.assertEqual(total, sum(times))
354 str(times)
355
356 def test_sys_per_cpu_times2(self):
357 tot1 = psutil.cpu_times(percpu=True)
358 stop_at = time.time() + 0.1
359 while 1:
360 if time.time() >= stop_at:
361 break
362 tot2 = psutil.cpu_times(percpu=True)
363 for t1, t2 in zip(tot1, tot2):
364 t1, t2 = sum(t1), sum(t2)
365 difference = t2 - t1
366 if difference >= 0.05:
367 return
368 self.fail()
369
370 def test_sys_cpu_percent(self):
271 psutil.cpu_percent(interval=0.001) 371 psutil.cpu_percent(interval=0.001)
272 psutil.cpu_percent(interval=0.001) 372 psutil.cpu_percent(interval=0.001)
273 for x in xrange(1000): 373 for x in range(1000):
274 percent = psutil.cpu_percent(interval=None) 374 percent = psutil.cpu_percent(interval=None)
275 self.assertTrue(isinstance(percent, float)) 375 self.assertTrue(isinstance(percent, float))
276 self.assertTrue(percent >= 0.0) 376 self.assertTrue(percent >= 0.0)
277 self.assertTrue(percent <= 100.0) 377 self.assertTrue(percent <= 100.0)
278 378
279 def test_process_cpu_percent(self): 379 def test_sys_per_cpu_percent(self):
380 psutil.cpu_percent(interval=0.001, percpu=True)
381 psutil.cpu_percent(interval=0.001, percpu=True)
382 for x in range(1000):
383 percents = psutil.cpu_percent(interval=None, percpu=True)
384 for percent in percents:
385 self.assertTrue(isinstance(percent, float))
386 self.assertTrue(percent >= 0.0)
387 self.assertTrue(percent <= 100.0)
388
389 def test_sys_cpu_percent_compare(self):
390 psutil.cpu_percent(interval=0)
391 psutil.cpu_percent(interval=0, percpu=True)
392 time.sleep(.1)
393 t1 = psutil.cpu_percent(interval=0)
394 t2 = psutil.cpu_percent(interval=0, percpu=True)
395 # calculate total average
396 t2 = sum(t2) / len(t2)
397 if abs(t1 - t2) > 5:
398 self.assertEqual(t1, t2)
399
400 def test_disk_usage(self):
401 usage = psutil.disk_usage(os.getcwd())
402 self.assertTrue(usage.total > 0)
403 self.assertTrue(usage.used > 0)
404 self.assertTrue(usage.free > 0)
405 self.assertTrue(usage.total > usage.used)
406 self.assertTrue(usage.total > usage.free)
407 self.assertTrue(0 <= usage.percent <= 100)
408
409 # if path does not exist OSError ENOENT is expected across
410 # all platforms
411 fname = tempfile.mktemp()
412 try:
413 psutil.disk_usage(fname)
414 except OSError:
415 err = sys.exc_info()[1]
416 if err.args[0] != errno.ENOENT:
417 raise
418 else:
419 self.fail("OSError not raised")
420
421 def test_disk_partitions(self):
422 for disk in psutil.disk_partitions(all=False):
423 self.assertTrue(os.path.exists(disk.device))
424 self.assertTrue(os.path.isdir(disk.mountpoint))
425 self.assertTrue(disk.fstype)
426 for disk in psutil.disk_partitions(all=True):
427 if not WINDOWS:
428 self.assertTrue(os.path.isdir(disk.mountpoint))
429 self.assertTrue(disk.fstype)
430
431 def find_mount_point(path):
432 path = os.path.abspath(path)
433 while not os.path.ismount(path):
434 path = os.path.dirname(path)
435 return path
436
437 mount = find_mount_point(__file__)
438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
439 self.assertTrue(mount in mounts)
440 psutil.disk_usage(mount)
441
442 # XXX
443 @skipUnless(hasattr(psutil, "network_io_counters"))
444 def test_anetwork_io_counters(self):
445 def check_ntuple(nt):
446 self.assertEqual(nt[0], nt.bytes_sent)
447 self.assertEqual(nt[1], nt.bytes_recv)
448 self.assertEqual(nt[2], nt.packets_sent)
449 self.assertEqual(nt[3], nt.packets_recv)
450 self.assertTrue(nt.bytes_sent >= 0)
451 self.assertTrue(nt.bytes_recv >= 0)
452 self.assertTrue(nt.packets_sent >= 0)
453 self.assertTrue(nt.packets_recv >= 0)
454
455 ret = psutil.network_io_counters(pernic=False)
456 check_ntuple(ret)
457 ret = psutil.network_io_counters(pernic=True)
458 for name, ntuple in ret.iteritems():
459 self.assertTrue(name)
460 check_ntuple(ntuple)
461 # XXX
462 @skipUnless(hasattr(psutil, "disk_io_counters"))
463 def test_disk_io_counters(self):
464 def check_ntuple(nt):
465 self.assertEqual(nt[0], nt.read_count)
466 self.assertEqual(nt[1], nt.write_count)
467 self.assertEqual(nt[2], nt.read_bytes)
468 self.assertEqual(nt[3], nt.write_bytes)
469 self.assertEqual(nt[4], nt.read_time)
470 self.assertEqual(nt[5], nt.write_time)
471 self.assertTrue(nt.read_count >= 0)
472 self.assertTrue(nt.write_count >= 0)
473 self.assertTrue(nt.read_bytes >= 0)
474 self.assertTrue(nt.write_bytes >= 0)
475 self.assertTrue(nt.read_time >= 0)
476 self.assertTrue(nt.write_time >= 0)
477
478 ret = psutil.disk_io_counters(perdisk=False)
479 check_ntuple(ret)
480 ret = psutil.disk_io_counters(perdisk=True)
481 for name, ntuple in ret.iteritems():
482 self.assertTrue(name)
483 check_ntuple(ntuple)
484
485 # ====================
486 # Process object tests
487 # ====================
488
489 def test_kill(self):
490 sproc = get_test_subprocess()
491 test_pid = sproc.pid
492 wait_for_pid(test_pid)
493 p = psutil.Process(test_pid)
494 name = p.name
495 p.kill()
496 p.wait()
497 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
498
499 def test_terminate(self):
500 sproc = get_test_subprocess()
501 test_pid = sproc.pid
502 wait_for_pid(test_pid)
503 p = psutil.Process(test_pid)
504 name = p.name
505 p.terminate()
506 p.wait()
507 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
508
509 def test_send_signal(self):
510 if POSIX:
511 sig = signal.SIGKILL
512 else:
513 sig = signal.SIGTERM
514 sproc = get_test_subprocess()
515 test_pid = sproc.pid
516 p = psutil.Process(test_pid)
517 name = p.name
518 p.send_signal(sig)
519 p.wait()
520 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
521
522 def test_wait(self):
523 # check exit code signal
524 sproc = get_test_subprocess()
525 p = psutil.Process(sproc.pid)
526 p.kill()
527 code = p.wait()
528 if os.name == 'posix':
529 self.assertEqual(code, signal.SIGKILL)
530 else:
531 self.assertEqual(code, 0)
532 self.assertFalse(p.is_running())
533
534 sproc = get_test_subprocess()
535 p = psutil.Process(sproc.pid)
536 p.terminate()
537 code = p.wait()
538 if os.name == 'posix':
539 self.assertEqual(code, signal.SIGTERM)
540 else:
541 self.assertEqual(code, 0)
542 self.assertFalse(p.is_running())
543
544 # check sys.exit() code
545 code = "import time, sys; time.sleep(0.01); sys.exit(5);"
546 sproc = get_test_subprocess([PYTHON, "-c", code])
547 p = psutil.Process(sproc.pid)
548 self.assertEqual(p.wait(), 5)
549 self.assertFalse(p.is_running())
550
551 # Test wait() issued twice.
552 # It is not supposed to raise NSP when the process is gone.
553 # On UNIX this should return None, on Windows it should keep
554 # returning the exit code.
555 sproc = get_test_subprocess([PYTHON, "-c", code])
556 p = psutil.Process(sproc.pid)
557 self.assertEqual(p.wait(), 5)
558 self.assertTrue(p.wait() in (5, None))
559
560 # test timeout
561 sproc = get_test_subprocess()
562 p = psutil.Process(sproc.pid)
563 p.name
564 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
565
566 # timeout < 0 not allowed
567 self.assertRaises(ValueError, p.wait, -1)
568
569 @skipUnless(POSIX)
570 def test_wait_non_children(self):
571 # test wait() against processes which are not our children
572 code = "import sys;"
573 code += "from subprocess import Popen, PIPE;"
574 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
575 code += "sp = Popen(cmd, stdout=PIPE);"
576 code += "sys.stdout.write(str(sp.pid));"
577 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE )
578
579 grandson_pid = int(sproc.stdout.read())
580 grandson_proc = psutil.Process(grandson_pid)
581 try:
582 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
583 grandson_proc.kill()
584 ret = grandson_proc.wait()
585 self.assertEqual(ret, None)
586 finally:
587 if grandson_proc.is_running():
588 grandson_proc.kill()
589 grandson_proc.wait()
590
591 def test_wait_timeout_0(self):
592 sproc = get_test_subprocess()
593 p = psutil.Process(sproc.pid)
594 self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
595 p.kill()
596 stop_at = time.time() + 2
597 while 1:
598 try:
599 code = p.wait(0)
600 except psutil.TimeoutExpired:
601 if time.time() >= stop_at:
602 raise
603 else:
604 break
605 if os.name == 'posix':
606 self.assertEqual(code, signal.SIGKILL)
607 else:
608 self.assertEqual(code, 0)
609 self.assertFalse(p.is_running())
610
611 def test_cpu_percent(self):
280 p = psutil.Process(os.getpid()) 612 p = psutil.Process(os.getpid())
281 p.get_cpu_percent(interval=0.001) 613 p.get_cpu_percent(interval=0.001)
282 p.get_cpu_percent(interval=0.001) 614 p.get_cpu_percent(interval=0.001)
283 for x in xrange(100): 615 for x in range(100):
284 percent = p.get_cpu_percent(interval=None) 616 percent = p.get_cpu_percent(interval=None)
285 self.assertTrue(isinstance(percent, float)) 617 self.assertTrue(isinstance(percent, float))
286 self.assertTrue(percent >= 0.0) 618 self.assertTrue(percent >= 0.0)
287 self.assertTrue(percent <= 100.0) 619 self.assertTrue(percent <= 100.0)
288 620
289 def test_get_process_cpu_times(self): 621 def test_cpu_times(self):
290 times = psutil.Process(os.getpid()).get_cpu_times() 622 times = psutil.Process(os.getpid()).get_cpu_times()
291 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) 623 self.assertTrue((times.user > 0.0) or (times.system > 0.0))
292 # make sure returned values can be pretty printed with strftime 624 # make sure returned values can be pretty printed with strftime
293 time.strftime("%H:%M:%S", time.localtime(times.user)) 625 time.strftime("%H:%M:%S", time.localtime(times.user))
294 time.strftime("%H:%M:%S", time.localtime(times.system)) 626 time.strftime("%H:%M:%S", time.localtime(times.system))
295 627
296 # Test Process.cpu_times() against os.times() 628 # Test Process.cpu_times() against os.times()
297 # os.times() is broken on Python 2.6 629 # os.times() is broken on Python 2.6
298 # http://bugs.python.org/issue1040026 630 # http://bugs.python.org/issue1040026
299 # XXX fails on OSX: not sure if it's for os.times(). We should 631 # XXX fails on OSX: not sure if it's for os.times(). We should
300 # try this with Python 2.7 and re-enable the test. 632 # try this with Python 2.7 and re-enable the test.
301 633
302 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) 634 @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
303 def test_get_process_cpu_times2(self): 635 def test_cpu_times2(self):
304 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() 636 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
305 utime, ktime = os.times()[:2] 637 utime, ktime = os.times()[:2]
306 638
307 # Use os.times()[:2] as base values to compare our results 639 # Use os.times()[:2] as base values to compare our results
308 # using a tolerance of +/- 0.1 seconds. 640 # using a tolerance of +/- 0.1 seconds.
309 # It will fail if the difference between the values is > 0.1s. 641 # It will fail if the difference between the values is > 0.1s.
310 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: 642 if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
311 self.fail("expected: %s, found: %s" %(utime, user_time)) 643 self.fail("expected: %s, found: %s" %(utime, user_time))
312 644
313 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: 645 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
(...skipping 10 matching lines...) Expand all
324 # tolerance of +/- 1 second. 656 # tolerance of +/- 1 second.
325 # It will fail if the difference between the values is > 2s. 657 # It will fail if the difference between the values is > 2s.
326 difference = abs(create_time - now) 658 difference = abs(create_time - now)
327 if difference > 2: 659 if difference > 2:
328 self.fail("expected: %s, found: %s, difference: %s" 660 self.fail("expected: %s, found: %s, difference: %s"
329 % (now, create_time, difference)) 661 % (now, create_time, difference))
330 662
331 # make sure returned value can be pretty printed with strftime 663 # make sure returned value can be pretty printed with strftime
332 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) 664 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
333 665
666 @skipIf(WINDOWS)
667 def test_terminal(self):
668 tty = sh('tty')
669 p = psutil.Process(os.getpid())
670 self.assertEqual(p.terminal, tty)
671
672 @skipIf(OSX, warn=False)
673 def test_get_io_counters(self):
674 p = psutil.Process(os.getpid())
675 # test reads
676 io1 = p.get_io_counters()
677 f = open(PYTHON, 'rb')
678 f.read()
679 f.close()
680 io2 = p.get_io_counters()
681 if not BSD:
682 self.assertTrue(io2.read_count > io1.read_count)
683 self.assertTrue(io2.write_count == io1.write_count)
684 self.assertTrue(io2.read_bytes >= io1.read_bytes)
685 self.assertTrue(io2.write_bytes >= io1.write_bytes)
686 # test writes
687 io1 = p.get_io_counters()
688 f = tempfile.TemporaryFile()
689 if sys.version_info >= (3,):
690 f.write(bytes("x" * 1000000, 'ascii'))
691 else:
692 f.write("x" * 1000000)
693 f.close()
694 io2 = p.get_io_counters()
695 if not BSD:
696 self.assertTrue(io2.write_count > io1.write_count)
697 self.assertTrue(io2.write_bytes > io1.write_bytes)
698 self.assertTrue(io2.read_count >= io1.read_count)
699 self.assertTrue(io2.read_bytes >= io1.read_bytes)
700
701 @skipUnless(LINUX)
702 def test_get_set_ionice(self):
703 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
704 IOPRIO_CLASS_IDLE)
705 self.assertEqual(IOPRIO_CLASS_NONE, 0)
706 self.assertEqual(IOPRIO_CLASS_RT, 1)
707 self.assertEqual(IOPRIO_CLASS_BE, 2)
708 self.assertEqual(IOPRIO_CLASS_IDLE, 3)
709 p = psutil.Process(os.getpid())
710 try:
711 p.set_ionice(2)
712 ioclass, value = p.get_ionice()
713 self.assertEqual(ioclass, 2)
714 self.assertEqual(value, 4)
715 #
716 p.set_ionice(3)
717 ioclass, value = p.get_ionice()
718 self.assertEqual(ioclass, 3)
719 self.assertEqual(value, 0)
720 #
721 p.set_ionice(2, 0)
722 ioclass, value = p.get_ionice()
723 self.assertEqual(ioclass, 2)
724 self.assertEqual(value, 0)
725 p.set_ionice(2, 7)
726 ioclass, value = p.get_ionice()
727 self.assertEqual(ioclass, 2)
728 self.assertEqual(value, 7)
729 self.assertRaises(ValueError, p.set_ionice, 2, 10)
730 finally:
731 p.set_ionice(IOPRIO_CLASS_NONE)
732
334 def test_get_num_threads(self): 733 def test_get_num_threads(self):
734 # on certain platforms such as Linux we might test for exact
735 # thread number, since we always have with 1 thread per process,
736 # but this does not apply across all platforms (OSX, Windows)
335 p = psutil.Process(os.getpid()) 737 p = psutil.Process(os.getpid())
336 numt1 = p.get_num_threads() 738 step1 = p.get_num_threads()
337 if not WINDOWS and not OSX: 739
338 # test is unreliable on Windows and OS X 740 thread = ThreadTask()
339 # NOTE: sleep(1) is too long for OS X, works with sleep(.5) 741 thread.start()
340 self.assertEqual(numt1, 1) 742 try:
341 t = threading.Thread(target=lambda:time.sleep(1)) 743 step2 = p.get_num_threads()
342 t.start() 744 self.assertEqual(step2, step1 + 1)
343 numt2 = p.get_num_threads() 745 thread.stop()
344 if WINDOWS: 746 finally:
345 self.assertTrue(numt2 > numt1) 747 if thread._running:
346 else: 748 thread.stop()
347 self.assertEqual(numt2, 2) 749
750 def test_get_threads(self):
751 p = psutil.Process(os.getpid())
752 step1 = p.get_threads()
753
754 thread = ThreadTask()
755 thread.start()
756
757 try:
758 step2 = p.get_threads()
759 self.assertEqual(len(step2), len(step1) + 1)
760 # on Linux, first thread id is supposed to be this process
761 if LINUX:
762 self.assertEqual(step2[0].id, os.getpid())
763 athread = step2[0]
764 # test named tuple
765 self.assertEqual(athread.id, athread[0])
766 self.assertEqual(athread.user_time, athread[1])
767 self.assertEqual(athread.system_time, athread[2])
768 # test num threads
769 thread.stop()
770 finally:
771 if thread._running:
772 thread.stop()
348 773
349 def test_get_memory_info(self): 774 def test_get_memory_info(self):
350 p = psutil.Process(os.getpid()) 775 p = psutil.Process(os.getpid())
351 776
352 # step 1 - get a base value to compare our results 777 # step 1 - get a base value to compare our results
353 rss1, vms1 = p.get_memory_info() 778 rss1, vms1 = p.get_memory_info()
354 percent1 = p.get_memory_percent() 779 percent1 = p.get_memory_percent()
355 self.assertTrue(rss1 > 0) 780 self.assertTrue(rss1 > 0)
356 self.assertTrue(vms1 > 0) 781 self.assertTrue(vms1 > 0)
357 782
358 # step 2 - allocate some memory 783 # step 2 - allocate some memory
359 memarr = [None] * 1500000 784 memarr = [None] * 1500000
360 785
361 rss2, vms2 = p.get_memory_info() 786 rss2, vms2 = p.get_memory_info()
362 percent2 = p.get_memory_percent() 787 percent2 = p.get_memory_percent()
363 # make sure that the memory usage bumped up 788 # make sure that the memory usage bumped up
364 self.assertTrue(rss2 > rss1) 789 self.assertTrue(rss2 > rss1)
365 self.assertTrue(vms2 >= vms1) # vms might be equal 790 self.assertTrue(vms2 >= vms1) # vms might be equal
366 self.assertTrue(percent2 > percent1) 791 self.assertTrue(percent2 > percent1)
367 del memarr 792 del memarr
368 793
369 def test_get_memory_percent(self): 794 def test_get_memory_percent(self):
370 p = psutil.Process(os.getpid()) 795 p = psutil.Process(os.getpid())
371 self.assertTrue(p.get_memory_percent() > 0.0) 796 self.assertTrue(p.get_memory_percent() > 0.0)
372 797
373 def test_pid(self): 798 def test_pid(self):
374 sproc = get_test_subprocess() 799 sproc = get_test_subprocess()
375 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) 800 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
376 801
377 def test_eq(self):
378 sproc = get_test_subprocess()
379 wait_for_pid(sproc.pid)
380 self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid))
381
382 def test_is_running(self): 802 def test_is_running(self):
383 sproc = get_test_subprocess() 803 sproc = get_test_subprocess()
384 wait_for_pid(sproc.pid) 804 wait_for_pid(sproc.pid)
385 p = psutil.Process(sproc.pid) 805 p = psutil.Process(sproc.pid)
386 self.assertTrue(p.is_running()) 806 self.assertTrue(p.is_running())
387 psutil.Process(sproc.pid).kill() 807 p.kill()
388 sproc.wait() 808 p.wait()
389 self.assertFalse(p.is_running()) 809 self.assertFalse(p.is_running())
390 810
391 def test_pid_exists(self):
392 sproc = get_test_subprocess()
393 wait_for_pid(sproc.pid)
394 self.assertTrue(psutil.pid_exists(sproc.pid))
395 psutil.Process(sproc.pid).kill()
396 sproc.wait()
397 self.assertFalse(psutil.pid_exists(sproc.pid))
398 self.assertFalse(psutil.pid_exists(-1))
399
400 def test_exe(self): 811 def test_exe(self):
401 sproc = get_test_subprocess() 812 sproc = get_test_subprocess()
402 wait_for_pid(sproc.pid) 813 wait_for_pid(sproc.pid)
403 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) 814 try:
815 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
816 except AssertionError:
817 # certain platforms such as BSD are more accurate returning:
818 # "/usr/local/bin/python2.7"
819 # ...instead of:
820 # "/usr/local/bin/python"
821 # We do not want to consider this difference in accuracy
822 # an error.
823 name = psutil.Process(sproc.pid).exe
824 adjusted_name = PYTHON[:len(name)]
825 self.assertEqual(name, adjusted_name)
404 for p in psutil.process_iter(): 826 for p in psutil.process_iter():
405 try: 827 try:
406 exe = p.exe 828 exe = p.exe
407 except psutil.Error: 829 except psutil.Error:
408 continue 830 continue
409 if not exe: 831 if not exe:
410 continue 832 continue
411 if not os.path.exists(exe): 833 if not os.path.exists(exe):
412 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ 834 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \
413 % (repr(exe), p.pid, p.name, p.cmdline)) 835 % (repr(exe), p.pid, p.name, p.cmdline))
414 if hasattr(os, 'access') and hasattr(os, "X_OK"): 836 if hasattr(os, 'access') and hasattr(os, "X_OK"):
415 if not os.access(p.exe, os.X_OK): 837 if not os.access(p.exe, os.X_OK):
416 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s )" \ 838 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s )" \
417 % (repr(p.exe), p.pid, p.name, p.cmdline)) 839 % (repr(p.exe), p.pid, p.name, p.cmdline))
418 840
419 def test_path(self):
420 proc = psutil.Process(os.getpid())
421 warnings.filterwarnings("error")
422 try:
423 self.assertRaises(DeprecationWarning, getattr, proc, 'path')
424 finally:
425 warnings.resetwarnings()
426
427 def test_cmdline(self): 841 def test_cmdline(self):
428 sproc = get_test_subprocess([PYTHON, "-E"]) 842 sproc = get_test_subprocess([PYTHON, "-E"])
429 wait_for_pid(sproc.pid) 843 wait_for_pid(sproc.pid)
430 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) 844 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"])
431 845
432 def test_name(self): 846 def test_name(self):
433 sproc = get_test_subprocess(PYTHON) 847 sproc = get_test_subprocess(PYTHON)
434 wait_for_pid(sproc.pid) 848 wait_for_pid(sproc.pid)
435 if OSX: 849 self.assertEqual(psutil.Process(sproc.pid).name.lower(),
436 self.assertEqual(psutil.Process(sproc.pid).name, "Python") 850 os.path.basename(sys.executable).lower())
437 else:
438 self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PY THON))
439 851
440 def test_uid(self): 852 if os.name == 'posix':
441 sproc = get_test_subprocess()
442 wait_for_pid(sproc.pid)
443 uid = psutil.Process(sproc.pid).uid
444 if hasattr(os, 'getuid'):
445 self.assertEqual(uid, os.getuid())
446 else:
447 # On those platforms where UID doesn't make sense (Windows)
448 # we expect it to be -1
449 self.assertEqual(uid, -1)
450 853
451 def test_gid(self): 854 def test_uids(self):
452 sproc = get_test_subprocess() 855 p = psutil.Process(os.getpid())
453 wait_for_pid(sproc.pid) 856 real, effective, saved = p.uids
454 gid = psutil.Process(sproc.pid).gid 857 # os.getuid() refers to "real" uid
455 if hasattr(os, 'getgid'): 858 self.assertEqual(real, os.getuid())
456 self.assertEqual(gid, os.getgid()) 859 # os.geteuid() refers to "effective" uid
457 else: 860 self.assertEqual(effective, os.geteuid())
458 # On those platforms where GID doesn't make sense (Windows) 861 # no such thing as os.getsuid() ("saved" uid), but starting
459 # we expect it to be -1 862 # from python 2.7 we have os.getresuid()[2]
460 self.assertEqual(gid, -1) 863 if hasattr(os, "getresuid"):
864 self.assertEqual(saved, os.getresuid()[2])
865
866 def test_gids(self):
867 p = psutil.Process(os.getpid())
868 real, effective, saved = p.gids
869 # os.getuid() refers to "real" uid
870 self.assertEqual(real, os.getgid())
871 # os.geteuid() refers to "effective" uid
872 self.assertEqual(effective, os.getegid())
873 # no such thing as os.getsuid() ("saved" uid), but starting
874 # from python 2.7 we have os.getresgid()[2]
875 if hasattr(os, "getresuid"):
876 self.assertEqual(saved, os.getresgid()[2])
877
878 def test_nice(self):
879 p = psutil.Process(os.getpid())
880 self.assertRaises(TypeError, setattr, p, "nice", "str")
881 try:
882 try:
883 first_nice = p.nice
884 p.nice = 1
885 self.assertEqual(p.nice, 1)
886 # going back to previous nice value raises AccessDenied on O SX
887 if not OSX:
888 p.nice = 0
889 self.assertEqual(p.nice, 0)
890 except psutil.AccessDenied:
891 pass
892 finally:
893 # going back to previous nice value raises AccessDenied on OSX
894 if not OSX:
895 p.nice = first_nice
896
897 if os.name == 'nt':
898
899 def test_nice(self):
900 p = psutil.Process(os.getpid())
901 self.assertRaises(TypeError, setattr, p, "nice", "str")
902 try:
903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
904 p.nice = psutil.HIGH_PRIORITY_CLASS
905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS)
906 p.nice = psutil.NORMAL_PRIORITY_CLASS
907 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS)
908 finally:
909 p.nice = psutil.NORMAL_PRIORITY_CLASS
910
911 def test_status(self):
912 p = psutil.Process(os.getpid())
913 self.assertEqual(p.status, psutil.STATUS_RUNNING)
914 self.assertEqual(str(p.status), "running")
915 for p in psutil.process_iter():
916 if str(p.status) == '?':
917 self.fail("invalid status for pid %d" % p.pid)
461 918
462 def test_username(self): 919 def test_username(self):
463 sproc = get_test_subprocess() 920 sproc = get_test_subprocess()
464 p = psutil.Process(sproc.pid) 921 p = psutil.Process(sproc.pid)
465 if POSIX: 922 if POSIX:
466 import pwd 923 import pwd
467 user = pwd.getpwuid(p.uid).pw_name 924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name)
468 self.assertEqual(p.username, user)
469 elif WINDOWS: 925 elif WINDOWS:
470 expected_username = os.environ['USERNAME'] 926 expected_username = os.environ['USERNAME']
471 expected_domain = os.environ['USERDOMAIN'] 927 expected_domain = os.environ['USERDOMAIN']
472 domain, username = p.username.split('\\') 928 domain, username = p.username.split('\\')
473 self.assertEqual(domain, expected_domain) 929 self.assertEqual(domain, expected_domain)
474 self.assertEqual(username, expected_username) 930 self.assertEqual(username, expected_username)
475 else: 931 else:
476 p.username 932 p.username
477 933
478 @skipUnless(WINDOWS or LINUX) 934 @skipUnless(WINDOWS or LINUX)
479 def test_getcwd(self): 935 def test_getcwd(self):
480 sproc = get_test_subprocess() 936 sproc = get_test_subprocess()
481 wait_for_pid(sproc.pid) 937 wait_for_pid(sproc.pid)
482 p = psutil.Process(sproc.pid) 938 p = psutil.Process(sproc.pid)
483 self.assertEqual(p.getcwd(), os.getcwd()) 939 self.assertEqual(p.getcwd(), os.getcwd())
484 940
485 @skipUnless(WINDOWS or LINUX) 941 @skipUnless(WINDOWS or LINUX)
486 def test_getcwd_2(self): 942 def test_getcwd_2(self):
487 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] 943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"]
488 sproc = get_test_subprocess(cmd) 944 sproc = get_test_subprocess(cmd)
489 wait_for_pid(sproc.pid) 945 wait_for_pid(sproc.pid)
490 p = psutil.Process(sproc.pid) 946 p = psutil.Process(sproc.pid)
491 time.sleep(0.1) 947 time.sleep(0.1)
492 expected_dir = os.path.dirname(os.getcwd()) 948 expected_dir = os.path.dirname(os.getcwd())
493 self.assertEqual(p.getcwd(), expected_dir) 949 self.assertEqual(p.getcwd(), expected_dir)
494 950
495 def test_get_open_files(self): 951 def test_get_open_files(self):
496 thisfile = os.path.join(os.getcwd(), __file__)
497
498 # current process 952 # current process
499 p = psutil.Process(os.getpid()) 953 p = psutil.Process(os.getpid())
500 files = p.get_open_files() 954 files = p.get_open_files()
501 self.assertFalse(thisfile in files) 955 self.assertFalse(TESTFN in files)
502 f = open(thisfile, 'r') 956 f = open(TESTFN, 'r')
957 time.sleep(.1)
503 filenames = [x.path for x in p.get_open_files()] 958 filenames = [x.path for x in p.get_open_files()]
504 self.assertTrue(thisfile in filenames) 959 self.assertTrue(TESTFN in filenames)
505 f.close() 960 f.close()
506 for file in filenames: 961 for file in filenames:
507 self.assertTrue(os.path.isfile(file)) 962 self.assertTrue(os.path.isfile(file))
508 963
509 # another process 964 # another process
510 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfi le 965 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
511 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) 966 sproc = get_test_subprocess([PYTHON, "-c", cmdline])
512 wait_for_pid(sproc.pid) 967 wait_for_pid(sproc.pid)
513 time.sleep(0.1) 968 time.sleep(0.1)
514 p = psutil.Process(sproc.pid) 969 p = psutil.Process(sproc.pid)
515 filenames = [x.path for x in p.get_open_files()] 970 for x in range(100):
516 self.assertTrue(thisfile in filenames) 971 filenames = [x.path for x in p.get_open_files()]
972 if TESTFN in filenames:
973 break
974 time.sleep(.01)
975 else:
976 self.assertTrue(TESTFN in filenames)
517 for file in filenames: 977 for file in filenames:
518 self.assertTrue(os.path.isfile(file)) 978 self.assertTrue(os.path.isfile(file))
519 # all processes 979 # all processes
520 for proc in psutil.process_iter(): 980 for proc in psutil.process_iter():
521 try: 981 try:
522 files = proc.get_open_files() 982 files = proc.get_open_files()
523 except psutil.Error: 983 except psutil.Error:
524 pass 984 pass
525 else: 985 else:
526 for file in filenames: 986 for file in filenames:
527 self.assertTrue(os.path.isfile(file)) 987 self.assertTrue(os.path.isfile(file))
528 988
529 def test_get_open_files2(self): 989 def test_get_open_files2(self):
530 # test fd and path fields 990 # test fd and path fields
531 fileobj = open(os.path.join(os.getcwd(), __file__), 'r') 991 fileobj = open(TESTFN, 'r')
532 p = psutil.Process(os.getpid()) 992 p = psutil.Process(os.getpid())
533 for path, fd in p.get_open_files(): 993 for path, fd in p.get_open_files():
534 if path == fileobj.name or fd == fileobj.fileno(): 994 if path == fileobj.name or fd == fileobj.fileno():
535 break 995 break
536 else: 996 else:
537 self.fail("no file found; files=%s" % repr(p.get_open_files())) 997 self.fail("no file found; files=%s" % repr(p.get_open_files()))
538 self.assertEqual(path, fileobj.name) 998 self.assertEqual(path, fileobj.name)
539 if WINDOWS: 999 if WINDOWS:
540 self.assertEqual(fd, -1) 1000 self.assertEqual(fd, -1)
541 else: 1001 else:
542 self.assertEqual(fd, fileobj.fileno()) 1002 self.assertEqual(fd, fileobj.fileno())
543 # test positions 1003 # test positions
544 ntuple = p.get_open_files()[0] 1004 ntuple = p.get_open_files()[0]
545 self.assertEqual(ntuple[0], ntuple.path) 1005 self.assertEqual(ntuple[0], ntuple.path)
546 self.assertEqual(ntuple[1], ntuple.fd) 1006 self.assertEqual(ntuple[1], ntuple.fd)
547 # test file is gone 1007 # test file is gone
548 fileobj.close() 1008 fileobj.close()
549 self.assertTrue(fileobj.name not in p.get_open_files()) 1009 self.assertTrue(fileobj.name not in p.get_open_files())
550 1010
551 @skipUnless(SUPPORT_CONNECTIONS, warn=1) 1011 @skipUnless(SUPPORT_CONNECTIONS, warn=1)
552 def test_get_connections(self): 1012 def test_get_connections(self):
553 arg = "import socket, time;" \ 1013 arg = "import socket, time;" \
554 "s = socket.socket();" \ 1014 "s = socket.socket();" \
555 "s.bind(('127.0.0.1', 0));" \ 1015 "s.bind(('127.0.0.1', 0));" \
556 "s.listen(1);" \ 1016 "s.listen(1);" \
557 "conn, addr = s.accept();" \ 1017 "conn, addr = s.accept();" \
558 "time.sleep(100);" 1018 "time.sleep(100);"
559 sproc = get_test_subprocess([PYTHON, "-c", arg]) 1019 sproc = get_test_subprocess([PYTHON, "-c", arg])
560 time.sleep(0.1)
561 p = psutil.Process(sproc.pid) 1020 p = psutil.Process(sproc.pid)
562 cons = p.get_connections() 1021 for x in range(100):
563 self.assertTrue(len(cons) == 1) 1022 cons = p.get_connections()
1023 if cons:
1024 break
1025 time.sleep(.01)
1026 self.assertEqual(len(cons), 1)
564 con = cons[0] 1027 con = cons[0]
565 self.assertEqual(con.family, socket.AF_INET) 1028 self.assertEqual(con.family, socket.AF_INET)
566 self.assertEqual(con.type, socket.SOCK_STREAM) 1029 self.assertEqual(con.type, socket.SOCK_STREAM)
567 self.assertEqual(con.status, "LISTEN") 1030 self.assertEqual(con.status, "LISTEN")
568 ip, port = con.local_address 1031 ip, port = con.local_address
569 self.assertEqual(ip, '127.0.0.1') 1032 self.assertEqual(ip, '127.0.0.1')
570 self.assertEqual(con.remote_address, ()) 1033 self.assertEqual(con.remote_address, ())
571 if WINDOWS: 1034 if WINDOWS:
572 self.assertEqual(con.fd, -1) 1035 self.assertEqual(con.fd, -1)
573 else: 1036 else:
574 self.assertTrue(con.fd > 0) 1037 self.assertTrue(con.fd > 0)
575 # test positions 1038 # test positions
576 self.assertEqual(con[0], con.fd) 1039 self.assertEqual(con[0], con.fd)
577 self.assertEqual(con[1], con.family) 1040 self.assertEqual(con[1], con.family)
578 self.assertEqual(con[2], con.type) 1041 self.assertEqual(con[2], con.type)
579 self.assertEqual(con[3], con.local_address) 1042 self.assertEqual(con[3], con.local_address)
580 self.assertEqual(con[4], con.remote_address) 1043 self.assertEqual(con[4], con.remote_address)
581 self.assertEqual(con[5], con.status) 1044 self.assertEqual(con[5], con.status)
582 1045
1046 @skipUnless(supports_ipv6())
1047 def test_get_connections_ipv6(self):
1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1049 s.bind(('::1', 0))
1050 s.listen(1)
1051 cons = psutil.Process(os.getpid()).get_connections()
1052 s.close()
1053 self.assertEqual(len(cons), 1)
1054 self.assertEqual(cons[0].local_address[0], '::1')
1055
583 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) 1056 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
584 def test_connection_fromfd(self): 1057 def test_connection_fromfd(self):
585 sock = socket.socket() 1058 sock = socket.socket()
586 sock.bind(('127.0.0.1', 0)) 1059 sock.bind(('localhost', 0))
587 sock.listen(1) 1060 sock.listen(1)
588 p = psutil.Process(os.getpid()) 1061 p = psutil.Process(os.getpid())
589 for conn in p.get_connections(): 1062 for conn in p.get_connections():
590 if conn.fd == sock.fileno(): 1063 if conn.fd == sock.fileno():
591 break 1064 break
592 else: 1065 else:
593 sock.close() 1066 sock.close()
594 self.fail("couldn't find socket fd") 1067 self.fail("couldn't find socket fd")
595 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) 1068 dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
596 try: 1069 try:
597 self.assertEqual(dupsock.getsockname(), conn.local_address) 1070 self.assertEqual(dupsock.getsockname(), conn.local_address)
598 self.assertNotEqual(sock.fileno(), dupsock.fileno()) 1071 self.assertNotEqual(sock.fileno(), dupsock.fileno())
599 finally: 1072 finally:
600 sock.close() 1073 sock.close()
601 dupsock.close() 1074 dupsock.close()
602 1075
603 @skipUnless(SUPPORT_CONNECTIONS, warn=1) 1076 @skipUnless(SUPPORT_CONNECTIONS, warn=1)
604 def test_get_connections_all(self): 1077 def test_get_connections_all(self):
605 1078
606 def check_address(addr, family): 1079 def check_address(addr, family):
607 if not addr: 1080 if not addr:
608 return 1081 return
609 ip, port = addr 1082 ip, port = addr
610 self.assertTrue(isinstance(port, int)) 1083 self.assertTrue(isinstance(port, int))
611 if family == socket.AF_INET: 1084 if family == socket.AF_INET:
612 ip = map(int, ip.split('.')) 1085 ip = list(map(int, ip.split('.')))
613 self.assertTrue(len(ip) == 4) 1086 self.assertTrue(len(ip) == 4)
614 for num in ip: 1087 for num in ip:
615 self.assertTrue(0 <= num <= 255) 1088 self.assertTrue(0 <= num <= 255)
616 self.assertTrue(0 <= port <= 65535) 1089 self.assertTrue(0 <= port <= 65535)
617 1090
618 def supports_ipv6():
619 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
620 return False
621 try:
622 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
623 sock.bind(("::1", 0))
624 except (socket.error, socket.gaierror):
625 return False
626 else:
627 sock.close()
628 return True
629
630 # all values are supposed to match Linux's tcp_states.h states 1091 # all values are supposed to match Linux's tcp_states.h states
631 # table across all platforms. 1092 # table across all platforms.
632 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", 1093 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
633 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", 1094 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
634 "LAST_ACK", "LISTEN", "CLOSING", ""] 1095 "LAST_ACK", "LISTEN", "CLOSING", ""]
635 1096
636 tcp_template = "import socket;" \ 1097 tcp_template = "import socket;" \
637 "s = socket.socket($family, socket.SOCK_STREAM);" \ 1098 "s = socket.socket($family, socket.SOCK_STREAM);" \
638 "s.bind(('$addr', 0));" \ 1099 "s.bind(('$addr', 0));" \
639 "s.listen(1);" \ 1100 "s.listen(1);" \
(...skipping 18 matching lines...) Expand all
658 # families and tupes to enrich psutil results 1119 # families and tupes to enrich psutil results
659 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) 1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
660 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) 1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
661 if supports_ipv6(): 1122 if supports_ipv6():
662 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) 1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
663 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) 1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
664 else: 1125 else:
665 tcp6_proc = None 1126 tcp6_proc = None
666 udp6_proc = None 1127 udp6_proc = None
667 1128
1129 # --- check connections of all processes
1130
668 time.sleep(0.1) 1131 time.sleep(0.1)
669 this_proc = psutil.Process(os.getpid())
670 children_pids = [p.pid for p in this_proc.get_children()]
671 for p in psutil.process_iter(): 1132 for p in psutil.process_iter():
672 try: 1133 try:
673 cons = p.get_connections() 1134 cons = p.get_connections()
674 except (psutil.NoSuchProcess, psutil.AccessDenied): 1135 except (psutil.NoSuchProcess, psutil.AccessDenied):
675 pass 1136 pass
676 else: 1137 else:
677 for conn in cons: 1138 for conn in cons:
678 self.assertTrue(conn.type in (socket.SOCK_STREAM, 1139 self.assertTrue(conn.type in (socket.SOCK_STREAM,
679 socket.SOCK_DGRAM)) 1140 socket.SOCK_DGRAM))
680 self.assertTrue(conn.family in (socket.AF_INET, 1141 self.assertTrue(conn.family in (socket.AF_INET,
681 socket.AF_INET6)) 1142 socket.AF_INET6))
682 check_address(conn.local_address, conn.family) 1143 check_address(conn.local_address, conn.family)
683 check_address(conn.remote_address, conn.family) 1144 check_address(conn.remote_address, conn.family)
684 if conn.status not in valid_states: 1145 if conn.status not in valid_states:
685 self.fail("%s is not a valid status" %conn.status) 1146 self.fail("%s is not a valid status" %conn.status)
686 # actually try to bind the local socket; ignore IPv6 1147 # actually try to bind the local socket; ignore IPv6
687 # sockets as their address might be represented as 1148 # sockets as their address might be represented as
688 # an IPv4-mapped-address (e.g. "::127.0.0.1") 1149 # an IPv4-mapped-address (e.g. "::127.0.0.1")
689 # and that's rejected by bind() 1150 # and that's rejected by bind()
690 if conn.family == socket.AF_INET: 1151 if conn.family == socket.AF_INET:
691 s = socket.socket(conn.family, conn.type) 1152 s = socket.socket(conn.family, conn.type)
692 s.bind((conn.local_address[0], 0)) 1153 s.bind((conn.local_address[0], 0))
693 s.close() 1154 s.close()
694 1155
695 if not WINDOWS and hasattr(socket, 'fromfd'): 1156 if not WINDOWS and hasattr(socket, 'fromfd'):
1157 dupsock = None
696 try: 1158 try:
697 dupsock = socket.fromfd(conn.fd, conn.family, 1159 try:
698 conn.type) 1160 dupsock = socket.fromfd(conn.fd, conn.family,
699 except (socket.error, OSError), err: 1161 conn.type)
700 if err.args[0] == errno.EBADF: 1162 except (socket.error, OSError):
701 continue 1163 err = sys.exc_info()[1]
702 else: 1164 if err.args[0] == errno.EBADF:
703 raise 1165 continue
704 self.assertEqual(dupsock.family, conn.family) 1166 else:
705 self.assertEqual(dupsock.type, conn.type) 1167 raise
1168 # python >= 2.5
1169 if hasattr(dupsock, "family"):
1170 self.assertEqual(dupsock.family, conn.family)
1171 self.assertEqual(dupsock.type, conn.type)
1172 finally:
1173 if dupsock is not None:
1174 dupsock.close()
706 1175
707 # check matches against subprocesses 1176
708 if p.pid in children_pids: 1177 # --- check matches against subprocesses
709 self.assertTrue(len(cons) == 1) 1178
710 conn = cons[0] 1179 for p in psutil.Process(os.getpid()).get_children():
711 # TCP v4 1180 for conn in p.get_connections():
712 if p.pid == tcp4_proc.pid: 1181 # TCP v4
713 self.assertEqual(conn.family, socket.AF_INET) 1182 if p.pid == tcp4_proc.pid:
714 self.assertEqual(conn.type, socket.SOCK_STREAM) 1183 self.assertEqual(conn.family, socket.AF_INET)
715 self.assertEqual(conn.local_address[0], "127.0.0.1") 1184 self.assertEqual(conn.type, socket.SOCK_STREAM)
716 self.assertEqual(conn.remote_address, ()) 1185 self.assertEqual(conn.local_address[0], "127.0.0.1")
717 self.assertEqual(conn.status, "LISTEN") 1186 self.assertEqual(conn.remote_address, ())
718 # UDP v4 1187 self.assertEqual(conn.status, "LISTEN")
719 elif p.pid == udp4_proc.pid: 1188 # UDP v4
720 self.assertEqual(conn.family, socket.AF_INET) 1189 elif p.pid == udp4_proc.pid:
721 self.assertEqual(conn.type, socket.SOCK_DGRAM) 1190 self.assertEqual(conn.family, socket.AF_INET)
722 self.assertEqual(conn.local_address[0], "127.0.0.1") 1191 self.assertEqual(conn.type, socket.SOCK_DGRAM)
723 self.assertEqual(conn.remote_address, ()) 1192 self.assertEqual(conn.local_address[0], "127.0.0.1")
724 self.assertEqual(conn.status, "") 1193 self.assertEqual(conn.remote_address, ())
725 # TCP v6 1194 self.assertEqual(conn.status, "")
726 elif p.pid == getattr(tcp6_proc, "pid", None): 1195 # TCP v6
727 self.assertEqual(conn.family, socket.AF_INET6) 1196 elif p.pid == getattr(tcp6_proc, "pid", None):
728 self.assertEqual(conn.type, socket.SOCK_STREAM) 1197 self.assertEqual(conn.family, socket.AF_INET6)
729 self.assertEqual(conn.local_address[0], "::1") 1198 self.assertEqual(conn.type, socket.SOCK_STREAM)
730 self.assertEqual(conn.remote_address, ()) 1199 self.assertTrue(conn.local_address[0] in ("::", "::1"))
731 self.assertEqual(conn.status, "LISTEN") 1200 self.assertEqual(conn.remote_address, ())
732 # UDP v6 1201 self.assertEqual(conn.status, "LISTEN")
733 elif p.pid == getattr(udp6_proc, "pid", None): 1202 # UDP v6
734 self.assertEqual(conn.family, socket.AF_INET6) 1203 elif p.pid == getattr(udp6_proc, "pid", None):
735 self.assertEqual(conn.type, socket.SOCK_DGRAM) 1204 self.assertEqual(conn.family, socket.AF_INET6)
736 self.assertEqual(conn.local_address[0], "::1") 1205 self.assertEqual(conn.type, socket.SOCK_DGRAM)
737 self.assertEqual(conn.remote_address, ()) 1206 self.assertTrue(conn.local_address[0] in ("::", "::1"))
738 self.assertEqual(conn.status, "") 1207 self.assertEqual(conn.remote_address, ())
1208 self.assertEqual(conn.status, "")
739 1209
740 def test_parent_ppid(self): 1210 def test_parent_ppid(self):
741 this_parent = os.getpid() 1211 this_parent = os.getpid()
742 sproc = get_test_subprocess() 1212 sproc = get_test_subprocess()
743 p = psutil.Process(sproc.pid) 1213 p = psutil.Process(sproc.pid)
744 self.assertEqual(p.ppid, this_parent) 1214 self.assertEqual(p.ppid, this_parent)
745 self.assertEqual(p.parent.pid, this_parent) 1215 self.assertEqual(p.parent.pid, this_parent)
746 # no other process is supposed to have us as parent 1216 # no other process is supposed to have us as parent
747 for p in psutil.process_iter(): 1217 for p in psutil.process_iter():
748 if p.pid == sproc.pid: 1218 if p.pid == sproc.pid:
749 continue 1219 continue
750 self.assertTrue(p.ppid != this_parent) 1220 self.assertTrue(p.ppid != this_parent)
751 1221
752 def test_get_children(self): 1222 def test_get_children(self):
753 p = psutil.Process(os.getpid()) 1223 p = psutil.Process(os.getpid())
754 self.assertEqual(p.get_children(), []) 1224 self.assertEqual(p.get_children(), [])
755 sproc = get_test_subprocess() 1225 sproc = get_test_subprocess()
756 children = p.get_children() 1226 children = p.get_children()
757 self.assertEqual(len(children), 1) 1227 self.assertEqual(len(children), 1)
758 self.assertEqual(children[0].pid, sproc.pid) 1228 self.assertEqual(children[0].pid, sproc.pid)
759 self.assertEqual(children[0].ppid, os.getpid()) 1229 self.assertEqual(children[0].ppid, os.getpid())
760 1230
761 def test_suspend_resume(self): 1231 def test_suspend_resume(self):
762 sproc = get_test_subprocess() 1232 sproc = get_test_subprocess()
763 p = psutil.Process(sproc.pid) 1233 p = psutil.Process(sproc.pid)
764 p.suspend() 1234 p.suspend()
1235 time.sleep(0.1)
1236 self.assertEqual(p.status, psutil.STATUS_STOPPED)
1237 self.assertEqual(str(p.status), "stopped")
765 p.resume() 1238 p.resume()
766 1239 self.assertTrue(p.status != psutil.STATUS_STOPPED)
767 def test_get_pid_list(self):
768 plist = [x.pid for x in psutil.get_process_list()]
769 pidlist = psutil.get_pid_list()
770 self.assertEqual(plist.sort(), pidlist.sort())
771 # make sure every pid is unique
772 self.assertEqual(len(pidlist), len(set(pidlist)))
773
774 def test_test(self):
775 # test for psutil.test() function
776 stdout = sys.stdout
777 sys.stdout = DEVNULL
778 try:
779 psutil.test()
780 finally:
781 sys.stdout = stdout
782
783 def test_types(self):
784 sproc = get_test_subprocess()
785 wait_for_pid(sproc.pid)
786 p = psutil.Process(sproc.pid)
787 self.assert_(isinstance(p.pid, int))
788 self.assert_(isinstance(p.ppid, int))
789 self.assert_(isinstance(p.parent, psutil.Process))
790 self.assert_(isinstance(p.name, str))
791 if self.__class__.__name__ != "LimitedUserTestCase":
792 self.assert_(isinstance(p.exe, str))
793 self.assert_(isinstance(p.cmdline, list))
794 self.assert_(isinstance(p.uid, int))
795 self.assert_(isinstance(p.gid, int))
796 self.assert_(isinstance(p.create_time, float))
797 self.assert_(isinstance(p.username, (unicode, str)))
798 if hasattr(p, 'getcwd'):
799 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
800 self.assert_(isinstance(p.getcwd(), str))
801 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
802 self.assert_(isinstance(p.get_open_files(), list))
803 for path, fd in p.get_open_files():
804 self.assert_(isinstance(path, (unicode, str)))
805 self.assert_(isinstance(fd, int))
806 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \
807 and SUPPORT_CONNECTIONS:
808 self.assert_(isinstance(p.get_connections(), list))
809 self.assert_(isinstance(p.is_running(), bool))
810 if not OSX or self.__class__.__name__ != "LimitedUserTestCase":
811 self.assert_(isinstance(p.get_cpu_times(), tuple))
812 self.assert_(isinstance(p.get_cpu_times()[0], float))
813 self.assert_(isinstance(p.get_cpu_times()[1], float))
814 self.assert_(isinstance(p.get_cpu_percent(0), float))
815 self.assert_(isinstance(p.get_memory_info(), tuple))
816 self.assert_(isinstance(p.get_memory_info()[0], int))
817 self.assert_(isinstance(p.get_memory_info()[1], int))
818 self.assert_(isinstance(p.get_memory_percent(), float))
819 self.assert_(isinstance(p.get_num_threads(), int))
820 self.assert_(isinstance(psutil.get_process_list(), list))
821 self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process))
822 self.assert_(isinstance(psutil.process_iter(), types.GeneratorType))
823 self.assert_(isinstance(psutil.process_iter().next(), psutil.Process))
824 self.assert_(isinstance(psutil.get_pid_list(), list))
825 self.assert_(isinstance(psutil.get_pid_list()[0], int))
826 self.assert_(isinstance(psutil.pid_exists(1), bool))
827 1240
828 def test_invalid_pid(self): 1241 def test_invalid_pid(self):
829 self.assertRaises(ValueError, psutil.Process, "1") 1242 self.assertRaises(ValueError, psutil.Process, "1")
830 self.assertRaises(ValueError, psutil.Process, None) 1243 self.assertRaises(ValueError, psutil.Process, None)
831 # Refers to Issue #12 1244 # Refers to Issue #12
832 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) 1245 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
833 1246
834 def test_zombie_process(self): 1247 def test_zombie_process(self):
835 # Test that NoSuchProcess exception gets raised in the event the 1248 # Test that NoSuchProcess exception gets raised in case the
836 # process dies after we create the Process object. 1249 # process dies after we create the Process object.
837 # Example: 1250 # Example:
838 # >>> proc = Process(1234) 1251 # >>> proc = Process(1234)
839 # >>> time.sleep(5) # time-consuming task, process dies in meantime 1252 # >>> time.sleep(5) # time-consuming task, process dies in meantime
840 # >>> proc.name 1253 # >>> proc.name
841 # Refers to Issue #15 1254 # Refers to Issue #15
842 sproc = get_test_subprocess() 1255 sproc = get_test_subprocess()
843 p = psutil.Process(sproc.pid) 1256 p = psutil.Process(sproc.pid)
844 p.kill() 1257 p.kill()
845 sproc.wait() 1258 p.wait()
846 1259
847 self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid") 1260 for name in dir(p):
848 self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent") 1261 if name.startswith('_')\
849 self.assertRaises(psutil.NoSuchProcess, getattr, p, "name") 1262 or name in ('pid', 'send_signal', 'is_running', 'set_ionice',
850 self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe") 1263 'wait'):
851 self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline") 1264 continue
852 self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid") 1265 try:
853 self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid") 1266 meth = getattr(p, name)
854 self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time") 1267 if callable(meth):
855 self.assertRaises(psutil.NoSuchProcess, getattr, p, "username") 1268 meth()
856 if hasattr(p, 'getcwd'): 1269 except psutil.NoSuchProcess:
857 self.assertRaises(psutil.NoSuchProcess, p.getcwd) 1270 pass
858 self.assertRaises(psutil.NoSuchProcess, p.get_open_files) 1271 else:
859 self.assertRaises(psutil.NoSuchProcess, p.get_connections) 1272 self.fail("NoSuchProcess exception not raised for %r" % name)
860 self.assertRaises(psutil.NoSuchProcess, p.suspend) 1273
861 self.assertRaises(psutil.NoSuchProcess, p.resume) 1274 # other methods
862 self.assertRaises(psutil.NoSuchProcess, p.kill) 1275 try:
863 self.assertRaises(psutil.NoSuchProcess, p.terminate) 1276 if os.name == 'posix':
1277 p.nice = 1
1278 else:
1279 p.nice = psutil.NORMAL_PRIORITY_CLASS
1280 except psutil.NoSuchProcess:
1281 pass
1282 else:
1283 self.fail("exception not raised")
1284 if hasattr(p, 'set_ionice'):
1285 self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2)
864 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) 1286 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
865 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times)
866 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0)
867 self.assertRaises(psutil.NoSuchProcess, p.get_memory_info)
868 self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent)
869 self.assertRaises(psutil.NoSuchProcess, p.get_children)
870 self.assertRaises(psutil.NoSuchProcess, p.get_num_threads)
871 self.assertFalse(p.is_running()) 1287 self.assertFalse(p.is_running())
872 1288
873 def test__str__(self): 1289 def test__str__(self):
874 sproc = get_test_subprocess() 1290 sproc = get_test_subprocess()
875 p = psutil.Process(sproc.pid) 1291 p = psutil.Process(sproc.pid)
876 self.assertTrue(str(sproc.pid) in str(p)) 1292 self.assertTrue(str(sproc.pid) in str(p))
877 self.assertTrue(os.path.basename(PYTHON) in str(p)) 1293 # python shows up as 'Python' in cmdline on OS X so test fails on OS X
1294 if not OSX:
1295 self.assertTrue(os.path.basename(PYTHON) in str(p))
878 sproc = get_test_subprocess() 1296 sproc = get_test_subprocess()
879 p = psutil.Process(sproc.pid) 1297 p = psutil.Process(sproc.pid)
880 p.kill() 1298 p.kill()
881 sproc.wait() 1299 p.wait()
882 self.assertTrue(str(sproc.pid) in str(p)) 1300 self.assertTrue(str(sproc.pid) in str(p))
883 self.assertTrue("terminated" in str(p)) 1301 self.assertTrue("terminated" in str(p))
884 1302
885 def test_fetch_all(self): 1303 def test_fetch_all(self):
886 valid_procs = 0 1304 valid_procs = 0
887 attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times' , 1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
888 'get_memory_info', 'get_memory_percent', 'get_open_files', 1306 'kill', 'wait']
889 'get_num_threads'] 1307 excluded_names += ['get_cpu_percent', 'get_children']
1308 # XXX - skip slow lsof implementation;
1309 if BSD:
1310 excluded_names += ['get_open_files', 'get_connections']
1311 if OSX:
1312 excluded_names += ['get_connections']
1313 attrs = []
1314 for name in dir(psutil.Process):
1315 if name.startswith("_"):
1316 continue
1317 if name.startswith("set_"):
1318 continue
1319 if name in excluded_names:
1320 continue
1321 attrs.append(name)
1322
890 for p in psutil.process_iter(): 1323 for p in psutil.process_iter():
891 for attr in attrs: 1324 for name in attrs:
892 # skip slow Python implementation; we're reasonably sure
893 # it works anyway
894 if POSIX and attr == 'get_open_files':
895 continue
896 try: 1325 try:
897 attr = getattr(p, attr, None) 1326 try:
898 if attr is not None and callable(attr): 1327 attr = getattr(p, name, None)
899 attr() 1328 if attr is not None and callable(attr):
900 valid_procs += 1 1329 ret = attr()
901 except (psutil.NoSuchProcess, psutil.AccessDenied), err: 1330 else:
902 self.assertEqual(err.pid, p.pid) 1331 ret = attr
903 if err.name: 1332 valid_procs += 1
904 self.assertEqual(err.name, p.name) 1333 except (psutil.NoSuchProcess, psutil.AccessDenied):
905 self.assertTrue(str(err)) 1334 err = sys.exc_info()[1]
906 self.assertTrue(err.msg) 1335 self.assertEqual(err.pid, p.pid)
907 except: 1336 if err.name:
1337 self.assertEqual(err.name, p.name)
1338 self.assertTrue(str(err))
1339 self.assertTrue(err.msg)
1340 else:
1341 if name == 'parent' or ret in (0, 0.0, [], None):
1342 continue
1343 self.assertTrue(ret)
1344 if name == "exe":
1345 self.assertTrue(os.path.isfile(ret))
1346 elif name == "getcwd":
1347 # XXX - temporary fix; on my Linux box
1348 # chrome process cws is errnously reported
1349 # as /proc/4144/fdinfo whichd doesn't exist
1350 if 'chrome' in p.name:
1351 continue
1352 self.assertTrue(os.path.isdir(ret))
1353 except Exception:
1354 err = sys.exc_info()[1]
908 trace = traceback.format_exc() 1355 trace = traceback.format_exc()
909 self.fail('Exception raised for method %s, pid %s:\n%s' 1356 self.fail('%s\nmethod=%s, pid=%s, retvalue=%s'
910 %(attr, p.pid, trace)) 1357 %(trace, name, p.pid, repr(ret)))
911 1358
912 # we should always have a non-empty list, not including PID 0 etc. 1359 # we should always have a non-empty list, not including PID 0 etc.
913 # special cases. 1360 # special cases.
914 self.assertTrue(valid_procs > 0) 1361 self.assertTrue(valid_procs > 0)
915 1362
1363 @skipIf(LINUX)
916 def test_pid_0(self): 1364 def test_pid_0(self):
917 # Process(0) is supposed to work on all platforms even if with 1365 # Process(0) is supposed to work on all platforms except Linux
918 # some differences
919 p = psutil.Process(0) 1366 p = psutil.Process(0)
920 if WINDOWS: 1367 self.assertTrue(p.name)
921 self.assertEqual(p.name, 'System Idle Process')
922 elif LINUX:
923 self.assertEqual(p.name, 'sched')
924 elif BSD:
925 self.assertEqual(p.name, 'swapper')
926 elif OSX:
927 self.assertEqual(p.name, 'kernel_task')
928 1368
929 if os.name == 'posix': 1369 if os.name == 'posix':
930 self.assertEqual(p.uid, 0) 1370 self.assertEqual(p.uids.real, 0)
931 self.assertEqual(p.gid, 0) 1371 self.assertEqual(p.gids.real, 0)
932 else:
933 self.assertEqual(p.uid, -1)
934 self.assertEqual(p.gid, -1)
935 1372
936 self.assertTrue(p.ppid in (0, 1)) 1373 self.assertTrue(p.ppid in (0, 1))
937 self.assertEqual(p.exe, "") 1374 #self.assertEqual(p.exe, "")
938 self.assertEqual(p.cmdline, []) 1375 self.assertEqual(p.cmdline, [])
939 # this can either raise AD (Win) or return 0 (UNIX)
940 try: 1376 try:
941 self.assertTrue(p.get_num_threads() in (0, 1)) 1377 p.get_num_threads()
942 except psutil.AccessDenied: 1378 except psutil.AccessDenied:
943 pass 1379 pass
944 1380
945 if OSX : #and os.geteuid() != 0: 1381 if OSX : #and os.geteuid() != 0:
946 self.assertRaises(psutil.AccessDenied, p.get_memory_info) 1382 self.assertRaises(psutil.AccessDenied, p.get_memory_info)
947 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) 1383 self.assertRaises(psutil.AccessDenied, p.get_cpu_times)
948 else: 1384 else:
949 p.get_memory_info() 1385 p.get_memory_info()
950 1386
951 # username property 1387 # username property
952 if POSIX: 1388 if POSIX:
953 self.assertEqual(p.username, 'root') 1389 self.assertEqual(p.username, 'root')
954 elif WINDOWS: 1390 elif WINDOWS:
955 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') 1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM')
956 else: 1392 else:
957 p.username 1393 p.username
958 1394
959 # PID 0 is supposed to be available on all platforms
960 self.assertTrue(0 in psutil.get_pid_list()) 1395 self.assertTrue(0 in psutil.get_pid_list())
961 self.assertTrue(psutil.pid_exists(0)) 1396 self.assertTrue(psutil.pid_exists(0))
962 1397
963 # --- OS specific tests 1398 def test_Popen(self):
1399 # Popen class test
1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1402 try:
1403 proc.name
1404 proc.stdin
1405 self.assertTrue(hasattr(proc, 'name'))
1406 self.assertTrue(hasattr(proc, 'stdin'))
1407 self.assertRaises(AttributeError, getattr, proc, 'foo')
1408 finally:
1409 proc.kill()
1410 proc.wait()
964 1411
965 1412
966 if hasattr(os, 'getuid'): 1413 if hasattr(os, 'getuid'):
967 class LimitedUserTestCase(TestCase): 1414 class LimitedUserTestCase(TestCase):
968 """Repeat the previous tests by using a limited user. 1415 """Repeat the previous tests by using a limited user.
969 Executed only on UNIX and only if the user who run the test script 1416 Executed only on UNIX and only if the user who run the test script
970 is root. 1417 is root.
971 """ 1418 """
972 # the uid/gid the test suite runs under 1419 # the uid/gid the test suite runs under
973 PROCESS_UID = os.getuid() 1420 PROCESS_UID = os.getuid()
974 PROCESS_GID = os.getgid() 1421 PROCESS_GID = os.getgid()
975 1422
1423 def __init__(self, *args, **kwargs):
1424 TestCase.__init__(self, *args, **kwargs)
1425 # re-define all existent test methods in order to
1426 # ignore AccessDenied exceptions
1427 for attr in [x for x in dir(self) if x.startswith('test')]:
1428 meth = getattr(self, attr)
1429 def test_(self):
1430 try:
1431 meth()
1432 except psutil.AccessDenied:
1433 pass
1434 setattr(self, attr, types.MethodType(test_, self))
1435
976 def setUp(self): 1436 def setUp(self):
977 os.setegid(1000) 1437 os.setegid(1000)
978 os.seteuid(1000) 1438 os.seteuid(1000)
979 TestCase.setUp(self) 1439 TestCase.setUp(self)
980 1440
981 def tearDown(self): 1441 def tearDown(self):
982 os.setegid(self.PROCESS_UID) 1442 os.setegid(self.PROCESS_UID)
983 os.seteuid(self.PROCESS_GID) 1443 os.seteuid(self.PROCESS_GID)
984 TestCase.tearDown(self) 1444 TestCase.tearDown(self)
985 1445
986 def test_path(self): 1446 def test_nice(self):
987 # DeprecationWarning is only raised once 1447 try:
988 pass 1448 psutil.Process(os.getpid()).nice = -1
989 1449 except psutil.AccessDenied:
990 # overridden tests known to raise AccessDenied when run
991 # as limited user on different platforms
992
993 if LINUX:
994
995 def test_getcwd(self):
996 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, sel f)
997
998 def test_getcwd_2(self):
999 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, s elf)
1000
1001 def test_get_open_files(self):
1002 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi les, self)
1003
1004 def test_get_connections(self):
1005 pass 1450 pass
1006 1451 else:
1007 def test_exe(self): 1452 self.fail("exception not raised")
1008 self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self)
1009
1010 if BSD:
1011
1012 def test_get_open_files(self):
1013 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi les, self)
1014
1015 def test_get_open_files2(self):
1016 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi les, self)
1017
1018 def test_get_connections(self):
1019 self.assertRaises(psutil.AccessDenied, TestCase.test_get_connect ions, self)
1020
1021 def test_connection_fromfd(self):
1022 self.assertRaises(psutil.AccessDenied, TestCase.test_connection_ fromfd, self)
1023 1453
1024 1454
1025 def test_main(): 1455 def test_main():
1026 tests = [] 1456 tests = []
1027 test_suite = unittest.TestSuite() 1457 test_suite = unittest.TestSuite()
1028
1029 tests.append(TestCase) 1458 tests.append(TestCase)
1030 1459
1031 if hasattr(os, 'getuid'):
1032 if os.getuid() == 0:
1033 tests.append(LimitedUserTestCase)
1034 else:
1035 atexit.register(warnings.warn, "Couldn't run limited user tests ("
1036 "super-user privileges are required)", RuntimeWarning)
1037
1038 if POSIX: 1460 if POSIX:
1039 from _posix import PosixSpecificTestCase 1461 from _posix import PosixSpecificTestCase
1040 tests.append(PosixSpecificTestCase) 1462 tests.append(PosixSpecificTestCase)
1041 1463
1042 # import the specific platform test suite 1464 # import the specific platform test suite
1043 if LINUX: 1465 if LINUX:
1044 from _linux import LinuxSpecificTestCase as stc 1466 from _linux import LinuxSpecificTestCase as stc
1045 elif WINDOWS: 1467 elif WINDOWS:
1046 from _windows import WindowsSpecificTestCase as stc 1468 from _windows import WindowsSpecificTestCase as stc
1047 elif OSX: 1469 elif OSX:
1048 from _osx import OSXSpecificTestCase as stc 1470 from _osx import OSXSpecificTestCase as stc
1049 elif BSD: 1471 elif BSD:
1050 from _bsd import BSDSpecificTestCase as stc 1472 from _bsd import BSDSpecificTestCase as stc
1051 tests.append(stc) 1473 tests.append(stc)
1052 1474
1475 if hasattr(os, 'getuid'):
1476 if os.getuid() == 0:
1477 tests.append(LimitedUserTestCase)
1478 else:
1479 atexit.register(warnings.warn, "Couldn't run limited user tests ("
1480 "super-user privileges are required)", RuntimeWarning)
1481
1053 for test_class in tests: 1482 for test_class in tests:
1054 test_suite.addTest(unittest.makeSuite(test_class)) 1483 test_suite.addTest(unittest.makeSuite(test_class))
1055 1484
1485 f = open(TESTFN, 'w')
1486 f.close()
1487 atexit.register(lambda: os.remove(TESTFN))
1488
1056 unittest.TextTestRunner(verbosity=2).run(test_suite) 1489 unittest.TextTestRunner(verbosity=2).run(test_suite)
1057 DEVNULL.close() 1490 DEVNULL.close()
1058 1491
1059 if __name__ == '__main__': 1492 if __name__ == '__main__':
1060 test_main() 1493 test_main()
1061
1062
OLDNEW
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | tools/checklicenses/checklicenses.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698