| Index: tools/android/loading/cloud/frontend/clovis_frontend.py
|
| diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py
|
| index 33f14d93bc0cb4379c46d585e43c73cd0ec49d4a..2fb40490623c5a87dc25780ee14f87ab3c3cdd28 100644
|
| --- a/tools/android/loading/cloud/frontend/clovis_frontend.py
|
| +++ b/tools/android/loading/cloud/frontend/clovis_frontend.py
|
| @@ -7,6 +7,7 @@ import os
|
| import sys
|
| import time
|
|
|
| +import cloudstorage
|
| import flask
|
| from google.appengine.api import (app_identity, taskqueue)
|
| from google.appengine.ext import deferred
|
| @@ -14,6 +15,7 @@ from oauth2client.client import GoogleCredentials
|
|
|
| from common.clovis_task import ClovisTask
|
| import common.google_instance_helper
|
| +from common.loading_trace_database import LoadingTraceDatabase
|
| import email_helper
|
| from memory_logs import MemoryLogs
|
|
|
| @@ -149,6 +151,88 @@ def DeleteInstanceTemplate(tag, try_count=0):
|
| clovis_logger.info('Cleanup complete for tag: ' + tag)
|
|
|
|
|
| +def SplitClovisTask(task):
|
| +  """Splits a ClovisTask into smaller ClovisTasks.
|
| +
|
| +  Args:
|
| +    task: (ClovisTask) The task to split; a report task gets its traces set.
|
| +
|
| +  Returns:
|
| +    list: The list of ClovisTasks, or None if the task cannot be split.
|
| +  """
|
| +  if task.Action() == 'report':
|
| +    bucket = task.BackendParams().get('storage_bucket')
|
| +    if not bucket:
|
| +      clovis_logger.error('Missing storage bucket for report task.')
|
| +      return None
|
| +    traces = GetTracePaths(bucket)
|
| +    if not traces:
|
| +      clovis_logger.error('No traces found in bucket: ' + bucket)
|
| +      return None
|
| +    task.ActionParams()['traces'] = traces
|
| +
|
| +  # Maps each action to (parameter to split on, values per sub-task).
|
| +  split_params = {'trace': ('urls', 1), 'report': ('traces', 5)}
|
| +  split_key, slice_size = split_params.get(task.Action(), (None, 0))
|
| +  if not split_key:
|
| +    clovis_logger.error('Cannot split task with action: ' + task.Action())
|
| +    return None
|
| +
|
| +  clovis_logger.debug('Splitting task by: ' + split_key)
|
| +  action_params = task.ActionParams()
|
| +  values = action_params.get(split_key)
|
| +  if values is None:
|
| +    clovis_logger.error('Missing action parameter: ' + split_key)
|
| +    return None
|
| +  sub_tasks = []
|
| +  for i in range(0, len(values), slice_size):
|
| +    sub_task_params = action_params.copy()
|
| +    sub_task_params[split_key] = list(values[i:i + slice_size])
|
| +    sub_tasks.append(ClovisTask(task.Action(), sub_task_params,
|
| +                                task.BackendParams()))
|
| +  return sub_tasks
|
| +
|
| +
|
| +def GetTracePaths(bucket):
|
| +  """Lists the traces referenced by the trace databases of a bucket.
|
| +
|
| +  The trace databases are found in |bucket| and loaded, and the paths they
|
| +  contain are gathered in a single list.
|
| +
|
| +  This relies on the following layout of the bucket, which must match the
|
| +  behavior of the backend:
|
| +  - The trace databases are located under a 'trace' directory under |bucket|.
|
| +  - The trace database files are the only objects with the 'trace_database'
|
| +    prefix in their name.
|
| +
|
| +  Args:
|
| +    bucket: (str) Name of the storage bucket to search.
|
| +
|
| +  Returns:
|
| +    list: The list of paths to traces, as strings.
|
| +  """
|
| +  database_prefix = os.path.join('/', bucket, 'trace', 'trace_database')
|
| +  trace_paths = []
|
| +  for stat in cloudstorage.listbucket(database_prefix):
|
| +    database_path = stat.filename
|
| +    clovis_logger.info('Loading trace database: ' + database_path)
|
| +
|
| +    with cloudstorage.open(database_path) as database_stream:
|
| +      serialized_database = database_stream.read()
|
| +    if not serialized_database:
|
| +      clovis_logger.warning('Failed to download: ' + database_path)
|
| +      continue
|
| +
|
| +    trace_database = LoadingTraceDatabase.FromJsonString(serialized_database)
|
| +    if not trace_database:
|
| +      clovis_logger.warning('Failed to parse: ' + database_path)
|
| +      continue
|
| +
|
| +    trace_paths.extend(trace_database.ToJsonDict())
|
| +
|
| +  return trace_paths
|
| +
|
| +
|
| def StartFromJsonString(http_body_str):
|
| """Main function handling a JSON task posted by the user."""
|
| # Set up logging.
|
| @@ -162,23 +246,35 @@ def StartFromJsonString(http_body_str):
|
|      return Render('Invalid JSON task:\n' + http_body_str, memory_logs)
|
|
|
|    task_tag = task.BackendParams()['tag']
|
| +  clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(),
|
| +                                                                task_tag))
|
|
|
|    # Create the instance template if required.
|
|    if not CreateInstanceTemplate(task):
|
|      return Render('Template creation failed.', memory_logs)
|
|
|
| -  # Split the task in smaller tasks.
|
| -  sub_tasks = []
|
| +  # Build the URL where the result will live.
|
|    task_url = None
|
|    if task.Action() == 'trace':
|
|      bucket = task.BackendParams().get('storage_bucket')
|
|      if bucket:
|
|        task_url = 'https://console.cloud.google.com/storage/' + bucket
|
| -    sub_tasks = SplitTraceTask(task)
|
| +  elif task.Action() == 'report':
|
| +    # This must match the table path defined in ReportTaskHandler.
|
| +    table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_')
|
| +    table_name = 'clovis_dataset.report_' + table_id
|
| +    task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name,
|
| +                                                                 table_name)
|
|    else:
|
|      error_string = 'Unsupported action: %s.' % task.Action()
|
|      clovis_logger.error(error_string)
|
|      return Render(error_string, memory_logs)
|
| +  clovis_logger.info('Task result URL: %s' % task_url)  # May be None.
|
| +
|
| +  # Split the task in smaller tasks.
|
| +  sub_tasks = SplitClovisTask(task)
|
| +  if not sub_tasks:
|
| +    return Render('Task split failed.', memory_logs)
|
|
|
|    if not EnqueueTasks(sub_tasks, task_tag):
|
|      return Render('Task creation failed.', memory_logs)
|
| @@ -201,25 +297,6 @@ def StartFromJsonString(http_body_str):
|
| memory_logs)
|
|
|
|
|
| -def SplitTraceTask(task):
|
| - """Splits a tracing task with potentially many URLs into several tracing tasks
|
| - with few URLs.
|
| - """
|
| - clovis_logger.debug('Splitting trace task.')
|
| - action_params = task.ActionParams()
|
| - urls = action_params['urls']
|
| -
|
| - # Split the task in smaller tasks with fewer URLs each.
|
| - urls_per_task = 1
|
| - sub_tasks = []
|
| - for i in range(0, len(urls), urls_per_task):
|
| - sub_task_params = action_params.copy()
|
| - sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]]
|
| - sub_tasks.append(ClovisTask(task.Action(), sub_task_params,
|
| - task.BackendParams()))
|
| - return sub_tasks
|
| -
|
| -
|
| def EnqueueTasks(tasks, task_tag):
|
| """Enqueues a list of tasks in the Google Cloud task queue, for consumption by
|
| Google Compute Engine.
|
|
|