Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import logging | 5 import logging |
| 6 import datetime | |
| 6 import math | 7 import math |
| 7 import os | 8 import os |
| 8 import sys | 9 import sys |
| 9 import time | 10 import time |
| 10 | 11 |
| 11 import cloudstorage | 12 import cloudstorage |
| 12 import flask | 13 import flask |
| 13 from google.appengine.api import (app_identity, taskqueue) | 14 from google.appengine.api import (app_identity, taskqueue) |
| 14 from google.appengine.ext import deferred | 15 from google.appengine.ext import deferred |
| 15 from oauth2client.client import GoogleCredentials | 16 from oauth2client.client import GoogleCredentials |
| 16 | 17 |
| 17 import common.clovis_paths | 18 import common.clovis_paths |
| 18 from common.clovis_task import ClovisTask | 19 from common.clovis_task import ClovisTask |
| 19 import common.google_bigquery_helper | 20 import common.google_bigquery_helper |
| 20 import common.google_instance_helper | 21 import common.google_instance_helper |
| 21 from common.loading_trace_database import LoadingTraceDatabase | 22 from common.loading_trace_database import LoadingTraceDatabase |
| 22 import email_helper | 23 import email_helper |
| 24 from frontend_job import FrontendJob | |
| 23 from memory_logs import MemoryLogs | 25 from memory_logs import MemoryLogs |
| 24 | 26 |
| 25 | 27 |
| 26 # Global variables. | 28 # Global variables. |
| 27 logging.Formatter.converter = time.gmtime | 29 logging.Formatter.converter = time.gmtime |
| 28 clovis_logger = logging.getLogger('clovis_frontend') | 30 clovis_logger = logging.getLogger('clovis_frontend') |
| 29 clovis_logger.setLevel(logging.DEBUG) | 31 clovis_logger.setLevel(logging.DEBUG) |
| 30 project_name = app_identity.get_application_id() | 32 project_name = app_identity.get_application_id() |
| 31 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 33 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
| 32 credentials=GoogleCredentials.get_application_default(), | 34 credentials=GoogleCredentials.get_application_default(), |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 92 email_address (str): Email address of the user to be notified. | 94 email_address (str): Email address of the user to be notified. |
| 93 status (str): Status of the task, indicating the success or the cause of | 95 status (str): Status of the task, indicating the success or the cause of |
| 94 failure. | 96 failure. |
| 95 task_url (str): URL where the results of the task can be found. | 97 task_url (str): URL where the results of the task can be found. |
| 96 """ | 98 """ |
| 97 email_helper.SendEmailTaskComplete( | 99 email_helper.SendEmailTaskComplete( |
| 98 to_address=email_address, tag=tag, status=status, task_url=task_url, | 100 to_address=email_address, tag=tag, status=status, task_url=task_url, |
| 99 logger=clovis_logger) | 101 logger=clovis_logger) |
| 100 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) | 102 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) |
| 101 deferred.defer(DeleteInstanceGroup, tag) | 103 deferred.defer(DeleteInstanceGroup, tag) |
| 104 FrontendJob.DeleteForTag(tag) | |
| 102 | 105 |
| 103 | 106 |
| 104 def GetEstimatedTaskDurationInSeconds(task): | 107 def GetEstimatedTaskDurationInSeconds(task): |
| 105 """Returns an estimation of the time required to run the task. | 108 """Returns an estimation of the time required to run the task. |
| 106 | 109 |
| 107 Args: | 110 Args: |
| 108 task: (ClovisTask) The task. | 111 task: (ClovisTask) The task. |
| 109 | 112 |
| 110 Returns: | 113 Returns: |
| 111 float: Time estimation in seconds, or -1 in case of failure. | 114 float: Time estimation in seconds, or -1 in case of failure. |
| (...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 317 | 320 |
| 318 # Load the task from JSON. | 321 # Load the task from JSON. |
| 319 task = ClovisTask.FromJsonString(http_body_str) | 322 task = ClovisTask.FromJsonString(http_body_str) |
| 320 if not task: | 323 if not task: |
| 321 clovis_logger.error('Invalid JSON task.') | 324 clovis_logger.error('Invalid JSON task.') |
| 322 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 325 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
| 323 | 326 |
| 324 task_tag = task.BackendParams()['tag'] | 327 task_tag = task.BackendParams()['tag'] |
| 325 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), | 328 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
| 326 task_tag)) | 329 task_tag)) |
| 330 user_email = email_helper.GetUserEmail() | |
| 331 | |
| 332 # Write the job to the datastore. | |
| 333 frontend_job = FrontendJob.CreateForTag(task_tag) | |
| 334 frontend_job.email = user_email | |
| 335 frontend_job.status = 'not_started' | |
| 336 frontend_job.clovis_task = task.ToJsonString() | |
| 337 frontend_job.put() | |
| 338 | |
| 339 # Process the job on the queue, to avoid timeout issues. | |
| 340 deferred.defer(SpawnTasksOnBackgroundQueue, task_tag) | |
| 341 | |
| 342 return Render( | |
| 343 flask.Markup( | |
| 344 '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)), | |
| 345 memory_logs) | |
| 346 | |
| 347 | |
| 348 def SpawnTasksOnBackgroundQueue(task_tag): | |
| 349 """Spawns Clovis tasks associated with task_tag from the background queue. | |
| 350 | |
| 351 This function is mostly a wrapper around SpawnTasks() that catches exceptions. | |
| 352 It is assumed that a FrontendJob for task_tag exists. | |
| 353 """ | |
| 354 memory_logs = MemoryLogs(clovis_logger) | |
| 355 memory_logs.Start() | |
| 356 clovis_logger.info('Spawning tasks on background queue.') | |
| 357 | |
| 358 try: | |
| 359 frontend_job = FrontendJob.GetFromTag(task_tag) | |
| 360 frontend_job.status = 'will_start' | |
| 361 SpawnTasks(frontend_job) | |
| 362 except Exception: | |
| 363 pass # Catch all exceptions so that AppEngine does not retry indefinitely. | |
|
mattcary
2016/06/09 15:40:06
Log the exception so if tasks disappear we have a
droger
2016/06/10 12:14:04
Done.
| |
| 364 | |
| 365 # Update the task. | |
| 366 if frontend_job: | |
| 367 frontend_job.log = memory_logs.Flush() | |
| 368 frontend_job.put() | |
| 369 | |
| 370 | |
| 371 def SpawnTasks(frontend_job): | |
| 372 """ Spawns Clovis tasks associated with the frontend job.""" | |
| 373 user_email = frontend_job.email | |
| 374 task = ClovisTask.FromJsonString(frontend_job.clovis_task) | |
| 375 task_tag = task.BackendParams()['tag'] | |
| 376 | |
| 327 # Compute the task directory. | 377 # Compute the task directory. |
| 378 frontend_job.status = 'building_task_dir' | |
| 328 task_dir_components = [] | 379 task_dir_components = [] |
| 329 user_email = email_helper.GetUserEmail() | |
| 330 user_name = None | 380 user_name = None |
| 331 if user_email: | 381 if user_email: |
| 332 user_name = user_email[:user_email.find('@')] | 382 user_name = user_email[:user_email.find('@')] |
| 333 if user_name: | 383 if user_name: |
| 334 task_dir_components.append(user_name) | 384 task_dir_components.append(user_name) |
| 335 task_name = task.BackendParams().get('task_name') | 385 task_name = task.BackendParams().get('task_name') |
| 336 if task_name: | 386 if task_name: |
| 337 task_dir_components.append(task_name) | 387 task_dir_components.append(task_name) |
| 338 task_dir_components.append(task_tag) | 388 task_dir_components.append(task_tag) |
| 339 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) | 389 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) |
| 340 | 390 |
| 341 # Build the URL where the result will live. | 391 # Build the URL where the result will live. |
| 392 frontend_job.status = 'building_task_url' | |
|
mattcary
2016/06/09 15:40:06
Do you need to put these status update values, or
droger
2016/06/10 12:14:04
You need to put.
This is only done at the end (li
mattcary
2016/06/10 12:17:27
Shouldn't we put after each return, so we have the
droger
2016/06/10 12:58:27
put() will be executed after each return, by the c
mattcary
2016/06/10 13:24:20
Ah! I was just looking in SpawnTasks. Thanks.
| |
| 342 task_url = GetTaskURL(task, task_dir) | 393 task_url = GetTaskURL(task, task_dir) |
| 343 if task_url: | 394 if task_url: |
| 344 clovis_logger.info('Task result URL: ' + task_url) | 395 clovis_logger.info('Task result URL: ' + task_url) |
| 396 frontend_job.task_url = task_url | |
| 345 else: | 397 else: |
| 346 return Render('Could not build the task URL.', memory_logs) | 398 frontend_job.status = 'task_url_error' |
| 399 return | |
| 347 | 400 |
| 348 # Split the task in smaller tasks. | 401 # Split the task in smaller tasks. |
| 402 frontend_job.status = 'splitting_task' | |
| 349 sub_tasks = SplitClovisTask(task) | 403 sub_tasks = SplitClovisTask(task) |
| 350 if not sub_tasks: | 404 if not sub_tasks: |
| 351 return Render('Task split failed.', memory_logs) | 405 frontend_job.status = 'task_split_error' |
| 406 return | |
| 352 | 407 |
| 353 # Compute estimates for the work duration, in order to compute the instance | 408 # Compute estimates for the work duration, in order to compute the instance |
| 354 # count and the timeout. | 409 # count and the timeout. |
| 410 frontend_job.status = 'estimating_duration' | |
| 355 sequential_duration_s = \ | 411 sequential_duration_s = \ |
| 356 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) | 412 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) |
| 357 if sequential_duration_s <= 0: | 413 if sequential_duration_s <= 0: |
| 358 return Render('Time estimation failed.', memory_logs) | 414 frontend_job.status = 'time_estimation_error' |
| 415 return | |
| 359 | 416 |
| 360 # Compute the number of required instances if not specified. | 417 # Compute the number of required instances if not specified. |
| 361 if not task.BackendParams().get('instance_count'): | 418 if task.BackendParams().get('instance_count') is None: |
| 419 frontend_job.status = 'estimating_instance_count' | |
| 362 target_parallel_duration_s = 1800.0 # 30 minutes. | 420 target_parallel_duration_s = 1800.0 # 30 minutes. |
| 363 task.BackendParams()['instance_count'] = math.ceil( | 421 task.BackendParams()['instance_count'] = math.ceil( |
| 364 sequential_duration_s / target_parallel_duration_s) | 422 sequential_duration_s / target_parallel_duration_s) |
| 365 | 423 |
| 366 # Check the instance quotas. | 424 # Check the instance quotas. |
| 367 clovis_logger.info( | 425 clovis_logger.info( |
| 368 'Requesting %i instances.' % task.BackendParams()['instance_count']) | 426 'Requesting %i instances.' % task.BackendParams()['instance_count']) |
| 427 frontend_job.status = 'checking_instance_quotas' | |
| 369 max_instances = instance_helper.GetAvailableInstanceCount() | 428 max_instances = instance_helper.GetAvailableInstanceCount() |
| 370 if max_instances == -1: | 429 if max_instances == -1: |
| 371 return Render('Failed to count the available instances.', memory_logs) | 430 frontend_job.status = 'instance_count_error' |
| 431 return | |
| 372 elif task.BackendParams()['instance_count'] == 0: | 432 elif task.BackendParams()['instance_count'] == 0: |
| 373 return Render('Cannot create instances, quota exceeded.', memory_logs) | 433 frontend_job.status = 'no_instance_available_error' |
| 434 return | |
| 374 elif max_instances < task.BackendParams()['instance_count']: | 435 elif max_instances < task.BackendParams()['instance_count']: |
| 375 clovis_logger.warning( | 436 clovis_logger.warning( |
| 376 'Instance count limited by quota: %i available / %i requested.' % ( | 437 'Instance count limited by quota: %i available / %i requested.' % ( |
| 377 max_instances, task.BackendParams()['instance_count'])) | 438 max_instances, task.BackendParams()['instance_count'])) |
| 378 task.BackendParams()['instance_count'] = max_instances | 439 task.BackendParams()['instance_count'] = max_instances |
| 379 | 440 |
| 380 # Compute the timeout if there is none specified. | 441 # Compute the timeout if there is none specified. |
| 381 expected_duration_h = sequential_duration_s / ( | 442 expected_duration_s = sequential_duration_s / ( |
| 382 task.BackendParams()['instance_count'] * 3600.0) | 443 task.BackendParams()['instance_count']) |
| 444 frontend_job.eta = datetime.datetime.now() + datetime.timedelta( | |
| 445 seconds=expected_duration_s) | |
| 383 if not task.BackendParams().get('timeout_hours'): | 446 if not task.BackendParams().get('timeout_hours'): |
| 384 # Timeout is at least 1 hour. | 447 # Timeout is at least 1 hour. |
| 385 task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) | 448 task.BackendParams()['timeout_hours'] = max( |
| 449 1, 5 * expected_duration_s / 3600.0) | |
| 386 clovis_logger.info( | 450 clovis_logger.info( |
| 387 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) | 451 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) |
| 388 | 452 |
| 453 frontend_job.status = 'queueing_tasks' | |
| 389 if not EnqueueTasks(sub_tasks, task_tag): | 454 if not EnqueueTasks(sub_tasks, task_tag): |
| 390 return Render('Task creation failed.', memory_logs) | 455 frontend_job.status = 'task_creation_error' |
| 456 return | |
| 391 | 457 |
| 392 # Start polling the progress. | 458 # Start polling the progress. |
| 393 clovis_logger.info('Creating worker polling task.') | 459 clovis_logger.info('Creating worker polling task.') |
| 394 first_poll_delay_minutes = 10 | 460 first_poll_delay_minutes = 10 |
| 395 deferred.defer(PollWorkers, task_tag, time.time(), | 461 deferred.defer(PollWorkers, task_tag, time.time(), |
| 396 task.BackendParams()['timeout_hours'], user_email, | 462 task.BackendParams()['timeout_hours'], user_email, |
| 397 task_url, _countdown=(60 * first_poll_delay_minutes)) | 463 task_url, _countdown=(60 * first_poll_delay_minutes)) |
| 398 | 464 |
| 399 # Start the instances if required. | 465 # Start the instances if required. |
| 466 frontend_job.status = 'creating_instances' | |
| 400 if not CreateInstanceTemplate(task, task_dir): | 467 if not CreateInstanceTemplate(task, task_dir): |
| 401 return Render('Instance template creation failed.', memory_logs) | 468 frontend_job.status = 'instance_template_error' |
| 469 return | |
| 402 if not CreateInstances(task): | 470 if not CreateInstances(task): |
| 403 return Render('Instance creation failed.', memory_logs) | 471 frontend_job.status = 'instance_creation_error' |
| 472 return | |
| 404 | 473 |
| 405 return Render(flask.Markup( | 474 frontend_job.status = 'started' |
| 406 'Success!<br>Your task %s has started.<br>' | |
| 407 'Expected duration: %.1f hours.<br>' | |
| 408 'You will be notified at %s when completed.') % ( | |
| 409 task_tag, expected_duration_h, user_email), | |
| 410 memory_logs) | |
| 411 | |
| 412 | 475 |
| 413 def EnqueueTasks(tasks, task_tag): | 476 def EnqueueTasks(tasks, task_tag): |
| 414 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 477 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
| 415 Google Compute Engine. | 478 Google Compute Engine. |
| 416 """ | 479 """ |
| 417 q = taskqueue.Queue('clovis-queue') | 480 q = taskqueue.Queue('clovis-queue') |
| 418 # Add tasks to the queue by groups. | 481 # Add tasks to the queue by groups. |
| 419 # TODO(droger): This supports thousands of tasks, but maybe not millions. | 482 # TODO(droger): This supports thousands of tasks, but maybe not millions. |
| 420 # Defer the enqueuing if it times out. | 483 # Defer the enqueuing if it times out. |
| 421 group_size = 100 | 484 group_size = 100 |
| (...skipping 26 matching lines...) Expand all Loading... | |
| 448 @app.route('/form_sent', methods=['POST']) | 511 @app.route('/form_sent', methods=['POST']) |
| 449 def StartFromForm(): | 512 def StartFromForm(): |
| 450 """HTML form endpoint.""" | 513 """HTML form endpoint.""" |
| 451 data_stream = flask.request.files.get('json_task') | 514 data_stream = flask.request.files.get('json_task') |
| 452 if not data_stream: | 515 if not data_stream: |
| 453 return Render('Failed, no content.') | 516 return Render('Failed, no content.') |
| 454 http_body_str = data_stream.read() | 517 http_body_str = data_stream.read() |
| 455 return StartFromJsonString(http_body_str) | 518 return StartFromJsonString(http_body_str) |
| 456 | 519 |
| 457 | 520 |
| 521 @app.route('/list_jobs') | |
| 522 def ShowTaskList(): | |
| 523 """Shows a list of all active jobs.""" | |
| 524 tags = FrontendJob.ListJobs() | |
| 525 | |
| 526 if not tags: | |
| 527 Render('No active jobs.') | |
| 528 | |
| 529 html = flask.Markup('Active tasks:<ul>') | |
| 530 for tag in tags: | |
| 531 html += flask.Markup( | |
| 532 '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag) | |
| 533 html += flask.Markup('</ul>') | |
| 534 return Render(html) | |
| 535 | |
| 536 | |
| 537 @app.route(FrontendJob.SHOW_JOB_URL) | |
| 538 def ShowTask(): | |
| 539 """Shows basic information about a job.""" | |
| 540 tag = flask.request.args.get('tag') | |
| 541 if not tag: | |
| 542 return Render('Invalid task tag.') | |
| 543 | |
| 544 frontend_job = FrontendJob.GetFromTag(tag) | |
| 545 | |
| 546 if not frontend_job: | |
| 547 return Render('Task not found.') | |
| 548 | |
| 549 message = flask.Markup('Task details:' + frontend_job.RenderAsHtml()) | |
| 550 | |
| 551 log = None | |
| 552 if frontend_job.log: | |
| 553 log = frontend_job.log.split('\n') | |
| 554 | |
| 555 return flask.render_template('log.html', body=message, log=log) | |
| 556 | |
| 557 | |
| 458 @app.errorhandler(404) | 558 @app.errorhandler(404) |
| 459 def PageNotFound(e): # pylint: disable=unused-argument | 559 def PageNotFound(e): # pylint: disable=unused-argument |
| 460 """Return a custom 404 error.""" | 560 """Return a custom 404 error.""" |
| 461 return 'Sorry, Nothing at this URL.', 404 | 561 return 'Sorry, Nothing at this URL.', 404 |
| 462 | 562 |
| 463 | 563 |
| 464 @app.errorhandler(500) | 564 @app.errorhandler(500) |
| 465 def ApplicationError(e): | 565 def ApplicationError(e): |
| 466 """Return a custom 500 error.""" | 566 """Return a custom 500 error.""" |
| 467 return 'Sorry, unexpected error: {}'.format(e), 499 | 567 return 'Sorry, unexpected error: {}'.format(e), 499 |
| OLD | NEW |