Index: third_party/psutil/test/test_memory_leaks.py |
diff --git a/third_party/psutil/test/test_memory_leaks.py b/third_party/psutil/test/test_memory_leaks.py |
index d5591a3c6c6317f111601f6da3d58dfb741a70d1..b6a3a4e7cfd074a5846ba755220e0ad0654e5974 100644 |
--- a/third_party/psutil/test/test_memory_leaks.py |
+++ b/third_party/psutil/test/test_memory_leaks.py |
@@ -1,67 +1,95 @@ |
#!/usr/bin/env python |
# |
-# $Id: test_memory_leaks.py 777 2010-11-08 18:29:13Z g.rodola $ |
+# $Id: test_memory_leaks.py 1142 2011-10-05 18:45:49Z g.rodola $ |
# |
+# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
""" |
-Note: this is targeted for python 2.x. |
-To run it under python 3.x you need to use 2to3 tool first: |
- |
-$ 2to3 -w test/test_memory_leaks.py |
+A test script which attempts to detect memory leaks by calling C |
+functions many times and compare process memory usage before and |
+after the calls. It might produce false positives. |
""" |
- |
import os |
import gc |
-import sys |
import unittest |
+import time |
import psutil |
from test_psutil import reap_children, skipUnless, skipIf, \ |
- POSIX, LINUX, WINDOWS, OSX, BSD |
+ POSIX, LINUX, WINDOWS, OSX, BSD, PY3 |
LOOPS = 1000 |
TOLERANCE = 4096 |
+if PY3: |
+ xrange = range |
-class TestProcessObjectLeaks(unittest.TestCase): |
- """Test leaks of Process class methods and properties""" |
- def setUp(self): |
- gc.collect() |
+class Base(unittest.TestCase): |
- def tearDown(self): |
- reap_children() |
- |
- def execute(self, method, *args, **kwarks): |
+ def execute(self, function, *args, **kwargs): |
# step 1 |
- p = psutil.Process(os.getpid()) |
for x in xrange(LOOPS): |
- obj = getattr(p, method) |
- if callable(obj): |
- retvalue = obj(*args, **kwarks) |
- else: |
- retvalue = obj # property |
- del x, p, obj, retvalue |
+ self.call(function, *args, **kwargs) |
+ del x |
gc.collect() |
- rss1 = psutil.Process(os.getpid()).get_memory_info()[0] |
+ rss1 = self.get_mem() |
# step 2 |
- p = psutil.Process(os.getpid()) |
for x in xrange(LOOPS): |
- obj = getattr(p, method) |
- if callable(obj): |
- retvalue = obj(*args, **kwarks) |
- else: |
- retvalue = obj # property |
- del x, p, obj, retvalue |
+ self.call(function, *args, **kwargs) |
+ del x |
gc.collect() |
- rss2 = psutil.Process(os.getpid()).get_memory_info()[0] |
+ rss2 = self.get_mem() |
# comparison |
difference = rss2 - rss1 |
if difference > TOLERANCE: |
- self.fail("rss1=%s, rss2=%s, difference=%s" %(rss1, rss2, difference)) |
+ # This doesn't necessarily mean we have a leak yet. |
+ # At this point we assume that after having called the |
+ # function so many times the memory usage is stabilized |
+ # and if there are no leaks it should not increase any |
+ # more. |
+ # Let's keep calling fun for 3 more seconds and fail if |
+ # we notice any difference. |
+ stop_at = time.time() + 3 |
+ while 1: |
+ self.call(function, *args, **kwargs) |
+ if time.time() >= stop_at: |
+ break |
+ del stop_at |
+ gc.collect() |
+ rss3 = self.get_mem() |
+ difference = rss3 - rss2 |
+ if rss3 > rss2: |
+ self.fail("rss2=%s, rss3=%s, difference=%s" \ |
+ % (rss2, rss3, difference)) |
+ |
+ @staticmethod |
+ def get_mem(): |
+ return psutil.Process(os.getpid()).get_memory_info()[0] |
+ |
+ def call(self): |
+ raise NotImplementedError("must be implemented in subclass") |
+ |
+ |
+class TestProcessObjectLeaks(Base): |
+ """Test leaks of Process class methods and properties""" |
+ |
+ def setUp(self): |
+ gc.collect() |
+ |
+ def tearDown(self): |
+ reap_children() |
+ |
+ def call(self, function, *args, **kwargs): |
+ p = psutil.Process(os.getpid()) |
+ obj = getattr(p, function) |
+ if callable(obj): |
+ obj(*args, **kwargs) |
def test_name(self): |
self.execute('name') |
@@ -72,11 +100,16 @@ class TestProcessObjectLeaks(unittest.TestCase): |
def test_ppid(self): |
self.execute('ppid') |
- def test_uid(self): |
- self.execute('uid') |
+ @skipIf(WINDOWS) |
+ def test_uids(self): |
+ self.execute('uids') |
- def test_uid(self): |
- self.execute('gid') |
+ @skipIf(WINDOWS) |
+ def test_gids(self): |
+ self.execute('gids') |
+ |
+ def test_status(self): |
+ self.execute('status') |
@skipIf(POSIX) |
def test_username(self): |
@@ -88,6 +121,9 @@ class TestProcessObjectLeaks(unittest.TestCase): |
def test_get_num_threads(self): |
self.execute('get_num_threads') |
+ def test_get_threads(self): |
+ self.execute('get_threads') |
+ |
def test_get_cpu_times(self): |
self.execute('get_cpu_times') |
@@ -97,6 +133,10 @@ class TestProcessObjectLeaks(unittest.TestCase): |
def test_is_running(self): |
self.execute('is_running') |
+ @skipIf(WINDOWS) |
+ def test_terminal(self): |
+ self.execute('terminal') |
+ |
@skipUnless(WINDOWS) |
def test_resume(self): |
self.execute('resume') |
@@ -105,48 +145,25 @@ class TestProcessObjectLeaks(unittest.TestCase): |
def test_getcwd(self): |
self.execute('getcwd') |
- @skipUnless(WINDOWS) |
+ @skipUnless(WINDOWS or OSX) |
def test_get_open_files(self): |
self.execute('get_open_files') |
- @skipUnless(WINDOWS) |
+ @skipUnless(WINDOWS or OSX) |
def test_get_connections(self): |
self.execute('get_connections') |
-class TestModuleFunctionsLeaks(unittest.TestCase): |
+class TestModuleFunctionsLeaks(Base): |
"""Test leaks of psutil module functions.""" |
def setUp(self): |
gc.collect() |
- def execute(self, function, *args, **kwarks): |
- # step 1 |
- for x in xrange(LOOPS): |
- obj = getattr(psutil, function) |
- if callable(obj): |
- retvalue = obj(*args, **kwarks) |
- else: |
- retvalue = obj # property |
- del x, obj, retvalue |
- gc.collect() |
- rss1 = psutil.Process(os.getpid()).get_memory_info()[0] |
- |
- # step 2 |
- for x in xrange(LOOPS): |
- obj = getattr(psutil, function) |
- if callable(obj): |
- retvalue = obj(*args, **kwarks) |
- else: |
- retvalue = obj # property |
- del x, obj, retvalue |
- gc.collect() |
- rss2 = psutil.Process(os.getpid()).get_memory_info()[0] |
- |
- # comparison |
- difference = rss2 - rss1 |
- if difference > TOLERANCE: |
- self.fail("rss1=%s, rss2=%s, difference=%s" %(rss1, rss2, difference)) |
+ def call(self, function, *args, **kwargs): |
+ obj = getattr(psutil, function) |
+ if callable(obj): |
+ retvalue = obj(*args, **kwargs) |
def test_get_pid_list(self): |
self.execute('get_pid_list') |
@@ -158,24 +175,30 @@ class TestModuleFunctionsLeaks(unittest.TestCase): |
def test_process_iter(self): |
self.execute('process_iter') |
- def test_used_phymem(self): |
- self.execute('used_phymem') |
+ def test_phymem_usage(self): |
+ self.execute('phymem_usage') |
+ |
+ def test_virtmem_usage(self): |
+ self.execute('virtmem_usage') |
- def test_avail_phymem(self): |
- self.execute('avail_phymem') |
+ def test_cpu_times(self): |
+ self.execute('cpu_times') |
- def test_total_virtmem(self): |
- self.execute('total_virtmem') |
+ def test_per_cpu_times(self): |
+ self.execute('cpu_times', percpu=True) |
- def test_used_virtmem(self): |
- self.execute('used_virtmem') |
+ @skipUnless(WINDOWS) |
+ def test_disk_usage(self): |
+ self.execute('disk_usage', '.') |
- def test_avail_virtmem(self): |
- self.execute('avail_virtmem') |
+ def test_disk_partitions(self): |
+ self.execute('disk_partitions') |
- def test_cpu_times(self): |
- self.execute('cpu_times') |
+ def test_network_io_counters(self): |
+ self.execute('network_io_counters') |
+ def test_disk_io_counters(self): |
+ self.execute('disk_io_counters') |
def test_main(): |
test_suite = unittest.TestSuite() |