| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # | 2 # |
| 3 # $Id: _osx.py 664 2010-10-09 16:14:34Z g.rodola $ | 3 # $Id: _osx.py 1142 2011-10-05 18:45:49Z g.rodola $ |
| 4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. |
| 8 |
| 9 """OSX specific tests. These are implicitly run by test_psutil.py.""" |
| 5 | 10 |
| 6 import unittest | 11 import unittest |
| 7 import subprocess | 12 import subprocess |
| 8 import time | 13 import time |
| 9 import re | |
| 10 import sys | 14 import sys |
| 11 | 15 |
| 12 import psutil | 16 import psutil |
| 13 | 17 |
| 14 from test_psutil import reap_children, get_test_subprocess | 18 from test_psutil import reap_children, get_test_subprocess, sh |
| 15 #from _posix import ps | 19 #from _posix import ps |
| 16 | 20 |
| 17 | 21 |
| 18 def sysctl(cmdline): | 22 def sysctl(cmdline): |
| 19 """Expects a sysctl command with an argument and parse the result | 23 """Expects a sysctl command with an argument and parse the result |
| 20 returning only the value of interest. | 24 returning only the value of interest. |
| 21 """ | 25 """ |
| 22 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) | 26 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
| 23 result = p.communicate()[0].strip().split()[1] | 27 result = p.communicate()[0].strip().split()[1] |
| 24 if sys.version_info >= (3,): | 28 if sys.version_info >= (3,): |
| 25 result = str(result, sys.stdout.encoding) | 29 result = str(result, sys.stdout.encoding) |
| 26 try: | 30 try: |
| 27 return int(result) | 31 return int(result) |
| 28 except ValueError: | 32 except ValueError: |
| 29 return result | 33 return result |
| 30 | 34 |
| 31 | 35 |
| 32 class OSXSpecificTestCase(unittest.TestCase): | 36 class OSXSpecificTestCase(unittest.TestCase): |
| 33 | 37 |
| 34 def setUp(self): | 38 def setUp(self): |
| 35 self.pid = get_test_subprocess().pid | 39 self.pid = get_test_subprocess().pid |
| 36 | 40 |
| 37 def tearDown(self): | 41 def tearDown(self): |
| 38 reap_children() | 42 reap_children() |
| 39 | 43 |
| 40 def test_TOTAL_PHYMEM(self): | 44 def test_TOTAL_PHYMEM(self): |
| 41 sysctl_hwphymem = sysctl('sysctl hw.physmem') | 45 sysctl_hwphymem = sysctl('sysctl hw.memsize') |
| 42 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) | 46 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) |
| 43 | 47 |
| 44 def test_process_create_time(self): | 48 def test_process_create_time(self): |
| 45 cmdline = "ps -o lstart -p %s" %self.pid | 49 cmdline = "ps -o lstart -p %s" %self.pid |
| 46 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) | 50 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
| 47 output = p.communicate()[0] | 51 output = p.communicate()[0] |
| 48 if sys.version_info >= (3,): | 52 if sys.version_info >= (3,): |
| 49 output = str(output, sys.stdout.encoding) | 53 output = str(output, sys.stdout.encoding) |
| 50 start_ps = output.replace('STARTED', '').strip() | 54 start_ps = output.replace('STARTED', '').strip() |
| 51 start_psutil = psutil.Process(self.pid).create_time | 55 start_psutil = psutil.Process(self.pid).create_time |
| 52 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", | 56 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", |
| 53 time.localtime(start_psutil)) | 57 time.localtime(start_psutil)) |
| 54 self.assertEqual(start_ps, start_psutil) | 58 self.assertEqual(start_ps, start_psutil) |
| 55 | 59 |
| 60 def test_disks(self): |
| 61 # test psutil.disk_usage() and psutil.disk_partitions() |
| 62 # against "df -a" |
| 63 def df(path): |
| 64 out = sh('df -k "%s"' % path).strip() |
| 65 lines = out.split('\n') |
| 66 lines.pop(0) |
| 67 line = lines.pop(0) |
| 68 dev, total, used, free = line.split()[:4] |
| 69 if dev == 'none': |
| 70 dev = '' |
| 71 total = int(total) * 1024 |
| 72 used = int(used) * 1024 |
| 73 free = int(free) * 1024 |
| 74 return dev, total, used, free |
| 75 |
| 76 for part in psutil.disk_partitions(all=False): |
| 77 usage = psutil.disk_usage(part.mountpoint) |
| 78 dev, total, used, free = df(part.mountpoint) |
| 79 self.assertEqual(part.device, dev) |
| 80 self.assertEqual(usage.total, total) |
| 81 # 10 MB tollerance |
| 82 if abs(usage.free - free) > 10 * 1024 * 1024: |
| 83 self.fail("psutil=%s, df=%s" % usage.free, free) |
| 84 if abs(usage.used - used) > 10 * 1024 * 1024: |
| 85 self.fail("psutil=%s, df=%s" % usage.used, used) |
| 86 |
| 56 | 87 |
| 57 if __name__ == '__main__': | 88 if __name__ == '__main__': |
| 58 test_suite = unittest.TestSuite() | 89 test_suite = unittest.TestSuite() |
| 59 test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) | 90 test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) |
| 60 unittest.TextTestRunner(verbosity=2).run(test_suite) | 91 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 61 | |
| 62 | |
| 63 | |
| 64 | |
| OLD | NEW |