Index: third_party/psutil/test/test_psutil.py |
diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py |
index 35ed7442888106df0de7511e210a60fa4e1895bf..f266c71d88c6a1e3d468dcf65e1005ac9da146cf 100644 |
--- a/third_party/psutil/test/test_psutil.py |
+++ b/third_party/psutil/test/test_psutil.py |
@@ -1,6 +1,6 @@ |
#!/usr/bin/env python |
# |
-# $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ |
+# $Id: test_psutil.py 1204 2011-10-24 19:19:01Z g.rodola $ |
# |
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
# Use of this source code is governed by a BSD-style license that can be |
@@ -439,9 +439,7 @@ class TestCase(unittest.TestCase): |
self.assertTrue(mount in mounts) |
psutil.disk_usage(mount) |
- # XXX |
- @skipUnless(hasattr(psutil, "network_io_counters")) |
- def test_anetwork_io_counters(self): |
+ def test_network_io_counters(self): |
def check_ntuple(nt): |
self.assertEqual(nt[0], nt.bytes_sent) |
self.assertEqual(nt[1], nt.bytes_recv) |
@@ -455,11 +453,11 @@ class TestCase(unittest.TestCase): |
ret = psutil.network_io_counters(pernic=False) |
check_ntuple(ret) |
ret = psutil.network_io_counters(pernic=True) |
- for name, ntuple in ret.iteritems(): |
- self.assertTrue(name) |
- check_ntuple(ntuple) |
- # XXX |
- @skipUnless(hasattr(psutil, "disk_io_counters")) |
+ self.assertTrue(ret != []) |
+ for key in ret: |
+ self.assertTrue(key) |
+ check_ntuple(ret[key]) |
+ |
def test_disk_io_counters(self): |
def check_ntuple(nt): |
self.assertEqual(nt[0], nt.read_count) |
@@ -478,9 +476,9 @@ class TestCase(unittest.TestCase): |
ret = psutil.disk_io_counters(perdisk=False) |
check_ntuple(ret) |
ret = psutil.disk_io_counters(perdisk=True) |
- for name, ntuple in ret.iteritems(): |
- self.assertTrue(name) |
- check_ntuple(ntuple) |
+ for key in ret: |
+ self.assertTrue(key) |
+ check_ntuple(ret[key]) |
# ==================== |
# Process object tests |
@@ -616,7 +614,10 @@ class TestCase(unittest.TestCase): |
percent = p.get_cpu_percent(interval=None) |
self.assertTrue(isinstance(percent, float)) |
self.assertTrue(percent >= 0.0) |
- self.assertTrue(percent <= 100.0) |
+ if os.name != 'posix': |
+ self.assertTrue(percent <= 100.0) |
+ else: |
+ self.assertTrue(percent >= 0.0) |
def test_cpu_times(self): |
times = psutil.Process(os.getpid()).get_cpu_times() |
@@ -890,9 +891,10 @@ class TestCase(unittest.TestCase): |
except psutil.AccessDenied: |
pass |
finally: |
- # going back to previous nice value raises AccessDenied on OSX |
- if not OSX: |
+ try: |
p.nice = first_nice |
+ except psutil.AccessDenied: |
+ pass |
if os.name == 'nt': |
@@ -931,14 +933,14 @@ class TestCase(unittest.TestCase): |
else: |
p.username |
- @skipUnless(WINDOWS or LINUX) |
+ @skipIf(not hasattr(psutil.Process, "getcwd")) |
def test_getcwd(self): |
sproc = get_test_subprocess() |
wait_for_pid(sproc.pid) |
p = psutil.Process(sproc.pid) |
self.assertEqual(p.getcwd(), os.getcwd()) |
- @skipUnless(WINDOWS or LINUX) |
+ @skipIf(not hasattr(psutil.Process, "getcwd")) |
def test_getcwd_2(self): |
cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
sproc = get_test_subprocess(cmd) |
@@ -1042,6 +1044,8 @@ class TestCase(unittest.TestCase): |
self.assertEqual(con[3], con.local_address) |
self.assertEqual(con[4], con.remote_address) |
self.assertEqual(con[5], con.status) |
+ # test kind arg |
+ self.assertRaises(ValueError, p.get_connections, 'foo') |
@skipUnless(supports_ipv6()) |
def test_get_connections_ipv6(self): |
@@ -1116,7 +1120,7 @@ class TestCase(unittest.TestCase): |
addr="::1") |
# launch various subprocess instantiating a socket of various |
- # families and tupes to enrich psutil results |
+ # families and types to enrich psutil results |
tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
if supports_ipv6(): |
@@ -1176,6 +1180,9 @@ class TestCase(unittest.TestCase): |
# --- check matches against subprocesses |
+ all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6", |
+ "udp", "udp4", "udp6") |
+ |
for p in psutil.Process(os.getpid()).get_children(): |
for conn in p.get_connections(): |
# TCP v4 |
@@ -1185,6 +1192,12 @@ class TestCase(unittest.TestCase): |
self.assertEqual(conn.local_address[0], "127.0.0.1") |
self.assertEqual(conn.remote_address, ()) |
self.assertEqual(conn.status, "LISTEN") |
+ for kind in all_kinds: |
+ cons = p.get_connections(kind=kind) |
+ if kind in ("all", "inet", "inet4", "tcp", "tcp4"): |
+ self.assertTrue(cons != []) |
+ else: |
+ self.assertEqual(cons, []) |
# UDP v4 |
elif p.pid == udp4_proc.pid: |
self.assertEqual(conn.family, socket.AF_INET) |
@@ -1192,6 +1205,12 @@ class TestCase(unittest.TestCase): |
self.assertEqual(conn.local_address[0], "127.0.0.1") |
self.assertEqual(conn.remote_address, ()) |
self.assertEqual(conn.status, "") |
+ for kind in all_kinds: |
+ cons = p.get_connections(kind=kind) |
+ if kind in ("all", "inet", "inet4", "udp", "udp4"): |
+ self.assertTrue(cons != []) |
+ else: |
+ self.assertEqual(cons, []) |
# TCP v6 |
elif p.pid == getattr(tcp6_proc, "pid", None): |
self.assertEqual(conn.family, socket.AF_INET6) |
@@ -1199,6 +1218,12 @@ class TestCase(unittest.TestCase): |
self.assertTrue(conn.local_address[0] in ("::", "::1")) |
self.assertEqual(conn.remote_address, ()) |
self.assertEqual(conn.status, "LISTEN") |
+ for kind in all_kinds: |
+ cons = p.get_connections(kind=kind) |
+ if kind in ("all", "inet", "inet6", "tcp", "tcp6"): |
+ self.assertTrue(cons != []) |
+ else: |
+ self.assertEqual(cons, []) |
# UDP v6 |
elif p.pid == getattr(udp6_proc, "pid", None): |
self.assertEqual(conn.family, socket.AF_INET6) |
@@ -1206,6 +1231,12 @@ class TestCase(unittest.TestCase): |
self.assertTrue(conn.local_address[0] in ("::", "::1")) |
self.assertEqual(conn.remote_address, ()) |
self.assertEqual(conn.status, "") |
+ for kind in all_kinds: |
+ cons = p.get_connections(kind=kind) |
+ if kind in ("all", "inet", "inet6", "udp", "udp6"): |
+ self.assertTrue(cons != []) |
+ else: |
+ self.assertEqual(cons, []) |
def test_parent_ppid(self): |
this_parent = os.getpid() |
@@ -1307,8 +1338,6 @@ class TestCase(unittest.TestCase): |
excluded_names += ['get_cpu_percent', 'get_children'] |
# XXX - skip slow lsof implementation; |
if BSD: |
- excluded_names += ['get_open_files', 'get_connections'] |
- if OSX: |
excluded_names += ['get_connections'] |
attrs = [] |
for name in dir(psutil.Process): |
@@ -1340,6 +1369,10 @@ class TestCase(unittest.TestCase): |
else: |
if name == 'parent' or ret in (0, 0.0, [], None): |
continue |
+ # getcwd() on FreeBSD may be an empty string |
+ # in case of a system process |
+ if name == 'getcwd' and BSD and ret == '': |
+ continue |
self.assertTrue(ret) |
if name == "exe": |
self.assertTrue(os.path.isfile(ret)) |
@@ -1397,6 +1430,9 @@ class TestCase(unittest.TestCase): |
def test_Popen(self): |
# Popen class test |
+ # XXX this test causes a ResourceWarning on Python 3 because |
+ # psutil.__subproc instance doesn't get propertly freed. |
+ # Not sure what to do though. |
cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
try: |