| OLD | NEW |
| (Empty) | |
| 1 """ |
| 2 TestCmd.py: a testing framework for commands and scripts. |
| 3 |
| 4 The TestCmd module provides a framework for portable automated testing |
| 5 of executable commands and scripts (in any language, not just Python), |
| 6 especially commands and scripts that require file system interaction. |
| 7 |
| 8 In addition to running tests and evaluating conditions, the TestCmd |
| 9 module manages and cleans up one or more temporary workspace |
| 10 directories, and provides methods for creating files and directories in |
| 11 those workspace directories from in-line data, here-documents), allowing |
| 12 tests to be completely self-contained. |
| 13 |
| 14 A TestCmd environment object is created via the usual invocation: |
| 15 |
| 16 import TestCmd |
| 17 test = TestCmd.TestCmd() |
| 18 |
| 19 There are a bunch of keyword arguments available at instantiation: |
| 20 |
| 21 test = TestCmd.TestCmd(description = 'string', |
| 22 program = 'program_or_script_to_test', |
| 23 interpreter = 'script_interpreter', |
| 24 workdir = 'prefix', |
| 25 subdir = 'subdir', |
| 26 verbose = Boolean, |
| 27 match = default_match_function, |
| 28 diff = default_diff_function, |
| 29 combine = Boolean) |
| 30 |
| 31 There are a bunch of methods that let you do different things: |
| 32 |
| 33 test.verbose_set(1) |
| 34 |
| 35 test.description_set('string') |
| 36 |
| 37 test.program_set('program_or_script_to_test') |
| 38 |
| 39 test.interpreter_set('script_interpreter') |
| 40 test.interpreter_set(['script_interpreter', 'arg']) |
| 41 |
| 42 test.workdir_set('prefix') |
| 43 test.workdir_set('') |
| 44 |
| 45 test.workpath('file') |
| 46 test.workpath('subdir', 'file') |
| 47 |
| 48 test.subdir('subdir', ...) |
| 49 |
| 50 test.rmdir('subdir', ...) |
| 51 |
| 52 test.write('file', "contents\n") |
| 53 test.write(['subdir', 'file'], "contents\n") |
| 54 |
| 55 test.read('file') |
| 56 test.read(['subdir', 'file']) |
| 57 test.read('file', mode) |
| 58 test.read(['subdir', 'file'], mode) |
| 59 |
| 60 test.writable('dir', 1) |
| 61 test.writable('dir', None) |
| 62 |
| 63 test.preserve(condition, ...) |
| 64 |
| 65 test.cleanup(condition) |
| 66 |
| 67 test.command_args(program = 'program_or_script_to_run', |
| 68 interpreter = 'script_interpreter', |
| 69 arguments = 'arguments to pass to program') |
| 70 |
| 71 test.run(program = 'program_or_script_to_run', |
| 72 interpreter = 'script_interpreter', |
| 73 arguments = 'arguments to pass to program', |
| 74 chdir = 'directory_to_chdir_to', |
| 75 stdin = 'input to feed to the program\n') |
| 76 universal_newlines = True) |
| 77 |
| 78 p = test.start(program = 'program_or_script_to_run', |
| 79 interpreter = 'script_interpreter', |
| 80 arguments = 'arguments to pass to program', |
| 81 universal_newlines = None) |
| 82 |
| 83 test.finish(self, p) |
| 84 |
| 85 test.pass_test() |
| 86 test.pass_test(condition) |
| 87 test.pass_test(condition, function) |
| 88 |
| 89 test.fail_test() |
| 90 test.fail_test(condition) |
| 91 test.fail_test(condition, function) |
| 92 test.fail_test(condition, function, skip) |
| 93 |
| 94 test.no_result() |
| 95 test.no_result(condition) |
| 96 test.no_result(condition, function) |
| 97 test.no_result(condition, function, skip) |
| 98 |
| 99 test.stdout() |
| 100 test.stdout(run) |
| 101 |
| 102 test.stderr() |
| 103 test.stderr(run) |
| 104 |
| 105 test.symlink(target, link) |
| 106 |
| 107 test.banner(string) |
| 108 test.banner(string, width) |
| 109 |
| 110 test.diff(actual, expected) |
| 111 |
| 112 test.match(actual, expected) |
| 113 |
| 114 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n") |
| 115 test.match_exact(["actual 1\n", "actual 2\n"], |
| 116 ["expected 1\n", "expected 2\n"]) |
| 117 |
| 118 test.match_re("actual 1\nactual 2\n", regex_string) |
| 119 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes) |
| 120 |
| 121 test.match_re_dotall("actual 1\nactual 2\n", regex_string) |
| 122 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes) |
| 123 |
| 124 test.tempdir() |
| 125 test.tempdir('temporary-directory') |
| 126 |
| 127 test.sleep() |
| 128 test.sleep(seconds) |
| 129 |
| 130 test.where_is('foo') |
| 131 test.where_is('foo', 'PATH1:PATH2') |
| 132 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') |
| 133 |
| 134 test.unlink('file') |
| 135 test.unlink('subdir', 'file') |
| 136 |
| 137 The TestCmd module provides pass_test(), fail_test(), and no_result() |
| 138 unbound functions that report test results for use with the Aegis change |
| 139 management system. These methods terminate the test immediately, |
| 140 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with |
| 141 status 0 (success), 1 or 2 respectively. This allows for a distinction |
| 142 between an actual failed test and a test that could not be properly |
| 143 evaluated because of an external condition (such as a full file system |
| 144 or incorrect permissions). |
| 145 |
| 146 import TestCmd |
| 147 |
| 148 TestCmd.pass_test() |
| 149 TestCmd.pass_test(condition) |
| 150 TestCmd.pass_test(condition, function) |
| 151 |
| 152 TestCmd.fail_test() |
| 153 TestCmd.fail_test(condition) |
| 154 TestCmd.fail_test(condition, function) |
| 155 TestCmd.fail_test(condition, function, skip) |
| 156 |
| 157 TestCmd.no_result() |
| 158 TestCmd.no_result(condition) |
| 159 TestCmd.no_result(condition, function) |
| 160 TestCmd.no_result(condition, function, skip) |
| 161 |
| 162 The TestCmd module also provides unbound functions that handle matching |
| 163 in the same way as the match_*() methods described above. |
| 164 |
| 165 import TestCmd |
| 166 |
| 167 test = TestCmd.TestCmd(match = TestCmd.match_exact) |
| 168 |
| 169 test = TestCmd.TestCmd(match = TestCmd.match_re) |
| 170 |
| 171 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall) |
| 172 |
| 173 The TestCmd module provides unbound functions that can be used for the |
| 174 "diff" argument to TestCmd.TestCmd instantiation: |
| 175 |
| 176 import TestCmd |
| 177 |
| 178 test = TestCmd.TestCmd(match = TestCmd.match_re, |
| 179 diff = TestCmd.diff_re) |
| 180 |
| 181 test = TestCmd.TestCmd(diff = TestCmd.simple_diff) |
| 182 |
| 183 The "diff" argument can also be used with standard difflib functions: |
| 184 |
| 185 import difflib |
| 186 |
| 187 test = TestCmd.TestCmd(diff = difflib.context_diff) |
| 188 |
| 189 test = TestCmd.TestCmd(diff = difflib.unified_diff) |
| 190 |
| 191 Lastly, the where_is() method also exists in an unbound function |
| 192 version. |
| 193 |
| 194 import TestCmd |
| 195 |
| 196 TestCmd.where_is('foo') |
| 197 TestCmd.where_is('foo', 'PATH1:PATH2') |
| 198 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') |
| 199 """ |
| 200 |
| 201 # Copyright 2000, 2001, 2002, 2003, 2004 Steven Knight |
| 202 # This module is free software, and you may redistribute it and/or modify |
| 203 # it under the same terms as Python itself, so long as this copyright message |
| 204 # and disclaimer are retained in their original form. |
| 205 # |
| 206 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, |
| 207 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF |
| 208 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH |
| 209 # DAMAGE. |
| 210 # |
| 211 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT |
| 212 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A |
| 213 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, |
| 214 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, |
| 215 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. |
| 216 |
| 217 __author__ = "Steven Knight <knight at baldmt dot com>" |
| 218 __revision__ = "TestCmd.py 0.36.D001 2009/07/24 08:45:26 knight" |
| 219 __version__ = "0.36" |
| 220 |
| 221 import errno |
| 222 import os |
| 223 import os.path |
| 224 import re |
| 225 import shutil |
| 226 import stat |
| 227 import string |
| 228 import sys |
| 229 import tempfile |
| 230 import time |
| 231 import traceback |
| 232 import types |
| 233 import UserList |
| 234 |
| 235 __all__ = [ |
| 236 'diff_re', |
| 237 'fail_test', |
| 238 'no_result', |
| 239 'pass_test', |
| 240 'match_exact', |
| 241 'match_re', |
| 242 'match_re_dotall', |
| 243 'python_executable', |
| 244 'TestCmd' |
| 245 ] |
| 246 |
| 247 try: |
| 248 import difflib |
| 249 except ImportError: |
| 250 __all__.append('simple_diff') |
| 251 |
| 252 def is_List(e): |
| 253 return type(e) is types.ListType \ |
| 254 or isinstance(e, UserList.UserList) |
| 255 |
| 256 try: |
| 257 from UserString import UserString |
| 258 except ImportError: |
| 259 class UserString: |
| 260 pass |
| 261 |
| 262 if hasattr(types, 'UnicodeType'): |
| 263 def is_String(e): |
| 264 return type(e) is types.StringType \ |
| 265 or type(e) is types.UnicodeType \ |
| 266 or isinstance(e, UserString) |
| 267 else: |
| 268 def is_String(e): |
| 269 return type(e) is types.StringType or isinstance(e, UserString) |
| 270 |
| 271 tempfile.template = 'testcmd.' |
| 272 if os.name in ('posix', 'nt'): |
| 273 tempfile.template = 'testcmd.' + str(os.getpid()) + '.' |
| 274 else: |
| 275 tempfile.template = 'testcmd.' |
| 276 |
| 277 re_space = re.compile('\s') |
| 278 |
| 279 _Cleanup = [] |
| 280 |
| 281 _chain_to_exitfunc = None |
| 282 |
| 283 def _clean(): |
| 284 global _Cleanup |
| 285 cleanlist = filter(None, _Cleanup) |
| 286 del _Cleanup[:] |
| 287 cleanlist.reverse() |
| 288 for test in cleanlist: |
| 289 test.cleanup() |
| 290 if _chain_to_exitfunc: |
| 291 _chain_to_exitfunc() |
| 292 |
| 293 try: |
| 294 import atexit |
| 295 except ImportError: |
| 296 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc |
| 297 try: |
| 298 _chain_to_exitfunc = sys.exitfunc |
| 299 except AttributeError: |
| 300 pass |
| 301 sys.exitfunc = _clean |
| 302 else: |
| 303 atexit.register(_clean) |
| 304 |
| 305 try: |
| 306 zip |
| 307 except NameError: |
| 308 def zip(*lists): |
| 309 result = [] |
| 310 for i in xrange(min(map(len, lists))): |
| 311 result.append(tuple(map(lambda l, i=i: l[i], lists))) |
| 312 return result |
| 313 |
| 314 class Collector: |
| 315 def __init__(self, top): |
| 316 self.entries = [top] |
| 317 def __call__(self, arg, dirname, names): |
| 318 pathjoin = lambda n, d=dirname: os.path.join(d, n) |
| 319 self.entries.extend(map(pathjoin, names)) |
| 320 |
| 321 def _caller(tblist, skip): |
| 322 string = "" |
| 323 arr = [] |
| 324 for file, line, name, text in tblist: |
| 325 if file[-10:] == "TestCmd.py": |
| 326 break |
| 327 arr = [(file, line, name, text)] + arr |
| 328 atfrom = "at" |
| 329 for file, line, name, text in arr[skip:]: |
| 330 if name in ("?", "<module>"): |
| 331 name = "" |
| 332 else: |
| 333 name = " (" + name + ")" |
| 334 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) |
| 335 atfrom = "\tfrom" |
| 336 return string |
| 337 |
| 338 def fail_test(self = None, condition = 1, function = None, skip = 0): |
| 339 """Cause the test to fail. |
| 340 |
| 341 By default, the fail_test() method reports that the test FAILED |
| 342 and exits with a status of 1. If a condition argument is supplied, |
| 343 the test fails only if the condition is true. |
| 344 """ |
| 345 if not condition: |
| 346 return |
| 347 if not function is None: |
| 348 function() |
| 349 of = "" |
| 350 desc = "" |
| 351 sep = " " |
| 352 if not self is None: |
| 353 if self.program: |
| 354 of = " of " + self.program |
| 355 sep = "\n\t" |
| 356 if self.description: |
| 357 desc = " [" + self.description + "]" |
| 358 sep = "\n\t" |
| 359 |
| 360 at = _caller(traceback.extract_stack(), skip) |
| 361 sys.stderr.write("FAILED test" + of + desc + sep + at) |
| 362 |
| 363 sys.exit(1) |
| 364 |
| 365 def no_result(self = None, condition = 1, function = None, skip = 0): |
| 366 """Causes a test to exit with no valid result. |
| 367 |
| 368 By default, the no_result() method reports NO RESULT for the test |
| 369 and exits with a status of 2. If a condition argument is supplied, |
| 370 the test fails only if the condition is true. |
| 371 """ |
| 372 if not condition: |
| 373 return |
| 374 if not function is None: |
| 375 function() |
| 376 of = "" |
| 377 desc = "" |
| 378 sep = " " |
| 379 if not self is None: |
| 380 if self.program: |
| 381 of = " of " + self.program |
| 382 sep = "\n\t" |
| 383 if self.description: |
| 384 desc = " [" + self.description + "]" |
| 385 sep = "\n\t" |
| 386 |
| 387 at = _caller(traceback.extract_stack(), skip) |
| 388 sys.stderr.write("NO RESULT for test" + of + desc + sep + at) |
| 389 |
| 390 sys.exit(2) |
| 391 |
| 392 def pass_test(self = None, condition = 1, function = None): |
| 393 """Causes a test to pass. |
| 394 |
| 395 By default, the pass_test() method reports PASSED for the test |
| 396 and exits with a status of 0. If a condition argument is supplied, |
| 397 the test passes only if the condition is true. |
| 398 """ |
| 399 if not condition: |
| 400 return |
| 401 if not function is None: |
| 402 function() |
| 403 sys.stderr.write("PASSED\n") |
| 404 sys.exit(0) |
| 405 |
| 406 def match_exact(lines = None, matches = None): |
| 407 """ |
| 408 """ |
| 409 if not is_List(lines): |
| 410 lines = string.split(lines, "\n") |
| 411 if not is_List(matches): |
| 412 matches = string.split(matches, "\n") |
| 413 if len(lines) != len(matches): |
| 414 return |
| 415 for i in range(len(lines)): |
| 416 if lines[i] != matches[i]: |
| 417 return |
| 418 return 1 |
| 419 |
| 420 def match_re(lines = None, res = None): |
| 421 """ |
| 422 """ |
| 423 if not is_List(lines): |
| 424 lines = string.split(lines, "\n") |
| 425 if not is_List(res): |
| 426 res = string.split(res, "\n") |
| 427 if len(lines) != len(res): |
| 428 return |
| 429 for i in range(len(lines)): |
| 430 s = "^" + res[i] + "$" |
| 431 try: |
| 432 expr = re.compile(s) |
| 433 except re.error, e: |
| 434 msg = "Regular expression error in %s: %s" |
| 435 raise re.error, msg % (repr(s), e[0]) |
| 436 if not expr.search(lines[i]): |
| 437 return |
| 438 return 1 |
| 439 |
| 440 def match_re_dotall(lines = None, res = None): |
| 441 """ |
| 442 """ |
| 443 if not type(lines) is type(""): |
| 444 lines = string.join(lines, "\n") |
| 445 if not type(res) is type(""): |
| 446 res = string.join(res, "\n") |
| 447 s = "^" + res + "$" |
| 448 try: |
| 449 expr = re.compile(s, re.DOTALL) |
| 450 except re.error, e: |
| 451 msg = "Regular expression error in %s: %s" |
| 452 raise re.error, msg % (repr(s), e[0]) |
| 453 if expr.match(lines): |
| 454 return 1 |
| 455 |
| 456 try: |
| 457 import difflib |
| 458 except ImportError: |
| 459 pass |
| 460 else: |
| 461 def simple_diff(a, b, fromfile='', tofile='', |
| 462 fromfiledate='', tofiledate='', n=3, lineterm='\n'): |
| 463 """ |
| 464 A function with the same calling signature as difflib.context_diff |
| 465 (diff -c) and difflib.unified_diff (diff -u) but which prints |
| 466 output like the simple, unadorned 'diff" command. |
| 467 """ |
| 468 sm = difflib.SequenceMatcher(None, a, b) |
| 469 def comma(x1, x2): |
| 470 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2) |
| 471 result = [] |
| 472 for op, a1, a2, b1, b2 in sm.get_opcodes(): |
| 473 if op == 'delete': |
| 474 result.append("%sd%d" % (comma(a1, a2), b1)) |
| 475 result.extend(map(lambda l: '< ' + l, a[a1:a2])) |
| 476 elif op == 'insert': |
| 477 result.append("%da%s" % (a1, comma(b1, b2))) |
| 478 result.extend(map(lambda l: '> ' + l, b[b1:b2])) |
| 479 elif op == 'replace': |
| 480 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) |
| 481 result.extend(map(lambda l: '< ' + l, a[a1:a2])) |
| 482 result.append('---') |
| 483 result.extend(map(lambda l: '> ' + l, b[b1:b2])) |
| 484 return result |
| 485 |
| 486 def diff_re(a, b, fromfile='', tofile='', |
| 487 fromfiledate='', tofiledate='', n=3, lineterm='\n'): |
| 488 """ |
| 489 A simple "diff" of two sets of lines when the expected lines |
| 490 are regular expressions. This is a really dumb thing that |
| 491 just compares each line in turn, so it doesn't look for |
| 492 chunks of matching lines and the like--but at least it lets |
| 493 you know exactly which line first didn't compare correctl... |
| 494 """ |
| 495 result = [] |
| 496 diff = len(a) - len(b) |
| 497 if diff < 0: |
| 498 a = a + ['']*(-diff) |
| 499 elif diff > 0: |
| 500 b = b + ['']*diff |
| 501 i = 0 |
| 502 for aline, bline in zip(a, b): |
| 503 s = "^" + aline + "$" |
| 504 try: |
| 505 expr = re.compile(s) |
| 506 except re.error, e: |
| 507 msg = "Regular expression error in %s: %s" |
| 508 raise re.error, msg % (repr(s), e[0]) |
| 509 if not expr.search(bline): |
| 510 result.append("%sc%s" % (i+1, i+1)) |
| 511 result.append('< ' + repr(a[i])) |
| 512 result.append('---') |
| 513 result.append('> ' + repr(b[i])) |
| 514 i = i+1 |
| 515 return result |
| 516 |
| 517 if os.name == 'java': |
| 518 |
| 519 python_executable = os.path.join(sys.prefix, 'jython') |
| 520 |
| 521 else: |
| 522 |
| 523 python_executable = sys.executable |
| 524 |
| 525 if sys.platform == 'win32': |
| 526 |
| 527 default_sleep_seconds = 2 |
| 528 |
| 529 def where_is(file, path=None, pathext=None): |
| 530 if path is None: |
| 531 path = os.environ['PATH'] |
| 532 if is_String(path): |
| 533 path = string.split(path, os.pathsep) |
| 534 if pathext is None: |
| 535 pathext = os.environ['PATHEXT'] |
| 536 if is_String(pathext): |
| 537 pathext = string.split(pathext, os.pathsep) |
| 538 for ext in pathext: |
| 539 if string.lower(ext) == string.lower(file[-len(ext):]): |
| 540 pathext = [''] |
| 541 break |
| 542 for dir in path: |
| 543 f = os.path.join(dir, file) |
| 544 for ext in pathext: |
| 545 fext = f + ext |
| 546 if os.path.isfile(fext): |
| 547 return fext |
| 548 return None |
| 549 |
| 550 else: |
| 551 |
| 552 def where_is(file, path=None, pathext=None): |
| 553 if path is None: |
| 554 path = os.environ['PATH'] |
| 555 if is_String(path): |
| 556 path = string.split(path, os.pathsep) |
| 557 for dir in path: |
| 558 f = os.path.join(dir, file) |
| 559 if os.path.isfile(f): |
| 560 try: |
| 561 st = os.stat(f) |
| 562 except OSError: |
| 563 continue |
| 564 if stat.S_IMODE(st[stat.ST_MODE]) & 0111: |
| 565 return f |
| 566 return None |
| 567 |
| 568 default_sleep_seconds = 1 |
| 569 |
| 570 |
| 571 |
| 572 try: |
| 573 import subprocess |
| 574 except ImportError: |
| 575 # The subprocess module doesn't exist in this version of Python, |
| 576 # so we're going to cobble up something that looks just enough |
| 577 # like its API for our purposes below. |
| 578 import new |
| 579 |
| 580 subprocess = new.module('subprocess') |
| 581 |
| 582 subprocess.PIPE = 'PIPE' |
| 583 subprocess.STDOUT = 'STDOUT' |
| 584 subprocess.mswindows = (sys.platform == 'win32') |
| 585 |
| 586 try: |
| 587 import popen2 |
| 588 popen2.Popen3 |
| 589 except AttributeError: |
| 590 class Popen3: |
| 591 universal_newlines = 1 |
| 592 def __init__(self, command, **kw): |
| 593 if sys.platform == 'win32' and command[0] == '"': |
| 594 command = '"' + command + '"' |
| 595 (stdin, stdout, stderr) = os.popen3(' ' + command) |
| 596 self.stdin = stdin |
| 597 self.stdout = stdout |
| 598 self.stderr = stderr |
| 599 def close_output(self): |
| 600 self.stdout.close() |
| 601 self.resultcode = self.stderr.close() |
| 602 def wait(self): |
| 603 resultcode = self.resultcode |
| 604 if os.WIFEXITED(resultcode): |
| 605 return os.WEXITSTATUS(resultcode) |
| 606 elif os.WIFSIGNALED(resultcode): |
| 607 return os.WTERMSIG(resultcode) |
| 608 else: |
| 609 return None |
| 610 |
| 611 else: |
| 612 try: |
| 613 popen2.Popen4 |
| 614 except AttributeError: |
| 615 # A cribbed Popen4 class, with some retrofitted code from |
| 616 # the Python 1.5 Popen3 class methods to do certain things |
| 617 # by hand. |
| 618 class Popen4(popen2.Popen3): |
| 619 childerr = None |
| 620 |
| 621 def __init__(self, cmd, bufsize=-1): |
| 622 p2cread, p2cwrite = os.pipe() |
| 623 c2pread, c2pwrite = os.pipe() |
| 624 self.pid = os.fork() |
| 625 if self.pid == 0: |
| 626 # Child |
| 627 os.dup2(p2cread, 0) |
| 628 os.dup2(c2pwrite, 1) |
| 629 os.dup2(c2pwrite, 2) |
| 630 for i in range(3, popen2.MAXFD): |
| 631 try: |
| 632 os.close(i) |
| 633 except: pass |
| 634 try: |
| 635 os.execvp(cmd[0], cmd) |
| 636 finally: |
| 637 os._exit(1) |
| 638 # Shouldn't come here, I guess |
| 639 os._exit(1) |
| 640 os.close(p2cread) |
| 641 self.tochild = os.fdopen(p2cwrite, 'w', bufsize) |
| 642 os.close(c2pwrite) |
| 643 self.fromchild = os.fdopen(c2pread, 'r', bufsize) |
| 644 popen2._active.append(self) |
| 645 |
| 646 popen2.Popen4 = Popen4 |
| 647 |
| 648 class Popen3(popen2.Popen3, popen2.Popen4): |
| 649 universal_newlines = 1 |
| 650 def __init__(self, command, **kw): |
| 651 if kw.get('stderr') == 'STDOUT': |
| 652 apply(popen2.Popen4.__init__, (self, command, 1)) |
| 653 else: |
| 654 apply(popen2.Popen3.__init__, (self, command, 1)) |
| 655 self.stdin = self.tochild |
| 656 self.stdout = self.fromchild |
| 657 self.stderr = self.childerr |
| 658 def wait(self, *args, **kw): |
| 659 resultcode = apply(popen2.Popen3.wait, (self,)+args, kw) |
| 660 if os.WIFEXITED(resultcode): |
| 661 return os.WEXITSTATUS(resultcode) |
| 662 elif os.WIFSIGNALED(resultcode): |
| 663 return os.WTERMSIG(resultcode) |
| 664 else: |
| 665 return None |
| 666 |
| 667 subprocess.Popen = Popen3 |
| 668 |
| 669 |
| 670 |
| 671 # From Josiah Carlson, |
| 672 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Window
s and Posix platforms |
| 673 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 |
| 674 |
| 675 PIPE = subprocess.PIPE |
| 676 |
| 677 if subprocess.mswindows: |
| 678 from win32file import ReadFile, WriteFile |
| 679 from win32pipe import PeekNamedPipe |
| 680 import msvcrt |
| 681 else: |
| 682 import select |
| 683 import fcntl |
| 684 |
| 685 try: fcntl.F_GETFL |
| 686 except AttributeError: fcntl.F_GETFL = 3 |
| 687 |
| 688 try: fcntl.F_SETFL |
| 689 except AttributeError: fcntl.F_SETFL = 4 |
| 690 |
| 691 class Popen(subprocess.Popen): |
| 692 def recv(self, maxsize=None): |
| 693 return self._recv('stdout', maxsize) |
| 694 |
| 695 def recv_err(self, maxsize=None): |
| 696 return self._recv('stderr', maxsize) |
| 697 |
| 698 def send_recv(self, input='', maxsize=None): |
| 699 return self.send(input), self.recv(maxsize), self.recv_err(maxsize) |
| 700 |
| 701 def get_conn_maxsize(self, which, maxsize): |
| 702 if maxsize is None: |
| 703 maxsize = 1024 |
| 704 elif maxsize < 1: |
| 705 maxsize = 1 |
| 706 return getattr(self, which), maxsize |
| 707 |
| 708 def _close(self, which): |
| 709 getattr(self, which).close() |
| 710 setattr(self, which, None) |
| 711 |
| 712 if subprocess.mswindows: |
| 713 def send(self, input): |
| 714 if not self.stdin: |
| 715 return None |
| 716 |
| 717 try: |
| 718 x = msvcrt.get_osfhandle(self.stdin.fileno()) |
| 719 (errCode, written) = WriteFile(x, input) |
| 720 except ValueError: |
| 721 return self._close('stdin') |
| 722 except (subprocess.pywintypes.error, Exception), why: |
| 723 if why[0] in (109, errno.ESHUTDOWN): |
| 724 return self._close('stdin') |
| 725 raise |
| 726 |
| 727 return written |
| 728 |
| 729 def _recv(self, which, maxsize): |
| 730 conn, maxsize = self.get_conn_maxsize(which, maxsize) |
| 731 if conn is None: |
| 732 return None |
| 733 |
| 734 try: |
| 735 x = msvcrt.get_osfhandle(conn.fileno()) |
| 736 (read, nAvail, nMessage) = PeekNamedPipe(x, 0) |
| 737 if maxsize < nAvail: |
| 738 nAvail = maxsize |
| 739 if nAvail > 0: |
| 740 (errCode, read) = ReadFile(x, nAvail, None) |
| 741 except ValueError: |
| 742 return self._close(which) |
| 743 except (subprocess.pywintypes.error, Exception), why: |
| 744 if why[0] in (109, errno.ESHUTDOWN): |
| 745 return self._close(which) |
| 746 raise |
| 747 |
| 748 #if self.universal_newlines: |
| 749 # read = self._translate_newlines(read) |
| 750 return read |
| 751 |
| 752 else: |
| 753 def send(self, input): |
| 754 if not self.stdin: |
| 755 return None |
| 756 |
| 757 if not select.select([], [self.stdin], [], 0)[1]: |
| 758 return 0 |
| 759 |
| 760 try: |
| 761 written = os.write(self.stdin.fileno(), input) |
| 762 except OSError, why: |
| 763 if why[0] == errno.EPIPE: #broken pipe |
| 764 return self._close('stdin') |
| 765 raise |
| 766 |
| 767 return written |
| 768 |
| 769 def _recv(self, which, maxsize): |
| 770 conn, maxsize = self.get_conn_maxsize(which, maxsize) |
| 771 if conn is None: |
| 772 return None |
| 773 |
| 774 try: |
| 775 flags = fcntl.fcntl(conn, fcntl.F_GETFL) |
| 776 except TypeError: |
| 777 flags = None |
| 778 else: |
| 779 if not conn.closed: |
| 780 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK) |
| 781 |
| 782 try: |
| 783 if not select.select([conn], [], [], 0)[0]: |
| 784 return '' |
| 785 |
| 786 r = conn.read(maxsize) |
| 787 if not r: |
| 788 return self._close(which) |
| 789 |
| 790 #if self.universal_newlines: |
| 791 # r = self._translate_newlines(r) |
| 792 return r |
| 793 finally: |
| 794 if not conn.closed and not flags is None: |
| 795 fcntl.fcntl(conn, fcntl.F_SETFL, flags) |
| 796 |
| 797 disconnect_message = "Other end disconnected!" |
| 798 |
| 799 def recv_some(p, t=.1, e=1, tr=5, stderr=0): |
| 800 if tr < 1: |
| 801 tr = 1 |
| 802 x = time.time()+t |
| 803 y = [] |
| 804 r = '' |
| 805 pr = p.recv |
| 806 if stderr: |
| 807 pr = p.recv_err |
| 808 while time.time() < x or r: |
| 809 r = pr() |
| 810 if r is None: |
| 811 if e: |
| 812 raise Exception(disconnect_message) |
| 813 else: |
| 814 break |
| 815 elif r: |
| 816 y.append(r) |
| 817 else: |
| 818 time.sleep(max((x-time.time())/tr, 0)) |
| 819 return ''.join(y) |
| 820 |
| 821 # TODO(3.0: rewrite to use memoryview() |
| 822 def send_all(p, data): |
| 823 while len(data): |
| 824 sent = p.send(data) |
| 825 if sent is None: |
| 826 raise Exception(disconnect_message) |
| 827 data = buffer(data, sent) |
| 828 |
| 829 |
| 830 |
| 831 class TestCmd(object): |
| 832 """Class TestCmd |
| 833 """ |
| 834 |
| 835 def __init__(self, description = None, |
| 836 program = None, |
| 837 interpreter = None, |
| 838 workdir = None, |
| 839 subdir = None, |
| 840 verbose = None, |
| 841 match = None, |
| 842 diff = None, |
| 843 combine = 0, |
| 844 universal_newlines = 1): |
| 845 self._cwd = os.getcwd() |
| 846 self.description_set(description) |
| 847 self.program_set(program) |
| 848 self.interpreter_set(interpreter) |
| 849 if verbose is None: |
| 850 try: |
| 851 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) ) |
| 852 except ValueError: |
| 853 verbose = 0 |
| 854 self.verbose_set(verbose) |
| 855 self.combine = combine |
| 856 self.universal_newlines = universal_newlines |
| 857 if not match is None: |
| 858 self.match_function = match |
| 859 else: |
| 860 self.match_function = match_re |
| 861 if not diff is None: |
| 862 self.diff_function = diff |
| 863 else: |
| 864 try: |
| 865 difflib |
| 866 except NameError: |
| 867 pass |
| 868 else: |
| 869 self.diff_function = simple_diff |
| 870 #self.diff_function = difflib.context_diff |
| 871 #self.diff_function = difflib.unified_diff |
| 872 self._dirlist = [] |
| 873 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} |
| 874 if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '': |
| 875 self._preserve['pass_test'] = os.environ['PRESERVE'] |
| 876 self._preserve['fail_test'] = os.environ['PRESERVE'] |
| 877 self._preserve['no_result'] = os.environ['PRESERVE'] |
| 878 else: |
| 879 try: |
| 880 self._preserve['pass_test'] = os.environ['PRESERVE_PASS'] |
| 881 except KeyError: |
| 882 pass |
| 883 try: |
| 884 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL'] |
| 885 except KeyError: |
| 886 pass |
| 887 try: |
| 888 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT'] |
| 889 except KeyError: |
| 890 pass |
| 891 self._stdout = [] |
| 892 self._stderr = [] |
| 893 self.status = None |
| 894 self.condition = 'no_result' |
| 895 self.workdir_set(workdir) |
| 896 self.subdir(subdir) |
| 897 |
| 898 def __del__(self): |
| 899 self.cleanup() |
| 900 |
| 901 def __repr__(self): |
| 902 return "%x" % id(self) |
| 903 |
| 904 banner_char = '=' |
| 905 banner_width = 80 |
| 906 |
| 907 def banner(self, s, width=None): |
| 908 if width is None: |
| 909 width = self.banner_width |
| 910 return s + self.banner_char * (width - len(s)) |
| 911 |
| 912 if os.name == 'posix': |
| 913 |
| 914 def escape(self, arg): |
| 915 "escape shell special characters" |
| 916 slash = '\\' |
| 917 special = '"$' |
| 918 |
| 919 arg = string.replace(arg, slash, slash+slash) |
| 920 for c in special: |
| 921 arg = string.replace(arg, c, slash+c) |
| 922 |
| 923 if re_space.search(arg): |
| 924 arg = '"' + arg + '"' |
| 925 return arg |
| 926 |
| 927 else: |
| 928 |
| 929 # Windows does not allow special characters in file names |
| 930 # anyway, so no need for an escape function, we will just quote |
| 931 # the arg. |
| 932 def escape(self, arg): |
| 933 if re_space.search(arg): |
| 934 arg = '"' + arg + '"' |
| 935 return arg |
| 936 |
| 937 def canonicalize(self, path): |
| 938 if is_List(path): |
| 939 path = apply(os.path.join, tuple(path)) |
| 940 if not os.path.isabs(path): |
| 941 path = os.path.join(self.workdir, path) |
| 942 return path |
| 943 |
| 944 def chmod(self, path, mode): |
| 945 """Changes permissions on the specified file or directory |
| 946 path name.""" |
| 947 path = self.canonicalize(path) |
| 948 os.chmod(path, mode) |
| 949 |
| 950 def cleanup(self, condition = None): |
| 951 """Removes any temporary working directories for the specified |
| 952 TestCmd environment. If the environment variable PRESERVE was |
| 953 set when the TestCmd environment was created, temporary working |
| 954 directories are not removed. If any of the environment variables |
| 955 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set |
| 956 when the TestCmd environment was created, then temporary working |
| 957 directories are not removed if the test passed, failed, or had |
| 958 no result, respectively. Temporary working directories are also |
| 959 preserved for conditions specified via the preserve method. |
| 960 |
| 961 Typically, this method is not called directly, but is used when |
| 962 the script exits to clean up temporary working directories as |
| 963 appropriate for the exit status. |
| 964 """ |
| 965 if not self._dirlist: |
| 966 return |
| 967 os.chdir(self._cwd) |
| 968 self.workdir = None |
| 969 if condition is None: |
| 970 condition = self.condition |
| 971 if self._preserve[condition]: |
| 972 for dir in self._dirlist: |
| 973 print "Preserved directory", dir |
| 974 else: |
| 975 list = self._dirlist[:] |
| 976 list.reverse() |
| 977 for dir in list: |
| 978 self.writable(dir, 1) |
| 979 shutil.rmtree(dir, ignore_errors = 1) |
| 980 self._dirlist = [] |
| 981 |
| 982 try: |
| 983 global _Cleanup |
| 984 _Cleanup.remove(self) |
| 985 except (AttributeError, ValueError): |
| 986 pass |
| 987 |
| 988 def command_args(self, program = None, |
| 989 interpreter = None, |
| 990 arguments = None): |
| 991 if program: |
| 992 if type(program) == type('') and not os.path.isabs(program): |
| 993 program = os.path.join(self._cwd, program) |
| 994 else: |
| 995 program = self.program |
| 996 if not interpreter: |
| 997 interpreter = self.interpreter |
| 998 if not type(program) in [type([]), type(())]: |
| 999 program = [program] |
| 1000 cmd = list(program) |
| 1001 if interpreter: |
| 1002 if not type(interpreter) in [type([]), type(())]: |
| 1003 interpreter = [interpreter] |
| 1004 cmd = list(interpreter) + cmd |
| 1005 if arguments: |
| 1006 if type(arguments) == type(''): |
| 1007 arguments = string.split(arguments) |
| 1008 cmd.extend(arguments) |
| 1009 return cmd |
| 1010 |
| 1011 def description_set(self, description): |
| 1012 """Set the description of the functionality being tested. |
| 1013 """ |
| 1014 self.description = description |
| 1015 |
| 1016 try: |
| 1017 difflib |
| 1018 except NameError: |
| 1019 def diff(self, a, b, name, *args, **kw): |
| 1020 print self.banner('Expected %s' % name) |
| 1021 print a |
| 1022 print self.banner('Actual %s' % name) |
| 1023 print b |
| 1024 else: |
| 1025 def diff(self, a, b, name, *args, **kw): |
| 1026 print self.banner(name) |
| 1027 args = (a.splitlines(), b.splitlines()) + args |
| 1028 lines = apply(self.diff_function, args, kw) |
| 1029 for l in lines: |
| 1030 print l |
| 1031 |
| 1032 def fail_test(self, condition = 1, function = None, skip = 0): |
| 1033 """Cause the test to fail. |
| 1034 """ |
| 1035 if not condition: |
| 1036 return |
| 1037 self.condition = 'fail_test' |
| 1038 fail_test(self = self, |
| 1039 condition = condition, |
| 1040 function = function, |
| 1041 skip = skip) |
| 1042 |
| 1043 def interpreter_set(self, interpreter): |
| 1044 """Set the program to be used to interpret the program |
| 1045 under test as a script. |
| 1046 """ |
| 1047 self.interpreter = interpreter |
| 1048 |
| 1049 def match(self, lines, matches): |
| 1050 """Compare actual and expected file contents. |
| 1051 """ |
| 1052 return self.match_function(lines, matches) |
| 1053 |
| 1054 def match_exact(self, lines, matches): |
| 1055 """Compare actual and expected file contents. |
| 1056 """ |
| 1057 return match_exact(lines, matches) |
| 1058 |
| 1059 def match_re(self, lines, res): |
| 1060 """Compare actual and expected file contents. |
| 1061 """ |
| 1062 return match_re(lines, res) |
| 1063 |
| 1064 def match_re_dotall(self, lines, res): |
| 1065 """Compare actual and expected file contents. |
| 1066 """ |
| 1067 return match_re_dotall(lines, res) |
| 1068 |
| 1069 def no_result(self, condition = 1, function = None, skip = 0): |
| 1070 """Report that the test could not be run. |
| 1071 """ |
| 1072 if not condition: |
| 1073 return |
| 1074 self.condition = 'no_result' |
| 1075 no_result(self = self, |
| 1076 condition = condition, |
| 1077 function = function, |
| 1078 skip = skip) |
| 1079 |
| 1080 def pass_test(self, condition = 1, function = None): |
| 1081 """Cause the test to pass. |
| 1082 """ |
| 1083 if not condition: |
| 1084 return |
| 1085 self.condition = 'pass_test' |
| 1086 pass_test(self = self, condition = condition, function = function) |
| 1087 |
| 1088 def preserve(self, *conditions): |
| 1089 """Arrange for the temporary working directories for the |
| 1090 specified TestCmd environment to be preserved for one or more |
| 1091 conditions. If no conditions are specified, arranges for |
| 1092 the temporary working directories to be preserved for all |
| 1093 conditions. |
| 1094 """ |
| 1095 if conditions is (): |
| 1096 conditions = ('pass_test', 'fail_test', 'no_result') |
| 1097 for cond in conditions: |
| 1098 self._preserve[cond] = 1 |
| 1099 |
| 1100 def program_set(self, program): |
| 1101 """Set the executable program or script to be tested. |
| 1102 """ |
| 1103 if program and not os.path.isabs(program): |
| 1104 program = os.path.join(self._cwd, program) |
| 1105 self.program = program |
| 1106 |
| 1107 def read(self, file, mode = 'rb'): |
| 1108 """Reads and returns the contents of the specified file name. |
| 1109 The file name may be a list, in which case the elements are |
| 1110 concatenated with the os.path.join() method. The file is |
| 1111 assumed to be under the temporary working directory unless it |
| 1112 is an absolute path name. The I/O mode for the file may |
| 1113 be specified; it must begin with an 'r'. The default is |
| 1114 'rb' (binary read). |
| 1115 """ |
| 1116 file = self.canonicalize(file) |
| 1117 if mode[0] != 'r': |
| 1118 raise ValueError, "mode must begin with 'r'" |
| 1119 return open(file, mode).read() |
| 1120 |
| 1121 def rmdir(self, dir): |
| 1122 """Removes the specified dir name. |
| 1123 The dir name may be a list, in which case the elements are |
| 1124 concatenated with the os.path.join() method. The dir is |
| 1125 assumed to be under the temporary working directory unless it |
| 1126 is an absolute path name. |
| 1127 The dir must be empty. |
| 1128 """ |
| 1129 dir = self.canonicalize(dir) |
| 1130 os.rmdir(dir) |
| 1131 |
| 1132 def start(self, program = None, |
| 1133 interpreter = None, |
| 1134 arguments = None, |
| 1135 universal_newlines = None, |
| 1136 **kw): |
| 1137 """ |
| 1138 Starts a program or script for the test environment. |
| 1139 |
| 1140 The specified program will have the original directory |
| 1141 prepended unless it is enclosed in a [list]. |
| 1142 """ |
| 1143 cmd = self.command_args(program, interpreter, arguments) |
| 1144 cmd_string = string.join(map(self.escape, cmd), ' ') |
| 1145 if self.verbose: |
| 1146 sys.stderr.write(cmd_string + "\n") |
| 1147 if universal_newlines is None: |
| 1148 universal_newlines = self.universal_newlines |
| 1149 |
| 1150 combine = kw.get('combine', self.combine) |
| 1151 if combine: |
| 1152 stderr_value = subprocess.STDOUT |
| 1153 else: |
| 1154 stderr_value = subprocess.PIPE |
| 1155 |
| 1156 return Popen(cmd, |
| 1157 stdin=subprocess.PIPE, |
| 1158 stdout=subprocess.PIPE, |
| 1159 stderr=stderr_value, |
| 1160 universal_newlines=universal_newlines) |
| 1161 |
| 1162 def finish(self, popen, **kw): |
| 1163 """ |
| 1164 Finishes and waits for the process being run under control of |
| 1165 the specified popen argument, recording the exit status, |
| 1166 standard output and error output. |
| 1167 """ |
| 1168 popen.stdin.close() |
| 1169 self.status = popen.wait() |
| 1170 if not self.status: |
| 1171 self.status = 0 |
| 1172 self._stdout.append(popen.stdout.read()) |
| 1173 if popen.stderr: |
| 1174 stderr = popen.stderr.read() |
| 1175 else: |
| 1176 stderr = '' |
| 1177 self._stderr.append(stderr) |
| 1178 |
| 1179 def run(self, program = None, |
| 1180 interpreter = None, |
| 1181 arguments = None, |
| 1182 chdir = None, |
| 1183 stdin = None, |
| 1184 universal_newlines = None): |
| 1185 """Runs a test of the program or script for the test |
| 1186 environment. Standard output and error output are saved for |
| 1187 future retrieval via the stdout() and stderr() methods. |
| 1188 |
| 1189 The specified program will have the original directory |
| 1190 prepended unless it is enclosed in a [list]. |
| 1191 """ |
| 1192 if chdir: |
| 1193 oldcwd = os.getcwd() |
| 1194 if not os.path.isabs(chdir): |
| 1195 chdir = os.path.join(self.workpath(chdir)) |
| 1196 if self.verbose: |
| 1197 sys.stderr.write("chdir(" + chdir + ")\n") |
| 1198 os.chdir(chdir) |
| 1199 p = self.start(program, interpreter, arguments, universal_newlines) |
| 1200 if stdin: |
| 1201 if is_List(stdin): |
| 1202 for line in stdin: |
| 1203 p.stdin.write(line) |
| 1204 else: |
| 1205 p.stdin.write(stdin) |
| 1206 p.stdin.close() |
| 1207 |
| 1208 out = p.stdout.read() |
| 1209 if p.stderr is None: |
| 1210 err = '' |
| 1211 else: |
| 1212 err = p.stderr.read() |
| 1213 try: |
| 1214 close_output = p.close_output |
| 1215 except AttributeError: |
| 1216 p.stdout.close() |
| 1217 if not p.stderr is None: |
| 1218 p.stderr.close() |
| 1219 else: |
| 1220 close_output() |
| 1221 |
| 1222 self._stdout.append(out) |
| 1223 self._stderr.append(err) |
| 1224 |
| 1225 self.status = p.wait() |
| 1226 if not self.status: |
| 1227 self.status = 0 |
| 1228 |
| 1229 if chdir: |
| 1230 os.chdir(oldcwd) |
| 1231 if self.verbose >= 2: |
| 1232 write = sys.stdout.write |
| 1233 write('============ STATUS: %d\n' % self.status) |
| 1234 out = self.stdout() |
| 1235 if out or self.verbose >= 3: |
| 1236 write('============ BEGIN STDOUT (len=%d):\n' % len(out)) |
| 1237 write(out) |
| 1238 write('============ END STDOUT\n') |
| 1239 err = self.stderr() |
| 1240 if err or self.verbose >= 3: |
| 1241 write('============ BEGIN STDERR (len=%d)\n' % len(err)) |
| 1242 write(err) |
| 1243 write('============ END STDERR\n') |
| 1244 |
| 1245 def sleep(self, seconds = default_sleep_seconds): |
| 1246 """Sleeps at least the specified number of seconds. If no |
| 1247 number is specified, sleeps at least the minimum number of |
| 1248 seconds necessary to advance file time stamps on the current |
| 1249 system. Sleeping more seconds is all right. |
| 1250 """ |
| 1251 time.sleep(seconds) |
| 1252 |
| 1253 def stderr(self, run = None): |
| 1254 """Returns the error output from the specified run number. |
| 1255 If there is no specified run number, then returns the error |
| 1256 output of the last run. If the run number is less than zero, |
| 1257 then returns the error output from that many runs back from the |
| 1258 current run. |
| 1259 """ |
| 1260 if not run: |
| 1261 run = len(self._stderr) |
| 1262 elif run < 0: |
| 1263 run = len(self._stderr) + run |
| 1264 run = run - 1 |
| 1265 return self._stderr[run] |
| 1266 |
| 1267 def stdout(self, run = None): |
| 1268 """Returns the standard output from the specified run number. |
| 1269 If there is no specified run number, then returns the standard |
| 1270 output of the last run. If the run number is less than zero, |
| 1271 then returns the standard output from that many runs back from |
| 1272 the current run. |
| 1273 """ |
| 1274 if not run: |
| 1275 run = len(self._stdout) |
| 1276 elif run < 0: |
| 1277 run = len(self._stdout) + run |
| 1278 run = run - 1 |
| 1279 return self._stdout[run] |
| 1280 |
| 1281 def subdir(self, *subdirs): |
| 1282 """Create new subdirectories under the temporary working |
| 1283 directory, one for each argument. An argument may be a list, |
| 1284 in which case the list elements are concatenated using the |
| 1285 os.path.join() method. Subdirectories multiple levels deep |
| 1286 must be created using a separate argument for each level: |
| 1287 |
| 1288 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) |
| 1289 |
| 1290 Returns the number of subdirectories actually created. |
| 1291 """ |
| 1292 count = 0 |
| 1293 for sub in subdirs: |
| 1294 if sub is None: |
| 1295 continue |
| 1296 if is_List(sub): |
| 1297 sub = apply(os.path.join, tuple(sub)) |
| 1298 new = os.path.join(self.workdir, sub) |
| 1299 try: |
| 1300 os.mkdir(new) |
| 1301 except OSError: |
| 1302 pass |
| 1303 else: |
| 1304 count = count + 1 |
| 1305 return count |
| 1306 |
| 1307 def symlink(self, target, link): |
| 1308 """Creates a symlink to the specified target. |
| 1309 The link name may be a list, in which case the elements are |
| 1310 concatenated with the os.path.join() method. The link is |
| 1311 assumed to be under the temporary working directory unless it |
| 1312 is an absolute path name. The target is *not* assumed to be |
| 1313 under the temporary working directory. |
| 1314 """ |
| 1315 link = self.canonicalize(link) |
| 1316 os.symlink(target, link) |
| 1317 |
| 1318 def tempdir(self, path=None): |
| 1319 """Creates a temporary directory. |
| 1320 A unique directory name is generated if no path name is specified. |
| 1321 The directory is created, and will be removed when the TestCmd |
| 1322 object is destroyed. |
| 1323 """ |
| 1324 if path is None: |
| 1325 try: |
| 1326 path = tempfile.mktemp(prefix=tempfile.template) |
| 1327 except TypeError: |
| 1328 path = tempfile.mktemp() |
| 1329 os.mkdir(path) |
| 1330 |
| 1331 # Symlinks in the path will report things |
| 1332 # differently from os.getcwd(), so chdir there |
| 1333 # and back to fetch the canonical path. |
| 1334 cwd = os.getcwd() |
| 1335 try: |
| 1336 os.chdir(path) |
| 1337 path = os.getcwd() |
| 1338 finally: |
| 1339 os.chdir(cwd) |
| 1340 |
| 1341 # Uppercase the drive letter since the case of drive |
| 1342 # letters is pretty much random on win32: |
| 1343 drive,rest = os.path.splitdrive(path) |
| 1344 if drive: |
| 1345 path = string.upper(drive) + rest |
| 1346 |
| 1347 # |
| 1348 self._dirlist.append(path) |
| 1349 global _Cleanup |
| 1350 try: |
| 1351 _Cleanup.index(self) |
| 1352 except ValueError: |
| 1353 _Cleanup.append(self) |
| 1354 |
| 1355 return path |
| 1356 |
| 1357 def touch(self, path, mtime=None): |
| 1358 """Updates the modification time on the specified file or |
| 1359 directory path name. The default is to update to the |
| 1360 current time if no explicit modification time is specified. |
| 1361 """ |
| 1362 path = self.canonicalize(path) |
| 1363 atime = os.path.getatime(path) |
| 1364 if mtime is None: |
| 1365 mtime = time.time() |
| 1366 os.utime(path, (atime, mtime)) |
| 1367 |
| 1368 def unlink(self, file): |
| 1369 """Unlinks the specified file name. |
| 1370 The file name may be a list, in which case the elements are |
| 1371 concatenated with the os.path.join() method. The file is |
| 1372 assumed to be under the temporary working directory unless it |
| 1373 is an absolute path name. |
| 1374 """ |
| 1375 file = self.canonicalize(file) |
| 1376 os.unlink(file) |
| 1377 |
| 1378 def verbose_set(self, verbose): |
| 1379 """Set the verbose level. |
| 1380 """ |
| 1381 self.verbose = verbose |
| 1382 |
| 1383 def where_is(self, file, path=None, pathext=None): |
| 1384 """Find an executable file. |
| 1385 """ |
| 1386 if is_List(file): |
| 1387 file = apply(os.path.join, tuple(file)) |
| 1388 if not os.path.isabs(file): |
| 1389 file = where_is(file, path, pathext) |
| 1390 return file |
| 1391 |
| 1392 def workdir_set(self, path): |
| 1393 """Creates a temporary working directory with the specified |
| 1394 path name. If the path is a null string (''), a unique |
| 1395 directory name is created. |
| 1396 """ |
| 1397 if (path != None): |
| 1398 if path == '': |
| 1399 path = None |
| 1400 path = self.tempdir(path) |
| 1401 self.workdir = path |
| 1402 |
| 1403 def workpath(self, *args): |
| 1404 """Returns the absolute path name to a subdirectory or file |
| 1405 within the current temporary working directory. Concatenates |
| 1406 the temporary working directory name with the specified |
| 1407 arguments using the os.path.join() method. |
| 1408 """ |
| 1409 return apply(os.path.join, (self.workdir,) + tuple(args)) |
| 1410 |
| 1411 def readable(self, top, read=1): |
| 1412 """Make the specified directory tree readable (read == 1) |
| 1413 or not (read == None). |
| 1414 |
| 1415 This method has no effect on Windows systems, which use a |
| 1416 completely different mechanism to control file readability. |
| 1417 """ |
| 1418 |
| 1419 if sys.platform == 'win32': |
| 1420 return |
| 1421 |
| 1422 if read: |
| 1423 def do_chmod(fname): |
| 1424 try: st = os.stat(fname) |
| 1425 except OSError: pass |
| 1426 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD
)) |
| 1427 else: |
| 1428 def do_chmod(fname): |
| 1429 try: st = os.stat(fname) |
| 1430 except OSError: pass |
| 1431 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREA
D)) |
| 1432 |
| 1433 if os.path.isfile(top): |
| 1434 # If it's a file, that's easy, just chmod it. |
| 1435 do_chmod(top) |
| 1436 elif read: |
| 1437 # It's a directory and we're trying to turn on read |
| 1438 # permission, so it's also pretty easy, just chmod the |
| 1439 # directory and then chmod every entry on our walk down the |
| 1440 # tree. Because os.path.walk() is top-down, we'll enable |
| 1441 # read permission on any directories that have it disabled |
| 1442 # before os.path.walk() tries to list their contents. |
| 1443 do_chmod(top) |
| 1444 |
| 1445 def chmod_entries(arg, dirname, names, do_chmod=do_chmod): |
| 1446 for n in names: |
| 1447 do_chmod(os.path.join(dirname, n)) |
| 1448 |
| 1449 os.path.walk(top, chmod_entries, None) |
| 1450 else: |
| 1451 # It's a directory and we're trying to turn off read |
| 1452 # permission, which means we have to chmod the directoreis |
| 1453 # in the tree bottom-up, lest disabling read permission from |
| 1454 # the top down get in the way of being able to get at lower |
| 1455 # parts of the tree. But os.path.walk() visits things top |
| 1456 # down, so we just use an object to collect a list of all |
| 1457 # of the entries in the tree, reverse the list, and then |
| 1458 # chmod the reversed (bottom-up) list. |
| 1459 col = Collector(top) |
| 1460 os.path.walk(top, col, None) |
| 1461 col.entries.reverse() |
| 1462 for d in col.entries: do_chmod(d) |
| 1463 |
| 1464 def writable(self, top, write=1): |
| 1465 """Make the specified directory tree writable (write == 1) |
| 1466 or not (write == None). |
| 1467 """ |
| 1468 |
| 1469 if sys.platform == 'win32': |
| 1470 |
| 1471 if write: |
| 1472 def do_chmod(fname): |
| 1473 try: os.chmod(fname, stat.S_IWRITE) |
| 1474 except OSError: pass |
| 1475 else: |
| 1476 def do_chmod(fname): |
| 1477 try: os.chmod(fname, stat.S_IREAD) |
| 1478 except OSError: pass |
| 1479 |
| 1480 else: |
| 1481 |
| 1482 if write: |
| 1483 def do_chmod(fname): |
| 1484 try: st = os.stat(fname) |
| 1485 except OSError: pass |
| 1486 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200)) |
| 1487 else: |
| 1488 def do_chmod(fname): |
| 1489 try: st = os.stat(fname) |
| 1490 except OSError: pass |
| 1491 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200)) |
| 1492 |
| 1493 if os.path.isfile(top): |
| 1494 do_chmod(top) |
| 1495 else: |
| 1496 col = Collector(top) |
| 1497 os.path.walk(top, col, None) |
| 1498 for d in col.entries: do_chmod(d) |
| 1499 |
| 1500 def executable(self, top, execute=1): |
| 1501 """Make the specified directory tree executable (execute == 1) |
| 1502 or not (execute == None). |
| 1503 |
| 1504 This method has no effect on Windows systems, which use a |
| 1505 completely different mechanism to control file executability. |
| 1506 """ |
| 1507 |
| 1508 if sys.platform == 'win32': |
| 1509 return |
| 1510 |
| 1511 if execute: |
| 1512 def do_chmod(fname): |
| 1513 try: st = os.stat(fname) |
| 1514 except OSError: pass |
| 1515 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC
)) |
| 1516 else: |
| 1517 def do_chmod(fname): |
| 1518 try: st = os.stat(fname) |
| 1519 except OSError: pass |
| 1520 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXE
C)) |
| 1521 |
| 1522 if os.path.isfile(top): |
| 1523 # If it's a file, that's easy, just chmod it. |
| 1524 do_chmod(top) |
| 1525 elif execute: |
| 1526 # It's a directory and we're trying to turn on execute |
| 1527 # permission, so it's also pretty easy, just chmod the |
| 1528 # directory and then chmod every entry on our walk down the |
| 1529 # tree. Because os.path.walk() is top-down, we'll enable |
| 1530 # execute permission on any directories that have it disabled |
| 1531 # before os.path.walk() tries to list their contents. |
| 1532 do_chmod(top) |
| 1533 |
| 1534 def chmod_entries(arg, dirname, names, do_chmod=do_chmod): |
| 1535 for n in names: |
| 1536 do_chmod(os.path.join(dirname, n)) |
| 1537 |
| 1538 os.path.walk(top, chmod_entries, None) |
| 1539 else: |
| 1540 # It's a directory and we're trying to turn off execute |
| 1541 # permission, which means we have to chmod the directories |
| 1542 # in the tree bottom-up, lest disabling execute permission from |
| 1543 # the top down get in the way of being able to get at lower |
| 1544 # parts of the tree. But os.path.walk() visits things top |
| 1545 # down, so we just use an object to collect a list of all |
| 1546 # of the entries in the tree, reverse the list, and then |
| 1547 # chmod the reversed (bottom-up) list. |
| 1548 col = Collector(top) |
| 1549 os.path.walk(top, col, None) |
| 1550 col.entries.reverse() |
| 1551 for d in col.entries: do_chmod(d) |
| 1552 |
| 1553 def write(self, file, content, mode = 'wb'): |
| 1554 """Writes the specified content text (second argument) to the |
| 1555 specified file name (first argument). The file name may be |
| 1556 a list, in which case the elements are concatenated with the |
| 1557 os.path.join() method. The file is created under the temporary |
| 1558 working directory. Any subdirectories in the path must already |
| 1559 exist. The I/O mode for the file may be specified; it must |
| 1560 begin with a 'w'. The default is 'wb' (binary write). |
| 1561 """ |
| 1562 file = self.canonicalize(file) |
| 1563 if mode[0] != 'w': |
| 1564 raise ValueError, "mode must begin with 'w'" |
| 1565 open(file, mode).write(content) |
| 1566 |
| 1567 # Local Variables: |
| 1568 # tab-width:4 |
| 1569 # indent-tabs-mode:nil |
| 1570 # End: |
| 1571 # vim: set expandtab tabstop=4 shiftwidth=4: |
| OLD | NEW |