Index: third_party/psutil/test/test_memory_leaks.py |
diff --git a/third_party/psutil/test/test_memory_leaks.py b/third_party/psutil/test/test_memory_leaks.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..d5591a3c6c6317f111601f6da3d58dfb741a70d1 |
--- /dev/null |
+++ b/third_party/psutil/test/test_memory_leaks.py |
@@ -0,0 +1,188 @@ |
+#!/usr/bin/env python |
+# |
+# $Id: test_memory_leaks.py 777 2010-11-08 18:29:13Z g.rodola $ |
+# |
+ |
+""" |
+Note: this is targeted for python 2.x. |
+To run it under python 3.x you need to use 2to3 tool first: |
+ |
+$ 2to3 -w test/test_memory_leaks.py |
+""" |
+ |
+ |
+import os |
+import gc |
+import sys |
+import unittest |
+ |
+import psutil |
+from test_psutil import reap_children, skipUnless, skipIf, \ |
+ POSIX, LINUX, WINDOWS, OSX, BSD |
+ |
+LOOPS = 1000 |
+TOLERANCE = 4096 |
+ |
+ |
+class TestProcessObjectLeaks(unittest.TestCase): |
+ """Test leaks of Process class methods and properties""" |
+ |
+ def setUp(self): |
+ gc.collect() |
+ |
+ def tearDown(self): |
+ reap_children() |
+ |
+ def execute(self, method, *args, **kwarks): |
+ # step 1 |
+ p = psutil.Process(os.getpid()) |
+ for x in xrange(LOOPS): |
+ obj = getattr(p, method) |
+ if callable(obj): |
+ retvalue = obj(*args, **kwarks) |
+ else: |
+ retvalue = obj # property |
+ del x, p, obj, retvalue |
+ gc.collect() |
+ rss1 = psutil.Process(os.getpid()).get_memory_info()[0] |
+ |
+ # step 2 |
+ p = psutil.Process(os.getpid()) |
+ for x in xrange(LOOPS): |
+ obj = getattr(p, method) |
+ if callable(obj): |
+ retvalue = obj(*args, **kwarks) |
+ else: |
+ retvalue = obj # property |
+ del x, p, obj, retvalue |
+ gc.collect() |
+ rss2 = psutil.Process(os.getpid()).get_memory_info()[0] |
+ |
+ # comparison |
+ difference = rss2 - rss1 |
+ if difference > TOLERANCE: |
+ self.fail("rss1=%s, rss2=%s, difference=%s" %(rss1, rss2, difference)) |
+ |
+ def test_name(self): |
+ self.execute('name') |
+ |
+ def test_cmdline(self): |
+ self.execute('cmdline') |
+ |
+ def test_ppid(self): |
+ self.execute('ppid') |
+ |
+ def test_uid(self): |
+ self.execute('uid') |
+ |
+ def test_uid(self): |
+ self.execute('gid') |
+ |
+ @skipIf(POSIX) |
+ def test_username(self): |
+ self.execute('username') |
+ |
+ def test_create_time(self): |
+ self.execute('create_time') |
+ |
+ def test_get_num_threads(self): |
+ self.execute('get_num_threads') |
+ |
+ def test_get_cpu_times(self): |
+ self.execute('get_cpu_times') |
+ |
+ def test_get_memory_info(self): |
+ self.execute('get_memory_info') |
+ |
+ def test_is_running(self): |
+ self.execute('is_running') |
+ |
+ @skipUnless(WINDOWS) |
+ def test_resume(self): |
+ self.execute('resume') |
+ |
+ @skipUnless(WINDOWS) |
+ def test_getcwd(self): |
+ self.execute('getcwd') |
+ |
+ @skipUnless(WINDOWS) |
+ def test_get_open_files(self): |
+ self.execute('get_open_files') |
+ |
+ @skipUnless(WINDOWS) |
+ def test_get_connections(self): |
+ self.execute('get_connections') |
+ |
+ |
+class TestModuleFunctionsLeaks(unittest.TestCase): |
+ """Test leaks of psutil module functions.""" |
+ |
+ def setUp(self): |
+ gc.collect() |
+ |
+ def execute(self, function, *args, **kwarks): |
+ # step 1 |
+ for x in xrange(LOOPS): |
+ obj = getattr(psutil, function) |
+ if callable(obj): |
+ retvalue = obj(*args, **kwarks) |
+ else: |
+ retvalue = obj # property |
+ del x, obj, retvalue |
+ gc.collect() |
+ rss1 = psutil.Process(os.getpid()).get_memory_info()[0] |
+ |
+ # step 2 |
+ for x in xrange(LOOPS): |
+ obj = getattr(psutil, function) |
+ if callable(obj): |
+ retvalue = obj(*args, **kwarks) |
+ else: |
+ retvalue = obj # property |
+ del x, obj, retvalue |
+ gc.collect() |
+ rss2 = psutil.Process(os.getpid()).get_memory_info()[0] |
+ |
+ # comparison |
+ difference = rss2 - rss1 |
+ if difference > TOLERANCE: |
+ self.fail("rss1=%s, rss2=%s, difference=%s" %(rss1, rss2, difference)) |
+ |
+ def test_get_pid_list(self): |
+ self.execute('get_pid_list') |
+ |
+ @skipIf(POSIX) |
+ def test_pid_exists(self): |
+ self.execute('pid_exists', os.getpid()) |
+ |
+ def test_process_iter(self): |
+ self.execute('process_iter') |
+ |
+ def test_used_phymem(self): |
+ self.execute('used_phymem') |
+ |
+ def test_avail_phymem(self): |
+ self.execute('avail_phymem') |
+ |
+ def test_total_virtmem(self): |
+ self.execute('total_virtmem') |
+ |
+ def test_used_virtmem(self): |
+ self.execute('used_virtmem') |
+ |
+ def test_avail_virtmem(self): |
+ self.execute('avail_virtmem') |
+ |
+ def test_cpu_times(self): |
+ self.execute('cpu_times') |
+ |
+ |
+def test_main(): |
+ test_suite = unittest.TestSuite() |
+ test_suite.addTest(unittest.makeSuite(TestProcessObjectLeaks)) |
+ test_suite.addTest(unittest.makeSuite(TestModuleFunctionsLeaks)) |
+ unittest.TextTestRunner(verbosity=2).run(test_suite) |
+ |
+if __name__ == '__main__': |
+ test_main() |
+ |