OLD | NEW |
| (Empty) |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """A "Test Server Spawner" that handles killing/stopping per-test test servers. | |
6 | |
7 It's used to accept requests from the device to spawn and kill instances of the | |
8 chrome test server on the host. | |
9 """ | |
10 # pylint: disable=W0702 | |
11 | |
12 import BaseHTTPServer | |
13 import json | |
14 import logging | |
15 import os | |
16 import select | |
17 import struct | |
18 import subprocess | |
19 import sys | |
20 import threading | |
21 import time | |
22 import urlparse | |
23 | |
24 from pylib import constants | |
25 from pylib import ports | |
26 | |
27 from pylib.forwarder import Forwarder | |
28 | |
29 | |
30 # Path that are needed to import necessary modules when launching a testserver. | |
31 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s' | |
32 % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'), | |
33 os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'), | |
34 os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib', | |
35 'src'), | |
36 os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'), | |
37 os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver'))) | |
38 | |
39 | |
40 SERVER_TYPES = { | |
41 'http': '', | |
42 'ftp': '-f', | |
43 'sync': '', # Sync uses its own script, and doesn't take a server type arg. | |
44 'tcpecho': '--tcp-echo', | |
45 'udpecho': '--udp-echo', | |
46 } | |
47 | |
48 | |
49 # The timeout (in seconds) of starting up the Python test server. | |
50 TEST_SERVER_STARTUP_TIMEOUT = 10 | |
51 | |
52 def _WaitUntil(predicate, max_attempts=5): | |
53 """Blocks until the provided predicate (function) is true. | |
54 | |
55 Returns: | |
56 Whether the provided predicate was satisfied once (before the timeout). | |
57 """ | |
58 sleep_time_sec = 0.025 | |
59 for _ in xrange(1, max_attempts): | |
60 if predicate(): | |
61 return True | |
62 time.sleep(sleep_time_sec) | |
63 sleep_time_sec = min(1, sleep_time_sec * 2) # Don't wait more than 1 sec. | |
64 return False | |
65 | |
66 | |
67 def _CheckPortAvailable(port): | |
68 """Returns True if |port| is available.""" | |
69 return _WaitUntil(lambda: ports.IsHostPortAvailable(port)) | |
70 | |
71 | |
72 def _CheckPortNotAvailable(port): | |
73 """Returns True if |port| is not available.""" | |
74 return _WaitUntil(lambda: not ports.IsHostPortAvailable(port)) | |
75 | |
76 | |
77 def _CheckDevicePortStatus(device, port): | |
78 """Returns whether the provided port is used.""" | |
79 return _WaitUntil(lambda: ports.IsDevicePortUsed(device, port)) | |
80 | |
81 | |
82 def _GetServerTypeCommandLine(server_type): | |
83 """Returns the command-line by the given server type. | |
84 | |
85 Args: | |
86 server_type: the server type to be used (e.g. 'http'). | |
87 | |
88 Returns: | |
89 A string containing the command-line argument. | |
90 """ | |
91 if server_type not in SERVER_TYPES: | |
92 raise NotImplementedError('Unknown server type: %s' % server_type) | |
93 if server_type == 'udpecho': | |
94 raise Exception('Please do not run UDP echo tests because we do not have ' | |
95 'a UDP forwarder tool.') | |
96 return SERVER_TYPES[server_type] | |
97 | |
98 | |
99 class TestServerThread(threading.Thread): | |
100 """A thread to run the test server in a separate process.""" | |
101 | |
102 def __init__(self, ready_event, arguments, device, tool): | |
103 """Initialize TestServerThread with the following argument. | |
104 | |
105 Args: | |
106 ready_event: event which will be set when the test server is ready. | |
107 arguments: dictionary of arguments to run the test server. | |
108 device: An instance of DeviceUtils. | |
109 tool: instance of runtime error detection tool. | |
110 """ | |
111 threading.Thread.__init__(self) | |
112 self.wait_event = threading.Event() | |
113 self.stop_flag = False | |
114 self.ready_event = ready_event | |
115 self.ready_event.clear() | |
116 self.arguments = arguments | |
117 self.device = device | |
118 self.tool = tool | |
119 self.test_server_process = None | |
120 self.is_ready = False | |
121 self.host_port = self.arguments['port'] | |
122 assert isinstance(self.host_port, int) | |
123 # The forwarder device port now is dynamically allocated. | |
124 self.forwarder_device_port = 0 | |
125 # Anonymous pipe in order to get port info from test server. | |
126 self.pipe_in = None | |
127 self.pipe_out = None | |
128 self.process = None | |
129 self.command_line = [] | |
130 | |
131 def _WaitToStartAndGetPortFromTestServer(self): | |
132 """Waits for the Python test server to start and gets the port it is using. | |
133 | |
134 The port information is passed by the Python test server with a pipe given | |
135 by self.pipe_out. It is written as a result to |self.host_port|. | |
136 | |
137 Returns: | |
138 Whether the port used by the test server was successfully fetched. | |
139 """ | |
140 assert self.host_port == 0 and self.pipe_out and self.pipe_in | |
141 (in_fds, _, _) = select.select([self.pipe_in, ], [], [], | |
142 TEST_SERVER_STARTUP_TIMEOUT) | |
143 if len(in_fds) == 0: | |
144 logging.error('Failed to wait to the Python test server to be started.') | |
145 return False | |
146 # First read the data length as an unsigned 4-byte value. This | |
147 # is _not_ using network byte ordering since the Python test server packs | |
148 # size as native byte order and all Chromium platforms so far are | |
149 # configured to use little-endian. | |
150 # TODO(jnd): Change the Python test server and local_test_server_*.cc to | |
151 # use a unified byte order (either big-endian or little-endian). | |
152 data_length = os.read(self.pipe_in, struct.calcsize('=L')) | |
153 if data_length: | |
154 (data_length,) = struct.unpack('=L', data_length) | |
155 assert data_length | |
156 if not data_length: | |
157 logging.error('Failed to get length of server data.') | |
158 return False | |
159 port_json = os.read(self.pipe_in, data_length) | |
160 if not port_json: | |
161 logging.error('Failed to get server data.') | |
162 return False | |
163 logging.info('Got port json data: %s', port_json) | |
164 port_json = json.loads(port_json) | |
165 if port_json.has_key('port') and isinstance(port_json['port'], int): | |
166 self.host_port = port_json['port'] | |
167 return _CheckPortNotAvailable(self.host_port) | |
168 logging.error('Failed to get port information from the server data.') | |
169 return False | |
170 | |
171 def _GenerateCommandLineArguments(self): | |
172 """Generates the command line to run the test server. | |
173 | |
174 Note that all options are processed by following the definitions in | |
175 testserver.py. | |
176 """ | |
177 if self.command_line: | |
178 return | |
179 | |
180 args_copy = dict(self.arguments) | |
181 | |
182 # Translate the server type. | |
183 type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type')) | |
184 if type_cmd: | |
185 self.command_line.append(type_cmd) | |
186 | |
187 # Use a pipe to get the port given by the instance of Python test server | |
188 # if the test does not specify the port. | |
189 assert self.host_port == args_copy['port'] | |
190 if self.host_port == 0: | |
191 (self.pipe_in, self.pipe_out) = os.pipe() | |
192 self.command_line.append('--startup-pipe=%d' % self.pipe_out) | |
193 | |
194 # Pass the remaining arguments as-is. | |
195 for key, values in args_copy.iteritems(): | |
196 if not isinstance(values, list): | |
197 values = [values] | |
198 for value in values: | |
199 if value is None: | |
200 self.command_line.append('--%s' % key) | |
201 else: | |
202 self.command_line.append('--%s=%s' % (key, value)) | |
203 | |
204 def _CloseUnnecessaryFDsForTestServerProcess(self): | |
205 # This is required to avoid subtle deadlocks that could be caused by the | |
206 # test server child process inheriting undesirable file descriptors such as | |
207 # file lock file descriptors. | |
208 for fd in xrange(0, 1024): | |
209 if fd != self.pipe_out: | |
210 try: | |
211 os.close(fd) | |
212 except: | |
213 pass | |
214 | |
215 def run(self): | |
216 logging.info('Start running the thread!') | |
217 self.wait_event.clear() | |
218 self._GenerateCommandLineArguments() | |
219 command = constants.DIR_SOURCE_ROOT | |
220 if self.arguments['server-type'] == 'sync': | |
221 command = [os.path.join(command, 'sync', 'tools', 'testserver', | |
222 'sync_testserver.py')] + self.command_line | |
223 else: | |
224 command = [os.path.join(command, 'net', 'tools', 'testserver', | |
225 'testserver.py')] + self.command_line | |
226 logging.info('Running: %s', command) | |
227 # Pass DIR_SOURCE_ROOT as the child's working directory so that relative | |
228 # paths in the arguments are resolved correctly. | |
229 self.process = subprocess.Popen( | |
230 command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess, | |
231 cwd=constants.DIR_SOURCE_ROOT) | |
232 if self.process: | |
233 if self.pipe_out: | |
234 self.is_ready = self._WaitToStartAndGetPortFromTestServer() | |
235 else: | |
236 self.is_ready = _CheckPortNotAvailable(self.host_port) | |
237 if self.is_ready: | |
238 Forwarder.Map([(0, self.host_port)], self.device, self.tool) | |
239 # Check whether the forwarder is ready on the device. | |
240 self.is_ready = False | |
241 device_port = Forwarder.DevicePortForHostPort(self.host_port) | |
242 if device_port and _CheckDevicePortStatus(self.device, device_port): | |
243 self.is_ready = True | |
244 self.forwarder_device_port = device_port | |
245 # Wake up the request handler thread. | |
246 self.ready_event.set() | |
247 # Keep thread running until Stop() gets called. | |
248 _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint) | |
249 if self.process.poll() is None: | |
250 self.process.kill() | |
251 Forwarder.UnmapDevicePort(self.forwarder_device_port, self.device) | |
252 self.process = None | |
253 self.is_ready = False | |
254 if self.pipe_out: | |
255 os.close(self.pipe_in) | |
256 os.close(self.pipe_out) | |
257 self.pipe_in = None | |
258 self.pipe_out = None | |
259 logging.info('Test-server has died.') | |
260 self.wait_event.set() | |
261 | |
262 def Stop(self): | |
263 """Blocks until the loop has finished. | |
264 | |
265 Note that this must be called in another thread. | |
266 """ | |
267 if not self.process: | |
268 return | |
269 self.stop_flag = True | |
270 self.wait_event.wait() | |
271 | |
272 | |
273 class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): | |
274 """A handler used to process http GET/POST request.""" | |
275 | |
276 def _SendResponse(self, response_code, response_reason, additional_headers, | |
277 contents): | |
278 """Generates a response sent to the client from the provided parameters. | |
279 | |
280 Args: | |
281 response_code: number of the response status. | |
282 response_reason: string of reason description of the response. | |
283 additional_headers: dict of additional headers. Each key is the name of | |
284 the header, each value is the content of the header. | |
285 contents: string of the contents we want to send to client. | |
286 """ | |
287 self.send_response(response_code, response_reason) | |
288 self.send_header('Content-Type', 'text/html') | |
289 # Specify the content-length as without it the http(s) response will not | |
290 # be completed properly (and the browser keeps expecting data). | |
291 self.send_header('Content-Length', len(contents)) | |
292 for header_name in additional_headers: | |
293 self.send_header(header_name, additional_headers[header_name]) | |
294 self.end_headers() | |
295 self.wfile.write(contents) | |
296 self.wfile.flush() | |
297 | |
298 def _StartTestServer(self): | |
299 """Starts the test server thread.""" | |
300 logging.info('Handling request to spawn a test server.') | |
301 content_type = self.headers.getheader('content-type') | |
302 if content_type != 'application/json': | |
303 raise Exception('Bad content-type for start request.') | |
304 content_length = self.headers.getheader('content-length') | |
305 if not content_length: | |
306 content_length = 0 | |
307 try: | |
308 content_length = int(content_length) | |
309 except: | |
310 raise Exception('Bad content-length for start request.') | |
311 logging.info(content_length) | |
312 test_server_argument_json = self.rfile.read(content_length) | |
313 logging.info(test_server_argument_json) | |
314 assert not self.server.test_server_instance | |
315 ready_event = threading.Event() | |
316 self.server.test_server_instance = TestServerThread( | |
317 ready_event, | |
318 json.loads(test_server_argument_json), | |
319 self.server.device, | |
320 self.server.tool) | |
321 self.server.test_server_instance.setDaemon(True) | |
322 self.server.test_server_instance.start() | |
323 ready_event.wait() | |
324 if self.server.test_server_instance.is_ready: | |
325 self._SendResponse(200, 'OK', {}, json.dumps( | |
326 {'port': self.server.test_server_instance.forwarder_device_port, | |
327 'message': 'started'})) | |
328 logging.info('Test server is running on port: %d.', | |
329 self.server.test_server_instance.host_port) | |
330 else: | |
331 self.server.test_server_instance.Stop() | |
332 self.server.test_server_instance = None | |
333 self._SendResponse(500, 'Test Server Error.', {}, '') | |
334 logging.info('Encounter problem during starting a test server.') | |
335 | |
336 def _KillTestServer(self): | |
337 """Stops the test server instance.""" | |
338 # There should only ever be one test server at a time. This may do the | |
339 # wrong thing if we try and start multiple test servers. | |
340 if not self.server.test_server_instance: | |
341 return | |
342 port = self.server.test_server_instance.host_port | |
343 logging.info('Handling request to kill a test server on port: %d.', port) | |
344 self.server.test_server_instance.Stop() | |
345 # Make sure the status of test server is correct before sending response. | |
346 if _CheckPortAvailable(port): | |
347 self._SendResponse(200, 'OK', {}, 'killed') | |
348 logging.info('Test server on port %d is killed', port) | |
349 else: | |
350 self._SendResponse(500, 'Test Server Error.', {}, '') | |
351 logging.info('Encounter problem during killing a test server.') | |
352 self.server.test_server_instance = None | |
353 | |
354 def do_POST(self): | |
355 parsed_path = urlparse.urlparse(self.path) | |
356 action = parsed_path.path | |
357 logging.info('Action for POST method is: %s.', action) | |
358 if action == '/start': | |
359 self._StartTestServer() | |
360 else: | |
361 self._SendResponse(400, 'Unknown request.', {}, '') | |
362 logging.info('Encounter unknown request: %s.', action) | |
363 | |
364 def do_GET(self): | |
365 parsed_path = urlparse.urlparse(self.path) | |
366 action = parsed_path.path | |
367 params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1) | |
368 logging.info('Action for GET method is: %s.', action) | |
369 for param in params: | |
370 logging.info('%s=%s', param, params[param][0]) | |
371 if action == '/kill': | |
372 self._KillTestServer() | |
373 elif action == '/ping': | |
374 # The ping handler is used to check whether the spawner server is ready | |
375 # to serve the requests. We don't need to test the status of the test | |
376 # server when handling ping request. | |
377 self._SendResponse(200, 'OK', {}, 'ready') | |
378 logging.info('Handled ping request and sent response.') | |
379 else: | |
380 self._SendResponse(400, 'Unknown request', {}, '') | |
381 logging.info('Encounter unknown request: %s.', action) | |
382 | |
383 | |
384 class SpawningServer(object): | |
385 """The class used to start/stop a http server.""" | |
386 | |
387 def __init__(self, test_server_spawner_port, device, tool): | |
388 logging.info('Creating new spawner on port: %d.', test_server_spawner_port) | |
389 self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port), | |
390 SpawningServerRequestHandler) | |
391 self.server.device = device | |
392 self.server.tool = tool | |
393 self.server.test_server_instance = None | |
394 self.server.build_type = constants.GetBuildType() | |
395 | |
396 def _Listen(self): | |
397 logging.info('Starting test server spawner') | |
398 self.server.serve_forever() | |
399 | |
400 def Start(self): | |
401 """Starts the test server spawner.""" | |
402 listener_thread = threading.Thread(target=self._Listen) | |
403 listener_thread.setDaemon(True) | |
404 listener_thread.start() | |
405 | |
406 def Stop(self): | |
407 """Stops the test server spawner. | |
408 | |
409 Also cleans the server state. | |
410 """ | |
411 self.CleanupState() | |
412 self.server.shutdown() | |
413 | |
414 def CleanupState(self): | |
415 """Cleans up the spawning server state. | |
416 | |
417 This should be called if the test server spawner is reused, | |
418 to avoid sharing the test server instance. | |
419 """ | |
420 if self.server.test_server_instance: | |
421 self.server.test_server_instance.Stop() | |
422 self.server.test_server_instance = None | |
OLD | NEW |