Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(486)

Side by Side Diff: client/run_isolated.py

Issue 1342673003: Significant refactoring of run_isolated. (Closed) Base URL: git@github.com:luci/luci-py.git@master
Patch Set: . Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 # Copyright 2012 The Swarming Authors. All rights reserved. 2 # Copyright 2012 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that 3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file. 4 # can be found in the LICENSE file.
5 5
6 """Reads a .isolated, creates a tree of hardlinks and runs the test. 6 """Reads a .isolated, creates a tree of hardlinks and runs the test.
7 7
8 To improve performance, it keeps a local cache. The local cache can safely be 8 To improve performance, it keeps a local cache. The local cache can safely be
9 deleted. 9 deleted.
10 10
11 Any ${ISOLATED_OUTDIR} on the command line will be replaced by the location of a 11 Any ${ISOLATED_OUTDIR} on the command line will be replaced by the location of a
12 temporary directory upon execution of the command specified in the .isolated 12 temporary directory upon execution of the command specified in the .isolated
13 file. All content written to this directory will be uploaded upon termination 13 file. All content written to this directory will be uploaded upon termination
14 and the .isolated file describing this directory will be printed to stdout. 14 and the .isolated file describing this directory will be printed to stdout.
15 """ 15 """
16 16
17 __version__ = '0.4.4' 17 __version__ = '0.5'
18 18
19 import logging 19 import logging
20 import optparse 20 import optparse
21 import os 21 import os
22 import sys 22 import sys
23 import tempfile 23 import tempfile
24 24
25 from third_party.depot_tools import fix_encoding 25 from third_party.depot_tools import fix_encoding
26 26
27 from utils import file_path 27 from utils import file_path
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
115 # deleted. 115 # deleted.
116 file_path.make_tree_writeable(rootdir) 116 file_path.make_tree_writeable(rootdir)
117 else: 117 else:
118 raise ValueError( 118 raise ValueError(
119 'change_tree_read_only(%s, %s): Unknown flag %s' % 119 'change_tree_read_only(%s, %s): Unknown flag %s' %
120 (rootdir, read_only, read_only)) 120 (rootdir, read_only, read_only))
121 121
122 122
123 def process_command(command, out_dir): 123 def process_command(command, out_dir):
124 """Replaces isolated specific variables in a command line.""" 124 """Replaces isolated specific variables in a command line."""
125 filtered = [] 125 def fix(arg):
126 for arg in command:
127 if '${ISOLATED_OUTDIR}' in arg: 126 if '${ISOLATED_OUTDIR}' in arg:
128 arg = arg.replace('${ISOLATED_OUTDIR}', out_dir).replace('/', os.sep) 127 return arg.replace('${ISOLATED_OUTDIR}', out_dir).replace('/', os.sep)
129 filtered.append(arg) 128 return arg
130 return filtered 129
130 return [fix(arg) for arg in command]
131
132
133 def run_command(command, cwd):
134 """Runs the command, returns the process exit code."""
135 logging.info('run_command(%s, %s)' % (command, cwd))
136 sys.stdout.flush()
137 with tools.Profiler('RunTest'):
138 try:
139 with subprocess42.Popen_with_handler(command, cwd=cwd) as p:
140 p.communicate()
141 exit_code = p.returncode
142 except OSError:
143 # This is not considered to be an internal error. The executable simply
144 # does not exit.
145 exit_code = 1
146 logging.info(
147 'Command finished with exit code %d (%s)',
148 exit_code, hex(0xffffffff & exit_code))
149 return exit_code
150
151
152 def delete_and_upload(storage, out_dir, leak_temp_dir):
153 """Deletes the temporary run directory and uploads results back.
154
155 Returns:
156 tuple(outputs_ref, success)
157 - outputs_ref is a dict referring to the results archived back to the
158 isolated server, if applicable.
159 - success is False if something occurred that means that the task must
160 forcibly be considered a failure, e.g. zombie processes were left behind.
161 """
162
163 # Upload out_dir and generate a .isolated file out of this directory. It is
164 # only done if files were written in the directory.
165 outputs_ref = None
166 if os.path.isdir(out_dir) and os.listdir(out_dir):
167 with tools.Profiler('ArchiveOutput'):
168 try:
169 results = isolateserver.archive_files_to_storage(
170 storage, [out_dir], None)
171 outputs_ref = {
172 'isolated': results[0][0],
173 'isolatedserver': storage.location,
174 'namespace': storage.namespace,
175 }
176 except isolateserver.Aborted:
177 # This happens when a signal SIGTERM was received while uploading data.
178 # There is 2 causes:
179 # - The task was too slow and was about to be killed anyway due to
180 # exceeding the hard timeout.
181 # - The amount of data uploaded back is very large and took too much
182 # time to archive.
183 sys.stderr.write('Received SIGTERM while uploading')
184 # Re-raise, so it will be treated as an internal failure.
185 raise
186 try:
187 if not leak_temp_dir and not file_path.rmtree(out_dir):
188 logging.error('Had difficulties removing out_dir %s', out_dir)
189 return outputs_ref, False
190 except OSError as e:
191 # When this happens, it means there's a process error.
192 logging.error('Had difficulties removing out_dir %s: %s', out_dir, e)
193 return outputs_ref, False
194 return outputs_ref, True
195
196
197 def map_and_run(isolated_hash, storage, cache, leak_temp_dir, extra_args):
198 """Maps and run the command. Returns metadata about the result."""
199 # TODO(maruel): Include performance statistics.
200 result = {
201 'exit_code': None,
202 'internal_failure': None,
203 'outputs_ref': None,
204 'version': 1,
205 }
206 tmp_root = os.path.dirname(cache.cache_dir) if cache.cache_dir else None
207 run_dir = make_temp_dir(u'run_tha_test', tmp_root)
208 out_dir = unicode(make_temp_dir(u'isolated_out', tmp_root))
209 try:
210 bundle = isolateserver.fetch_isolated(
211 isolated_hash=isolated_hash,
212 storage=storage,
213 cache=cache,
214 outdir=run_dir,
215 require_command=True)
216
217 change_tree_read_only(run_dir, bundle.read_only)
218 cwd = os.path.normpath(os.path.join(run_dir, bundle.relative_cwd))
219 command = bundle.command + extra_args
220 file_path.ensure_command_has_abs_path(command, cwd)
221 result['exit_code'] = run_command(process_command(command, out_dir), cwd)
222 except Exception as e:
223 # An internal error occured. Report accordingly so the swarming task will be
224 # retried automatically.
225 logging.error('internal failure: %s', e)
226 result['internal_failure'] = str(e)
227 on_error.report(None)
228 finally:
229 try:
230 if leak_temp_dir:
231 logging.warning(
232 'Deliberately leaking %s for later examination', run_dir)
233 elif not file_path.rmtree(run_dir):
234 # On Windows rmtree(run_dir) call above has a synchronization effect: it
235 # finishes only when all task child processes terminate (since a running
236 # process locks *.exe file). Examine out_dir only after that call
237 # completes (since child processes may write to out_dir too and we need
238 # to wait for them to finish).
239 print >> sys.stderr, (
240 'Failed to delete the temporary directory, forcibly failing\n'
241 'the task because of it. No zombie process can outlive a\n'
242 'successful task run and still be marked as successful.\n'
243 'Fix your stuff.')
244 if result['exit_code'] == 0:
245 result['exit_code'] = 1
246
247 result['outputs_ref'], success = delete_and_upload(
248 storage, out_dir, leak_temp_dir)
249 if not success and result['exit_code'] == 0:
250 result['exit_code'] = 1
251 except Exception as e:
252 # Swallow any exception in the main finally clause.
253 logging.error('Leaking out_dir %s: %s', out_dir, e)
254 result['internal_failure'] = str(e)
255 return result
131 256
132 257
133 def run_tha_test( 258 def run_tha_test(
134 isolated_hash, storage, cache, leak_temp_dir, result_json, extra_args): 259 isolated_hash, storage, cache, leak_temp_dir, result_json, extra_args):
135 """Downloads the dependencies in the cache, hardlinks them into a temporary 260 """Downloads the dependencies in the cache, hardlinks them into a temporary
136 directory and runs the executable from there. 261 directory and runs the executable from there.
137 262
138 A temporary directory is created to hold the output files. The content inside 263 A temporary directory is created to hold the output files. The content inside
139 this directory will be uploaded back to |storage| packaged as a .isolated 264 this directory will be uploaded back to |storage| packaged as a .isolated
140 file. 265 file.
141 266
142 Arguments: 267 Arguments:
143 isolated_hash: the SHA-1 of the .isolated file that must be retrieved to 268 isolated_hash: the SHA-1 of the .isolated file that must be retrieved to
144 recreate the tree of files to run the target executable. 269 recreate the tree of files to run the target executable.
145 storage: an isolateserver.Storage object to retrieve remote objects. This 270 storage: an isolateserver.Storage object to retrieve remote objects. This
146 object has a reference to an isolateserver.StorageApi, which does 271 object has a reference to an isolateserver.StorageApi, which does
147 the actual I/O. 272 the actual I/O.
148 cache: an isolateserver.LocalCache to keep from retrieving the same objects 273 cache: an isolateserver.LocalCache to keep from retrieving the same objects
149 constantly by caching the objects retrieved. Can be on-disk or 274 constantly by caching the objects retrieved. Can be on-disk or
150 in-memory. 275 in-memory.
151 leak_temp_dir: if true, the temporary directory will be deliberately leaked 276 leak_temp_dir: if true, the temporary directory will be deliberately leaked
152 for later examination. 277 for later examination.
153 result_json: file path to dump result metadata into. 278 result_json: file path to dump result metadata into. If set, the process
279 exit code is always 0 unless an internal error occured.
154 extra_args: optional arguments to add to the command stated in the .isolate 280 extra_args: optional arguments to add to the command stated in the .isolate
155 file. 281 file.
282
283 Returns:
284 Process exit code that should be used.
156 """ 285 """
157 tmp_root = os.path.dirname(cache.cache_dir) if cache.cache_dir else None 286 # run_isolated exit code. Depends on if result_json is used or not.
158 run_dir = make_temp_dir(u'run_tha_test', tmp_root) 287 result = map_and_run(
159 out_dir = unicode(make_temp_dir(u'isolated_out', tmp_root)) 288 isolated_hash, storage, cache, leak_temp_dir, extra_args)
160 result = 0 289 logging.info('Result:\n%s', tools.format_json(result, dense=True))
161 try: 290 if result_json:
162 try: 291 tools.write_json(result_json, result, dense=True)
163 bundle = isolateserver.fetch_isolated( 292 # Only return 1 if there was an internal error.
164 isolated_hash=isolated_hash, 293 return int(bool(result['internal_failure']))
165 storage=storage,
166 cache=cache,
167 outdir=run_dir,
168 require_command=True)
169 except isolated_format.IsolatedError:
170 on_error.report(None)
171 return 1
172 294
173 change_tree_read_only(run_dir, bundle.read_only) 295 # Marshall into old-style inline output.
174 cwd = os.path.normpath(os.path.join(run_dir, bundle.relative_cwd)) 296 if result['outputs_ref']:
175 command = bundle.command + extra_args 297 data = {
176 298 'hash': result['outputs_ref']['isolated'],
177 file_path.ensure_command_has_abs_path(command, cwd) 299 'namespace': result['outputs_ref']['namespace'],
178 command = process_command(command, out_dir) 300 'storage': result['outputs_ref']['isolatedserver'],
179 logging.info('Running %s, cwd=%s' % (command, cwd)) 301 }
180
181 # TODO(csharp): This should be specified somewhere else.
182 # TODO(vadimsh): Pass it via 'env_vars' in manifest.
183 # Add a rotating log file if one doesn't already exist.
184 env = os.environ.copy()
185 if MAIN_DIR:
186 env.setdefault('RUN_TEST_CASES_LOG_FILE',
187 os.path.join(MAIN_DIR, RUN_TEST_CASES_LOG))
188 sys.stdout.flush() 302 sys.stdout.flush()
189 with tools.Profiler('RunTest'): 303 print(
190 try: 304 '[run_isolated_out_hack]%s[/run_isolated_out_hack]' %
191 with subprocess42.Popen_with_handler(command, cwd=cwd, env=env) as p: 305 tools.format_json(data, dense=True))
192 p.communicate() 306 return result['exit_code'] or int(bool(result['internal_failure']))
193 result = p.returncode
194 except OSError:
195 on_error.report('Failed to run %s; cwd=%s' % (command, cwd))
196 result = 1
197 logging.info(
198 'Command finished with exit code %d (%s)',
199 result, hex(0xffffffff & result))
200 finally:
201 try:
202 if leak_temp_dir:
203 logging.warning('Deliberately leaking %s for later examination',
204 run_dir)
205 else:
206 try:
207 if not file_path.rmtree(run_dir):
208 print >> sys.stderr, (
209 'Failed to delete the temporary directory, forcibly failing\n'
210 'the task because of it. No zombie process can outlive a\n'
211 'successful task run and still be marked as successful.\n'
212 'Fix your stuff.')
213 result = result or 1
214 except OSError as exc:
215 logging.error('Leaking run_dir %s: %s', run_dir, exc)
216 result = 1
217
218 # HACK(vadimsh): On Windows rmtree(run_dir) call above has
219 # a synchronization effect: it finishes only when all task child processes
220 # terminate (since a running process locks *.exe file). Examine out_dir
221 # only after that call completes (since child processes may
222 # write to out_dir too and we need to wait for them to finish).
223
224 # Upload out_dir and generate a .isolated file out of this directory.
225 # It is only done if files were written in the directory.
226 if os.path.isdir(out_dir) and os.listdir(out_dir):
227 with tools.Profiler('ArchiveOutput'):
228 try:
229 results = isolateserver.archive_files_to_storage(
230 storage, [out_dir], None)
231 except isolateserver.Aborted:
232 # This happens when a signal SIGTERM was received while uploading
233 # data. There is 2 causes:
234 # - The task was too slow and was about to be killed anyway due to
235 # exceeding the hard timeout.
236 # - The amount of data uploaded back is very large and took too much
237 # time to archive.
238 #
239 # There's 3 options to handle this:
240 # - Ignore the upload failure as a silent failure. This can be
241 # detected client side by the fact no result file exists.
242 # - Return as if the task failed. This is not factually correct.
243 # - Return an internal failure. Sadly, it's impossible at this level
244 # at the moment.
245 #
246 # For now, silently drop the upload.
247 #
248 # In any case, the process only has a very short grace period so it
249 # needs to exit right away.
250 sys.stderr.write('Received SIGTERM while uploading')
251 results = None
252
253 if results:
254 if result_json:
255 data = {
256 'isolated': results[0][0],
257 'isolatedserver': storage.location,
258 'namespace': storage.namespace,
259 }
260 tools.write_json(result_json, data, dense=True)
261 else:
262 data = {
263 'hash': results[0][0],
264 'namespace': storage.namespace,
265 'storage': storage.location,
266 }
267 sys.stdout.flush()
268 print(
269 '[run_isolated_out_hack]%s[/run_isolated_out_hack]' %
270 tools.format_json(data, dense=True))
271 logging.info('%s', data)
272
273 finally:
274 try:
275 if os.path.isdir(out_dir) and not file_path.rmtree(out_dir):
276 logging.error('Had difficulties removing out_dir %s', out_dir)
277 result = result or 1
278 except OSError as exc:
279 # Only report on non-Windows or on Windows when the process had
280 # succeeded. Due to the way file sharing works on Windows, it's sadly
281 # expected that file deletion may fail when a test failed.
282 logging.error('Failed to remove out_dir %s: %s', out_dir, exc)
283 if sys.platform != 'win32' or not result:
284 on_error.report(None)
285 result = 1
286
287 return result
288 307
289 308
290 def main(args): 309 def main(args):
291 tools.disable_buffering() 310 tools.disable_buffering()
292 parser = logging_utils.OptionParserWithLogging( 311 parser = logging_utils.OptionParserWithLogging(
293 usage='%prog <options>', 312 usage='%prog <options>',
294 version=__version__, 313 version=__version__,
295 log_file=RUN_ISOLATED_LOG_FILE) 314 log_file=RUN_ISOLATED_LOG_FILE)
296 315 parser.add_option(
297 parser.add_option('--json', help='dump output metadata to json file') 316 '--json',
317 help='dump output metadata to json file. When used, run_isolated returns '
318 'non-zero only on internal failure')
298 data_group = optparse.OptionGroup(parser, 'Data source') 319 data_group = optparse.OptionGroup(parser, 'Data source')
299 data_group.add_option( 320 data_group.add_option(
300 '-s', '--isolated', 321 '-s', '--isolated',
301 help='Hash of the .isolated to grab from the isolate server') 322 help='Hash of the .isolated to grab from the isolate server')
302 data_group.add_option( 323 data_group.add_option(
303 '-H', dest='isolated', help=optparse.SUPPRESS_HELP) 324 '-H', dest='isolated', help=optparse.SUPPRESS_HELP)
304 isolateserver.add_isolate_server_options(data_group) 325 isolateserver.add_isolate_server_options(data_group)
305 parser.add_option_group(data_group) 326 parser.add_option_group(data_group)
306 327
307 isolateserver.add_cache_options(parser) 328 isolateserver.add_cache_options(parser)
(...skipping 21 matching lines...) Expand all
329 assert storage.hash_algo == cache.hash_algo 350 assert storage.hash_algo == cache.hash_algo
330 return run_tha_test( 351 return run_tha_test(
331 options.isolated, storage, cache, options.leak_temp_dir, options.json, 352 options.isolated, storage, cache, options.leak_temp_dir, options.json,
332 args) 353 args)
333 354
334 355
335 if __name__ == '__main__': 356 if __name__ == '__main__':
336 # Ensure that we are always running with the correct encoding. 357 # Ensure that we are always running with the correct encoding.
337 fix_encoding.fix_encoding() 358 fix_encoding.fix_encoding()
338 sys.exit(main(sys.argv[1:])) 359 sys.exit(main(sys.argv[1:]))
OLDNEW
« no previous file with comments | « appengine/swarming/swarming_bot/bot_code/task_runner.py ('k') | client/tests/run_isolated_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698