Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(418)

Side by Side Diff: tools/android/loading/cloud/frontend/clovis_frontend.py

Issue 2041193008: tools/android/loading Move task creation to background and add dashboard (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@frontendNumberFix
Patch Set: Review comments Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import logging 5 import logging
6 import datetime
6 import math 7 import math
7 import os 8 import os
8 import sys 9 import sys
9 import time 10 import time
11 import traceback
10 12
11 import cloudstorage 13 import cloudstorage
12 import flask 14 import flask
13 from google.appengine.api import (app_identity, taskqueue) 15 from google.appengine.api import (app_identity, taskqueue)
14 from google.appengine.ext import deferred 16 from google.appengine.ext import deferred
15 from oauth2client.client import GoogleCredentials 17 from oauth2client.client import GoogleCredentials
16 18
17 import common.clovis_paths 19 import common.clovis_paths
18 from common.clovis_task import ClovisTask 20 from common.clovis_task import ClovisTask
19 import common.google_bigquery_helper 21 import common.google_bigquery_helper
20 import common.google_instance_helper 22 import common.google_instance_helper
21 from common.loading_trace_database import LoadingTraceDatabase 23 from common.loading_trace_database import LoadingTraceDatabase
22 import email_helper 24 import email_helper
25 from frontend_job import FrontendJob
23 from memory_logs import MemoryLogs 26 from memory_logs import MemoryLogs
24 27
25 28
26 # Global variables. 29 # Global variables.
27 logging.Formatter.converter = time.gmtime 30 logging.Formatter.converter = time.gmtime
28 clovis_logger = logging.getLogger('clovis_frontend') 31 clovis_logger = logging.getLogger('clovis_frontend')
29 clovis_logger.setLevel(logging.DEBUG) 32 clovis_logger.setLevel(logging.DEBUG)
30 project_name = app_identity.get_application_id() 33 project_name = app_identity.get_application_id()
31 instance_helper = common.google_instance_helper.GoogleInstanceHelper( 34 instance_helper = common.google_instance_helper.GoogleInstanceHelper(
32 credentials=GoogleCredentials.get_application_default(), 35 credentials=GoogleCredentials.get_application_default(),
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 email_address (str): Email address of the user to be notified. 95 email_address (str): Email address of the user to be notified.
93 status (str): Status of the task, indicating the success or the cause of 96 status (str): Status of the task, indicating the success or the cause of
94 failure. 97 failure.
95 task_url (str): URL where the results of the task can be found. 98 task_url (str): URL where the results of the task can be found.
96 """ 99 """
97 email_helper.SendEmailTaskComplete( 100 email_helper.SendEmailTaskComplete(
98 to_address=email_address, tag=tag, status=status, task_url=task_url, 101 to_address=email_address, tag=tag, status=status, task_url=task_url,
99 logger=clovis_logger) 102 logger=clovis_logger)
100 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) 103 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag)
101 deferred.defer(DeleteInstanceGroup, tag) 104 deferred.defer(DeleteInstanceGroup, tag)
105 FrontendJob.DeleteForTag(tag)
102 106
103 107
104 def GetEstimatedTaskDurationInSeconds(task): 108 def GetEstimatedTaskDurationInSeconds(task):
105 """Returns an estimation of the time required to run the task. 109 """Returns an estimation of the time required to run the task.
106 110
107 Args: 111 Args:
108 task: (ClovisTask) The task. 112 task: (ClovisTask) The task.
109 113
110 Returns: 114 Returns:
111 float: Time estimation in seconds, or -1 in case of failure. 115 float: Time estimation in seconds, or -1 in case of failure.
(...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after
317 321
318 # Load the task from JSON. 322 # Load the task from JSON.
319 task = ClovisTask.FromJsonString(http_body_str) 323 task = ClovisTask.FromJsonString(http_body_str)
320 if not task: 324 if not task:
321 clovis_logger.error('Invalid JSON task.') 325 clovis_logger.error('Invalid JSON task.')
322 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) 326 return Render('Invalid JSON task:\n' + http_body_str, memory_logs)
323 327
324 task_tag = task.BackendParams()['tag'] 328 task_tag = task.BackendParams()['tag']
325 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), 329 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(),
326 task_tag)) 330 task_tag))
331 user_email = email_helper.GetUserEmail()
332
333 # Write the job to the datastore.
334 frontend_job = FrontendJob.CreateForTag(task_tag)
335 frontend_job.email = user_email
336 frontend_job.status = 'not_started'
337 frontend_job.clovis_task = task.ToJsonString()
338 frontend_job.put()
339
340 # Process the job on the queue, to avoid timeout issues.
341 deferred.defer(SpawnTasksOnBackgroundQueue, task_tag)
342
343 return Render(
344 flask.Markup(
345 '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)),
346 memory_logs)
347
348
349 def SpawnTasksOnBackgroundQueue(task_tag):
350 """Spawns Clovis tasks associated with task_tag from the background queue.
351
352 This function is mostly a wrapper around SpawnTasks() that catches exceptions.
353 It is assumed that a FrontendJob for task_tag exists.
354 """
355 memory_logs = MemoryLogs(clovis_logger)
356 memory_logs.Start()
357 clovis_logger.info('Spawning tasks on background queue.')
358
359 try:
360 frontend_job = FrontendJob.GetFromTag(task_tag)
361 frontend_job.status = 'will_start'
362 SpawnTasks(frontend_job)
363 except Exception as e:
364 clovis_logger.error('Exception spawning tasks: ' + str(e))
365 clovis_logger.error(traceback.print_exc())
366
367 # Update the task.
368 if frontend_job:
369 frontend_job.log = memory_logs.Flush()
370 frontend_job.put()
371
372
373 def SpawnTasks(frontend_job):
374 """ Spawns Clovis tasks associated with the frontend job."""
375 user_email = frontend_job.email
376 task = ClovisTask.FromJsonString(frontend_job.clovis_task)
377 task_tag = task.BackendParams()['tag']
378
379 # Delete the clovis task from the FrontendJob because it can make the object
380 # very heavy and it is no longer needed.
381 frontend_job.clovis_task = None
382
327 # Compute the task directory. 383 # Compute the task directory.
384 frontend_job.status = 'building_task_dir'
328 task_dir_components = [] 385 task_dir_components = []
329 user_email = email_helper.GetUserEmail()
330 user_name = None 386 user_name = None
331 if user_email: 387 if user_email:
332 user_name = user_email[:user_email.find('@')] 388 user_name = user_email[:user_email.find('@')]
333 if user_name: 389 if user_name:
334 task_dir_components.append(user_name) 390 task_dir_components.append(user_name)
335 task_name = task.BackendParams().get('task_name') 391 task_name = task.BackendParams().get('task_name')
336 if task_name: 392 if task_name:
337 task_dir_components.append(task_name) 393 task_dir_components.append(task_name)
338 task_dir_components.append(task_tag) 394 task_dir_components.append(task_tag)
339 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) 395 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components))
340 396
341 # Build the URL where the result will live. 397 # Build the URL where the result will live.
398 frontend_job.status = 'building_task_url'
342 task_url = GetTaskURL(task, task_dir) 399 task_url = GetTaskURL(task, task_dir)
343 if task_url: 400 if task_url:
344 clovis_logger.info('Task result URL: ' + task_url) 401 clovis_logger.info('Task result URL: ' + task_url)
402 frontend_job.task_url = task_url
345 else: 403 else:
346 return Render('Could not build the task URL.', memory_logs) 404 frontend_job.status = 'task_url_error'
405 return
347 406
348 # Split the task in smaller tasks. 407 # Split the task in smaller tasks.
408 frontend_job.status = 'splitting_task'
409 frontend_job.put()
349 sub_tasks = SplitClovisTask(task) 410 sub_tasks = SplitClovisTask(task)
350 if not sub_tasks: 411 if not sub_tasks:
351 return Render('Task split failed.', memory_logs) 412 frontend_job.status = 'task_split_error'
413 return
352 414
353 # Compute estimates for the work duration, in order to compute the instance 415 # Compute estimates for the work duration, in order to compute the instance
354 # count and the timeout. 416 # count and the timeout.
417 frontend_job.status = 'estimating_duration'
355 sequential_duration_s = \ 418 sequential_duration_s = \
356 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) 419 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks)
357 if sequential_duration_s <= 0: 420 if sequential_duration_s <= 0:
358 return Render('Time estimation failed.', memory_logs) 421 frontend_job.status = 'time_estimation_error'
422 return
359 423
360 # Compute the number of required instances if not specified. 424 # Compute the number of required instances if not specified.
361 if task.BackendParams().get('instance_count') is None: 425 if task.BackendParams().get('instance_count') is None:
426 frontend_job.status = 'estimating_instance_count'
362 target_parallel_duration_s = 1800.0 # 30 minutes. 427 target_parallel_duration_s = 1800.0 # 30 minutes.
363 task.BackendParams()['instance_count'] = math.ceil( 428 task.BackendParams()['instance_count'] = math.ceil(
364 sequential_duration_s / target_parallel_duration_s) 429 sequential_duration_s / target_parallel_duration_s)
365 430
366 # Check the instance quotas. 431 # Check the instance quotas.
367 clovis_logger.info( 432 clovis_logger.info(
368 'Requesting %i instances.' % task.BackendParams()['instance_count']) 433 'Requesting %i instances.' % task.BackendParams()['instance_count'])
434 frontend_job.status = 'checking_instance_quotas'
369 max_instances = instance_helper.GetAvailableInstanceCount() 435 max_instances = instance_helper.GetAvailableInstanceCount()
370 if max_instances == -1: 436 if max_instances == -1:
371 return Render('Failed to count the available instances.', memory_logs) 437 frontend_job.status = 'instance_count_error'
438 return
372 elif task.BackendParams()['instance_count'] == 0: 439 elif task.BackendParams()['instance_count'] == 0:
373 return Render('Cannot create instances, quota exceeded.', memory_logs) 440 frontend_job.status = 'no_instance_available_error'
441 return
374 elif max_instances < task.BackendParams()['instance_count']: 442 elif max_instances < task.BackendParams()['instance_count']:
375 clovis_logger.warning( 443 clovis_logger.warning(
376 'Instance count limited by quota: %i available / %i requested.' % ( 444 'Instance count limited by quota: %i available / %i requested.' % (
377 max_instances, task.BackendParams()['instance_count'])) 445 max_instances, task.BackendParams()['instance_count']))
378 task.BackendParams()['instance_count'] = max_instances 446 task.BackendParams()['instance_count'] = max_instances
379 447
380 # Compute the timeout if there is none specified. 448 # Compute the timeout if there is none specified.
381 expected_duration_h = sequential_duration_s / ( 449 expected_duration_s = sequential_duration_s / (
382 task.BackendParams()['instance_count'] * 3600.0) 450 task.BackendParams()['instance_count'])
451 frontend_job.eta = datetime.datetime.now() + datetime.timedelta(
452 seconds=expected_duration_s)
383 if not task.BackendParams().get('timeout_hours'): 453 if not task.BackendParams().get('timeout_hours'):
384 # Timeout is at least 1 hour. 454 # Timeout is at least 1 hour.
385 task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) 455 task.BackendParams()['timeout_hours'] = max(
456 1, 5 * expected_duration_s / 3600.0)
386 clovis_logger.info( 457 clovis_logger.info(
387 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) 458 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours'])
388 459
460 frontend_job.status = 'queueing_tasks'
389 if not EnqueueTasks(sub_tasks, task_tag): 461 if not EnqueueTasks(sub_tasks, task_tag):
390 return Render('Task creation failed.', memory_logs) 462 frontend_job.status = 'task_creation_error'
463 return
391 464
392 # Start polling the progress. 465 # Start polling the progress.
393 clovis_logger.info('Creating worker polling task.') 466 clovis_logger.info('Creating worker polling task.')
394 first_poll_delay_minutes = 10 467 first_poll_delay_minutes = 10
395 deferred.defer(PollWorkers, task_tag, time.time(), 468 deferred.defer(PollWorkers, task_tag, time.time(),
396 task.BackendParams()['timeout_hours'], user_email, 469 task.BackendParams()['timeout_hours'], user_email,
397 task_url, _countdown=(60 * first_poll_delay_minutes)) 470 task_url, _countdown=(60 * first_poll_delay_minutes))
398 471
399 # Start the instances if required. 472 # Start the instances if required.
473 frontend_job.status = 'creating_instances'
474 frontend_job.put()
400 if not CreateInstanceTemplate(task, task_dir): 475 if not CreateInstanceTemplate(task, task_dir):
401 return Render('Instance template creation failed.', memory_logs) 476 frontend_job.status = 'instance_template_error'
477 return
402 if not CreateInstances(task): 478 if not CreateInstances(task):
403 return Render('Instance creation failed.', memory_logs) 479 frontend_job.status = 'instance_creation_error'
480 return
404 481
405 return Render(flask.Markup( 482 frontend_job.status = 'started'
406 'Success!<br>Your task %s has started.<br>'
407 'Expected duration: %.1f hours.<br>'
408 'You will be notified at %s when completed.') % (
409 task_tag, expected_duration_h, user_email),
410 memory_logs)
411
412 483
413 def EnqueueTasks(tasks, task_tag): 484 def EnqueueTasks(tasks, task_tag):
414 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by 485 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by
415 Google Compute Engine. 486 Google Compute Engine.
416 """ 487 """
417 q = taskqueue.Queue('clovis-queue') 488 q = taskqueue.Queue('clovis-queue')
418 # Add tasks to the queue by groups. 489 # Add tasks to the queue by groups.
419 # TODO(droger): This supports thousands of tasks, but maybe not millions. 490 # TODO(droger): This supports thousands of tasks, but maybe not millions.
420 # Defer the enqueuing if it times out. 491 # Defer the enqueuing if it times out.
421 group_size = 100 492 group_size = 100
(...skipping 26 matching lines...) Expand all
448 @app.route('/form_sent', methods=['POST']) 519 @app.route('/form_sent', methods=['POST'])
449 def StartFromForm(): 520 def StartFromForm():
450 """HTML form endpoint.""" 521 """HTML form endpoint."""
451 data_stream = flask.request.files.get('json_task') 522 data_stream = flask.request.files.get('json_task')
452 if not data_stream: 523 if not data_stream:
453 return Render('Failed, no content.') 524 return Render('Failed, no content.')
454 http_body_str = data_stream.read() 525 http_body_str = data_stream.read()
455 return StartFromJsonString(http_body_str) 526 return StartFromJsonString(http_body_str)
456 527
457 528
529 @app.route('/list_jobs')
530 def ShowTaskList():
531 """Shows a list of all active jobs."""
532 tags = FrontendJob.ListJobs()
533
534 if not tags:
535 Render('No active jobs.')
536
537 html = flask.Markup('Active tasks:<ul>')
538 for tag in tags:
539 html += flask.Markup(
540 '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag)
541 html += flask.Markup('</ul>')
542 return Render(html)
543
544
545 @app.route(FrontendJob.SHOW_JOB_URL)
546 def ShowTask():
547 """Shows basic information about a job."""
548 tag = flask.request.args.get('tag')
549 if not tag:
550 return Render('Invalid task tag.')
551
552 frontend_job = FrontendJob.GetFromTag(tag)
553
554 if not frontend_job:
555 return Render('Task not found.')
556
557 message = flask.Markup('Task details:' + frontend_job.RenderAsHtml())
558
559 log = None
560 if frontend_job.log:
561 log = frontend_job.log.split('\n')
562
563 return flask.render_template('log.html', body=message, log=log)
564
565
458 @app.errorhandler(404) 566 @app.errorhandler(404)
459 def PageNotFound(e): # pylint: disable=unused-argument 567 def PageNotFound(e): # pylint: disable=unused-argument
460 """Return a custom 404 error.""" 568 """Return a custom 404 error."""
461 return 'Sorry, Nothing at this URL.', 404 569 return 'Sorry, Nothing at this URL.', 404
462 570
463 571
464 @app.errorhandler(500) 572 @app.errorhandler(500)
465 def ApplicationError(e): 573 def ApplicationError(e):
466 """Return a custom 500 error.""" 574 """Return a custom 500 error."""
467 return 'Sorry, unexpected error: {}'.format(e), 499 575 return 'Sorry, unexpected error: {}'.format(e), 499
OLDNEW
« no previous file with comments | « tools/android/loading/cloud/common/clovis_task.py ('k') | tools/android/loading/cloud/frontend/frontend_job.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698