| OLD | NEW |
| 1 # Copyright 2013 The LUCI Authors. All rights reserved. | 1 # Copyright 2013 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """Runs a Swarming task. | 5 """Runs a Swarming task. |
| 6 | 6 |
| 7 Downloads all the necessary files to run the task, executes the command and | 7 Downloads all the necessary files to run the task, executes the command and |
| 8 streams results back to the Swarming server. | 8 streams results back to the Swarming server. |
| 9 | 9 |
| 10 The process exit code is 0 when the task was executed, even if the task itself | 10 The process exit code is 0 when the task was executed, even if the task itself |
| (...skipping 13 matching lines...) Expand all Loading... |
| 24 import time | 24 import time |
| 25 import traceback | 25 import traceback |
| 26 | 26 |
| 27 from utils import file_path | 27 from utils import file_path |
| 28 from utils import net | 28 from utils import net |
| 29 from utils import on_error | 29 from utils import on_error |
| 30 from utils import subprocess42 | 30 from utils import subprocess42 |
| 31 from utils import zip_package | 31 from utils import zip_package |
| 32 | 32 |
| 33 import bot_auth | 33 import bot_auth |
| 34 import file_reader | |
| 35 | 34 |
| 36 | 35 |
| 37 # Path to this file or the zip containing this file. | 36 # Path to this file or the zip containing this file. |
| 38 THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) | 37 THIS_FILE = os.path.abspath(zip_package.get_main_script_path()) |
| 39 | 38 |
| 40 | 39 |
| 41 # Sends a maximum of 100kb of stdout per task_update packet. | 40 # Sends a maximum of 100kb of stdout per task_update packet. |
| 42 MAX_CHUNK_SIZE = 102400 | 41 MAX_CHUNK_SIZE = 102400 |
| 43 | 42 |
| 44 | 43 |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 145 cmd.append('--') | 144 cmd.append('--') |
| 146 if task_details.command: | 145 if task_details.command: |
| 147 cmd.extend(task_details.command) | 146 cmd.extend(task_details.command) |
| 148 elif task_details.extra_args: | 147 elif task_details.extra_args: |
| 149 cmd.extend(task_details.extra_args) | 148 cmd.extend(task_details.extra_args) |
| 150 return cmd | 149 return cmd |
| 151 | 150 |
| 152 | 151 |
| 153 class TaskDetails(object): | 152 class TaskDetails(object): |
| 154 def __init__(self, data): | 153 def __init__(self, data): |
| 155 """Loads the raw data from a manifest file specified by --in-file.""" | |
| 156 logging.info('TaskDetails(%s)', data) | 154 logging.info('TaskDetails(%s)', data) |
| 157 if not isinstance(data, dict): | 155 if not isinstance(data, dict): |
| 158 raise ValueError('Expected dict, got %r' % data) | 156 raise InternalError('Expected dict in task_runner_in.json, got %r' % data) |
| 159 | 157 |
| 160 # Get all the data first so it fails early if the task details is invalid. | 158 # Get all the data first so it fails early if the task details is invalid. |
| 161 self.bot_id = data['bot_id'] | 159 self.bot_id = data['bot_id'] |
| 162 | 160 |
| 163 # Raw command. Only self.command or self.isolated.input can be set. | 161 # Raw command. Only self.command or self.isolated.input can be set. |
| 164 self.command = data['command'] or [] | 162 self.command = data['command'] or [] |
| 165 | 163 |
| 166 # Isolated command. Is a serialized version of task_request.FilesRef. | 164 # Isolated command. Is a serialized version of task_request.FilesRef. |
| 167 self.isolated = data['isolated'] | 165 self.isolated = data['isolated'] |
| 168 self.extra_args = data['extra_args'] | 166 self.extra_args = data['extra_args'] |
| 169 | 167 |
| 170 self.cipd_input = data.get('cipd_input') | 168 self.cipd_input = data.get('cipd_input') |
| 171 | 169 |
| 172 self.env = { | 170 self.env = { |
| 173 k.encode('utf-8'): v.encode('utf-8') for k, v in data['env'].iteritems() | 171 k.encode('utf-8'): v.encode('utf-8') for k, v in data['env'].iteritems() |
| 174 } | 172 } |
| 175 self.grace_period = data['grace_period'] | 173 self.grace_period = data['grace_period'] |
| 176 self.hard_timeout = data['hard_timeout'] | 174 self.hard_timeout = data['hard_timeout'] |
| 177 self.io_timeout = data['io_timeout'] | 175 self.io_timeout = data['io_timeout'] |
| 178 self.task_id = data['task_id'] | 176 self.task_id = data['task_id'] |
| 179 | 177 |
| 178 @staticmethod |
| 179 def load(path): |
| 180 """Loads the TaskDetails from a file on disk (specified via --in-file). |
| 180 | 181 |
| 181 class MustExit(Exception): | 182 Raises InternalError if the file can't be read or parsed. |
| 182 """Raised on signal that the process must exit immediately.""" | 183 """ |
| 184 try: |
| 185 with open(path, 'rb') as f: |
| 186 return TaskDetails(json.load(f)) |
| 187 except (IOError, ValueError) as e: |
| 188 raise InternalError('Cannot load task_runner_in.json: %s' % e) |
| 189 |
| 190 |
| 191 class ExitSignal(Exception): |
| 192 """Raised on a signal that the process must exit immediately.""" |
| 183 def __init__(self, sig): | 193 def __init__(self, sig): |
| 184 super(MustExit, self).__init__() | 194 super(ExitSignal, self).__init__(u'task_runner received signal %s' % sig) |
| 185 self.signal = sig | 195 self.signal = sig |
| 186 | 196 |
| 187 | 197 |
| 198 class InternalError(Exception): |
| 199 """Raised on unrecoverable errors that abort task with 'internal error'.""" |
| 200 |
| 201 |
| 188 def load_and_run( | 202 def load_and_run( |
| 189 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space, | 203 in_file, swarming_server, cost_usd_hour, start, out_file, min_free_space, |
| 190 bot_file, auth_params_file): | 204 bot_file, auth_params_file): |
| 191 """Loads the task's metadata and execute it. | 205 """Loads the task's metadata, prepares auth environment and executes the task. |
| 192 | 206 |
| 193 This may throw all sorts of exceptions in case of failure. It's up to the | 207 This may throw all sorts of exceptions in case of failure. It's up to the |
| 194 caller to trap them. These shall be considered 'internal_failure' instead of | 208 caller to trap them. These shall be considered 'internal_failure' instead of |
| 195 'failure' from a TaskRunResult standpoint. | 209 'failure' from a TaskRunResult standpoint. |
| 196 """ | 210 """ |
| 197 # The work directory is guaranteed to exist since it was created by | 211 auth_system = None |
| 198 # bot_main.py and contains the manifest. Temporary files will be downloaded | |
| 199 # there. It's bot_main.py that will delete the directory afterward. Tests are | |
| 200 # not run from there. | |
| 201 task_result = None | 212 task_result = None |
| 213 work_dir = os.path.dirname(out_file) |
| 214 |
| 202 def handler(sig, _): | 215 def handler(sig, _): |
| 203 logging.info('Got signal %s', sig) | 216 logging.info('Got signal %s', sig) |
| 204 raise MustExit(sig) | 217 raise ExitSignal(sig) |
| 205 work_dir = os.path.dirname(out_file) | 218 |
| 206 try: | 219 try: |
| 207 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): | 220 with subprocess42.set_signal_handler([SIG_BREAK_OR_TERM], handler): |
| 221 # The work directory is guaranteed to exist since it was created by |
| 222 # bot_main.py and contains the manifest. Temporary files will be |
| 223 # downloaded there. It's bot_main.py that will delete the directory |
| 224 # afterward. Tests are not run from there. |
| 208 if not os.path.isdir(work_dir): | 225 if not os.path.isdir(work_dir): |
| 209 raise ValueError('%s expected to exist' % work_dir) | 226 raise InternalError('%s expected to exist' % work_dir) |
| 210 | 227 |
| 211 with open(in_file, 'rb') as f: | 228 # Raises InternalError on errors. |
| 212 task_details = TaskDetails(json.load(f)) | 229 task_details = TaskDetails.load(in_file) |
| 213 | 230 |
| 231 # This will start a thread that occasionally reads bot authentication |
| 232 # headers from 'auth_params_file'. |
| 233 if auth_params_file: |
| 234 try: |
| 235 auth_system = bot_auth.AuthSystem() |
| 236 auth_system.start(auth_params_file) |
| 237 except bot_auth.AuthSystemError as e: |
| 238 raise InternalError('Failed to init auth: %s' % e) |
| 239 |
| 240 # Returns bot authentication headers dict or raises InternalError. |
| 241 def headers_cb(): |
| 242 try: |
| 243 return auth_system.bot_headers if auth_system else {} |
| 244 except bot_auth.AuthSystemError as e: |
| 245 raise InternalError('Failed to grab bot auth headers: %s' % e) |
| 246 |
| 247 # Auth environment is up, start the command. task_result is dumped to |
| 248 # disk in 'finally' block. |
| 214 task_result = run_command( | 249 task_result = run_command( |
| 215 swarming_server, task_details, work_dir, | 250 swarming_server, task_details, work_dir, |
| 216 cost_usd_hour, start, min_free_space, bot_file, auth_params_file) | 251 cost_usd_hour, start, min_free_space, bot_file, headers_cb) |
| 217 except MustExit as e: | 252 |
| 253 except (ExitSignal, InternalError) as e: |
| 218 # This normally means run_command() didn't get the chance to run, as it | 254 # This normally means run_command() didn't get the chance to run, as it |
| 219 # itself trap MustExit and will report accordingly. In this case, we want | 255 # itself traps exceptions and will report accordingly. In this case, we want |
| 220 # the parent process to send the message instead. | 256 # the parent process to send the message instead. |
| 221 if not task_result: | 257 if not task_result: |
| 222 task_result = { | 258 task_result = { |
| 223 u'exit_code': None, | 259 u'exit_code': -1, |
| 224 u'hard_timeout': False, | 260 u'hard_timeout': False, |
| 225 u'io_timeout': False, | 261 u'io_timeout': False, |
| 226 u'must_signal_internal_failure': | 262 u'must_signal_internal_failure': str(e.message or 'unknown error'), |
| 227 u'task_runner received signal %s' % e.signal, | |
| 228 u'version': OUT_VERSION, | 263 u'version': OUT_VERSION, |
| 229 } | 264 } |
| 265 |
| 230 finally: | 266 finally: |
| 231 # We've found tests to delete the working directory work_dir when quitting, | 267 # We've found tests to delete the working directory work_dir when quitting, |
| 232 # causing an exception here. Try to recreate the directory if necessary. | 268 # causing an exception here. Try to recreate the directory if necessary. |
| 233 if not os.path.isdir(work_dir): | 269 if not os.path.isdir(work_dir): |
| 234 os.mkdir(work_dir) | 270 os.mkdir(work_dir) |
| 271 if auth_system: |
| 272 auth_system.stop() |
| 235 with open(out_file, 'wb') as f: | 273 with open(out_file, 'wb') as f: |
| 236 json.dump(task_result, f) | 274 json.dump(task_result, f) |
| 237 | 275 |
| 238 | 276 |
| 239 def post_update( | 277 def post_update( |
| 240 swarming_server, auth_headers, params, exit_code, | 278 swarming_server, auth_headers, params, exit_code, |
| 241 stdout, output_chunk_start): | 279 stdout, output_chunk_start): |
| 242 """Posts task update to task_update. | 280 """Posts task update to task_update. |
| 243 | 281 |
| 244 Arguments: | 282 Arguments: |
| 245 swarming_server: Base URL to Swarming server. | 283 swarming_server: Base URL to Swarming server. |
| 246 auth_headers: dict with HTTP authentication headers. | 284 auth_headers: dict with HTTP authentication headers. |
| 247 params: Default JSON parameters for the POST. | 285 params: Default JSON parameters for the POST. |
| 248 exit_code: Process exit code, only when a command completed. | 286 exit_code: Process exit code, only when a command completed. |
| 249 stdout: Incremental output since last call, if any. | 287 stdout: Incremental output since last call, if any. |
| 250 output_chunk_start: Total number of stdout previously sent, for coherency | 288 output_chunk_start: Total number of stdout previously sent, for coherency |
| 251 with the server. | 289 with the server. |
| 252 | 290 |
| 253 Returns: | 291 Returns: |
| 254 False if the task should stop. | 292 False if the task should stop. |
| 293 |
| 294 Raises: |
| 295 InternalError if can't contact the server after many attempts or the server |
| 296 replies with an error. |
| 255 """ | 297 """ |
| 256 params = params.copy() | 298 params = params.copy() |
| 257 if exit_code is not None: | 299 if exit_code is not None: |
| 258 params['exit_code'] = exit_code | 300 params['exit_code'] = exit_code |
| 259 if stdout: | 301 if stdout: |
| 260 # The output_chunk_start is used by the server to make sure that the stdout | 302 # The output_chunk_start is used by the server to make sure that the stdout |
| 261 # chunks are processed and saved in the DB in order. | 303 # chunks are processed and saved in the DB in order. |
| 262 params['output'] = base64.b64encode(stdout) | 304 params['output'] = base64.b64encode(stdout) |
| 263 params['output_chunk_start'] = output_chunk_start | 305 params['output_chunk_start'] = output_chunk_start |
| 264 # TODO(maruel): Support early cancellation. | 306 # TODO(maruel): Support early cancellation. |
| 265 # https://code.google.com/p/swarming/issues/detail?id=62 | 307 # https://code.google.com/p/swarming/issues/detail?id=62 |
| 266 resp = net.url_read_json( | 308 resp = net.url_read_json( |
| 267 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], | 309 swarming_server+'/swarming/api/v1/bot/task_update/%s' % params['task_id'], |
| 268 data=params, | 310 data=params, |
| 269 headers=auth_headers, | 311 headers=auth_headers, |
| 270 follow_redirects=False) | 312 follow_redirects=False) |
| 271 logging.debug('post_update() = %s', resp) | 313 logging.debug('post_update() = %s', resp) |
| 272 if not resp or resp.get('error'): | 314 if not resp or resp.get('error'): |
| 273 # Abandon it. This will force a process exit. | 315 # Abandon it. This will force a process exit. |
| 274 raise ValueError(resp.get('error') if resp else 'Failed to contact server') | 316 raise InternalError( |
| 317 resp.get('error') if resp else 'Failed to contact server') |
| 275 return not resp.get('must_stop', False) | 318 return not resp.get('must_stop', False) |
| 276 | 319 |
| 277 | 320 |
| 278 def should_post_update(stdout, now, last_packet): | 321 def should_post_update(stdout, now, last_packet): |
| 279 """Returns True if it's time to send a task_update packet via post_update(). | 322 """Returns True if it's time to send a task_update packet via post_update(). |
| 280 | 323 |
| 281 Sends a packet when one of this condition is met: | 324 Sends a packet when one of this condition is met: |
| 282 - more than MAX_CHUNK_SIZE of stdout is buffered. | 325 - more than MAX_CHUNK_SIZE of stdout is buffered. |
| 283 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was | 326 - last packet was sent more than MIN_PACKET_INTERNAL seconds ago and there was |
| 284 stdout. | 327 stdout. |
| (...skipping 24 matching lines...) Expand all Loading... |
| 309 | 352 |
| 310 def kill_and_wait(proc, grace_period, reason): | 353 def kill_and_wait(proc, grace_period, reason): |
| 311 logging.warning('SIGTERM finally due to %s', reason) | 354 logging.warning('SIGTERM finally due to %s', reason) |
| 312 proc.terminate() | 355 proc.terminate() |
| 313 try: | 356 try: |
| 314 proc.wait(grace_period) | 357 proc.wait(grace_period) |
| 315 except subprocess42.TimeoutError: | 358 except subprocess42.TimeoutError: |
| 316 logging.warning('SIGKILL finally due to %s', reason) | 359 logging.warning('SIGKILL finally due to %s', reason) |
| 317 proc.kill() | 360 proc.kill() |
| 318 exit_code = proc.wait() | 361 exit_code = proc.wait() |
| 319 logging.info('Waiting for proces exit in finally - done') | 362 logging.info('Waiting for process exit in finally - done') |
| 320 return exit_code | 363 return exit_code |
| 321 | 364 |
| 322 | 365 |
| 323 def start_reading_headers(auth_params_file): | |
| 324 """Spawns a thread that rereads headers from --auth-params-file path. | |
| 325 | |
| 326 Returns: | |
| 327 Tuple (callback that returns the last known headers, stop callback). | |
| 328 | |
| 329 Raises: | |
| 330 file_reader.FatalReadError if headers file can't be read. | |
| 331 ValueError if it can be read, but its body is invalid. | |
| 332 """ | |
| 333 # Read headers more often than bot_main writes them, to reduce maximum | |
| 334 # possible latency between headers are updated and read. | |
| 335 reader = file_reader.FileReaderThread(auth_params_file, interval_sec=30) | |
| 336 | |
| 337 def read_and_validate_headers(): | |
| 338 val = bot_auth.process_auth_params_json(reader.last_value or {}) | |
| 339 return val.swarming_http_headers | |
| 340 | |
| 341 reader.start() | |
| 342 read_and_validate_headers() # initial validation, may raise ValueError | |
| 343 return read_and_validate_headers, reader.stop | |
| 344 | |
| 345 | |
| 346 def run_command( | 366 def run_command( |
| 347 swarming_server, task_details, work_dir, cost_usd_hour, | 367 swarming_server, task_details, work_dir, cost_usd_hour, |
| 348 task_start, min_free_space, bot_file, auth_params_file): | 368 task_start, min_free_space, bot_file, headers_cb): |
| 349 """Runs a command and sends packets to the server to stream results back. | 369 """Runs a command and sends packets to the server to stream results back. |
| 350 | 370 |
| 351 Implements both I/O and hard timeouts. Sends the packets numbered, so the | 371 Implements both I/O and hard timeouts. Sends the packets numbered, so the |
| 352 server can ensure they are processed in order. | 372 server can ensure they are processed in order. |
| 353 | 373 |
| 354 Returns: | 374 Returns: |
| 355 Metadata about the command. | 375 Metadata dict with the execution result. |
| 376 |
| 377 Raises: |
| 378 ExitSignal if caught some signal when starting or stopping. |
| 379 InternalError on unexpected internal errors. |
| 356 """ | 380 """ |
| 357 # TODO(maruel): This function is incomprehensible, split and refactor. | 381 # TODO(maruel): This function is incomprehensible, split and refactor. |
| 358 | 382 |
| 359 # Grab initial auth headers and start rereading them in parallel thread. They | |
| 360 # MUST be there already. It's fatal internal error if they are not. | |
| 361 headers_cb = lambda: {} | |
| 362 stop_headers_reader = lambda: None | |
| 363 if auth_params_file: | |
| 364 try: | |
| 365 headers_cb, stop_headers_reader = start_reading_headers(auth_params_file) | |
| 366 except (ValueError, file_reader.FatalReadError) as e: | |
| 367 return { | |
| 368 u'exit_code': 1, | |
| 369 u'hard_timeout': False, | |
| 370 u'io_timeout': False, | |
| 371 u'must_signal_internal_failure': str(e), | |
| 372 u'version': OUT_VERSION, | |
| 373 } | |
| 374 | |
| 375 # Signal the command is about to be started. | 383 # Signal the command is about to be started. |
| 376 last_packet = start = now = monotonic_time() | 384 last_packet = start = now = monotonic_time() |
| 377 params = { | 385 params = { |
| 378 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., | 386 'cost_usd': cost_usd_hour * (now - task_start) / 60. / 60., |
| 379 'id': task_details.bot_id, | 387 'id': task_details.bot_id, |
| 380 'task_id': task_details.task_id, | 388 'task_id': task_details.task_id, |
| 381 } | 389 } |
| 382 if not post_update(swarming_server, headers_cb(), params, None, '', 0): | 390 if not post_update(swarming_server, headers_cb(), params, None, '', 0): |
| 383 # Don't even bother, the task was already canceled. | 391 # Don't even bother, the task was already canceled. |
| 384 return { | 392 return { |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 459 last_io = now | 467 last_io = now |
| 460 | 468 |
| 461 # Post update if necessary. | 469 # Post update if necessary. |
| 462 if should_post_update(stdout, now, last_packet): | 470 if should_post_update(stdout, now, last_packet): |
| 463 last_packet = monotonic_time() | 471 last_packet = monotonic_time() |
| 464 params['cost_usd'] = ( | 472 params['cost_usd'] = ( |
| 465 cost_usd_hour * (last_packet - task_start) / 60. / 60.) | 473 cost_usd_hour * (last_packet - task_start) / 60. / 60.) |
| 466 if not post_update( | 474 if not post_update( |
| 467 swarming_server, headers_cb(), params, None, stdout, | 475 swarming_server, headers_cb(), params, None, stdout, |
| 468 output_chunk_start): | 476 output_chunk_start): |
| 469 # Server is telling us to stop. Normally task cancelation. | 477 # Server is telling us to stop. Normally task cancellation. |
| 470 if not kill_sent: | 478 if not kill_sent: |
| 471 logging.warning('Server induced stop; sending SIGKILL') | 479 logging.warning('Server induced stop; sending SIGKILL') |
| 472 proc.kill() | 480 proc.kill() |
| 473 kill_sent = True | 481 kill_sent = True |
| 474 | 482 |
| 475 output_chunk_start += len(stdout) | 483 output_chunk_start += len(stdout) |
| 476 stdout = '' | 484 stdout = '' |
| 477 | 485 |
| 478 # Send signal on timeout if necessary. Both are failures, not | 486 # Send signal on timeout if necessary. Both are failures, not |
| 479 # internal_failures. | 487 # internal_failures. |
| (...skipping 14 matching lines...) Expand all Loading... |
| 494 # states: | 502 # states: |
| 495 # - signal but process exited within grace period, | 503 # - signal but process exited within grace period, |
| 496 # (hard_|io_)_timed_out will be set but the process exit code will | 504 # (hard_|io_)_timed_out will be set but the process exit code will |
| 497 # be script provided. | 505 # be script provided. |
| 498 # - processed exited late, exit code will be -9 on posix. | 506 # - processed exited late, exit code will be -9 on posix. |
| 499 logging.warning( | 507 logging.warning( |
| 500 'Grace of %.3fs exhausted at %.3fs; sending SIGKILL', | 508 'Grace of %.3fs exhausted at %.3fs; sending SIGKILL', |
| 501 task_details.grace_period, now - timed_out) | 509 task_details.grace_period, now - timed_out) |
| 502 proc.kill() | 510 proc.kill() |
| 503 kill_sent = True | 511 kill_sent = True |
| 504 logging.info('Waiting for proces exit') | 512 logging.info('Waiting for process exit') |
| 505 exit_code = proc.wait() | 513 exit_code = proc.wait() |
| 506 except MustExit as e: | 514 except (ExitSignal, InternalError, IOError, OSError) as e: |
| 507 # TODO(maruel): Do the send SIGTERM to child process and give it | |
| 508 # task_details.grace_period to terminate. | |
| 509 must_signal_internal_failure = ( | |
| 510 u'task_runner received signal %s' % e.signal) | |
| 511 exit_code = kill_and_wait( | |
| 512 proc, task_details.grace_period, 'signal %d' % e.signal) | |
| 513 except (IOError, OSError): | |
| 514 # Something wrong happened, try to kill the child process. | 515 # Something wrong happened, try to kill the child process. |
| 515 exit_code = kill_and_wait( | 516 must_signal_internal_failure = str(e.message or 'unknown error') |
| 516 proc, task_details.grace_period, 'exception %s' % e) | 517 exit_code = kill_and_wait(proc, task_details.grace_period, e.message) |
| 517 | 518 |
| 518 # This is the very last packet for this command. It if was an isolated task, | 519 # This is the very last packet for this command. It if was an isolated task, |
| 519 # include the output reference to the archived .isolated file. | 520 # include the output reference to the archived .isolated file. |
| 520 now = monotonic_time() | 521 now = monotonic_time() |
| 521 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. | 522 params['cost_usd'] = cost_usd_hour * (now - task_start) / 60. / 60. |
| 522 params['duration'] = now - start | 523 params['duration'] = now - start |
| 523 params['io_timeout'] = had_io_timeout | 524 params['io_timeout'] = had_io_timeout |
| 524 had_hard_timeout = False | 525 had_hard_timeout = False |
| 525 try: | 526 try: |
| 526 if not os.path.isfile(isolated_result): | 527 if not os.path.isfile(isolated_result): |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 579 if cipd_stats: | 580 if cipd_stats: |
| 580 params['cipd_stats'] = cipd_stats | 581 params['cipd_stats'] = cipd_stats |
| 581 cipd_pins = run_isolated_result.get('cipd_pins') | 582 cipd_pins = run_isolated_result.get('cipd_pins') |
| 582 if cipd_pins: | 583 if cipd_pins: |
| 583 params['cipd_pins'] = cipd_pins | 584 params['cipd_pins'] = cipd_pins |
| 584 except (IOError, OSError, ValueError) as e: | 585 except (IOError, OSError, ValueError) as e: |
| 585 logging.error('Swallowing error: %s', e) | 586 logging.error('Swallowing error: %s', e) |
| 586 if not must_signal_internal_failure: | 587 if not must_signal_internal_failure: |
| 587 must_signal_internal_failure = '%s\n%s' % ( | 588 must_signal_internal_failure = '%s\n%s' % ( |
| 588 e, traceback.format_exc()[-2048:]) | 589 e, traceback.format_exc()[-2048:]) |
| 590 |
| 589 # TODO(maruel): Send the internal failure here instead of sending it through | 591 # TODO(maruel): Send the internal failure here instead of sending it through |
| 590 # bot_main, this causes a race condition. | 592 # bot_main, this causes a race condition. |
| 591 if exit_code is None: | 593 if exit_code is None: |
| 592 exit_code = -1 | 594 exit_code = -1 |
| 593 params['hard_timeout'] = had_hard_timeout | 595 params['hard_timeout'] = had_hard_timeout |
| 594 # Ignore server reply to stop. | 596 |
| 595 post_update( | 597 # Ignore server reply to stop. Also ignore internal errors here if we are |
| 596 swarming_server, headers_cb(), params, exit_code, | 598 # already handling some. |
| 597 stdout, output_chunk_start) | 599 try: |
| 600 post_update( |
| 601 swarming_server, headers_cb(), params, exit_code, |
| 602 stdout, output_chunk_start) |
| 603 except InternalError as e: |
| 604 logging.error('Internal error while finishing the task: %s', e) |
| 605 if not must_signal_internal_failure: |
| 606 must_signal_internal_failure = str(e.message or 'unknown error') |
| 607 |
| 598 return { | 608 return { |
| 599 u'exit_code': exit_code, | 609 u'exit_code': exit_code, |
| 600 u'hard_timeout': had_hard_timeout, | 610 u'hard_timeout': had_hard_timeout, |
| 601 u'io_timeout': had_io_timeout, | 611 u'io_timeout': had_io_timeout, |
| 602 u'must_signal_internal_failure': must_signal_internal_failure, | 612 u'must_signal_internal_failure': must_signal_internal_failure, |
| 603 u'version': OUT_VERSION, | 613 u'version': OUT_VERSION, |
| 604 } | 614 } |
| 605 finally: | 615 finally: |
| 606 file_path.try_remove(unicode(isolated_result)) | 616 file_path.try_remove(unicode(isolated_result)) |
| 607 stop_headers_reader() | |
| 608 | 617 |
| 609 | 618 |
| 610 def main(args): | 619 def main(args): |
| 611 subprocess42.inhibit_os_error_reporting() | 620 subprocess42.inhibit_os_error_reporting() |
| 612 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) | 621 parser = optparse.OptionParser(description=sys.modules[__name__].__doc__) |
| 613 parser.add_option('--in-file', help='Name of the request file') | 622 parser.add_option('--in-file', help='Name of the request file') |
| 614 parser.add_option( | 623 parser.add_option( |
| 615 '--out-file', help='Name of the JSON file to write a task summary to') | 624 '--out-file', help='Name of the JSON file to write a task summary to') |
| 616 parser.add_option( | 625 parser.add_option( |
| 617 '--swarming-server', help='Swarming server to send data back') | 626 '--swarming-server', help='Swarming server to send data back') |
| (...skipping 21 matching lines...) Expand all Loading... |
| 639 options.start = now | 648 options.start = now |
| 640 | 649 |
| 641 try: | 650 try: |
| 642 load_and_run( | 651 load_and_run( |
| 643 options.in_file, options.swarming_server, options.cost_usd_hour, | 652 options.in_file, options.swarming_server, options.cost_usd_hour, |
| 644 options.start, options.out_file, options.min_free_space, | 653 options.start, options.out_file, options.min_free_space, |
| 645 options.bot_file, options.auth_params_file) | 654 options.bot_file, options.auth_params_file) |
| 646 return 0 | 655 return 0 |
| 647 finally: | 656 finally: |
| 648 logging.info('quitting') | 657 logging.info('quitting') |
| OLD | NEW |