Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(238)

Side by Side Diff: testing/legion/client_rpc_methods.py

Issue 870103005: Adding a server side, non-blocking subprocess mechanism. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Adding a streaming read mechanism. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « testing/legion/client_lib.py ('k') | testing/legion/common_lib.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Defines the client RPC methods.""" 5 """Defines the client RPC methods."""
6 6
7 import os
8 import sys
7 import logging 9 import logging
8 import subprocess
9 import threading 10 import threading
10 11
12 #pylint: disable=relative-import
13 import common_lib
14
15 # Map swarming_client to use subprocess42
16 sys.path.append(common_lib.SWARMING_DIR)
17
18 from utils import subprocess42
19
11 20
12 class RPCMethods(object): 21 class RPCMethods(object):
13 """Class exposing RPC methods.""" 22 """Class exposing RPC methods."""
14 23
24 _dotted_whitelist = ['subprocess']
25
15 def __init__(self, server): 26 def __init__(self, server):
16 self.server = server 27 self._server = server
28 self.subprocess = Subprocess
29
30 def _dispatch(self, method, params):
31 obj = self
32 if '.' in method:
33 # Allow only white listed dotted names
34 name, method = method.split('.')
35 assert name in self._dotted_whitelist
36 obj = getattr(self, name)
37 return getattr(obj, method)(*params)
17 38
18 def Echo(self, message): 39 def Echo(self, message):
19 """Simple RPC method to print and return a message.""" 40 """Simple RPC method to print and return a message."""
20 logging.info('Echoing %s', message) 41 logging.info('Echoing %s', message)
21 return 'echo %s' % str(message) 42 return 'echo %s' % str(message)
22 43
23 def Subprocess(self, cmd):
24 """Run the commands in a subprocess.
25
26 Returns:
27 (returncode, stdout, stderr).
28 """
29 p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
30 stderr=subprocess.PIPE)
31 stdout, stderr = p.communicate()
32 return (p.returncode, stdout, stderr)
33
34 def Quit(self): 44 def Quit(self):
35 """Call server.shutdown in another thread. 45 """Call _server.shutdown in another thread.
36 46
37 This is needed because server.shutdown waits for the server to actually 47 This is needed because server.shutdown waits for the server to actually
38 quit. However the server cannot shutdown until it completes handling this 48 quit. However the server cannot shutdown until it completes handling this
39 call. Calling this in the same thread results in a deadlock. 49 call. Calling this in the same thread results in a deadlock.
40 """ 50 """
41 t = threading.Thread(target=self.server.shutdown) 51 t = threading.Thread(target=self._server.shutdown)
42 t.start() 52 t.start()
53
54
55 class Subprocess(object):
56 """Implements a server-based non-blocking subprocess.
57
58 This non-blocking subprocess allows the caller to continue operating while
59 also able to interact with this subprocess based on a key returned to
60 the caller at the time of creation.
61 """
62
63 _processes = {}
64 _process_next_id = 0
65 _creation_lock = threading.Lock()
66
67 def __init__(self, cmd):
68 self.proc = subprocess42.Popen(cmd, stdout=subprocess42.PIPE,
69 stderr=subprocess42.PIPE)
70 self.stdout = ''
71 self.stderr = ''
72 self.stdout_lock = threading.Lock()
M-A Ruel 2015/02/12 22:16:11 one Lock would be enough.
Mike Meade 2015/02/12 22:49:00 Done.
73 self.stderr_lock = threading.Lock()
74 threading.Thread(target=self._run).start()
75
76 def _run(self):
77 for pipe, data in self.proc.yield_any():
78 if pipe == 'stdout':
79 with self.stdout_lock:
80 self.stdout += data
81 else:
82 with self.stderr_lock:
83 self.stderr += data
84
85 @classmethod
86 def Popen(cls, cmd):
87 with cls._creation_lock:
88 key = 'Process%d' % cls._process_next_id
89 cls._process_next_id += 1
90 logging.debug('Creating process %s', key)
91 process = cls(cmd)
92 cls._processes[key] = process
93 return key
94
95 @classmethod
96 def Terminate(cls, key):
97 logging.debug('Terminating and deleting process %s', key)
98 return cls._processes.pop(key).proc.terminate()
99
100 @classmethod
101 def Kill(cls, key):
102 logging.debug('Killing and deleting process %s', key)
103 return cls._processes.pop(key).proc.kill()
104
105 @classmethod
106 def Delete(cls, key):
107 logging.debug('Deleting process %s', key)
108 cls._processes.pop(key)
109
110 @classmethod
111 def GetReturncode(cls, key):
112 return cls._processes[key].proc.returncode
113
114 @classmethod
115 def GetStdout(cls, key):
M-A Ruel 2015/02/12 22:16:11 It's rare for a Get function to have side effect.
Mike Meade 2015/02/12 22:49:00 Changed these to ReadStd*** methods and added usag
116 proc = cls._processes[key]
117 with proc.stdout_lock:
118 # Perform a "read" on the stdout data
119 stdout = proc.stdout
120 proc.stdout = ''
121 return stdout
122
123 @classmethod
124 def GetStderr(cls, key):
125 proc = cls._processes[key]
126 with proc.stderr_lock:
127 # Perform a "read" on the stderr data
128 stderr = proc.stderr
129 proc.stderr = ''
130 return stderr
131
132 @classmethod
133 def Wait(cls, key):
134 return cls._processes[key].proc.wait()
135
136 @classmethod
137 def Poll(cls, key):
138 return cls._processes[key].proc.poll()
139
140 @classmethod
141 def GetPid(cls, key):
142 return cls._processes[key].proc.pid
OLDNEW
« no previous file with comments | « testing/legion/client_lib.py ('k') | testing/legion/common_lib.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698