Chromium Code Reviews| Index: tools/android/loading/cloud/frontend/clovis_frontend.py |
| diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py |
| index 33f14d93bc0cb4379c46d585e43c73cd0ec49d4a..8775006ead2b09646b5b23d67324a3ae24a8057f 100644 |
| --- a/tools/android/loading/cloud/frontend/clovis_frontend.py |
| +++ b/tools/android/loading/cloud/frontend/clovis_frontend.py |
| @@ -7,6 +7,7 @@ import os |
| import sys |
| import time |
| +import cloudstorage |
| import flask |
| from google.appengine.api import (app_identity, taskqueue) |
| from google.appengine.ext import deferred |
| @@ -14,6 +15,7 @@ from oauth2client.client import GoogleCredentials |
| from common.clovis_task import ClovisTask |
| import common.google_instance_helper |
| +from common.loading_trace_database import LoadingTraceDatabase |
| import email_helper |
| from memory_logs import MemoryLogs |
| @@ -149,6 +151,88 @@ def DeleteInstanceTemplate(tag, try_count=0): |
| clovis_logger.info('Cleanup complete for tag: ' + tag) |
| +def SplitClovisTask(task): |
| + """Splits a ClovisTask in smaller ClovisTasks. |
| + |
| + Args: |
| + task: (ClovisTask) The task to split. |
| + |
| + Returns: |
| + list: The list of ClovisTasks. |
| + """ |
| + # For report trask, need to find the traces first. |
|
Benoit L
2016/05/13 15:23:39
nit: task.
|
| + if task.Action() == 'report': |
| + bucket = task.BackendParams().get('storage_bucket') |
| + if not bucket: |
| + clovis_logger.error('Missing storage bucket for report task.') |
| + return None |
| + traces = GetTracePaths(bucket) |
| + if not traces: |
| + clovis_logger.error('No traces found in bucket: ' + bucket) |
| + return None |
| + task.ActionParams()['traces'] = traces |
| + |
| + # Compute the split key. |
| + split_params_for_action = {'trace': ('urls',1), 'report': ('traces',5)} |
|
Benoit L
2016/05/13 15:23:39
tiny, tiny nit: one space after a comma.
|
| + (split_key, slice_size) = split_params_for_action.get(task.Action(), |
| + (None, 0)) |
| + if not split_key: |
| + clovis_logger.error('Cannot split task with action: ' + task.Action()) |
| + return None |
| + |
| + # Split the task using the split key. |
| + clovis_logger.debug('Splitting task by: ' + split_key) |
| + action_params = task.ActionParams() |
| + values = action_params[split_key] |
| + sub_tasks = [] |
| + for i in range(0, len(values), slice_size): |
| + sub_task_params = action_params.copy() |
| + sub_task_params[split_key] = [v for v in values[i:i+slice_size]] |
| + sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
| + task.BackendParams())) |
| + return sub_tasks |
| + |
| + |
| +def GetTracePaths(bucket): |
| + """Returns a list of trace files in a bucket. |
| + |
| + Finds and loads the trace databases, and returns their content as a list of |
| + paths. |
| + |
| + This function assumes a specific structure for the files in the bucket. These |
| + assumptions must match the behavior of the backend: |
| + - The trace databases are located under a 'trace' directory under |bucket|. |
| + - The trace databases files are the only objects with the 'trace_database' |
| + prefix in their name. |
| + |
| + Returns: |
| + list: The list of paths to traces, as strings. |
| + """ |
| + traces = [] |
| + prefix = os.path.join('/', bucket, 'trace', 'trace_database') |
| + file_stats = cloudstorage.listbucket(prefix) |
| + |
| + for file_stat in file_stats: |
| + database_file = file_stat.filename |
| + clovis_logger.info('Loading trace database: ' + database_file) |
| + |
| + with cloudstorage.open(database_file) as remote_file: |
| + json_string = remote_file.read() |
| + if not json_string: |
| + clovis_logger.warning('Failed to download: ' + database_file) |
| + continue |
| + |
| + database = LoadingTraceDatabase.FromJsonString(json_string) |
| + if not database: |
| + clovis_logger.warning('Failed to parse: ' + database_file) |
| + continue |
| + |
| + for path in database.ToJsonDict(): |
| + traces.append(path) |
| + |
| + return traces |
| + |
| + |
| def StartFromJsonString(http_body_str): |
| """Main function handling a JSON task posted by the user.""" |
| # Set up logging. |
| @@ -162,23 +246,35 @@ def StartFromJsonString(http_body_str): |
| return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
| task_tag = task.BackendParams()['tag'] |
| + clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
| + task_tag)) |
| # Create the instance template if required. |
| if not CreateInstanceTemplate(task): |
| return Render('Template creation failed.', memory_logs) |
| - # Split the task in smaller tasks. |
| - sub_tasks = [] |
| + # Build the URL where the result will live. |
| task_url = None |
| if task.Action() == 'trace': |
| bucket = task.BackendParams().get('storage_bucket') |
| if bucket: |
| task_url = 'https://console.cloud.google.com/storage/' + bucket |
| - sub_tasks = SplitTraceTask(task) |
| + elif task.Action() == 'report': |
| + # This must match the table path defined in ReportTaskHandler. |
| + table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') |
| + table_name = 'clovis_dataset.report_' + table_id |
| + task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, |
| + table_name) |
| else: |
| error_string = 'Unsupported action: %s.' % task.Action() |
| clovis_logger.error(error_string) |
| return Render(error_string, memory_logs) |
| + clovis_logger.info('Task result URL: ' + task_url) |
| + |
| + # Split the task in smaller tasks. |
| + sub_tasks = SplitClovisTask(task) |
| + if not sub_tasks: |
| + return Render('Task split failed.', memory_logs) |
| if not EnqueueTasks(sub_tasks, task_tag): |
| return Render('Task creation failed.', memory_logs) |
| @@ -201,25 +297,6 @@ def StartFromJsonString(http_body_str): |
| memory_logs) |
| -def SplitTraceTask(task): |
| - """Splits a tracing task with potentially many URLs into several tracing tasks |
| - with few URLs. |
| - """ |
| - clovis_logger.debug('Splitting trace task.') |
| - action_params = task.ActionParams() |
| - urls = action_params['urls'] |
| - |
| - # Split the task in smaller tasks with fewer URLs each. |
| - urls_per_task = 1 |
| - sub_tasks = [] |
| - for i in range(0, len(urls), urls_per_task): |
| - sub_task_params = action_params.copy() |
| - sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] |
| - sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
| - task.BackendParams())) |
| - return sub_tasks |
| - |
| - |
| def EnqueueTasks(tasks, task_tag): |
| """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
| Google Compute Engine. |