Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: testing/legion/task_controller.py

Issue 951673002: Revert "Pull chromium at 2c3ffb2355a27c32f45e508ef861416b820c823b" (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « testing/legion/run_task.py ('k') | testing/legion/task_registration_server.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Defines the task controller library."""
6
7 import argparse
8 import datetime
9 import logging
10 import os
11 import socket
12 import subprocess
13 import sys
14 import tempfile
15 import threading
16 import xmlrpclib
17
18 #pylint: disable=relative-import
19 import common_lib
20
21 ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py')
22 SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py')
23
24
25 class Error(Exception):
26 pass
27
28
29 class ConnectionTimeoutError(Error):
30 pass
31
32
33 class TaskController(object):
34 """Provisions, configures, and controls a task machine.
35
36 This class is an abstraction of a physical task machine. It provides an
37 end to end API for controlling a task machine. Operations on the task machine
38 are performed using the instance's "rpc" property. A simple end to end
39 scenario is as follows:
40
41 task = TaskController(...)
42 task.Create()
43 task.WaitForConnection()
44 proc = task.rpc.subprocess.Popen(['ls'])
45 print task.rpc.subprocess.GetStdout(proc)
46 task.Release()
47 """
48
49 _task_count = 0
50 _tasks = []
51
52 def __init__(self, isolate_file, config_vars, dimensions, priority=100,
53 idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
54 connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
55 verbosity='ERROR', name=None):
56 assert isinstance(config_vars, dict)
57 assert isinstance(dimensions, dict)
58 type(self)._tasks.append(self)
59 type(self)._task_count += 1
60 self.verbosity = verbosity
61 self._name = name or 'Task%d' % type(self)._task_count
62 self._priority = priority
63 self._isolate_file = isolate_file
64 self._isolated_file = isolate_file + 'd'
65 self._idle_timeout_secs = idle_timeout_secs
66 self._config_vars = config_vars
67 self._dimensions = dimensions
68 self._connect_event = threading.Event()
69 self._connected = False
70 self._ip_address = None
71 self._otp = self._CreateOTP()
72 self._rpc = None
73
74 parser = argparse.ArgumentParser()
75 parser.add_argument('--isolate-server')
76 parser.add_argument('--swarming-server')
77 parser.add_argument('--task-connection-timeout-secs',
78 default=common_lib.DEFAULT_TIMEOUT_SECS)
79 args, _ = parser.parse_known_args()
80
81 self._isolate_server = args.isolate_server
82 self._swarming_server = args.swarming_server
83 self._connection_timeout_secs = (connection_timeout_secs or
84 args.task_connection_timeout_secs)
85
86 @property
87 def name(self):
88 return self._name
89
90 @property
91 def otp(self):
92 return self._otp
93
94 @property
95 def connected(self):
96 return self._connected
97
98 @property
99 def connect_event(self):
100 return self._connect_event
101
102 @property
103 def rpc(self):
104 return self._rpc
105
106 @property
107 def verbosity(self):
108 return self._verbosity
109
110 @verbosity.setter
111 def verbosity(self, level):
112 """Sets the verbosity level as a string.
113
114 Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
115 logging.DEBUG, etc) is allowed.
116 """
117 assert isinstance(level, (str, int))
118 if isinstance(level, int):
119 level = logging.getLevelName(level)
120 self._verbosity = level #pylint: disable=attribute-defined-outside-init
121
122 @classmethod
123 def ReleaseAllTasks(cls):
124 for task in cls._tasks:
125 task.Release()
126
127 def _CreateOTP(self):
128 """Creates the OTP."""
129 controller_name = socket.gethostname()
130 test_name = os.path.basename(sys.argv[0])
131 creation_time = datetime.datetime.utcnow()
132 otp = 'task:%s controller:%s test:%s creation:%s' % (
133 self._name, controller_name, test_name, creation_time)
134 return otp
135
136 def Create(self):
137 """Creates the task machine."""
138 logging.info('Creating %s', self.name)
139 self._connect_event.clear()
140 self._ExecuteIsolate()
141 self._ExecuteSwarming()
142
143 def WaitForConnection(self):
144 """Waits for the task machine to connect.
145
146 Raises:
147 ConnectionTimeoutError if the task doesn't connect in time.
148 """
149 logging.info('Waiting for %s to connect with a timeout of %d seconds',
150 self._name, self._connection_timeout_secs)
151 self._connect_event.wait(self._connection_timeout_secs)
152 if not self._connect_event.is_set():
153 raise ConnectionTimeoutError('%s failed to connect' % self.name)
154
155 def Release(self):
156 """Quits the task's RPC server so it can release the machine."""
157 if self._rpc is not None and self._connected:
158 logging.info('Releasing %s', self._name)
159 try:
160 self._rpc.Quit()
161 except (socket.error, xmlrpclib.Fault):
162 logging.error('Unable to connect to %s to call Quit', self.name)
163 self._rpc = None
164 self._connected = False
165
166 def _ExecuteIsolate(self):
167 """Executes isolate.py."""
168 cmd = [
169 'python',
170 ISOLATE_PY,
171 'archive',
172 '--isolate', self._isolate_file,
173 '--isolated', self._isolated_file,
174 ]
175
176 if self._isolate_server:
177 cmd.extend(['--isolate-server', self._isolate_server])
178 for key, value in self._config_vars.iteritems():
179 cmd.extend(['--config-var', key, value])
180
181 self._ExecuteProcess(cmd)
182
183 def _ExecuteSwarming(self):
184 """Executes swarming.py."""
185 cmd = [
186 'python',
187 SWARMING_PY,
188 'trigger',
189 self._isolated_file,
190 '--priority', str(self._priority),
191 ]
192
193 if self._isolate_server:
194 cmd.extend(['--isolate-server', self._isolate_server])
195 if self._swarming_server:
196 cmd.extend(['--swarming', self._swarming_server])
197 for key, value in self._dimensions.iteritems():
198 cmd.extend(['--dimension', key, value])
199
200 cmd.extend([
201 '--',
202 '--controller', common_lib.MY_IP,
203 '--otp', self._otp,
204 '--verbosity', self._verbosity,
205 '--idle-timeout', str(self._idle_timeout_secs),
206 ])
207
208 self._ExecuteProcess(cmd)
209
210 def _ExecuteProcess(self, cmd):
211 """Executes a process, waits for it to complete, and checks for success."""
212 logging.debug('Running %s', ' '.join(cmd))
213 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
214 _, stderr = p.communicate()
215 if p.returncode != 0:
216 raise Error(stderr)
217
218 def OnConnect(self, ip_address):
219 """Receives task ip address on connection."""
220 self._ip_address = ip_address
221 self._connected = True
222 self._rpc = common_lib.ConnectToServer(self._ip_address)
223 logging.info('%s connected from %s', self._name, ip_address)
224 self._connect_event.set()
OLDNEW
« no previous file with comments | « testing/legion/run_task.py ('k') | testing/legion/task_registration_server.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698