OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # | |
3 # $Id: _bsd.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
4 # | |
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
6 # Use of this source code is governed by a BSD-style license that can be | |
7 # found in the LICENSE file. | |
8 | |
9 """BSD specific tests. These are implicitly run by test_psutil.py.""" | |
10 | |
11 import unittest | |
12 import subprocess | |
13 import time | |
14 import re | |
15 import sys | |
16 | |
17 import psutil | |
18 | |
19 from test_psutil import reap_children, get_test_subprocess, sh | |
20 | |
21 def sysctl(cmdline): | |
22 """Expects a sysctl command with an argument and parse the result | |
23 returning only the value of interest. | |
24 """ | |
25 result = sh("sysctl " + cmdline) | |
26 result = result[result.find(": ") + 2:] | |
27 try: | |
28 return int(result) | |
29 except ValueError: | |
30 return result | |
31 | |
32 def parse_sysctl_vmtotal(output): | |
33 """Parse sysctl vm.vmtotal output returning total and free memory | |
34 values. | |
35 """ | |
36 line = output.split('\n')[4] # our line of interest | |
37 mobj = re.match(r'Virtual\s+Memory.*Total:\s+(\d+)K,\s+Active\s+(\d+)K.*', l
ine) | |
38 total, active = mobj.groups() | |
39 # values are represented in kilo bytes | |
40 total = int(total) * 1024 | |
41 active = int(active) * 1024 | |
42 free = total - active | |
43 return total, free | |
44 | |
45 | |
46 class BSDSpecificTestCase(unittest.TestCase): | |
47 | |
48 def setUp(self): | |
49 self.pid = get_test_subprocess().pid | |
50 | |
51 def tearDown(self): | |
52 reap_children() | |
53 | |
54 def test_TOTAL_PHYMEM(self): | |
55 sysctl_hwphymem = sysctl('sysctl hw.physmem') | |
56 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) | |
57 | |
58 def test_BOOT_TIME(self): | |
59 s = sysctl('sysctl kern.boottime') | |
60 s = s[s.find(" sec = ") + 7:] | |
61 s = s[:s.find(',')] | |
62 btime = int(s) | |
63 self.assertEqual(btime, psutil.BOOT_TIME) | |
64 | |
65 def test_avail_phymem(self): | |
66 # This test is not particularly accurate and may fail if the OS is | |
67 # consuming memory for other applications. | |
68 # We just want to test that the difference between psutil result | |
69 # and sysctl's is not too high. | |
70 _sum = sum((sysctl("sysctl vm.stats.vm.v_inactive_count"), | |
71 sysctl("sysctl vm.stats.vm.v_cache_count"), | |
72 sysctl("sysctl vm.stats.vm.v_free_count") | |
73 )) | |
74 _pagesize = sysctl("sysctl hw.pagesize") | |
75 sysctl_avail_phymem = _sum * _pagesize | |
76 psutil_avail_phymem = psutil.phymem_usage().free | |
77 difference = abs(psutil_avail_phymem - sysctl_avail_phymem) | |
78 # On my system both sysctl and psutil report the same values. | |
79 # Let's use a tollerance of 0.5 MB and consider the test as failed | |
80 # if we go over it. | |
81 if difference > (0.5 * 2**20): | |
82 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( | |
83 sysctl_avail_phymem, psutil_avail_phymem, difference)) | |
84 | |
85 def test_total_virtmem(self): | |
86 # This test is not particularly accurate and may fail if the OS is | |
87 # consuming memory for other applications. | |
88 # We just want to test that the difference between psutil result | |
89 # and sysctl's is not too high. | |
90 p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIP
E) | |
91 result = p.communicate()[0].strip() | |
92 if sys.version_info >= (3,): | |
93 result = str(result, sys.stdout.encoding) | |
94 sysctl_total_virtmem, _ = parse_sysctl_vmtotal(result) | |
95 psutil_total_virtmem = psutil.virtmem_usage().total | |
96 difference = abs(sysctl_total_virtmem - psutil_total_virtmem) | |
97 | |
98 # On my system I get a difference of 4657152 bytes, probably because | |
99 # the system is consuming memory for this same test. | |
100 # Assuming psutil is right, let's use a tollerance of 10 MB and consider | |
101 # the test as failed if we go over it. | |
102 if difference > (10 * 2**20): | |
103 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( | |
104 sysctl_total_virtmem, psutil_total_virtmem, difference)) | |
105 | |
106 def test_avail_virtmem(self): | |
107 # This test is not particularly accurate and may fail if the OS is | |
108 # consuming memory for other applications. | |
109 # We just want to test that the difference between psutil result | |
110 # and sysctl's is not too high. | |
111 p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIP
E) | |
112 result = p.communicate()[0].strip() | |
113 if sys.version_info >= (3,): | |
114 result = str(result, sys.stdout.encoding) | |
115 _, sysctl_avail_virtmem = parse_sysctl_vmtotal(result) | |
116 psutil_avail_virtmem = psutil.virtmem_usage().free | |
117 difference = abs(sysctl_avail_virtmem - psutil_avail_virtmem) | |
118 # let's assume the test is failed if difference is > 0.5 MB | |
119 if difference > (0.5 * 2**20): | |
120 self.fail(difference) | |
121 | |
122 def test_process_create_time(self): | |
123 cmdline = "ps -o lstart -p %s" %self.pid | |
124 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) | |
125 output = p.communicate()[0] | |
126 if sys.version_info >= (3,): | |
127 output = str(output, sys.stdout.encoding) | |
128 start_ps = output.replace('STARTED', '').strip() | |
129 start_psutil = psutil.Process(self.pid).create_time | |
130 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", | |
131 time.localtime(start_psutil)) | |
132 self.assertEqual(start_ps, start_psutil) | |
133 | |
134 def test_disks(self): | |
135 # test psutil.disk_usage() and psutil.disk_partitions() | |
136 # against "df -a" | |
137 def df(path): | |
138 out = sh('df -k "%s"' % path).strip() | |
139 lines = out.split('\n') | |
140 lines.pop(0) | |
141 line = lines.pop(0) | |
142 dev, total, used, free = line.split()[:4] | |
143 if dev == 'none': | |
144 dev = '' | |
145 total = int(total) * 1024 | |
146 used = int(used) * 1024 | |
147 free = int(free) * 1024 | |
148 return dev, total, used, free | |
149 | |
150 for part in psutil.disk_partitions(all=False): | |
151 usage = psutil.disk_usage(part.mountpoint) | |
152 dev, total, used, free = df(part.mountpoint) | |
153 self.assertEqual(part.device, dev) | |
154 self.assertEqual(usage.total, total) | |
155 # 10 MB tollerance | |
156 if abs(usage.free - free) > 10 * 1024 * 1024: | |
157 self.fail("psutil=%s, df=%s" % usage.free, free) | |
158 if abs(usage.used - used) > 10 * 1024 * 1024: | |
159 self.fail("psutil=%s, df=%s" % usage.used, used) | |
160 | |
161 | |
162 if __name__ == '__main__': | |
163 test_suite = unittest.TestSuite() | |
164 test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) | |
165 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
OLD | NEW |