| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # | 2 # |
| 3 # $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ | 3 # $Id: test_psutil.py 1204 2011-10-24 19:19:01Z g.rodola $ |
| 4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be | 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. | 7 # found in the LICENSE file. |
| 8 | 8 |
| 9 """ | 9 """ |
| 10 psutil test suite. | 10 psutil test suite. |
| 11 | 11 |
| 12 Note: this is targeted for both python 2.x and 3.x so there's no need | 12 Note: this is targeted for both python 2.x and 3.x so there's no need |
| 13 to use 2to3 tool first. | 13 to use 2to3 tool first. |
| (...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 432 path = os.path.abspath(path) | 432 path = os.path.abspath(path) |
| 433 while not os.path.ismount(path): | 433 while not os.path.ismount(path): |
| 434 path = os.path.dirname(path) | 434 path = os.path.dirname(path) |
| 435 return path | 435 return path |
| 436 | 436 |
| 437 mount = find_mount_point(__file__) | 437 mount = find_mount_point(__file__) |
| 438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] | 438 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] |
| 439 self.assertTrue(mount in mounts) | 439 self.assertTrue(mount in mounts) |
| 440 psutil.disk_usage(mount) | 440 psutil.disk_usage(mount) |
| 441 | 441 |
| 442 # XXX | 442 def test_network_io_counters(self): |
| 443 @skipUnless(hasattr(psutil, "network_io_counters")) | |
| 444 def test_anetwork_io_counters(self): | |
| 445 def check_ntuple(nt): | 443 def check_ntuple(nt): |
| 446 self.assertEqual(nt[0], nt.bytes_sent) | 444 self.assertEqual(nt[0], nt.bytes_sent) |
| 447 self.assertEqual(nt[1], nt.bytes_recv) | 445 self.assertEqual(nt[1], nt.bytes_recv) |
| 448 self.assertEqual(nt[2], nt.packets_sent) | 446 self.assertEqual(nt[2], nt.packets_sent) |
| 449 self.assertEqual(nt[3], nt.packets_recv) | 447 self.assertEqual(nt[3], nt.packets_recv) |
| 450 self.assertTrue(nt.bytes_sent >= 0) | 448 self.assertTrue(nt.bytes_sent >= 0) |
| 451 self.assertTrue(nt.bytes_recv >= 0) | 449 self.assertTrue(nt.bytes_recv >= 0) |
| 452 self.assertTrue(nt.packets_sent >= 0) | 450 self.assertTrue(nt.packets_sent >= 0) |
| 453 self.assertTrue(nt.packets_recv >= 0) | 451 self.assertTrue(nt.packets_recv >= 0) |
| 454 | 452 |
| 455 ret = psutil.network_io_counters(pernic=False) | 453 ret = psutil.network_io_counters(pernic=False) |
| 456 check_ntuple(ret) | 454 check_ntuple(ret) |
| 457 ret = psutil.network_io_counters(pernic=True) | 455 ret = psutil.network_io_counters(pernic=True) |
| 458 for name, ntuple in ret.iteritems(): | 456 self.assertTrue(ret != []) |
| 459 self.assertTrue(name) | 457 for key in ret: |
| 460 check_ntuple(ntuple) | 458 self.assertTrue(key) |
| 461 # XXX | 459 check_ntuple(ret[key]) |
| 462 @skipUnless(hasattr(psutil, "disk_io_counters")) | 460 |
| 463 def test_disk_io_counters(self): | 461 def test_disk_io_counters(self): |
| 464 def check_ntuple(nt): | 462 def check_ntuple(nt): |
| 465 self.assertEqual(nt[0], nt.read_count) | 463 self.assertEqual(nt[0], nt.read_count) |
| 466 self.assertEqual(nt[1], nt.write_count) | 464 self.assertEqual(nt[1], nt.write_count) |
| 467 self.assertEqual(nt[2], nt.read_bytes) | 465 self.assertEqual(nt[2], nt.read_bytes) |
| 468 self.assertEqual(nt[3], nt.write_bytes) | 466 self.assertEqual(nt[3], nt.write_bytes) |
| 469 self.assertEqual(nt[4], nt.read_time) | 467 self.assertEqual(nt[4], nt.read_time) |
| 470 self.assertEqual(nt[5], nt.write_time) | 468 self.assertEqual(nt[5], nt.write_time) |
| 471 self.assertTrue(nt.read_count >= 0) | 469 self.assertTrue(nt.read_count >= 0) |
| 472 self.assertTrue(nt.write_count >= 0) | 470 self.assertTrue(nt.write_count >= 0) |
| 473 self.assertTrue(nt.read_bytes >= 0) | 471 self.assertTrue(nt.read_bytes >= 0) |
| 474 self.assertTrue(nt.write_bytes >= 0) | 472 self.assertTrue(nt.write_bytes >= 0) |
| 475 self.assertTrue(nt.read_time >= 0) | 473 self.assertTrue(nt.read_time >= 0) |
| 476 self.assertTrue(nt.write_time >= 0) | 474 self.assertTrue(nt.write_time >= 0) |
| 477 | 475 |
| 478 ret = psutil.disk_io_counters(perdisk=False) | 476 ret = psutil.disk_io_counters(perdisk=False) |
| 479 check_ntuple(ret) | 477 check_ntuple(ret) |
| 480 ret = psutil.disk_io_counters(perdisk=True) | 478 ret = psutil.disk_io_counters(perdisk=True) |
| 481 for name, ntuple in ret.iteritems(): | 479 for key in ret: |
| 482 self.assertTrue(name) | 480 self.assertTrue(key) |
| 483 check_ntuple(ntuple) | 481 check_ntuple(ret[key]) |
| 484 | 482 |
| 485 # ==================== | 483 # ==================== |
| 486 # Process object tests | 484 # Process object tests |
| 487 # ==================== | 485 # ==================== |
| 488 | 486 |
| 489 def test_kill(self): | 487 def test_kill(self): |
| 490 sproc = get_test_subprocess() | 488 sproc = get_test_subprocess() |
| 491 test_pid = sproc.pid | 489 test_pid = sproc.pid |
| 492 wait_for_pid(test_pid) | 490 wait_for_pid(test_pid) |
| 493 p = psutil.Process(test_pid) | 491 p = psutil.Process(test_pid) |
| (...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 609 self.assertFalse(p.is_running()) | 607 self.assertFalse(p.is_running()) |
| 610 | 608 |
| 611 def test_cpu_percent(self): | 609 def test_cpu_percent(self): |
| 612 p = psutil.Process(os.getpid()) | 610 p = psutil.Process(os.getpid()) |
| 613 p.get_cpu_percent(interval=0.001) | 611 p.get_cpu_percent(interval=0.001) |
| 614 p.get_cpu_percent(interval=0.001) | 612 p.get_cpu_percent(interval=0.001) |
| 615 for x in range(100): | 613 for x in range(100): |
| 616 percent = p.get_cpu_percent(interval=None) | 614 percent = p.get_cpu_percent(interval=None) |
| 617 self.assertTrue(isinstance(percent, float)) | 615 self.assertTrue(isinstance(percent, float)) |
| 618 self.assertTrue(percent >= 0.0) | 616 self.assertTrue(percent >= 0.0) |
| 619 self.assertTrue(percent <= 100.0) | 617 if os.name != 'posix': |
| 618 self.assertTrue(percent <= 100.0) |
| 619 else: |
| 620 self.assertTrue(percent >= 0.0) |
| 620 | 621 |
| 621 def test_cpu_times(self): | 622 def test_cpu_times(self): |
| 622 times = psutil.Process(os.getpid()).get_cpu_times() | 623 times = psutil.Process(os.getpid()).get_cpu_times() |
| 623 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) | 624 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) |
| 624 # make sure returned values can be pretty printed with strftime | 625 # make sure returned values can be pretty printed with strftime |
| 625 time.strftime("%H:%M:%S", time.localtime(times.user)) | 626 time.strftime("%H:%M:%S", time.localtime(times.user)) |
| 626 time.strftime("%H:%M:%S", time.localtime(times.system)) | 627 time.strftime("%H:%M:%S", time.localtime(times.system)) |
| 627 | 628 |
| 628 # Test Process.cpu_times() against os.times() | 629 # Test Process.cpu_times() against os.times() |
| 629 # os.times() is broken on Python 2.6 | 630 # os.times() is broken on Python 2.6 |
| (...skipping 253 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 883 first_nice = p.nice | 884 first_nice = p.nice |
| 884 p.nice = 1 | 885 p.nice = 1 |
| 885 self.assertEqual(p.nice, 1) | 886 self.assertEqual(p.nice, 1) |
| 886 # going back to previous nice value raises AccessDenied on O
SX | 887 # going back to previous nice value raises AccessDenied on O
SX |
| 887 if not OSX: | 888 if not OSX: |
| 888 p.nice = 0 | 889 p.nice = 0 |
| 889 self.assertEqual(p.nice, 0) | 890 self.assertEqual(p.nice, 0) |
| 890 except psutil.AccessDenied: | 891 except psutil.AccessDenied: |
| 891 pass | 892 pass |
| 892 finally: | 893 finally: |
| 893 # going back to previous nice value raises AccessDenied on OSX | 894 try: |
| 894 if not OSX: | |
| 895 p.nice = first_nice | 895 p.nice = first_nice |
| 896 except psutil.AccessDenied: |
| 897 pass |
| 896 | 898 |
| 897 if os.name == 'nt': | 899 if os.name == 'nt': |
| 898 | 900 |
| 899 def test_nice(self): | 901 def test_nice(self): |
| 900 p = psutil.Process(os.getpid()) | 902 p = psutil.Process(os.getpid()) |
| 901 self.assertRaises(TypeError, setattr, p, "nice", "str") | 903 self.assertRaises(TypeError, setattr, p, "nice", "str") |
| 902 try: | 904 try: |
| 903 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) | 905 self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) |
| 904 p.nice = psutil.HIGH_PRIORITY_CLASS | 906 p.nice = psutil.HIGH_PRIORITY_CLASS |
| 905 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) | 907 self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) |
| (...skipping 18 matching lines...) Expand all Loading... |
| 924 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) | 926 self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) |
| 925 elif WINDOWS: | 927 elif WINDOWS: |
| 926 expected_username = os.environ['USERNAME'] | 928 expected_username = os.environ['USERNAME'] |
| 927 expected_domain = os.environ['USERDOMAIN'] | 929 expected_domain = os.environ['USERDOMAIN'] |
| 928 domain, username = p.username.split('\\') | 930 domain, username = p.username.split('\\') |
| 929 self.assertEqual(domain, expected_domain) | 931 self.assertEqual(domain, expected_domain) |
| 930 self.assertEqual(username, expected_username) | 932 self.assertEqual(username, expected_username) |
| 931 else: | 933 else: |
| 932 p.username | 934 p.username |
| 933 | 935 |
| 934 @skipUnless(WINDOWS or LINUX) | 936 @skipIf(not hasattr(psutil.Process, "getcwd")) |
| 935 def test_getcwd(self): | 937 def test_getcwd(self): |
| 936 sproc = get_test_subprocess() | 938 sproc = get_test_subprocess() |
| 937 wait_for_pid(sproc.pid) | 939 wait_for_pid(sproc.pid) |
| 938 p = psutil.Process(sproc.pid) | 940 p = psutil.Process(sproc.pid) |
| 939 self.assertEqual(p.getcwd(), os.getcwd()) | 941 self.assertEqual(p.getcwd(), os.getcwd()) |
| 940 | 942 |
| 941 @skipUnless(WINDOWS or LINUX) | 943 @skipIf(not hasattr(psutil.Process, "getcwd")) |
| 942 def test_getcwd_2(self): | 944 def test_getcwd_2(self): |
| 943 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] | 945 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
| 944 sproc = get_test_subprocess(cmd) | 946 sproc = get_test_subprocess(cmd) |
| 945 wait_for_pid(sproc.pid) | 947 wait_for_pid(sproc.pid) |
| 946 p = psutil.Process(sproc.pid) | 948 p = psutil.Process(sproc.pid) |
| 947 time.sleep(0.1) | 949 time.sleep(0.1) |
| 948 expected_dir = os.path.dirname(os.getcwd()) | 950 expected_dir = os.path.dirname(os.getcwd()) |
| 949 self.assertEqual(p.getcwd(), expected_dir) | 951 self.assertEqual(p.getcwd(), expected_dir) |
| 950 | 952 |
| 951 def test_get_open_files(self): | 953 def test_get_open_files(self): |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1035 self.assertEqual(con.fd, -1) | 1037 self.assertEqual(con.fd, -1) |
| 1036 else: | 1038 else: |
| 1037 self.assertTrue(con.fd > 0) | 1039 self.assertTrue(con.fd > 0) |
| 1038 # test positions | 1040 # test positions |
| 1039 self.assertEqual(con[0], con.fd) | 1041 self.assertEqual(con[0], con.fd) |
| 1040 self.assertEqual(con[1], con.family) | 1042 self.assertEqual(con[1], con.family) |
| 1041 self.assertEqual(con[2], con.type) | 1043 self.assertEqual(con[2], con.type) |
| 1042 self.assertEqual(con[3], con.local_address) | 1044 self.assertEqual(con[3], con.local_address) |
| 1043 self.assertEqual(con[4], con.remote_address) | 1045 self.assertEqual(con[4], con.remote_address) |
| 1044 self.assertEqual(con[5], con.status) | 1046 self.assertEqual(con[5], con.status) |
| 1047 # test kind arg |
| 1048 self.assertRaises(ValueError, p.get_connections, 'foo') |
| 1045 | 1049 |
| 1046 @skipUnless(supports_ipv6()) | 1050 @skipUnless(supports_ipv6()) |
| 1047 def test_get_connections_ipv6(self): | 1051 def test_get_connections_ipv6(self): |
| 1048 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) | 1052 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
| 1049 s.bind(('::1', 0)) | 1053 s.bind(('::1', 0)) |
| 1050 s.listen(1) | 1054 s.listen(1) |
| 1051 cons = psutil.Process(os.getpid()).get_connections() | 1055 cons = psutil.Process(os.getpid()).get_connections() |
| 1052 s.close() | 1056 s.close() |
| 1053 self.assertEqual(len(cons), 1) | 1057 self.assertEqual(len(cons), 1) |
| 1054 self.assertEqual(cons[0].local_address[0], '::1') | 1058 self.assertEqual(cons[0].local_address[0], '::1') |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1109 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, | 1113 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, |
| 1110 addr="127.0.0.1") | 1114 addr="127.0.0.1") |
| 1111 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, | 1115 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, |
| 1112 addr="127.0.0.1") | 1116 addr="127.0.0.1") |
| 1113 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, | 1117 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, |
| 1114 addr="::1") | 1118 addr="::1") |
| 1115 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, | 1119 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, |
| 1116 addr="::1") | 1120 addr="::1") |
| 1117 | 1121 |
| 1118 # launch various subprocess instantiating a socket of various | 1122 # launch various subprocess instantiating a socket of various |
| 1119 # families and tupes to enrich psutil results | 1123 # families and types to enrich psutil results |
| 1120 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) | 1124 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
| 1121 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) | 1125 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
| 1122 if supports_ipv6(): | 1126 if supports_ipv6(): |
| 1123 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) | 1127 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
| 1124 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) | 1128 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
| 1125 else: | 1129 else: |
| 1126 tcp6_proc = None | 1130 tcp6_proc = None |
| 1127 udp6_proc = None | 1131 udp6_proc = None |
| 1128 | 1132 |
| 1129 # --- check connections of all processes | 1133 # --- check connections of all processes |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1169 if hasattr(dupsock, "family"): | 1173 if hasattr(dupsock, "family"): |
| 1170 self.assertEqual(dupsock.family, conn.family) | 1174 self.assertEqual(dupsock.family, conn.family) |
| 1171 self.assertEqual(dupsock.type, conn.type) | 1175 self.assertEqual(dupsock.type, conn.type) |
| 1172 finally: | 1176 finally: |
| 1173 if dupsock is not None: | 1177 if dupsock is not None: |
| 1174 dupsock.close() | 1178 dupsock.close() |
| 1175 | 1179 |
| 1176 | 1180 |
| 1177 # --- check matches against subprocesses | 1181 # --- check matches against subprocesses |
| 1178 | 1182 |
| 1183 all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6", |
| 1184 "udp", "udp4", "udp6") |
| 1185 |
| 1179 for p in psutil.Process(os.getpid()).get_children(): | 1186 for p in psutil.Process(os.getpid()).get_children(): |
| 1180 for conn in p.get_connections(): | 1187 for conn in p.get_connections(): |
| 1181 # TCP v4 | 1188 # TCP v4 |
| 1182 if p.pid == tcp4_proc.pid: | 1189 if p.pid == tcp4_proc.pid: |
| 1183 self.assertEqual(conn.family, socket.AF_INET) | 1190 self.assertEqual(conn.family, socket.AF_INET) |
| 1184 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1191 self.assertEqual(conn.type, socket.SOCK_STREAM) |
| 1185 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1192 self.assertEqual(conn.local_address[0], "127.0.0.1") |
| 1186 self.assertEqual(conn.remote_address, ()) | 1193 self.assertEqual(conn.remote_address, ()) |
| 1187 self.assertEqual(conn.status, "LISTEN") | 1194 self.assertEqual(conn.status, "LISTEN") |
| 1195 for kind in all_kinds: |
| 1196 cons = p.get_connections(kind=kind) |
| 1197 if kind in ("all", "inet", "inet4", "tcp", "tcp4"): |
| 1198 self.assertTrue(cons != []) |
| 1199 else: |
| 1200 self.assertEqual(cons, []) |
| 1188 # UDP v4 | 1201 # UDP v4 |
| 1189 elif p.pid == udp4_proc.pid: | 1202 elif p.pid == udp4_proc.pid: |
| 1190 self.assertEqual(conn.family, socket.AF_INET) | 1203 self.assertEqual(conn.family, socket.AF_INET) |
| 1191 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1204 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
| 1192 self.assertEqual(conn.local_address[0], "127.0.0.1") | 1205 self.assertEqual(conn.local_address[0], "127.0.0.1") |
| 1193 self.assertEqual(conn.remote_address, ()) | 1206 self.assertEqual(conn.remote_address, ()) |
| 1194 self.assertEqual(conn.status, "") | 1207 self.assertEqual(conn.status, "") |
| 1208 for kind in all_kinds: |
| 1209 cons = p.get_connections(kind=kind) |
| 1210 if kind in ("all", "inet", "inet4", "udp", "udp4"): |
| 1211 self.assertTrue(cons != []) |
| 1212 else: |
| 1213 self.assertEqual(cons, []) |
| 1195 # TCP v6 | 1214 # TCP v6 |
| 1196 elif p.pid == getattr(tcp6_proc, "pid", None): | 1215 elif p.pid == getattr(tcp6_proc, "pid", None): |
| 1197 self.assertEqual(conn.family, socket.AF_INET6) | 1216 self.assertEqual(conn.family, socket.AF_INET6) |
| 1198 self.assertEqual(conn.type, socket.SOCK_STREAM) | 1217 self.assertEqual(conn.type, socket.SOCK_STREAM) |
| 1199 self.assertTrue(conn.local_address[0] in ("::", "::1")) | 1218 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
| 1200 self.assertEqual(conn.remote_address, ()) | 1219 self.assertEqual(conn.remote_address, ()) |
| 1201 self.assertEqual(conn.status, "LISTEN") | 1220 self.assertEqual(conn.status, "LISTEN") |
| 1221 for kind in all_kinds: |
| 1222 cons = p.get_connections(kind=kind) |
| 1223 if kind in ("all", "inet", "inet6", "tcp", "tcp6"): |
| 1224 self.assertTrue(cons != []) |
| 1225 else: |
| 1226 self.assertEqual(cons, []) |
| 1202 # UDP v6 | 1227 # UDP v6 |
| 1203 elif p.pid == getattr(udp6_proc, "pid", None): | 1228 elif p.pid == getattr(udp6_proc, "pid", None): |
| 1204 self.assertEqual(conn.family, socket.AF_INET6) | 1229 self.assertEqual(conn.family, socket.AF_INET6) |
| 1205 self.assertEqual(conn.type, socket.SOCK_DGRAM) | 1230 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
| 1206 self.assertTrue(conn.local_address[0] in ("::", "::1")) | 1231 self.assertTrue(conn.local_address[0] in ("::", "::1")) |
| 1207 self.assertEqual(conn.remote_address, ()) | 1232 self.assertEqual(conn.remote_address, ()) |
| 1208 self.assertEqual(conn.status, "") | 1233 self.assertEqual(conn.status, "") |
| 1234 for kind in all_kinds: |
| 1235 cons = p.get_connections(kind=kind) |
| 1236 if kind in ("all", "inet", "inet6", "udp", "udp6"): |
| 1237 self.assertTrue(cons != []) |
| 1238 else: |
| 1239 self.assertEqual(cons, []) |
| 1209 | 1240 |
| 1210 def test_parent_ppid(self): | 1241 def test_parent_ppid(self): |
| 1211 this_parent = os.getpid() | 1242 this_parent = os.getpid() |
| 1212 sproc = get_test_subprocess() | 1243 sproc = get_test_subprocess() |
| 1213 p = psutil.Process(sproc.pid) | 1244 p = psutil.Process(sproc.pid) |
| 1214 self.assertEqual(p.ppid, this_parent) | 1245 self.assertEqual(p.ppid, this_parent) |
| 1215 self.assertEqual(p.parent.pid, this_parent) | 1246 self.assertEqual(p.parent.pid, this_parent) |
| 1216 # no other process is supposed to have us as parent | 1247 # no other process is supposed to have us as parent |
| 1217 for p in psutil.process_iter(): | 1248 for p in psutil.process_iter(): |
| 1218 if p.pid == sproc.pid: | 1249 if p.pid == sproc.pid: |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1300 self.assertTrue(str(sproc.pid) in str(p)) | 1331 self.assertTrue(str(sproc.pid) in str(p)) |
| 1301 self.assertTrue("terminated" in str(p)) | 1332 self.assertTrue("terminated" in str(p)) |
| 1302 | 1333 |
| 1303 def test_fetch_all(self): | 1334 def test_fetch_all(self): |
| 1304 valid_procs = 0 | 1335 valid_procs = 0 |
| 1305 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', | 1336 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', |
| 1306 'kill', 'wait'] | 1337 'kill', 'wait'] |
| 1307 excluded_names += ['get_cpu_percent', 'get_children'] | 1338 excluded_names += ['get_cpu_percent', 'get_children'] |
| 1308 # XXX - skip slow lsof implementation; | 1339 # XXX - skip slow lsof implementation; |
| 1309 if BSD: | 1340 if BSD: |
| 1310 excluded_names += ['get_open_files', 'get_connections'] | |
| 1311 if OSX: | |
| 1312 excluded_names += ['get_connections'] | 1341 excluded_names += ['get_connections'] |
| 1313 attrs = [] | 1342 attrs = [] |
| 1314 for name in dir(psutil.Process): | 1343 for name in dir(psutil.Process): |
| 1315 if name.startswith("_"): | 1344 if name.startswith("_"): |
| 1316 continue | 1345 continue |
| 1317 if name.startswith("set_"): | 1346 if name.startswith("set_"): |
| 1318 continue | 1347 continue |
| 1319 if name in excluded_names: | 1348 if name in excluded_names: |
| 1320 continue | 1349 continue |
| 1321 attrs.append(name) | 1350 attrs.append(name) |
| (...skipping 11 matching lines...) Expand all Loading... |
| 1333 except (psutil.NoSuchProcess, psutil.AccessDenied): | 1362 except (psutil.NoSuchProcess, psutil.AccessDenied): |
| 1334 err = sys.exc_info()[1] | 1363 err = sys.exc_info()[1] |
| 1335 self.assertEqual(err.pid, p.pid) | 1364 self.assertEqual(err.pid, p.pid) |
| 1336 if err.name: | 1365 if err.name: |
| 1337 self.assertEqual(err.name, p.name) | 1366 self.assertEqual(err.name, p.name) |
| 1338 self.assertTrue(str(err)) | 1367 self.assertTrue(str(err)) |
| 1339 self.assertTrue(err.msg) | 1368 self.assertTrue(err.msg) |
| 1340 else: | 1369 else: |
| 1341 if name == 'parent' or ret in (0, 0.0, [], None): | 1370 if name == 'parent' or ret in (0, 0.0, [], None): |
| 1342 continue | 1371 continue |
| 1372 # getcwd() on FreeBSD may be an empty string |
| 1373 # in case of a system process |
| 1374 if name == 'getcwd' and BSD and ret == '': |
| 1375 continue |
| 1343 self.assertTrue(ret) | 1376 self.assertTrue(ret) |
| 1344 if name == "exe": | 1377 if name == "exe": |
| 1345 self.assertTrue(os.path.isfile(ret)) | 1378 self.assertTrue(os.path.isfile(ret)) |
| 1346 elif name == "getcwd": | 1379 elif name == "getcwd": |
| 1347 # XXX - temporary fix; on my Linux box | 1380 # XXX - temporary fix; on my Linux box |
| 1348 # chrome process cws is errnously reported | 1381 # chrome process cws is errnously reported |
| 1349 # as /proc/4144/fdinfo whichd doesn't exist | 1382 # as /proc/4144/fdinfo whichd doesn't exist |
| 1350 if 'chrome' in p.name: | 1383 if 'chrome' in p.name: |
| 1351 continue | 1384 continue |
| 1352 self.assertTrue(os.path.isdir(ret)) | 1385 self.assertTrue(os.path.isdir(ret)) |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1390 elif WINDOWS: | 1423 elif WINDOWS: |
| 1391 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') | 1424 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
| 1392 else: | 1425 else: |
| 1393 p.username | 1426 p.username |
| 1394 | 1427 |
| 1395 self.assertTrue(0 in psutil.get_pid_list()) | 1428 self.assertTrue(0 in psutil.get_pid_list()) |
| 1396 self.assertTrue(psutil.pid_exists(0)) | 1429 self.assertTrue(psutil.pid_exists(0)) |
| 1397 | 1430 |
| 1398 def test_Popen(self): | 1431 def test_Popen(self): |
| 1399 # Popen class test | 1432 # Popen class test |
| 1433 # XXX this test causes a ResourceWarning on Python 3 because |
| 1434 # psutil.__subproc instance doesn't get propertly freed. |
| 1435 # Not sure what to do though. |
| 1400 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] | 1436 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
| 1401 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 1437 proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 1402 try: | 1438 try: |
| 1403 proc.name | 1439 proc.name |
| 1404 proc.stdin | 1440 proc.stdin |
| 1405 self.assertTrue(hasattr(proc, 'name')) | 1441 self.assertTrue(hasattr(proc, 'name')) |
| 1406 self.assertTrue(hasattr(proc, 'stdin')) | 1442 self.assertTrue(hasattr(proc, 'stdin')) |
| 1407 self.assertRaises(AttributeError, getattr, proc, 'foo') | 1443 self.assertRaises(AttributeError, getattr, proc, 'foo') |
| 1408 finally: | 1444 finally: |
| 1409 proc.kill() | 1445 proc.kill() |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1484 | 1520 |
| 1485 f = open(TESTFN, 'w') | 1521 f = open(TESTFN, 'w') |
| 1486 f.close() | 1522 f.close() |
| 1487 atexit.register(lambda: os.remove(TESTFN)) | 1523 atexit.register(lambda: os.remove(TESTFN)) |
| 1488 | 1524 |
| 1489 unittest.TextTestRunner(verbosity=2).run(test_suite) | 1525 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 1490 DEVNULL.close() | 1526 DEVNULL.close() |
| 1491 | 1527 |
| 1492 if __name__ == '__main__': | 1528 if __name__ == '__main__': |
| 1493 test_main() | 1529 test_main() |
| OLD | NEW |