Chromium Code Reviews| Index: recipe_engine/step_runner.py |
| diff --git a/recipe_engine/step_runner.py b/recipe_engine/step_runner.py |
| index c4d166f9abc283ddd36ece58a27ecb81f8756d06..453b9a8157076d7d4235f522451600d7512a4584 100644 |
| --- a/recipe_engine/step_runner.py |
| +++ b/recipe_engine/step_runner.py |
| @@ -20,6 +20,63 @@ from . import stream |
| from . import types |
| from . import util |
| +import subprocess |
| +from .third_party import subprocess42 |
| + |
| + |
| +if sys.platform == "win32": |
| + # Windows has a bad habit of opening a dialog when a console program |
| + # crashes, rather than just letting it crash. Therefore, when a |
| + # program crashes on Windows, we don't find out until the build step |
| + # times out. This code prevents the dialog from appearing, so that we |
| + # find out immediately and don't waste time waiting for a user to |
| + # close the dialog. |
| + import ctypes |
| + # SetErrorMode(SEM_NOGPFAULTERRORBOX). For more information, see: |
| + # https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx |
| + ctypes.windll.kernel32.SetErrorMode(0x0002) |
| + |
| + |
| +# MODE_SUBPROCESS42 uses subprocess42 instead of subprocess. This allows the |
| +# correct handling of steps which daemonize themselves and hang on to the |
| +# stdout/stderr handles after the process has exited. |
| +# |
| +# This mode flag was introduced as a temporary way to have both the old and new |
| +# logic present in the engine simultaneously, in order to make a more controlled |
| +# rollout (5/06/2016). |
| +MODE_SUBPROCESS42 = False |
| + |
| + |
| +class _streamingLinebuf(object): |
| + def __init__(self): |
| + self.buffedlines = [] |
| + self.extra = cStringIO.StringIO() |
| + |
| + def ingest(self, data): |
| + lines = data.splitlines() |
| + endedOnLinebreak = data.endswith("\n") |
| + |
| + if self.extra.tell(): |
| + # we had leftovers from some previous ingest |
| + self.extra.write(lines[0]) |
| + if len(lines) > 1 or endedOnLinebreak: |
| + lines[0] = self.extra.getvalue() |
| + self.extra = cStringIO.StringIO() |
| + else: |
| + return |
| + |
| + if not endedOnLinebreak: |
| + self.extra.write(lines[-1]) |
| + lines = lines[:-1] |
| + |
| + self.buffedlines += lines |
| + |
| + def get_buffered(self): |
| + ret = self.buffedlines |
| + self.buffedlines = [] |
| + return ret |
| + |
| + |
| class StepRunner(object): |
| """A StepRunner is the interface to actually running steps. |
| @@ -152,9 +209,9 @@ class SubprocessStepRunner(StepRunner): |
| 'stdin': None, |
| } |
| for key in handles: |
| - if key in step_dict: |
| - handles[key] = open(step_dict[key], |
| - 'rb' if key == 'stdin' else 'wb') |
| + fileName = step_dict.get(key) |
| + if fileName: |
| + handles[key] = open(fileName, 'rb' if key == 'stdin' else 'wb') |
| # The subprocess will inherit and close these handles. |
| retcode = self._run_cmd( |
| cmd=cmd, handles=handles, env=step_env, cwd=step_dict.get('cwd')) |
| @@ -191,7 +248,10 @@ class SubprocessStepRunner(StepRunner): |
| '--output-result-json=%s' % f.name, recipe] |
| cmd.extend(['%s=%s' % (k,repr(v)) for k, v in properties.iteritems()]) |
| - retcode = subprocess.call(cmd) |
| + if MODE_SUBPROCESS42: |
| + retcode = subprocess42.call(cmd) |
| + else: |
| + retcode = subprocess42.call(cmd) |
|
martiniss
2016/05/06 18:09:14
wait wat? is this right??
iannucci
2016/05/06 18:18:04
oops. (not that this one matters particularly much
|
| result = json.load(f) |
| if retcode != 0: |
| raise recipe_api.StepFailure( |
| @@ -246,51 +306,53 @@ class SubprocessStepRunner(StepRunner): |
| unaltered to subprocess.Popen. |
| cwd: the working directory of the command. |
| """ |
| + if MODE_SUBPROCESS42: |
| + PIPE = subprocess42.PIPE |
| + POPEN = subprocess42.Popen |
| + else: |
| + PIPE = subprocess.PIPE |
|
iannucci
2016/05/06 18:02:33
PIPE is actually always the same value (subprocess
|
| + POPEN = subprocess.Popen |
| + |
| fhandles = {} |
| # If we are given StreamEngine.Streams, map them to PIPE for subprocess. |
| # We will manually forward them to their corresponding stream. |
| for key in ('stdout', 'stderr'): |
| - if (key in handles and |
| - isinstance(handles[key], stream.StreamEngine.Stream)): |
| - fhandles[key] = subprocess.PIPE |
| + handle = handles.get(key) |
| + if isinstance(handle, stream.StreamEngine.Stream): |
| + fhandles[key] = PIPE |
| else: |
| - fhandles[key] = handles[key] |
| + fhandles[key] = handle |
| # stdin must be a real handle, if it exists |
| fhandles['stdin'] = handles.get('stdin') |
| - if sys.platform.startswith('win'): |
| - # Windows has a bad habit of opening a dialog when a console program |
| - # crashes, rather than just letting it crash. Therefore, when a |
| - # program crashes on Windows, we don't find out until the build step |
| - # times out. This code prevents the dialog from appearing, so that we |
| - # find out immediately and don't waste time waiting for a user to |
| - # close the dialog. |
| - import ctypes |
| - # SetErrorMode(SEM_NOGPFAULTERRORBOX). For more information, see: |
| - # https://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx |
| - ctypes.windll.kernel32.SetErrorMode(0x0002) |
| - # CREATE_NO_WINDOW. For more information, see: |
| - # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863.aspx |
| - creationflags = 0x8000000 |
| - else: |
| - creationflags = 0 |
| + kwargs = fhandles.copy() |
| + if MODE_SUBPROCESS42: |
| + kwargs['detached'] = True |
| + else : |
| + if sys.platform == "win32": |
| + # CREATE_NO_WINDOW. For more information, see: |
| + # https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863.aspx |
| + kwargs['creationflags'] = 0x8000000 |
| with _modify_lookup_path(env.get('PATH')): |
| - proc = subprocess.Popen( |
| + proc = POPEN( |
| cmd, |
| env=env, |
| cwd=cwd, |
| universal_newlines=True, |
| - creationflags=creationflags, |
| - **fhandles) |
| + **kwargs) |
| # Safe to close file handles now that subprocess has inherited them. |
| for handle in fhandles.itervalues(): |
| if isinstance(handle, file): |
| handle.close() |
| + outstreams = {} |
| + linebufs = {} |
| + |
| + # BEGIN[ non-subprocess42 |
| outlock = threading.Lock() |
| def make_pipe_thread(inhandle, outstream): |
| def body(): |
| @@ -310,16 +372,37 @@ class SubprocessStepRunner(StepRunner): |
| return t |
| threads = [] |
| + ## ]END non-subprocess42 |
| + |
| for key in ('stdout', 'stderr'): |
| - if (key in handles and |
| - isinstance(handles[key], stream.StreamEngine.Stream)): |
| - threads.append(make_pipe_thread(getattr(proc, key), handles[key])) |
| - |
| - for th in threads: |
| - th.start() |
| - proc.wait() |
| - for th in threads: |
| - th.join() |
| + handle = handles.get(key) |
| + if isinstance(handle, stream.StreamEngine.Stream): |
| + if MODE_SUBPROCESS42: |
| + outstreams[key] = handle |
| + linebufs[key] = _streamingLinebuf() |
| + else: |
| + threads.append(make_pipe_thread(getattr(proc, key), handle)) |
| + |
| + if MODE_SUBPROCESS42: |
| + if linebufs: |
| + for pipe, data in proc.yield_any(timeout=1): |
| + if pipe is None: |
| + continue |
| + buf = linebufs.get(pipe) |
| + if not buf: |
| + continue |
| + buf.ingest(data) |
| + for line in buf.get_buffered(): |
| + outstreams[pipe].write_line(line) |
| + else: |
| + proc.wait() |
| + else: |
| + for th in threads: |
| + th.start() |
| + proc.wait() |
| + for th in threads: |
| + th.join() |
| + |
| return proc.returncode |
| def _trigger_builds(self, step, trigger_specs): |