| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_process -*- | |
| 2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 """ | |
| 6 UNIX Process management. | |
| 7 | |
| 8 Do NOT use this module directly - use reactor.spawnProcess() instead. | |
| 9 | |
| 10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
| 11 """ | |
| 12 | |
| 13 # System Imports | |
| 14 import gc, os, sys, traceback, select, signal, errno | |
| 15 import warnings | |
| 16 | |
| 17 try: | |
| 18 import pty | |
| 19 except ImportError: | |
| 20 pty = None | |
| 21 | |
| 22 try: | |
| 23 import fcntl, termios | |
| 24 except ImportError: | |
| 25 fcntl = None | |
| 26 | |
| 27 from twisted.persisted import styles | |
| 28 from twisted.python import log, failure | |
| 29 from twisted.python.util import switchUID | |
| 30 from twisted.internet import fdesc, abstract, error | |
| 31 from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE | |
| 32 | |
| 33 # Some people were importing this, which is incorrect, just keeping it | |
| 34 # here for backwards compatibility: | |
| 35 ProcessExitedAlready = error.ProcessExitedAlready | |
| 36 | |
| 37 reapProcessHandlers = {} | |
| 38 | |
| 39 def reapAllProcesses(): | |
| 40 """ | |
| 41 Reap all registered processes. | |
| 42 """ | |
| 43 for process in reapProcessHandlers.values(): | |
| 44 process.reapProcess() | |
| 45 | |
| 46 | |
| 47 def registerReapProcessHandler(pid, process): | |
| 48 """ | |
| 49 Register a process handler for the given pid, in case L{reapAllProcesses} | |
| 50 is called. | |
| 51 | |
| 52 @param pid: the pid of the process. | |
| 53 @param process: a process handler. | |
| 54 """ | |
| 55 if pid in reapProcessHandlers: | |
| 56 raise RuntimeError("Try to register an already registered process.") | |
| 57 try: | |
| 58 auxPID, status = os.waitpid(pid, os.WNOHANG) | |
| 59 except: | |
| 60 log.msg('Failed to reap %d:' % pid) | |
| 61 log.err() | |
| 62 auxPID = None | |
| 63 if auxPID: | |
| 64 process.processEnded(status) | |
| 65 else: | |
| 66 # if auxPID is 0, there are children but none have exited | |
| 67 reapProcessHandlers[pid] = process | |
| 68 | |
| 69 | |
| 70 def unregisterReapProcessHandler(pid, process): | |
| 71 """ | |
| 72 Unregister a process handler previously registered with | |
| 73 L{registerReapProcessHandler}. | |
| 74 """ | |
| 75 if not (pid in reapProcessHandlers | |
| 76 and reapProcessHandlers[pid] == process): | |
| 77 raise RuntimeError("Try to unregister a process not registered.") | |
| 78 del reapProcessHandlers[pid] | |
| 79 | |
| 80 | |
| 81 def detectLinuxBrokenPipeBehavior(): | |
| 82 """ | |
| 83 On some Linux version, write-only pipe are detected as readable. This | |
| 84 function is here to check if this bug is present or not. | |
| 85 | |
| 86 See L{ProcessWriter.doRead} for a more detailed explanation. | |
| 87 """ | |
| 88 global brokenLinuxPipeBehavior | |
| 89 r, w = os.pipe() | |
| 90 os.write(w, 'a') | |
| 91 reads, writes, exes = select.select([w], [], [], 0) | |
| 92 if reads: | |
| 93 # Linux < 2.6.11 says a write-only pipe is readable. | |
| 94 brokenLinuxPipeBehavior = True | |
| 95 else: | |
| 96 brokenLinuxPipeBehavior = False | |
| 97 os.close(r) | |
| 98 os.close(w) | |
| 99 | |
| 100 # Call at import time | |
| 101 detectLinuxBrokenPipeBehavior() | |
| 102 | |
| 103 | |
| 104 class ProcessWriter(abstract.FileDescriptor): | |
| 105 """ | |
| 106 (Internal) Helper class to write into a Process's input pipe. | |
| 107 | |
| 108 I am a helper which describes a selectable asynchronous writer to a | |
| 109 process's input pipe, including stdin. | |
| 110 """ | |
| 111 connected = 1 | |
| 112 ic = 0 | |
| 113 enableReadHack = False | |
| 114 | |
| 115 def __init__(self, reactor, proc, name, fileno, forceReadHack=False): | |
| 116 """ | |
| 117 Initialize, specifying a Process instance to connect to. | |
| 118 """ | |
| 119 abstract.FileDescriptor.__init__(self, reactor) | |
| 120 fdesc.setNonBlocking(fileno) | |
| 121 self.proc = proc | |
| 122 self.name = name | |
| 123 self.fd = fileno | |
| 124 | |
| 125 if forceReadHack: | |
| 126 self.enableReadHack = True | |
| 127 else: | |
| 128 # Detect if this fd is actually a write-only fd. If it's | |
| 129 # valid to read, don't try to detect closing via read. | |
| 130 # This really only means that we cannot detect a TTY's write | |
| 131 # pipe being closed. | |
| 132 try: | |
| 133 os.read(self.fileno(), 0) | |
| 134 except OSError: | |
| 135 # It's a write-only pipe end, enable hack | |
| 136 self.enableReadHack = True | |
| 137 | |
| 138 if self.enableReadHack: | |
| 139 self.startReading() | |
| 140 | |
| 141 def fileno(self): | |
| 142 """ | |
| 143 Return the fileno() of my process's stdin. | |
| 144 """ | |
| 145 return self.fd | |
| 146 | |
| 147 def writeSomeData(self, data): | |
| 148 """ | |
| 149 Write some data to the open process. | |
| 150 """ | |
| 151 rv = fdesc.writeToFD(self.fd, data) | |
| 152 if rv == len(data) and self.enableReadHack: | |
| 153 self.startReading() | |
| 154 return rv | |
| 155 | |
| 156 def write(self, data): | |
| 157 self.stopReading() | |
| 158 abstract.FileDescriptor.write(self, data) | |
| 159 | |
| 160 def doRead(self): | |
| 161 """ | |
| 162 The only way a write pipe can become "readable" is at EOF, because the | |
| 163 child has closed it, and we're using a reactor which doesn't | |
| 164 distinguish between readable and closed (such as the select reactor). | |
| 165 | |
| 166 Except that's not true on linux < 2.6.11. It has the following | |
| 167 characteristics: write pipe is completely empty => POLLOUT (writable in | |
| 168 select), write pipe is not completely empty => POLLIN (readable in | |
| 169 select), write pipe's reader closed => POLLIN|POLLERR (readable and | |
| 170 writable in select) | |
| 171 | |
| 172 That's what this funky code is for. If linux was not broken, this | |
| 173 function could be simply "return CONNECTION_LOST". | |
| 174 | |
| 175 BUG: We call select no matter what the reactor. | |
| 176 If the reactor is pollreactor, and the fd is > 1024, this will fail. | |
| 177 (only occurs on broken versions of linux, though). | |
| 178 """ | |
| 179 if self.enableReadHack: | |
| 180 if brokenLinuxPipeBehavior: | |
| 181 fd = self.fd | |
| 182 r, w, x = select.select([fd], [fd], [], 0) | |
| 183 if r and w: | |
| 184 return CONNECTION_LOST | |
| 185 else: | |
| 186 return CONNECTION_LOST | |
| 187 else: | |
| 188 self.stopReading() | |
| 189 | |
| 190 def connectionLost(self, reason): | |
| 191 """ | |
| 192 See abstract.FileDescriptor.connectionLost. | |
| 193 """ | |
| 194 # At least on OS X 10.4, exiting while stdout is non-blocking can | |
| 195 # result in data loss. For some reason putting the file descriptor | |
| 196 # back into blocking mode seems to resolve this issue. | |
| 197 fdesc.setBlocking(self.fd) | |
| 198 | |
| 199 abstract.FileDescriptor.connectionLost(self, reason) | |
| 200 self.proc.childConnectionLost(self.name, reason) | |
| 201 | |
| 202 | |
| 203 | |
| 204 class ProcessReader(abstract.FileDescriptor): | |
| 205 """ | |
| 206 ProcessReader | |
| 207 | |
| 208 I am a selectable representation of a process's output pipe, such as | |
| 209 stdout and stderr. | |
| 210 """ | |
| 211 connected = 1 | |
| 212 | |
| 213 def __init__(self, reactor, proc, name, fileno): | |
| 214 """ | |
| 215 Initialize, specifying a process to connect to. | |
| 216 """ | |
| 217 abstract.FileDescriptor.__init__(self, reactor) | |
| 218 fdesc.setNonBlocking(fileno) | |
| 219 self.proc = proc | |
| 220 self.name = name | |
| 221 self.fd = fileno | |
| 222 self.startReading() | |
| 223 | |
| 224 def fileno(self): | |
| 225 """ | |
| 226 Return the fileno() of my process's stderr. | |
| 227 """ | |
| 228 return self.fd | |
| 229 | |
| 230 def writeSomeData(self, data): | |
| 231 # the only time this is actually called is after .loseConnection Any | |
| 232 # actual write attempt would fail, so we must avoid that. This hack | |
| 233 # allows us to use .loseConnection on both readers and writers. | |
| 234 assert data == "" | |
| 235 return CONNECTION_LOST | |
| 236 | |
| 237 def doRead(self): | |
| 238 """ | |
| 239 This is called when the pipe becomes readable. | |
| 240 """ | |
| 241 return fdesc.readFromFD(self.fd, self.dataReceived) | |
| 242 | |
| 243 def dataReceived(self, data): | |
| 244 self.proc.childDataReceived(self.name, data) | |
| 245 | |
| 246 def loseConnection(self): | |
| 247 if self.connected and not self.disconnecting: | |
| 248 self.disconnecting = 1 | |
| 249 self.stopReading() | |
| 250 self.reactor.callLater(0, self.connectionLost, | |
| 251 failure.Failure(CONNECTION_DONE)) | |
| 252 | |
| 253 def connectionLost(self, reason): | |
| 254 """ | |
| 255 Close my end of the pipe, signal the Process (which signals the | |
| 256 ProcessProtocol). | |
| 257 """ | |
| 258 abstract.FileDescriptor.connectionLost(self, reason) | |
| 259 self.proc.childConnectionLost(self.name, reason) | |
| 260 | |
| 261 | |
| 262 class _BaseProcess(styles.Ephemeral, object): | |
| 263 """ | |
| 264 Base class for Process and PTYProcess. | |
| 265 """ | |
| 266 | |
| 267 status = -1 | |
| 268 pid = None | |
| 269 | |
| 270 def reapProcess(self): | |
| 271 """ | |
| 272 Try to reap a process (without blocking) via waitpid. | |
| 273 | |
| 274 This is called when sigchild is caught or a Process object loses its | |
| 275 "connection" (stdout is closed) This ought to result in reaping all | |
| 276 zombie processes, since it will be called twice as often as it needs | |
| 277 to be. | |
| 278 | |
| 279 (Unfortunately, this is a slightly experimental approach, since | |
| 280 UNIX has no way to be really sure that your process is going to | |
| 281 go away w/o blocking. I don't want to block.) | |
| 282 """ | |
| 283 try: | |
| 284 try: | |
| 285 pid, status = os.waitpid(self.pid, os.WNOHANG) | |
| 286 except OSError, e: | |
| 287 if e.errno == errno.ECHILD: | |
| 288 # no child process | |
| 289 pid = None | |
| 290 else: | |
| 291 raise | |
| 292 except: | |
| 293 log.msg('Failed to reap %d:' % self.pid) | |
| 294 log.err() | |
| 295 pid = None | |
| 296 if pid: | |
| 297 self.processEnded(status) | |
| 298 unregisterReapProcessHandler(pid, self) | |
| 299 | |
| 300 def signalProcess(self, signalID): | |
| 301 """ | |
| 302 Send the given signal C{signalID} to the process. It'll translate a | |
| 303 few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string | |
| 304 representation to its int value, otherwise it'll pass directly the | |
| 305 value provided | |
| 306 | |
| 307 @type signalID: C{str} or C{int} | |
| 308 """ | |
| 309 if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'): | |
| 310 signalID = getattr(signal, 'SIG%s' % (signalID,)) | |
| 311 if self.pid is None: | |
| 312 raise ProcessExitedAlready() | |
| 313 os.kill(self.pid, signalID) | |
| 314 | |
| 315 def maybeCallProcessEnded(self): | |
| 316 """ | |
| 317 Call processEnded on protocol after final cleanup. | |
| 318 """ | |
| 319 try: | |
| 320 exitCode = sig = None | |
| 321 if self.status != -1: | |
| 322 if os.WIFEXITED(self.status): | |
| 323 exitCode = os.WEXITSTATUS(self.status) | |
| 324 else: | |
| 325 sig = os.WTERMSIG(self.status) | |
| 326 else: | |
| 327 pass # don't think this can happen | |
| 328 if exitCode or sig: | |
| 329 e = error.ProcessTerminated(exitCode, sig, self.status) | |
| 330 else: | |
| 331 e = error.ProcessDone(self.status) | |
| 332 if self.proto is not None: | |
| 333 self.proto.processEnded(failure.Failure(e)) | |
| 334 self.proto = None | |
| 335 except: | |
| 336 log.err() | |
| 337 | |
| 338 def _fork(self, path, uid, gid, executable, args, environment, **kwargs): | |
| 339 """ | |
| 340 Fork and then exec sub-process. | |
| 341 | |
| 342 @param path: the path where to run the new process. | |
| 343 @type path: C{str} | |
| 344 @param uid: if defined, the uid used to run the new process. | |
| 345 @type uid: C{int} | |
| 346 @param gid: if defined, the gid used to run the new process. | |
| 347 @type gid: C{int} | |
| 348 @param executable: the executable to run in a new process. | |
| 349 @type executable: C{str} | |
| 350 @param args: arguments used to create the new process. | |
| 351 @type args: C{list}. | |
| 352 @param environment: environment used for the new process. | |
| 353 @type environment: C{dict}. | |
| 354 @param kwargs: keyword arguments to L{_setupChild} method. | |
| 355 """ | |
| 356 settingUID = (uid is not None) or (gid is not None) | |
| 357 if settingUID: | |
| 358 curegid = os.getegid() | |
| 359 currgid = os.getgid() | |
| 360 cureuid = os.geteuid() | |
| 361 curruid = os.getuid() | |
| 362 if uid is None: | |
| 363 uid = cureuid | |
| 364 if gid is None: | |
| 365 gid = curegid | |
| 366 # prepare to change UID in subprocess | |
| 367 os.setuid(0) | |
| 368 os.setgid(0) | |
| 369 | |
| 370 collectorEnabled = gc.isenabled() | |
| 371 gc.disable() | |
| 372 try: | |
| 373 self.pid = os.fork() | |
| 374 except: | |
| 375 # Still in the parent process | |
| 376 if collectorEnabled: | |
| 377 gc.enable() | |
| 378 raise | |
| 379 else: | |
| 380 if self.pid == 0: # pid is 0 in the child process | |
| 381 # do not put *ANY* code outside the try block. The child process | |
| 382 # must either exec or _exit. If it gets outside this block (due | |
| 383 # to an exception that is not handled here, but which might be | |
| 384 # handled higher up), there will be two copies of the parent | |
| 385 # running in parallel, doing all kinds of damage. | |
| 386 | |
| 387 # After each change to this code, review it to make sure there | |
| 388 # are no exit paths. | |
| 389 try: | |
| 390 # Stop debugging. If I am, I don't care anymore. | |
| 391 sys.settrace(None) | |
| 392 self._setupChild(**kwargs) | |
| 393 self._execChild(path, settingUID, uid, gid, | |
| 394 executable, args, environment) | |
| 395 except: | |
| 396 # If there are errors, bail and try to write something | |
| 397 # descriptive to stderr. | |
| 398 # XXX: The parent's stderr isn't necessarily fd 2 anymore, o
r | |
| 399 # even still available | |
| 400 # XXXX: however even libc assumes write(2, err) is a useful | |
| 401 # thing to attempt | |
| 402 try: | |
| 403 stderr = os.fdopen(2, 'w') | |
| 404 stderr.write("Upon execvpe %s %s in environment %s\n:" % | |
| 405 (executable, str(args), | |
| 406 "id %s" % id(environment))) | |
| 407 traceback.print_exc(file=stderr) | |
| 408 stderr.flush() | |
| 409 for fd in range(3): | |
| 410 os.close(fd) | |
| 411 except: | |
| 412 pass # make *sure* the child terminates | |
| 413 # Did you read the comment about not adding code here? | |
| 414 os._exit(1) | |
| 415 | |
| 416 # we are now in parent process | |
| 417 if collectorEnabled: | |
| 418 gc.enable() | |
| 419 self.status = -1 # this records the exit status of the child | |
| 420 if settingUID: | |
| 421 os.setregid(currgid, curegid) | |
| 422 os.setreuid(curruid, cureuid) | |
| 423 | |
| 424 def _setupChild(self, *args, **kwargs): | |
| 425 """ | |
| 426 Setup the child process. Override in subclasses. | |
| 427 """ | |
| 428 raise NotImplementedError() | |
| 429 | |
| 430 def _execChild(self, path, settingUID, uid, gid, | |
| 431 executable, args, environment): | |
| 432 """ | |
| 433 The exec() which is done in the forked child. | |
| 434 """ | |
| 435 if path: | |
| 436 os.chdir(path) | |
| 437 # set the UID before I actually exec the process | |
| 438 if settingUID: | |
| 439 switchUID(uid, gid) | |
| 440 os.execvpe(executable, args, environment) | |
| 441 | |
| 442 def __repr__(self): | |
| 443 """ | |
| 444 String representation of a process. | |
| 445 """ | |
| 446 return "<%s pid=%s status=%s>" % (self.__class__.__name__, | |
| 447 self.pid, self.status) | |
| 448 | |
| 449 class Process(_BaseProcess): | |
| 450 """ | |
| 451 An operating-system Process. | |
| 452 | |
| 453 This represents an operating-system process with arbitrary input/output | |
| 454 pipes connected to it. Those pipes may represent standard input, | |
| 455 standard output, and standard error, or any other file descriptor. | |
| 456 | |
| 457 On UNIX, this is implemented using fork(), exec(), pipe() | |
| 458 and fcntl(). These calls may not exist elsewhere so this | |
| 459 code is not cross-platform. (also, windows can only select | |
| 460 on sockets...) | |
| 461 """ | |
| 462 | |
| 463 debug = False | |
| 464 debug_child = False | |
| 465 | |
| 466 status = -1 | |
| 467 pid = None | |
| 468 | |
| 469 processWriterFactory = ProcessWriter | |
| 470 processReaderFactory = ProcessReader | |
| 471 | |
| 472 def __init__(self, | |
| 473 reactor, executable, args, environment, path, proto, | |
| 474 uid=None, gid=None, childFDs=None): | |
| 475 """ | |
| 476 Spawn an operating-system process. | |
| 477 | |
| 478 This is where the hard work of disconnecting all currently open | |
| 479 files / forking / executing the new process happens. (This is | |
| 480 executed automatically when a Process is instantiated.) | |
| 481 | |
| 482 This will also run the subprocess as a given user ID and group ID, if | |
| 483 specified. (Implementation Note: this doesn't support all the arcane | |
| 484 nuances of setXXuid on UNIX: it will assume that either your effective | |
| 485 or real UID is 0.) | |
| 486 """ | |
| 487 if not proto: | |
| 488 assert 'r' not in childFDs.values() | |
| 489 assert 'w' not in childFDs.values() | |
| 490 if not signal.getsignal(signal.SIGCHLD): | |
| 491 warnings.warn( | |
| 492 error.PotentialZombieWarning.MESSAGE, | |
| 493 error.PotentialZombieWarning, | |
| 494 stacklevel=3) | |
| 495 | |
| 496 self.lostProcess = False | |
| 497 | |
| 498 self.pipes = {} | |
| 499 # keys are childFDs, we can sense them closing | |
| 500 # values are ProcessReader/ProcessWriters | |
| 501 | |
| 502 helpers = {} | |
| 503 # keys are childFDs | |
| 504 # values are parentFDs | |
| 505 | |
| 506 if childFDs is None: | |
| 507 childFDs = {0: "w", # we write to the child's stdin | |
| 508 1: "r", # we read from their stdout | |
| 509 2: "r", # and we read from their stderr | |
| 510 } | |
| 511 | |
| 512 debug = self.debug | |
| 513 if debug: print "childFDs", childFDs | |
| 514 | |
| 515 # fdmap.keys() are filenos of pipes that are used by the child. | |
| 516 fdmap = {} # maps childFD to parentFD | |
| 517 for childFD, target in childFDs.items(): | |
| 518 if debug: print "[%d]" % childFD, target | |
| 519 if target == "r": | |
| 520 # we need a pipe that the parent can read from | |
| 521 readFD, writeFD = os.pipe() | |
| 522 if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD) | |
| 523 fdmap[childFD] = writeFD # child writes to this | |
| 524 helpers[childFD] = readFD # parent reads from this | |
| 525 elif target == "w": | |
| 526 # we need a pipe that the parent can write to | |
| 527 readFD, writeFD = os.pipe() | |
| 528 if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD) | |
| 529 fdmap[childFD] = readFD # child reads from this | |
| 530 helpers[childFD] = writeFD # parent writes to this | |
| 531 else: | |
| 532 assert type(target) == int, '%r should be an int' % (target,) | |
| 533 fdmap[childFD] = target # parent ignores this | |
| 534 if debug: print "fdmap", fdmap | |
| 535 if debug: print "helpers", helpers | |
| 536 # the child only cares about fdmap.values() | |
| 537 | |
| 538 self._fork(path, uid, gid, executable, args, environment, fdmap=fdmap) | |
| 539 | |
| 540 # we are the parent process: | |
| 541 self.proto = proto | |
| 542 | |
| 543 # arrange for the parent-side pipes to be read and written | |
| 544 for childFD, parentFD in helpers.items(): | |
| 545 os.close(fdmap[childFD]) | |
| 546 | |
| 547 if childFDs[childFD] == "r": | |
| 548 reader = self.processReaderFactory(reactor, self, childFD, | |
| 549 parentFD) | |
| 550 self.pipes[childFD] = reader | |
| 551 | |
| 552 if childFDs[childFD] == "w": | |
| 553 writer = self.processWriterFactory(reactor, self, childFD, | |
| 554 parentFD, forceReadHack=True) | |
| 555 self.pipes[childFD] = writer | |
| 556 | |
| 557 try: | |
| 558 # the 'transport' is used for some compatibility methods | |
| 559 if self.proto is not None: | |
| 560 self.proto.makeConnection(self) | |
| 561 except: | |
| 562 log.err() | |
| 563 registerReapProcessHandler(self.pid, self) | |
| 564 | |
| 565 def _setupChild(self, fdmap): | |
| 566 """ | |
| 567 fdmap[childFD] = parentFD | |
| 568 | |
| 569 The child wants to end up with 'childFD' attached to what used to be | |
| 570 the parent's parentFD. As an example, a bash command run like | |
| 571 'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}. | |
| 572 'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}. | |
| 573 | |
| 574 This is accomplished in two steps:: | |
| 575 | |
| 576 1. close all file descriptors that aren't values of fdmap. This | |
| 577 means 0 .. maxfds. | |
| 578 | |
| 579 2. for each childFD:: | |
| 580 | |
| 581 - if fdmap[childFD] == childFD, the descriptor is already in | |
| 582 place. Make sure the CLOEXEC flag is not set, then delete | |
| 583 the entry from fdmap. | |
| 584 | |
| 585 - if childFD is in fdmap.values(), then the target descriptor | |
| 586 is busy. Use os.dup() to move it elsewhere, update all | |
| 587 fdmap[childFD] items that point to it, then close the | |
| 588 original. Then fall through to the next case. | |
| 589 | |
| 590 - now fdmap[childFD] is not in fdmap.values(), and is free. | |
| 591 Use os.dup2() to move it to the right place, then close the | |
| 592 original. | |
| 593 """ | |
| 594 | |
| 595 debug = self.debug_child | |
| 596 if debug: | |
| 597 errfd = sys.stderr | |
| 598 errfd.write("starting _setupChild\n") | |
| 599 | |
| 600 destList = fdmap.values() | |
| 601 try: | |
| 602 import resource | |
| 603 maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1 | |
| 604 # OS-X reports 9223372036854775808. That's a lot of fds to close | |
| 605 if maxfds > 1024: | |
| 606 maxfds = 1024 | |
| 607 except: | |
| 608 maxfds = 256 | |
| 609 | |
| 610 for fd in xrange(maxfds): | |
| 611 if fd in destList: | |
| 612 continue | |
| 613 if debug and fd == errfd.fileno(): | |
| 614 continue | |
| 615 try: | |
| 616 os.close(fd) | |
| 617 except: | |
| 618 pass | |
| 619 | |
| 620 # at this point, the only fds still open are the ones that need to | |
| 621 # be moved to their appropriate positions in the child (the targets | |
| 622 # of fdmap, i.e. fdmap.values() ) | |
| 623 | |
| 624 if debug: print >>errfd, "fdmap", fdmap | |
| 625 childlist = fdmap.keys() | |
| 626 childlist.sort() | |
| 627 | |
| 628 for child in childlist: | |
| 629 target = fdmap[child] | |
| 630 if target == child: | |
| 631 # fd is already in place | |
| 632 if debug: print >>errfd, "%d already in place" % target | |
| 633 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): | |
| 634 old = fcntl.fcntl(child, fcntl.F_GETFD) | |
| 635 fcntl.fcntl(child, | |
| 636 fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) | |
| 637 else: | |
| 638 if child in fdmap.values(): | |
| 639 # we can't replace child-fd yet, as some other mapping | |
| 640 # still needs the fd it wants to target. We must preserve | |
| 641 # that old fd by duping it to a new home. | |
| 642 newtarget = os.dup(child) # give it a safe home | |
| 643 if debug: print >>errfd, "os.dup(%d) -> %d" % (child, | |
| 644 newtarget) | |
| 645 os.close(child) # close the original | |
| 646 for c, p in fdmap.items(): | |
| 647 if p == child: | |
| 648 fdmap[c] = newtarget # update all pointers | |
| 649 # now it should be available | |
| 650 if debug: print >>errfd, "os.dup2(%d,%d)" % (target, child) | |
| 651 os.dup2(target, child) | |
| 652 | |
| 653 # At this point, the child has everything it needs. We want to close | |
| 654 # everything that isn't going to be used by the child, i.e. | |
| 655 # everything not in fdmap.keys(). The only remaining fds open are | |
| 656 # those in fdmap.values(). | |
| 657 | |
| 658 # Any given fd may appear in fdmap.values() multiple times, so we | |
| 659 # need to remove duplicates first. | |
| 660 | |
| 661 old = [] | |
| 662 for fd in fdmap.values(): | |
| 663 if not fd in old: | |
| 664 if not fd in fdmap.keys(): | |
| 665 old.append(fd) | |
| 666 if debug: print >>errfd, "old", old | |
| 667 for fd in old: | |
| 668 os.close(fd) | |
| 669 | |
| 670 def writeToChild(self, childFD, data): | |
| 671 self.pipes[childFD].write(data) | |
| 672 | |
| 673 def closeChildFD(self, childFD): | |
| 674 # for writer pipes, loseConnection tries to write the remaining data | |
| 675 # out to the pipe before closing it | |
| 676 # if childFD is not in the list of pipes, assume that it is already | |
| 677 # closed | |
| 678 if childFD in self.pipes: | |
| 679 self.pipes[childFD].loseConnection() | |
| 680 | |
| 681 def pauseProducing(self): | |
| 682 for p in self.pipes.itervalues(): | |
| 683 if isinstance(p, ProcessReader): | |
| 684 p.stopReading() | |
| 685 | |
| 686 def resumeProducing(self): | |
| 687 for p in self.pipes.itervalues(): | |
| 688 if isinstance(p, ProcessReader): | |
| 689 p.startReading() | |
| 690 | |
| 691 # compatibility | |
| 692 def closeStdin(self): | |
| 693 """ | |
| 694 Call this to close standard input on this process. | |
| 695 """ | |
| 696 self.closeChildFD(0) | |
| 697 | |
| 698 def closeStdout(self): | |
| 699 self.closeChildFD(1) | |
| 700 | |
| 701 def closeStderr(self): | |
| 702 self.closeChildFD(2) | |
| 703 | |
| 704 def loseConnection(self): | |
| 705 self.closeStdin() | |
| 706 self.closeStderr() | |
| 707 self.closeStdout() | |
| 708 | |
| 709 def write(self, data): | |
| 710 """ | |
| 711 Call this to write to standard input on this process. | |
| 712 | |
| 713 NOTE: This will silently lose data if there is no standard input. | |
| 714 """ | |
| 715 if 0 in self.pipes: | |
| 716 self.pipes[0].write(data) | |
| 717 | |
| 718 def registerProducer(self, producer, streaming): | |
| 719 """ | |
| 720 Call this to register producer for standard input. | |
| 721 | |
| 722 If there is no standard input producer.stopProducing() will | |
| 723 be called immediately. | |
| 724 """ | |
| 725 if 0 in self.pipes: | |
| 726 self.pipes[0].registerProducer(producer, streaming) | |
| 727 else: | |
| 728 producer.stopProducing() | |
| 729 | |
| 730 def unregisterProducer(self): | |
| 731 """ | |
| 732 Call this to unregister producer for standard input.""" | |
| 733 if 0 in self.pipes: | |
| 734 self.pipes[0].unregisterProducer() | |
| 735 | |
| 736 def writeSequence(self, seq): | |
| 737 """ | |
| 738 Call this to write to standard input on this process. | |
| 739 | |
| 740 NOTE: This will silently lose data if there is no standard input. | |
| 741 """ | |
| 742 if 0 in self.pipes: | |
| 743 self.pipes[0].writeSequence(seq) | |
| 744 | |
| 745 def childDataReceived(self, name, data): | |
| 746 self.proto.childDataReceived(name, data) | |
| 747 | |
| 748 def processEnded(self, status): | |
| 749 # this is called when the child terminates (SIGCHLD) | |
| 750 self.status = status | |
| 751 self.lostProcess = True | |
| 752 self.pid = None | |
| 753 self.maybeCallProcessEnded() | |
| 754 | |
| 755 def childConnectionLost(self, childFD, reason): | |
| 756 # this is called when one of the helpers (ProcessReader or | |
| 757 # ProcessWriter) notices their pipe has been closed | |
| 758 os.close(self.pipes[childFD].fileno()) | |
| 759 del self.pipes[childFD] | |
| 760 try: | |
| 761 self.proto.childConnectionLost(childFD) | |
| 762 except: | |
| 763 log.err() | |
| 764 self.maybeCallProcessEnded() | |
| 765 | |
| 766 def maybeCallProcessEnded(self): | |
| 767 # we don't call ProcessProtocol.processEnded until: | |
| 768 # the child has terminated, AND | |
| 769 # all writers have indicated an error status, AND | |
| 770 # all readers have indicated EOF | |
| 771 # This insures that we've gathered all output from the process. | |
| 772 if self.pipes: | |
| 773 return | |
| 774 if not self.lostProcess: | |
| 775 self.reapProcess() | |
| 776 return | |
| 777 _BaseProcess.maybeCallProcessEnded(self) | |
| 778 | |
| 779 | |
| 780 class PTYProcess(abstract.FileDescriptor, _BaseProcess): | |
| 781 """ | |
| 782 An operating-system Process that uses PTY support. | |
| 783 """ | |
| 784 status = -1 | |
| 785 pid = None | |
| 786 | |
| 787 def __init__(self, reactor, executable, args, environment, path, proto, | |
| 788 uid=None, gid=None, usePTY=None): | |
| 789 """ | |
| 790 Spawn an operating-system process. | |
| 791 | |
| 792 This is where the hard work of disconnecting all currently open | |
| 793 files / forking / executing the new process happens. (This is | |
| 794 executed automatically when a Process is instantiated.) | |
| 795 | |
| 796 This will also run the subprocess as a given user ID and group ID, if | |
| 797 specified. (Implementation Note: this doesn't support all the arcane | |
| 798 nuances of setXXuid on UNIX: it will assume that either your effective | |
| 799 or real UID is 0.) | |
| 800 """ | |
| 801 if pty is None and not isinstance(usePTY, (tuple, list)): | |
| 802 # no pty module and we didn't get a pty to use | |
| 803 raise NotImplementedError( | |
| 804 "cannot use PTYProcess on platforms without the pty module.") | |
| 805 abstract.FileDescriptor.__init__(self, reactor) | |
| 806 | |
| 807 if isinstance(usePTY, (tuple, list)): | |
| 808 masterfd, slavefd, ttyname = usePTY | |
| 809 else: | |
| 810 masterfd, slavefd = pty.openpty() | |
| 811 ttyname = os.ttyname(slavefd) | |
| 812 | |
| 813 self._fork(path, uid, gid, executable, args, environment, | |
| 814 masterfd=masterfd, slavefd=slavefd) | |
| 815 | |
| 816 # we are now in parent process: | |
| 817 os.close(slavefd) | |
| 818 fdesc.setNonBlocking(masterfd) | |
| 819 self.fd = masterfd | |
| 820 self.startReading() | |
| 821 self.connected = 1 | |
| 822 self.proto = proto | |
| 823 self.lostProcess = 0 | |
| 824 self.status = -1 | |
| 825 try: | |
| 826 self.proto.makeConnection(self) | |
| 827 except: | |
| 828 log.err() | |
| 829 registerReapProcessHandler(self.pid, self) | |
| 830 | |
| 831 def _setupChild(self, masterfd, slavefd): | |
| 832 """ | |
| 833 Setup child process after fork() but before exec(). | |
| 834 """ | |
| 835 os.close(masterfd) | |
| 836 if hasattr(termios, 'TIOCNOTTY'): | |
| 837 try: | |
| 838 fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) | |
| 839 except OSError: | |
| 840 pass | |
| 841 else: | |
| 842 try: | |
| 843 fcntl.ioctl(fd, termios.TIOCNOTTY, '') | |
| 844 except: | |
| 845 pass | |
| 846 os.close(fd) | |
| 847 | |
| 848 os.setsid() | |
| 849 | |
| 850 if hasattr(termios, 'TIOCSCTTY'): | |
| 851 fcntl.ioctl(slavefd, termios.TIOCSCTTY, '') | |
| 852 | |
| 853 for fd in range(3): | |
| 854 if fd != slavefd: | |
| 855 os.close(fd) | |
| 856 | |
| 857 os.dup2(slavefd, 0) # stdin | |
| 858 os.dup2(slavefd, 1) # stdout | |
| 859 os.dup2(slavefd, 2) # stderr | |
| 860 | |
| 861 for fd in xrange(3, 256): | |
| 862 try: | |
| 863 os.close(fd) | |
| 864 except: | |
| 865 pass | |
| 866 | |
| 867 # PTYs do not have stdin/stdout/stderr. They only have in and out, just | |
| 868 # like sockets. You cannot close one without closing off the entire PTY. | |
| 869 def closeStdin(self): | |
| 870 pass | |
| 871 | |
| 872 def closeStdout(self): | |
| 873 pass | |
| 874 | |
| 875 def closeStderr(self): | |
| 876 pass | |
| 877 | |
| 878 def processEnded(self, status): | |
| 879 self.status = status | |
| 880 self.lostProcess += 1 | |
| 881 self.pid = None | |
| 882 self.maybeCallProcessEnded() | |
| 883 | |
| 884 def doRead(self): | |
| 885 """ | |
| 886 Called when my standard output stream is ready for reading. | |
| 887 """ | |
| 888 return fdesc.readFromFD( | |
| 889 self.fd, | |
| 890 lambda data: self.proto.childDataReceived(1, data)) | |
| 891 | |
| 892 def fileno(self): | |
| 893 """ | |
| 894 This returns the file number of standard output on this process. | |
| 895 """ | |
| 896 return self.fd | |
| 897 | |
| 898 def maybeCallProcessEnded(self): | |
| 899 # two things must happen before we call the ProcessProtocol's | |
| 900 # processEnded method. 1: the child process must die and be reaped | |
| 901 # (which calls our own processEnded method). 2: the child must close | |
| 902 # their stdin/stdout/stderr fds, causing the pty to close, causing | |
| 903 # our connectionLost method to be called. #2 can also be triggered | |
| 904 # by calling .loseConnection(). | |
| 905 if self.lostProcess == 2: | |
| 906 _BaseProcess.maybeCallProcessEnded(self) | |
| 907 | |
| 908 def connectionLost(self, reason): | |
| 909 """ | |
| 910 I call this to clean up when one or all of my connections has died. | |
| 911 """ | |
| 912 abstract.FileDescriptor.connectionLost(self, reason) | |
| 913 os.close(self.fd) | |
| 914 self.lostProcess += 1 | |
| 915 self.maybeCallProcessEnded() | |
| 916 | |
| 917 def writeSomeData(self, data): | |
| 918 """ | |
| 919 Write some data to the open process. | |
| 920 """ | |
| 921 return fdesc.writeToFD(self.fd, data) | |
| 922 | |
| OLD | NEW |