| OLD | NEW |
| (Empty) |
| 1 #!/usr/bin/env python | |
| 2 # | |
| 3 # $Id: _bsd.py 1142 2011-10-05 18:45:49Z g.rodola $ | |
| 4 # | |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. | |
| 6 # Use of this source code is governed by a BSD-style license that can be | |
| 7 # found in the LICENSE file. | |
| 8 | |
| 9 """BSD specific tests. These are implicitly run by test_psutil.py.""" | |
| 10 | |
| 11 import unittest | |
| 12 import subprocess | |
| 13 import time | |
| 14 import re | |
| 15 import sys | |
| 16 | |
| 17 import psutil | |
| 18 | |
| 19 from test_psutil import reap_children, get_test_subprocess, sh | |
| 20 | |
| 21 def sysctl(cmdline): | |
| 22 """Expects a sysctl command with an argument and parse the result | |
| 23 returning only the value of interest. | |
| 24 """ | |
| 25 result = sh("sysctl " + cmdline) | |
| 26 result = result[result.find(": ") + 2:] | |
| 27 try: | |
| 28 return int(result) | |
| 29 except ValueError: | |
| 30 return result | |
| 31 | |
| 32 def parse_sysctl_vmtotal(output): | |
| 33 """Parse sysctl vm.vmtotal output returning total and free memory | |
| 34 values. | |
| 35 """ | |
| 36 line = output.split('\n')[4] # our line of interest | |
| 37 mobj = re.match(r'Virtual\s+Memory.*Total:\s+(\d+)K,\s+Active\s+(\d+)K.*', l
ine) | |
| 38 total, active = mobj.groups() | |
| 39 # values are represented in kilo bytes | |
| 40 total = int(total) * 1024 | |
| 41 active = int(active) * 1024 | |
| 42 free = total - active | |
| 43 return total, free | |
| 44 | |
| 45 | |
| 46 class BSDSpecificTestCase(unittest.TestCase): | |
| 47 | |
| 48 def setUp(self): | |
| 49 self.pid = get_test_subprocess().pid | |
| 50 | |
| 51 def tearDown(self): | |
| 52 reap_children() | |
| 53 | |
| 54 def test_TOTAL_PHYMEM(self): | |
| 55 sysctl_hwphymem = sysctl('sysctl hw.physmem') | |
| 56 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) | |
| 57 | |
| 58 def test_BOOT_TIME(self): | |
| 59 s = sysctl('sysctl kern.boottime') | |
| 60 s = s[s.find(" sec = ") + 7:] | |
| 61 s = s[:s.find(',')] | |
| 62 btime = int(s) | |
| 63 self.assertEqual(btime, psutil.BOOT_TIME) | |
| 64 | |
| 65 def test_avail_phymem(self): | |
| 66 # This test is not particularly accurate and may fail if the OS is | |
| 67 # consuming memory for other applications. | |
| 68 # We just want to test that the difference between psutil result | |
| 69 # and sysctl's is not too high. | |
| 70 _sum = sum((sysctl("sysctl vm.stats.vm.v_inactive_count"), | |
| 71 sysctl("sysctl vm.stats.vm.v_cache_count"), | |
| 72 sysctl("sysctl vm.stats.vm.v_free_count") | |
| 73 )) | |
| 74 _pagesize = sysctl("sysctl hw.pagesize") | |
| 75 sysctl_avail_phymem = _sum * _pagesize | |
| 76 psutil_avail_phymem = psutil.phymem_usage().free | |
| 77 difference = abs(psutil_avail_phymem - sysctl_avail_phymem) | |
| 78 # On my system both sysctl and psutil report the same values. | |
| 79 # Let's use a tollerance of 0.5 MB and consider the test as failed | |
| 80 # if we go over it. | |
| 81 if difference > (0.5 * 2**20): | |
| 82 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( | |
| 83 sysctl_avail_phymem, psutil_avail_phymem, difference)) | |
| 84 | |
| 85 def test_total_virtmem(self): | |
| 86 # This test is not particularly accurate and may fail if the OS is | |
| 87 # consuming memory for other applications. | |
| 88 # We just want to test that the difference between psutil result | |
| 89 # and sysctl's is not too high. | |
| 90 p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIP
E) | |
| 91 result = p.communicate()[0].strip() | |
| 92 if sys.version_info >= (3,): | |
| 93 result = str(result, sys.stdout.encoding) | |
| 94 sysctl_total_virtmem, _ = parse_sysctl_vmtotal(result) | |
| 95 psutil_total_virtmem = psutil.virtmem_usage().total | |
| 96 difference = abs(sysctl_total_virtmem - psutil_total_virtmem) | |
| 97 | |
| 98 # On my system I get a difference of 4657152 bytes, probably because | |
| 99 # the system is consuming memory for this same test. | |
| 100 # Assuming psutil is right, let's use a tollerance of 10 MB and consider | |
| 101 # the test as failed if we go over it. | |
| 102 if difference > (10 * 2**20): | |
| 103 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( | |
| 104 sysctl_total_virtmem, psutil_total_virtmem, difference)) | |
| 105 | |
| 106 def test_avail_virtmem(self): | |
| 107 # This test is not particularly accurate and may fail if the OS is | |
| 108 # consuming memory for other applications. | |
| 109 # We just want to test that the difference between psutil result | |
| 110 # and sysctl's is not too high. | |
| 111 p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIP
E) | |
| 112 result = p.communicate()[0].strip() | |
| 113 if sys.version_info >= (3,): | |
| 114 result = str(result, sys.stdout.encoding) | |
| 115 _, sysctl_avail_virtmem = parse_sysctl_vmtotal(result) | |
| 116 psutil_avail_virtmem = psutil.virtmem_usage().free | |
| 117 difference = abs(sysctl_avail_virtmem - psutil_avail_virtmem) | |
| 118 # let's assume the test is failed if difference is > 0.5 MB | |
| 119 if difference > (0.5 * 2**20): | |
| 120 self.fail(difference) | |
| 121 | |
| 122 def test_process_create_time(self): | |
| 123 cmdline = "ps -o lstart -p %s" %self.pid | |
| 124 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) | |
| 125 output = p.communicate()[0] | |
| 126 if sys.version_info >= (3,): | |
| 127 output = str(output, sys.stdout.encoding) | |
| 128 start_ps = output.replace('STARTED', '').strip() | |
| 129 start_psutil = psutil.Process(self.pid).create_time | |
| 130 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", | |
| 131 time.localtime(start_psutil)) | |
| 132 self.assertEqual(start_ps, start_psutil) | |
| 133 | |
| 134 def test_disks(self): | |
| 135 # test psutil.disk_usage() and psutil.disk_partitions() | |
| 136 # against "df -a" | |
| 137 def df(path): | |
| 138 out = sh('df -k "%s"' % path).strip() | |
| 139 lines = out.split('\n') | |
| 140 lines.pop(0) | |
| 141 line = lines.pop(0) | |
| 142 dev, total, used, free = line.split()[:4] | |
| 143 if dev == 'none': | |
| 144 dev = '' | |
| 145 total = int(total) * 1024 | |
| 146 used = int(used) * 1024 | |
| 147 free = int(free) * 1024 | |
| 148 return dev, total, used, free | |
| 149 | |
| 150 for part in psutil.disk_partitions(all=False): | |
| 151 usage = psutil.disk_usage(part.mountpoint) | |
| 152 dev, total, used, free = df(part.mountpoint) | |
| 153 self.assertEqual(part.device, dev) | |
| 154 self.assertEqual(usage.total, total) | |
| 155 # 10 MB tollerance | |
| 156 if abs(usage.free - free) > 10 * 1024 * 1024: | |
| 157 self.fail("psutil=%s, df=%s" % usage.free, free) | |
| 158 if abs(usage.used - used) > 10 * 1024 * 1024: | |
| 159 self.fail("psutil=%s, df=%s" % usage.used, used) | |
| 160 | |
| 161 | |
| 162 if __name__ == '__main__': | |
| 163 test_suite = unittest.TestSuite() | |
| 164 test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) | |
| 165 unittest.TextTestRunner(verbosity=2).run(test_suite) | |
| OLD | NEW |