OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # | 2 # |
3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ | 3 # $Id: test_psutil.py 1204 2011-10-24 19:19:01Z g.rodola $ |
4 # | 4 # |
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
6 # Use of this source code is governed by a BSD-style license that can be | 6 # Use of this source code is governed by a BSD-style license that can be |
7 # found in the LICENSE file. | 7 # found in the LICENSE file. |
8 | 8 |
9 """ | 9 """ |
10 psutil test suite. | 10 psutil test suite. |
11 | 11 |
12 Note: this is targeted for both python 2.x and 3.x so there's no need | 12 Note: this is targeted for both python 2.x and 3.x so there's no need |
13 to use 2to3 tool first. | 13 to use 2to3 tool first. |
(...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
432 path = os.path.abspath(path) | 432 path = os.path.abspath(path) |
433 while not os.path.ismount(path): | 433 while not os.path.ismount(path): |
434 path = os.path.dirname(path) | 434 path = os.path.dirname(path) |
435 return path | 435 return path |
436 | 436 |
437 mount = find_mount_point(__file__) | 437 mount = find_mount_point(__file__) |
438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] | 438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] |
439 self.assertTrue(mount in mounts) | 439 self.assertTrue(mount in mounts) |
440 psutil.disk_usage(mount) | 440 psutil.disk_usage(mount) |
441 | 441 |
442 # XXX | 442 def test_network_io_counters(self): |
443 @skipUnless(hasattr(psutil, "network_io_counters")) | |
444 def test_anetwork_io_counters(self): | |
445 def check_ntuple(nt): | 443 def check_ntuple(nt): |
446 self.assertEqual(nt[0], nt.bytes_sent) | 444 self.assertEqual(nt[0], nt.bytes_sent) |
447 self.assertEqual(nt[1], nt.bytes_recv) | 445 self.assertEqual(nt[1], nt.bytes_recv) |
448 self.assertEqual(nt[2], nt.packets_sent) | 446 self.assertEqual(nt[2], nt.packets_sent) |
449 self.assertEqual(nt[3], nt.packets_recv) | 447 self.assertEqual(nt[3], nt.packets_recv) |
450 self.assertTrue(nt.bytes_sent >= 0) | 448 self.assertTrue(nt.bytes_sent >= 0) |
451 self.assertTrue(nt.bytes_recv >= 0) | 449 self.assertTrue(nt.bytes_recv >= 0) |
452 self.assertTrue(nt.packets_sent >= 0) | 450 self.assertTrue(nt.packets_sent >= 0) |
453 self.assertTrue(nt.packets_recv >= 0) | 451 self.assertTrue(nt.packets_recv >= 0) |
454 | 452 |
455 ret = psutil.network_io_counters(pernic=False) | 453 ret = psutil.network_io_counters(pernic=False) |
456 check_ntuple(ret) | 454 check_ntuple(ret) |
457 ret = psutil.network_io_counters(pernic=True) | 455 ret = psutil.network_io_counters(pernic=True) |
458 for name, ntuple in ret.iteritems(): | 456 self.assertTrue(ret != []) |
459 self.assertTrue(name) | 457 for key in ret: |
460 check_ntuple(ntuple) | 458 self.assertTrue(key) |
461 # XXX | 459 check_ntuple(ret[key]) |
462 @skipUnless(hasattr(psutil, "disk_io_counters")) | 460 |
463 def test_disk_io_counters(self): | 461 def test_disk_io_counters(self): |
464 def check_ntuple(nt): | 462 def check_ntuple(nt): |
465 self.assertEqual(nt[0], nt.read_count) | 463 self.assertEqual(nt[0], nt.read_count) |
466 self.assertEqual(nt[1], nt.write_count) | 464 self.assertEqual(nt[1], nt.write_count) |
467 self.assertEqual(nt[2], nt.read_bytes) | 465 self.assertEqual(nt[2], nt.read_bytes) |
468 self.assertEqual(nt[3], nt.write_bytes) | 466 self.assertEqual(nt[3], nt.write_bytes) |
469 self.assertEqual(nt[4], nt.read_time) | 467 self.assertEqual(nt[4], nt.read_time) |
470 self.assertEqual(nt[5], nt.write_time) | 468 self.assertEqual(nt[5], nt.write_time) |
471 self.assertTrue(nt.read_count >= 0) | 469 self.assertTrue(nt.read_count >= 0) |
472 self.assertTrue(nt.write_count >= 0) | 470 self.assertTrue(nt.write_count >= 0) |
473 self.assertTrue(nt.read_bytes >= 0) | 471 self.assertTrue(nt.read_bytes >= 0) |
474 self.assertTrue(nt.write_bytes >= 0) | 472 self.assertTrue(nt.write_bytes >= 0) |
475 self.assertTrue(nt.read_time >= 0) | 473 self.assertTrue(nt.read_time >= 0) |
476 self.assertTrue(nt.write_time >= 0) | 474 self.assertTrue(nt.write_time >= 0) |
477 | 475 |
478 ret = psutil.disk_io_counters(perdisk=False) | 476 ret = psutil.disk_io_counters(perdisk=False) |
479 check_ntuple(ret) | 477 check_ntuple(ret) |
480 ret = psutil.disk_io_counters(perdisk=True) | 478 ret = psutil.disk_io_counters(perdisk=True) |
481 for name, ntuple in ret.iteritems(): | 479 for key in ret: |
482 self.assertTrue(name) | 480 self.assertTrue(key) |
483 check_ntuple(ntuple) | 481 check_ntuple(ret[key]) |
484 | 482 |
485 # ==================== | 483 # ==================== |
486 # Process object tests | 484 # Process object tests |
487 # ==================== | 485 # ==================== |
488 | 486 |
489 def test_kill(self): | 487 def test_kill(self): |
490 sproc = get_test_subprocess() | 488 sproc = get_test_subprocess() |
491 test_pid = sproc.pid | 489 test_pid = sproc.pid |
492 wait_for_pid(test_pid) | 490 wait_for_pid(test_pid) |
493 p = psutil.Process(test_pid) | 491 p = psutil.Process(test_pid) |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
609 self.assertFalse(p.is_running()) | 607 self.assertFalse(p.is_running()) |
610 | 608 |
611 def test_cpu_percent(self): | 609 def test_cpu_percent(self): |
612 p = psutil.Process(os.getpid()) | 610 p = psutil.Process(os.getpid()) |
613 p.get_cpu_percent(interval=0.001) | 611 p.get_cpu_percent(interval=0.001) |
614 p.get_cpu_percent(interval=0.001) | 612 p.get_cpu_percent(interval=0.001) |
615 for x in range(100): | 613 for x in range(100): |
616 percent = p.get_cpu_percent(interval=None) | 614 percent = p.get_cpu_percent(interval=None) |
617 self.assertTrue(isinstance(percent, float)) | 615 self.assertTrue(isinstance(percent, float)) |
618 self.assertTrue(percent >= 0.0) | 616 self.assertTrue(percent >= 0.0) |
619 self.assertTrue(percent <= 100.0) | 617 if os.name != 'posix': |
| 618 self.assertTrue(percent <= 100.0) |
| 619 else: |
| 620 self.assertTrue(percent >= 0.0) |
620 | 621 |
621 def test_cpu_times(self): | 622 def test_cpu_times(self): |
622 times = psutil.Process(os.getpid()).get_cpu_times() | 623 times = psutil.Process(os.getpid()).get_cpu_times() |
623 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) | 624 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) |
624 # make sure returned values can be pretty printed with strftime | 625 # make sure returned values can be pretty printed with strftime |
625 time.strftime("%H:%M:%S", time.localtime(times.user)) | 626 time.strftime("%H:%M:%S", time.localtime(times.user)) |
626 time.strftime("%H:%M:%S", time.localtime(times.system)) | 627 time.strftime("%H:%M:%S", time.localtime(times.system)) |
627 | 628 |
628 # Test Process.cpu_times() against os.times() | 629 # Test Process.cpu_times() against os.times() |
629 # os.times() is broken on Python 2.6 | 630 # os.times() is broken on Python 2.6 |
(...skipping 253 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
883 first_nice = p.nice | 884 first_nice = p.nice |
884 p.nice = 1 | 885 p.nice = 1 |
885 self.assertEqual(p.nice, 1) | 886 self.assertEqual(p.nice, 1) |
886 # going back to previous nice value raises AccessDenied on O
SX | 887 # going back to previous nice value raises AccessDenied on O
SX |
887 if not OSX: | 888 if not OSX: |
888 p.nice = 0 | 889 p.nice = 0 |
889 self.assertEqual(p.nice, 0) | 890 self.assertEqual(p.nice, 0) |
890 except psutil.AccessDenied: | 891 except psutil.AccessDenied: |
891 pass | 892 pass |
892 finally: | 893 finally: |
893 # going back to previous nice value raises AccessDenied on OSX | 894 try: |
894 if not OSX: | |
895 p.nice = first_nice | 895 p.nice = first_nice |
| 896 except psutil.AccessDenied: |
| 897 pass |
896 | 898 |
897 if os.name == 'nt': | 899 if os.name == 'nt': |
898 | 900 |
899 def test_nice(self): | 901 def test_nice(self): |
900 p = psutil.Process(os.getpid()) | 902 p = psutil.Process(os.getpid()) |
901 self.assertRaises(TypeError, setattr, p, "nice", "str") | 903 self.assertRaises(TypeError, setattr, p, "nice", "str") |
902 try: | 904 try: |
903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) | 905 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) |
904 p.nice = psutil.HIGH_PRIORITY_CLASS | 906 p.nice = psutil.HIGH_PRIORITY_CLASS |
905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) | 907 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) |
(...skipping 18 matching lines...) Expand all Loading... |
924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) | 926 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) |
925 elif WINDOWS: | 927 elif WINDOWS: |
926 expected_username = os.environ['USERNAME'] | 928 expected_username = os.environ['USERNAME'] |
927 expected_domain = os.environ['USERDOMAIN'] | 929 expected_domain = os.environ['USERDOMAIN'] |
928 domain, username = p.username.split('\\') | 930 domain, username = p.username.split('\\') |
929 self.assertEqual(domain, expected_domain) | 931 self.assertEqual(domain, expected_domain) |
930 self.assertEqual(username, expected_username) | 932 self.assertEqual(username, expected_username) |
931 else: | 933 else: |
932 p.username | 934 p.username |
933 | 935 |
934 @skipUnless(WINDOWS or LINUX) | 936 @skipIf(not hasattr(psutil.Process, "getcwd")) |
935 def test_getcwd(self): | 937 def test_getcwd(self): |
936 sproc = get_test_subprocess() | 938 sproc = get_test_subprocess() |
937 wait_for_pid(sproc.pid) | 939 wait_for_pid(sproc.pid) |
938 p = psutil.Process(sproc.pid) | 940 p = psutil.Process(sproc.pid) |
939 self.assertEqual(p.getcwd(), os.getcwd()) | 941 self.assertEqual(p.getcwd(), os.getcwd()) |
940 | 942 |
941 @skipUnless(WINDOWS or LINUX) | 943 @skipIf(not hasattr(psutil.Process, "getcwd")) |
942 def test_getcwd_2(self): | 944 def test_getcwd_2(self): |
943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] | 945 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
944 sproc = get_test_subprocess(cmd) | 946 sproc = get_test_subprocess(cmd) |
945 wait_for_pid(sproc.pid) | 947 wait_for_pid(sproc.pid) |
946 p = psutil.Process(sproc.pid) | 948 p = psutil.Process(sproc.pid) |
947 time.sleep(0.1) | 949 time.sleep(0.1) |
948 expected_dir = os.path.dirname(os.getcwd()) | 950 expected_dir = os.path.dirname(os.getcwd()) |
949 self.assertEqual(p.getcwd(), expected_dir) | 951 self.assertEqual(p.getcwd(), expected_dir) |
950 | 952 |
951 def test_get_open_files(self): | 953 def test_get_open_files(self): |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1035 self.assertEqual(con.fd, -1) | 1037 self.assertEqual(con.fd, -1) |
1036 else: | 1038 else: |
1037 self.assertTrue(con.fd > 0) | 1039 self.assertTrue(con.fd > 0) |
1038 # test positions | 1040 # test positions |
1039 self.assertEqual(con[0], con.fd) | 1041 self.assertEqual(con[0], con.fd) |
1040 self.assertEqual(con[1], con.family) | 1042 self.assertEqual(con[1], con.family) |
1041 self.assertEqual(con[2], con.type) | 1043 self.assertEqual(con[2], con.type) |
1042 self.assertEqual(con[3], con.local_address) | 1044 self.assertEqual(con[3], con.local_address) |
1043 self.assertEqual(con[4], con.remote_address) | 1045 self.assertEqual(con[4], con.remote_address) |
1044 self.assertEqual(con[5], con.status) | 1046 self.assertEqual(con[5], con.status) |
| 1047 # test kind arg |
| 1048 self.assertRaises(ValueError, p.get_connections, 'foo') |
1045 | 1049 |
1046 @skipUnless(supports_ipv6()) | 1050 @skipUnless(supports_ipv6()) |
1047 def test_get_connections_ipv6(self): | 1051 def test_get_connections_ipv6(self): |
1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | 1052 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
1049 s.bind(('::1', 0)) | 1053 s.bind(('::1', 0)) |
1050 s.listen(1) | 1054 s.listen(1) |
1051 cons = psutil.Process(os.getpid()).get_connections() | 1055 cons = psutil.Process(os.getpid()).get_connections() |
1052 s.close() | 1056 s.close() |
1053 self.assertEqual(len(cons), 1) | 1057 self.assertEqual(len(cons), 1) |
1054 self.assertEqual(cons[0].local_address[0], '::1') | 1058 self.assertEqual(cons[0].local_address[0], '::1') |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1109 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, | 1113 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, |
1110 addr="127.0.0.1") | 1114 addr="127.0.0.1") |
1111 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, | 1115 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, |
1112 addr="127.0.0.1") | 1116 addr="127.0.0.1") |
1113 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, | 1117 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, |
1114 addr="::1") | 1118 addr="::1") |
1115 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, | 1119 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, |
1116 addr="::1") | 1120 addr="::1") |
1117 | 1121 |
1118 # launch various subprocess instantiating a socket of various | 1122 # launch various subprocess instantiating a socket of various |
1119 # families and tupes to enrich psutil results | 1123 # families and types to enrich psutil results |
1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) | 1124 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) | 1125 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
1122 if supports_ipv6(): | 1126 if supports_ipv6(): |
1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) | 1127 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) | 1128 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
1125 else: | 1129 else: |
1126 tcp6_proc = None | 1130 tcp6_proc = None |
1127 udp6_proc = None | 1131 udp6_proc = None |
1128 | 1132 |
1129 # --- check connections of all processes | 1133 # --- check connections of all processes |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1169 if hasattr(dupsock, "family"): | 1173 if hasattr(dupsock, "family"): |
1170 self.assertEqual(dupsock.family, conn.family) | 1174 self.assertEqual(dupsock.family, conn.family) |
1171 self.assertEqual(dupsock.type, conn.type) | 1175 self.assertEqual(dupsock.type, conn.type) |
1172 finally: | 1176 finally: |
1173 if dupsock is not None: | 1177 if dupsock is not None: |
1174 dupsock.close() | 1178 dupsock.close() |
1175 | 1179 |
1176 | 1180 |
1177 # --- check matches against subprocesses | 1181 # --- check matches against subprocesses |
1178 | 1182 |
| 1183 all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6", |
| 1184 "udp", "udp4", "udp6") |
| 1185 |
1179 for p in psutil.Process(os.getpid()).get_children(): | 1186 for p in psutil.Process(os.getpid()).get_children(): |
1180 for conn in p.get_connections(): | 1187 for conn in p.get_connections(): |
1181 # TCP v4 | 1188 # TCP v4 |
1182 if p.pid == tcp4_proc.pid: | 1189 if p.pid == tcp4_proc.pid: |
1183 self.assertEqual(conn.family, socket.AF_INET) | 1190 self.assertEqual(conn.family, socket.AF_INET) |
1184 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1191 self.assertEqual(conn.type, socket.SOCK_STREAM) |
1185 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1192 self.assertEqual(conn.local_address[0], "127.0.0.1") |
1186 self.assertEqual(conn.remote_address, ()) | 1193 self.assertEqual(conn.remote_address, ()) |
1187 self.assertEqual(conn.status, "LISTEN") | 1194 self.assertEqual(conn.status, "LISTEN") |
| 1195 for kind in all_kinds: |
| 1196 cons = p.get_connections(kind=kind) |
| 1197 if kind in ("all", "inet", "inet4", "tcp", "tcp4"): |
| 1198 self.assertTrue(cons != []) |
| 1199 else: |
| 1200 self.assertEqual(cons, []) |
1188 # UDP v4 | 1201 # UDP v4 |
1189 elif p.pid == udp4_proc.pid: | 1202 elif p.pid == udp4_proc.pid: |
1190 self.assertEqual(conn.family, socket.AF_INET) | 1203 self.assertEqual(conn.family, socket.AF_INET) |
1191 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1204 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
1192 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1205 self.assertEqual(conn.local_address[0], "127.0.0.1") |
1193 self.assertEqual(conn.remote_address, ()) | 1206 self.assertEqual(conn.remote_address, ()) |
1194 self.assertEqual(conn.status, "") | 1207 self.assertEqual(conn.status, "") |
| 1208 for kind in all_kinds: |
| 1209 cons = p.get_connections(kind=kind) |
| 1210 if kind in ("all", "inet", "inet4", "udp", "udp4"): |
| 1211 self.assertTrue(cons != []) |
| 1212 else: |
| 1213 self.assertEqual(cons, []) |
1195 # TCP v6 | 1214 # TCP v6 |
1196 elif p.pid == getattr(tcp6_proc, "pid", None): | 1215 elif p.pid == getattr(tcp6_proc, "pid", None): |
1197 self.assertEqual(conn.family, socket.AF_INET6) | 1216 self.assertEqual(conn.family, socket.AF_INET6) |
1198 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1217 self.assertEqual(conn.type, socket.SOCK_STREAM) |
1199 self.assertTrue(conn.local_address[0] in ("::", "::1")) | 1218 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
1200 self.assertEqual(conn.remote_address, ()) | 1219 self.assertEqual(conn.remote_address, ()) |
1201 self.assertEqual(conn.status, "LISTEN") | 1220 self.assertEqual(conn.status, "LISTEN") |
| 1221 for kind in all_kinds: |
| 1222 cons = p.get_connections(kind=kind) |
| 1223 if kind in ("all", "inet", "inet6", "tcp", "tcp6"): |
| 1224 self.assertTrue(cons != []) |
| 1225 else: |
| 1226 self.assertEqual(cons, []) |
1202 # UDP v6 | 1227 # UDP v6 |
1203 elif p.pid == getattr(udp6_proc, "pid", None): | 1228 elif p.pid == getattr(udp6_proc, "pid", None): |
1204 self.assertEqual(conn.family, socket.AF_INET6) | 1229 self.assertEqual(conn.family, socket.AF_INET6) |
1205 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1230 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
1206 self.assertTrue(conn.local_address[0] in ("::", "::1")) | 1231 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
1207 self.assertEqual(conn.remote_address, ()) | 1232 self.assertEqual(conn.remote_address, ()) |
1208 self.assertEqual(conn.status, "") | 1233 self.assertEqual(conn.status, "") |
| 1234 for kind in all_kinds: |
| 1235 cons = p.get_connections(kind=kind) |
| 1236 if kind in ("all", "inet", "inet6", "udp", "udp6"): |
| 1237 self.assertTrue(cons != []) |
| 1238 else: |
| 1239 self.assertEqual(cons, []) |
1209 | 1240 |
1210 def test_parent_ppid(self): | 1241 def test_parent_ppid(self): |
1211 this_parent = os.getpid() | 1242 this_parent = os.getpid() |
1212 sproc = get_test_subprocess() | 1243 sproc = get_test_subprocess() |
1213 p = psutil.Process(sproc.pid) | 1244 p = psutil.Process(sproc.pid) |
1214 self.assertEqual(p.ppid, this_parent) | 1245 self.assertEqual(p.ppid, this_parent) |
1215 self.assertEqual(p.parent.pid, this_parent) | 1246 self.assertEqual(p.parent.pid, this_parent) |
1216 # no other process is supposed to have us as parent | 1247 # no other process is supposed to have us as parent |
1217 for p in psutil.process_iter(): | 1248 for p in psutil.process_iter(): |
1218 if p.pid == sproc.pid: | 1249 if p.pid == sproc.pid: |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1300 self.assertTrue(str(sproc.pid) in str(p)) | 1331 self.assertTrue(str(sproc.pid) in str(p)) |
1301 self.assertTrue("terminated" in str(p)) | 1332 self.assertTrue("terminated" in str(p)) |
1302 | 1333 |
1303 def test_fetch_all(self): | 1334 def test_fetch_all(self): |
1304 valid_procs = 0 | 1335 valid_procs = 0 |
1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', | 1336 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', |
1306 'kill', 'wait'] | 1337 'kill', 'wait'] |
1307 excluded_names += ['get_cpu_percent', 'get_children'] | 1338 excluded_names += ['get_cpu_percent', 'get_children'] |
1308 # XXX - skip slow lsof implementation; | 1339 # XXX - skip slow lsof implementation; |
1309 if BSD: | 1340 if BSD: |
1310 excluded_names += ['get_open_files', 'get_connections'] | |
1311 if OSX: | |
1312 excluded_names += ['get_connections'] | 1341 excluded_names += ['get_connections'] |
1313 attrs = [] | 1342 attrs = [] |
1314 for name in dir(psutil.Process): | 1343 for name in dir(psutil.Process): |
1315 if name.startswith("_"): | 1344 if name.startswith("_"): |
1316 continue | 1345 continue |
1317 if name.startswith("set_"): | 1346 if name.startswith("set_"): |
1318 continue | 1347 continue |
1319 if name in excluded_names: | 1348 if name in excluded_names: |
1320 continue | 1349 continue |
1321 attrs.append(name) | 1350 attrs.append(name) |
(...skipping 11 matching lines...) Expand all Loading... |
1333 except (psutil.NoSuchProcess, psutil.AccessDenied): | 1362 except (psutil.NoSuchProcess, psutil.AccessDenied): |
1334 err = sys.exc_info()[1] | 1363 err = sys.exc_info()[1] |
1335 self.assertEqual(err.pid, p.pid) | 1364 self.assertEqual(err.pid, p.pid) |
1336 if err.name: | 1365 if err.name: |
1337 self.assertEqual(err.name, p.name) | 1366 self.assertEqual(err.name, p.name) |
1338 self.assertTrue(str(err)) | 1367 self.assertTrue(str(err)) |
1339 self.assertTrue(err.msg) | 1368 self.assertTrue(err.msg) |
1340 else: | 1369 else: |
1341 if name == 'parent' or ret in (0, 0.0, [], None): | 1370 if name == 'parent' or ret in (0, 0.0, [], None): |
1342 continue | 1371 continue |
| 1372 # getcwd() on FreeBSD may be an empty string |
| 1373 # in case of a system process |
| 1374 if name == 'getcwd' and BSD and ret == '': |
| 1375 continue |
1343 self.assertTrue(ret) | 1376 self.assertTrue(ret) |
1344 if name == "exe": | 1377 if name == "exe": |
1345 self.assertTrue(os.path.isfile(ret)) | 1378 self.assertTrue(os.path.isfile(ret)) |
1346 elif name == "getcwd": | 1379 elif name == "getcwd": |
1347 # XXX - temporary fix; on my Linux box | 1380 # XXX - temporary fix; on my Linux box |
1348 # chrome process cws is errnously reported | 1381 # chrome process cws is errnously reported |
1349 # as /proc/4144/fdinfo whichd doesn't exist | 1382 # as /proc/4144/fdinfo whichd doesn't exist |
1350 if 'chrome' in p.name: | 1383 if 'chrome' in p.name: |
1351 continue | 1384 continue |
1352 self.assertTrue(os.path.isdir(ret)) | 1385 self.assertTrue(os.path.isdir(ret)) |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1390 elif WINDOWS: | 1423 elif WINDOWS: |
1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') | 1424 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
1392 else: | 1425 else: |
1393 p.username | 1426 p.username |
1394 | 1427 |
1395 self.assertTrue(0 in psutil.get_pid_list()) | 1428 self.assertTrue(0 in psutil.get_pid_list()) |
1396 self.assertTrue(psutil.pid_exists(0)) | 1429 self.assertTrue(psutil.pid_exists(0)) |
1397 | 1430 |
1398 def test_Popen(self): | 1431 def test_Popen(self): |
1399 # Popen class test | 1432 # Popen class test |
| 1433 # XXX this test causes a ResourceWarning on Python 3 because |
| 1434 # psutil.__subproc instance doesn't get propertly freed. |
| 1435 # Not sure what to do though. |
1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | 1436 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 1437 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
1402 try: | 1438 try: |
1403 proc.name | 1439 proc.name |
1404 proc.stdin | 1440 proc.stdin |
1405 self.assertTrue(hasattr(proc, 'name')) | 1441 self.assertTrue(hasattr(proc, 'name')) |
1406 self.assertTrue(hasattr(proc, 'stdin')) | 1442 self.assertTrue(hasattr(proc, 'stdin')) |
1407 self.assertRaises(AttributeError, getattr, proc, 'foo') | 1443 self.assertRaises(AttributeError, getattr, proc, 'foo') |
1408 finally: | 1444 finally: |
1409 proc.kill() | 1445 proc.kill() |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1484 | 1520 |
1485 f = open(TESTFN, 'w') | 1521 f = open(TESTFN, 'w') |
1486 f.close() | 1522 f.close() |
1487 atexit.register(lambda: os.remove(TESTFN)) | 1523 atexit.register(lambda: os.remove(TESTFN)) |
1488 | 1524 |
1489 unittest.TextTestRunner(verbosity=2).run(test_suite) | 1525 unittest.TextTestRunner(verbosity=2).run(test_suite) |
1490 DEVNULL.close() | 1526 DEVNULL.close() |
1491 | 1527 |
1492 if __name__ == '__main__': | 1528 if __name__ == '__main__': |
1493 test_main() | 1529 test_main() |
OLD | NEW |