Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Side by Side Diff: testing/legion/task_controller.py

Issue 952893003: Update from https://crrev.com/317530 (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Fix gn for nacl Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « testing/legion/run_task.py ('k') | testing/legion/task_registration_server.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Defines the client library.""" 5 """Defines the task controller library."""
6 6
7 import argparse 7 import argparse
8 import datetime 8 import datetime
9 import logging 9 import logging
10 import os 10 import os
11 import socket 11 import socket
12 import subprocess 12 import subprocess
13 import sys 13 import sys
14 import tempfile 14 import tempfile
15 import threading 15 import threading
16 import xmlrpclib 16 import xmlrpclib
17 17
18 #pylint: disable=relative-import 18 #pylint: disable=relative-import
19 import common_lib 19 import common_lib
20 20
21 ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py') 21 ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py')
22 SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py') 22 SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py')
23 23
24 24
25 class Error(Exception): 25 class Error(Exception):
26 pass 26 pass
27 27
28 28
29 class ConnectionTimeoutError(Error): 29 class ConnectionTimeoutError(Error):
30 pass 30 pass
31 31
32 32
33 class ClientController(object): 33 class TaskController(object):
34 """Creates, configures, and controls a client machine.""" 34 """Provisions, configures, and controls a task machine.
35 35
36 _client_count = 0 36 This class is an abstraction of a physical task machine. It provides an
37 _controllers = [] 37 end to end API for controlling a task machine. Operations on the task machine
38 are performed using the instance's "rpc" property. A simple end to end
39 scenario is as follows:
40
41 task = TaskController(...)
42 task.Create()
43 task.WaitForConnection()
44 proc = task.rpc.subprocess.Popen(['ls'])
45 print task.rpc.subprocess.GetStdout(proc)
46 task.Release()
47 """
48
49 _task_count = 0
50 _tasks = []
38 51
39 def __init__(self, isolate_file, config_vars, dimensions, priority=100, 52 def __init__(self, isolate_file, config_vars, dimensions, priority=100,
40 idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS, 53 idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
41 connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS, 54 connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
42 verbosity='ERROR', name=None): 55 verbosity='ERROR', name=None):
43 assert isinstance(config_vars, dict) 56 assert isinstance(config_vars, dict)
44 assert isinstance(dimensions, dict) 57 assert isinstance(dimensions, dict)
45 type(self)._controllers.append(self) 58 type(self)._tasks.append(self)
46 type(self)._client_count += 1 59 type(self)._task_count += 1
47 self.verbosity = verbosity 60 self.verbosity = verbosity
48 self._name = name or 'Client%d' % type(self)._client_count 61 self._name = name or 'Task%d' % type(self)._task_count
49 self._priority = priority 62 self._priority = priority
50 self._isolate_file = isolate_file 63 self._isolate_file = isolate_file
51 self._isolated_file = isolate_file + 'd' 64 self._isolated_file = isolate_file + 'd'
52 self._idle_timeout_secs = idle_timeout_secs 65 self._idle_timeout_secs = idle_timeout_secs
53 self._config_vars = config_vars 66 self._config_vars = config_vars
54 self._dimensions = dimensions 67 self._dimensions = dimensions
55 self._connect_event = threading.Event() 68 self._connect_event = threading.Event()
56 self._connected = False 69 self._connected = False
57 self._ip_address = None 70 self._ip_address = None
58 self._otp = self._CreateOTP() 71 self._otp = self._CreateOTP()
59 self._rpc = None 72 self._rpc = None
60 73
61 parser = argparse.ArgumentParser() 74 parser = argparse.ArgumentParser()
62 parser.add_argument('--isolate-server') 75 parser.add_argument('--isolate-server')
63 parser.add_argument('--swarming-server') 76 parser.add_argument('--swarming-server')
64 parser.add_argument('--client-connection-timeout-secs', 77 parser.add_argument('--task-connection-timeout-secs',
65 default=common_lib.DEFAULT_TIMEOUT_SECS) 78 default=common_lib.DEFAULT_TIMEOUT_SECS)
66 args, _ = parser.parse_known_args() 79 args, _ = parser.parse_known_args()
67 80
68 self._isolate_server = args.isolate_server 81 self._isolate_server = args.isolate_server
69 self._swarming_server = args.swarming_server 82 self._swarming_server = args.swarming_server
70 self._connection_timeout_secs = (connection_timeout_secs or 83 self._connection_timeout_secs = (connection_timeout_secs or
71 args.client_connection_timeout_secs) 84 args.task_connection_timeout_secs)
72 85
73 @property 86 @property
74 def name(self): 87 def name(self):
75 return self._name 88 return self._name
76 89
77 @property 90 @property
78 def otp(self): 91 def otp(self):
79 return self._otp 92 return self._otp
80 93
81 @property 94 @property
(...skipping 18 matching lines...) Expand all
100 113
101 Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO, 114 Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
102 logging.DEBUG, etc) is allowed. 115 logging.DEBUG, etc) is allowed.
103 """ 116 """
104 assert isinstance(level, (str, int)) 117 assert isinstance(level, (str, int))
105 if isinstance(level, int): 118 if isinstance(level, int):
106 level = logging.getLevelName(level) 119 level = logging.getLevelName(level)
107 self._verbosity = level #pylint: disable=attribute-defined-outside-init 120 self._verbosity = level #pylint: disable=attribute-defined-outside-init
108 121
109 @classmethod 122 @classmethod
110 def ReleaseAllControllers(cls): 123 def ReleaseAllTasks(cls):
111 for controller in cls._controllers: 124 for task in cls._tasks:
112 controller.Release() 125 task.Release()
113 126
114 def _CreateOTP(self): 127 def _CreateOTP(self):
115 """Creates the OTP.""" 128 """Creates the OTP."""
116 host_name = socket.gethostname() 129 controller_name = socket.gethostname()
117 test_name = os.path.basename(sys.argv[0]) 130 test_name = os.path.basename(sys.argv[0])
118 creation_time = datetime.datetime.utcnow() 131 creation_time = datetime.datetime.utcnow()
119 otp = 'client:%s-host:%s-test:%s-creation:%s' % ( 132 otp = 'task:%s controller:%s test:%s creation:%s' % (
120 self._name, host_name, test_name, creation_time) 133 self._name, controller_name, test_name, creation_time)
121 return otp 134 return otp
122 135
123 def Create(self): 136 def Create(self):
124 """Creates the client machine.""" 137 """Creates the task machine."""
125 logging.info('Creating %s', self.name) 138 logging.info('Creating %s', self.name)
126 self._connect_event.clear() 139 self._connect_event.clear()
127 self._ExecuteIsolate() 140 self._ExecuteIsolate()
128 self._ExecuteSwarming() 141 self._ExecuteSwarming()
129 142
130 def WaitForConnection(self): 143 def WaitForConnection(self):
131 """Waits for the client machine to connect. 144 """Waits for the task machine to connect.
132 145
133 Raises: 146 Raises:
134 ConnectionTimeoutError if the client doesn't connect in time. 147 ConnectionTimeoutError if the task doesn't connect in time.
135 """ 148 """
136 logging.info('Waiting for %s to connect with a timeout of %d seconds', 149 logging.info('Waiting for %s to connect with a timeout of %d seconds',
137 self._name, self._connection_timeout_secs) 150 self._name, self._connection_timeout_secs)
138 self._connect_event.wait(self._connection_timeout_secs) 151 self._connect_event.wait(self._connection_timeout_secs)
139 if not self._connect_event.is_set(): 152 if not self._connect_event.is_set():
140 raise ConnectionTimeoutError('%s failed to connect' % self.name) 153 raise ConnectionTimeoutError('%s failed to connect' % self.name)
141 154
142 def Release(self): 155 def Release(self):
143 """Quits the client's RPC server so it can release the machine.""" 156 """Quits the task's RPC server so it can release the machine."""
144 if self._rpc is not None and self._connected: 157 if self._rpc is not None and self._connected:
145 logging.info('Releasing %s', self._name) 158 logging.info('Releasing %s', self._name)
146 try: 159 try:
147 self._rpc.Quit() 160 self._rpc.Quit()
148 except (socket.error, xmlrpclib.Fault): 161 except (socket.error, xmlrpclib.Fault):
149 logging.error('Unable to connect to %s to call Quit', self.name) 162 logging.error('Unable to connect to %s to call Quit', self.name)
150 self._rpc = None 163 self._rpc = None
151 self._connected = False 164 self._connected = False
152 165
153 def _ExecuteIsolate(self): 166 def _ExecuteIsolate(self):
(...skipping 25 matching lines...) Expand all
179 192
180 if self._isolate_server: 193 if self._isolate_server:
181 cmd.extend(['--isolate-server', self._isolate_server]) 194 cmd.extend(['--isolate-server', self._isolate_server])
182 if self._swarming_server: 195 if self._swarming_server:
183 cmd.extend(['--swarming', self._swarming_server]) 196 cmd.extend(['--swarming', self._swarming_server])
184 for key, value in self._dimensions.iteritems(): 197 for key, value in self._dimensions.iteritems():
185 cmd.extend(['--dimension', key, value]) 198 cmd.extend(['--dimension', key, value])
186 199
187 cmd.extend([ 200 cmd.extend([
188 '--', 201 '--',
189 '--host', common_lib.MY_IP, 202 '--controller', common_lib.MY_IP,
190 '--otp', self._otp, 203 '--otp', self._otp,
191 '--verbosity', self._verbosity, 204 '--verbosity', self._verbosity,
192 '--idle-timeout', str(self._idle_timeout_secs), 205 '--idle-timeout', str(self._idle_timeout_secs),
193 ]) 206 ])
194 207
195 self._ExecuteProcess(cmd) 208 self._ExecuteProcess(cmd)
196 209
197 def _ExecuteProcess(self, cmd): 210 def _ExecuteProcess(self, cmd):
198 """Executes a process, waits for it to complete, and checks for success.""" 211 """Executes a process, waits for it to complete, and checks for success."""
199 logging.debug('Running %s', ' '.join(cmd)) 212 logging.debug('Running %s', ' '.join(cmd))
200 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 213 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
201 _, stderr = p.communicate() 214 _, stderr = p.communicate()
202 if p.returncode != 0: 215 if p.returncode != 0:
203 raise Error(stderr) 216 raise Error(stderr)
204 217
205 def OnConnect(self, ip_address): 218 def OnConnect(self, ip_address):
206 """Receives client ip address on connection.""" 219 """Receives task ip address on connection."""
207 self._ip_address = ip_address 220 self._ip_address = ip_address
208 self._connected = True 221 self._connected = True
209 self._rpc = common_lib.ConnectToServer(self._ip_address) 222 self._rpc = common_lib.ConnectToServer(self._ip_address)
210 logging.info('%s connected from %s', self._name, ip_address) 223 logging.info('%s connected from %s', self._name, ip_address)
211 self._connect_event.set() 224 self._connect_event.set()
OLDNEW
« no previous file with comments | « testing/legion/run_task.py ('k') | testing/legion/task_registration_server.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698