OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_process -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 UNIX Process management. | |
7 | |
8 Do NOT use this module directly - use reactor.spawnProcess() instead. | |
9 | |
10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>} | |
11 """ | |
12 | |
13 # System Imports | |
14 import gc, os, sys, traceback, select, signal, errno | |
15 import warnings | |
16 | |
17 try: | |
18 import pty | |
19 except ImportError: | |
20 pty = None | |
21 | |
22 try: | |
23 import fcntl, termios | |
24 except ImportError: | |
25 fcntl = None | |
26 | |
27 from twisted.persisted import styles | |
28 from twisted.python import log, failure | |
29 from twisted.python.util import switchUID | |
30 from twisted.internet import fdesc, abstract, error | |
31 from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE | |
32 | |
33 # Some people were importing this, which is incorrect, just keeping it | |
34 # here for backwards compatibility: | |
35 ProcessExitedAlready = error.ProcessExitedAlready | |
36 | |
37 reapProcessHandlers = {} | |
38 | |
39 def reapAllProcesses(): | |
40 """ | |
41 Reap all registered processes. | |
42 """ | |
43 for process in reapProcessHandlers.values(): | |
44 process.reapProcess() | |
45 | |
46 | |
47 def registerReapProcessHandler(pid, process): | |
48 """ | |
49 Register a process handler for the given pid, in case L{reapAllProcesses} | |
50 is called. | |
51 | |
52 @param pid: the pid of the process. | |
53 @param process: a process handler. | |
54 """ | |
55 if pid in reapProcessHandlers: | |
56 raise RuntimeError("Try to register an already registered process.") | |
57 try: | |
58 auxPID, status = os.waitpid(pid, os.WNOHANG) | |
59 except: | |
60 log.msg('Failed to reap %d:' % pid) | |
61 log.err() | |
62 auxPID = None | |
63 if auxPID: | |
64 process.processEnded(status) | |
65 else: | |
66 # if auxPID is 0, there are children but none have exited | |
67 reapProcessHandlers[pid] = process | |
68 | |
69 | |
70 def unregisterReapProcessHandler(pid, process): | |
71 """ | |
72 Unregister a process handler previously registered with | |
73 L{registerReapProcessHandler}. | |
74 """ | |
75 if not (pid in reapProcessHandlers | |
76 and reapProcessHandlers[pid] == process): | |
77 raise RuntimeError("Try to unregister a process not registered.") | |
78 del reapProcessHandlers[pid] | |
79 | |
80 | |
81 def detectLinuxBrokenPipeBehavior(): | |
82 """ | |
83 On some Linux version, write-only pipe are detected as readable. This | |
84 function is here to check if this bug is present or not. | |
85 | |
86 See L{ProcessWriter.doRead} for a more detailed explanation. | |
87 """ | |
88 global brokenLinuxPipeBehavior | |
89 r, w = os.pipe() | |
90 os.write(w, 'a') | |
91 reads, writes, exes = select.select([w], [], [], 0) | |
92 if reads: | |
93 # Linux < 2.6.11 says a write-only pipe is readable. | |
94 brokenLinuxPipeBehavior = True | |
95 else: | |
96 brokenLinuxPipeBehavior = False | |
97 os.close(r) | |
98 os.close(w) | |
99 | |
100 # Call at import time | |
101 detectLinuxBrokenPipeBehavior() | |
102 | |
103 | |
104 class ProcessWriter(abstract.FileDescriptor): | |
105 """ | |
106 (Internal) Helper class to write into a Process's input pipe. | |
107 | |
108 I am a helper which describes a selectable asynchronous writer to a | |
109 process's input pipe, including stdin. | |
110 """ | |
111 connected = 1 | |
112 ic = 0 | |
113 enableReadHack = False | |
114 | |
115 def __init__(self, reactor, proc, name, fileno, forceReadHack=False): | |
116 """ | |
117 Initialize, specifying a Process instance to connect to. | |
118 """ | |
119 abstract.FileDescriptor.__init__(self, reactor) | |
120 fdesc.setNonBlocking(fileno) | |
121 self.proc = proc | |
122 self.name = name | |
123 self.fd = fileno | |
124 | |
125 if forceReadHack: | |
126 self.enableReadHack = True | |
127 else: | |
128 # Detect if this fd is actually a write-only fd. If it's | |
129 # valid to read, don't try to detect closing via read. | |
130 # This really only means that we cannot detect a TTY's write | |
131 # pipe being closed. | |
132 try: | |
133 os.read(self.fileno(), 0) | |
134 except OSError: | |
135 # It's a write-only pipe end, enable hack | |
136 self.enableReadHack = True | |
137 | |
138 if self.enableReadHack: | |
139 self.startReading() | |
140 | |
141 def fileno(self): | |
142 """ | |
143 Return the fileno() of my process's stdin. | |
144 """ | |
145 return self.fd | |
146 | |
147 def writeSomeData(self, data): | |
148 """ | |
149 Write some data to the open process. | |
150 """ | |
151 rv = fdesc.writeToFD(self.fd, data) | |
152 if rv == len(data) and self.enableReadHack: | |
153 self.startReading() | |
154 return rv | |
155 | |
156 def write(self, data): | |
157 self.stopReading() | |
158 abstract.FileDescriptor.write(self, data) | |
159 | |
160 def doRead(self): | |
161 """ | |
162 The only way a write pipe can become "readable" is at EOF, because the | |
163 child has closed it, and we're using a reactor which doesn't | |
164 distinguish between readable and closed (such as the select reactor). | |
165 | |
166 Except that's not true on linux < 2.6.11. It has the following | |
167 characteristics: write pipe is completely empty => POLLOUT (writable in | |
168 select), write pipe is not completely empty => POLLIN (readable in | |
169 select), write pipe's reader closed => POLLIN|POLLERR (readable and | |
170 writable in select) | |
171 | |
172 That's what this funky code is for. If linux was not broken, this | |
173 function could be simply "return CONNECTION_LOST". | |
174 | |
175 BUG: We call select no matter what the reactor. | |
176 If the reactor is pollreactor, and the fd is > 1024, this will fail. | |
177 (only occurs on broken versions of linux, though). | |
178 """ | |
179 if self.enableReadHack: | |
180 if brokenLinuxPipeBehavior: | |
181 fd = self.fd | |
182 r, w, x = select.select([fd], [fd], [], 0) | |
183 if r and w: | |
184 return CONNECTION_LOST | |
185 else: | |
186 return CONNECTION_LOST | |
187 else: | |
188 self.stopReading() | |
189 | |
190 def connectionLost(self, reason): | |
191 """ | |
192 See abstract.FileDescriptor.connectionLost. | |
193 """ | |
194 # At least on OS X 10.4, exiting while stdout is non-blocking can | |
195 # result in data loss. For some reason putting the file descriptor | |
196 # back into blocking mode seems to resolve this issue. | |
197 fdesc.setBlocking(self.fd) | |
198 | |
199 abstract.FileDescriptor.connectionLost(self, reason) | |
200 self.proc.childConnectionLost(self.name, reason) | |
201 | |
202 | |
203 | |
204 class ProcessReader(abstract.FileDescriptor): | |
205 """ | |
206 ProcessReader | |
207 | |
208 I am a selectable representation of a process's output pipe, such as | |
209 stdout and stderr. | |
210 """ | |
211 connected = 1 | |
212 | |
213 def __init__(self, reactor, proc, name, fileno): | |
214 """ | |
215 Initialize, specifying a process to connect to. | |
216 """ | |
217 abstract.FileDescriptor.__init__(self, reactor) | |
218 fdesc.setNonBlocking(fileno) | |
219 self.proc = proc | |
220 self.name = name | |
221 self.fd = fileno | |
222 self.startReading() | |
223 | |
224 def fileno(self): | |
225 """ | |
226 Return the fileno() of my process's stderr. | |
227 """ | |
228 return self.fd | |
229 | |
230 def writeSomeData(self, data): | |
231 # the only time this is actually called is after .loseConnection Any | |
232 # actual write attempt would fail, so we must avoid that. This hack | |
233 # allows us to use .loseConnection on both readers and writers. | |
234 assert data == "" | |
235 return CONNECTION_LOST | |
236 | |
237 def doRead(self): | |
238 """ | |
239 This is called when the pipe becomes readable. | |
240 """ | |
241 return fdesc.readFromFD(self.fd, self.dataReceived) | |
242 | |
243 def dataReceived(self, data): | |
244 self.proc.childDataReceived(self.name, data) | |
245 | |
246 def loseConnection(self): | |
247 if self.connected and not self.disconnecting: | |
248 self.disconnecting = 1 | |
249 self.stopReading() | |
250 self.reactor.callLater(0, self.connectionLost, | |
251 failure.Failure(CONNECTION_DONE)) | |
252 | |
253 def connectionLost(self, reason): | |
254 """ | |
255 Close my end of the pipe, signal the Process (which signals the | |
256 ProcessProtocol). | |
257 """ | |
258 abstract.FileDescriptor.connectionLost(self, reason) | |
259 self.proc.childConnectionLost(self.name, reason) | |
260 | |
261 | |
262 class _BaseProcess(styles.Ephemeral, object): | |
263 """ | |
264 Base class for Process and PTYProcess. | |
265 """ | |
266 | |
267 status = -1 | |
268 pid = None | |
269 | |
270 def reapProcess(self): | |
271 """ | |
272 Try to reap a process (without blocking) via waitpid. | |
273 | |
274 This is called when sigchild is caught or a Process object loses its | |
275 "connection" (stdout is closed) This ought to result in reaping all | |
276 zombie processes, since it will be called twice as often as it needs | |
277 to be. | |
278 | |
279 (Unfortunately, this is a slightly experimental approach, since | |
280 UNIX has no way to be really sure that your process is going to | |
281 go away w/o blocking. I don't want to block.) | |
282 """ | |
283 try: | |
284 try: | |
285 pid, status = os.waitpid(self.pid, os.WNOHANG) | |
286 except OSError, e: | |
287 if e.errno == errno.ECHILD: | |
288 # no child process | |
289 pid = None | |
290 else: | |
291 raise | |
292 except: | |
293 log.msg('Failed to reap %d:' % self.pid) | |
294 log.err() | |
295 pid = None | |
296 if pid: | |
297 self.processEnded(status) | |
298 unregisterReapProcessHandler(pid, self) | |
299 | |
300 def signalProcess(self, signalID): | |
301 """ | |
302 Send the given signal C{signalID} to the process. It'll translate a | |
303 few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string | |
304 representation to its int value, otherwise it'll pass directly the | |
305 value provided | |
306 | |
307 @type signalID: C{str} or C{int} | |
308 """ | |
309 if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'): | |
310 signalID = getattr(signal, 'SIG%s' % (signalID,)) | |
311 if self.pid is None: | |
312 raise ProcessExitedAlready() | |
313 os.kill(self.pid, signalID) | |
314 | |
315 def maybeCallProcessEnded(self): | |
316 """ | |
317 Call processEnded on protocol after final cleanup. | |
318 """ | |
319 try: | |
320 exitCode = sig = None | |
321 if self.status != -1: | |
322 if os.WIFEXITED(self.status): | |
323 exitCode = os.WEXITSTATUS(self.status) | |
324 else: | |
325 sig = os.WTERMSIG(self.status) | |
326 else: | |
327 pass # don't think this can happen | |
328 if exitCode or sig: | |
329 e = error.ProcessTerminated(exitCode, sig, self.status) | |
330 else: | |
331 e = error.ProcessDone(self.status) | |
332 if self.proto is not None: | |
333 self.proto.processEnded(failure.Failure(e)) | |
334 self.proto = None | |
335 except: | |
336 log.err() | |
337 | |
338 def _fork(self, path, uid, gid, executable, args, environment, **kwargs): | |
339 """ | |
340 Fork and then exec sub-process. | |
341 | |
342 @param path: the path where to run the new process. | |
343 @type path: C{str} | |
344 @param uid: if defined, the uid used to run the new process. | |
345 @type uid: C{int} | |
346 @param gid: if defined, the gid used to run the new process. | |
347 @type gid: C{int} | |
348 @param executable: the executable to run in a new process. | |
349 @type executable: C{str} | |
350 @param args: arguments used to create the new process. | |
351 @type args: C{list}. | |
352 @param environment: environment used for the new process. | |
353 @type environment: C{dict}. | |
354 @param kwargs: keyword arguments to L{_setupChild} method. | |
355 """ | |
356 settingUID = (uid is not None) or (gid is not None) | |
357 if settingUID: | |
358 curegid = os.getegid() | |
359 currgid = os.getgid() | |
360 cureuid = os.geteuid() | |
361 curruid = os.getuid() | |
362 if uid is None: | |
363 uid = cureuid | |
364 if gid is None: | |
365 gid = curegid | |
366 # prepare to change UID in subprocess | |
367 os.setuid(0) | |
368 os.setgid(0) | |
369 | |
370 collectorEnabled = gc.isenabled() | |
371 gc.disable() | |
372 try: | |
373 self.pid = os.fork() | |
374 except: | |
375 # Still in the parent process | |
376 if collectorEnabled: | |
377 gc.enable() | |
378 raise | |
379 else: | |
380 if self.pid == 0: # pid is 0 in the child process | |
381 # do not put *ANY* code outside the try block. The child process | |
382 # must either exec or _exit. If it gets outside this block (due | |
383 # to an exception that is not handled here, but which might be | |
384 # handled higher up), there will be two copies of the parent | |
385 # running in parallel, doing all kinds of damage. | |
386 | |
387 # After each change to this code, review it to make sure there | |
388 # are no exit paths. | |
389 try: | |
390 # Stop debugging. If I am, I don't care anymore. | |
391 sys.settrace(None) | |
392 self._setupChild(**kwargs) | |
393 self._execChild(path, settingUID, uid, gid, | |
394 executable, args, environment) | |
395 except: | |
396 # If there are errors, bail and try to write something | |
397 # descriptive to stderr. | |
398 # XXX: The parent's stderr isn't necessarily fd 2 anymore, o
r | |
399 # even still available | |
400 # XXXX: however even libc assumes write(2, err) is a useful | |
401 # thing to attempt | |
402 try: | |
403 stderr = os.fdopen(2, 'w') | |
404 stderr.write("Upon execvpe %s %s in environment %s\n:" % | |
405 (executable, str(args), | |
406 "id %s" % id(environment))) | |
407 traceback.print_exc(file=stderr) | |
408 stderr.flush() | |
409 for fd in range(3): | |
410 os.close(fd) | |
411 except: | |
412 pass # make *sure* the child terminates | |
413 # Did you read the comment about not adding code here? | |
414 os._exit(1) | |
415 | |
416 # we are now in parent process | |
417 if collectorEnabled: | |
418 gc.enable() | |
419 self.status = -1 # this records the exit status of the child | |
420 if settingUID: | |
421 os.setregid(currgid, curegid) | |
422 os.setreuid(curruid, cureuid) | |
423 | |
424 def _setupChild(self, *args, **kwargs): | |
425 """ | |
426 Setup the child process. Override in subclasses. | |
427 """ | |
428 raise NotImplementedError() | |
429 | |
430 def _execChild(self, path, settingUID, uid, gid, | |
431 executable, args, environment): | |
432 """ | |
433 The exec() which is done in the forked child. | |
434 """ | |
435 if path: | |
436 os.chdir(path) | |
437 # set the UID before I actually exec the process | |
438 if settingUID: | |
439 switchUID(uid, gid) | |
440 os.execvpe(executable, args, environment) | |
441 | |
442 def __repr__(self): | |
443 """ | |
444 String representation of a process. | |
445 """ | |
446 return "<%s pid=%s status=%s>" % (self.__class__.__name__, | |
447 self.pid, self.status) | |
448 | |
449 class Process(_BaseProcess): | |
450 """ | |
451 An operating-system Process. | |
452 | |
453 This represents an operating-system process with arbitrary input/output | |
454 pipes connected to it. Those pipes may represent standard input, | |
455 standard output, and standard error, or any other file descriptor. | |
456 | |
457 On UNIX, this is implemented using fork(), exec(), pipe() | |
458 and fcntl(). These calls may not exist elsewhere so this | |
459 code is not cross-platform. (also, windows can only select | |
460 on sockets...) | |
461 """ | |
462 | |
463 debug = False | |
464 debug_child = False | |
465 | |
466 status = -1 | |
467 pid = None | |
468 | |
469 processWriterFactory = ProcessWriter | |
470 processReaderFactory = ProcessReader | |
471 | |
472 def __init__(self, | |
473 reactor, executable, args, environment, path, proto, | |
474 uid=None, gid=None, childFDs=None): | |
475 """ | |
476 Spawn an operating-system process. | |
477 | |
478 This is where the hard work of disconnecting all currently open | |
479 files / forking / executing the new process happens. (This is | |
480 executed automatically when a Process is instantiated.) | |
481 | |
482 This will also run the subprocess as a given user ID and group ID, if | |
483 specified. (Implementation Note: this doesn't support all the arcane | |
484 nuances of setXXuid on UNIX: it will assume that either your effective | |
485 or real UID is 0.) | |
486 """ | |
487 if not proto: | |
488 assert 'r' not in childFDs.values() | |
489 assert 'w' not in childFDs.values() | |
490 if not signal.getsignal(signal.SIGCHLD): | |
491 warnings.warn( | |
492 error.PotentialZombieWarning.MESSAGE, | |
493 error.PotentialZombieWarning, | |
494 stacklevel=3) | |
495 | |
496 self.lostProcess = False | |
497 | |
498 self.pipes = {} | |
499 # keys are childFDs, we can sense them closing | |
500 # values are ProcessReader/ProcessWriters | |
501 | |
502 helpers = {} | |
503 # keys are childFDs | |
504 # values are parentFDs | |
505 | |
506 if childFDs is None: | |
507 childFDs = {0: "w", # we write to the child's stdin | |
508 1: "r", # we read from their stdout | |
509 2: "r", # and we read from their stderr | |
510 } | |
511 | |
512 debug = self.debug | |
513 if debug: print "childFDs", childFDs | |
514 | |
515 # fdmap.keys() are filenos of pipes that are used by the child. | |
516 fdmap = {} # maps childFD to parentFD | |
517 for childFD, target in childFDs.items(): | |
518 if debug: print "[%d]" % childFD, target | |
519 if target == "r": | |
520 # we need a pipe that the parent can read from | |
521 readFD, writeFD = os.pipe() | |
522 if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD) | |
523 fdmap[childFD] = writeFD # child writes to this | |
524 helpers[childFD] = readFD # parent reads from this | |
525 elif target == "w": | |
526 # we need a pipe that the parent can write to | |
527 readFD, writeFD = os.pipe() | |
528 if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD) | |
529 fdmap[childFD] = readFD # child reads from this | |
530 helpers[childFD] = writeFD # parent writes to this | |
531 else: | |
532 assert type(target) == int, '%r should be an int' % (target,) | |
533 fdmap[childFD] = target # parent ignores this | |
534 if debug: print "fdmap", fdmap | |
535 if debug: print "helpers", helpers | |
536 # the child only cares about fdmap.values() | |
537 | |
538 self._fork(path, uid, gid, executable, args, environment, fdmap=fdmap) | |
539 | |
540 # we are the parent process: | |
541 self.proto = proto | |
542 | |
543 # arrange for the parent-side pipes to be read and written | |
544 for childFD, parentFD in helpers.items(): | |
545 os.close(fdmap[childFD]) | |
546 | |
547 if childFDs[childFD] == "r": | |
548 reader = self.processReaderFactory(reactor, self, childFD, | |
549 parentFD) | |
550 self.pipes[childFD] = reader | |
551 | |
552 if childFDs[childFD] == "w": | |
553 writer = self.processWriterFactory(reactor, self, childFD, | |
554 parentFD, forceReadHack=True) | |
555 self.pipes[childFD] = writer | |
556 | |
557 try: | |
558 # the 'transport' is used for some compatibility methods | |
559 if self.proto is not None: | |
560 self.proto.makeConnection(self) | |
561 except: | |
562 log.err() | |
563 registerReapProcessHandler(self.pid, self) | |
564 | |
565 def _setupChild(self, fdmap): | |
566 """ | |
567 fdmap[childFD] = parentFD | |
568 | |
569 The child wants to end up with 'childFD' attached to what used to be | |
570 the parent's parentFD. As an example, a bash command run like | |
571 'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}. | |
572 'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}. | |
573 | |
574 This is accomplished in two steps:: | |
575 | |
576 1. close all file descriptors that aren't values of fdmap. This | |
577 means 0 .. maxfds. | |
578 | |
579 2. for each childFD:: | |
580 | |
581 - if fdmap[childFD] == childFD, the descriptor is already in | |
582 place. Make sure the CLOEXEC flag is not set, then delete | |
583 the entry from fdmap. | |
584 | |
585 - if childFD is in fdmap.values(), then the target descriptor | |
586 is busy. Use os.dup() to move it elsewhere, update all | |
587 fdmap[childFD] items that point to it, then close the | |
588 original. Then fall through to the next case. | |
589 | |
590 - now fdmap[childFD] is not in fdmap.values(), and is free. | |
591 Use os.dup2() to move it to the right place, then close the | |
592 original. | |
593 """ | |
594 | |
595 debug = self.debug_child | |
596 if debug: | |
597 errfd = sys.stderr | |
598 errfd.write("starting _setupChild\n") | |
599 | |
600 destList = fdmap.values() | |
601 try: | |
602 import resource | |
603 maxfds = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + 1 | |
604 # OS-X reports 9223372036854775808. That's a lot of fds to close | |
605 if maxfds > 1024: | |
606 maxfds = 1024 | |
607 except: | |
608 maxfds = 256 | |
609 | |
610 for fd in xrange(maxfds): | |
611 if fd in destList: | |
612 continue | |
613 if debug and fd == errfd.fileno(): | |
614 continue | |
615 try: | |
616 os.close(fd) | |
617 except: | |
618 pass | |
619 | |
620 # at this point, the only fds still open are the ones that need to | |
621 # be moved to their appropriate positions in the child (the targets | |
622 # of fdmap, i.e. fdmap.values() ) | |
623 | |
624 if debug: print >>errfd, "fdmap", fdmap | |
625 childlist = fdmap.keys() | |
626 childlist.sort() | |
627 | |
628 for child in childlist: | |
629 target = fdmap[child] | |
630 if target == child: | |
631 # fd is already in place | |
632 if debug: print >>errfd, "%d already in place" % target | |
633 if fcntl and hasattr(fcntl, 'FD_CLOEXEC'): | |
634 old = fcntl.fcntl(child, fcntl.F_GETFD) | |
635 fcntl.fcntl(child, | |
636 fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) | |
637 else: | |
638 if child in fdmap.values(): | |
639 # we can't replace child-fd yet, as some other mapping | |
640 # still needs the fd it wants to target. We must preserve | |
641 # that old fd by duping it to a new home. | |
642 newtarget = os.dup(child) # give it a safe home | |
643 if debug: print >>errfd, "os.dup(%d) -> %d" % (child, | |
644 newtarget) | |
645 os.close(child) # close the original | |
646 for c, p in fdmap.items(): | |
647 if p == child: | |
648 fdmap[c] = newtarget # update all pointers | |
649 # now it should be available | |
650 if debug: print >>errfd, "os.dup2(%d,%d)" % (target, child) | |
651 os.dup2(target, child) | |
652 | |
653 # At this point, the child has everything it needs. We want to close | |
654 # everything that isn't going to be used by the child, i.e. | |
655 # everything not in fdmap.keys(). The only remaining fds open are | |
656 # those in fdmap.values(). | |
657 | |
658 # Any given fd may appear in fdmap.values() multiple times, so we | |
659 # need to remove duplicates first. | |
660 | |
661 old = [] | |
662 for fd in fdmap.values(): | |
663 if not fd in old: | |
664 if not fd in fdmap.keys(): | |
665 old.append(fd) | |
666 if debug: print >>errfd, "old", old | |
667 for fd in old: | |
668 os.close(fd) | |
669 | |
670 def writeToChild(self, childFD, data): | |
671 self.pipes[childFD].write(data) | |
672 | |
673 def closeChildFD(self, childFD): | |
674 # for writer pipes, loseConnection tries to write the remaining data | |
675 # out to the pipe before closing it | |
676 # if childFD is not in the list of pipes, assume that it is already | |
677 # closed | |
678 if childFD in self.pipes: | |
679 self.pipes[childFD].loseConnection() | |
680 | |
681 def pauseProducing(self): | |
682 for p in self.pipes.itervalues(): | |
683 if isinstance(p, ProcessReader): | |
684 p.stopReading() | |
685 | |
686 def resumeProducing(self): | |
687 for p in self.pipes.itervalues(): | |
688 if isinstance(p, ProcessReader): | |
689 p.startReading() | |
690 | |
691 # compatibility | |
692 def closeStdin(self): | |
693 """ | |
694 Call this to close standard input on this process. | |
695 """ | |
696 self.closeChildFD(0) | |
697 | |
698 def closeStdout(self): | |
699 self.closeChildFD(1) | |
700 | |
701 def closeStderr(self): | |
702 self.closeChildFD(2) | |
703 | |
704 def loseConnection(self): | |
705 self.closeStdin() | |
706 self.closeStderr() | |
707 self.closeStdout() | |
708 | |
709 def write(self, data): | |
710 """ | |
711 Call this to write to standard input on this process. | |
712 | |
713 NOTE: This will silently lose data if there is no standard input. | |
714 """ | |
715 if 0 in self.pipes: | |
716 self.pipes[0].write(data) | |
717 | |
718 def registerProducer(self, producer, streaming): | |
719 """ | |
720 Call this to register producer for standard input. | |
721 | |
722 If there is no standard input producer.stopProducing() will | |
723 be called immediately. | |
724 """ | |
725 if 0 in self.pipes: | |
726 self.pipes[0].registerProducer(producer, streaming) | |
727 else: | |
728 producer.stopProducing() | |
729 | |
730 def unregisterProducer(self): | |
731 """ | |
732 Call this to unregister producer for standard input.""" | |
733 if 0 in self.pipes: | |
734 self.pipes[0].unregisterProducer() | |
735 | |
736 def writeSequence(self, seq): | |
737 """ | |
738 Call this to write to standard input on this process. | |
739 | |
740 NOTE: This will silently lose data if there is no standard input. | |
741 """ | |
742 if 0 in self.pipes: | |
743 self.pipes[0].writeSequence(seq) | |
744 | |
745 def childDataReceived(self, name, data): | |
746 self.proto.childDataReceived(name, data) | |
747 | |
748 def processEnded(self, status): | |
749 # this is called when the child terminates (SIGCHLD) | |
750 self.status = status | |
751 self.lostProcess = True | |
752 self.pid = None | |
753 self.maybeCallProcessEnded() | |
754 | |
755 def childConnectionLost(self, childFD, reason): | |
756 # this is called when one of the helpers (ProcessReader or | |
757 # ProcessWriter) notices their pipe has been closed | |
758 os.close(self.pipes[childFD].fileno()) | |
759 del self.pipes[childFD] | |
760 try: | |
761 self.proto.childConnectionLost(childFD) | |
762 except: | |
763 log.err() | |
764 self.maybeCallProcessEnded() | |
765 | |
766 def maybeCallProcessEnded(self): | |
767 # we don't call ProcessProtocol.processEnded until: | |
768 # the child has terminated, AND | |
769 # all writers have indicated an error status, AND | |
770 # all readers have indicated EOF | |
771 # This insures that we've gathered all output from the process. | |
772 if self.pipes: | |
773 return | |
774 if not self.lostProcess: | |
775 self.reapProcess() | |
776 return | |
777 _BaseProcess.maybeCallProcessEnded(self) | |
778 | |
779 | |
780 class PTYProcess(abstract.FileDescriptor, _BaseProcess): | |
781 """ | |
782 An operating-system Process that uses PTY support. | |
783 """ | |
784 status = -1 | |
785 pid = None | |
786 | |
787 def __init__(self, reactor, executable, args, environment, path, proto, | |
788 uid=None, gid=None, usePTY=None): | |
789 """ | |
790 Spawn an operating-system process. | |
791 | |
792 This is where the hard work of disconnecting all currently open | |
793 files / forking / executing the new process happens. (This is | |
794 executed automatically when a Process is instantiated.) | |
795 | |
796 This will also run the subprocess as a given user ID and group ID, if | |
797 specified. (Implementation Note: this doesn't support all the arcane | |
798 nuances of setXXuid on UNIX: it will assume that either your effective | |
799 or real UID is 0.) | |
800 """ | |
801 if pty is None and not isinstance(usePTY, (tuple, list)): | |
802 # no pty module and we didn't get a pty to use | |
803 raise NotImplementedError( | |
804 "cannot use PTYProcess on platforms without the pty module.") | |
805 abstract.FileDescriptor.__init__(self, reactor) | |
806 | |
807 if isinstance(usePTY, (tuple, list)): | |
808 masterfd, slavefd, ttyname = usePTY | |
809 else: | |
810 masterfd, slavefd = pty.openpty() | |
811 ttyname = os.ttyname(slavefd) | |
812 | |
813 self._fork(path, uid, gid, executable, args, environment, | |
814 masterfd=masterfd, slavefd=slavefd) | |
815 | |
816 # we are now in parent process: | |
817 os.close(slavefd) | |
818 fdesc.setNonBlocking(masterfd) | |
819 self.fd = masterfd | |
820 self.startReading() | |
821 self.connected = 1 | |
822 self.proto = proto | |
823 self.lostProcess = 0 | |
824 self.status = -1 | |
825 try: | |
826 self.proto.makeConnection(self) | |
827 except: | |
828 log.err() | |
829 registerReapProcessHandler(self.pid, self) | |
830 | |
831 def _setupChild(self, masterfd, slavefd): | |
832 """ | |
833 Setup child process after fork() but before exec(). | |
834 """ | |
835 os.close(masterfd) | |
836 if hasattr(termios, 'TIOCNOTTY'): | |
837 try: | |
838 fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY) | |
839 except OSError: | |
840 pass | |
841 else: | |
842 try: | |
843 fcntl.ioctl(fd, termios.TIOCNOTTY, '') | |
844 except: | |
845 pass | |
846 os.close(fd) | |
847 | |
848 os.setsid() | |
849 | |
850 if hasattr(termios, 'TIOCSCTTY'): | |
851 fcntl.ioctl(slavefd, termios.TIOCSCTTY, '') | |
852 | |
853 for fd in range(3): | |
854 if fd != slavefd: | |
855 os.close(fd) | |
856 | |
857 os.dup2(slavefd, 0) # stdin | |
858 os.dup2(slavefd, 1) # stdout | |
859 os.dup2(slavefd, 2) # stderr | |
860 | |
861 for fd in xrange(3, 256): | |
862 try: | |
863 os.close(fd) | |
864 except: | |
865 pass | |
866 | |
867 # PTYs do not have stdin/stdout/stderr. They only have in and out, just | |
868 # like sockets. You cannot close one without closing off the entire PTY. | |
869 def closeStdin(self): | |
870 pass | |
871 | |
872 def closeStdout(self): | |
873 pass | |
874 | |
875 def closeStderr(self): | |
876 pass | |
877 | |
878 def processEnded(self, status): | |
879 self.status = status | |
880 self.lostProcess += 1 | |
881 self.pid = None | |
882 self.maybeCallProcessEnded() | |
883 | |
884 def doRead(self): | |
885 """ | |
886 Called when my standard output stream is ready for reading. | |
887 """ | |
888 return fdesc.readFromFD( | |
889 self.fd, | |
890 lambda data: self.proto.childDataReceived(1, data)) | |
891 | |
892 def fileno(self): | |
893 """ | |
894 This returns the file number of standard output on this process. | |
895 """ | |
896 return self.fd | |
897 | |
898 def maybeCallProcessEnded(self): | |
899 # two things must happen before we call the ProcessProtocol's | |
900 # processEnded method. 1: the child process must die and be reaped | |
901 # (which calls our own processEnded method). 2: the child must close | |
902 # their stdin/stdout/stderr fds, causing the pty to close, causing | |
903 # our connectionLost method to be called. #2 can also be triggered | |
904 # by calling .loseConnection(). | |
905 if self.lostProcess == 2: | |
906 _BaseProcess.maybeCallProcessEnded(self) | |
907 | |
908 def connectionLost(self, reason): | |
909 """ | |
910 I call this to clean up when one or all of my connections has died. | |
911 """ | |
912 abstract.FileDescriptor.connectionLost(self, reason) | |
913 os.close(self.fd) | |
914 self.lostProcess += 1 | |
915 self.maybeCallProcessEnded() | |
916 | |
917 def writeSomeData(self, data): | |
918 """ | |
919 Write some data to the open process. | |
920 """ | |
921 return fdesc.writeToFD(self.fd, data) | |
922 | |
OLD | NEW |