Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(823)

Unified Diff: tools/android/loading/cloud/frontend/clovis_frontend.py

Issue 2041193008: tools/android/loading Move task creation to background and add dashboard (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@frontendNumberFix
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/android/loading/cloud/frontend/clovis_frontend.py
diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py
index 4a633856ebbb9ab61e246e2ac2fa21c2270a6626..9750deede9f6d59830b7959385c42c75268cd6a8 100644
--- a/tools/android/loading/cloud/frontend/clovis_frontend.py
+++ b/tools/android/loading/cloud/frontend/clovis_frontend.py
@@ -3,6 +3,7 @@
# found in the LICENSE file.
import logging
+import datetime
import math
import os
import sys
@@ -20,6 +21,7 @@ import common.google_bigquery_helper
import common.google_instance_helper
from common.loading_trace_database import LoadingTraceDatabase
import email_helper
+from frontend_job import FrontendJob
from memory_logs import MemoryLogs
@@ -99,6 +101,7 @@ def Finalize(tag, email_address, status, task_url):
logger=clovis_logger)
clovis_logger.info('Scheduling instance group destruction for tag: ' + tag)
deferred.defer(DeleteInstanceGroup, tag)
+ FrontendJob.DeleteForTag(tag)
def GetEstimatedTaskDurationInSeconds(task):
@@ -324,9 +327,56 @@ def StartFromJsonString(http_body_str):
task_tag = task.BackendParams()['tag']
clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(),
task_tag))
+ user_email = email_helper.GetUserEmail()
+
+ # Write the job to the datastore.
+ frontend_job = FrontendJob.CreateForTag(task_tag)
+ frontend_job.email = user_email
+ frontend_job.status = 'not_started'
+ frontend_job.clovis_task = task.ToJsonString()
+ frontend_job.put()
+
+ # Process the job on the queue, to avoid timeout issues.
+ deferred.defer(SpawnTasksOnBackgroundQueue, task_tag)
+
+ return Render(
+ flask.Markup(
+ '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)),
+ memory_logs)
+
+
+def SpawnTasksOnBackgroundQueue(task_tag):
+ """Spawns Clovis tasks associated with task_tag from the backgound queue.
+
+ This function is mostly a wrapper around SpawnTasks() that catches exceptions.
+ It is assumed that a FrontendJob for task_tag exists.
+ """
+ memory_logs = MemoryLogs(clovis_logger)
+ memory_logs.Start()
+ clovis_logger.info('Spawning tasks on background queue.')
+
+ try:
+ frontend_job = FrontendJob.GetFromTag(task_tag)
+ frontend_job.status = 'will_start'
+ SpawnTasks(frontend_job)
+ except Exception:
+ pass # Catch all exceptions so that AppEngine does not retry indefinitely.
mattcary 2016/06/09 15:40:06 Log the exception so if tasks disappear we have a
droger 2016/06/10 12:14:04 Done.
+
+ # Update the task.
+ if frontend_job:
+ frontend_job.log = memory_logs.Flush()
+ frontend_job.put()
+
+
+def SpawnTasks(frontend_job):
+ """ Spawns Clovis tasks associated with the frontend job."""
+ user_email = frontend_job.email
+ task = ClovisTask.FromJsonString(frontend_job.clovis_task)
+ task_tag = task.BackendParams()['tag']
+
# Compute the task directory.
+ frontend_job.status = 'building_task_dir'
task_dir_components = []
- user_email = email_helper.GetUserEmail()
user_name = None
if user_email:
user_name = user_email[:user_email.find('@')]
@@ -339,26 +389,34 @@ def StartFromJsonString(http_body_str):
task_dir = os.path.join(task.Action(), '_'.join(task_dir_components))
# Build the URL where the result will live.
+ frontend_job.status = 'building_task_url'
mattcary 2016/06/09 15:40:06 Do you need to put these status update values, or
droger 2016/06/10 12:14:04 You need to put. This is only done at the end (li
mattcary 2016/06/10 12:17:27 Shouldn't we put after each return, so we have the
droger 2016/06/10 12:58:27 put() will be executed after each return, by the c
mattcary 2016/06/10 13:24:20 Ah! I was just looking in SpawnTasks. Thanks.
task_url = GetTaskURL(task, task_dir)
if task_url:
clovis_logger.info('Task result URL: ' + task_url)
+ frontend_job.task_url = task_url
else:
- return Render('Could not build the task URL.', memory_logs)
+ frontend_job.status = 'task_url_error'
+ return
# Split the task in smaller tasks.
+ frontend_job.status = 'splitting_task'
sub_tasks = SplitClovisTask(task)
if not sub_tasks:
- return Render('Task split failed.', memory_logs)
+ frontend_job.status = 'task_split_error'
+ return
# Compute estimates for the work duration, in order to compute the instance
# count and the timeout.
+ frontend_job.status = 'estimating_duration'
sequential_duration_s = \
GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks)
if sequential_duration_s <= 0:
- return Render('Time estimation failed.', memory_logs)
+ frontend_job.status = 'time_estimation_error'
+ return
# Compute the number of required instances if not specified.
- if not task.BackendParams().get('instance_count'):
+ if task.BackendParams().get('instance_count') is None:
+ frontend_job.status = 'estimating_instance_count'
target_parallel_duration_s = 1800.0 # 30 minutes.
task.BackendParams()['instance_count'] = math.ceil(
sequential_duration_s / target_parallel_duration_s)
@@ -366,11 +424,14 @@ def StartFromJsonString(http_body_str):
# Check the instance quotas.
clovis_logger.info(
'Requesting %i instances.' % task.BackendParams()['instance_count'])
+ frontend_job.status = 'checking_instance_quotas'
max_instances = instance_helper.GetAvailableInstanceCount()
if max_instances == -1:
- return Render('Failed to count the available instances.', memory_logs)
+ frontend_job.status = 'instance_count_error'
+ return
elif task.BackendParams()['instance_count'] == 0:
- return Render('Cannot create instances, quota exceeded.', memory_logs)
+ frontend_job.status = 'no_instance_available_error'
+ return
elif max_instances < task.BackendParams()['instance_count']:
clovis_logger.warning(
'Instance count limited by quota: %i available / %i requested.' % (
@@ -378,16 +439,21 @@ def StartFromJsonString(http_body_str):
task.BackendParams()['instance_count'] = max_instances
# Compute the timeout if there is none specified.
- expected_duration_h = sequential_duration_s / (
- task.BackendParams()['instance_count'] * 3600.0)
+ expected_duration_s = sequential_duration_s / (
+ task.BackendParams()['instance_count'])
+ frontend_job.eta = datetime.datetime.now() + datetime.timedelta(
+ seconds=expected_duration_s)
if not task.BackendParams().get('timeout_hours'):
# Timeout is at least 1 hour.
- task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h)
+ task.BackendParams()['timeout_hours'] = max(
+ 1, 5 * expected_duration_s / 3600.0)
clovis_logger.info(
'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours'])
+ frontend_job.status = 'queueing_tasks'
if not EnqueueTasks(sub_tasks, task_tag):
- return Render('Task creation failed.', memory_logs)
+ frontend_job.status = 'task_creation_error'
+ return
# Start polling the progress.
clovis_logger.info('Creating worker polling task.')
@@ -397,18 +463,15 @@ def StartFromJsonString(http_body_str):
task_url, _countdown=(60 * first_poll_delay_minutes))
# Start the instances if required.
+ frontend_job.status = 'creating_instances'
if not CreateInstanceTemplate(task, task_dir):
- return Render('Instance template creation failed.', memory_logs)
+ frontend_job.status = 'instance_template_error'
+ return
if not CreateInstances(task):
- return Render('Instance creation failed.', memory_logs)
-
- return Render(flask.Markup(
- 'Success!<br>Your task %s has started.<br>'
- 'Expected duration: %.1f hours.<br>'
- 'You will be notified at %s when completed.') % (
- task_tag, expected_duration_h, user_email),
- memory_logs)
+ frontend_job.status = 'instance_creation_error'
+ return
+ frontend_job.status = 'started'
def EnqueueTasks(tasks, task_tag):
"""Enqueues a list of tasks in the Google Cloud task queue, for consumption by
@@ -455,6 +518,43 @@ def StartFromForm():
return StartFromJsonString(http_body_str)
+@app.route('/list_jobs')
+def ShowTaskList():
+ """Shows a list of all active jobs."""
+ tags = FrontendJob.ListJobs()
+
+ if not tags:
+ Render('No active jobs.')
+
+ html = flask.Markup('Active tasks:<ul>')
+ for tag in tags:
+ html += flask.Markup(
+ '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag)
+ html += flask.Markup('</ul>')
+ return Render(html)
+
+
+@app.route(FrontendJob.SHOW_JOB_URL)
+def ShowTask():
+ """Shows basic information abour a job."""
+ tag = flask.request.args.get('tag')
+ if not tag:
+ return Render('Invalid task tag.')
+
+ frontend_job = FrontendJob.GetFromTag(tag)
+
+ if not frontend_job:
+ return Render('Task not found.')
+
+ message = flask.Markup('Task details:' + frontend_job.RenderAsHtml())
+
+ log = None
+ if frontend_job.log:
+ log = frontend_job.log.split('\n')
+
+ return flask.render_template('log.html', body=message, log=log)
+
+
@app.errorhandler(404)
def PageNotFound(e): # pylint: disable=unused-argument
"""Return a custom 404 error."""

Powered by Google App Engine
This is Rietveld 408576698