Index: tools/android/loading/cloud/frontend/clovis_frontend.py |
diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py |
index 33f14d93bc0cb4379c46d585e43c73cd0ec49d4a..2fb40490623c5a87dc25780ee14f87ab3c3cdd28 100644 |
--- a/tools/android/loading/cloud/frontend/clovis_frontend.py |
+++ b/tools/android/loading/cloud/frontend/clovis_frontend.py |
@@ -7,6 +7,7 @@ import os |
import sys |
import time |
+import cloudstorage |
import flask |
from google.appengine.api import (app_identity, taskqueue) |
from google.appengine.ext import deferred |
@@ -14,6 +15,7 @@ from oauth2client.client import GoogleCredentials |
from common.clovis_task import ClovisTask |
import common.google_instance_helper |
+from common.loading_trace_database import LoadingTraceDatabase |
import email_helper |
from memory_logs import MemoryLogs |
@@ -149,6 +151,88 @@ def DeleteInstanceTemplate(tag, try_count=0): |
clovis_logger.info('Cleanup complete for tag: ' + tag) |
+def SplitClovisTask(task): |
+  """Splits a ClovisTask in smaller ClovisTasks. |
+ |
+  For 'report' tasks, the trace paths are first looked up in the storage bucket |
+  and stored in the 'traces' action parameter of |task| before splitting. |
+ |
+  Args: |
+    task: (ClovisTask) The task to split. |
+ |
+  Returns: |
+    list: The list of ClovisTasks, or None if the task cannot be split (the |
+        reason is logged as an error). |
+  """ |
+  action = task.Action() |
+ |
+  # For report task, need to find the traces first. |
+  if action == 'report': |
+    bucket = task.BackendParams().get('storage_bucket') |
+    if not bucket: |
+      clovis_logger.error('Missing storage bucket for report task.') |
+      return None |
+    traces = GetTracePaths(bucket) |
+    if not traces: |
+      clovis_logger.error('No traces found in bucket: ' + bucket) |
+      return None |
+    task.ActionParams()['traces'] = traces |
+ |
+  # Compute the split key and the number of values handled by each sub-task. |
+  split_params_for_action = {'trace': ('urls', 1), 'report': ('traces', 5)} |
+  (split_key, slice_size) = split_params_for_action.get(action, (None, 0)) |
+  if not split_key: |
+    clovis_logger.error('Cannot split task with action: ' + action) |
+    return None |
+ |
+  # Split the task using the split key. |
+  clovis_logger.debug('Splitting task by: ' + split_key) |
+  action_params = task.ActionParams() |
+  values = action_params.get(split_key) |
+  if values is None: |
+    # Fail like the other error paths instead of raising KeyError when the |
+    # task does not provide the parameter to split on. |
+    clovis_logger.error('Missing action parameter: ' + split_key) |
+    return None |
+ |
+  sub_tasks = [] |
+  for i in range(0, len(values), slice_size): |
+    # Shallow copy of the parameters; only |split_key| differs per sub-task. |
+    sub_task_params = action_params.copy() |
+    sub_task_params[split_key] = list(values[i:i + slice_size]) |
+    sub_tasks.append(ClovisTask(action, sub_task_params, |
+                                task.BackendParams())) |
+  return sub_tasks |
+ |
+ |
+def GetTracePaths(bucket): |
+  """Returns a list of trace files in a bucket. |
+ |
+  Finds and loads the trace databases, and returns their content as a list of |
+  paths. Databases that are empty or fail to parse are skipped with a warning. |
+ |
+  This function assumes a specific structure for the files in the bucket. These |
+  assumptions must match the behavior of the backend: |
+  - The trace databases are located under a 'trace' directory under |bucket|. |
+  - The trace database files are the only objects with the 'trace_database' |
+    prefix in their name. |
+ |
+  Args: |
+    bucket: (str) Name of the storage bucket to search. |
+ |
+  Returns: |
+    list: The list of paths to traces, as strings. |
+  """ |
+  traces = [] |
+  prefix = os.path.join('/', bucket, 'trace', 'trace_database') |
+ |
+  for file_stat in cloudstorage.listbucket(prefix): |
+    database_file = file_stat.filename |
+    clovis_logger.info('Loading trace database: ' + database_file) |
+ |
+    with cloudstorage.open(database_file) as remote_file: |
+      json_string = remote_file.read() |
+    if not json_string: |
+      clovis_logger.warning('Failed to download: ' + database_file) |
+      continue |
+ |
+    database = LoadingTraceDatabase.FromJsonString(json_string) |
+    if not database: |
+      clovis_logger.warning('Failed to parse: ' + database_file) |
+      continue |
+ |
+    # The items obtained by iterating the JSON dict are the trace paths. |
+    traces.extend(database.ToJsonDict()) |
+ |
+  return traces |
+ |
+ |
def StartFromJsonString(http_body_str): |
"""Main function handling a JSON task posted by the user.""" |
# Set up logging. |
@@ -162,23 +246,35 @@ def StartFromJsonString(http_body_str): |
return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
task_tag = task.BackendParams()['tag'] |
+ clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
+ task_tag)) |
# Create the instance template if required. |
if not CreateInstanceTemplate(task): |
return Render('Template creation failed.', memory_logs) |
- # Split the task in smaller tasks. |
- sub_tasks = [] |
+ # Build the URL where the result will live. |
task_url = None |
if task.Action() == 'trace': |
bucket = task.BackendParams().get('storage_bucket') |
if bucket: |
task_url = 'https://console.cloud.google.com/storage/' + bucket |
- sub_tasks = SplitTraceTask(task) |
+ elif task.Action() == 'report': |
+ # This must match the table path defined in ReportTaskHandler. |
+ table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') |
+ table_name = 'clovis_dataset.report_' + table_id |
+ task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, |
+ table_name) |
else: |
error_string = 'Unsupported action: %s.' % task.Action() |
clovis_logger.error(error_string) |
return Render(error_string, memory_logs) |
+ clovis_logger.info('Task result URL: ' + task_url) |
+ |
+ # Split the task in smaller tasks. |
+ sub_tasks = SplitClovisTask(task) |
+ if not sub_tasks: |
+ return Render('Task split failed.', memory_logs) |
if not EnqueueTasks(sub_tasks, task_tag): |
return Render('Task creation failed.', memory_logs) |
@@ -201,25 +297,6 @@ def StartFromJsonString(http_body_str): |
memory_logs) |
-def SplitTraceTask(task): |
- """Splits a tracing task with potentially many URLs into several tracing tasks |
- with few URLs. |
- """ |
- clovis_logger.debug('Splitting trace task.') |
- action_params = task.ActionParams() |
- urls = action_params['urls'] |
- |
- # Split the task in smaller tasks with fewer URLs each. |
- urls_per_task = 1 |
- sub_tasks = [] |
- for i in range(0, len(urls), urls_per_task): |
- sub_task_params = action_params.copy() |
- sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] |
- sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
- task.BackendParams())) |
- return sub_tasks |
- |
- |
def EnqueueTasks(tasks, task_tag): |
"""Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
Google Compute Engine. |