Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(125)

Side by Side Diff: recipe_engine/step_runner.py

Issue 1959563002: Use subprocess42 in recipe_engine to avoid threading madness. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/recipes-py@master
Patch Set: *sigh* presubmit Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « recipe_engine/run.py ('k') | recipe_engine/third_party/subprocess42.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import cStringIO 5 import cStringIO
6 import collections 6 import collections
7 import contextlib 7 import contextlib
8 import datetime 8 import datetime
9 import json 9 import json
10 import os 10 import os
11 import re 11 import re
12 import subprocess 12 import subprocess
13 import sys 13 import sys
14 import tempfile 14 import tempfile
15 import threading 15 import threading
16 import traceback 16 import traceback
17 17
18 from . import recipe_test_api 18 from . import recipe_test_api
19 from . import stream 19 from . import stream
20 from . import types 20 from . import types
21 from . import util 21 from . import util
22 22
23 import subprocess
24 from .third_party import subprocess42
25
26
27 if sys.platform == "win32":
28 # Windows has a bad habit of opening a dialog when a console program
29 # crashes, rather than just letting it crash. Therefore, when a
30 # program crashes on Windows, we don't find out until the build step
31 # times out. This code prevents the dialog from appearing, so that we
32 # find out immediately and don't waste time waiting for a user to
33 # close the dialog.
34 import ctypes
35 # SetErrorMode(SEM_NOGPFAULTERRORBOX). For more information, see:
36 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
37 ctypes.windll.kernel32.SetErrorMode(0x0002)
38
39
40 # MODE_SUBPROCESS42 uses subprocess42 instead of subprocess. This allows the
41 # correct handling of steps which daemonize themselves and hang on to the
42 # stdout/stderr handles after the process has exited.
43 #
44 # This mode flag was introduced as a temporary way to have both the old and new
45 # logic present in the engine simultaneously, in order to make a more controlled
46 # rollout (5/06/2016).
47 MODE_SUBPROCESS42 = False
48
49
50 class _streamingLinebuf(object):
51 def __init__(self):
52 self.buffedlines = []
53 self.extra = cStringIO.StringIO()
54
55 def ingest(self, data):
56 lines = data.splitlines()
57 endedOnLinebreak = data.endswith("\n")
58
59 if self.extra.tell():
60 # we had leftovers from some previous ingest
61 self.extra.write(lines[0])
62 if len(lines) > 1 or endedOnLinebreak:
63 lines[0] = self.extra.getvalue()
64 self.extra = cStringIO.StringIO()
65 else:
66 return
67
68 if not endedOnLinebreak:
69 self.extra.write(lines[-1])
70 lines = lines[:-1]
71
72 self.buffedlines += lines
73
74 def get_buffered(self):
75 ret = self.buffedlines
76 self.buffedlines = []
77 return ret
78
79
23 class StepRunner(object): 80 class StepRunner(object):
24 """A StepRunner is the interface to actually running steps. 81 """A StepRunner is the interface to actually running steps.
25 82
26 These can actually make subprocess calls (SubprocessStepRunner), or just 83 These can actually make subprocess calls (SubprocessStepRunner), or just
27 pretend to run the steps with mock output (SimulationStepRunner). 84 pretend to run the steps with mock output (SimulationStepRunner).
28 """ 85 """
29 @property 86 @property
30 def stream_engine(self): 87 def stream_engine(self):
31 """Return the stream engine that this StepRunner uses, if meaningful. 88 """Return the stream engine that this StepRunner uses, if meaningful.
32 89
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 def run(inner): 202 def run(inner):
146 try: 203 try:
147 # Open file handles for IO redirection based on file names in 204 # Open file handles for IO redirection based on file names in
148 # step_dict. 205 # step_dict.
149 handles = { 206 handles = {
150 'stdout': step_stream, 207 'stdout': step_stream,
151 'stderr': step_stream, 208 'stderr': step_stream,
152 'stdin': None, 209 'stdin': None,
153 } 210 }
154 for key in handles: 211 for key in handles:
155 if key in step_dict: 212 fileName = step_dict.get(key)
156 handles[key] = open(step_dict[key], 213 if fileName:
157 'rb' if key == 'stdin' else 'wb') 214 handles[key] = open(fileName, 'rb' if key == 'stdin' else 'wb')
158 # The subprocess will inherit and close these handles. 215 # The subprocess will inherit and close these handles.
159 retcode = self._run_cmd( 216 retcode = self._run_cmd(
160 cmd=cmd, handles=handles, env=step_env, cwd=step_dict.get('cwd')) 217 cmd=cmd, handles=handles, env=step_env, cwd=step_dict.get('cwd'))
161 except OSError: 218 except OSError:
162 with step_stream.new_log_stream('exception') as l: 219 with step_stream.new_log_stream('exception') as l:
163 trace = traceback.format_exc().splitlines() 220 trace = traceback.format_exc().splitlines()
164 for line in trace: 221 for line in trace:
165 l.write_line(line) 222 l.write_line(line)
166 step_stream.set_step_status('EXCEPTION') 223 step_stream.set_step_status('EXCEPTION')
167 raise 224 raise
(...skipping 16 matching lines...) Expand all
184 return ReturnOpenStep() 241 return ReturnOpenStep()
185 242
186 def run_recipe(self, universe_view, recipe, properties): 243 def run_recipe(self, universe_view, recipe, properties):
187 with tempfile.NamedTemporaryFile() as f: 244 with tempfile.NamedTemporaryFile() as f:
188 cmd = [sys.executable, 245 cmd = [sys.executable,
189 universe_view.universe.package_deps.engine_recipes_py, 246 universe_view.universe.package_deps.engine_recipes_py,
190 '--package=%s' % universe_view.universe.config_file.path, 'run', 247 '--package=%s' % universe_view.universe.config_file.path, 'run',
191 '--output-result-json=%s' % f.name, recipe] 248 '--output-result-json=%s' % f.name, recipe]
192 cmd.extend(['%s=%s' % (k,repr(v)) for k, v in properties.iteritems()]) 249 cmd.extend(['%s=%s' % (k,repr(v)) for k, v in properties.iteritems()])
193 250
194 retcode = subprocess.call(cmd) 251 if MODE_SUBPROCESS42:
252 retcode = subprocess42.call(cmd)
253 else:
254 retcode = subprocess.call(cmd)
195 result = json.load(f) 255 result = json.load(f)
196 if retcode != 0: 256 if retcode != 0:
197 raise recipe_api.StepFailure( 257 raise recipe_api.StepFailure(
198 'depend on %s with properties %r failed with %d.\n' 258 'depend on %s with properties %r failed with %d.\n'
199 'Recipe result: %r' % ( 259 'Recipe result: %r' % (
200 recipe, properties, retcode, result)) 260 recipe, properties, retcode, result))
201 return result 261 return result
202 262
203 @contextlib.contextmanager 263 @contextlib.contextmanager
204 def run_context(self): 264 def run_context(self):
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 299
240 Args: 300 Args:
241 cmd: a subprocess-style command list, with command first then args. 301 cmd: a subprocess-style command list, with command first then args.
242 handles: A dictionary from ('stdin', 'stdout', 'stderr'), each value 302 handles: A dictionary from ('stdin', 'stdout', 'stderr'), each value
243 being *either* a stream.StreamEngine.Stream or a python file object 303 being *either* a stream.StreamEngine.Stream or a python file object
244 to direct that subprocess's filehandle to. 304 to direct that subprocess's filehandle to.
245 env: the full environment to run the command in -- this is passed 305 env: the full environment to run the command in -- this is passed
246 unaltered to subprocess.Popen. 306 unaltered to subprocess.Popen.
247 cwd: the working directory of the command. 307 cwd: the working directory of the command.
248 """ 308 """
309 if MODE_SUBPROCESS42:
310 PIPE = subprocess42.PIPE
311 POPEN = subprocess42.Popen
312 else:
313 PIPE = subprocess.PIPE
314 POPEN = subprocess.Popen
315
249 fhandles = {} 316 fhandles = {}
250 317
251 # If we are given StreamEngine.Streams, map them to PIPE for subprocess. 318 # If we are given StreamEngine.Streams, map them to PIPE for subprocess.
252 # We will manually forward them to their corresponding stream. 319 # We will manually forward them to their corresponding stream.
253 for key in ('stdout', 'stderr'): 320 for key in ('stdout', 'stderr'):
254 if (key in handles and 321 handle = handles.get(key)
255 isinstance(handles[key], stream.StreamEngine.Stream)): 322 if isinstance(handle, stream.StreamEngine.Stream):
256 fhandles[key] = subprocess.PIPE 323 fhandles[key] = PIPE
257 else: 324 else:
258 fhandles[key] = handles[key] 325 fhandles[key] = handle
259 326
260 # stdin must be a real handle, if it exists 327 # stdin must be a real handle, if it exists
261 fhandles['stdin'] = handles.get('stdin') 328 fhandles['stdin'] = handles.get('stdin')
262 329
263 if sys.platform.startswith('win'): 330 kwargs = fhandles.copy()
264 # Windows has a bad habit of opening a dialog when a console program 331 if MODE_SUBPROCESS42:
265 # crashes, rather than just letting it crash. Therefore, when a 332 kwargs['detached'] = True
266 # program crashes on Windows, we don't find out until the build step
267 # times out. This code prevents the dialog from appearing, so that we
268 # find out immediately and don't waste time waiting for a user to
269 # close the dialog.
270 import ctypes
271 # SetErrorMode(SEM_NOGPFAULTERRORBOX). For more information, see:
272 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
273 ctypes.windll.kernel32.SetErrorMode(0x0002)
274 # CREATE_NO_WINDOW. For more information, see:
275 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863.aspx
276 creationflags = 0x8000000
277 else: 333 else:
278 creationflags = 0 334 if sys.platform == "win32":
335 # CREATE_NO_WINDOW. For more information, see:
336 # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863.aspx
337 kwargs['creationflags'] = 0x8000000
279 338
280 with _modify_lookup_path(env.get('PATH')): 339 with _modify_lookup_path(env.get('PATH')):
281 proc = subprocess.Popen( 340 proc = POPEN(
282 cmd, 341 cmd,
283 env=env, 342 env=env,
284 cwd=cwd, 343 cwd=cwd,
285 universal_newlines=True, 344 universal_newlines=True,
286 creationflags=creationflags, 345 **kwargs)
287 **fhandles)
288 346
289 # Safe to close file handles now that subprocess has inherited them. 347 # Safe to close file handles now that subprocess has inherited them.
290 for handle in fhandles.itervalues(): 348 for handle in fhandles.itervalues():
291 if isinstance(handle, file): 349 if isinstance(handle, file):
292 handle.close() 350 handle.close()
293 351
352 outstreams = {}
353 linebufs = {}
354
355 # BEGIN[ non-subprocess42
294 outlock = threading.Lock() 356 outlock = threading.Lock()
295 def make_pipe_thread(inhandle, outstream): 357 def make_pipe_thread(inhandle, outstream):
296 def body(): 358 def body():
297 while True: 359 while True:
298 line = inhandle.readline() 360 line = inhandle.readline()
299 if not line: 361 if not line:
300 break 362 break
301 line = line[:-1] # Strip newline for write_line's expectations 363 line = line[:-1] # Strip newline for write_line's expectations
302 # (universal_newlines is on, so it's only \n) 364 # (universal_newlines is on, so it's only \n)
303 outlock.acquire() 365 outlock.acquire()
304 try: 366 try:
305 outstream.write_line(line) 367 outstream.write_line(line)
306 finally: 368 finally:
307 outlock.release() 369 outlock.release()
308 t = threading.Thread(target=body, args=()) 370 t = threading.Thread(target=body, args=())
309 t.daemon = True 371 t.daemon = True
310 return t 372 return t
311 373
312 threads = [] 374 threads = []
375 ## ]END non-subprocess42
376
313 for key in ('stdout', 'stderr'): 377 for key in ('stdout', 'stderr'):
314 if (key in handles and 378 handle = handles.get(key)
315 isinstance(handles[key], stream.StreamEngine.Stream)): 379 if isinstance(handle, stream.StreamEngine.Stream):
316 threads.append(make_pipe_thread(getattr(proc, key), handles[key])) 380 if MODE_SUBPROCESS42:
381 outstreams[key] = handle
382 linebufs[key] = _streamingLinebuf()
383 else:
384 threads.append(make_pipe_thread(getattr(proc, key), handle))
317 385
318 for th in threads: 386 if MODE_SUBPROCESS42:
319 th.start() 387 if linebufs:
320 proc.wait() 388 for pipe, data in proc.yield_any(timeout=1):
321 for th in threads: 389 if pipe is None:
322 th.join() 390 continue
391 buf = linebufs.get(pipe)
392 if not buf:
393 continue
394 buf.ingest(data)
395 for line in buf.get_buffered():
396 outstreams[pipe].write_line(line)
397 else:
398 proc.wait()
399 else:
400 for th in threads:
401 th.start()
402 proc.wait()
403 for th in threads:
404 th.join()
405
323 return proc.returncode 406 return proc.returncode
324 407
325 def _trigger_builds(self, step, trigger_specs): 408 def _trigger_builds(self, step, trigger_specs):
326 assert trigger_specs is not None 409 assert trigger_specs is not None
327 for trig in trigger_specs: 410 for trig in trigger_specs:
328 builder_name = trig.get('builder_name') 411 builder_name = trig.get('builder_name')
329 if not builder_name: 412 if not builder_name:
330 raise ValueError('Trigger spec: builder_name is not set') 413 raise ValueError('Trigger spec: builder_name is not set')
331 414
332 changes = trig.get('buildbot_changes', []) 415 changes = trig.get('buildbot_changes', [])
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after
612 supplied command, and only uses the |env| kwarg for modifying the environment 695 supplied command, and only uses the |env| kwarg for modifying the environment
613 of the child process. 696 of the child process.
614 """ 697 """
615 saved_path = os.environ['PATH'] 698 saved_path = os.environ['PATH']
616 try: 699 try:
617 if path is not None: 700 if path is not None:
618 os.environ['PATH'] = path 701 os.environ['PATH'] = path
619 yield 702 yield
620 finally: 703 finally:
621 os.environ['PATH'] = saved_path 704 os.environ['PATH'] = saved_path
OLDNEW
« no previous file with comments | « recipe_engine/run.py ('k') | recipe_engine/third_party/subprocess42.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698