OLD | NEW |
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
| 6 import datetime |
6 import math | 7 import math |
7 import os | 8 import os |
8 import sys | 9 import sys |
9 import time | 10 import time |
| 11 import traceback |
10 | 12 |
11 import cloudstorage | 13 import cloudstorage |
12 import flask | 14 import flask |
13 from google.appengine.api import (app_identity, taskqueue) | 15 from google.appengine.api import (app_identity, taskqueue) |
14 from google.appengine.ext import deferred | 16 from google.appengine.ext import deferred |
15 from oauth2client.client import GoogleCredentials | 17 from oauth2client.client import GoogleCredentials |
16 | 18 |
17 import common.clovis_paths | 19 import common.clovis_paths |
18 from common.clovis_task import ClovisTask | 20 from common.clovis_task import ClovisTask |
19 import common.google_bigquery_helper | 21 import common.google_bigquery_helper |
20 import common.google_instance_helper | 22 import common.google_instance_helper |
21 from common.loading_trace_database import LoadingTraceDatabase | 23 from common.loading_trace_database import LoadingTraceDatabase |
22 import email_helper | 24 import email_helper |
| 25 from frontend_job import FrontendJob |
23 from memory_logs import MemoryLogs | 26 from memory_logs import MemoryLogs |
24 | 27 |
25 | 28 |
26 # Global variables. | 29 # Global variables. |
27 logging.Formatter.converter = time.gmtime | 30 logging.Formatter.converter = time.gmtime |
28 clovis_logger = logging.getLogger('clovis_frontend') | 31 clovis_logger = logging.getLogger('clovis_frontend') |
29 clovis_logger.setLevel(logging.DEBUG) | 32 clovis_logger.setLevel(logging.DEBUG) |
30 project_name = app_identity.get_application_id() | 33 project_name = app_identity.get_application_id() |
31 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 34 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
32 credentials=GoogleCredentials.get_application_default(), | 35 credentials=GoogleCredentials.get_application_default(), |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
92 email_address (str): Email address of the user to be notified. | 95 email_address (str): Email address of the user to be notified. |
93 status (str): Status of the task, indicating the success or the cause of | 96 status (str): Status of the task, indicating the success or the cause of |
94 failure. | 97 failure. |
95 task_url (str): URL where the results of the task can be found. | 98 task_url (str): URL where the results of the task can be found. |
96 """ | 99 """ |
97 email_helper.SendEmailTaskComplete( | 100 email_helper.SendEmailTaskComplete( |
98 to_address=email_address, tag=tag, status=status, task_url=task_url, | 101 to_address=email_address, tag=tag, status=status, task_url=task_url, |
99 logger=clovis_logger) | 102 logger=clovis_logger) |
100 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) | 103 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) |
101 deferred.defer(DeleteInstanceGroup, tag) | 104 deferred.defer(DeleteInstanceGroup, tag) |
| 105 FrontendJob.DeleteForTag(tag) |
102 | 106 |
103 | 107 |
104 def GetEstimatedTaskDurationInSeconds(task): | 108 def GetEstimatedTaskDurationInSeconds(task): |
105 """Returns an estimation of the time required to run the task. | 109 """Returns an estimation of the time required to run the task. |
106 | 110 |
107 Args: | 111 Args: |
108 task: (ClovisTask) The task. | 112 task: (ClovisTask) The task. |
109 | 113 |
110 Returns: | 114 Returns: |
111 float: Time estimation in seconds, or -1 in case of failure. | 115 float: Time estimation in seconds, or -1 in case of failure. |
(...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
317 | 321 |
318 # Load the task from JSON. | 322 # Load the task from JSON. |
319 task = ClovisTask.FromJsonString(http_body_str) | 323 task = ClovisTask.FromJsonString(http_body_str) |
320 if not task: | 324 if not task: |
321 clovis_logger.error('Invalid JSON task.') | 325 clovis_logger.error('Invalid JSON task.') |
322 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 326 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
323 | 327 |
324 task_tag = task.BackendParams()['tag'] | 328 task_tag = task.BackendParams()['tag'] |
325 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), | 329 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
326 task_tag)) | 330 task_tag)) |
| 331 user_email = email_helper.GetUserEmail() |
| 332 |
| 333 # Write the job to the datastore. |
| 334 frontend_job = FrontendJob.CreateForTag(task_tag) |
| 335 frontend_job.email = user_email |
| 336 frontend_job.status = 'not_started' |
| 337 frontend_job.clovis_task = task.ToJsonString() |
| 338 frontend_job.put() |
| 339 |
| 340 # Process the job on the queue, to avoid timeout issues. |
| 341 deferred.defer(SpawnTasksOnBackgroundQueue, task_tag) |
| 342 |
| 343 return Render( |
| 344 flask.Markup( |
| 345 '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)), |
| 346 memory_logs) |
| 347 |
| 348 |
| 349 def SpawnTasksOnBackgroundQueue(task_tag): |
| 350 """Spawns Clovis tasks associated with task_tag from the backgound queue. |
| 351 |
| 352 This function is mostly a wrapper around SpawnTasks() that catches exceptions. |
| 353 It is assumed that a FrontendJob for task_tag exists. |
| 354 """ |
| 355 memory_logs = MemoryLogs(clovis_logger) |
| 356 memory_logs.Start() |
| 357 clovis_logger.info('Spawning tasks on background queue.') |
| 358 |
| 359 try: |
| 360 frontend_job = FrontendJob.GetFromTag(task_tag) |
| 361 frontend_job.status = 'will_start' |
| 362 SpawnTasks(frontend_job) |
| 363 except Exception as e: |
| 364 clovis_logger.error('Exception spawning tasks: ' + str(e)) |
| 365 clovis_logger.error(traceback.print_exc()) |
| 366 |
| 367 # Update the task. |
| 368 if frontend_job: |
| 369 frontend_job.log = memory_logs.Flush() |
| 370 frontend_job.put() |
| 371 |
| 372 |
| 373 def SpawnTasks(frontend_job): |
| 374 """ Spawns Clovis tasks associated with the frontend job.""" |
| 375 user_email = frontend_job.email |
| 376 task = ClovisTask.FromJsonString(frontend_job.clovis_task) |
| 377 task_tag = task.BackendParams()['tag'] |
| 378 |
| 379 # Delete the clovis task from the FrontendJob because it can make the object |
| 380 # very heavy and it is no longer needed. |
| 381 frontend_job.clovis_task = None |
| 382 |
327 # Compute the task directory. | 383 # Compute the task directory. |
| 384 frontend_job.status = 'building_task_dir' |
328 task_dir_components = [] | 385 task_dir_components = [] |
329 user_email = email_helper.GetUserEmail() | |
330 user_name = None | 386 user_name = None |
331 if user_email: | 387 if user_email: |
332 user_name = user_email[:user_email.find('@')] | 388 user_name = user_email[:user_email.find('@')] |
333 if user_name: | 389 if user_name: |
334 task_dir_components.append(user_name) | 390 task_dir_components.append(user_name) |
335 task_name = task.BackendParams().get('task_name') | 391 task_name = task.BackendParams().get('task_name') |
336 if task_name: | 392 if task_name: |
337 task_dir_components.append(task_name) | 393 task_dir_components.append(task_name) |
338 task_dir_components.append(task_tag) | 394 task_dir_components.append(task_tag) |
339 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) | 395 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) |
340 | 396 |
341 # Build the URL where the result will live. | 397 # Build the URL where the result will live. |
| 398 frontend_job.status = 'building_task_url' |
342 task_url = GetTaskURL(task, task_dir) | 399 task_url = GetTaskURL(task, task_dir) |
343 if task_url: | 400 if task_url: |
344 clovis_logger.info('Task result URL: ' + task_url) | 401 clovis_logger.info('Task result URL: ' + task_url) |
| 402 frontend_job.task_url = task_url |
345 else: | 403 else: |
346 return Render('Could not build the task URL.', memory_logs) | 404 frontend_job.status = 'task_url_error' |
| 405 return |
347 | 406 |
348 # Split the task in smaller tasks. | 407 # Split the task in smaller tasks. |
| 408 frontend_job.status = 'splitting_task' |
| 409 frontend_job.put() |
349 sub_tasks = SplitClovisTask(task) | 410 sub_tasks = SplitClovisTask(task) |
350 if not sub_tasks: | 411 if not sub_tasks: |
351 return Render('Task split failed.', memory_logs) | 412 frontend_job.status = 'task_split_error' |
| 413 return |
352 | 414 |
353 # Compute estimates for the work duration, in order to compute the instance | 415 # Compute estimates for the work duration, in order to compute the instance |
354 # count and the timeout. | 416 # count and the timeout. |
| 417 frontend_job.status = 'estimating_duration' |
355 sequential_duration_s = \ | 418 sequential_duration_s = \ |
356 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) | 419 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) |
357 if sequential_duration_s <= 0: | 420 if sequential_duration_s <= 0: |
358 return Render('Time estimation failed.', memory_logs) | 421 frontend_job.status = 'time_estimation_error' |
| 422 return |
359 | 423 |
360 # Compute the number of required instances if not specified. | 424 # Compute the number of required instances if not specified. |
361 if task.BackendParams().get('instance_count') is None: | 425 if task.BackendParams().get('instance_count') is None: |
| 426 frontend_job.status = 'estimating_instance_count' |
362 target_parallel_duration_s = 1800.0 # 30 minutes. | 427 target_parallel_duration_s = 1800.0 # 30 minutes. |
363 task.BackendParams()['instance_count'] = math.ceil( | 428 task.BackendParams()['instance_count'] = math.ceil( |
364 sequential_duration_s / target_parallel_duration_s) | 429 sequential_duration_s / target_parallel_duration_s) |
365 | 430 |
366 # Check the instance quotas. | 431 # Check the instance quotas. |
367 clovis_logger.info( | 432 clovis_logger.info( |
368 'Requesting %i instances.' % task.BackendParams()['instance_count']) | 433 'Requesting %i instances.' % task.BackendParams()['instance_count']) |
| 434 frontend_job.status = 'checking_instance_quotas' |
369 max_instances = instance_helper.GetAvailableInstanceCount() | 435 max_instances = instance_helper.GetAvailableInstanceCount() |
370 if max_instances == -1: | 436 if max_instances == -1: |
371 return Render('Failed to count the available instances.', memory_logs) | 437 frontend_job.status = 'instance_count_error' |
| 438 return |
372 elif task.BackendParams()['instance_count'] == 0: | 439 elif task.BackendParams()['instance_count'] == 0: |
373 return Render('Cannot create instances, quota exceeded.', memory_logs) | 440 frontend_job.status = 'no_instance_available_error' |
| 441 return |
374 elif max_instances < task.BackendParams()['instance_count']: | 442 elif max_instances < task.BackendParams()['instance_count']: |
375 clovis_logger.warning( | 443 clovis_logger.warning( |
376 'Instance count limited by quota: %i available / %i requested.' % ( | 444 'Instance count limited by quota: %i available / %i requested.' % ( |
377 max_instances, task.BackendParams()['instance_count'])) | 445 max_instances, task.BackendParams()['instance_count'])) |
378 task.BackendParams()['instance_count'] = max_instances | 446 task.BackendParams()['instance_count'] = max_instances |
379 | 447 |
380 # Compute the timeout if there is none specified. | 448 # Compute the timeout if there is none specified. |
381 expected_duration_h = sequential_duration_s / ( | 449 expected_duration_s = sequential_duration_s / ( |
382 task.BackendParams()['instance_count'] * 3600.0) | 450 task.BackendParams()['instance_count']) |
| 451 frontend_job.eta = datetime.datetime.now() + datetime.timedelta( |
| 452 seconds=expected_duration_s) |
383 if not task.BackendParams().get('timeout_hours'): | 453 if not task.BackendParams().get('timeout_hours'): |
384 # Timeout is at least 1 hour. | 454 # Timeout is at least 1 hour. |
385 task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) | 455 task.BackendParams()['timeout_hours'] = max( |
| 456 1, 5 * expected_duration_s / 3600.0) |
386 clovis_logger.info( | 457 clovis_logger.info( |
387 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) | 458 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) |
388 | 459 |
| 460 frontend_job.status = 'queueing_tasks' |
389 if not EnqueueTasks(sub_tasks, task_tag): | 461 if not EnqueueTasks(sub_tasks, task_tag): |
390 return Render('Task creation failed.', memory_logs) | 462 frontend_job.status = 'task_creation_error' |
| 463 return |
391 | 464 |
392 # Start polling the progress. | 465 # Start polling the progress. |
393 clovis_logger.info('Creating worker polling task.') | 466 clovis_logger.info('Creating worker polling task.') |
394 first_poll_delay_minutes = 10 | 467 first_poll_delay_minutes = 10 |
395 deferred.defer(PollWorkers, task_tag, time.time(), | 468 deferred.defer(PollWorkers, task_tag, time.time(), |
396 task.BackendParams()['timeout_hours'], user_email, | 469 task.BackendParams()['timeout_hours'], user_email, |
397 task_url, _countdown=(60 * first_poll_delay_minutes)) | 470 task_url, _countdown=(60 * first_poll_delay_minutes)) |
398 | 471 |
399 # Start the instances if required. | 472 # Start the instances if required. |
| 473 frontend_job.status = 'creating_instances' |
| 474 frontend_job.put() |
400 if not CreateInstanceTemplate(task, task_dir): | 475 if not CreateInstanceTemplate(task, task_dir): |
401 return Render('Instance template creation failed.', memory_logs) | 476 frontend_job.status = 'instance_template_error' |
| 477 return |
402 if not CreateInstances(task): | 478 if not CreateInstances(task): |
403 return Render('Instance creation failed.', memory_logs) | 479 frontend_job.status = 'instance_creation_error' |
| 480 return |
404 | 481 |
405 return Render(flask.Markup( | 482 frontend_job.status = 'started' |
406 'Success!<br>Your task %s has started.<br>' | |
407 'Expected duration: %.1f hours.<br>' | |
408 'You will be notified at %s when completed.') % ( | |
409 task_tag, expected_duration_h, user_email), | |
410 memory_logs) | |
411 | |
412 | 483 |
413 def EnqueueTasks(tasks, task_tag): | 484 def EnqueueTasks(tasks, task_tag): |
414 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 485 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
415 Google Compute Engine. | 486 Google Compute Engine. |
416 """ | 487 """ |
417 q = taskqueue.Queue('clovis-queue') | 488 q = taskqueue.Queue('clovis-queue') |
418 # Add tasks to the queue by groups. | 489 # Add tasks to the queue by groups. |
419 # TODO(droger): This supports thousands of tasks, but maybe not millions. | 490 # TODO(droger): This supports thousands of tasks, but maybe not millions. |
420 # Defer the enqueuing if it times out. | 491 # Defer the enqueuing if it times out. |
421 group_size = 100 | 492 group_size = 100 |
(...skipping 26 matching lines...) Expand all Loading... |
448 @app.route('/form_sent', methods=['POST']) | 519 @app.route('/form_sent', methods=['POST']) |
449 def StartFromForm(): | 520 def StartFromForm(): |
450 """HTML form endpoint.""" | 521 """HTML form endpoint.""" |
451 data_stream = flask.request.files.get('json_task') | 522 data_stream = flask.request.files.get('json_task') |
452 if not data_stream: | 523 if not data_stream: |
453 return Render('Failed, no content.') | 524 return Render('Failed, no content.') |
454 http_body_str = data_stream.read() | 525 http_body_str = data_stream.read() |
455 return StartFromJsonString(http_body_str) | 526 return StartFromJsonString(http_body_str) |
456 | 527 |
457 | 528 |
| 529 @app.route('/list_jobs') |
| 530 def ShowTaskList(): |
| 531 """Shows a list of all active jobs.""" |
| 532 tags = FrontendJob.ListJobs() |
| 533 |
| 534 if not tags: |
| 535 Render('No active jobs.') |
| 536 |
| 537 html = flask.Markup('Active tasks:<ul>') |
| 538 for tag in tags: |
| 539 html += flask.Markup( |
| 540 '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag) |
| 541 html += flask.Markup('</ul>') |
| 542 return Render(html) |
| 543 |
| 544 |
| 545 @app.route(FrontendJob.SHOW_JOB_URL) |
| 546 def ShowTask(): |
| 547 """Shows basic information abour a job.""" |
| 548 tag = flask.request.args.get('tag') |
| 549 if not tag: |
| 550 return Render('Invalid task tag.') |
| 551 |
| 552 frontend_job = FrontendJob.GetFromTag(tag) |
| 553 |
| 554 if not frontend_job: |
| 555 return Render('Task not found.') |
| 556 |
| 557 message = flask.Markup('Task details:' + frontend_job.RenderAsHtml()) |
| 558 |
| 559 log = None |
| 560 if frontend_job.log: |
| 561 log = frontend_job.log.split('\n') |
| 562 |
| 563 return flask.render_template('log.html', body=message, log=log) |
| 564 |
| 565 |
458 @app.errorhandler(404) | 566 @app.errorhandler(404) |
459 def PageNotFound(e): # pylint: disable=unused-argument | 567 def PageNotFound(e): # pylint: disable=unused-argument |
460 """Return a custom 404 error.""" | 568 """Return a custom 404 error.""" |
461 return 'Sorry, Nothing at this URL.', 404 | 569 return 'Sorry, Nothing at this URL.', 404 |
462 | 570 |
463 | 571 |
464 @app.errorhandler(500) | 572 @app.errorhandler(500) |
465 def ApplicationError(e): | 573 def ApplicationError(e): |
466 """Return a custom 500 error.""" | 574 """Return a custom 500 error.""" |
467 return 'Sorry, unexpected error: {}'.format(e), 499 | 575 return 'Sorry, unexpected error: {}'.format(e), 499 |
OLD | NEW |