Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(235)

Side by Side Diff: testing/legion/process.py

Issue 1841863002: Update monet. (Closed) Base URL: https://github.com/domokit/monet.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « testing/legion/legion_test_case.py ('k') | testing/legion/rpc_methods.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """RPC compatible subprocess-type module.
6
7 This module defined both a task-side process class as well as a controller-side
8 process wrapper for easier access and usage of the task-side process.
9 """
10
11 import logging
12 import subprocess
13 import sys
14 import threading
15
16 #pylint: disable=relative-import
17 import common_lib
18
19 # Map swarming_client to use subprocess42
20 sys.path.append(common_lib.SWARMING_DIR)
21
22 from utils import subprocess42
23
24
25 class ControllerProcessWrapper(object):
26 """Controller-side process wrapper class.
27
28 This class provides a more intuitive interface to task-side processes
29 than calling the methods directly using the RPC object.
30 """
31
32 def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None):
33 self._rpc = rpc
34 self._id = rpc.subprocess.Process(cmd)
35 if verbose:
36 self._rpc.subprocess.SetVerbose(self._id)
37 if detached:
38 self._rpc.subprocess.SetDetached(self._id)
39 if cwd:
40 self._rpc.subprocess.SetCwd(self._rpc, cwd)
41 self._rpc.subprocess.Start(self._id)
42
43 def Terminate(self):
44 logging.debug('Terminating process %s', self._id)
45 return self._rpc.subprocess.Terminate(self._id)
46
47 def Kill(self):
48 logging.debug('Killing process %s', self._id)
49 self._rpc.subprocess.Kill(self._id)
50
51 def Delete(self):
52 return self._rpc.subprocess.Delete(self._id)
53
54 def GetReturncode(self):
55 return self._rpc.subprocess.GetReturncode(self._id)
56
57 def ReadStdout(self):
58 """Returns all stdout since the last call to ReadStdout.
59
60 This call allows the user to read stdout while the process is running.
61 However each call will flush the local stdout buffer. In order to make
62 multiple calls to ReadStdout and to retain the entire output the results
63 of this call will need to be buffered in the calling code.
64 """
65 return self._rpc.subprocess.ReadStdout(self._id)
66
67 def ReadStderr(self):
68 """Returns all stderr read since the last call to ReadStderr.
69
70 See ReadStdout for additional details.
71 """
72 return self._rpc.subprocess.ReadStderr(self._id)
73
74 def ReadOutput(self):
75 """Returns the (stdout, stderr) since the last Read* call.
76
77 See ReadStdout for additional details.
78 """
79 return self._rpc.subprocess.ReadOutput(self._id)
80
81 def Wait(self):
82 return self._rpc.subprocess.Wait(self._id)
83
84 def Poll(self):
85 return self._rpc.subprocess.Poll(self._id)
86
87 def GetPid(self):
88 return self._rpc.subprocess.GetPid(self._id)
89
90
91
92 class Process(object):
93 """Implements a task-side non-blocking subprocess.
94
95 This non-blocking subprocess allows the caller to continue operating while
96 also able to interact with this subprocess based on a key returned to
97 the caller at the time of creation.
98
99 Creation args are set via Set* methods called after calling Process but
100 before calling Start. This is due to a limitation of the XML-RPC
101 implementation not supporting keyword arguments.
102 """
103
104 _processes = {}
105 _process_next_id = 0
106 _creation_lock = threading.Lock()
107
108 def __init__(self, cmd):
109 self.stdout = ''
110 self.stderr = ''
111 self.cmd = cmd
112 self.proc = None
113 self.cwd = None
114 self.verbose = False
115 self.detached = False
116 self.data_lock = threading.Lock()
117
118 def __str__(self):
119 return '%r, cwd=%r, verbose=%r, detached=%r' % (
120 self.cmd, self.cwd, self.verbose, self.detached)
121
122 def _reader(self):
123 for pipe, data in self.proc.yield_any():
124 with self.data_lock:
125 if pipe == 'stdout':
126 self.stdout += data
127 if self.verbose:
128 sys.stdout.write(data)
129 else:
130 self.stderr += data
131 if self.verbose:
132 sys.stderr.write(data)
133
134 @classmethod
135 def KillAll(cls):
136 for key in cls._processes:
137 cls.Kill(key)
138
139 @classmethod
140 def Process(cls, cmd):
141 with cls._creation_lock:
142 key = 'Process%d' % cls._process_next_id
143 cls._process_next_id += 1
144 logging.debug('Creating process %s with cmd %r', key, cmd)
145 process = cls(cmd)
146 cls._processes[key] = process
147 return key
148
149 def _Start(self):
150 logging.info('Starting process %s', self)
151 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
152 stderr=subprocess42.PIPE,
153 detached=self.detached, cwd=self.cwd)
154 threading.Thread(target=self._reader).start()
155
156 @classmethod
157 def Start(cls, key):
158 cls._processes[key]._Start()
159
160 @classmethod
161 def SetCwd(cls, key, cwd):
162 """Sets the process's cwd."""
163 logging.debug('Setting %s cwd to %s', key, cwd)
164 cls._processes[key].cwd = cwd
165
166 @classmethod
167 def SetDetached(cls, key):
168 """Creates a detached process."""
169 logging.debug('Setting %s.detached = True', key)
170 cls._processes[key].detached = True
171
172 @classmethod
173 def SetVerbose(cls, key):
174 """Sets the stdout and stderr to be emitted locally."""
175 logging.debug('Setting %s.verbose = True', key)
176 cls._processes[key].verbose = True
177
178 @classmethod
179 def Terminate(cls, key):
180 logging.debug('Terminating process %s', key)
181 cls._processes[key].proc.terminate()
182
183 @classmethod
184 def Kill(cls, key):
185 logging.debug('Killing process %s', key)
186 cls._processes[key].proc.kill()
187
188 @classmethod
189 def Delete(cls, key):
190 if cls.GetReturncode(key) is None:
191 logging.warning('Killing %s before deleting it', key)
192 cls.Kill(key)
193 logging.debug('Deleting process %s', key)
194 cls._processes.pop(key)
195
196 @classmethod
197 def GetReturncode(cls, key):
198 return cls._processes[key].proc.returncode
199
200 @classmethod
201 def ReadStdout(cls, key):
202 """Returns all stdout since the last call to ReadStdout.
203
204 This call allows the user to read stdout while the process is running.
205 However each call will flush the local stdout buffer. In order to make
206 multiple calls to ReadStdout and to retain the entire output the results
207 of this call will need to be buffered in the calling code.
208 """
209 proc = cls._processes[key]
210 with proc.data_lock:
211 # Perform a "read" on the stdout data
212 stdout = proc.stdout
213 proc.stdout = ''
214 return stdout
215
216 @classmethod
217 def ReadStderr(cls, key):
218 """Returns all stderr read since the last call to ReadStderr.
219
220 See ReadStdout for additional details.
221 """
222 proc = cls._processes[key]
223 with proc.data_lock:
224 # Perform a "read" on the stderr data
225 stderr = proc.stderr
226 proc.stderr = ''
227 return stderr
228
229 @classmethod
230 def ReadOutput(cls, key):
231 """Returns the (stdout, stderr) since the last Read* call.
232
233 See ReadStdout for additional details.
234 """
235 return cls.ReadStdout(key), cls.ReadStderr(key)
236
237 @classmethod
238 def Wait(cls, key):
239 return cls._processes[key].proc.wait()
240
241 @classmethod
242 def Poll(cls, key):
243 return cls._processes[key].proc.poll()
244
245 @classmethod
246 def GetPid(cls, key):
247 return cls._processes[key].proc.pid
OLDNEW
« no previous file with comments | « testing/legion/legion_test_case.py ('k') | testing/legion/rpc_methods.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698