OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 # |
| 3 # $Id: test_psutil.py 806 2010-11-12 23:09:35Z g.rodola $ |
| 4 # |
| 5 |
| 6 """psutil test suite. |
| 7 Note: this is targeted for python 2.x. |
| 8 To run it under python 3.x you need to use 2to3 tool first: |
| 9 |
| 10 $ 2to3 -w test/*.py |
| 11 """ |
| 12 |
| 13 import unittest |
| 14 import os |
| 15 import sys |
| 16 import subprocess |
| 17 import time |
| 18 import signal |
| 19 import types |
| 20 import traceback |
| 21 import socket |
| 22 import warnings |
| 23 import atexit |
| 24 import errno |
| 25 import threading |
| 26 |
| 27 import psutil |
| 28 |
| 29 |
| 30 PYTHON = os.path.realpath(sys.executable) |
| 31 DEVNULL = open(os.devnull, 'r+') |
| 32 |
| 33 POSIX = os.name == 'posix' |
| 34 LINUX = sys.platform.lower().startswith("linux") |
| 35 WINDOWS = sys.platform.lower().startswith("win32") |
| 36 OSX = sys.platform.lower().startswith("darwin") |
| 37 BSD = sys.platform.lower().startswith("freebsd") |
| 38 |
| 39 try: |
| 40 psutil.Process(os.getpid()).get_connections() |
| 41 except NotImplementedError, err: |
| 42 SUPPORT_CONNECTIONS = False |
| 43 atexit.register(warnings.warn, "get_connections() not supported on this plat
form", |
| 44 RuntimeWarning) |
| 45 else: |
| 46 SUPPORT_CONNECTIONS = True |
| 47 |
| 48 |
| 49 _subprocesses_started = set() |
| 50 |
| 51 def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): |
| 52 """Return a subprocess.Popen object to use in tests. |
| 53 By default stdout and stderr are redirected to /dev/null and the |
| 54 python interpreter is used as test process. |
| 55 """ |
| 56 if cmd is None: |
| 57 cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] |
| 58 sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) |
| 59 _subprocesses_started.add(sproc.pid) |
| 60 return sproc |
| 61 |
| 62 def wait_for_pid(pid, timeout=1): |
| 63 """Wait for pid to show up in the process list then return. |
| 64 Used in the test suite to give time the sub process to initialize. |
| 65 """ |
| 66 raise_at = time.time() + timeout |
| 67 while 1: |
| 68 if pid in psutil.get_pid_list(): |
| 69 # give it one more iteration to allow full initialization |
| 70 time.sleep(0.01) |
| 71 return |
| 72 time.sleep(0.0001) |
| 73 if time.time() >= raise_at: |
| 74 raise RuntimeError("Timed out") |
| 75 |
| 76 def kill(pid): |
| 77 """Kill a process given its PID.""" |
| 78 if hasattr(os, 'kill'): |
| 79 os.kill(pid, signal.SIGKILL) |
| 80 else: |
| 81 psutil.Process(pid).kill() |
| 82 |
| 83 def reap_children(search_all=False): |
| 84 """Kill any subprocess started by this test suite and ensure that |
| 85 no zombies stick around to hog resources and create problems when |
| 86 looking for refleaks. |
| 87 """ |
| 88 if POSIX: |
| 89 def waitpid(process): |
| 90 # on posix we are free to wait for any pending process by |
| 91 # passing -1 to os.waitpid() |
| 92 while True: |
| 93 try: |
| 94 any_process = -1 |
| 95 pid, status = os.waitpid(any_process, os.WNOHANG) |
| 96 if pid == 0 and not process.is_running(): |
| 97 break |
| 98 except OSError: |
| 99 if not process.is_running(): |
| 100 break |
| 101 else: |
| 102 def waitpid(process): |
| 103 # on non-posix systems we just wait for the given process |
| 104 # to go away |
| 105 while process.is_running(): |
| 106 time.sleep(0.01) |
| 107 |
| 108 if search_all: |
| 109 this_process = psutil.Process(os.getpid()) |
| 110 pids = [x.pid for x in this_process.get_children()] |
| 111 else: |
| 112 pids =_subprocesses_started |
| 113 while pids: |
| 114 pid = pids.pop() |
| 115 try: |
| 116 child = psutil.Process(pid) |
| 117 child.kill() |
| 118 except psutil.NoSuchProcess: |
| 119 pass |
| 120 else: |
| 121 waitpid(child) |
| 122 |
| 123 # we want to search through all processes before exiting |
| 124 atexit.register(reap_children, search_all=True) |
| 125 |
| 126 def skipIf(condition, reason="", warn=False): |
| 127 """Decorator which skip a test under if condition is satisfied. |
| 128 This is a substitute of unittest.skipIf which is available |
| 129 only in python 2.7 and 3.2. |
| 130 If 'reason' argument is provided this will be printed during |
| 131 tests execution. |
| 132 If 'warn' is provided a RuntimeWarning will be shown when all |
| 133 tests are run. |
| 134 """ |
| 135 def outer(fun, *args, **kwargs): |
| 136 def inner(self): |
| 137 if condition: |
| 138 sys.stdout.write("skipped-") |
| 139 sys.stdout.flush() |
| 140 if warn: |
| 141 objname = "%s.%s" % (self.__class__.__name__, fun.__name__) |
| 142 msg = "%s was skipped" % objname |
| 143 if reason: |
| 144 msg += "; reason: " + repr(reason) |
| 145 atexit.register(warnings.warn, msg, RuntimeWarning) |
| 146 return |
| 147 else: |
| 148 return fun(self, *args, **kwargs) |
| 149 return inner |
| 150 return outer |
| 151 |
| 152 def skipUnless(condition, reason="", warn=False): |
| 153 """Contrary of skipIf.""" |
| 154 if not condition: |
| 155 return skipIf(True, reason, warn) |
| 156 return skipIf(False) |
| 157 |
| 158 |
| 159 class TestCase(unittest.TestCase): |
| 160 |
| 161 def tearDown(self): |
| 162 reap_children() |
| 163 |
| 164 def test_get_process_list(self): |
| 165 pids = [x.pid for x in psutil.get_process_list()] |
| 166 self.assertTrue(os.getpid() in pids) |
| 167 self.assertTrue(0 in pids) |
| 168 |
| 169 def test_process_iter(self): |
| 170 pids = [x.pid for x in psutil.process_iter()] |
| 171 self.assertTrue(os.getpid() in pids) |
| 172 self.assertTrue(0 in pids) |
| 173 |
| 174 def test_kill(self): |
| 175 sproc = get_test_subprocess() |
| 176 test_pid = sproc.pid |
| 177 wait_for_pid(test_pid) |
| 178 p = psutil.Process(test_pid) |
| 179 name = p.name |
| 180 p.kill() |
| 181 sproc.wait() |
| 182 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 183 |
| 184 def test_terminate(self): |
| 185 sproc = get_test_subprocess() |
| 186 test_pid = sproc.pid |
| 187 wait_for_pid(test_pid) |
| 188 p = psutil.Process(test_pid) |
| 189 name = p.name |
| 190 p.terminate() |
| 191 sproc.wait() |
| 192 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 193 |
| 194 def test_send_signal(self): |
| 195 if POSIX: |
| 196 sig = signal.SIGKILL |
| 197 else: |
| 198 sig = signal.SIGTERM |
| 199 sproc = get_test_subprocess() |
| 200 test_pid = sproc.pid |
| 201 p = psutil.Process(test_pid) |
| 202 name = p.name |
| 203 p.send_signal(sig) |
| 204 sproc.wait() |
| 205 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) |
| 206 |
| 207 def test_TOTAL_PHYMEM(self): |
| 208 x = psutil.TOTAL_PHYMEM |
| 209 self.assertTrue(isinstance(x, (int, long))) |
| 210 self.assertTrue(x > 0) |
| 211 |
| 212 def test_used_phymem(self): |
| 213 x = psutil.used_phymem() |
| 214 self.assertTrue(isinstance(x, (int, long))) |
| 215 self.assertTrue(x > 0) |
| 216 |
| 217 def test_avail_phymem(self): |
| 218 x = psutil.avail_phymem() |
| 219 self.assertTrue(isinstance(x, (int, long))) |
| 220 self.assertTrue(x > 0) |
| 221 |
| 222 def test_total_virtmem(self): |
| 223 x = psutil.total_virtmem() |
| 224 self.assertTrue(isinstance(x, (int, long))) |
| 225 self.assertTrue(x >= 0) |
| 226 |
| 227 def test_used_virtmem(self): |
| 228 x = psutil.used_virtmem() |
| 229 self.assertTrue(isinstance(x, (int, long))) |
| 230 self.assertTrue(x >= 0) |
| 231 |
| 232 def test_avail_virtmem(self): |
| 233 x = psutil.avail_virtmem() |
| 234 self.assertTrue(isinstance(x, (int, long))) |
| 235 self.assertTrue(x >= 0) |
| 236 |
| 237 @skipUnless(LINUX) |
| 238 def test_cached_phymem(self): |
| 239 x = psutil.cached_phymem() |
| 240 self.assertTrue(isinstance(x, (int, long))) |
| 241 self.assertTrue(x >= 0) |
| 242 |
| 243 @skipUnless(LINUX) |
| 244 def test_phymem_buffers(self): |
| 245 x = psutil.phymem_buffers() |
| 246 self.assertTrue(isinstance(x, (int, long))) |
| 247 self.assertTrue(x >= 0) |
| 248 |
| 249 def test_system_cpu_times(self): |
| 250 total = 0 |
| 251 times = psutil.cpu_times() |
| 252 self.assertTrue(isinstance(times, psutil.CPUTimes)) |
| 253 sum(times) |
| 254 for cp_time in times: |
| 255 self.assertTrue(isinstance(cp_time, float)) |
| 256 self.assertTrue(cp_time >= 0.0) |
| 257 total += cp_time |
| 258 # test CPUTimes's __iter__ and __str__ implementation |
| 259 self.assertEqual(total, sum(times)) |
| 260 str(times) |
| 261 |
| 262 def test_system_cpu_times2(self): |
| 263 t1 = sum(psutil.cpu_times()) |
| 264 time.sleep(0.1) |
| 265 t2 = sum(psutil.cpu_times()) |
| 266 difference = t2 - t1 |
| 267 if not difference >= 0.05: |
| 268 self.fail("difference %s" % difference) |
| 269 |
| 270 def test_system_cpu_percent(self): |
| 271 psutil.cpu_percent(interval=0.001) |
| 272 psutil.cpu_percent(interval=0.001) |
| 273 for x in xrange(1000): |
| 274 percent = psutil.cpu_percent(interval=None) |
| 275 self.assertTrue(isinstance(percent, float)) |
| 276 self.assertTrue(percent >= 0.0) |
| 277 self.assertTrue(percent <= 100.0) |
| 278 |
| 279 def test_process_cpu_percent(self): |
| 280 p = psutil.Process(os.getpid()) |
| 281 p.get_cpu_percent(interval=0.001) |
| 282 p.get_cpu_percent(interval=0.001) |
| 283 for x in xrange(100): |
| 284 percent = p.get_cpu_percent(interval=None) |
| 285 self.assertTrue(isinstance(percent, float)) |
| 286 self.assertTrue(percent >= 0.0) |
| 287 self.assertTrue(percent <= 100.0) |
| 288 |
| 289 def test_get_process_cpu_times(self): |
| 290 times = psutil.Process(os.getpid()).get_cpu_times() |
| 291 self.assertTrue((times.user > 0.0) or (times.system > 0.0)) |
| 292 # make sure returned values can be pretty printed with strftime |
| 293 time.strftime("%H:%M:%S", time.localtime(times.user)) |
| 294 time.strftime("%H:%M:%S", time.localtime(times.system)) |
| 295 |
| 296 # Test Process.cpu_times() against os.times() |
| 297 # os.times() is broken on Python 2.6 |
| 298 # http://bugs.python.org/issue1040026 |
| 299 # XXX fails on OSX: not sure if it's for os.times(). We should |
| 300 # try this with Python 2.7 and re-enable the test. |
| 301 |
| 302 @skipUnless(sys.version_info > (2, 6, 1) and not OSX) |
| 303 def test_get_process_cpu_times2(self): |
| 304 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() |
| 305 utime, ktime = os.times()[:2] |
| 306 |
| 307 # Use os.times()[:2] as base values to compare our results |
| 308 # using a tolerance of +/- 0.1 seconds. |
| 309 # It will fail if the difference between the values is > 0.1s. |
| 310 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: |
| 311 self.fail("expected: %s, found: %s" %(utime, user_time)) |
| 312 |
| 313 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: |
| 314 self.fail("expected: %s, found: %s" %(ktime, kernel_time)) |
| 315 |
| 316 def test_create_time(self): |
| 317 sproc = get_test_subprocess() |
| 318 now = time.time() |
| 319 wait_for_pid(sproc.pid) |
| 320 p = psutil.Process(sproc.pid) |
| 321 create_time = p.create_time |
| 322 |
| 323 # Use time.time() as base value to compare our result using a |
| 324 # tolerance of +/- 1 second. |
| 325 # It will fail if the difference between the values is > 2s. |
| 326 difference = abs(create_time - now) |
| 327 if difference > 2: |
| 328 self.fail("expected: %s, found: %s, difference: %s" |
| 329 % (now, create_time, difference)) |
| 330 |
| 331 # make sure returned value can be pretty printed with strftime |
| 332 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) |
| 333 |
| 334 def test_get_num_threads(self): |
| 335 p = psutil.Process(os.getpid()) |
| 336 numt1 = p.get_num_threads() |
| 337 if not WINDOWS and not OSX: |
| 338 # test is unreliable on Windows and OS X |
| 339 # NOTE: sleep(1) is too long for OS X, works with sleep(.5) |
| 340 self.assertEqual(numt1, 1) |
| 341 t = threading.Thread(target=lambda:time.sleep(1)) |
| 342 t.start() |
| 343 numt2 = p.get_num_threads() |
| 344 if WINDOWS: |
| 345 self.assertTrue(numt2 > numt1) |
| 346 else: |
| 347 self.assertEqual(numt2, 2) |
| 348 |
| 349 def test_get_memory_info(self): |
| 350 p = psutil.Process(os.getpid()) |
| 351 |
| 352 # step 1 - get a base value to compare our results |
| 353 rss1, vms1 = p.get_memory_info() |
| 354 percent1 = p.get_memory_percent() |
| 355 self.assertTrue(rss1 > 0) |
| 356 self.assertTrue(vms1 > 0) |
| 357 |
| 358 # step 2 - allocate some memory |
| 359 memarr = [None] * 1500000 |
| 360 |
| 361 rss2, vms2 = p.get_memory_info() |
| 362 percent2 = p.get_memory_percent() |
| 363 # make sure that the memory usage bumped up |
| 364 self.assertTrue(rss2 > rss1) |
| 365 self.assertTrue(vms2 >= vms1) # vms might be equal |
| 366 self.assertTrue(percent2 > percent1) |
| 367 del memarr |
| 368 |
| 369 def test_get_memory_percent(self): |
| 370 p = psutil.Process(os.getpid()) |
| 371 self.assertTrue(p.get_memory_percent() > 0.0) |
| 372 |
| 373 def test_pid(self): |
| 374 sproc = get_test_subprocess() |
| 375 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) |
| 376 |
| 377 def test_eq(self): |
| 378 sproc = get_test_subprocess() |
| 379 wait_for_pid(sproc.pid) |
| 380 self.assertTrue(psutil.Process(sproc.pid) == psutil.Process(sproc.pid)) |
| 381 |
| 382 def test_is_running(self): |
| 383 sproc = get_test_subprocess() |
| 384 wait_for_pid(sproc.pid) |
| 385 p = psutil.Process(sproc.pid) |
| 386 self.assertTrue(p.is_running()) |
| 387 psutil.Process(sproc.pid).kill() |
| 388 sproc.wait() |
| 389 self.assertFalse(p.is_running()) |
| 390 |
| 391 def test_pid_exists(self): |
| 392 sproc = get_test_subprocess() |
| 393 wait_for_pid(sproc.pid) |
| 394 self.assertTrue(psutil.pid_exists(sproc.pid)) |
| 395 psutil.Process(sproc.pid).kill() |
| 396 sproc.wait() |
| 397 self.assertFalse(psutil.pid_exists(sproc.pid)) |
| 398 self.assertFalse(psutil.pid_exists(-1)) |
| 399 |
| 400 def test_exe(self): |
| 401 sproc = get_test_subprocess() |
| 402 wait_for_pid(sproc.pid) |
| 403 self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) |
| 404 for p in psutil.process_iter(): |
| 405 try: |
| 406 exe = p.exe |
| 407 except psutil.Error: |
| 408 continue |
| 409 if not exe: |
| 410 continue |
| 411 if not os.path.exists(exe): |
| 412 self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ |
| 413 % (repr(exe), p.pid, p.name, p.cmdline)) |
| 414 if hasattr(os, 'access') and hasattr(os, "X_OK"): |
| 415 if not os.access(p.exe, os.X_OK): |
| 416 self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s
)" \ |
| 417 % (repr(p.exe), p.pid, p.name, p.cmdline)) |
| 418 |
| 419 def test_path(self): |
| 420 proc = psutil.Process(os.getpid()) |
| 421 warnings.filterwarnings("error") |
| 422 try: |
| 423 self.assertRaises(DeprecationWarning, getattr, proc, 'path') |
| 424 finally: |
| 425 warnings.resetwarnings() |
| 426 |
| 427 def test_cmdline(self): |
| 428 sproc = get_test_subprocess([PYTHON, "-E"]) |
| 429 wait_for_pid(sproc.pid) |
| 430 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) |
| 431 |
| 432 def test_name(self): |
| 433 sproc = get_test_subprocess(PYTHON) |
| 434 wait_for_pid(sproc.pid) |
| 435 if OSX: |
| 436 self.assertEqual(psutil.Process(sproc.pid).name, "Python") |
| 437 else: |
| 438 self.assertEqual(psutil.Process(sproc.pid).name, os.path.basename(PY
THON)) |
| 439 |
| 440 def test_uid(self): |
| 441 sproc = get_test_subprocess() |
| 442 wait_for_pid(sproc.pid) |
| 443 uid = psutil.Process(sproc.pid).uid |
| 444 if hasattr(os, 'getuid'): |
| 445 self.assertEqual(uid, os.getuid()) |
| 446 else: |
| 447 # On those platforms where UID doesn't make sense (Windows) |
| 448 # we expect it to be -1 |
| 449 self.assertEqual(uid, -1) |
| 450 |
| 451 def test_gid(self): |
| 452 sproc = get_test_subprocess() |
| 453 wait_for_pid(sproc.pid) |
| 454 gid = psutil.Process(sproc.pid).gid |
| 455 if hasattr(os, 'getgid'): |
| 456 self.assertEqual(gid, os.getgid()) |
| 457 else: |
| 458 # On those platforms where GID doesn't make sense (Windows) |
| 459 # we expect it to be -1 |
| 460 self.assertEqual(gid, -1) |
| 461 |
| 462 def test_username(self): |
| 463 sproc = get_test_subprocess() |
| 464 p = psutil.Process(sproc.pid) |
| 465 if POSIX: |
| 466 import pwd |
| 467 user = pwd.getpwuid(p.uid).pw_name |
| 468 self.assertEqual(p.username, user) |
| 469 elif WINDOWS: |
| 470 expected_username = os.environ['USERNAME'] |
| 471 expected_domain = os.environ['USERDOMAIN'] |
| 472 domain, username = p.username.split('\\') |
| 473 self.assertEqual(domain, expected_domain) |
| 474 self.assertEqual(username, expected_username) |
| 475 else: |
| 476 p.username |
| 477 |
| 478 @skipUnless(WINDOWS or LINUX) |
| 479 def test_getcwd(self): |
| 480 sproc = get_test_subprocess() |
| 481 wait_for_pid(sproc.pid) |
| 482 p = psutil.Process(sproc.pid) |
| 483 self.assertEqual(p.getcwd(), os.getcwd()) |
| 484 |
| 485 @skipUnless(WINDOWS or LINUX) |
| 486 def test_getcwd_2(self): |
| 487 cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] |
| 488 sproc = get_test_subprocess(cmd) |
| 489 wait_for_pid(sproc.pid) |
| 490 p = psutil.Process(sproc.pid) |
| 491 time.sleep(0.1) |
| 492 expected_dir = os.path.dirname(os.getcwd()) |
| 493 self.assertEqual(p.getcwd(), expected_dir) |
| 494 |
| 495 def test_get_open_files(self): |
| 496 thisfile = os.path.join(os.getcwd(), __file__) |
| 497 |
| 498 # current process |
| 499 p = psutil.Process(os.getpid()) |
| 500 files = p.get_open_files() |
| 501 self.assertFalse(thisfile in files) |
| 502 f = open(thisfile, 'r') |
| 503 filenames = [x.path for x in p.get_open_files()] |
| 504 self.assertTrue(thisfile in filenames) |
| 505 f.close() |
| 506 for file in filenames: |
| 507 self.assertTrue(os.path.isfile(file)) |
| 508 |
| 509 # another process |
| 510 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % thisfi
le |
| 511 sproc = get_test_subprocess([PYTHON, "-c", cmdline]) |
| 512 wait_for_pid(sproc.pid) |
| 513 time.sleep(0.1) |
| 514 p = psutil.Process(sproc.pid) |
| 515 filenames = [x.path for x in p.get_open_files()] |
| 516 self.assertTrue(thisfile in filenames) |
| 517 for file in filenames: |
| 518 self.assertTrue(os.path.isfile(file)) |
| 519 # all processes |
| 520 for proc in psutil.process_iter(): |
| 521 try: |
| 522 files = proc.get_open_files() |
| 523 except psutil.Error: |
| 524 pass |
| 525 else: |
| 526 for file in filenames: |
| 527 self.assertTrue(os.path.isfile(file)) |
| 528 |
| 529 def test_get_open_files2(self): |
| 530 # test fd and path fields |
| 531 fileobj = open(os.path.join(os.getcwd(), __file__), 'r') |
| 532 p = psutil.Process(os.getpid()) |
| 533 for path, fd in p.get_open_files(): |
| 534 if path == fileobj.name or fd == fileobj.fileno(): |
| 535 break |
| 536 else: |
| 537 self.fail("no file found; files=%s" % repr(p.get_open_files())) |
| 538 self.assertEqual(path, fileobj.name) |
| 539 if WINDOWS: |
| 540 self.assertEqual(fd, -1) |
| 541 else: |
| 542 self.assertEqual(fd, fileobj.fileno()) |
| 543 # test positions |
| 544 ntuple = p.get_open_files()[0] |
| 545 self.assertEqual(ntuple[0], ntuple.path) |
| 546 self.assertEqual(ntuple[1], ntuple.fd) |
| 547 # test file is gone |
| 548 fileobj.close() |
| 549 self.assertTrue(fileobj.name not in p.get_open_files()) |
| 550 |
| 551 @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
| 552 def test_get_connections(self): |
| 553 arg = "import socket, time;" \ |
| 554 "s = socket.socket();" \ |
| 555 "s.bind(('127.0.0.1', 0));" \ |
| 556 "s.listen(1);" \ |
| 557 "conn, addr = s.accept();" \ |
| 558 "time.sleep(100);" |
| 559 sproc = get_test_subprocess([PYTHON, "-c", arg]) |
| 560 time.sleep(0.1) |
| 561 p = psutil.Process(sproc.pid) |
| 562 cons = p.get_connections() |
| 563 self.assertTrue(len(cons) == 1) |
| 564 con = cons[0] |
| 565 self.assertEqual(con.family, socket.AF_INET) |
| 566 self.assertEqual(con.type, socket.SOCK_STREAM) |
| 567 self.assertEqual(con.status, "LISTEN") |
| 568 ip, port = con.local_address |
| 569 self.assertEqual(ip, '127.0.0.1') |
| 570 self.assertEqual(con.remote_address, ()) |
| 571 if WINDOWS: |
| 572 self.assertEqual(con.fd, -1) |
| 573 else: |
| 574 self.assertTrue(con.fd > 0) |
| 575 # test positions |
| 576 self.assertEqual(con[0], con.fd) |
| 577 self.assertEqual(con[1], con.family) |
| 578 self.assertEqual(con[2], con.type) |
| 579 self.assertEqual(con[3], con.local_address) |
| 580 self.assertEqual(con[4], con.remote_address) |
| 581 self.assertEqual(con[5], con.status) |
| 582 |
| 583 @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) |
| 584 def test_connection_fromfd(self): |
| 585 sock = socket.socket() |
| 586 sock.bind(('127.0.0.1', 0)) |
| 587 sock.listen(1) |
| 588 p = psutil.Process(os.getpid()) |
| 589 for conn in p.get_connections(): |
| 590 if conn.fd == sock.fileno(): |
| 591 break |
| 592 else: |
| 593 sock.close() |
| 594 self.fail("couldn't find socket fd") |
| 595 dupsock = socket.fromfd(conn.fd, conn.family, conn.type) |
| 596 try: |
| 597 self.assertEqual(dupsock.getsockname(), conn.local_address) |
| 598 self.assertNotEqual(sock.fileno(), dupsock.fileno()) |
| 599 finally: |
| 600 sock.close() |
| 601 dupsock.close() |
| 602 |
| 603 @skipUnless(SUPPORT_CONNECTIONS, warn=1) |
| 604 def test_get_connections_all(self): |
| 605 |
| 606 def check_address(addr, family): |
| 607 if not addr: |
| 608 return |
| 609 ip, port = addr |
| 610 self.assertTrue(isinstance(port, int)) |
| 611 if family == socket.AF_INET: |
| 612 ip = map(int, ip.split('.')) |
| 613 self.assertTrue(len(ip) == 4) |
| 614 for num in ip: |
| 615 self.assertTrue(0 <= num <= 255) |
| 616 self.assertTrue(0 <= port <= 65535) |
| 617 |
| 618 def supports_ipv6(): |
| 619 if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): |
| 620 return False |
| 621 try: |
| 622 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
| 623 sock.bind(("::1", 0)) |
| 624 except (socket.error, socket.gaierror): |
| 625 return False |
| 626 else: |
| 627 sock.close() |
| 628 return True |
| 629 |
| 630 # all values are supposed to match Linux's tcp_states.h states |
| 631 # table across all platforms. |
| 632 valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", |
| 633 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", |
| 634 "LAST_ACK", "LISTEN", "CLOSING", ""] |
| 635 |
| 636 tcp_template = "import socket;" \ |
| 637 "s = socket.socket($family, socket.SOCK_STREAM);" \ |
| 638 "s.bind(('$addr', 0));" \ |
| 639 "s.listen(1);" \ |
| 640 "conn, addr = s.accept();" |
| 641 |
| 642 udp_template = "import socket, time;" \ |
| 643 "s = socket.socket($family, socket.SOCK_DGRAM);" \ |
| 644 "s.bind(('$addr', 0));" \ |
| 645 "time.sleep(100);" |
| 646 |
| 647 from string import Template |
| 648 tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, |
| 649 addr="127.0.0.1") |
| 650 udp4_template = Template(udp_template).substitute(family=socket.AF_INET, |
| 651 addr="127.0.0.1") |
| 652 tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6
, |
| 653 addr="::1") |
| 654 udp6_template = Template(udp_template).substitute(family=socket.AF_INET6
, |
| 655 addr="::1") |
| 656 |
| 657 # launch various subprocess instantiating a socket of various |
| 658 # families and tupes to enrich psutil results |
| 659 tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) |
| 660 udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) |
| 661 if supports_ipv6(): |
| 662 tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) |
| 663 udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) |
| 664 else: |
| 665 tcp6_proc = None |
| 666 udp6_proc = None |
| 667 |
| 668 time.sleep(0.1) |
| 669 this_proc = psutil.Process(os.getpid()) |
| 670 children_pids = [p.pid for p in this_proc.get_children()] |
| 671 for p in psutil.process_iter(): |
| 672 try: |
| 673 cons = p.get_connections() |
| 674 except (psutil.NoSuchProcess, psutil.AccessDenied): |
| 675 pass |
| 676 else: |
| 677 for conn in cons: |
| 678 self.assertTrue(conn.type in (socket.SOCK_STREAM, |
| 679 socket.SOCK_DGRAM)) |
| 680 self.assertTrue(conn.family in (socket.AF_INET, |
| 681 socket.AF_INET6)) |
| 682 check_address(conn.local_address, conn.family) |
| 683 check_address(conn.remote_address, conn.family) |
| 684 if conn.status not in valid_states: |
| 685 self.fail("%s is not a valid status" %conn.status) |
| 686 # actually try to bind the local socket; ignore IPv6 |
| 687 # sockets as their address might be represented as |
| 688 # an IPv4-mapped-address (e.g. "::127.0.0.1") |
| 689 # and that's rejected by bind() |
| 690 if conn.family == socket.AF_INET: |
| 691 s = socket.socket(conn.family, conn.type) |
| 692 s.bind((conn.local_address[0], 0)) |
| 693 s.close() |
| 694 |
| 695 if not WINDOWS and hasattr(socket, 'fromfd'): |
| 696 try: |
| 697 dupsock = socket.fromfd(conn.fd, conn.family, |
| 698 conn.type) |
| 699 except (socket.error, OSError), err: |
| 700 if err.args[0] == errno.EBADF: |
| 701 continue |
| 702 else: |
| 703 raise |
| 704 self.assertEqual(dupsock.family, conn.family) |
| 705 self.assertEqual(dupsock.type, conn.type) |
| 706 |
| 707 # check matches against subprocesses |
| 708 if p.pid in children_pids: |
| 709 self.assertTrue(len(cons) == 1) |
| 710 conn = cons[0] |
| 711 # TCP v4 |
| 712 if p.pid == tcp4_proc.pid: |
| 713 self.assertEqual(conn.family, socket.AF_INET) |
| 714 self.assertEqual(conn.type, socket.SOCK_STREAM) |
| 715 self.assertEqual(conn.local_address[0], "127.0.0.1") |
| 716 self.assertEqual(conn.remote_address, ()) |
| 717 self.assertEqual(conn.status, "LISTEN") |
| 718 # UDP v4 |
| 719 elif p.pid == udp4_proc.pid: |
| 720 self.assertEqual(conn.family, socket.AF_INET) |
| 721 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
| 722 self.assertEqual(conn.local_address[0], "127.0.0.1") |
| 723 self.assertEqual(conn.remote_address, ()) |
| 724 self.assertEqual(conn.status, "") |
| 725 # TCP v6 |
| 726 elif p.pid == getattr(tcp6_proc, "pid", None): |
| 727 self.assertEqual(conn.family, socket.AF_INET6) |
| 728 self.assertEqual(conn.type, socket.SOCK_STREAM) |
| 729 self.assertEqual(conn.local_address[0], "::1") |
| 730 self.assertEqual(conn.remote_address, ()) |
| 731 self.assertEqual(conn.status, "LISTEN") |
| 732 # UDP v6 |
| 733 elif p.pid == getattr(udp6_proc, "pid", None): |
| 734 self.assertEqual(conn.family, socket.AF_INET6) |
| 735 self.assertEqual(conn.type, socket.SOCK_DGRAM) |
| 736 self.assertEqual(conn.local_address[0], "::1") |
| 737 self.assertEqual(conn.remote_address, ()) |
| 738 self.assertEqual(conn.status, "") |
| 739 |
| 740 def test_parent_ppid(self): |
| 741 this_parent = os.getpid() |
| 742 sproc = get_test_subprocess() |
| 743 p = psutil.Process(sproc.pid) |
| 744 self.assertEqual(p.ppid, this_parent) |
| 745 self.assertEqual(p.parent.pid, this_parent) |
| 746 # no other process is supposed to have us as parent |
| 747 for p in psutil.process_iter(): |
| 748 if p.pid == sproc.pid: |
| 749 continue |
| 750 self.assertTrue(p.ppid != this_parent) |
| 751 |
| 752 def test_get_children(self): |
| 753 p = psutil.Process(os.getpid()) |
| 754 self.assertEqual(p.get_children(), []) |
| 755 sproc = get_test_subprocess() |
| 756 children = p.get_children() |
| 757 self.assertEqual(len(children), 1) |
| 758 self.assertEqual(children[0].pid, sproc.pid) |
| 759 self.assertEqual(children[0].ppid, os.getpid()) |
| 760 |
| 761 def test_suspend_resume(self): |
| 762 sproc = get_test_subprocess() |
| 763 p = psutil.Process(sproc.pid) |
| 764 p.suspend() |
| 765 p.resume() |
| 766 |
| 767 def test_get_pid_list(self): |
| 768 plist = [x.pid for x in psutil.get_process_list()] |
| 769 pidlist = psutil.get_pid_list() |
| 770 self.assertEqual(plist.sort(), pidlist.sort()) |
| 771 # make sure every pid is unique |
| 772 self.assertEqual(len(pidlist), len(set(pidlist))) |
| 773 |
| 774 def test_test(self): |
| 775 # test for psutil.test() function |
| 776 stdout = sys.stdout |
| 777 sys.stdout = DEVNULL |
| 778 try: |
| 779 psutil.test() |
| 780 finally: |
| 781 sys.stdout = stdout |
| 782 |
| 783 def test_types(self): |
| 784 sproc = get_test_subprocess() |
| 785 wait_for_pid(sproc.pid) |
| 786 p = psutil.Process(sproc.pid) |
| 787 self.assert_(isinstance(p.pid, int)) |
| 788 self.assert_(isinstance(p.ppid, int)) |
| 789 self.assert_(isinstance(p.parent, psutil.Process)) |
| 790 self.assert_(isinstance(p.name, str)) |
| 791 if self.__class__.__name__ != "LimitedUserTestCase": |
| 792 self.assert_(isinstance(p.exe, str)) |
| 793 self.assert_(isinstance(p.cmdline, list)) |
| 794 self.assert_(isinstance(p.uid, int)) |
| 795 self.assert_(isinstance(p.gid, int)) |
| 796 self.assert_(isinstance(p.create_time, float)) |
| 797 self.assert_(isinstance(p.username, (unicode, str))) |
| 798 if hasattr(p, 'getcwd'): |
| 799 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": |
| 800 self.assert_(isinstance(p.getcwd(), str)) |
| 801 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase": |
| 802 self.assert_(isinstance(p.get_open_files(), list)) |
| 803 for path, fd in p.get_open_files(): |
| 804 self.assert_(isinstance(path, (unicode, str))) |
| 805 self.assert_(isinstance(fd, int)) |
| 806 if not POSIX and self.__class__.__name__ != "LimitedUserTestCase" \ |
| 807 and SUPPORT_CONNECTIONS: |
| 808 self.assert_(isinstance(p.get_connections(), list)) |
| 809 self.assert_(isinstance(p.is_running(), bool)) |
| 810 if not OSX or self.__class__.__name__ != "LimitedUserTestCase": |
| 811 self.assert_(isinstance(p.get_cpu_times(), tuple)) |
| 812 self.assert_(isinstance(p.get_cpu_times()[0], float)) |
| 813 self.assert_(isinstance(p.get_cpu_times()[1], float)) |
| 814 self.assert_(isinstance(p.get_cpu_percent(0), float)) |
| 815 self.assert_(isinstance(p.get_memory_info(), tuple)) |
| 816 self.assert_(isinstance(p.get_memory_info()[0], int)) |
| 817 self.assert_(isinstance(p.get_memory_info()[1], int)) |
| 818 self.assert_(isinstance(p.get_memory_percent(), float)) |
| 819 self.assert_(isinstance(p.get_num_threads(), int)) |
| 820 self.assert_(isinstance(psutil.get_process_list(), list)) |
| 821 self.assert_(isinstance(psutil.get_process_list()[0], psutil.Process)) |
| 822 self.assert_(isinstance(psutil.process_iter(), types.GeneratorType)) |
| 823 self.assert_(isinstance(psutil.process_iter().next(), psutil.Process)) |
| 824 self.assert_(isinstance(psutil.get_pid_list(), list)) |
| 825 self.assert_(isinstance(psutil.get_pid_list()[0], int)) |
| 826 self.assert_(isinstance(psutil.pid_exists(1), bool)) |
| 827 |
| 828 def test_invalid_pid(self): |
| 829 self.assertRaises(ValueError, psutil.Process, "1") |
| 830 self.assertRaises(ValueError, psutil.Process, None) |
| 831 # Refers to Issue #12 |
| 832 self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) |
| 833 |
| 834 def test_zombie_process(self): |
| 835 # Test that NoSuchProcess exception gets raised in the event the |
| 836 # process dies after we create the Process object. |
| 837 # Example: |
| 838 # >>> proc = Process(1234) |
| 839 # >>> time.sleep(5) # time-consuming task, process dies in meantime |
| 840 # >>> proc.name |
| 841 # Refers to Issue #15 |
| 842 sproc = get_test_subprocess() |
| 843 p = psutil.Process(sproc.pid) |
| 844 p.kill() |
| 845 sproc.wait() |
| 846 |
| 847 self.assertRaises(psutil.NoSuchProcess, getattr, p, "ppid") |
| 848 self.assertRaises(psutil.NoSuchProcess, getattr, p, "parent") |
| 849 self.assertRaises(psutil.NoSuchProcess, getattr, p, "name") |
| 850 self.assertRaises(psutil.NoSuchProcess, getattr, p, "exe") |
| 851 self.assertRaises(psutil.NoSuchProcess, getattr, p, "cmdline") |
| 852 self.assertRaises(psutil.NoSuchProcess, getattr, p, "uid") |
| 853 self.assertRaises(psutil.NoSuchProcess, getattr, p, "gid") |
| 854 self.assertRaises(psutil.NoSuchProcess, getattr, p, "create_time") |
| 855 self.assertRaises(psutil.NoSuchProcess, getattr, p, "username") |
| 856 if hasattr(p, 'getcwd'): |
| 857 self.assertRaises(psutil.NoSuchProcess, p.getcwd) |
| 858 self.assertRaises(psutil.NoSuchProcess, p.get_open_files) |
| 859 self.assertRaises(psutil.NoSuchProcess, p.get_connections) |
| 860 self.assertRaises(psutil.NoSuchProcess, p.suspend) |
| 861 self.assertRaises(psutil.NoSuchProcess, p.resume) |
| 862 self.assertRaises(psutil.NoSuchProcess, p.kill) |
| 863 self.assertRaises(psutil.NoSuchProcess, p.terminate) |
| 864 self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) |
| 865 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_times) |
| 866 self.assertRaises(psutil.NoSuchProcess, p.get_cpu_percent, 0) |
| 867 self.assertRaises(psutil.NoSuchProcess, p.get_memory_info) |
| 868 self.assertRaises(psutil.NoSuchProcess, p.get_memory_percent) |
| 869 self.assertRaises(psutil.NoSuchProcess, p.get_children) |
| 870 self.assertRaises(psutil.NoSuchProcess, p.get_num_threads) |
| 871 self.assertFalse(p.is_running()) |
| 872 |
| 873 def test__str__(self): |
| 874 sproc = get_test_subprocess() |
| 875 p = psutil.Process(sproc.pid) |
| 876 self.assertTrue(str(sproc.pid) in str(p)) |
| 877 self.assertTrue(os.path.basename(PYTHON) in str(p)) |
| 878 sproc = get_test_subprocess() |
| 879 p = psutil.Process(sproc.pid) |
| 880 p.kill() |
| 881 sproc.wait() |
| 882 self.assertTrue(str(sproc.pid) in str(p)) |
| 883 self.assertTrue("terminated" in str(p)) |
| 884 |
| 885 def test_fetch_all(self): |
| 886 valid_procs = 0 |
| 887 attrs = ['__str__', 'create_time', 'username', 'getcwd', 'get_cpu_times'
, |
| 888 'get_memory_info', 'get_memory_percent', 'get_open_files', |
| 889 'get_num_threads'] |
| 890 for p in psutil.process_iter(): |
| 891 for attr in attrs: |
| 892 # skip slow Python implementation; we're reasonably sure |
| 893 # it works anyway |
| 894 if POSIX and attr == 'get_open_files': |
| 895 continue |
| 896 try: |
| 897 attr = getattr(p, attr, None) |
| 898 if attr is not None and callable(attr): |
| 899 attr() |
| 900 valid_procs += 1 |
| 901 except (psutil.NoSuchProcess, psutil.AccessDenied), err: |
| 902 self.assertEqual(err.pid, p.pid) |
| 903 if err.name: |
| 904 self.assertEqual(err.name, p.name) |
| 905 self.assertTrue(str(err)) |
| 906 self.assertTrue(err.msg) |
| 907 except: |
| 908 trace = traceback.format_exc() |
| 909 self.fail('Exception raised for method %s, pid %s:\n%s' |
| 910 %(attr, p.pid, trace)) |
| 911 |
| 912 # we should always have a non-empty list, not including PID 0 etc. |
| 913 # special cases. |
| 914 self.assertTrue(valid_procs > 0) |
| 915 |
| 916 def test_pid_0(self): |
| 917 # Process(0) is supposed to work on all platforms even if with |
| 918 # some differences |
| 919 p = psutil.Process(0) |
| 920 if WINDOWS: |
| 921 self.assertEqual(p.name, 'System Idle Process') |
| 922 elif LINUX: |
| 923 self.assertEqual(p.name, 'sched') |
| 924 elif BSD: |
| 925 self.assertEqual(p.name, 'swapper') |
| 926 elif OSX: |
| 927 self.assertEqual(p.name, 'kernel_task') |
| 928 |
| 929 if os.name == 'posix': |
| 930 self.assertEqual(p.uid, 0) |
| 931 self.assertEqual(p.gid, 0) |
| 932 else: |
| 933 self.assertEqual(p.uid, -1) |
| 934 self.assertEqual(p.gid, -1) |
| 935 |
| 936 self.assertTrue(p.ppid in (0, 1)) |
| 937 self.assertEqual(p.exe, "") |
| 938 self.assertEqual(p.cmdline, []) |
| 939 # this can either raise AD (Win) or return 0 (UNIX) |
| 940 try: |
| 941 self.assertTrue(p.get_num_threads() in (0, 1)) |
| 942 except psutil.AccessDenied: |
| 943 pass |
| 944 |
| 945 if OSX : #and os.geteuid() != 0: |
| 946 self.assertRaises(psutil.AccessDenied, p.get_memory_info) |
| 947 self.assertRaises(psutil.AccessDenied, p.get_cpu_times) |
| 948 else: |
| 949 p.get_memory_info() |
| 950 |
| 951 # username property |
| 952 if POSIX: |
| 953 self.assertEqual(p.username, 'root') |
| 954 elif WINDOWS: |
| 955 self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') |
| 956 else: |
| 957 p.username |
| 958 |
| 959 # PID 0 is supposed to be available on all platforms |
| 960 self.assertTrue(0 in psutil.get_pid_list()) |
| 961 self.assertTrue(psutil.pid_exists(0)) |
| 962 |
| 963 # --- OS specific tests |
| 964 |
| 965 |
| 966 if hasattr(os, 'getuid'): |
| 967 class LimitedUserTestCase(TestCase): |
| 968 """Repeat the previous tests by using a limited user. |
| 969 Executed only on UNIX and only if the user who run the test script |
| 970 is root. |
| 971 """ |
| 972 # the uid/gid the test suite runs under |
| 973 PROCESS_UID = os.getuid() |
| 974 PROCESS_GID = os.getgid() |
| 975 |
| 976 def setUp(self): |
| 977 os.setegid(1000) |
| 978 os.seteuid(1000) |
| 979 TestCase.setUp(self) |
| 980 |
| 981 def tearDown(self): |
| 982 os.setegid(self.PROCESS_UID) |
| 983 os.seteuid(self.PROCESS_GID) |
| 984 TestCase.tearDown(self) |
| 985 |
| 986 def test_path(self): |
| 987 # DeprecationWarning is only raised once |
| 988 pass |
| 989 |
| 990 # overridden tests known to raise AccessDenied when run |
| 991 # as limited user on different platforms |
| 992 |
| 993 if LINUX: |
| 994 |
| 995 def test_getcwd(self): |
| 996 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd, sel
f) |
| 997 |
| 998 def test_getcwd_2(self): |
| 999 self.assertRaises(psutil.AccessDenied, TestCase.test_getcwd_2, s
elf) |
| 1000 |
| 1001 def test_get_open_files(self): |
| 1002 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) |
| 1003 |
| 1004 def test_get_connections(self): |
| 1005 pass |
| 1006 |
| 1007 def test_exe(self): |
| 1008 self.assertRaises(psutil.AccessDenied, TestCase.test_exe, self) |
| 1009 |
| 1010 if BSD: |
| 1011 |
| 1012 def test_get_open_files(self): |
| 1013 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) |
| 1014 |
| 1015 def test_get_open_files2(self): |
| 1016 self.assertRaises(psutil.AccessDenied, TestCase.test_get_open_fi
les, self) |
| 1017 |
| 1018 def test_get_connections(self): |
| 1019 self.assertRaises(psutil.AccessDenied, TestCase.test_get_connect
ions, self) |
| 1020 |
| 1021 def test_connection_fromfd(self): |
| 1022 self.assertRaises(psutil.AccessDenied, TestCase.test_connection_
fromfd, self) |
| 1023 |
| 1024 |
| 1025 def test_main(): |
| 1026 tests = [] |
| 1027 test_suite = unittest.TestSuite() |
| 1028 |
| 1029 tests.append(TestCase) |
| 1030 |
| 1031 if hasattr(os, 'getuid'): |
| 1032 if os.getuid() == 0: |
| 1033 tests.append(LimitedUserTestCase) |
| 1034 else: |
| 1035 atexit.register(warnings.warn, "Couldn't run limited user tests (" |
| 1036 "super-user privileges are required)", RuntimeWarning) |
| 1037 |
| 1038 if POSIX: |
| 1039 from _posix import PosixSpecificTestCase |
| 1040 tests.append(PosixSpecificTestCase) |
| 1041 |
| 1042 # import the specific platform test suite |
| 1043 if LINUX: |
| 1044 from _linux import LinuxSpecificTestCase as stc |
| 1045 elif WINDOWS: |
| 1046 from _windows import WindowsSpecificTestCase as stc |
| 1047 elif OSX: |
| 1048 from _osx import OSXSpecificTestCase as stc |
| 1049 elif BSD: |
| 1050 from _bsd import BSDSpecificTestCase as stc |
| 1051 tests.append(stc) |
| 1052 |
| 1053 for test_class in tests: |
| 1054 test_suite.addTest(unittest.makeSuite(test_class)) |
| 1055 |
| 1056 unittest.TextTestRunner(verbosity=2).run(test_suite) |
| 1057 DEVNULL.close() |
| 1058 |
| 1059 if __name__ == '__main__': |
| 1060 test_main() |
| 1061 |
| 1062 |
OLD | NEW |