OLD | NEW |
| (Empty) |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """RPC compatible subprocess-type module. | |
6 | |
7 This module defined both a task-side process class as well as a controller-side | |
8 process wrapper for easier access and usage of the task-side process. | |
9 """ | |
10 | |
11 import logging | |
12 import subprocess | |
13 import sys | |
14 import threading | |
15 | |
16 #pylint: disable=relative-import | |
17 import common_lib | |
18 | |
19 # Map swarming_client to use subprocess42 | |
20 sys.path.append(common_lib.SWARMING_DIR) | |
21 | |
22 from utils import subprocess42 | |
23 | |
24 | |
25 class ControllerProcessWrapper(object): | |
26 """Controller-side process wrapper class. | |
27 | |
28 This class provides a more intuitive interface to task-side processes | |
29 than calling the methods directly using the RPC object. | |
30 """ | |
31 | |
32 def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None): | |
33 self._rpc = rpc | |
34 self._id = rpc.subprocess.Process(cmd) | |
35 if verbose: | |
36 self._rpc.subprocess.SetVerbose(self._id) | |
37 if detached: | |
38 self._rpc.subprocess.SetDetached(self._id) | |
39 if cwd: | |
40 self._rpc.subprocess.SetCwd(self._rpc, cwd) | |
41 self._rpc.subprocess.Start(self._id) | |
42 | |
43 def Terminate(self): | |
44 logging.debug('Terminating process %s', self._id) | |
45 return self._rpc.subprocess.Terminate(self._id) | |
46 | |
47 def Kill(self): | |
48 logging.debug('Killing process %s', self._id) | |
49 self._rpc.subprocess.Kill(self._id) | |
50 | |
51 def Delete(self): | |
52 return self._rpc.subprocess.Delete(self._id) | |
53 | |
54 def GetReturncode(self): | |
55 return self._rpc.subprocess.GetReturncode(self._id) | |
56 | |
57 def ReadStdout(self): | |
58 """Returns all stdout since the last call to ReadStdout. | |
59 | |
60 This call allows the user to read stdout while the process is running. | |
61 However each call will flush the local stdout buffer. In order to make | |
62 multiple calls to ReadStdout and to retain the entire output the results | |
63 of this call will need to be buffered in the calling code. | |
64 """ | |
65 return self._rpc.subprocess.ReadStdout(self._id) | |
66 | |
67 def ReadStderr(self): | |
68 """Returns all stderr read since the last call to ReadStderr. | |
69 | |
70 See ReadStdout for additional details. | |
71 """ | |
72 return self._rpc.subprocess.ReadStderr(self._id) | |
73 | |
74 def ReadOutput(self): | |
75 """Returns the (stdout, stderr) since the last Read* call. | |
76 | |
77 See ReadStdout for additional details. | |
78 """ | |
79 return self._rpc.subprocess.ReadOutput(self._id) | |
80 | |
81 def Wait(self): | |
82 return self._rpc.subprocess.Wait(self._id) | |
83 | |
84 def Poll(self): | |
85 return self._rpc.subprocess.Poll(self._id) | |
86 | |
87 def GetPid(self): | |
88 return self._rpc.subprocess.GetPid(self._id) | |
89 | |
90 | |
91 | |
92 class Process(object): | |
93 """Implements a task-side non-blocking subprocess. | |
94 | |
95 This non-blocking subprocess allows the caller to continue operating while | |
96 also able to interact with this subprocess based on a key returned to | |
97 the caller at the time of creation. | |
98 | |
99 Creation args are set via Set* methods called after calling Process but | |
100 before calling Start. This is due to a limitation of the XML-RPC | |
101 implementation not supporting keyword arguments. | |
102 """ | |
103 | |
104 _processes = {} | |
105 _process_next_id = 0 | |
106 _creation_lock = threading.Lock() | |
107 | |
108 def __init__(self, cmd): | |
109 self.stdout = '' | |
110 self.stderr = '' | |
111 self.cmd = cmd | |
112 self.proc = None | |
113 self.cwd = None | |
114 self.verbose = False | |
115 self.detached = False | |
116 self.data_lock = threading.Lock() | |
117 | |
118 def __str__(self): | |
119 return '%r, cwd=%r, verbose=%r, detached=%r' % ( | |
120 self.cmd, self.cwd, self.verbose, self.detached) | |
121 | |
122 def _reader(self): | |
123 for pipe, data in self.proc.yield_any(): | |
124 with self.data_lock: | |
125 if pipe == 'stdout': | |
126 self.stdout += data | |
127 if self.verbose: | |
128 sys.stdout.write(data) | |
129 else: | |
130 self.stderr += data | |
131 if self.verbose: | |
132 sys.stderr.write(data) | |
133 | |
134 @classmethod | |
135 def KillAll(cls): | |
136 for key in cls._processes: | |
137 cls.Kill(key) | |
138 | |
139 @classmethod | |
140 def Process(cls, cmd): | |
141 with cls._creation_lock: | |
142 key = 'Process%d' % cls._process_next_id | |
143 cls._process_next_id += 1 | |
144 logging.debug('Creating process %s with cmd %r', key, cmd) | |
145 process = cls(cmd) | |
146 cls._processes[key] = process | |
147 return key | |
148 | |
149 def _Start(self): | |
150 logging.info('Starting process %s', self) | |
151 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE, | |
152 stderr=subprocess42.PIPE, | |
153 detached=self.detached, cwd=self.cwd) | |
154 threading.Thread(target=self._reader).start() | |
155 | |
156 @classmethod | |
157 def Start(cls, key): | |
158 cls._processes[key]._Start() | |
159 | |
160 @classmethod | |
161 def SetCwd(cls, key, cwd): | |
162 """Sets the process's cwd.""" | |
163 logging.debug('Setting %s cwd to %s', key, cwd) | |
164 cls._processes[key].cwd = cwd | |
165 | |
166 @classmethod | |
167 def SetDetached(cls, key): | |
168 """Creates a detached process.""" | |
169 logging.debug('Setting %s.detached = True', key) | |
170 cls._processes[key].detached = True | |
171 | |
172 @classmethod | |
173 def SetVerbose(cls, key): | |
174 """Sets the stdout and stderr to be emitted locally.""" | |
175 logging.debug('Setting %s.verbose = True', key) | |
176 cls._processes[key].verbose = True | |
177 | |
178 @classmethod | |
179 def Terminate(cls, key): | |
180 logging.debug('Terminating process %s', key) | |
181 cls._processes[key].proc.terminate() | |
182 | |
183 @classmethod | |
184 def Kill(cls, key): | |
185 logging.debug('Killing process %s', key) | |
186 cls._processes[key].proc.kill() | |
187 | |
188 @classmethod | |
189 def Delete(cls, key): | |
190 if cls.GetReturncode(key) is None: | |
191 logging.warning('Killing %s before deleting it', key) | |
192 cls.Kill(key) | |
193 logging.debug('Deleting process %s', key) | |
194 cls._processes.pop(key) | |
195 | |
196 @classmethod | |
197 def GetReturncode(cls, key): | |
198 return cls._processes[key].proc.returncode | |
199 | |
200 @classmethod | |
201 def ReadStdout(cls, key): | |
202 """Returns all stdout since the last call to ReadStdout. | |
203 | |
204 This call allows the user to read stdout while the process is running. | |
205 However each call will flush the local stdout buffer. In order to make | |
206 multiple calls to ReadStdout and to retain the entire output the results | |
207 of this call will need to be buffered in the calling code. | |
208 """ | |
209 proc = cls._processes[key] | |
210 with proc.data_lock: | |
211 # Perform a "read" on the stdout data | |
212 stdout = proc.stdout | |
213 proc.stdout = '' | |
214 return stdout | |
215 | |
216 @classmethod | |
217 def ReadStderr(cls, key): | |
218 """Returns all stderr read since the last call to ReadStderr. | |
219 | |
220 See ReadStdout for additional details. | |
221 """ | |
222 proc = cls._processes[key] | |
223 with proc.data_lock: | |
224 # Perform a "read" on the stderr data | |
225 stderr = proc.stderr | |
226 proc.stderr = '' | |
227 return stderr | |
228 | |
229 @classmethod | |
230 def ReadOutput(cls, key): | |
231 """Returns the (stdout, stderr) since the last Read* call. | |
232 | |
233 See ReadStdout for additional details. | |
234 """ | |
235 return cls.ReadStdout(key), cls.ReadStderr(key) | |
236 | |
237 @classmethod | |
238 def Wait(cls, key): | |
239 return cls._processes[key].proc.wait() | |
240 | |
241 @classmethod | |
242 def Poll(cls, key): | |
243 return cls._processes[key].proc.poll() | |
244 | |
245 @classmethod | |
246 def GetPid(cls, key): | |
247 return cls._processes[key].proc.pid | |
OLD | NEW |