OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # |
| 3 # $Id: _bsd.py 664 2010-10-09 16:14:34Z g.rodola $ |
| 4 # |
| 5 |
| 6 import unittest |
| 7 import subprocess |
| 8 import time |
| 9 import re |
| 10 import sys |
| 11 |
| 12 import psutil |
| 13 |
| 14 from test_psutil import reap_children, get_test_subprocess |
| 15 from _posix import ps |
| 16 |
| 17 |
| 18 def sysctl(cmdline): |
| 19 """Expects a sysctl command with an argument and parse the result |
| 20 returning only the value of interest. |
| 21 """ |
| 22 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
| 23 result = p.communicate()[0].strip().split()[1] |
| 24 if sys.version_info >= (3,): |
| 25 result = str(result, sys.stdout.encoding) |
| 26 try: |
| 27 return int(result) |
| 28 except ValueError: |
| 29 return result |
| 30 |
| 31 def parse_sysctl_vmtotal(output): |
| 32 """Parse sysctl vm.vmtotal output returning total and free memory |
| 33 values. |
| 34 """ |
| 35 line = output.split('\n')[4] # our line of interest |
| 36 mobj = re.match(r'Virtual\s+Memory.*Total:\s+(\d+)K,\s+Active\s+(\d+)K.*', l
ine) |
| 37 total, active = mobj.groups() |
| 38 # values are represented in kilo bytes |
| 39 total = int(total) * 1024 |
| 40 active = int(active) * 1024 |
| 41 free = total - active |
| 42 return total, free |
| 43 |
| 44 |
| 45 class BSDSpecificTestCase(unittest.TestCase): |
| 46 |
| 47 def setUp(self): |
| 48 self.pid = get_test_subprocess().pid |
| 49 |
| 50 def tearDown(self): |
| 51 reap_children() |
| 52 |
| 53 def test_TOTAL_PHYMEM(self): |
| 54 sysctl_hwphymem = sysctl('sysctl hw.physmem') |
| 55 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) |
| 56 |
| 57 def test_avail_phymem(self): |
| 58 # This test is not particularly accurate and may fail if the OS is |
| 59 # consuming memory for other applications. |
| 60 # We just want to test that the difference between psutil result |
| 61 # and sysctl's is not too high. |
| 62 _sum = sum((sysctl("sysctl vm.stats.vm.v_inactive_count"), |
| 63 sysctl("sysctl vm.stats.vm.v_cache_count"), |
| 64 sysctl("sysctl vm.stats.vm.v_free_count") |
| 65 )) |
| 66 _pagesize = sysctl("sysctl hw.pagesize") |
| 67 sysctl_avail_phymem = _sum * _pagesize |
| 68 psutil_avail_phymem = psutil.avail_phymem() |
| 69 difference = abs(psutil_avail_phymem - sysctl_avail_phymem) |
| 70 # On my system both sysctl and psutil report the same values. |
| 71 # Let's use a tollerance of 0.5 MB and consider the test as failed |
| 72 # if we go over it. |
| 73 if difference > (0.5 * 2**20): |
| 74 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( |
| 75 sysctl_avail_phymem, psutil_avail_phymem, difference)) |
| 76 |
| 77 def test_total_virtmem(self): |
| 78 # This test is not particularly accurate and may fail if the OS is |
| 79 # consuming memory for other applications. |
| 80 # We just want to test that the difference between psutil result |
| 81 # and sysctl's is not too high. |
| 82 p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIP
E) |
| 83 result = p.communicate()[0].strip() |
| 84 if sys.version_info >= (3,): |
| 85 result = str(result, sys.stdout.encoding) |
| 86 sysctl_total_virtmem, _ = parse_sysctl_vmtotal(result) |
| 87 psutil_total_virtmem = psutil.total_virtmem() |
| 88 difference = abs(sysctl_total_virtmem - psutil_total_virtmem) |
| 89 |
| 90 # On my system I get a difference of 4657152 bytes, probably because |
| 91 # the system is consuming memory for this same test. |
| 92 # Assuming psutil is right, let's use a tollerance of 10 MB and consider |
| 93 # the test as failed if we go over it. |
| 94 if difference > (10 * 2**20): |
| 95 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( |
| 96 sysctl_total_virtmem, psutil_total_virtmem, difference) |
| 97 ) |
| 98 |
| 99 def test_avail_virtmem(self): |
| 100 # This test is not particularly accurate and may fail if the OS is |
| 101 # consuming memory for other applications. |
| 102 # We just want to test that the difference between psutil result |
| 103 # and sysctl's is not too high. |
| 104 p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIP
E) |
| 105 result = p.communicate()[0].strip() |
| 106 if sys.version_info >= (3,): |
| 107 result = str(result, sys.stdout.encoding) |
| 108 _, sysctl_avail_virtmem = parse_sysctl_vmtotal(result) |
| 109 psutil_avail_virtmem = psutil.avail_virtmem() |
| 110 difference = abs(sysctl_avail_virtmem - psutil_avail_virtmem) |
| 111 # let's assume the test is failed if difference is > 0.5 MB |
| 112 if difference > (0.5 * 2**20): |
| 113 self.fail("sysctl=%s; psutil=%s; difference=%s;" %( |
| 114 sysctl_avail_virtmem, psutil_avail_virtmem, difference)) |
| 115 |
| 116 def test_process_create_time(self): |
| 117 cmdline = "ps -o lstart -p %s" %self.pid |
| 118 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
| 119 output = p.communicate()[0] |
| 120 if sys.version_info >= (3,): |
| 121 output = str(output, sys.stdout.encoding) |
| 122 start_ps = output.replace('STARTED', '').strip() |
| 123 start_psutil = psutil.Process(self.pid).create_time |
| 124 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", |
| 125 time.localtime(start_psutil)) |
| 126 self.assertEqual(start_ps, start_psutil) |
| 127 |
| 128 |
| 129 if __name__ == '__main__': |
| 130 test_suite = unittest.TestSuite() |
| 131 test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) |
| 132 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 133 |
| 134 |
| 135 |
| 136 |
OLD | NEW |