OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 """ | |
6 ProcessMonitor: run processes, monitor progress, and restart when | |
7 they die. | |
8 | |
9 The ProcessMonitor will not attempt to restart a process that appears | |
10 to die instantly -- with each "instant" death (less than 1 second, by | |
11 default), it will delay approximately twice as long before restarting | |
12 it. A successful run will reset the counter. | |
13 | |
14 The primary interface is "addProcess" and "removeProcess". When the | |
15 service is active (that is, when the application it is attached to | |
16 is running), adding a process automatically starts it. | |
17 | |
18 Each process has a name (a string). This string must uniquely identify | |
19 the process. In particular, attempting to add two processes with the | |
20 same name will result in a key error. | |
21 | |
22 The arguments to addProcess are: | |
23 - name -- a string, uniquely specifying the process | |
24 - args -- a list of arguments. the first will be used to determine the | |
25 executable | |
26 - optionally, the uid and gid this process should be run as (by default, | |
27 it does not change uid/gid before running processes). | |
28 | |
29 Note that args are passed to the system call, not to the shell. If running | |
30 the shell is desired, the common idiom is to use | |
31 .addProcess("name", ['/bin/sh', '-c', shell_script]) | |
32 | |
33 removeProcess takes just the name argument. If the process is started, it | |
34 kills it, and will never restart it. | |
35 | |
36 The "restartAll" method restarts all processes. This is useful for 3rd | |
37 parties management services to allow a user to restart servers because | |
38 of an outside circumstances change -- for example, a new version of a library | |
39 which is installed. | |
40 | |
41 The following attributes on the monitor can be set to configure behaviour | |
42 - threshold -- how long a process has to live before the death is considered | |
43 instant (default 1, measured in seconds) | |
44 - killTime -- how long a process being killed has to get its affairs in | |
45 order before it gets killed with an unmaskable signal | |
46 (default 5, measured in seconds) | |
47 - consistencyDelay -- time between consistency checks | |
48 (default 60, measured in seconds) | |
49 """ | |
50 | |
51 import os, time | |
52 from signal import SIGTERM, SIGKILL | |
53 from twisted.python import log | |
54 from twisted.internet import protocol, reactor, process | |
55 from twisted.application import service | |
56 from twisted.protocols import basic | |
57 | |
58 class DummyTransport: | |
59 | |
60 disconnecting = 0 | |
61 | |
62 transport = DummyTransport() | |
63 | |
64 class LineLogger(basic.LineReceiver): | |
65 | |
66 tag = None | |
67 delimiter = '\n' | |
68 | |
69 def lineReceived(self, line): | |
70 log.msg('[%s] %s' % (self.tag, line)) | |
71 | |
72 class LoggingProtocol(protocol.ProcessProtocol): | |
73 | |
74 service = None | |
75 name = None | |
76 empty = 1 | |
77 | |
78 def connectionMade(self): | |
79 self.output = LineLogger() | |
80 self.output.tag = self.name | |
81 self.output.makeConnection(transport) | |
82 | |
83 def outReceived(self, data): | |
84 self.output.dataReceived(data) | |
85 self.empty = data[-1] == '\n' | |
86 | |
87 errReceived = outReceived | |
88 | |
89 def processEnded(self, reason): | |
90 if not self.empty: | |
91 self.output.dataReceived('\n') | |
92 self.service.connectionLost(self.name) | |
93 | |
94 | |
95 class ProcessMonitor(service.Service): | |
96 | |
97 threshold = 1 | |
98 active = 0 | |
99 killTime = 5 | |
100 consistency = None | |
101 consistencyDelay = 60 | |
102 | |
103 def __init__(self): | |
104 self.processes = {} | |
105 self.protocols = {} | |
106 self.delay = {} | |
107 self.timeStarted = {} | |
108 self.murder = {} | |
109 | |
110 def __getstate__(self): | |
111 dct = service.Service.__getstate__(self) | |
112 for k in ('active', 'consistency'): | |
113 if dct.has_key(k): | |
114 del dct[k] | |
115 dct['protocols'] = {} | |
116 dct['delay'] = {} | |
117 dct['timeStarted'] = {} | |
118 dct['murder'] = {} | |
119 return dct | |
120 | |
121 def _checkConsistency(self): | |
122 for name, protocol in self.protocols.items(): | |
123 proc = protocol.transport | |
124 try: | |
125 proc.signalProcess(0) | |
126 except (OSError, process.ProcessExitedAlready): | |
127 log.msg("Lost process %r somehow, restarting." % name) | |
128 del self.protocols[name] | |
129 self.startProcess(name) | |
130 self.consistency = reactor.callLater(self.consistencyDelay, | |
131 self._checkConsistency) | |
132 | |
133 def addProcess(self, name, args, uid=None, gid=None): | |
134 if self.processes.has_key(name): | |
135 raise KeyError("remove %s first" % name) | |
136 self.processes[name] = args, uid, gid | |
137 if self.active: | |
138 self.startProcess(name) | |
139 | |
140 def removeProcess(self, name): | |
141 del self.processes[name] | |
142 self.stopProcess(name) | |
143 | |
144 def startService(self): | |
145 service.Service.startService(self) | |
146 self.active = 1 | |
147 for name in self.processes.keys(): | |
148 reactor.callLater(0, self.startProcess, name) | |
149 self.consistency = reactor.callLater(self.consistencyDelay, | |
150 self._checkConsistency) | |
151 | |
152 def stopService(self): | |
153 service.Service.stopService(self) | |
154 self.active = 0 | |
155 for name in self.processes.keys(): | |
156 self.stopProcess(name) | |
157 self.consistency.cancel() | |
158 | |
159 def connectionLost(self, name): | |
160 if self.murder.has_key(name): | |
161 self.murder[name].cancel() | |
162 del self.murder[name] | |
163 if self.protocols.has_key(name): | |
164 del self.protocols[name] | |
165 if time.time()-self.timeStarted[name]<self.threshold: | |
166 delay = self.delay[name] = min(1+2*self.delay.get(name, 0), 3600) | |
167 else: | |
168 delay = self.delay[name] = 0 | |
169 if self.active and self.processes.has_key(name): | |
170 reactor.callLater(delay, self.startProcess, name) | |
171 | |
172 def startProcess(self, name): | |
173 if self.protocols.has_key(name): | |
174 return | |
175 p = self.protocols[name] = LoggingProtocol() | |
176 p.service = self | |
177 p.name = name | |
178 args, uid, gid = self.processes[name] | |
179 self.timeStarted[name] = time.time() | |
180 reactor.spawnProcess(p, args[0], args, uid=uid, gid=gid) | |
181 | |
182 def _forceStopProcess(self, proc): | |
183 try: | |
184 proc.signalProcess(SIGKILL) | |
185 except process.ProcessExitedAlready: | |
186 pass | |
187 | |
188 def stopProcess(self, name): | |
189 if not self.protocols.has_key(name): | |
190 return | |
191 proc = self.protocols[name].transport | |
192 del self.protocols[name] | |
193 try: | |
194 proc.signalProcess(SIGTERM) | |
195 except process.ProcessExitedAlready: | |
196 pass | |
197 else: | |
198 self.murder[name] = reactor.callLater(self.killTime, self._forceStop
Process, proc) | |
199 | |
200 def restartAll(self): | |
201 for name in self.processes.keys(): | |
202 self.stopProcess(name) | |
203 | |
204 def __repr__(self): | |
205 l = [] | |
206 for name, proc in self.processes.items(): | |
207 uidgid = '' | |
208 if proc[1] is not None: | |
209 uidgid = str(proc[1]) | |
210 if proc[2] is not None: | |
211 uidgid += ':'+str(proc[2]) | |
212 | |
213 if uidgid: | |
214 uidgid = '(' + uidgid + ')' | |
215 l.append('%r%s: %r' % (name, uidgid, proc[0])) | |
216 return ('<' + self.__class__.__name__ + ' ' | |
217 + ' '.join(l) | |
218 + '>') | |
219 | |
220 def main(): | |
221 mon = ProcessMonitor() | |
222 mon.addProcess('foo', ['/bin/sh', '-c', 'sleep 2;echo hello']) | |
223 mon.addProcess('qux', ['/bin/sh', '-c', 'sleep 2;printf pilim']) | |
224 mon.addProcess('bar', ['/bin/sh', '-c', 'echo goodbye']) | |
225 mon.addProcess('baz', ['/bin/sh', '-c', | |
226 'echo welcome;while :;do echo blah;sleep 5;done']) | |
227 reactor.callLater(30, lambda mon=mon: | |
228 os.kill(mon.protocols['baz'].transport.pid, SIGTERM)) | |
229 reactor.callLater(60, mon.restartAll) | |
230 mon.startService() | |
231 reactor.addSystemEventTrigger('before', 'shutdown', mon.stopService) | |
232 reactor.run() | |
233 | |
234 if __name__ == '__main__': | |
235 main() | |
OLD | NEW |