Chromium Code Reviews| Index: tools/android/loading/cloud/frontend/clovis_frontend.py |
| diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py |
| index 4a633856ebbb9ab61e246e2ac2fa21c2270a6626..9750deede9f6d59830b7959385c42c75268cd6a8 100644 |
| --- a/tools/android/loading/cloud/frontend/clovis_frontend.py |
| +++ b/tools/android/loading/cloud/frontend/clovis_frontend.py |
| @@ -3,6 +3,7 @@ |
| # found in the LICENSE file. |
| import logging |
| +import datetime |
| import math |
| import os |
| import sys |
| @@ -20,6 +21,7 @@ import common.google_bigquery_helper |
| import common.google_instance_helper |
| from common.loading_trace_database import LoadingTraceDatabase |
| import email_helper |
| +from frontend_job import FrontendJob |
| from memory_logs import MemoryLogs |
| @@ -99,6 +101,7 @@ def Finalize(tag, email_address, status, task_url): |
| logger=clovis_logger) |
| clovis_logger.info('Scheduling instance group destruction for tag: ' + tag) |
| deferred.defer(DeleteInstanceGroup, tag) |
| + FrontendJob.DeleteForTag(tag) |
| def GetEstimatedTaskDurationInSeconds(task): |
| @@ -324,9 +327,56 @@ def StartFromJsonString(http_body_str): |
| task_tag = task.BackendParams()['tag'] |
| clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
| task_tag)) |
| + user_email = email_helper.GetUserEmail() |
| + |
| + # Write the job to the datastore. |
| + frontend_job = FrontendJob.CreateForTag(task_tag) |
| + frontend_job.email = user_email |
| + frontend_job.status = 'not_started' |
| + frontend_job.clovis_task = task.ToJsonString() |
| + frontend_job.put() |
| + |
| + # Process the job on the queue, to avoid timeout issues. |
| + deferred.defer(SpawnTasksOnBackgroundQueue, task_tag) |
| + |
| + return Render( |
| + flask.Markup( |
| + '<a href="%s">See progress.</a>' % FrontendJob.GetJobURL(task_tag)), |
| + memory_logs) |
| + |
| + |
| +def SpawnTasksOnBackgroundQueue(task_tag): |
| + """Spawns Clovis tasks associated with task_tag from the backgound queue. |
| + |
| + This function is mostly a wrapper around SpawnTasks() that catches exceptions. |
| + It is assumed that a FrontendJob for task_tag exists. |
| + """ |
| + memory_logs = MemoryLogs(clovis_logger) |
| + memory_logs.Start() |
| + clovis_logger.info('Spawning tasks on background queue.') |
| + |
| + try: |
| + frontend_job = FrontendJob.GetFromTag(task_tag) |
| + frontend_job.status = 'will_start' |
| + SpawnTasks(frontend_job) |
| + except Exception: |
| + pass # Catch all exceptions so that AppEngine does not retry indefinitely. |
|
mattcary
2016/06/09 15:40:06
Log the exception so if tasks disappear we have a
droger
2016/06/10 12:14:04
Done.
|
| + |
| + # Update the task. |
| + if frontend_job: |
| + frontend_job.log = memory_logs.Flush() |
| + frontend_job.put() |
| + |
| + |
| +def SpawnTasks(frontend_job): |
| + """ Spawns Clovis tasks associated with the frontend job.""" |
| + user_email = frontend_job.email |
| + task = ClovisTask.FromJsonString(frontend_job.clovis_task) |
| + task_tag = task.BackendParams()['tag'] |
| + |
| # Compute the task directory. |
| + frontend_job.status = 'building_task_dir' |
| task_dir_components = [] |
| - user_email = email_helper.GetUserEmail() |
| user_name = None |
| if user_email: |
| user_name = user_email[:user_email.find('@')] |
| @@ -339,26 +389,34 @@ def StartFromJsonString(http_body_str): |
| task_dir = os.path.join(task.Action(), '_'.join(task_dir_components)) |
| # Build the URL where the result will live. |
| + frontend_job.status = 'building_task_url' |
|
mattcary
2016/06/09 15:40:06
Do you need to put these status update values, or
droger
2016/06/10 12:14:04
You need to put.
This is only done at the end (li
mattcary
2016/06/10 12:17:27
Shouldn't we put after each return, so we have the
droger
2016/06/10 12:58:27
put() will be executed after each return, by the c
mattcary
2016/06/10 13:24:20
Ah! I was just looking in SpawnTasks. Thanks.
|
| task_url = GetTaskURL(task, task_dir) |
| if task_url: |
| clovis_logger.info('Task result URL: ' + task_url) |
| + frontend_job.task_url = task_url |
| else: |
| - return Render('Could not build the task URL.', memory_logs) |
| + frontend_job.status = 'task_url_error' |
| + return |
| # Split the task in smaller tasks. |
| + frontend_job.status = 'splitting_task' |
| sub_tasks = SplitClovisTask(task) |
| if not sub_tasks: |
| - return Render('Task split failed.', memory_logs) |
| + frontend_job.status = 'task_split_error' |
| + return |
| # Compute estimates for the work duration, in order to compute the instance |
| # count and the timeout. |
| + frontend_job.status = 'estimating_duration' |
| sequential_duration_s = \ |
| GetEstimatedTaskDurationInSeconds(sub_tasks[0]) * len(sub_tasks) |
| if sequential_duration_s <= 0: |
| - return Render('Time estimation failed.', memory_logs) |
| + frontend_job.status = 'time_estimation_error' |
| + return |
| # Compute the number of required instances if not specified. |
| - if not task.BackendParams().get('instance_count'): |
| + if task.BackendParams().get('instance_count') is None: |
| + frontend_job.status = 'estimating_instance_count' |
| target_parallel_duration_s = 1800.0 # 30 minutes. |
| task.BackendParams()['instance_count'] = math.ceil( |
| sequential_duration_s / target_parallel_duration_s) |
| @@ -366,11 +424,14 @@ def StartFromJsonString(http_body_str): |
| # Check the instance quotas. |
| clovis_logger.info( |
| 'Requesting %i instances.' % task.BackendParams()['instance_count']) |
| + frontend_job.status = 'checking_instance_quotas' |
| max_instances = instance_helper.GetAvailableInstanceCount() |
| if max_instances == -1: |
| - return Render('Failed to count the available instances.', memory_logs) |
| + frontend_job.status = 'instance_count_error' |
| + return |
| elif task.BackendParams()['instance_count'] == 0: |
| - return Render('Cannot create instances, quota exceeded.', memory_logs) |
| + frontend_job.status = 'no_instance_available_error' |
| + return |
| elif max_instances < task.BackendParams()['instance_count']: |
| clovis_logger.warning( |
| 'Instance count limited by quota: %i available / %i requested.' % ( |
| @@ -378,16 +439,21 @@ def StartFromJsonString(http_body_str): |
| task.BackendParams()['instance_count'] = max_instances |
| # Compute the timeout if there is none specified. |
| - expected_duration_h = sequential_duration_s / ( |
| - task.BackendParams()['instance_count'] * 3600.0) |
| + expected_duration_s = sequential_duration_s / ( |
| + task.BackendParams()['instance_count']) |
| + frontend_job.eta = datetime.datetime.now() + datetime.timedelta( |
| + seconds=expected_duration_s) |
| if not task.BackendParams().get('timeout_hours'): |
| # Timeout is at least 1 hour. |
| - task.BackendParams()['timeout_hours'] = max(1, 5 * expected_duration_h) |
| + task.BackendParams()['timeout_hours'] = max( |
| + 1, 5 * expected_duration_s / 3600.0) |
| clovis_logger.info( |
| 'Timeout delay: %.1f hours. ' % task.BackendParams()['timeout_hours']) |
| + frontend_job.status = 'queueing_tasks' |
| if not EnqueueTasks(sub_tasks, task_tag): |
| - return Render('Task creation failed.', memory_logs) |
| + frontend_job.status = 'task_creation_error' |
| + return |
| # Start polling the progress. |
| clovis_logger.info('Creating worker polling task.') |
| @@ -397,18 +463,15 @@ def StartFromJsonString(http_body_str): |
| task_url, _countdown=(60 * first_poll_delay_minutes)) |
| # Start the instances if required. |
| + frontend_job.status = 'creating_instances' |
| if not CreateInstanceTemplate(task, task_dir): |
| - return Render('Instance template creation failed.', memory_logs) |
| + frontend_job.status = 'instance_template_error' |
| + return |
| if not CreateInstances(task): |
| - return Render('Instance creation failed.', memory_logs) |
| - |
| - return Render(flask.Markup( |
| - 'Success!<br>Your task %s has started.<br>' |
| - 'Expected duration: %.1f hours.<br>' |
| - 'You will be notified at %s when completed.') % ( |
| - task_tag, expected_duration_h, user_email), |
| - memory_logs) |
| + frontend_job.status = 'instance_creation_error' |
| + return |
| + frontend_job.status = 'started' |
| def EnqueueTasks(tasks, task_tag): |
| """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
| @@ -455,6 +518,43 @@ def StartFromForm(): |
| return StartFromJsonString(http_body_str) |
| +@app.route('/list_jobs') |
| +def ShowTaskList(): |
| + """Shows a list of all active jobs.""" |
| + tags = FrontendJob.ListJobs() |
| + |
| + if not tags: |
| + Render('No active jobs.') |
| + |
| + html = flask.Markup('Active tasks:<ul>') |
| + for tag in tags: |
| + html += flask.Markup( |
| + '<li><a href="%s">%s</a></li>') % (FrontendJob.GetJobURL(tag), tag) |
| + html += flask.Markup('</ul>') |
| + return Render(html) |
| + |
| + |
| +@app.route(FrontendJob.SHOW_JOB_URL) |
| +def ShowTask(): |
| + """Shows basic information abour a job.""" |
| + tag = flask.request.args.get('tag') |
| + if not tag: |
| + return Render('Invalid task tag.') |
| + |
| + frontend_job = FrontendJob.GetFromTag(tag) |
| + |
| + if not frontend_job: |
| + return Render('Task not found.') |
| + |
| + message = flask.Markup('Task details:' + frontend_job.RenderAsHtml()) |
| + |
| + log = None |
| + if frontend_job.log: |
| + log = frontend_job.log.split('\n') |
| + |
| + return flask.render_template('log.html', body=message, log=log) |
| + |
| + |
| @app.errorhandler(404) |
| def PageNotFound(e): # pylint: disable=unused-argument |
| """Return a custom 404 error.""" |