Index: tools/android/loading/cloud/frontend/clovis_frontend.py |
diff --git a/tools/android/loading/cloud/frontend/clovis_frontend.py b/tools/android/loading/cloud/frontend/clovis_frontend.py |
index 33f14d93bc0cb4379c46d585e43c73cd0ec49d4a..ea34598ecfba61029cf1e7a6427ca26ef26ac41c 100644 |
--- a/tools/android/loading/cloud/frontend/clovis_frontend.py |
+++ b/tools/android/loading/cloud/frontend/clovis_frontend.py |
@@ -7,6 +7,7 @@ import os |
import sys |
import time |
+import cloudstorage |
import flask |
from google.appengine.api import (app_identity, taskqueue) |
from google.appengine.ext import deferred |
@@ -14,6 +15,7 @@ from oauth2client.client import GoogleCredentials |
from common.clovis_task import ClovisTask |
import common.google_instance_helper |
+from common.loading_trace_database import LoadingTraceDatabase |
import email_helper |
from memory_logs import MemoryLogs |
@@ -149,6 +151,67 @@ def DeleteInstanceTemplate(tag, try_count=0): |
clovis_logger.info('Cleanup complete for tag: ' + tag) |
def SplitClovisTask(task):
  """Splits a ClovisTask in smaller ClovisTasks.

  'trace' tasks are split along their 'urls' action parameter. 'report' tasks
  are split along the list of traces found in their storage bucket.

  Args:
    task (ClovisTask): The task to split. It is not modified.

  Returns:
    list: The list of ClovisTasks, or None if the task cannot be split
          (unsupported action, missing storage bucket, or no trace found).
  """
  action = task.Action()

  if action == 'trace':
    return SplitTask(task, 'urls')

  if action == 'report':
    bucket = task.BackendParams().get('storage_bucket')
    if not bucket:
      clovis_logger.error('Missing storage bucket for report task.')
      return None

    traces = GetTracePaths(bucket)
    if not traces:
      clovis_logger.error('No traces found in bucket: ' + bucket)
      return None

    # Attach the traces to a copy of the action parameters rather than to the
    # caller's task, so that splitting has no side effect on its argument.
    report_params = task.ActionParams().copy()
    report_params['traces'] = traces
    report_task = ClovisTask(action, report_params, task.BackendParams())
    return SplitTask(report_task, 'traces')

  # Pass the action as a logging argument: unlike string concatenation, this
  # does not raise if the action is not a string.
  clovis_logger.error('Cannot split task with action: %s', action)
  return None
+ |
+ |
def GetTracePaths(bucket):
  """Returns a list of trace files in a bucket.

  Finds and loads the trace databases, and returns their content as a list of
  paths. Databases that cannot be downloaded or parsed are logged and skipped.

  This function assumes a specific structure for the files in the bucket. These
  assumptions must match the behavior of the backend:
    - The trace databases are located under a 'trace' directory under |bucket|.
    - The trace databases files are the only objects with the 'trace_database'
      prefix in their name.

  Args:
    bucket (str): Name of the storage bucket to search.

  Returns:
    list: The list of paths to traces, as strings. Empty if no usable trace
          database was found.
  """
  # Function-scope import: keeps this change independent from the file-level
  # import list.
  import posixpath

  # Cloud Storage object names always use '/' as separator, so build the prefix
  # with posixpath rather than with the host-dependent os.path.
  prefix = posixpath.join('/', bucket, 'trace', 'trace_database')

  traces = []
  for file_stat in cloudstorage.listbucket(prefix):
    database_file = file_stat.filename
    clovis_logger.info('Loading trace database: %s', database_file)

    # A failure on one database must not abort the whole listing: treat a
    # storage error like an empty download and move on to the next file.
    try:
      with cloudstorage.open(database_file) as remote_file:
        json_string = remote_file.read()
    except cloudstorage.Error:
      json_string = None
    if not json_string:
      clovis_logger.warning('Failed to download: %s', database_file)
      continue

    database = LoadingTraceDatabase.FromJsonString(json_string)
    if not database:
      clovis_logger.warning('Failed to parse: %s', database_file)
      continue

    # Iterating over the JSON dict yields its keys; these are what is returned
    # as trace paths.
    traces.extend(database.ToJsonDict())

  return traces
+ |
+ |
def StartFromJsonString(http_body_str): |
"""Main function handling a JSON task posted by the user.""" |
# Set up logging. |
@@ -162,23 +225,35 @@ def StartFromJsonString(http_body_str): |
return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
task_tag = task.BackendParams()['tag'] |
+ clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), |
+ task_tag)) |
# Create the instance template if required. |
if not CreateInstanceTemplate(task): |
return Render('Template creation failed.', memory_logs) |
- # Split the task in smaller tasks. |
- sub_tasks = [] |
+ # Build the URL where the result will live. |
task_url = None |
if task.Action() == 'trace': |
bucket = task.BackendParams().get('storage_bucket') |
if bucket: |
task_url = 'https://console.cloud.google.com/storage/' + bucket |
- sub_tasks = SplitTraceTask(task) |
+ elif task.Action() == 'report': |
+ # This must match the table path defined in ReportTaskHandler. |
+ table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') |
blundell
2016/05/13 11:19:55
Maybe this should be defined in a function in comm
droger
2016/05/13 15:44:17
Good point.
I'm doing a separate CL for this thoug
|
+ table_name = 'clovis_dataset.report_' + table_id |
+ task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, |
+ table_name) |
else: |
error_string = 'Unsupported action: %s.' % task.Action() |
clovis_logger.error(error_string) |
return Render(error_string, memory_logs) |
+ clovis_logger.info('Task result URL: ' + task_url) |
+ |
+ # Split the task in smaller tasks. |
+ sub_tasks = SplitClovisTask(task) |
+ if not sub_tasks: |
+ return Render('Task split failed.', memory_logs) |
if not EnqueueTasks(sub_tasks, task_tag): |
return Render('Task creation failed.', memory_logs) |
@@ -201,20 +276,30 @@ def StartFromJsonString(http_body_str): |
memory_logs) |
-def SplitTraceTask(task): |
- """Splits a tracing task with potentially many URLs into several tracing tasks |
- with few URLs. |
def SplitTask(task, key, values_per_task=1):
  """Splits a task with potentially many values into several tasks with few
  values.

  Args:
    task (ClovisTask): The task to split. It is not modified.
    key (str): The key in the action parameters that is used as the dimension
               of the split.
               The value associated to this key must be a list.
               The resulting tasks are identical to the input task, except that
               they only have a slice of this list.
    values_per_task (int): The size of the slice of the list identified by
                           |key|. Must be at least 1.

  Returns:
    list: The list of smaller tasks. Empty if the list identified by |key| is
          empty.

  Raises:
    ValueError: If |values_per_task| is smaller than 1.
  """
  # A zero step makes range() fail with an unrelated message, and a negative
  # step silently produces no sub-task at all: reject both explicitly.
  if values_per_task < 1:
    raise ValueError(
        'values_per_task must be at least 1, got %r' % (values_per_task,))

  clovis_logger.debug('Splitting task.')
  action_params = task.ActionParams()
  values = action_params[key]

  # Split the task in smaller tasks with fewer values each.
  sub_tasks = []
  for start in range(0, len(values), values_per_task):
    # Shallow copy: every sub-task shares the other parameters and only gets
    # its own slice for |key|.
    sub_task_params = action_params.copy()
    sub_task_params[key] = list(values[start:start + values_per_task])
    sub_tasks.append(ClovisTask(task.Action(), sub_task_params,
                                task.BackendParams()))
  return sub_tasks