Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(568)

Unified Diff: testing/legion/client_rpc_methods.py

Issue 870103005: Adding a server side, non-blocking subprocess mechanism. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Adding a streaming read mechanism. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « testing/legion/client_lib.py ('k') | testing/legion/common_lib.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: testing/legion/client_rpc_methods.py
diff --git a/testing/legion/client_rpc_methods.py b/testing/legion/client_rpc_methods.py
index 24a552ecb5d600cb5a41d7621d2957bffe78623e..836d2c97f872557760e485fb9c2423824ba9de03 100644
--- a/testing/legion/client_rpc_methods.py
+++ b/testing/legion/client_rpc_methods.py
@@ -4,39 +4,139 @@
"""Defines the client RPC methods."""
+import os
+import sys
import logging
-import subprocess
import threading
+#pylint: disable=relative-import
+import common_lib
+
+# Map swarming_client to use subprocess42
+sys.path.append(common_lib.SWARMING_DIR)
+
+from utils import subprocess42
+
class RPCMethods(object):
"""Class exposing RPC methods."""
+ _dotted_whitelist = ['subprocess']
+
def __init__(self, server):
- self.server = server
+ self._server = server
+ self.subprocess = Subprocess
+
+ def _dispatch(self, method, params):
+ obj = self
+ if '.' in method:
+ # Allow only white listed dotted names
+ name, method = method.split('.')
+ assert name in self._dotted_whitelist
+ obj = getattr(self, name)
+ return getattr(obj, method)(*params)
def Echo(self, message):
"""Simple RPC method to print and return a message."""
logging.info('Echoing %s', message)
return 'echo %s' % str(message)
- def Subprocess(self, cmd):
- """Run the commands in a subprocess.
-
- Returns:
- (returncode, stdout, stderr).
- """
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- return (p.returncode, stdout, stderr)
-
def Quit(self):
- """Call server.shutdown in another thread.
+ """Call _server.shutdown in another thread.
This is needed because server.shutdown waits for the server to actually
quit. However the server cannot shutdown until it completes handling this
call. Calling this in the same thread results in a deadlock.
"""
- t = threading.Thread(target=self.server.shutdown)
+ t = threading.Thread(target=self._server.shutdown)
t.start()
+
+
+class Subprocess(object):
+ """Implements a server-based non-blocking subprocess.
+
+ This non-blocking subprocess allows the caller to continue operating while
+ also able to interact with this subprocess based on a key returned to
+ the caller at the time of creation.
+ """
+
+ _processes = {}
+ _process_next_id = 0
+ _creation_lock = threading.Lock()
+
+ def __init__(self, cmd):
+ self.proc = subprocess42.Popen(cmd, stdout=subprocess42.PIPE,
+ stderr=subprocess42.PIPE)
+ self.stdout = ''
+ self.stderr = ''
+ self.stdout_lock = threading.Lock()
M-A Ruel 2015/02/12 22:16:11 one Lock would be enough.
Mike Meade 2015/02/12 22:49:00 Done.
+ self.stderr_lock = threading.Lock()
+ threading.Thread(target=self._run).start()
+
+ def _run(self):
+ for pipe, data in self.proc.yield_any():
+ if pipe == 'stdout':
+ with self.stdout_lock:
+ self.stdout += data
+ else:
+ with self.stderr_lock:
+ self.stderr += data
+
+ @classmethod
+ def Popen(cls, cmd):
+ with cls._creation_lock:
+ key = 'Process%d' % cls._process_next_id
+ cls._process_next_id += 1
+ logging.debug('Creating process %s', key)
+ process = cls(cmd)
+ cls._processes[key] = process
+ return key
+
+ @classmethod
+ def Terminate(cls, key):
+ logging.debug('Terminating and deleting process %s', key)
+ return cls._processes.pop(key).proc.terminate()
+
+ @classmethod
+ def Kill(cls, key):
+ logging.debug('Killing and deleting process %s', key)
+ return cls._processes.pop(key).proc.kill()
+
+ @classmethod
+ def Delete(cls, key):
+ logging.debug('Deleting process %s', key)
+ cls._processes.pop(key)
+
+ @classmethod
+ def GetReturncode(cls, key):
+ return cls._processes[key].proc.returncode
+
+ @classmethod
+ def GetStdout(cls, key):
M-A Ruel 2015/02/12 22:16:11 It's rare for a Get function to have side effect.
Mike Meade 2015/02/12 22:49:00 Changed these to ReadStd*** methods and added usag
+ proc = cls._processes[key]
+ with proc.stdout_lock:
+ # Perform a "read" on the stdout data
+ stdout = proc.stdout
+ proc.stdout = ''
+ return stdout
+
+ @classmethod
+ def GetStderr(cls, key):
+ proc = cls._processes[key]
+ with proc.stderr_lock:
+ # Perform a "read" on the stderr data
+ stderr = proc.stderr
+ proc.stderr = ''
+ return stderr
+
+ @classmethod
+ def Wait(cls, key):
+ return cls._processes[key].proc.wait()
+
+ @classmethod
+ def Poll(cls, key):
+ return cls._processes[key].proc.poll()
+
+ @classmethod
+ def GetPid(cls, key):
+ return cls._processes[key].proc.pid
« no previous file with comments | « testing/legion/client_lib.py ('k') | testing/legion/common_lib.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698