Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(44)

Side by Side Diff: recipe_engine/step_runner.py

Issue 1959563002: Use subprocess42 in recipe_engine to avoid threading madness. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/recipes-py@master
Patch Set: Don't need a lock because we're single threaded now! Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | recipe_engine/third_party/subprocess42.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import cStringIO 5 import cStringIO
6 import collections 6 import collections
7 import contextlib 7 import contextlib
8 import datetime 8 import datetime
9 import json 9 import json
10 import os 10 import os
11 import re 11 import re
12 import subprocess
13 import sys 12 import sys
14 import tempfile 13 import tempfile
15 import threading
16 import traceback 14 import traceback
17 15
18 from . import recipe_test_api 16 from . import recipe_test_api
19 from . import stream 17 from . import stream
20 from . import types 18 from . import types
21 from . import util 19 from . import util
22 20
21 from .third_party import subprocess42
22
23 if sys.platform.startswith('win'):
M-A Ruel 2016/05/06 14:12:51 if sys.platform == 'win32':
iannucci 2016/05/06 17:52:01 Done.
24 # Windows has a bad habit of opening a dialog when a console program
25 # crashes, rather than just letting it crash. Therefore, when a
26 # program crashes on Windows, we don't find out until the build step
27 # times out. This code prevents the dialog from appearing, so that we
28 # find out immediately and don't waste time waiting for a user to
29 # close the dialog.
30 import ctypes
31 # SetErrorMode(SEM_NOGPFAULTERRORBOX). For more information, see:
32 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
33 ctypes.windll.kernel32.SetErrorMode(0x0002)
M-A Ruel 2016/05/06 14:12:51 s/0x0002/2/ ? :) That's a good idea! I hadn't tho
iannucci 2016/05/06 17:52:01 I was matching the text in the msdn docs :) Looki
M-A Ruel 2016/05/06 17:57:51 Good question. I'm not fond of changing the global
34
M-A Ruel 2016/05/06 14:12:51 you don't use 2 lines between file level symbols?
iannucci 2016/05/06 17:52:01 lol, done
35 class _streamingLinebuf(object):
36 def __init__(self):
37 self.buffedlines = []
38 self.extra = None
39
40 def ingest(self, data):
41 lines = data.splitlines()
42 if self.extra:
43 lines[0] = self.extra + lines[0]
44 self.extra = None
45 if not data.endswith("\n"):
46 self.extra = lines[-1]
47 lines = lines[:-1]
48 self.buffedlines += lines
49
50 def __iter__(self):
51 return self
52
53 def next(self):
54 if len(self.buffedlines) > 0:
55 ret = self.buffedlines[0]
56 self.buffedlines = self.buffedlines[1:]
57 return ret
58 raise StopIteration()
59
23 class StepRunner(object): 60 class StepRunner(object):
24 """A StepRunner is the interface to actually running steps. 61 """A StepRunner is the interface to actually running steps.
25 62
26 These can actually make subprocess calls (SubprocessStepRunner), or just 63 These can actually make subprocess calls (SubprocessStepRunner), or just
27 pretend to run the steps with mock output (SimulationStepRunner). 64 pretend to run the steps with mock output (SimulationStepRunner).
28 """ 65 """
29 @property 66 @property
30 def stream_engine(self): 67 def stream_engine(self):
31 """Return the stream engine that this StepRunner uses, if meaningful. 68 """Return the stream engine that this StepRunner uses, if meaningful.
32 69
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 def run(inner): 182 def run(inner):
146 try: 183 try:
147 # Open file handles for IO redirection based on file names in 184 # Open file handles for IO redirection based on file names in
148 # step_dict. 185 # step_dict.
149 handles = { 186 handles = {
150 'stdout': step_stream, 187 'stdout': step_stream,
151 'stderr': step_stream, 188 'stderr': step_stream,
152 'stdin': None, 189 'stdin': None,
153 } 190 }
154 for key in handles: 191 for key in handles:
155 if key in step_dict: 192 fileName = step_dict.get(key)
156 handles[key] = open(step_dict[key], 193 if fileName:
157 'rb' if key == 'stdin' else 'wb') 194 handles[key] = open(fileName, 'rb' if key == 'stdin' else 'wb')
158 # The subprocess will inherit and close these handles. 195 # The subprocess will inherit and close these handles.
159 retcode = self._run_cmd( 196 retcode = self._run_cmd(
160 cmd=cmd, handles=handles, env=step_env, cwd=step_dict.get('cwd')) 197 cmd=cmd, handles=handles, env=step_env, cwd=step_dict.get('cwd'))
161 except OSError: 198 except OSError:
162 with step_stream.new_log_stream('exception') as l: 199 with step_stream.new_log_stream('exception') as l:
163 trace = traceback.format_exc().splitlines() 200 trace = traceback.format_exc().splitlines()
164 for line in trace: 201 for line in trace:
165 l.write_line(line) 202 l.write_line(line)
166 step_stream.set_step_status('EXCEPTION') 203 step_stream.set_step_status('EXCEPTION')
167 raise 204 raise
(...skipping 14 matching lines...) Expand all
182 return step_stream 219 return step_stream
183 220
184 return ReturnOpenStep() 221 return ReturnOpenStep()
185 222
186 def run_recipe(self, universe_view, recipe, properties): 223 def run_recipe(self, universe_view, recipe, properties):
187 with tempfile.NamedTemporaryFile() as f: 224 with tempfile.NamedTemporaryFile() as f:
188 cmd = [sys.executable, 225 cmd = [sys.executable,
189 universe_view.universe.package_deps.engine_recipes_py, 226 universe_view.universe.package_deps.engine_recipes_py,
190 '--package=%s' % universe_view.universe.config_file.path, 'run', 227 '--package=%s' % universe_view.universe.config_file.path, 'run',
191 '--output-result-json=%s' % f.name, recipe] 228 '--output-result-json=%s' % f.name, recipe]
192 cmd.extend(['%s=%s' % (k,repr(v)) for k, v in properties.iteritems()]) 229 cmd.extend(['%s=%s' % (k,repr(v)) for k, v in properties.iteritems()])
M-A Ruel 2016/05/06 14:12:51 [OT] don't you want to sort the resulting list jus
iannucci 2016/05/06 17:52:01 I mean... probably, but this code is going to be r
193 230
194 retcode = subprocess.call(cmd) 231 retcode = subprocess42.call(cmd)
195 result = json.load(f) 232 result = json.load(f)
196 if retcode != 0: 233 if retcode != 0:
197 raise recipe_api.StepFailure( 234 raise recipe_api.StepFailure(
198 'depend on %s with properties %r failed with %d.\n' 235 'depend on %s with properties %r failed with %d.\n'
199 'Recipe result: %r' % ( 236 'Recipe result: %r' % (
200 recipe, properties, retcode, result)) 237 recipe, properties, retcode, result))
201 return result 238 return result
202 239
203 @contextlib.contextmanager 240 @contextlib.contextmanager
204 def run_context(self): 241 def run_context(self):
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 to direct that subprocess's filehandle to. 281 to direct that subprocess's filehandle to.
245 env: the full environment to run the command in -- this is passed 282 env: the full environment to run the command in -- this is passed
246 unaltered to subprocess.Popen. 283 unaltered to subprocess.Popen.
247 cwd: the working directory of the command. 284 cwd: the working directory of the command.
248 """ 285 """
249 fhandles = {} 286 fhandles = {}
250 287
251 # If we are given StreamEngine.Streams, map them to PIPE for subprocess. 288 # If we are given StreamEngine.Streams, map them to PIPE for subprocess.
252 # We will manually forward them to their corresponding stream. 289 # We will manually forward them to their corresponding stream.
253 for key in ('stdout', 'stderr'): 290 for key in ('stdout', 'stderr'):
254 if (key in handles and 291 handle = handles.get(key)
255 isinstance(handles[key], stream.StreamEngine.Stream)): 292 if isinstance(handle, stream.StreamEngine.Stream):
256 fhandles[key] = subprocess.PIPE 293 fhandles[key] = subprocess42.PIPE
257 else: 294 else:
258 fhandles[key] = handles[key] 295 fhandles[key] = handle
259 296
260 # stdin must be a real handle, if it exists 297 # stdin must be a real handle, if it exists
261 fhandles['stdin'] = handles.get('stdin') 298 fhandles['stdin'] = handles.get('stdin')
262 299
263 if sys.platform.startswith('win'):
264 # Windows has a bad habit of opening a dialog when a console program
265 # crashes, rather than just letting it crash. Therefore, when a
266 # program crashes on Windows, we don't find out until the build step
267 # times out. This code prevents the dialog from appearing, so that we
268 # find out immediately and don't waste time waiting for a user to
269 # close the dialog.
270 import ctypes
271 # SetErrorMode(SEM_NOGPFAULTERRORBOX). For more information, see:
272 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
273 ctypes.windll.kernel32.SetErrorMode(0x0002)
274 # CREATE_NO_WINDOW. For more information, see:
275 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863.aspx
276 creationflags = 0x8000000
277 else:
278 creationflags = 0
279
280 with _modify_lookup_path(env.get('PATH')): 300 with _modify_lookup_path(env.get('PATH')):
281 proc = subprocess.Popen( 301 proc = subprocess42.Popen(
282 cmd, 302 cmd,
283 env=env, 303 env=env,
284 cwd=cwd, 304 cwd=cwd,
285 universal_newlines=True, 305 universal_newlines=True,
286 creationflags=creationflags, 306 detached=True,
287 **fhandles) 307 **fhandles)
288 308
289 # Safe to close file handles now that subprocess has inherited them. 309 # Safe to close file handles now that subprocess has inherited them.
290 for handle in fhandles.itervalues(): 310 for handle in fhandles.itervalues():
291 if isinstance(handle, file): 311 if isinstance(handle, file):
292 handle.close() 312 handle.close()
293 313
294 outlock = threading.Lock() 314 outstreams = {}
295 def make_pipe_thread(inhandle, outstream): 315 linebufs = {}
296 def body(): 316 for key in ('stdout', 'stderr'):
297 while True: 317 handle = handles.get(key)
298 line = inhandle.readline() 318 if isinstance(handle, stream.StreamEngine.Stream):
299 if not line: 319 outstreams[key] = handle
300 break 320 linebufs[key] = _streamingLinebuf()
301 line = line[:-1] # Strip newline for write_line's expectations
302 # (universal_newlines is on, so it's only \n)
303 outlock.acquire()
304 try:
305 outstream.write_line(line)
306 finally:
307 outlock.release()
308 t = threading.Thread(target=body, args=())
309 t.daemon = True
310 return t
311 321
312 threads = [] 322 if len(linebufs) > 0:
M-A Ruel 2016/05/06 14:12:52 if linebufs:
iannucci 2016/05/06 17:52:01 Done.
313 for key in ('stdout', 'stderr'): 323 for pipe, data in proc.yield_any(timeout=1):
314 if (key in handles and 324 if pipe is None:
315 isinstance(handles[key], stream.StreamEngine.Stream)): 325 continue
316 threads.append(make_pipe_thread(getattr(proc, key), handles[key])) 326 buf = linebufs.get(pipe)
327 if not buf:
328 continue
329 buf.ingest(data)
330 for line in buf:
331 outstreams[pipe].write_line(line)
332 else:
333 proc.wait()
317 334
318 for th in threads:
319 th.start()
320 proc.wait()
321 for th in threads:
322 th.join()
323 return proc.returncode 335 return proc.returncode
324 336
325 def _trigger_builds(self, step, trigger_specs): 337 def _trigger_builds(self, step, trigger_specs):
326 assert trigger_specs is not None 338 assert trigger_specs is not None
327 for trig in trigger_specs: 339 for trig in trigger_specs:
328 builder_name = trig.get('builder_name') 340 builder_name = trig.get('builder_name')
329 if not builder_name: 341 if not builder_name:
330 raise ValueError('Trigger spec: builder_name is not set') 342 raise ValueError('Trigger spec: builder_name is not set')
331 343
332 changes = trig.get('buildbot_changes', []) 344 changes = trig.get('buildbot_changes', [])
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after
612 supplied command, and only uses the |env| kwarg for modifying the environment 624 supplied command, and only uses the |env| kwarg for modifying the environment
613 of the child process. 625 of the child process.
614 """ 626 """
615 saved_path = os.environ['PATH'] 627 saved_path = os.environ['PATH']
616 try: 628 try:
617 if path is not None: 629 if path is not None:
618 os.environ['PATH'] = path 630 os.environ['PATH'] = path
619 yield 631 yield
620 finally: 632 finally:
621 os.environ['PATH'] = saved_path 633 os.environ['PATH'] = saved_path
OLDNEW
« no previous file with comments | « no previous file | recipe_engine/third_party/subprocess42.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698