Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1100)

Unified Diff: testing/legion/client_rpc_methods.py

Issue 935333002: Update from https://crrev.com/316786 (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « testing/legion/client_lib.py ('k') | testing/legion/common_lib.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: testing/legion/client_rpc_methods.py
diff --git a/testing/legion/client_rpc_methods.py b/testing/legion/client_rpc_methods.py
index 24a552ecb5d600cb5a41d7621d2957bffe78623e..e43a7d8db3163ff18767cd1f7969e4354f6fab81 100644
--- a/testing/legion/client_rpc_methods.py
+++ b/testing/legion/client_rpc_methods.py
@@ -4,39 +4,148 @@
"""Defines the client RPC methods."""
+import os
+import sys
import logging
-import subprocess
import threading
+#pylint: disable=relative-import
+import common_lib
+
+# Map swarming_client to use subprocess42
+sys.path.append(common_lib.SWARMING_DIR)
+
+from utils import subprocess42
+
class RPCMethods(object):
"""Class exposing RPC methods."""
+ _dotted_whitelist = ['subprocess']
+
def __init__(self, server):
- self.server = server
+ self._server = server
+ self.subprocess = Subprocess
+
+ def _dispatch(self, method, params):
+ obj = self
+ if '.' in method:
+ # Allow only white listed dotted names
+ name, method = method.split('.')
+ assert name in self._dotted_whitelist
+ obj = getattr(self, name)
+ return getattr(obj, method)(*params)
def Echo(self, message):
"""Simple RPC method to print and return a message."""
logging.info('Echoing %s', message)
return 'echo %s' % str(message)
- def Subprocess(self, cmd):
- """Run the commands in a subprocess.
-
- Returns:
- (returncode, stdout, stderr).
- """
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- return (p.returncode, stdout, stderr)
-
def Quit(self):
- """Call server.shutdown in another thread.
+ """Call _server.shutdown in another thread.
This is needed because server.shutdown waits for the server to actually
quit. However the server cannot shutdown until it completes handling this
call. Calling this in the same thread results in a deadlock.
"""
- t = threading.Thread(target=self.server.shutdown)
+ t = threading.Thread(target=self._server.shutdown)
t.start()
+
+
+class Subprocess(object):
+ """Implements a server-based non-blocking subprocess.
+
+ This non-blocking subprocess allows the caller to continue operating while
+ also able to interact with this subprocess based on a key returned to
+ the caller at the time of creation.
+ """
+
+ _processes = {}
+ _process_next_id = 0
+ _creation_lock = threading.Lock()
+
+ def __init__(self, cmd):
+ self.proc = subprocess42.Popen(cmd, stdout=subprocess42.PIPE,
+ stderr=subprocess42.PIPE)
+ self.stdout = ''
+ self.stderr = ''
+ self.data_lock = threading.Lock()
+ threading.Thread(target=self._run).start()
+
+ def _run(self):
+ for pipe, data in self.proc.yield_any():
+ with self.data_lock:
+ if pipe == 'stdout':
+ self.stdout += data
+ else:
+ self.stderr += data
+
+ @classmethod
+ def Popen(cls, cmd):
+ with cls._creation_lock:
+ key = 'Process%d' % cls._process_next_id
+ cls._process_next_id += 1
+ logging.debug('Creating process %s', key)
+ process = cls(cmd)
+ cls._processes[key] = process
+ return key
+
+ @classmethod
+ def Terminate(cls, key):
+ logging.debug('Terminating and deleting process %s', key)
+ return cls._processes.pop(key).proc.terminate()
+
+ @classmethod
+ def Kill(cls, key):
+ logging.debug('Killing and deleting process %s', key)
+ return cls._processes.pop(key).proc.kill()
+
+ @classmethod
+ def Delete(cls, key):
+ logging.debug('Deleting process %s', key)
+ cls._processes.pop(key)
+
+ @classmethod
+ def GetReturncode(cls, key):
+ return cls._processes[key].proc.returncode
+
+ @classmethod
+ def ReadStdout(cls, key):
+ """Returns all stdout since the last call to ReadStdout.
+
+ This call allows the user to read stdout while the process is running.
+ However each call will flush the local stdout buffer. In order to make
+ multiple calls to ReadStdout and to retain the entire output the results
+ of this call will need to be buffered in the calling code.
+ """
+ proc = cls._processes[key]
+ with proc.data_lock:
+ # Perform a "read" on the stdout data
+ stdout = proc.stdout
+ proc.stdout = ''
+ return stdout
+
+ @classmethod
+ def ReadStderr(cls, key):
+ """Returns all stderr read since the last call to ReadStderr.
+
+ See ReadStdout for additional details.
+ """
+ proc = cls._processes[key]
+ with proc.data_lock:
+ # Perform a "read" on the stderr data
+ stderr = proc.stderr
+ proc.stderr = ''
+ return stderr
+
+ @classmethod
+ def Wait(cls, key):
+ return cls._processes[key].proc.wait()
+
+ @classmethod
+ def Poll(cls, key):
+ return cls._processes[key].proc.poll()
+
+ @classmethod
+ def GetPid(cls, key):
+ return cls._processes[key].proc.pid
« no previous file with comments | « testing/legion/client_lib.py ('k') | testing/legion/common_lib.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698