OLD | NEW |
1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
2 # | 2 # |
3 # $Id: _osx.py 664 2010-10-09 16:14:34Z g.rodola $ | 3 # $Id: _osx.py 1142 2011-10-05 18:45:49Z g.rodola $ |
4 # | 4 # |
| 5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. |
| 6 # Use of this source code is governed by a BSD-style license that can be |
| 7 # found in the LICENSE file. |
| 8 |
| 9 """OSX specific tests. These are implicitly run by test_psutil.py.""" |
5 | 10 |
6 import unittest | 11 import unittest |
7 import subprocess | 12 import subprocess |
8 import time | 13 import time |
9 import re | |
10 import sys | 14 import sys |
11 | 15 |
12 import psutil | 16 import psutil |
13 | 17 |
14 from test_psutil import reap_children, get_test_subprocess | 18 from test_psutil import reap_children, get_test_subprocess, sh |
15 #from _posix import ps | 19 #from _posix import ps |
16 | 20 |
17 | 21 |
18 def sysctl(cmdline): | 22 def sysctl(cmdline): |
19 """Expects a sysctl command with an argument and parse the result | 23 """Expects a sysctl command with an argument and parse the result |
20 returning only the value of interest. | 24 returning only the value of interest. |
21 """ | 25 """ |
22 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) | 26 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
23 result = p.communicate()[0].strip().split()[1] | 27 result = p.communicate()[0].strip().split()[1] |
24 if sys.version_info >= (3,): | 28 if sys.version_info >= (3,): |
25 result = str(result, sys.stdout.encoding) | 29 result = str(result, sys.stdout.encoding) |
26 try: | 30 try: |
27 return int(result) | 31 return int(result) |
28 except ValueError: | 32 except ValueError: |
29 return result | 33 return result |
30 | 34 |
31 | 35 |
32 class OSXSpecificTestCase(unittest.TestCase): | 36 class OSXSpecificTestCase(unittest.TestCase): |
33 | 37 |
34 def setUp(self): | 38 def setUp(self): |
35 self.pid = get_test_subprocess().pid | 39 self.pid = get_test_subprocess().pid |
36 | 40 |
37 def tearDown(self): | 41 def tearDown(self): |
38 reap_children() | 42 reap_children() |
39 | 43 |
40 def test_TOTAL_PHYMEM(self): | 44 def test_TOTAL_PHYMEM(self): |
41 sysctl_hwphymem = sysctl('sysctl hw.physmem') | 45 sysctl_hwphymem = sysctl('sysctl hw.memsize') |
42 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) | 46 self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) |
43 | 47 |
44 def test_process_create_time(self): | 48 def test_process_create_time(self): |
45 cmdline = "ps -o lstart -p %s" %self.pid | 49 cmdline = "ps -o lstart -p %s" %self.pid |
46 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) | 50 p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) |
47 output = p.communicate()[0] | 51 output = p.communicate()[0] |
48 if sys.version_info >= (3,): | 52 if sys.version_info >= (3,): |
49 output = str(output, sys.stdout.encoding) | 53 output = str(output, sys.stdout.encoding) |
50 start_ps = output.replace('STARTED', '').strip() | 54 start_ps = output.replace('STARTED', '').strip() |
51 start_psutil = psutil.Process(self.pid).create_time | 55 start_psutil = psutil.Process(self.pid).create_time |
52 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", | 56 start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", |
53 time.localtime(start_psutil)) | 57 time.localtime(start_psutil)) |
54 self.assertEqual(start_ps, start_psutil) | 58 self.assertEqual(start_ps, start_psutil) |
55 | 59 |
| 60 def test_disks(self): |
| 61 # test psutil.disk_usage() and psutil.disk_partitions() |
| 62 # against "df -a" |
| 63 def df(path): |
| 64 out = sh('df -k "%s"' % path).strip() |
| 65 lines = out.split('\n') |
| 66 lines.pop(0) |
| 67 line = lines.pop(0) |
| 68 dev, total, used, free = line.split()[:4] |
| 69 if dev == 'none': |
| 70 dev = '' |
| 71 total = int(total) * 1024 |
| 72 used = int(used) * 1024 |
| 73 free = int(free) * 1024 |
| 74 return dev, total, used, free |
| 75 |
| 76 for part in psutil.disk_partitions(all=False): |
| 77 usage = psutil.disk_usage(part.mountpoint) |
| 78 dev, total, used, free = df(part.mountpoint) |
| 79 self.assertEqual(part.device, dev) |
| 80 self.assertEqual(usage.total, total) |
| 81 # 10 MB tollerance |
| 82 if abs(usage.free - free) > 10 * 1024 * 1024: |
| 83 self.fail("psutil=%s, df=%s" % usage.free, free) |
| 84 if abs(usage.used - used) > 10 * 1024 * 1024: |
| 85 self.fail("psutil=%s, df=%s" % usage.used, used) |
| 86 |
56 | 87 |
57 if __name__ == '__main__': | 88 if __name__ == '__main__': |
58 test_suite = unittest.TestSuite() | 89 test_suite = unittest.TestSuite() |
59 test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) | 90 test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) |
60 unittest.TextTestRunner(verbosity=2).run(test_suite) | 91 unittest.TextTestRunner(verbosity=2).run(test_suite) |
61 | |
62 | |
63 | |
64 | |
OLD | NEW |