Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(226)

Side by Side Diff: appengine/swarming/swarming_bot/bot_code/task_runner.py

Issue 2300543002: Minor refactoring in task_runner.py in preparation for adding more code. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2013 The LUCI Authors. All rights reserved. 1 # Copyright 2013 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """Runs a Swarming task. 5 """Runs a Swarming task.
6 6
7 Downloads all the necessary files to run the task, executes the command and 7 Downloads all the necessary files to run the task, executes the command and
8 streams results back to the Swarming server. 8 streams results back to the Swarming server.
9 9
10 The process exit code is 0 when the task was executed, even if the task itself 10 The process exit code is 0 when the task was executed, even if the task itself
(...skipping 13 matching lines...) Expand all
24 import time 24 import time
25 import traceback 25 import traceback
26 26
27 from utils import file_path 27 from utils import file_path
28 from utils import net 28 from utils import net
29 from utils import on_error 29 from utils import on_error
30 from utils import subprocess42 30 from utils import subprocess42
31 from utils import zip_package 31 from utils import zip_package
32 32
33 import bot_auth 33 import bot_auth
34 import file_reader
35 34
36 35
37 # Path to this file or the zip containing this file. 36 # Path to this file or the zip containing this file.
38 THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) 37 THIS_FILE = os.path.abspath(zip_package.get_main_script_path())
39 38
40 39
41 # Sends a maximum of 100kb of stdout per task_update packet. 40 # Sends a maximum of 100kb of stdout per task_update packet.
42 MAX_CHUNK_SIZE = 102400 41 MAX_CHUNK_SIZE = 102400
43 42
44 43
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 cmd.append('--') 144 cmd.append('--')
146 if task_details.command: 145 if task_details.command:
147 cmd.extend(task_details.command) 146 cmd.extend(task_details.command)
148 elif task_details.extra_args: 147 elif task_details.extra_args:
149 cmd.extend(task_details.extra_args) 148 cmd.extend(task_details.extra_args)
150 return cmd 149 return cmd
151 150
152 151
153 class TaskDetails(object): 152 class TaskDetails(object):
154 def __init__(self, data): 153 def __init__(self, data):
155 """Loads the raw data from a manifest file specified by --in-file."""
156 logging.info('TaskDetails(%s)', data) 154 logging.info('TaskDetails(%s)', data)
157 if not isinstance(data, dict): 155 if not isinstance(data, dict):
158 raise ValueError('Expected dict, got %r' % data) 156 raise InternalError('Expected dict in task_runner_in.json, got %r' % data)
159 157
160 # Get all the data first so it fails early if the task details is invalid. 158 # Get all the data first so it fails early if the task details is invalid.
161 self.bot_id = data['bot_id'] 159 self.bot_id = data['bot_id']
162 160
163 # Raw command. Only self.command or self.isolated.input can be set. 161 # Raw command. Only self.command or self.isolated.input can be set.
164 self.command = data['command'] or [] 162 self.command = data['command'] or []
165 163
166 # Isolated command. Is a serialized version of task_request.FilesRef. 164 # Isolated command. Is a serialized version of task_request.FilesRef.
167 self.isolated = data['isolated'] 165 self.isolated = data['isolated']
168 self.extra_args = data['extra_args'] 166 self.extra_args = data['extra_args']
169 167
170 self.cipd_input = data.get('cipd_input') 168 self.cipd_input = data.get('cipd_input')
171 169
172 self.env = { 170 self.env = {
173 k.encode('utf-8'): v.encode('utf-8') for k, v in data['env'].iteritems() 171 k.encode('utf-8'): v.encode('utf-8') for k, v in data['env'].iteritems()
174 } 172 }
175 self.grace_period = data['grace_period'] 173 self.grace_period = data['grace_period']
176 self.hard_timeout = data['hard_timeout'] 174 self.hard_timeout = data['hard_timeout']
177 self.io_timeout = data['io_timeout'] 175 self.io_timeout = data['io_timeout']
178 self.task_id = data['task_id'] 176 self.task_id = data['task_id']
179 177
178 @staticmethod
179 def load(path):
180 """Loads the TaskDetails from a file on disk (specified via --in-file).
180 181
181 class MustExit(Exception): 182 Raises InternalError if the file can't be read or parsed.
182 """Raised on signal that the process must exit immediately.""" 183 """
184 try:
185 with open(path, 'rb') as f:
186 return TaskDetails(json.load(f))
187 except (IOError, ValueError) as e:
188 raise InternalError('Cannot load task_runner_in.json: %s' % e)
189
190
191 class ExitSignal(Exception):
192 """Raised on a signal that the process must exit immediately."""
183 def __init__(self, sig): 193 def __init__(self, sig):
184 super(MustExit, self).__init__() 194 super(ExitSignal, self).__init__(u'task_runner received signal %s' % sig)
185 self.signal = sig 195 self.signal = sig
186 196
187 197
198 class InternalError(Exception):
199 """Raised on unrecoverable errors that abort task with 'internal error'."""
200
201
188 def load_and_run( 202 def load_and_run(
189 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space, 203 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space,
190 bot_file, auth_params_file): 204 bot_file, auth_params_file):
191 """Loads the task's metadata and execute it. 205 """Loads the task's metadata, prepares auth environment and executes the task.
192 206
193 This may throw all sorts of exceptions in case of failure. It's up to the 207 This may throw all sorts of exceptions in case of failure. It's up to the
194 caller to trap them. These shall be considered 'internal_failure' instead of 208 caller to trap them. These shall be considered 'internal_failure' instead of
195 'failure' from a TaskRunResult standpoint. 209 'failure' from a TaskRunResult standpoint.
196 """ 210 """
197 # The work directory is guaranteed to exist since it was created by 211 auth_system = None
198 # bot_main.py and contains the manifest. Temporary files will be downloaded
199 # there. It's bot_main.py that will delete the directory afterward. Tests are
200 # not run from there.
201 task_result = None 212 task_result = None
213 work_dir = os.path.dirname(out_file)
214
202 def handler(sig, _): 215 def handler(sig, _):
203 logging.info('Got signal %s', sig) 216 logging.info('Got signal %s', sig)
204 raise MustExit(sig) 217 raise ExitSignal(sig)
205 work_dir = os.path.dirname(out_file) 218
206 try: 219 try:
207 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): 220 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler):
221 # The work directory is guaranteed to exist since it was created by
Vadim Sh. 2016/08/31 03:53:33 moved comment to closer where it is relevant
222 # bot_main.py and contains the manifest. Temporary files will be
223 # downloaded there. It's bot_main.py that will delete the directory
224 # afterward. Tests are not run from there.
208 if not os.path.isdir(work_dir): 225 if not os.path.isdir(work_dir):
209 raise ValueError('%s expected to exist' % work_dir) 226 raise InternalError('%s expected to exist' % work_dir)
210 227
211 with open(in_file, 'rb') as f: 228 # Raises InternalError on errors.
212 task_details = TaskDetails(json.load(f)) 229 task_details = TaskDetails.load(in_file)
213 230
231 # This will start a thread that occasionally reads bot authentication
232 # headers from 'auth_params_file'.
233 if auth_params_file:
234 try:
235 auth_system = bot_auth.AuthSystem()
236 auth_system.start(auth_params_file)
237 except bot_auth.AuthSystemError as e:
238 raise InternalError('Failed to init auth: %s' % e)
239
240 # Returns bot authentication headers dict or raises InternalError.
241 def headers_cb():
242 try:
243 return auth_system.bot_headers if auth_system else {}
244 except bot_auth.AuthSystemError as e:
245 raise InternalError('Failed to grab bot auth headers: %s' % e)
246
247 # Auth environment is up, start the command. task_result is dumped to
248 # disk in 'finally' block.
214 task_result = run_command( 249 task_result = run_command(
215 swarming_server, task_details, work_dir, 250 swarming_server, task_details, work_dir,
216 cost_usd_hour, start, min_free_space, bot_file, auth_params_file) 251 cost_usd_hour, start, min_free_space, bot_file, headers_cb)
217 except MustExit as e: 252
253 except (ExitSignal, InternalError) as e:
Vadim Sh. 2016/08/31 03:53:33 InternalError here is new. I believe it's more cle
218 # This normally means run_command() didn't get the chance to run, as it 254 # This normally means run_command() didn't get the chance to run, as it
219 # itself trap MustExit and will report accordingly. In this case, we want 255 # itself traps exceptions and will report accordingly. In this case, we want
220 # the parent process to send the message instead. 256 # the parent process to send the message instead.
221 if not task_result: 257 if not task_result:
222 task_result = { 258 task_result = {
223 u'exit_code': None, 259 u'exit_code': -1,
224 u'hard_timeout': False, 260 u'hard_timeout': False,
225 u'io_timeout': False, 261 u'io_timeout': False,
226 u'must_signal_internal_failure': 262 u'must_signal_internal_failure': e.message or 'unknown error',
M-A Ruel 2016/08/31 15:03:47 str(e.message) if e.message else 'unknown error'
Vadim Sh. 2016/09/01 00:15:56 Done.
227 u'task_runner received signal %s' % e.signal,
228 u'version': OUT_VERSION, 263 u'version': OUT_VERSION,
229 } 264 }
265
230 finally: 266 finally:
231 # We've found tests to delete the working directory work_dir when quitting, 267 # We've found tests to delete the working directory work_dir when quitting,
232 # causing an exception here. Try to recreate the directory if necessary. 268 # causing an exception here. Try to recreate the directory if necessary.
233 if not os.path.isdir(work_dir): 269 if not os.path.isdir(work_dir):
234 os.mkdir(work_dir) 270 os.mkdir(work_dir)
271 if auth_system:
M-A Ruel 2016/08/31 15:03:47 One thing that could help is to use contextlib. co
Vadim Sh. 2016/09/01 00:15:56 I've considered this. AuthSystem isn't supposed to
272 auth_system.stop()
235 with open(out_file, 'wb') as f: 273 with open(out_file, 'wb') as f:
236 json.dump(task_result, f) 274 json.dump(task_result, f)
237 275
238 276
239 def post_update( 277 def post_update(
240 swarming_server, auth_headers, params, exit_code, 278 swarming_server, auth_headers, params, exit_code,
241 stdout, output_chunk_start): 279 stdout, output_chunk_start):
242 """Posts task update to task_update. 280 """Posts task update to task_update.
243 281
244 Arguments: 282 Arguments:
245 swarming_server: Base URL to Swarming server. 283 swarming_server: Base URL to Swarming server.
246 auth_headers: dict with HTTP authentication headers. 284 auth_headers: dict with HTTP authentication headers.
247 params: Default JSON parameters for the POST. 285 params: Default JSON parameters for the POST.
248 exit_code: Process exit code, only when a command completed. 286 exit_code: Process exit code, only when a command completed.
249 stdout: Incremental output since last call, if any. 287 stdout: Incremental output since last call, if any.
250 output_chunk_start: Total number of stdout previously sent, for coherency 288 output_chunk_start: Total number of stdout previously sent, for coherency
251 with the server. 289 with the server.
252 290
253 Returns: 291 Returns:
254 False if the task should stop. 292 False if the task should stop.
293
294 Raises:
295 InternalError if can't contact the server after many attempts or the server
296 replies with an error.
255 """ 297 """
256 params = params.copy() 298 params = params.copy()
257 if exit_code is not None: 299 if exit_code is not None:
258 params['exit_code'] = exit_code 300 params['exit_code'] = exit_code
259 if stdout: 301 if stdout:
260 # The output_chunk_start is used by the server to make sure that the stdout 302 # The output_chunk_start is used by the server to make sure that the stdout
261 # chunks are processed and saved in the DB in order. 303 # chunks are processed and saved in the DB in order.
262 params['output'] = base64.b64encode(stdout) 304 params['output'] = base64.b64encode(stdout)
263 params['output_chunk_start'] = output_chunk_start 305 params['output_chunk_start'] = output_chunk_start
264 # TODO(maruel): Support early cancellation. 306 # TODO(maruel): Support early cancellation.
265 # https://code.google.com/p/swarming/issues/detail?id=62 307 # https://code.google.com/p/swarming/issues/detail?id=62
266 resp = net.url_read_json( 308 resp = net.url_read_json(
267 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], 309 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'],
268 data=params, 310 data=params,
269 headers=auth_headers, 311 headers=auth_headers,
270 follow_redirects=False) 312 follow_redirects=False)
271 logging.debug('post_update() = %s', resp) 313 logging.debug('post_update() = %s', resp)
272 if not resp or resp.get('error'): 314 if not resp or resp.get('error'):
273 # Abandon it. This will force a process exit. 315 # Abandon it. This will force a process exit.
274 raise ValueError(resp.get('error') if resp else 'Failed to contact server') 316 raise InternalError(
317 resp.get('error') if resp else 'Failed to contact server')
275 return not resp.get('must_stop', False) 318 return not resp.get('must_stop', False)
276 319
277 320
278 def should_post_update(stdout, now, last_packet): 321 def should_post_update(stdout, now, last_packet):
279 """Returns True if it's time to send a task_update packet via post_update(). 322 """Returns True if it's time to send a task_update packet via post_update().
280 323
281 Sends a packet when one of this condition is met: 324 Sends a packet when one of this condition is met:
282 - more than MAX_CHUNK_SIZE of stdout is buffered. 325 - more than MAX_CHUNK_SIZE of stdout is buffered.
283 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was 326 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was
284 stdout. 327 stdout.
(...skipping 24 matching lines...) Expand all
309 352
310 def kill_and_wait(proc, grace_period, reason): 353 def kill_and_wait(proc, grace_period, reason):
311 logging.warning('SIGTERM finally due to %s', reason) 354 logging.warning('SIGTERM finally due to %s', reason)
312 proc.terminate() 355 proc.terminate()
313 try: 356 try:
314 proc.wait(grace_period) 357 proc.wait(grace_period)
315 except subprocess42.TimeoutError: 358 except subprocess42.TimeoutError:
316 logging.warning('SIGKILL finally due to %s', reason) 359 logging.warning('SIGKILL finally due to %s', reason)
317 proc.kill() 360 proc.kill()
318 exit_code = proc.wait() 361 exit_code = proc.wait()
319 logging.info('Waiting for proces exit in finally - done') 362 logging.info('Waiting for process exit in finally - done')
320 return exit_code 363 return exit_code
321 364
322 365
323 def start_reading_headers(auth_params_file):
324 """Spawns a thread that rereads headers from --auth-params-file path.
325
326 Returns:
327 Tuple (callback that returns the last known headers, stop callback).
328
329 Raises:
330 file_reader.FatalReadError if headers file can't be read.
331 ValueError if it can be read, but its body is invalid.
332 """
333 # Read headers more often than bot_main writes them, to reduce maximum
334 # possible latency between headers are updated and read.
335 reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30)
336
337 def read_and_validate_headers():
338 val = bot_auth.process_auth_params_json(reader.last_value or {})
339 return val.swarming_http_headers
340
341 reader.start()
342 read_and_validate_headers() # initial validation, may raise ValueError
343 return read_and_validate_headers, reader.stop
344
345
346 def run_command( 366 def run_command(
347 swarming_server, task_details, work_dir, cost_usd_hour, 367 swarming_server, task_details, work_dir, cost_usd_hour,
348 task_start, min_free_space, bot_file, auth_params_file): 368 task_start, min_free_space, bot_file, headers_cb):
349 """Runs a command and sends packets to the server to stream results back. 369 """Runs a command and sends packets to the server to stream results back.
350 370
351 Implements both I/O and hard timeouts. Sends the packets numbered, so the 371 Implements both I/O and hard timeouts. Sends the packets numbered, so the
352 server can ensure they are processed in order. 372 server can ensure they are processed in order.
353 373
354 Returns: 374 Returns:
355 Metadata about the command. 375 Metadata dict with the execution result.
376
377 Raises:
378 ExitSignal if caught some signal when starting or stopping.
379 InternalError on unexpected internal errors.
356 """ 380 """
357 # TODO(maruel): This function is incomprehensible, split and refactor. 381 # TODO(maruel): This function is incomprehensible, split and refactor.
358 382
359 # Grab initial auth headers and start rereading them in parallel thread. They
360 # MUST be there already. It's fatal internal error if they are not.
361 headers_cb = lambda: {}
362 stop_headers_reader = lambda: None
363 if auth_params_file:
364 try:
365 headers_cb, stop_headers_reader = start_reading_headers(auth_params_file)
366 except (ValueError, file_reader.FatalReadError) as e:
367 return {
368 u'exit_code': 1,
369 u'hard_timeout': False,
370 u'io_timeout': False,
371 u'must_signal_internal_failure': str(e),
372 u'version': OUT_VERSION,
373 }
374
375 # Signal the command is about to be started. 383 # Signal the command is about to be started.
376 last_packet = start = now = monotonic_time() 384 last_packet = start = now = monotonic_time()
377 params = { 385 params = {
378 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., 386 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60.,
379 'id': task_details.bot_id, 387 'id': task_details.bot_id,
380 'task_id': task_details.task_id, 388 'task_id': task_details.task_id,
381 } 389 }
382 if not post_update(swarming_server, headers_cb(), params, None, '', 0): 390 if not post_update(swarming_server, headers_cb(), params, None, '', 0):
383 # Don't even bother, the task was already canceled. 391 # Don't even bother, the task was already canceled.
384 return { 392 return {
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
459 last_io = now 467 last_io = now
460 468
461 # Post update if necessary. 469 # Post update if necessary.
462 if should_post_update(stdout, now, last_packet): 470 if should_post_update(stdout, now, last_packet):
463 last_packet = monotonic_time() 471 last_packet = monotonic_time()
464 params['cost_usd'] = ( 472 params['cost_usd'] = (
465 cost_usd_hour * (last_packet - task_start) / 60. / 60.) 473 cost_usd_hour * (last_packet - task_start) / 60. / 60.)
466 if not post_update( 474 if not post_update(
467 swarming_server, headers_cb(), params, None, stdout, 475 swarming_server, headers_cb(), params, None, stdout,
468 output_chunk_start): 476 output_chunk_start):
469 # Server is telling us to stop. Normally task cancelation. 477 # Server is telling us to stop. Normally task cancellation.
M-A Ruel 2016/08/31 15:03:47 "Rarely used outside the U.S. According to graphs
Vadim Sh. 2016/09/01 00:15:56 Oh. My spellchecker is British then or something :
470 if not kill_sent: 478 if not kill_sent:
471 logging.warning('Server induced stop; sending SIGKILL') 479 logging.warning('Server induced stop; sending SIGKILL')
472 proc.kill() 480 proc.kill()
473 kill_sent = True 481 kill_sent = True
474 482
475 output_chunk_start += len(stdout) 483 output_chunk_start += len(stdout)
476 stdout = '' 484 stdout = ''
477 485
478 # Send signal on timeout if necessary. Both are failures, not 486 # Send signal on timeout if necessary. Both are failures, not
479 # internal_failures. 487 # internal_failures.
(...skipping 14 matching lines...) Expand all
494 # states: 502 # states:
495 # - signal but process exited within grace period, 503 # - signal but process exited within grace period,
496 # (hard_|io_)_timed_out will be set but the process exit code will 504 # (hard_|io_)_timed_out will be set but the process exit code will
497 # be script provided. 505 # be script provided.
498 # - processed exited late, exit code will be -9 on posix. 506 # - processed exited late, exit code will be -9 on posix.
499 logging.warning( 507 logging.warning(
500 'Grace of %.3fs exhausted at %.3fs; sending SIGKILL', 508 'Grace of %.3fs exhausted at %.3fs; sending SIGKILL',
501 task_details.grace_period, now - timed_out) 509 task_details.grace_period, now - timed_out)
502 proc.kill() 510 proc.kill()
503 kill_sent = True 511 kill_sent = True
504 logging.info('Waiting for proces exit') 512 logging.info('Waiting for process exit')
505 exit_code = proc.wait() 513 exit_code = proc.wait()
506 except MustExit as e: 514 except (ExitSignal, InternalError, IOError, OSError) as e:
Vadim Sh. 2016/08/31 03:53:33 same here... Was there a reason IOError and OSEr
M-A Ruel 2016/08/31 15:03:47 - If the file is missing, subprocess.Popen(['missi
Vadim Sh. 2016/09/01 00:15:56 I believe this is caught sooner (on line 434) when
507 # TODO(maruel): Do the send SIGTERM to child process and give it
508 # task_details.grace_period to terminate.
509 must_signal_internal_failure = (
510 u'task_runner received signal %s' % e.signal)
511 exit_code = kill_and_wait(
512 proc, task_details.grace_period, 'signal %d' % e.signal)
513 except (IOError, OSError):
514 # Something wrong happened, try to kill the child process. 515 # Something wrong happened, try to kill the child process.
515 exit_code = kill_and_wait( 516 must_signal_internal_failure = e.message or 'unknown error'
M-A Ruel 2016/08/31 15:03:47 str(e.message) if e.message else 'unknown error'
Vadim Sh. 2016/09/01 00:15:56 Done.
516 proc, task_details.grace_period, 'exception %s' % e) 517 exit_code = kill_and_wait(proc, task_details.grace_period, e.message)
Vadim Sh. 2016/08/31 03:53:32 'e' here was undefined :)
M-A Ruel 2016/08/31 15:03:47 I tried to be defensive but sucked at it.
517 518
518 # This is the very last packet for this command. It if was an isolated task, 519 # This is the very last packet for this command. It if was an isolated task,
519 # include the output reference to the archived .isolated file. 520 # include the output reference to the archived .isolated file.
520 now = monotonic_time() 521 now = monotonic_time()
521 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. 522 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60.
522 params['duration'] = now - start 523 params['duration'] = now - start
523 params['io_timeout'] = had_io_timeout 524 params['io_timeout'] = had_io_timeout
524 had_hard_timeout = False 525 had_hard_timeout = False
525 try: 526 try:
526 if not os.path.isfile(isolated_result): 527 if not os.path.isfile(isolated_result):
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
579 if cipd_stats: 580 if cipd_stats:
580 params['cipd_stats'] = cipd_stats 581 params['cipd_stats'] = cipd_stats
581 cipd_pins = run_isolated_result.get('cipd_pins') 582 cipd_pins = run_isolated_result.get('cipd_pins')
582 if cipd_pins: 583 if cipd_pins:
583 params['cipd_pins'] = cipd_pins 584 params['cipd_pins'] = cipd_pins
584 except (IOError, OSError, ValueError) as e: 585 except (IOError, OSError, ValueError) as e:
585 logging.error('Swallowing error: %s', e) 586 logging.error('Swallowing error: %s', e)
586 if not must_signal_internal_failure: 587 if not must_signal_internal_failure:
587 must_signal_internal_failure = '%s\n%s' % ( 588 must_signal_internal_failure = '%s\n%s' % (
588 e, traceback.format_exc()[-2048:]) 589 e, traceback.format_exc()[-2048:])
590
589 # TODO(maruel): Send the internal failure here instead of sending it through 591 # TODO(maruel): Send the internal failure here instead of sending it through
590 # bot_main, this causes a race condition. 592 # bot_main, this causes a race condition.
591 if exit_code is None: 593 if exit_code is None:
592 exit_code = -1 594 exit_code = -1
593 params['hard_timeout'] = had_hard_timeout 595 params['hard_timeout'] = had_hard_timeout
594 # Ignore server reply to stop. 596
595 post_update( 597 # Ignore server reply to stop. Also ignore internal errors here if we are
596 swarming_server, headers_cb(), params, exit_code, 598 # already handling some.
597 stdout, output_chunk_start) 599 try:
600 post_update(
601 swarming_server, headers_cb(), params, exit_code,
602 stdout, output_chunk_start)
603 except InternalError as e:
604 logging.error('Internal error while finishing the task: %s', e)
605 if not must_signal_internal_failure:
606 must_signal_internal_failure = e.message or 'unknown error'
M-A Ruel 2016/08/31 15:03:47 same
Vadim Sh. 2016/09/01 00:15:56 Done.
607
598 return { 608 return {
599 u'exit_code': exit_code, 609 u'exit_code': exit_code,
600 u'hard_timeout': had_hard_timeout, 610 u'hard_timeout': had_hard_timeout,
601 u'io_timeout': had_io_timeout, 611 u'io_timeout': had_io_timeout,
602 u'must_signal_internal_failure': must_signal_internal_failure, 612 u'must_signal_internal_failure': must_signal_internal_failure,
603 u'version': OUT_VERSION, 613 u'version': OUT_VERSION,
604 } 614 }
605 finally: 615 finally:
606 file_path.try_remove(unicode(isolated_result)) 616 file_path.try_remove(unicode(isolated_result))
607 stop_headers_reader()
608 617
609 618
610 def main(args): 619 def main(args):
611 subprocess42.inhibit_os_error_reporting() 620 subprocess42.inhibit_os_error_reporting()
612 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) 621 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
613 parser.add_option('--in-file', help='Name of the request file') 622 parser.add_option('--in-file', help='Name of the request file')
614 parser.add_option( 623 parser.add_option(
615 '--out-file', help='Name of the JSON file to write a task summary to') 624 '--out-file', help='Name of the JSON file to write a task summary to')
616 parser.add_option( 625 parser.add_option(
617 '--swarming-server', help='Swarming server to send data back') 626 '--swarming-server', help='Swarming server to send data back')
(...skipping 21 matching lines...) Expand all
639 options.start = now 648 options.start = now
640 649
641 try: 650 try:
642 load_and_run( 651 load_and_run(
643 options.in_file, options.swarming_server, options.cost_usd_hour, 652 options.in_file, options.swarming_server, options.cost_usd_hour,
644 options.start, options.out_file, options.min_free_space, 653 options.start, options.out_file, options.min_free_space,
645 options.bot_file, options.auth_params_file) 654 options.bot_file, options.auth_params_file)
646 return 0 655 return 0
647 finally: 656 finally:
648 logging.info('quitting') 657 logging.info('quitting')
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698