Index: tools/android/loading/cloud/frontend/clovis_frontend.py |
diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py |
index 4a633856ebbb9ab61e246e2ac2fa21c2270a6626..9750deede9f6d59830b7959385c42c75268cd6a8 100644 |
--- a/tools/android/loading/cloud/frontend/clovis_frontend.py |
+++ b/tools/android/loading/cloud/frontend/clovis_frontend.py |
@@ -3,6 +3,7 @@ |
# found in the LICENSE file. |
import logging |
+import datetime |
import math |
import os |
import sys |
@@ -20,6 +21,7 @@ import common.google_bigquery_helper |
import common.google_instance_helper |
from common.loading_trace_database import LoadingTraceDatabase |
import email_helper |
+from frontend_job import FrontendJob |
from memory_logs import MemoryLogs |
@@ -99,6 +101,7 @@ def Finalize(tag, email_address, status, task_url): |
logger=clovis_logger) |
clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) |
deferred.defer(DeleteInstanceGroup, tag) |
+ FrontendJob.DeleteForTag(tag) |
def GetEstimatedTaskDurationInSeconds(task): |
@@ -324,9 +327,56 @@ def StartFromJsonString(http_body_str): |
task_tag = task.BackendParams()['tag'] |
clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
task_tag)) |
+ user_email = email_helper.GetUserEmail() |
+ |
+ # Write the job to the datastore. |
+ frontend_job = FrontendJob.CreateForTag(task_tag) |
+ frontend_job.email = user_email |
+ frontend_job.status = 'not_started' |
+ frontend_job.clovis_task = task.ToJsonString() |
+ frontend_job.put() |
+ |
+ # Process the job on the queue, to avoid timeout issues. |
+ deferred.defer(SpawnTasksOnBackgroundQueue, task_tag) |
+ |
+ return Render( |
+ flask.Markup( |
+ '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)), |
+ memory_logs) |
+ |
+ |
+def SpawnTasksOnBackgroundQueue(task_tag): |
+ """Spawns Clovis tasks associated with task_tag from the backgound queue. |
+ |
+ This function is mostly a wrapper around SpawnTasks() that catches exceptions. |
+ It is assumed that a FrontendJob for task_tag exists. |
+ """ |
+ memory_logs = MemoryLogs(clovis_logger) |
+ memory_logs.Start() |
+ clovis_logger.info('Spawning tasks on background queue.') |
+ |
+ try: |
+ frontend_job = FrontendJob.GetFromTag(task_tag) |
+ frontend_job.status = 'will_start' |
+ SpawnTasks(frontend_job) |
+ except Exception: |
+ pass # Catch all exceptions so that AppEngine does not retry indefinitely. |
mattcary
2016/06/09 15:40:06
Log the exception so if tasks disappear we have a
droger
2016/06/10 12:14:04
Done.
|
+ |
+ # Update the task. |
+ if frontend_job: |
+ frontend_job.log = memory_logs.Flush() |
+ frontend_job.put() |
+ |
+ |
+def SpawnTasks(frontend_job): |
+ """ Spawns Clovis tasks associated with the frontend job.""" |
+ user_email = frontend_job.email |
+ task = ClovisTask.FromJsonString(frontend_job.clovis_task) |
+ task_tag = task.BackendParams()['tag'] |
+ |
# Compute the task directory. |
+ frontend_job.status = 'building_task_dir' |
task_dir_components = [] |
- user_email = email_helper.GetUserEmail() |
user_name = None |
if user_email: |
user_name = user_email[:user_email.find('@')] |
@@ -339,26 +389,34 @@ def StartFromJsonString(http_body_str): |
task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) |
# Build the URL where the result will live. |
+ frontend_job.status = 'building_task_url' |
mattcary
2016/06/09 15:40:06
Do you need to put these status update values, or
droger
2016/06/10 12:14:04
You need to put.
This is only done at the end (li
mattcary
2016/06/10 12:17:27
Shouldn't we put after each return, so we have the
droger
2016/06/10 12:58:27
put() will be executed after each return, by the c
mattcary
2016/06/10 13:24:20
Ah! I was just looking in SpawnTasks. Thanks.
|
task_url = GetTaskURL(task, task_dir) |
if task_url: |
clovis_logger.info('Task result URL: ' + task_url) |
+ frontend_job.task_url = task_url |
else: |
- return Render('Could not build the task URL.', memory_logs) |
+ frontend_job.status = 'task_url_error' |
+ return |
# Split the task in smaller tasks. |
+ frontend_job.status = 'splitting_task' |
sub_tasks = SplitClovisTask(task) |
if not sub_tasks: |
- return Render('Task split failed.', memory_logs) |
+ frontend_job.status = 'task_split_error' |
+ return |
# Compute estimates for the work duration, in order to compute the instance |
# count and the timeout. |
+ frontend_job.status = 'estimating_duration' |
sequential_duration_s = \ |
GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) |
if sequential_duration_s <= 0: |
- return Render('Time estimation failed.', memory_logs) |
+ frontend_job.status = 'time_estimation_error' |
+ return |
# Compute the number of required instances if not specified. |
- if not task.BackendParams().get('instance_count'): |
+ if task.BackendParams().get('instance_count') is None: |
+ frontend_job.status = 'estimating_instance_count' |
target_parallel_duration_s = 1800.0 # 30 minutes. |
task.BackendParams()['instance_count'] = math.ceil( |
sequential_duration_s / target_parallel_duration_s) |
@@ -366,11 +424,14 @@ def StartFromJsonString(http_body_str): |
# Check the instance quotas. |
clovis_logger.info( |
'Requesting %i instances.' % task.BackendParams()['instance_count']) |
+ frontend_job.status = 'checking_instance_quotas' |
max_instances = instance_helper.GetAvailableInstanceCount() |
if max_instances == -1: |
- return Render('Failed to count the available instances.', memory_logs) |
+ frontend_job.status = 'instance_count_error' |
+ return |
elif task.BackendParams()['instance_count'] == 0: |
- return Render('Cannot create instances, quota exceeded.', memory_logs) |
+ frontend_job.status = 'no_instance_available_error' |
+ return |
elif max_instances < task.BackendParams()['instance_count']: |
clovis_logger.warning( |
'Instance count limited by quota: %i available / %i requested.' % ( |
@@ -378,16 +439,21 @@ def StartFromJsonString(http_body_str): |
task.BackendParams()['instance_count'] = max_instances |
# Compute the timeout if there is none specified. |
- expected_duration_h = sequential_duration_s / ( |
- task.BackendParams()['instance_count'] * 3600.0) |
+ expected_duration_s = sequential_duration_s / ( |
+ task.BackendParams()['instance_count']) |
+ frontend_job.eta = datetime.datetime.now() + datetime.timedelta( |
+ seconds=expected_duration_s) |
if not task.BackendParams().get('timeout_hours'): |
# Timeout is at least 1 hour. |
- task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) |
+ task.BackendParams()['timeout_hours'] = max( |
+ 1, 5 * expected_duration_s / 3600.0) |
clovis_logger.info( |
'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) |
+ frontend_job.status = 'queueing_tasks' |
if not EnqueueTasks(sub_tasks, task_tag): |
- return Render('Task creation failed.', memory_logs) |
+ frontend_job.status = 'task_creation_error' |
+ return |
# Start polling the progress. |
clovis_logger.info('Creating worker polling task.') |
@@ -397,18 +463,15 @@ def StartFromJsonString(http_body_str): |
task_url, _countdown=(60 * first_poll_delay_minutes)) |
# Start the instances if required. |
+ frontend_job.status = 'creating_instances' |
if not CreateInstanceTemplate(task, task_dir): |
- return Render('Instance template creation failed.', memory_logs) |
+ frontend_job.status = 'instance_template_error' |
+ return |
if not CreateInstances(task): |
- return Render('Instance creation failed.', memory_logs) |
- |
- return Render(flask.Markup( |
- 'Success!<br>Your task %s has started.<br>' |
- 'Expected duration: %.1f hours.<br>' |
- 'You will be notified at %s when completed.') % ( |
- task_tag, expected_duration_h, user_email), |
- memory_logs) |
+ frontend_job.status = 'instance_creation_error' |
+ return |
+ frontend_job.status = 'started' |
def EnqueueTasks(tasks, task_tag): |
"""Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
@@ -455,6 +518,43 @@ def StartFromForm(): |
return StartFromJsonString(http_body_str) |
+@app.route('/list_jobs') |
+def ShowTaskList(): |
+ """Shows a list of all active jobs.""" |
+ tags = FrontendJob.ListJobs() |
+ |
+ if not tags: |
+ Render('No active jobs.') |
+ |
+ html = flask.Markup('Active tasks:<ul>') |
+ for tag in tags: |
+ html += flask.Markup( |
+ '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag) |
+ html += flask.Markup('</ul>') |
+ return Render(html) |
+ |
+ |
+@app.route(FrontendJob.SHOW_JOB_URL) |
+def ShowTask(): |
+ """Shows basic information abour a job.""" |
+ tag = flask.request.args.get('tag') |
+ if not tag: |
+ return Render('Invalid task tag.') |
+ |
+ frontend_job = FrontendJob.GetFromTag(tag) |
+ |
+ if not frontend_job: |
+ return Render('Task not found.') |
+ |
+ message = flask.Markup('Task details:' + frontend_job.RenderAsHtml()) |
+ |
+ log = None |
+ if frontend_job.log: |
+ log = frontend_job.log.split('\n') |
+ |
+ return flask.render_template('log.html', body=message, log=log) |
+ |
+ |
@app.errorhandler(404) |
def PageNotFound(e): # pylint: disable=unused-argument |
"""Return a custom 404 error.""" |