OLD | NEW |
| (Empty) |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Defines the task RPC methods.""" | |
6 | |
7 import os | |
8 import sys | |
9 import logging | |
10 import threading | |
11 | |
12 #pylint: disable=relative-import | |
13 import common_lib | |
14 | |
15 # Map swarming_client to use subprocess42 | |
16 sys.path.append(common_lib.SWARMING_DIR) | |
17 | |
18 from utils import subprocess42 | |
19 | |
20 | |
21 class RPCMethods(object): | |
22 """Class exposing RPC methods.""" | |
23 | |
24 _dotted_whitelist = ['subprocess'] | |
25 | |
26 def __init__(self, server): | |
27 self._server = server | |
28 self.subprocess = Subprocess | |
29 | |
30 def _dispatch(self, method, params): | |
31 obj = self | |
32 if '.' in method: | |
33 # Allow only white listed dotted names | |
34 name, method = method.split('.') | |
35 assert name in self._dotted_whitelist | |
36 obj = getattr(self, name) | |
37 return getattr(obj, method)(*params) | |
38 | |
39 def Echo(self, message): | |
40 """Simple RPC method to print and return a message.""" | |
41 logging.info('Echoing %s', message) | |
42 return 'echo %s' % str(message) | |
43 | |
44 def Quit(self): | |
45 """Call _server.shutdown in another thread. | |
46 | |
47 This is needed because server.shutdown waits for the server to actually | |
48 quit. However the server cannot shutdown until it completes handling this | |
49 call. Calling this in the same thread results in a deadlock. | |
50 """ | |
51 t = threading.Thread(target=self._server.shutdown) | |
52 t.start() | |
53 | |
54 | |
55 class Subprocess(object): | |
56 """Implements a server-based non-blocking subprocess. | |
57 | |
58 This non-blocking subprocess allows the caller to continue operating while | |
59 also able to interact with this subprocess based on a key returned to | |
60 the caller at the time of creation. | |
61 """ | |
62 | |
63 _processes = {} | |
64 _process_next_id = 0 | |
65 _creation_lock = threading.Lock() | |
66 | |
67 def __init__(self, cmd): | |
68 self.proc = subprocess42.Popen(cmd, stdout=subprocess42.PIPE, | |
69 stderr=subprocess42.PIPE) | |
70 self.stdout = '' | |
71 self.stderr = '' | |
72 self.data_lock = threading.Lock() | |
73 threading.Thread(target=self._run).start() | |
74 | |
75 def _run(self): | |
76 for pipe, data in self.proc.yield_any(): | |
77 with self.data_lock: | |
78 if pipe == 'stdout': | |
79 self.stdout += data | |
80 else: | |
81 self.stderr += data | |
82 | |
83 @classmethod | |
84 def Popen(cls, cmd): | |
85 with cls._creation_lock: | |
86 key = 'Process%d' % cls._process_next_id | |
87 cls._process_next_id += 1 | |
88 logging.debug('Creating process %s', key) | |
89 process = cls(cmd) | |
90 cls._processes[key] = process | |
91 return key | |
92 | |
93 @classmethod | |
94 def Terminate(cls, key): | |
95 logging.debug('Terminating and deleting process %s', key) | |
96 return cls._processes.pop(key).proc.terminate() | |
97 | |
98 @classmethod | |
99 def Kill(cls, key): | |
100 logging.debug('Killing and deleting process %s', key) | |
101 return cls._processes.pop(key).proc.kill() | |
102 | |
103 @classmethod | |
104 def Delete(cls, key): | |
105 logging.debug('Deleting process %s', key) | |
106 cls._processes.pop(key) | |
107 | |
108 @classmethod | |
109 def GetReturncode(cls, key): | |
110 return cls._processes[key].proc.returncode | |
111 | |
112 @classmethod | |
113 def ReadStdout(cls, key): | |
114 """Returns all stdout since the last call to ReadStdout. | |
115 | |
116 This call allows the user to read stdout while the process is running. | |
117 However each call will flush the local stdout buffer. In order to make | |
118 multiple calls to ReadStdout and to retain the entire output the results | |
119 of this call will need to be buffered in the calling code. | |
120 """ | |
121 proc = cls._processes[key] | |
122 with proc.data_lock: | |
123 # Perform a "read" on the stdout data | |
124 stdout = proc.stdout | |
125 proc.stdout = '' | |
126 return stdout | |
127 | |
128 @classmethod | |
129 def ReadStderr(cls, key): | |
130 """Returns all stderr read since the last call to ReadStderr. | |
131 | |
132 See ReadStdout for additional details. | |
133 """ | |
134 proc = cls._processes[key] | |
135 with proc.data_lock: | |
136 # Perform a "read" on the stderr data | |
137 stderr = proc.stderr | |
138 proc.stderr = '' | |
139 return stderr | |
140 | |
141 @classmethod | |
142 def Wait(cls, key): | |
143 return cls._processes[key].proc.wait() | |
144 | |
145 @classmethod | |
146 def Poll(cls, key): | |
147 return cls._processes[key].proc.poll() | |
148 | |
149 @classmethod | |
150 def GetPid(cls, key): | |
151 return cls._processes[key].proc.pid | |
OLD | NEW |