Index: testing/legion/process.py |
diff --git a/testing/legion/rpc_methods.py b/testing/legion/process.py |
similarity index 65% |
copy from testing/legion/rpc_methods.py |
copy to testing/legion/process.py |
index 1a59f288afde5fb03c1f8fb3d0ebf4fa6de5bae4..356db6131bfa123c0a97f2bc1b7f0d908544ab22 100644 |
--- a/testing/legion/rpc_methods.py |
+++ b/testing/legion/process.py |
@@ -2,11 +2,15 @@ |
# Use of this source code is governed by a BSD-style license that can be |
# found in the LICENSE file. |
-"""Defines the task RPC methods.""" |
+"""RPC compatible subprocess-type module. |
+ |
+This module defined both a task-side process class as well as a controller-side |
+process wrapper for easier access and usage of the task-side process. |
+""" |
-import os |
-import sys |
import logging |
+import subprocess |
+import sys |
import threading |
#pylint: disable=relative-import |
@@ -18,46 +22,75 @@ sys.path.append(common_lib.SWARMING_DIR) |
from utils import subprocess42 |
-class RPCMethods(object): |
- """Class exposing RPC methods.""" |
+class ControllerProcessWrapper(object): |
+ """Controller-side process wrapper class. |
+ |
+ This class provides a more intuitive interface to task-side processes |
+ than calling the methods directly using the RPC object. |
+ """ |
+ |
+ def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None): |
+ self._rpc = rpc |
+ self._id = rpc.subprocess.Process(cmd) |
+ if verbose: |
+ self._rpc.subprocess.SetVerbose(self._id) |
+ if detached: |
+ self._rpc.subprocess.SetDetached(self._id) |
+ if cwd: |
+ self._rpc.subprocess.SetCwd(self._rpc, cwd) |
+ self._rpc.subprocess.Start(self._id) |
+ |
+ def Terminate(self): |
+ logging.debug('Terminating process %s', self._id) |
+ return self._rpc.subprocess.Terminate(self._id) |
+ |
+ def Kill(self): |
+ logging.debug('Killing process %s', self._id) |
+ self._rpc.subprocess.Kill(self._id) |
+ |
+ def Delete(self): |
+ return self._rpc.subprocess.Delete(self._id) |
- _dotted_whitelist = ['subprocess'] |
+ def GetReturncode(self): |
+ return self._rpc.subprocess.GetReturncode(self._id) |
- def __init__(self, server): |
- self._server = server |
- self.subprocess = Subprocess |
+ def ReadStdout(self): |
+ """Returns all stdout since the last call to ReadStdout. |
- def _dispatch(self, method, params): |
- obj = self |
- if '.' in method: |
- # Allow only white listed dotted names |
- name, method = method.split('.') |
- assert name in self._dotted_whitelist |
- obj = getattr(self, name) |
- return getattr(obj, method)(*params) |
+ This call allows the user to read stdout while the process is running. |
+ However each call will flush the local stdout buffer. In order to make |
+ multiple calls to ReadStdout and to retain the entire output the results |
+ of this call will need to be buffered in the calling code. |
+ """ |
+ return self._rpc.subprocess.ReadStdout(self._id) |
- def Echo(self, message): |
- """Simple RPC method to print and return a message.""" |
- logging.info('Echoing %s', message) |
- return 'echo %s' % str(message) |
+ def ReadStderr(self): |
+ """Returns all stderr read since the last call to ReadStderr. |
- def AbsPath(self, path): |
- """Returns the absolute path.""" |
- return os.path.abspath(path) |
+ See ReadStdout for additional details. |
+ """ |
+ return self._rpc.subprocess.ReadStderr(self._id) |
- def Quit(self): |
- """Call _server.shutdown in another thread. |
+ def ReadOutput(self): |
+ """Returns the (stdout, stderr) since the last Read* call. |
- This is needed because server.shutdown waits for the server to actually |
- quit. However the server cannot shutdown until it completes handling this |
- call. Calling this in the same thread results in a deadlock. |
+ See ReadStdout for additional details. |
""" |
- t = threading.Thread(target=self._server.shutdown) |
- t.start() |
+ return self._rpc.subprocess.ReadOutput(self._id) |
+ |
+ def Wait(self): |
+ return self._rpc.subprocess.Wait(self._id) |
+ |
+ def Poll(self): |
+ return self._rpc.subprocess.Poll(self._id) |
+ |
+ def GetPid(self): |
+ return self._rpc.subprocess.GetPid(self._id) |
+ |
-class Subprocess(object): |
- """Implements a server-based non-blocking subprocess. |
+class Process(object): |
+ """Implements a task-side non-blocking subprocess. |
This non-blocking subprocess allows the caller to continue operating while |
also able to interact with this subprocess based on a key returned to |
@@ -108,7 +141,7 @@ class Subprocess(object): |
with cls._creation_lock: |
key = 'Process%d' % cls._process_next_id |
cls._process_next_id += 1 |
- logging.debug('Creating process %s', key) |
+ logging.debug('Creating process %s with cmd %r', key, cmd) |
process = cls(cmd) |
cls._processes[key] = process |
return key |
@@ -133,13 +166,13 @@ class Subprocess(object): |
@classmethod |
def SetDetached(cls, key): |
"""Creates a detached process.""" |
- logging.debug('Setting %s to run detached', key) |
+ logging.debug('Setting %s.detached = True', key) |
cls._processes[key].detached = True |
@classmethod |
def SetVerbose(cls, key): |
"""Sets the stdout and stderr to be emitted locally.""" |
- logging.debug('Setting %s to be verbose', key) |
+ logging.debug('Setting %s.verbose = True', key) |
cls._processes[key].verbose = True |
@classmethod |