Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Side by Side Diff: tools/android/loading/cloud/frontend/clovis_frontend.py

Issue 2041193008: tools/android/loading Move task creation to background and add dashboard (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@frontendNumberFix
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import logging 5 import logging
6 import datetime
6 import math 7 import math
7 import os 8 import os
8 import sys 9 import sys
9 import time 10 import time
10 11
11 import cloudstorage 12 import cloudstorage
12 import flask 13 import flask
13 from google.appengine.api import (app_identity, taskqueue) 14 from google.appengine.api import (app_identity, taskqueue)
14 from google.appengine.ext import deferred 15 from google.appengine.ext import deferred
15 from oauth2client.client import GoogleCredentials 16 from oauth2client.client import GoogleCredentials
16 17
17 import common.clovis_paths 18 import common.clovis_paths
18 from common.clovis_task import ClovisTask 19 from common.clovis_task import ClovisTask
19 import common.google_bigquery_helper 20 import common.google_bigquery_helper
20 import common.google_instance_helper 21 import common.google_instance_helper
21 from common.loading_trace_database import LoadingTraceDatabase 22 from common.loading_trace_database import LoadingTraceDatabase
22 import email_helper 23 import email_helper
24 from frontend_job import FrontendJob
23 from memory_logs import MemoryLogs 25 from memory_logs import MemoryLogs
24 26
25 27
26 # Global variables. 28 # Global variables.
27 logging.Formatter.converter = time.gmtime 29 logging.Formatter.converter = time.gmtime
28 clovis_logger = logging.getLogger('clovis_frontend') 30 clovis_logger = logging.getLogger('clovis_frontend')
29 clovis_logger.setLevel(logging.DEBUG) 31 clovis_logger.setLevel(logging.DEBUG)
30 project_name = app_identity.get_application_id() 32 project_name = app_identity.get_application_id()
31 instance_helper = common.google_instance_helper.GoogleInstanceHelper( 33 instance_helper = common.google_instance_helper.GoogleInstanceHelper(
32 credentials=GoogleCredentials.get_application_default(), 34 credentials=GoogleCredentials.get_application_default(),
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 email_address (str): Email address of the user to be notified. 94 email_address (str): Email address of the user to be notified.
93 status (str): Status of the task, indicating the success or the cause of 95 status (str): Status of the task, indicating the success or the cause of
94 failure. 96 failure.
95 task_url (str): URL where the results of the task can be found. 97 task_url (str): URL where the results of the task can be found.
96 """ 98 """
97 email_helper.SendEmailTaskComplete( 99 email_helper.SendEmailTaskComplete(
98 to_address=email_address, tag=tag, status=status, task_url=task_url, 100 to_address=email_address, tag=tag, status=status, task_url=task_url,
99 logger=clovis_logger) 101 logger=clovis_logger)
100 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) 102 clovis_logger.info('Scheduling instance group destruction for tag: ' + tag)
101 deferred.defer(DeleteInstanceGroup, tag) 103 deferred.defer(DeleteInstanceGroup, tag)
104 FrontendJob.DeleteForTag(tag)
102 105
103 106
104 def GetEstimatedTaskDurationInSeconds(task): 107 def GetEstimatedTaskDurationInSeconds(task):
105 """Returns an estimation of the time required to run the task. 108 """Returns an estimation of the time required to run the task.
106 109
107 Args: 110 Args:
108 task: (ClovisTask) The task. 111 task: (ClovisTask) The task.
109 112
110 Returns: 113 Returns:
111 float: Time estimation in seconds, or -1 in case of failure. 114 float: Time estimation in seconds, or -1 in case of failure.
(...skipping 205 matching lines...) Expand 10 before | Expand all | Expand 10 after
317 320
318 # Load the task from JSON. 321 # Load the task from JSON.
319 task = ClovisTask.FromJsonString(http_body_str) 322 task = ClovisTask.FromJsonString(http_body_str)
320 if not task: 323 if not task:
321 clovis_logger.error('Invalid JSON task.') 324 clovis_logger.error('Invalid JSON task.')
322 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) 325 return Render('Invalid JSON task:\n' + http_body_str, memory_logs)
323 326
324 task_tag = task.BackendParams()['tag'] 327 task_tag = task.BackendParams()['tag']
325 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), 328 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(),
326 task_tag)) 329 task_tag))
330 user_email = email_helper.GetUserEmail()
331
332 # Write the job to the datastore.
333 frontend_job = FrontendJob.CreateForTag(task_tag)
334 frontend_job.email = user_email
335 frontend_job.status = 'not_started'
336 frontend_job.clovis_task = task.ToJsonString()
337 frontend_job.put()
338
339 # Process the job on the queue, to avoid timeout issues.
340 deferred.defer(SpawnTasksOnBackgroundQueue, task_tag)
341
342 return Render(
343 flask.Markup(
344 '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)),
345 memory_logs)
346
347
348 def SpawnTasksOnBackgroundQueue(task_tag):
349 """Spawns Clovis tasks associated with task_tag from the background queue.
350
351 This function is mostly a wrapper around SpawnTasks() that catches exceptions.
352 It is assumed that a FrontendJob for task_tag exists.
353 """
354 memory_logs = MemoryLogs(clovis_logger)
355 memory_logs.Start()
356 clovis_logger.info('Spawning tasks on background queue.')
357
358 try:
359 frontend_job = FrontendJob.GetFromTag(task_tag)
360 frontend_job.status = 'will_start'
361 SpawnTasks(frontend_job)
362 except Exception:
363 pass # Catch all exceptions so that AppEngine does not retry indefinitely.
mattcary 2016/06/09 15:40:06 Log the exception so if tasks disappear we have a
droger 2016/06/10 12:14:04 Done.
364
365 # Update the task.
366 if frontend_job:
367 frontend_job.log = memory_logs.Flush()
368 frontend_job.put()
369
370
371 def SpawnTasks(frontend_job):
372 """Spawns Clovis tasks associated with the frontend job."""
373 user_email = frontend_job.email
374 task = ClovisTask.FromJsonString(frontend_job.clovis_task)
375 task_tag = task.BackendParams()['tag']
376
327 # Compute the task directory. 377 # Compute the task directory.
378 frontend_job.status = 'building_task_dir'
328 task_dir_components = [] 379 task_dir_components = []
329 user_email = email_helper.GetUserEmail()
330 user_name = None 380 user_name = None
331 if user_email: 381 if user_email:
332 user_name = user_email[:user_email.find('@')] 382 user_name = user_email[:user_email.find('@')]
333 if user_name: 383 if user_name:
334 task_dir_components.append(user_name) 384 task_dir_components.append(user_name)
335 task_name = task.BackendParams().get('task_name') 385 task_name = task.BackendParams().get('task_name')
336 if task_name: 386 if task_name:
337 task_dir_components.append(task_name) 387 task_dir_components.append(task_name)
338 task_dir_components.append(task_tag) 388 task_dir_components.append(task_tag)
339 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) 389 task_dir = os.path.join(task.Action(), '_'.join(task_dir_components))
340 390
341 # Build the URL where the result will live. 391 # Build the URL where the result will live.
392 frontend_job.status = 'building_task_url'
mattcary 2016/06/09 15:40:06 Do you need to put these status update values, or
droger 2016/06/10 12:14:04 You need to put. This is only done at the end (li
mattcary 2016/06/10 12:17:27 Shouldn't we put after each return, so we have the
droger 2016/06/10 12:58:27 put() will be executed after each return, by the c
mattcary 2016/06/10 13:24:20 Ah! I was just looking in SpawnTasks. Thanks.
342 task_url = GetTaskURL(task, task_dir) 393 task_url = GetTaskURL(task, task_dir)
343 if task_url: 394 if task_url:
344 clovis_logger.info('Task result URL: ' + task_url) 395 clovis_logger.info('Task result URL: ' + task_url)
396 frontend_job.task_url = task_url
345 else: 397 else:
346 return Render('Could not build the task URL.', memory_logs) 398 frontend_job.status = 'task_url_error'
399 return
347 400
348 # Split the task in smaller tasks. 401 # Split the task in smaller tasks.
402 frontend_job.status = 'splitting_task'
349 sub_tasks = SplitClovisTask(task) 403 sub_tasks = SplitClovisTask(task)
350 if not sub_tasks: 404 if not sub_tasks:
351 return Render('Task split failed.', memory_logs) 405 frontend_job.status = 'task_split_error'
406 return
352 407
353 # Compute estimates for the work duration, in order to compute the instance 408 # Compute estimates for the work duration, in order to compute the instance
354 # count and the timeout. 409 # count and the timeout.
410 frontend_job.status = 'estimating_duration'
355 sequential_duration_s = \ 411 sequential_duration_s = \
356 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) 412 GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks)
357 if sequential_duration_s <= 0: 413 if sequential_duration_s <= 0:
358 return Render('Time estimation failed.', memory_logs) 414 frontend_job.status = 'time_estimation_error'
415 return
359 416
360 # Compute the number of required instances if not specified. 417 # Compute the number of required instances if not specified.
361 if not task.BackendParams().get('instance_count'): 418 if task.BackendParams().get('instance_count') is None:
419 frontend_job.status = 'estimating_instance_count'
362 target_parallel_duration_s = 1800.0 # 30 minutes. 420 target_parallel_duration_s = 1800.0 # 30 minutes.
363 task.BackendParams()['instance_count'] = math.ceil( 421 task.BackendParams()['instance_count'] = math.ceil(
364 sequential_duration_s / target_parallel_duration_s) 422 sequential_duration_s / target_parallel_duration_s)
365 423
366 # Check the instance quotas. 424 # Check the instance quotas.
367 clovis_logger.info( 425 clovis_logger.info(
368 'Requesting %i instances.' % task.BackendParams()['instance_count']) 426 'Requesting %i instances.' % task.BackendParams()['instance_count'])
427 frontend_job.status = 'checking_instance_quotas'
369 max_instances = instance_helper.GetAvailableInstanceCount() 428 max_instances = instance_helper.GetAvailableInstanceCount()
370 if max_instances == -1: 429 if max_instances == -1:
371 return Render('Failed to count the available instances.', memory_logs) 430 frontend_job.status = 'instance_count_error'
431 return
372 elif task.BackendParams()['instance_count'] == 0: 432 elif task.BackendParams()['instance_count'] == 0:
373 return Render('Cannot create instances, quota exceeded.', memory_logs) 433 frontend_job.status = 'no_instance_available_error'
434 return
374 elif max_instances < task.BackendParams()['instance_count']: 435 elif max_instances < task.BackendParams()['instance_count']:
375 clovis_logger.warning( 436 clovis_logger.warning(
376 'Instance count limited by quota: %i available / %i requested.' % ( 437 'Instance count limited by quota: %i available / %i requested.' % (
377 max_instances, task.BackendParams()['instance_count'])) 438 max_instances, task.BackendParams()['instance_count']))
378 task.BackendParams()['instance_count'] = max_instances 439 task.BackendParams()['instance_count'] = max_instances
379 440
380 # Compute the timeout if there is none specified. 441 # Compute the timeout if there is none specified.
381 expected_duration_h = sequential_duration_s / ( 442 expected_duration_s = sequential_duration_s / (
382 task.BackendParams()['instance_count'] * 3600.0) 443 task.BackendParams()['instance_count'])
444 frontend_job.eta = datetime.datetime.now() + datetime.timedelta(
445 seconds=expected_duration_s)
383 if not task.BackendParams().get('timeout_hours'): 446 if not task.BackendParams().get('timeout_hours'):
384 # Timeout is at least 1 hour. 447 # Timeout is at least 1 hour.
385 task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) 448 task.BackendParams()['timeout_hours'] = max(
449 1, 5 * expected_duration_s / 3600.0)
386 clovis_logger.info( 450 clovis_logger.info(
387 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) 451 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours'])
388 452
453 frontend_job.status = 'queueing_tasks'
389 if not EnqueueTasks(sub_tasks, task_tag): 454 if not EnqueueTasks(sub_tasks, task_tag):
390 return Render('Task creation failed.', memory_logs) 455 frontend_job.status = 'task_creation_error'
456 return
391 457
392 # Start polling the progress. 458 # Start polling the progress.
393 clovis_logger.info('Creating worker polling task.') 459 clovis_logger.info('Creating worker polling task.')
394 first_poll_delay_minutes = 10 460 first_poll_delay_minutes = 10
395 deferred.defer(PollWorkers, task_tag, time.time(), 461 deferred.defer(PollWorkers, task_tag, time.time(),
396 task.BackendParams()['timeout_hours'], user_email, 462 task.BackendParams()['timeout_hours'], user_email,
397 task_url, _countdown=(60 * first_poll_delay_minutes)) 463 task_url, _countdown=(60 * first_poll_delay_minutes))
398 464
399 # Start the instances if required. 465 # Start the instances if required.
466 frontend_job.status = 'creating_instances'
400 if not CreateInstanceTemplate(task, task_dir): 467 if not CreateInstanceTemplate(task, task_dir):
401 return Render('Instance template creation failed.', memory_logs) 468 frontend_job.status = 'instance_template_error'
469 return
402 if not CreateInstances(task): 470 if not CreateInstances(task):
403 return Render('Instance creation failed.', memory_logs) 471 frontend_job.status = 'instance_creation_error'
472 return
404 473
405 return Render(flask.Markup( 474 frontend_job.status = 'started'
406 'Success!<br>Your task %s has started.<br>'
407 'Expected duration: %.1f hours.<br>'
408 'You will be notified at %s when completed.') % (
409 task_tag, expected_duration_h, user_email),
410 memory_logs)
411
412 475
413 def EnqueueTasks(tasks, task_tag): 476 def EnqueueTasks(tasks, task_tag):
414 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by 477 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by
415 Google Compute Engine. 478 Google Compute Engine.
416 """ 479 """
417 q = taskqueue.Queue('clovis-queue') 480 q = taskqueue.Queue('clovis-queue')
418 # Add tasks to the queue by groups. 481 # Add tasks to the queue by groups.
419 # TODO(droger): This supports thousands of tasks, but maybe not millions. 482 # TODO(droger): This supports thousands of tasks, but maybe not millions.
420 # Defer the enqueuing if it times out. 483 # Defer the enqueuing if it times out.
421 group_size = 100 484 group_size = 100
(...skipping 26 matching lines...) Expand all
448 @app.route('/form_sent', methods=['POST']) 511 @app.route('/form_sent', methods=['POST'])
449 def StartFromForm(): 512 def StartFromForm():
450 """HTML form endpoint.""" 513 """HTML form endpoint."""
451 data_stream = flask.request.files.get('json_task') 514 data_stream = flask.request.files.get('json_task')
452 if not data_stream: 515 if not data_stream:
453 return Render('Failed, no content.') 516 return Render('Failed, no content.')
454 http_body_str = data_stream.read() 517 http_body_str = data_stream.read()
455 return StartFromJsonString(http_body_str) 518 return StartFromJsonString(http_body_str)
456 519
457 520
521 @app.route('/list_jobs')
522 def ShowTaskList():
523 """Shows a list of all active jobs."""
524 tags = FrontendJob.ListJobs()
525
526 if not tags:
527 Render('No active jobs.')
528
529 html = flask.Markup('Active tasks:<ul>')
530 for tag in tags:
531 html += flask.Markup(
532 '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag)
533 html += flask.Markup('</ul>')
534 return Render(html)
535
536
537 @app.route(FrontendJob.SHOW_JOB_URL)
538 def ShowTask():
539 """Shows basic information about a job."""
540 tag = flask.request.args.get('tag')
541 if not tag:
542 return Render('Invalid task tag.')
543
544 frontend_job = FrontendJob.GetFromTag(tag)
545
546 if not frontend_job:
547 return Render('Task not found.')
548
549 message = flask.Markup('Task details:' + frontend_job.RenderAsHtml())
550
551 log = None
552 if frontend_job.log:
553 log = frontend_job.log.split('\n')
554
555 return flask.render_template('log.html', body=message, log=log)
556
557
458 @app.errorhandler(404) 558 @app.errorhandler(404)
459 def PageNotFound(e): # pylint: disable=unused-argument 559 def PageNotFound(e): # pylint: disable=unused-argument
460 """Return a custom 404 error.""" 560 """Return a custom 404 error."""
461 return 'Sorry, Nothing at this URL.', 404 561 return 'Sorry, Nothing at this URL.', 404
462 562
463 563
464 @app.errorhandler(500) 564 @app.errorhandler(500)
465 def ApplicationError(e): 565 def ApplicationError(e):
466 """Return a custom 500 error.""" 566 """Return a custom 500 error."""
467 return 'Sorry, unexpected error: {}'.format(e), 499 567 return 'Sorry, unexpected error: {}'.format(e), 499
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698