Chromium Code Reviews| Index: tools/android/loading/cloud/frontend/clovis_frontend.py |
| diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py |
| index 33f14d93bc0cb4379c46d585e43c73cd0ec49d4a..ea34598ecfba61029cf1e7a6427ca26ef26ac41c 100644 |
| --- a/tools/android/loading/cloud/frontend/clovis_frontend.py |
| +++ b/tools/android/loading/cloud/frontend/clovis_frontend.py |
| @@ -7,6 +7,7 @@ import os |
| import sys |
| import time |
| +import cloudstorage |
| import flask |
| from google.appengine.api import (app_identity, taskqueue) |
| from google.appengine.ext import deferred |
| @@ -14,6 +15,7 @@ from oauth2client.client import GoogleCredentials |
| from common.clovis_task import ClovisTask |
| import common.google_instance_helper |
| +from common.loading_trace_database import LoadingTraceDatabase |
| import email_helper |
| from memory_logs import MemoryLogs |
| @@ -149,6 +151,67 @@ def DeleteInstanceTemplate(tag, try_count=0): |
| clovis_logger.info('Cleanup complete for tag: ' + tag) |
| +def SplitClovisTask(task): |
| + """Splits a ClovisTask in smaller ClovisTasks. |
| + |
| + Args: |
| + task (ClovisTask): The task to split. |
| + |
| + Returns: |
| + list: The list of ClovisTasks. |
| + """ |
| + if task.Action() == 'trace': |
| + return SplitTask(task, 'urls') |
|
blundell
2016/05/13 11:19:55
I would just fold SplitTask() into this function:
|
| + elif task.Action() == 'report': |
| + bucket = task.BackendParams().get('storage_bucket') |
| + if not bucket: |
| + clovis_logger.error('Missing storage bucket fot report task.') |
|
blundell
2016/05/13 11:19:55
nit: for
|
| + return None |
| + traces = GetTracePaths(bucket) |
| + if not traces: |
|
blundell
2016/05/13 11:19:55
In general, your functions could use some blank li
|
| + clovis_logger.error('No traces found in bucket: ' + bucket) |
| + return None |
| + task.ActionParams()['traces'] = traces |
| + return SplitTask(task, 'traces') |
| + clovis_logger.error('Cannot split task with action: ' + task.Action()) |
| + return None |
| + |
| + |
| +def GetTracePaths(bucket): |
| + """Returns a list of trace files in a bucket. |
| + |
| + Finds and loads the trace databases, and returns their content as a list of |
| + paths. |
| + |
| + This function assumes a specific structure for the files in the bucket. These |
| + assumptions must match the behavior of the backend: |
| + - The trace databases are located under a 'trace' directory under |bucket|. |
| + - The trace databases files are the only objects with the 'trace_database' |
| + prefix in their name. |
| + |
| + Returns: |
| + list: The list of paths to traces, as strings. |
| + """ |
| + traces = [] |
| + prefix = os.path.join('/', bucket, 'trace', 'trace_database') |
| + file_stats = cloudstorage.listbucket(prefix) |
| + for file_stat in file_stats: |
| + database_file = file_stat.filename |
| + clovis_logger.info('Loading trace database: ' + database_file) |
| + with cloudstorage.open(database_file) as remote_file: |
| + json_string = remote_file.read() |
| + if not json_string: |
| + clovis_logger.warning('Failed to download: ' + database_file) |
| + continue |
| + database = LoadingTraceDatabase.FromJsonString(json_string) |
| + if not database: |
| + clovis_logger.warning('Failed to parse: ' + database_file) |
| + continue |
| + for path in database.ToJsonDict(): |
| + traces.append(path) |
| + return traces |
| + |
| + |
| def StartFromJsonString(http_body_str): |
| """Main function handling a JSON task posted by the user.""" |
| # Set up logging. |
| @@ -162,23 +225,35 @@ def StartFromJsonString(http_body_str): |
| return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
| task_tag = task.BackendParams()['tag'] |
| + clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
| + task_tag)) |
| # Create the instance template if required. |
| if not CreateInstanceTemplate(task): |
| return Render('Template creation failed.', memory_logs) |
| - # Split the task in smaller tasks. |
| - sub_tasks = [] |
| + # Build the URL where the result will live. |
| task_url = None |
| if task.Action() == 'trace': |
| bucket = task.BackendParams().get('storage_bucket') |
| if bucket: |
| task_url = 'https://console.cloud.google.com/storage/' + bucket |
| - sub_tasks = SplitTraceTask(task) |
| + elif task.Action() == 'report': |
| + # This must match the table path defined in ReportTaskHandler. |
| + table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') |
|
blundell
2016/05/13 11:19:55
Maybe this should be defined in a function in comm
droger
2016/05/13 15:44:17
Good point.
I'm doing a separate CL for this thoug
|
| + table_name = 'clovis_dataset.report_' + table_id |
| + task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, |
| + table_name) |
| else: |
| error_string = 'Unsupported action: %s.' % task.Action() |
| clovis_logger.error(error_string) |
| return Render(error_string, memory_logs) |
| + clovis_logger.info('Task result URL: ' + task_url) |
| + |
| + # Split the task in smaller tasks. |
| + sub_tasks = SplitClovisTask(task) |
| + if not sub_tasks: |
| + return Render('Task split failed.', memory_logs) |
| if not EnqueueTasks(sub_tasks, task_tag): |
| return Render('Task creation failed.', memory_logs) |
| @@ -201,20 +276,30 @@ def StartFromJsonString(http_body_str): |
| memory_logs) |
| -def SplitTraceTask(task): |
| - """Splits a tracing task with potentially many URLs into several tracing tasks |
| - with few URLs. |
| +def SplitTask(task, key, values_per_task=1): |
| + """Splits a task with potentially many values into several tasks with few |
| + values. |
| + |
| + Params: |
| + key(str): The key in the action parameters that is used as the dimension of |
|
blundell
2016/05/13 11:19:55
same comment as the other one re: type information
|
| + the split. |
| + The value associated to this key must be a list. |
| + The resulting tasks are identical to the input task, except that |
| + they only have a slice of this list. |
| + values_per_task(int): The size of the slice of the list identified by |key|. |
| + |
| + Returns: |
| + list: The list of smaller tasks. |
| """ |
| - clovis_logger.debug('Splitting trace task.') |
| + clovis_logger.debug('Splitting task.') |
| action_params = task.ActionParams() |
| - urls = action_params['urls'] |
| + values = action_params[key] |
| - # Split the task in smaller tasks with fewer URLs each. |
| - urls_per_task = 1 |
| + # Split the task in smaller tasks with fewer values each. |
| sub_tasks = [] |
| - for i in range(0, len(urls), urls_per_task): |
| + for i in range(0, len(values), values_per_task): |
| sub_task_params = action_params.copy() |
| - sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] |
| + sub_task_params[key] = [v for v in values[i:i+values_per_task]] |
| sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
| task.BackendParams())) |
| return sub_tasks |