OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Defines the task RPC methods.""" | 5 """RPC compatible subprocess-type module. |
6 | 6 |
7 import os | 7 This module defined both a task-side process class as well as a controller-side |
| 8 process wrapper for easier access and usage of the task-side process. |
| 9 """ |
| 10 |
| 11 import logging |
| 12 import subprocess |
8 import sys | 13 import sys |
9 import logging | |
10 import threading | 14 import threading |
11 | 15 |
12 #pylint: disable=relative-import | 16 #pylint: disable=relative-import |
13 import common_lib | 17 import common_lib |
14 | 18 |
15 # Map swarming_client to use subprocess42 | 19 # Map swarming_client to use subprocess42 |
16 sys.path.append(common_lib.SWARMING_DIR) | 20 sys.path.append(common_lib.SWARMING_DIR) |
17 | 21 |
18 from utils import subprocess42 | 22 from utils import subprocess42 |
19 | 23 |
20 | 24 |
21 class RPCMethods(object): | 25 class ControllerProcessWrapper(object): |
22 """Class exposing RPC methods.""" | 26 """Controller-side process wrapper class. |
23 | 27 |
24 _dotted_whitelist = ['subprocess'] | 28 This class provides a more intuitive interface to task-side processes |
| 29 than calling the methods directly using the RPC object. |
| 30 """ |
25 | 31 |
26 def __init__(self, server): | 32 def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None): |
27 self._server = server | 33 self._rpc = rpc |
28 self.subprocess = Subprocess | 34 self._id = rpc.subprocess.Process(cmd) |
| 35 if verbose: |
| 36 self._rpc.subprocess.SetVerbose(self._id) |
| 37 if detached: |
| 38 self._rpc.subprocess.SetDetached(self._id) |
| 39 if cwd: |
| 40 self._rpc.subprocess.SetCwd(self._rpc, cwd) |
| 41 self._rpc.subprocess.Start(self._id) |
29 | 42 |
30 def _dispatch(self, method, params): | 43 def Terminate(self): |
31 obj = self | 44 logging.debug('Terminating process %s', self._id) |
32 if '.' in method: | 45 return self._rpc.subprocess.Terminate(self._id) |
33 # Allow only white listed dotted names | |
34 name, method = method.split('.') | |
35 assert name in self._dotted_whitelist | |
36 obj = getattr(self, name) | |
37 return getattr(obj, method)(*params) | |
38 | 46 |
39 def Echo(self, message): | 47 def Kill(self): |
40 """Simple RPC method to print and return a message.""" | 48 logging.debug('Killing process %s', self._id) |
41 logging.info('Echoing %s', message) | 49 self._rpc.subprocess.Kill(self._id) |
42 return 'echo %s' % str(message) | |
43 | 50 |
44 def AbsPath(self, path): | 51 def Delete(self): |
45 """Returns the absolute path.""" | 52 return self._rpc.subprocess.Delete(self._id) |
46 return os.path.abspath(path) | |
47 | 53 |
48 def Quit(self): | 54 def GetReturncode(self): |
49 """Call _server.shutdown in another thread. | 55 return self._rpc.subprocess.GetReturncode(self._id) |
50 | 56 |
51 This is needed because server.shutdown waits for the server to actually | 57 def ReadStdout(self): |
52 quit. However the server cannot shutdown until it completes handling this | 58 """Returns all stdout since the last call to ReadStdout. |
53 call. Calling this in the same thread results in a deadlock. | 59 |
| 60 This call allows the user to read stdout while the process is running. |
| 61 However each call will flush the local stdout buffer. In order to make |
| 62 multiple calls to ReadStdout and to retain the entire output the results |
| 63 of this call will need to be buffered in the calling code. |
54 """ | 64 """ |
55 t = threading.Thread(target=self._server.shutdown) | 65 return self._rpc.subprocess.ReadStdout(self._id) |
56 t.start() | 66 |
| 67 def ReadStderr(self): |
| 68 """Returns all stderr read since the last call to ReadStderr. |
| 69 |
| 70 See ReadStdout for additional details. |
| 71 """ |
| 72 return self._rpc.subprocess.ReadStderr(self._id) |
| 73 |
| 74 def ReadOutput(self): |
| 75 """Returns the (stdout, stderr) since the last Read* call. |
| 76 |
| 77 See ReadStdout for additional details. |
| 78 """ |
| 79 return self._rpc.subprocess.ReadOutput(self._id) |
| 80 |
| 81 def Wait(self): |
| 82 return self._rpc.subprocess.Wait(self._id) |
| 83 |
| 84 def Poll(self): |
| 85 return self._rpc.subprocess.Poll(self._id) |
| 86 |
| 87 def GetPid(self): |
| 88 return self._rpc.subprocess.GetPid(self._id) |
57 | 89 |
58 | 90 |
59 class Subprocess(object): | 91 |
60 """Implements a server-based non-blocking subprocess. | 92 class Process(object): |
| 93 """Implements a task-side non-blocking subprocess. |
61 | 94 |
62 This non-blocking subprocess allows the caller to continue operating while | 95 This non-blocking subprocess allows the caller to continue operating while |
63 also able to interact with this subprocess based on a key returned to | 96 also able to interact with this subprocess based on a key returned to |
64 the caller at the time of creation. | 97 the caller at the time of creation. |
65 | 98 |
66 Creation args are set via Set* methods called after calling Process but | 99 Creation args are set via Set* methods called after calling Process but |
67 before calling Start. This is due to a limitation of the XML-RPC | 100 before calling Start. This is due to a limitation of the XML-RPC |
68 implementation not supporting keyword arguments. | 101 implementation not supporting keyword arguments. |
69 """ | 102 """ |
70 | 103 |
(...skipping 30 matching lines...) Expand all Loading... |
101 @classmethod | 134 @classmethod |
102 def KillAll(cls): | 135 def KillAll(cls): |
103 for key in cls._processes: | 136 for key in cls._processes: |
104 cls.Kill(key) | 137 cls.Kill(key) |
105 | 138 |
106 @classmethod | 139 @classmethod |
107 def Process(cls, cmd): | 140 def Process(cls, cmd): |
108 with cls._creation_lock: | 141 with cls._creation_lock: |
109 key = 'Process%d' % cls._process_next_id | 142 key = 'Process%d' % cls._process_next_id |
110 cls._process_next_id += 1 | 143 cls._process_next_id += 1 |
111 logging.debug('Creating process %s', key) | 144 logging.debug('Creating process %s with cmd %r', key, cmd) |
112 process = cls(cmd) | 145 process = cls(cmd) |
113 cls._processes[key] = process | 146 cls._processes[key] = process |
114 return key | 147 return key |
115 | 148 |
116 def _Start(self): | 149 def _Start(self): |
117 logging.info('Starting process %s', self) | 150 logging.info('Starting process %s', self) |
118 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE, | 151 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE, |
119 stderr=subprocess42.PIPE, | 152 stderr=subprocess42.PIPE, |
120 detached=self.detached, cwd=self.cwd) | 153 detached=self.detached, cwd=self.cwd) |
121 threading.Thread(target=self._reader).start() | 154 threading.Thread(target=self._reader).start() |
122 | 155 |
123 @classmethod | 156 @classmethod |
124 def Start(cls, key): | 157 def Start(cls, key): |
125 cls._processes[key]._Start() | 158 cls._processes[key]._Start() |
126 | 159 |
127 @classmethod | 160 @classmethod |
128 def SetCwd(cls, key, cwd): | 161 def SetCwd(cls, key, cwd): |
129 """Sets the process's cwd.""" | 162 """Sets the process's cwd.""" |
130 logging.debug('Setting %s cwd to %s', key, cwd) | 163 logging.debug('Setting %s cwd to %s', key, cwd) |
131 cls._processes[key].cwd = cwd | 164 cls._processes[key].cwd = cwd |
132 | 165 |
133 @classmethod | 166 @classmethod |
134 def SetDetached(cls, key): | 167 def SetDetached(cls, key): |
135 """Creates a detached process.""" | 168 """Creates a detached process.""" |
136 logging.debug('Setting %s to run detached', key) | 169 logging.debug('Setting %s.detached = True', key) |
137 cls._processes[key].detached = True | 170 cls._processes[key].detached = True |
138 | 171 |
139 @classmethod | 172 @classmethod |
140 def SetVerbose(cls, key): | 173 def SetVerbose(cls, key): |
141 """Sets the stdout and stderr to be emitted locally.""" | 174 """Sets the stdout and stderr to be emitted locally.""" |
142 logging.debug('Setting %s to be verbose', key) | 175 logging.debug('Setting %s.verbose = True', key) |
143 cls._processes[key].verbose = True | 176 cls._processes[key].verbose = True |
144 | 177 |
145 @classmethod | 178 @classmethod |
146 def Terminate(cls, key): | 179 def Terminate(cls, key): |
147 logging.debug('Terminating process %s', key) | 180 logging.debug('Terminating process %s', key) |
148 cls._processes[key].proc.terminate() | 181 cls._processes[key].proc.terminate() |
149 | 182 |
150 @classmethod | 183 @classmethod |
151 def Kill(cls, key): | 184 def Kill(cls, key): |
152 logging.debug('Killing process %s', key) | 185 logging.debug('Killing process %s', key) |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
205 def Wait(cls, key): | 238 def Wait(cls, key): |
206 return cls._processes[key].proc.wait() | 239 return cls._processes[key].proc.wait() |
207 | 240 |
208 @classmethod | 241 @classmethod |
209 def Poll(cls, key): | 242 def Poll(cls, key): |
210 return cls._processes[key].proc.poll() | 243 return cls._processes[key].proc.poll() |
211 | 244 |
212 @classmethod | 245 @classmethod |
213 def GetPid(cls, key): | 246 def GetPid(cls, key): |
214 return cls._processes[key].proc.pid | 247 return cls._processes[key].proc.pid |
OLD | NEW |