Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(331)

Side by Side Diff: third_party/psutil/test/test_psutil.py

Issue 6246123: Moving psutil to third_party. This is first step for Media Performance test project. (Closed) Base URL: http://git.chromium.org/git/chromium.git@trunk
Patch Set: modification based on code review's comments Created 9 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 #
3 # $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $
4 #
5
6 """psutil test suite.
7 Note: this is targeted for python 2.x.
8 To run it under python 3.x you need to use 2to3 tool first:
9
10 $ 2to3 -w test/*.py
11 """
12
13 import unittest
14 import os
15 import sys
16 import subprocess
17 import time
18 import signal
19 import types
20 import traceback
21 import socket
22 import warnings
23 import atexit
24 import errno
25 import threading
26
27 import psutil
28
29
30 PYTHON = os.path.realpath(sys.executable)
31 DEVNULL = open(os.devnull, 'r+')
32
33 POSIX = os.name == 'posix'
34 LINUX = sys.platform.lower().startswith("linux")
35 WINDOWS = sys.platform.lower().startswith("win32")
36 OSX = sys.platform.lower().startswith("darwin")
37 BSD = sys.platform.lower().startswith("freebsd")
38
39 try:
40 psutil.Process(os.getpid()).get_connections()
41 except NotImplementedError, err:
42 SUPPORT_CONNECTIONS = False
43 atexit.register(warnings.warn, "get_connections() not supported on this plat form",
44 RuntimeWarning)
45 else:
46 SUPPORT_CONNECTIONS = True
47
48
49 _subprocesses_started = set()
50
51 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None):
52 """Return a subprocess.Popen object to use in tests.
53 By default stdout and stderr are redirected to /dev/null and the
54 python interpreter is used as test process.
55 """
56 if cmd is None:
57 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"]
58 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin)
59 _subprocesses_started.add(sproc.pid)
60 return sproc
61
62 def wait_for_pid(pid, timeout=1):
63 """Wait for pid to show up in the process list then return.
64 Used in the test suite to give time the sub process to initialize.
65 """
66 raise_at = time.time() + timeout
67 while 1:
68 if pid in psutil.get_pid_list():
69 # give it one more iteration to allow full initialization
70 time.sleep(0.01)
71 return
72 time.sleep(0.0001)
73 if time.time() >= raise_at:
74 raise RuntimeError("Timed out")
75
76 def kill(pid):
77 """Kill a process given its PID."""
78 if hasattr(os, 'kill'):
79 os.kill(pid, signal.SIGKILL)
80 else:
81 psutil.Process(pid).kill()
82
83 def reap_children(search_all=False):
84 """Kill any subprocess started by this test suite and ensure that
85 no zombies stick around to hog resources and create problems when
86 looking for refleaks.
87 """
88 if POSIX:
89 def waitpid(process):
90 # on posix we are free to wait for any pending process by
91 # passing -1 to os.waitpid()
92 while True:
93 try:
94 any_process = -1
95 pid, status = os.waitpid(any_process, os.WNOHANG)
96 if pid == 0 and not process.is_running():
97 break
98 except OSError:
99 if not process.is_running():
100 break
101 else:
102 def waitpid(process):
103 # on non-posix systems we just wait for the given process
104 # to go away
105 while process.is_running():
106 time.sleep(0.01)
107
108 if search_all:
109 this_process = psutil.Process(os.getpid())
110 pids = [x.pid for x in this_process.get_children()]
111 else:
112 pids =_subprocesses_started
113 while pids:
114 pid = pids.pop()
115 try:
116 child = psutil.Process(pid)
117 child.kill()
118 except psutil.NoSuchProcess:
119 pass
120 else:
121 waitpid(child)
122
123 # we want to search through all processes before exiting
124 atexit.register(reap_children, search_all=True)
125
126 def skipIf(condition, reason="", warn=False):
127 """Decorator which skip a test under if condition is satisfied.
128 This is a substitute of unittest.skipIf which is available
129 only in python 2.7 and 3.2.
130 If 'reason' argument is provided this will be printed during
131 tests execution.
132 If 'warn' is provided a RuntimeWarning will be shown when all
133 tests are run.
134 """
135 def outer(fun, *args, **kwargs):
136 def inner(self):
137 if condition:
138 sys.stdout.write("skipped-")
139 sys.stdout.flush()
140 if warn:
141 objname = "%s.%s" % (self.__class__.__name__, fun.__name__)
142 msg = "%s was skipped" % objname
143 if reason:
144 msg += "; reason: " + repr(reason)
145 atexit.register(warnings.warn, msg, RuntimeWarning)
146 return
147 else:
148 return fun(self, *args, **kwargs)
149 return inner
150 return outer
151
152 def skipUnless(condition, reason="", warn=False):
153 """Contrary of skipIf."""
154 if not condition:
155 return skipIf(True, reason, warn)
156 return skipIf(False)
157
158
159 class TestCase(unittest.TestCase):
160
161 def tearDown(self):
162 reap_children()
163
164 def test_get_process_list(self):
165 pids = [x.pid for x in psutil.get_process_list()]
166 self.assertTrue(os.getpid() in pids)
167 self.assertTrue(0 in pids)
168
169 def test_process_iter(self):
170 pids = [x.pid for x in psutil.process_iter()]
171 self.assertTrue(os.getpid() in pids)
172 self.assertTrue(0 in pids)
173
174 def test_kill(self):
175 sproc = get_test_subprocess()
176 test_pid = sproc.pid
177 wait_for_pid(test_pid)
178 p = psutil.Process(test_pid)
179 name = p.name
180 p.kill()
181 sproc.wait()
182 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
183
184 def test_terminate(self):
185 sproc = get_test_subprocess()
186 test_pid = sproc.pid
187 wait_for_pid(test_pid)
188 p = psutil.Process(test_pid)
189 name = p.name
190 p.terminate()
191 sproc.wait()
192 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
193
194 def test_send_signal(self):
195 if POSIX:
196 sig = signal.SIGKILL
197 else:
198 sig = signal.SIGTERM
199 sproc = get_test_subprocess()
200 test_pid = sproc.pid
201 p = psutil.Process(test_pid)
202 name = p.name
203 p.send_signal(sig)
204 sproc.wait()
205 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
206
207 def test_TOTAL_PHYMEM(self):
208 x = psutil.TOTAL_PHYMEM
209 self.assertTrue(isinstance(x, (int, long)))
210 self.assertTrue(x > 0)
211
212 def test_used_phymem(self):
213 x = psutil.used_phymem()
214 self.assertTrue(isinstance(x, (int, long)))
215 self.assertTrue(x > 0)
216
217 def test_avail_phymem(self):
218 x = psutil.avail_phymem()
219 self.assertTrue(isinstance(x, (int, long)))
220 self.assertTrue(x > 0)
221
222 def test_total_virtmem(self):
223 x = psutil.total_virtmem()
224 self.assertTrue(isinstance(x, (int, long)))
225 self.assertTrue(x >= 0)
226
227 def test_used_virtmem(self):
228 x = psutil.used_virtmem()
229 self.assertTrue(isinstance(x, (int, long)))
230 self.assertTrue(x >= 0)
231
232 def test_avail_virtmem(self):
233 x = psutil.avail_virtmem()
234 self.assertTrue(isinstance(x, (int, long)))
235 self.assertTrue(x >= 0)
236
237 @skipUnless(LINUX)
238 def test_cached_phymem(self):
239 x = psutil.cached_phymem()
240 self.assertTrue(isinstance(x, (int, long)))
241 self.assertTrue(x >= 0)
242
243 @skipUnless(LINUX)
244 def test_phymem_buffers(self):
245 x = psutil.phymem_buffers()
246 self.assertTrue(isinstance(x, (int, long)))
247 self.assertTrue(x >= 0)
248
249 def test_system_cpu_times(self):
250 total = 0
251 times = psutil.cpu_times()
252 self.assertTrue(isinstance(times, psutil.CPUTimes))
253 sum(times)
254 for cp_time in times:
255 self.assertTrue(isinstance(cp_time, float))
256 self.assertTrue(cp_time >= 0.0)
257 total += cp_time
258 # test CPUTimes's __iter__ and __str__ implementation
259 self.assertEqual(total, sum(times))
260 str(times)
261
262 def test_system_cpu_times2(self):
263 t1 = sum(psutil.cpu_times())
264 time.sleep(0.1)
265 t2 = sum(psutil.cpu_times())
266 difference = t2 - t1
267 if not difference >= 0.05:
268 self.fail("difference %s" % difference)
269
270 def test_system_cpu_percent(self):
271 psutil.cpu_percent(interval=0.001)
272 psutil.cpu_percent(interval=0.001)
273 for x in xrange(1000):
274 percent = psutil.cpu_percent(interval=None)
275 self.assertTrue(isinstance(percent, float))
276 self.assertTrue(percent >= 0.0)
277 self.assertTrue(percent <= 100.0)
278
279 def test_process_cpu_percent(self):
280 p = psutil.Process(os.getpid())
281 p.get_cpu_percent(interval=0.001)
282 p.get_cpu_percent(interval=0.001)
283 for x in xrange(100):
284 percent = p.get_cpu_percent(interval=None)
285 self.assertTrue(isinstance(percent, float))
286 self.assertTrue(percent >= 0.0)
287 self.assertTrue(percent <= 100.0)
288
289 def test_get_process_cpu_times(self):
290 times = psutil.Process(os.getpid()).get_cpu_times()
291 self.assertTrue((times.user > 0.0) or (times.system > 0.0))
292 # make sure returned values can be pretty printed with strftime
293 time.strftime("%H:%M:%S", time.localtime(times.user))
294 time.strftime("%H:%M:%S", time.localtime(times.system))
295
296 # Test Process.cpu_times() against os.times()
297 # os.times() is broken on Python 2.6
298 # http://bugs.python.org/issue1040026
299 # XXX fails on OSX: not sure if it's for os.times(). We should
300 # try this with Python 2.7 and re-enable the test.
301
302 @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
303 def test_get_process_cpu_times2(self):
304 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
305 utime, ktime = os.times()[:2]
306
307 # Use os.times()[:2] as base values to compare our results
308 # using a tolerance of +/- 0.1 seconds.
309 # It will fail if the difference between the values is > 0.1s.
310 if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
311 self.fail("expected: %s, found: %s" %(utime, user_time))
312
313 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
314 self.fail("expected: %s, found: %s" %(ktime, kernel_time))
315
316 def test_create_time(self):
317 sproc = get_test_subprocess()
318 now = time.time()
319 wait_for_pid(sproc.pid)
320 p = psutil.Process(sproc.pid)
321 create_time = p.create_time
322
323 # Use time.time() as base value to compare our result using a
324 # tolerance of +/- 1 second.
325 # It will fail if the difference between the values is > 2s.
326 difference = abs(create_time - now)
327 if difference > 2:
328 self.fail("expected: %s, found: %s, difference: %s"
329 % (now, create_time, difference))
330
331 # make sure returned value can be pretty printed with strftime
332 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
333
334 def test_get_num_threads(self):
335 p = psutil.Process(os.getpid())
336 numt1 = p.get_num_threads()
337 if not WINDOWS and not OSX:
338 # test is unreliable on Windows and OS X
339 # NOTE: sleep(1) is too long for OS X, works with sleep(.5)
340 self.assertEqual(numt1, 1)
341 t = threading.Thread(target=lambda:time.sleep(1))
342 t.start()
343 numt2 = p.get_num_threads()
344 if WINDOWS:
345 self.assertTrue(numt2 > numt1)
346 else:
347 self.assertEqual(numt2, 2)
348
349 def test_get_memory_info(self):
350 p = psutil.Process(os.getpid())
351
352 # step 1 - get a base value to compare our results
353 rss1, vms1 = p.get_memory_info()
354 percent1 = p.get_memory_percent()
355 self.assertTrue(rss1 > 0)
356 self.assertTrue(vms1 > 0)
357
358 # step 2 - allocate some memory
359 memarr = [None] * 1500000
360
361 rss2, vms2 = p.get_memory_info()
362 percent2 = p.get_memory_percent()
363 # make sure that the memory usage bumped up
364 self.assertTrue(rss2 > rss1)
365 self.assertTrue(vms2 >= vms1) # vms might be equal
366 self.assertTrue(percent2 > percent1)
367 del memarr
368
369 def test_get_memory_percent(self):
370 p = psutil.Process(os.getpid())
371 self.assertTrue(p.get_memory_percent() > 0.0)
372
373 def test_pid(self):
374 sproc = get_test_subprocess()
375 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
376
377 def test_eq(self):
378 sproc = get_test_subprocess()
379 wait_for_pid(sproc.pid)
380 self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid))
381
382 def test_is_running(self):
383 sproc = get_test_subprocess()
384 wait_for_pid(sproc.pid)
385 p = psutil.Process(sproc.pid)
386 self.assertTrue(p.is_running())
387 psutil.Process(sproc.pid).kill()
388 sproc.wait()
389 self.assertFalse(p.is_running())
390
391 def test_pid_exists(self):
392 sproc = get_test_subprocess()
393 wait_for_pid(sproc.pid)
394 self.assertTrue(psutil.pid_exists(sproc.pid))
395 psutil.Process(sproc.pid).kill()
396 sproc.wait()
397 self.assertFalse(psutil.pid_exists(sproc.pid))
398 self.assertFalse(psutil.pid_exists(-1))
399
400 def test_exe(self):
401 sproc = get_test_subprocess()
402 wait_for_pid(sproc.pid)
403 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON)
404 for p in psutil.process_iter():
405 try:
406 exe = p.exe
407 except psutil.Error:
408 continue
409 if not exe:
410 continue
411 if not os.path.exists(exe):
412 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \
413 % (repr(exe), p.pid, p.name, p.cmdline))
414 if hasattr(os, 'access') and hasattr(os, "X_OK"):
415 if not os.access(p.exe, os.X_OK):
416 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s )" \
417 % (repr(p.exe), p.pid, p.name, p.cmdline))
418
419 def test_path(self):
420 proc = psutil.Process(os.getpid())
421 warnings.filterwarnings("error")
422 try:
423 self.assertRaises(DeprecationWarning, getattr, proc, 'path')
424 finally:
425 warnings.resetwarnings()
426
427 def test_cmdline(self):
428 sproc = get_test_subprocess([PYTHON, "-E"])
429 wait_for_pid(sproc.pid)
430 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"])
431
432 def test_name(self):
433 sproc = get_test_subprocess(PYTHON)
434 wait_for_pid(sproc.pid)
435 if OSX:
436 self.assertEqual(psutil.Process(sproc.pid).name, "Python")
437 else:
438 self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PY THON))
439
440 def test_uid(self):
441 sproc = get_test_subprocess()
442 wait_for_pid(sproc.pid)
443 uid = psutil.Process(sproc.pid).uid
444 if hasattr(os, 'getuid'):
445 self.assertEqual(uid, os.getuid())
446 else:
447 # On those platforms where UID doesn't make sense (Windows)
448 # we expect it to be -1
449 self.assertEqual(uid, -1)
450
451 def test_gid(self):
452 sproc = get_test_subprocess()
453 wait_for_pid(sproc.pid)
454 gid = psutil.Process(sproc.pid).gid
455 if hasattr(os, 'getgid'):
456 self.assertEqual(gid, os.getgid())
457 else:
458 # On those platforms where GID doesn't make sense (Windows)
459 # we expect it to be -1
460 self.assertEqual(gid, -1)
461
462 def test_username(self):
463 sproc = get_test_subprocess()
464 p = psutil.Process(sproc.pid)
465 if POSIX:
466 import pwd
467 user = pwd.getpwuid(p.uid).pw_name
468 self.assertEqual(p.username, user)
469 elif WINDOWS:
470 expected_username = os.environ['USERNAME']
471 expected_domain = os.environ['USERDOMAIN']
472 domain, username = p.username.split('\\')
473 self.assertEqual(domain, expected_domain)
474 self.assertEqual(username, expected_username)
475 else:
476 p.username
477
478 @skipUnless(WINDOWS or LINUX)
479 def test_getcwd(self):
480 sproc = get_test_subprocess()
481 wait_for_pid(sproc.pid)
482 p = psutil.Process(sproc.pid)
483 self.assertEqual(p.getcwd(), os.getcwd())
484
485 @skipUnless(WINDOWS or LINUX)
486 def test_getcwd_2(self):
487 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"]
488 sproc = get_test_subprocess(cmd)
489 wait_for_pid(sproc.pid)
490 p = psutil.Process(sproc.pid)
491 time.sleep(0.1)
492 expected_dir = os.path.dirname(os.getcwd())
493 self.assertEqual(p.getcwd(), expected_dir)
494
495 def test_get_open_files(self):
496 thisfile = os.path.join(os.getcwd(), __file__)
497
498 # current process
499 p = psutil.Process(os.getpid())
500 files = p.get_open_files()
501 self.assertFalse(thisfile in files)
502 f = open(thisfile, 'r')
503 filenames = [x.path for x in p.get_open_files()]
504 self.assertTrue(thisfile in filenames)
505 f.close()
506 for file in filenames:
507 self.assertTrue(os.path.isfile(file))
508
509 # another process
510 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfi le
511 sproc = get_test_subprocess([PYTHON, "-c", cmdline])
512 wait_for_pid(sproc.pid)
513 time.sleep(0.1)
514 p = psutil.Process(sproc.pid)
515 filenames = [x.path for x in p.get_open_files()]
516 self.assertTrue(thisfile in filenames)
517 for file in filenames:
518 self.assertTrue(os.path.isfile(file))
519 # all processes
520 for proc in psutil.process_iter():
521 try:
522 files = proc.get_open_files()
523 except psutil.Error:
524 pass
525 else:
526 for file in filenames:
527 self.assertTrue(os.path.isfile(file))
528
529 def test_get_open_files2(self):
530 # test fd and path fields
531 fileobj = open(os.path.join(os.getcwd(), __file__), 'r')
532 p = psutil.Process(os.getpid())
533 for path, fd in p.get_open_files():
534 if path == fileobj.name or fd == fileobj.fileno():
535 break
536 else:
537 self.fail("no file found; files=%s" % repr(p.get_open_files()))
538 self.assertEqual(path, fileobj.name)
539 if WINDOWS:
540 self.assertEqual(fd, -1)
541 else:
542 self.assertEqual(fd, fileobj.fileno())
543 # test positions
544 ntuple = p.get_open_files()[0]
545 self.assertEqual(ntuple[0], ntuple.path)
546 self.assertEqual(ntuple[1], ntuple.fd)
547 # test file is gone
548 fileobj.close()
549 self.assertTrue(fileobj.name not in p.get_open_files())
550
551 @skipUnless(SUPPORT_CONNECTIONS, warn=1)
552 def test_get_connections(self):
553 arg = "import socket, time;" \
554 "s = socket.socket();" \
555 "s.bind(('127.0.0.1', 0));" \
556 "s.listen(1);" \
557 "conn, addr = s.accept();" \
558 "time.sleep(100);"
559 sproc = get_test_subprocess([PYTHON, "-c", arg])
560 time.sleep(0.1)
561 p = psutil.Process(sproc.pid)
562 cons = p.get_connections()
563 self.assertTrue(len(cons) == 1)
564 con = cons[0]
565 self.assertEqual(con.family, socket.AF_INET)
566 self.assertEqual(con.type, socket.SOCK_STREAM)
567 self.assertEqual(con.status, "LISTEN")
568 ip, port = con.local_address
569 self.assertEqual(ip, '127.0.0.1')
570 self.assertEqual(con.remote_address, ())
571 if WINDOWS:
572 self.assertEqual(con.fd, -1)
573 else:
574 self.assertTrue(con.fd > 0)
575 # test positions
576 self.assertEqual(con[0], con.fd)
577 self.assertEqual(con[1], con.family)
578 self.assertEqual(con[2], con.type)
579 self.assertEqual(con[3], con.local_address)
580 self.assertEqual(con[4], con.remote_address)
581 self.assertEqual(con[5], con.status)
582
583 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
584 def test_connection_fromfd(self):
585 sock = socket.socket()
586 sock.bind(('127.0.0.1', 0))
587 sock.listen(1)
588 p = psutil.Process(os.getpid())
589 for conn in p.get_connections():
590 if conn.fd == sock.fileno():
591 break
592 else:
593 sock.close()
594 self.fail("couldn't find socket fd")
595 dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
596 try:
597 self.assertEqual(dupsock.getsockname(), conn.local_address)
598 self.assertNotEqual(sock.fileno(), dupsock.fileno())
599 finally:
600 sock.close()
601 dupsock.close()
602
603 @skipUnless(SUPPORT_CONNECTIONS, warn=1)
604 def test_get_connections_all(self):
605
606 def check_address(addr, family):
607 if not addr:
608 return
609 ip, port = addr
610 self.assertTrue(isinstance(port, int))
611 if family == socket.AF_INET:
612 ip = map(int, ip.split('.'))
613 self.assertTrue(len(ip) == 4)
614 for num in ip:
615 self.assertTrue(0 <= num <= 255)
616 self.assertTrue(0 <= port <= 65535)
617
618 def supports_ipv6():
619 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
620 return False
621 try:
622 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
623 sock.bind(("::1", 0))
624 except (socket.error, socket.gaierror):
625 return False
626 else:
627 sock.close()
628 return True
629
630 # all values are supposed to match Linux's tcp_states.h states
631 # table across all platforms.
632 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
633 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
634 "LAST_ACK", "LISTEN", "CLOSING", ""]
635
636 tcp_template = "import socket;" \
637 "s = socket.socket($family, socket.SOCK_STREAM);" \
638 "s.bind(('$addr', 0));" \
639 "s.listen(1);" \
640 "conn, addr = s.accept();"
641
642 udp_template = "import socket, time;" \
643 "s = socket.socket($family, socket.SOCK_DGRAM);" \
644 "s.bind(('$addr', 0));" \
645 "time.sleep(100);"
646
647 from string import Template
648 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET,
649 addr="127.0.0.1")
650 udp4_template = Template(udp_template).substitute(family=socket.AF_INET,
651 addr="127.0.0.1")
652 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6 ,
653 addr="::1")
654 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6 ,
655 addr="::1")
656
657 # launch various subprocess instantiating a socket of various
658 # families and tupes to enrich psutil results
659 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
660 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
661 if supports_ipv6():
662 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
663 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
664 else:
665 tcp6_proc = None
666 udp6_proc = None
667
668 time.sleep(0.1)
669 this_proc = psutil.Process(os.getpid())
670 children_pids = [p.pid for p in this_proc.get_children()]
671 for p in psutil.process_iter():
672 try:
673 cons = p.get_connections()
674 except (psutil.NoSuchProcess, psutil.AccessDenied):
675 pass
676 else:
677 for conn in cons:
678 self.assertTrue(conn.type in (socket.SOCK_STREAM,
679 socket.SOCK_DGRAM))
680 self.assertTrue(conn.family in (socket.AF_INET,
681 socket.AF_INET6))
682 check_address(conn.local_address, conn.family)
683 check_address(conn.remote_address, conn.family)
684 if conn.status not in valid_states:
685 self.fail("%s is not a valid status" %conn.status)
686 # actually try to bind the local socket; ignore IPv6
687 # sockets as their address might be represented as
688 # an IPv4-mapped-address (e.g. "::127.0.0.1")
689 # and that's rejected by bind()
690 if conn.family == socket.AF_INET:
691 s = socket.socket(conn.family, conn.type)
692 s.bind((conn.local_address[0], 0))
693 s.close()
694
695 if not WINDOWS and hasattr(socket, 'fromfd'):
696 try:
697 dupsock = socket.fromfd(conn.fd, conn.family,
698 conn.type)
699 except (socket.error, OSError), err:
700 if err.args[0] == errno.EBADF:
701 continue
702 else:
703 raise
704 self.assertEqual(dupsock.family, conn.family)
705 self.assertEqual(dupsock.type, conn.type)
706
707 # check matches against subprocesses
708 if p.pid in children_pids:
709 self.assertTrue(len(cons) == 1)
710 conn = cons[0]
711 # TCP v4
712 if p.pid == tcp4_proc.pid:
713 self.assertEqual(conn.family, socket.AF_INET)
714 self.assertEqual(conn.type, socket.SOCK_STREAM)
715 self.assertEqual(conn.local_address[0], "127.0.0.1")
716 self.assertEqual(conn.remote_address, ())
717 self.assertEqual(conn.status, "LISTEN")
718 # UDP v4
719 elif p.pid == udp4_proc.pid:
720 self.assertEqual(conn.family, socket.AF_INET)
721 self.assertEqual(conn.type, socket.SOCK_DGRAM)
722 self.assertEqual(conn.local_address[0], "127.0.0.1")
723 self.assertEqual(conn.remote_address, ())
724 self.assertEqual(conn.status, "")
725 # TCP v6
726 elif p.pid == getattr(tcp6_proc, "pid", None):
727 self.assertEqual(conn.family, socket.AF_INET6)
728 self.assertEqual(conn.type, socket.SOCK_STREAM)
729 self.assertEqual(conn.local_address[0], "::1")
730 self.assertEqual(conn.remote_address, ())
731 self.assertEqual(conn.status, "LISTEN")
732 # UDP v6
733 elif p.pid == getattr(udp6_proc, "pid", None):
734 self.assertEqual(conn.family, socket.AF_INET6)
735 self.assertEqual(conn.type, socket.SOCK_DGRAM)
736 self.assertEqual(conn.local_address[0], "::1")
737 self.assertEqual(conn.remote_address, ())
738 self.assertEqual(conn.status, "")
739
740 def test_parent_ppid(self):
741 this_parent = os.getpid()
742 sproc = get_test_subprocess()
743 p = psutil.Process(sproc.pid)
744 self.assertEqual(p.ppid, this_parent)
745 self.assertEqual(p.parent.pid, this_parent)
746 # no other process is supposed to have us as parent
747 for p in psutil.process_iter():
748 if p.pid == sproc.pid:
749 continue
750 self.assertTrue(p.ppid != this_parent)
751
752 def test_get_children(self):
753 p = psutil.Process(os.getpid())
754 self.assertEqual(p.get_children(), [])
755 sproc = get_test_subprocess()
756 children = p.get_children()
757 self.assertEqual(len(children), 1)
758 self.assertEqual(children[0].pid, sproc.pid)
759 self.assertEqual(children[0].ppid, os.getpid())
760
761 def test_suspend_resume(self):
762 sproc = get_test_subprocess()
763 p = psutil.Process(sproc.pid)
764 p.suspend()
765 p.resume()
766
767 def test_get_pid_list(self):
768 plist = [x.pid for x in psutil.get_process_list()]
769 pidlist = psutil.get_pid_list()
770 self.assertEqual(plist.sort(), pidlist.sort())
771 # make sure every pid is unique
772 self.assertEqual(len(pidlist), len(set(pidlist)))
773
774 def test_test(self):
775 # test for psutil.test() function
776 stdout = sys.stdout
777 sys.stdout = DEVNULL
778 try:
779 psutil.test()
780 finally:
781 sys.stdout = stdout
782
783 def test_types(self):
784 sproc = get_test_subprocess()
785 wait_for_pid(sproc.pid)
786 p = psutil.Process(sproc.pid)
787 self.assert_(isinstance(p.pid, int))
788 self.assert_(isinstance(p.ppid, int))
789 self.assert_(isinstance(p.parent, psutil.Process))
790 self.assert_(isinstance(p.name, str))
791 if self.__class__.__name__ != "LimitedUserTestCase":
792 self.assert_(isinstance(p.exe, str))
793 self.assert_(isinstance(p.cmdline, list))
794 self.assert_(isinstance(p.uid, int))
795 self.assert_(isinstance(p.gid, int))
796 self.assert_(isinstance(p.create_time, float))
797 self.assert_(isinstance(p.username, (unicode, str)))
798 if hasattr(p, 'getcwd'):
799 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
800 self.assert_(isinstance(p.getcwd(), str))
801 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase":
802 self.assert_(isinstance(p.get_open_files(), list))
803 for path, fd in p.get_open_files():
804 self.assert_(isinstance(path, (unicode, str)))
805 self.assert_(isinstance(fd, int))
806 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \
807 and SUPPORT_CONNECTIONS:
808 self.assert_(isinstance(p.get_connections(), list))
809 self.assert_(isinstance(p.is_running(), bool))
810 if not OSX or self.__class__.__name__ != "LimitedUserTestCase":
811 self.assert_(isinstance(p.get_cpu_times(), tuple))
812 self.assert_(isinstance(p.get_cpu_times()[0], float))
813 self.assert_(isinstance(p.get_cpu_times()[1], float))
814 self.assert_(isinstance(p.get_cpu_percent(0), float))
815 self.assert_(isinstance(p.get_memory_info(), tuple))
816 self.assert_(isinstance(p.get_memory_info()[0], int))
817 self.assert_(isinstance(p.get_memory_info()[1], int))
818 self.assert_(isinstance(p.get_memory_percent(), float))
819 self.assert_(isinstance(p.get_num_threads(), int))
820 self.assert_(isinstance(psutil.get_process_list(), list))
821 self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process))
822 self.assert_(isinstance(psutil.process_iter(), types.GeneratorType))
823 self.assert_(isinstance(psutil.process_iter().next(), psutil.Process))
824 self.assert_(isinstance(psutil.get_pid_list(), list))
825 self.assert_(isinstance(psutil.get_pid_list()[0], int))
826 self.assert_(isinstance(psutil.pid_exists(1), bool))
827
828 def test_invalid_pid(self):
829 self.assertRaises(ValueError, psutil.Process, "1")
830 self.assertRaises(ValueError, psutil.Process, None)
831 # Refers to Issue #12
832 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
833
834 def test_zombie_process(self):
835 # Test that NoSuchProcess exception gets raised in the event the
836 # process dies after we create the Process object.
837 # Example:
838 # >>> proc = Process(1234)
839 # >>> time.sleep(5) # time-consuming task, process dies in meantime
840 # >>> proc.name
841 # Refers to Issue #15
842 sproc = get_test_subprocess()
843 p = psutil.Process(sproc.pid)
844 p.kill()
845 sproc.wait()
846
847 self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid")
848 self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent")
849 self.assertRaises(psutil.NoSuchProcess, getattr, p, "name")
850 self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe")
851 self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline")
852 self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid")
853 self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid")
854 self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time")
855 self.assertRaises(psutil.NoSuchProcess, getattr, p, "username")
856 if hasattr(p, 'getcwd'):
857 self.assertRaises(psutil.NoSuchProcess, p.getcwd)
858 self.assertRaises(psutil.NoSuchProcess, p.get_open_files)
859 self.assertRaises(psutil.NoSuchProcess, p.get_connections)
860 self.assertRaises(psutil.NoSuchProcess, p.suspend)
861 self.assertRaises(psutil.NoSuchProcess, p.resume)
862 self.assertRaises(psutil.NoSuchProcess, p.kill)
863 self.assertRaises(psutil.NoSuchProcess, p.terminate)
864 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM)
865 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times)
866 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0)
867 self.assertRaises(psutil.NoSuchProcess, p.get_memory_info)
868 self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent)
869 self.assertRaises(psutil.NoSuchProcess, p.get_children)
870 self.assertRaises(psutil.NoSuchProcess, p.get_num_threads)
871 self.assertFalse(p.is_running())
872
873 def test__str__(self):
874 sproc = get_test_subprocess()
875 p = psutil.Process(sproc.pid)
876 self.assertTrue(str(sproc.pid) in str(p))
877 self.assertTrue(os.path.basename(PYTHON) in str(p))
878 sproc = get_test_subprocess()
879 p = psutil.Process(sproc.pid)
880 p.kill()
881 sproc.wait()
882 self.assertTrue(str(sproc.pid) in str(p))
883 self.assertTrue("terminated" in str(p))
884
885 def test_fetch_all(self):
886 valid_procs = 0
887 attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times' ,
888 'get_memory_info', 'get_memory_percent', 'get_open_files',
889 'get_num_threads']
890 for p in psutil.process_iter():
891 for attr in attrs:
892 # skip slow Python implementation; we're reasonably sure
893 # it works anyway
894 if POSIX and attr == 'get_open_files':
895 continue
896 try:
897 attr = getattr(p, attr, None)
898 if attr is not None and callable(attr):
899 attr()
900 valid_procs += 1
901 except (psutil.NoSuchProcess, psutil.AccessDenied), err:
902 self.assertEqual(err.pid, p.pid)
903 if err.name:
904 self.assertEqual(err.name, p.name)
905 self.assertTrue(str(err))
906 self.assertTrue(err.msg)
907 except:
908 trace = traceback.format_exc()
909 self.fail('Exception raised for method %s, pid %s:\n%s'
910 %(attr, p.pid, trace))
911
912 # we should always have a non-empty list, not including PID 0 etc.
913 # special cases.
914 self.assertTrue(valid_procs > 0)
915
916 def test_pid_0(self):
917 # Process(0) is supposed to work on all platforms even if with
918 # some differences
919 p = psutil.Process(0)
920 if WINDOWS:
921 self.assertEqual(p.name, 'System Idle Process')
922 elif LINUX:
923 self.assertEqual(p.name, 'sched')
924 elif BSD:
925 self.assertEqual(p.name, 'swapper')
926 elif OSX:
927 self.assertEqual(p.name, 'kernel_task')
928
929 if os.name == 'posix':
930 self.assertEqual(p.uid, 0)
931 self.assertEqual(p.gid, 0)
932 else:
933 self.assertEqual(p.uid, -1)
934 self.assertEqual(p.gid, -1)
935
936 self.assertTrue(p.ppid in (0, 1))
937 self.assertEqual(p.exe, "")
938 self.assertEqual(p.cmdline, [])
939 # this can either raise AD (Win) or return 0 (UNIX)
940 try:
941 self.assertTrue(p.get_num_threads() in (0, 1))
942 except psutil.AccessDenied:
943 pass
944
945 if OSX : #and os.geteuid() != 0:
946 self.assertRaises(psutil.AccessDenied, p.get_memory_info)
947 self.assertRaises(psutil.AccessDenied, p.get_cpu_times)
948 else:
949 p.get_memory_info()
950
951 # username property
952 if POSIX:
953 self.assertEqual(p.username, 'root')
954 elif WINDOWS:
955 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM')
956 else:
957 p.username
958
959 # PID 0 is supposed to be available on all platforms
960 self.assertTrue(0 in psutil.get_pid_list())
961 self.assertTrue(psutil.pid_exists(0))
962
963 # --- OS specific tests
964
965
966 if hasattr(os, 'getuid'):
967 class LimitedUserTestCase(TestCase):
968 """Repeat the previous tests by using a limited user.
969 Executed only on UNIX and only if the user who run the test script
970 is root.
971 """
972 # the uid/gid the test suite runs under
973 PROCESS_UID = os.getuid()
974 PROCESS_GID = os.getgid()
975
976 def setUp(self):
977 os.setegid(1000)
978 os.seteuid(1000)
979 TestCase.setUp(self)
980
981 def tearDown(self):
982 os.setegid(self.PROCESS_UID)
983 os.seteuid(self.PROCESS_GID)
984 TestCase.tearDown(self)
985
986 def test_path(self):
987 # DeprecationWarning is only raised once
988 pass
989
990 # overridden tests known to raise AccessDenied when run
991 # as limited user on different platforms
992
993 if LINUX:
994
995 def test_getcwd(self):
996 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, sel f)
997
998 def test_getcwd_2(self):
999 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, s elf)
1000
1001 def test_get_open_files(self):
1002 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi les, self)
1003
1004 def test_get_connections(self):
1005 pass
1006
1007 def test_exe(self):
1008 self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self)
1009
1010 if BSD:
1011
1012 def test_get_open_files(self):
1013 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi les, self)
1014
1015 def test_get_open_files2(self):
1016 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi les, self)
1017
1018 def test_get_connections(self):
1019 self.assertRaises(psutil.AccessDenied, TestCase.test_get_connect ions, self)
1020
1021 def test_connection_fromfd(self):
1022 self.assertRaises(psutil.AccessDenied, TestCase.test_connection_ fromfd, self)
1023
1024
1025 def test_main():
1026 tests = []
1027 test_suite = unittest.TestSuite()
1028
1029 tests.append(TestCase)
1030
1031 if hasattr(os, 'getuid'):
1032 if os.getuid() == 0:
1033 tests.append(LimitedUserTestCase)
1034 else:
1035 atexit.register(warnings.warn, "Couldn't run limited user tests ("
1036 "super-user privileges are required)", RuntimeWarning)
1037
1038 if POSIX:
1039 from _posix import PosixSpecificTestCase
1040 tests.append(PosixSpecificTestCase)
1041
1042 # import the specific platform test suite
1043 if LINUX:
1044 from _linux import LinuxSpecificTestCase as stc
1045 elif WINDOWS:
1046 from _windows import WindowsSpecificTestCase as stc
1047 elif OSX:
1048 from _osx import OSXSpecificTestCase as stc
1049 elif BSD:
1050 from _bsd import BSDSpecificTestCase as stc
1051 tests.append(stc)
1052
1053 for test_class in tests:
1054 test_suite.addTest(unittest.makeSuite(test_class))
1055
1056 unittest.TextTestRunner(verbosity=2).run(test_suite)
1057 DEVNULL.close()
1058
1059 if __name__ == '__main__':
1060 test_main()
1061
1062
OLDNEW
« no previous file with comments | « third_party/psutil/test/test_memory_leaks.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698