Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: testing/legion/task_controller.py

Issue 943923003: Removing the isolate.py functionality and requiring tasks to be created from an archive hash rather… (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fixing a small style issue. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « testing/legion/examples/subprocess/subprocess_test.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright 2015 The Chromium Authors. All rights reserved. 1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Defines the task controller library.""" 5 """Defines the task controller library."""
6 6
7 import argparse 7 import argparse
8 import datetime 8 import datetime
9 import logging 9 import logging
10 import os 10 import os
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 task.Create() 42 task.Create()
43 task.WaitForConnection() 43 task.WaitForConnection()
44 proc = task.rpc.subprocess.Popen(['ls']) 44 proc = task.rpc.subprocess.Popen(['ls'])
45 print task.rpc.subprocess.GetStdout(proc) 45 print task.rpc.subprocess.GetStdout(proc)
46 task.Release() 46 task.Release()
47 """ 47 """
48 48
49 _task_count = 0 49 _task_count = 0
50 _tasks = [] 50 _tasks = []
51 51
52 def __init__(self, isolate_file, config_vars, dimensions, priority=100, 52 def __init__(self, isolated_hash, dimensions, priority=100,
53 idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS, 53 idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
54 connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS, 54 connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
55 verbosity='ERROR', name=None): 55 verbosity='ERROR', name=None, run_id=None):
56 assert isinstance(config_vars, dict)
57 assert isinstance(dimensions, dict) 56 assert isinstance(dimensions, dict)
58 type(self)._tasks.append(self) 57 type(self)._tasks.append(self)
59 type(self)._task_count += 1 58 type(self)._task_count += 1
60 self.verbosity = verbosity 59 self.verbosity = verbosity
61 self._name = name or 'Task%d' % type(self)._task_count 60 self._name = name or 'Task%d' % type(self)._task_count
62 self._priority = priority 61 self._priority = priority
63 self._isolate_file = isolate_file 62 self._isolated_hash = isolated_hash
64 self._isolated_file = isolate_file + 'd'
65 self._idle_timeout_secs = idle_timeout_secs 63 self._idle_timeout_secs = idle_timeout_secs
66 self._config_vars = config_vars
67 self._dimensions = dimensions 64 self._dimensions = dimensions
68 self._connect_event = threading.Event() 65 self._connect_event = threading.Event()
69 self._connected = False 66 self._connected = False
70 self._ip_address = None 67 self._ip_address = None
71 self._otp = self._CreateOTP() 68 self._otp = self._CreateOTP()
72 self._rpc = None 69 self._rpc = None
73 70
71 run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
72 self._task_name = '%s/%s/%s' % (
73 os.path.splitext(sys.argv[0])[0], self._name, run_id)
74
74 parser = argparse.ArgumentParser() 75 parser = argparse.ArgumentParser()
75 parser.add_argument('--isolate-server') 76 parser.add_argument('--isolate-server')
76 parser.add_argument('--swarming-server') 77 parser.add_argument('--swarming-server')
77 parser.add_argument('--task-connection-timeout-secs', 78 parser.add_argument('--task-connection-timeout-secs',
78 default=common_lib.DEFAULT_TIMEOUT_SECS) 79 default=common_lib.DEFAULT_TIMEOUT_SECS)
79 args, _ = parser.parse_known_args() 80 args, _ = parser.parse_known_args()
80 81
81 self._isolate_server = args.isolate_server 82 self._isolate_server = args.isolate_server
82 self._swarming_server = args.swarming_server 83 self._swarming_server = args.swarming_server
83 self._connection_timeout_secs = (connection_timeout_secs or 84 self._connection_timeout_secs = (connection_timeout_secs or
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
130 test_name = os.path.basename(sys.argv[0]) 131 test_name = os.path.basename(sys.argv[0])
131 creation_time = datetime.datetime.utcnow() 132 creation_time = datetime.datetime.utcnow()
132 otp = 'task:%s controller:%s test:%s creation:%s' % ( 133 otp = 'task:%s controller:%s test:%s creation:%s' % (
133 self._name, controller_name, test_name, creation_time) 134 self._name, controller_name, test_name, creation_time)
134 return otp 135 return otp
135 136
136 def Create(self): 137 def Create(self):
137 """Creates the task machine.""" 138 """Creates the task machine."""
138 logging.info('Creating %s', self.name) 139 logging.info('Creating %s', self.name)
139 self._connect_event.clear() 140 self._connect_event.clear()
140 self._ExecuteIsolate()
141 self._ExecuteSwarming() 141 self._ExecuteSwarming()
142 142
143 def WaitForConnection(self): 143 def WaitForConnection(self):
144 """Waits for the task machine to connect. 144 """Waits for the task machine to connect.
145 145
146 Raises: 146 Raises:
147 ConnectionTimeoutError if the task doesn't connect in time. 147 ConnectionTimeoutError if the task doesn't connect in time.
148 """ 148 """
149 logging.info('Waiting for %s to connect with a timeout of %d seconds', 149 logging.info('Waiting for %s to connect with a timeout of %d seconds',
150 self._name, self._connection_timeout_secs) 150 self._name, self._connection_timeout_secs)
151 self._connect_event.wait(self._connection_timeout_secs) 151 self._connect_event.wait(self._connection_timeout_secs)
152 if not self._connect_event.is_set(): 152 if not self._connect_event.is_set():
153 raise ConnectionTimeoutError('%s failed to connect' % self.name) 153 raise ConnectionTimeoutError('%s failed to connect' % self.name)
154 154
155 def Release(self): 155 def Release(self):
156 """Quits the task's RPC server so it can release the machine.""" 156 """Quits the task's RPC server so it can release the machine."""
157 if self._rpc is not None and self._connected: 157 if self._rpc is not None and self._connected:
158 logging.info('Releasing %s', self._name) 158 logging.info('Releasing %s', self._name)
159 try: 159 try:
160 self._rpc.Quit() 160 self._rpc.Quit()
161 except (socket.error, xmlrpclib.Fault): 161 except (socket.error, xmlrpclib.Fault):
162 logging.error('Unable to connect to %s to call Quit', self.name) 162 logging.error('Unable to connect to %s to call Quit', self.name)
163 self._rpc = None 163 self._rpc = None
164 self._connected = False 164 self._connected = False
165 165
166 def _ExecuteIsolate(self):
167 """Executes isolate.py."""
168 cmd = [
169 'python',
170 ISOLATE_PY,
171 'archive',
172 '--isolate', self._isolate_file,
173 '--isolated', self._isolated_file,
174 ]
175
176 if self._isolate_server:
177 cmd.extend(['--isolate-server', self._isolate_server])
178 for key, value in self._config_vars.iteritems():
179 cmd.extend(['--config-var', key, value])
180
181 self._ExecuteProcess(cmd)
182
183 def _ExecuteSwarming(self): 166 def _ExecuteSwarming(self):
184 """Executes swarming.py.""" 167 """Executes swarming.py."""
185 cmd = [ 168 cmd = [
186 'python', 169 'python',
187 SWARMING_PY, 170 SWARMING_PY,
188 'trigger', 171 'trigger',
189 self._isolated_file, 172 self._isolated_hash,
190 '--priority', str(self._priority), 173 '--priority', str(self._priority),
174 '--task-name', self._task_name,
191 ] 175 ]
192 176
193 if self._isolate_server: 177 if self._isolate_server:
194 cmd.extend(['--isolate-server', self._isolate_server]) 178 cmd.extend(['--isolate-server', self._isolate_server])
195 if self._swarming_server: 179 if self._swarming_server:
196 cmd.extend(['--swarming', self._swarming_server]) 180 cmd.extend(['--swarming', self._swarming_server])
197 for key, value in self._dimensions.iteritems(): 181 for key, value in self._dimensions.iteritems():
198 cmd.extend(['--dimension', key, value]) 182 cmd.extend(['--dimension', key, value])
199 183
200 cmd.extend([ 184 cmd.extend([
(...skipping 14 matching lines...) Expand all
215 if p.returncode != 0: 199 if p.returncode != 0:
216 raise Error(stderr) 200 raise Error(stderr)
217 201
218 def OnConnect(self, ip_address): 202 def OnConnect(self, ip_address):
219 """Receives task ip address on connection.""" 203 """Receives task ip address on connection."""
220 self._ip_address = ip_address 204 self._ip_address = ip_address
221 self._connected = True 205 self._connected = True
222 self._rpc = common_lib.ConnectToServer(self._ip_address) 206 self._rpc = common_lib.ConnectToServer(self._ip_address)
223 logging.info('%s connected from %s', self._name, ip_address) 207 logging.info('%s connected from %s', self._name, ip_address)
224 self._connect_event.set() 208 self._connect_event.set()
OLDNEW
« no previous file with comments | « testing/legion/examples/subprocess/subprocess_test.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698