| Index: tools/android/loading/cloud/frontend/clovis_frontend.py
|
| diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py
|
| index 25433720aae01d0b1612460a99def74f23345e3d..c0b142c357260c21865a4a51cc1b769cde9709a0 100644
|
| --- a/tools/android/loading/cloud/frontend/clovis_frontend.py
|
| +++ b/tools/android/loading/cloud/frontend/clovis_frontend.py
|
| @@ -3,10 +3,12 @@
|
| # found in the LICENSE file.
|
|
|
| import logging
|
| +import datetime
|
| import math
|
| import os
|
| import sys
|
| import time
|
| +import traceback
|
|
|
| import cloudstorage
|
| import flask
|
| @@ -20,6 +22,7 @@ import common.google_bigquery_helper
|
| import common.google_instance_helper
|
| from common.loading_trace_database import LoadingTraceDatabase
|
| import email_helper
|
| +from frontend_job import FrontendJob
|
| from memory_logs import MemoryLogs
|
|
|
|
|
| @@ -99,6 +102,7 @@ def Finalize(tag, email_address, status, task_url):
|
| logger=clovis_logger)
|
| clovis_logger.info('Scheduling instance group destruction for tag: ' + tag)
|
| deferred.defer(DeleteInstanceGroup, tag)
|
| + FrontendJob.DeleteForTag(tag)
|
|
|
|
|
| def GetEstimatedTaskDurationInSeconds(task):
|
| @@ -324,9 +328,61 @@ def StartFromJsonString(http_body_str):
|
| task_tag = task.BackendParams()['tag']
|
| clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(),
|
| task_tag))
|
| + user_email = email_helper.GetUserEmail()
|
| +
|
| + # Write the job to the datastore.
|
| + frontend_job = FrontendJob.CreateForTag(task_tag)
|
| + frontend_job.email = user_email
|
| + frontend_job.status = 'not_started'
|
| + frontend_job.clovis_task = task.ToJsonString()
|
| + frontend_job.put()
|
| +
|
| + # Process the job on the queue, to avoid timeout issues.
|
| + deferred.defer(SpawnTasksOnBackgroundQueue, task_tag)
|
| +
|
| + return Render(
|
| + flask.Markup(
|
| + '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)),
|
| + memory_logs)
|
| +
|
| +
|
| +def SpawnTasksOnBackgroundQueue(task_tag):
|
| + """Spawns Clovis tasks associated with task_tag from the backgound queue.
|
| +
|
| + This function is mostly a wrapper around SpawnTasks() that catches exceptions.
|
| + It is assumed that a FrontendJob for task_tag exists.
|
| + """
|
| + memory_logs = MemoryLogs(clovis_logger)
|
| + memory_logs.Start()
|
| + clovis_logger.info('Spawning tasks on background queue.')
|
| +
|
| + try:
|
| + frontend_job = FrontendJob.GetFromTag(task_tag)
|
| + frontend_job.status = 'will_start'
|
| + SpawnTasks(frontend_job)
|
| + except Exception as e:
|
| + clovis_logger.error('Exception spawning tasks: ' + str(e))
|
| + clovis_logger.error(traceback.print_exc())
|
| +
|
| + # Update the task.
|
| + if frontend_job:
|
| + frontend_job.log = memory_logs.Flush()
|
| + frontend_job.put()
|
| +
|
| +
|
| +def SpawnTasks(frontend_job):
|
| + """ Spawns Clovis tasks associated with the frontend job."""
|
| + user_email = frontend_job.email
|
| + task = ClovisTask.FromJsonString(frontend_job.clovis_task)
|
| + task_tag = task.BackendParams()['tag']
|
| +
|
| + # Delete the clovis task from the FrontendJob because it can make the object
|
| + # very heavy and it is no longer needed.
|
| + frontend_job.clovis_task = None
|
| +
|
| # Compute the task directory.
|
| + frontend_job.status = 'building_task_dir'
|
| task_dir_components = []
|
| - user_email = email_helper.GetUserEmail()
|
| user_name = None
|
| if user_email:
|
| user_name = user_email[:user_email.find('@')]
|
| @@ -339,26 +395,35 @@ def StartFromJsonString(http_body_str):
|
| task_dir = os.path.join(task.Action(), '_'.join(task_dir_components))
|
|
|
| # Build the URL where the result will live.
|
| + frontend_job.status = 'building_task_url'
|
| task_url = GetTaskURL(task, task_dir)
|
| if task_url:
|
| clovis_logger.info('Task result URL: ' + task_url)
|
| + frontend_job.task_url = task_url
|
| else:
|
| - return Render('Could not build the task URL.', memory_logs)
|
| + frontend_job.status = 'task_url_error'
|
| + return
|
|
|
| # Split the task in smaller tasks.
|
| + frontend_job.status = 'splitting_task'
|
| + frontend_job.put()
|
| sub_tasks = SplitClovisTask(task)
|
| if not sub_tasks:
|
| - return Render('Task split failed.', memory_logs)
|
| + frontend_job.status = 'task_split_error'
|
| + return
|
|
|
| # Compute estimates for the work duration, in order to compute the instance
|
| # count and the timeout.
|
| + frontend_job.status = 'estimating_duration'
|
| sequential_duration_s = \
|
| GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks)
|
| if sequential_duration_s <= 0:
|
| - return Render('Time estimation failed.', memory_logs)
|
| + frontend_job.status = 'time_estimation_error'
|
| + return
|
|
|
| # Compute the number of required instances if not specified.
|
| if task.BackendParams().get('instance_count') is None:
|
| + frontend_job.status = 'estimating_instance_count'
|
| target_parallel_duration_s = 1800.0 # 30 minutes.
|
| task.BackendParams()['instance_count'] = math.ceil(
|
| sequential_duration_s / target_parallel_duration_s)
|
| @@ -366,11 +431,14 @@ def StartFromJsonString(http_body_str):
|
| # Check the instance quotas.
|
| clovis_logger.info(
|
| 'Requesting %i instances.' % task.BackendParams()['instance_count'])
|
| + frontend_job.status = 'checking_instance_quotas'
|
| max_instances = instance_helper.GetAvailableInstanceCount()
|
| if max_instances == -1:
|
| - return Render('Failed to count the available instances.', memory_logs)
|
| + frontend_job.status = 'instance_count_error'
|
| + return
|
| elif task.BackendParams()['instance_count'] == 0:
|
| - return Render('Cannot create instances, quota exceeded.', memory_logs)
|
| + frontend_job.status = 'no_instance_available_error'
|
| + return
|
| elif max_instances < task.BackendParams()['instance_count']:
|
| clovis_logger.warning(
|
| 'Instance count limited by quota: %i available / %i requested.' % (
|
| @@ -378,16 +446,21 @@ def StartFromJsonString(http_body_str):
|
| task.BackendParams()['instance_count'] = max_instances
|
|
|
| # Compute the timeout if there is none specified.
|
| - expected_duration_h = sequential_duration_s / (
|
| - task.BackendParams()['instance_count'] * 3600.0)
|
| + expected_duration_s = sequential_duration_s / (
|
| + task.BackendParams()['instance_count'])
|
| + frontend_job.eta = datetime.datetime.now() + datetime.timedelta(
|
| + seconds=expected_duration_s)
|
| if not task.BackendParams().get('timeout_hours'):
|
| # Timeout is at least 1 hour.
|
| - task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h)
|
| + task.BackendParams()['timeout_hours'] = max(
|
| + 1, 5 * expected_duration_s / 3600.0)
|
| clovis_logger.info(
|
| 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours'])
|
|
|
| + frontend_job.status = 'queueing_tasks'
|
| if not EnqueueTasks(sub_tasks, task_tag):
|
| - return Render('Task creation failed.', memory_logs)
|
| + frontend_job.status = 'task_creation_error'
|
| + return
|
|
|
| # Start polling the progress.
|
| clovis_logger.info('Creating worker polling task.')
|
| @@ -397,18 +470,16 @@ def StartFromJsonString(http_body_str):
|
| task_url, _countdown=(60 * first_poll_delay_minutes))
|
|
|
| # Start the instances if required.
|
| + frontend_job.status = 'creating_instances'
|
| + frontend_job.put()
|
| if not CreateInstanceTemplate(task, task_dir):
|
| - return Render('Instance template creation failed.', memory_logs)
|
| + frontend_job.status = 'instance_template_error'
|
| + return
|
| if not CreateInstances(task):
|
| - return Render('Instance creation failed.', memory_logs)
|
| -
|
| - return Render(flask.Markup(
|
| - 'Success!<br>Your task %s has started.<br>'
|
| - 'Expected duration: %.1f hours.<br>'
|
| - 'You will be notified at %s when completed.') % (
|
| - task_tag, expected_duration_h, user_email),
|
| - memory_logs)
|
| + frontend_job.status = 'instance_creation_error'
|
| + return
|
|
|
| + frontend_job.status = 'started'
|
|
|
| def EnqueueTasks(tasks, task_tag):
|
| """Enqueues a list of tasks in the Google Cloud task queue, for consumption by
|
| @@ -455,6 +526,43 @@ def StartFromForm():
|
| return StartFromJsonString(http_body_str)
|
|
|
|
|
| +@app.route('/list_jobs')
|
| +def ShowTaskList():
|
| + """Shows a list of all active jobs."""
|
| + tags = FrontendJob.ListJobs()
|
| +
|
| + if not tags:
|
| + Render('No active jobs.')
|
| +
|
| + html = flask.Markup('Active tasks:<ul>')
|
| + for tag in tags:
|
| + html += flask.Markup(
|
| + '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag)
|
| + html += flask.Markup('</ul>')
|
| + return Render(html)
|
| +
|
| +
|
| +@app.route(FrontendJob.SHOW_JOB_URL)
|
| +def ShowTask():
|
| + """Shows basic information abour a job."""
|
| + tag = flask.request.args.get('tag')
|
| + if not tag:
|
| + return Render('Invalid task tag.')
|
| +
|
| + frontend_job = FrontendJob.GetFromTag(tag)
|
| +
|
| + if not frontend_job:
|
| + return Render('Task not found.')
|
| +
|
| + message = flask.Markup('Task details:' + frontend_job.RenderAsHtml())
|
| +
|
| + log = None
|
| + if frontend_job.log:
|
| + log = frontend_job.log.split('\n')
|
| +
|
| + return flask.render_template('log.html', body=message, log=log)
|
| +
|
| +
|
| @app.errorhandler(404)
|
| def PageNotFound(e): # pylint: disable=unused-argument
|
| """Return a custom 404 error."""
|
|
|