Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(191)

Side by Side Diff: testing/legion/rpc_methods.py

Issue 1124763003: Update from https://crrev.com/327068 (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: update nacl, buildtools, fix display_change_notifier_unittest Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Defines the task RPC methods.""" 5 """Defines the task RPC methods."""
6 6
7 import logging
7 import os 8 import os
8 import sys 9 import sys
9 import logging
10 import threading 10 import threading
11 11
12 #pylint: disable=relative-import 12 #pylint: disable=relative-import
13 import common_lib 13 import process
14
15 # Map swarming_client to use subprocess42
16 sys.path.append(common_lib.SWARMING_DIR)
17
18 from utils import subprocess42
19 14
20 15
21 class RPCMethods(object): 16 class RPCMethods(object):
22 """Class exposing RPC methods.""" 17 """Class exposing RPC methods."""
23 18
24 _dotted_whitelist = ['subprocess'] 19 _dotted_whitelist = ['subprocess']
25 20
26 def __init__(self, server): 21 def __init__(self, server):
27 self._server = server 22 self._server = server
28 self.subprocess = Subprocess 23 self.subprocess = process.Process
29 24
30 def _dispatch(self, method, params): 25 def _dispatch(self, method, params):
31 obj = self 26 obj = self
32 if '.' in method: 27 if '.' in method:
33 # Allow only white listed dotted names 28 # Allow only white listed dotted names
34 name, method = method.split('.') 29 name, method = method.split('.')
35 assert name in self._dotted_whitelist 30 assert name in self._dotted_whitelist
36 obj = getattr(self, name) 31 obj = getattr(self, name)
37 return getattr(obj, method)(*params) 32 return getattr(obj, method)(*params)
38 33
39 def Echo(self, message): 34 def Echo(self, message):
40 """Simple RPC method to print and return a message.""" 35 """Simple RPC method to print and return a message."""
41 logging.info('Echoing %s', message) 36 logging.info('Echoing %s', message)
42 return 'echo %s' % str(message) 37 return 'echo %s' % str(message)
43 38
44 def AbsPath(self, path): 39 def AbsPath(self, path):
45 """Returns the absolute path.""" 40 """Returns the absolute path."""
46 return os.path.abspath(path) 41 return os.path.abspath(path)
47 42
48 def Quit(self): 43 def Quit(self):
49 """Call _server.shutdown in another thread. 44 """Call _server.shutdown in another thread.
50 45
51 This is needed because server.shutdown waits for the server to actually 46 This is needed because server.shutdown waits for the server to actually
52 quit. However the server cannot shutdown until it completes handling this 47 quit. However the server cannot shutdown until it completes handling this
53 call. Calling this in the same thread results in a deadlock. 48 call. Calling this in the same thread results in a deadlock.
54 """ 49 """
55 t = threading.Thread(target=self._server.shutdown) 50 t = threading.Thread(target=self._server.shutdown)
56 t.start() 51 t.start()
57
58
59 class Subprocess(object):
60 """Implements a server-based non-blocking subprocess.
61
62 This non-blocking subprocess allows the caller to continue operating while
63 also able to interact with this subprocess based on a key returned to
64 the caller at the time of creation.
65
66 Creation args are set via Set* methods called after calling Process but
67 before calling Start. This is due to a limitation of the XML-RPC
68 implementation not supporting keyword arguments.
69 """
70
71 _processes = {}
72 _process_next_id = 0
73 _creation_lock = threading.Lock()
74
75 def __init__(self, cmd):
76 self.stdout = ''
77 self.stderr = ''
78 self.cmd = cmd
79 self.proc = None
80 self.cwd = None
81 self.verbose = False
82 self.detached = False
83 self.data_lock = threading.Lock()
84
85 def __str__(self):
86 return '%r, cwd=%r, verbose=%r, detached=%r' % (
87 self.cmd, self.cwd, self.verbose, self.detached)
88
89 def _reader(self):
90 for pipe, data in self.proc.yield_any():
91 with self.data_lock:
92 if pipe == 'stdout':
93 self.stdout += data
94 if self.verbose:
95 sys.stdout.write(data)
96 else:
97 self.stderr += data
98 if self.verbose:
99 sys.stderr.write(data)
100
101 @classmethod
102 def KillAll(cls):
103 for key in cls._processes:
104 cls.Kill(key)
105
106 @classmethod
107 def Process(cls, cmd):
108 with cls._creation_lock:
109 key = 'Process%d' % cls._process_next_id
110 cls._process_next_id += 1
111 logging.debug('Creating process %s', key)
112 process = cls(cmd)
113 cls._processes[key] = process
114 return key
115
116 def _Start(self):
117 logging.info('Starting process %s', self)
118 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
119 stderr=subprocess42.PIPE,
120 detached=self.detached, cwd=self.cwd)
121 threading.Thread(target=self._reader).start()
122
123 @classmethod
124 def Start(cls, key):
125 cls._processes[key]._Start()
126
127 @classmethod
128 def SetCwd(cls, key, cwd):
129 """Sets the process's cwd."""
130 logging.debug('Setting %s cwd to %s', key, cwd)
131 cls._processes[key].cwd = cwd
132
133 @classmethod
134 def SetDetached(cls, key):
135 """Creates a detached process."""
136 logging.debug('Setting %s to run detached', key)
137 cls._processes[key].detached = True
138
139 @classmethod
140 def SetVerbose(cls, key):
141 """Sets the stdout and stderr to be emitted locally."""
142 logging.debug('Setting %s to be verbose', key)
143 cls._processes[key].verbose = True
144
145 @classmethod
146 def Terminate(cls, key):
147 logging.debug('Terminating process %s', key)
148 cls._processes[key].proc.terminate()
149
150 @classmethod
151 def Kill(cls, key):
152 logging.debug('Killing process %s', key)
153 cls._processes[key].proc.kill()
154
155 @classmethod
156 def Delete(cls, key):
157 if cls.GetReturncode(key) is None:
158 logging.warning('Killing %s before deleting it', key)
159 cls.Kill(key)
160 logging.debug('Deleting process %s', key)
161 cls._processes.pop(key)
162
163 @classmethod
164 def GetReturncode(cls, key):
165 return cls._processes[key].proc.returncode
166
167 @classmethod
168 def ReadStdout(cls, key):
169 """Returns all stdout since the last call to ReadStdout.
170
171 This call allows the user to read stdout while the process is running.
172 However each call will flush the local stdout buffer. In order to make
173 multiple calls to ReadStdout and to retain the entire output the results
174 of this call will need to be buffered in the calling code.
175 """
176 proc = cls._processes[key]
177 with proc.data_lock:
178 # Perform a "read" on the stdout data
179 stdout = proc.stdout
180 proc.stdout = ''
181 return stdout
182
183 @classmethod
184 def ReadStderr(cls, key):
185 """Returns all stderr read since the last call to ReadStderr.
186
187 See ReadStdout for additional details.
188 """
189 proc = cls._processes[key]
190 with proc.data_lock:
191 # Perform a "read" on the stderr data
192 stderr = proc.stderr
193 proc.stderr = ''
194 return stderr
195
196 @classmethod
197 def ReadOutput(cls, key):
198 """Returns the (stdout, stderr) since the last Read* call.
199
200 See ReadStdout for additional details.
201 """
202 return cls.ReadStdout(key), cls.ReadStderr(key)
203
204 @classmethod
205 def Wait(cls, key):
206 return cls._processes[key].proc.wait()
207
208 @classmethod
209 def Poll(cls, key):
210 return cls._processes[key].proc.poll()
211
212 @classmethod
213 def GetPid(cls, key):
214 return cls._processes[key].proc.pid
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698